1
//! Congestion control subsystem.
2
//!
3
//! This object is attached to a circuit hop (CircHop) and controls the logic for the congestion
4
//! control support of the Tor Network. It also manages the circuit level SENDME logic which is
5
//! part of congestion control.
6
//!
7
//! # Implementation
8
//!
9
//! The core of this subsystem is that it is notified when a DATA cell is received or sent. This
10
//! in turn updates the congestion control state so that the very important
11
//! [`can_send`](CongestionControl::can_send) function can accurately decide if a DATA cell can be
12
//! sent or not.
13
//!
14
//! Any part of the arti code that wants to send a DATA cell on the wire needs to call
15
//! [`can_send`](CongestionControl::can_send) first, or else we risk leaving the circuit in a
16
//! protocol violation state.
17
//!
18
//! Furthermore, as we receive and emit SENDMEs, it also has entry points for those two events in
19
//! order to update the state.
20

            
21
#[cfg(test)]
22
pub(crate) mod test_utils;
23

            
24
mod fixed;
25
pub mod params;
26
mod rtt;
27
pub(crate) mod sendme;
28
mod vegas;
29

            
30
use crate::{Error, Result};
31

            
32
use self::{
33
    params::{Algorithm, CongestionControlParams, CongestionWindowParams},
34
    rtt::{ClockStall, RoundtripTimeEstimator},
35
    sendme::SendmeValidator,
36
};
37
use tor_cell::relaycell::msg::SendmeTag;
38
use tor_rtcompat::{DynTimeProvider, SleepProvider};
39

            
40
/// This trait defines what a congestion control algorithm must implement in order to interface
41
/// with the circuit reactor.
42
///
43
/// Note that all functions informing the algorithm, as in not getters, return a Result meaning
44
/// that on error, it means we can't recover or that there is a protocol violation. In both
45
/// cases, the circuit MUST be closed.
46
pub(crate) trait CongestionControlAlgorithm: Send + std::fmt::Debug {
47
    /// Return true iff this algorithm uses stream level SENDMEs.
48
    fn uses_stream_sendme(&self) -> bool;
49
    /// Return true iff this algorithm uses stream level XON/XOFFs.
50
    fn uses_xon_xoff(&self) -> bool;
51
    /// Return true iff the next cell is expected to be a SENDME.
52
    fn is_next_cell_sendme(&self) -> bool;
53
    /// Return true iff a cell can be sent on the wire according to the congestion control
54
    /// algorithm.
55
    fn can_send(&self) -> bool;
56
    /// Return the congestion window object. The reason is returns an Option is because not all
57
    /// algorithm uses one and so we avoid acting on it if so.
58
    fn cwnd(&self) -> Option<CongestionWindow>;
59

            
60
    /// Inform the algorithm that we just got a DATA cell.
61
    ///
62
    /// Return true if a SENDME should be sent immediately or false if not.
63
    fn data_received(&mut self) -> Result<bool>;
64
    /// Inform the algorithm that we just sent a DATA cell.
65
    fn data_sent(&mut self) -> Result<()>;
66
    /// Inform the algorithm that we've just received a SENDME.
67
    ///
68
    /// This is a core function because the algorithm massively update its state when receiving a
69
    /// SENDME by using the RTT value and congestion signals.
70
    fn sendme_received(
71
        &mut self,
72
        state: &mut State,
73
        rtt: &mut RoundtripTimeEstimator,
74
        signals: CongestionSignals,
75
        clock_stall: ClockStall,
76
    ) -> Result<()>;
77
    /// Inform the algorithm that we just sent a SENDME.
78
    fn sendme_sent(&mut self) -> Result<()>;
79

            
80
    /// Return the number of in-flight cells (sent but awaiting SENDME ack).
81
    ///
82
    /// Optional, because not all algorithms track this.
83
    #[cfg(feature = "conflux")]
84
    fn inflight(&self) -> Option<u32>;
85

            
86
    /// Test Only: Return the congestion window.
87
    #[cfg(test)]
88
    fn send_window(&self) -> u32;
89

            
90
    /// Return the congestion control [`Algorithm`] implemented by this type.
91
    fn algorithm(&self) -> Algorithm;
92
}
93

            
94
/// Congestion signals that a congestion control algorithm uses to make decisions.
///
/// These signals reflect various states of our internals. This is not an exhaustive list.
#[derive(Copy, Clone)]
pub(crate) struct CongestionSignals {
    /// Indicate if the channel is blocked.
    pub(crate) channel_blocked: bool,
    /// The size of the channel outbound queue.
    pub(crate) channel_outbound_size: u32,
}

