1
//! Incoming data stream cell handlers, shared by the relay and onion service implementations.
2

            
3
use bitvec::prelude::*;
4
use derive_deftly::Deftly;
5
use oneshot_fused_workaround as oneshot;
6

            
7
use tor_cell::relaycell::{RelayCellFormat, RelayCmd, StreamId, UnparsedRelayMsg, msg};
8
use tor_cell::restricted_msg;
9
use tor_error::internal;
10
use tor_memquota::derive_deftly_template_HasMemoryCost;
11
use tor_memquota::mq_queue::{self, MpscSpec};
12
use tor_rtcompat::DynTimeProvider;
13

            
14
use crate::circuit::CircHopSyncView;
15
use crate::circuit::circhop::ReactorStreamComponents;
16
use crate::stream::cmdcheck::{AnyCmdChecker, CmdChecker, StreamStatus};
17
use crate::stream::{CloseStreamBehavior, StreamComponents};
18
use crate::{Error, Result};
19

            
20
// TODO(relay): move these to a shared module
21
use crate::client::stream::DataStream;
22

            
23
use crate::memquota::StreamAccount;
24
use crate::{HopLocation, HopNum};
25

            
26
/// Stateless [`CmdChecker`] for the inbound side of a data stream.
///
/// It lets `DATA` through, treats `END` as closing the stream, and reports
/// any other command as a stream protocol violation.
#[derive(Debug, Default)]
pub(crate) struct InboundDataCmdChecker;
29

            
30
restricted_msg! {
    /// The messages that may legitimately arrive on an incoming data stream.
    enum IncomingDataStreamMsg: RelayMsg {
        // SENDME is deliberately absent: the reactor handles it.
        Data,
        End,
    }
}
37

            
38
impl CmdChecker for InboundDataCmdChecker {
39
28
    fn check_msg(&mut self, msg: &tor_cell::relaycell::UnparsedRelayMsg) -> Result<StreamStatus> {
40
        use StreamStatus::*;
41
28
        match msg.cmd() {
42
28
            RelayCmd::DATA => Ok(Open),
43
            RelayCmd::END => Ok(Closed),
44
            _ => Err(Error::StreamProto(format!(
45
                "Unexpected {} on an incoming data stream!",
46
                msg.cmd()
47
            ))),
48
        }
49
28
    }
50

            
51
    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
52
        let _ = msg
53
            .decode::<IncomingDataStreamMsg>()
54
            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
55
        Ok(())
56
    }
