1
//! Module exposing the circuit reactor subsystem.
2
//!
3
//! This module implements the new [multi-reactor circuit subsystem].
4
//!
5
// Note: this is currently only used for the relay side,
6
// but we plan to eventually rewrite client circuit implementation
7
// to use these new reactor types as well.
8
//!
9
//! The entry point of the reactor is [`Reactor::run`], which launches the
10
//! reactor background tasks, and begins listening for inbound cells on the provided
11
//! inbound Tor channel.
12
//!
13
//! ### Architecture
14
//!
15
//! Internally, the circuit reactor consists of multiple reactors,
16
//! each running in a separate task:
17
//!
18
//!   * [`StreamReactor`] (one per hop): handles all messages arriving to,
19
//!     and coming from the streams of a given hop. The ready stream messages
20
//!     are sent to the [`BackwardReactor`]
21
//!   * [`ForwardReactor`]: handles incoming cells arriving on the
22
//!     "inbound" Tor channel (towards the guard, if we are a client, or towards
23
//!     the client, if we are a relay). If we are a client, it moves stream messages
24
//!     towards the corresponding [`StreamReactor`]. If we are a relay,
25
//!     in addition to sending any stream messages to the `StreamReactor`,
26
//!     this reactor also moves cells in the forward direction
27
//!     (from the client towards the exit)
28
//!   * [`BackwardReactor`]: writes cells to the "inbound" Tor channel:
29
//!     towards the client if we are a relay, or the towards the exit
30
//!     if we are a client.
31
//!
32
// TODO: the forward/backward terminology no longer makes sense! Come up with better terms...
33
//!
34
//! If we are an exit relay, the cell flow looks roughly like this:
35
//!
36
//! ```text
37
//!                             <stream_tx
38
//!                              MPSC (0)>
39
//!   +--------------> FWD -------------------------+
40
//!   |                 |                           |
41
//!   |                 |                           |
42
//!   |                 |                           |
43
//!   |                 |                           v
44
//! relay      BackwardReactorCmd            StreamReactor
45
//!   ^             <MPSC (0)>                      |
46
//!   |                 |                           |
47
//!   |                 |                           |
48
//!   |                 |                           |
49
//!   |                 v                           |
50
//!   +--------------- BWD <------------------------+
51
//!     application stream data    <stream_rx
52
//!                                 MPSC (0)>
53
//!
54
//! For a middle relay (the `StreamReactor` is omitted for brevity,
55
//! but middle relays can have one too, if leaky pipe is in use):
56
//!
57
//! ```text                   unrecognized cell
58
//!   +--------------> FWD -------------------------+
59
//!   |                 |                           |
60
//!   |                 |                           |
61
//!   |                 |                           |
62
//!   |                 |                           v
63
//! client      BackwardReactorCmd                relay
64
//! or relay        <MPSC (0)>                      |
65
//!   ^                 |                           |
66
//!   |                 |                           |
67
//!   |                 |                           |
68
//!   |                 |                           |
69
//!   |                 v                           |
70
//!   +--------------- BWD <------------------------+
71
//! ```
72
//!
73
//! On the client-side the `ForwardReactor` reads cells from the Tor channel to the guard,
74
//! and the `BackwardReactor` writes to it.
75
//!
76
//! ```text
77
//!   +--------------- FWD <--------------------+
78
//!   |                 |                       |
79
//!   |                 |                       |
80
//!   |                 |                       |
81
//!   v                 |                       |
82
//! StreamReactor  BackwardReactorCmd         guard
83
//!   |               <MPSC (0)>                ^
84
//!   |                 |                       |
85
//!   |                 |                       |
86
//!   |                 |                       |
87
//!   |                 v                       |
88
//!   +--------------> BWD ---------------------+
89
//! ```
90
//!
91
//! Client with leaky pipe (`SR` = `StreamReactor`):
92
//!
93
//! ```text
94
//!   +------------------------------+
95
//!   |       +--------------------+ | (1 MPSC TX per SR)
96
//!   |       |                    | |
97
//!   |       |       +----------- FWD <------------------+
98
//!   |       |       |             |                     |
99
//!   |       |       |             |                     |
100
//!   |       |       |             |                     |
101
//!   v       v       v             |                     |
102
//!  SR      SR      SR           BackwardReactorCmd    guard
103
//! (hop 4) (hop 3)  (hop 2)      <MPSC (0)>              ^
104
//!   |       |       |             |                     |
105
//!   |       |       |             |                     |
106
//!   |       |       |             |                     |
107
//!   |       |       |             v                     |
108
//!   |       |       |            BWD -------------------+
109
//!   |       |       |             ^
110
//!   |       |       |             |
111
//!   |       |       |             | <stream_rx
112
//!   |       |       |             |  MPSC (0)>
113
//!   +-------+-------+-------------+
114
//! ```
115
//!
116
// TODO(tuning): The inter-reactor MPSC channels have no buffering,
117
// which is likely going to be bad for performance,
118
// so we will need to tune the sizes of these MPSC buffers.
119
//!
120
//! The read and write ends of the inbound and outbound Tor channels are "split",
121
//! such that each reactor holds an `inbound_chan_rx` stream (for reading)
122
//! and a `inbound_chan_tx` sink (for writing):
123
//!
124
//!  * `ForwardReactor` holds the reading end of the inbound
125
//!    (coming from the client, if we are a relay, or coming from the guard, if we are a client)
126
//!    Tor channel, and the writing end of the outbound (towards the exit, if we are a middle relay)
127
//!    Tor channel, if there is one
128
//!  * `BackwardReactor` holds the reading end of the outbound channel, if there is one,
129
//!    and the writing end of the inbound channel, if there is one
130
//!
131
//! #### `ForwardReactor`
132
//!
133
//! It handles forward cells, by delegating to the implementation-dependent
134
//! [`ForwardHandler::handle_forward_cell`], which decides
135
//! whether the cell needs to be handled in `ForwardReactor`,
136
//! or in the `ForwardHandler` itself.
137
//!
138
//! More concretely:
139
//!
140
//! ```text
141
//!
142
//! Legend: `F` = "forward reactor", `H` = "ForwardHandler"
143
//!
144
//! | Message           | Received in | Handled in | Description                            |
145
//! |-------------------|-------------|------------|----------------------------------------|
146
//! | DESTROY           | F           | H          | Handled internally by the FowardHandler|
147
//! |-------------------|-------------|------------|----------------------------------------|
148
//! | PADDING_NEGOTIATE | F           | H          | Handled internally by the FowardHandler|
149
//! |-------------------|-------------|------------|----------------------------------------|
150
//! | *unrecognized*    | F           | H          | Unrecognized relay cell handling is    |
151
//! | RELAY OR          |             |            | implementation-dependent so these are  |
152
//! | RELAY_EARLY       |             |            | handled in the ForwardHandler.         |
153
//! |                   |             |            |                                        |
154
//! |                   |             |            | The relay ForwardHandler will handle   |
155
//! |                   |             |            | these by forwarding them to the next   |
156
//! |                   |             |            | hop, if there is one.                  |
157
//! |                   |             |            |                                        |
158
//! |                   |             |            | Clients don't yet implement            |
159
//! |                   |             |            | ForwardHandler, but when they do,      |
160
//! |                   |             |            | its implementation will simply reject  |
161
//! |                   |             |            | any messages that can't be decrypted   |
162
//! |-------------------|-------------|------------|----------------------------------------|
163
//! | *recognized*      | F           | see table  | Handling depends on the cmd            |
164
//! | RELAY OR          |             | below      |                                        |
165
//! | RELAY_EARLY       |             |            |                                        |
166
//! ```
167
//!
168
//! Recognized relay cells are handled by splitting each cell into individual messages,
169
//! and handling each message individually as described in the table below
170
//! (Note: since prop340 is not yet implemented, in practice there is only 1 message per cell):
171
//!
172
//! ```text
173
//!
174
//! Legend: `F` = "forward reactor", `B` = "backward reactor", `S` = "stream reactor"
175
//!
176
//! | RELAY cmd         | Received in | Handled in | Description                            |
177
//! |-------------------|-------------|------------|----------------------------------------|
178
//! | SENDME            | F           | B          | Sent to BackwardReactor for handling   |
179
//! |                   |             |            | (BackwardReactorCmd::HandleSendme)     |
180
//! |                   |             |            | because the forward reactor doesn't    |
181
//! |                   |             |            | have access to the inbound_chan_tx part|
182
//! |                   |             |            | of the inbound (towards the client)    |
183
//! |                   |             |            | Tor channel, and so cannot obtain the  |
184
//! |                   |             |            | congestion signals needed for SENDME   |
185
//! |                   |             |            | handling                               |
186
//! |-------------------|-------------|------------|----------------------------------------|
187
//! | Other             | F           | F          | Passed to impl-dependent handler       |
188
//! | (StreamId = 0)    |             |            |  `ForwardHandler::handle_meta_msg()`   |
189
//! |-------------------|-------------|------------|----------------------------------------|
190
//! | Other             | F           | S          | All messages with a non-zero stream ID |
191
//! | (StreamId != 0)   |             |            | are forwarded to the stream reactor    |
192
//! |-------------------|-------------|------------|----------------------------------------|
193
//! ```
194
//!
195
//! #### `BackwardReactor`
196
//!
197
//! It handles
198
//!
199
//!  * the packaging and delivery of all cells that need to be written to the "inbound" Tor channel
200
//!    (it writes them to the towards-the-client Tor channel sink) (**partially implemented**)
201
//!  * incoming cells coming over the "outbound" Tor channel. This channel only exists
202
//!    if we are a middle relay. These cells are relayed to the "inbound" Tor channel (**not implemented**).
203
//!  * the sending of padding cells, according to the PaddingController's instructions
204
//!
205
//! This multi-reactor architecture should, in theory, have better performance than
206
//! a single reactor system, because it enables us to parallelize some of the work:
207
//! the forward and backward directions share little state,
208
//! because they read from, and write to, different sinks/streams,
209
//! so they can be run in parallel (as separate tasks).
210
//! With a single reactor architecture, the reactor would need to drive
211
//! both the forward and the backward direction, and on each iteration
212
//! would need to decide which to prioritize, which might prove tricky
213
//! (though prioritizing one of them at random would've probably been good enough).
214
//!
215
//! The monolithic single reactor alternative would also have been significantly
216
//! more convoluted, and so more difficult to maintain in the long run.
217
//!
218
//
219
// NOTE: The FWD and BWD currently share the hop list containing the per-hop state,
220
// (including the congestion control object, which is behind a mutex).
221
//
222
//! [multi-reactor circuit subsystem]: https://gitlab.torproject.org/tpo/core/arti/-/blob/main/doc/dev/notes/relay-conflux.md
223
//! [`StreamReactor`]: stream::StreamReactor
224

            
225
// TODO(DEDUP): this will replace CircHopList when we rewrite the client reactor
226
// to use the new reactor architecture
227
pub(crate) mod circhop;
228

            
229
pub(crate) mod backward;
230
pub(crate) mod forward;
231
pub(crate) mod hop_mgr;
232
pub(crate) mod macros;
233
pub(crate) mod stream;
234

            
235
use std::result::Result as StdResult;
236
use std::sync::Arc;
237

            
238
use derive_deftly::Deftly;
239
use futures::channel::mpsc;
240
use futures::{FutureExt as _, StreamExt as _, select_biased};
241
use oneshot_fused_workaround as oneshot;
242
use tracing::trace;
243

            
244
use tor_cell::chancell::CircId;
245
use tor_rtcompat::{DynTimeProvider, Runtime};
246

            
247
use crate::channel::Channel;
248
use crate::circuit::reactor::backward::BackwardHandler;
249
use crate::circuit::reactor::forward::ForwardHandler;
250
use crate::circuit::reactor::hop_mgr::HopMgr;
251
use crate::circuit::reactor::stream::ReadyStreamMsg;
252
use crate::circuit::{CircuitRxReceiver, UniqId};
253
use crate::memquota::CircuitAccount;
254
use crate::util::err::ReactorError;
255

            
256
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
257
use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
258

            
259
use backward::BackwardReactor;
260
use forward::ForwardReactor;
261
use macros::derive_deftly_template_CircuitReactor;
262

            
263
/// The type of a oneshot channel used to inform reactor of the result of an operation.
264
pub(crate) type ReactorResultChannel<T> = oneshot::Sender<crate::Result<T>>;
265

            
266
/// A handle for interacting with a circuit reactor.
267
#[derive(derive_more::Debug)]
268
pub(crate) struct CircReactorHandle<F: ForwardHandler, B: BackwardHandler> {
269
    /// Sender for reactor control messages.
270
    #[debug(skip)]
271
    pub(crate) control: mpsc::UnboundedSender<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
272
    /// Sender for reactor control commands.
273
    #[debug(skip)]
274
    pub(crate) command: mpsc::UnboundedSender<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
275
    /// The time provider.
276
    #[expect(unused)] // TODO(relay)
277
    pub(crate) time_provider: DynTimeProvider,
278
    /// Memory quota account
279
    #[expect(unused)] // TODO(relay)
280
    pub(crate) memquota: CircuitAccount,
281
}
282

            
283
/// A control command.
284
///
285
/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
286
/// never cause cells to sent on the Tor channel,
287
/// while `CtrlMsg`s potentially do.
288
#[allow(unused)] // TODO(relay)
289
pub(crate) enum CtrlCmd<F, B> {
290
    /// A control command for the forward reactor.
291
    Forward(forward::CtrlCmd<F>),
292
    /// A control command for the backward reactor.
293
    Backward(backward::CtrlCmd<B>),
294
    /// Shut down the reactor.
295
    Shutdown,
296
}
297

            
298
/// A control message.
299
#[allow(unused)] // TODO(relay)
300
pub(crate) enum CtrlMsg<F, B> {
301
    /// A control message for the forward reactor.
302
    Forward(forward::CtrlMsg<F>),
303
    /// A control message for the backward reactor.
304
    Backward(backward::CtrlMsg<B>),
305
}
306

            
307
/// The entry point of the circuit reactor subsystem.
308
#[derive(Deftly)]
309
#[derive_deftly(CircuitReactor)]
310
#[deftly(reactor_name = "circuit reactor")]
311
#[deftly(only_run_once)]
312
#[deftly(run_inner_fn = "Self::run_inner")]
313
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
314
pub(crate) struct Reactor<R: Runtime, F: ForwardHandler, B: BackwardHandler> {
315
    /// The process-unique identifier of this circuit.
316
    ///
317
    /// Used for logging.
318
    unique_id: UniqId,
319
    /// The reactor for handling
320
    ///
321
    ///   * cells moving in the forward direction (from the client towards exit), if we are a relay
322
    ///   * incoming cells (coming from the guard), if we are a client
323
    ///
324
    /// Optional so we can move it out of self in run().
325
    forward: Option<ForwardReactor<R, F>>,
326
    /// The reactor for handling
327
    ///
328
    ///   * cells moving in the backward direction (from the exit towards client), if we are a relay
329
    ///   * outgoing cells (moving towards the guard), if we are a client
330
    ///
331
    /// Optional so we can move it out of self in run().
332
    backward: Option<BackwardReactor<B>>,
333
    /// Receiver for control messages for this reactor, sent by reactor handle objects.
334
    control: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
335
    /// Receiver for command messages for this reactor, sent by reactor handle objects.
336
    ///
337
    /// This MPSC channel is polled in [`run`](Self::run).
338
    ///
339
    /// NOTE: this is a separate channel from `control`, because some messages
340
    /// have higher priority and need to be handled even if the `inbound_chan_tx` is not
341
    /// ready (whereas `control` messages are not read until the `inbound_chan_tx` sink
342
    /// is ready to accept cells).
343
    command: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
344
    /// Control channels for the [`ForwardReactor`].
345
    ///
346
    /// Handles [`CtrlCmd::Forward`] and [`CtrlMsg::Forward`] messages.
347
    fwd_ctrl: ReactorCtrl<forward::CtrlCmd<F::CtrlCmd>, forward::CtrlMsg<F::CtrlMsg>>,
348
    /// Control channels for the [`BackwardReactor`].
349
    ///
350
    /// Handles [`CtrlCmd::Backward`] and [`CtrlMsg::Backward`] messages.
351
    bwd_ctrl: ReactorCtrl<backward::CtrlCmd<B::CtrlCmd>, backward::CtrlMsg<B::CtrlMsg>>,
352
}
353

            
354
/// A handle for sending control/command messages to a FWD or BWD.
355
struct ReactorCtrl<C, M> {
356
    /// Sender for control commands.
357
    command_tx: mpsc::UnboundedSender<C>,
358
    /// Sender for control messages.
359
    control_tx: mpsc::UnboundedSender<M>,
360
}
361

            
362
impl<C, M> ReactorCtrl<C, M> {
363
    /// Create a new sender handle.
364
80
    fn new(command_tx: mpsc::UnboundedSender<C>, control_tx: mpsc::UnboundedSender<M>) -> Self {
365
80
        Self {
366
80
            command_tx,
367
80
            control_tx,
368
80
        }
369
80
    }
370

            
371
    /// Send a control command.
372
4
    fn send_cmd(&mut self, cmd: C) -> Result<(), ReactorError> {
373
4
        self.command_tx
374
4
            .unbounded_send(cmd)
375
4
            .map_err(|_| ReactorError::Shutdown)
376
4
    }
377

            
378
    /// Send a control message.
379
    fn send_msg(&mut self, msg: M) -> Result<(), ReactorError> {
380
        self.control_tx
381
            .unbounded_send(msg)
382
            .map_err(|_| ReactorError::Shutdown)
383
    }
384
}
385

            
386
/// Trait implemented by types that can handle control messages and commands.
387
pub(crate) trait ControlHandler {
388
    /// The type of control message expected by the forward reactor.
389
    type CtrlMsg;
390

            
391
    /// The type of control command expected by the forward reactor.
392
    type CtrlCmd;
393

            
394
    // TODO(DEDUP): do these APIs make sense?
395
    // What should we return here, maybe some instructions for the base reactor
396
    // to do something?
397

            
398
    /// Handle a control command.
399
    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError>;
400

            
401
    /// Handle a control message.
402
    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError>;
403
}
404

            
405
#[allow(unused)] // TODO(relay)
406
impl<R: Runtime, F: ForwardHandler + ControlHandler, B: BackwardHandler + ControlHandler>
407
    Reactor<R, F, B>
