1
//! Module exposing the relay circuit reactor subsystem.
2
//!
3
//! See [`reactor`](crate::circuit::reactor) for a description of the overall architecture.
4
//!
5
//! #### `ForwardReactor`
6
//!
7
//! It handles
8
//!
9
//!  * unrecognized RELAY cells, by moving them in the forward direction (towards the exit)
10
//!  * recognized RELAY cells, by splitting each cell into messages, and handling
11
//!    each message individually as described in the table below
12
//!    (Note: since prop340 is not yet implemented, in practice there is only 1 message per cell).
13
//!  * RELAY_EARLY cells (**not yet implemented**)
14
//!  * DESTROY cells (**not yet implemented**)
15
//!  * PADDING_NEGOTIATE cells (**not yet implemented**)
16
//!
17
//! ```text
18
//!
19
//! Legend: `F` = "forward reactor", `B` = "backward reactor", `S` = "stream reactor"
20
//!
21
//! | RELAY cmd         | Received in | Handled in | Description                            |
22
//! |-------------------|-------------|------------|----------------------------------------|
23
//! | DROP              | F           | F          | Passed to PaddingController for        |
24
//! |                   |             |            | validation                             |
25
//! |-------------------|-------------|------------|----------------------------------------|
26
//! | EXTEND2           | F           |            | Handled by instructing the channel     |
27
//! |                   |             |            | provider to launch a new channel, and  |
28
//! |                   |             |            | waiting for the new channel on its     |
29
//! |                   |             |            | outgoing_chan_rx receiver              |
30
//! |                   |             |            | (**not yet implemented**)              |
31
//! |-------------------|-------------|------------|----------------------------------------|
32
//! | TRUNCATE          | F           | F          | (**not yet implemented**)              |
33
//! |                   |             |            |                                        |
34
//! |-------------------|-------------|------------|----------------------------------------|
35
//! | TODO              |             |            |                                        |
36
//! |                   |             |            |                                        |
37
//! ```
38

            
39
pub(crate) mod backward;
40
pub(crate) mod forward;
41

            
42
use std::sync::Arc;
43
use std::time::Duration;
44

            
45
use futures::StreamExt as _;
46
use futures::channel::mpsc;
47

            
48
use tor_cell::chancell::CircId;
49
use tor_cell::relaycell::RelayCmd;
50
use tor_linkspec::OwnedChanTarget;
51
use tor_memquota::mq_queue::{ChannelSpec, MpscSpec};
52
use tor_rtcompat::{DynTimeProvider, Runtime};
53

            
54
use crate::channel::Channel;
55
use crate::circuit::circhop::ReactorStreamComponents;
56
use crate::circuit::circhop::{CircHopOutbound, HopSettings};
57
use crate::circuit::reactor::Reactor as BaseReactor;
58
use crate::circuit::reactor::hop_mgr::HopMgr;
59
use crate::circuit::reactor::stream;
60
use crate::circuit::{CircuitRxReceiver, UniqId};
61
use crate::congestion::sendme::StreamRecvWindow;
62
use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer};
63
use crate::memquota::{CircuitAccount, SpecificAccount};
64
use crate::relay::RelayCirc;
65
use crate::relay::channel_provider::ChannelProvider;
66
use crate::relay::reactor::backward::Backward;
67
use crate::relay::reactor::forward::Forward;
68
use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
69
use crate::stream::incoming::{
70
    IncomingCmdChecker, IncomingStream, IncomingStreamRequestFilter, IncomingStreamRequestHandler,
71
    StreamReqInfo,
72
};
73
use crate::stream::raw::StreamReceiver;
74
use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamTarget, Tunnel};
75

            
76
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
77
use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
78

            
79
/// Type-alias for the relay base reactor type.
80
type RelayBaseReactor<R> = BaseReactor<R, Forward, Backward>;
81

            
82
/// The entry point of the circuit reactor subsystem.
83
#[allow(unused)] // TODO(relay)
84
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
85
pub(crate) struct Reactor<R: Runtime>(RelayBaseReactor<R>);
86

            
87
/// A handler customizing the relay stream reactor.
88
struct StreamHandler;
89

            
90
impl stream::StreamHandler for StreamHandler {
91
8
    fn halfstream_expiry(&self, hop: &CircHopOutbound) -> Duration {
92
8
        let ccontrol = hop.ccontrol();
93

            
94
        // Note: if we have no measurements for the RTT, this will be set to 0,
95
        // so the stream will be removed from the stream map immediately,
96
        // and any subsequent messages arriving on it will trigger
97
        // a proto violation causing the circuit to close.
98
        //
99
        // TODO(relay-tuning): we should make sure that this doesn't cause us to
100
        // wrongly close legitimate circuits that still have in-flight stream data
101
8
        ccontrol
102
8
            .lock()
103
8
            .expect("poisoned lock")
104
8
            .rtt()
105
8
            .max_rtt_usec()
106
8
            .map(|rtt| Duration::from_millis(u64::from(rtt)))
107
            // TODO(relay): we should fallback to a non-zero default here
108
            // if we don't have any RTT measurements yet
109
8
            .unwrap_or_default()
110
8
    }
111
}
112

            
113
#[allow(unused)] // TODO(relay)
114
impl<R: Runtime> Reactor<R> {
115
    /// Create a new circuit reactor.
116
    ///
117
    /// Returns the [`Reactor`], a [`RelayCirc`] handle to it,
118
    /// and a [`Stream`](futures::Stream) of `IncomingStream`s.
119
    ///
120
    /// The reactor will send outbound messages on `channel`, receive incoming
121
    /// messages on `input`, and identify this circuit by the channel-local
122
    /// [`CircId`] provided.
123
    ///
124
    /// The internal unique identifier for this circuit will be `unique_id`.
125
    ///
126
    /// The returned `IncomingStream`s are exit, dns, or directory streams.
127
    /// An incoming stream is automatically rejected by the reactor
128
    /// if the provided `IncomingStreamRequestFilter` rejects it.
129
    /// You can also explicitly reject a stream by calling [`IncomingStream::reject`].
130
    /// If the `Stream` is dropped, the next incoming stream request
131
    /// (`BEGIN`, `BEGIN_DIR`, or RESOLVE`)
132
    /// on this circuit will cause the stream reactor to shut down,
133
    /// which will trigger a shutdown of all the circuit reactors (FWD, BWD),
134
    /// which causing the circuit to close.
135
    ///
136
    /// The streams not rejected by the `IncomingStreamRequestFilter` will
137
    /// get an entry in the circuit's stream map.
138
    /// Rejecting such a stream using [`IncomingStream::reject`] will remove the entry.
139
    ///
140
    /// The `IncomingStreamRequestFilter` should only perform inexpensive checks
141
    /// that won't block the reactor.
142
    /// More expensive, or blocking checks, should be handled outside of the circuit reactor,
143
    /// when processing new `IncomingStream`s from the returned Rust stream.
144
    ///
145
    /// Data and directory streams can be accepted by calling [`IncomingStream::accept_data`].
146
    /// The caller is responsible for proxying data between the resulting `DataStream`
147
    /// and the local application stream.
148
    ///
149
    // TODO(relay): say how RESOLVE streams should be handled
150
    //
151
    // TODO: declare a type-alias for the impl futures::Stream return type
152
    // when support for impl in type aliases gets stabilized.
153
    //
154
    // See issue #63063 <https://github.com/rust-lang/rust/issues/63063>
155
    //
156
    // TODO(DEDUP): the incoming stream handling is *very* similar
157
    // to the impll from ServiceOnionServiceDataTunnel::allow_stream_requests.
158
    // We should dedupe these someday, when we rewrite the client reactor
159
    // to use the new multi-reactor architecture
160
    #[allow(clippy::too_many_arguments)] // TODO
161
40
    pub(crate) fn new(
162
40
        runtime: R,
163
40
        channel: &Arc<Channel>,
164
40
        circ_id: CircId,
165
40
        unique_id: UniqId,
166
40
        input: CircuitRxReceiver,
167
40
        crypto_in: Box<dyn InboundRelayLayer + Send>,
168
40
        crypto_out: Box<dyn OutboundRelayLayer + Send>,
169
40
        settings: &HopSettings,
170
40
        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
171
40
        padding_ctrl: PaddingController,
172
40
        padding_event_stream: PaddingEventStream,
173
40
        incoming_filter: Box<dyn IncomingStreamRequestFilter>,
174
40
        memquota: &CircuitAccount,
175
40
    ) -> crate::Result<(
176
40
        Self,
177
40
        Arc<RelayCirc>,
178
40
        impl futures::Stream<Item = IncomingStream> + use<R>,
179
40
    )> {
180
        // NOTE: not registering this channel with the memquota subsystem is okay,
181
        // because it has no buffering (if ever decide to make the size of this buffer
182
        // non-zero for whatever reason, we must remember to register it with memquota
183
        // so that it counts towards the total memory usage for the circuit.
184
        #[allow(clippy::disallowed_methods)]
185
40
        let (stream_tx, stream_rx) = mpsc::channel(0);
186

            
187
        /// The size of the channel receiving IncomingStreamRequestContexts.
188
        ///
189
        // TODO(relay-tuning): buffer size
190
        //
191
        // This is currently set to 2x the initial receive window,
192
        // the same as the buffer size we use for onion services.
193
        // This value was picked arbitrarily,
194
        // and is not necessarily tuned for relay needs.
195
        const INCOMING_BUFFER: usize = crate::stream::STREAM_READER_BUFFER;
196

            
197
40
        let time_provider = DynTimeProvider::new(runtime.clone());
198
40
        let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
199
40
            .new_mq(time_provider.clone(), memquota.as_raw_account())?;
200

            
201
        // Our IncomingCmdChecker does not reject BEGIN, BEGIN_DIR, RESOLVE cells,
202
        // but that doesn't necessarily mean the stream will be accepted.
203
        // An incoming stream can still be rejected at a later stage,
204
        // by the IncomingStreamRequestFilter, or directly by the consumer of the
205
        // futures::Stream<Item = IncomingStream> (by calling IncomingStream::reject()).
206
40
        let cmd_checker =
207
40
            IncomingCmdChecker::new_any(&[RelayCmd::BEGIN, RelayCmd::BEGIN_DIR, RelayCmd::RESOLVE]);
208
40
        let incoming_handler = IncomingStreamRequestHandler {
209
40
            incoming_sender,
210
40
            hop_num: None,
211
40
            cmd_checker,
212
40
            filter: incoming_filter,
213
40
        };
214
40
        let mut hop_mgr = HopMgr::new_with_incoming_handler(
215
40
            runtime.clone(),
216
40
            unique_id,
217
40
            StreamHandler,
218
40
            stream_tx,
219
40
            incoming_handler,
220
40
            memquota.clone(),
221
        );
222

            
223
        // On the relay side, we always have one "hop" (ourselves).
224
        //
225
        // Clients will need to call this function in response to CtrlMsg::Create
226
        // (TODO: for clients, we probably will need to store a bunch more state here)
227
40
        hop_mgr.add_hop(settings.clone())?;
228

            
229
        // TODO(relay): currently we don't need buffering on this channel,
230
        // but we might need it if we start using it for more than just EXTENDED2 events
231
        #[allow(clippy::disallowed_methods)]
232
40
        let (fwd_ev_tx, fwd_ev_rx) = mpsc::channel(0);
233
40
        let forward = Forward::new(
234
40
            channel,
235
40
            unique_id,
236
40
            crypto_out,
237
40
            chan_provider,
238
40
            fwd_ev_tx,
239
40
            memquota.clone(),
240
        );
241
40
        let backward = Backward::new(crypto_in);
242

            
243
40
        let (inner, handle) = BaseReactor::new(
244
40
            runtime,
245
40
            channel,
246
40
            circ_id,
247
40
            unique_id,
248
40
            input,
249
40
            forward,
250
40
            backward,
251
40
            hop_mgr,
252
40
            padding_ctrl,
253
40
            padding_event_stream,
254
40
            stream_rx,
255
40
            fwd_ev_rx,
256
40
            memquota,
257
40
        );
258

            
259
40
        let reactor = Self(inner);
260
40
        let handle = Arc::new(RelayCirc(handle));
261

            
262
        // Note: tunnel is a bit of a misnomer for relays
263
40
        let tunnel = Arc::clone(&handle);
264
        // TODO(relay): this is more or less copy-pasta from client code
265
40
        let stream = incoming_receiver.map(move |req_ctx| {
266
            let StreamReqInfo {
267
8
                req,
268
8
                stream_id,
269
8
                hop,
270
                stream_components:
271
                    ReactorStreamComponents {
272
8
                        stream_inbound_rx,
273
8
                        stream_outbound_tx,
274
8
                        rate_limit_rx,
275
8
                        drain_rate_request_rx,
276
                    },
277
8
                memquota,
278
8
                relay_cell_format,
279
8
            } = req_ctx;
280

            
281
            // There is no originating hop if we're a relay
282
8
            debug_assert!(hop.is_none());
283

            
284
8
            let target = StreamTarget {
285
8
                tunnel: Tunnel::Relay(Arc::clone(&tunnel)),
286
8
                tx: stream_outbound_tx,
287
8
                hop: None,
288
8
                stream_id,
289
8
                relay_cell_format,
290
8
                rate_limit_stream: rate_limit_rx,
291
8
            };
292

            
293
            // can be used to build a reader that supports XON/XOFF flow control
294
8
            let xon_xoff_reader_ctrl =
295
8
                XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
296

            
297
8
            let reader = StreamReceiver {
298
8
                target: target.clone(),
299
8
                receiver: stream_inbound_rx,
300
8
                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
301
8
                ended: false,
302
8
            };
303

            
304
8
            let components = StreamComponents {
305
8
                stream_receiver: reader,
306
8
                target,
307
8
                memquota,
308
8
                xon_xoff_reader_ctrl,
309
8
            };
310

            
311
8
            IncomingStream::new(time_provider.clone(), req, components)
312
8
        });
313

            
314
40
        Ok((reactor, handle, stream))
315
40
    }
316

            
317
    /// Launch the reactor, and run until the circuit closes or we
318
    /// encounter an error.
319
    ///
320
    /// Once this method returns, the circuit is dead and cannot be
321
    /// used again.
322
40
    pub(crate) async fn run(mut self) -> crate::Result<()> {
323
40
        self.0.run().await
324
40
    }
325
}
326

            
327
#[cfg(test)]
328
pub(crate) mod test {
329
    // @@ begin test lint list maintained by maint/add_warning @@
330
    #![allow(clippy::bool_assert_comparison)]
331
    #![allow(clippy::clone_on_copy)]
332
    #![allow(clippy::dbg_macro)]
333
    #![allow(clippy::mixed_attributes_style)]
334
    #![allow(clippy::print_stderr)]
335
    #![allow(clippy::print_stdout)]
336
    #![allow(clippy::single_char_pattern)]
337
    #![allow(clippy::unwrap_used)]
338
    #![allow(clippy::unchecked_time_subtraction)]
339
    #![allow(clippy::useless_vec)]
340
    #![allow(clippy::needless_pass_by_value)]
341
    #![allow(clippy::string_slice)] // See arti#2571
342
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
343

            
344
    use super::*;
345
    use crate::circuit::reactor::test::{AllowAllStreamsFilter, rmsg_to_ccmsg};
346
    use crate::circuit::test::fake_mpsc;
347
    use crate::circuit::{CircParameters, CircuitRxSender};
348
    use crate::client::circuit::padding::new_padding;
349
    use crate::congestion::test_utils::params::build_cc_vegas_params;
350
    use crate::crypto::cell::RelayCellBody;
351
    use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer};
