1
//! Circuit reactor's stream XON/XOFF flow control.
2
//!
3
//! ## Notes on consensus parameters
4
//!
5
//! ### `cc_xoff_client`
6
//!
7
//! This is the number of bytes that we buffer within a [`DataStream`]. The actual total number of
8
//! bytes buffered can be *much* larger. For example there will be additional buffering:
9
//!
10
//! - Within the arti socks/http proxy: Arti's proxy code needs to read some bytes from the stream, store
11
//!   it in a temporary buffer, then write the buffer to the socket. If the socket would block, the
12
//!   data would remain in that temporary buffer. In practice arti uses only a small byte buffer (APP_STREAM_BUF_LEN) at
13
//!   the time of writing, which is hopefully negligible. See `arti::socks::copy_interactive()`.
14
//! - Within the kernel: There are two additional buffers that will store stream data before the
15
//!   application connected over socks will see the data: Arti's socket send buffer and the
16
//!   application's socket receive buffer. If the application were to stop reading from its socket,
17
//!   stream data would accumulate first in the socket's receive buffer. Once full, stream data
18
//!   would accumulate in arti's socket's send buffer. This can become relatively large, especially
19
//!   with buffer autotuning enabled. On a Linux 6.15 system with curl downloading a large file and
20
//!   stopping mid-download, the receive buffer was 6,116,738 bytes and the send buffer was
21
//!   2,631,062 bytes. This sums to around 8.7 MB of stream data buffered in the kernel, which is
22
//!   significantly higher than the current consensus value of `cc_xoff_client`.
23
//!
24
//! This means that the total number of bytes buffered before an XOFF is sent can be much larger
25
//! than `cc_xoff_client`.
26
//!
27
//! While we should take into account the kernel and arti socks buffering above, we also need to
28
//! keep in mind that arti-client is a library that can be used by others. These library users might
29
//! not do any kernel or socks buffering, for example if they write a rust program that handles the
30
//! stream data entirely within their program. We don't want to set `cc_xoff_client` too low that it
31
//! harms the performance for these users, even if it's fine for the arti socks proxy case.
32

            
33
use std::num::Saturating;
34
use std::sync::Arc;
35

            
36
use postage::watch;
37
use tor_cell::relaycell::flow_ctrl::{FlowCtrlVersion, Xoff, Xon, XonKbpsEwma};
38
use tor_cell::relaycell::msg::AnyRelayMsg;
39
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
40
use tracing::trace;
41

            
42
use super::reader::DrainRateRequest;
43

            
44
use crate::stream::flow_ctrl::params::{CellCount, FlowCtrlParameters};
45
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamRateLimit};
46
use crate::util::notify::NotifySender;
47
use crate::{Error, Result};
48

            
49
#[cfg(doc)]
50
use {crate::client::stream::DataStream, crate::stream::flow_ctrl::state::StreamFlowCtrl};
51

            
52
/// State for XON/XOFF flow control.
53
#[derive(Debug)]
54
pub(crate) struct XonXoffFlowCtrl {
55
    /// Consensus parameters.
56
    params: Arc<FlowCtrlParameters>,
57
    /// How we communicate rate limit updates to the
58
    /// [`DataWriter`](crate::client::stream::DataWriter).
59
    rate_limit_updater: watch::Sender<StreamRateLimit>,
60
    /// How we communicate requests for new drain rate updates to the
61
    /// [`XonXoffReader`](crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReader).
62
    drain_rate_requester: NotifySender<DrainRateRequest>,
63
    /// The last rate limit we sent.
64
    last_sent_xon_xoff: Option<XonXoffMsg>,
65
    /// The buffer limit at which we should send an XOFF.
66
    ///
67
    /// In prop324 it says that this will be either `cc_xoff_client` or `cc_xoff_exit` depending on
68
    /// whether we're a client/hs or exit, but we deviate from the spec here (see how it is set
69
    /// below).
70
    xoff_limit: CellCount<{ tor_cell::relaycell::PAYLOAD_MAX_SIZE_ALL as u32 }>,
71
    /// DropMark sidechannel mitigations.
72
    ///
73
    /// This is only enabled if we are a client (including an onion service).
74
    //
75
    // We could use a `Box` here so that this only takes up space if sidechannel mitigations are
76
    // enabled. But `SidechannelMitigation` is (at the time of writing) only 16 bytes. We could
77
    // reconsider in the future if we add more functionality to `SidechannelMitigation`.
78
    sidechannel_mitigation: Option<SidechannelMitigation>,
79
}
80

            
81
impl XonXoffFlowCtrl {
82
    /// Returns a new xon/xoff-based state.
83
12
    pub(crate) fn new(
84
12
        params: Arc<FlowCtrlParameters>,
85
12
        use_sidechannel_mitigations: bool,
86
12
        rate_limit_updater: watch::Sender<StreamRateLimit>,
87
12
        drain_rate_requester: NotifySender<DrainRateRequest>,
88
12
    ) -> Self {
89
12
        let sidechannel_mitigation =
90
12
            use_sidechannel_mitigations.then_some(SidechannelMitigation::new());
91

            
92
        // We use the same XOFF limit regardless of if we're a client or exit.
93
        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
94
12
        let xoff_limit = std::cmp::max(params.cc_xoff_client, params.cc_xoff_exit);
95

            
96
12
        Self {
97
12
            params,
98
12
            rate_limit_updater,
99
12
            drain_rate_requester,
100
12
            last_sent_xon_xoff: None,
101
12
            xoff_limit,
102
12
            sidechannel_mitigation,
103
12
        }
104
12
    }
105
}
106

            
107
impl FlowCtrlHooks for XonXoffFlowCtrl {
108
6000
    fn can_send<M: RelayMsg>(&self, _msg: &M) -> bool {
109
        // we perform rate-limiting in the `DataWriter`,
110
        // so we send any messages that made it past the `DataWriter`
111
6000
        true
112
6000
    }
113

            
114
1200
    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
115
        // if sidechannel mitigations are enabled and this is a RELAY_DATA message,
116
        // notify that we sent a data message
117
1200
        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
118
1200
            if let AnyRelayMsg::Data(data_msg) = msg {
119
1200
                sidechannel_mitigation.sent_stream_data(data_msg.as_ref().len());
120
1200
            }
121
        }
