1
//! Module exposing the relay circuit reactor subsystem.
//!
//! See [`reactor`](crate::circuit::reactor) for a description of the overall architecture.
//!
//! #### `ForwardReactor`
//!
//! The forward reactor handles
//!
//!  * unrecognized RELAY cells, by moving them in the forward direction (towards the exit)
//!  * recognized RELAY cells, by splitting each cell into messages, and handling
//!    each message individually as described in the table below
//!    (Note: since prop340 is not yet implemented, in practice there is only 1 message per cell).
//!  * RELAY_EARLY cells (**not yet implemented**)
//!  * DESTROY cells (**not yet implemented**)
//!  * PADDING_NEGOTIATE cells (**not yet implemented**)
//!
//! ```text
//!
//! Legend: `F` = "forward reactor", `B` = "backward reactor", `S` = "stream reactor"
//!
//! | RELAY cmd         | Received in | Handled in | Description                            |
//! |-------------------|-------------|------------|----------------------------------------|
//! | DROP              | F           | F          | Passed to PaddingController for        |
//! |                   |             |            | validation                             |
//! |-------------------|-------------|------------|----------------------------------------|
//! | EXTEND2           | F           |            | Handled by instructing the channel     |
//! |                   |             |            | provider to launch a new channel, and  |
//! |                   |             |            | waiting for the new channel on its     |
//! |                   |             |            | outgoing_chan_rx receiver              |
//! |                   |             |            | (**not yet implemented**)              |
//! |-------------------|-------------|------------|----------------------------------------|
//! | TRUNCATE          | F           | F          | (**not yet implemented**)              |
//! |                   |             |            |                                        |
//! |-------------------|-------------|------------|----------------------------------------|
//! | TODO              |             |            |                                        |
//! |                   |             |            |                                        |
//! ```
38

            
39
pub(crate) mod backward;
40
pub(crate) mod forward;
41

            
42
use std::sync::Arc;
43
use std::time::Duration;
44

            
45
use futures::channel::mpsc;
46

            
47
use tor_cell::chancell::CircId;
48
use tor_linkspec::OwnedChanTarget;
49
use tor_rtcompat::Runtime;
50

            
51
use crate::channel::Channel;
52
use crate::circuit::circhop::{CircHopOutbound, HopSettings};
53
use crate::circuit::reactor::Reactor as BaseReactor;
54
use crate::circuit::reactor::hop_mgr::HopMgr;
55
use crate::circuit::reactor::stream;
56
use crate::circuit::{CircuitRxReceiver, UniqId};
57
use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer};
58
use crate::memquota::CircuitAccount;
59
use crate::relay::RelayCirc;
60
use crate::relay::channel_provider::ChannelProvider;
61
use crate::relay::reactor::backward::Backward;
62
use crate::relay::reactor::forward::Forward;
63

            
64
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
65
use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
66

            
67
/// Type-alias for the relay base reactor type.
68
type RelayBaseReactor<R> = BaseReactor<R, Forward, Backward>;
69

            
70
/// The entry point of the circuit reactor subsystem.
71
#[allow(unused)] // TODO(relay)
72
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
73
pub(crate) struct Reactor<R: Runtime>(RelayBaseReactor<R>);
74

            
75
/// Relay-specific customization of the stream reactor.
///
/// A stateless marker type; its behavior lives in its
/// `stream::StreamHandler` implementation.
struct StreamHandler;
77

            
78
impl stream::StreamHandler for StreamHandler {
79
    fn halfstream_expiry(&self, hop: &CircHopOutbound) -> Duration {
80
        let ccontrol = hop.ccontrol();
81

            
82
        // Note: if we have no measurements for the RTT, this will be set to 0,
83
        // so the stream will be removed from the stream map immediately,
84
        // and any subsequent messages arriving on it will trigger
85
        // a proto violation causing the circuit to close.
86
        //
87
        // TODO(relay-tuning): we should make sure that this doesn't cause us to
88
        // wrongly close legitimate circuits that still have in-flight stream data
89
        ccontrol
90
            .lock()
91
            .expect("poisoned lock")
92
            .rtt()
93
            .max_rtt_usec()
94
            .map(|rtt| Duration::from_millis(u64::from(rtt)))
95
            // TODO(relay): we should fallback to a non-zero default here
96
            // if we don't have any RTT measurements yet
97
            .unwrap_or_default()
98
    }
99
}
100

            
101
#[allow(unused)] // TODO(relay)
102
impl<R: Runtime> Reactor<R> {
103
    /// Create a new circuit reactor.
104
    ///
105
    /// The reactor will send outbound messages on `channel`, receive incoming
106
    /// messages on `input`, and identify this circuit by the channel-local
107
    /// [`CircId`] provided.
108
    ///
109
    /// The internal unique identifier for this circuit will be `unique_id`.
110
    #[allow(clippy::too_many_arguments)] // TODO
111
16
    pub(crate) fn new(
112
16
        runtime: R,
113
16
        channel: &Arc<Channel>,
114
16
        circ_id: CircId,
115
16
        unique_id: UniqId,
116
16
        input: CircuitRxReceiver,
117
16
        crypto_in: Box<dyn InboundRelayLayer + Send>,
118
16
        crypto_out: Box<dyn OutboundRelayLayer + Send>,
119
16
        settings: &HopSettings,
120
16
        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
121
16
        padding_ctrl: PaddingController,
122
16
        padding_event_stream: PaddingEventStream,
123
16
        memquota: &CircuitAccount,
124
16
    ) -> crate::Result<(Self, Arc<RelayCirc>)> {
125
        // NOTE: not registering this channel with the memquota subsystem is okay,
126
        // because it has no buffering (if ever decide to make the size of this buffer
127
        // non-zero for whatever reason, we must remember to register it with memquota
128
        // so that it counts towards the total memory usage for the circuit.
129
        #[allow(clippy::disallowed_methods)]
130
16
        let (stream_tx, stream_rx) = mpsc::channel(0);
131

            
132
16
        let mut hop_mgr = HopMgr::new(
133
16
            runtime.clone(),
134
16
            unique_id,
135
16
            StreamHandler,
136
16
            stream_tx,
137
16
            memquota.clone(),
138
        );
139

            
140
        // On the relay side, we always have one "hop" (ourselves).
141
        //
142
        // Clients will need to call this function in response to CtrlMsg::Create
143
        // (TODO: for clients, we probably will need to store a bunch more state here)
144
16
        hop_mgr.add_hop(settings.clone())?;
145

            
146
        // TODO(relay): currently we don't need buffering on this channel,
147
        // but we might need it if we start using it for more than just EXTENDED2 events
148
        #[allow(clippy::disallowed_methods)]
149
16
        let (fwd_ev_tx, fwd_ev_rx) = mpsc::channel(0);
150
16
        let forward_foo = Forward::new(
151
16
            unique_id,
152
16
            crypto_out,
153
16
            chan_provider,
154
16
            fwd_ev_tx,
155
16
            memquota.clone(),
156
        );
157
16
        let backward_foo = Backward::new(crypto_in);
158

            
159
16
        let (inner, handle) = BaseReactor::new(
160
16
            runtime,
161
16
            channel,
162
16
            circ_id,
163
16
            unique_id,
164
16
            input,
165
16
            forward_foo,
166
16
            backward_foo,
167
16
            hop_mgr,
168
16
            padding_ctrl,
169
16
            padding_event_stream,
170
16
            stream_rx,
171
16
            fwd_ev_rx,
172
16
            memquota,
173
16
        );
174

            
175
16
        let reactor = Self(inner);
176
16
        let handle = Arc::new(RelayCirc(handle));
177

            
178
16
        Ok((reactor, handle))
179
16
    }
180

            
181
    /// Launch the reactor, and run until the circuit closes or we
182
    /// encounter an error.
183
    ///
184
    /// Once this method returns, the circuit is dead and cannot be
185
    /// used again.
186
16
    pub(crate) async fn run(mut self) -> crate::Result<()> {
187
16
        self.0.run().await
188
16
    }
189
}
190

            
191
#[cfg(test)]
192
pub(crate) mod test {
193
    // @@ begin test lint list maintained by maint/add_warning @@
194
    #![allow(clippy::bool_assert_comparison)]
195
    #![allow(clippy::clone_on_copy)]
196
    #![allow(clippy::dbg_macro)]
197
    #![allow(clippy::mixed_attributes_style)]
198
    #![allow(clippy::print_stderr)]
199
    #![allow(clippy::print_stdout)]
200
    #![allow(clippy::single_char_pattern)]
201
    #![allow(clippy::unwrap_used)]
202
    #![allow(clippy::unchecked_time_subtraction)]
203
    #![allow(clippy::useless_vec)]
204
    #![allow(clippy::needless_pass_by_value)]
205
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
206

            
207
    use super::*;
208
    use crate::channel::test::{CodecResult, new_reactor};
209
    use crate::circuit::reactor::test::{AllowAllStreamsFilter, rmsg_to_ccmsg};
210
    use crate::circuit::{CircParameters, CircuitRxSender};
211
    use crate::client::circuit::padding::new_padding;
212
    use crate::congestion::test_utils::params::build_cc_vegas_params;
213
    use crate::crypto::cell::RelayCellBody;
214
    use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer};