408
{
409
    /// Create a new circuit reactor.
410
    ///
411
    /// The reactor will send outbound messages on `channel`, receive incoming
412
    /// messages on `inbound_chan_rx`, and identify this circuit by the channel-local
413
    /// [`CircId`] provided.
414
    ///
415
    /// The internal unique identifier for this circuit will be `unique_id`.
416
    #[allow(clippy::too_many_arguments)] // TODO
417
40
    pub(crate) fn new(
418
40
        runtime: R,
419
40
        channel: &Arc<Channel>,
420
40
        circ_id: CircId,
421
40
        unique_id: UniqId,
422
40
        inbound_chan_rx: CircuitRxReceiver,
423
40
        forward_impl: F,
424
40
        backward_impl: B,
425
40
        hop_mgr: HopMgr<R>,
426
40
        padding_ctrl: PaddingController,
427
40
        padding_event_stream: PaddingEventStream,
428
40
        // The sending end of this channel should be in HopMgr
429
40
        bwd_rx: mpsc::Receiver<ReadyStreamMsg>,
430
40
        fwd_events: mpsc::Receiver<F::CircEvent>,
431
40
        memquota: &CircuitAccount,
432
40
    ) -> (Self, CircReactorHandle<F, B>) {
433
        // NOTE: not registering this channel with the memquota subsystem is okay,
434
        // because it has no buffering (if ever decide to make the size of this buffer
435
        // non-zero for whatever reason, we must remember to register it with memquota
436
        // so that it counts towards the total memory usage for the circuit.
437
        #[allow(clippy::disallowed_methods)]
438
40
        let (backward_reactor_tx, forward_reactor_rx) = mpsc::channel(0);
439

            
440
        // TODO: channels galore
441
40
        let (control_tx, control_rx) = mpsc::unbounded();
442
40
        let (command_tx, command_rx) = mpsc::unbounded();
443

            
444
40
        let (fwd_control_tx, fwd_control_rx) = mpsc::unbounded();
445
40
        let (fwd_command_tx, fwd_command_rx) = mpsc::unbounded();
446
40
        let (bwd_control_tx, bwd_control_rx) = mpsc::unbounded();
447
40
        let (bwd_command_tx, bwd_command_rx) = mpsc::unbounded();
448

            
449
40
        let fwd_ctrl = ReactorCtrl::new(fwd_command_tx, fwd_control_tx);
450
40
        let bwd_ctrl = ReactorCtrl::new(bwd_command_tx, bwd_control_tx);
451

            
452
40
        let handle = CircReactorHandle {
453
40
            control: control_tx,
454
40
            command: command_tx,
455
40
            time_provider: DynTimeProvider::new(runtime.clone()),
456
40
            memquota: memquota.clone(),
457
40
        };
458

            
459
        /// Grab a handle to the hop list (it's needed by the BWD)
460
40
        let hops = Arc::clone(hop_mgr.hops());
461
40
        let forward = ForwardReactor::new(
462
40
            runtime.clone(),
463
40
            unique_id,
464
40
            forward_impl,
465
40
            hop_mgr,
466
40
            inbound_chan_rx,
467
40
            fwd_control_rx,
468
40
            fwd_command_rx,
469
40
            backward_reactor_tx,
470
40
            fwd_events,
471
40
            padding_ctrl.clone(),
472
        );
473

            
474
40
        let backward = BackwardReactor::new(
475
40
            runtime,
476
40
            channel,
477
40
            circ_id,
478
40
            unique_id,
479
40
            backward_impl,
480
40
            hops,
481
40
            forward_reactor_rx,
482
40
            bwd_control_rx,
483
40
            bwd_command_rx,
484
40
            padding_ctrl,
485
40
            padding_event_stream,
486
40
            bwd_rx,
487
        );
488

            
489
40
        let reactor = Reactor {
490
40
            unique_id,
491
40
            forward: Some(forward),
492
40
            backward: Some(backward),
493
40
            control: control_rx,
494
40
            command: command_rx,
495
40
            fwd_ctrl,
496
40
            bwd_ctrl,
497
40
        };
498

            
499
40
        (reactor, handle)
500
40
    }
501

            
502
    /// Helper for [`run`](Self::run).
503
40
    pub(crate) async fn run_inner(&mut self) -> StdResult<(), ReactorError> {
504
40
        let (forward, backward) = (|| Some((self.forward.take()?, self.backward.take()?)))()
505
40
            .expect("relay reactor spawned twice?!");
506

            
507
40
        let mut forward = Box::pin(forward.run()).fuse();
508
40
        let mut backward = Box::pin(backward.run()).fuse();
509
        loop {
510
            // If either of these completes, this function returns,
511
            // dropping fwd_ctrl/bwd_ctrl channels, which will, in turn,
512
            // cause the remaining reactor, if there is one, to shut down too
513
44
            select_biased! {
514
44
                res = self.command.next() => {
515
16
                    let Some(cmd) = res else {
516
12
                        trace!(
517
                            circ_id = %self.unique_id,
518
                            reason = "command channel drop",
519
                            "reactor shutdown",
520
                        );
521

            
522
12
                        return Err(ReactorError::Shutdown);
523
                    };
524

            
525
4
                    self.handle_command(cmd)?;
526
                },
527
44
                res = self.control.next() => {
528
                    let Some(msg) = res else {
529
                        trace!(
530
                            circ_id = %self.unique_id,
531
                            reason = "control channel drop",
532
                            "reactor shutdown",
533
                        );
534

            
535
                        return Err(ReactorError::Shutdown);
536
                    };
537

            
538
                    self.handle_control(msg)?;
539
                },
540
                // No need to log the error here, because it was already logged
541
                // by the reactor that shut down
542
24
                res = forward => return Ok(res?),
543
4
                res = backward => return Ok(res?),
544
            }
545
        }
546
40
    }
547

            
548
    /// Handle a shutdown request.
549
    fn handle_shutdown(&self) -> StdResult<(), ReactorError> {
550
        trace!(
551
            tunnel_id = %self.unique_id,
552
            "reactor shutdown due to explicit request",
553
        );
554

            
555
        Err(ReactorError::Shutdown)
556
    }
557

            
558
    /// Handle a [`CtrlCmd`].
559
4
    fn handle_command(
560
4
        &mut self,
561
4
        cmd: CtrlCmd<F::CtrlCmd, B::CtrlCmd>,
562
4
    ) -> StdResult<(), ReactorError> {
563
4
        match cmd {
564
4
            CtrlCmd::Forward(c) => self.fwd_ctrl.send_cmd(c),
565
            CtrlCmd::Backward(c) => self.bwd_ctrl.send_cmd(c),
566
            CtrlCmd::Shutdown => self.handle_shutdown(),
567
        }
568
4
    }
569

            
570
    /// Handle a [`CtrlMsg`].
571
    fn handle_control(
572
        &mut self,
573
        cmd: CtrlMsg<F::CtrlMsg, B::CtrlMsg>,
574
    ) -> StdResult<(), ReactorError> {
575
        match cmd {
576
            CtrlMsg::Forward(c) => self.fwd_ctrl.send_msg(c),
577
            CtrlMsg::Backward(c) => self.bwd_ctrl.send_msg(c),
578
        }
579
    }
580
}
581

            
582
#[cfg(test)]
583
pub(crate) mod test {
584
    // @@ begin test lint list maintained by maint/add_warning @@
585
    #![allow(clippy::bool_assert_comparison)]
586
    #![allow(clippy::clone_on_copy)]
587
    #![allow(clippy::dbg_macro)]
588
    #![allow(clippy::mixed_attributes_style)]
589
    #![allow(clippy::print_stderr)]
590
    #![allow(clippy::print_stdout)]
591
    #![allow(clippy::single_char_pattern)]
592
    #![allow(clippy::unwrap_used)]
593
    #![allow(clippy::unchecked_time_subtraction)]
594
    #![allow(clippy::useless_vec)]
595
    #![allow(clippy::needless_pass_by_value)]
596
    #![allow(clippy::string_slice)] // See arti#2571
597
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
598

            
599
    use tor_basic_utils::test_rng::testing_rng;
600
    use tor_cell::chancell::{BoxedCellBody, msg as chanmsg};
601
    use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, msg as relaymsg};