122

            
123
1200
        Ok(())
124
1200
    }
125

            
126
    fn put_for_incoming_sendme(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
127
        let msg = "Stream level SENDME not allowed due to congestion control";
128
        Err(Error::CircProto(msg.into()))
129
    }
130

            
131
    fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
132
        let xon = msg
133
            .decode::<Xon>()
134
            .map_err(|e| Error::from_bytes_err(e, "failed to decode XON message"))?
135
            .into_msg();
136

            
137
        // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
138
        // > violation.
139
        if *xon.version() != 0 {
140
            return Err(Error::CircProto("Unrecognized XON version".into()));
141
        }
142

            
143
        // if sidechannel mitigations are enabled, notify that an XON was received
144
        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
145
            sidechannel_mitigation.received_xon(&self.params)?;
146
        }
147

            
148
        trace!("Received an XON with rate {}", xon.kbps_ewma());
149

            
150
        let rate = match xon.kbps_ewma() {
151
            XonKbpsEwma::Limited(rate_kbps) => {
152
                let rate_kbps = u64::from(rate_kbps.get());
153
                // convert from kbps to bytes/s
154
                StreamRateLimit::new_bytes_per_sec(rate_kbps * 1000 / 8)
155
            }
156
            XonKbpsEwma::Unlimited => StreamRateLimit::MAX,
157
        };
158

            
159
        *self.rate_limit_updater.borrow_mut() = rate;
160
        Ok(())
161
    }
162

            
163
    fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
164
        let xoff = msg
165
            .decode::<Xoff>()
166
            .map_err(|e| Error::from_bytes_err(e, "failed to decode XOFF message"))?
167
            .into_msg();
168

            
169
        // > Parties SHOULD treat XON or XOFF cells with unrecognized versions as a protocol
170
        // > violation.
171
        if *xoff.version() != 0 {
172
            return Err(Error::CircProto("Unrecognized XOFF version".into()));
173
        }
174

            
175
        // if sidechannel mitigations are enabled, notify that an XOFF was received