215
    use crate::fake_mpsc;
216
    use crate::memquota::SpecificAccount as _;
217
    use crate::relay::channel_provider::{ChannelProvider, OutboundChanSender};
218
    use crate::stream::flow_ctrl::params::FlowCtrlParameters;
219
    use crate::stream::incoming::{IncomingStream, IncomingStreamRequestFilter};
220

            
221
    use futures::channel::mpsc::{Receiver, Sender};
222
    use futures::{AsyncReadExt as _, SinkExt as _, StreamExt as _};
223
    use tracing_test::traced_test;
224

            
225
    use tor_cell::chancell::{AnyChanCell, ChanCell, ChanCmd, msg as chanmsg};
226
    use tor_cell::relaycell::{
227
        AnyRelayMsgOuter, RelayCellFormat, RelayCmd, StreamId, msg as relaymsg,
228
    };
229
    use tor_linkspec::{EncodedLinkSpec, LinkSpec};
230
    use tor_protover::{Protocols, named};
231
    use tor_rtcompat::SpawnExt;
232
    use tor_rtcompat::{DynTimeProvider, Runtime};
233
    use tor_rtmock::MockRuntime;
234

            
235
    use chanmsg::{AnyChanMsg, DestroyReason, HandshakeType};
236
    use relaymsg::SendmeTag;
