1
//! Code for implementing flow control (stream-level).
2

            
3
use std::sync::Arc;
4

            
5
use postage::watch;
6
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
7
use tor_cell::relaycell::msg::AnyRelayMsg;
8
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
9

            
10
use super::params::FlowCtrlParameters;
11
use super::window::state::{HalfStreamWindowFlowCtrl, WindowFlowCtrl};
12
use super::xon_xoff::reader::DrainRateRequest;
13
#[cfg(feature = "flowctl-cc")]
14
use super::xon_xoff::state::{HalfStreamXonXoffFlowCtrl, XonXoffFlowCtrl};
15

            
16
use crate::Result;
17
use crate::congestion::sendme;
18
use crate::util::notify::NotifySender;
19

            
20
/// Private internals of [`StreamFlowCtrl`].
21
#[enum_dispatch::enum_dispatch]
22
#[derive(Debug)]
23
enum StreamFlowCtrlInner {
24
    /// "legacy" sendme-window-based flow control.
25
    Window(WindowFlowCtrl),
26
    /// XON/XOFF flow control.
27
    #[cfg(feature = "flowctl-cc")]
28
    XonXoff(XonXoffFlowCtrl),
29
}
30

            
31
/// Manages the circuit reactor's flow control for a stream.
32
///
33
/// Note that the flow control logic can be distributed across multiple parts of Arti.
34
/// For example some flow control logic will exist in the circuit reactor,
35
/// but other logic will exist in the stream's `DataStream`.
36
/// So this doesn't include all flow control logic.
37
#[derive(Debug)]
38
pub(crate) struct StreamFlowCtrl {
39
    /// Private internal enum.
40
    inner: StreamFlowCtrlInner,
41
}
42

            
43
impl StreamFlowCtrl {
44
    /// Returns a new sendme-window-based [`StreamFlowCtrl`].
45
386
    pub(crate) fn new_window(window: sendme::StreamSendWindow) -> Self {
46
386
        Self {
47
386
            inner: StreamFlowCtrlInner::Window(WindowFlowCtrl::new(window)),
48
386
        }
49
386
    }
50

            
51
    /// Returns a new xon/xoff-based [`StreamFlowCtrl`].
52
    #[cfg(feature = "flowctl-cc")]
53
16
    pub(crate) fn new_xon_xoff(
54
16
        params: Arc<FlowCtrlParameters>,
55
16
        use_sidechannel_mitigations: bool,
56
16
        rate_limit_updater: watch::Sender<StreamRateLimit>,
57
16
        drain_rate_requester: NotifySender<DrainRateRequest>,
58
16
    ) -> Self {
59
16
        Self {
60
16
            inner: StreamFlowCtrlInner::XonXoff(XonXoffFlowCtrl::new(
61
16
                params,
62
16
                use_sidechannel_mitigations,
63
16
                rate_limit_updater,
64
16
                drain_rate_requester,
65
16
            )),
66
16
        }
67
16
    }
68

            
69
    /// Once this end of the stream is closed and the stream becomes a
70
    /// half-stream (`HalfStream`),
71
    /// this method will turn the flow control object into a version
72
    /// that is designed to be used for half-streams.
73
80
    pub(crate) fn half_stream(self) -> HalfStreamFlowCtrl {
74
80
        let inner = match self.inner {
75
76
            StreamFlowCtrlInner::Window(x) => {
76
76
                HalfStreamFlowCtrlInner::Window(HalfStreamWindowFlowCtrl::new(x))
77
            }
78
            #[cfg(feature = "flowctl-cc")]
79
4
            StreamFlowCtrlInner::XonXoff(x) => {
80
4
                HalfStreamFlowCtrlInner::XonXoff(HalfStreamXonXoffFlowCtrl::new(x))
81
            }
82
        };
83

            
84
80
        HalfStreamFlowCtrl { inner }
85
80
    }
86
}
87

            
88
// forward all trait methods to the inner enum
89
impl FlowCtrlHooks for StreamFlowCtrl {
90
20924
    fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
91
20924
        self.inner.can_send(msg)
92
20924
    }
93

            
94
4156
    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
95
4156
        self.inner.about_to_send(msg)
96
4156
    }
97

            
98
4
    fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
99
4
        self.inner.put_for_incoming_sendme(msg)
100
4
    }
101

            
102
    fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
103
        self.inner.handle_incoming_xon(msg)
104
    }
105

            
106
    fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
107
        self.inner.handle_incoming_xoff(msg)
108
    }
109

            
110
    fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>> {
111
        self.inner.maybe_send_xon(rate, buffer_len)
112
    }
113

            
114
68
    fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
115
68
        self.inner.maybe_send_xoff(buffer_len)
116
68
    }
117

            
118
136
    fn inbound_queue_max_len(&self) -> usize {
119
136
        self.inner.inbound_queue_max_len()
120
136
    }
