1
//! Code for implementing flow control (stream-level).
2

            
3
use std::sync::Arc;
4

            
5
use postage::watch;
6
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
7
use tor_cell::relaycell::msg::AnyRelayMsg;
8
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
9

            
10
use super::params::FlowCtrlParameters;
11
use super::window::state::{HalfStreamWindowFlowCtrl, WindowFlowCtrl};
12
use super::xon_xoff::reader::DrainRateRequest;
13
#[cfg(feature = "flowctl-cc")]
14
use super::xon_xoff::state::{HalfStreamXonXoffFlowCtrl, XonXoffFlowCtrl};
15

            
16
use crate::Result;
17
use crate::congestion::sendme;
18
use crate::util::notify::NotifySender;
19

            
20
/// Private internals of [`StreamFlowCtrl`].
21
#[enum_dispatch::enum_dispatch]
22
#[derive(Debug)]
23
enum StreamFlowCtrlInner {
24
    /// "legacy" sendme-window-based flow control.
25
    WindowBased(WindowFlowCtrl),
26
    /// XON/XOFF flow control.
27
    #[cfg(feature = "flowctl-cc")]
28
    XonXoffBased(XonXoffFlowCtrl),
29
}
30

            
31
/// Manages the circuit reactor's flow control for a stream.
32
///
33
/// Note that the flow control logic can be distributed across multiple parts of Arti.
34
/// For example some flow control logic will exist in the circuit reactor,
35
/// but other logic will exist in the stream's `DataStream`.
36
/// So this doesn't include all flow control logic.
37
#[derive(Debug)]
38
pub(crate) struct StreamFlowCtrl {
39
    /// Private internal enum.
40
    inner: StreamFlowCtrlInner,
41
}
42

            
43
impl StreamFlowCtrl {
44
    /// Returns a new sendme-window-based [`StreamFlowCtrl`].
45
386
    pub(crate) fn new_window(window: sendme::StreamSendWindow) -> Self {
46
386
        Self {
47
386
            inner: StreamFlowCtrlInner::WindowBased(WindowFlowCtrl::new(window)),
48
386
        }
49
386
    }
50

            
51
    /// Returns a new xon/xoff-based [`StreamFlowCtrl`].
52
    #[cfg(feature = "flowctl-cc")]
53
12
    pub(crate) fn new_xon_xoff(
54
12
        params: Arc<FlowCtrlParameters>,
55
12
        use_sidechannel_mitigations: bool,
56
12
        rate_limit_updater: watch::Sender<StreamRateLimit>,
57
12
        drain_rate_requester: NotifySender<DrainRateRequest>,
58
12
    ) -> Self {
59
12
        Self {
60
12
            inner: StreamFlowCtrlInner::XonXoffBased(XonXoffFlowCtrl::new(
61
12
                params,
62
12
                use_sidechannel_mitigations,
63
12
                rate_limit_updater,
64
12
                drain_rate_requester,
65
12
            )),
66
12
        }
67
12
    }
68

            
69
    /// Once this end of the stream is closed and the stream becomes a
70
    /// half-stream (`HalfStream`),
71
    /// this method will turn the flow control object into a version
72
    /// that is designed to be used for half-streams.
73
76
    pub(crate) fn half_stream(self) -> HalfStreamFlowCtrl {
74
76
        let inner = match self.inner {
75
76
            StreamFlowCtrlInner::WindowBased(x) => {
76
76
                HalfStreamFlowCtrlInner::WindowBased(HalfStreamWindowFlowCtrl::new(x))
77
            }
78
            #[cfg(feature = "flowctl-cc")]
79
            StreamFlowCtrlInner::XonXoffBased(x) => {
80
                HalfStreamFlowCtrlInner::XonXoffBased(HalfStreamXonXoffFlowCtrl::new(x))
81
            }
82
        };
83

            
84
76
        HalfStreamFlowCtrl { inner }
85
76
    }
86
}
87

            
88
// forward all trait methods to the inner enum
89
impl FlowCtrlHooks for StreamFlowCtrl {
90
20908
    fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
91
20908
        self.inner.can_send(msg)
92
20908
    }
93

            
94
4156
    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
95
4156
        self.inner.about_to_send(msg)
96
4156
    }
97

            
98
4
    fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
99
4
        self.inner.put_for_incoming_sendme(msg)
100
4
    }
101

            
102
    fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
103
        self.inner.handle_incoming_xon(msg)
104
    }
105

            
106
    fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
107
        self.inner.handle_incoming_xoff(msg)
108
    }
109

            
110
    fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>> {
111
        self.inner.maybe_send_xon(rate, buffer_len)
112
    }
113

            
114
64
    fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
115
64
        self.inner.maybe_send_xoff(buffer_len)
116
64
    }
