1
//! Module exposing the relay circuit reactor subsystem.
2
//!
3
//! See [`reactor`](crate::circuit::reactor) for a description of the overall architecture.
4
//!
5
//! #### `ForwardReactor`
6
//!
7
//! It handles
8
//!
9
//!  * unrecognized RELAY cells, by moving them in the forward direction (towards the exit)
10
//!  * recognized RELAY cells, by splitting each cell into messages, and handling
11
//!    each message individually as described in the table below
12
//!    (Note: since prop340 is not yet implemented, in practice there is only 1 message per cell).
13
//!  * RELAY_EARLY cells (**not yet implemented**)
14
//!  * DESTROY cells (**not yet implemented**)
15
//!  * PADDING_NEGOTIATE cells (**not yet implemented**)
16
//!
17
//! ```text
18
//!
19
//! Legend: `F` = "forward reactor", `B` = "backward reactor", `S` = "stream reactor"
20
//!
21
//! | RELAY cmd         | Received in | Handled in | Description                            |
22
//! |-------------------|-------------|------------|----------------------------------------|
23
//! | DROP              | F           | F          | Passed to PaddingController for        |
24
//! |                   |             |            | validation                             |
25
//! |-------------------|-------------|------------|----------------------------------------|
26
//! | EXTEND2           | F           |            | Handled by instructing the channel     |
27
//! |                   |             |            | provider to launch a new channel, and  |
28
//! |                   |             |            | waiting for the new channel on its     |
29
//! |                   |             |            | outgoing_chan_rx receiver              |
30
//! |                   |             |            | (**not yet implemented**)              |
31
//! |-------------------|-------------|------------|----------------------------------------|
32
//! | TRUNCATE          | F           | F          | (**not yet implemented**)              |
33
//! |                   |             |            |                                        |
34
//! |-------------------|-------------|------------|----------------------------------------|
35
//! | TODO              |             |            |                                        |
36
//! |                   |             |            |                                        |
37
//! ```
38

            
39
pub(crate) mod backward;
40
pub(crate) mod forward;
41

            
42
use std::sync::Arc;
43
use std::time::Duration;
44

            
45
use futures::channel::mpsc;
46

            
47
use tor_cell::chancell::CircId;
48
use tor_linkspec::OwnedChanTarget;
49
use tor_rtcompat::Runtime;
50

            
51
use crate::channel::Channel;
52
use crate::circuit::circhop::{CircHopOutbound, HopSettings};
53
use crate::circuit::reactor::Reactor as BaseReactor;
54
use crate::circuit::reactor::hop_mgr::HopMgr;
55
use crate::circuit::reactor::stream;
56
use crate::circuit::{CircuitRxReceiver, UniqId};
57
use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer};
58
use crate::memquota::CircuitAccount;
59
use crate::relay::RelayCirc;
60
use crate::relay::channel_provider::ChannelProvider;
61
use crate::relay::reactor::backward::Backward;
62
use crate::relay::reactor::forward::Forward;
63

            
64
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
65
use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
66

            
67
/// Type-alias for the relay base reactor type.
68
///
/// This is the generic [`BaseReactor`] instantiated with the relay-specific
/// [`Forward`] and [`Backward`] implementations.
type RelayBaseReactor<R> = BaseReactor<R, Forward, Backward>;
69

            
70
/// The entry point of the circuit reactor subsystem.
71
#[allow(unused)] // TODO(relay)
72
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
73
// Newtype around the relay-specialized base reactor: `Reactor::new()` builds the
// wrapped reactor, and `Reactor::run()` delegates to it.
pub(crate) struct Reactor<R: Runtime>(RelayBaseReactor<R>);
74

            
75
/// A handler customizing the relay stream reactor.
76
// Unit struct with no state of its own; an instance is handed to `HopMgr::new()`
// when the relay reactor is built.
struct StreamHandler;
77

            
78
impl stream::StreamHandler for StreamHandler {
79
4
    fn halfstream_expiry(&self, hop: &CircHopOutbound) -> Duration {
80
4
        let ccontrol = hop.ccontrol();
81

            
82
        // Note: if we have no measurements for the RTT, this will be set to 0,
83
        // so the stream will be removed from the stream map immediately,
84
        // and any subsequent messages arriving on it will trigger
85
        // a proto violation causing the circuit to close.
86
        //
87
        // TODO(relay-tuning): we should make sure that this doesn't cause us to
88
        // wrongly close legitimate circuits that still have in-flight stream data
89
4
        ccontrol
90
4
            .lock()
91
4
            .expect("poisoned lock")
92
4
            .rtt()
93
4
            .max_rtt_usec()
94
4
            .map(|rtt| Duration::from_millis(u64::from(rtt)))
95
            // TODO(relay): we should fallback to a non-zero default here
96
            // if we don't have any RTT measurements yet
97
4
            .unwrap_or_default()
98
4
    }
99
}
100

            
101
#[allow(unused)] // TODO(relay)
102
impl<R: Runtime> Reactor<R> {
103
    /// Create a new circuit reactor.
104
    ///
105
    /// The reactor will send outbound messages on `channel`, receive incoming
106
    /// messages on `input`, and identify this circuit by the channel-local
107
    /// [`CircId`] provided.
108
    ///
109
    /// The internal unique identifier for this circuit will be `unique_id`.
110
    #[allow(clippy::too_many_arguments)] // TODO
111
36
    pub(crate) fn new(
112
36
        runtime: R,
113
36
        channel: &Arc<Channel>,
114
36
        circ_id: CircId,
115
36
        unique_id: UniqId,
116
36
        input: CircuitRxReceiver,
117
36
        crypto_in: Box<dyn InboundRelayLayer + Send>,
118
36
        crypto_out: Box<dyn OutboundRelayLayer + Send>,
119
36
        settings: &HopSettings,
120
36
        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
121
36
        padding_ctrl: PaddingController,
122
36
        padding_event_stream: PaddingEventStream,
123
36
        memquota: &CircuitAccount,
124
36
    ) -> crate::Result<(Self, Arc<RelayCirc>)> {
125
        // NOTE: not registering this channel with the memquota subsystem is okay,
126
        // because it has no buffering (if ever decide to make the size of this buffer
127
        // non-zero for whatever reason, we must remember to register it with memquota
128
        // so that it counts towards the total memory usage for the circuit.
129
        #[allow(clippy::disallowed_methods)]
130
36
        let (stream_tx, stream_rx) = mpsc::channel(0);
131

            
132
36
        let mut hop_mgr = HopMgr::new(
133
36
            runtime.clone(),
134
36
            unique_id,
135
36
            StreamHandler,
136
36
            stream_tx,
137
36
            memquota.clone(),
138
        );
139

            
140
        // On the relay side, we always have one "hop" (ourselves).
141
        //
142
        // Clients will need to call this function in response to CtrlMsg::Create
143
        // (TODO: for clients, we probably will need to store a bunch more state here)
144
36
        hop_mgr.add_hop(settings.clone())?;
145

            
146
        // TODO(relay): currently we don't need buffering on this channel,
147
        // but we might need it if we start using it for more than just EXTENDED2 events
148
        #[allow(clippy::disallowed_methods)]
149
36
        let (fwd_ev_tx, fwd_ev_rx) = mpsc::channel(0);
150
36
        let forward = Forward::new(
151
36
            channel,
152
36
            unique_id,
153
36
            crypto_out,
154
36
            chan_provider,
155
36
            fwd_ev_tx,
156
36
            memquota.clone(),
157
        );
158
36
        let backward = Backward::new(crypto_in);
159

            
160
36
        let (inner, handle) = BaseReactor::new(
161
36
            runtime,
162
36
            channel,
163
36
            circ_id,
164
36
            unique_id,
165
36
            input,
166
36
            forward,
167
36
            backward,
168
36
            hop_mgr,
169
36
            padding_ctrl,
170
36
            padding_event_stream,
171
36
            stream_rx,
172
36
            fwd_ev_rx,
173
36
            memquota,
174
36
        );
175

            
176
36
        let reactor = Self(inner);
177
36
        let handle = Arc::new(RelayCirc(handle));
178

            
179
36
        Ok((reactor, handle))
180
36
    }
181

            
182
    /// Launch the reactor, and run until the circuit closes or we
183
    /// encounter an error.
184
    ///
185
    /// Once this method returns, the circuit is dead and cannot be
186
    /// used again.
187
36
    pub(crate) async fn run(mut self) -> crate::Result<()> {
188
36
        self.0.run().await
189
36
    }
190
}
191

            
192
#[cfg(test)]
193
pub(crate) mod test {
194
    // @@ begin test lint list maintained by maint/add_warning @@
195
    #![allow(clippy::bool_assert_comparison)]
196
    #![allow(clippy::clone_on_copy)]
197
    #![allow(clippy::dbg_macro)]
198
    #![allow(clippy::mixed_attributes_style)]
199
    #![allow(clippy::print_stderr)]
200
    #![allow(clippy::print_stdout)]
201
    #![allow(clippy::single_char_pattern)]
202
    #![allow(clippy::unwrap_used)]
203
    #![allow(clippy::unchecked_time_subtraction)]
204
    #![allow(clippy::useless_vec)]
205
    #![allow(clippy::needless_pass_by_value)]
206
    #![allow(clippy::string_slice)] // See arti#2571
207
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
208

            
209
    use super::*;
210
    use crate::circuit::reactor::test::{AllowAllStreamsFilter, rmsg_to_ccmsg};
211
    use crate::circuit::test::fake_mpsc;
212
    use crate::circuit::{CircParameters, CircuitRxSender};
213
    use crate::client::circuit::padding::new_padding;
214
    use crate::congestion::test_utils::params::build_cc_vegas_params;
215
    use crate::crypto::cell::RelayCellBody;
216
    use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer};
