1
//! Circuit reactor's stream window flow control.
2

            
3
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
4
use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
5
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
6

            
7
use crate::congestion::sendme::{self, StreamSendWindow};
8
use crate::stream::flow_ctrl::state::FlowCtrlHooks;
9
use crate::{Error, Result};
10

            
11
#[cfg(doc)]
12
use crate::stream::flow_ctrl::state::StreamFlowCtrl;
13

            
14
/// State for window-based flow control.
15
#[derive(Debug)]
16
pub(crate) struct WindowFlowCtrl {
17
    /// Send window.
18
    window: StreamSendWindow,
19
}
20

            
21
impl WindowFlowCtrl {
22
    /// Returns a new sendme-window-based state.
23
    // TODO: Maybe take the raw u16 and create StreamSendWindow ourselves?
24
    // Unclear whether we need or want to support creating this object from a
25
    // preexisting StreamSendWindow.
26
386
    pub(crate) fn new(window: StreamSendWindow) -> Self {
27
386
        Self { window }
28
386
    }
29
}
30

            
31
impl FlowCtrlHooks for WindowFlowCtrl {
32
14908
    fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
33
14908
        !sendme::cmd_counts_towards_windows(msg.cmd()) || self.window.window() > 0
34
14908
    }
35

            
36
2956
    fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
37
2956
        if sendme::cmd_counts_towards_windows(msg.cmd()) {
38
2956
            self.window.take().map(|_| ())
39
        } else {
40
            // TODO: Maybe make this an error?
41
            // Ideally caller would have checked this already.
42
            Ok(())
43
        }
44
2956
    }
45

            
46
8
    fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
47
8
        let _sendme = msg
48
8
            .decode::<Sendme>()
49
8
            .map_err(|e| Error::from_bytes_err(e, "failed to decode stream sendme message"))?
50
8
            .into_msg();
51

            
52
8
        self.window.put()
53
8
    }
54

            
55
    fn handle_incoming_xon(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
56
        let msg = "XON messages not allowed with window flow control";
57
        Err(Error::CircProto(msg.into()))
58
    }
59

            
60
    fn handle_incoming_xoff(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
61
        let msg = "XOFF messages not allowed with window flow control";
62
        Err(Error::CircProto(msg.into()))
63
    }
64

            
65
    fn maybe_send_xon(&mut self, _rate: XonKbpsEwma, _buffer_len: usize) -> Result<Option<Xon>> {
66
        let msg = "XON messages cannot be sent with window flow control";
67
        Err(Error::CircProto(msg.into()))
68
    }
69

            
70
    fn maybe_send_xoff(&mut self, _buffer_len: usize) -> Result<Option<Xoff>> {
71
        let msg = "XOFF messages cannot be sent with window flow control";
72
        Err(Error::CircProto(msg.into()))
73
    }
74
}