352
    use crate::relay::channel::test::{DummyChan, DummyChanProvider, working_dummy_channel};
353
    use crate::stream::flow_ctrl::params::FlowCtrlParameters;
354
    use crate::stream::incoming::IncomingStream;
355

            
356
    use futures::AsyncReadExt as _;
357
    use tracing_test::traced_test;
358

            
359
    use tor_cell::chancell::{ChanCell, ChanCmd, msg as chanmsg};
360
    use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, msg as relaymsg};
361
    use tor_linkspec::{EncodedLinkSpec, HasRelayIds, LinkSpec};
362
    use tor_protover::{Protocols, named};
363
    use tor_rtcompat::SpawnExt;
364
    use tor_rtcompat::{DynTimeProvider, Runtime};
365
    use tor_rtmock::MockRuntime;
366

            
367
    use chanmsg::{AnyChanMsg, Destroy, DestroyReason, HandshakeType};
368
    use relaymsg::SendmeTag;
369

            
370
    use std::net::IpAddr;
371
    use std::sync::{Arc, Mutex, mpsc};
372

            
373
    // An inbound encryption layer that doesn't do any crypto.
374
    struct DummyInboundCrypto {}
375

            
376
    // An outbound encryption layer that doesn't do any crypto.
377
    struct DummyOutboundCrypto {
378
        /// Channel for controlling whether the current cell is meant for us or not.
379
        ///
380
        /// Useful for tests that check if recognized/unrecognized
381
        /// cells are handled/forwarded correctly.
382
        recognized_rx: mpsc::Receiver<Recognized>,
383
    }
