1
//! A circuit's view of the backward state of the circuit.
2

            
3
use crate::channel::Channel;
4
use crate::circuit::UniqId;
5
use crate::circuit::cell_sender::CircuitCellSender;
6
use crate::circuit::reactor::ControlHandler;
7
use crate::circuit::reactor::circhop::CircHopList;
8
use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
9
use crate::circuit::reactor::stream::ReadyStreamMsg;
10
use crate::congestion::{CongestionControl, sendme};
11
use crate::crypto::cell::RelayCellBody;
12
use crate::util::err::ReactorError;
13
use crate::util::poll_all::PollAll;
14
use crate::{Error, HopNum, Result};
15

            
16
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
17
use crate::client::circuit::padding::{
18
    self, PaddingController, PaddingEvent, PaddingEventStream, QueuedCellPaddingInfo,
19
};
20

            
21
use tor_cell::chancell::msg::{AnyChanMsg, Relay};
22
use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanCmd, CircId};
23
use tor_cell::relaycell::msg::{Sendme, SendmeTag};
24
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, RelayCmd};
25
use tor_error::internal;
26
use tor_rtcompat::{DynTimeProvider, Runtime};
27

            
28
use derive_deftly::Deftly;
29
use futures::SinkExt;
30
use futures::channel::mpsc;
31
use futures::{FutureExt as _, StreamExt, future, select_biased};
32
use tracing::trace;
33

            
34
use std::pin::Pin;
35
use std::result::Result as StdResult;
36
use std::sync::{Arc, Mutex, RwLock};
37

            
38
use crate::circuit::CircuitRxReceiver;
39

            
40
#[cfg(feature = "circ-padding")]
41
use crate::circuit::padding::{CircPaddingDisposition, padding_disposition};
42

            
43
#[cfg(feature = "relay")]
44
use tor_cell::relaycell::msg::Extended2;
45

            
46
/// The "backward" circuit reactor of a relay.
47
///
48
/// See the [`reactor`](crate::circuit::reactor) module-level docs.
49
///
50
/// Shuts downs down if an error occurs, or if the [`Reactor`](super::Reactor),
51
/// [`ForwardReactor`](super::ForwardReactor), or if one of the
52
/// [`StreamReactor`](super::stream::StreamReactor)s of this circuit shuts down:
53
///
54
///   * if the `Reactor` shuts down, we are alerted via the ctrl/command mpsc channels
55
///     (their sending ends will close, which causes run_once() to return ReactorError::Shutdown)
56
///   * if `ForwardReactor` shuts down, the `Reactor` will notice and will itself shut down,
57
///     which, in turn, causes the `BackwardReactor` to shut down as described above
58
///   * if one of the `StreamReactor`s shuts down, the `ForwardReactor` will
59
///     notice when it next tries to deliver a stream message to it, and shut down,
60
///     causing the `BackwardReactor` and top-level `Reactor` to follow suit
61
#[derive(Deftly)]
62
#[derive_deftly(CircuitReactor)]
63
#[deftly(reactor_name = "backward reactor")]
64
#[deftly(run_inner_fn = "Self::run_once")]
65
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
66
pub(super) struct BackwardReactor<B: BackwardHandler> {
67
    /// The time provider.
68
    time_provider: DynTimeProvider,
69
    /// An identifier for logging about this reactor's circuit.
70
    unique_id: UniqId,
71
    /// The circuit identifier on the backward Tor channel.
72
    circ_id: CircId,
73
    /// The inbound Tor channel.
74
    channel: Arc<Channel>,
75
    /// Implementation-dependent part of the reactor.
76
    ///
77
    /// This enables us to customize the behavior of the reactor,
78
    /// depending on whether we are a client or a relay.
79
    inner: B,
80
    /// The reading end of the outbound Tor channel, if we are not the last hop.
81
    ///
82
    /// Yields cells moving from the exit towards the client, if we are a middle relay.
83
    outbound_chan_rx: Option<CircuitRxReceiver>,
84
    /// The per-hop state, shared with the forward reactor.
85
    ///
86
    /// The backward reactor acquires a read lock to this whenever it needs to
87
    ///
88
    ///   * send a circuit-level SENDME
89
    ///   * handle a circuit-level SENDME
90
    ///   * send a padding cell
91
    ///
92
    // Note: For the sending/handling of SENDMEs, we lock the hop list
93
    // to extract the relay cell format and CC state of the hop.
94
    // Technically, for the SENDME cases, we could've avoided locking
95
    // the hop list from the BWD, by having the FWD share the relay cell format
96
    // and CC state in the BackwardReactorCmd::{Send,Handle}Sendme command.
97
    // But for the padding case, we *need* the hop list, because we need
98
    // to work out what relay cell format to use when sending the padding cell.
99
    // But for the sake of simplicity, I made the BWD consult the CircHopList in all cases.
100
    //
101
    // TODO: the backward reactor only ever reads from this.
102
    // Conceptually, it is the foward reactor's HopMgr that owns this list:
103
    // only HopMgr can add hops to the list.
104
    //
105
    // Perhaps we need a specialized abstraction that only allows reading here.
106
    // This could be a wrapper over RwLock, providing a read-only API.
107
    hops: Arc<RwLock<CircHopList>>,
108
    /// The sending end of the backward Tor channel.
109
    ///
110
    /// Delivers cells towards the other endpoint: towards the client, if we are a relay,
111
    /// or towards the exit, if we are a client.
112
    inbound_chan_tx: CircuitCellSender,
113
    /// Channel for receiving control commands.
114
    command_rx: mpsc::UnboundedReceiver<CtrlCmd<B::CtrlCmd>>,
115
    /// Channel for receiving control messages.
116
    control_rx: mpsc::UnboundedReceiver<CtrlMsg<B::CtrlMsg>>,
117
    /// Receiver for [`BackwardReactorCmd`]s coming from the forward reactor.
118
    ///
119
    /// The sender is in [`ForwardReactor`](super::ForwardReactor), which will forward all cells
120
    /// carrying Tor stream data to us.
121
    ///
122
    /// This serves a dual purpose:
123
    ///
124
    ///   * it enables the `ForwardReactor` to deliver Tor stream data received
125
    ///     from the other endpoint
126
    ///   * it lets the `BackwardReactor` know if the `ForwardReactor` has shut down:
127
    ///     we select! on this MPSC channel in the main loop, so if the `ForwardReactor`
128
    ///     shuts down, we will get EOS upon calling `.next()`)
129
    forward_reactor_rx: mpsc::Receiver<BackwardReactorCmd>,
130
    /// A channel for receiving endpoint-bound stream messages from the StreamReactor(s)
131
    /// (the stream messages are client-bound if we are a relay, or exit-bound if we are a client).
132
    stream_rx: mpsc::Receiver<ReadyStreamMsg>,
133
    /// A padding controller to which padding-related events should be reported.
134
    padding_ctrl: PaddingController,
135
    /// An event stream telling us about padding-related events.
136
    padding_event_stream: PaddingEventStream,
137
    /// Current rules for blocking traffic, according to the padding controller.
138
    #[cfg(feature = "circ-padding")]
139
    padding_block: Option<padding::StartBlocking>,
140
}
141

            
142
/// A control message aimed at the generic forward reactor.
143
pub(crate) enum CtrlMsg<M> {
144
    /// An implementation-dependent control message.
145
    #[allow(unused)] // TODO(relay)
146
    Custom(M),
147
}
148

            
149
/// A control command aimed at the generic forward reactor.
150
pub(crate) enum CtrlCmd<C> {
151
    /// An implementation-dependent control command.
152
    #[allow(unused)] // TODO(relay)
153
    Custom(C),
154
}
155

            
156
/// Trait for customizing the behavior of the backward reactor.
157
///
158
/// Used for plugging in the implementation-dependent (client vs relay)
159
/// parts of the implementation into the generic one.
160
pub(crate) trait BackwardHandler: ControlHandler {
161
    /// The subclass of ChanMsg that can arrive on this type of circuit.
162
    type CircChanMsg: TryFrom<AnyChanMsg, Error = crate::Error> + Send;
163

            
164
    /// Encrypt a RelayCellBody that is moving in the backward direction.
165
    fn encrypt_relay_cell(
166
        &mut self,
167
        cmd: ChanCmd,
168
        body: &mut RelayCellBody,
169
        hop: Option<HopNum>,
170
    ) -> SendmeTag;
171

            
172
    /// Handle a cell that was read from the Tor outbound channel.
173
    ///
174
    /// Returns an error if the cell should cause the reactor to shut down,
175
    /// or a [`BackwardCellDisposition`] specifying how it should be handled.
176
    fn handle_backward_cell(
177
        &mut self,
178
        circ_id: UniqId,
179
        cell: Self::CircChanMsg,
180
    ) -> StdResult<BackwardCellDisposition, ReactorError>;
181
}
182

            
183
/// What action to take in response to a cell arriving on our outbound Tor channel.
184
pub(crate) enum BackwardCellDisposition {
185
    /// Forward the cell, writing it to the inbound Tor channel.
186
    Forward(AnyChanMsg),
187
}
188

            
189
#[allow(unused)] // TODO(relay)
190
impl<B: BackwardHandler> BackwardReactor<B> {
191
    /// Create a new [`BackwardReactor`].
192
    #[allow(clippy::too_many_arguments)] // TODO
193
    pub(super) fn new<R: Runtime>(
194
        runtime: R,
195
        channel: &Arc<Channel>,
196
        circ_id: CircId,
197
        unique_id: UniqId,
198
        inner: B,
199
        hops: Arc<RwLock<CircHopList>>,
200
        forward_reactor_rx: mpsc::Receiver<BackwardReactorCmd>,
201
        control_rx: mpsc::UnboundedReceiver<CtrlMsg<B::CtrlMsg>>,
202
        command_rx: mpsc::UnboundedReceiver<CtrlCmd<B::CtrlCmd>>,
203
        padding_ctrl: PaddingController,
204
        padding_event_stream: PaddingEventStream,
205
        stream_rx: mpsc::Receiver<ReadyStreamMsg>,
206
    ) -> Self {
207
        let channel = Arc::clone(channel);
208
        let inbound_chan_tx = CircuitCellSender::from_channel_sender(channel.sender());
209

            
210
        Self {
211
            time_provider: DynTimeProvider::new(runtime),
212
            outbound_chan_rx: None,
213
            channel,
214
            inner,
215
            hops,
216
            inbound_chan_tx,
217
            unique_id,
218
            circ_id,
219
            forward_reactor_rx,
220
            control_rx,
221
            command_rx,
222
            stream_rx,
223
            padding_ctrl,
224
            padding_event_stream,
225
            #[cfg(feature = "circ-padding")]
226
            padding_block: None,
227
        }
228
    }
229

            
230
    /// Helper for [`run`](Self::run).
231
    ///
232
    /// Handles cells arriving on the outbound Tor channel,
233
    /// and writes cells to the inbound Tor channel.
234
    ///
235
    /// Because the Tor application streams, the `forward_reactor_rx` MPSC streams,
236
    /// and the outbound Tor channel MPSC stream are driven concurrently using [`PollAll`],
237
    /// this function can send up to 3 cells per call over the inbound Tor channel:
238
    ///
239
    ///    * a cell carrying Tor stream data
240
    ///    * a cell received from the outbound Tor channel, if we are a relay
241
    ///      (moving from the exit towards the client)
242
    ///    * a circuit-level SENDME
243
    ///
244
    /// However, in practice, leaky pipe is not really used,
245
    /// and so relays that have application streams (i.e. the exits),
246
    /// are not going to have an outbound Tor channel,
247
    /// and so this will only really drive Tor stream data,
248
    /// delivering at most 2 cells per call.
249
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
250
        use postage::prelude::{Sink as _, Stream as _};
251

            
252
        /// The maximum number of events we expect to handle per reactor loop.
253
        ///
254
        /// This is bounded by the number of futures we push into the PollAll.
255
        const PER_LOOP_EVENT_COUNT: usize = 3;
256

            
257
        // A collection of futures we plan to drive concurrently.
258
        let mut poll_all =
259
            PollAll::<PER_LOOP_EVENT_COUNT, Option<CircuitEvent<B::CircChanMsg>>>::new();
260

            
261
        // Flush the backward Tor channel sink, and check it for readiness
262
        //
263
        // TODO(flushing): here and everywhere else we need to flush:
264
        //
265
        // Currently, we try to flush every time we want to write to the sink,
266
        // but may be suboptimal.
267
        //
268
        // However, we don't actually *wait* for the flush to complete
269
        // (we just make a bit of progress by calling poll_flush),
270
        // so it's possible that this is actually tolerable.
271
        // We should run some tests, and if this turns out to be a performance bottleneck,
272
        // we'll have to rethink our flushing approach.
273
        let backward_chan_ready = future::poll_fn(|cx| {
274
            // The flush outcome doesn't matter,
275
            // so we simply move on to the readiness check.
276
            // The reason we don't wait on the flush is because we don't
277
            // want to flush on *every* reactor loop, but we do want to make
278
            // a bit of progress each time.
279
            //
280
            // (TODO: do we want to handle errors here?)
281
            let _ = self.inbound_chan_tx.poll_flush_unpin(cx);
282

            
283
            self.inbound_chan_tx.poll_ready_unpin(cx)
284
        });
285

            
286
        // Concurrently, drive :
287
        //  1. a future that reads from the StreamReactor, to see if there are
288
        //  any application streams that have a message to send
289
        //  (this resolves to a message that needs to be delivered to the peer)
290
        poll_all.push(async {
291
            // Internally, each stream reactor checks if we're allowed to send anything
292
            // that counts towards SENDME windows (and ceases to send us stream data if not)
293
            //
294
            // The reason we don't check that here is because stream_rx multiplexes stream data
295
            // from all hops, and we have no way of knowing which hop will want to send us stream
296
            // data next, and therefore we can't know which hop's CC object to use
297
            self.stream_rx.next().await.map(CircuitEvent::Send)
298
        });
299

            
300
        //  2. the stream of commands coming from the ForwardReactor
301
        //  (this resolves to a BackwardReactorCmd)
302
        poll_all.push(async {
303
            let event = match self.forward_reactor_rx.next().await {
304
                Some(cmd) => CircuitEvent::Forwarded(cmd),
305
                None => {
306
                    // The forward reactor has crashed, so we have to shut down.
307
                    CircuitEvent::ForwardShutdown
308
                }
309
            };
310

            
311
            Some(event)
312
        });
313

            
314
        // 3. Messages moving from the outbound channel towards the inbound Tor channel,
315
        // if we have an outbound Tor channel.
316
        //
317
        // NOTE: in practice, clients and exits won't have an outbound Tor channel,
318
        // so for them this will be a no-op.
319
        poll_all.push(async {
320
            let event = if let Some(outbound_chan_rx) = self.outbound_chan_rx.as_mut() {
321
                // Forward channel unexpectedly closed, we should close too
322
                match outbound_chan_rx.next().await {
323
                    Some(msg) => match msg.try_into() {
324
                        Err(e) => CircuitEvent::ProtoViolation(e),
325
                        Ok(cell) => CircuitEvent::Cell(cell),
326
                    },
327
                    None => {
328
                        // The forward reactor has crashed, so we have to shut down.
329
                        CircuitEvent::ForwardShutdown
330
                    }
331
                }
332
            } else {
333
                future::pending().await
334
            };
335

            
336
            Some(event)
337
        });
338

            
339
        let poll_all = async move {
340
            // Avoid polling **any** of the futures if the outgoing sink is blocked.
341
            //
342
            // This implements backpressure: we avoid reading from our input sources
343
            // if we know we're unable to write to the inbound Tor channel sink.
344
            //
345
            // More specifically, if our inbound Tor channel sink is full and can no longer
346
            // accept cells, we stop reading:
347
            //
348
            //   1. From the application streams (received from StreamReactor), if there are any.
349
            //
350
            //   2. From the forward_reactor_rx channel, used by the forward reactor to send us
351
            //
352
            //     - a circuit-level SENDME that we have received, or
353
            //     - a circuit-level SENDME that we need to deliver to the client
354
            //
355
            //     Not reading from the forward_reactor_rx channel, in turn, causes the forward reactor
356
            //     to block and therefore stop reading from **its** input sources,
357
            //     propagating backpressure all the way to the other endpoint of the circuit.
358
            //
359
            //   3. From the outbound Tor channel, if there is one.
360
            //
361
            // This will delay any SENDMEs the client or exit might have sent along
362
            // the way, and therefore count as a congestion signal.
363
            //
364
            // TODO: memquota setup to make sure this doesn't turn into a memory DOS vector
365
            let _ = backward_chan_ready.await;
366

            
367
            // TODO: it's important to not block reading from the forward_reactor_rx channel on the chan
368
            // sender readiness (for instance, we should not block the sending of SENDMEs
369
            // if the channel is blocked on a padding-induced block).
370
            //
371
            // This means we will need to move the forward_reactor_rx handling out of the PollAll
372
            // to the select_biased! below.
373
            poll_all.await
374
        };
375

            
376
        let events = select_biased! {
377
            res = self.command_rx.next().fuse() => {
378
                let cmd = res.ok_or_else(|| ReactorError::Shutdown)?;
379
                self.handle_cmd(cmd)?;
380
                return Ok(());
381
            }
382
            res = self.control_rx.next().fuse() => {
383
                let msg = res.ok_or_else(|| ReactorError::Shutdown)?;
384
                self.handle_msg(msg)?;
385
                return Ok(());
386
            }
387
            res = self.padding_event_stream.next().fuse() => {
388
                // If there's a padding event, we need to handle it immediately,
389
                // because it might tell us to start blocking the inbound_chan_tx sink,
390
                // which, in turn, means we need to stop trying to read from
391
                // the application streams.
392
                let event = res.ok_or_else(|| ReactorError::Shutdown)?;
393

            
394
                cfg_if::cfg_if! {
395
                    if #[cfg(feature = "circ-padding")] {
396
                        self.run_padding_event(event).await?;
397
                    } else {
398
                        // If padding isn't enabled, we never generate a padding event,
399
                        // so we can be sure this case will never be called.
400
                        void::unreachable(event.0);
401
                    }
402
                }
403
                return Ok(())
404
            }
405
            res = poll_all.fuse() => res,
406
        };
