1
//! A circuit's view of the forward state of the circuit.
2

            
3
use crate::circuit::UniqId;
4
use crate::circuit::reactor::backward::BackwardReactorCmd;
5
use crate::circuit::reactor::hop_mgr::HopMgr;
6
use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
7
use crate::circuit::reactor::stream::StreamMsg;
8
use crate::circuit::reactor::{ControlHandler, ReactorResultChannel};
9
use crate::congestion::sendme;
10
use crate::stream::cmdcheck::AnyCmdChecker;
11
use crate::stream::msg_streamid;
12
use crate::util::err::ReactorError;
13
use crate::{Error, HopNum, Result};
14

            
15
#[cfg(any(feature = "hs-service", feature = "relay"))]
16
use crate::stream::incoming::{
17
    IncomingStreamRequestFilter, IncomingStreamRequestHandler, StreamReqSender,
18
};
19

            
20
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
21
use crate::client::circuit::padding::PaddingController;
22

            
23
use tor_cell::chancell::msg::AnyChanMsg;
24
use tor_cell::relaycell::msg::{Sendme, SendmeTag};
25
use tor_cell::relaycell::{
26
    AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg,
27
};
28
use tor_error::internal;
29
use tor_linkspec::HasRelayIds;
30
use tor_rtcompat::Runtime;
31

            
32
use derive_deftly::Deftly;
33
use futures::SinkExt;
34
use futures::channel::mpsc;
35
use futures::{FutureExt as _, StreamExt, select_biased};
36
use tracing::debug;
37

            
38
use std::result::Result as StdResult;
39

            
40
use crate::circuit::CircuitRxReceiver;
41

            
42
/// The forward circuit reactor.
43
///
44
/// See the [`reactor`](crate::circuit::reactor) module-level docs.
45
///
46
/// Shuts downs down if an error occurs, or if either the [`Reactor`](super::Reactor)
47
/// or the [`BackwardReactor`](super::BackwardReactor) shuts down:
48
///
49
///   * if the `Reactor` shuts down, we are alerted via the ctrl/command mpsc channels
50
///     (their sending ends will close, which causes run_once() to return ReactorError::Shutdown)
51
///   * if `BackwardReactor` shuts down, the `Reactor` will notice and will itself shut down,
52
///     which, in turn, causes the `ForwardReactor` to shut down as described above
53
#[derive(Deftly)]
54
#[derive_deftly(CircuitReactor)]
55
#[deftly(reactor_name = "forward reactor")]
56
#[deftly(run_inner_fn = "Self::run_once")]
57
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
58
pub(super) struct ForwardReactor<R: Runtime, F: ForwardHandler> {
59
    /// A handle to the runtime.
60
    runtime: R,
61
    /// An identifier for logging about this reactor's circuit.
62
    unique_id: UniqId,
63
    /// Implementation-dependent part of the reactor.
64
    ///
65
    /// This enables us to customize the behavior of the reactor,
66
    /// depending on whether we are a client or a relay.
67
    inner: F,
68
    /// Channel for receiving control commands.
69
    command_rx: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd>>,
70
    /// Channel for receiving control messages.
71
    control_rx: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg>>,
72
    /// The reading end of the inbound Tor channel.
73
    ///
74
    /// Yields cells moving from the client towards the exit, if we are a relay,
75
    /// or cells moving towards *us*, if we are a client.
76
    inbound_chan_rx: CircuitRxReceiver,
77
    /// Sender for sending commands to the BackwardReactor.
78
    ///
79
    /// Used for sending:
80
    ///
81
    ///    * circuit-level SENDMEs received from the other endpoint
82
    ///      (`[BackwardReactorCmd::HandleSendme]`)
83
    ///    * circuit-level SENDMEs that need to be delivered to the other endpoint
84
    ///      (using `[BackwardReactorCmd::SendRelayMsg]`)
85
    ///
86
    /// The receiver is in [`BackwardReactor`](super::BackwardReactor), which is responsible for
87
    /// sending cell over the inbound channel.
88
    backward_reactor_tx: mpsc::Sender<BackwardReactorCmd>,
89
    /// Hop manager, storing per-hop state, and handles to the stream reactors.
90
    ///
91
    /// Contains the `CircHopList`.
92
    hop_mgr: HopMgr<R>,
93
    /// An implementation-specific event stream.
94
    ///
95
    /// Polled from the main loop of the reactor.
96
    /// Each event is passed to [`ForwardHandler::handle_event`].
97
    circ_events: mpsc::Receiver<F::CircEvent>,
98
    /// A padding controller to which padding-related events should be reported.
99
    padding_ctrl: PaddingController,
100
}
101

            
102
/// A control command aimed at the generic forward reactor.
///
/// The type parameter `C` is the implementation-dependent command type
/// carried by [`CtrlCmd::Custom`].
pub(crate) enum CtrlCmd<C> {
    /// Begin accepting streams on this circuit.
    //
    // TODO(DEDUP): this is very similar to its client-side counterpart,
    // except the hop is a Option<HopNum> instead of a TargetHop.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    AwaitStreamRequests {
        /// A channel for sending information about an incoming stream request.
        incoming_sender: StreamReqSender,
        /// A `CmdChecker` to keep track of which message types are acceptable.
        cmd_checker: AnyCmdChecker,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
        /// The hop that is allowed to create streams.
        ///
        /// Set to None if we are a relay wanting to accept stream requests.
        hop: Option<HopNum>,
        /// A filter used to check requests before passing them on.
        filter: Box<dyn IncomingStreamRequestFilter>,
    },
    /// An implementation-dependent control command.
    #[allow(unused)] // TODO(relay)
    Custom(C),
}
127

            
128
/// A control message aimed at the generic forward reactor.
///
/// The type parameter `M` is the implementation-dependent message type
/// carried by [`CtrlMsg::Custom`].
pub(crate) enum CtrlMsg<M> {
    /// An implementation-dependent control message.
    #[allow(unused)] // TODO(relay)
    Custom(M),
}
134

            
135
/// Trait for customizing the behavior of the forward reactor.
136
///
137
/// Used for plugging in the implementation-dependent (client vs relay)
138
/// parts of the implementation into the generic one.
139
pub(crate) trait ForwardHandler: ControlHandler {
140
    /// Type that explains how to build an outgoing channel.
141
    type BuildSpec: HasRelayIds;
142

            
143
    /// The subclass of ChanMsg that can arrive on this type of circuit.
144
    type CircChanMsg: TryFrom<AnyChanMsg, Error = crate::Error>;
145

            
146
    /// An opaque event type.
147
    ///
148
    /// The [`ForwardReactor`] polls an MPSC stream yielding `CircEvent`s from the main loop.
149
    /// Each event is passed to [`Self::handle_event`] for handling.
150
    type CircEvent;
151

            
152
    /// Handle a non-SENDME RELAY message on this circuit with stream ID 0.
153
    async fn handle_meta_msg<R: Runtime>(
154
        &mut self,
155
        runtime: &R,
156
        early: bool,
157
        hopnum: Option<HopNum>,
158
        msg: UnparsedRelayMsg,
159
        relay_cell_format: RelayCellFormat,
160
    ) -> StdResult<(), ReactorError>;
161

            
162
    /// Handle a forward (TODO terminology) cell.
163
    ///
164
    /// The cell is
165
    ///   - moving from the client towards the exit, if we're a relay
166
    ///   - moving from the guard towards us, if we're a client
167
    ///
168
    /// Returns an error if the cell should cause the reactor to shut down,
169
    /// or a [`ForwardCellDisposition`] specifying how it should be handled.
170
    ///
171
    /// Returns `None` if the cell was handled internally by this handler.
172
    async fn handle_forward_cell<R: Runtime>(
173
        &mut self,
174
        hop_mgr: &mut HopMgr<R>,
175
        cell: Self::CircChanMsg,
176
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError>;
177

            
178
    /// Handle an implementation-specific circuit event.
179
    ///
180
    /// Returns a command for the backward reactor.
181
    fn handle_event(
182
        &mut self,
183
        event: Self::CircEvent,
184
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError>;
185

            
186
    /// Wait until the outbound channel, if there is one, is ready to accept more cells.
187
    ///
188
    /// Resolves immediately if there is no outbound channel.
189
    /// Blocks if there is a pending outbound channel.
190
    async fn outbound_chan_ready(&mut self) -> Result<()>;
191
}
192

            
193
/// What action to take in response to a cell arriving on our inbound Tor channel.
194
pub(crate) enum ForwardCellDisposition {
195
    /// Handle a decoded RELAY or RELAY_EARLY cell in the [`ForwardReactor`].
196
    HandleRecognizedRelay {
197
        /// The decoded cell.
198
        cell: RelayCellDecoderResult,
199
        /// Whether this was a RELAY_EARLY.
200
        early: bool,
201
        /// The hop this cell was for.
202
        hopnum: Option<HopNum>,
203
        /// The SENDME tag.
204
        tag: SendmeTag,
205
    },
206
}
207

            
208
impl<R: Runtime, F: ForwardHandler> ForwardReactor<R, F> {
209
    /// Create a new [`ForwardReactor`].
210
    #[allow(clippy::too_many_arguments)] // TODO
211
16
    pub(super) fn new(
212
16
        runtime: R,
213
16
        unique_id: UniqId,
214
16
        inner: F,
215
16
        hop_mgr: HopMgr<R>,
216
16
        inbound_chan_rx: CircuitRxReceiver,
217
16
        control_rx: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg>>,
218
16
        command_rx: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd>>,
219
16
        backward_reactor_tx: mpsc::Sender<BackwardReactorCmd>,
220
16
        circ_events: mpsc::Receiver<F::CircEvent>,
221
16
        padding_ctrl: PaddingController,
222
16
    ) -> Self {
223
16
        Self {
224
16
            runtime,
225
16
            unique_id,
226
16
            inbound_chan_rx,
227
16
            control_rx,
228
16
            command_rx,
229
16
            inner,
230
16
            backward_reactor_tx,
231
16
            hop_mgr,
232
16
            circ_events,
233
16
            padding_ctrl,
234
16
        }
235
16
    }
236

            
237
    /// Helper for [`run`](Self::run).
238
36
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
239
36
        let outbound_chan_ready = self.inner.outbound_chan_ready();
240

            
241
36
        let inbound_chan_rx_fut = async {
242
            // Avoid reading from the inbound_chan_rx Tor Channel if the outgoing sink is blocked
243
36
            outbound_chan_ready.await?;
244
36
            Ok(self.inbound_chan_rx.next().await)
245
24
        };
246

            
247
36
        select_biased! {
248
36
            res = self.command_rx.next().fuse() => {
249
4
                let cmd = res.ok_or_else(|| ReactorError::Shutdown)?;
250
4
                self.handle_cmd(cmd)
251
            }
252
36
            res = self.control_rx.next().fuse() => {
253
                let msg = res.ok_or_else(|| ReactorError::Shutdown)?;
254
                self.handle_msg(msg)
255
            }
256
36
            res = self.circ_events.next().fuse() => {
257
4
                let ev = res.ok_or_else(|| ReactorError::Shutdown)?;
258
4
                if let Some(cmd) = self.inner.handle_event(ev)? {
259
4
                    self.send_reactor_cmd(cmd).await?;
260
                }
261

            
262
4
                Ok(())
263
            }
264
36
            res = inbound_chan_rx_fut.fuse() => {
265
24
                let cell = res.map_err(ReactorError::Err)?;
266
24
                let Some(cell) = cell else {
267
                    debug!(
268
                        circ_id = %self.unique_id,
269
                        "Backward channel has closed, shutting down forward relay reactor",
270
                    );
271

            
272
                    return Err(ReactorError::Shutdown);
273
                };
274

            
275
24
                let cell: F::CircChanMsg = cell.try_into()?;
276
24
                let Some(disp) = self.inner.handle_forward_cell(&mut self.hop_mgr, cell).await? else {
277
8
                    return Ok(());
278
                };
279

            
280
12
                match disp {
281
12
                    ForwardCellDisposition::HandleRecognizedRelay { cell, early, hopnum, tag } => {
282
12
                        self.handle_relay_cell(cell, early, hopnum, tag).await
283
                    }
284
                }
285
            },
286
        }
287
32
    }
288

            
289
    /// Handle a control command.
290
4
    fn handle_cmd(&mut self, cmd: CtrlCmd<F::CtrlCmd>) -> StdResult<(), ReactorError> {
291
4
        match cmd {
292
            #[cfg(any(feature = "hs-service", feature = "relay"))]
293
            CtrlCmd::AwaitStreamRequests {
294
4
                incoming_sender,
295
4
                cmd_checker,
296
4
                done,
297
4
                hop,
298
4
                filter,
299
            } => {
300
4
                let handler = IncomingStreamRequestHandler {
301
4
                    incoming_sender,
302
4
                    cmd_checker,
303
4
                    hop_num: hop,
304
4
                    filter,
305
4
                };
306

            
307
                // Update the HopMgr with the
308
4
                let ret = self.hop_mgr.set_incoming_handler(handler);
309
4
                let _ = done.send(ret); // don't care if the corresponding receiver goes away.
310
4
                Ok(())
311
            }
312
            CtrlCmd::Custom(c) => self.inner.handle_cmd(c),
313
        }
314
4
    }
315

            
316
    /// Handle a control message.
317
    fn handle_msg(&mut self, msg: CtrlMsg<F::CtrlMsg>) -> StdResult<(), ReactorError> {
318
        match msg {
319
            CtrlMsg::Custom(c) => self.inner.handle_msg(c),
320
        }
321
    }
322

            
323
    /// Note that we have received a RELAY cell.
324
    ///
325
    /// Updates the padding and CC state.
326
12
    fn note_relay_cell_received(
327
12
        &self,
328
12
        hopnum: Option<HopNum>,
329
12
        c_t_w: bool,
330
12
    ) -> Result<(RelayCellFormat, bool)> {
331
12
        let mut hops = self.hop_mgr.hops().write().expect("poisoned lock");
332
12
        let hop = hops
333
12
            .get_mut(hopnum)
334
12
            .ok_or_else(|| internal!("msg from non-existent hop???"))?;
335

            
336
        // Check whether we are allowed to receive more data for this circuit hop.
337
12
        hop.inbound.decrement_cell_limit()?;
338

            
339
        // Decrement the circuit sendme windows, and see if we need to
340
        // send a sendme cell.
341
12
        let send_circ_sendme = if c_t_w {
342
            hop.ccontrol
343
                .lock()
344
                .expect("poisoned lock")
345
                .note_data_received()?
346
        } else {
347
12
            false
348
        };
349

            
350
12
        let relay_cell_format = hop.settings.relay_crypt_protocol().relay_cell_format();
351

            
352
12
        Ok((relay_cell_format, send_circ_sendme))
353
12
    }
354

            
355
    /// Handle a RELAY cell.
356
    ///
357
    // TODO(DEDUP): very similar to Client::handle_relay_cell()
358
12
    async fn handle_relay_cell(
359
12
        &mut self,
360
12
        decode_res: RelayCellDecoderResult,
361
12
        early: bool,
362
12
        hopnum: Option<HopNum>,
363
12
        tag: SendmeTag,
364
12
    ) -> StdResult<(), ReactorError> {
365
        // For padding purposes, if we are a relay, we set the hopnum to 0
366
        // TODO(relay): is this right?
367
12
        let hopnum_padding = hopnum.unwrap_or_else(|| HopNum::from(0));
368
12
        if decode_res.is_padding() {
369
            self.padding_ctrl.decrypted_padding(hopnum_padding)?;
370
12
        } else {
371
12
            self.padding_ctrl.decrypted_data(hopnum_padding);
372
12
        }
373

            
374
12
        let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
375
12
        let (relay_cell_format, send_circ_sendme) = self.note_relay_cell_received(hopnum, c_t_w)?;
376

            
377
        // If we do need to send a circuit-level SENDME cell, do so.
378
12
        if send_circ_sendme {
379
            // This always sends a V1 (tagged) sendme cell, and thereby assumes
380
            // that SendmeEmitMinVersion is no more than 1.  If the authorities
381
            // every increase that parameter to a higher number, this will
382
            // become incorrect.  (Higher numbers are not currently defined.)
383
            let sendme = Sendme::from(tag);
384
            let msg = AnyRelayMsgOuter::new(None, sendme.into());
385
            let forward = BackwardReactorCmd::SendRelayMsg { hop: hopnum, msg };
386

            
387
            // NOTE: sending the SENDME to the backward reactor for handling
388
            // might seem counterintuitive, given that we have access to
389
            // the congestion control object right here (via hop_mgr).
390
            //
391
            // However, the forward reactor does not have access to the
392
            // outbound_chan_tx part of the inbound (towards the client) Tor channel,
393
            // and so it cannot handle the SENDME on its own
394
            // (because it cannot obtain the congestion signals),
395
            // so the SENDME needs to be handled in the backward reactor.
396
            //
397
            // NOTE: this will block if the backward reactor is not ready
398
            // to send any more cells.
399
            self.send_reactor_cmd(forward).await?;
400
12
        }
401

            
402
12
        let (mut msgs, incomplete) = decode_res.into_parts();
403
16
        while let Some(msg) = msgs.next() {
404
12
            match self
405
12
                .handle_relay_msg(early, hopnum, msg, relay_cell_format, c_t_w)
406
12
                .await
407
            {
408
4
                Ok(()) => continue,
409
8
                Err(e) => {
410
8
                    for m in msgs {
411
                        debug!(
412
                            circ_id = %self.unique_id,
413
                            "Ignoring relay msg received after triggering shutdown: {m:?}",
414
                        );
415
                    }
416
8
                    if let Some(incomplete) = incomplete {
417
                        debug!(
418
                            circ_id = %self.unique_id,
419
                            "Ignoring partial relay msg received after triggering shutdown: {:?}",
420
                            incomplete,
421
                        );
422
8
                    }
423

            
424
8
                    return Err(e);
425
                }
426
            }
427
        }
428

            
429
4
        Ok(())
430
12
    }
431

            
432
    /// Handle a single incoming RELAY message.
433
12
    async fn handle_relay_msg(
434
12
        &mut self,
435
12
        early: bool,
436
12
        hop: Option<HopNum>,
437
12
        msg: UnparsedRelayMsg,
438
12
        relay_cell_format: RelayCellFormat,
439
12
        cell_counts_toward_windows: bool,
440
12
    ) -> StdResult<(), ReactorError> {
441
        // If this msg wants/refuses to have a Stream ID, does it
442
        // have/not have one?
443
12
        let streamid = msg_streamid(&msg)?;
444

            
445
        // If this doesn't have a StreamId, it's a meta cell,
446
        // not meant for a particular stream.
447
8
        let Some(sid) = streamid else {
448
8
            return self
449
8
                .handle_meta_msg(early, hop, msg, relay_cell_format)
450
8
                .await;
451
        };
452

            
453
        let msg = StreamMsg {
454
            sid,
455
            msg,
456
            cell_counts_toward_windows,
457
        };
458

            
459
        // All messages on streams are handled in the stream reactor
460
        // (because that's where the stream map is)
461
        //
462
        // Internally, this will spawn a StreamReactor for the target hop,
463
        // if not already spawned.
464
        self.hop_mgr.send(hop, msg).await
465
12
    }
466

            
467
    /// Handle a RELAY or RELAY_EARLY message on this circuit with stream ID 0.
468
8
    async fn handle_meta_msg(
469
8
        &mut self,
470
8
        early: bool,
471
8
        hopnum: Option<HopNum>,
472
8
        msg: UnparsedRelayMsg,
473
8
        relay_cell_format: RelayCellFormat,
474
8
    ) -> StdResult<(), ReactorError> {
475
8
        match msg.cmd() {
476
            RelayCmd::SENDME => {
477
                let sendme = msg
478
                    .decode::<Sendme>()
479
                    .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
480
                    .into_msg();
481

            
482
                let cmd = BackwardReactorCmd::HandleSendme {
483
                    hop: hopnum,
484
                    sendme,
485
                };
486

            
487
                self.send_reactor_cmd(cmd).await
488
            }
489
            _ => {
490
8
                self.inner
491
8
                    .handle_meta_msg(&self.runtime, early, hopnum, msg, relay_cell_format)
492
8
                    .await
493
            }
494
        }
495
8
    }
496

            
497
    /// Send a command to the backward reactor.
498
    ///
499
    /// Blocks if the `backward_reactor_tx` channel is full, i.e. if the backward reactor
500
    /// is not ready to send any more cells.
501
    ///
502
    /// Returns an error if the backward reactor has shut down.
503
4
    async fn send_reactor_cmd(
504
4
        &mut self,
505
4
        forward: BackwardReactorCmd,
506
4
    ) -> StdResult<(), ReactorError> {
507
4
        self.backward_reactor_tx.send(forward).await.map_err(|_| {
508
            // The other reactor has shut down
509
            ReactorError::Shutdown
510
        })
511
4
    }
512
}