384

            
385
    const DUMMY_TAG: [u8; 20] = [1; 20];
386

            
387
    impl InboundRelayLayer for DummyInboundCrypto {
388
        fn originate(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
389
            DUMMY_TAG.into()
390
        }
391

            
392
        fn encrypt_inbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
393
    }
394

            
395
    impl OutboundRelayLayer for DummyOutboundCrypto {
396
        fn decrypt_outbound(
397
            &mut self,
398
            _cmd: ChanCmd,
399
            _cell: &mut RelayCellBody,
400
        ) -> Option<SendmeTag> {
401
            // Note: this should never block.
402
            let recognized = self.recognized_rx.recv().unwrap();
403

            
404
            match recognized {
405
                Recognized::Yes => Some(DUMMY_TAG.into()),
406
                Recognized::No => None,
407
            }
408
        }
409
    }
410

            
411
    struct ReactorTestCtrl {
412
        /// The relay circuit handle.
413
        relay_circ: Arc<RelayCirc>,
414
        /// Mock channel -> circuit reactor MPSC channel.
415
        circmsg_send: CircuitRxSender,
416
        /// The inbound channel ("towards the client").
417
        inbound_chan: DummyChan,
418
        /// The outbound channel ("away from the client"), if any.
419
        ///
420
        /// Shared with the DummyChanProvider, which initializes this
421
        /// when the relay reactor launches a channel to the next hop
422
        /// via `get_or_launch()`.
423
        outbound_chan: Arc<Mutex<Option<DummyChan>>>,
424
        /// MPSC channel for telling the DummyOutboundCrypto that the next
425
        /// cell we're about to send to the reactor should be "recognized".
426
        recognized_tx: mpsc::Sender<Recognized>,
427
    }
