1
//! A wrapper for an [`AsyncRead`] to support XON/XOFF flow control.
2
//!
3
//! This allows any `AsyncRead` that implements [`BufferIsEmpty`] to be used with XON/XOFF flow
4
//! control.
5

            
6
use std::io::Error;
7
use std::pin::Pin;
8
use std::task::{Context, Poll};
9

            
10
use futures::{AsyncRead, Stream};
11
use pin_project::pin_project;
12
use tor_basic_utils::assert_val_impl_trait;
13
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
14

            
15
use crate::stream::StreamTarget;
16
use crate::util::notify::NotifyReceiver;
17

            
18
/// A wrapper for an [`AsyncRead`] to support XON/XOFF flow control.
19
///
20
/// This reader will take care of communicating with the circuit reactor to handle XON/XOFF-related
21
/// events.
22
#[derive(Debug)]
23
#[pin_project]
24
pub(crate) struct XonXoffReader<R, T: DrainRateNotifier = StreamTarget> {
25
    /// How we communicate with the circuit reactor.
26
    #[pin]
27
    ctrl: XonXoffReaderCtrl<T>,
28
    /// The inner reader.
29
    #[pin]
30
    reader: R,
31
    /// Have we received a drain rate request notification from the reactor,
32
    /// but haven't yet sent a drain rate update back to the reactor?
33
    pending_drain_rate_update: bool,
34
}
35

            
36
impl<R, T: DrainRateNotifier> XonXoffReader<R, T> {
37
    /// Create a new [`XonXoffReader`].
38
    ///
39
    /// The reader must implement [`BufferIsEmpty`], which allows the `XonXoffReader` to check if
40
    /// the incoming stream buffer is empty or not.
41
132
    pub(crate) fn new(ctrl: XonXoffReaderCtrl<T>, reader: R) -> Self {
42
132
        Self {
43
132
            ctrl,
44
132
            reader,
45
132
            pending_drain_rate_update: false,
46
132
        }
47
132
    }
48

            
49
    /// Get a reference to the inner [`AsyncRead`].
50
    ///
51
    /// NOTE: This will bypass the [`XonXoffReader`] and may cause incorrect behaviour depending on
52
    /// how you use the returned reader (for example if it uses interior mutability).
53
    pub(crate) fn inner(&self) -> &R {
54
        &self.reader
55
    }
56

            
57
    /// Get a mutable reference to the inner [`AsyncRead`].
58
    ///
59
    /// NOTE: This will bypass the [`XonXoffReader`] and may cause incorrect behaviour depending on
60
    /// how you use the returned reader (for example if you read bytes directly).
61
180
    pub(crate) fn inner_mut(&mut self) -> &mut R {
62
180
        &mut self.reader
63
180
    }
64
}
65

            
66
impl<R: AsyncRead + BufferIsEmpty, T: DrainRateNotifier> AsyncRead for XonXoffReader<R, T> {
67
322
    fn poll_read(
68
322
        self: Pin<&mut Self>,
69
322
        cx: &mut Context<'_>,
70
322
        buf: &mut [u8],
71
322
    ) -> Poll<Result<usize, Error>> {
72
322
        let mut self_ = self.project();
73

            
74
        // ensure that `drain_rate_request_stream` is a `FusedStream`,
75
        // which means that we don't need to worry about calling `poll_next()` repeatedly
76
322
        assert_val_impl_trait!(
77
322
            self_.ctrl.drain_rate_request_stream,
78
            futures::stream::FusedStream,
79
        );
80

            
81
        // check if the circuit reactor has requested a drain rate update
82
322
        if let Poll::Ready(Some(())) = self_
83
322
            .ctrl
84
322
            .as_mut()
85
322
            .project()
86
322
            .drain_rate_request_stream
87
322
            .poll_next(cx)
88
12
        {
89
12
            // a drain rate update was requested, so we need to send a drain rate update once we
90
12
            // have no more bytes buffered
91
12
            *self_.pending_drain_rate_update = true;
92
310
        }
93

            
94
        // try reading from the inner reader
95
322
        let res = self_.reader.as_mut().poll_read(cx, buf);
96

            
97
        // if we need to send a drain rate update and the stream buffer is empty, inform the reactor
98
322
        if *self_.pending_drain_rate_update && self_.reader.is_empty() {
99
            // TODO(arti#534): in the future we want to do rate estimation, but for now we'll just
100
            // send an "unlimited" drain rate
101
8
            self_
102
8
                .ctrl
103
8
                .drain_rate_notifier
104
8
                .notify(XonKbpsEwma::Unlimited)?;
105
8
            *self_.pending_drain_rate_update = false;
106
314
        }
107

            
108
322
        res
109
322
    }
110
}
111

            
112
/// Something that sends drain rate updates to the flow control logic (the `XonXoffFlowCtrl`).
113
pub(crate) trait DrainRateNotifier {
114
    /// Send the drain rate update.
115
    fn notify(&mut self, rate: XonKbpsEwma) -> Result<(), Error>;
116
}
117

            
118
impl DrainRateNotifier for StreamTarget {
119
    fn notify(&mut self, rate: XonKbpsEwma) -> Result<(), Error> {
120
        self.drain_rate_update(rate).map_err(Into::into)
121
    }
122
}
123

            
124
/// The control structure for a stream that partakes in XON/XOFF flow control.
125
///
126
/// Used to construct an [`XonXoffReader`].
127
///
128
/// This contains a mechanism for us to be asked for our drain rate,
129
/// and a mechanism of sending the drain rate in response.
130
///
131
/// The `DrainRateNotifier` is typically a `StreamTarget`,
132
/// which sends the drain rate to the circuit reactor so that it can be sent in an XON message.
133
/// We make this a trait to make unit testing possible.
134
#[derive(Debug)]
135
#[pin_project]
136
pub(crate) struct XonXoffReaderCtrl<T: DrainRateNotifier = StreamTarget> {
137
    /// Receive notifications when the reactor requests a new drain rate.
138
    /// When we do, we should begin waiting for the receive buffer to clear.
139
    /// Then when the buffer clears, we should send a new drain rate update to the reactor.
140
    #[pin]
141
    drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
142
    /// An abstract handle to the reactor for this stream.
143
    /// This allows us to send drain rate updates to the circuit reactor.
144
    drain_rate_notifier: T,
145
}
146

            
147
impl<T: DrainRateNotifier> XonXoffReaderCtrl<T> {
148
    /// Create a new [`XonXoffReaderCtrl`].
149
    ///
150
    /// The `drain_rate_request_stream` informs us when we need to send our drain rate,
151
    /// and `drain_rate_notifier` allows us to send that drain rate.
152
144
    pub(crate) fn new(
153
144
        drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
154
144
        drain_rate_notifier: T,
155
144
    ) -> Self {
156
144
        Self {
157
144
            drain_rate_request_stream,
158
144
            drain_rate_notifier,
159
144
        }
160
144
    }
161
}
162

            
163
/// Used by the [`XonXoffReader`] to decide when to send a drain rate update
164
/// (typically resulting in an XON message).
165
pub(crate) trait BufferIsEmpty {
166
    /// Returns `true` if there are no incoming bytes buffered on this stream.
167
    ///
168
    /// This takes a `&mut` so that implementers can
169
    /// [`unobtrusive_peek()`](tor_async_utils::peekable_stream::UnobtrusivePeekableStream::unobtrusive_peek)
170
    /// a stream if necessary.
171
    fn is_empty(self: Pin<&mut Self>) -> bool;
172
}
173

            
174
/// A marker type for a [`NotifySender`](crate::util::notify::NotifySender)
175
/// indicating that notifications are for new drain rate requests.
176
#[derive(Debug)]
177
pub(crate) struct DrainRateRequest;
178

            
179
#[cfg(test)]
180
// This module (and `XonXoffReader`) are always available,
181
// but the flow control code logic that it uses requires the "flowctl-cc" feature.
182
#[cfg(feature = "flowctl-cc")]
183
// We use some tokio-specific types here to make the test easier to write.
184
#[cfg(feature = "tokio")]
185
mod test {
186
    // @@ begin test lint list maintained by maint/add_warning @@
187
    #![allow(clippy::bool_assert_comparison)]
188
    #![allow(clippy::clone_on_copy)]
189
    #![allow(clippy::dbg_macro)]
190
    #![allow(clippy::mixed_attributes_style)]
191
    #![allow(clippy::print_stderr)]
192
    #![allow(clippy::print_stdout)]
193
    #![allow(clippy::single_char_pattern)]
194
    #![allow(clippy::unwrap_used)]
195
    #![allow(clippy::unchecked_time_subtraction)]
196
    #![allow(clippy::useless_vec)]
197
    #![allow(clippy::needless_pass_by_value)]
198
    #![allow(clippy::string_slice)] // See arti#2571
199
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
200

            
201
    use super::*;
202

            
203
    use std::sync::Arc;
204
    use std::sync::atomic::{AtomicU64, Ordering};
205

            
206
    use crate::stream::flow_ctrl::params::FlowCtrlParameters;
207
    use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamRateLimit};
