//! Circuit reactor's stream XON/XOFF flow control.
//!
//! ## Notes on consensus parameters
//!
//! ### `cc_xoff_client`
//!
//! This is the number of bytes that we buffer within a [`DataStream`]. The actual total number of
//! bytes buffered can be *much* larger. For example, there will be additional buffering:
//!
//! - Within the arti socks/http proxy: Arti's proxy code needs to read some bytes from the stream,
//!   store them in a temporary buffer, then write the buffer to the socket. If the socket would
//!   block, the data would remain in that temporary buffer. In practice arti uses only a small
//!   buffer (`APP_STREAM_BUF_LEN`) at the time of writing, which is hopefully negligible. See
//!   `arti::socks::copy_interactive()`.
//! - Within the kernel: There are two additional buffers that will store stream data before the
//!   application connected over socks will see the data: Arti's socket send buffer and the
//!   application's socket receive buffer. If the application were to stop reading from its socket,
//!   stream data would accumulate first in the socket's receive buffer. Once full, stream data
//!   would accumulate in arti's socket's send buffer. This can become relatively large, especially
//!   with buffer autotuning enabled. On a Linux 6.15 system with curl downloading a large file and
//!   stopping mid-download, the receive buffer was 6,116,738 bytes and the send buffer was
//!   2,631,062 bytes. This sums to around 8.7 MB of stream data buffered in the kernel, which is
//!   significantly higher than the current consensus value of `cc_xoff_client`.
//!   NOTE: Arti's proxy sockets now use fixed-size `DEFAULT_{SEND,RECV}_BUF_SIZE` kernel buffers.
//!
//! This means that the total number of bytes buffered before an XOFF is sent can be much larger
//! than `cc_xoff_client`.
//!
//! While we should take into account the kernel and arti socks buffering above, we also need to
//! keep in mind that arti-client is a library that can be used by others. These library users might
//! not do any kernel or socks buffering, for example if they write a rust program that handles the
//! stream data entirely within their program. We don't want to set `cc_xoff_client` so low that it
//! harms the performance for these users, even if it's fine for the arti socks proxy case.
33

            
34
use std::num::Saturating;
35
use std::sync::Arc;
36

            
37
use postage::watch;
38
use tor_cell::relaycell::flow_ctrl::{FlowCtrlVersion, Xoff, Xon, XonKbpsEwma};
39
use tor_cell::relaycell::msg::AnyRelayMsg;
40
use tor_cell::relaycell::{RelayCmd, RelayMsg, UnparsedRelayMsg};
41
use tracing::trace;
42

            
43
use super::reader::DrainRateRequest;
44

            
45
use crate::stream::flow_ctrl::params::{CellCount, FlowCtrlParameters};
46
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, HalfStreamFlowCtrlHooks, StreamRateLimit};
47
use crate::util::notify::NotifySender;
48
use crate::{Error, Result};
49

            
50
#[cfg(doc)]
51
use {crate::client::stream::DataStream, crate::stream::flow_ctrl::state::StreamFlowCtrl};
52

            
53
/// State for XON/XOFF flow control.
54
#[derive(Debug)]
55
pub(crate) struct XonXoffFlowCtrl {
56
    /// Consensus parameters.
57
    params: Arc<FlowCtrlParameters>,
58
    /// How we communicate rate limit updates to the
59
    /// [`DataWriter`](crate::client::stream::DataWriter).
60
    rate_limit_updater: watch::Sender<StreamRateLimit>,
61
    /// How we communicate requests for new drain rate updates to the
62
    /// [`XonXoffReader`](crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReader).
63
    drain_rate_requester: NotifySender<DrainRateRequest>,
64
    /// The last rate limit we sent.
65
    last_sent_xon_xoff: Option<XonXoffMsg>,
66
    /// The buffer limit at which we should send an XOFF.
67
    ///
68
    /// In prop324 it says that this will be either `cc_xoff_client` or `cc_xoff_exit` depending on
69
    /// whether we're a client/hs or exit, but we deviate from the spec here (see how it is set
70
    /// below).
71
    xoff_limit: CellCount<{ tor_cell::relaycell::PAYLOAD_MAX_SIZE_ALL as u32 }>,
72
    /// DropMark sidechannel mitigations.
73
    ///
74
    /// This is only enabled if we are a client (including an onion service).
75
    //
76
    // We could use a `Box` here so that this only takes up space if sidechannel mitigations are
77
    // enabled. But `SidechannelMitigation` is (at the time of writing) only 16 bytes. We could
78
    // reconsider in the future if we add more functionality to `SidechannelMitigation`.
79
    sidechannel_mitigation: Option<SidechannelMitigation>,
80
}
81

            
82
impl XonXoffFlowCtrl {
83
    /// Returns a new xon/xoff-based state.
84
24
    pub(crate) fn new(
85
24
        params: Arc<FlowCtrlParameters>,
86
24
        use_sidechannel_mitigations: bool,
87
24
        rate_limit_updater: watch::Sender<StreamRateLimit>,
88
24
        drain_rate_requester: NotifySender<DrainRateRequest>,
89
24
    ) -> Self {
90
24
        let sidechannel_mitigation =
91
24
            use_sidechannel_mitigations.then_some(SidechannelMitigation::new());
92

            
93
        // We use the same XOFF limit regardless of if we're a client or exit.
94
        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
95
24
        let xoff_limit = std::cmp::max(params.cc_xoff_client, params.cc_xoff_exit);
96

            
97
24
        Self {
98
24
            params,
99
24
            rate_limit_updater,
100
24
            drain_rate_requester,
101
24
            last_sent_xon_xoff: None,
102
24
            xoff_limit,
103
24
            sidechannel_mitigation,
104
24
        }
105
24
    }
106
}
107

            
108
impl FlowCtrlHooks for XonXoffFlowCtrl {
109
6016
    fn can_send<M: RelayMsg>(&self, _msg: &M) -> bool {
110
        // we perform rate-limiting in the `DataWriter`,
111
        // so we send any messages that made it past the `DataWriter`
112
6016
        true
113
6016
    }
114

            
115
1200
    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
116
        // if sidechannel mitigations are enabled and this is a RELAY_DATA message,
117
        // notify that we sent a data message
118
1200
        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
119
1200
            if let AnyRelayMsg::Data(data_msg) = msg {
120
1200
                sidechannel_mitigation.sent_stream_data(data_msg.as_ref().len());
121
1200
            }
122
        }
