1
//! Module exposing the circuit reactor subsystem.
2
//!
3
//! This module implements the new [multi-reactor circuit subsystem].
4
//!
5
// Note: this is currently only used for the relay side,
6
// but we plan to eventually rewrite client circuit implementation
7
// to use these new reactor types as well.
8
//!
9
//! The entry point of the reactor is [`Reactor::run`], which launches the
10
//! reactor background tasks, and begins listening for inbound cells on the provided
11
//! inbound Tor channel.
12
//!
13
//! ### Architecture
14
//!
15
//! Internally, the circuit reactor consists of multiple reactors,
16
//! each running in a separate task:
17
//!
18
//!   * [`StreamReactor`] (one per hop): handles all messages arriving to,
19
//!     and coming from the streams of a given hop. The ready stream messages
20
//!     are sent to the [`BackwardReactor`]
21
//!   * [`ForwardReactor`]: handles incoming cells arriving on the
22
//!     "inbound" Tor channel (towards the guard, if we are a client, or towards
23
//!     the client, if we are a relay). If we are a client, it moves stream messages
24
//!     towards the corresponding [`StreamReactor`]. If we are a relay,
25
//!     in addition to sending any stream messages to the `StreamReactor`,
26
//!     this reactor also moves cells in the forward direction
27
//!     (from the client towards the exit)
28
//!   * [`BackwardReactor`]: writes cells to the "inbound" Tor channel:
29
//!     towards the client if we are a relay, or the towards the exit
30
//!     if we are a client.
31
//!
32
// TODO: the forward/backward terminology no longer makes sense! Come up with better terms...
33
//!
34
//! If we are an exit relay, the cell flow looks roughly like this:
35
//!
36
//! ```text
37
//!                             <stream_tx
38
//!                              MPSC (0)>
39
//!   +--------------> FWD -------------------------+
40
//!   |                 |                           |
41
//!   |                 |                           |
42
//!   |                 |                           |
43
//!   |                 |                           v
44
//! relay      BackwardReactorCmd            StreamReactor
45
//!   ^             <MPSC (0)>                      |
46
//!   |                 |                           |
47
//!   |                 |                           |
48
//!   |                 |                           |
49
//!   |                 v                           |
50
//!   +--------------- BWD <------------------------+
51
//!     application stream data    <stream_rx
52
//!                                 MPSC (0)>
53
//!
54
//! For a middle relay (the `StreamReactor` is omitted for brevity,
55
//! but middle relays can have one too, if leaky pipe is in use):
56
//!
57
//! ```text                   unrecognized cell
58
//!   +--------------> FWD -------------------------+
59
//!   |                 |                           |
60
//!   |                 |                           |
61
//!   |                 |                           |
62
//!   |                 |                           v
63
//! client      BackwardReactorCmd                relay
64
//! or relay        <MPSC (0)>                      |
65
//!   ^                 |                           |
66
//!   |                 |                           |
67
//!   |                 |                           |
68
//!   |                 |                           |
69
//!   |                 v                           |
70
//!   +--------------- BWD <------------------------+
71
//! ```
72
//!
73
//! On the client-side the `ForwardReactor` reads cells from the Tor channel to the guard,
74
//! and the `BackwardReactor` writes to it.
75
//!
76
//! ```text
77
//!   +--------------- FWD <--------------------+
78
//!   |                 |                       |
79
//!   |                 |                       |
80
//!   |                 |                       |
81
//!   v                 |                       |
82
//! StreamReactor  BackwardReactorCmd         guard
83
//!   |               <MPSC (0)>                ^
84
//!   |                 |                       |
85
//!   |                 |                       |
86
//!   |                 |                       |
87
//!   |                 v                       |
88
//!   +--------------> BWD ---------------------+
89
//! ```
90
//!
91
//! Client with leaky pipe (`SR` = `StreamReactor`):
92
//!
93
//! ```text
94
//!   +------------------------------+
95
//!   |       +--------------------+ | (1 MPSC TX per SR)
96
//!   |       |                    | |
97
//!   |       |       +----------- FWD <------------------+
98
//!   |       |       |             |                     |
99
//!   |       |       |             |                     |
100
//!   |       |       |             |                     |
101
//!   v       v       v             |                     |
102
//!  SR      SR      SR           BackwardReactorCmd    guard
103
//! (hop 4) (hop 3)  (hop 2)      <MPSC (0)>              ^
104
//!   |       |       |             |                     |
105
//!   |       |       |             |                     |
106
//!   |       |       |             |                     |
107
//!   |       |       |             v                     |
108
//!   |       |       |            BWD -------------------+
109
//!   |       |       |             ^
110
//!   |       |       |             |
111
//!   |       |       |             | <stream_rx
112
//!   |       |       |             |  MPSC (0)>
113
//!   +-------+-------+-------------+
114
//! ```
115
//!
116
// TODO(tuning): The inter-reactor MPSC channels have no buffering,
117
// which is likely going to be bad for performance,
118
// so we will need to tune the sizes of these MPSC buffers.
119
//!
120
//! The read and write ends of the inbound and outbound Tor channels are "split",
121
//! such that each reactor holds an `inbound_chan_rx` stream (for reading)
122
//! and a `inbound_chan_tx` sink (for writing):
123
//!
124
//!  * `ForwardReactor` holds the reading end of the inbound
125
//!    (coming from the client, if we are a relay, or coming from the guard, if we are a client)
126
//!    Tor channel, and the writing end of the outbound (towards the exit, if we are a middle relay)
127
//!    Tor channel, if there is one
128
//!  * `BackwardReactor` holds the reading end of the outbound channel, if there is one,
129
//!    and the writing end of the inbound channel, if there is one
130
//!
131
//! #### `ForwardReactor`
132
//!
133
//! It handles forward cells, by delegating to the implementation-dependent
134
//! [`ForwardHandler::handle_forward_cell`], which decides
135
//! whether the cell needs to be handled in `ForwardReactor`,
136
//! or in the `ForwardHandler` itself.
137
//!
138
//! More concretely:
139
//!
140
//! ```text
141
//!
142
//! Legend: `F` = "forward reactor", `H` = "ForwardHandler"
143
//!
144
//! | Message           | Received in | Handled in | Description                            |
145
//! |-------------------|-------------|------------|----------------------------------------|
146
//! | DESTROY           | F           | H          | Handled internally by the FowardHandler|
147
//! |-------------------|-------------|------------|----------------------------------------|
148
//! | PADDING_NEGOTIATE | F           | H          | Handled internally by the FowardHandler|
149
//! |-------------------|-------------|------------|----------------------------------------|
150
//! | *unrecognized*    | F           | H          | Unrecognized relay cell handling is    |
151
//! | RELAY OR          |             |            | implementation-dependent so these are  |
152
//! | RELAY_EARLY       |             |            | handled in the ForwardHandler.         |
153
//! |                   |             |            |                                        |
154
//! |                   |             |            | The relay ForwardHandler will handle   |
155
//! |                   |             |            | these by forwarding them to the next   |
156
//! |                   |             |            | hop, if there is one.                  |
157
//! |                   |             |            |                                        |
158
//! |                   |             |            | Clients don't yet implement            |
159
//! |                   |             |            | ForwardHandler, but when they do,      |
160
//! |                   |             |            | its implementation will simply reject  |
161
//! |                   |             |            | any messages that can't be decrypted   |
162
//! |-------------------|-------------|------------|----------------------------------------|
163
//! | *recognized*      | F           | see table  | Handling depends on the cmd            |
164
//! | RELAY OR          |             | below      |                                        |
165
//! | RELAY_EARLY       |             |            |                                        |
166
//! ```
167
//!
168
//! Recognized relay cells are handled by splitting each cell into individual messages,
169
//! and handling each message individually as described in the table below
170
//! (Note: since prop340 is not yet implemented, in practice there is only 1 message per cell):
171
//!
172
//! ```text
173
//!
174
//! Legend: `F` = "forward reactor", `B` = "backward reactor", `S` = "stream reactor"
175
//!
176
//! | RELAY cmd         | Received in | Handled in | Description                            |
177
//! |-------------------|-------------|------------|----------------------------------------|
178
//! | SENDME            | F           | B          | Sent to BackwardReactor for handling   |
179
//! |                   |             |            | (BackwardReactorCmd::HandleSendme)     |
180
//! |                   |             |            | because the forward reactor doesn't    |
181
//! |                   |             |            | have access to the inbound_chan_tx part|
182
//! |                   |             |            | of the inbound (towards the client)    |
183
//! |                   |             |            | Tor channel, and so cannot obtain the  |
184
//! |                   |             |            | congestion signals needed for SENDME   |
185
//! |                   |             |            | handling                               |
186
//! |-------------------|-------------|------------|----------------------------------------|
187
//! | Other             | F           | F          | Passed to impl-dependent handler       |
188
//! | (StreamId = 0)    |             |            |  `ForwardHandler::handle_meta_msg()`   |
189
//! |-------------------|-------------|------------|----------------------------------------|
190
//! | Other             | F           | S          | All messages with a non-zero stream ID |
191
//! | (StreamId != 0)   |             |            | are forwarded to the stream reactor    |
192
//! |-------------------|-------------|------------|----------------------------------------|
193
//! ```
194
//!
195
//! #### `BackwardReactor`
196
//!
197
//! It handles
198
//!
199
//!  * the packaging and delivery of all cells that need to be written to the "inbound" Tor channel
200
//!    (it writes them to the towards-the-client Tor channel sink) (**partially implemented**)
201
//!  * incoming cells coming over the "outbound" Tor channel. This channel only exists
202
//!    if we are a middle relay. These cells are relayed to the "inbound" Tor channel (**not implemented**).
203
//!  * the sending of padding cells, according to the PaddingController's instructions
204
//!
205
//! This multi-reactor architecture should, in theory, have better performance than
206
//! a single reactor system, because it enables us to parallelize some of the work:
207
//! the forward and backward directions share little state,
208
//! because they read from, and write to, different sinks/streams,
209
//! so they can be run in parallel (as separate tasks).
210
//! With a single reactor architecture, the reactor would need to drive
211
//! both the forward and the backward direction, and on each iteration
212
//! would need to decide which to prioritize, which might prove tricky
213
//! (though prioritizing one of them at random would've probably been good enough).
214
//!
215
//! The monolithic single reactor alternative would also have been significantly
216
//! more convoluted, and so more difficult to maintain in the long run.
217
//!
218
//
219
// NOTE: The FWD and BWD currently share the hop list containing the per-hop state,
220
// (including the congestion control object, which is behind a mutex).
221
//
222
//! [multi-reactor circuit subsystem]: https://gitlab.torproject.org/tpo/core/arti/-/blob/main/doc/dev/notes/relay-conflux.md
223
//! [`StreamReactor`]: stream::StreamReactor
224

            
225
// TODO(DEDUP): this will replace CircHopList when we rewrite the client reactor
226
// to use the new reactor architecture
227
pub(crate) mod circhop;
228

            
229
pub(crate) mod backward;
230
pub(crate) mod forward;
231
pub(crate) mod hop_mgr;
232
pub(crate) mod macros;
233
pub(crate) mod stream;
234

            
235
use std::result::Result as StdResult;
236
use std::sync::Arc;
237

            
238
use derive_deftly::Deftly;
239
use futures::channel::mpsc;
240
use futures::{FutureExt as _, StreamExt as _, select_biased};
241
use oneshot_fused_workaround as oneshot;
242
use tracing::trace;
243

            
244
use tor_cell::chancell::CircId;
245
use tor_rtcompat::{DynTimeProvider, Runtime};
246

            
247
use crate::channel::Channel;
248
use crate::circuit::reactor::backward::BackwardHandler;
249
use crate::circuit::reactor::forward::ForwardHandler;
250
use crate::circuit::reactor::hop_mgr::HopMgr;
251
use crate::circuit::reactor::stream::ReadyStreamMsg;
252
use crate::circuit::{CircuitRxReceiver, UniqId};
253
use crate::memquota::CircuitAccount;
254
use crate::util::err::ReactorError;
255

            
256
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
257
use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
258

            
259
use backward::BackwardReactor;
260
use forward::ForwardReactor;
261
use macros::derive_deftly_template_CircuitReactor;
262

            
263
/// The type of a oneshot channel used to inform reactor of the result of an operation.
264
pub(crate) type ReactorResultChannel<T> = oneshot::Sender<crate::Result<T>>;
265

            
266
/// A handle for interacting with a circuit reactor.
267
#[derive(derive_more::Debug)]
268
pub(crate) struct CircReactorHandle<F: ForwardHandler, B: BackwardHandler> {
269
    /// Sender for reactor control messages.
270
    #[debug(skip)]
271
    pub(crate) control: mpsc::UnboundedSender<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
272
    /// Sender for reactor control commands.
273
    #[debug(skip)]
274
    pub(crate) command: mpsc::UnboundedSender<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
275
    /// The time provider.
276
    pub(crate) time_provider: DynTimeProvider,
277
    /// Memory quota account
278
    pub(crate) memquota: CircuitAccount,
279
}
280

            
281
/// A control command.
282
///
283
/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
284
/// never cause cells to sent on the Tor channel,
285
/// while `CtrlMsg`s potentially do.
286
#[allow(unused)] // TODO(relay)
287
pub(crate) enum CtrlCmd<F, B> {
288
    /// A control command for the forward reactor.
289
    Forward(forward::CtrlCmd<F>),
290
    /// A control command for the backward reactor.
291
    Backward(backward::CtrlCmd<B>),
292
    /// Shut down the reactor.
293
    Shutdown,
294
}
295

            
296
/// A control message.
297
#[allow(unused)] // TODO(relay)
298
pub(crate) enum CtrlMsg<F, B> {
299
    /// A control message for the forward reactor.
300
    Forward(forward::CtrlMsg<F>),
301
    /// A control message for the backward reactor.
302
    Backward(backward::CtrlMsg<B>),
303
}
304

            
305
/// The entry point of the circuit reactor subsystem.
306
#[derive(Deftly)]
307
#[derive_deftly(CircuitReactor)]
308
#[deftly(reactor_name = "circuit reactor")]
309
#[deftly(only_run_once)]
310
#[deftly(run_inner_fn = "Self::run_inner")]
311
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
312
pub(crate) struct Reactor<R: Runtime, F: ForwardHandler, B: BackwardHandler> {
313
    /// The process-unique identifier of this circuit.
314
    ///
315
    /// Used for logging.
316
    unique_id: UniqId,
317
    /// The reactor for handling
318
    ///
319
    ///   * cells moving in the forward direction (from the client towards exit), if we are a relay
320
    ///   * incoming cells (coming from the guard), if we are a client
321
    ///
322
    /// Optional so we can move it out of self in run().
323
    forward: Option<ForwardReactor<R, F>>,
324
    /// The reactor for handling
325
    ///
326
    ///   * cells moving in the backward direction (from the exit towards client), if we are a relay
327
    ///   * outgoing cells (moving towards the guard), if we are a client
328
    ///
329
    /// Optional so we can move it out of self in run().
330
    backward: Option<BackwardReactor<B>>,
331
    /// Receiver for control messages for this reactor, sent by reactor handle objects.
332
    control: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
333
    /// Receiver for command messages for this reactor, sent by reactor handle objects.
334
    ///
335
    /// This MPSC channel is polled in [`run`](Self::run).
336
    ///
337
    /// NOTE: this is a separate channel from `control`, because some messages
338
    /// have higher priority and need to be handled even if the `inbound_chan_tx` is not
339
    /// ready (whereas `control` messages are not read until the `inbound_chan_tx` sink
340
    /// is ready to accept cells).
341
    command: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
342
    /// Control channels for the [`ForwardReactor`].
343
    ///
344
    /// Handles [`CtrlCmd::Forward`] and [`CtrlMsg::Forward`] messages.
345
    fwd_ctrl: ReactorCtrl<forward::CtrlCmd<F::CtrlCmd>, forward::CtrlMsg<F::CtrlMsg>>,
346
    /// Control channels for the [`BackwardReactor`].
347
    ///
348
    /// Handles [`CtrlCmd::Backward`] and [`CtrlMsg::Backward`] messages.
349
    bwd_ctrl: ReactorCtrl<backward::CtrlCmd<B::CtrlCmd>, backward::CtrlMsg<B::CtrlMsg>>,
350
}
351

            
352
/// A handle for sending control/command messages to a FWD or BWD.
353
struct ReactorCtrl<C, M> {
354
    /// Sender for control commands.
355
    command_tx: mpsc::UnboundedSender<C>,
356
    /// Sender for control messages.
357
    control_tx: mpsc::UnboundedSender<M>,
358
}
359

            
360
impl<C, M> ReactorCtrl<C, M> {
361
    /// Create a new sender handle.
362
32
    fn new(command_tx: mpsc::UnboundedSender<C>, control_tx: mpsc::UnboundedSender<M>) -> Self {
363
32
        Self {
364
32
            command_tx,
365
32
            control_tx,
366
32
        }
367
32
    }
368

            
369
    /// Send a control command.
370
4
    fn send_cmd(&mut self, cmd: C) -> Result<(), ReactorError> {
371
4
        self.command_tx
372
4
            .unbounded_send(cmd)
373
4
            .map_err(|_| ReactorError::Shutdown)
374
4
    }
375

            
376
    /// Send a control message.
377
    fn send_msg(&mut self, msg: M) -> Result<(), ReactorError> {
378
        self.control_tx
379
            .unbounded_send(msg)
380
            .map_err(|_| ReactorError::Shutdown)
381
    }
382
}
383

            
384
/// Trait implemented by types that can handle control messages and commands.
385
pub(crate) trait ControlHandler {
386
    /// The type of control message expected by the forward reactor.
387
    type CtrlMsg;
388

            
389
    /// The type of control command expected by the forward reactor.
390
    type CtrlCmd;
391

            
392
    // TODO(DEDUP): do these APIs make sense?
393
    // What should we return here, maybe some instructions for the base reactor
394
    // to do something?
395

            
396
    /// Handle a control command.
397
    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError>;
398

            
399
    /// Handle a control message.
400
    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError>;
401
}
402

            
403
#[allow(unused)] // TODO(relay)
404
impl<R: Runtime, F: ForwardHandler + ControlHandler, B: BackwardHandler + ControlHandler>
405
    Reactor<R, F, B>