208
    use crate::stream::flow_ctrl::xon_xoff::state::XonXoffFlowCtrl;
209
    use crate::util::notify::NotifySender;
210

            
211
    use futures::channel::mpsc::{self, TryRecvError};
212
    use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
213
    use tokio_crate::io::{DuplexStream, duplex};
214
    use tokio_util::compat::{Compat, TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
215

            
216
    /// The type that will be stored by the [`XonXoffReader`] and used to send drain rate updates.
217
    ///
218
    /// This essentially mocks what the [`StreamTarget`] would do.
219
    struct TestingDrainRateUpdates(mpsc::UnboundedSender<XonKbpsEwma>);
220

            
221
    impl TestingDrainRateUpdates {
222
        pub(crate) fn new(sender: mpsc::UnboundedSender<XonKbpsEwma>) -> Self {
223
            Self(sender)
224
        }
225
    }
226

            
227
    impl DrainRateNotifier for TestingDrainRateUpdates {
228
        fn notify(&mut self, rate: XonKbpsEwma) -> Result<(), Error> {
229
            self.0.unbounded_send(rate).unwrap();
230
            Ok(())
231
        }
232
    }
233

            
234
    /// The writer for a data stream that tracks the length.
235
    #[pin_project::pin_project]
236
    struct WriterWithLength<W> {
237
        #[pin]
238
        writer: W,
239
        length: Arc<AtomicU64>,
240
    }
241

            
242
    /// The reader for a data stream that tracks the length.
243
    #[pin_project::pin_project]
244
    struct ReaderWithLength<R> {
245
        #[pin]
246
        reader: R,
247
        length: Arc<AtomicU64>,
248
    }
249

            
250
    /// Wraps a writer and reader to track the queue length.
251
    fn with_length<W, R>(writer: W, reader: R) -> (WriterWithLength<W>, ReaderWithLength<R>) {
252
        let length = Arc::new(AtomicU64::new(0));
253

            
254
        let writer = WriterWithLength {
255
            writer,
256
            length: Arc::clone(&length),
257
        };
258
        let reader = ReaderWithLength { reader, length };
259

            
260
        (writer, reader)
261
    }
262

            
263
    impl<W> WriterWithLength<W> {
264
        /// Amount of bytes queued.
265
        pub(crate) fn len(&self) -> u64 {
266
            self.length.load(Ordering::Acquire)
267
        }
268
    }
269

            
270
    impl<R> BufferIsEmpty for ReaderWithLength<R> {
271
        fn is_empty(self: Pin<&mut Self>) -> bool {
272
            self.length.load(Ordering::Acquire) == 0
273
        }
274
    }
275

            
276
    impl<W: AsyncWrite> AsyncWrite for WriterWithLength<W> {
277
        fn poll_write(
278
            self: Pin<&mut Self>,
279
            cx: &mut Context<'_>,
280
            buf: &[u8],
281
        ) -> Poll<std::io::Result<usize>> {
282
            let self_ = self.project();
283

            
284
            let rv = self_.writer.poll_write(cx, buf);
285

            
286
            // NOTE: There's a race condition here since we don't write to the writer and update the
287
            // length as one atomic operation.
288
            // But this is good enough for our test where the mock runtime is deterministic and
289
            // single-threaded.
290
            //
291
            // We ignore the possibility of overflowing the 64-bit integer here.
292
            if let Poll::Ready(Ok(len)) = rv {
293
                let len: u64 = len.try_into().expect("usize should fit into u64");
294
                // The effect of `poll_write()` above will be visible after another thread checks
295
                // the length with `load(Acquire)`.
296
                self_.length.fetch_add(len, Ordering::Release);
297
            }
298

            
299
            rv
300
        }
301

            
302
        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
303
            self.project().writer.poll_flush(cx)
304
        }
305

            
306
        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
307
            self.project().writer.poll_close(cx)
308
        }
