1
//! Channel for sending messages to [`StreamReactor`].
2

            
3
use crate::circuit::UniqId;
4
use crate::circuit::circhop::{CircHopOutbound, HopSettings};
5
use crate::circuit::reactor::circhop::CircHopList;
6
use crate::circuit::reactor::stream::{CtrlMsg, ReadyStreamMsg, StreamHandler, StreamReactor};
7
use crate::congestion::CongestionControl;
8
use crate::memquota::CircuitAccount;
9
use crate::util::err::ReactorError;
10
use crate::{Error, HopNum, Result};
11

            
12
#[cfg(any(feature = "hs-service", feature = "relay"))]
13
use {
14
    crate::stream::CloseStreamBehavior, crate::stream::incoming::IncomingStreamRequestHandler,
15
    tor_cell::relaycell::StreamId,
16
};
17

            
18
use tor_error::internal;
19
use tor_rtcompat::Runtime;
20

            
21
use futures::SinkExt;
22
use futures::channel::mpsc;
23

            
24
use std::result::Result as StdResult;
25
use std::sync::{Arc, Mutex, RwLock};
26

            
27
/// The hop manager of a reactor.
28
///
29
/// This contains the per-hop state (e.g. congestion control information),
30
/// and a handle to the stream reactor of the hop.
31
///
32
/// The stream reactor of the hop is launched lazily,
33
/// when the first [`CtrlMsg`] is sent via [`HopMgr::send`].
34
pub(crate) struct HopMgr<R: Runtime> {
35
    /// A handle to the runtime.
36
    runtime: R,
37
    /// Context used when spawning a stream reactor.
38
    ctx: StreamReactorContext,
39
    /// Sender for sending messages to BWD.
40
    ///
41
    /// The receiver is in BWD.
42
    ///
43
    /// A clone of this is passed to each spawned StreamReactor
44
    bwd_tx: mpsc::Sender<ReadyStreamMsg>,
45
    /// The underlying senders, indexed by [`HopNum`].
46
    ///
47
    /// Relays have at most one stream reactor per circuit.
48
    /// Clients have at most one stream reactor per circuit hop.
49
    ///
50
    /// This is shared with the backward reactor.
51
    /// The backward reactor only ever *reads* from this
52
    /// (it never mutates the list).
53
    ///
54
    // TODO: the backward reactor only ever reads from this.
55
    // Conceptually, it is the HopMgr that owns this list,
56
    // because only HopMgr can add hops to the list.
57
    //
58
    // Perhaps we need a specialized abstraction that only allows reading here.
59
    // This could be a wrapper over RwLock, providing a read-only API for the BWD.
60
    hops: Arc<RwLock<CircHopList>>,
61
    /// Memory quota account
62
    memquota: CircuitAccount,
63
}
64

            
65
/// State needed to build a stream reactor.
66
///
67
/// Used when spawning the stream reactor of a hop.
68
struct StreamReactorContext {
69
    /// An identifier for logging about this reactor's circuit.
70
    unique_id: UniqId,
71
    /// The incoming stream handler.
72
    ///
73
    /// This is shared with every StreamReactor.
74
    #[cfg(any(feature = "hs-service", feature = "relay"))]
75
    incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
76
    /// A handler for customizing the stream reactor behavior.
77
    handler: Arc<dyn StreamHandler>,
78
}
79

            
80
impl<R: Runtime> HopMgr<R> {
81
    /// Create a new [`HopMgr`] with an empty hop list,
82
    /// settings the incoming stream request handler to `incoming_handler`.
83
    ///
84
    /// Hops are added with [`HopMgr::add_hop`].
85
    #[cfg(feature = "relay")]
86
40
    pub(crate) fn new_with_incoming_handler<S: StreamHandler>(
87
40
        runtime: R,
88
40
        unique_id: UniqId,
89
40
        handler: S,
90
40
        bwd_tx: mpsc::Sender<ReadyStreamMsg>,
91
40
        incoming_handler: IncomingStreamRequestHandler,
92
40
        memquota: CircuitAccount,
93
40
    ) -> Self {
94
40
        Self::new_inner(
95
40
            runtime,
96
40
            unique_id,
97
40
            handler,
98
40
            bwd_tx,
99
40
            Some(incoming_handler),
100
40
            memquota,
101
        )
102
40
    }
103

            
104
    /// Create a new [`HopMgr`] with an empty hop list.
105
    ///
106
    /// Hops are added with [`HopMgr::add_hop`].
107
    #[expect(unused)] // TODO(dedup): clients will use this
108
    pub(crate) fn new<S: StreamHandler>(
109
        runtime: R,
110
        unique_id: UniqId,
111
        handler: S,
112
        bwd_tx: mpsc::Sender<ReadyStreamMsg>,
113
        memquota: CircuitAccount,
114
    ) -> Self {
115
        Self::new_inner(
116
            runtime,
117
            unique_id,
118
            handler,
119
            bwd_tx,
120
            #[cfg(any(feature = "hs-service", feature = "relay"))]
121
            None,
122
            memquota,
123
        )
124
    }
125

            
126
    /// Helper for the new*() functions.
127
40
    fn new_inner<S: StreamHandler>(
128
40
        runtime: R,
129
40
        unique_id: UniqId,
130
40
        handler: S,
131
40
        bwd_tx: mpsc::Sender<ReadyStreamMsg>,
132
40
        #[cfg(any(feature = "hs-service", feature = "relay"))] incoming_handler: Option<
133
40
            IncomingStreamRequestHandler,
134
40
        >,
135
40
        memquota: CircuitAccount,
136
40
    ) -> Self {
137
        // We don't spawn any stream reactors ahead of time.
138
        // Instead we spawn them lazily, when opening streams.
139
40
        let hops = Arc::new(RwLock::new(Default::default()));
140
40
        let ctx = StreamReactorContext {
141
40
            unique_id,
142
40
            #[cfg(any(feature = "hs-service", feature = "relay"))]
143
40
            incoming: Arc::new(Mutex::new(incoming_handler)),
144
40
            handler: Arc::new(handler),
145
40
        };
146

            
147
40
        Self {
148
40
            runtime,
149
40
            hops,
150
40
            ctx,
151
40
            bwd_tx,
152
40
            memquota,
153
40
        }
154
40
    }
155

            
156
    /// Return a reference to our hop list.
157
120
    pub(crate) fn hops(&self) -> &Arc<RwLock<CircHopList>> {
158
120
        &self.hops
159
120
    }
160

            
161
    /// Set the incoming stream handler for this reactor.
162
    ///
163
    /// There can only be one incoming stream handler per reactor,
164
    /// and each stream handler only pertains to a single hop (see expected_hop())
165
    //
166
    // TODO: eventually, we might want a different design here,
167
    // for example we might want to allow multiple stream handlers per reactor (one per hop).
168
    // However, for now, the implementation is intentionally kept similar to that
169
    // in the client reactor (to make it easier to migrate it to the new reactor design).
170
    //
171
    /// Returns an error if the hop manager already has a stream handler.
172
    ///
173
    /// Since the handler is shared with every hop's stream reactor,
174
    /// this function will update the handler for all of them.
175
    ///
176
    // TODO(DEDUP): almost identical to the client-side
177
    // CellHandlers::set_incoming_stream_req_handler()
178
    #[cfg(any(feature = "hs-service", feature = "relay"))]
179
    pub(crate) fn set_incoming_handler(&self, handler: IncomingStreamRequestHandler) -> Result<()> {
180
        let mut lock = self.ctx.incoming.lock().expect("poisoned lock");
181

            
182
        if lock.is_none() {
183
            *lock = Some(handler);
184
            Ok(())
185
        } else {
186
            Err(Error::from(internal!(
187
                "Tried to install a BEGIN cell handler before the old one was gone."
188
            )))
189
        }
190
    }
191

            
192
    /// Push a new hop to our hop list.
193
    ///
194
    /// Prepares a cc object for the hop, but does not spawn a stream reactor.
195
    ///
196
    /// Will return an error if the circuit already has [`u8::MAX`] hops.
197
40
    pub(crate) fn add_hop(&mut self, settings: HopSettings) -> Result<()> {
198
40
        let mut hops = self.hops.write().expect("poisoned lock");
199
40
        hops.add_hop(settings)
200
40
    }
201

            
202
    /// Send a message to the stream reactor of the specified `hop`,
203
    /// spawning it if necessary.
204
16
    pub(crate) async fn send(
205
16
        &mut self,
206
16
        hopnum: Option<HopNum>,
207
16
        msg: CtrlMsg,
208
16
    ) -> StdResult<(), ReactorError> {
209
16
        let mut tx = self.get_or_spawn_stream_reactor(hopnum)?;
210

            
211
16
        tx.send(msg).await.map_err(|_| {
212
            // The stream reactor has shut down
213
            ReactorError::Shutdown
214
        })
215
14
    }
216

            
217
    /// Tell the stream reactor of the specified `hop`
218
    /// to close the stream with the specified `stream_id`.
219
    #[cfg(any(feature = "hs-service", feature = "relay"))]
220
4
    pub(crate) async fn close_pending(
221
4
        &mut self,
222
4
        hopnum: Option<HopNum>,
223
4
        stream_id: StreamId,
224
4
        behav: CloseStreamBehavior,
225
4
    ) -> StdResult<(), Error> {
226
4
        let mut tx = self.get_or_spawn_stream_reactor(hopnum)?;
227

            
228
4
        let msg = CtrlMsg::ClosePendingStream { stream_id, behav };
229

            
230
4
        tx.send(msg).await.map_err(|_| {
231
            // The stream reactor has shut down
232
            Error::NotConnected
233
        })
234
4
    }
235

            
236
    /// Get a handle to the stream reactor, spawning it if necessary
237
20
    fn get_or_spawn_stream_reactor(
238
20
        &self,
239
20
        hopnum: Option<HopNum>,
240
20
    ) -> StdResult<mpsc::Sender<CtrlMsg>, Error> {
241
20
        let mut hops = self.hops.write().expect("poisoned lock");
242
20
        let hop = hops
243
20
            .get_mut(hopnum)
244
20
            .ok_or_else(|| internal!("tried to send cell to nonexistent hop?!"))?;
245

            
246
20
        let tx = match &hop.tx {
247
12
            Some(tx) => tx.clone(),
248
            None => {
249
                // If we don't have a handle to the stream reactor,
250
                // it means it hasn't been spawned yet, so we have to spawn it now.
251
8
                let tx =
252
8
                    self.spawn_stream_reactor(hopnum, &hop.settings, Arc::clone(&hop.ccontrol))?;
253

            
254
8
                hop.tx = Some(tx.clone());
255

            
256
                // Return a copy of this sender (can't borrow because the hop
257
                // is behind a Mutex, and we can't keep it locked across the send()
258
                // await point)
259
8
                tx
260
            }
261
        };
262

            
263
20
        Ok(tx)
264
20
    }
265

            
266
    /// Spawn a [`StreamReactor`] for the specified hop.
267
8
    fn spawn_stream_reactor(
268
8
        &self,
269
8
        hopnum: Option<HopNum>,
270
8
        settings: &HopSettings,
271
8
        ccontrol: Arc<Mutex<CongestionControl>>,
272
8
    ) -> StdResult<mpsc::Sender<CtrlMsg>, Error> {
273
        use tor_rtcompat::SpawnExt as _;
274

            
275
        // NOTE: not registering this channel with the memquota subsystem is okay,
276
        // because it has no buffering (if ever decide to make the size of this buffer
277
        // non-zero for whatever reason, we must remember to register it with memquota
278
        // so that it counts towards the total memory usage for the circuit.
279
        //
280
        // TODO(tuning): having zero buffering here is very likely suboptimal.
281
        // We should do *some* buffering here, and then figure out if we should it
282
        // up to memquota or not.
283
        #[allow(clippy::disallowed_methods)]
284
8
        let (fwd_stream_tx, fwd_stream_rx) = mpsc::channel(0);
285

            
286
8
        let flow_ctrl_params = Arc::new(settings.flow_ctrl_params.clone());
287
8
        let relay_format = settings.relay_crypt_protocol().relay_cell_format();
288
8
        let outbound = CircHopOutbound::new(ccontrol, relay_format, flow_ctrl_params, settings);
289

            
290
8
        let stream_reactor = StreamReactor::new(
291
8
            self.runtime.clone(),
292
8
            hopnum,
293
8
            outbound,
294
8
            self.ctx.unique_id,
295
8
            fwd_stream_rx,
296
8
            self.bwd_tx.clone(),
297
8
            Arc::clone(&self.ctx.handler),
298
            #[cfg(any(feature = "hs-service", feature = "relay"))]
299
8
            Arc::clone(&self.ctx.incoming),
300
8
            self.memquota.clone(),
301
        );
302

            
303
8
        self.runtime
304
8
            .spawn(async {
305
8
                let _ = stream_reactor.run().await;
306
8
            })
307
8
            .map_err(|e| Error::Spawn {
308
                spawning: "stream reactor",
309
                cause: e.into(),
310
            })?;
311

            
312
8
        Ok(fwd_stream_tx)
313
8
    }
314
}