1
//! Incoming data stream cell handlers, shared by the relay and onion service implementations.
2

            
3
use bitvec::prelude::*;
4
use derive_deftly::Deftly;
5
use oneshot_fused_workaround as oneshot;
6
use postage::watch;
7

            
8
use tor_cell::relaycell::msg::AnyRelayMsg;
9
use tor_cell::relaycell::{RelayCellFormat, RelayCmd, StreamId, UnparsedRelayMsg, msg};
10
use tor_cell::restricted_msg;
11
use tor_error::internal;
12
use tor_memquota::derive_deftly_template_HasMemoryCost;
13
use tor_memquota::mq_queue::{self, MpscSpec};
14
use tor_rtcompat::DynTimeProvider;
15

            
16
use crate::circuit::CircHopSyncView;
17
use crate::stream::cmdcheck::{AnyCmdChecker, CmdChecker, StreamStatus};
18
use crate::stream::{CloseStreamBehavior, StreamComponents};
19
use crate::{Error, Result};
20

            
21
// TODO(relay): move these to a shared module
22
use crate::client::stream::DataStream;
23

            
24
use crate::memquota::StreamAccount;
25
use crate::stream::StreamMpscSender;
26
use crate::stream::flow_ctrl::state::StreamRateLimit;
27
use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
28
use crate::stream::queue::StreamQueueReceiver;
29
use crate::util::notify::NotifyReceiver;
30
use crate::{HopLocation, HopNum};
31

            
32
use std::mem::size_of;
33

            
34
/// A `CmdChecker` that enforces invariants for inbound data streams.
35
#[derive(Debug, Default)]
36
pub(crate) struct InboundDataCmdChecker;
37

            
38
restricted_msg! {
39
    /// An allowable incoming message on an incoming data stream.
40
    enum IncomingDataStreamMsg:RelayMsg {
41
        // SENDME is handled by the reactor.
42
        Data, End,
43
    }
44
}
45

            
46
impl CmdChecker for InboundDataCmdChecker {
47
24
    fn check_msg(&mut self, msg: &tor_cell::relaycell::UnparsedRelayMsg) -> Result<StreamStatus> {
48
        use StreamStatus::*;
49
24
        match msg.cmd() {
50
24
            RelayCmd::DATA => Ok(Open),
51
            RelayCmd::END => Ok(Closed),
52
            _ => Err(Error::StreamProto(format!(
53
                "Unexpected {} on an incoming data stream!",
54
                msg.cmd()
55
            ))),
56
        }
57
24
    }
58

            
59
    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
60
        let _ = msg
61
            .decode::<IncomingDataStreamMsg>()
62
            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
63
        Ok(())
64
    }