407

            
408
        // Note: there shouldn't be more than N < PER_LOOP_EVENT_COUNT events to handle
409
        // per reactor loop. We need to be careful here, because we must avoid blocking
410
        // the reactor.
411
        //
412
        // If handling more than one event per loop turns out to be a problem, we may
413
        // need to dispatch this to a background task instead.
414
        //
415
        // TODO(relay): this loop is actually a problem.
416
        // As mentioned in the run_once() docs, this will attempt to send up
417
        // to 3 cells on the inbound tor Channel (or 2 cells, assuming no leaky pipe).
418
        //
419
        // The problem is that the readiness check above (see backward_chan_ready)
420
        // only checks that the queue has enough room for 1 cell, not *2 cells*.
421
        // Trying to send more than 2 cell when there is only room for one
422
        // will cause the reactor to block (and because there is nothing
423
        // driving the flushing of this channel, this will be a hard block).
424
        //
425
        // We need to rethink the strategy here (e.g. by flushing in parallel
426
        // with handle_event())
427
        for event in events.into_iter().flatten() {
428
            self.handle_event(event).await?;
429
        }
430

            
431
        Ok(())
432
    }
433

            
434
    /// Handle a control command.
435
    fn handle_cmd(&mut self, cmd: CtrlCmd<B::CtrlCmd>) -> StdResult<(), ReactorError> {
436
        match cmd {
437
            CtrlCmd::Custom(c) => self.inner.handle_cmd(c),
438
        }
439
    }