309
    }
310

            
311
    impl<R: AsyncRead> AsyncRead for ReaderWithLength<R> {
312
        fn poll_read(
313
            self: Pin<&mut Self>,
314
            cx: &mut Context<'_>,
315
            buf: &mut [u8],
316
        ) -> Poll<std::io::Result<usize>> {
317
            let self_ = self.project();
318

            
319
            let rv = self_.reader.poll_read(cx, buf);
320

            
321
            // NOTE: There's a race condition here since we don't read from the reader and update
322
            // the length as one atomic operation.
323
            // But this is good enough for our test where the mock runtime is deterministic and
324
            // single-threaded.
325
            //
326
            // We ignore the possibility of underflowing the integer here.
327
            if let Poll::Ready(Ok(len)) = rv {
328
                let len: u64 = len.try_into().expect("usize should fit into u64");
329
                // The effect of `poll_read()` above will be visible after another thread checks
330
                // the length with `load(Acquire)`.
331
                self_.length.fetch_sub(len, Ordering::Release);
332
            }
333

            
334
            rv
335
        }
336
    }
337

            
338
    /// Set up all of the flow control stuff needed to test the [`XonXoffReader`].
339
    ///
340
    /// Returns:
341
    ///
342
    /// 1. The stream writer (as would be held by the circuit/stream reactor).