65
}
66

            
67
impl InboundDataCmdChecker {
68
    /// Return a new boxed `DataCmdChecker` in a state suitable for a
69
    /// connection where an initial CONNECTED cell is not expected.
70
    ///
71
    /// This is used by hidden services, exit relays, and directory servers
72
    /// to accept streams.
73
36
    pub(crate) fn new_connected() -> AnyCmdChecker {
74
36
        Box::new(Self)
75
36
    }
76
}
77

            
78
/// A pending request from the other end of the circuit for us to open a new
79
/// stream.
80
///
81
/// Exits, directory caches, and onion services expect to receive these; others
82
/// do not.
83
///
84
/// On receiving one of these objects, the party handling it should accept it or
85
/// reject it.  If it is dropped without being explicitly handled, a reject
86
/// message will be sent anyway.
87
#[derive(Debug)]
88
pub struct IncomingStream {
89
    /// The runtime's time provider.
90
    time_provider: DynTimeProvider,
91
    /// The message that the client sent us to begin the stream.
92
    request: IncomingStreamRequest,
93
    /// Stream components used to assemble the [`DataStream`].
94
    components: StreamComponents,
95
}
96

            
97
impl IncomingStream {
98
    /// Create a new `IncomingStream`.
99
36
    pub(crate) fn new(
100
36
        time_provider: DynTimeProvider,
101
36
        request: IncomingStreamRequest,
102
36
        components: StreamComponents,
103
36
    ) -> Self {
104
36
        Self {
105
36
            time_provider,
106
36
            request,
107
36
            components,
108
36
        }
109
36
    }
110

            
111
    /// Return the underlying message that was used to try to begin this stream.
112
    pub fn request(&self) -> &IncomingStreamRequest {
113
        &self.request
114
    }
115

            
116
    /// Accept this stream as a new [`DataStream`], and send the client a
117
    /// message letting them know the stream was accepted.
118
36
    pub async fn accept_data(self, message: msg::Connected) -> Result<DataStream> {
119
        let Self {
120
24
            time_provider,
121
24
            request,
122
            components:
123
                StreamComponents {
124
24
                    mut target,
125
24
                    stream_receiver,
126
24
                    xon_xoff_reader_ctrl,
127
24
                    memquota,
128
                },
129
24
        } = self;
130

            
131
24
        match request {
132
            IncomingStreamRequest::Begin(_) | IncomingStreamRequest::BeginDir(_) => {
133
24
                target.send(message.into()).await?;
134
24
                Ok(DataStream::new_connected(
135
24
                    time_provider,
136
24
                    stream_receiver,
137
24
                    xon_xoff_reader_ctrl,
138
24
                    target,
139
24
                    memquota,
140
24
                ))
141
            }
142
            IncomingStreamRequest::Resolve(_) => {
143
                Err(internal!("Cannot accept data on a RESOLVE stream").into())
144
            }
145
        }
146
24
    }
147

            
148
    /// Reject this request and send an error message to the client.
149
18
    pub async fn reject(mut self, message: msg::End) -> Result<()> {
150
12
        let rx = self.reject_inner(CloseStreamBehavior::SendEnd(message))?;
151

            
152
12
        rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
153
12
    }
154

            
155
    /// Reject this request and possibly send an error message to the client.
156
    ///
157
    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
158
12
    fn reject_inner(
159
12
        &mut self,
160
12
        message: CloseStreamBehavior,
161
12
    ) -> Result<oneshot::Receiver<Result<()>>> {
162
12
        self.components.target.close_pending(message)
163
12
    }
164

            
165
    /// Ignore this request without replying to the client.
166
    ///
167
    /// (If you drop an [`IncomingStream`] without calling `accept_data`,
168
    /// `reject`, or this method, the drop handler will cause it to be
169
    /// rejected.)
170
    pub async fn discard(mut self) -> Result<()> {
171
        let rx = self.reject_inner(CloseStreamBehavior::SendNothing)?;
172

            
173
        rx.await.map_err(|_| Error::CircuitClosed)?.map(|_| ())
174
    }
175
}
176

            
177
// NOTE: We do not need to `impl Drop for IncomingStream { .. }`: when its
178
// StreamTarget is dropped, this will drop its internal mpsc::Sender, and the
179
// circuit reactor will see a close on its mpsc::Receiver, and the circuit
180
// reactor will itself send an End.
181

            
182
restricted_msg! {
183
    /// The allowed incoming messages on an `IncomingStream`.
184
    #[derive(Clone, Debug, Deftly)]
185
    #[derive_deftly(HasMemoryCost)]
186
    #[non_exhaustive]
187
    pub enum IncomingStreamRequest: RelayMsg {
188
        /// A BEGIN message.
189
        Begin,
190
        /// A BEGIN_DIR message.
191
        BeginDir,
192
        /// A RESOLVE message.
193
        Resolve,
194
    }
195
}
196

            
197
/// Bit-vector used to represent a list of permitted commands.
198
///
199
/// This is cheaper and faster than using a vec, and avoids side-channel
200
/// attacks.
201
type RelayCmdSet = bitvec::BitArr!(for 256);
202

            
203
/// A `CmdChecker` that enforces correctness for incoming commands on unrecognized streams that
204
/// have a non-zero stream ID.
205
#[derive(Debug)]
206
pub(crate) struct IncomingCmdChecker {
207
    /// The "begin" commands that can be received on this type of circuit:
208
    ///
209
    ///   * onion service circuits only accept `BEGIN`
210
    ///   * all relay circuits accept `BEGIN_DIR`
211
    ///   * exit relays additionally accept `BEGIN` or `RESOLVE` on relay circuits
212
    ///   * once CONNECT_UDP is implemented, relays and later onion services may accept CONNECT_UDP
213
    ///     as well
214
    allow_commands: RelayCmdSet,
215
}
216

            
217
impl IncomingCmdChecker {
218
    /// Create a new boxed `IncomingCmdChecker`.
219
66
    pub(crate) fn new_any(allow_commands: &[RelayCmd]) -> AnyCmdChecker {
220
66
        let mut array = BitArray::ZERO;
221
134
        for c in allow_commands {
222
68
            array.set(u8::from(*c) as usize, true);
223
68
        }
224
66
        Box::new(Self {
225
66
            allow_commands: array,
226
66
        })
227
66
    }
228
}
229

            
230
impl CmdChecker for IncomingCmdChecker {
231
60
    fn check_msg(&mut self, msg: &UnparsedRelayMsg) -> Result<StreamStatus> {
232
60
        if self.allow_commands[u8::from(msg.cmd()) as usize] {
233
44
            Ok(StreamStatus::Open)
234
        } else {
235
16
            Err(Error::StreamProto(format!(
236
16
                "Unexpected {} on incoming stream",
237
16
                msg.cmd()
238
16
            )))
239
        }
240
60
    }
241

            
242
    fn consume_checked_msg(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
243
        let _ = msg
244
            .decode::<IncomingStreamRequest>()
245
            .map_err(|err| Error::from_bytes_err(err, "invalid message on incoming stream"))?;
246

            
247
        Ok(())
248
    }
249
}
250

            
251
/// A callback that can check whether a given stream request is acceptable
252
/// immediately on its receipt.
253
///
254
/// This should only be used for checks that need to be done immediately, with a
255
/// view of the state of the circuit hop the stream request arrived on.
256
/// Any other checks should, if possible,
257
/// be done on the [`IncomingStream`] objects as they are received.
258
pub trait IncomingStreamRequestFilter: Send + 'static {
259
    /// Check an incoming stream request, and decide what to do with it.
