//! Circuit reactor's stream XON/XOFF flow control.
//!
//! ## Notes on consensus parameters
//!
//! ### `cc_xoff_client`
//!
//! This is the number of bytes that we buffer within a [`DataStream`]. The actual total number of
//! bytes buffered can be *much* larger. For example there will be additional buffering:
//!
//! - Within the arti socks/http proxy: Arti's proxy code needs to read some bytes from the stream,
//!   store them in a temporary buffer, then write the buffer to the socket. If the socket would
//!   block, the data would remain in that temporary buffer. In practice arti uses only a small
//!   byte buffer (`APP_STREAM_BUF_LEN`) at the time of writing, which is hopefully negligible. See
//!   `arti::socks::copy_interactive()`.
//! - Within the kernel: There are two additional buffers that will store stream data before the
//!   application connected over socks will see the data: Arti's socket send buffer and the
//!   application's socket receive buffer. If the application were to stop reading from its socket,
//!   stream data would accumulate first in the socket's receive buffer. Once full, stream data
//!   would accumulate in arti's socket's send buffer. This can become relatively large, especially
//!   with buffer autotuning enabled. On a Linux 6.15 system with curl downloading a large file and
//!   stopping mid-download, the receive buffer was 6,116,738 bytes and the send buffer was
//!   2,631,062 bytes. This sums to around 8.7 MB of stream data buffered in the kernel, which is
//!   significantly higher than the current consensus value of `cc_xoff_client`.
//!
//! This means that the total number of bytes buffered before an XOFF is sent can be much larger
//! than `cc_xoff_client`.
//!
//! While we should take into account the kernel and arti socks buffering above, we also need to
//! keep in mind that arti-client is a library that can be used by others. These library users might
//! not do any kernel or socks buffering, for example if they write a rust program that handles the
//! stream data entirely within their program. We don't want to set `cc_xoff_client` so low that it
//! harms the performance for these users, even if it's fine for the arti socks proxy case.
32

            
33
use std::num::Saturating;
34
use std::sync::Arc;
35

            
36
use postage::watch;
37
use tor_cell::relaycell::flow_ctrl::{FlowCtrlVersion, Xoff, Xon, XonKbpsEwma};
38
use tor_cell::relaycell::msg::AnyRelayMsg;
39
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
40
use tracing::trace;
41

            
42
use super::reader::DrainRateRequest;
43

            
44
use crate::stream::flow_ctrl::params::{CellCount, FlowCtrlParameters};
45
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamRateLimit};
46
use crate::util::notify::NotifySender;
47
use crate::{Error, Result};
48

            
49
#[cfg(doc)]
50
use {crate::client::stream::DataStream, crate::stream::flow_ctrl::state::StreamFlowCtrl};
51

            
52
/// State for XON/XOFF flow control.
53
#[derive(Debug)]
54
pub(crate) struct XonXoffFlowCtrl {
55
    /// Consensus parameters.
56
    params: Arc<FlowCtrlParameters>,
57
    /// How we communicate rate limit updates to the
58
    /// [`DataWriter`](crate::client::stream::DataWriter).
59
    rate_limit_updater: watch::Sender<StreamRateLimit>,
60
    /// How we communicate requests for new drain rate updates to the
61
    /// [`XonXoffReader`](crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReader).
62
    drain_rate_requester: NotifySender<DrainRateRequest>,
63
    /// The last rate limit we sent.
64
    last_sent_xon_xoff: Option<XonXoffMsg>,
65
    /// The buffer limit at which we should send an XOFF.
66
    ///
67
    /// In prop324 it says that this will be either `cc_xoff_client` or `cc_xoff_exit` depending on
68
    /// whether we're a client/hs or exit, but we deviate from the spec here (see how it is set
69
    /// below).
70
    xoff_limit: CellCount<{ tor_cell::relaycell::PAYLOAD_MAX_SIZE_ALL as u32 }>,
71
    /// DropMark sidechannel mitigations.
72
    ///
73
    /// This is only enabled if we are a client (including an onion service).
74
    //
75
    // We could use a `Box` here so that this only takes up space if sidechannel mitigations are
76
    // enabled. But `SidechannelMitigation` is (at the time of writing) only 16 bytes. We could
77
    // reconsider in the future if we add more functionality to `SidechannelMitigation`.
78
    sidechannel_mitigation: Option<SidechannelMitigation>,
79
}
80

            
81
impl XonXoffFlowCtrl {
82
    /// Returns a new xon/xoff-based state.
83
12
    pub(crate) fn new(
84
12
        params: Arc<FlowCtrlParameters>,
85
12
        use_sidechannel_mitigations: bool,
86
12
        rate_limit_updater: watch::Sender<StreamRateLimit>,
87
12
        drain_rate_requester: NotifySender<DrainRateRequest>,
88
12
    ) -> Self {
89
12
        let sidechannel_mitigation =
90
12
            use_sidechannel_mitigations.then_some(SidechannelMitigation::new());
91

            
92
        // We use the same XOFF limit regardless of if we're a client or exit.
93
        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
94
12
        let xoff_limit = std::cmp::max(params.cc_xoff_client, params.cc_xoff_exit);
95

            
96
12
        Self {
97
12
            params,
98
12
            rate_limit_updater,
99
12
            drain_rate_requester,
100
12
            last_sent_xon_xoff: None,
101
12
            xoff_limit,
102
12
            sidechannel_mitigation,
103
12
        }
104
12
    }
105
}
106

            
107
impl FlowCtrlHooks for XonXoffFlowCtrl {
108
6000
    fn can_send<M: RelayMsg>(&self, _msg: &M) -> bool {
109
        // we perform rate-limiting in the `DataWriter`,
110
        // so we send any messages that made it past the `DataWriter`
111
6000
        true
112
6000
    }
113

            
114
1200
    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
115
        // if sidechannel mitigations are enabled and this is a RELAY_DATA message,
116
        // notify that we sent a data message
117
1200
        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
118
1200
            if let AnyRelayMsg::Data(data_msg) = msg {
119
1200
                sidechannel_mitigation.sent_stream_data(data_msg.as_ref().len());
120
1200
            }
121
        }