121
}
122

            
123
/// Methods that can be called on a [`StreamFlowCtrl`].
124
///
125
/// We use a trait so that we can use `enum_dispatch` on the inner [`StreamFlowCtrlInner`] enum.
126
#[enum_dispatch::enum_dispatch(StreamFlowCtrlInner)]
127
pub(crate) trait FlowCtrlHooks {
128
    /// Whether this stream is ready to send `msg`.
129
    fn can_send<M: RelayMsg>(&self, msg: &M) -> bool;
130

            
131
    /// Inform the flow control code that we're about to send `msg`.
132
    /// Returns an error if the message should not be sent,
133
    /// and the circuit should be closed.
134
    // TODO: Consider having this method wrap the message in a type that
135
    // "proves" we've applied flow control. This would make it easier to apply
136
    // flow control earlier, e.g. in `OpenStreamEntStream`, without introducing
137
    // ambiguity in the sending function as to whether flow control has already
138
    // been applied or not.
139
    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()>;
140

            
141
    /// Handle an incoming sendme.
142
    ///
143
    /// On success, return the number of cells left in the window.
144
    ///
145
    /// On failure, return an error: the caller should close the stream or
146
    /// circuit with a protocol error.
147
    ///
148
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
149
    /// correct type of flow control.
150
    fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
151

            
152
    /// Handle an incoming XON message.
153
    ///
154
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
155
    /// correct type of flow control.
156
    fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
157

            
158
    /// Handle an incoming XOFF message.
159
    ///
160
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
161
    /// correct type of flow control.
162
    fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
163

            
164
    /// Check if we should send an XON message.
165
    ///
166
    /// If we should, then returns the XON message that should be sent.
167
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
168
    fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>>;
169

            
170
    /// Check if we should send an XOFF message.
171
    ///
172
    /// If we should, then returns the XOFF message that should be sent.
173
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
174
    fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>>;
175

            
176
    /// The max queue length that should be used for stream messages incoming from the Tor network.
177
    ///
178
    /// This is the queue length between the user-facing stream reader (`DataReader`)
179
    /// and the circuit reactor.
180
    ///
181
    /// If the queue would ever exceed this many messages, the stream should be closed.
182
    fn inbound_queue_max_len(&self) -> usize;
183
}
184

            
185
/// Manages flow control for a half-stream (`HalfStream`).
186
#[derive(Debug)]
187
pub(crate) struct HalfStreamFlowCtrl {
188
    /// Private internal enum.
189
    inner: HalfStreamFlowCtrlInner,
190
}
191

            
192
/// Private internals of [`HalfStreamFlowCtrl`].
193
#[enum_dispatch::enum_dispatch]
194
#[derive(Debug)]
195
enum HalfStreamFlowCtrlInner {
196
    /// "legacy" sendme-window-based flow control.
197
    Window(HalfStreamWindowFlowCtrl),
198
    /// XON/XOFF flow control.
199
    #[cfg(feature = "flowctl-cc")]
200
    XonXoff(HalfStreamXonXoffFlowCtrl),
201
}
202

            
203
/// Methods that can be called on a [`HalfStreamFlowCtrl`].
204
///
205
/// We use a trait so that we can use `enum_dispatch` on the inner [`HalfStreamFlowCtrlInner`] enum.
206
/// While this may seem unnecessary since this trait currently only has two methods,
207
/// it's consistent with the [`FlowCtrlHooks`] trait above.
208
#[enum_dispatch::enum_dispatch(HalfStreamFlowCtrlInner)]
209
pub(crate) trait HalfStreamFlowCtrlHooks {
210
    /// Handle some number of dropped stream messages.
211
    ///
212
    /// We don't know what kinds of stream messages were dropped, only the number of them.
213
    ///
214
    /// This method exists because currently the stream entry may drop some incoming stream
215
    /// messages and they would never be processed by this flow control object otherwise.
216
    fn handle_incoming_dropped(&mut self, msg_count: u16) -> Result<()>;
217

            
218
    /// Handle an incoming message.
219
    ///
220
    /// If it's a flow control message, it will be consumed and `None` will be returned.
221
    /// Otherwise the original message will be returned.
222
    ///
223
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
224
    /// correct type of flow control.
225
    fn handle_incoming_msg(&mut self, msg: UnparsedRelayMsg) -> Result<Option<UnparsedRelayMsg>>;
226
}
227

            
228
// forward all trait methods to the inner enum
229
impl HalfStreamFlowCtrlHooks for HalfStreamFlowCtrl {
230
70
    fn handle_incoming_dropped(&mut self, msg_count: u16) -> Result<()> {
231
70
        self.inner.handle_incoming_dropped(msg_count)
232
70
    }
233

            
234
1020
    fn handle_incoming_msg(&mut self, msg: UnparsedRelayMsg) -> Result<Option<UnparsedRelayMsg>> {
235
1020
        self.inner.handle_incoming_msg(msg)
236
1020
    }
237
}
238

            
239
/// A newtype wrapper for a tor stream rate limit that makes the units explicit.
///
/// Values compare and order by their underlying bytes/s rate.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct StreamRateLimit {
    /// The rate in bytes/s.
    rate: u64,
}

impl StreamRateLimit {
    /// A maximum rate limit.
    ///
    /// This is `u64::MAX` bytes/s, the largest representable rate.
    pub(crate) const MAX: Self = Self::new_bytes_per_sec(u64::MAX);

    /// A rate limit of 0.
    pub(crate) const ZERO: Self = Self::new_bytes_per_sec(0);

    /// A new [`StreamRateLimit`] with `rate` bytes/s.
    pub(crate) const fn new_bytes_per_sec(rate: u64) -> Self {
        Self { rate }
    }

    /// The rate in bytes/s.
    pub(crate) const fn bytes_per_sec(&self) -> u64 {
        self.rate
    }
}

/// Formats the limit as `"<rate> bytes/s"`.
impl std::fmt::Display for StreamRateLimit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} bytes/s", self.rate)
    }
}