428

            
429
    /// Whether a forward cell to send should be "recognized"
430
    /// or "unrecognized" by the relay under test.
431
    enum Recognized {
432
        /// Recognized
433
        Yes,
434
        /// Unrecognized
435
        No,
436
    }
437

            
438
    impl ReactorTestCtrl {
439
        /// Spawn a relay circuit reactor, returning a `ReactorTestCtrl` for
440
        /// controlling it.
441
        fn spawn_reactor<R: Runtime>(
442
            rt: &R,
443
        ) -> (Self, impl futures::Stream<Item = IncomingStream>) {
444
            let inbound_chan = working_dummy_channel(rt);
445
            let circid = CircId::new(1337).unwrap();
446
            let unique_id = UniqId::new(8, 17);
447
            let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
448
            let (circmsg_send, circmsg_recv) = fake_mpsc(64);
449
            let params = CircParameters::new(
450
                true,
451
                build_cc_vegas_params(),
452
                FlowCtrlParameters::defaults_for_tests(),
453
            );
454
            let settings = HopSettings::from_params_and_caps(
455
                crate::circuit::circhop::HopNegotiationType::Full,
456
                &params,
457
                &[named::FLOWCTRL_CC].into_iter().collect::<Protocols>(),
458
            )
459
            .unwrap();
460

            
461
            let outbound_chan = Arc::new(Mutex::new(None));
462
            let (recognized_tx, recognized_rx) = mpsc::channel();
463
            let chan_provider = Arc::new(DummyChanProvider::new(
464
                rt.clone(),
465
                Arc::clone(&outbound_chan),
466
            ));
467

            
468
            let (reactor, relay_circ, incoming_streams) = Reactor::new(
469
                rt.clone(),
470
                &Arc::clone(&inbound_chan.channel),
471
                circid,
472
                unique_id,
473
                circmsg_recv,
474
                Box::new(DummyInboundCrypto {}),
475
                Box::new(DummyOutboundCrypto { recognized_rx }),
476
                &settings,
477
                chan_provider,
478
                padding_ctrl,
479
                padding_stream,
480
                Box::new(AllowAllStreamsFilter),
481
                &CircuitAccount::new_noop(),
482
            )
483
            .unwrap();
484

            
485
            rt.spawn(async {
486
                let _ = reactor.run().await;
487
            })
488
            .unwrap();
489

            
490
            let ctrl = Self {
491
                relay_circ,
492
                circmsg_send,
493
                recognized_tx,
494
                inbound_chan,
495
                outbound_chan,
496
            };
497

            
498
            (ctrl, incoming_streams)
499
        }
500

            
501
        /// Simulate the sending of a forward relay message through our relay.
502
        async fn send_fwd(
503
            &mut self,
504
            id: Option<StreamId>,
505
            msg: relaymsg::AnyRelayMsg,
506
            recognized: Recognized,
507
            early: bool,
508
        ) {
509
            // This a bit janky, but for each forward cell we send to the reactor
510
            // we need to send a bit of metadata to the DummyOutboundLayer
511
            // specifying whether the cell should be treated as recognized
512
            // or unrecognized
513
            self.recognized_tx.send(recognized).unwrap();
514
            self.circmsg_send
515
                .send(rmsg_to_ccmsg(id, msg, early))
516
                .await
517
                .unwrap();
518
        }
519

            
520
        /// Simulate the sending of a forward channel message through our relay.
521
        async fn send_fwd_cmsg(&mut self, msg: chanmsg::AnyChanMsg) {
522
            self.circmsg_send.send(msg).await.unwrap();
523
        }
524

            
525
        /// Whether the reactor opened an outbound channel
526
        /// (i.e. a channel to the next relay in the circuit).
527
        fn outbound_chan_launched(&self) -> bool {
528
            self.outbound_chan.lock().unwrap().is_some()
529
        }
530

            
531
        /// Perform the CREATE2 handshake.
532
        async fn do_create2_handshake(
533
            &mut self,
534
            rt: &MockRuntime,
535
            expected_hs_type: HandshakeType,
536
        ) -> Option<CircId> {
537
            // First, check that the reactor actually sent a CREATE2 to the next hop...
538
            let (circid, msg) = self.read_outbound().into_circid_and_msg();
539
            let _create2 = match msg {
540
                chanmsg::AnyChanMsg::Create2(c) => {
541
                    assert_eq!(c.handshake_type(), expected_hs_type);
542
                    c
543
                }
544
                _ => panic!("unexpected forwarded {msg:?}"),
545
            };
546

            
547
            let handshake = vec![];
548
            let created2 = chanmsg::Created2::new(handshake.clone());
549
            // ...and then finalize the handshake by pretending to be
550
            // the responding relay
551
            self.write_outbound(circid, chanmsg::AnyChanMsg::Created2(created2));
552
            rt.advance_until_stalled().await;
553

            
554
            // Make sure we actually did send an EXTENDED2 towards the client
555
            let msg = self.read_inbound();
556
            let rmsg = match msg.msg() {
557
                chanmsg::AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
558
                    RelayCellFormat::V0,
559
                    r.clone().into_relay_body(),
560
                )
561
                .unwrap(),
562
                _ => panic!("unexpected forwarded {msg:?}"),
563
            };
564

            
565
            match rmsg.msg() {
566
                relaymsg::AnyRelayMsg::Extended2(e) => {
567
                    assert_eq!(e.clone().into_body(), handshake);
568
                }
569
                _ => panic!("unexpected relay message {rmsg:?}"),
570
            }
571

            
572
            circid
573
        }