123

            
124
1200
        Ok(())
125
1200
    }
126

            
127
    fn put_for_incoming_sendme(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
128
        let msg = "Stream level SENDME not allowed due to congestion control";
129
        Err(Error::CircProto(msg.into()))
130
    }
131

            
132
    fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
133
        let xon = msg
134
            .decode::<Xon>()
135
            .map_err(|e| Error::from_bytes_err(e, "failed to decode XON message"))?
136
            .into_msg();
137

            
138
        // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
139
        // > violation.
140
        if *xon.version() != 0 {
141
            return Err(Error::CircProto("Unrecognized XON version".into()));
142
        }
143

            
144
        // if sidechannel mitigations are enabled, notify that an XON was received
145
        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
146
            sidechannel_mitigation.received_xon(&self.params)?;
147
        }
148

            
149
        trace!("Received an XON with rate {}", xon.kbps_ewma());
150

            
151
        let rate = match xon.kbps_ewma() {
152
            XonKbpsEwma::Limited(rate_kbps) => {
153
                let rate_kbps = u64::from(rate_kbps.get());
154
                // convert from kbps to bytes/s
155
                StreamRateLimit::new_bytes_per_sec(rate_kbps * 1000 / 8)
156
            }
157
            XonKbpsEwma::Unlimited => StreamRateLimit::MAX,
158
        };
159

            
160
        *self.rate_limit_updater.borrow_mut() = rate;
161
        Ok(())
162
    }
163

            
164
    fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
165
        let xoff = msg
166
            .decode::<Xoff>()
167
            .map_err(|e| Error::from_bytes_err(e, "failed to decode XOFF message"))?
168
            .into_msg();
169

            
170
        // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
171
        // > violation.
172
        if *xoff.version() != 0 {
173
            return Err(Error::CircProto("Unrecognized XOFF version".into()));
174
        }
175

            
176
        // if sidechannel mitigations are enabled, notify that an XOFF was received
177
        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
178
            sidechannel_mitigation.received_xoff(&self.params)?;
179
        }
180

            
181
        trace!("Received an XOFF");
182

            
183
        // update the rate limit and notify the `DataWriter`
184
        *self.rate_limit_updater.borrow_mut() = StreamRateLimit::ZERO;
185

            
186
        Ok(())
187
    }