122

            
123
1200
        Ok(())
124
1200
    }
125

            
126
    fn put_for_incoming_sendme(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
127
        let msg = "Stream level SENDME not allowed due to congestion control";
128
        Err(Error::CircProto(msg.into()))
129
    }
130

            
131
    fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
132
        let xon = msg
133
            .decode::<Xon>()
134
            .map_err(|e| Error::from_bytes_err(e, "failed to decode XON message"))?
135
            .into_msg();
136

            
137
        // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
138
        // > violation.
139
        if *xon.version() != 0 {
140
            return Err(Error::CircProto("Unrecognized XON version".into()));
141
        }
142

            
143
        // if sidechannel mitigations are enabled, notify that an XON was received
144
        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
145
            sidechannel_mitigation.received_xon(&self.params)?;
146
        }
147

            
148
        trace!("Received an XON with rate {}", xon.kbps_ewma());
149

            
150
        let rate = match xon.kbps_ewma() {
151
            XonKbpsEwma::Limited(rate_kbps) => {
152
                let rate_kbps = u64::from(rate_kbps.get());
153
                // convert from kbps to bytes/s
154
                StreamRateLimit::new_bytes_per_sec(rate_kbps * 1000 / 8)
155
            }
156
            XonKbpsEwma::Unlimited => StreamRateLimit::MAX,
157
        };
158

            
159
        *self.rate_limit_updater.borrow_mut() = rate;
160
        Ok(())
161
    }
162

            
163
    fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
164
        let xoff = msg
165
            .decode::<Xoff>()
166
            .map_err(|e| Error::from_bytes_err(e, "failed to decode XOFF message"))?
167
            .into_msg();
168

            
169
        // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
170
        // > violation.
171
        if *xoff.version() != 0 {
172
            return Err(Error::CircProto("Unrecognized XOFF version".into()));
173
        }
174

            
175
        // if sidechannel mitigations are enabled, notify that an XOFF was received
176
        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
177
            sidechannel_mitigation.received_xoff(&self.params)?;
178
        }
179

            
180
        trace!("Received an XOFF");
181

            
182
        // update the rate limit and notify the `DataWriter`
183
        *self.rate_limit_updater.borrow_mut() = StreamRateLimit::ZERO;
184

            
185
        Ok(())
186
    }
187

            
188
    fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>> {
189
        if buffer_len as u64 > self.xoff_limit.as_bytes() {
190
            // we can't send an XON, and we should have already sent an XOFF when the queue first
191
            // exceeded the limit (see `maybe_send_xoff()`)
192
            debug_assert!(matches!(self.last_sent_xon_xoff, Some(XonXoffMsg::Xoff)));
193

            
194
            // inform the stream reader that we need a new drain rate
195
            self.drain_rate_requester.notify();
196
            return Ok(None);
197
        }
198

            
199
        self.last_sent_xon_xoff = Some(XonXoffMsg::Xon(rate));
200

            
201
        trace!("Want to send an XON with rate {rate}");
202

            
203
        Ok(Some(Xon::new(FlowCtrlVersion::V0, rate)))
204
    }