574

            
575
        /// Whether the circuit is closing (e.g. due to a proto violation).
576
        fn is_closing(&self) -> bool {
577
            self.relay_circ.is_closing()
578
        }
579

            
580
        /// Read a cell from the inbound channel
581
        /// (moving towards the client).
582
        ///
583
        /// Panics if there are no ready cells on the inbound MPSC channel.
584
        fn read_inbound(&mut self) -> ChanCell<AnyChanMsg> {
585
            #[allow(deprecated)] // TODO(#2386)
586
            self.inbound_chan.rx.try_next().unwrap().unwrap()
587
        }
588

            
589
        /// Read a cell from the outbound channel
590
        /// (moving towards the next hop).
591
        ///
592
        /// Panics if there are no ready cells on the outbound MPSC channel.
593
        fn read_outbound(&mut self) -> ChanCell<AnyChanMsg> {
594
            let mut lock = self.outbound_chan.lock().unwrap();
595
            let chan = lock.as_mut().unwrap();
596
            #[allow(deprecated)] // TODO(#2386)
597
            chan.rx.try_next().unwrap().unwrap()
598
        }
599

            
600
        /// Write to the sending end of the outbound Tor channel.
601
        ///
602
        /// Simulates the receipt of a cell from the next hop.
603
        ///
604
        /// Panics if the outbound chan sender is full.