188

            
189
8
    fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>> {
190
8
        if buffer_len as u64 > self.xoff_limit.as_bytes() {
191
            // we can't send an XON, and we should have already sent an XOFF when the queue first
192
            // exceeded the limit (see `maybe_send_xoff()`)
193
4
            debug_assert!(matches!(self.last_sent_xon_xoff, Some(XonXoffMsg::Xoff)));
194

            
195
            // inform the stream reader that we need a new drain rate
196
4
            self.drain_rate_requester.notify();
197
4
            return Ok(None);
198
4
        }
199

            
200
4
        self.last_sent_xon_xoff = Some(XonXoffMsg::Xon(rate));
201

            
202
4
        trace!("Want to send an XON with rate {rate}");
203

            
204
4
        Ok(Some(Xon::new(FlowCtrlVersion::V0, rate)))
205
8
    }
206

            
207
168
    fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
208
        // if the last XON/XOFF we sent was an XOFF, no need to send another
209
96
        if matches!(self.last_sent_xon_xoff, Some(XonXoffMsg::Xoff)) {
210
72
            return Ok(None);
211
96
        }
212

            
213
96
        if buffer_len as u64 <= self.xoff_limit.as_bytes() {
214
88
            return Ok(None);
215
8
        }
216

            
217
        // either we have never sent an XOFF or XON, or we last sent an XON
218

            
219
        // remember that we last sent an XOFF
220
8
        self.last_sent_xon_xoff = Some(XonXoffMsg::Xoff);
221

            
222
        // inform the stream reader that we need a new drain rate
223
8
        self.drain_rate_requester.notify();
224

            
225
8
        trace!("Want to send an XOFF");
226

            
227
8
        Ok(Some(Xoff::new(FlowCtrlVersion::V0)))
228
168
    }
229

            
230
16
    fn inbound_queue_max_len(&self) -> usize {
231
        // Congestion control doesn't have an upper limit for the number of in-flight
232
        // cells that the other end might send,
233
        // so we need to expect any number of cells on this stream.
234
        //
235
        // Since dealing with mpsc queues that may be bounded or unbounded is a pain (requires a
236
        // bunch of enum wrappers), we'll set a very high bound.
237
        // This bound should be high enough that we'll never reach it in practice
238
        // (and if we do, it's surely a bug or an attack),
239
        // but not too high as to cause `futures_channel::mpsc::channel()` to panic.
240
        //
241
        // Here we choose a max of 2_000_000 messages,
242
        // which is approx 1000 MB of stream data (assuming packed cells).
243
        //
244
        // TODO(arti#2540): We should use an unbounded queue for XON/XOFF flow control,
245
        // and should return `None` here.
246
16
        2_000_000
247
16
    }
