1
//! A circuit's view of the forward state of the circuit.
2

            
3
use crate::circuit::UniqId;
4
use crate::circuit::reactor::backward::BackwardReactorCmd;
5
use crate::circuit::reactor::hop_mgr::HopMgr;
6
use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
7
use crate::circuit::reactor::stream;
8
use crate::circuit::reactor::{ControlHandler, ReactorResultChannel};
9
use crate::congestion::sendme;
10
use crate::stream::cmdcheck::AnyCmdChecker;
11
use crate::stream::msg_streamid;
12
use crate::util::err::ReactorError;
13
use crate::{Error, HopNum, Result};
14

            
15
#[cfg(any(feature = "hs-service", feature = "relay"))]
16
use {
17
    crate::stream::CloseStreamBehavior,
18
    crate::stream::incoming::{
19
        IncomingStreamRequestFilter, IncomingStreamRequestHandler, StreamReqSender,
20
    },
21
    tor_cell::relaycell::StreamId,
22
};
23

            
24
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
25
use crate::client::circuit::padding::PaddingController;
26

            
27
use tor_cell::chancell::msg::AnyChanMsg;
28
use tor_cell::relaycell::msg::{Sendme, SendmeTag};
29
use tor_cell::relaycell::{
30
    AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg,
31
};
32
use tor_error::internal;
33
use tor_linkspec::HasRelayIds;
34
use tor_rtcompat::Runtime;
35

            
36
use derive_deftly::Deftly;
37
use futures::SinkExt;
38
use futures::channel::mpsc;
39
use futures::{FutureExt as _, StreamExt, select_biased};
40
use tracing::debug;
41

            
42
use std::result::Result as StdResult;
43

            
44
use crate::circuit::CircuitRxReceiver;
45

            
46
/// The forward circuit reactor.
47
///
48
/// See the [`reactor`](crate::circuit::reactor) module-level docs.
49
///
50
/// Shuts downs down if an error occurs, or if either the [`Reactor`](super::Reactor)
51
/// or the [`BackwardReactor`](super::BackwardReactor) shuts down:
52
///
53
///   * if the `Reactor` shuts down, we are alerted via the ctrl/command mpsc channels
54
///     (their sending ends will close, which causes run_once() to return ReactorError::Shutdown)
55
///   * if `BackwardReactor` shuts down, the `Reactor` will notice and will itself shut down,
56
///     which, in turn, causes the `ForwardReactor` to shut down as described above
57
#[derive(Deftly)]
58
#[derive_deftly(CircuitReactor)]
59
#[deftly(reactor_name = "forward reactor")]
60
#[deftly(run_inner_fn = "Self::run_once")]
61
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
62
pub(super) struct ForwardReactor<R: Runtime, F: ForwardHandler> {
63
    /// A handle to the runtime.
64
    runtime: R,
65
    /// An identifier for logging about this reactor's circuit.
66
    unique_id: UniqId,
67
    /// Implementation-dependent part of the reactor.
68
    ///
69
    /// This enables us to customize the behavior of the reactor,
70
    /// depending on whether we are a client or a relay.
71
    inner: F,
72
    /// Channel for receiving control commands.
73
    command_rx: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd>>,
74
    /// Channel for receiving control messages.
75
    control_rx: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg>>,
76
    /// The reading end of the inbound Tor channel.
77
    ///
78
    /// Yields cells moving from the client towards the exit, if we are a relay,
79
    /// or cells moving towards *us*, if we are a client.
80
    inbound_chan_rx: CircuitRxReceiver,
81
    /// Sender for sending commands to the BackwardReactor.
82
    ///
83
    /// Used for sending:
84
    ///
85
    ///    * circuit-level SENDMEs received from the other endpoint
86
    ///      (`[BackwardReactorCmd::HandleSendme]`)
87
    ///    * circuit-level SENDMEs that need to be delivered to the other endpoint
88
    ///      (using `[BackwardReactorCmd::SendRelayMsg]`)
89
    ///
90
    /// The receiver is in [`BackwardReactor`](super::BackwardReactor), which is responsible for
91
    /// sending cell over the inbound channel.
92
    backward_reactor_tx: mpsc::Sender<BackwardReactorCmd>,
93
    /// Hop manager, storing per-hop state, and handles to the stream reactors.
94
    ///
95
    /// Contains the `CircHopList`.
96
    hop_mgr: HopMgr<R>,
97
    /// An implementation-specific event stream.
98
    ///
99
    /// Polled from the main loop of the reactor.
100
    /// Each event is passed to [`ForwardHandler::handle_event`].
101
    circ_events: mpsc::Receiver<F::CircEvent>,
102
    /// A padding controller to which padding-related events should be reported.
103
    padding_ctrl: PaddingController,
104
}
105

            
106
/// A control command for the generic forward reactor.
pub(crate) enum CtrlCmd<C> {
    /// Start accepting incoming streams on this circuit.
    //
    // TODO(DEDUP): this is very similar to its client-side counterpart,
    // except the hop is a Option<HopNum> instead of a TargetHop.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    #[expect(unused)] // TODO(dedup): this will be used by hs services
    AwaitStreamRequests {
        /// Channel on which information about incoming stream requests is sent.
        incoming_sender: StreamReqSender,
        /// The `CmdChecker` tracking which message types are acceptable.
        cmd_checker: AnyCmdChecker,
        /// Oneshot channel notified once the command has been handled.
        done: ReactorResultChannel<()>,
        /// The hop that is permitted to create streams.
        ///
        /// `None` if we are a relay wanting to accept stream requests.
        hop: Option<HopNum>,
        /// Filter applied to each request before it is passed on.
        filter: Box<dyn IncomingStreamRequestFilter>,
    },