440

            
441
    /// Handle a control message.
442
    fn handle_msg(&mut self, msg: CtrlMsg<B::CtrlMsg>) -> StdResult<(), ReactorError> {
443
        match msg {
444
            CtrlMsg::Custom(c) => self.inner.handle_msg(c),
445
        }
446
    }
447

            
448
    /// Perform some circuit-padding-based event on the specified circuit.
449
    //
450
    // TODO(DEDUP): this is almost identical to the client-side Conflux::run_padding_event()
451
    #[cfg(feature = "circ-padding")]
452
    async fn run_padding_event(
453
        &mut self,
454
        padding_event: PaddingEvent,
455
    ) -> StdResult<(), ReactorError> {
456
        use PaddingEvent as E;
457

            
458
        match padding_event {
459
            E::SendPadding(send_padding) => {
460
                self.send_padding(send_padding).await?;
461
            }
462
            E::StartBlocking(start_blocking) => {
463
                self.start_blocking_for_padding(start_blocking);
464
            }
465
            E::StopBlocking => {
466
                self.stop_blocking_for_padding();
467
            }
468
        }
469
        Ok(())
470
    }
471

            
472
    /// Handle a request from our padding subsystem to send a padding packet.
473
    //
474
    // TODO(DEDUP): this is almost identical to the client-side Client::send_padding()
