1
//! Circuit reactor's stream window flow control.
2

            
3
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
4
use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
5
use tor_cell::relaycell::{RelayCmd, RelayMsg, UnparsedRelayMsg};
6

            
7
use crate::congestion::sendme::{
8
    self, StreamRecvWindow, StreamSendWindow, cmd_counts_towards_windows,
9
};
10
use crate::stream::RECV_WINDOW_INIT;
11
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, HalfStreamFlowCtrlHooks};
12
use crate::{Error, Result};
13

            
14
#[cfg(doc)]
15
use crate::stream::flow_ctrl::state::StreamFlowCtrl;
16

            
17
/// State for window-based flow control.
18
#[derive(Debug)]
19
pub(crate) struct WindowFlowCtrl {
20
    /// Send window.
21
    window: StreamSendWindow,
22
}
23

            
24
impl WindowFlowCtrl {
25
    /// Returns a new sendme-window-based state.
26
    // TODO: Maybe take the raw u16 and create StreamSendWindow ourselves?
27
    // Unclear whether we need or want to support creating this object from a
28
    // preexisting StreamSendWindow.
29
386
    pub(crate) fn new(window: StreamSendWindow) -> Self {
30
386
        Self { window }
31
386
    }
32
}
33

            
34
impl FlowCtrlHooks for WindowFlowCtrl {
35
14908
    fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
36
14908
        !sendme::cmd_counts_towards_windows(msg.cmd()) || self.window.window() > 0
37
14908
    }
38

            
39
2956
    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
40
2956
        if sendme::cmd_counts_towards_windows(msg.cmd()) {
41
2956
            self.window.take().map(|_| ())
42
        } else {
43
            // TODO: Maybe make this an error?
44
            // Ideally caller would have checked this already.
45
            Ok(())
46
        }
47
2956
    }
48

            
49
8
    fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
50
8
        let _sendme = msg
51
8
            .decode::<Sendme>()
52
8
            .map_err(|e| Error::from_bytes_err(e, "failed to decode stream sendme message"))?
53
8
            .into_msg();
54

            
55
8
        self.window.put()
56
8
    }
57

            
58
    fn handle_incoming_xon(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
59
        let msg = "XON messages not allowed with window flow control";
60
        Err(Error::CircProto(msg.into()))
61
    }
62

            
63
    fn handle_incoming_xoff(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
64
        let msg = "XOFF messages not allowed with window flow control";
65
        Err(Error::CircProto(msg.into()))
66
    }
67

            
68
    fn maybe_send_xon(&mut self, _rate: XonKbpsEwma, _buffer_len: usize) -> Result<Option<Xon>> {
69
        let msg = "XON messages cannot be sent with window flow control";
70
        Err(Error::CircProto(msg.into()))
71
    }
72

            
73
    fn maybe_send_xoff(&mut self, _buffer_len: usize) -> Result<Option<Xoff>> {
74
        let msg = "XOFF messages cannot be sent with window flow control";
75
        Err(Error::CircProto(msg.into()))
76
    }
77
}
78

            
79
/// State for window-based flow control on a half-stream.
80
#[derive(Debug)]
81
pub(crate) struct HalfStreamWindowFlowCtrl {
82
    /// The original [`WindowFlowCtrl`] from the full stream.
83
    ///
84
    /// We keep this since we need to continue validating any incoming messages.
85
    inner: WindowFlowCtrl,
86
    /// The stream's receive window.
87
    ///
88
    /// When it was a full-stream, the receive window was tracked by the `DataStream`.
89
    /// But since the `DataStream` has gone away, we need to track it ourselves.
90
    recv_window: StreamRecvWindow,
91
}
92

            
93
impl HalfStreamWindowFlowCtrl {
94
    /// Returns a new sendme-window-based state for a half-stream.
95
76
    pub(crate) fn new(flow_ctrl: WindowFlowCtrl) -> Self {
96
76
        Self {
97
76
            inner: flow_ctrl,
98
76
            // FIXME(eta): we don't copy the receive window, instead just creating a new one,
99
76
            //             so a malicious peer can send us slightly more data than they should
100
76
            //             be able to; see arti#230.
101
76
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
102
76
        }
103
76
    }
104
}
105

            
106
impl HalfStreamFlowCtrlHooks for HalfStreamWindowFlowCtrl {
107
66
    fn handle_incoming_dropped(&mut self, msg_count: u16) -> Result<()> {
108
66
        self.recv_window.decrement_n(msg_count)
109
66
    }
110

            
111
1020
    fn handle_incoming_msg(&mut self, msg: UnparsedRelayMsg) -> Result<Option<UnparsedRelayMsg>> {
112
1020
        match msg.cmd() {
113
            RelayCmd::SENDME => {
114
4
                self.inner.put_for_incoming_sendme(msg)?;
115
2
                Ok(None)
116
            }
117
            RelayCmd::XON => {
118
                self.inner.handle_incoming_xon(msg)?;
119
                Ok(None)
120
            }
121
            RelayCmd::XOFF => {
122
                self.inner.handle_incoming_xoff(msg)?;
123
                Ok(None)
124
            }
125
1016
            cmd if cmd_counts_towards_windows(cmd) => {
126
                // Discard the returned bool since we aren't sending any more SENDMEs.
127
1006
                let _ = self.recv_window.take()?;
128
1004
                Ok(Some(msg))
129
            }
130
            // Nothing to do here.
131
10
            _ => Ok(Some(msg)),
132
        }
133
1020
    }
134
}