217
    use crate::memquota::SpecificAccount as _;
218
    use crate::relay::channel::test::{DummyChan, DummyChanProvider, working_dummy_channel};
219
    use crate::stream::flow_ctrl::params::FlowCtrlParameters;
220
    use crate::stream::incoming::{IncomingStream, IncomingStreamRequestFilter};
221

            
222
    use futures::{AsyncReadExt as _, StreamExt as _};
223
    use tracing_test::traced_test;
224

            
225
    use tor_cell::chancell::{ChanCell, ChanCmd, msg as chanmsg};
226
    use tor_cell::relaycell::{
227
        AnyRelayMsgOuter, RelayCellFormat, RelayCmd, StreamId, msg as relaymsg,
228
    };
229
    use tor_linkspec::{EncodedLinkSpec, HasRelayIds, LinkSpec};
230
    use tor_protover::{Protocols, named};
231
    use tor_rtcompat::SpawnExt;
232
    use tor_rtcompat::{DynTimeProvider, Runtime};
233
    use tor_rtmock::MockRuntime;
234

            
235
    use chanmsg::{AnyChanMsg, Destroy, DestroyReason, HandshakeType};
236
    use relaymsg::SendmeTag;
237

            
238
    use std::net::IpAddr;
239
    use std::sync::{Arc, Mutex, mpsc};
