1
//! Code to handle incoming cells on a circuit.
2
//!
3
//! ## On message validation
4
//!
5
//! There are three steps for validating an incoming message on a stream:
6
//!
7
//! 1. Is the message contextually appropriate? (e.g., no more than one
8
//!    `CONNECTED` message per stream.) This is handled by calling
9
//!    [`CmdChecker::check_msg`](crate::stream::cmdcheck::CmdChecker::check_msg).
10
//! 2. Does the message comply with flow-control rules? (e.g., no more SENDMEs
11
//!    than we've sent data for.) This is handled within the reactor by the
12
//!    `StreamFlowCtrl`. For half-closed streams which don't send stream
13
//!    SENDMEs, an additional receive-window check is performed in the
14
//!    `halfstream` module.
15
//! 3. Does the message have an acceptable command type, and is the message
16
//!    well-formed? For open streams, the streams themselves handle this check.
17
//!    For half-closed streams, the reactor handles it by calling
18
//!    `consume_checked_msg()`.
19

            
20
pub(crate) mod circuit;
21
mod conflux;
22
mod control;
23

            
24
use crate::circuit::circhop::SendRelayCell;
25
use crate::circuit::{CircuitRxReceiver, UniqId};
26
use crate::client::circuit::ClientCircChanMsg;
27
use crate::client::circuit::padding::{PaddingController, PaddingEvent, PaddingEventStream};
28
use crate::client::{HopLocation, TargetHop};
29
use crate::crypto::cell::HopNum;
30
use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
31
use crate::memquota::CircuitAccount;
32
use crate::stream::CloseStreamBehavior;
33
use crate::streammap;
34
use crate::tunnel::{TunnelId, TunnelScopedCircId};
35
use crate::util::err::ReactorError;
36
use crate::util::skew::ClockSkew;
37
use crate::util::timeout::TimeoutEstimator;
38
use crate::{Error, Result};
39
use circuit::Circuit;
40
use conflux::ConfluxSet;
41
use control::ControlHandler;
42
use std::cmp::Ordering;
43
use std::collections::BinaryHeap;
44
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
45
use tor_cell::relaycell::msg::Sendme;
46
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
47
use tor_error::{Bug, bad_api_usage, debug_report, internal, into_bad_api_usage};
48
use tor_rtcompat::{DynTimeProvider, SleepProvider};
49

            
50
use cfg_if::cfg_if;
51
use futures::StreamExt;
52
use futures::channel::mpsc;
53
use futures::{FutureExt as _, select_biased};
54
use oneshot_fused_workaround as oneshot;
55

            
56
use std::result::Result as StdResult;
57
use std::sync::Arc;
58
use std::time::Duration;
59

            
60
use crate::channel::Channel;
61
use crate::conflux::msghandler::RemoveLegReason;
62
use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
63
use circuit::CircuitCmd;
64
use derive_more::From;
65
use smallvec::smallvec;
66
use tor_cell::chancell::CircId;
67
use tor_llcrypto::pk;
68
use tracing::{debug, info, instrument, trace, warn};
69

            
70
use super::circuit::{MutableState, TunnelMutableState};
71
use crate::circuit::reactor::ReactorResultChannel;
72

            
73
#[cfg(feature = "hs-service")]
74
use crate::stream::incoming::IncomingStreamRequestHandler;
75

            
76
#[cfg(feature = "conflux")]
77
use {
78
    crate::conflux::msghandler::{ConfluxCmd, OooRelayMsg},
79
    crate::util::err::ConfluxHandshakeError,
80
};
81

            
82
pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
83

            
84
/// Contains a list of conflux handshake results.
///
/// There is one entry per circuit leg that participated in the handshake.
#[cfg(feature = "conflux")]
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
87

            
88
/// The type of oneshot channel used to inform reactor users of the outcome
/// of a client-side conflux handshake.
///
/// Contains a list of handshake results, one for each circuit that we were asked
/// to link in the tunnel.
#[cfg(feature = "conflux")]
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
95

            
96
/// A handshake type, to be used when creating circuit hops.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The Ed25519 identity of the relay, which is verified against the
        /// identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
    /// Use the ntor-v3 handshake.
    NtorV3 {
        /// The public key of the relay.
        public_key: NtorV3PublicKey,
    },
}
115

            
116
// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitEvent enum
117
// proliferation is a bit bothersome, but unavoidable with the current design.
118
//
119
// We should consider getting rid of some of these enums (if possible),
120
// and coming up with more intuitive names.
121

            
122
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: resolve this
enum RunOnceCmd {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Run multiple `RunOnceCmdInner` commands.
    //
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
    // but most of the time we're only going to have *one* RunOnceCmdInner
    // to run per run_once() loop. The enum enables us avoid the extra heap
    // allocation for the `RunOnceCmd::Single` case.
    Multiple(Vec<RunOnceCmdInner>),
}
136

            
137
/// Instructions for running something in the reactor loop.
///
/// Run at the end of [`Reactor::run_once`].
//
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
// We should consider making each variant a tuple variant and deduplicating the fields.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
    /// Send a RELAY cell.
    Send {
        /// The leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
        /// A channel for sending completion notifications.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        done: oneshot::Sender<Result<()>>,
    },
    /// Handle a SENDME message.
    HandleSendMe {
        /// The leg the SENDME was received on.
        leg: UniqId,
        /// The hop number.
        hop: HopNum,
        /// The SENDME message to handle.
        sendme: Sendme,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Uses the provided stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The cell to send.
        cell: Result<(SendRelayCell, StreamId)>,
        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
        /// caller.
        hop: HopLocation,
        /// The circuit leg to begin the stream on.
        leg: UniqId,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
    },
    /// Consider sending an XON message with the given `rate`.
    MaybeSendXon {
        /// The drain rate to advertise in the XON message.
        rate: XonKbpsEwma,
        /// The ID of the stream to send the message on.
        stream_id: StreamId,
        /// The location of the hop on the tunnel.
        hop: HopLocation,
    },
    /// Close the specified stream.
    CloseStream {
        /// The hop number.
        hop: HopLocation,
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
        /// A channel for sending completion notifications.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Remove a circuit leg from the conflux set.
    RemoveLeg {
        /// The circuit leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// A circuit has completed the conflux handshake,
    /// and wants to send the specified cell.
    ///
    /// This is similar to [`RunOnceCmdInner::Send`],
    /// but needs to remain a separate variant,
    /// because in addition to instructing the reactor to send a cell,
    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
    /// This enables the reactor to save the handshake result (`Ok(())`),
    /// and, if there are no other legs still in the handshake phase,
    /// send the result to the handshake initiator.
    #[cfg(feature = "conflux")]
    ConfluxHandshakeComplete {
        /// The circuit leg that has completed the handshake.
        /// This is the leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
    },
    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
    #[cfg(feature = "conflux")]
    Link {
        /// The circuits to link into the tunnel
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel for notifying of conflux handshake completion.
        answer: ConfluxLinkResultChannel,
    },
    /// Enqueue an out-of-order cell in [`Reactor::ooo_msgs`].
    #[cfg(feature = "conflux")]
    Enqueue {
        /// The leg the entry originated from.
        leg: UniqId,
        /// The out-of-order message.
        msg: OooRelayMsg,
    },
    /// Take a padding-related event on a circuit leg.
    #[cfg(feature = "circ-padding")]
    PaddingAction {
        /// The leg on which to act.
        leg: UniqId,
        /// The padding event to act on.
        padding_event: PaddingEvent,
    },
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}
281

            
282
impl RunOnceCmdInner {
283
    /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
284
4402
    fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
285
92
        match cmd {
286
4180
            CircuitCmd::Send(cell) => Self::Send {
287
4180
                leg,
288
4180
                cell,
289
4180
                done: None,
290
4180
            },
291
44
            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
292
            CircuitCmd::CloseStream {
293
58
                hop,
294
58
                sid,
295
58
                behav,
296
58
                reason,
297
58
            } => Self::CloseStream {
298
58
                hop: HopLocation::Hop((leg, hop)),
299
58
                sid,
300
58
                behav,
301
58
                reason,
302
58
                done: None,
303
58
            },
304
            #[cfg(feature = "conflux")]
305
24
            CircuitCmd::Conflux(ConfluxCmd::RemoveLeg(reason)) => Self::RemoveLeg { leg, reason },
306
            #[cfg(feature = "conflux")]
307
68
            CircuitCmd::Conflux(ConfluxCmd::HandshakeComplete { hop, early, cell }) => {
308
68
                let cell = SendRelayCell {
309
68
                    hop: Some(hop),
310
68
                    early,
311
68
                    cell,
312
68
                };
313
68
                Self::ConfluxHandshakeComplete { leg, cell }
314
            }
315
            #[cfg(feature = "conflux")]
316
16
            CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
317
12
            CircuitCmd::CleanShutdown => Self::CleanShutdown,
318
        }
319
4402
    }
320
}
321

            
322
/// An event for the tunnel reactor to handle, produced within [`Reactor::run_once`].
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
enum CircuitEvent {
    /// Run a single `CircuitCmd` command.
    RunCmd {
        /// The unique identifier of the circuit leg to run the command on
        leg: UniqId,
        /// The command to run.
        cmd: CircuitCmd,
    },
    /// Handle a control message
    HandleControl(CtrlMsg),
    /// Handle an input message.
    HandleCell {
        /// The unique identifier of the circuit leg the message was received on.
        leg: UniqId,
        /// The message to handle.
        cell: ClientCircChanMsg,
    },
    /// Remove the specified circuit leg from the conflux set.
    ///
    /// Returned whenever a single circuit leg needs to be removed
    /// from the reactor's conflux set, without necessarily tearing down
    /// the whole set or shutting down the reactor.
    ///
    /// Note: this event *can* cause the reactor to shut down
    /// (and the conflux set to be closed).
    ///
    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
    RemoveLeg {
        /// The leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// Take some event (blocking or unblocking a circuit, or sending padding)
    /// based on the circuit padding backend code.
    PaddingAction {
        /// The leg on which to take the padding event.
        leg: UniqId,
        /// The event to take.
        padding_event: PaddingEvent,
    },
    /// Protocol violation. This leads for now to the close of the circuit reactor. The
    /// error causing the violation is set in err.
    ProtoViolation {
        /// The error that causes this protocol violation.
        err: crate::Error,
    },
}
380

            
381
impl CircuitEvent {
382
    /// Return the ordering with which we should handle this event
383
    /// within a list of events returned by a single call to next_circ_event().
384
    ///
385
    /// NOTE: Please do not make this any more complicated:
386
    /// It is a consequence of a kludge that we need this sorting at all.
387
    /// Assuming that eventually, we switch away from the current
388
    /// poll-oriented `next_circ_event` design,
389
    /// we may be able to get rid of this entirely.
390
88
    fn order_within_batch(&self) -> u8 {
391
        use CircuitEvent as CA;
392
        use PaddingEvent as PE;
393
        // This immediate state MUST NOT be used for events emitting cells. At the moment, it is
394
        // only used by the protocol violation event which leads to a shutdown of the reactor.
395
        const IMMEDIATE: u8 = 0;
396
        const EARLY: u8 = 1;
397
        const NORMAL: u8 = 2;
398
        const LATE: u8 = 3;
399

            
400
        // We use this ordering to move any "StartBlocking" to the _end_ of a batch and
401
        // "StopBlocking" to the start.
402
        //
403
        // This way, we can be sure that we will handle any "send data" operations
404
        // (and tell the Padder about them) _before_  we tell the Padder
405
        // that we have blocked the circuit.
406
        //
407
        // This keeps things a bit more logical.
408
88
        match self {
409
22
            CA::RunCmd { .. } => NORMAL,
410
            CA::HandleControl(..) => NORMAL,
411
66
            CA::HandleCell { .. } => NORMAL,
412
            CA::RemoveLeg { .. } => NORMAL,
413
            #[cfg(feature = "circ-padding")]
414
            CA::PaddingAction { padding_event, .. } => match padding_event {
415
                PE::StopBlocking => EARLY,
416
                PE::SendPadding(..) => NORMAL,
417
                PE::StartBlocking(..) => LATE,
418
            },
419
            #[cfg(not(feature = "circ-padding"))]
420
            CA::PaddingAction { .. } => NORMAL,
421
            CA::ProtoViolation { .. } => IMMEDIATE,
422
        }
423
88
    }
424
}
425

            
426
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
///
/// To get around this problem, the reactor can send some cells, and then make one of these
/// `MetaCellHandler` objects, which will be run when the reply arrives.
pub(crate) trait MetaCellHandler: Send {
    /// The hop we're expecting the message to come from. This is compared against the hop
    /// from which we actually receive messages, and an error is thrown if the two don't match.
    fn expected_hop(&self) -> HopLocation;
    /// Called when the message we were waiting for arrives.
    ///
    /// Gets a mutable reference to a [`Circuit`] in order to do anything
    /// it likes there.
    ///
    /// If this function returns an error, the reactor will shut down.
    fn handle_msg(
        &mut self,
        msg: UnparsedRelayMsg,
        reactor: &mut Circuit,
    ) -> Result<MetaCellDisposition>;
}
451

            
452
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
469

            
470
/// Unwrap the specified [`Option`], returning a [`ReactorError::Shutdown`] if it is `None`.
///
/// Before returning the error, this logs (at trace level) the tunnel ID and the
/// caller-supplied human-readable reason for the shutdown.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        match $res {
            None => {
                trace!(
                    tunnel_id = %$self.tunnel_id,
                    reason = %$reason,
                    "reactor shutdown"
                );
                Err(ReactorError::Shutdown)
            }
            Some(v) => Ok(v),
        }
    }};
}
489

            
490
/// Object to handle incoming cells and background tasks on a circuit
///
/// This type is returned when you finish a circuit; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
    /// is ready to accept cells.
    control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`].
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `chan_sender` is not
    /// ready (whereas `control` messages are not read until the `chan_sender` sink
    /// is ready to accept cells).
    command: mpsc::UnboundedReceiver<CtrlCmd>,
    /// A oneshot sender that is used to alert other tasks when this reactor is
    /// finally dropped.
    ///
    /// It is a sender for Void because we never actually want to send anything here;
    /// we only want to generate canceled events.
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
    reactor_closed_tx: oneshot::Sender<void::Void>,
    /// A set of circuits that form a tunnel.
    ///
    /// Contains 1 or more circuits.
    ///
    /// Circuits may be added to this set throughout the lifetime of the reactor.
    ///
    /// Sometimes, the reactor will remove circuits from this set,
    /// for example if the `LINKED` message takes too long to arrive,
    /// or if congestion control negotiation fails.
    /// The reactor will continue running with the remaining circuits.
    /// It will shut down if *all* the circuits are removed.
    ///
    // TODO(conflux): document all the reasons why the reactor might
    // chose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
    circuits: ConfluxSet,
    /// An identifier for logging about this tunnel reactor.
    tunnel_id: TunnelId,
    /// Handlers, shared with `Circuit`.
    cell_handlers: CellHandlers,
    /// The time provider, used for conflux handshake timeouts.
    runtime: DynTimeProvider,
    /// The conflux handshake context, if there is an on-going handshake.
    ///
    /// Set to `None` if this is a single-path tunnel,
    /// or if none of the circuit legs from our conflux set
    /// are currently in the conflux handshake phase.
    #[cfg(feature = "conflux")]
    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
    /// A min-heap buffering all the out-of-order messages received so far.
    ///
    /// NOTE(review): `std::collections::BinaryHeap` is a max-heap, so the
    /// "min" behavior presumably comes from the `Ord` impl of the entries
    /// (ultimately `OooRelayMsg`) — confirm that its ordering is reversed.
    ///
    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
    /// on its size. We should make this participate in the memquota memory
    /// tracking system, somehow.
    #[cfg(feature = "conflux")]
    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
}
553

            
554
/// The context for an on-going conflux handshake.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
    answer: ConfluxLinkResultChannel,
    /// The number of legs that are currently doing the handshake.
    num_legs: usize,
    /// The handshake results we have collected so far.
    results: ConfluxHandshakeResult,
}
564

            
565
/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
    /// The leg id this message came from.
    leg_id: UniqId,
    /// The out-of-order message.
    msg: OooRelayMsg,
}
574

            
575
#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    /// Order heap entries by their buffered message alone;
    /// the originating leg does not participate in the ordering.
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.msg, &other.msg)
    }
}
581

            
582
#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to the total ordering, as is conventional for `Ord` types.
        Some(Ord::cmp(self, other))
    }
}
588

            
589
#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    fn eq(&self, other: &Self) -> bool {
        // Consistent with `Ord`: equality considers only the buffered message.
        self.msg.eq(&other.msg)
    }
}
595

            
596
// Required for use in a `BinaryHeap` (which needs `Ord`);
// the `PartialEq` above is a total equivalence on `msg`.
#[cfg(feature = "conflux")]
impl Eq for ConfluxHeapEntry {}
598

            
599
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
struct CellHandlers {
    /// A handler for a meta cell, together with a result channel to notify on completion.
    ///
    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
    ///
    /// Upon sending an EXTEND cell, the [`ControlHandler`] sets this handler
    /// to [`CircuitExtender`](circuit::extender::CircuitExtender).
    /// The handler is then used in [`Circuit::handle_meta_cell`] for handling
    /// all the meta cells received on the circuit that are not SENDMEs or TRUNCATE
    /// (which are handled separately) or conflux cells
    /// (which are handled by the conflux handlers).
    ///
    /// The handler is uninstalled after the receipt of the EXTENDED cell,
    /// so any subsequent EXTENDED cells will cause the circuit to be torn down.
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
    /// A handler for incoming stream requests.
    #[cfg(feature = "hs-service")]
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
619

            
620
impl Reactor {
621
    /// Create a new circuit reactor.
622
    ///
623
    /// The reactor will send outbound messages on `channel`, receive incoming
624
    /// messages on `input`, and identify this circuit by the channel-local
625
    /// [`CircId`] provided.
626
    ///
627
    /// The internal unique identifier for this circuit will be `unique_id`.
628
    #[allow(clippy::type_complexity, clippy::too_many_arguments)] // TODO
629
376
    pub(super) fn new(
630
376
        channel: Arc<Channel>,
631
376
        channel_id: CircId,
632
376
        unique_id: UniqId,
633
376
        input: CircuitRxReceiver,
634
376
        runtime: DynTimeProvider,
635
376
        memquota: CircuitAccount,
636
376
        padding_ctrl: PaddingController,
637
376
        padding_stream: PaddingEventStream,
638
376
        timeouts: Arc<dyn TimeoutEstimator + Send>,
639
376
    ) -> (
640
376
        Self,
641
376
        mpsc::UnboundedSender<CtrlMsg>,
642
376
        mpsc::UnboundedSender<CtrlCmd>,
643
376
        oneshot::Receiver<void::Void>,
644
376
        Arc<TunnelMutableState>,
645
376
    ) {
646
376
        let tunnel_id = TunnelId::next();
647
376
        let (control_tx, control_rx) = mpsc::unbounded();
648
376
        let (command_tx, command_rx) = mpsc::unbounded();
649
376
        let mutable = Arc::new(MutableState::default());
650

            
651
376
        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
652

            
653
376
        let cell_handlers = CellHandlers {
654
376
            meta_handler: None,
655
376
            #[cfg(feature = "hs-service")]
656
376
            incoming_stream_req_handler: None,
657
376
        };
658

            
659
376
        let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
660
376
        let circuit_leg = Circuit::new(
661
376
            runtime.clone(),
662
376
            channel,
663
376
            channel_id,
664
376
            unique_id,
665
376
            input,
666
376
            memquota,
667
376
            Arc::clone(&mutable),
668
376
            padding_ctrl,
669
376
            padding_stream,
670
376
            timeouts,
671
        );
672

            
673
376
        let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
674

            
675
376
        let reactor = Reactor {
676
376
            circuits,
677
376
            control: control_rx,
678
376
            command: command_rx,
679
376
            reactor_closed_tx,
680
376
            tunnel_id,
681
376
            cell_handlers,
682
376
            runtime,
683
376
            #[cfg(feature = "conflux")]
684
376
            conflux_hs_ctx: None,
685
376
            #[cfg(feature = "conflux")]
686
376
            ooo_msgs: Default::default(),
687
376
        };
688

            
689
376
        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
690
376
    }
691

            
692
    /// Launch the reactor, and run until the circuit closes or we
693
    /// encounter an error.
694
    ///
695
    /// Once this method returns, the circuit is dead and cannot be
696
    /// used again.
697
    #[instrument(level = "trace", skip_all)]
698
564
    pub async fn run(mut self) -> Result<()> {
699
        trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
700
        let result: Result<()> = loop {
701
            match self.run_once().await {
702
                Ok(()) => (),
703
                Err(ReactorError::Shutdown) => break Ok(()),
704
                Err(ReactorError::Err(e)) => break Err(e),
705
            }
706
        };
707

            
708
        // Log that the reactor stopped, possibly with the associated error as a report.
709
        // May log at a higher level depending on the error kind.
710
        const MSG: &str = "Tunnel reactor stopped";
711
        match &result {
712
            Ok(()) => trace!(tunnel_id = %self.tunnel_id, "{MSG}"),
713
            Err(e) => debug_report!(e, tunnel_id = %self.tunnel_id, "{MSG}"),
714
        }
715

            
716
        result
717
292
    }
718

            
719
    /// Helper for run: doesn't mark the circuit closed on finish.  Only
    /// processes one cell or control message.
    ///
    /// Returns `Err(ReactorError::Shutdown)` when the reactor should stop
    /// (e.g. when every circuit leg has closed).
    #[instrument(level = "trace", skip_all)]
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
        // If all the circuits are closed, shut down the reactor
        if self.circuits.is_empty() {
            trace!(
                tunnel_id = %self.tunnel_id,
                "Tunnel reactor shutting down: all circuits have closed",
            );

            return Err(ReactorError::Shutdown);
        }

        // If this is a single path circuit, we need to wait until the first hop
        // is created before doing anything else
        let single_path_with_hops = self
            .circuits
            .single_leg_mut()
            .is_ok_and(|leg| !leg.has_hops());
        if single_path_with_hops {
            self.wait_for_create().await?;

            return Ok(());
        }

        // Prioritize the buffered messages.
        //
        // Note: if any of the messages are ready to be handled,
        // this will block the reactor until we are done processing them
        //
        // TODO circpad: If this is a problem, we might want to re-order things so that we
        // prioritize padding instead.  On the other hand, this should be fixed by refactoring
        // circuit and tunnel reactors, so we might do well to just leave it alone for now.
        #[cfg(feature = "conflux")]
        self.try_dequeue_ooo_msgs().await?;

        // Wait for the next thing to do.  `select_biased!` polls arms in the
        // order written, so reactor commands win over control messages, which
        // win over circuit events.
        let mut events = select_biased! {
            res = self.command.next() => {
                let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
                return ControlHandler::new(self).handle_cmd(cmd);
            },
            // Check whether we've got a control message pending.
            //
            // Note: unfortunately, reading from control here means we might start
            // handling control messages before our chan_senders are ready.
            // With the current design, this is inevitable: we can't know which circuit leg
            // a control message is meant for without first reading the control message from
            // the channel, and at that point, we can't know for sure whether that particular
            // circuit is ready for sending.
            ret = self.control.next() => {
                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
                smallvec![CircuitEvent::HandleControl(msg)]
            },
            res = self.circuits.next_circ_event(&self.runtime).fuse() => res?,
        };

        // Put the events into the order that we need to execute them in.
        //
        // (Yes, this _does_ have to be a stable sort.  Not all events may be freely re-ordered
        // with respect to one another.)
        events.sort_by_key(|a| a.order_within_batch());

        // Translate each event into (at most) one RunOnceCmd, then execute it.
        for event in events {
            let cmd = match event {
                CircuitEvent::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
                    RunOnceCmdInner::from_circuit_cmd(leg, cmd),
                )),
                CircuitEvent::HandleControl(ctrl) => ControlHandler::new(self)
                    .handle_msg(ctrl)?
                    .map(RunOnceCmd::Single),
                CircuitEvent::HandleCell { leg, cell } => {
                    let circ = self
                        .circuits
                        .leg_mut(leg)
                        .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;

                    let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
                    if circ_cmds.is_empty() {
                        None
                    } else {
                        // TODO: we return RunOnceCmd::Multiple even if there's a single command.
                        //
                        // See the TODO on `Circuit::handle_cell`.
                        let cmd = RunOnceCmd::Multiple(
                            circ_cmds
                                .into_iter()
                                .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
                                .collect(),
                        );

                        Some(cmd)
                    }
                }
                CircuitEvent::RemoveLeg { leg, reason } => {
                    Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
                }
                CircuitEvent::PaddingAction { leg, padding_event } => {
                    cfg_if! {
                        if #[cfg(feature = "circ-padding")] {
                            Some(RunOnceCmdInner::PaddingAction { leg, padding_event }.into())
                        } else {
                            // If padding isn't enabled, we never generate a padding event,
                            // so we can be sure this case will never be called.
                            void::unreachable(padding_event.0);
                        }
                    }
                }
                CircuitEvent::ProtoViolation { err } => {
                    return Err(err.into());
                }
            };

            if let Some(cmd) = cmd {
                self.handle_run_once_cmd(cmd).await?;
            }
        }

        Ok(())
    }
    /// Try to process the previously-out-of-order messages we might have buffered.
    ///
    /// Drains the buffer for as long as its head is the next in-order message.
    #[cfg(feature = "conflux")]
    #[instrument(level = "trace", skip_all)]
    async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
        loop {
            // Stop as soon as the heap is empty, or its smallest entry is
            // still out of order.
            let ready = match self.ooo_msgs.peek() {
                Some(entry) => self.circuits.is_seqno_in_order(entry.msg.seqno),
                None => false,
            };
            if !ready {
                break;
            }

            let entry = self.ooo_msgs.pop().expect("item just disappeared?!");

            let circ = self
                .circuits
                .leg_mut(entry.leg_id)
                .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
            let handlers = &mut self.cell_handlers;
            // Deliver the now-in-order message, and turn any resulting circuit
            // command into a RunOnceCmd we can execute.
            let cmd = circ
                .handle_in_order_relay_msg(
                    handlers,
                    entry.msg.hopnum,
                    entry.leg_id,
                    entry.msg.cell_counts_towards_windows,
                    entry.msg.streamid,
                    entry.msg.msg,
                )?
                .map(|cmd| {
                    RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id, cmd))
                });

            if let Some(cmd) = cmd {
                self.handle_run_once_cmd(cmd).await?;
            }
        }

        Ok(())
    }
    /// Handle a [`RunOnceCmd`].
881
    #[instrument(level = "trace", skip_all)]
882
6978
    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
883
4652
        match cmd {
884
            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
885
            RunOnceCmd::Multiple(cmds) => {
886
                // While we know `sendable` is ready to accept *one* cell,
887
                // we can't be certain it will be able to accept *all* of the cells
888
                // that need to be sent here. This means we *may* end up buffering
889
                // in its underlying SometimesUnboundedSink! That is OK, because
890
                // RunOnceCmd::Multiple is only used for handling packed cells.
891
                for cmd in cmds {
892
                    self.handle_single_run_once_cmd(cmd).await?;
893
                }
894
            }
895
        }
896

            
897
        Ok(())
898
4652
    }
899

            
900
    /// Handle a [`RunOnceCmd`].
901
    #[instrument(level = "trace", skip_all)]
902
4652
    async fn handle_single_run_once_cmd(
903
4652
        &mut self,
904
4652
        cmd: RunOnceCmdInner,
905
6978
    ) -> StdResult<(), ReactorError> {
906
4652
        match cmd {
907
            RunOnceCmdInner::Send { leg, cell, done } => {
908
                // TODO: check the cc window
909
                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
910
                if let Some(done) = done {
911
                    // Don't care if the receiver goes away
912
                    let _ = done.send(res.clone());
913
                }
914
                res?;
915
            }
916
            #[cfg(feature = "send-control-msg")]
917
            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
918
                let cell: Result<Option<SendRelayCell>> =
919
                    self.prepare_msg_and_install_handler(msg, handler);
920

            
921
                match cell {
922
                    Ok(Some(cell)) => {
923
                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
924
                        let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
925
                        // don't care if receiver goes away.
926
                        let _ = done.send(outcome.clone());
927
                        outcome?;
928
                    }
929
                    Ok(None) => {
930
                        // don't care if receiver goes away.
931
                        let _ = done.send(Ok(()));
932
                    }
933
                    Err(e) => {
934
                        // don't care if receiver goes away.
935
                        let _ = done.send(Err(e.clone()));
936
                        return Err(e.into());
937
                    }
938
                }
939
            }
940
            RunOnceCmdInner::BeginStream {
941
                leg,
942
                cell,
943
                hop,
944
                done,
945
            } => {
946
                match cell {
947
                    Ok((cell, stream_id)) => {
948
                        let circ = self
949
                            .circuits
950
                            .leg_mut(leg)
951
                            .ok_or_else(|| internal!("leg disappeared?!"))?;
952
                        let cell_hop = cell.hop.expect("missing hop in client SendRelayCell?!");
953
                        let relay_format = circ
954
                            .hop_mut(cell_hop)
955
                            // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
956
                            .ok_or(Error::NoSuchHop)?
957
                            .relay_cell_format();
958

            
959
                        let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
960
                        // don't care if receiver goes away.
961
96
                        let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
962
                        outcome?;
963
                    }
964
                    Err(e) => {
965
                        // don't care if receiver goes away.
966
                        let _ = done.send(Err(e.clone()));
967
                        return Err(e.into());
968
                    }
969
                }
970
            }
971
            RunOnceCmdInner::CloseStream {
972
                hop,
973
                sid,
974
                behav,
975
                reason,
976
                done,
977
            } => {
978
                let result = {
979
                    let (leg_id, hop_num) = self
980
                        .resolve_hop_location(hop)
981
                        .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
982
                    let leg = self
983
                        .circuits
984
                        .leg_mut(leg_id)
985
                        .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
986
                    Ok::<_, Bug>((leg, hop_num))
987
                };
988

            
989
                let (leg, hop_num) = match result {
990
                    Ok(x) => x,
991
                    Err(e) => {
992
                        if let Some(done) = done {
993
                            // don't care if the sender goes away
994
                            let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
995
                            let _ = done.send(Err(e.into()));
996
                        }
997
                        return Ok(());
998
                    }
999
                };
                let max_rtt = {
                    let hop = leg
                        .hop(hop_num)
                        .ok_or_else(|| internal!("the hop we resolved disappeared?!"))?;
                    let ccontrol = hop.ccontrol();
                    // Note: if we have no measurements for the RTT, this will be set to 0,
                    // and the timeout will be 2 * CBT.
                    ccontrol
                        .rtt()
                        .max_rtt_usec()
                        .map(|rtt| Duration::from_millis(u64::from(rtt)))
                        .unwrap_or_default()
                };
                // The length of the circuit up until the hop that has the half-streeam.
                //
                // +1, because HopNums are zero-based.
                let circ_len = usize::from(hop_num) + 1;
                // We double the CBT to account for rend circuits,
                // which are twice as long (otherwise we risk expiring
                // the rend half-streams too soon).
                let timeout = std::cmp::max(max_rtt, 2 * leg.estimate_cbt(circ_len));
                let expire_at = self.runtime.now() + timeout;
                let res: Result<()> = leg
                    .close_stream(hop_num, sid, behav, reason, expire_at)
                    .await;
                if let Some(done) = done {
                    // don't care if the sender goes away
                    let _ = done.send(res);
                }
            }
            RunOnceCmdInner::MaybeSendXon {
                rate,
                stream_id,
                hop,
            } => {
                let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
                    Ok(x) => x,
                    Err(NoJoinPointError) => {
                        // A stream tried to send an XON message message to the join point of
                        // a tunnel that has never had a join point. Currently in arti, only a
                        // `StreamTarget` asks us to send an XON message, and this tunnel
                        // originally created the `StreamTarget` to begin with. So this is a
                        // legitimate bug somewhere in the tunnel code.
                        return Err(
                            internal!(
                                "Could not send an XON message to a join point on a tunnel without a join point",
                            )
                            .into()
                        );
                    }
                };
                let Some(leg) = self.circuits.leg_mut(leg_id) else {
                    // The leg has disappeared. This is fine since the stream may have ended and
                    // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
                    // It is possible that is a bug and this is an incorrect leg number, but
                    // it's not currently possible to differentiate between an incorrect leg
                    // number and a tunnel leg that has been closed.
                    debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
                    return Ok(());
                };
                let Some(hop) = leg.hop_mut(hop_num) else {
                    // The hop has disappeared. This is fine since the circuit may have been
                    // been truncated while the `CtrlMsg::MaybeSendXon` message was queued.
                    // It is possible that is a bug and this is an incorrect hop number, but
                    // it's not currently possible to differentiate between an incorrect hop
                    // number and a circuit hop that has been removed.
                    debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
                    return Ok(());
                };
                let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
                    // Nothing to do.
                    return Ok(());
                };
                let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
                let cell = SendRelayCell {
                    hop: Some(hop_num),
                    early: false,
                    cell,
                };
                leg.send_relay_cell(cell).await?;
            }
            RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
                let leg = self
                    .circuits
                    .leg_mut(leg)
                    .ok_or_else(|| internal!("leg disappeared?!"))?;
                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
                // future which *should* resolve immediately
                let signals = leg.chan_sender.congestion_signals().await;
                leg.handle_sendme(hop, sendme, signals)?;
            }
            RunOnceCmdInner::FirstHopClockSkew { answer } => {
                let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
                // don't care if the sender goes away
                let _ = answer.send(res.map_err(Into::into));
            }
            RunOnceCmdInner::CleanShutdown => {
                trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
                return Err(ReactorError::Shutdown);
            }
            RunOnceCmdInner::RemoveLeg { leg, reason } => {
                warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
                let circ = self.circuits.remove(leg)?;
                let is_conflux_pending = circ.is_conflux_pending();
                // Drop the removed leg. This will cause it to close if it's not already closed.
                drop(circ);
                // If we reach this point, it means we have more than one leg
                // (otherwise the .remove() would've returned a Shutdown error),
                // so we expect there to be a ConfluxHandshakeContext installed.
                #[cfg(feature = "conflux")]
                if is_conflux_pending {
                    let (error, proto_violation): (_, Option<Error>) = match &reason {
                        RemoveLegReason::ConfluxHandshakeTimeout => {
                            (ConfluxHandshakeError::Timeout, None)
                        }
                        RemoveLegReason::ConfluxHandshakeErr(e) => {
                            (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
                        }
                        RemoveLegReason::ChannelClosed => {
                            (ConfluxHandshakeError::ChannelClosed, None)
                        }
                    };
                    self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
                    if let Some(e) = proto_violation {
                        tor_error::warn_report!(
                            e,
                            tunnel_id = %self.tunnel_id,
                            "Malformed conflux handshake, tearing down tunnel",
                        );
                        return Err(e.into());
                    }
                }
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
                // Note: on the client-side, the handshake is considered complete once the
                // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
                //
                // We're optimistic here, and declare the handshake a success *before*
                // sending the LINKED_ACK response. I think this is OK though,
                // because if the send_relay_cell() below fails, the reactor will shut
                // down anyway. OTOH, marking the handshake as complete slightly early
                // means that on the happy path, the circuit is marked as usable sooner,
                // instead of blocking on the sending of the LINKED_ACK.
                self.note_conflux_handshake_result(Ok(()), false)?;
                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
                res?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Link { circuits, answer } => {
                // Add the specified circuits to our conflux set,
                // and send a LINK cell down each unlinked leg.
                //
                // NOTE: this will block the reactor until all the cells are sent.
                self.handle_link_circuits(circuits, answer).await?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Enqueue { leg, msg } => {
                let entry = ConfluxHeapEntry { leg_id: leg, msg };
                self.ooo_msgs.push(entry);
            }
            #[cfg(feature = "circ-padding")]
            RunOnceCmdInner::PaddingAction { leg, padding_event } => {
                // TODO: If we someday move back to having a per-circuit reactor,
                // this event would logically belong there, not on the tunnel reactor.
                self.circuits.run_padding_event(leg, padding_event).await?;
            }
        }
        Ok(())
4652
    }
    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
    ///
    /// Returns an error if an unexpected `CtrlMsg` is received.
    #[instrument(level = "trace", skip_all)]
    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
        // Commands are prioritized over control messages
        // (`select_biased!` polls its arms in declaration order).
        let msg = select_biased! {
            res = self.command.next() => {
                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
                match cmd {
                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
                    // Test-only escape hatch: fabricate a hop without a real handshake.
                    #[cfg(test)]
                    CtrlCmd::AddFakeHop {
                        relay_cell_format: format,
                        fwd_lasthop,
                        rev_lasthop,
                        peer_id,
                        params,
                        done,
                    } => {
                        let leg = self.circuits.single_leg_mut()?;
                        leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, &params, done);
                        return Ok(())
                    },
                    // Any other command before the circuit is created is a protocol error.
                    _ => {
                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
                    }
                }
            },
            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
        };
        match msg {
            CtrlMsg::Create {
                recv_created,
                handshake,
                settings,
                done,
            } => {
                // TODO(conflux): instead of crashing the reactor, it might be better
                // to send the error via the done channel instead
                let leg = self.circuits.single_leg_mut()?;
                leg.handle_create(recv_created, handshake, settings, done)
                    .await
            }
            _ => {
                trace!("reactor shutdown due to unexpected cell: {:?}", msg);
                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
            }
        }
    }
    /// Add the specified handshake result to our `ConfluxHandshakeContext`.
    ///
    /// If all the circuits we were waiting on have finished the conflux handshake,
    /// the `ConfluxHandshakeContext` is consumed, and the results we have collected
    /// are sent to the handshake initiator.
    #[cfg(feature = "conflux")]
    #[instrument(level = "trace", skip_all)]
    fn note_conflux_handshake_result(
        &mut self,
        res: StdResult<(), ConfluxHandshakeError>,
        reactor_is_closing: bool,
    ) -> StdResult<(), ReactorError> {
        // It is a bug to be called with no handshake in progress.
        let Some(ctx) = self.conflux_hs_ctx.as_mut() else {
            return Err(internal!("no conflux handshake context").into());
        };
        ctx.results.push(res);
        // Whether all the legs have finished linking:
        let tunnel_complete = ctx.results.len() == ctx.num_legs;

        if tunnel_complete || reactor_is_closing {
            // Time to remove the conflux handshake context
            // and extract the results we have collected
            let ctx = self.conflux_hs_ctx.take().expect("context disappeared?!");
            let success_count = ctx.results.iter().filter(|res| res.is_ok()).count();
            let leg_count = ctx.results.len();

            info!(
                tunnel_id = %self.tunnel_id,
                "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
            );

            send_conflux_outcome(ctx.answer, Ok(ctx.results))?;

            // We don't expect to receive any more handshake results,
            // at least not until we get another LinkCircuits control message,
            // which will install a new ConfluxHandshakeCtx with a channel
            // for us to send updates on
        }

        Ok(())
    }
    /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
    ///
    /// Returns `Ok(None)` if there was no message to prepare.
    fn prepare_msg_and_install_handler(
        &mut self,
        msg: Option<AnyRelayMsgOuter>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<Option<SendRelayCell>> {
        let prepared = match msg {
            None => None,
            Some(msg) => {
                // Use the freshly supplied handler if there is one;
                // otherwise fall back to the currently installed meta handler.
                let chosen = handler
                    .as_ref()
                    .or(self.cell_handlers.meta_handler.as_ref())
                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
                // We should always have a precise HopLocation here, so this should
                // never fail; but if we get a ::JoinPoint, we'll notice.
                let hop = chosen.expected_hop().hop_num().ok_or(bad_api_usage!(
                    "MsgHandler doesn't have a precise HopLocation"
                ))?;
                Some(SendRelayCell {
                    hop: Some(hop),
                    early: false,
                    cell: msg,
                })
            }
        };

        if let Some(handler) = handler {
            self.cell_handlers.set_meta_handler(handler)?;
        }

        Ok(prepared)
    }
    /// Handle a shutdown request.
    ///
    /// Always returns `Err(ReactorError::Shutdown)` after logging the request.
    // NOTE(review): the `Option<RunOnceCmdInner>` success type is never produced
    // here — presumably it matches the signature expected by callers; confirm.
    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
        trace!(
            tunnel_id = %self.tunnel_id,
            "reactor shutdown due to explicit request",
        );
        Err(ReactorError::Shutdown)
    }
    /// Handle a request to shutdown the reactor and return the only [`Circuit`] in this tunnel.
    ///
    /// Returns an error over the `answer` channel if the reactor has no circuits,
    /// or more than one circuit. The reactor will shut down regardless.
    #[cfg(feature = "conflux")]
    fn handle_shutdown_and_return_circuit(
        &mut self,
        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
    ) -> StdResult<(), ReactorError> {
        // Hand the (single) circuit back to the requester.
        // Don't care if the receiver goes away.
        answer.send(self.circuits.take_single_leg()).ok();
        self.handle_shutdown().map(|_| ())
    }
    /// Resolves a [`TargetHop`] to a [`HopLocation`].
    ///
    /// After resolving a `TargetHop::LastHop`,
    /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
    /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
    ///
    /// It's generally okay to hold on to a (possibly stale) `HopLocation`
    /// if you need a fixed hop position in the tunnel.
    /// For example if we open a stream to `TargetHop::LastHop`,
    /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
    /// as we don't want the stream position to change as the tunnel is extended or truncated.
    ///
    /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
    /// and the tunnel has no hops
    /// (either has no legs, or has legs which contain no hops).
204
    fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
204
        match hop {
60
            TargetHop::Hop(hop) => Ok(hop),
            TargetHop::LastHop => {
144
                if let Ok(leg) = self.circuits.single_leg() {
132
                    let leg_id = leg.unique_id();
                    // single-path tunnel
132
                    let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
132
                    Ok(HopLocation::Hop((leg_id, hop)))
12
                } else if !self.circuits.is_empty() {
                    // multi-path tunnel
12
                    Ok(HopLocation::JoinPoint)
                } else {
                    // no legs
                    Err(NoHopsBuiltError)
                }
            }
        }
204
    }
    /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
    ///
    /// After resolving a `HopLocation::JoinPoint`,
    /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
    ///
    /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
    /// need them,
    /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
    /// iterations as the primary leg may change from one iteration to the next.
    ///
    /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
    /// but it does not have a join point.
    #[instrument(level = "trace", skip_all)]
226
    fn resolve_hop_location(
226
        &self,
226
        hop: HopLocation,
226
    ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
226
        match hop {
214
            HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
            HopLocation::JoinPoint => {
12
                if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
12
                    Ok((leg_id, hop_num))
                } else {
                    // Attempted to get the join point of a non-multipath tunnel.
                    Err(NoJoinPointError)
                }
            }
        }
226
    }
    /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
    ///
    /// This is a helper function that basically calls both resolve_target_hop and
    /// resolve_hop_location back to back.
    ///
    /// It returns None on failure to resolve meaning that if you want more detailed error on why
    /// it failed, explicitly use the resolve_hop_location() and resolve_target_hop() functions.
60
    pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
60
        self.resolve_target_hop(hop)
60
            .ok()
90
            .and_then(|resolved| self.resolve_hop_location(resolved).ok())
60
    }
    /// Install or remove a padder at a given hop.
    ///
    /// Only a concrete `HopLocation::Hop` may be targeted; padding the join
    /// point is rejected as an API misuse.
    #[cfg(feature = "circ-padding-manual")]
    fn set_padding_at_hop(
        &self,
        hop: HopLocation,
        padder: Option<super::circuit::padding::CircuitPadder>,
    ) -> Result<()> {
        let (leg_id, hop_num) = match hop {
            HopLocation::Hop(ids) => ids,
            HopLocation::JoinPoint => {
                return Err(bad_api_usage!("Padding to the join point is not supported.").into());
            }
        };
        self.circuits
            .leg(leg_id)
            .ok_or(Error::NoSuchHop)?
            .set_padding_at_hop(hop_num, padder)?;
        Ok(())
    }
    /// Does congestion control use stream SENDMEs for the given hop?
    ///
    /// Returns `None` if either the `leg` or `hop` don't exist.
    fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
        // Thin delegation: the conflux set tracks each leg and its hops.
        self.circuits.uses_stream_sendme(leg, hop)
    }
    /// Handle a request to link some extra circuits in the reactor's conflux set.
    ///
    /// The circuits are validated, and if they do not have the same length,
    /// or if they do not all have the same last hop, an error is returned on
    /// the `answer` channel, and the conflux handshake is *not* initiated.
    ///
    /// If validation succeeds, the circuits are added to this reactor's conflux set,
    /// and the conflux handshake is initiated (by sending a LINK cell on each leg).
    ///
    /// NOTE: this blocks the reactor main loop until all the cells are sent.
    #[cfg(feature = "conflux")]
    #[instrument(level = "trace", skip_all)]
    async fn handle_link_circuits(
        &mut self,
        circuits: Vec<Circuit>,
        answer: ConfluxLinkResultChannel,
    ) -> StdResult<(), ReactorError> {
        use tor_error::warn_report;

        // Refuse to start a second handshake while one is in flight;
        // report the misuse to the requester rather than tearing down.
        if self.conflux_hs_ctx.is_some() {
            let err = internal!("conflux linking already in progress");
            send_conflux_outcome(answer, Err(err.into()))?;

            return Ok(());
        }

        let unlinked_legs = self.circuits.num_unlinked();

        // We need to send the LINK cell on each of the new circuits
        // and on each of the existing, unlinked legs from self.circuits.
        //
        // In reality, there can only be one such circuit
        // (the "initial" one from the previously single-path tunnel),
        // because any circuits that fail to complete the conflux handshake
        // get removed from the set.
        let num_legs = circuits.len() + unlinked_legs;

        // Note: add_legs validates `circuits`
        let res = async {
            self.circuits.add_legs(circuits, &self.runtime)?;
            self.circuits.link_circuits(&self.runtime).await
        }
        .await;

        if let Err(e) = res {
            warn_report!(e, "Failed to link conflux circuits");

            send_conflux_outcome(answer, Err(e))?;
        } else {
            // Save the channel, to notify the user of completion.
            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                answer,
                num_legs,
                results: Default::default(),
            });
        }

        Ok(())
    }
}
/// Notify the conflux handshake initiator of the handshake outcome.
///
/// Returns an error if the initiator has gone away.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
    match tx.send(res) {
        // The receiving end was dropped: nobody is waiting for this
        // result anymore, so signal the reactor to shut down.
        Err(_) => {
            tracing::warn!("conflux initiator went away before handshake completed?");
            Err(ReactorError::Shutdown)
        }
        Ok(_) => Ok(()),
    }
}
/// The tunnel does not have any hops.
//
// The user-facing message comes from the `#[error(...)]` attribute below,
// via the `thiserror` derive.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("no hops have been built for this tunnel")]
pub(crate) struct NoHopsBuiltError;
/// The tunnel does not have a join point.
//
// NOTE(review): a "join point" presumably refers to the conflux join hop —
// confirm against the conflux module. The displayed message comes from the
// `#[error(...)]` attribute below, via the `thiserror` derive.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("the tunnel does not have a join point")]
pub(crate) struct NoJoinPointError;
impl CellHandlers {
    /// Try to install a given meta-cell handler to receive any unusual cells on
    /// this circuit, along with a result channel to notify on completion.
72
    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
72
        if self.meta_handler.is_none() {
72
            self.meta_handler = Some(handler);
72
            Ok(())
        } else {
            Err(Error::from(internal!(
                "Tried to install a meta-cell handler before the old one was gone."
            )))
        }
72
    }
    /// Try to install a given cell handler on this circuit.
    #[cfg(feature = "hs-service")]
60
    fn set_incoming_stream_req_handler(
60
        &mut self,
60
        handler: IncomingStreamRequestHandler,
60
    ) -> Result<()> {
60
        if self.incoming_stream_req_handler.is_none() {
48
            self.incoming_stream_req_handler = Some(handler);
48
            Ok(())
        } else {
12
            Err(Error::from(internal!(
12
                "Tried to install a BEGIN cell handler before the old one was gone."
12
            )))
        }
60
    }
}
#[cfg(test)]
mod test {
    // No standalone unit tests here; this reactor's behavior is exercised
    // in [`crate::client::circuit::test`].
}