205

            
206
64
    fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
207
        // if the last XON/XOFF we sent was an XOFF, no need to send another
208
64
        if matches!(self.last_sent_xon_xoff, Some(XonXoffMsg::Xoff)) {
209
            return Ok(None);
210
64
        }
211

            
212
64
        if buffer_len as u64 <= self.xoff_limit.as_bytes() {
213
64
            return Ok(None);
214
        }
215

            
216
        // either we have never sent an XOFF or XON, or we last sent an XON
217

            
218
        // remember that we last sent an XOFF
219
        self.last_sent_xon_xoff = Some(XonXoffMsg::Xoff);
220

            
221
        // inform the stream reader that we need a new drain rate
222
        self.drain_rate_requester.notify();
223

            
224
        trace!("Want to send an XOFF");
225

            
226
        Ok(Some(Xoff::new(FlowCtrlVersion::V0)))
227
64
    }
228
}
229

            
230
/// An XON or XOFF message with no associated data.
///
/// Used to remember only *which kind* of message was last seen.
#[derive(Debug, PartialEq, Eq)]
enum XonXoff {
    /// XON message.
    Xon,
    /// XOFF message.
    Xoff,
}
238

            
239
/// An XON or XOFF message with associated data.
240
#[derive(Debug)]
241
enum XonXoffMsg {
242
    /// XON message with a rate.
243
    // TODO: I'm expecting that we'll want the `XonKbpsEwma` in the future.
244
    // If that doesn't end up being the case, then we should remove it.
245
    #[expect(dead_code)]
246
    Xon(XonKbpsEwma),
247
    /// XOFF message.
248
    Xoff,
249
}
250

            
251
/// Sidechannel mitigations for DropMark attacks.
252
///
253
/// > In order to mitigate DropMark attacks, both XOFF and advisory XON transmission must be
254
/// > restricted.
255
///
256
/// These restrictions should be implemented for clients (OPs and onion services).
257
#[derive(Debug)]
258
struct SidechannelMitigation {
259
    /// The last rate limit update we received.
260
    last_recvd_xon_xoff: Option<XonXoff>,
261
    /// Number of sent stream bytes.
262
    ///
263
    /// C-tor has some logic to try to fit this into a 32-bit integer,
264
    /// but lets not do that unless we need to as it will make bugs more likely.
265
    bytes_sent_total: Saturating<u64>,
266
    /// The number of advisory XON messages we've received.
267
    ///
268
    /// Note: Advisory XONs are XON->XON messages, and not XOFF->XON messages.
269
    num_advisory_xon_recvd: Saturating<u64>,
270
    /// The number of XOFF messages we've received.
271
    num_xoff_recvd: Saturating<u64>,
272
}
273

            
274
impl SidechannelMitigation {
275
    /// A new [`SidechannelMitigation`].
276
48
    fn new() -> Self {
277
48
        Self {
278
48
            last_recvd_xon_xoff: None,
279
48
            bytes_sent_total: Saturating(0),
280
48
            num_advisory_xon_recvd: Saturating(0),
281
48
            num_xoff_recvd: Saturating(0),
282
48
        }
283
48
    }
284

            
285
    /// A (likely underestimated) guess of the XOFF limit that the other endpoint is using.
286
92
    fn peer_xoff_limit_bytes(params: &FlowCtrlParameters) -> u64 {
287
        // We need to consider that `xoff_client` and `xoff_exit` may be different, we don't know
288
        // here exactly what kind of peer we're connected to, and that we may have a different view
289
        // of the consensus than the peer.
290
        // We deviate from prop324 here and use a more relaxed threshold.
291
        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
292
92
        let min = std::cmp::min(
293
92
            params.cc_xoff_client.as_bytes(),
294
92
            params.cc_xoff_exit.as_bytes(),
295
        );
296
92
        min / 2
297
92
    }
298

            
299
    /// A (likely underestimated) guess of the advisory XON limit that the other endpoint is using.
300
42
    fn peer_xon_limit_bytes(params: &FlowCtrlParameters) -> u64 {
301
        // We need to consider that we may have a different view of the consensus than the peer.
302
        // We deviate from prop324 here and use a more relaxed threshold.
303
        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
304
42
        params.cc_xon_rate.as_bytes() / 2
305
42
    }
306

            
307
    /// Notify that we have sent stream data.
308
1236
    fn sent_stream_data(&mut self, stream_bytes: usize) {
309
        // perform a saturating conversion to u64
310
1236
        let stream_bytes: u64 = stream_bytes.try_into().unwrap_or(u64::MAX);
311
1236
        self.bytes_sent_total += stream_bytes;
312
1236
    }
313

            
314
    /// Notify that we have received an XON message.
315
40
    fn received_xon(&mut self, params: &FlowCtrlParameters) -> Result<()> {
316
        // Check to make sure that XON is not sent too early, for dropmark attacks. The main
317
        // sidechannel risk is early cells, but we also check to see that we did not get more XONs
318
        // than make sense for the number of bytes we sent.
319
        //
320
        // The ordering is important here. For example we first want to check if we received an
321
        // advisory XON that was too early, before we check if we received the advisory XON too
322
        // frequently.
323

            
324
        // Ensure that we have sent some bytes. This might be covered by other checks below, but this
325
        // is the most important check so we do it explicitly here first.
326
40
        if self.bytes_sent_total.0 == 0 {
327
            const MSG: &str = "Received XON before sending any data";
328
4
            return Err(Error::CircProto(MSG.into()));
329
36
        }
330

            
331
        // is this an advisory XON?
332
36
        let is_advisory = match self.last_recvd_xon_xoff {
333
            // if we last received an XON, then this is advisory since we are already sending data
334
12
            Some(XonXoff::Xon) => true,
335
            // if we last received an XOFF, then this isn't advisory since we're being asked to
336
            // resume sending data
337
16
            Some(XonXoff::Xoff) => false,
338
            // if we never received an XON nor XOFF, then this is advisory since we are already
339
            // sending data
340
8
            None => true,
341
        };
342

            
343
        // set this before we possibly return early below, since this must be set regardless of if
344
        // it's an advisory XON or not
345
36
        self.last_recvd_xon_xoff = Some(XonXoff::Xon);
346

            
347
        // we only restrict advisory XON messages
348
36
        if !is_advisory {
349
16
            return Ok(());
350
20
        }
351

            
352
20
        self.num_advisory_xon_recvd += 1;
353

            
354
        // > Clients also SHOULD ensure that advisory XONs do not arrive before the minimum of the
355
        // > XOFF limit and 'cc_xon_rate' full cells worth of bytes have been transmitted.
356
        //
357
        // NOTE: We use a more relaxed threshold for the XON and XOFF limits than in prop324.
358
20
        let advisory_not_expected_before = std::cmp::min(
359
20
            Self::peer_xoff_limit_bytes(params),
360
20
            Self::peer_xon_limit_bytes(params),
361
        );
362
20
        if self.bytes_sent_total.0 < advisory_not_expected_before {
363
            const MSG: &str = "Received advisory XON too early";
364
2
            return Err(Error::CircProto(MSG.into()));
365
18
        }
366

            
367
        // > Clients SHOULD ensure that advisory XONs do not arrive more frequently than every
368
        // > 'cc_xon_rate' cells worth of sent data.
369
        //
370
        // It should be an error if
371
        //   XON frequency > 1/peer_xon_limit_bytes
372
        // where
373
        //   XON frequency = num_advisory_xon_recvd/bytes_sent_total
374
        //
375
        // so
376
        //   num_advisory_xon_recvd/bytes_sent_total > 1/peer_xon_limit_bytes
377
        //
378
        // or to better work with integers
379
        //   num_advisory_xon_recvd > bytes_sent_total/peer_xon_limit_bytes
380
        //
381
        // NOTE: We use a more relaxed threshold for the XON limit than in prop324.
382
18
        let peer_xon_limit_bytes = Self::peer_xon_limit_bytes(params);
383
18
        if peer_xon_limit_bytes != 0
384
18
            && self.num_advisory_xon_recvd.0 > self.bytes_sent_total.0 / peer_xon_limit_bytes
385
        {
386
            const MSG: &str = "Received advisory XON too frequently";
387
6
            return Err(Error::CircProto(MSG.into()));
388
12
        }
389

            
390
12
        Ok(())
391
40
    }
392

            
393
    /// Notify that we have received an XOFF message.
394
48
    fn received_xoff(&mut self, params: &FlowCtrlParameters) -> Result<()> {
395
        // Check to make sure that XOFF is not sent too early, for dropmark attacks. The
396
        // main sidechannel risk is early cells, but we also check to make sure that we have not
397
        // received more XOFFs than could have been generated by the bytes we sent.
398
        //
399
        // The ordering is important here. For example we first want to disallow consecutive XOFFs,
400
        // then check if we received an XOFF that was too early, and finally check if we received
401
        // the XOFF too frequently.
402

            
403
48
        self.num_xoff_recvd += 1;
404

            
405
        // Ensure that we have sent some bytes. This might be covered by other checks below, but this
406
        // is the most important check so we do it explicitly here first.
407
48
        if self.bytes_sent_total.0 == 0 {
408
            const MSG: &str = "Received XOFF before sending any data";
409
4
            return Err(Error::CircProto(MSG.into()));
410
44
        }
411

            
412
        // disallow consecutive XOFF messages
413
44
        if self.last_recvd_xon_xoff == Some(XonXoff::Xoff) {
414
            const MSG: &str = "Received consecutive XOFF messages";
415
8
            return Err(Error::CircProto(MSG.into()));
416
36
        }
417

            
418
        // > clients MUST ensure that an XOFF does not arrive before it has sent the appropriate
419
        // > XOFF limit of bytes on a stream ('cc_xoff_exit' for exits, 'cc_xoff_client' for
420
        // > onions).
421
        //
422
        // NOTE: We use a more relaxed threshold for the XOFF limit than in prop324.
423
36
        if self.bytes_sent_total.0 < Self::peer_xoff_limit_bytes(params) {
424
            const MSG: &str = "Received XOFF too early";
425
4
            return Err(Error::CircProto(MSG.into()));
426
32
        }
427

            
428
        // > Clients also SHOULD ensure than XOFFs do not arrive more frequently than every XOFF
429
        // > limit worth of sent data.
430
        //
431
        // It should be an error if
432
        //   XOFF frequency > 1/peer_xoff_limit_bytes
433
        // where
434
        //   XOFF frequency = num_xoff_recvd/bytes_sent_total
435
        //
436
        // so
437
        //   num_xoff_recvd/bytes_sent_total > 1/peer_xoff_limit_bytes
438
        //
439
        // or to better work with integers
440
        //   num_xoff_recvd > bytes_sent_total/peer_xoff_limit_bytes
441
        //
442
        // NOTE: We use a more relaxed threshold for the XOFF limit than in prop324.
443
32
        let peer_xoff_limit_bytes = Self::peer_xoff_limit_bytes(params);
444
32
        if peer_xoff_limit_bytes != 0
445
32
            && self.num_xoff_recvd.0 > self.bytes_sent_total.0 / peer_xoff_limit_bytes
446
        {
447
4
            return Err(Error::CircProto("Received XOFF too frequently".into()));
448
28
        }
449

            
450
28
        self.last_recvd_xon_xoff = Some(XonXoff::Xoff);
451

            
452
28
        Ok(())
453
48
    }
454
}
455

            
456
#[cfg(test)]
mod test {
    use super::*;