237

            
238
    use std::net::IpAddr;
239
    use std::sync::{Arc, Mutex, mpsc};
240

            
241
    /// A no-op inbound encryption layer, for tests.
    struct DummyInboundCrypto {}
243

            
244
    // An outbound encryption layer that doesn't do any crypto.
245
    struct DummyOutboundCrypto {
246
        /// Channel for controlling whether the current cell is meant for us or not.
247
        ///
248
        /// Useful for tests that check if recognized/unrecognized
249
        /// cells are handled/forwarded correctly.
250
        recognized_rx: mpsc::Receiver<Recognized>,
251
    }
252

            
253
    /// Placeholder 20-byte tag returned by the dummy crypto layers.
    const DUMMY_TAG: [u8; 20] = [1; 20];
254

            
255
    impl InboundRelayLayer for DummyInboundCrypto {
256
        fn originate(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
257
            DUMMY_TAG.into()
258
        }
259

            
260
        fn encrypt_inbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
261
    }
262

            
263
    impl OutboundRelayLayer for DummyOutboundCrypto {
264
        fn decrypt_outbound(
265
            &mut self,
266
            _cmd: ChanCmd,
267
            _cell: &mut RelayCellBody,
268
        ) -> Option<SendmeTag> {
269
            // Note: this should never block.
270
            let recognized = self.recognized_rx.recv().unwrap();
271

            
272
            match recognized {
273
                Recognized::Yes => Some(DUMMY_TAG.into()),
274
                Recognized::No => None,
275
            }
276
        }
277
    }
278

            
279
    struct DummyChanProvider<R> {
280
        /// A handle to the runtime.
281
        runtime: R,
282
        /// The outbound channel, shared with the test controller.
283
        outbound: Arc<Mutex<Option<DummyChan>>>,
284
    }
285

            
286
    impl<R: Runtime> DummyChanProvider<R> {
287
        fn new(runtime: R, outbound: Arc<Mutex<Option<DummyChan>>>) -> Self {
288
            Self { runtime, outbound }
289
        }
290
    }