602

            
603
    use chanmsg::AnyChanMsg;
604

            
605
    #[cfg(feature = "hs-service")]
606
    use crate::stream::IncomingStreamRequestFilter;
607

            
608
    pub(crate) fn rmsg_to_ccmsg(
609
        id: Option<StreamId>,
610
        msg: relaymsg::AnyRelayMsg,
611
        early: bool,
612
    ) -> AnyChanMsg {
613
        // TODO #1947: test other formats.
614
        let rfmt = RelayCellFormat::V0;
615
        let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
616
            .encode(rfmt, &mut testing_rng())
617
            .unwrap();
618
        let chanmsg = chanmsg::Relay::from(body);
619

            
620
        if early {
621
            let chanmsg = chanmsg::RelayEarly::from(chanmsg);
622
            AnyChanMsg::RelayEarly(chanmsg)
623
        } else {
624
            AnyChanMsg::Relay(chanmsg)
625
        }
626
    }
627

            
628
    #[cfg(any(feature = "hs-service", feature = "relay"))]
629
    pub(crate) struct AllowAllStreamsFilter;
630
    #[cfg(any(feature = "hs-service", feature = "relay"))]
631
    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
632
        fn disposition(
633
            &mut self,
634
            _ctx: &crate::stream::IncomingStreamRequestContext<'_>,
635
            _circ: &crate::circuit::CircHopSyncView<'_>,
636
        ) -> crate::Result<crate::stream::IncomingStreamRequestDisposition> {
637
            Ok(crate::stream::IncomingStreamRequestDisposition::Accept)
638
        }
639
    }
640
}