240

            
241
    // An inbound encryption layer that doesn't do any crypto.
242
    // Its `InboundRelayLayer` impl leaves cell bodies untouched,
    // and `originate()` always returns `DUMMY_TAG`.
    struct DummyInboundCrypto {}
243

            
244
    // An outbound encryption layer that doesn't do any crypto.
245
    // `decrypt_outbound()` reads one `Recognized` value from `recognized_rx`
    // for every cell it is asked to decrypt.
    struct DummyOutboundCrypto {
246
        /// Channel for controlling whether the current cell is meant for us or not.
247
        ///
248
        /// Useful for tests that check if recognized/unrecognized
249
        /// cells are handled/forwarded correctly.
250
        recognized_rx: mpsc::Receiver<Recognized>,
251
    }
252

            
253
    /// Fixed 20-byte tag that the dummy crypto layers return
    /// wherever a `SendmeTag` is expected.
    const DUMMY_TAG: [u8; 20] = [1; 20];
254

            
255
    impl InboundRelayLayer for DummyInboundCrypto {
256
        fn originate(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
257
            DUMMY_TAG.into()
258
        }
259

            
260
        fn encrypt_inbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
261
    }
262

            
263
    impl OutboundRelayLayer for DummyOutboundCrypto {
264
        fn decrypt_outbound(
265
            &mut self,
266
            _cmd: ChanCmd,
267
            _cell: &mut RelayCellBody,
268
        ) -> Option<SendmeTag> {
269
            // Note: this should never block.
270
            let recognized = self.recognized_rx.recv().unwrap();
271

            
272
            match recognized {
273
                Recognized::Yes => Some(DUMMY_TAG.into()),
274
                Recognized::No => None,
275
            }
276
        }
277
    }
