1
//! Module exposing the circuit reactor subsystem.
2
//!
3
//! This module implements the new [multi-reactor circuit subsystem].
4
//!
5
// Note: this is currently only used for the relay side,
6
// but we plan to eventually rewrite the client circuit implementation
7
// to use these new reactor types as well.
8
//!
9
//! The entry point of the reactor is [`Reactor::run`], which launches the
10
//! reactor background tasks, and begins listening for inbound cells on the provided
11
//! inbound Tor channel.
12
//!
13
//! ### Architecture
14
//!
15
//! Internally, the circuit reactor consists of multiple reactors,
16
//! each running in a separate task:
17
//!
18
//!   * [`StreamReactor`] (one per hop): handles all messages arriving to,
19
//!     and coming from the streams of a given hop. The ready stream messages
20
//!     are sent to the [`BackwardReactor`]
21
//!   * [`ForwardReactor`]: handles incoming cells arriving on the
22
//!     "inbound" Tor channel (towards the guard, if we are a client, or towards
23
//!     the client, if we are a relay). If we are a client, it moves stream messages
24
//!     towards the corresponding [`StreamReactor`]. If we are a relay,
25
//!     in addition to sending any stream messages to the `StreamReactor`,
26
//!     this reactor also moves cells in the forward direction
27
//!     (from the client towards the exit)
28
//!   * [`BackwardReactor`]: writes cells to the "inbound" Tor channel:
29
//!     towards the client if we are a relay, or towards the exit
30
//!     if we are a client.
31
//!
32
// TODO: the forward/backward terminology no longer makes sense! Come up with better terms...
33
//!
34
//! If we are an exit relay, the cell flow looks roughly like this:
35
//!
36
//! ```text
37
//!                             <stream_tx
38
//!                              MPSC (0)>
39
//!   +--------------> FWD -------------------------+
40
//!   |                 |                           |
41
//!   |                 |                           |
42
//!   |                 |                           |
43
//!   |                 |                           v
44
//! relay      BackwardReactorCmd            StreamReactor
45
//!   ^             <MPSC (0)>                      |
46
//!   |                 |                           |
47
//!   |                 |                           |
48
//!   |                 |                           |
49
//!   |                 v                           |
50
//!   +--------------- BWD <------------------------+
51
//!     application stream data    <stream_rx
52
//!                                 MPSC (0)>
//! ```
53
//!
54
//! For a middle relay (the `StreamReactor` is omitted for brevity,
55
//! but middle relays can have one too, if leaky pipe is in use):
56
//!
57
//! ```text
//!                           unrecognized cell
58
//!   +--------------> FWD -------------------------+
59
//!   |                 |                           |
60
//!   |                 |                           |
61
//!   |                 |                           |
62
//!   |                 |                           v
63
//! client      BackwardReactorCmd                relay
64
//! or relay        <MPSC (0)>                      |
65
//!   ^                 |                           |
66
//!   |                 |                           |
67
//!   |                 |                           |
68
//!   |                 |                           |
69
//!   |                 v                           |
70
//!   +--------------- BWD <------------------------+
71
//! ```
72
//!
73
//! On the client-side the `ForwardReactor` reads cells from the Tor channel to the guard,
74
//! and the `BackwardReactor` writes to it.
75
//!
76
//! ```text
77
//!   +--------------- FWD <--------------------+
78
//!   |                 |                       |
79
//!   |                 |                       |
80
//!   |                 |                       |
81
//!   v                 |                       |
82
//! StreamReactor  BackwardReactorCmd         guard
83
//!   |               <MPSC (0)>                ^
84
//!   |                 |                       |
85
//!   |                 |                       |
86
//!   |                 |                       |
87
//!   |                 v                       |
88
//!   +--------------> BWD ---------------------+
89
//! ```
90
//!
91
//! Client with leaky pipe (`SR` = `StreamReactor`):
92
//!
93
//! ```text
94
//!   +------------------------------+
95
//!   |       +--------------------+ | (1 MPSC TX per SR)
96
//!   |       |                    | |
97
//!   |       |       +----------- FWD <------------------+
98
//!   |       |       |             |                     |
99
//!   |       |       |             |                     |
100
//!   |       |       |             |                     |
101
//!   v       v       v             |                     |
102
//!  SR      SR      SR           BackwardReactorCmd    guard
103
//! (hop 4) (hop 3)  (hop 2)      <MPSC (0)>              ^
104
//!   |       |       |             |                     |
105
//!   |       |       |             |                     |
106
//!   |       |       |             |                     |
107
//!   |       |       |             v                     |
108
//!   |       |       |            BWD -------------------+
109
//!   |       |       |             ^
110
//!   |       |       |             |
111
//!   |       |       |             | <stream_rx
112
//!   |       |       |             |  MPSC (0)>
113
//!   +-------+-------+-------------+
114
//! ```
115
//!
116
// TODO(tuning): The inter-reactor MPSC channels have no buffering,
117
// which is likely going to be bad for performance,
118
// so we will need to tune the sizes of these MPSC buffers.
119
//!
120
//! The read and write ends of the inbound and outbound Tor channels are "split",
121
//! such that each reactor holds an `inbound_chan_rx` stream (for reading)
122
//! and an `inbound_chan_tx` sink (for writing):
123
//!
124
//!  * `ForwardReactor` holds the reading end of the inbound
125
//!    (coming from the client, if we are a relay, or coming from the guard, if we are a client)
126
//!    Tor channel, and the writing end of the outbound (towards the exit, if we are a middle relay)
127
//!    Tor channel, if there is one
128
//!  * `BackwardReactor` holds the reading end of the outbound channel, if there is one,
129
//!    and the writing end of the inbound channel, if there is one
130
//!
131
//! #### `ForwardReactor`
132
//!
133
//! It handles
134
//!
135
//!  * unrecognized RELAY cells, by delegating to the implementation-dependent
136
//!    [`ForwardHandler::handle_unrecognized_cell`]
137
//!  * recognized RELAY cells, by splitting each cell into individual messages, and handling
138
//!    each message individually as described in the table below
139
//!    (Note: since prop340 is not yet implemented, in practice there is only 1 message per cell).
140
//!
141
//! ```text
142
//!
143
//! Legend: `F` = "forward reactor", `B` = "backward reactor", `S` = "stream reactor"
144
//!
145
//! | RELAY cmd         | Received in | Handled in | Description                            |
146
//! |-------------------|-------------|------------|----------------------------------------|
147
//! | SENDME            | F           | B          | Sent to BackwardReactor for handling   |
148
//! |                   |             |            | (BackwardReactorCmd::HandleSendme)     |
149
//! |                   |             |            | because the forward reactor doesn't    |
150
//! |                   |             |            | have access to the inbound_chan_tx part|
151
//! |                   |             |            | of the inbound (towards the client)    |
152
//! |                   |             |            | Tor channel, and so cannot obtain the  |
153
//! |                   |             |            | congestion signals needed for SENDME   |
154
//! |                   |             |            | handling                               |
155
//! |-------------------|-------------|------------|----------------------------------------|
156
//! | Other             | F           | F          | Passed to impl-dependent handler       |
157
//! | (StreamId = 0)    |             |            |  `ForwardHandler::handle_meta_msg()`   |
158
//! |-------------------|-------------|------------|----------------------------------------|
159
//! | Other             | F           | S          | All messages with a non-zero stream ID |
160
//! | (StreamId != 0)   |             |            | are forwarded to the stream reactor    |
161
//! |-------------------|-------------|------------|----------------------------------------|
162
//! ```
163
//!
164
//! #### `BackwardReactor`
165
//!
166
//! It handles
167
//!
168
//!  * the packaging and delivery of all cells that need to be written to the "inbound" Tor channel
169
//!    (it writes them to the towards-the-client Tor channel sink) (**partially implemented**)
170
//!  * incoming cells coming over the "outbound" Tor channel. This channel only exists
171
//!    if we are a middle relay. These cells are relayed to the "inbound" Tor channel (**not implemented**).
172
//!  * the sending of padding cells, according to the PaddingController's instructions
173
//!
174
//! This multi-reactor architecture should, in theory, have better performance than
175
//! a single reactor system, because it enables us to parallelize some of the work:
176
//! the forward and backward directions share little state,
177
//! because they read from, and write to, different sinks/streams,
178
//! so they can be run in parallel (as separate tasks).
179
//! With a single reactor architecture, the reactor would need to drive
180
//! both the forward and the backward direction, and on each iteration
181
//! would need to decide which to prioritize, which might prove tricky
182
//! (though prioritizing one of them at random would've probably been good enough).
183
//!
184
//! The monolithic single reactor alternative would also have been significantly
185
//! more convoluted, and so more difficult to maintain in the long run.
186
//!
187
//
188
// NOTE: The FWD and BWD currently share the hop list containing the per-hop state
189
// (including the congestion control object, which is behind a mutex).
190
//
191
//! [multi-reactor circuit subsystem]: https://gitlab.torproject.org/tpo/core/arti/-/blob/main/doc/dev/notes/relay-conflux.md
192
//! [`StreamReactor`]: stream::StreamReactor
193

            
194
// TODO(DEDUP): this will replace CircHopList when we rewrite the client reactor
195
// to use the new reactor architecture
196
pub(crate) mod circhop;
197

            
198
pub(crate) mod backward;
199
pub(crate) mod forward;
200
pub(crate) mod hop_mgr;
201
pub(crate) mod macros;
202
pub(crate) mod stream;
203

            
204
use std::result::Result as StdResult;
205
use std::sync::Arc;
206

            
207
use derive_deftly::Deftly;
208
use futures::channel::mpsc;
209
use futures::{FutureExt as _, StreamExt as _, select_biased};
210
use oneshot_fused_workaround as oneshot;
211
use tracing::trace;
212

            
213
use tor_cell::chancell::CircId;
214
use tor_rtcompat::{DynTimeProvider, Runtime};
215

            
216
use crate::channel::Channel;
217
use crate::circuit::reactor::backward::BackwardHandler;
218
use crate::circuit::reactor::forward::ForwardHandler;
219
use crate::circuit::reactor::hop_mgr::HopMgr;
220
use crate::circuit::reactor::stream::ReadyStreamMsg;
221
use crate::circuit::{CircuitRxReceiver, UniqId};
222
use crate::memquota::CircuitAccount;
223
use crate::util::err::ReactorError;
224

            
225
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
226
use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
227

            
228
use backward::BackwardReactor;
229
use forward::ForwardReactor;
230
use macros::derive_deftly_template_CircuitReactor;
231

            
232
/// The type of a oneshot channel used to inform reactor of the result of an operation.
233
pub(crate) type ReactorResultChannel<T> = oneshot::Sender<crate::Result<T>>;
234

            
235
/// A handle for interacting with a circuit reactor.
236
#[derive(derive_more::Debug)]
237
pub(crate) struct CircReactorHandle<F: ForwardHandler, B: BackwardHandler> {
238
    /// Sender for reactor control messages.
239
    #[debug(skip)]
240
    pub(crate) control: mpsc::UnboundedSender<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
241
    /// Sender for reactor control commands.
242
    #[debug(skip)]
243
    pub(crate) command: mpsc::UnboundedSender<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
244
    /// The time provider.
245
    pub(crate) time_provider: DynTimeProvider,
246
    /// Memory quota account
247
    pub(crate) memquota: CircuitAccount,
248
}
249

            
250
/// A control command.
251
///
252
/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
253
/// never cause cells to sent on the Tor channel,
254
/// while `CtrlMsg`s potentially do.
255
#[allow(unused)] // TODO(relay)
256
pub(crate) enum CtrlCmd<F, B> {
257
    /// A control command for the forward reactor.
258
    Forward(forward::CtrlCmd<F>),
259
    /// A control command for the backward reactor.
260
    Backward(backward::CtrlCmd<B>),
261
    /// Shut down the reactor.
262
    Shutdown,
263
}
264

            
265
/// A control message.
266
#[allow(unused)] // TODO(relay)
267
pub(crate) enum CtrlMsg<F, B> {
268
    /// A control message for the forward reactor.
269
    Forward(forward::CtrlMsg<F>),
270
    /// A control message for the backward reactor.
271
    Backward(backward::CtrlMsg<B>),
272
}
273

            
274
/// The entry point of the circuit reactor subsystem.
275
#[derive(Deftly)]
276
#[derive_deftly(CircuitReactor)]
277
#[deftly(reactor_name = "circuit reactor")]
278
#[deftly(only_run_once)]
279
#[deftly(run_inner_fn = "Self::run_inner")]
280
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
281
pub(crate) struct Reactor<R: Runtime, F: ForwardHandler, B: BackwardHandler> {
282
    /// The process-unique identifier of this circuit.
283
    ///
284
    /// Used for logging.
285
    unique_id: UniqId,
286
    /// The reactor for handling
287
    ///
288
    ///   * cells moving in the forward direction (from the client towards exit), if we are a relay
289
    ///   * incoming cells (coming from the guard), if we are a client
290
    ///
291
    /// Optional so we can move it out of self in run().
292
    forward: Option<ForwardReactor<R, F>>,
293
    /// The reactor for handling
294
    ///
295
    ///   * cells moving in the backward direction (from the exit towards client), if we are a relay
296
    ///   * outgoing cells (moving towards the guard), if we are a client
297
    ///
298
    /// Optional so we can move it out of self in run().
299
    backward: Option<BackwardReactor<B>>,
300
    /// Receiver for control messages for this reactor, sent by reactor handle objects.
301
    control: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
302
    /// Receiver for command messages for this reactor, sent by reactor handle objects.
303
    ///
304
    /// This MPSC channel is polled in [`run`](Self::run).
305
    ///
306
    /// NOTE: this is a separate channel from `control`, because some messages
307
    /// have higher priority and need to be handled even if the `inbound_chan_tx` is not
308
    /// ready (whereas `control` messages are not read until the `inbound_chan_tx` sink
309
    /// is ready to accept cells).
310
    command: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
311
    /// Control channels for the [`ForwardReactor`].
312
    ///
313
    /// Handles [`CtrlCmd::Forward`] and [`CtrlMsg::Forward`] messages.
314
    fwd_ctrl: ReactorCtrl<forward::CtrlCmd<F::CtrlCmd>, forward::CtrlMsg<F::CtrlMsg>>,
315
    /// Control channels for the [`BackwardReactor`].
316
    ///
317
    /// Handles [`CtrlCmd::Backward`] and [`CtrlMsg::Backward`] messages.
318
    bwd_ctrl: ReactorCtrl<backward::CtrlCmd<B::CtrlCmd>, backward::CtrlMsg<B::CtrlMsg>>,
319
}
320

            
321
/// A handle for sending control/command messages to a FWD or BWD.
322
struct ReactorCtrl<C, M> {
323
    /// Sender for control commands.
324
    command_tx: mpsc::UnboundedSender<C>,
325
    /// Sender for control messages.
326
    control_tx: mpsc::UnboundedSender<M>,
327
}
328

            
329
impl<C, M> ReactorCtrl<C, M> {
330
    /// Create a new sender handle.
331
    fn new(command_tx: mpsc::UnboundedSender<C>, control_tx: mpsc::UnboundedSender<M>) -> Self {
332
        Self {
333
            command_tx,
334
            control_tx,
335
        }
336
    }
337

            
338
    /// Send a control command.
339
    fn send_cmd(&mut self, cmd: C) -> Result<(), ReactorError> {
340
        self.command_tx
341
            .unbounded_send(cmd)
342
            .map_err(|_| ReactorError::Shutdown)
343
    }
344

            
345
    /// Send a control message.
346
    fn send_msg(&mut self, msg: M) -> Result<(), ReactorError> {
347
        self.control_tx
348
            .unbounded_send(msg)
349
            .map_err(|_| ReactorError::Shutdown)
350
    }
351
}
352

            
353
/// Trait implemented by types that can handle control messages and commands.
354
pub(crate) trait ControlHandler {
355
    /// The type of control message expected by the forward reactor.
356
    type CtrlMsg;
357

            
358
    /// The type of control command expected by the forward reactor.
359
    type CtrlCmd;
360

            
361
    // TODO(DEDUP): do these APIs make sense?
362
    // What should we return here, maybe some instructions for the base reactor
363
    // to do something?
364

            
365
    /// Handle a control command.
366
    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError>;
367

            
368
    /// Handle a control message.
369
    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError>;
370
}
371

            
372
#[allow(unused)] // TODO(relay)
373
impl<R: Runtime, F: ForwardHandler + ControlHandler, B: BackwardHandler + ControlHandler>
374
    Reactor<R, F, B>