406
{
407
    /// Create a new circuit reactor.
408
    ///
409
    /// The reactor will send outbound messages on `channel`, receive incoming
410
    /// messages on `inbound_chan_rx`, and identify this circuit by the channel-local
411
    /// [`CircId`] provided.
412
    ///
413
    /// The internal unique identifier for this circuit will be `unique_id`.
414
    #[allow(clippy::too_many_arguments)] // TODO
415
16
    pub(crate) fn new(
416
16
        runtime: R,
417
16
        channel: &Arc<Channel>,
418
16
        circ_id: CircId,
419
16
        unique_id: UniqId,
420
16
        inbound_chan_rx: CircuitRxReceiver,
421
16
        forward_impl: F,
422
16
        backward_impl: B,
423
16
        hop_mgr: HopMgr<R>,
424
16
        padding_ctrl: PaddingController,
425
16
        padding_event_stream: PaddingEventStream,
426
16
        // The sending end of this channel should be in HopMgr
427
16
        bwd_rx: mpsc::Receiver<ReadyStreamMsg>,
428
16
        fwd_events: mpsc::Receiver<F::CircEvent>,
429
16
        memquota: &CircuitAccount,
430
16
    ) -> (Self, CircReactorHandle<F, B>) {
431
        // NOTE: not registering this channel with the memquota subsystem is okay,
432
        // because it has no buffering (if ever decide to make the size of this buffer
433
        // non-zero for whatever reason, we must remember to register it with memquota
434
        // so that it counts towards the total memory usage for the circuit.
435
        #[allow(clippy::disallowed_methods)]
436
16
        let (backward_reactor_tx, forward_reactor_rx) = mpsc::channel(0);
437

            
438
        // TODO: channels galore
439
16
        let (control_tx, control_rx) = mpsc::unbounded();
440
16
        let (command_tx, command_rx) = mpsc::unbounded();
441

            
442
16
        let (fwd_control_tx, fwd_control_rx) = mpsc::unbounded();
443
16
        let (fwd_command_tx, fwd_command_rx) = mpsc::unbounded();
444
16
        let (bwd_control_tx, bwd_control_rx) = mpsc::unbounded();
445
16
        let (bwd_command_tx, bwd_command_rx) = mpsc::unbounded();
446

            
447
16
        let fwd_ctrl = ReactorCtrl::new(fwd_command_tx, fwd_control_tx);
448
16
        let bwd_ctrl = ReactorCtrl::new(bwd_command_tx, bwd_control_tx);
449

            
450
16
        let handle = CircReactorHandle {
451
16
            control: control_tx,
452
16
            command: command_tx,
453
16
            time_provider: DynTimeProvider::new(runtime.clone()),
454
16
            memquota: memquota.clone(),
455
16
        };
456

            
457
        /// Grab a handle to the hop list (it's needed by the BWD)
458
16
        let hops = Arc::clone(hop_mgr.hops());
459
16
        let forward = ForwardReactor::new(
460
16
            runtime.clone(),
461
16
            unique_id,
462
16
            forward_impl,
463
16
            hop_mgr,
464
16
            inbound_chan_rx,
465
16
            fwd_control_rx,
466
16
            fwd_command_rx,
467
16
            backward_reactor_tx,
468
16
            fwd_events,
469
16
            padding_ctrl.clone(),
470
        );
471

            
472
16
        let backward = BackwardReactor::new(
473
16
            runtime,
474
16
            channel,
475
16
            circ_id,
476
16
            unique_id,
477
16
            backward_impl,
478
16
            hops,
479
16
            forward_reactor_rx,
480
16
            bwd_control_rx,
481
16
            bwd_command_rx,
482
16
            padding_ctrl,
483
16
            padding_event_stream,
484
16
            bwd_rx,
485
        );
486

            
487
16
        let reactor = Reactor {
488
16
            unique_id,
489
16
            forward: Some(forward),
490
16
            backward: Some(backward),
491
16
            control: control_rx,
492
16
            command: command_rx,
493
16
            fwd_ctrl,
494
16
            bwd_ctrl,
495
16
        };
496

            
497
16
        (reactor, handle)
498
16
    }
499

            
500
    /// Helper for [`run`](Self::run).
501
16
    pub(crate) async fn run_inner(&mut self) -> StdResult<(), ReactorError> {
502
16
        let (forward, backward) = (|| Some((self.forward.take()?, self.backward.take()?)))()
503
16
            .expect("relay reactor spawned twice?!");
504

            
505
16
        let mut forward = Box::pin(forward.run()).fuse();
506
16
        let mut backward = Box::pin(backward.run()).fuse();
507
        loop {
508
            // If either of these completes, this function returns,
509
            // dropping fwd_ctrl/bwd_ctrl channels, which will, in turn,
510
            // cause the remaining reactor, if there is one, to shut down too
511
20
            select_biased! {
512
20
                res = self.command.next() => {
513
8
                    let Some(cmd) = res else {
514
4
                        trace!(
515
                            circ_id = %self.unique_id,
516
                            reason = "command channel drop",
517
                            "reactor shutdown",
518
                        );
519

            
520
4
                        return Err(ReactorError::Shutdown);
521
                    };
522

            
523
4
                    self.handle_command(cmd)?;
524
                },
525
20
                res = self.control.next() => {
526
                    let Some(msg) = res else {
527
                        trace!(
528
                            circ_id = %self.unique_id,
529
                            reason = "control channel drop",
530
                            "reactor shutdown",
531
                        );
532

            
533
                        return Err(ReactorError::Shutdown);
534
                    };
535

            
536
                    self.handle_control(msg)?;
537
                },
538
                // No need to log the error here, because it was already logged
539
                // by the reactor that shut down
540
12
                res = forward => return Ok(res?),
541
                res = backward => return Ok(res?),
542
            }
543
        }
544
16
    }
545

            
546
    /// Handle a shutdown request.
547
    fn handle_shutdown(&self) -> StdResult<(), ReactorError> {
548
        trace!(
549
            tunnel_id = %self.unique_id,
550
            "reactor shutdown due to explicit request",
551
        );
552

            
553
        Err(ReactorError::Shutdown)
554
    }
555

            
556
    /// Handle a [`CtrlCmd`].
557
4
    fn handle_command(
558
4
        &mut self,
559
4
        cmd: CtrlCmd<F::CtrlCmd, B::CtrlCmd>,
560
4
    ) -> StdResult<(), ReactorError> {
561
4
        match cmd {
562
4
            CtrlCmd::Forward(c) => self.fwd_ctrl.send_cmd(c),
563
            CtrlCmd::Backward(c) => self.bwd_ctrl.send_cmd(c),
564
            CtrlCmd::Shutdown => self.handle_shutdown(),
565
        }
566
4
    }
567

            
568
    /// Handle a [`CtrlMsg`].
569
    fn handle_control(
570
        &mut self,
571
        cmd: CtrlMsg<F::CtrlMsg, B::CtrlMsg>,
572
    ) -> StdResult<(), ReactorError> {
573
        match cmd {
574
            CtrlMsg::Forward(c) => self.fwd_ctrl.send_msg(c),
575
            CtrlMsg::Backward(c) => self.bwd_ctrl.send_msg(c),
576
        }
577
    }
578
}
579

            
580
#[cfg(test)]
581
pub(crate) mod test {
582
    // @@ begin test lint list maintained by maint/add_warning @@
583
    #![allow(clippy::bool_assert_comparison)]
584
    #![allow(clippy::clone_on_copy)]
585
    #![allow(clippy::dbg_macro)]
586
    #![allow(clippy::mixed_attributes_style)]
587
    #![allow(clippy::print_stderr)]
588
    #![allow(clippy::print_stdout)]
589
    #![allow(clippy::single_char_pattern)]
590
    #![allow(clippy::unwrap_used)]
591
    #![allow(clippy::unchecked_time_subtraction)]
592
    #![allow(clippy::useless_vec)]
593
    #![allow(clippy::needless_pass_by_value)]
594
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
595

            
596
    use tor_basic_utils::test_rng::testing_rng;
597
    use tor_cell::chancell::{BoxedCellBody, msg as chanmsg};
598
    use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, msg as relaymsg};
