1
//! Type and code for handling a "half-closed" stream.
2
//!
3
//! A half-closed stream is one that we've sent an END on, but where
4
//! we might still receive some cells.
5

            
6
use crate::Result;
7
use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
8
use crate::stream::flow_ctrl::state::{HalfStreamFlowCtrl, HalfStreamFlowCtrlHooks};
9
use tor_cell::relaycell::UnparsedRelayMsg;
10

            
11
/// Type to track state of half-closed streams.
12
///
13
/// A half-closed stream is one where we've sent an END cell, but where
14
/// the other side might still send us data.
15
///
16
/// We need to track these streams instead of forgetting about them entirely,
17
/// since otherwise we'd be vulnerable to a class of "DropMark" attacks;
18
/// see <https://gitlab.torproject.org/tpo/core/tor/-/issues/25573>.
19
#[derive(Debug)]
20
pub(crate) struct HalfStream {
21
    /// Flow control for this stream.
22
    ///
23
    /// Used to process incoming flow control messages (SENDME, XON, etc).
24
    flow_control: HalfStreamFlowCtrl,
25
    /// Object to tell us which cells to accept on this stream.
26
    cmd_checker: AnyCmdChecker,
27
}
28

            
29
impl HalfStream {
30
    /// Create a new half-closed stream.
31
76
    pub(crate) fn new(flow_control: HalfStreamFlowCtrl, cmd_checker: AnyCmdChecker) -> Self {
32
76
        HalfStream {
33
76
            flow_control,
34
76
            cmd_checker,
35
76
        }
36
76
    }
37

            
38
    /// Process an incoming message and adjust this HalfStream accordingly.
39
    /// Give an error if the protocol has been violated.
40
    ///
41
    /// The caller must handle END cells; it is an internal error to pass
42
    /// END cells to this method.
43
    /// no ends here.
44
1020
    pub(crate) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
45
        use StreamStatus::*;
46

            
47
1020
        let Some(msg) = self.flow_control.handle_incoming_msg(msg)? else {
48
            // The flow control code consumed the message,
49
            // which means that it was a flow control message.
50
            // We don't give flow control messages to the checker below.
51
2
            return Ok(Open);
52
        };
53

            
54
1014
        let status = self.cmd_checker.check_msg(&msg)?;
55
1008
        self.cmd_checker.consume_checked_msg(msg)?;
56
1008
        Ok(status)
57
1020
    }
