1
//! Module exposing the relay circuit reactor subsystem.
2
//!
3
//! See [`reactor`](crate::circuit::reactor) for a description of the overall architecture.
4
//!
5
//! #### `ForwardReactor`
6
//!
7
//! It handles
8
//!
9
//!  * unrecognized RELAY cells, by moving them in the forward direction (towards the exit)
10
//!  * recognized RELAY cells, by splitting each cell into messages, and handling
11
//!    each message individually as described in the table below
12
//!    (Note: since prop340 is not yet implemented, in practice there is only 1 message per cell).
13
//!  * RELAY_EARLY cells (**not yet implemented**)
14
//!  * DESTROY cells (**not yet implemented**)
15
//!  * PADDING_NEGOTIATE cells (**not yet implemented**)
16
//!
17
//! ```text
18
//!
19
//! Legend: `F` = "forward reactor", `B` = "backward reactor", `S` = "stream reactor"
20
//!
21
//! | RELAY cmd         | Received in | Handled in | Description                            |
22
//! |-------------------|-------------|------------|----------------------------------------|
23
//! | DROP              | F           | F          | Passed to PaddingController for        |
24
//! |                   |             |            | validation                             |
25
//! |-------------------|-------------|------------|----------------------------------------|
26
//! | EXTEND2           | F           |            | Handled by instructing the channel     |
27
//! |                   |             |            | provider to launch a new channel, and  |
28
//! |                   |             |            | waiting for the new channel on its     |
29
//! |                   |             |            | outgoing_chan_rx receiver              |
30
//! |                   |             |            | (**not yet implemented**)              |
31
//! |-------------------|-------------|------------|----------------------------------------|
32
//! | TRUNCATE          | F           | F          | (**not yet implemented**)              |
33
//! |                   |             |            |                                        |
34
//! |-------------------|-------------|------------|----------------------------------------|
35
//! | TODO              |             |            |                                        |
36
//! |                   |             |            |                                        |
37
//! ```
38

            
39
pub(crate) mod backward;
40
pub(crate) mod forward;
41

            
42
use std::sync::Arc;
43

            
44
use futures::channel::mpsc;
45

            
46
use tor_cell::chancell::CircId;
47
use tor_linkspec::OwnedChanTarget;
48
use tor_rtcompat::Runtime;
49

            
50
use crate::channel::Channel;
51
use crate::circuit::circhop::HopSettings;
52
use crate::circuit::reactor::Reactor as BaseReactor;
53
use crate::circuit::reactor::hop_mgr::HopMgr;
54
use crate::circuit::{CircuitRxReceiver, UniqId};
55
use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer};
56
use crate::memquota::CircuitAccount;
57
use crate::relay::RelayCirc;
58
use crate::relay::channel_provider::ChannelProvider;
59
use crate::relay::reactor::backward::Backward;
60
use crate::relay::reactor::forward::Forward;
61
use crate::util::timeout::TimeoutEstimator;
62

            
63
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
64
use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
65

            
66
/// Type-alias for the relay base reactor type.
67
type RelayBaseReactor<R> = BaseReactor<R, Forward, Backward>;
68

            
69
/// The entry point of the circuit reactor subsystem.
70
#[allow(unused)] // TODO(relay)
71
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
72
pub(crate) struct Reactor<R: Runtime>(RelayBaseReactor<R>);
73

            
74
#[allow(unused)] // TODO(relay)
75
impl<R: Runtime> Reactor<R> {
76
    /// Create a new circuit reactor.
77
    ///
78
    /// The reactor will send outbound messages on `channel`, receive incoming
79
    /// messages on `input`, and identify this circuit by the channel-local
80
    /// [`CircId`] provided.
81
    ///
82
    /// The internal unique identifier for this circuit will be `unique_id`.
83
    #[allow(clippy::too_many_arguments)] // TODO
84
    pub(super) fn new(
85
        runtime: R,
86
        channel: &Arc<Channel>,
87
        circ_id: CircId,
88
        unique_id: UniqId,
89
        input: CircuitRxReceiver,
90
        crypto_in: Box<dyn InboundRelayLayer + Send>,
91
        crypto_out: Box<dyn OutboundRelayLayer + Send>,
92
        settings: &HopSettings,
93
        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send>,
94
        padding_ctrl: PaddingController,
95
        padding_event_stream: PaddingEventStream,
96
        timeouts: Arc<dyn TimeoutEstimator>,
97
        memquota: &CircuitAccount,
98
    ) -> crate::Result<(Self, Arc<RelayCirc>)> {
99
        // NOTE: not registering this channel with the memquota subsystem is okay,
100
        // because it has no buffering (if ever decide to make the size of this buffer
101
        // non-zero for whatever reason, we must remember to register it with memquota
102
        // so that it counts towards the total memory usage for the circuit.
103
        #[allow(clippy::disallowed_methods)]
104
        let (stream_tx, stream_rx) = mpsc::channel(0);
105

            
106
        let mut hop_mgr = HopMgr::new(
107
            runtime.clone(),
108
            unique_id,
109
            timeouts,
110
            stream_tx,
111
            memquota.clone(),
112
        );
113

            
114
        // On the relay side, we always have one "hop" (ourselves).
115
        //
116
        // Clients will need to call this function in response to CtrlMsg::Create
117
        // (TODO: for clients, we probably will need to store a bunch more state here)
118
        hop_mgr.add_hop(settings.clone())?;
119

            
120
        // TODO(relay): currently we don't need buffering on this channel,
121
        // but we might need it if we start using it for more than just EXTENDED2 events
122
        #[allow(clippy::disallowed_methods)]
123
        let (fwd_ev_tx, fwd_ev_rx) = mpsc::channel(0);
124
        let forward_foo = Forward::new(unique_id, crypto_out, chan_provider, fwd_ev_tx);
125
        let backward_foo = Backward::new(crypto_in);
126

            
127
        let (inner, handle) = crate::circuit::reactor::Reactor::new(
128
            runtime,
129
            channel,
130
            circ_id,
131
            unique_id,
132
            input,
133
            forward_foo,
134
            backward_foo,
135
            hop_mgr,
136
            padding_ctrl,
137
            padding_event_stream,
138
            stream_rx,
139
            fwd_ev_rx,
140
            memquota,
141
        );
142

            
143
        let reactor = Self(inner);
144
        let handle = Arc::new(RelayCirc(handle));
145

            
146
        Ok((reactor, handle))
147
    }
148

            
149
    /// Launch the reactor, and run until the circuit closes or we
150
    /// encounter an error.
151
    ///
152
    /// Once this method returns, the circuit is dead and cannot be
153
    /// used again.
154
    pub(crate) async fn run(mut self) -> crate::Result<()> {
155
        self.0.run().await
156
    }
157
}