343
    /// 2. The stream reader (as would be held in a user-facing `DataStream`).
344
    /// 3. An MPSC receiver of drain rate updates.
345
    /// 4. The flow control logic.
346
    #[allow(clippy::type_complexity)]
347
    fn init_flow_ctrl(
348
        use_sidechannel_mitigations: bool,
349
    ) -> (
350
        WriterWithLength<Compat<DuplexStream>>,
351
        XonXoffReader<ReaderWithLength<Compat<DuplexStream>>, TestingDrainRateUpdates>,
352
        mpsc::UnboundedReceiver<XonKbpsEwma>,
353
        XonXoffFlowCtrl,
354
    ) {
355
        let params = FlowCtrlParameters::defaults_for_tests();
356

            
357
        // For the flow control logic to send rate limit changes to the stream writer.
358
        // We don't use this in this test, but the `XonXoffFlowCtrl` needs the tx side.
359
        let (rate_limit_tx, _rate_limit_rx) = postage::watch::channel_with(StreamRateLimit::MAX);
360

            
361
        // For the flow control logic to request a new drain rate update from the stream reader.
362
        let mut drain_rate_request_tx = NotifySender::new_typed();
363
        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
364

            
365
        // The flow control logic.
366
        let flow_ctrl = XonXoffFlowCtrl::new(
367
            Arc::new(params),
368
            use_sidechannel_mitigations,
369
            rate_limit_tx,
370
            drain_rate_request_tx,
371
        );
372

            
373
        // For the `XonXoffReader` to send a drain rate update.
374
        let (drain_rate_sender, drain_rate_receiver) = mpsc::unbounded();
375
        let drain_rate_updates = TestingDrainRateUpdates::new(drain_rate_sender);
376

            
377
        // All of the information needed to build a `XonXoffReader`.
378
        let reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, drain_rate_updates);
379

            
380
        // This is the stream queue for incoming data.
381
        // So the `reader` is the stream reader and the `writer` would be within the reactor.
382
        //
383
        // In arti this stream should be unbounded, so here we use a max size of `usize::MAX`.
384
        let (writer, reader) = duplex(/* max_buf_size= */ usize::MAX);
385
        let writer = writer.compat_write();
386
        let reader = reader.compat();
387

            
388
        // Make the reader+writer pair track the length of the buffer so that it can support
389
        // `BufferIsEmpty`.
390
        let (writer, reader) = with_length(writer, reader);
391

            
392
        // The reader for incoming stream data, with XON/XOFF support.
393
        let reader = XonXoffReader::new(reader_ctrl, reader);
394

            
395
        (writer, reader, drain_rate_receiver, flow_ctrl)
396
    }