475
    #[cfg(feature = "circ-padding")]
476
    async fn send_padding(&mut self, send_padding: padding::SendPadding) -> Result<()> {
477
        use CircPaddingDisposition::*;
478

            
479
        let target_hop = send_padding.hop;
480

            
481
        match padding_disposition(
482
            &send_padding,
483
            &self.inbound_chan_tx,
484
            self.padding_block.as_ref(),
485
        ) {
486
            QueuePaddingNormally => {
487
                let queue_info = self.padding_ctrl.queued_padding(target_hop, send_padding);
488
                self.queue_padding_cell_for_hop(target_hop, queue_info)
489
                    .await?;
490
            }
491
            QueuePaddingAndBypass => {
492
                let queue_info = self.padding_ctrl.queued_padding(target_hop, send_padding);
493
                self.queue_padding_cell_for_hop(target_hop, queue_info)
494
                    .await?;
495
            }
496
            TreatQueuedCellAsPadding => {
497
                self.padding_ctrl
498
                    .replaceable_padding_already_queued(target_hop, send_padding);
499
            }
500
        }
501
        Ok(())
502
    }
503

            
504
    /// Enable padding-based blocking,
505
    /// or change the rule for padding-based blocking to the one in `block`.
506
    //
507
    // TODO(DEDUP): copy of Client::start_blocking_for_padding()