impl CongestionSignals {
    /// Build a new set of signals.
    ///
    /// `channel_outbound_size` is stored as a `u32`. A value that does not fit saturates at
    /// `u32::MAX` rather than being truncated, so that a huge queue can never be reported as a
    /// small (or empty) one.
    pub(crate) fn new(channel_blocked: bool, channel_outbound_size: usize) -> Self {
        Self {
            channel_blocked,
            channel_outbound_size: u32::try_from(channel_outbound_size).unwrap_or(u32::MAX),
        }
    }
}
113

            
114
/// The state a congestion controlled circuit hop can be in.
#[derive(Copy, Clone, Default)]
pub(crate) enum State {
    /// The initial state any circuit starts in. Used to gradually increase the amount of data
    /// being transmitted in order to converge towards the optimal capacity.
    #[default]
    SlowStart,
    /// Steady state representing what we think is optimal. This always comes after slow start.
    Steady,
}

impl State {
    /// Return true iff the state is [`State::SlowStart`].
    pub(crate) fn in_slow_start(&self) -> bool {
        match self {
            State::SlowStart => true,
            State::Steady => false,
        }
    }
}
131

            
132
/// A congestion window. This is generic for all algorithms but their parameters' value will differ
133
/// depending on the selected algorithm.
134
#[derive(Clone, Copy, Debug)]
135
pub(crate) struct CongestionWindow {
136
    /// Congestion window parameters from the consensus.
137
    params: CongestionWindowParams,
138
    /// The actual value of our congestion window.
139
    value: u32,
140
    /// The congestion window is full.
141
    is_full: bool,
142
}
143

            
144
impl CongestionWindow {
145
    /// Constructor taking consensus parameters.
146
412
    fn new(params: CongestionWindowParams) -> Self {
147
412
        Self {
148
412
            value: params.cwnd_init(),
149
412
            params,
150
412
            is_full: false,
151
412
        }
152
412
    }
153

            
154
    /// Decrement the window by the increment value.
155
10
    pub(crate) fn dec(&mut self) {
156
10
        self.value = self
157
10
            .value
158
10
            .saturating_sub(self.increment())
159
10
            .max(self.params.cwnd_min());
160
10
    }
161

            
162
    /// Increment the window by the increment value.
163
12
    pub(crate) fn inc(&mut self) {
164
12
        self.value = self
165
12
            .value
166
12
            .saturating_add(self.increment())
167
12
            .min(self.params.cwnd_max());
168
12
    }
169

            
170
    /// Return the current value.
171
10800
    pub(crate) fn get(&self) -> u32 {
172
10800
        self.value
173
10800
    }
174

            
175
    /// Return the expected rate for which the congestion window should be updated at.
176
    ///
177
    /// See `CWND_UPDATE_RATE` in prop324.
178
582
    pub(crate) fn update_rate(&self, state: &State) -> u32 {
179
582
        if state.in_slow_start() {
180
514
            1
181
        } else {
182
68
            (self.get() + self.increment_rate() * self.sendme_inc() / 2)
183
68
                / (self.increment_rate() * self.sendme_inc())
184
        }
185
582
    }
186

            
187
    /// Return minimum value of the congestion window.
188
158
    pub(crate) fn min(&self) -> u32 {
189
158
        self.params.cwnd_min()
190
158
    }
191

            
192
    /// Set the congestion window value with a new value.
193
28
    pub(crate) fn set(&mut self, value: u32) {
194
28
        self.value = value;
195
28
    }
196

            
197
    /// Return the increment value.
198
132
    pub(crate) fn increment(&self) -> u32 {
199
132
        self.params.cwnd_inc()
200
132
    }
201

            
202
    /// Return the rate at which we should increment the window.
203
242
    pub(crate) fn increment_rate(&self) -> u32 {
204
242
        self.params.cwnd_inc_rate()
205
242
    }
206

            
207
    /// Return true iff this congestion window is full.
208
260
    pub(crate) fn is_full(&self) -> bool {
209
260
        self.is_full
210
260
    }
211

            
212
    /// Reset the full flag meaning it is now not full.
213
26
    pub(crate) fn reset_full(&mut self) {
214
26
        self.is_full = false;
215
26
    }
216

            
217
    /// Return the number of expected SENDMEs per congestion window.
218
    ///
219
    /// Spec: prop324 SENDME_PER_CWND definition
220
286
    pub(crate) fn sendme_per_cwnd(&self) -> u32 {
221
286
        (self.get() + (self.sendme_inc() / 2)) / self.sendme_inc()
222
286
    }
223

            
224
    /// Return the RFC3742 slow start increment value.
225
    ///
226
    /// Spec: prop324 rfc3742_ss_inc definition
227
104
    pub(crate) fn rfc3742_ss_inc(&mut self, ss_cap: u32) -> u32 {
228
104
        let inc = if self.get() <= ss_cap {
229
98
            ((self.params.cwnd_inc_pct_ss().as_percent() * self.sendme_inc()) + 50) / 100
230
        } else {
231
6
            (((self.sendme_inc() * ss_cap) + self.get()) / (self.get() * 2)).max(1)
232
        };
233
104
        self.value += inc;
234
104
        inc
235
104
    }
236

            
237
    /// Evaluate the fullness of the window with the given parameters.
238
    ///
239
    /// Spec: prop324 see cwnd_is_full and cwnd_is_nonfull definition.
240
    /// C-tor: cwnd_became_full() and cwnd_became_nonfull()
241
156
    pub(crate) fn eval_fullness(&mut self, inflight: u32, full_gap: u32, full_minpct: u32) {
242
156
        if (inflight + (self.sendme_inc() * full_gap)) >= self.get() {
243
118
            self.is_full = true;
244
118
        } else if (100 * inflight) < (full_minpct * self.get()) {
245
24
            self.is_full = false;
246
24
        }
247
156
    }
248

            
249
    /// Return the SENDME increment value.
250
2718
    pub(crate) fn sendme_inc(&self) -> u32 {
251
2718
        self.params.sendme_inc()
252
2718
    }
253

            
254
    /// Return the congestion window params.
255
    #[cfg(any(test, feature = "conflux"))]
256
84
    pub(crate) fn params(&self) -> &CongestionWindowParams {
257
84
        &self.params
258
84
    }
259
}
260

            
261
/// Congestion control state of a hop on a circuit.
262
///
263
/// This controls the entire logic of congestion control and circuit level SENDMEs.
264
pub(crate) struct CongestionControl {
265
    /// Which congestion control state are we in?
266
    state: State,
267
    /// This is the SENDME validator as in it keeps track of the circuit tag found within an
268
    /// authenticated SENDME cell. It can store the tags and validate a tag against our queue of
269
    /// expected values.
270
    sendme_validator: SendmeValidator<SendmeTag>,
271
    /// The RTT estimator for the circuit we are attached on.
272
    rtt: RoundtripTimeEstimator,
273
    /// The congestion control algorithm.
274
    algorithm: Box<dyn CongestionControlAlgorithm>,
275
}
276

            
277
impl CongestionControl {
278
    /// Construct a new CongestionControl
279
1056
    pub(crate) fn new(params: &CongestionControlParams) -> Self {
280
1056
        let state = State::default();
281
        // Use what the consensus tells us to use.
282
1056
        let algorithm: Box<dyn CongestionControlAlgorithm> = match params.alg() {
283
672
            Algorithm::FixedWindow(p) => Box::new(fixed::FixedWindow::new(*p)),
284
384
            Algorithm::Vegas(p) => {
285
384
                let cwnd = CongestionWindow::new(params.cwnd_params());
286
384
                Box::new(vegas::Vegas::new(*p, &state, cwnd))
287
            }
288
        };
289
1056
        Self {
290
1056
            algorithm,
291
1056
            rtt: RoundtripTimeEstimator::new(params.rtt_params()),
292
1056
            sendme_validator: SendmeValidator::new(),
293
1056
            state,
294
1056
        }
295
1056
    }
296

            
297
    /// Return true iff the underlying algorithm uses stream level SENDMEs.
298
    /// At the moment, only FixedWindow uses it. It has been eliminated with Vegas.
299
136
    pub(crate) fn uses_stream_sendme(&self) -> bool {
300
136
        self.algorithm.uses_stream_sendme()
301
136
    }
302

            
303
    /// Return true iff the underlying algorithm uses stream level XON/XOFFs.
304
    /// At the moment, only Vegas uses it.
305
220
    pub(crate) fn uses_xon_xoff(&self) -> bool {
306
220
        self.algorithm.uses_xon_xoff()
307
220
    }
308

            
309
    /// Return true iff a DATA cell is allowed to be sent based on the congestion control state.
310
20412
    pub(crate) fn can_send(&self) -> bool {
311
20412
        self.algorithm.can_send()
312
20412
    }
313

            
314
    /// Called when a SENDME cell is received.
315
    ///
316
    /// An error is returned if there is a protocol violation with regards to congestion control.
317
44
    pub(crate) fn note_sendme_received(
318
44
        &mut self,
319
44
        runtime: &DynTimeProvider,
320
44
        tag: SendmeTag,
321
44
        signals: CongestionSignals,
322
44
    ) -> Result<()> {
323
        // This MUST be the first thing that we do that is validate the SENDME. Any error leads to
324
        // closing the circuit.
325
44
        self.sendme_validator.validate(Some(tag))?;
326

            
327
40
        let now = runtime.now();
328
        // Update our RTT estimate if the algorithm yields back a congestion window. RTT
329
        // measurements only make sense for a congestion window. For example, FixedWindow here
330
        // doesn't use it and so no need for the RTT.
331
40
        let clock_stall = if let Some(cwnd) = self.algorithm.cwnd() {
332
36
            self.rtt
333
36
                .update(now, &self.state, &cwnd)
334
36
                .map_err(|e| Error::CircProto(e.to_string()))?
335
        } else {
336
4
            ClockStall::NotDetected
337
        };
338

            
339
        // Notify the algorithm that we've received a SENDME.
340
40
        self.algorithm
341
40
            .sendme_received(&mut self.state, &mut self.rtt, signals, clock_stall)
342
44
    }
343

            
344
    /// Called when a SENDME cell is sent.
345
    pub(crate) fn note_sendme_sent(&mut self) -> Result<()> {
346
        self.algorithm.sendme_sent()
347
    }
348

            
349
    /// Called when a DATA cell is received.
350
    ///
351
    /// Returns true iff a SENDME should be sent false otherwise. An error is returned if there is
352
    /// a protocol violation with regards to flow or congestion control.
353
100
    pub(crate) fn note_data_received(&mut self) -> Result<bool> {
354
100
        self.algorithm.data_received()
355
100
    }
356

            
357
    /// Called when a DATA cell is sent.
358
    ///
359
    /// An error is returned if there is a protocol violation with regards to flow or congestion
360
    /// control.
361
4156
    pub(crate) fn note_data_sent<U>(&mut self, runtime: &DynTimeProvider, tag: &U) -> Result<()>
362
4156
    where
363
4156
        U: Clone + Into<SendmeTag>,
364
    {
365
        // Inform the algorithm that the data was just sent. This is important to be the very first
366
        // thing so the congestion window can be updated accordingly making the following calls
367
        // using the latest data.
368
4156
        self.algorithm.data_sent()?;
369

            
370
        // If next cell is a SENDME, we need to record the tag of this cell in order to validate
371
        // the next SENDME when it arrives.
372
4156
        if self.algorithm.is_next_cell_sendme() {
373
60
            self.sendme_validator.record(tag);
374
            // Only keep the SENDME timestamp if the algorithm has a congestion window.
375
60
            if self.algorithm.cwnd().is_some() {
376
36
                self.rtt.expect_sendme(runtime.now());
377
36
            }
378
4096
        }
379

            
380
4156
        Ok(())
381
4156
    }
382

            
383
    /// Return the number of in-flight cells (sent but awaiting SENDME ack).
384
    ///
385
    /// Optional, because not all algorithms track this.
386
    #[cfg(feature = "conflux")]
387
16
    pub(crate) fn inflight(&self) -> Option<u32> {
388
16
        self.algorithm.inflight()
389
16
    }
390

            
391
    /// Return the congestion window object.
392
    ///
393
    /// Optional, because not all algorithms track this.
394
    #[cfg(feature = "conflux")]
395
68
    pub(crate) fn cwnd(&self) -> Option<CongestionWindow> {
396
68
        self.algorithm.cwnd()
397
68
    }
398

            
399
    /// Return a reference to the RTT estimator.
400
2468
    pub(crate) fn rtt(&self) -> &RoundtripTimeEstimator {
401
2468
        &self.rtt
402
2468
    }
403

            
404
    /// Return the congestion control algorithm.
405
    #[cfg(feature = "conflux")]
406
60
    pub(crate) fn algorithm(&self) -> Algorithm {
407
60
        self.algorithm.algorithm()
408
60
    }
409
}
410

            
411
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use crate::congestion::test_utils::new_cwnd;

    use super::CongestionControl;
    use tor_cell::relaycell::msg::SendmeTag;

    impl CongestionControl {
        /// For testing: return the algorithm's send window together with a copy of the tags
        /// the SENDME validator expects.
        pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
            let window = self.algorithm.send_window();
            let tags = self.sendme_validator.expected_tags();
            (window, tags)
        }
    }

    #[test]
    fn test_cwnd() {
        let mut cwnd = new_cwnd();

        // The parameters never change, so read the ones we compare against several times once.
        let initial = cwnd.params().cwnd_init();
        let step = cwnd.params().cwnd_inc();

        // The getters must agree with the parameters the window was built from.
        assert_eq!(cwnd.get(), initial);
        assert_eq!(cwnd.min(), cwnd.params().cwnd_min());
        assert_eq!(cwnd.increment(), step);
        assert_eq!(cwnd.increment_rate(), cwnd.params().cwnd_inc_rate());
        assert_eq!(cwnd.sendme_inc(), cwnd.params().sendme_inc());
        assert!(!cwnd.is_full());

        // One increment followed by one decrement brings us back to the initial value.
        cwnd.inc();
        assert_eq!(cwnd.get(), initial + step);
        cwnd.dec();
        assert_eq!(cwnd.get(), initial);
    }
}