278

            
279
    /// Test harness for driving a relay circuit reactor.
    ///
    /// Created by `ReactorTestCtrl::spawn_reactor()`, which also spawns
    /// the reactor on the test runtime.
    struct ReactorTestCtrl {
280
        /// The relay circuit handle.
281
        relay_circ: Arc<RelayCirc>,
282
        /// Mock channel -> circuit reactor MPSC channel.
283
        circmsg_send: CircuitRxSender,
284
        /// The inbound channel ("towards the client").
285
        inbound_chan: DummyChan,
286
        /// The outbound channel ("away from the client"), if any.
287
        ///
288
        /// Shared with the DummyChanProvider, which initializes this
289
        /// when the relay reactor launches a channel to the next hop
290
        /// via `get_or_launch()`.
291
        outbound_chan: Arc<Mutex<Option<DummyChan>>>,
292
        /// MPSC channel for telling the DummyOutboundCrypto that the next
293
        /// cell we're about to send to the reactor should be "recognized".
294
        recognized_tx: mpsc::Sender<Recognized>,
295
    }
296

            
297
    /// Whether a forward cell to send should be "recognized"
298
    /// or "unrecognized" by the relay under test.
299
    // Values of this type are sent over `ReactorTestCtrl::recognized_tx`
    // and consumed by `DummyOutboundCrypto::decrypt_outbound()`.
    enum Recognized {
300
        /// Recognized (`decrypt_outbound()` returns `Some(tag)`)
301
        Yes,
302
        /// Unrecognized (`decrypt_outbound()` returns `None`)
303
        No,
304
    }
305

            
306
    impl ReactorTestCtrl {
307
        /// Spawn a relay circuit reactor, returning a `ReactorTestCtrl` for
308
        /// controlling it.
309
        fn spawn_reactor<R: Runtime>(rt: &R) -> Self {
310
            let inbound_chan = working_dummy_channel(rt);
311
            let circid = CircId::new(1337).unwrap();
312
            let unique_id = UniqId::new(8, 17);
313
            let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
314
            let (circmsg_send, circmsg_recv) = fake_mpsc(64);
315
            let params = CircParameters::new(
316
                true,
317
                build_cc_vegas_params(),
318
                FlowCtrlParameters::defaults_for_tests(),
319
            );
320
            let settings = HopSettings::from_params_and_caps(
321
                crate::circuit::circhop::HopNegotiationType::Full,
322
                &params,
323
                &[named::FLOWCTRL_CC].into_iter().collect::<Protocols>(),
324
            )
325
            .unwrap();
326

            
327
            let outbound_chan = Arc::new(Mutex::new(None));
328
            let (recognized_tx, recognized_rx) = mpsc::channel();
329
            let chan_provider = Arc::new(DummyChanProvider::new(
330
                rt.clone(),
331
                Arc::clone(&outbound_chan),
332
            ));
333

            
334
            let (reactor, relay_circ) = Reactor::new(
335
                rt.clone(),
336
                &Arc::clone(&inbound_chan.channel),
337
                circid,
338
                unique_id,
339
                circmsg_recv,
340
                Box::new(DummyInboundCrypto {}),
341
                Box::new(DummyOutboundCrypto { recognized_rx }),
342
                &settings,
343
                chan_provider,
344
                padding_ctrl,
345
                padding_stream,
346
                &CircuitAccount::new_noop(),
347
            )
348
            .unwrap();
349

            
350
            rt.spawn(async {
351
                let _ = reactor.run().await;
352
            })
353
            .unwrap();
354

            
355
            Self {
356
                relay_circ,
357
                circmsg_send,
358
                recognized_tx,
359
                inbound_chan,
360
                outbound_chan,
361
            }
362
        }
363

            
364
        /// Simulate the sending of a forward relay message through our relay.
365
        async fn send_fwd(
366
            &mut self,
367
            id: Option<StreamId>,
368
            msg: relaymsg::AnyRelayMsg,
369
            recognized: Recognized,
370
            early: bool,
371
        ) {
372
            // This a bit janky, but for each forward cell we send to the reactor
373
            // we need to send a bit of metadata to the DummyOutboundLayer
374
            // specifying whether the cell should be treated as recognized
375
            // or unrecognized
376
            self.recognized_tx.send(recognized).unwrap();
377
            self.circmsg_send
378
                .send(rmsg_to_ccmsg(id, msg, early))
379
                .await
380
                .unwrap();
381
        }
382

            
383
        /// Simulate the sending of a forward channel message through our relay.
384
        async fn send_fwd_cmsg(&mut self, msg: chanmsg::AnyChanMsg) {
385
            self.circmsg_send.send(msg).await.unwrap();
386
        }
387

            
388
        /// Whether the reactor opened an outbound channel
389
        /// (i.e. a channel to the next relay in the circuit).
390
        fn outbound_chan_launched(&self) -> bool {
391
            self.outbound_chan.lock().unwrap().is_some()
392
        }
393

            
394
        /// Allow inbound stream requests.
395
        ///
396
        /// Used for testing leaky pipe and exit functionality.
397
        async fn allow_stream_requests<'a, FILT>(
398
            &self,
399
            allow_commands: &'a [RelayCmd],
400
            filter: FILT,
401
        ) -> impl futures::Stream<Item = IncomingStream> + use<'a, FILT>
