1
//! A wrapper for an [`AsyncRead`] to support XON/XOFF flow control.
2
//!
3
//! This allows any `AsyncRead` that implements [`BufferIsEmpty`] to be used with XON/XOFF flow
4
//! control.
5

            
6
use std::io::Error;
7
use std::pin::Pin;
8
use std::task::{Context, Poll};
9

            
10
use futures::{AsyncRead, Stream};
11
use pin_project::pin_project;
12
use tor_basic_utils::assert_val_impl_trait;
13
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
14

            
15
use crate::stream::StreamTarget;
16
use crate::util::notify::NotifyReceiver;
17

            
18
/// A wrapper for an [`AsyncRead`] to support XON/XOFF flow control.
19
///
20
/// This reader will take care of communicating with the circuit reactor to handle XON/XOFF-related
21
/// events.
22
#[derive(Debug)]
23
#[pin_project]
24
pub(crate) struct XonXoffReader<R> {
25
    /// How we communicate with the circuit reactor.
26
    #[pin]
27
    ctrl: XonXoffReaderCtrl,
28
    /// The inner reader.
29
    #[pin]
30
    reader: R,
31
    /// Have we received a drain rate request notification from the reactor,
32
    /// but haven't yet sent a drain rate update back to the reactor?
33
    pending_drain_rate_update: bool,
34
}
35

            
36
impl<R> XonXoffReader<R> {
37
    /// Create a new [`XonXoffReader`].
38
    ///
39
    /// The reader must implement [`BufferIsEmpty`], which allows the `XonXoffReader` to check if
40
    /// the incoming stream buffer is empty or not.
41
120
    pub(crate) fn new(ctrl: XonXoffReaderCtrl, reader: R) -> Self {
42
120
        Self {
43
120
            ctrl,
44
120
            reader,
45
120
            pending_drain_rate_update: false,
46
120
        }
47
120
    }
48

            
49
    /// Get a reference to the inner [`AsyncRead`].
50
    ///
51
    /// NOTE: This will bypass the [`XonXoffReader`] and may cause incorrect behaviour depending on
52
    /// how you use the returned reader (for example if it uses interior mutability).
53
    pub(crate) fn inner(&self) -> &R {
54
        &self.reader
55
    }
56

            
57
    /// Get a mutable reference to the inner [`AsyncRead`].
58
    ///
59
    /// NOTE: This will bypass the [`XonXoffReader`] and may cause incorrect behaviour depending on
60
    /// how you use the returned reader (for example if you read bytes directly).
61
168
    pub(crate) fn inner_mut(&mut self) -> &mut R {
62
168
        &mut self.reader
63
168
    }
64
}
65

            
66
impl<R: AsyncRead + BufferIsEmpty> AsyncRead for XonXoffReader<R> {
67
234
    fn poll_read(
68
234
        self: Pin<&mut Self>,
69
234
        cx: &mut Context<'_>,
70
234
        buf: &mut [u8],
71
234
    ) -> Poll<Result<usize, Error>> {
72
234
        let mut self_ = self.project();
73

            
74
        // ensure that `drain_rate_request_stream` is a `FusedStream`,
75
        // which means that we don't need to worry about calling `poll_next()` repeatedly
76
234
        assert_val_impl_trait!(
77
234
            self_.ctrl.drain_rate_request_stream,
78
            futures::stream::FusedStream,
79
        );
80

            
81
        // check if the circuit reactor has requested a drain rate update
82
234
        if let Poll::Ready(Some(())) = self_
83
234
            .ctrl
84
234
            .as_mut()
85
234
            .project()
86
234
            .drain_rate_request_stream
87
234
            .poll_next(cx)
88
        {
89
            // a drain rate update was requested, so we need to send a drain rate update once we
90
            // have no more bytes buffered
91
            *self_.pending_drain_rate_update = true;
92
234
        }
93

            
94
        // try reading from the inner reader
95
234
        let res = self_.reader.as_mut().poll_read(cx, buf);
96

            
97
        // if we need to send a drain rate update and the stream buffer is empty, inform the reactor
98
234
        if *self_.pending_drain_rate_update && self_.reader.is_empty() {
99
            // TODO(arti#534): in the future we want to do rate estimation, but for now we'll just
100
            // send an "unlimited" drain rate
101
            self_
102
                .ctrl
103
                .stream_target
104
                .drain_rate_update(XonKbpsEwma::Unlimited)?;
105
            *self_.pending_drain_rate_update = false;
106
234
        }
107

            
108
234
        res
109
234
    }
110
}
111

            
112
/// The control structure for a stream that partakes in XON/XOFF flow control.
113
#[derive(Debug)]
114
#[pin_project]
115
pub(crate) struct XonXoffReaderCtrl {
116
    /// Receive notifications when the reactor requests a new drain rate.
117
    /// When we do, we should begin waiting for the receive buffer to clear.
118
    /// Then when the buffer clears, we should send a new drain rate update to the reactor.
119
    #[pin]
120
    drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
121
    /// A handle to the reactor for this stream.
122
    /// This allows us to send drain rate updates to the circuit reactor.
123
    stream_target: StreamTarget,
124
}
125

            
126
impl XonXoffReaderCtrl {
127
    /// Create a new [`XonXoffReaderCtrl`].
128
132
    pub(crate) fn new(
129
132
        drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
130
132
        stream_target: StreamTarget,
131
132
    ) -> Self {
132
132
        Self {
133
132
            drain_rate_request_stream,
134
132
            stream_target,
135
132
        }
136
132
    }
137
}
138

            
139
/// Used by the [`XonXoffReader`] to decide when to send a drain rate update
140
/// (typically resulting in an XON message).
141
pub(crate) trait BufferIsEmpty {
142
    /// Returns `true` if there are no incoming bytes buffered on this stream.
143
    ///
144
    /// This takes a `&mut` so that implementers can
145
    /// [`unobtrusive_peek()`](tor_async_utils::peekable_stream::UnobtrusivePeekableStream::unobtrusive_peek)
146
    /// a stream if necessary.
147
    fn is_empty(self: Pin<&mut Self>) -> bool;
148
}
149

            
150
/// A marker type for a [`NotifySender`](crate::util::notify::NotifySender)
151
/// indicating that notifications are for new drain rate requests.
152
#[derive(Debug)]
153
pub(crate) struct DrainRateRequest;