291

            
292
    impl<R: Runtime> ChannelProvider for DummyChanProvider<R> {
293
        type BuildSpec = OwnedChanTarget;
294

            
295
        fn get_or_launch(
296
            self: Arc<Self>,
297
            _reactor_id: UniqId,
298
            _target: Self::BuildSpec,
299
            tx: OutboundChanSender,
300
        ) -> crate::Result<()> {
301
            let dummy_chan = working_fake_channel(&self.runtime);
302
            let chan = Arc::clone(&dummy_chan.channel);
303
            {
304
                let mut lock = self.outbound.lock().unwrap();
305
                assert!(lock.is_none());
306
                *lock = Some(dummy_chan);
307
            }
308

            
309
            tx.send(Ok(chan));
310

            
311
            Ok(())
312
        }
313
    }
314

            
315
    /// Dummy channel, returned by [`working_fake_channel`].
316
    struct DummyChan {
317
        /// Tor channel output
318
        rx: Receiver<AnyChanCell>,
319
        /// Tor channel input
320
        tx: Sender<CodecResult>,
321
        /// A handle to the Channel object, to prevent the channel reactor
322
        /// from shutting down prematurely.
323
        channel: Arc<Channel>,
324
    }
325

            
326
    struct ReactorTestCtrl {
327
        /// The relay circuit handle.
328
        relay_circ: Arc<RelayCirc>,
329
        /// Mock channel -> circuit reactor MPSC channel.
330
        circmsg_send: CircuitRxSender,
331
        /// The inbound channel ("towards the client").
332
        inbound_chan: DummyChan,
333
        /// The outbound channel ("away from the client"), if any.
334
        ///
335
        /// Shared with the DummyChanProvider, which initializes this
336
        /// when the relay reactor launches a channel to the next hop
337
        /// via `get_or_launch()`.
338
        outbound_chan: Arc<Mutex<Option<DummyChan>>>,
339
        /// MPSC channel for telling the DummyOutboundCrypto that the next
340
        /// cell we're about to send to the reactor should be "recognized".
341
        recognized_tx: mpsc::Sender<Recognized>,
342
    }
343

            
344
    /// How the relay under test should treat a forward cell:
    /// as addressed to it ("recognized") or not ("unrecognized").
    enum Recognized {
        /// The cell is recognized.
        Yes,
        /// The cell is unrecognized.
        No,
    }
352

            
353
    impl ReactorTestCtrl {
354
        /// Spawn a relay circuit reactor, returning a `ReactorTestCtrl` for
355
        /// controlling it.
356
        fn spawn_reactor<R: Runtime>(rt: &R) -> Self {
357
            let inbound_chan = working_fake_channel(rt);
358
            let circid = CircId::new(1337).unwrap();
359
            let unique_id = UniqId::new(8, 17);
360
            let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
361
            let (circmsg_send, circmsg_recv) = fake_mpsc(64);
362
            let params = CircParameters::new(
363
                true,
364
                build_cc_vegas_params(),
365
                FlowCtrlParameters::defaults_for_tests(),
366
            );
367
            let settings = HopSettings::from_params_and_caps(
368
                crate::circuit::circhop::HopNegotiationType::Full,
369
                &params,
370
                &[named::FLOWCTRL_CC].into_iter().collect::<Protocols>(),
371
            )
372
            .unwrap();
373

            
374
            let outbound_chan = Arc::new(Mutex::new(None));
375
            let (recognized_tx, recognized_rx) = mpsc::channel();
376
            let chan_provider = Arc::new(DummyChanProvider::new(
377
                rt.clone(),
378
                Arc::clone(&outbound_chan),
379
            ));
380

            
381
            let (reactor, relay_circ) = Reactor::new(
382
                rt.clone(),
383
                &Arc::clone(&inbound_chan.channel),
384
                circid,
385
                unique_id,
386
                circmsg_recv,
387
                Box::new(DummyInboundCrypto {}),
388
                Box::new(DummyOutboundCrypto { recognized_rx }),
389
                &settings,
390
                chan_provider,
391
                padding_ctrl,
392
                padding_stream,
393
                &CircuitAccount::new_noop(),
394
            )
395
            .unwrap();
396

            
397
            rt.spawn(async {
398
                let _ = reactor.run().await;
399
            })
400
            .unwrap();
401

            
402
            Self {
403
                relay_circ,
404
                circmsg_send,
405
                recognized_tx,
406
                inbound_chan,
407
                outbound_chan,
408
            }
409
        }
410

            
411
        /// Simulate the sending of a forward relay message through our relay.
412
        async fn send_fwd(
413
            &mut self,
414
            id: Option<StreamId>,
415
            msg: relaymsg::AnyRelayMsg,
416
            recognized: Recognized,
417
            early: bool,
418
        ) {
419
            // This a bit janky, but for each forward cell we send to the reactor
420
            // we need to send a bit of metadata to the DummyOutboundLayer
421
            // specifying whether the cell should be treated as recognized
422
            // or unrecognized
423
            self.recognized_tx.send(recognized).unwrap();
424
            self.circmsg_send
425
                .send(rmsg_to_ccmsg(id, msg, early))
426
                .await
427
                .unwrap();
428
        }
429

            
430
        /// Whether the reactor opened an outbound channel
431
        /// (i.e. a channel to the next relay in the circuit).
432
        fn outbound_chan_launched(&self) -> bool {
433
            self.outbound_chan.lock().unwrap().is_some()
434
        }
435

            
436
        /// Allow inbound stream requests.
437
        ///
438
        /// Used for testing leaky pipe and exit functionality.
439
        async fn allow_stream_requests<'a, FILT>(
440
            &self,
441
            allow_commands: &'a [RelayCmd],
442
            filter: FILT,
443
        ) -> impl futures::Stream<Item = IncomingStream> + use<'a, FILT>