605
        fn write_outbound(&mut self, circid: Option<CircId>, msg: chanmsg::AnyChanMsg) {
606
            let mut lock = self.outbound_chan.lock().unwrap();
607
            let chan = lock.as_mut().unwrap();
608
            let cell = ChanCell::new(circid, msg);
609

            
610
            chan.tx.try_send(Ok(cell)).unwrap();
611
        }
612
    }
613

            
614
    fn dummy_linkspecs() -> Vec<EncodedLinkSpec> {
615
        vec![
616
            LinkSpec::Ed25519Id([43; 32].into()).encode().unwrap(),
617
            LinkSpec::RsaId([45; 20].into()).encode().unwrap(),
618
            LinkSpec::OrPort("127.0.0.1".parse::<IpAddr>().unwrap(), 999)
619
                .encode()
620
                .unwrap(),
621
        ]
622
    }
623

            
624
    /// Assert that we have sent a DESTROY cell with the specified `reason`
625
    /// both towards the "client" and towards the "next hop", if there is one,
626
    /// and that the relay circuit is shutting down.
627
    ///
628
    /// The test is expected to drain the inbound Tor "channel"
629
    /// of any non-ending cells it might be expecting before calling this function.
630
    fn assert_destroy_sent(ctrl: &mut ReactorTestCtrl, reason: DestroyReason) {
631
        assert!(ctrl.is_closing());
632

            
633
        macro_rules! assert_cell_is_destroy {
634
            ($cell:expr) => {{
635
                match $cell.msg() {
636
                    chanmsg::AnyChanMsg::Destroy(d) => {
637
                        assert_eq!(d.reason(), reason);
638
                    }
639
                    _ => panic!("unexpected ending {:?}", $cell),
640
                }
641
            }};
642
        }
643

            
644
        // We *always* send a DESTROY towards the client
645
        // when killing the circuit
646
        let cell = ctrl.read_inbound();
647
        assert_cell_is_destroy!(cell);
648

            
649
        // If there's an outbound channel, ensure we sent a DESTROY over it too.
650
        if ctrl.outbound_chan_launched() {
651
            let cell = ctrl.read_outbound();
652
            assert_cell_is_destroy!(cell);
653
        }
654
    }
655

            
656
    macro_rules! expect_cell {
657
        ($cell:expr, $chanmsg:tt, $relaymsg:tt) => {{
658
            let msg = match $cell.msg() {
659
                chanmsg::AnyChanMsg::$chanmsg(m) => {
660
                    let body = m.clone().into_relay_body();
661
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, body).unwrap()
662
                }
663
                _ => panic!("unexpected forwarded {:?}", $cell),
664
            };
665

            
666
            match msg.msg() {
667
                relaymsg::AnyRelayMsg::$relaymsg(m) => m.clone(),
668
                _ => panic!("unexpected cell {msg:?}"),
669
            }
670
        }};
671
    }
672

            
673
    #[traced_test]
674
    #[test]
675
    fn reject_extend2_relay() {
676
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
677
            let (mut ctrl, _incoming_streams) = ReactorTestCtrl::spawn_reactor(&rt);
678
            rt.advance_until_stalled().await;
679

            
680
            let linkspecs = dummy_linkspecs();
681
            let extend2 = relaymsg::Extend2::new(linkspecs, HandshakeType::NTOR_V3, vec![]).into();
682
            ctrl.send_fwd(None, extend2, Recognized::Yes, false).await;
683
            rt.advance_until_stalled().await;
684

            
685
            assert!(logs_contain("got EXTEND2 in a RELAY cell?!"));
686
            assert!(!ctrl.outbound_chan_launched());
687
            assert_destroy_sent(&mut ctrl, DestroyReason::NONE);
688
        });
689
    }
690

            
691
    #[traced_test]
692
    #[test]
693
    fn reject_extend2_previous_hop() {
694
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
695
            let (mut ctrl, _incoming_streams) = ReactorTestCtrl::spawn_reactor(&rt);
696
            rt.advance_until_stalled().await;
697

            
698
            // No outbound circuits yet
699
            assert!(!ctrl.outbound_chan_launched());
700

            
701
            // Build a linkspec with the identities of the dummy channel
702
            let mut linkspecs = ctrl
703
                .inbound_chan
704
                .channel
705
                .target()
706
                .identities()
707
                .map(|id| LinkSpec::from(id.to_owned()).encode())
708
                .collect::<Result<Vec<_>, _>>()
709
                .unwrap();
710

            
711
            // Make sure this channel actually has some identities
712
            // (i.e. that it's not a client channel or something)
713
            assert_eq!(linkspecs.len(), 2);
714

            
715
            // There must be at least one IPv4 OR port address
716
            linkspecs.push(
717
                LinkSpec::OrPort("127.0.0.1".parse::<IpAddr>().unwrap(), 999)
718
                    .encode()
719
                    .unwrap(),
720
            );
721
            let handshake_type = HandshakeType::NTOR_V3;
722
            let extend2 = relaymsg::Extend2::new(linkspecs, handshake_type, vec![]).into();
723
            ctrl.send_fwd(None, extend2, Recognized::Yes, true).await;
724
            rt.advance_until_stalled().await;
725

            
726
            // The reactor handled the EXTEND2 and launched an outbound channel
727
            assert!(logs_contain("Cannot extend circuit to previous hop"));
728
            assert!(!ctrl.outbound_chan_launched());
729
            assert!(ctrl.is_closing());
730
        });