58
}
59

            
60
#[cfg(test)]
61
mod test {
62
    // @@ begin test lint list maintained by maint/add_warning @@
63
    #![allow(clippy::bool_assert_comparison)]
64
    #![allow(clippy::clone_on_copy)]
65
    #![allow(clippy::dbg_macro)]
66
    #![allow(clippy::mixed_attributes_style)]
67
    #![allow(clippy::print_stderr)]
68
    #![allow(clippy::print_stdout)]
69
    #![allow(clippy::single_char_pattern)]
70
    #![allow(clippy::unwrap_used)]
71
    #![allow(clippy::unchecked_time_subtraction)]
72
    #![allow(clippy::useless_vec)]
73
    #![allow(clippy::needless_pass_by_value)]
74
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
75
    use super::*;
76
    use crate::stream::RECV_WINDOW_INIT;
77
    use crate::stream::flow_ctrl::state::StreamFlowCtrl;
78
    use crate::{client::stream::OutboundDataCmdChecker, congestion::sendme::StreamSendWindow};
79
    use rand::{CryptoRng, Rng};
80
    use tor_basic_utils::test_rng::testing_rng;
81
    use tor_cell::relaycell::{
82
        AnyRelayMsgOuter, RelayCellFormat, StreamId,
83
        msg::{self, AnyRelayMsg},
84
    };
85

            
86
    fn to_unparsed<R: Rng + CryptoRng>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg {
87
        UnparsedRelayMsg::from_singleton_body(
88
            RelayCellFormat::V0,
89
            AnyRelayMsgOuter::new(StreamId::new(77), val)
90
                .encode(RelayCellFormat::V0, rng)
91
                .expect("encoding failed"),
92
        )
93
        .unwrap()
94
    }
95

            
96
    #[test]
97
    fn halfstream_sendme() {
98
        let mut rng = testing_rng();
99

            
100
        // Stream level SENDMEs are not authenticated and so the only way to make sure we were not
101
        // expecting one is if the window busts its maximum.
102
        //
103
        // Starting the window at 450, the first SENDME will increment it to 500 (the maximum)
104
        // meaning that the second SENDME will bust that and we'll noticed that it was unexpected.
105
        let sendw = StreamSendWindow::new(450);
106

            
107
        let mut hs = HalfStream::new(
108
            StreamFlowCtrl::new_window(sendw).half_stream(),
109
            OutboundDataCmdChecker::new_any(),
110
        );
111

            
112
        // one sendme is fine
113
        let m = msg::Sendme::new_empty();
114
        assert!(
115
            hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
116
                .is_ok()
117
        );
118
        // but no more were expected!
119
        let e = hs
120
            .handle_msg(to_unparsed(&mut rng, m.into()))
121
            .err()
122
            .unwrap();
123
        assert_eq!(
124
            format!("{}", e),
125
            "Circuit protocol violation: Unexpected stream SENDME"
126
        );
127
    }
128

            
129
    fn hs_new() -> HalfStream {
130
        HalfStream::new(
131
            StreamFlowCtrl::new_window(StreamSendWindow::new(20)).half_stream(),
132
            OutboundDataCmdChecker::new_any(),
133
        )
134
    }
135

            
136
    #[test]
137
    fn halfstream_data() {
138
        let mut hs = hs_new();
139
        let mut rng = testing_rng();
140

            
141
        // we didn't give a connected cell during setup, so do it now.
142
        hs.handle_msg(to_unparsed(&mut rng, msg::Connected::new_empty().into()))
143
            .unwrap();
144

            
145
        // `RECV_WINDOW_INIT` (500) data cells are okay.
146
        let m = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
147
        for _ in 0_u16..RECV_WINDOW_INIT {
148
            assert!(
149
                hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
150
                    .is_ok()
151
            );
152
        }
153

            
154
        // But one more is a protocol violation.
155
        let e = hs
156
            .handle_msg(to_unparsed(&mut rng, m.into()))
157
            .err()
158
            .unwrap();
159
        assert_eq!(
160
            format!("{}", e),
161
            "Circuit protocol violation: Received a data cell in violation of a window"
162
        );
163
    }
164

            
165
    #[test]
166
    fn halfstream_connected() {
167
        let mut hs = hs_new();
168
        let mut rng = testing_rng();
169
        // We were told to accept a connected, so we'll accept one
170
        // and no more.
171
        let m = msg::Connected::new_empty();
172
        assert!(
173
            hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
174
                .is_ok()
175
        );
176
        assert!(
177
            hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
178
                .is_err()
179
        );
180

            
181
        // If we try that again _after getting a connected_,
182
        // accept any.
183
        let mut cmd_checker = OutboundDataCmdChecker::new_any();
184
        {
185
            cmd_checker
186
                .check_msg(&to_unparsed(&mut rng, msg::Connected::new_empty().into()))
187
                .unwrap();
188
        }
189
        let mut hs = HalfStream::new(
190
            StreamFlowCtrl::new_window(StreamSendWindow::new(20)).half_stream(),
191
            cmd_checker,
192
        );
193
        let e = hs
194
            .handle_msg(to_unparsed(&mut rng, m.into()))
195
            .err()
196
            .unwrap();
197
        assert_eq!(
198
            format!("{}", e),
199
            "Stream protocol violation: Received CONNECTED twice on a stream."
200
        );
201
    }
202

            
203
    #[test]
204
    fn halfstream_other() {
205
        let mut hs = hs_new();
206
        let mut rng = testing_rng();
207
        let m = msg::Extended2::new(Vec::new());
208
        let e = hs
209
            .handle_msg(to_unparsed(&mut rng, m.into()))
210
            .err()
211
            .unwrap();
212
        assert_eq!(
213
            format!("{}", e),
214
            "Stream protocol violation: Unexpected EXTENDED2 on a data stream!"
215
        );
216
    }
217
}