444
        where
445
            FILT: IncomingStreamRequestFilter,
446
        {
447
            Arc::clone(&self.relay_circ)
448
                .allow_stream_requests(allow_commands, filter)
449
                .await
450
                .unwrap()
451
        }
452

            
453
        /// Perform the CREATE2 handshake.
454
        async fn do_create2_handshake(
455
            &mut self,
456
            rt: &MockRuntime,
457
            expected_hs_type: HandshakeType,
458
        ) {
459
            // First, check that the reactor actually sent a CREATE2 to the next hop...
460
            let (circid, msg) = self.read_outbound().into_circid_and_msg();
461
            let _create2 = match msg {
462
                chanmsg::AnyChanMsg::Create2(c) => {
463
                    assert_eq!(c.handshake_type(), expected_hs_type);
464
                    c
465
                }
466
                _ => panic!("unexpected forwarded {msg:?}"),
467
            };
468

            
469
            let handshake = vec![];
470
            let created2 = chanmsg::Created2::new(handshake);
471
            // ...and then finalize the handshake by pretending to be
472
            // the responding relay
473
            self.write_outbound(circid, chanmsg::AnyChanMsg::Created2(created2));
474
            rt.advance_until_stalled().await;
475
        }
476

            
477
        /// Whether the circuit is closing (e.g. due to a proto violation).
478
        fn is_closing(&self) -> bool {
479
            self.relay_circ.is_closing()
480
        }
481

            
482
        /// Read a cell from the inbound channel
483
        /// (moving towards the client).
484
        ///
485
        /// Panics if there are no ready cells on the inbound MPSC channel.
486
        fn read_inbound(&mut self) -> ChanCell<AnyChanMsg> {
487
            #[allow(deprecated)] // TODO(#2386)
488
            self.inbound_chan.rx.try_next().unwrap().unwrap()
489
        }
490

            
491
        /// Read a cell from the outbound channel
492
        /// (moving towards the next hop).
493
        ///
494
        /// Panics if there are no ready cells on the outbound MPSC channel.
495
        fn read_outbound(&mut self) -> ChanCell<AnyChanMsg> {
496
            let mut lock = self.outbound_chan.lock().unwrap();
497
            let chan = lock.as_mut().unwrap();
498
            #[allow(deprecated)] // TODO(#2386)
499
            chan.rx.try_next().unwrap().unwrap()
500
        }
501

            
502
        /// Write to the sending end of the outbound Tor channel.
503
        ///
504
        /// Simulates the receipt of a cell from the next hop.
505
        ///
506
        /// Panics if the outbound chan sender is full.