397

            
398
    /// Buffer `num_bytes` as if the bytes arrived on the stream.
399
    ///
400
    /// Returns whether the flow control logic wanted to send an XOFF.
401
    async fn buffer_incoming_data(
402
        writer: &mut WriterWithLength<impl AsyncWrite + Unpin>,
403
        mut num_bytes: usize,
404
        flow_ctrl: &mut XonXoffFlowCtrl,
405
    ) -> bool {
406
        let mut wants_to_send_xoff = false;
407

            
408
        // Write the requested number of bytes.
409
        while num_bytes > 0 {
410
            // Write 100_000 bytes at a time.
411
            let buf_size = num_bytes.min(100_000);
412
            writer.write_all(&vec![0; buf_size]).await.unwrap();
413
            num_bytes -= buf_size;
414

            
415
            // Inform the flow control logic.
416
            let xoff = flow_ctrl.maybe_send_xoff(writer.len() as usize).unwrap();
417
            wants_to_send_xoff |= xoff.is_some();
418
        }
419

            
420
        wants_to_send_xoff
421
    }
422

            
423
    /// Read `num_bytes` from the stream.
424
    async fn read_incoming_data(mut reader: impl AsyncRead + Unpin, mut num_bytes: usize) {
425
        // Read the requested number of bytes.
426
        while num_bytes > 0 {
427
            // Read 100_000 bytes at a time.
428
            let buf_size = num_bytes.min(100_000);
429
            reader.read_exact(&mut vec![0; buf_size]).await.unwrap();
430
            num_bytes -= buf_size;
431
        }
432
    }
433

            
434
    /// This test is meant to test the drain rate update.
435
    /// It adds a lot of data to the stream queue so that it triggers sending an XOFF
436
    /// and sends a drain rate request to the [`XonXoffReader`],
437
    /// then it reads from the stream until it's empty
438
    /// and the `XonXoffReader` sends a drain rate update.
439
    /// The flow control logic receives the drain rate update and sends an XON.
440
    #[test]
441
    fn drain_rate_update() {
442
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
443
            // This is the stream queue for incoming data.
444
            // So the `reader` is the stream reader and the `writer` would be within the reactor.
445
            let (mut writer, mut reader, mut drain_rate_receiver, mut flow_ctrl) =
446
                init_flow_ctrl(/* use_sidechannel_mitigations= */ true);
447

            
448
            // Data has arrived on the stream.
449
            // We always consider sending an XOFF when a stream has received data.
450
            // The amount of incoming data wasn't very large,
451
            // so we don't expect that it would actually want to send an XOFF.
452
            let wants_to_send_xoff =
453
                buffer_incoming_data(&mut writer, 10_000, &mut flow_ctrl).await;
454
            assert!(!wants_to_send_xoff);
455

            
456
            // We didn't want to send an XOFF,
457
            // so the stream reader will never have been asked for a drain rate update.
458
            assert!(!reader.pending_drain_rate_update);
459

            
460
            // The stream reader reads all of the incoming data.
461
            read_incoming_data(&mut reader, 10_000).await;
462

            
463
            // Check `pending_drain_rate_update` again,
464
            // and also ensure that we didn't send a drain rate update.
465
            assert!(!reader.pending_drain_rate_update);
466
            assert_eq!(drain_rate_receiver.try_recv(), Err(TryRecvError::Empty));
467

            
468
            // Data has arrived on the stream.
469
            // We always consider sending an XOFF when a stream has received data.
470
            // The amount of incoming data was large,
471
            // so we expect that it would want to send an XOFF.
472
            let wants_to_send_xoff =
473
                buffer_incoming_data(&mut writer, 800_000, &mut flow_ctrl).await;
474
            assert!(wants_to_send_xoff);
475

            
476
            // The above code should have sent an XOFF and asked the reader for a drain rate update,
477
            // but the reader hasn't realized this yet.
478
            assert!(!reader.pending_drain_rate_update);
479
            assert_eq!(drain_rate_receiver.try_recv(), Err(TryRecvError::Empty));
480

            
481
            // The reader won't realize it was asked for a drain rate update until after it's tried
482
            // reading once.
483
            let _ = reader.read(&mut [0; 0]).await.unwrap();
484
            assert!(reader.pending_drain_rate_update);
485

            
486
            // The drain rate update is only sent once we've drained the buffer,
487
            // so an update should not have been sent yet.
488
            assert_eq!(drain_rate_receiver.try_recv(), Err(TryRecvError::Empty));
489

            
490
            // Read most (but not all) of the data on the stream.
491
            read_incoming_data(&mut reader, 700_000).await;
492

            
493
            // We haven't read *all* of the data,
494
            // so should still not have sent a drain rate update.
495
            assert!(!Pin::new(reader.inner_mut()).is_empty());
496
            assert!(reader.pending_drain_rate_update);
497
            assert_eq!(drain_rate_receiver.try_recv(), Err(TryRecvError::Empty));
498

            
499
            // Read the last of the data on the stream.
500
            read_incoming_data(&mut reader, 100_000).await;
501

            
502
            // Now that the buffer is empty,
503
            // we should have sent a drain rate update.
504
            assert!(Pin::new(reader.inner_mut()).is_empty());
505
            assert!(!reader.pending_drain_rate_update);
506
            let xon_rate = drain_rate_receiver.try_recv().unwrap();
507
            assert_eq!(xon_rate, XonKbpsEwma::Unlimited);
508

            
509
            // The buffer is still empty,
510
            // so the flow control logic should want to send an XON.
511
            let xon = flow_ctrl
512
                .maybe_send_xon(xon_rate, writer.len() as usize)
513
                .unwrap()
514
                .unwrap();
515
            assert_eq!(xon.kbps_ewma(), xon_rate);
516
        });