260
    fn disposition(
261
        &mut self,
262
        ctx: &IncomingStreamRequestContext<'_>,
263
        circ: &CircHopSyncView<'_>,
264
    ) -> Result<IncomingStreamRequestDisposition>;
265
}
266

            
267
/// What action to take with an incoming stream request.
268
#[derive(Clone, Debug)]
269
#[non_exhaustive]
270
pub enum IncomingStreamRequestDisposition {
271
    /// Accept the request (for now) and pass it to the mpsc::Receiver
272
    /// that is yielding them as [`IncomingStream``
273
    Accept,
274
    /// Rejected the request, and close the circuit on which it was received.
275
    CloseCircuit,
276
    /// Reject the request and send an END message.
277
    RejectRequest(msg::End),
278
}
279

            
280
/// Information about a stream request, as passed to an [`IncomingStreamRequestFilter`].
281
pub struct IncomingStreamRequestContext<'a> {
282
    /// The request message itself
283
    pub(crate) request: &'a IncomingStreamRequest,
284
}
285
impl<'a> IncomingStreamRequestContext<'a> {
286
    /// Return a reference to the message used to request this stream.
287
    pub fn request(&self) -> &'a IncomingStreamRequest {
288
        self.request
289
    }
290
}
291

            
292
/// Information about an incoming stream request.
293
#[derive(Debug, Deftly)]
294
#[derive_deftly(HasMemoryCost)]
295
pub(crate) struct StreamReqInfo {
296
    /// The [`IncomingStreamRequest`].
297
    pub(crate) req: IncomingStreamRequest,
298
    /// The ID of the stream being requested.
299
    pub(crate) stream_id: StreamId,
300
    /// The [`HopNum`].
301
    ///
302
    /// Set to `None` if we are an exit relay.
303
    //
304
    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
305
    // incoming stream request from two separate hops.  (There is only one that's valid.)
306
    pub(crate) hop: Option<HopLocation>,
307
    /// The format which must be used with this stream to encode messages.
308
    #[deftly(has_memory_cost(indirect_size = "0"))]
309
    pub(crate) relay_cell_format: RelayCellFormat,
310
    /// A channel for receiving messages from this stream.
311
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
312
    pub(crate) receiver: StreamQueueReceiver,
313
    /// A channel for sending messages to be sent on this stream.
314
    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
315
    pub(crate) msg_tx: StreamMpscSender<AnyRelayMsg>,
316
    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
317
    // TODO(arti#2068): we should consider making this an `Option`
318
    // the `watch::Sender` owns the indirect data
319
    #[deftly(has_memory_cost(indirect_size = "0"))]
320
    pub(crate) rate_limit_stream: watch::Receiver<StreamRateLimit>,
321
    /// A [`Stream`](futures::Stream) that provides notifications when a new drain rate is
322
    /// requested.
323
    #[deftly(has_memory_cost(indirect_size = "0"))]
324
    pub(crate) drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
325
    /// The memory quota account to be used for this stream
326
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
327
    pub(crate) memquota: StreamAccount,
328
}
329

            
330
/// MPSC queue containing stream requests
331
#[cfg(any(feature = "hs-service", feature = "relay"))]
332
pub(crate) type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
333

            
334
/// Data required for handling an incoming stream request.
335
#[derive(educe::Educe)]
336
#[educe(Debug)]
337
#[cfg(any(feature = "hs-service", feature = "relay"))]
338
pub(crate) struct IncomingStreamRequestHandler {
339
    /// A sender for sharing information about an incoming stream request.
340
    pub(crate) incoming_sender: StreamReqSender,
341
    /// The hop to expect incoming stream requests from.
342
    ///
343
    /// Set to `None` if we are a relay.
344
    pub(crate) hop_num: Option<HopNum>,
345
    /// A [`CmdChecker`] for validating incoming streams.
346
    pub(crate) cmd_checker: AnyCmdChecker,
347
    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
348
    /// this request, or wants to reject it immediately.
349
    #[educe(Debug(ignore))]
350
    pub(crate) filter: Box<dyn IncomingStreamRequestFilter>,
351
}
352

            
353
#[cfg(test)]
354
mod test {
355
    // @@ begin test lint list maintained by maint/add_warning @@
356
    #![allow(clippy::bool_assert_comparison)]
357
    #![allow(clippy::clone_on_copy)]
358
    #![allow(clippy::dbg_macro)]
359
    #![allow(clippy::mixed_attributes_style)]
360
    #![allow(clippy::print_stderr)]
361
    #![allow(clippy::print_stdout)]
362
    #![allow(clippy::single_char_pattern)]
363
    #![allow(clippy::unwrap_used)]
364
    #![allow(clippy::unchecked_time_subtraction)]
365
    #![allow(clippy::useless_vec)]
366
    #![allow(clippy::needless_pass_by_value)]
367
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
368

            
369
    use tor_cell::relaycell::{
370
        AnyRelayMsgOuter, RelayCellFormat,
371
        msg::{Begin, BeginDir, Data, Resolve},
372
    };
373

            
374
    use super::*;
375

            
376
    #[test]
377
    fn incoming_cmd_checker() {
378
        // Convert an AnyRelayMsg to an UnparsedRelayCell.
379
        let u = |msg| {
380
            let body = AnyRelayMsgOuter::new(None, msg)
381
                .encode(RelayCellFormat::V0, &mut rand::rng())
382
                .unwrap();
383
            UnparsedRelayMsg::from_singleton_body(RelayCellFormat::V0, body).unwrap()
384
        };
385
        let begin = u(Begin::new("allium.example.com", 443, 0).unwrap().into());
386
        let begin_dir = u(BeginDir::default().into());
387
        let resolve = u(Resolve::new("allium.example.com").into());
388
        let data = u(Data::new(&[1, 2, 3]).unwrap().into());
389

            
390
        {
391
            let mut cc_none = IncomingCmdChecker::new_any(&[]);
392
            for m in [&begin, &begin_dir, &resolve, &data] {
393
                assert!(cc_none.check_msg(m).is_err());
394
            }
395
        }
396

            
397
        {
398
            let mut cc_begin = IncomingCmdChecker::new_any(&[RelayCmd::BEGIN]);
399
            assert_eq!(cc_begin.check_msg(&begin).unwrap(), StreamStatus::Open);
400
            for m in [&begin_dir, &resolve, &data] {
401
                assert!(cc_begin.check_msg(m).is_err());
402
            }
403
        }
404

            
405
        {
406
            let mut cc_any = IncomingCmdChecker::new_any(&[
407
                RelayCmd::BEGIN,
408
                RelayCmd::BEGIN_DIR,
409
                RelayCmd::RESOLVE,
410
            ]);
411
            for m in [&begin, &begin_dir, &resolve] {
412
                assert_eq!(cc_any.check_msg(m).unwrap(), StreamStatus::Open);
413
            }
414
            assert!(cc_any.check_msg(&data).is_err());
415
        }
416
    }
417
}