//! Type and code for handling a "half-closed" stream.
//!
//! A half-closed stream is one that we've sent an END on, but where
//! we might still receive some cells.

            
6
use crate::Result;
7
use crate::congestion::sendme::{StreamRecvWindow, cmd_counts_towards_windows};
8
use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
9
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamFlowCtrl};
10
use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
11

            
12
/// Type to track state of half-closed streams.
13
///
14
/// A half-closed stream is one where we've sent an END cell, but where
15
/// the other side might still send us data.
16
///
17
/// We need to track these streams instead of forgetting about them entirely,
18
/// since otherwise we'd be vulnerable to a class of "DropMark" attacks;
19
/// see <https://gitlab.torproject.org/tpo/core/tor/-/issues/25573>.
20
#[derive(Debug)]
21
pub(crate) struct HalfStream {
22
    /// Flow control for this stream.
23
    ///
24
    /// Used to process incoming flow control messages (SENDME, XON, etc).
25
    flow_control: StreamFlowCtrl,
26
    /// Receive window for this stream. Used to detect whether we get too
27
    /// many data cells.
28
    recvw: StreamRecvWindow,
29
    /// Object to tell us which cells to accept on this stream.
30
    cmd_checker: AnyCmdChecker,
31
}
32

            
33
impl HalfStream {
34
    /// Create a new half-closed stream.
35
82
    pub(crate) fn new(
36
82
        flow_control: StreamFlowCtrl,
37
82
        recvw: StreamRecvWindow,
38
82
        cmd_checker: AnyCmdChecker,
39
82
    ) -> Self {
40
82
        HalfStream {
41
82
            flow_control,
42
82
            recvw,
43
82
            cmd_checker,
44
82
        }
45
82
    }
46

            
47
    /// Process an incoming message and adjust this HalfStream accordingly.
48
    /// Give an error if the protocol has been violated.
49
    ///
50
    /// The caller must handle END cells; it is an internal error to pass
51
    /// END cells to this method.
52
    /// no ends here.
53
60
    pub(crate) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
54
        use StreamStatus::*;
55

            
56
        // We handle SENDME/XON/XOFF separately, and don't give it to the checker.
57
        //
58
        // TODO: this logic is the same as `CircHop::deliver_msg_to_stream`; we should refactor this
59
        // if possible
60
60
        match msg.cmd() {
61
            RelayCmd::SENDME => {
62
4
                self.flow_control.put_for_incoming_sendme(msg)?;
63
2
                return Ok(Open);
64
            }
65
            RelayCmd::XON => {
66
                self.flow_control.handle_incoming_xon(msg)?;
67
                return Ok(Open);
68
            }
69
            RelayCmd::XOFF => {
70
                self.flow_control.handle_incoming_xoff(msg)?;
71
                return Ok(Open);
72
            }
73
56
            _ => {}
74
        }
75

            
76
56
        if cmd_counts_towards_windows(msg.cmd()) {
77
46
            self.recvw.take()?;
78
10
        }
79

            
80
54
        let status = self.cmd_checker.check_msg(&msg)?;
81
48
        self.cmd_checker.consume_checked_msg(msg)?;
82
48
        Ok(status)
83
60
    }
