//! Circuit reactor's stream XON/XOFF flow control.
//!
//! ## Notes on consensus parameters
//!
//! ### `cc_xoff_client`
//!
//! This is the number of bytes that we buffer within a [`DataStream`]. The actual total number of
//! bytes buffered can be *much* larger. For example there will be additional buffering:
//!
//! - Within the arti socks/http proxy: Arti's proxy code needs to read some bytes from the stream,
//!   store them in a temporary buffer, then write the buffer to the socket. If the socket would
//!   block, the data would remain in that temporary buffer. In practice arti uses only a small
//!   byte buffer (APP_STREAM_BUF_LEN) at the time of writing, which is hopefully negligible. See
//!   `arti::socks::copy_interactive()`.
//! - Within the kernel: There are two additional buffers that will store stream data before the
//!   application connected over socks will see the data: Arti's socket send buffer and the
//!   application's socket receive buffer. If the application were to stop reading from its socket,
//!   stream data would accumulate first in the socket's receive buffer. Once full, stream data
//!   would accumulate in arti's socket's send buffer. This can become relatively large, especially
//!   with buffer autotuning enabled. On a Linux 6.15 system with curl downloading a large file and
//!   stopping mid-download, the receive buffer was 6,116,738 bytes and the send buffer was
//!   2,631,062 bytes. This sums to around 8.7 MB of stream data buffered in the kernel, which is
//!   significantly higher than the current consensus value of `cc_xoff_client`.
//!
//! This means that the total number of bytes buffered before an XOFF is sent can be much larger
//! than `cc_xoff_client`.
//!
//! While we should take into account the kernel and arti socks buffering above, we also need to
//! keep in mind that arti-client is a library that can be used by others. These library users might
//! not do any kernel or socks buffering, for example if they write a rust program that handles the
//! stream data entirely within their program. We don't want to set `cc_xoff_client` so low that it
//! harms the performance for these users, even if it's fine for the arti socks proxy case.
32

            
33
use std::num::Saturating;
34
use std::sync::Arc;
35

            
36
use postage::watch;
37
use tor_cell::relaycell::flow_ctrl::{FlowCtrlVersion, Xoff, Xon, XonKbpsEwma};
38
use tor_cell::relaycell::msg::AnyRelayMsg;
39
use tor_cell::relaycell::{RelayCmd, RelayMsg, UnparsedRelayMsg};
40
use tracing::trace;
41

            
42
use super::reader::DrainRateRequest;
43

            
44
use crate::stream::flow_ctrl::params::{CellCount, FlowCtrlParameters};
45
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, HalfStreamFlowCtrlHooks, StreamRateLimit};
46
use crate::util::notify::NotifySender;
47
use crate::{Error, Result};
48

            
49
#[cfg(doc)]
50
use {crate::client::stream::DataStream, crate::stream::flow_ctrl::state::StreamFlowCtrl};
51

            
52
/// State for XON/XOFF flow control.
53
#[derive(Debug)]
54
pub(crate) struct XonXoffFlowCtrl {
55
    /// Consensus parameters.
56
    params: Arc<FlowCtrlParameters>,
57
    /// How we communicate rate limit updates to the
58
    /// [`DataWriter`](crate::client::stream::DataWriter).
59
    rate_limit_updater: watch::Sender<StreamRateLimit>,
60
    /// How we communicate requests for new drain rate updates to the
61
    /// [`XonXoffReader`](crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReader).
62
    drain_rate_requester: NotifySender<DrainRateRequest>,
63
    /// The last rate limit we sent.
64
    last_sent_xon_xoff: Option<XonXoffMsg>,
65
    /// The buffer limit at which we should send an XOFF.
66
    ///
67
    /// In prop324 it says that this will be either `cc_xoff_client` or `cc_xoff_exit` depending on
68
    /// whether we're a client/hs or exit, but we deviate from the spec here (see how it is set
69
    /// below).
70
    xoff_limit: CellCount<{ tor_cell::relaycell::PAYLOAD_MAX_SIZE_ALL as u32 }>,
71
    /// DropMark sidechannel mitigations.
72
    ///
73
    /// This is only enabled if we are a client (including an onion service).
74
    //
75
    // We could use a `Box` here so that this only takes up space if sidechannel mitigations are
76
    // enabled. But `SidechannelMitigation` is (at the time of writing) only 16 bytes. We could
77
    // reconsider in the future if we add more functionality to `SidechannelMitigation`.
78
    sidechannel_mitigation: Option<SidechannelMitigation>,
79
}
80

            
81
impl XonXoffFlowCtrl {
82
    /// Returns a new xon/xoff-based state.
83
12
    pub(crate) fn new(
84
12
        params: Arc<FlowCtrlParameters>,
85
12
        use_sidechannel_mitigations: bool,
86
12
        rate_limit_updater: watch::Sender<StreamRateLimit>,
87
12
        drain_rate_requester: NotifySender<DrainRateRequest>,
88
12
    ) -> Self {
89
12
        let sidechannel_mitigation =
90
12
            use_sidechannel_mitigations.then_some(SidechannelMitigation::new());
91

            
92
        // We use the same XOFF limit regardless of if we're a client or exit.
93
        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
94
12
        let xoff_limit = std::cmp::max(params.cc_xoff_client, params.cc_xoff_exit);
95

            
96
12
        Self {
97
12
            params,
98
12
            rate_limit_updater,
99
12
            drain_rate_requester,
100
12
            last_sent_xon_xoff: None,
101
12
            xoff_limit,
102
12
            sidechannel_mitigation,
103
12
        }
104
12
    }
105
}
106

            
107
impl FlowCtrlHooks for XonXoffFlowCtrl {
108
6000
    fn can_send<M: RelayMsg>(&self, _msg: &M) -> bool {
109
        // we perform rate-limiting in the `DataWriter`,
110
        // so we send any messages that made it past the `DataWriter`
111
6000
        true
112
6000
    }
113

            
114
1200
    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
115
        // if sidechannel mitigations are enabled and this is a RELAY_DATA message,
116
        // notify that we sent a data message
117
1200
        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
118
1200
            if let AnyRelayMsg::Data(data_msg) = msg {
119
1200
                sidechannel_mitigation.sent_stream_data(data_msg.as_ref().len());
120
1200
            }
121
        }