731
    }
732

            
733
    #[traced_test]
734
    #[test]
735
    fn extend_and_forward() {
736
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
737
            let (mut ctrl, _incoming_streams) = ReactorTestCtrl::spawn_reactor(&rt);
738
            rt.advance_until_stalled().await;
739

            
740
            // No outbound circuits yet
741
            assert!(!ctrl.outbound_chan_launched());
742

            
743
            let linkspecs = dummy_linkspecs();
744
            let handshake_type = HandshakeType::NTOR_V3;
745
            let extend2 = relaymsg::Extend2::new(linkspecs, handshake_type, vec![]).into();
746
            ctrl.send_fwd(None, extend2, Recognized::Yes, true).await;
747
            rt.advance_until_stalled().await;
748

            
749
            // The reactor handled the EXTEND2 and launched an outbound channel
750
            assert!(logs_contain(
751
                "Launched channel to the next hop circ_id=Circ 8.17"
752
            ));
753
            assert!(ctrl.outbound_chan_launched());
754
            assert!(!ctrl.is_closing());
755

            
756
            let _circid = ctrl.do_create2_handshake(&rt, handshake_type).await;
757
            assert!(logs_contain("Got CREATED2 response from next hop"));
758
            assert!(logs_contain("Extended circuit to the next hop"));
759

            
760
            // Time to forward a message to the next hop!
761
            let early = false;
762
            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap();
763
            ctrl.send_fwd(None, begin.clone().into(), Recognized::No, early)
764
                .await;
765
            rt.advance_until_stalled().await;
766

            
767
            // Ensure the other end received the BEGIN cell
768
            let cell = ctrl.read_outbound();
769
            let recvd_begin = expect_cell!(cell, Relay, Begin);
770
            assert_eq!(begin, recvd_begin);
771

            
772
            // Now send the same message again, but this time in a RELAY_EARLY
773
            let early = true;
774
            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap();
775
            ctrl.send_fwd(None, begin.clone().into(), Recognized::No, early)
776
                .await;
777
            rt.advance_until_stalled().await;
778
            let cell = ctrl.read_outbound();
779
            let recvd_begin = expect_cell!(cell, RelayEarly, Begin);
780
            assert_eq!(begin, recvd_begin);
781
        });
782
    }
783

            
784
    #[traced_test]
785
    #[test]
786
    fn forward_before_extend() {
787
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
788
            let (mut ctrl, _incoming_streams) = ReactorTestCtrl::spawn_reactor(&rt);
789
            rt.advance_until_stalled().await;
790

            
791
            // Send an arbitrary unrecognized cell. The reactor should flag this as
792
            // a protocol violation, because we don't have an outbound channel to forward it on.
793
            let extend2 = relaymsg::End::new_misc().into();
794
            ctrl.send_fwd(None, extend2, Recognized::No, true).await;
795
            rt.advance_until_stalled().await;
796

            
797
            // The reactor handled the EXTEND2 and launched an outbound channel
798
            assert!(logs_contain(
799
                "Asked to forward cell before the circuit was extended?!"
800
            ));
801
            assert_destroy_sent(&mut ctrl, DestroyReason::NONE);
802
        });
803
    }
804

            
805
    #[traced_test]
806
    #[test]
807
    fn reject_invalid_begin() {
808
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
809
            let (mut ctrl, _incoming_streams) = ReactorTestCtrl::spawn_reactor(&rt);
810
            rt.advance_until_stalled().await;
811

            
812
            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap().into();
813

            
814
            // BEGIN cells *must* have a stream ID, so expect the reactor to reject this
815
            // and close the circuit
816
            ctrl.send_fwd(None, begin, Recognized::Yes, false).await;
817
            rt.advance_until_stalled().await;
818

            
819
            assert!(logs_contain(
820
                "Invalid stream ID [scrubbed] for relay command BEGIN"
821
            ));
822
            assert_destroy_sent(&mut ctrl, DestroyReason::NONE);
823
        });
824
    }
825

            
826
    #[traced_test]
827
    #[test]
828
    fn destroy_from_client() {
829
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
830
            let (mut ctrl, _incoming_streams) = ReactorTestCtrl::spawn_reactor(&rt);
831
            rt.advance_until_stalled().await;
832

            
833
            // Simulate the client sending us a DESTROY cell
834
            let destroy = Destroy::new(DestroyReason::PROTOCOL);
835
            ctrl.send_fwd_cmsg(destroy.into()).await;
836
            rt.advance_until_stalled().await;
837

            
838
            assert!(logs_contain(
839
                "Received outbound DESTROY, circuit shutting down"
840
            ));
841

            
842
            // Ensure the destroy reason (PROTOCOL) is not propagated
843
            assert_destroy_sent(&mut ctrl, DestroyReason::NONE);
844
        });
845
    }
846

            
847
    #[traced_test]
848
    #[test]
