1
//! Channel for sending messages to [`StreamReactor`].
2

            
3
use crate::circuit::UniqId;
4
use crate::circuit::circhop::{CircHopOutbound, HopSettings};
5
use crate::circuit::reactor::circhop::CircHopList;
6
use crate::circuit::reactor::stream::{ReadyStreamMsg, StreamHandler, StreamMsg, StreamReactor};
7
use crate::congestion::CongestionControl;
8
use crate::memquota::CircuitAccount;
9
use crate::util::err::ReactorError;
10
use crate::{Error, HopNum, Result};
11

            
12
#[cfg(any(feature = "hs-service", feature = "relay"))]
13
use crate::stream::incoming::IncomingStreamRequestHandler;
14

            
15
use tor_error::internal;
16
use tor_rtcompat::Runtime;
17

            
18
use futures::SinkExt;
19
use futures::channel::mpsc;
20

            
21
use std::result::Result as StdResult;
22
use std::sync::{Arc, Mutex, RwLock};
23

            
24
/// The hop manager of a reactor.
25
///
26
/// This contains the per-hop state (e.g. congestion control information),
27
/// and a handle to the stream reactor of the hop.
28
///
29
/// The stream reactor of the hop is launched lazily,
30
/// when the first [`StreamMsg`] is sent via [`HopMgr::send`].
31
pub(crate) struct HopMgr<R: Runtime> {
32
    /// A handle to the runtime.
33
    runtime: R,
34
    /// Context used when spawning a stream reactor.
35
    ctx: StreamReactorContext,
36
    /// Sender for sending messages to BWD.
37
    ///
38
    /// The receiver is in BWD.
39
    ///
40
    /// A clone of this is passed to each spawned StreamReactor
41
    bwd_tx: mpsc::Sender<ReadyStreamMsg>,
42
    /// The underlying senders, indexed by [`HopNum`].
43
    ///
44
    /// Relays have at most one stream reactor per circuit.
45
    /// Clients have at most one stream reactor per circuit hop.
46
    ///
47
    /// This is shared with the backward reactor.
48
    /// The backward reactor only ever *reads* from this
49
    /// (it never mutates the list).
50
    ///
51
    // TODO: the backward reactor only ever reads from this.
52
    // Conceptually, it is the HopMgr that owns this list,
53
    // because only HopMgr can add hops to the list.
54
    //
55
    // Perhaps we need a specialized abstraction that only allows reading here.
56
    // This could be a wrapper over RwLock, providing a read-only API for the BWD.
57
    hops: Arc<RwLock<CircHopList>>,
58
    /// Memory quota account
59
    memquota: CircuitAccount,
60
}
61

            
62
/// State needed to build a stream reactor.
63
///
64
/// Used when spawning the stream reactor of a hop.
65
struct StreamReactorContext {
66
    /// An identifier for logging about this reactor's circuit.
67
    unique_id: UniqId,
68
    /// The incoming stream handler.
69
    ///
70
    /// This is shared with every StreamReactor.
71
    #[cfg(any(feature = "hs-service", feature = "relay"))]
72
    incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
73
    /// A handler for customizing the stream reactor behavior.
74
    handler: Arc<dyn StreamHandler>,
75
}
76

            
77
impl<R: Runtime> HopMgr<R> {
78
    /// Create a new [`HopMgr`] with an empty hop list.
79
    ///
80
    /// Hops are added with [`HopMgr::add_hop`].
81
16
    pub(crate) fn new<S: StreamHandler>(
82
16
        runtime: R,
83
16
        unique_id: UniqId,
84
16
        handler: S,
85
16
        bwd_tx: mpsc::Sender<ReadyStreamMsg>,
86
16
        memquota: CircuitAccount,
87
16
    ) -> Self {
88
        // We don't spawn any stream reactors ahead of time.
89
        // Instead we spawn them lazily, when opening streams.
90
16
        let hops = Arc::new(RwLock::new(Default::default()));
91
16
        let ctx = StreamReactorContext {
92
16
            unique_id,
93
16
            #[cfg(any(feature = "hs-service", feature = "relay"))]
94
16
            incoming: Arc::new(Mutex::new(None)),
95
16
            handler: Arc::new(handler),
96
16
        };
97

            
98
16
        Self {
99
16
            runtime,
100
16
            hops,
101
16
            ctx,
102
16
            bwd_tx,
103
16
            memquota,
104
16
        }
105
16
    }
106

            
107
    /// Return a reference to our hop list.
108
40
    pub(crate) fn hops(&self) -> &Arc<RwLock<CircHopList>> {
109
40
        &self.hops
110
40
    }
111

            
112
    /// Set the incoming stream handler for this reactor.
113
    ///
114
    /// There can only be one incoming stream handler per reactor,
115
    /// and each stream handler only pertains to a single hop (see expected_hop())
116
    //
117
    // TODO: eventually, we might want a different design here,
118
    // for example we might want to allow multiple stream handlers per reactor (one per hop).
119
    // However, for now, the implementation is intentionally kept similar to that
120
    // in the client reactor (to make it easier to migrate it to the new reactor design).
121
    //
122
    /// Returns an error if the hop manager already has a stream handler.
123
    ///
124
    /// Since the handler is shared with every hop's stream reactor,
125
    /// this function will update the handler for all of them.
126
    ///
127
    // TODO(DEDUP): almost identical to the client-side
128
    // CellHandlers::set_incoming_stream_req_handler()
129
    #[cfg(any(feature = "hs-service", feature = "relay"))]
130
4
    pub(crate) fn set_incoming_handler(&self, handler: IncomingStreamRequestHandler) -> Result<()> {
131
4
        let mut lock = self.ctx.incoming.lock().expect("poisoned lock");
132

            
133
4
        if lock.is_none() {
134
4
            *lock = Some(handler);
135
4
            Ok(())
136
        } else {
137
            Err(Error::from(internal!(
138
                "Tried to install a BEGIN cell handler before the old one was gone."
139
            )))
140
        }
141
4
    }
142

            
143
    /// Push a new hop to our hop list.
144
    ///
145
    /// Prepares a cc object for the hop, but does not spawn a stream reactor.
146
    ///
147
    /// Will return an error if the circuit already has [`u8::MAX`] hops.
148
16
    pub(crate) fn add_hop(&mut self, settings: HopSettings) -> Result<()> {
149
16
        let mut hops = self.hops.write().expect("poisoned lock");
150
16
        hops.add_hop(settings)
151
16
    }
152

            
153
    /// Send a message to the stream reactor of the specified `hop`,
154
    /// spawning it if necessary.
155
    pub(crate) async fn send(
156
        &mut self,
157
        hopnum: Option<HopNum>,
158
        msg: StreamMsg,
159
    ) -> StdResult<(), ReactorError> {
160
        let mut tx = self.get_or_spawn_stream_reactor(hopnum)?;
161

            
162
        tx.send(msg).await.map_err(|_| {
163
            // The stream reactor has shut down
164
            ReactorError::Shutdown
165
        })
166
    }
167

            
168
    /// Get a handle to the stream reactor, spawning it if necessary
169
    fn get_or_spawn_stream_reactor(
170
        &self,
171
        hopnum: Option<HopNum>,
172
    ) -> StdResult<mpsc::Sender<StreamMsg>, ReactorError> {
173
        let mut hops = self.hops.write().expect("poisoned lock");
174
        let hop = hops
175
            .get_mut(hopnum)
176
            .ok_or_else(|| internal!("tried to send cell to nonexistent hop?!"))?;
177

            
178
        let tx = match &hop.tx {
179
            Some(tx) => tx.clone(),
180
            None => {
181
                // If we don't have a handle to the stream reactor,
182
                // it means it hasn't been spawned yet, so we have to spawn it now.
183
                let tx =
184
                    self.spawn_stream_reactor(hopnum, &hop.settings, Arc::clone(&hop.ccontrol))?;
185

            
186
                hop.tx = Some(tx.clone());
187

            
188
                // Return a copy of this sender (can't borrow because the hop
189
                // is behind a Mutex, and we can't keep it locked across the send()
190
                // await point)
191
                tx
192
            }
193
        };
194

            
195
        Ok(tx)
196
    }
197

            
198
    /// Spawn a [`StreamReactor`] for the specified hop.
199
    fn spawn_stream_reactor(
200
        &self,
201
        hopnum: Option<HopNum>,
202
        settings: &HopSettings,
203
        ccontrol: Arc<Mutex<CongestionControl>>,
204
    ) -> StdResult<mpsc::Sender<StreamMsg>, ReactorError> {
205
        use tor_rtcompat::SpawnExt as _;
206

            
207
        // NOTE: not registering this channel with the memquota subsystem is okay,
208
        // because it has no buffering (if ever decide to make the size of this buffer
209
        // non-zero for whatever reason, we must remember to register it with memquota
210
        // so that it counts towards the total memory usage for the circuit.
211
        //
212
        // TODO(tuning): having zero buffering here is very likely suboptimal.
213
        // We should do *some* buffering here, and then figure out if we should it
214
        // up to memquota or not.
215
        #[allow(clippy::disallowed_methods)]
216
        let (fwd_stream_tx, fwd_stream_rx) = mpsc::channel(0);
217

            
218
        let flow_ctrl_params = Arc::new(settings.flow_ctrl_params.clone());
219
        let relay_format = settings.relay_crypt_protocol().relay_cell_format();
220
        let outbound = CircHopOutbound::new(ccontrol, relay_format, flow_ctrl_params, settings);
221

            
222
        let stream_reactor = StreamReactor::new(
223
            self.runtime.clone(),
224
            hopnum,
225
            outbound,
226
            self.ctx.unique_id,
227
            fwd_stream_rx,
228
            self.bwd_tx.clone(),
229
            Arc::clone(&self.ctx.handler),
230
            #[cfg(any(feature = "hs-service", feature = "relay"))]
231
            Arc::clone(&self.ctx.incoming),
232
            self.memquota.clone(),
233
        );
234

            
235
        self.runtime
236
            .spawn(async {
237
                let _ = stream_reactor.run().await;
238
            })
239
            .map_err(|_| ReactorError::Shutdown)?;
240

            
241
        Ok(fwd_stream_tx)
242
    }
243
}