176
        if let Some(ref mut sidechannel_mitigation) = self.sidechannel_mitigation {
177
            sidechannel_mitigation.received_xoff(&self.params)?;
178
        }
179

            
180
        trace!("Received an XOFF");
181

            
182
        // update the rate limit and notify the `DataWriter`
183
        *self.rate_limit_updater.borrow_mut() = StreamRateLimit::ZERO;
184

            
185
        Ok(())
186
    }
187

            
188
    fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>> {
189
        if buffer_len as u64 > self.xoff_limit.as_bytes() {
190
            // we can't send an XON, and we should have already sent an XOFF when the queue first
191
            // exceeded the limit (see `maybe_send_xoff()`)
192
            debug_assert!(matches!(self.last_sent_xon_xoff, Some(XonXoffMsg::Xoff)));
193

            
194
            // inform the stream reader that we need a new drain rate
195
            self.drain_rate_requester.notify();
196
            return Ok(None);
197
        }
198

            
199
        self.last_sent_xon_xoff = Some(XonXoffMsg::Xon(rate));
200

            
201
        trace!("Want to send an XON with rate {rate}");
202

            
203
        Ok(Some(Xon::new(FlowCtrlVersion::V0, rate)))
204
    }
205

            
206
64
    fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
207
        // if the last XON/XOFF we sent was an XOFF, no need to send another
208
64
        if matches!(self.last_sent_xon_xoff, Some(XonXoffMsg::Xoff)) {
209
            return Ok(None);
210
64
        }
211

            
212
64
        if buffer_len as u64 <= self.xoff_limit.as_bytes() {
213
64
            return Ok(None);
214
        }
215

            
216
        // either we have never sent an XOFF or XON, or we last sent an XON
217

            
218
        // remember that we last sent an XOFF
219
        self.last_sent_xon_xoff = Some(XonXoffMsg::Xoff);
220

            
221
        // inform the stream reader that we need a new drain rate
222
        self.drain_rate_requester.notify();
223

            
224
        trace!("Want to send an XOFF");
225

            
226
        Ok(Some(Xoff::new(FlowCtrlVersion::V0)))
227
64
    }