599

            
600
    use chanmsg::AnyChanMsg;
601

            
602
    #[cfg(feature = "hs-service")]
603
    use crate::client::stream::IncomingStreamRequestFilter;
604

            
605
    pub(crate) fn rmsg_to_ccmsg(
606
        id: Option<StreamId>,
607
        msg: relaymsg::AnyRelayMsg,
608
        early: bool,
609
    ) -> AnyChanMsg {
610
        // TODO #1947: test other formats.
611
        let rfmt = RelayCellFormat::V0;
612
        let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
613
            .encode(rfmt, &mut testing_rng())
614
            .unwrap();
615
        let chanmsg = chanmsg::Relay::from(body);
616

            
617
        if early {
618
            let chanmsg = chanmsg::RelayEarly::from(chanmsg);
619
            AnyChanMsg::RelayEarly(chanmsg)
620
        } else {
621
            AnyChanMsg::Relay(chanmsg)
622
        }
623
    }
624

            
625
    #[cfg(any(feature = "hs-service", feature = "relay"))]
626
    pub(crate) struct AllowAllStreamsFilter;
627
    #[cfg(any(feature = "hs-service", feature = "relay"))]
628
    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
629
        fn disposition(
630
            &mut self,
631
            _ctx: &crate::client::stream::IncomingStreamRequestContext<'_>,
632
            _circ: &crate::circuit::CircHopSyncView<'_>,
633
        ) -> crate::Result<crate::client::stream::IncomingStreamRequestDisposition> {
634
            Ok(crate::client::stream::IncomingStreamRequestDisposition::Accept)
635
        }
636
    }
637
}