517
    }
518

            
519
    /// Like the `drain_rate_update()` test,
520
    /// this test causes the `XonXoffReader` to send a drain rate update.
521
    /// But in this case the buffer refills again past the high-water mark
522
    /// before the drain rate update can be processed by the flow control logic,
523
    /// so it *does not* send an XON.
524
    /// Instead it re-requests a drain rate from the `XonXoffReader`.
525
    #[test]
526
    fn drain_rate_update_then_buffer_refill() {
527
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
528
            // This is the stream queue for incoming data.
529
            // So the `reader` is the stream reader and the `writer` would be within the reactor.
530
            let (mut writer, mut reader, mut drain_rate_receiver, mut flow_ctrl) =
531
                init_flow_ctrl(/* use_sidechannel_mitigations= */ true);
532

            
533
            // Data has arrived on the stream.
534
            // We always consider sending an XOFF when a stream has received data.
535
            // The amount of incoming data was large,
536
            // so we expect that it would want to send an XOFF.
537
            let wants_to_send_xoff =
538
                buffer_incoming_data(&mut writer, 800_000, &mut flow_ctrl).await;
539
            assert!(wants_to_send_xoff);
540

            
541
            // Read all of the data on the stream.
542
            read_incoming_data(&mut reader, 700_000).await;
543
            assert!(reader.pending_drain_rate_update);
544
            read_incoming_data(&mut reader, 100_000).await;
545

            
546
            // Now that the buffer is empty,
547
            // we should have sent a drain rate update.
548
            assert!(Pin::new(reader.inner_mut()).is_empty());
549
            assert!(!reader.pending_drain_rate_update);
550

            
551
            // Before this drain rate update can make it to the
552
            // flow control logic with `maybe_send_xon()`,
553
            // the buffer fills again past the high-water mark.
554
            let wants_to_send_xoff =
555
                buffer_incoming_data(&mut writer, 800_000, &mut flow_ctrl).await;
556
            assert!(!wants_to_send_xoff);
557

            
558
            // Now the drain rate update makes it to the flow control logic.
559
            // Since the buffer is past the high-water mark,
560
            // we won't want to send an XON.
561
            let xon_rate = drain_rate_receiver.try_recv().unwrap();
562
            assert_eq!(xon_rate, XonKbpsEwma::Unlimited);
563
            let xon = flow_ctrl
564
                .maybe_send_xon(xon_rate, writer.len() as usize)
565
                .unwrap();
566
            assert!(xon.is_none());
567

            
568
            // Instead the reader will have been asked for a drain rate update again,
569
            // which restarts the entire process.
570
            assert!(!reader.pending_drain_rate_update);
571
            let _ = reader.read(&mut [0; 0]).await.unwrap();
572
            assert!(reader.pending_drain_rate_update);
573
        });
574
    }
575
}