508
    #[cfg(feature = "circ-padding")]
509
    pub(super) fn start_blocking_for_padding(&mut self, block: padding::StartBlocking) {
510
        self.inbound_chan_tx.start_blocking();
511
        self.padding_block = Some(block);
512
    }
513

            
514
    /// Disable padding-based blocking.
515
    ///
516
    // TODO(DEDUP): copy of Client::stop_blocking_for_padding()
517
    #[cfg(feature = "circ-padding")]
518
    pub(super) fn stop_blocking_for_padding(&mut self) {
519
        self.inbound_chan_tx.stop_blocking();
520
        self.padding_block = None;
521
    }
522

            
523
    /// Generate and encrypt a padding cell, and send it to a targeted hop.
524
    ///
525
    /// Ignores any padding-based blocking.
526
    ///
527
    // TODO(DEDUP): copy of Client::queue_padding_cell_for_hop()
528
    #[cfg(feature = "circ-padding")]
529
    async fn queue_padding_cell_for_hop(
530
        &mut self,
531
        target_hop: HopNum,
532
        queue_info: Option<QueuedCellPaddingInfo>,
533
    ) -> Result<()> {
534
        use tor_cell::relaycell::msg::Drop as DropMsg;
535

            
536
        let msg = AnyRelayMsgOuter::new(None, DropMsg::default().into());
537
        let hopnum = Some(target_hop);
538

            
539
        // TODO: the ccontrol state isn't actually needed here, because
540
        // DROP cells don't count towards SENDME windows.
541
        // Technically, we could avoid unnecessarily Arc::clone()ing the CC state
542
        // here, and just extract the relay cell format.
543
        // But for that we would need a specialized send_relay_cell_inner()-like function
544
        // that doesn't take a CC object, or to make the CC object optional in
545
        // send_relay_cell_inner().
546
        let (relay_cell_format, ccontrol) = self.hop_info(hopnum)?;
547

            
548
        self.send_relay_cell_inner(hopnum, relay_cell_format, msg, false, &ccontrol, queue_info)
549
            .await
550
    }