228
}
229

            
230
/// An XON or XOFF message with no associated data.
231
#[derive(Debug, PartialEq, Eq)]
232
enum XonXoff {
233
    /// XON message.
234
    Xon,
235
    /// XOFF message.
236
    Xoff,
237
}
238

            
239
/// An XON or XOFF message with associated data.
240
#[derive(Debug)]
241
enum XonXoffMsg {
242
    /// XON message with a rate.
243
    // TODO: I'm expecting that we'll want the `XonKbpsEwma` in the future.
244
    // If that doesn't end up being the case, then we should remove it.
245
    #[expect(dead_code)]
246
    Xon(XonKbpsEwma),
247
    /// XOFF message.
248
    Xoff,
249
}
250

            
251
/// Sidechannel mitigations for DropMark attacks.
252
///
253
/// > In order to mitigate DropMark attacks, both XOFF and advisory XON transmission must be
254
/// > restricted.
255
///
256
/// These restrictions should be implemented for clients (OPs and onion services).
257
#[derive(Debug)]
258
struct SidechannelMitigation {
259
    /// The last rate limit update we received.
260
    last_recvd_xon_xoff: Option<XonXoff>,
261
    /// Number of sent stream bytes.
262
    ///
263
    /// We only use this for bytes that are sent early on the stream,
264
    /// checking if it's less than `cc_xon_rate` and/or `cc_xoff_{client,exit}`.
265
    /// Once this value is sufficiently large, we don't care about the exact value.
266
    /// So a saturating u32 should be more than enough bits for what we need.
267
    bytes_sent_total: Saturating<u32>,
268
    /// Number of sent stream bytes since the last advisory XON was received.
269
    bytes_sent_since_recvd_last_advisory_xon: Saturating<u32>,
270
    /// Number of sent stream bytes since the last XOFF was received.
271
    bytes_sent_since_recvd_last_xoff: Saturating<u32>,
272
}
273

            
274
impl SidechannelMitigation {
275
    /// A new [`SidechannelMitigation`].
276
40
    fn new() -> Self {
277
40
        Self {
278
40
            last_recvd_xon_xoff: None,
279
40
            bytes_sent_total: Saturating(0),
280
40
            // We set these to 0 even though we haven't yet received an XON or XOFF. We could use an
281
40
            // `Option` instead, but it makes the code more complicated and increases their size
282
40
            // from 32 bits to 64 bits.
283
40
            bytes_sent_since_recvd_last_advisory_xon: Saturating(0),
284
40
            bytes_sent_since_recvd_last_xoff: Saturating(0),
285
40
        }
286
40
    }
287

            
288
    /// A (likely underestimated) guess of the XOFF limit that the other endpoint is using.
289
44
    fn peer_xoff_limit_bytes(params: &FlowCtrlParameters) -> u64 {
290
        // We need to consider that `xoff_client` and `xoff_exit` may be different, we don't know
291
        // here exactly what kind of peer we're connected to, and that we may have a different view
292
        // of the consensus than the peer.
293
        // We deviate from prop324 here and use a more relaxed threshold.
294
        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
295
44
        let min = std::cmp::min(
296
44
            params.cc_xoff_client.as_bytes(),
297
44
            params.cc_xoff_exit.as_bytes(),
298
        );
299
44
        min / 2
300
44
    }
301

            
302
    /// A (likely underestimated) guess of the advisory XON limit that the other endpoint is using.
303
10
    fn peer_xon_limit_bytes(params: &FlowCtrlParameters) -> u64 {
304
        // We need to consider that we may have a different view of the consensus than the peer.
305
        // We deviate from prop324 here and use a more relaxed threshold.
306
        // See https://gitlab.torproject.org/tpo/core/torspec/-/issues/371#note_3260658
307
10
        params.cc_xon_rate.as_bytes() / 2
308
10
    }
309

            
310
    /// Notify that we have sent stream data.
311
1228
    fn sent_stream_data(&mut self, stream_bytes: usize) {
312
        // perform a saturating conversion to u32
313
1228
        let stream_bytes: u32 = stream_bytes.try_into().unwrap_or(u32::MAX);
314

            
315
1228
        self.bytes_sent_total += stream_bytes;
316

            
317
        // when we receive an XON or XOFF, we set the corresponding variable back to 0
318
1228
        self.bytes_sent_since_recvd_last_advisory_xon += stream_bytes;
319
1228
        self.bytes_sent_since_recvd_last_xoff += stream_bytes;
320
1228
    }
321

            
322
    /// Notify that we have received an XON message.
323
12
    fn received_xon(&mut self, params: &FlowCtrlParameters) -> Result<()> {
324
        // Check to make sure that XON is not sent too early, for dropmark attacks. The main
325
        // sidechannel risk is early cells, but we also check to see that we did not get more XONs
326
        // than make sense for the number of bytes we sent.
327
        //
328
        // The ordering is important here. For example we first want to check if we received an
329
        // advisory XON that was too early, before we check if we received the advisory XON too
330
        // frequently.
331

            
332
        // Ensure that we have sent some bytes. This might be covered by other checks below, but this
333
        // is the most important check so we do it explicitly here first.
334
12
        if self.bytes_sent_total.0 == 0 {
335
            const MSG: &str = "Received XON before sending any data";
336
4
            return Err(Error::CircProto(MSG.into()));
337
8
        }
338

            
339
        // is this an advisory XON?
340
8
        let is_advisory = match self.last_recvd_xon_xoff {
341
            // if we last received an XON, then this is advisory since we are already sending data
342
            Some(XonXoff::Xon) => true,
343
            // if we last received an XOFF, then this isn't advisory since we're being asked to
344
            // resume sending data
345
4
            Some(XonXoff::Xoff) => false,
346
            // if we never received an XON nor XOFF, then this is advisory since we are already
347
            // sending data
348
4
            None => true,
349
        };
350

            
351
        // set this before we possibly return early below, since this must be set regardless of if
352
        // it's an advisory XON or not
353
8
        self.last_recvd_xon_xoff = Some(XonXoff::Xon);
354

            
355
        // we only restrict advisory XON messages
356
8
        if !is_advisory {
357
4
            return Ok(());
358
4
        }
359

            
360
        // > Clients also SHOULD ensure that advisory XONs do not arrive before the minimum of the
361
        // > XOFF limit and 'cc_xon_rate' full cells worth of bytes have been transmitted.
362
        //
363
        // NOTE: We use a more relaxed threshold for the XON and XOFF limits than in prop324.
364
4
        let advisory_not_expected_before = std::cmp::min(
365
4
            Self::peer_xoff_limit_bytes(params),
366
4
            Self::peer_xon_limit_bytes(params),
367
        );
368
4
        if u64::from(self.bytes_sent_total.0) < advisory_not_expected_before {
369
            const MSG: &str = "Received advisory XON too early";
370
2
            return Err(Error::CircProto(MSG.into()));
371
2
        }
372

            
373
        // > Clients SHOULD ensure that advisory XONs do not arrive more frequently than every
374
        // > 'cc_xon_rate' cells worth of sent data.
375
        //
376
        // NOTE: We implement this a bit different than C-tor. In C-tor it checks that:
377
        //   conn->total_bytes_xmit < MIN(xoff_{client/exit}, xon_rate_bytes)*conn->num_xon_recv
378
        // which effectively checks that the average XON frequency over the lifetime of the stream
379
        // does not exceed a frequency of `MIN(xoff_{client/exit}, xon_rate_bytes)`. Instead here we
380
        // check that two XON messages never arrive at an interval that would exceed a frequency of
381
        // `cc_xon_rate`.
382
        //
383
        // NOTE: We use a more relaxed threshold for the XON limit than in prop324.
384
2
        if u64::from(self.bytes_sent_since_recvd_last_advisory_xon.0)
385
2
            < Self::peer_xon_limit_bytes(params)
386
        {
387
            const MSG: &str = "Received advisory XON too frequently";
388
2
            return Err(Error::CircProto(MSG.into()));
389
        }
390

            
391
        self.bytes_sent_since_recvd_last_advisory_xon = Saturating(0);
392

            
393
        Ok(())
394
12
    }
395

            
396
    /// Notify that we have received an XOFF message.
397
32
    fn received_xoff(&mut self, params: &FlowCtrlParameters) -> Result<()> {
398
        // Check to make sure that XOFF is not sent too early, for dropmark attacks. The
399
        // main sidechannel risk is early cells, but we also check to make sure that we have not
400
        // received more XOFFs than could have been generated by the bytes we sent.
401
        //
402
        // The ordering is important here. For example we first want to disallow consecutive XOFFs,
403
        // then check if we received an XOFF that was too early, and finally check if we received
404
        // the XOFF too frequently.
405

            
406
        // Ensure that we have sent some bytes. This might be covered by other checks below, but this
407
        // is the most important check so we do it explicitly here first.
408
32
        if self.bytes_sent_total.0 == 0 {
409
            const MSG: &str = "Received XOFF before sending any data";
410
4
            return Err(Error::CircProto(MSG.into()));
411
28
        }
412

            
413
        // disallow consecutive XOFF messages
414
28
        if self.last_recvd_xon_xoff == Some(XonXoff::Xoff) {
415
            const MSG: &str = "Received consecutive XOFF messages";
416
8
            return Err(Error::CircProto(MSG.into()));
417
20
        }
418

            
419
        // > clients MUST ensure that an XOFF does not arrive before it has sent the appropriate
420
        // > XOFF limit of bytes on a stream ('cc_xoff_exit' for exits, 'cc_xoff_client' for
421
        // > onions).
422
        //
423
        // NOTE: We use a more relaxed threshold for the XOFF limit than in prop324.
424
20
        if u64::from(self.bytes_sent_total.0) < Self::peer_xoff_limit_bytes(params) {
425
            const MSG: &str = "Received XOFF too early";
426
4
            return Err(Error::CircProto(MSG.into()));
427
16
        }
428

            
429
        // > Clients also SHOULD ensure than XOFFs do not arrive more frequently than every XOFF
430
        // > limit worth of sent data.
431
        //
432
        // NOTE: We implement this a bit different than C-tor. In C-tor it checks that:
433
        //   conn->total_bytes_xmit < xoff_{client/exit}*conn->num_xoff_recv
434
        // which effectively checks that the average XOFF frequency over the lifetime of the stream
435
        // does not exceed a frequency of `xoff_{client/exit}`. Instead here we check that two XOFF
436
        // messages never arrive at an interval that would exceed a frequency of
437
        // `xoff_{client/exit}`.
438
        //
439
        // NOTE: We use a more relaxed threshold for the XOFF limit than in prop324.
440
16
        if u64::from(self.bytes_sent_since_recvd_last_xoff.0) < Self::peer_xoff_limit_bytes(params)
441
        {
442
            return Err(Error::CircProto("Received XOFF too frequently".into()));
443
16
        }
444

            
445
16
        self.bytes_sent_since_recvd_last_xoff = Saturating(0);
446
16
        self.last_recvd_xon_xoff = Some(XonXoff::Xoff);
447

            
448
16
        Ok(())
449
32
    }
450
}
451

            
452
#[cfg(test)]
453
mod test {
454
    use super::*;
455

            
456
    use crate::stream::flow_ctrl::params::CellCount;
457

            
458
    #[test]
459
    fn sidechannel_mitigation() {
460
        let params = [
461
            FlowCtrlParameters {
462
                cc_xoff_client: CellCount::new(2),
463
                cc_xoff_exit: CellCount::new(4),
464
                cc_xon_rate: CellCount::new(8),
465
                cc_xon_change_pct: 1,
466
                cc_xon_ewma_cnt: 1,
467
            },
468
            FlowCtrlParameters {
469
                cc_xoff_client: CellCount::new(8),
470
                cc_xoff_exit: CellCount::new(4),
471
                cc_xon_rate: CellCount::new(2),
472
                cc_xon_change_pct: 1,
473
                cc_xon_ewma_cnt: 1,
474
            },
475
        ];
476

            
477
        for params in params {
478
            let xon_limit = SidechannelMitigation::peer_xon_limit_bytes(&params);
479
            let xoff_limit = SidechannelMitigation::peer_xoff_limit_bytes(&params);
480

            
481
            let mut x = SidechannelMitigation::new();
482
            // cannot receive XON as first message
483
            assert!(x.received_xon(&params).is_err());
484

            
485
            let mut x = SidechannelMitigation::new();
486
            // cannot receive XOFF as first message
487
            assert!(x.received_xoff(&params).is_err());
488

            
489
            let mut x = SidechannelMitigation::new();
490
            // cannot receive XOFF after sending fewer than `xoff_limit` bytes
491
            x.sent_stream_data(xoff_limit as usize - 1);
492
            assert!(x.received_xoff(&params).is_err());
493

            
494
            let mut x = SidechannelMitigation::new();
495
            // can receive XOFF after sending `xoff_limit` bytes
496
            x.sent_stream_data(xoff_limit as usize);
497
            assert!(x.received_xoff(&params).is_ok());
498
            // but cannot receive another XOFF immediately after
499
            assert!(x.received_xoff(&params).is_err());
500

            
501
            let mut x = SidechannelMitigation::new();
502
            // can receive XOFF after sending `xoff_limit` bytes
503
            x.sent_stream_data(xoff_limit as usize);
504
            assert!(x.received_xoff(&params).is_ok());
505
            // but cannot receive another XOFF even after sending another `xoff_limit` bytes
506
            x.sent_stream_data(xoff_limit as usize);
507
            assert!(x.received_xoff(&params).is_err());
508

            
509
            let mut x = SidechannelMitigation::new();
510
            // can receive XOFF after sending `xoff_limit` bytes
511
            x.sent_stream_data(xoff_limit as usize);
512
            assert!(x.received_xoff(&params).is_ok());
513
            // and can immediately receive an XON
514
            assert!(x.received_xon(&params).is_ok());
515
            // and can receive another XOFF after sending another `xoff_limit` bytes
516
            x.sent_stream_data(xoff_limit as usize);
517
            assert!(x.received_xoff(&params).is_ok());
518

            
519
            let mut x = SidechannelMitigation::new();
520
            // cannot receive XON after sending fewer than `xon_limit` bytes
521
            x.sent_stream_data(xon_limit as usize - 1);
522
            assert!(x.received_xon(&params).is_err());
523
        }
524
    }
525
}