122

            
123
1200
        Ok(())
124
1200
    }
125

            
126
    fn put_for_incoming_sendme(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
127
        let msg = "Stream level SENDME not allowed due to congestion control";
128
        Err(Error::CircProto(msg.into()))
129
    }
130

            
131
    fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
132
        let xon = msg
133
            .decode::<Xon>()
134
            .map_err(|e| Error::from_bytes_err(e, "failed to decode XON message"))?
135
            .into_msg();
136

            
137
        // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
138
        // > violation.
139
        if *xon.version() != 0 {
140
            return Err(Error::CircProto("Unrecognized XON version".into()));
141
        }
142

            
143
        // if sidechannel mitigations are enabled, notify that an XON was received
144
        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
145
            sidechannel_mitigation.received_xon(&self.params)?;
146
        }
147

            
148
        trace!("Received an XON with rate {}", xon.kbps_ewma());
149

            
150
        let rate = match xon.kbps_ewma() {
151
            XonKbpsEwma::Limited(rate_kbps) => {
152
                let rate_kbps = u64::from(rate_kbps.get());
153
                // convert from kbps to bytes/s
154
                StreamRateLimit::new_bytes_per_sec(rate_kbps * 1000 / 8)
155
            }
156
            XonKbpsEwma::Unlimited => StreamRateLimit::MAX,
157
        };
158

            
159
        *self.rate_limit_updater.borrow_mut() = rate;
160
        Ok(())
161
    }
162

            
163
    fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
164
        let xoff = msg
165
            .decode::<Xoff>()
166
            .map_err(|e| Error::from_bytes_err(e, "failed to decode XOFF message"))?
167
            .into_msg();
168

            
169
        // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
170
        // > violation.
171
        if *xoff.version() != 0 {
172
            return Err(Error::CircProto("Unrecognized XOFF version".into()));
173
        }
174

            
175
        // if sidechannel mitigations are enabled, notify that an XOFF was received
176
        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
177
            sidechannel_mitigation.received_xoff(&self.params)?;
178
        }
179

            
180
        trace!("Received an XOFF");