402
        where
403
            FILT: IncomingStreamRequestFilter,
404
        {
405
            Arc::clone(&self.relay_circ)
406
                .allow_stream_requests(allow_commands, filter)
407
                .await
408
                .unwrap()
409
        }
410

            
411
        /// Perform the CREATE2 handshake.
412
        async fn do_create2_handshake(
413
            &mut self,
414
            rt: &MockRuntime,
415
            expected_hs_type: HandshakeType,
416
        ) -> Option<CircId> {
417
            // First, check that the reactor actually sent a CREATE2 to the next hop...
418
            let (circid, msg) = self.read_outbound().into_circid_and_msg();
419
            let _create2 = match msg {
420
                chanmsg::AnyChanMsg::Create2(c) => {
421
                    assert_eq!(c.handshake_type(), expected_hs_type);
422
                    c
423
                }
424
                _ => panic!("unexpected forwarded {msg:?}"),
425
            };
426

            
427
            let handshake = vec![];
428
            let created2 = chanmsg::Created2::new(handshake.clone());
429
            // ...and then finalize the handshake by pretending to be
430
            // the responding relay
431
            self.write_outbound(circid, chanmsg::AnyChanMsg::Created2(created2));
432
            rt.advance_until_stalled().await;
433

            
434
            // Make sure we actually did send an EXTENDED2 towards the client
435
            let msg = self.read_inbound();
436
            let rmsg = match msg.msg() {
437
                chanmsg::AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
438
                    RelayCellFormat::V0,
439
                    r.clone().into_relay_body(),
440
                )
441
                .unwrap(),
442
                _ => panic!("unexpected forwarded {msg:?}"),
443
            };
444

            
445
            match rmsg.msg() {
446
                relaymsg::AnyRelayMsg::Extended2(e) => {
447
                    assert_eq!(e.clone().into_body(), handshake);
448
                }
449
                _ => panic!("unexpected relay message {rmsg:?}"),
450
            }
451

            
452
            circid
453
        }
454

            
455
        /// Whether the circuit is closing (e.g. due to a proto violation).
456
        fn is_closing(&self) -> bool {
457
            self.relay_circ.is_closing()
458
        }
459

            
460
        /// Read a cell from the inbound channel
461
        /// (moving towards the client).
462
        ///
463
        /// Panics if there are no ready cells on the inbound MPSC channel.
464
        fn read_inbound(&mut self) -> ChanCell<AnyChanMsg> {
465
            #[allow(deprecated)] // TODO(#2386)
466
            self.inbound_chan.rx.try_next().unwrap().unwrap()
467
        }
468

            
469
        /// Read a cell from the outbound channel
470
        /// (moving towards the next hop).
471
        ///
472
        /// Panics if there are no ready cells on the outbound MPSC channel.
473
        fn read_outbound(&mut self) -> ChanCell<AnyChanMsg> {
474
            let mut lock = self.outbound_chan.lock().unwrap();
475
            let chan = lock.as_mut().unwrap();
476
            #[allow(deprecated)] // TODO(#2386)
477
            chan.rx.try_next().unwrap().unwrap()
478
        }
479

            
480
        /// Write to the sending end of the outbound Tor channel.
481
        ///
482
        /// Simulates the receipt of a cell from the next hop.
483
        ///
484
        /// Panics if the outbound chan sender is full.
485
        fn write_outbound(&mut self, circid: Option<CircId>, msg: chanmsg::AnyChanMsg) {
486
            let mut lock = self.outbound_chan.lock().unwrap();
487
            let chan = lock.as_mut().unwrap();
488
            let cell = ChanCell::new(circid, msg);
489

            
490
            chan.tx.try_send(Ok(cell)).unwrap();
491
        }