849
    fn destroy_from_next_hop() {
850
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
851
            let (mut ctrl, _incoming_streams) = ReactorTestCtrl::spawn_reactor(&rt);
852
            rt.advance_until_stalled().await;
853

            
854
            // Extend the circuit by another hop
855
            let linkspecs = dummy_linkspecs();
856
            let handshake_type = HandshakeType::NTOR_V3;
857
            let extend2 = relaymsg::Extend2::new(linkspecs, handshake_type, vec![]).into();
858
            ctrl.send_fwd(None, extend2, Recognized::Yes, true).await;
859
            rt.advance_until_stalled().await;
860
            let circid = ctrl.do_create2_handshake(&rt, handshake_type).await;
861
            assert!(logs_contain("Extended circuit to the next hop"));
862
            assert!(ctrl.outbound_chan_launched());
863

            
864
            // Simulate the client sending us a DESTROY cell
865
            let destroy = Destroy::new(DestroyReason::PROTOCOL);
866
            ctrl.write_outbound(circid, destroy.into());
867
            rt.advance_until_stalled().await;
868

            
869
            // We have *not* received an outbound destroy
870
            assert!(!logs_contain(
871
                "Received outbound DESTROY, circuit shutting down"
872
            ));
873

            
874
            // We received an inbound one (from the next hop)
875
            assert!(logs_contain(
876
                "Received inbound DESTROY, circuit shutting down"
877
            ));
878

            
879
            // Ensure the destroy reason (PROTOCOL) is not propagated
880
            // This will check that we've sent a DESTROY cell in both directions.
881
            assert_destroy_sent(&mut ctrl, DestroyReason::NONE);
882
        });
883
    }
884

            
885
    #[traced_test]
886
    #[test]
887
    fn truncate() {
888
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
889
            let (mut ctrl, _incoming_streams) = ReactorTestCtrl::spawn_reactor(&rt);
890
            rt.advance_until_stalled().await;
891

            
892
            // Simulate the client sending us a TRUNCATE cell
893
            let truncate = relaymsg::Truncate::default().into();
894
            ctrl.send_fwd(None, truncate, Recognized::Yes, false).await;
895
            rt.advance_until_stalled().await;
896

            
897
            assert!(logs_contain(
898
                "Circuit protocol violation: TRUNCATE not allowed"
899
            ));
900

            
901
            assert_destroy_sent(&mut ctrl, DestroyReason::NONE);
902
        });
903
    }
904

            
905
    #[traced_test]
906
    #[test]
907
    fn data_stream() {
908
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
909
            const TO_SEND: &[u8] = b"The bells were musical in the silvery sun";
910

            
911
            let (mut ctrl, mut incoming_streams) = ReactorTestCtrl::spawn_reactor(&rt);
912
            rt.advance_until_stalled().await;
913

            
914
            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap().into();
915
            ctrl.send_fwd(StreamId::new(1), begin, Recognized::Yes, false)
916
                .await;
917
            rt.advance_until_stalled().await;
918

            
919
            let data = relaymsg::Data::new(TO_SEND).unwrap().into();
920
            ctrl.send_fwd(StreamId::new(1), data, Recognized::Yes, false)
921
                .await;
922

            
923
            // We should have a pending incoming stream
924
            let pending = incoming_streams.next().await.unwrap();
925

            
926
            // Accept it, and let's see what we have!
927
            let mut stream = pending
928
                .accept_data(relaymsg::Connected::new_empty())
929
                .await
930
                .unwrap();
931

            
932
            let mut recv_buf = [0_u8; TO_SEND.len()];
933
            stream.read_exact(&mut recv_buf).await.unwrap();
934
            assert_eq!(recv_buf, TO_SEND);
935
        });
936
    }
937

            
938
    #[traced_test]
939
    #[test]
940
    fn reject_stream() {
941
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
942
            let (mut ctrl, mut incoming_streams) = ReactorTestCtrl::spawn_reactor(&rt);
943
            rt.advance_until_stalled().await;
944

            
945
            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap().into();
946
            ctrl.send_fwd(StreamId::new(1), begin, Recognized::Yes, false)
947
                .await;
948
            rt.advance_until_stalled().await;
949

            
950
            // We should have a pending incoming stream
951
            let pending = incoming_streams.next().await.unwrap();
952

            
953
            // Reject the stream, and wait for the reactor to finish sending the END
954
            let end = relaymsg::End::new_misc();
955
            pending.reject(end.clone()).await.unwrap();
956
            rt.advance_until_stalled().await;
957

            
958
            // The END cell written to the Tor channel should be the same as
959
            // the one we sent above, in reject().
960
            let cell = ctrl.read_inbound();
961
            let actual_end = expect_cell!(cell, Relay, End);
962
            assert_eq!(end.reason(), actual_end.reason());
963

            
964
            // Sending another message on this stream results is flagged
965
            // as a proto violation
966
            let data = relaymsg::Data::new(b"no dice").unwrap().into();
967
            ctrl.send_fwd(StreamId::new(1), data, Recognized::Yes, false)
968
                .await;
969
            rt.advance_until_stalled().await;
970

            
971
            assert!(logs_contain("Stream protocol violation"));
972
            assert!(logs_contain(
973
                "Unexpected RelayCmd(DATA) message on unknown stream 1"
974
            ));
975
        });
976
    }
977
}