57
}
58

            
59
impl InboundDataCmdChecker {
60
    /// Return a new boxed `DataCmdChecker` in a state suitable for a
61
    /// connection where an initial CONNECTED cell is not expected.
62
    ///
63
    /// This is used by hidden services, exit relays, and directory servers
64
    /// to accept streams.
65
40
    pub(crate) fn new_connected() -> AnyCmdChecker {
66
40
        Box::new(Self)
67
40
    }
68
}
69

            
70
/// A pending request from the other end of the circuit for us to open a new
71
/// stream.
72
///
73
/// Exits, directory caches, and onion services expect to receive these; others
74
/// do not.
75
///
76
/// On receiving one of these objects, the party handling it should accept it or
77
/// reject it.  If it is dropped without being explicitly handled, a reject
78
/// message will be sent anyway.
79
#[derive(Debug)]
80
pub struct IncomingStream {
81
    /// The runtime's time provider.
82
    time_provider: DynTimeProvider,
83
    /// The message that the client sent us to begin the stream.
84
    request: IncomingStreamRequest,
85
    /// Stream components used to assemble the [`DataStream`].
86
    components: StreamComponents,
87
}
88

            
89
impl IncomingStream {
90
    /// Create a new `IncomingStream`.
91
40
    pub(crate) fn new(
92
40
        time_provider: DynTimeProvider,
93
40
        request: IncomingStreamRequest,
94
40
        components: StreamComponents,
95
40
    ) -> Self {
96
40
        Self {
97
40
            time_provider,
98
40
            request,
99
40
            components,
100
40
        }
101
40
    }
102

            
103
    /// Return the underlying message that was used to try to begin this stream.
104
    pub fn request(&self) -> &IncomingStreamRequest {
105
        &self.request
106
    }
107

            
108
    /// Accept this stream as a new [`DataStream`], and send the client a
109
    /// message letting them know the stream was accepted.
110
42
    pub async fn accept_data(self, message: msg::Connected) -> Result<DataStream> {
111
        let Self {
112
28
            time_provider,
113
28
            request,
114
            components:
115
                StreamComponents {
116
28
                    mut target,
117
28
                    stream_receiver,
118
28
                    xon_xoff_reader_ctrl,
119
28
                    memquota,
120
                },
121
28
        } = self;
122

            
123
28
        match request {
124
            IncomingStreamRequest::Begin(_) | IncomingStreamRequest::BeginDir(_) => {
125
28
                target.send(message.into()).await?;
126
28
                Ok(DataStream::new_connected(
127
28
                    time_provider,
128
28
                    stream_receiver,
129
28
                    xon_xoff_reader_ctrl,
130
28
                    target,
131
28
                    memquota,
132
28
                ))
133
            }
134
            IncomingStreamRequest::Resolve(_) => {
135
                Err(internal!("Cannot accept data on a RESOLVE stream").into())
136
            }
137
        }
138
28
    }
139

            
140
    /// Reject this request and send an error message to the client.
141
18
    pub async fn reject(mut self, message: msg::End) -> Result<()> {
142
12
        let rx = self.reject_inner(CloseStreamBehavior::SendEnd(message))?;
143

            
144
12
        rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
145
12
    }
146

            
147
    /// Reject this request and possibly send an error message to the client.
148
    ///
149
    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
150
12
    fn reject_inner(
151
12
        &mut self,
152
12
        message: CloseStreamBehavior,
153
12
    ) -> Result<oneshot::Receiver<Result<()>>> {
154
12
        self.components.target.close_pending(message)
155
12
    }
156

            
157
    /// Ignore this request without replying to the client.
158
    ///
159
    /// (If you drop an [`IncomingStream`] without calling `accept_data`,
160
    /// `reject`, or this method, the drop handler will cause it to be
161
    /// rejected.)
162
    pub async fn discard(mut self) -> Result<()> {
163
        let rx = self.reject_inner(CloseStreamBehavior::SendNothing)?;
164

            
165
        rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
166
    }
167
}
168

            
169
// NOTE: We do not need to `impl Drop for IncomingStream { .. }`: when its
170
// StreamTarget is dropped, this will drop its internal mpsc::Sender, and the
171
// circuit reactor will see a close on its mpsc::Receiver, and the circuit
172
// reactor will itself send an End.
173

            
174
restricted_msg! {
    /// The messages that are allowed to open an `IncomingStream`.
    #[derive(Clone, Debug, Deftly)]
    #[derive_deftly(HasMemoryCost)]
    #[non_exhaustive]
    pub enum IncomingStreamRequest: RelayMsg {
        /// The peer sent a BEGIN message.
        Begin,
        /// The peer sent a BEGIN_DIR message.
        BeginDir,
        /// The peer sent a RESOLVE message.
        Resolve,
    }
}
188

            
189
/// Bit-vector used to represent a list of permitted commands.
190
///
191
/// This is cheaper and faster than using a vec, and avoids side-channel
192
/// attacks.
193
type RelayCmdSet = bitvec::BitArr!(for 256);
194

            
195
/// A `CmdChecker` that enforces correctness for incoming commands on unrecognized streams that
196
/// have a non-zero stream ID.
197
#[derive(Debug)]
198
pub(crate) struct IncomingCmdChecker {
199
    /// The "begin" commands that can be received on this type of circuit:
200
    ///
201
    ///   * onion service circuits only accept `BEGIN`
202
    ///   * all relay circuits accept `BEGIN_DIR`
203
    ///   * exit relays additionally accept `BEGIN` or `RESOLVE` on relay circuits
204
    ///   * once CONNECT_UDP is implemented, relays and later onion services may accept CONNECT_UDP
205
    ///     as well
206
    allow_commands: RelayCmdSet,
207
}
208

            
209
impl IncomingCmdChecker {
210
    /// Create a new boxed `IncomingCmdChecker`.
211
74
    pub(crate) fn new_any(allow_commands: &[RelayCmd]) -> AnyCmdChecker {
212
74
        let mut array = BitArray::ZERO;
213
76
        for c in allow_commands {
214
76
            array.set(u8::from(*c) as usize, true);
215
76
        }
216
74
        Box::new(Self {
217
74
            allow_commands: array,
218
74
        })
219
74
    }
220
}
221

            
222
impl CmdChecker for IncomingCmdChecker {
223
64
    fn check_msg(&mut self, msg: &UnparsedRelayMsg) -> Result<StreamStatus> {
224
64
        if self.allow_commands[u8::from(msg.cmd()) as usize] {
225
48
            Ok(StreamStatus::Open)
226
        } else {
227
16
            Err(Error::StreamProto(format!(
228
16
                "Unexpected {} on incoming stream",
229
16
                msg.cmd()
230
16
            )))
231
        }
232
64
    }
233

            
234
    fn consume_checked_msg(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
235
        let _ = msg
236
            .decode::<IncomingStreamRequest>()
237
            .map_err(|err| Error::from_bytes_err(err, "invalid message on incoming stream"))?;
238

            
239
        Ok(())
240
    }
241
}
242

            
243
/// A callback that can check whether a given stream request is acceptable
244
/// immediately on its receipt.
245
///
246
/// This should only be used for checks that need to be done immediately, with a
247
/// view of the state of the circuit hop the stream request arrived on.
248
/// Any other checks should, if possible,
249
/// be done on the [`IncomingStream`] objects as they are received.
250
pub trait IncomingStreamRequestFilter: Send + 'static {
251
    /// Check an incoming stream request, and decide what to do with it.