492
    }
493

            
494
    fn dummy_linkspecs() -> Vec<EncodedLinkSpec> {
495
        vec![
496
            LinkSpec::Ed25519Id([43; 32].into()).encode().unwrap(),
497
            LinkSpec::RsaId([45; 20].into()).encode().unwrap(),
498
            LinkSpec::OrPort("127.0.0.1".parse::<IpAddr>().unwrap(), 999)
499
                .encode()
500
                .unwrap(),
501
        ]
502
    }
503

            
504
    /// Assert that we have sent a DESTROY cell with the specified `reason`
505
    /// both towards the "client" and towards the "next hop", if there is one,
506
    /// and that the relay circuit is shutting down.
507
    ///
508
    /// The test is expected to drain the inbound Tor "channel"
509
    /// of any non-ending cells it might be expecting before calling this function.
510
    fn assert_destroy_sent(ctrl: &mut ReactorTestCtrl, reason: DestroyReason) {
511
        assert!(ctrl.is_closing());
512

            
513
        macro_rules! assert_cell_is_destroy {
514
            ($cell:expr) => {{
515
                match $cell.msg() {
516
                    chanmsg::AnyChanMsg::Destroy(d) => {
517
                        assert_eq!(d.reason(), reason);
518
                    }
519
                    _ => panic!("unexpected ending {:?}", $cell),
520
                }
521
            }};
522
        }
523

            
524
        // We *always* send a DESTROY towards the client
525
        // when killing the circuit
526
        let cell = ctrl.read_inbound();
527
        assert_cell_is_destroy!(cell);
528

            
529
        // If there's an outbound channel, ensure we sent a DESTROY over it too.
530
        if ctrl.outbound_chan_launched() {
531
            let cell = ctrl.read_outbound();
532
            assert_cell_is_destroy!(cell);
533
        }
534
    }
535

            
536
    #[traced_test]
537
    #[test]
538
    fn reject_extend2_relay() {
539
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
540
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
541
            rt.advance_until_stalled().await;
542

            
543
            let linkspecs = dummy_linkspecs();
544
            let extend2 = relaymsg::Extend2::new(linkspecs, HandshakeType::NTOR_V3, vec![]).into();
545
            ctrl.send_fwd(None, extend2, Recognized::Yes, false).await;
546
            rt.advance_until_stalled().await;
547

            
548
            assert!(logs_contain("got EXTEND2 in a RELAY cell?!"));
549
            assert!(!ctrl.outbound_chan_launched());
550
            assert_destroy_sent(&mut ctrl, DestroyReason::NONE);
551
        });
552
    }
553

            
554
    #[traced_test]
555
    #[test]
556
    fn reject_extend2_previous_hop() {
557
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
558
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
559
            rt.advance_until_stalled().await;
560

            
561
            // No outbound circuits yet
562
            assert!(!ctrl.outbound_chan_launched());
563

            
564
            // Build a linkspec with the identities of the dummy channel
565
            let mut linkspecs = ctrl
566
                .inbound_chan
567
                .channel
568
                .target()
569
                .identities()
570
                .map(|id| LinkSpec::from(id.to_owned()).encode())
571
                .collect::<Result<Vec<_>, _>>()
572
                .unwrap();
573

            
574
            // Make sure this channel actually has some identities
575
            // (i.e. that it's not a client channel or something)
576
            assert_eq!(linkspecs.len(), 2);
577

            
578
            // There must be at least one IPv4 OR port address
579
            linkspecs.push(
580
                LinkSpec::OrPort("127.0.0.1".parse::<IpAddr>().unwrap(), 999)
581
                    .encode()
582
                    .unwrap(),
583
            );
584
            let handshake_type = HandshakeType::NTOR_V3;
585
            let extend2 = relaymsg::Extend2::new(linkspecs, handshake_type, vec![]).into();
586
            ctrl.send_fwd(None, extend2, Recognized::Yes, true).await;
587
            rt.advance_until_stalled().await;
588

            
589
            // The reactor handled the EXTEND2 and launched an outbound channel
590
            assert!(logs_contain("Cannot extend circuit to previous hop"));
591
            assert!(!ctrl.outbound_chan_launched());
592
            assert!(ctrl.is_closing());
593
        });
594
    }
595

            
596
    #[traced_test]
597
    #[test]