    use crate::stream::flow_ctrl::params::CellCount;

    /// Exercises the XON/XOFF acceptance rules of [`SidechannelMitigation`] with two parameter
    /// sets: one where the XOFF limits are below `cc_xon_rate`, and one where they are above.
    #[test]
    fn sidechannel_mitigation() {
        let params = [
            FlowCtrlParameters {
                cc_xoff_client: CellCount::new(2),
                cc_xoff_exit: CellCount::new(4),
                cc_xon_rate: CellCount::new(8),
                cc_xon_change_pct: 1,
                cc_xon_ewma_cnt: 1,
            },
            FlowCtrlParameters {
                cc_xoff_client: CellCount::new(8),
                cc_xoff_exit: CellCount::new(4),
                cc_xon_rate: CellCount::new(2),
                cc_xon_change_pct: 1,
                cc_xon_ewma_cnt: 1,
            },
        ];

        for params in params {
            let xon_limit = SidechannelMitigation::peer_xon_limit_bytes(&params);
            let xoff_limit = SidechannelMitigation::peer_xoff_limit_bytes(&params);

            let mut x = SidechannelMitigation::new();
            // cannot receive XON as first message
            assert!(x.received_xon(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // cannot receive XOFF as first message
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // cannot receive XOFF after sending fewer than `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize - 1);
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());
            // but cannot receive another XOFF immediately after
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());
            // but cannot receive another XOFF even after sending another `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // and can receive another XOFF after sending another `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());

            let mut x = SidechannelMitigation::new();
            // cannot receive XON after sending fewer than `xon_limit` bytes
            x.sent_stream_data(xon_limit as usize - 1);
            assert!(x.received_xon(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XON after sending a large number of bytes
            x.sent_stream_data(xon_limit as usize * 3);
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive another XON
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive another XON
            assert!(x.received_xon(&params).is_ok());
            // but cannot receive another XON immediately after
            assert!(x.received_xon(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending a large number of bytes
            x.sent_stream_data(xoff_limit as usize * 3);
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive an XOFF
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive an XOFF
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // but cannot immediately receive an XOFF
            assert!(x.received_xoff(&params).is_err());
        }
    }
}