1
//! Channel tasks of the arti-relay.
2
//!
3
//! The tasks are:
4
//!     * [`ChannelHouseKeepingTask`] which is in charge of regularly going over existing channels
5
//!       to clean up expiring ones and prune duplicates. At the start, it will run in
6
//!       [`ChannelHouseKeepingTask::START_TICK_TIME`] seconds and then the channel expiry function
7
//!       tells it how much time to sleep.
8
//!     * [`update_create_request_handler_netparams`] which is in charge of updating the
9
//!       circuit-related network parameters of the [`CreateRequestHandler`], which is shared with
10
//!       the [`ChanMgr`].
11

            
12
use anyhow::Context;
13
use futures::StreamExt as _;
14
use std::{
15
    sync::{Arc, Weak},
16
    time::Duration,
17
};
18
use tracing::debug;
19

            
20
use tor_chanmgr::ChanMgr;
21
use tor_error::{into_internal, warn_report};
22
use tor_netdir::params::NetParameters;
23
use tor_netdir::{DirEvent, NetDirProvider};
24
use tor_proto::ccparams::{
25
    CongestionWindowParamsBuilder, FixedWindowParamsBuilder, RoundTripEstimatorParamsBuilder,
26
    VegasParamsBuilder,
27
};
28
use tor_proto::relay::{CircNetParameters, CongestionControlNetParams, CreateRequestHandler};
29
use tor_proto::{CellCount, FlowCtrlParameters};
30
use tor_rtcompat::Runtime;
31
use tor_units::Percentage;
32

            
33
/// Channel housekeeping standalone background task.
34
pub(crate) struct ChannelHouseKeepingTask<R: Runtime> {
35
    /// Weak reference to the ChanMgr. If it disappears, task stops.
36
    mgr: Weak<ChanMgr<R>>,
37
}
38

            
39
impl<R: Runtime> ChannelHouseKeepingTask<R> {
40
    /// Starting tick time is to run in 3 minutes. The ChanMgr expire channels uses a default of
41
    /// 180 seconds and so we simply align with that for the first run. After that, the channel
42
    /// subsystems will tell us how long to wait if any channels.
43
    const START_TICK_TIME: Duration = Duration::from_secs(180);
44

            
45
    /// Constructor.
46
    pub(crate) fn new(mgr: &Arc<ChanMgr<R>>) -> Self {
47
        Self {
48
            mgr: Arc::downgrade(mgr),
49
        }
50
    }
51

            
52
    /// Run the background task.
53
    #[allow(clippy::unused_async)] // TODO(relay)
54
    async fn run(&mut self) -> anyhow::Result<Duration> {
55
        let mgr = Weak::upgrade(&self.mgr).context("Channel manager is gone")?;
56
        // Expire any channels that are possibly closing.
57
        let next_expiry = mgr.expire_channels();
58

            
59
        // TODO: Another action is to prune duplicate channels like C-tor does in
60
        // channel_update_bad_for_new_circs().
61
        Ok(next_expiry)
62
    }
63

            
64
    /// Start the task.
65
    pub(crate) async fn start(&mut self) -> anyhow::Result<void::Void> {
66
        let mut next_tick_in = Self::START_TICK_TIME;
67
        debug!("Channel housekeeping task starting.");
68
        loop {
69
            // Sleep until next tick.
70
            tokio::time::sleep(next_tick_in).await;
71
            // Run this task. The returned value is the next tick.
72
            next_tick_in = self
73
                .run()
74
                .await
75
                .context("Shutting down channel housekeeping task")?;
76
        }
77
    }
78
}
79

            
80
/// A task which waits for new consensus documents and updates the parameters
81
/// for a [`CreateRequestHandler`].
82
pub(crate) async fn update_create_request_handler_netparams(
83
    create_request_handler: Arc<CreateRequestHandler>,
84
    netdir: Arc<dyn NetDirProvider>,
85
) -> anyhow::Result<void::Void> {
86
    /// A helper to call [`build_circ_net_params()`] and log errors.
87
    fn build_helper(params: &NetParameters) -> Option<CircNetParameters> {
88
        match build_circ_net_params(params) {
89
            Ok(params) => Some(params),
90
            Err(e) => {
91
                // This is weird, but probably not worth shutting down the relay for.
92
                // Let's ignore this and hope that a future consensus is better.
93
                warn_report!(e, "Could not build circuit params for latest consensus");
94
                None
95
            }
96
        }
97
    }
98

            
99
    let mut consensus_events = netdir
100
        .events()
101
        .filter(|ev| std::future::ready(matches!(ev, DirEvent::NewConsensus)));
102

            
103
    // We do this after subscribing with `events()` above
104
    // so that we don't miss any changes.
105
    // https://gitlab.torproject.org/tpo/core/arti/-/issues/2420
106
    if let Some(params) = build_helper(netdir.params().as_ref().as_ref()) {
107
        create_request_handler.update_params(params);
108
    }
109

            
110
    // Loop forever waiting for consensus updates.
111
    loop {
112
        let _event = consensus_events
113
            .next()
114
            .await
115
            .context("netdir consensus event stream ended unexpectedly")?;
116

            
117
        let Some(params) = build_helper(netdir.params().as_ref().as_ref()) else {
118
            continue;
119
        };
120

            
121
        // Update the handler with the latest parameters.
122
        create_request_handler.update_params(params);
123
    }
124
}
125

            
126
/// Build a [`CircNetParameters`] from a [`NetParameters`].
127
///
128
/// This should just copy values from the network status into a `CircNetParameters`,
129
/// which is how a [`CreateRequestHandler`] accepts network parameters for building circuits.
130
// TODO: This shares a bunch of code with `exit_circparams_from_netparams()`.
131
// It would be nice if we could simplify/dedup this somehow.
132
pub(crate) fn build_circ_net_params(params: &NetParameters) -> anyhow::Result<CircNetParameters> {
133
    // TODO(arti#2442): The builder pattern throughout seems like a footgun.
134

            
135
    let vegas_exit_params = (
136
        params.cc_vegas_alpha_exit.into(),
137
        params.cc_vegas_beta_exit.into(),
138
        params.cc_vegas_delta_exit.into(),
139
        params.cc_vegas_gamma_exit.into(),
140
        params.cc_vegas_sscap_exit.into(),
141
    );
142

            
143
    let vegas_exit = VegasParamsBuilder::default()
144
        .cell_in_queue_params(vegas_exit_params.into())
145
        .ss_cwnd_max(params.cc_ss_max.into())
146
        .cwnd_full_gap(params.cc_cwnd_full_gap.into())
147
        .cwnd_full_min_pct(Percentage::new(
148
            params.cc_cwnd_full_minpct.as_percent().get() as u32,
149
        ))
150
        .cwnd_full_per_cwnd(params.cc_cwnd_full_per_cwnd.into())
151
        .build()
152
        .map_err(into_internal!("Unable to build VegasParams"))?;
153

            
154
    let fixed_window = FixedWindowParamsBuilder::default()
155
        .circ_window_start(params.circuit_window.get() as u16)
156
        .circ_window_min(params.circuit_window.lower() as u16)
157
        .circ_window_max(params.circuit_window.upper() as u16)
158
        .build()
159
        .map_err(into_internal!("Unable to build FixedWindowParams"))?;
160

            
161
    let cwnd = CongestionWindowParamsBuilder::default()
162
        .cwnd_init(params.cc_cwnd_init.into())
163
        .cwnd_inc_pct_ss(Percentage::new(
164
            params.cc_cwnd_inc_pct_ss.as_percent().get() as u32,
165
        ))
166
        .cwnd_inc(params.cc_cwnd_inc.into())
167
        .cwnd_inc_rate(params.cc_cwnd_inc_rate.into())
168
        .cwnd_min(params.cc_cwnd_min.into())
169
        .cwnd_max(params.cc_cwnd_max.into())
170
        .sendme_inc(params.cc_sendme_inc.into())
171
        .build()
172
        .map_err(into_internal!("Unable to build CongestionWindowParams"))?;
173

            
174
    let rtt = RoundTripEstimatorParamsBuilder::default()
175
        .ewma_cwnd_pct(Percentage::new(
176
            params.cc_ewma_cwnd_pct.as_percent().get() as u32
177
        ))
178
        .ewma_max(params.cc_ewma_max.into())
179
        .ewma_ss_max(params.cc_ewma_ss.into())
180
        .rtt_reset_pct(Percentage::new(
181
            params.cc_rtt_reset_pct.as_percent().get() as u32
182
        ))
183
        .build()
184
        .map_err(into_internal!("Unable to build RoundTripEstimatorParams"))?;
185

            
186
    let flow_ctrl = FlowCtrlParameters {
187
        cc_xoff_client: CellCount::new(params.cc_xoff_client.get_u32()),
188
        cc_xoff_exit: CellCount::new(params.cc_xoff_exit.get_u32()),
189
        cc_xon_rate: CellCount::new(params.cc_xon_rate.get_u32()),
190
        cc_xon_change_pct: params.cc_xon_change_pct.get_u32(),
191
        cc_xon_ewma_cnt: params.cc_xon_ewma_cnt.get_u32(),
192
    };
193

            
194
    let cc = CongestionControlNetParams {
195
        fixed_window,
196
        vegas_exit,
197
        cwnd,
198
        rtt,
199
        flow_ctrl,
200
    };
201

            
202
    Ok(CircNetParameters {
203
        extend_by_ed25519_id: params.extend_by_ed25519_id.into(),
204
        cc,
205
    })
206
}