117
}
118

            
119
/// Methods that can be called on a [`StreamFlowCtrl`].
120
///
121
/// We use a trait so that we can use `enum_dispatch` on the inner [`StreamFlowCtrlInner`] enum.
122
#[enum_dispatch::enum_dispatch(StreamFlowCtrlInner)]
123
pub(crate) trait FlowCtrlHooks {
124
    /// Whether this stream is ready to send `msg`.
125
    fn can_send<M: RelayMsg>(&self, msg: &M) -> bool;
126

            
127
    /// Inform the flow control code that we're about to send `msg`.
128
    /// Returns an error if the message should not be sent,
129
    /// and the circuit should be closed.
130
    // TODO: Consider having this method wrap the message in a type that
131
    // "proves" we've applied flow control. This would make it easier to apply
132
    // flow control earlier, e.g. in `OpenStreamEntStream`, without introducing
133
    // ambiguity in the sending function as to whether flow control has already
134
    // been applied or not.
135
    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()>;
136

            
137
    /// Handle an incoming sendme.
138
    ///
139
    /// On success, return the number of cells left in the window.
140
    ///
141
    /// On failure, return an error: the caller should close the stream or
142
    /// circuit with a protocol error.
143
    ///
144
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
145
    /// correct type of flow control.
146
    fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
147

            
148
    /// Handle an incoming XON message.
149
    ///
150
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
151
    /// correct type of flow control.
152
    fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
153

            
154
    /// Handle an incoming XOFF message.
155
    ///
156
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
157
    /// correct type of flow control.
158
    fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
159

            
160
    /// Check if we should send an XON message.
161
    ///
162
    /// If we should, then returns the XON message that should be sent.
163
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
164
    fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>>;
165

            
166
    /// Check if we should send an XOFF message.
167
    ///
168
    /// If we should, then returns the XOFF message that should be sent.
169
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
170
    fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>>;
171
}
172

            
173
/// Manages flow control for a half-stream (`HalfStream`).
174
#[derive(Debug)]
175
pub(crate) struct HalfStreamFlowCtrl {
176
    /// Private internal enum.
177
    inner: HalfStreamFlowCtrlInner,
178
}
179

            
180
/// Private internals of [`HalfStreamFlowCtrl`].
181
#[enum_dispatch::enum_dispatch]
182
#[derive(Debug)]
183
enum HalfStreamFlowCtrlInner {
184
    /// "legacy" sendme-window-based flow control.
185
    WindowBased(HalfStreamWindowFlowCtrl),
186
    /// XON/XOFF flow control.
187
    #[cfg(feature = "flowctl-cc")]
188
    XonXoffBased(HalfStreamXonXoffFlowCtrl),
189
}
190

            
191
/// Methods that can be called on a [`HalfStreamFlowCtrl`].
192
///
193
/// We use a trait so that we can use `enum_dispatch` on the inner [`HalfStreamFlowCtrlInner`] enum.
194
/// While this may seem unnecessary since this trait currently only has two methods,
195
/// it's consistent with the [`FlowCtrlHooks`] trait above.
196
#[enum_dispatch::enum_dispatch(HalfStreamFlowCtrlInner)]
197
pub(crate) trait HalfStreamFlowCtrlHooks {
198
    /// Handle some number of dropped stream messages.
199
    ///
200
    /// We don't know what kinds of stream messages were dropped, only the number of them.
201
    ///
202
    /// This method exists because currently the stream entry may drop some incoming stream
203
    /// messages and they would never be processed by this flow control object otherwise.
204
    fn handle_incoming_dropped(&mut self, msg_count: u16) -> Result<()>;
205

            
206
    /// Handle an incoming message.
207
    ///
208
    /// If it's a flow control message, it will be consumed and `None` will be returned.
209
    /// Otherwise the original message will be returned.
210
    ///
211
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
212
    /// correct type of flow control.
213
    fn handle_incoming_msg(&mut self, msg: UnparsedRelayMsg) -> Result<Option<UnparsedRelayMsg>>;
214
}
215

            
216
// forward all trait methods to the inner enum
217
impl HalfStreamFlowCtrlHooks for HalfStreamFlowCtrl {
218
66
    fn handle_incoming_dropped(&mut self, msg_count: u16) -> Result<()> {
219
66
        self.inner.handle_incoming_dropped(msg_count)
220
66
    }
221

            
222
1020
    fn handle_incoming_msg(&mut self, msg: UnparsedRelayMsg) -> Result<Option<UnparsedRelayMsg>> {
223
1020
        self.inner.handle_incoming_msg(msg)
224
1020
    }
225
}
226

            
227
/// A newtype wrapper for a tor stream rate limit that makes the units explicit.
228
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
229
pub(crate) struct StreamRateLimit {
230
    /// The rate in bytes/s.
231
    rate: u64,
232
}
233

            
234
impl StreamRateLimit {
235
    /// A maximum rate limit.
236
    pub(crate) const MAX: Self = Self::new_bytes_per_sec(u64::MAX);
237

            
238
    /// A rate limit of 0.
239
    pub(crate) const ZERO: Self = Self::new_bytes_per_sec(0);
240

            
241
    /// A new [`StreamRateLimit`] with `rate` bytes/s.
242
    pub(crate) const fn new_bytes_per_sec(rate: u64) -> Self {
243
        Self { rate }
244
    }
245

            
246
    /// The rate in bytes/s.
247
180
    pub(crate) const fn bytes_per_sec(&self) -> u64 {
248
180
        self.rate
249
180
    }
250
}
251

            
252
impl std::fmt::Display for StreamRateLimit {
253
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254
        write!(f, "{} bytes/s", self.rate)
255
    }
256
}