551

            
552
    /// Determine how exactly to handle a request to handle padding.
553
    #[cfg(feature = "circ-padding")]
554
    fn padding_disposition(&self, send_padding: &padding::SendPadding) -> CircPaddingDisposition {
555
        crate::circuit::padding::padding_disposition(
556
            send_padding,
557
            &self.inbound_chan_tx,
558
            self.padding_block.as_ref(),
559
        )
560
    }
561

            
562
    /// Handle a circuit event.
563
    async fn handle_event(
564
        &mut self,
565
        event: CircuitEvent<B::CircChanMsg>,
566
    ) -> StdResult<(), ReactorError> {
567
        use CircuitEvent::*;
568

            
569
        match event {
570
            Cell(cell) => self.handle_backward_cell(cell).await,
571
            Send(msg) => {
572
                let ReadyStreamMsg {
573
                    hop,
574
                    relay_cell_format,
575
                    msg,
576
                    ccontrol,
577
                } = msg;
578

            
579
                self.send_relay_cell(hop, relay_cell_format, msg, false, &ccontrol)
580
                    .await?;
581

            
582
                Ok(())
583
            }
584
            Forwarded(cmd) => self.handle_reactor_cmd(cmd).await,
585
            ForwardShutdown => {
586
                // The forward reactor has crashed, so we have to shut down.
587
                trace!(
588
                    circ_id = %self.unique_id,
589
                    "Backward relay reactor shutdown (forward reactor has closed)",
590
                );
591

            
592
                Err(ReactorError::Shutdown)
593
            }
594
            ProtoViolation(err) => Err(err.into()),
595
        }
596
    }
597

            
598
    /// Return the RelayCellFormat and CC state of a given hop.
599
    fn hop_info(
600
        &self,
601
        hopnum: Option<HopNum>,
602
    ) -> Result<(RelayCellFormat, Arc<Mutex<CongestionControl>>)> {
603
        let hops = self.hops.read().expect("poisoned lock");
604
        let hop = hops
605
            .get(hopnum)
606
            .ok_or_else(|| internal!("tried to send padding to non-existent hop?!"))?;
607
        let relay_cell_format = hop.settings.relay_crypt_protocol().relay_cell_format();
608
        let ccontrol = Arc::clone(&hop.ccontrol);
609

            
610
        Ok((relay_cell_format, ccontrol))
611
    }
612

            
613
    /// Handle a command sent to us by the forward reactor.
614
    async fn handle_reactor_cmd(&mut self, msg: BackwardReactorCmd) -> StdResult<(), ReactorError> {
615
        use BackwardReactorCmd::*;
616

            
617
        match msg {
618
            SendRelayMsg { hop, msg } => {
619
                self.send_relay_msg(hop, msg).await?;
620
            }
621
            HandleSendme { hop, sendme } => {
622
                self.handle_sendme(hop, sendme).await?;
623
                return Ok(());
624
            }
625
            #[cfg(feature = "relay")]
626
            HandleCircuitExtended {
627
                hop,
628
                extended2,
629
                outbound_chan_rx,
630
            } => {
631
                self.outbound_chan_rx = Some(outbound_chan_rx);
632
                let msg = AnyRelayMsgOuter::new(None, extended2.into());
633
                self.send_relay_msg(hop, msg).await?;
634
            }
635
        }
636

            
637
        Ok(())
638
    }
639

            
640
    /// Send a relay message to the specified hop.