248
}
249

            
250
/// State for XON/XOFF flow control on a half-stream.
251
#[derive(Debug)]
252
pub(crate) struct HalfStreamXonXoffFlowCtrl {
253
    /// The original [`XonXoffFlowCtrl`] from the full stream.
254
    ///
255
    /// We keep this since we need to continue validating any incoming messages
256
    /// and continue applying the sidechannel mitigations.
257
    inner: XonXoffFlowCtrl,
258
}
259

            
260
impl HalfStreamXonXoffFlowCtrl {
261
    /// Returns a new xon/xoff-based state for a half-stream.
262
4
    pub(crate) fn new(flow_ctrl: XonXoffFlowCtrl) -> Self {
263
4
        Self { inner: flow_ctrl }
264
4
    }
265
}
266

            
267
impl HalfStreamFlowCtrlHooks for HalfStreamXonXoffFlowCtrl {
268
4
    fn handle_incoming_dropped(&mut self, _msg_count: u16) -> Result<()> {
269
        // Nothing to do here.
270
4
        Ok(())
271
4
    }
272

            
273
    fn handle_incoming_msg(&mut self, msg: UnparsedRelayMsg) -> Result<Option<UnparsedRelayMsg>> {
274
        match msg.cmd() {
275
            RelayCmd::SENDME => {
276
                self.inner.put_for_incoming_sendme(msg)?;
277
                Ok(None)
278
            }
279
            RelayCmd::XON => {
280
                self.inner.handle_incoming_xon(msg)?;
281
                Ok(None)
282
            }
283
            RelayCmd::XOFF => {
284
                self.inner.handle_incoming_xoff(msg)?;
285
                Ok(None)
286
            }
287
            // Nothing to do here.
288
            _ => Ok(Some(msg)),
289
        }
290
    }
291
}
292

            
293
/// An XON or XOFF message with no associated data.
///
/// This only records *which* kind of message was seen.
#[derive(Debug, PartialEq, Eq)]
enum XonXoff {
    /// XON message.
    Xon,
    /// XOFF message.
    Xoff,
}
301

            
302
/// An XON or XOFF message with associated data.
303
#[derive(Debug)]
304
enum XonXoffMsg {
305
    /// XON message with a rate.
306
    // TODO: I'm expecting that we'll want the `XonKbpsEwma` in the future.
307
    // If that doesn't end up being the case, then we should remove it.
308
    #[expect(dead_code)]
309
    Xon(XonKbpsEwma),
310
    /// XOFF message.
311
    Xoff,
312
}
313

            
314
/// Sidechannel mitigations for DropMark attacks.
315
///
316
/// > In order to mitigate DropMark attacks, both XOFF and advisory XON transmission must be
317
/// > restricted.
318
///
319
/// These restrictions should be implemented for clients (OPs and onion services).
320
#[derive(Debug)]
321
struct SidechannelMitigation {
322
    /// The last rate limit update we received.
323
    last_recvd_xon_xoff: Option<XonXoff>,
324
    /// Number of sent stream bytes.
325
    ///
326
    /// C-tor has some logic to try to fit this into a 32-bit integer,
327
    /// but lets not do that unless we need to as it will make bugs more likely.
328
    bytes_sent_total: Saturating<u64>,
329
    /// The number of advisory XON messages we've received.
330
    ///
331
    /// Note: Advisory XONs are XON->XON messages, and not XOFF->XON messages.
332
    num_advisory_xon_recvd: Saturating<u64>,
333
    /// The number of XOFF messages we've received.
334
    num_xoff_recvd: Saturating<u64>,
335
}
336

            
337
impl SidechannelMitigation {
338
    /// A new [`SidechannelMitigation`].
339
60
    fn new() -> Self {
340
60
        Self {
341
60
            last_recvd_xon_xoff: None,
342
60
            bytes_sent_total: Saturating(0),
343
60
            num_advisory_xon_recvd: Saturating(0),
344
60
            num_xoff_recvd: Saturating(0),
345
60
        }
346
60
    }
347

            
348
    /// A (likely underestimated) guess of the XOFF limit that the other endpoint is using.
349
92
    fn peer_xoff_limit_bytes(params: &FlowCtrlParameters) -> u64 {
350
        // We need to consider that `xoff_client` and `xoff_exit` may be different, we don't know
351
        // here exactly what kind of peer we're connected to, and that we may have a different view
352
        // of the consensus than the peer.
353
        // We deviate from prop324 here and use a more relaxed threshold.
354
        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
355
92
        let min = std::cmp::min(
356
92
            params.cc_xoff_client.as_bytes(),
357
92
            params.cc_xoff_exit.as_bytes(),
358
        );
359
92
        min / 2
360
92
    }
361

            
362
    /// A (likely underestimated) guess of the advisory XON limit that the other endpoint is using.
363
42
    fn peer_xon_limit_bytes(params: &FlowCtrlParameters) -> u64 {
364
        // We need to consider that we may have a different view of the consensus than the peer.
365
        // We deviate from prop324 here and use a more relaxed threshold.
366
        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
367
42
        params.cc_xon_rate.as_bytes() / 2
368
42
    }
369

            
370
    /// Notify that we have sent stream data.
371
1236
    fn sent_stream_data(&mut self, stream_bytes: usize) {
372
        // perform a saturating conversion to u64
373
1236
        let stream_bytes: u64 = stream_bytes.try_into().unwrap_or(u64::MAX);
374
1236
        self.bytes_sent_total += stream_bytes;
375
1236
    }
376

            
377
    /// Notify that we have received an XON message.
378
40
    fn received_xon(&mut self, params: &FlowCtrlParameters) -> Result<()> {
379
        // Check to make sure that XON is not sent too early, for dropmark attacks. The main
380
        // sidechannel risk is early cells, but we also check to see that we did not get more XONs
381
        // than make sense for the number of bytes we sent.
382
        //
383
        // The ordering is important here. For example we first want to check if we received an
384
        // advisory XON that was too early, before we check if we received the advisory XON too
385
        // frequently.
386

            
387
        // Ensure that we have sent some bytes. This might be covered by other checks below, but this
388
        // is the most important check so we do it explicitly here first.
389
40
        if self.bytes_sent_total.0 == 0 {
390
            const MSG: &str = "Received XON before sending any data";
391
4
            return Err(Error::CircProto(MSG.into()));
392
36
        }
393

            
394
        // is this an advisory XON?
395
36
        let is_advisory = match self.last_recvd_xon_xoff {
396
            // if we last received an XON, then this is advisory since we are already sending data
397
12
            Some(XonXoff::Xon) => true,
398
            // if we last received an XOFF, then this isn't advisory since we're being asked to
399
            // resume sending data
400
16
            Some(XonXoff::Xoff) => false,
401
            // if we never received an XON nor XOFF, then this is advisory since we are already
402
            // sending data
403
8
            None => true,
404
        };
405

            
406
        // set this before we possibly return early below, since this must be set regardless of if
407
        // it's an advisory XON or not
408
36
        self.last_recvd_xon_xoff = Some(XonXoff::Xon);
409

            
410
        // we only restrict advisory XON messages
411
36
        if !is_advisory {
412
16
            return Ok(());
413
20
        }
414

            
415
20
        self.num_advisory_xon_recvd += 1;
416

            
417
        // > Clients also SHOULD ensure that advisory XONs do not arrive before the minimum of the
418
        // > XOFF limit and 'cc_xon_rate' full cells worth of bytes have been transmitted.
419
        //
420
        // NOTE: We use a more relaxed threshold for the XON and XOFF limits than in prop324.
421
20
        let advisory_not_expected_before = std::cmp::min(
422
20
            Self::peer_xoff_limit_bytes(params),
423
20
            Self::peer_xon_limit_bytes(params),
424
        );
425
20
        if self.bytes_sent_total.0 < advisory_not_expected_before {
426
            const MSG: &str = "Received advisory XON too early";
427
2
            return Err(Error::CircProto(MSG.into()));
428
18
        }
429

            
430
        // > Clients SHOULD ensure that advisory XONs do not arrive more frequently than every
431
        // > 'cc_xon_rate' cells worth of sent data.
432
        //
433
        // It should be an error if
434
        //   XON frequency > 1/peer_xon_limit_bytes
435
        // where
436
        //   XON frequency = num_advisory_xon_recvd/bytes_sent_total
437
        //
438
        // so
439
        //   num_advisory_xon_recvd/bytes_sent_total > 1/peer_xon_limit_bytes
440
        //
441
        // or to better work with integers
442
        //   num_advisory_xon_recvd > bytes_sent_total/peer_xon_limit_bytes
443
        //
444
        // NOTE: We use a more relaxed threshold for the XON limit than in prop324.
445
18
        let peer_xon_limit_bytes = Self::peer_xon_limit_bytes(params);
446
18
        if peer_xon_limit_bytes != 0
447
18
            && self.num_advisory_xon_recvd.0 > self.bytes_sent_total.0 / peer_xon_limit_bytes
448
        {
449
            const MSG: &str = "Received advisory XON too frequently";
450
6
            return Err(Error::CircProto(MSG.into()));
451
12
        }
452

            
453
12
        Ok(())
454
40
    }
455

            
456
    /// Notify that we have received an XOFF message.
457
48
    fn received_xoff(&mut self, params: &FlowCtrlParameters) -> Result<()> {
458
        // Check to make sure that XOFF is not sent too early, for dropmark attacks. The
459
        // main sidechannel risk is early cells, but we also check to make sure that we have not
460
        // received more XOFFs than could have been generated by the bytes we sent.
461
        //
462
        // The ordering is important here. For example we first want to disallow consecutive XOFFs,
463
        // then check if we received an XOFF that was too early, and finally check if we received
464
        // the XOFF too frequently.
465

            
466
48
        self.num_xoff_recvd += 1;
467

            
468
        // Ensure that we have sent some bytes. This might be covered by other checks below, but this
469
        // is the most important check so we do it explicitly here first.
470
48
        if self.bytes_sent_total.0 == 0 {
471
            const MSG: &str = "Received XOFF before sending any data";
472
4
            return Err(Error::CircProto(MSG.into()));
473
44
        }
474

            
475
        // disallow consecutive XOFF messages
476
44
        if self.last_recvd_xon_xoff == Some(XonXoff::Xoff) {
477
            const MSG: &str = "Received consecutive XOFF messages";
478
8
            return Err(Error::CircProto(MSG.into()));
479
36
        }
480

            
481
        // > clients MUST ensure that an XOFF does not arrive before it has sent the appropriate
482
        // > XOFF limit of bytes on a stream ('cc_xoff_exit' for exits, 'cc_xoff_client' for
483
        // > onions).
484
        //
485
        // NOTE: We use a more relaxed threshold for the XOFF limit than in prop324.
486
36
        if self.bytes_sent_total.0 < Self::peer_xoff_limit_bytes(params) {
487
            const MSG: &str = "Received XOFF too early";
488
4
            return Err(Error::CircProto(MSG.into()));
489
32
        }
490

            
491
        // > Clients also SHOULD ensure than XOFFs do not arrive more frequently than every XOFF
492
        // > limit worth of sent data.
493
        //
494
        // It should be an error if
495
        //   XOFF frequency > 1/peer_xoff_limit_bytes
496
        // where
497
        //   XOFF frequency = num_xoff_recvd/bytes_sent_total
498
        //
499
        // so
500
        //   num_xoff_recvd/bytes_sent_total > 1/peer_xoff_limit_bytes
501
        //
502
        // or to better work with integers
503
        //   num_xoff_recvd > bytes_sent_total/peer_xoff_limit_bytes
504
        //
505
        // NOTE: We use a more relaxed threshold for the XOFF limit than in prop324.
506
32
        let peer_xoff_limit_bytes = Self::peer_xoff_limit_bytes(params);
507
32
        if peer_xoff_limit_bytes != 0
508
32
            && self.num_xoff_recvd.0 > self.bytes_sent_total.0 / peer_xoff_limit_bytes
509
        {
510
4
            return Err(Error::CircProto("Received XOFF too frequently".into()));
511
28
        }
512

            
513
28
        self.last_recvd_xon_xoff = Some(XonXoff::Xoff);
514

            
515
28
        Ok(())
516
48
    }
517
}
518

            
519
#[cfg(test)]
mod test {
    use super::*;