84
}
85

            
86
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::{
        client::stream::OutboundDataCmdChecker,
        congestion::sendme::{StreamRecvWindow, StreamSendWindow},
    };
    use rand::{CryptoRng, Rng};
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::relaycell::{
        AnyRelayMsgOuter, RelayCellFormat, StreamId,
        msg::{self, AnyRelayMsg},
    };

    /// Encode `val` as a V0 relay cell on stream 77, then wrap the encoded
    /// body as an `UnparsedRelayMsg`.
    fn to_unparsed<R>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg
    where
        R: Rng + CryptoRng,
    {
        let body = AnyRelayMsgOuter::new(StreamId::new(77), val)
            .encode(RelayCellFormat::V0, rng)
            .expect("encoding failed");
        UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, body).unwrap()
    }

    /// Deliver `m` to `hs`, which must reject it; return the error's
    /// `Display` text.
    fn rejection<R>(hs: &mut HalfStream, rng: &mut R, m: AnyRelayMsg) -> String
    where
        R: Rng + CryptoRng,
    {
        hs.handle_msg(to_unparsed(rng, m)).err().unwrap().to_string()
    }

    /// A half-stream with a send window of 20, a receive window of 20, and
    /// the given command checker.
    fn hs_with_checker(cmd_checker: AnyCmdChecker) -> HalfStream {
        HalfStream::new(
            StreamFlowCtrl::new_window(StreamSendWindow::new(20)),
            StreamRecvWindow::new(20),
            cmd_checker,
        )
    }

    /// A half-stream with 20/20 windows and a fresh outbound-data checker.
    fn hs_new() -> HalfStream {
        hs_with_checker(OutboundDataCmdChecker::new_any())
    }

    #[test]
    fn halfstream_sendme() {
        let mut rng = testing_rng();

        // Stream level SENDMEs are not authenticated and so the only way to make sure we were not
        // expecting one is if the window busts its maximum.
        //
        // Starting the window at 450, the first SENDME will increment it to 500 (the maximum)
        // meaning that the second SENDME will bust that and we'll notice that it was unexpected.
        let flow_control = StreamFlowCtrl::new_window(StreamSendWindow::new(450));
        let mut hs = HalfStream::new(
            flow_control,
            StreamRecvWindow::new(20),
            OutboundDataCmdChecker::new_any(),
        );

        let sendme = msg::Sendme::new_empty();

        // The first SENDME is accepted...
        let first = hs.handle_msg(to_unparsed(&mut rng, sendme.clone().into()));
        assert!(first.is_ok());

        // ...but a second one was not expected.
        assert_eq!(
            rejection(&mut hs, &mut rng, sendme.into()),
            "Circuit protocol violation: Unexpected stream SENDME"
        );
    }

    #[test]
    fn halfstream_data() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // No CONNECTED cell was delivered during setup, so deliver one now.
        let connected = msg::Connected::new_empty();
        hs.handle_msg(to_unparsed(&mut rng, connected.into()))
            .unwrap();

        // The receive window admits exactly 20 data cells.
        let data = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
        for _ in 0_u8..20 {
            let outcome = hs.handle_msg(to_unparsed(&mut rng, data.clone().into()));
            assert!(outcome.is_ok());
        }

        // The 21st is a protocol violation.
        assert_eq!(
            rejection(&mut hs, &mut rng, data.into()),
            "Circuit protocol violation: Received a data cell in violation of a window"
        );
    }

    #[test]
    fn halfstream_connected() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // A fresh half-stream accepts one CONNECTED, and no more.
        let connected = msg::Connected::new_empty();
        let first = hs.handle_msg(to_unparsed(&mut rng, connected.clone().into()));
        assert!(first.is_ok());
        let second = hs.handle_msg(to_unparsed(&mut rng, connected.clone().into()));
        assert!(second.is_err());

        // Now build a half-stream whose checker has already been shown a
        // CONNECTED: a further CONNECTED must be rejected as a duplicate.
        let mut cmd_checker = OutboundDataCmdChecker::new_any();
        cmd_checker
            .check_msg(&to_unparsed(&mut rng, msg::Connected::new_empty().into()))
            .unwrap();
        let mut hs = hs_with_checker(cmd_checker);

        assert_eq!(
            rejection(&mut hs, &mut rng, connected.into()),
            "Stream protocol violation: Received CONNECTED twice on a stream."
        );
    }

    #[test]
    fn halfstream_other() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // EXTENDED2 is not a message that belongs on a data stream.
        let extended = msg::Extended2::new(Vec::new());
        assert_eq!(
            rejection(&mut hs, &mut rng, extended.into()),
            "Stream protocol violation: Unexpected EXTENDED2 on a data stream!"
        );
    }
}