252
    fn disposition(
253
        &mut self,
254
        ctx: &IncomingStreamRequestContext<'_>,
255
        circ: &CircHopSyncView<'_>,
256
    ) -> Result<IncomingStreamRequestDisposition>;
257
}
258

            
259
/// What action to take with an incoming stream request.
260
#[derive(Clone, Debug)]
261
#[non_exhaustive]
262
pub enum IncomingStreamRequestDisposition {
263
    /// Accept the request (for now) and pass it to the mpsc::Receiver
264
    /// that is yielding them as [`IncomingStream``
265
    Accept,
266
    /// Rejected the request, and close the circuit on which it was received.
267
    CloseCircuit,
268
    /// Reject the request and send an END message.
269
    RejectRequest(msg::End),
270
}
271

            
272
/// Information about a stream request, as passed to an [`IncomingStreamRequestFilter`].
273
pub struct IncomingStreamRequestContext<'a> {
274
    /// The request message itself
275
    pub(crate) request: &'a IncomingStreamRequest,
276
}
277
impl<'a> IncomingStreamRequestContext<'a> {
278
    /// Return a reference to the message used to request this stream.
279
    pub fn request(&self) -> &'a IncomingStreamRequest {
280
        self.request
281
    }
282
}
283

            
284
/// Information about an incoming stream request.
285
#[derive(Debug, Deftly)]
286
#[derive_deftly(HasMemoryCost)]
287
pub(crate) struct StreamReqInfo {
288
    /// The [`IncomingStreamRequest`].
289
    pub(crate) req: IncomingStreamRequest,
290
    /// The ID of the stream being requested.
291
    pub(crate) stream_id: StreamId,
292
    /// The [`HopNum`].
293
    ///
294
    /// Set to `None` if we are an exit relay.
295
    //
296
    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
297
    // incoming stream request from two separate hops.  (There is only one that's valid.)
298
    pub(crate) hop: Option<HopLocation>,
299
    /// The format which must be used with this stream to encode messages.
300
    #[deftly(has_memory_cost(indirect_size = "0"))]
301
    pub(crate) relay_cell_format: RelayCellFormat,
302
    /// A collection of queues/channels that can be used to interact with this stream.
303
    pub(crate) stream_components: ReactorStreamComponents,
304
    /// The memory quota account to be used for this stream
305
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
306
    pub(crate) memquota: StreamAccount,
307
}
308

            
309
/// The sending half of the MPSC queue that carries [`StreamReqInfo`] items.
#[cfg(any(feature = "hs-service", feature = "relay"))]
pub(crate) type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
312

            
313
/// The state needed to handle incoming stream requests.
#[derive(educe::Educe)]
#[educe(Debug)]
#[cfg(any(feature = "hs-service", feature = "relay"))]
pub(crate) struct IncomingStreamRequestHandler {
    /// Sender on which information about each incoming stream request is shared.
    pub(crate) incoming_sender: StreamReqSender,
    /// The hop from which incoming stream requests are expected.
    ///
    /// Set to `None` if we are a relay.
    pub(crate) hop_num: Option<HopNum>,
    /// The [`CmdChecker`] used to validate incoming streams.
    pub(crate) cmd_checker: AnyCmdChecker,
    /// The [`IncomingStreamRequestFilter`] consulted to learn whether the
    /// user wants a request or wants it rejected immediately.
    ///
    /// Left out of the `Debug` output.
    #[educe(Debug(ignore))]
    pub(crate) filter: Box<dyn IncomingStreamRequestFilter>,
}
331

            
332
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use tor_cell::relaycell::{
        AnyRelayMsgOuter, RelayCellFormat,
        msg::{Begin, BeginDir, Data, Resolve},
    };

    use super::*;

    /// Exercise `IncomingCmdChecker::check_msg` with an empty, a one-command,
    /// and a three-command allow list.
    #[test]
    fn incoming_cmd_checker() {
        // Encode a message as a V0 relay cell body and re-wrap it as an UnparsedRelayMsg.
        let unparsed = |m| {
            let encoded = AnyRelayMsgOuter::new(None, m)
                .encode(RelayCellFormat::V0, &mut rand::rng())
                .unwrap();
            UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, encoded).unwrap()
        };
        let begin = unparsed(Begin::new("allium.example.com", 443, 0).unwrap().into());
        let begin_dir = unparsed(BeginDir::default().into());
        let resolve = unparsed(Resolve::new("allium.example.com").into());
        let data = unparsed(Data::new(&[1, 2, 3]).unwrap().into());

        // With an empty allow list, every command is refused.
        let mut deny_all = IncomingCmdChecker::new_any(&[]);
        for refused in [&begin, &begin_dir, &resolve, &data] {
            assert!(deny_all.check_msg(refused).is_err());
        }

        // With only BEGIN allowed, BEGIN passes and everything else is refused.
        let mut begin_only = IncomingCmdChecker::new_any(&[RelayCmd::BEGIN]);
        assert_eq!(begin_only.check_msg(&begin).unwrap(), StreamStatus::Open);
        for refused in [&begin_dir, &resolve, &data] {
            assert!(begin_only.check_msg(refused).is_err());
        }

        // With all three request commands allowed, only DATA is still refused.
        let mut all_requests = IncomingCmdChecker::new_any(&[
            RelayCmd::BEGIN,
            RelayCmd::BEGIN_DIR,
            RelayCmd::RESOLVE,
        ]);
        for allowed in [&begin, &begin_dir, &resolve] {
            assert_eq!(all_requests.check_msg(allowed).unwrap(), StreamStatus::Open);
        }
        assert!(all_requests.check_msg(&data).is_err());
    }
}