507
        fn write_outbound(&mut self, circid: Option<CircId>, msg: chanmsg::AnyChanMsg) {
508
            let mut lock = self.outbound_chan.lock().unwrap();
509
            let chan = lock.as_mut().unwrap();
510
            let cell = ChanCell::new(circid, msg);
511

            
512
            chan.tx.try_send(Ok(cell)).unwrap();
513
        }
514
    }
515

            
516
    fn working_fake_channel<R: Runtime>(rt: &R) -> DummyChan {
517
        let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
518
        rt.spawn(async {
519
            let _ignore = chan_reactor.run().await;
520
        })
521
        .unwrap();
522

            
523
        DummyChan { tx, rx, channel }
524
    }
525

            
526
    fn dummy_linkspecs() -> Vec<EncodedLinkSpec> {
527
        vec![
528
            LinkSpec::Ed25519Id([43; 32].into()).encode().unwrap(),
529
            LinkSpec::RsaId([45; 20].into()).encode().unwrap(),
530
            LinkSpec::OrPort("127.0.0.1".parse::<IpAddr>().unwrap(), 999)
531
                .encode()
532
                .unwrap(),
533
        ]
534
    }
535

            
536
    /// Assert that the relay circuit is shutting down.
537
    ///
538
    /// Also asserts that the next cell on the inbound channel
539
    /// is a DESTROY with the specified `reason`.
540
    /// The test is expected to drain the inbound Tor "channel"
541
    /// of any non-ending cells it might be expecting before calling this function.
542
    fn assert_circuit_destroyed(ctrl: &mut ReactorTestCtrl, reason: DestroyReason) {
543
        assert!(ctrl.is_closing());
544

            
545
        let cell = ctrl.read_inbound();
546

            
547
        match cell.msg() {
548
            chanmsg::AnyChanMsg::Destroy(d) => {
549
                assert_eq!(d.reason(), reason);
550
            }
551
            _ => panic!("unexpected ending {cell:?}"),
552
        }
553
    }
554

            
555
    #[traced_test]
556
    #[test]
557
    fn reject_extend2_relay() {
558
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
559
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
560
            rt.advance_until_stalled().await;
561

            
562
            let linkspecs = dummy_linkspecs();
563
            let extend2 = relaymsg::Extend2::new(linkspecs, HandshakeType::NTOR_V3, vec![]).into();
564
            ctrl.send_fwd(None, extend2, Recognized::Yes, false).await;
565
            rt.advance_until_stalled().await;
566

            
567
            assert!(logs_contain("got EXTEND2 in a RELAY cell?!"));
568
            assert!(!ctrl.outbound_chan_launched());
569
            assert_circuit_destroyed(&mut ctrl, DestroyReason::NONE);
570
        });
571
    }
572

            
573
    #[traced_test]
574
    #[test]
575
    fn extend_and_forward() {
576
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
577
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
578
            rt.advance_until_stalled().await;
579

            
580
            // No outbound circuits yet
581
            assert!(!ctrl.outbound_chan_launched());
582

            
583
            let linkspecs = dummy_linkspecs();
584
            let handshake_type = HandshakeType::NTOR_V3;
585
            let extend2 = relaymsg::Extend2::new(linkspecs, handshake_type, vec![]).into();
586
            ctrl.send_fwd(None, extend2, Recognized::Yes, true).await;
587
            rt.advance_until_stalled().await;
588

            
589
            // The reactor handled the EXTEND2 and launched an outbound channel
590
            assert!(logs_contain(
591
                "Launched channel to the next hop circ_id=Circ 8.17"
592
            ));
593
            assert!(ctrl.outbound_chan_launched());
594
            assert!(!ctrl.is_closing());
595

            
596
            ctrl.do_create2_handshake(&rt, handshake_type).await;
597
            assert!(logs_contain("Got CREATED2 response from next hop"));
598
            assert!(logs_contain("Extended circuit to the next hop"));
599

            
600
            // Time to forward a message to the next hop!
601
            let early = false;
602
            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap();
603
            ctrl.send_fwd(None, begin.clone().into(), Recognized::No, early)
604
                .await;
605
            rt.advance_until_stalled().await;
606

            
607
            macro_rules! expect_cell {
608
                ($chanmsg:tt, $relaymsg:tt) => {{
609
                    let cell = ctrl.read_outbound();
610
                    let msg = match cell.msg() {
611
                        chanmsg::AnyChanMsg::$chanmsg(m) => {
612
                            let body = m.clone().into_relay_body();
613
                            AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, body).unwrap()
614
                        }
615
                        _ => panic!("unexpected forwarded {cell:?}"),
616
                    };
617

            
618
                    match msg.msg() {
619
                        relaymsg::AnyRelayMsg::$relaymsg(m) => m.clone(),
620
                        _ => panic!("unexpected cell {msg:?}"),
621
                    }
622
                }};
623
            }