375
{
376
    /// Create a new circuit reactor.
377
    ///
378
    /// The reactor will send outbound messages on `channel`, receive incoming
379
    /// messages on `inbound_chan_rx`, and identify this circuit by the channel-local
380
    /// [`CircId`] provided.
381
    ///
382
    /// The internal unique identifier for this circuit will be `unique_id`.
383
    #[allow(clippy::too_many_arguments)] // TODO
384
    pub(crate) fn new(
385
        runtime: R,
386
        channel: &Arc<Channel>,
387
        circ_id: CircId,
388
        unique_id: UniqId,
389
        inbound_chan_rx: CircuitRxReceiver,
390
        forward_impl: F,
391
        backward_impl: B,
392
        hop_mgr: HopMgr<R>,
393
        padding_ctrl: PaddingController,
394
        padding_event_stream: PaddingEventStream,
395
        // The sending end of this channel should be in HopMgr
396
        bwd_rx: mpsc::Receiver<ReadyStreamMsg>,
397
        fwd_events: mpsc::Receiver<F::CircEvent>,
398
        memquota: &CircuitAccount,
399
    ) -> (Self, CircReactorHandle<F, B>) {
400
        // NOTE: not registering this channel with the memquota subsystem is okay,
401
        // because it has no buffering (if ever decide to make the size of this buffer
402
        // non-zero for whatever reason, we must remember to register it with memquota
403
        // so that it counts towards the total memory usage for the circuit.
404
        #[allow(clippy::disallowed_methods)]
405
        let (backward_reactor_tx, forward_reactor_rx) = mpsc::channel(0);
406

            
407
        // TODO: channels galore
408
        let (control_tx, control_rx) = mpsc::unbounded();
409
        let (command_tx, command_rx) = mpsc::unbounded();
410

            
411
        let (fwd_control_tx, fwd_control_rx) = mpsc::unbounded();
412
        let (fwd_command_tx, fwd_command_rx) = mpsc::unbounded();
413
        let (bwd_control_tx, bwd_control_rx) = mpsc::unbounded();
414
        let (bwd_command_tx, bwd_command_rx) = mpsc::unbounded();
415

            
416
        let fwd_ctrl = ReactorCtrl::new(fwd_command_tx, fwd_control_tx);
417
        let bwd_ctrl = ReactorCtrl::new(bwd_command_tx, bwd_control_tx);
418

            
419
        let handle = CircReactorHandle {
420
            control: control_tx,
421
            command: command_tx,
422
            time_provider: DynTimeProvider::new(runtime.clone()),
423
            memquota: memquota.clone(),
424
        };
425

            
426
        /// Grab a handle to the hop list (it's needed by the BWD)
427
        let hops = Arc::clone(hop_mgr.hops());
428
        let forward = ForwardReactor::new(
429
            runtime.clone(),
430
            unique_id,
431
            forward_impl,
432
            hop_mgr,
433
            inbound_chan_rx,
434
            fwd_control_rx,
435
            fwd_command_rx,
436
            backward_reactor_tx,
437
            fwd_events,
438
            padding_ctrl.clone(),
439
        );
440

            
441
        let backward = BackwardReactor::new(
442
            runtime,
443
            channel,
444
            circ_id,
445
            unique_id,
446
            backward_impl,
447
            hops,
448
            forward_reactor_rx,
449
            bwd_control_rx,
450
            bwd_command_rx,
451
            padding_ctrl,
452
            padding_event_stream,
453
            bwd_rx,
454
        );
455

            
456
        let reactor = Reactor {
457
            unique_id,
458
            forward: Some(forward),
459
            backward: Some(backward),
460
            control: control_rx,
461
            command: command_rx,
462
            fwd_ctrl,
463
            bwd_ctrl,
464
        };
465

            
466
        (reactor, handle)
467
    }
468

            
469
    /// Helper for [`run`](Self::run).
470
    pub(crate) async fn run_inner(&mut self) -> StdResult<(), ReactorError> {
471
        let (forward, backward) = (|| Some((self.forward.take()?, self.backward.take()?)))()
472
            .expect("relay reactor spawned twice?!");
473

            
474
        let mut forward = Box::pin(forward.run()).fuse();
475
        let mut backward = Box::pin(backward.run()).fuse();
476
        loop {
477
            // If either of these completes, this function returns,
478
            // dropping fwd_ctrl/bwd_ctrl channels, which will, in turn,
479
            // cause the remaining reactor, if there is one, to shut down too
480
            select_biased! {
481
                res = self.command.next() => {
482
                    let Some(cmd) = res else {
483
                        trace!(
484
                            circ_id = %self.unique_id,
485
                            reason = "command channel drop",
486
                            "reactor shutdown",
487
                        );
488

            
489
                        return Err(ReactorError::Shutdown);
490
                    };
491

            
492
                    self.handle_command(cmd)?;
493
                },
494
                res = self.control.next() => {
495
                    let Some(msg) = res else {
496
                        trace!(
497
                            circ_id = %self.unique_id,
498
                            reason = "control channel drop",
499
                            "reactor shutdown",
500
                        );
501

            
502
                        return Err(ReactorError::Shutdown);
503
                    };
504

            
505
                    self.handle_control(msg)?;
506
                },
507
                // No need to log the error here, because it was already logged
508
                // by the reactor that shut down
509
                res = forward => return Ok(res?),
510
                res = backward => return Ok(res?),
511
            }
512
        }
513
    }
514

            
515
    /// Handle a shutdown request.
516
    fn handle_shutdown(&self) -> StdResult<(), ReactorError> {
517
        trace!(
518
            tunnel_id = %self.unique_id,
519
            "reactor shutdown due to explicit request",
520
        );
521

            
522
        Err(ReactorError::Shutdown)
523
    }
524

            
525
    /// Handle a [`CtrlCmd`].
526
    fn handle_command(
527
        &mut self,
528
        cmd: CtrlCmd<F::CtrlCmd, B::CtrlCmd>,
529
    ) -> StdResult<(), ReactorError> {
530
        match cmd {
531
            CtrlCmd::Forward(c) => self.fwd_ctrl.send_cmd(c),
532
            CtrlCmd::Backward(c) => self.bwd_ctrl.send_cmd(c),
533
            CtrlCmd::Shutdown => self.handle_shutdown(),
534
        }
535
    }
536

            
537
    /// Handle a [`CtrlMsg`].
538
    fn handle_control(
539
        &mut self,
540
        cmd: CtrlMsg<F::CtrlMsg, B::CtrlMsg>,
541
    ) -> StdResult<(), ReactorError> {
542
        match cmd {
543
            CtrlMsg::Forward(c) => self.fwd_ctrl.send_msg(c),
544
            CtrlMsg::Backward(c) => self.bwd_ctrl.send_msg(c),
545
        }
546
    }
547
}