    /// Close the specified pending incoming stream, sending the provided END message.
    ///
    /// A stream is pending if the message initiating it has been received,
    /// but has not been responded to yet.
    ///
    /// Responders should use this to close pending incoming streams
    /// that were initiated by the other party on the circuit.
    ///
    /// TODO(dedup): this is almost identical to the ClosePendingStream control message
    /// from the client-side. We can get rid of the duplication by rewriting
    /// the client circuit reactor to use the new multi-reactor architecture
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    ClosePendingStream {
        /// Number of the hop the stream is on.
        ///
        /// `None` if we are a relay.
        hop: Option<HopNum>,
        /// ID of the stream the END is sent for.
        stream_id: StreamId,
        /// The END message to send, if any.
        message: CloseStreamBehavior,
        /// Oneshot channel notified once the command has been handled.
        done: ReactorResultChannel<()>,
    },
    /// A control command specific to the implementation.
    #[allow(unused)] // TODO(relay)
    Custom(C),
}
157

            
158
/// A control message for the generic forward reactor.
pub(crate) enum CtrlMsg<M> {
    /// A control message specific to the implementation.
    #[allow(unused)] // TODO(relay)
    Custom(M),
}
164

            
165
/// Trait for customizing the behavior of the forward reactor.
166
///
167
/// Used for plugging in the implementation-dependent (client vs relay)
168
/// parts of the implementation into the generic one.
169
pub(crate) trait ForwardHandler: ControlHandler {
170
    /// Type that explains how to build an outgoing channel.
171
    type BuildSpec: HasRelayIds;
172

            
173
    /// The subclass of ChanMsg that can arrive on this type of circuit.
174
    type CircChanMsg: TryFrom<AnyChanMsg, Error = crate::Error>;
175

            
176
    /// An opaque event type.
177
    ///
178
    /// The [`ForwardReactor`] polls an MPSC stream yielding `CircEvent`s from the main loop.
179
    /// Each event is passed to [`Self::handle_event`] for handling.
180
    type CircEvent;
181

            
182
    /// Handle a non-SENDME RELAY message on this circuit with stream ID 0.
183
    async fn handle_meta_msg<R: Runtime>(
184
        &mut self,
185
        runtime: &R,
186
        early: bool,
187
        hopnum: Option<HopNum>,
188
        msg: UnparsedRelayMsg,
189
        relay_cell_format: RelayCellFormat,
190
    ) -> StdResult<(), ReactorError>;
191

            
192
    /// Handle a forward (TODO terminology) cell.
193
    ///
194
    /// The cell is
195
    ///   - moving from the client towards the exit, if we're a relay
196
    ///   - moving from the guard towards us, if we're a client
197
    ///
198
    /// Returns an error if the cell should cause the reactor to shut down,
199
    /// or a [`ForwardCellDisposition`] specifying how it should be handled.
200
    ///
201
    /// Returns `None` if the cell was handled internally by this handler.
202
    async fn handle_forward_cell<R: Runtime>(
203
        &mut self,
204
        hop_mgr: &mut HopMgr<R>,
205
        cell: Self::CircChanMsg,
206
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError>;
207

            
208
    /// Handle an implementation-specific circuit event.
209
    ///
210
    /// Returns a command for the backward reactor.
211
    fn handle_event(
212
        &mut self,
213
        event: Self::CircEvent,
214
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError>;
215

            
216
    /// Wait until the outbound channel, if there is one, is ready to accept more cells.
217
    ///
218
    /// Resolves immediately if there is no outbound channel.
219
    /// Blocks if there is a pending outbound channel.
220
    async fn outbound_chan_ready(&mut self) -> Result<()>;
221
}
222

            
223
/// What action to take in response to a cell arriving on our inbound Tor channel.
224
pub(crate) enum ForwardCellDisposition {
225
    /// Handle a decoded RELAY or RELAY_EARLY cell in the [`ForwardReactor`].
226
    HandleRecognizedRelay {
227
        /// The decoded cell.
228
        cell: RelayCellDecoderResult,
229
        /// Whether this was a RELAY_EARLY.
230
        early: bool,
231
        /// The hop this cell was for.
232
        hopnum: Option<HopNum>,
233
        /// The SENDME tag.
234
        tag: SendmeTag,
235
    },
236
}
237

            
238
impl<R: Runtime, F: ForwardHandler> ForwardReactor<R, F> {
239
    /// Create a new [`ForwardReactor`].
240
    #[allow(clippy::too_many_arguments)] // TODO
241
40
    pub(super) fn new(
242
40
        runtime: R,
243
40
        unique_id: UniqId,
244
40
        inner: F,
245
40
        hop_mgr: HopMgr<R>,
246
40
        inbound_chan_rx: CircuitRxReceiver,
247
40
        control_rx: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg>>,
248
40
        command_rx: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd>>,
249
40
        backward_reactor_tx: mpsc::Sender<BackwardReactorCmd>,
250
40
        circ_events: mpsc::Receiver<F::CircEvent>,
251
40
        padding_ctrl: PaddingController,
252
40
    ) -> Self {
253
40
        Self {
254
40
            runtime,
255
40
            unique_id,
256
40
            inbound_chan_rx,
257
40
            control_rx,
258
40
            command_rx,
259
40
            inner,
260
40
            backward_reactor_tx,
261
40
            hop_mgr,
262
40
            circ_events,
263
40
            padding_ctrl,
264
40
        }
265
40
    }
266

            
267
    /// Helper for [`run`](Self::run).
268
82
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
269
82
        let outbound_chan_ready = self.inner.outbound_chan_ready();
270

            
271
82
        let inbound_chan_rx_fut = async {
272
            // Avoid reading from the inbound_chan_rx Tor Channel if the outgoing sink is blocked
273
82
            outbound_chan_ready.await?;
274
82
            Ok(self.inbound_chan_rx.next().await)
275
56
        };
276

            
277
82
        select_biased! {
278
82
            res = self.command_rx.next().fuse() => {
279
4
                let cmd = res.ok_or_else(|| ReactorError::Shutdown)?;
280
4
                self.handle_cmd(cmd).await
281
            }
282
82
            res = self.control_rx.next().fuse() => {
283
                let msg = res.ok_or_else(|| ReactorError::Shutdown)?;
284
                self.handle_msg(msg)
285
            }
286
82
            res = self.circ_events.next().fuse() => {
287
8
                let ev = res.ok_or_else(|| ReactorError::Shutdown)?;
288
8
                if let Some(cmd) = self.inner.handle_event(ev)? {
289
8
                    self.send_reactor_cmd(cmd).await?;
290
                }
291

            
292
8
                Ok(())
293
            }
294
82
            res = inbound_chan_rx_fut.fuse() => {
295
56
                let cell = res.map_err(ReactorError::Err)?;
296
56
                let Some(cell) = cell else {
297
                    debug!(
298
                        circ_id = %self.unique_id,
299
                        "Backward channel has closed, shutting down forward relay reactor",
300
                    );
301

            
302
                    return Err(ReactorError::Shutdown);
303
                };
304

            
305
56
                let cell: F::CircChanMsg = cell.try_into()?;
306
56
                let Some(disp) = self.inner.handle_forward_cell(&mut self.hop_mgr, cell).await? else {
307
8
                    return Ok(());
308
                };
309

            
310
40
                match disp {
311
40
                    ForwardCellDisposition::HandleRecognizedRelay { cell, early, hopnum, tag } => {
312
40
                        self.handle_relay_cell(cell, early, hopnum, tag).await
313
                    }
314
                }
315
            },
316
        }
317
66
    }
318

            
319
    /// Handle a control command.
320
    #[allow(clippy::unused_async)] // used if any(feature = "hs-service", feature = "relay")
321
4
    async fn handle_cmd(&mut self, cmd: CtrlCmd<F::CtrlCmd>) -> StdResult<(), ReactorError> {
322
4
        match cmd {
323
            #[cfg(any(feature = "hs-service", feature = "relay"))]
324
            CtrlCmd::AwaitStreamRequests {
325
                incoming_sender,
326
                cmd_checker,
327
                done,
328
                hop,
329
                filter,
330
            } => {
331
                let handler = IncomingStreamRequestHandler {
332
                    incoming_sender,
333
                    cmd_checker,
334
                    hop_num: hop,
335
                    filter,
336
                };
337

            
338
                // Update the HopMgr with the
339
                let ret = self.hop_mgr.set_incoming_handler(handler);
340
                let _ = done.send(ret); // don't care if the corresponding receiver goes away.
341
                Ok(())
342
            }
343
            #[cfg(any(feature = "hs-service", feature = "relay"))]
344
            CtrlCmd::ClosePendingStream {
345
4
                hop,
346
4
                stream_id,
347
4
                message,
348
4
                done,
349
            } => {
350
4
                let ret = self.hop_mgr.close_pending(hop, stream_id, message).await;
351
4
                let _ = done.send(ret); // don't care if the corresponding receiver goes away.
352

            
353
4
                Ok(())
354
            }
355
            CtrlCmd::Custom(c) => self.inner.handle_cmd(c),
356
        }
357
4
    }
358

            
359
    /// Handle a control message.
360
    fn handle_msg(&mut self, msg: CtrlMsg<F::CtrlMsg>) -> StdResult<(), ReactorError> {
361
        match msg {
362
            CtrlMsg::Custom(c) => self.inner.handle_msg(c),
363
        }
364
    }
365

            
366
    /// Note that we have received a RELAY cell.
367
    ///
368
    /// Updates the padding and CC state.
369
40
    fn note_relay_cell_received(
370
40
        &self,
371
40
        hopnum: Option<HopNum>,
372
40
        c_t_w: bool,
373
40
    ) -> Result<(RelayCellFormat, bool)> {
374
40
        let mut hops = self.hop_mgr.hops().write().expect("poisoned lock");
375
40
        let hop = hops
376
40
            .get_mut(hopnum)
377
40
            .ok_or_else(|| internal!("msg from non-existent hop???"))?;
378

            
379
        // Check whether we are allowed to receive more data for this circuit hop.
380
40
        hop.inbound.decrement_cell_limit()?;
381

            
382
        // Decrement the circuit sendme windows, and see if we need to
383
        // send a sendme cell.
384
40
        let send_circ_sendme = if c_t_w {
385
8
            hop.ccontrol
386
8
                .lock()
387
8
                .expect("poisoned lock")
388
8
                .note_data_received()?
389
        } else {
390
32
            false
391
        };
392

            
393
40
        let relay_cell_format = hop.settings.relay_crypt_protocol().relay_cell_format();
394

            
395
40
        Ok((relay_cell_format, send_circ_sendme))
396
40
    }
397

            
398
    /// Handle a RELAY cell.
399
    ///
400
    // TODO(DEDUP): very similar to Client::handle_relay_cell()
401
40
    async fn handle_relay_cell(
402
40
        &mut self,
403
40
        decode_res: RelayCellDecoderResult,
404
40
        early: bool,
405
40
        hopnum: Option<HopNum>,
406
40
        tag: SendmeTag,
407
40
    ) -> StdResult<(), ReactorError> {
408
        // For padding purposes, if we are a relay, we set the hopnum to 0
409
        // TODO(relay): is this right?
410
40
        let hopnum_padding = hopnum.unwrap_or_else(|| HopNum::from(0));
411
40
        if decode_res.is_padding() {
412
            self.padding_ctrl.decrypted_padding(hopnum_padding)?;
413
40
        } else {
414
40
            self.padding_ctrl.decrypted_data(hopnum_padding);
415
40
        }
416

            
417
40
        let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
418
40
        let (relay_cell_format, send_circ_sendme) = self.note_relay_cell_received(hopnum, c_t_w)?;
419

            
420
        // If we do need to send a circuit-level SENDME cell, do so.
421
40
        if send_circ_sendme {
422
            // This always sends a V1 (tagged) sendme cell, and thereby assumes
423
            // that SendmeEmitMinVersion is no more than 1.  If the authorities
424
            // every increase that parameter to a higher number, this will
425
            // become incorrect.  (Higher numbers are not currently defined.)
426
            let sendme = Sendme::from(tag);
427
            let msg = AnyRelayMsgOuter::new(None, sendme.into());
428
            let forward = BackwardReactorCmd::SendRelayMsg { hop: hopnum, msg };
429

            
430
            // NOTE: sending the SENDME to the backward reactor for handling
431
            // might seem counterintuitive, given that we have access to
432
            // the congestion control object right here (via hop_mgr).
433
            //
434
            // However, the forward reactor does not have access to the
435
            // outbound_chan_tx part of the inbound (towards the client) Tor channel,
436
            // and so it cannot handle the SENDME on its own
437
            // (because it cannot obtain the congestion signals),
438
            // so the SENDME needs to be handled in the backward reactor.
439
            //
440
            // NOTE: this will block if the backward reactor is not ready
441
            // to send any more cells.
442
            self.send_reactor_cmd(forward).await?;
443
40
        }
444

            
445
40
        let (mut msgs, incomplete) = decode_res.into_parts();
446
62
        while let Some(msg) = msgs.next() {
447
40
            match self
448
40
                .handle_relay_msg(early, hopnum, msg, relay_cell_format, c_t_w)
449
40
                .await
450
            {
451
22
                Ok(()) => continue,
452
16
                Err(e) => {
453
16
                    for m in msgs {
454
                        debug!(
455
                            circ_id = %self.unique_id,
456
                            "Ignoring relay msg received after triggering shutdown: {m:?}",
457
                        );
458
                    }
459
16
                    if let Some(incomplete) = incomplete {
460
                        debug!(
461
                            circ_id = %self.unique_id,
462
                            "Ignoring partial relay msg received after triggering shutdown: {:?}",
463
                            incomplete,
464
                        );
465
16
                    }
466

            
467
16
                    return Err(e);
468
                }
469
            }
470
        }
471

            
472
22
        Ok(())
473
38
    }
474

            
475
    /// Handle a single incoming RELAY message.
476
40
    async fn handle_relay_msg(
477
40
        &mut self,
478
40
        early: bool,
479
40
        hop: Option<HopNum>,
480
40
        msg: UnparsedRelayMsg,
481
40
        relay_cell_format: RelayCellFormat,
482
40
        cell_counts_toward_windows: bool,
483
40
    ) -> StdResult<(), ReactorError> {
484
        // If this msg wants/refuses to have a Stream ID, does it
485
        // have/not have one?
486
40
        let streamid = msg_streamid(&msg)?;
487

            
488
        // If this doesn't have a StreamId, it's a meta cell,
489
        // not meant for a particular stream.
490
36
        let Some(sid) = streamid else {
491
20
            return self
492
20
                .handle_meta_msg(early, hop, msg, relay_cell_format)
493
20
                .await;
494
        };
495

            
496
16
        let msg = stream::CtrlMsg::DeliverStreamMsg {
497
16
            sid,
498
16
            msg,
499
16
            cell_counts_toward_windows,
500
16
        };
501

            
502
        // All messages on streams are handled in the stream reactor
503
        // (because that's where the stream map is)
504
        //
505
        // Internally, this will spawn a StreamReactor for the target hop,
506
        // if not already spawned.
507
16
        self.hop_mgr.send(hop, msg).await
508
38
    }
509

            
510
    /// Handle a RELAY or RELAY_EARLY message on this circuit with stream ID 0.
511
20
    async fn handle_meta_msg(
512
20
        &mut self,
513
20
        early: bool,
514
20
        hopnum: Option<HopNum>,
515
20
        msg: UnparsedRelayMsg,
516
20
        relay_cell_format: RelayCellFormat,
517
20
    ) -> StdResult<(), ReactorError> {
518
20
        match msg.cmd() {
519
            RelayCmd::SENDME => {
520
                let sendme = msg
521
                    .decode::<Sendme>()
522
                    .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
523
                    .into_msg();
524

            
525
                let cmd = BackwardReactorCmd::HandleSendme {
526
                    hop: hopnum,
527
                    sendme,
528
                };
529

            
530
                self.send_reactor_cmd(cmd).await
531
            }
532
            _ => {
533
20
                self.inner
534
20
                    .handle_meta_msg(&self.runtime, early, hopnum, msg, relay_cell_format)
535
20
                    .await
536
            }
537
        }
538
20
    }
539

            
540
    /// Send a command to the backward reactor.
541
    ///
542
    /// Blocks if the `backward_reactor_tx` channel is full, i.e. if the backward reactor
543
    /// is not ready to send any more cells.
544
    ///
545
    /// Returns an error if the backward reactor has shut down.
546
8
    async fn send_reactor_cmd(
547
8
        &mut self,
548
8
        forward: BackwardReactorCmd,
549
8
    ) -> StdResult<(), ReactorError> {
550
8
        self.backward_reactor_tx.send(forward).await.map_err(|_| {
551
            // The other reactor has shut down
552
            ReactorError::Shutdown
553
        })
554
8
    }
555
}