598
    fn extend_and_forward() {
599
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
600
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
601
            rt.advance_until_stalled().await;
602

            
603
            // No outbound circuits yet
604
            assert!(!ctrl.outbound_chan_launched());
605

            
606
            let linkspecs = dummy_linkspecs();
607
            let handshake_type = HandshakeType::NTOR_V3;
608
            let extend2 = relaymsg::Extend2::new(linkspecs, handshake_type, vec![]).into();
609
            ctrl.send_fwd(None, extend2, Recognized::Yes, true).await;
610
            rt.advance_until_stalled().await;
611

            
612
            // The reactor handled the EXTEND2 and launched an outbound channel
613
            assert!(logs_contain(
614
                "Launched channel to the next hop circ_id=Circ 8.17"
615
            ));
616
            assert!(ctrl.outbound_chan_launched());
617
            assert!(!ctrl.is_closing());
618

            
619
            let _circid = ctrl.do_create2_handshake(&rt, handshake_type).await;
620
            assert!(logs_contain("Got CREATED2 response from next hop"));
621
            assert!(logs_contain("Extended circuit to the next hop"));
622

            
623
            // Time to forward a message to the next hop!
624
            let early = false;
625
            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap();
626
            ctrl.send_fwd(None, begin.clone().into(), Recognized::No, early)
627
                .await;
628
            rt.advance_until_stalled().await;
629

            
630
            macro_rules! expect_cell {
631
                ($chanmsg:tt, $relaymsg:tt) => {{
632
                    let cell = ctrl.read_outbound();
633
                    let msg = match cell.msg() {
634
                        chanmsg::AnyChanMsg::$chanmsg(m) => {
635
                            let body = m.clone().into_relay_body();
636
                            AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, body).unwrap()
637
                        }
638
                        _ => panic!("unexpected forwarded {cell:?}"),
639
                    };
640

            
641
                    match msg.msg() {
642
                        relaymsg::AnyRelayMsg::$relaymsg(m) => m.clone(),
643
                        _ => panic!("unexpected cell {msg:?}"),
644
                    }
645
                }};
646
            }
647

            
648
            // Ensure the other end received the BEGIN cell
649
            let recvd_begin = expect_cell!(Relay, Begin);
650
            assert_eq!(begin, recvd_begin);
651

            
652
            // Now send the same message again, but this time in a RELAY_EARLY
653
            let early = true;
654
            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap();
655
            ctrl.send_fwd(None, begin.clone().into(), Recognized::No, early)
656
                .await;
657
            rt.advance_until_stalled().await;
658
            let recvd_begin = expect_cell!(RelayEarly, Begin);
659
            assert_eq!(begin, recvd_begin);
660
        });
661
    }
662

            
663
    #[traced_test]
664
    #[test]
665
    fn forward_before_extend() {
666
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
667
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
668
            rt.advance_until_stalled().await;
669

            
670
            // Send an arbitrary unrecognized cell. The reactor should flag this as
671
            // a protocol violation, because we don't have an outbound channel to forward it on.
672
            let extend2 = relaymsg::End::new_misc().into();
673
            ctrl.send_fwd(None, extend2, Recognized::No, true).await;
674
            rt.advance_until_stalled().await;
675

            
676
            // The reactor handled the EXTEND2 and launched an outbound channel
677
            assert!(logs_contain(
678
                "Asked to forward cell before the circuit was extended?!"
679
            ));
680
            assert_destroy_sent(&mut ctrl, DestroyReason::NONE);
681
        });
682
    }
683

            
684
    #[traced_test]
685
    #[test]
686
    fn reject_invalid_begin() {
687
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
688
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
689
            rt.advance_until_stalled().await;
690

            
691
            let _streams = ctrl
692
                .allow_stream_requests(&[RelayCmd::BEGIN], AllowAllStreamsFilter)
693
                .await;
694

            
695
            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap().into();
696

            
697
            // BEGIN cells *must* have a stream ID, so expect the reactor to reject this
698
            // and close the circuit
699
            ctrl.send_fwd(None, begin, Recognized::Yes, false).await;
700
            rt.advance_until_stalled().await;
701

            
702
            assert!(logs_contain(
703
                "Invalid stream ID [scrubbed] for relay command BEGIN"
704
            ));
705
            assert_destroy_sent(&mut ctrl, DestroyReason::NONE);
706
        });
707
    }
