1
//! A circuit's view of the forward state of the circuit.
2

            
3
use crate::circuit::UniqId;
4
use crate::circuit::reactor::backward::BackwardReactorCmd;
5
use crate::circuit::reactor::hop_mgr::HopMgr;
6
use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
7
use crate::circuit::reactor::stream::StreamMsg;
8
use crate::circuit::reactor::{ControlHandler, ReactorResultChannel};
9
use crate::congestion::sendme;
10
use crate::crypto::cell::RelayCellBody;
11
use crate::stream::cmdcheck::AnyCmdChecker;
12
use crate::stream::msg_streamid;
13
use crate::util::err::ReactorError;
14
use crate::{Error, HopNum, Result};
15

            
16
#[cfg(any(feature = "hs-service", feature = "relay"))]
17
use crate::stream::incoming::{
18
    IncomingStreamRequestFilter, IncomingStreamRequestHandler, StreamReqSender,
19
};
20

            
21
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
22
use crate::client::circuit::padding::{PaddingController, QueuedCellPaddingInfo};
23

            
24
use tor_cell::chancell::msg::AnyChanMsg;
25
use tor_cell::relaycell::msg::{Sendme, SendmeTag};
26
use tor_cell::relaycell::{
27
    AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg,
28
};
29
use tor_error::internal;
30
use tor_linkspec::HasRelayIds;
31
use tor_rtcompat::Runtime;
32

            
33
use derive_deftly::Deftly;
34
use futures::SinkExt;
35
use futures::channel::mpsc;
36
use futures::{FutureExt as _, StreamExt, select_biased};
37
use tracing::debug;
38

            
39
use std::result::Result as StdResult;
40

            
41
use crate::circuit::CircuitRxReceiver;
42

            
43
/// The forward circuit reactor.
44
///
45
/// See the [`reactor`](crate::circuit::reactor) module-level docs.
46
///
47
/// Shuts downs down if an error occurs, or if either the [`Reactor`](super::Reactor)
48
/// or the [`BackwardReactor`](super::BackwardReactor) shuts down:
49
///
50
///   * if the `Reactor` shuts down, we are alerted via the ctrl/command mpsc channels
51
///     (their sending ends will close, which causes run_once() to return ReactorError::Shutdown)
52
///   * if `BackwardReactor` shuts down, the `Reactor` will notice and will itself shut down,
53
///     which, in turn, causes the `ForwardReactor` to shut down as described above
54
#[derive(Deftly)]
55
#[derive_deftly(CircuitReactor)]
56
#[deftly(reactor_name = "forward reactor")]
57
#[deftly(run_inner_fn = "Self::run_once")]
58
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
59
pub(super) struct ForwardReactor<R: Runtime, F: ForwardHandler> {
60
    /// A handle to the runtime.
61
    runtime: R,
62
    /// An identifier for logging about this reactor's circuit.
63
    unique_id: UniqId,
64
    /// Implementation-dependent part of the reactor.
65
    ///
66
    /// This enables us to customize the behavior of the reactor,
67
    /// depending on whether we are a client or a relay.
68
    inner: F,
69
    /// Channel for receiving control commands.
70
    command_rx: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd>>,
71
    /// Channel for receiving control messages.
72
    control_rx: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg>>,
73
    /// The reading end of the inbound Tor channel.
74
    ///
75
    /// Yields cells moving from the client towards the exit, if we are a relay,
76
    /// or cells moving towards *us*, if we are a client.
77
    inbound_chan_rx: CircuitRxReceiver,
78
    /// Sender for sending commands to the BackwardReactor.
79
    ///
80
    /// Used for sending:
81
    ///
82
    ///    * circuit-level SENDMEs received from the other endpoint
83
    ///      (`[BackwardReactorCmd::HandleSendme]`)
84
    ///    * circuit-level SENDMEs that need to be delivered to the other endpoint
85
    ///      (using `[BackwardReactorCmd::SendRelayMsg]`)
86
    ///
87
    /// The receiver is in [`BackwardReactor`](super::BackwardReactor), which is responsible for
88
    /// sending cell over the inbound channel.
89
    backward_reactor_tx: mpsc::Sender<BackwardReactorCmd>,
90
    /// Hop manager, storing per-hop state, and handles to the stream reactors.
91
    ///
92
    /// Contains the `CircHopList`.
93
    hop_mgr: HopMgr<R>,
94
    /// An implementation-specific event stream.
95
    ///
96
    /// Polled from the main loop of the reactor.
97
    /// Each event is passed to [`ForwardHandler::handle_event`].
98
    circ_events: mpsc::Receiver<F::CircEvent>,
99
    /// A padding controller to which padding-related events should be reported.
100
    padding_ctrl: PaddingController,
101
}
102

            
103
/// A control command aimed at the generic forward reactor.
pub(crate) enum CtrlCmd<C> {
    /// Begin accepting streams on this circuit.
    //
    // TODO(DEDUP): this is very similar to its client-side counterpart,
    // except the hop is an Option<HopNum> instead of a TargetHop.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    AwaitStreamRequests {
        /// A channel for sending information about an incoming stream request.
        incoming_sender: StreamReqSender,
        /// A `CmdChecker` to keep track of which message types are acceptable.
        cmd_checker: AnyCmdChecker,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
        /// The hop that is allowed to create streams.
        ///
        /// Set to None if we are a relay wanting to accept stream requests.
        hop: Option<HopNum>,
        /// A filter used to check requests before passing them on.
        filter: Box<dyn IncomingStreamRequestFilter>,
    },
    /// An implementation-dependent control command.
    ///
    /// Forwarded unchanged to the implementation-specific handler.
    #[allow(unused)] // TODO(relay)
    Custom(C),
}
128

            
129
/// A control message aimed at the generic forward reactor.
pub(crate) enum CtrlMsg<M> {
    /// An implementation-dependent control message.
    ///
    /// Forwarded unchanged to the implementation-specific handler.
    #[allow(unused)] // TODO(relay)
    Custom(M),
}
135

            
136
/// Trait for customizing the behavior of the forward reactor.
137
///
138
/// Used for plugging in the implementation-dependent (client vs relay)
139
/// parts of the implementation into the generic one.
140
pub(crate) trait ForwardHandler: ControlHandler {
141
    /// Type that explains how to build an outgoing channel.
142
    type BuildSpec: HasRelayIds;
143

            
144
    /// The subclass of ChanMsg that can arrive on this type of circuit.
145
    type CircChanMsg: TryFrom<AnyChanMsg, Error = crate::Error>;
146

            
147
    /// An opaque event type.
148
    ///
149
    /// The [`ForwardReactor`] polls an MPSC stream yielding `CircEvent`s from the main loop.
150
    /// Each event is passed to [`Self::handle_event`] for handling.
151
    type CircEvent;
152

            
153
    /// Handle a non-SENDME RELAY message on this circuit with stream ID 0.
154
    async fn handle_meta_msg<R: Runtime>(
155
        &mut self,
156
        runtime: &R,
157
        early: bool,
158
        hopnum: Option<HopNum>,
159
        msg: UnparsedRelayMsg,
160
        relay_cell_format: RelayCellFormat,
161
    ) -> StdResult<(), ReactorError>;
162

            
163
    /// Handle a forward cell that we could not decrypt.
164
    ///
165
    /// Only used by relays.
166
    fn handle_unrecognized_cell(
167
        &mut self,
168
        body: RelayCellBody,
169
        info: Option<QueuedCellPaddingInfo>,
170
    ) -> StdResult<(), ReactorError>;
171

            
172
    /// Handle a forward (TODO terminology) cell.
173
    ///
174
    /// The cell is
175
    ///   - moving from the client towards the exit, if we're a relay
176
    ///   - moving from the guard towards us, if we're a client
177
    ///
178
    /// Returns an error if the cell should cause the reactor to shut down,
179
    /// or a [`ForwardCellDisposition`] specifying how it should be handled.
180
    ///
181
    /// Returns `None` if the cell was handled internally by this handler.
182
    async fn handle_forward_cell<R: Runtime>(
183
        &mut self,
184
        hop_mgr: &mut HopMgr<R>,
185
        cell: Self::CircChanMsg,
186
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError>;
187

            
188
    /// Handle an implementation-specific circuit event.
189
    ///
190
    /// Returns a command for the backward reactor.
191
    fn handle_event(
192
        &mut self,
193
        event: Self::CircEvent,
194
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError>;
195

            
196
    /// Wait until the outbound channel, if there is one, is ready to accept more cells.
197
    ///
198
    /// Resolves immediately if there is no outbound channel.
199
    /// Blocks if there is a pending outbound channel.
200
    async fn outbound_chan_ready(&mut self) -> Result<()>;
201
}
202

            
203
/// What action to take in response to a cell arriving on our inbound Tor channel.
204
pub(crate) enum ForwardCellDisposition {
205
    /// Handle a decoded RELAY or RELAY_EARLY cell in the [`ForwardReactor`].
206
    HandleRecognizedRelay {
207
        /// The decoded cell.
208
        cell: RelayCellDecoderResult,
209
        /// Whether this was a RELAY_EARLY.
210
        early: bool,
211
        /// The hop this cell was for.
212
        hopnum: Option<HopNum>,
213
        /// The SENDME tag.
214
        tag: SendmeTag,
215
    },
216
}
217

            
218
impl<R: Runtime, F: ForwardHandler> ForwardReactor<R, F> {
219
    /// Create a new [`ForwardReactor`].
220
    #[allow(clippy::too_many_arguments)] // TODO
221
    pub(super) fn new(
222
        runtime: R,
223
        unique_id: UniqId,
224
        inner: F,
225
        hop_mgr: HopMgr<R>,
226
        inbound_chan_rx: CircuitRxReceiver,
227
        control_rx: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg>>,
228
        command_rx: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd>>,
229
        backward_reactor_tx: mpsc::Sender<BackwardReactorCmd>,
230
        circ_events: mpsc::Receiver<F::CircEvent>,
231
        padding_ctrl: PaddingController,
232
    ) -> Self {
233
        Self {
234
            runtime,
235
            unique_id,
236
            inbound_chan_rx,
237
            control_rx,
238
            command_rx,
239
            inner,
240
            backward_reactor_tx,
241
            hop_mgr,
242
            circ_events,
243
            padding_ctrl,
244
        }
245
    }
246

            
247
    /// Helper for [`run`](Self::run).
248
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
249
        let outbound_chan_ready = self.inner.outbound_chan_ready();
250

            
251
        let inbound_chan_rx_fut = async {
252
            // Avoid reading from the inbound_chan_rx Tor Channel if the outgoing sink is blocked
253
            outbound_chan_ready.await?;
254
            Ok(self.inbound_chan_rx.next().await)
255
        };
256

            
257
        select_biased! {
258
            res = self.command_rx.next().fuse() => {
259
                let cmd = res.ok_or_else(|| ReactorError::Shutdown)?;
260
                self.handle_cmd(cmd)
261
            }
262
            res = self.control_rx.next().fuse() => {
263
                let msg = res.ok_or_else(|| ReactorError::Shutdown)?;
264
                self.handle_msg(msg)
265
            }
266
            res = self.circ_events.next().fuse() => {
267
                let ev = res.ok_or_else(|| ReactorError::Shutdown)?;
268
                if let Some(cmd) = self.inner.handle_event(ev)? {
269
                    self.send_reactor_cmd(cmd).await?;
270
                }
271

            
272
                Ok(())
273
            }
274
            res = inbound_chan_rx_fut.fuse() => {
275
                let cell = res.map_err(ReactorError::Err)?;
276
                let Some(cell) = cell else {
277
                    debug!(
278
                        circ_id = %self.unique_id,
279
                        "Backward channel has closed, shutting down forward relay reactor",
280
                    );
281

            
282
                    return Err(ReactorError::Shutdown);
283
                };
284

            
285
                let cell: F::CircChanMsg = cell.try_into()?;
286
                let Some(disp) = self.inner.handle_forward_cell(&mut self.hop_mgr, cell).await? else {
287
                    return Ok(());
288
                };
289

            
290
                match disp {
291
                    ForwardCellDisposition::HandleRecognizedRelay { cell, early, hopnum, tag } => {
292
                        self.handle_relay_cell(cell, early, hopnum, tag).await
293
                    }
294
                }
295
            },
296
        }
297
    }
298

            
299
    /// Handle a control command.
300
    fn handle_cmd(&mut self, cmd: CtrlCmd<F::CtrlCmd>) -> StdResult<(), ReactorError> {
301
        match cmd {
302
            #[cfg(any(feature = "hs-service", feature = "relay"))]
303
            CtrlCmd::AwaitStreamRequests {
304
                incoming_sender,
305
                cmd_checker,
306
                done,
307
                hop,
308
                filter,
309
            } => {
310
                let handler = IncomingStreamRequestHandler {
311
                    incoming_sender,
312
                    cmd_checker,
313
                    hop_num: hop,
314
                    filter,
315
                };
316

            
317
                // Update the HopMgr with the
318
                let ret = self.hop_mgr.set_incoming_handler(handler);
319
                let _ = done.send(ret); // don't care if the corresponding receiver goes away.
320
                Ok(())
321
            }
322
            CtrlCmd::Custom(c) => self.inner.handle_cmd(c),
323
        }
324
    }
325

            
326
    /// Handle a control message.
327
    fn handle_msg(&mut self, msg: CtrlMsg<F::CtrlMsg>) -> StdResult<(), ReactorError> {
328
        match msg {
329
            CtrlMsg::Custom(c) => self.inner.handle_msg(c),
330
        }
331
    }
332

            
333
    /// Note that we have received a RELAY cell.
334
    ///
335
    /// Updates the padding and CC state.
336
    fn note_relay_cell_received(
337
        &self,
338
        hopnum: Option<HopNum>,
339
        c_t_w: bool,
340
    ) -> Result<(RelayCellFormat, bool)> {
341
        let mut hops = self.hop_mgr.hops().write().expect("poisoned lock");
342
        let hop = hops
343
            .get_mut(hopnum)
344
            .ok_or_else(|| internal!("msg from non-existant hop???"))?;
345

            
346
        // Check whether we are allowed to receive more data for this circuit hop.
347
        hop.inbound.decrement_cell_limit()?;
348

            
349
        // Decrement the circuit sendme windows, and see if we need to
350
        // send a sendme cell.
351
        let send_circ_sendme = if c_t_w {
352
            hop.ccontrol
353
                .lock()
354
                .expect("poisoned lock")
355
                .note_data_received()?
356
        } else {
357
            false
358
        };
359

            
360
        let relay_cell_format = hop.settings.relay_crypt_protocol().relay_cell_format();
361

            
362
        Ok((relay_cell_format, send_circ_sendme))
363
    }
364

            
365
    /// Handle a RELAY cell.
366
    ///
367
    // TODO(DEDUP): very similar to Client::handle_relay_cell()
368
    async fn handle_relay_cell(
369
        &mut self,
370
        decode_res: RelayCellDecoderResult,
371
        early: bool,
372
        hopnum: Option<HopNum>,
373
        tag: SendmeTag,
374
    ) -> StdResult<(), ReactorError> {
375
        // For padding purposes, if we are a relay, we set the hopnum to 0
376
        // TODO(relay): is this right?
377
        let hopnum_padding = hopnum.unwrap_or_else(|| HopNum::from(0));
378
        if decode_res.is_padding() {
379
            self.padding_ctrl.decrypted_padding(hopnum_padding)?;
380
        } else {
381
            self.padding_ctrl.decrypted_data(hopnum_padding);
382
        }
383

            
384
        let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
385
        let (relay_cell_format, send_circ_sendme) = self.note_relay_cell_received(hopnum, c_t_w)?;
386

            
387
        // If we do need to send a circuit-level SENDME cell, do so.
388
        if send_circ_sendme {
389
            // This always sends a V1 (tagged) sendme cell, and thereby assumes
390
            // that SendmeEmitMinVersion is no more than 1.  If the authorities
391
            // every increase that parameter to a higher number, this will
392
            // become incorrect.  (Higher numbers are not currently defined.)
393
            let sendme = Sendme::from(tag);
394
            let msg = AnyRelayMsgOuter::new(None, sendme.into());
395
            let forward = BackwardReactorCmd::SendRelayMsg { hop: hopnum, msg };
396

            
397
            // NOTE: sending the SENDME to the backward reactor for handling
398
            // might seem counterintuitive, given that we have access to
399
            // the congestion control object right here (via hop_mgr).
400
            //
401
            // However, the forward reactor does not have access to the
402
            // outbound_chan_tx part of the inbound (towards the client) Tor channel,
403
            // and so it cannot handle the SENDME on its own
404
            // (because it cannot obtain the congestion signals),
405
            // so the SENDME needs to be handled in the backward reactor.
406
            //
407
            // NOTE: this will block if the backward reactor is not ready
408
            // to send any more cells.
409
            self.send_reactor_cmd(forward).await?;
410
        }
411

            
412
        let (mut msgs, incomplete) = decode_res.into_parts();
413
        while let Some(msg) = msgs.next() {
414
            match self
415
                .handle_relay_msg(early, hopnum, msg, relay_cell_format, c_t_w)
416
                .await
417
            {
418
                Ok(()) => continue,
419
                Err(e) => {
420
                    for m in msgs {
421
                        debug!(
422
                            circ_id = %self.unique_id,
423
                            "Ignoring relay msg received after triggering shutdown: {m:?}",
424
                        );
425
                    }
426
                    if let Some(incomplete) = incomplete {
427
                        debug!(
428
                            circ_id = %self.unique_id,
429
                            "Ignoring partial relay msg received after triggering shutdown: {:?}",
430
                            incomplete,
431
                        );
432
                    }
433

            
434
                    return Err(e);
435
                }
436
            }
437
        }
438

            
439
        Ok(())
440
    }
441

            
442
    /// Handle a single incoming RELAY message.
443
    async fn handle_relay_msg(
444
        &mut self,
445
        early: bool,
446
        hop: Option<HopNum>,
447
        msg: UnparsedRelayMsg,
448
        relay_cell_format: RelayCellFormat,
449
        cell_counts_toward_windows: bool,
450
    ) -> StdResult<(), ReactorError> {
451
        // If this msg wants/refuses to have a Stream ID, does it
452
        // have/not have one?
453
        let streamid = msg_streamid(&msg)?;
454

            
455
        // If this doesn't have a StreamId, it's a meta cell,
456
        // not meant for a particular stream.
457
        let Some(sid) = streamid else {
458
            return self
459
                .handle_meta_msg(early, hop, msg, relay_cell_format)
460
                .await;
461
        };
462

            
463
        let msg = StreamMsg {
464
            sid,
465
            msg,
466
            cell_counts_toward_windows,
467
        };
468

            
469
        // All messages on streams are handled in the stream reactor
470
        // (because that's where the stream map is)
471
        //
472
        // Internally, this will spawn a StreamReactor for the target hop,
473
        // if not already spawned.
474
        self.hop_mgr.send(hop, msg).await
475
    }
476

            
477
    /// Handle a RELAY or RELAY_EARLY message on this circuit with stream ID 0.
478
    async fn handle_meta_msg(
479
        &mut self,
480
        early: bool,
481
        hopnum: Option<HopNum>,
482
        msg: UnparsedRelayMsg,
483
        relay_cell_format: RelayCellFormat,
484
    ) -> StdResult<(), ReactorError> {
485
        match msg.cmd() {
486
            RelayCmd::SENDME => {
487
                let sendme = msg
488
                    .decode::<Sendme>()
489
                    .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
490
                    .into_msg();
491

            
492
                let cmd = BackwardReactorCmd::HandleSendme {
493
                    hop: hopnum,
494
                    sendme,
495
                };
496

            
497
                self.send_reactor_cmd(cmd).await
498
            }
499
            _ => {
500
                self.inner
501
                    .handle_meta_msg(&self.runtime, early, hopnum, msg, relay_cell_format)
502
                    .await
503
            }
504
        }
505
    }
506

            
507
    /// Send a command to the backward reactor.
508
    ///
509
    /// Blocks if the `backward_reactor_tx` channel is full, i.e. if the backward reactor
510
    /// is not ready to send any more cells.
511
    ///
512
    /// Returns an error if the backward reactor has shut down.
513
    async fn send_reactor_cmd(
514
        &mut self,
515
        forward: BackwardReactorCmd,
516
    ) -> StdResult<(), ReactorError> {
517
        self.backward_reactor_tx.send(forward).await.map_err(|_| {
518
            // The other reactor has shut down
519
            ReactorError::Shutdown
520
        })
521
    }
522
}