181

            
182
        // update the rate limit and notify the `DataWriter`
183
        *self.rate_limit_updater.borrow_mut() = StreamRateLimit::ZERO;
184

            
185
        Ok(())
186
    }
187

            
188
    fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>> {
189
        if buffer_len as u64 > self.xoff_limit.as_bytes() {
190
            // we can't send an XON, and we should have already sent an XOFF when the queue first
191
            // exceeded the limit (see `maybe_send_xoff()`)
192
            debug_assert!(matches!(self.last_sent_xon_xoff, Some(XonXoffMsg::Xoff)));
193

            
194
            // inform the stream reader that we need a new drain rate
195
            self.drain_rate_requester.notify();
196
            return Ok(None);
197
        }
198

            
199
        self.last_sent_xon_xoff = Some(XonXoffMsg::Xon(rate));
200

            
201
        trace!("Want to send an XON with rate {rate}");
202

            
203
        Ok(Some(Xon::new(FlowCtrlVersion::V0, rate)))
204
    }
205

            
206
64
    fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
207
        // if the last XON/XOFF we sent was an XOFF, no need to send another
208
64
        if matches!(self.last_sent_xon_xoff, Some(XonXoffMsg::Xoff)) {
209
            return Ok(None);
210
64
        }
211

            
212
64
        if buffer_len as u64 <= self.xoff_limit.as_bytes() {
213
64
            return Ok(None);
214
        }
215

            
216
        // either we have never sent an XOFF or XON, or we last sent an XON
217

            
218
        // remember that we last sent an XOFF
219
        self.last_sent_xon_xoff = Some(XonXoffMsg::Xoff);
220

            
221
        // inform the stream reader that we need a new drain rate
222
        self.drain_rate_requester.notify();
223

            
224
        trace!("Want to send an XOFF");
225

            
226
        Ok(Some(Xoff::new(FlowCtrlVersion::V0)))
227
64
    }