641
    async fn send_relay_msg(
642
        &mut self,
643
        hopnum: Option<HopNum>,
644
        msg: AnyRelayMsgOuter,
645
    ) -> StdResult<(), ReactorError> {
646
        let (relay_cell_format, ccontrol) = self.hop_info(hopnum)?;
647
        let cmd = msg.cmd();
648

            
649
        self.send_relay_cell(hopnum, relay_cell_format, msg, false, &ccontrol)
650
            .await?;
651

            
652
        if cmd == RelayCmd::SENDME {
653
            ccontrol.lock().expect("poisoned lock").note_sendme_sent();
654
        }
655

            
656
        Ok(())
657
    }
658

            
659
    /// Handle a circuit-level SENDME (stream ID = 0).
660
    ///
661
    /// Returns an error if the SENDME does not have an authentication tag
662
    /// (versions of Tor <=0.3.5 omit the SENDME tag, but we don't support
663
    /// those any longer).
664
    ///
665
    /// Any error returned from this function will shut down the reactor.
666
    ///
667
    // TODO(DEDUP): duplicates the logic from the client-side Circuit::handle_sendme()
668
    async fn handle_sendme(
669
        &mut self,
670
        hopnum: Option<HopNum>,
671
        sendme: Sendme,
672
    ) -> StdResult<(), ReactorError> {
673
        let tag = sendme
674
            .into_sendme_tag()
675
            .ok_or_else(|| Error::CircProto("missing tag on circuit sendme".into()))?;
676

            
677
        // NOTE: it's okay to await. We are only awaiting on the congestion_signals
678
        // future which *should* resolve immediately
679
        let signals = self.inbound_chan_tx.congestion_signals().await;
680

            
681
        let hops = self.hops.read().expect("poisoned lock");
682
        let hop = hops
683
            .get(hopnum)
684
            .ok_or_else(|| internal!("tried to send padding to non-existent hop?!"))?;
685

            
686
        // Update the CC object that we received a SENDME along
687
        // with possible congestion signals.
688
        hop.ccontrol
689
            .lock()
690
            .expect("poisoned lock")
691
            .note_sendme_received(&self.time_provider, tag, signals)?;
692

            
693
        Ok(())
694
    }
695

            
696
    /// Encode `msg` and encrypt it, returning the resulting cell
697
    /// and tag that should be expected for an authenticated SENDME sent
698
    /// in response to that cell.
699
    ///
700
    // TODO(DEDUP): duplicates the logic from the client-side Circuit::encode_relay_cell()
701
    fn encode_relay_cell(
702
        &mut self,
703
        relay_format: RelayCellFormat,
704
        hop: Option<HopNum>,
705
        early: bool,
706
        msg: AnyRelayMsgOuter,
707
    ) -> Result<(AnyChanMsg, SendmeTag)> {
708
        let mut body: RelayCellBody = msg
709
            .encode(relay_format, &mut rand::rng())
710
            .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
711
            .into();
712
        let cmd = if early {
713
            ChanCmd::RELAY_EARLY
714
        } else {
715
            ChanCmd::RELAY
716
        };
717

            
718
        // Use the implementation-dependent encryption logic
719
        let tag = self.inner.encrypt_relay_cell(cmd, &mut body, hop);
720
        let msg = Relay::from(BoxedCellBody::from(body));
721
        let msg = if early {
722
            AnyChanMsg::RelayEarly(msg.into())
723
        } else {
724
            AnyChanMsg::Relay(msg)
725
        };
726

            
727
        Ok((msg, tag))
728
    }
729

            
730
    /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
731
    ///
732
    /// If there is insufficient outgoing *circuit-level* or *stream-level*
733
    /// SENDME window, an error is returned instead.
734
    ///
735
    /// Does not check whether the cell is well-formed or reasonable.
736
    async fn send_relay_cell(
737
        &mut self,
738
        hop: Option<HopNum>,
739
        relay_cell_format: RelayCellFormat,
740
        msg: AnyRelayMsgOuter,
741
        early: bool,
742
        ccontrol: &Arc<Mutex<CongestionControl>>,
743
    ) -> Result<()> {
744
        self.send_relay_cell_inner(hop, relay_cell_format, msg, early, ccontrol, None)
745
            .await
746
    }
747

            
748
    /// As [`send_relay_cell`](Self::send_relay_cell), but takes an optional
749
    /// [`QueuedCellPaddingInfo`] in `padding_info`.
750
    ///
751
    /// If `padding_info` is None, `msg` must be non-padding: we report it as such to the
752
    /// padding controller.
753
    ///
754
    // TODO(DEDUP): this contains parts of Circuit::send_relay_cell_inner()