708

            
709
    #[traced_test]
710
    #[test]
711
    fn destroy_from_client() {
712
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
713
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
714
            rt.advance_until_stalled().await;
715

            
716
            // Simulate the client sending us a DESTROY cell
717
            let destroy = Destroy::new(DestroyReason::PROTOCOL);
718
            ctrl.send_fwd_cmsg(destroy.into()).await;
719
            rt.advance_until_stalled().await;
720

            
721
            assert!(logs_contain(
722
                "Received outbound DESTROY, circuit shutting down"
723
            ));
724

            
725
            // Ensure the destroy reason (PROTOCOL) is not propagated
726
            assert_destroy_sent(&mut ctrl, DestroyReason::NONE);
727
        });
728
    }
729

            
730
    #[traced_test]
731
    #[test]
732
    fn destroy_from_next_hop() {
733
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
734
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
735
            rt.advance_until_stalled().await;
736

            
737
            // Extend the circuit by another hop
738
            let linkspecs = dummy_linkspecs();
739
            let handshake_type = HandshakeType::NTOR_V3;
740
            let extend2 = relaymsg::Extend2::new(linkspecs, handshake_type, vec![]).into();
741
            ctrl.send_fwd(None, extend2, Recognized::Yes, true).await;
742
            rt.advance_until_stalled().await;
743
            let circid = ctrl.do_create2_handshake(&rt, handshake_type).await;
744
            assert!(logs_contain("Extended circuit to the next hop"));
745
            assert!(ctrl.outbound_chan_launched());
746

            
747
            // Simulate the client sending us a DESTROY cell
748
            let destroy = Destroy::new(DestroyReason::PROTOCOL);
749
            ctrl.write_outbound(circid, destroy.into());
750
            rt.advance_until_stalled().await;
751

            
752
            // We have *not* received an outbound destroy
753
            assert!(!logs_contain(
754
                "Received outbound DESTROY, circuit shutting down"
755
            ));
756

            
757
            // We received an inbound one (from the next hop)
758
            assert!(logs_contain(
759
                "Received inbound DESTROY, circuit shutting down"
760
            ));
761

            
762
            // Ensure the destroy reason (PROTOCOL) is not propagated
763
            // This will check that we've sent a DESTROY cell in both directions.
764
            assert_destroy_sent(&mut ctrl, DestroyReason::NONE);
765
        });
766
    }
767

            
768
    #[traced_test]
769
    #[test]
770
    fn truncate() {
771
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
772
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
773
            rt.advance_until_stalled().await;
774

            
775
            // Simulate the client sending us a TRUNCATE cell
776
            let truncate = relaymsg::Truncate::default().into();
777
            ctrl.send_fwd(None, truncate, Recognized::Yes, false).await;
778
            rt.advance_until_stalled().await;
779

            
780
            assert!(logs_contain(
781
                "Circuit protocol violation: TRUNCATE not allowed"
782
            ));
783

            
784
            assert_destroy_sent(&mut ctrl, DestroyReason::NONE);
785
        });
786
    }
787

            
788
    #[traced_test]
789
    #[test]
790
    fn data_stream() {
791
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
792
            const TO_SEND: &[u8] = b"The bells were musical in the silvery sun";
793

            
794
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
795
            rt.advance_until_stalled().await;
796

            
797
            let mut incoming_streams = ctrl
798
                .allow_stream_requests(&[RelayCmd::BEGIN], AllowAllStreamsFilter)
799
                .await;
800

            
801
            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap().into();
802
            ctrl.send_fwd(StreamId::new(1), begin, Recognized::Yes, false)
803
                .await;
804
            rt.advance_until_stalled().await;
805

            
806
            let data = relaymsg::Data::new(TO_SEND).unwrap().into();
807
            ctrl.send_fwd(StreamId::new(1), data, Recognized::Yes, false)
808
                .await;
809

            
810
            // We should have a pending incoming stream
811
            let pending = incoming_streams.next().await.unwrap();
812

            
813
            // Accept it, and let's see what we have!
814
            let mut stream = pending
815
                .accept_data(relaymsg::Connected::new_empty())
816
                .await
817
                .unwrap();
818

            
819
            let mut recv_buf = [0_u8; TO_SEND.len()];
820
            stream.read_exact(&mut recv_buf).await.unwrap();
821
            assert_eq!(recv_buf, TO_SEND);
822
        });
823
    }
824
}