228
}
229

            
230
/// State for XON/XOFF flow control on a half-stream.
231
#[derive(Debug)]
232
pub(crate) struct HalfStreamXonXoffFlowCtrl {
233
    /// The original [`XonXoffFlowCtrl`] from the full stream.
234
    ///
235
    /// We keep this since we need to continue validating any incoming messages
236
    /// and continue applying the sidechannel mitigations.
237
    inner: XonXoffFlowCtrl,
238
}
239

            
240
impl HalfStreamXonXoffFlowCtrl {
241
    /// Returns a new xon/xoff-based state for a half-stream.
242
    pub(crate) fn new(flow_ctrl: XonXoffFlowCtrl) -> Self {
243
        Self { inner: flow_ctrl }
244
    }
245
}
246

            
247
impl HalfStreamFlowCtrlHooks for HalfStreamXonXoffFlowCtrl {
248
    fn handle_incoming_dropped(&mut self, _msg_count: u16) -> Result<()> {
249
        // Nothing to do here.
250
        Ok(())
251
    }
252

            
253
    fn handle_incoming_msg(&mut self, msg: UnparsedRelayMsg) -> Result<Option<UnparsedRelayMsg>> {
254
        match msg.cmd() {
255
            RelayCmd::SENDME => {
256
                self.inner.put_for_incoming_sendme(msg)?;
257
                Ok(None)
258
            }
259
            RelayCmd::XON => {
260
                self.inner.handle_incoming_xon(msg)?;
261
                Ok(None)
262
            }
263
            RelayCmd::XOFF => {
264
                self.inner.handle_incoming_xoff(msg)?;
265
                Ok(None)
266
            }
267
            // Nothing to do here.
268
            _ => Ok(Some(msg)),
269
        }
270
    }
271
}
272

            
273
/// An XON or XOFF message with no associated data.
#[derive(Debug, PartialEq, Eq)]
enum XonXoff {
    /// XON message.
    Xon,
    /// XOFF message.
    Xoff,
}
281

            
282
/// An XON or XOFF message with associated data.
283
#[derive(Debug)]
284
enum XonXoffMsg {
285
    /// XON message with a rate.
286
    // TODO: I'm expecting that we'll want the `XonKbpsEwma` in the future.
287
    // If that doesn't end up being the case, then we should remove it.
288
    #[expect(dead_code)]
289
    Xon(XonKbpsEwma),
290
    /// XOFF message.
291
    Xoff,
292
}
293

            
294
/// Sidechannel mitigations for DropMark attacks.
295
///
296
/// > In order to mitigate DropMark attacks, both XOFF and advisory XON transmission must be
297
/// > restricted.
298
///
299
/// These restrictions should be implemented for clients (OPs and onion services).
300
#[derive(Debug)]
301
struct SidechannelMitigation {
302
    /// The last rate limit update we received.
303
    last_recvd_xon_xoff: Option<XonXoff>,
304
    /// Number of sent stream bytes.
305
    ///
306
    /// C-tor has some logic to try to fit this into a 32-bit integer,
307
    /// but lets not do that unless we need to as it will make bugs more likely.
308
    bytes_sent_total: Saturating<u64>,
309
    /// The number of advisory XON messages we've received.
310
    ///
311
    /// Note: Advisory XONs are XON->XON messages, and not XOFF->XON messages.
312
    num_advisory_xon_recvd: Saturating<u64>,
313
    /// The number of XOFF messages we've received.
314
    num_xoff_recvd: Saturating<u64>,
315
}
316

            
317
impl SidechannelMitigation {
318
    /// A new [`SidechannelMitigation`].
319
48
    fn new() -> Self {
320
48
        Self {
321
48
            last_recvd_xon_xoff: None,
322
48
            bytes_sent_total: Saturating(0),
323
48
            num_advisory_xon_recvd: Saturating(0),
324
48
            num_xoff_recvd: Saturating(0),
325
48
        }
326
48
    }
327

            
328
    /// A (likely underestimated) guess of the XOFF limit that the other endpoint is using.
329
92
    fn peer_xoff_limit_bytes(params: &FlowCtrlParameters) -> u64 {
330
        // We need to consider that `xoff_client` and `xoff_exit` may be different, we don't know
331
        // here exactly what kind of peer we're connected to, and that we may have a different view
332
        // of the consensus than the peer.
333
        // We deviate from prop324 here and use a more relaxed threshold.
334
        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
335
92
        let min = std::cmp::min(
336
92
            params.cc_xoff_client.as_bytes(),
337
92
            params.cc_xoff_exit.as_bytes(),
338
        );
339
92
        min / 2
340
92
    }
341

            
342
    /// A (likely underestimated) guess of the advisory XON limit that the other endpoint is using.
343
42
    fn peer_xon_limit_bytes(params: &FlowCtrlParameters) -> u64 {
344
        // We need to consider that we may have a different view of the consensus than the peer.
345
        // We deviate from prop324 here and use a more relaxed threshold.
346
        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
347
42
        params.cc_xon_rate.as_bytes() / 2
348
42
    }
349

            
350
    /// Notify that we have sent stream data.
351
1236
    fn sent_stream_data(&mut self, stream_bytes: usize) {
352
        // perform a saturating conversion to u64
353
1236
        let stream_bytes: u64 = stream_bytes.try_into().unwrap_or(u64::MAX);
354
1236
        self.bytes_sent_total += stream_bytes;
355
1236
    }
356

            
357
    /// Notify that we have received an XON message.
358
40
    fn received_xon(&mut self, params: &FlowCtrlParameters) -> Result<()> {
359
        // Check to make sure that XON is not sent too early, for dropmark attacks. The main
360
        // sidechannel risk is early cells, but we also check to see that we did not get more XONs
361
        // than make sense for the number of bytes we sent.
362
        //
363
        // The ordering is important here. For example we first want to check if we received an
364
        // advisory XON that was too early, before we check if we received the advisory XON too
365
        // frequently.
366

            
367
        // Ensure that we have sent some bytes. This might be covered by other checks below, but this
368
        // is the most important check so we do it explicitly here first.
369
40
        if self.bytes_sent_total.0 == 0 {
370
            const MSG: &str = "Received XON before sending any data";
371
4
            return Err(Error::CircProto(MSG.into()));
372
36
        }
373

            
374
        // is this an advisory XON?
375
36
        let is_advisory = match self.last_recvd_xon_xoff {
376
            // if we last received an XON, then this is advisory since we are already sending data
377
12
            Some(XonXoff::Xon) => true,
378
            // if we last received an XOFF, then this isn't advisory since we're being asked to
379
            // resume sending data
380
16
            Some(XonXoff::Xoff) => false,
381
            // if we never received an XON nor XOFF, then this is advisory since we are already
382
            // sending data
383
8
            None => true,
384
        };
385

            
386
        // set this before we possibly return early below, since this must be set regardless of if
387
        // it's an advisory XON or not
388
36
        self.last_recvd_xon_xoff = Some(XonXoff::Xon);
389

            
390
        // we only restrict advisory XON messages
391
36
        if !is_advisory {
392
16
            return Ok(());
393
20
        }
394

            
395
20
        self.num_advisory_xon_recvd += 1;
396

            
397
        // > Clients also SHOULD ensure that advisory XONs do not arrive before the minimum of the
398
        // > XOFF limit and 'cc_xon_rate' full cells worth of bytes have been transmitted.
399
        //
400
        // NOTE: We use a more relaxed threshold for the XON and XOFF limits than in prop324.
401
20
        let advisory_not_expected_before = std::cmp::min(
402
20
            Self::peer_xoff_limit_bytes(params),
403
20
            Self::peer_xon_limit_bytes(params),
404
        );
405
20
        if self.bytes_sent_total.0 < advisory_not_expected_before {
406
            const MSG: &str = "Received advisory XON too early";
407
2
            return Err(Error::CircProto(MSG.into()));
408
18
        }
409

            
410
        // > Clients SHOULD ensure that advisory XONs do not arrive more frequently than every
411
        // > 'cc_xon_rate' cells worth of sent data.
412
        //
413
        // It should be an error if
414
        //   XON frequency > 1/peer_xon_limit_bytes
415
        // where
416
        //   XON frequency = num_advisory_xon_recvd/bytes_sent_total
417
        //
418
        // so
419
        //   num_advisory_xon_recvd/bytes_sent_total > 1/peer_xon_limit_bytes
420
        //
421
        // or to better work with integers
422
        //   num_advisory_xon_recvd > bytes_sent_total/peer_xon_limit_bytes
423
        //
424
        // NOTE: We use a more relaxed threshold for the XON limit than in prop324.
425
18
        let peer_xon_limit_bytes = Self::peer_xon_limit_bytes(params);
426
18
        if peer_xon_limit_bytes != 0
427
18
            && self.num_advisory_xon_recvd.0 > self.bytes_sent_total.0 / peer_xon_limit_bytes
428
        {
429
            const MSG: &str = "Received advisory XON too frequently";
430
6
            return Err(Error::CircProto(MSG.into()));
431
12
        }
432

            
433
12
        Ok(())
434
40
    }
435

            
436
    /// Notify that we have received an XOFF message.
437
48
    fn received_xoff(&mut self, params: &FlowCtrlParameters) -> Result<()> {
438
        // Check to make sure that XOFF is not sent too early, for dropmark attacks. The
439
        // main sidechannel risk is early cells, but we also check to make sure that we have not
440
        // received more XOFFs than could have been generated by the bytes we sent.
441
        //
442
        // The ordering is important here. For example we first want to disallow consecutive XOFFs,
443
        // then check if we received an XOFF that was too early, and finally check if we received
444
        // the XOFF too frequently.
445

            
446
48
        self.num_xoff_recvd += 1;
447

            
448
        // Ensure that we have sent some bytes. This might be covered by other checks below, but this
449
        // is the most important check so we do it explicitly here first.
450
48
        if self.bytes_sent_total.0 == 0 {
451
            const MSG: &str = "Received XOFF before sending any data";
452
4
            return Err(Error::CircProto(MSG.into()));
453
44
        }
454

            
455
        // disallow consecutive XOFF messages
456
44
        if self.last_recvd_xon_xoff == Some(XonXoff::Xoff) {
457
            const MSG: &str = "Received consecutive XOFF messages";
458
8
            return Err(Error::CircProto(MSG.into()));
459
36
        }
460

            
461
        // > clients MUST ensure that an XOFF does not arrive before it has sent the appropriate
462
        // > XOFF limit of bytes on a stream ('cc_xoff_exit' for exits, 'cc_xoff_client' for
463
        // > onions).
464
        //
465
        // NOTE: We use a more relaxed threshold for the XOFF limit than in prop324.
466
36
        if self.bytes_sent_total.0 < Self::peer_xoff_limit_bytes(params) {
467
            const MSG: &str = "Received XOFF too early";
468
4
            return Err(Error::CircProto(MSG.into()));
469
32
        }
470

            
471
        // > Clients also SHOULD ensure than XOFFs do not arrive more frequently than every XOFF
472
        // > limit worth of sent data.
473
        //
474
        // It should be an error if
475
        //   XOFF frequency > 1/peer_xoff_limit_bytes
476
        // where
477
        //   XOFF frequency = num_xoff_recvd/bytes_sent_total
478
        //
479
        // so
480
        //   num_xoff_recvd/bytes_sent_total > 1/peer_xoff_limit_bytes
481
        //
482
        // or to better work with integers
483
        //   num_xoff_recvd > bytes_sent_total/peer_xoff_limit_bytes
484
        //
485
        // NOTE: We use a more relaxed threshold for the XOFF limit than in prop324.
486
32
        let peer_xoff_limit_bytes = Self::peer_xoff_limit_bytes(params);
487
32
        if peer_xoff_limit_bytes != 0
488
32
            && self.num_xoff_recvd.0 > self.bytes_sent_total.0 / peer_xoff_limit_bytes
489
        {
490
4
            return Err(Error::CircProto("Received XOFF too frequently".into()));
491
28
        }
492

            
493
28
        self.last_recvd_xon_xoff = Some(XonXoff::Xoff);
494

            
495
28
        Ok(())
496
48
    }
497
}
498

            
499
#[cfg(test)]
mod test {
    use super::*;

