1
//! Code for implementing flow control (stream-level).
2

            
3
use std::sync::Arc;
4

            
5
use postage::watch;
6
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
7
use tor_cell::relaycell::msg::AnyRelayMsg;
8
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
9

            
10
use super::params::FlowCtrlParameters;
11
use super::window::state::WindowFlowCtrl;
12
use super::xon_xoff::reader::DrainRateRequest;
13
#[cfg(feature = "flowctl-cc")]
14
use super::xon_xoff::state::XonXoffFlowCtrl;
15

            
16
use crate::Result;
17
use crate::congestion::sendme;
18
use crate::util::notify::NotifySender;
19

            
20
/// Private internals of [`StreamFlowCtrl`].
21
#[enum_dispatch::enum_dispatch]
22
#[derive(Debug)]
23
enum StreamFlowCtrlEnum {
24
    /// "legacy" sendme-window-based flow control.
25
    WindowBased(WindowFlowCtrl),
26
    /// XON/XOFF flow control.
27
    #[cfg(feature = "flowctl-cc")]
28
    XonXoffBased(XonXoffFlowCtrl),
29
}
30

            
31
/// Manages flow control for a stream.
32
#[derive(Debug)]
33
pub(crate) struct StreamFlowCtrl {
34
    /// Private internal enum.
35
    e: StreamFlowCtrlEnum,
36
}
37

            
38
impl StreamFlowCtrl {
39
    /// Returns a new sendme-window-based [`StreamFlowCtrl`].
40
386
    pub(crate) fn new_window(window: sendme::StreamSendWindow) -> Self {
41
386
        Self {
42
386
            e: StreamFlowCtrlEnum::WindowBased(WindowFlowCtrl::new(window)),
43
386
        }
44
386
    }
45

            
46
    /// Returns a new xon/xoff-based [`StreamFlowCtrl`].
47
    #[cfg(feature = "flowctl-cc")]
48
12
    pub(crate) fn new_xon_xoff(
49
12
        params: Arc<FlowCtrlParameters>,
50
12
        use_sidechannel_mitigations: bool,
51
12
        rate_limit_updater: watch::Sender<StreamRateLimit>,
52
12
        drain_rate_requester: NotifySender<DrainRateRequest>,
53
12
    ) -> Self {
54
12
        Self {
55
12
            e: StreamFlowCtrlEnum::XonXoffBased(XonXoffFlowCtrl::new(
56
12
                params,
57
12
                use_sidechannel_mitigations,
58
12
                rate_limit_updater,
59
12
                drain_rate_requester,
60
12
            )),
61
12
        }
62
12
    }
63
}
64

            
65
// forward all trait methods to the inner enum
66
impl FlowCtrlHooks for StreamFlowCtrl {
67
20908
    fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
68
20908
        self.e.can_send(msg)
69
20908
    }
70

            
71
4156
    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
72
4156
        self.e.about_to_send(msg)
73
4156
    }
74

            
75
8
    fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
76
8
        self.e.put_for_incoming_sendme(msg)
77
8
    }
78

            
79
    fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
80
        self.e.handle_incoming_xon(msg)
81
    }
82

            
83
    fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
84
        self.e.handle_incoming_xoff(msg)
85
    }
86

            
87
    fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>> {
88
        self.e.maybe_send_xon(rate, buffer_len)
89
    }
90

            
91
64
    fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
92
64
        self.e.maybe_send_xoff(buffer_len)
93
64
    }
94
}
95

            
96
/// Methods that can be called on a [`StreamFlowCtrl`].
97
///
98
/// We use a trait so that we can use `enum_dispatch` on the inner [`StreamFlowCtrlEnum`] enum.
99
#[enum_dispatch::enum_dispatch(StreamFlowCtrlEnum)]
100
pub(crate) trait FlowCtrlHooks {
101
    /// Whether this stream is ready to send `msg`.
102
    fn can_send<M: RelayMsg>(&self, msg: &M) -> bool;
103

            
104
    /// Inform the flow control code that we're about to send `msg`.
105
    /// Returns an error if the message should not be sent,
106
    /// and the circuit should be closed.
107
    // TODO: Consider having this method wrap the message in a type that
108
    // "proves" we've applied flow control. This would make it easier to apply
109
    // flow control earlier, e.g. in `OpenStreamEntStream`, without introducing
110
    // ambiguity in the sending function as to whether flow control has already
111
    // been applied or not.
112
    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()>;
113

            
114
    /// Handle an incoming sendme.
115
    ///
116
    /// On success, return the number of cells left in the window.
117
    ///
118
    /// On failure, return an error: the caller should close the stream or
119
    /// circuit with a protocol error.
120
    ///
121
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
122
    /// correct type of flow control.
123
    fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
124

            
125
    /// Handle an incoming XON message.
126
    ///
127
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
128
    /// correct type of flow control.
129
    fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
130

            
131
    /// Handle an incoming XOFF message.
132
    ///
133
    /// Takes the [`UnparsedRelayMsg`] so that we don't even try to decode it if we're not using the
134
    /// correct type of flow control.
135
    fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
136

            
137
    /// Check if we should send an XON message.
138
    ///
139
    /// If we should, then returns the XON message that should be sent.
140
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
141
    fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>>;
142

            
143
    /// Check if we should send an XOFF message.
144
    ///
145
    /// If we should, then returns the XOFF message that should be sent.
146
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
147
    fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>>;
148
}
149

            
150
/// A newtype wrapper for a tor stream rate limit that makes the units explicit.
///
/// The wrapped value is always in bytes per second. Comparison and ordering
/// are derived, so limits compare by their numeric rate.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct StreamRateLimit {
    /// The rate in bytes/s.
    rate: u64,
}

impl StreamRateLimit {
    /// A maximum rate limit (`u64::MAX` bytes/s).
    pub(crate) const MAX: Self = Self::new_bytes_per_sec(u64::MAX);

    /// A rate limit of 0.
    pub(crate) const ZERO: Self = Self::new_bytes_per_sec(0);

    /// A new [`StreamRateLimit`] with `rate` bytes/s.
    pub(crate) const fn new_bytes_per_sec(rate: u64) -> Self {
        Self { rate }
    }

    /// The rate in bytes/s.
    pub(crate) const fn bytes_per_sec(&self) -> u64 {
        self.rate
    }
}

/// Formats the limit as `"<rate> bytes/s"`.
impl std::fmt::Display for StreamRateLimit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} bytes/s", self.rate)
    }
}