//! Type and code for handling a "half-closed" stream.
//!
//! A half-closed stream is one that we've sent an END on, but where
//! we might still receive some cells.

            
use crate::Result;
use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
use crate::stream::flow_ctrl::state::{HalfStreamFlowCtrl, HalfStreamFlowCtrlHooks};
use tor_cell::relaycell::UnparsedRelayMsg;

            
11
/// Type to track state of half-closed streams.
12
///
13
/// A half-closed stream is one where we've sent an END cell, but where
14
/// the other side might still send us data.
15
///
16
/// We need to track these streams instead of forgetting about them entirely,
17
/// since otherwise we'd be vulnerable to a class of "DropMark" attacks;
18
/// see <https://gitlab.torproject.org/tpo/core/tor/-/issues/25573>.
19
#[derive(Debug)]
20
pub(crate) struct HalfStream {
21
    /// Flow control for this stream.
22
    ///
23
    /// Used to process incoming flow control messages (SENDME, XON, etc).
24
    flow_control: HalfStreamFlowCtrl,
25
    /// Object to tell us which cells to accept on this stream.
26
    cmd_checker: AnyCmdChecker,
27
}
28

            
29
impl HalfStream {
30
    /// Create a new half-closed stream.
31
80
    pub(crate) fn new(flow_control: HalfStreamFlowCtrl, cmd_checker: AnyCmdChecker) -> Self {
32
80
        HalfStream {
33
80
            flow_control,
34
80
            cmd_checker,
35
80
        }
36
80
    }
37

            
38
    /// Process an incoming message and adjust this HalfStream accordingly.
39
    /// Give an error if the protocol has been violated.
40
    ///
41
    /// The caller must handle END cells; it is an internal error to pass
42
    /// END cells to this method.
43
    /// no ends here.
44
1020
    pub(crate) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
45
        use StreamStatus::*;
46

            
47
1020
        let Some(msg) = self.flow_control.handle_incoming_msg(msg)? else {
48
            // The flow control code consumed the message,
49
            // which means that it was a flow control message.
50
            // We don't give flow control messages to the checker below.
51
2
            return Ok(Open);
52
        };
53

            
54
1014
        let status = self.cmd_checker.check_msg(&msg)?;
55
1008
        self.cmd_checker.consume_checked_msg(msg)?;
56
1008
        Ok(status)
57
1020
    }
58
}
59

            
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::stream::RECV_WINDOW_INIT;
    use crate::stream::flow_ctrl::state::StreamFlowCtrl;
    use crate::{client::stream::OutboundDataCmdChecker, congestion::sendme::StreamSendWindow};
    use rand::{CryptoRng, Rng};
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::relaycell::{
        AnyRelayMsgOuter, RelayCellFormat, StreamId,
        msg::{self, AnyRelayMsg},
    };

    /// Encode `val` as a V0 relay cell on stream 77 and wrap it as an
    /// [`UnparsedRelayMsg`], so that it can be fed to `HalfStream::handle_msg`.
    fn to_unparsed<R: Rng + CryptoRng>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg {
        UnparsedRelayMsg::from_singleton_body(
            RelayCellFormat::V0,
            AnyRelayMsgOuter::new(StreamId::new(77), val)
                .encode(RelayCellFormat::V0, rng)
                .expect("encoding failed"),
        )
        .unwrap()
    }

    #[test]
    fn halfstream_sendme() {
        let mut rng = testing_rng();

        // Stream level SENDMEs are not authenticated and so the only way to make sure we were not
        // expecting one is if the window busts its maximum.
        //
        // Starting the window at 450, the first SENDME will increment it to 500 (the maximum)
        // meaning that the second SENDME will bust that and we'll notice that it was unexpected.
        let sendw = StreamSendWindow::new(450);

        let mut hs = HalfStream::new(
            StreamFlowCtrl::new_window(sendw).half_stream(),
            OutboundDataCmdChecker::new_any(),
        );

        // one sendme is fine
        let m = msg::Sendme::new_empty();
        assert!(
            hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
                .is_ok()
        );
        // but no more were expected!
        let e = hs
            .handle_msg(to_unparsed(&mut rng, m.into()))
            .err()
            .unwrap();
        assert_eq!(
            e.to_string(),
            "Circuit protocol violation: Unexpected stream SENDME"
        );
    }

    /// Build a `HalfStream` with a send window of 20 and a fresh command checker.
    fn hs_new() -> HalfStream {
        HalfStream::new(
            StreamFlowCtrl::new_window(StreamSendWindow::new(20)).half_stream(),
            OutboundDataCmdChecker::new_any(),
        )
    }

    #[test]
    fn halfstream_data() {
        let mut hs = hs_new();
        let mut rng = testing_rng();

        // we didn't give a connected cell during setup, so do it now.
        hs.handle_msg(to_unparsed(&mut rng, msg::Connected::new_empty().into()))
            .unwrap();

        // `RECV_WINDOW_INIT` (500) data cells are okay.
        let m = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
        for _ in 0_u16..RECV_WINDOW_INIT {
            assert!(
                hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
                    .is_ok()
            );
        }

        // But one more is a protocol violation.
        let e = hs
            .handle_msg(to_unparsed(&mut rng, m.into()))
            .err()
            .unwrap();
        assert_eq!(
            e.to_string(),
            "Circuit protocol violation: Received a data cell in violation of a window"
        );
    }

    #[test]
    fn halfstream_connected() {
        let mut hs = hs_new();
        let mut rng = testing_rng();
        // We were told to accept a connected, so we'll accept one
        // and no more.
        let m = msg::Connected::new_empty();
        assert!(
            hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
                .is_ok()
        );
        assert!(
            hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
                .is_err()
        );

        // If we try that again _after getting a connected_,
        // accept any.
        let mut cmd_checker = OutboundDataCmdChecker::new_any();
        {
            cmd_checker
                .check_msg(&to_unparsed(&mut rng, msg::Connected::new_empty().into()))
                .unwrap();
        }
        let mut hs = HalfStream::new(
            StreamFlowCtrl::new_window(StreamSendWindow::new(20)).half_stream(),
            cmd_checker,
        );
        let e = hs
            .handle_msg(to_unparsed(&mut rng, m.into()))
            .err()
            .unwrap();
        assert_eq!(
            e.to_string(),
            "Stream protocol violation: Received CONNECTED twice on a stream."
        );
    }

    #[test]
    fn halfstream_other() {
        let mut hs = hs_new();
        let mut rng = testing_rng();
        let m = msg::Extended2::new(Vec::new());
        let e = hs
            .handle_msg(to_unparsed(&mut rng, m.into()))
            .err()
            .unwrap();
        assert_eq!(
            e.to_string(),
            "Stream protocol violation: Unexpected EXTENDED2 on a data stream!"
        );
    }
}