    use crate::stream::flow_ctrl::params::CellCount;

    /// Exercises the ordering and frequency rules that `SidechannelMitigation` enforces on
    /// received XON and XOFF messages, for two different parameter sets.
    #[test]
    fn sidechannel_mitigation() {
        let params = [
            FlowCtrlParameters {
                cc_xoff_client: CellCount::new(2),
                cc_xoff_exit: CellCount::new(4),
                cc_xon_rate: CellCount::new(8),
                cc_xon_change_pct: 1,
                cc_xon_ewma_cnt: 1,
            },
            FlowCtrlParameters {
                cc_xoff_client: CellCount::new(8),
                cc_xoff_exit: CellCount::new(4),
                cc_xon_rate: CellCount::new(2),
                cc_xon_change_pct: 1,
                cc_xon_ewma_cnt: 1,
            },
        ];

        for params in params {
            let xon_limit = SidechannelMitigation::peer_xon_limit_bytes(&params);
            let xoff_limit = SidechannelMitigation::peer_xoff_limit_bytes(&params);

            let mut x = SidechannelMitigation::new();
            // cannot receive XON as first message
            assert!(x.received_xon(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // cannot receive XOFF as first message
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // cannot receive XOFF after sending fewer than `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize - 1);
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());
            // but cannot receive another XOFF immediately after
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());
            // but cannot receive another XOFF even after sending another `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // and can receive another XOFF after sending another `xoff_limit` bytes
            x.sent_stream_data(xoff_limit as usize);
            assert!(x.received_xoff(&params).is_ok());

            let mut x = SidechannelMitigation::new();
            // cannot receive XON after sending fewer than `xon_limit` bytes
            x.sent_stream_data(xon_limit as usize - 1);
            assert!(x.received_xon(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XON after sending a large number of bytes
            x.sent_stream_data(xon_limit as usize * 3);
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive another XON
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive another XON
            assert!(x.received_xon(&params).is_ok());
            // but cannot receive another XON immediately after
            assert!(x.received_xon(&params).is_err());

            let mut x = SidechannelMitigation::new();
            // can receive XOFF after sending a large number of bytes
            x.sent_stream_data(xoff_limit as usize * 3);
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive an XOFF
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // and can immediately receive an XOFF
            assert!(x.received_xoff(&params).is_ok());
            // and can immediately receive an XON
            assert!(x.received_xon(&params).is_ok());
            // but cannot immediately receive an XOFF
            assert!(x.received_xoff(&params).is_err());
        }
    }
}