755
    async fn send_relay_cell_inner(
756
        &mut self,
757
        hop: Option<HopNum>,
758
        relay_cell_format: RelayCellFormat,
759
        msg: AnyRelayMsgOuter,
760
        early: bool,
761
        ccontrol: &Arc<Mutex<CongestionControl>>,
762
        padding_info: Option<QueuedCellPaddingInfo>,
763
    ) -> Result<()> {
764
        let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
765
        let (msg, tag) = self.encode_relay_cell(relay_cell_format, hop, early, msg)?;
766
        let cell = AnyChanCell::new(Some(self.circ_id), msg);
767

            
768
        // TODO: we use HopNum(0) if we're a relay (i.e. if the hop is None).
769
        // Is that ok?
770
        let hop = hop.unwrap_or_else(|| HopNum::from(0));
771
        // Remember that we've enqueued this cell.
772
        let padding_info = padding_info.or_else(|| self.padding_ctrl.queued_data(hop));
773

            
774
        // Note: this future is always `Ready`, because we checked the sink for readiness
775
        // before polling the async streams, so await won't block.
776
        Pin::new(&mut self.inbound_chan_tx)
777
            .send_unbounded((cell, padding_info))
778
            .await?;
779

            
780
        if c_t_w {
781
            ccontrol
782
                .lock()
783
                .expect("poisoned lock")
784
                .note_data_sent(&self.time_provider, &tag)?;
785
        }
786

            
787
        Ok(())
788
    }
789

            
790
    /// Handle a backward cell (moving from the exit towards the client).
791
    async fn handle_backward_cell(&mut self, cell: B::CircChanMsg) -> StdResult<(), ReactorError> {
792
        match self.inner.handle_backward_cell(self.unique_id, cell)? {
793
            BackwardCellDisposition::Forward(cell) => {
794
                let cell = AnyChanCell::new(Some(self.circ_id), cell);
795
                self.inbound_chan_tx
796
                    .send((cell, None))
797
                    .await
798
                    .map_err(ReactorError::Err)
799
            }
800
        }
801
    }
802
}
803

            
804
impl<B: BackwardHandler> Drop for BackwardReactor<B> {
805
    fn drop(&mut self) {
806
        // This will send a DESTROY down the inbound Tor channel
807
        let _ = self.channel.close_circuit(self.circ_id);
808
    }
809
}
810

            
811
/// A circuit event that must be handled by the [`BackwardReactor`].
812
enum CircuitEvent<M> {
813
    /// We received a cell that needs to be handled.
814
    ///
815
    /// The cell is client-bound if we are a relay, or exit-bound if we are a client).
816
    Cell(M),
817
    /// We received a RELAY cell from the stream reactor that needs
818
    /// to be packaged and written to our Tor channel.
819
    ///
820
    /// The message is client-bound if we are a relay, or exit-bound if we are a client).
821
    Send(ReadyStreamMsg),
822
    /// We received a cell from the ForwardReactor that we need to handle.
823
    ///
824
    /// This might be
825
    ///
826
    ///   * a circuit-level SENDME that we have received, or
827
    ///   * a circuit-level SENDME that we need to deliver to the client
828
    Forwarded(BackwardReactorCmd),
829
    /// The forward reactor has shut down.
830
    ///
831
    /// We need to shut down too.
832
    ForwardShutdown,
833
    /// Protocol violation.
834
    ///
835
    /// This can happen if we receive a channel message that is not supported on the channel.
836
    ProtoViolation(Error),
837
}
838

            
839
/// Instructions from the forward reactor.
840
pub(crate) enum BackwardReactorCmd {
841
    /// A circuit SENDME we received from the other endpoint.
842
    HandleSendme {
843
        /// The hop the SENDME came on.
844
        hop: Option<HopNum>,
845
        /// The SENDME.
846
        sendme: Sendme,
847
    },
848
    /// A message we need to send back to the other endpoint.
849
    SendRelayMsg {
850
        /// The hop to encode the message for.
851
        hop: Option<HopNum>,
852
        /// The message to send.
853
        msg: AnyRelayMsgOuter,
854
    },
855
    /// This relay circuit was extended by another hop.
856
    ///
857
    /// This causes the reactor send the `extended2` message on its inbound channel,
858
    /// and start reading from `outbound_chan_rx` in the main loop.
859
    //
860
    ///
861
    // TODO: I wish we didn't need to expose this relay-specific variant
862
    // in the generic reactor but we have no choice: abstracting it away
863
    // means either introducing a mutex between the relay-side forward/backward
864
    // handlers, or yet another mpsc between them.
865
    #[cfg(feature = "relay")]
866
    HandleCircuitExtended {
867
        /// The hop to encode the message for.
868
        ///
869
        /// In practice, this is always None, because only relays use this.
870
        hop: Option<HopNum>,
871
        /// The cell to send to the specified hop,
872
        extended2: Extended2,
873
        /// The reading end of the outbound Tor channel, if we are not the last hop.
874
        ///
875
        /// Yields cells moving from the exit towards the client, if we are a middle relay.
876
        outbound_chan_rx: CircuitRxReceiver,
877
    },
878
}