624

            
625
            // Ensure the other end received the BEGIN cell
626
            let recvd_begin = expect_cell!(Relay, Begin);
627
            assert_eq!(begin, recvd_begin);
628

            
629
            // Now send the same message again, but this time in a RELAY_EARLY
630
            let early = true;
631
            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap();
632
            ctrl.send_fwd(None, begin.clone().into(), Recognized::No, early)
633
                .await;
634
            rt.advance_until_stalled().await;
635
            let recvd_begin = expect_cell!(RelayEarly, Begin);
636
            assert_eq!(begin, recvd_begin);
637
        });
638
    }
639

            
640
    #[traced_test]
641
    #[test]
642
    fn forward_before_extend() {
643
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
644
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
645
            rt.advance_until_stalled().await;
646

            
647
            // Send an arbitrary unrecognized cell. The reactor should flag this as
648
            // a protocol violation, because we don't have an outbound channel to forward it on.
649
            let extend2 = relaymsg::End::new_misc().into();
650
            ctrl.send_fwd(None, extend2, Recognized::No, true).await;
651
            rt.advance_until_stalled().await;
652

            
653
            // The reactor handled the EXTEND2 and launched an outbound channel
654
            assert!(logs_contain(
655
                "Asked to forward cell before the circuit was extended?!"
656
            ));
657
            assert_circuit_destroyed(&mut ctrl, DestroyReason::NONE);
658
        });
659
    }
660

            
661
    #[traced_test]
662
    #[test]
663
    fn reject_invalid_begin() {
664
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
665
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
666
            rt.advance_until_stalled().await;
667

            
668
            let _streams = ctrl
669
                .allow_stream_requests(&[RelayCmd::BEGIN], AllowAllStreamsFilter)
670
                .await;
671

            
672
            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap().into();
673

            
674
            // BEGIN cells *must* have a stream ID, so expect the reactor to reject this
675
            // and close the circuit
676
            ctrl.send_fwd(None, begin, Recognized::Yes, false).await;
677
            rt.advance_until_stalled().await;
678

            
679
            assert!(logs_contain(
680
                "Invalid stream ID [scrubbed] for relay command BEGIN"
681
            ));
682
            assert_circuit_destroyed(&mut ctrl, DestroyReason::NONE);
683
        });
684
    }
685

            
686
    #[traced_test]
687
    #[test]
688
    #[ignore] // TODO(relay): Sad trombone, this is not yet supported
689
    fn data_stream() {
690
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
691
            const TO_SEND: &[u8] = b"The bells were musical in the silvery sun";
692

            
693
            let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
694
            rt.advance_until_stalled().await;
695

            
696
            let mut incoming_streams = ctrl
697
                .allow_stream_requests(&[RelayCmd::BEGIN], AllowAllStreamsFilter)
698
                .await;
699

            
700
            let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap().into();
701
            ctrl.send_fwd(StreamId::new(1), begin, Recognized::Yes, false)
702
                .await;
703
            rt.advance_until_stalled().await;
704

            
705
            let data = relaymsg::Data::new(TO_SEND).unwrap().into();
706
            ctrl.send_fwd(StreamId::new(1), data, Recognized::Yes, false)
707
                .await;
708

            
709
            // We should have a pending incoming stream
710
            let pending = incoming_streams.next().await.unwrap();
711

            
712
            // Accept it, and let's see what we have!
713
            let mut stream = pending
714
                .accept_data(relaymsg::Connected::new_empty())
715
                .await
716
                .unwrap();
717

            
718
            let mut recv_buf = [0_u8; TO_SEND.len()];
719
            stream.read_exact(&mut recv_buf).await.unwrap();
720
            assert_eq!(recv_buf, TO_SEND);
721
        });
722
    }
723
}