    use crate::stream::flow_ctrl::params::CellCount;

    /// Exercises the ordering and frequency checks of [`SidechannelMitigation`].
    ///
    /// Two parameter sets are used so that both `cc_xoff_client < cc_xoff_exit` and
    /// `cc_xoff_client > cc_xoff_exit` are covered, as well as an XON rate both above and below
    /// the XOFF limits.
    #[test]
    fn sidechannel_mitigation() {
        let all_params = [
            FlowCtrlParameters {
                cc_xoff_client: CellCount::new(2),
                cc_xoff_exit: CellCount::new(4),
                cc_xon_rate: CellCount::new(8),
                cc_xon_change_pct: 1,
                cc_xon_ewma_cnt: 1,
            },
            FlowCtrlParameters {
                cc_xoff_client: CellCount::new(8),
                cc_xoff_exit: CellCount::new(4),
                cc_xon_rate: CellCount::new(2),
                cc_xon_change_pct: 1,
                cc_xon_ewma_cnt: 1,
            },
        ];

        for params in all_params {
            let xon_limit = SidechannelMitigation::peer_xon_limit_bytes(&params);
            let xoff_limit = SidechannelMitigation::peer_xoff_limit_bytes(&params);

            let mut x = SidechannelMitigation::new();
            // cannot receive XON as first message
            assert!(x.received_xon(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // cannot receive XOFF as first message
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // cannot receive XOFF after sending fewer than `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize - 1);
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());
            // but cannot receive another XOFF immediately after
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());
            // but cannot receive another XOFF even after sending another `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // and can receive another XOFF after sending another `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());

            let mut x = SidechannelMitigation::new();
            // cannot receive XON after sending fewer than `xon_limit` bytes
            x.sent_stream_data(xon_limit as usize - 1);
            assert!(x.received_xon(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XON after sending a large number of bytes
            x.sent_stream_data(xon_limit as usize * 3);
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive another XON
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive another XON
            assert!(x.received_xon(&params).is_ok());
            // but cannot receive another XON immediately after
            assert!(x.received_xon(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending a large number of bytes
            x.sent_stream_data(xoff_limit as usize * 3);
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive an XOFF
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive an XOFF
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // but cannot immediately receive an XOFF
            assert!(x.received_xoff(&params).is_err());
        }
    }
}