1
//! Code to handle incoming cells on a circuit.
2
//!
3
//! ## On message validation
4
//!
5
//! There are three steps for validating an incoming message on a stream:
6
//!
7
//! 1. Is the message contextually appropriate? (e.g., no more than one
8
//!    `CONNECTED` message per stream.) This is handled by calling
9
//!    [`CmdChecker::check_msg`](crate::stream::cmdcheck::CmdChecker::check_msg).
10
//! 2. Does the message comply with flow-control rules? (e.g., no more SENDMEs
11
//!    than we've sent data for.) This is handled within the reactor by the
12
//!    `StreamFlowCtrl`. For half-closed streams which don't send stream
13
//!    SENDMEs, an additional receive-window check is performed in the
14
//!    `halfstream` module.
15
//! 3. Does the message have an acceptable command type, and is the message
16
//!    well-formed? For open streams, the streams themselves handle this check.
17
//!    For half-closed streams, the reactor handles it by calling
18
//!    `consume_checked_msg()`.
19

            
20
pub(crate) mod circuit;
21
mod conflux;
22
mod control;
23

            
24
use crate::circuit::circhop::SendRelayCell;
25
use crate::circuit::{CircuitRxReceiver, UniqId};
26
use crate::client::circuit::ClientCircChanMsg;
27
use crate::client::circuit::padding::{PaddingController, PaddingEvent, PaddingEventStream};
28
use crate::client::{HopLocation, TargetHop};
29
use crate::crypto::cell::HopNum;
30
use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
31
use crate::memquota::CircuitAccount;
32
use crate::stream::CloseStreamBehavior;
33
use crate::streammap;
34
use crate::tunnel::{TunnelId, TunnelScopedCircId};
35
use crate::util::err::ReactorError;
36
use crate::util::skew::ClockSkew;
37
use crate::util::timeout::TimeoutEstimator;
38
use crate::{Error, Result};
39
use circuit::Circuit;
40
use conflux::ConfluxSet;
41
use control::ControlHandler;
42
use std::cmp::Ordering;
43
use std::collections::BinaryHeap;
44
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
45
use tor_cell::relaycell::msg::Sendme;
46
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
47
use tor_error::{Bug, bad_api_usage, debug_report, internal, into_bad_api_usage};
48
use tor_rtcompat::{DynTimeProvider, SleepProvider};
49

            
50
use cfg_if::cfg_if;
51
use futures::StreamExt;
52
use futures::channel::mpsc;
53
use futures::{FutureExt as _, select_biased};
54
use oneshot_fused_workaround as oneshot;
55

            
56
use std::result::Result as StdResult;
57
use std::sync::Arc;
58
use std::time::Duration;
59

            
60
use crate::channel::Channel;
61
use crate::conflux::msghandler::RemoveLegReason;
62
use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
63
use circuit::CircuitCmd;
64
use derive_more::From;
65
use smallvec::smallvec;
66
use tor_cell::chancell::CircId;
67
use tor_llcrypto::pk;
68
use tracing::{debug, info, instrument, trace, warn};
69

            
70
use super::circuit::{MutableState, TunnelMutableState};
71
use crate::circuit::reactor::ReactorResultChannel;
72

            
73
#[cfg(feature = "hs-service")]
74
use crate::stream::incoming::IncomingStreamRequestHandler;
75

            
76
#[cfg(feature = "conflux")]
77
use {
78
    crate::conflux::msghandler::{ConfluxCmd, OooRelayMsg},
79
    crate::util::err::ConfluxHandshakeError,
80
};
81

            
82
pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
83

            
84
/// Contains a list of conflux handshake results.
85
#[cfg(feature = "conflux")]
86
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
87

            
88
/// The type of oneshot channel used to inform reactor users of the outcome
89
/// of a client-side conflux handshake.
90
///
91
/// Contains a list of handshake results, one for each circuit that we were asked
92
/// to link in the tunnel.
93
#[cfg(feature = "conflux")]
94
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
95

            
96
/// A handshake type, to be used when creating circuit hops.
97
#[derive(Clone, Debug)]
98
pub(crate) enum CircuitHandshake {
99
    /// Use the CREATE_FAST handshake.
100
    CreateFast,
101
    /// Use the ntor handshake.
102
    Ntor {
103
        /// The public key of the relay.
104
        public_key: NtorPublicKey,
105
        /// The Ed25519 identity of the relay, which is verified against the
106
        /// identity held in the circuit's channel.
107
        ed_identity: pk::ed25519::Ed25519Identity,
108
    },
109
    /// Use the ntor-v3 handshake.
110
    NtorV3 {
111
        /// The public key of the relay.
112
        public_key: NtorV3PublicKey,
113
    },
114
}
115

            
116
// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitEvent enum
117
// proliferation is a bit bothersome, but unavoidable with the current design.
118
//
119
// We should consider getting rid of some of these enums (if possible),
120
// and coming up with more intuitive names.
121

            
122
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
123
#[derive(From, Debug)]
124
#[allow(clippy::large_enum_variant)] // TODO #2003: resolve this
125
enum RunOnceCmd {
126
    /// Run a single `RunOnceCmdInner` command.
127
    Single(RunOnceCmdInner),
128
    /// Run multiple `RunOnceCmdInner` commands.
129
    //
130
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
131
    // but most of the time we're only going to have *one* RunOnceCmdInner
132
    // to run per run_once() loop. The enum enables us avoid the extra heap
133
    // allocation for the `RunOnceCmd::Single` case.
134
    Multiple(Vec<RunOnceCmdInner>),
135
}
136

            
137
/// Instructions for running something in the reactor loop.
138
///
139
/// Run at the end of [`Reactor::run_once`].
140
//
141
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
142
// We should consider making each variant a tuple variant and deduplicating the fields.
143
#[derive(educe::Educe)]
144
#[educe(Debug)]
145
enum RunOnceCmdInner {
146
    /// Send a RELAY cell.
147
    Send {
148
        /// The leg the cell should be sent on.
149
        leg: UniqId,
150
        /// The cell to send.
151
        cell: SendRelayCell,
152
        /// A channel for sending completion notifications.
153
        done: Option<ReactorResultChannel<()>>,
154
    },
155
    /// Send a given control message on this circuit, and install a control-message handler to
156
    /// receive responses.
157
    #[cfg(feature = "send-control-msg")]
158
    SendMsgAndInstallHandler {
159
        /// The message to send, if any
160
        msg: Option<AnyRelayMsgOuter>,
161
        /// A message handler to install.
162
        ///
163
        /// If this is `None`, there must already be a message handler installed
164
        #[educe(Debug(ignore))]
165
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
166
        /// A sender that we use to tell the caller that the message was sent
167
        /// and the handler installed.
168
        done: oneshot::Sender<Result<()>>,
169
    },
170
    /// Handle a SENDME message.
171
    HandleSendMe {
172
        /// The leg the SENDME was received on.
173
        leg: UniqId,
174
        /// The hop number.
175
        hop: HopNum,
176
        /// The SENDME message to handle.
177
        sendme: Sendme,
178
    },
179
    /// Begin a stream with the provided hop in this circuit.
180
    ///
181
    /// Uses the provided stream ID, and sends the provided message to that hop.
182
    BeginStream {
183
        /// The cell to send.
184
        cell: SendRelayCell,
185
        /// The ID of the stream to return on the oneshot channel.
186
        stream_id: StreamId,
187
        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
188
        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
189
        /// caller.
190
        hop: HopLocation,
191
        /// The circuit leg to begin the stream on.
192
        leg: UniqId,
193
        /// Oneshot channel to notify on completion, with the allocated stream ID.
194
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
195
    },
196
    /// Consider sending an XON message with the given `rate`.
197
    MaybeSendXon {
198
        /// The drain rate to advertise in the XON message.
199
        rate: XonKbpsEwma,
200
        /// The ID of the stream to send the message on.
201
        stream_id: StreamId,
202
        /// The location of the hop on the tunnel.
203
        hop: HopLocation,
204
    },
205
    /// Close the specified stream.
206
    CloseStream {
207
        /// The hop number.
208
        hop: HopLocation,
209
        /// The ID of the stream to close.
210
        sid: StreamId,
211
        /// The stream-closing behavior.
212
        behav: CloseStreamBehavior,
213
        /// The reason for closing the stream.
214
        reason: streammap::TerminateReason,
215
        /// A channel for sending completion notifications.
216
        done: Option<ReactorResultChannel<()>>,
217
    },
218
    /// Get the clock skew claimed by the first hop of the circuit.
219
    FirstHopClockSkew {
220
        /// Oneshot channel to return the clock skew.
221
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
222
    },
223
    /// Remove a circuit leg from the conflux set.
224
    RemoveLeg {
225
        /// The circuit leg to remove.
226
        leg: UniqId,
227
        /// The reason for removal.
228
        ///
229
        /// This is only used for conflux circuits that get removed
230
        /// before the conflux handshake is complete.
231
        ///
232
        /// The [`RemoveLegReason`] is mapped by the reactor to a
233
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
234
        /// handshake to indicate the reason the handshake failed.
235
        reason: RemoveLegReason,
236
    },
237
    /// A circuit has completed the conflux handshake,
238
    /// and wants to send the specified cell.
239
    ///
240
    /// This is similar to [`RunOnceCmdInner::Send`],
241
    /// but needs to remain a separate variant,
242
    /// because in addition to instructing the reactor to send a cell,
243
    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
244
    /// This enables the reactor to save the handshake result (`Ok(())`),
245
    /// and, if there are no other legs still in the handshake phase,
246
    /// send the result to the handshake initiator.
247
    #[cfg(feature = "conflux")]
248
    ConfluxHandshakeComplete {
249
        /// The circuit leg that has completed the handshake,
250
        /// This is the leg the cell should be sent on.
251
        leg: UniqId,
252
        /// The cell to send.
253
        cell: SendRelayCell,
254
    },
255
    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
256
    #[cfg(feature = "conflux")]
257
    Link {
258
        /// The circuits to link into the tunnel
259
        #[educe(Debug(ignore))]
260
        circuits: Vec<Circuit>,
261
        /// Oneshot channel for notifying of conflux handshake completion.
262
        answer: ConfluxLinkResultChannel,
263
    },
264
    /// Enqueue an out-of-order cell in ooo_msg.
265
    #[cfg(feature = "conflux")]
266
    Enqueue {
267
        /// The leg the entry originated from.
268
        leg: UniqId,
269
        /// The out-of-order message.
270
        msg: OooRelayMsg,
271
    },
272
    /// Take a padding-related event on a circuit leg.
273
    #[cfg(feature = "circ-padding")]
274
    PaddingAction {
275
        /// The leg to event on.
276
        leg: UniqId,
277
        /// The event to take.
278
        padding_event: PaddingEvent,
279
    },
280
    /// Perform a clean shutdown on this circuit.
281
    CleanShutdown,
282
}
283

            
284
impl RunOnceCmdInner {
285
    /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
286
4396
    fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
287
92
        match cmd {
288
4180
            CircuitCmd::Send(cell) => Self::Send {
289
4180
                leg,
290
4180
                cell,
291
4180
                done: None,
292
4180
            },
293
44
            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
294
            CircuitCmd::CloseStream {
295
52
                hop,
296
52
                sid,
297
52
                behav,
298
52
                reason,
299
52
            } => Self::CloseStream {
300
52
                hop: HopLocation::Hop((leg, hop)),
301
52
                sid,
302
52
                behav,
303
52
                reason,
304
52
                done: None,
305
52
            },
306
            #[cfg(feature = "conflux")]
307
24
            CircuitCmd::Conflux(ConfluxCmd::RemoveLeg(reason)) => Self::RemoveLeg { leg, reason },
308
            #[cfg(feature = "conflux")]
309
68
            CircuitCmd::Conflux(ConfluxCmd::HandshakeComplete { hop, early, cell }) => {
310
68
                let cell = SendRelayCell {
311
68
                    hop: Some(hop),
312
68
                    early,
313
68
                    cell,
314
68
                };
315
68
                Self::ConfluxHandshakeComplete { leg, cell }
316
            }
317
            #[cfg(feature = "conflux")]
318
16
            CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
319
12
            CircuitCmd::CleanShutdown => Self::CleanShutdown,
320
        }
321
4396
    }
322
}
323

            
324
/// A command to execute at the end of [`Reactor::run_once`].
325
#[derive(From, Debug)]
326
#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
327
enum CircuitEvent {
328
    /// Run a single `CircuitCmd` command.
329
    RunCmd {
330
        /// The unique identifier of the circuit leg to run the command on
331
        leg: UniqId,
332
        /// The command to run.
333
        cmd: CircuitCmd,
334
    },
335
    /// Handle a control message
336
    HandleControl(CtrlMsg),
337
    /// Handle an input message.
338
    HandleCell {
339
        /// The unique identifier of the circuit leg the message was received on.
340
        leg: UniqId,
341
        /// The message to handle.
342
        cell: ClientCircChanMsg,
343
    },
344
    /// Remove the specified circuit leg from the conflux set.
345
    ///
346
    /// Returned whenever a single circuit leg needs to be removed
347
    /// from the reactor's conflux set, without necessarily tearing down
348
    /// the whole set or shutting down the reactor.
349
    ///
350
    /// Note: this event *can* cause the reactor to shut down
351
    /// (and the conflux set to be closed).
352
    ///
353
    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
354
    RemoveLeg {
355
        /// The leg to remove.
356
        leg: UniqId,
357
        /// The reason for removal.
358
        ///
359
        /// This is only used for conflux circuits that get removed
360
        /// before the conflux handshake is complete.
361
        ///
362
        /// The [`RemoveLegReason`] is mapped by the reactor to a
363
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
364
        /// handshake to indicate the reason the handshake failed.
365
        reason: RemoveLegReason,
366
    },
367
    /// Take some event (blocking or unblocking a circuit, or sending padding)
368
    /// based on the circuit padding backend code.
369
    PaddingAction {
370
        /// The leg on which to take the padding event .
371
        leg: UniqId,
372
        /// The event to take.
373
        padding_event: PaddingEvent,
374
    },
375
    /// Protocol violation. This leads for now to the close of the circuit reactor. The
376
    /// error causing the violation is set in err.
377
    ProtoViolation {
378
        /// The error that causes this protocol violation.
379
        err: crate::Error,
380
    },
381
}
382

            
383
impl CircuitEvent {
384
    /// Return the ordering with which we should handle this event
385
    /// within a list of events returned by a single call to next_circ_event().
386
    ///
387
    /// NOTE: Please do not make this any more complicated:
388
    /// It is a consequence of a kludge that we need this sorting at all.
389
    /// Assuming that eventually, we switch away from the current
390
    /// poll-oriented `next_circ_event` design,
391
    /// we may be able to get rid of this entirely.
392
88
    fn order_within_batch(&self) -> u8 {
393
        use CircuitEvent as CA;
394
        use PaddingEvent as PE;
395
        // This immediate state MUST NOT be used for events emitting cells. At the moment, it is
396
        // only used by the protocol violation event which leads to a shutdown of the reactor.
397
        const IMMEDIATE: u8 = 0;
398
        const EARLY: u8 = 1;
399
        const NORMAL: u8 = 2;
400
        const LATE: u8 = 3;
401

            
402
        // We use this ordering to move any "StartBlocking" to the _end_ of a batch and
403
        // "StopBlocking" to the start.
404
        //
405
        // This way, we can be sure that we will handle any "send data" operations
406
        // (and tell the Padder about them) _before_  we tell the Padder
407
        // that we have blocked the circuit.
408
        //
409
        // This keeps things a bit more logical.
410
88
        match self {
411
22
            CA::RunCmd { .. } => NORMAL,
412
            CA::HandleControl(..) => NORMAL,
413
62
            CA::HandleCell { .. } => NORMAL,
414
4
            CA::RemoveLeg { .. } => NORMAL,
415
            #[cfg(feature = "circ-padding")]
416
            CA::PaddingAction { padding_event, .. } => match padding_event {
417
                PE::StopBlocking => EARLY,
418
                PE::SendPadding(..) => NORMAL,
419
                PE::StartBlocking(..) => LATE,
420
            },
421
            #[cfg(not(feature = "circ-padding"))]
422
            CA::PaddingAction { .. } => NORMAL,
423
            CA::ProtoViolation { .. } => IMMEDIATE,
424
        }
425
88
    }
426
}
427

            
428
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
429
/// progress.
430
///
431
/// # Background
432
///
433
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
434
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
435
///
436
/// To get around this problem, the reactor can send some cells, and then make one of these
437
/// `MetaCellHandler` objects, which will be run when the reply arrives.
438
pub(crate) trait MetaCellHandler: Send {
439
    /// The hop we're expecting the message to come from. This is compared against the hop
440
    /// from which we actually receive messages, and an error is thrown if the two don't match.
441
    fn expected_hop(&self) -> HopLocation;
442
    /// Called when the message we were waiting for arrives.
443
    ///
444
    /// Gets a copy of the `Reactor` in order to do anything it likes there.
445
    ///
446
    /// If this function returns an error, the reactor will shut down.
447
    fn handle_msg(
448
        &mut self,
449
        msg: UnparsedRelayMsg,
450
        reactor: &mut Circuit,
451
    ) -> Result<MetaCellDisposition>;
452
}
453

            
454
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
455
#[derive(Debug, Clone, PartialEq)]
456
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
457
#[non_exhaustive]
458
pub(crate) enum MetaCellDisposition {
459
    /// The message was consumed; the handler should remain installed.
460
    #[cfg(feature = "send-control-msg")]
461
    Consumed,
462
    /// The message was consumed; the handler should be uninstalled.
463
    ConversationFinished,
464
    /// The message was consumed; the circuit should be closed.
465
    #[cfg(feature = "send-control-msg")]
466
    CloseCirc,
467
    // TODO: Eventually we might want the ability to have multiple handlers
468
    // installed, and to let them say "not for me, maybe for somebody else?".
469
    // But right now we don't need that.
470
}
471

            
472
/// Unwrap the specified [`Option`], returning a [`ReactorError::Shutdown`] if it is `None`.
473
///
474
/// This is a macro instead of a function to work around borrowck errors
475
/// in the select! from run_once().
476
macro_rules! unwrap_or_shutdown {
477
    ($self:expr, $res:expr, $reason:expr) => {{
478
        match $res {
479
            None => {
480
                trace!(
481
                    tunnel_id = %$self.tunnel_id,
482
                    reason = %$reason,
483
                    "reactor shutdown"
484
                );
485
                Err(ReactorError::Shutdown)
486
            }
487
            Some(v) => Ok(v),
488
        }
489
    }};
490
}
491

            
492
/// Object to handle incoming cells and background tasks on a circuit
493
///
494
/// This type is returned when you finish a circuit; you need to spawn a
495
/// new task that calls `run()` on it.
496
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
497
pub struct Reactor {
498
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
499
    ///
500
    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
501
    /// is ready to accept cells.
502
    control: mpsc::UnboundedReceiver<CtrlMsg>,
503
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
504
    ///
505
    /// This channel is polled in [`Reactor::run_once`].
506
    ///
507
    /// NOTE: this is a separate channel from `control`, because some messages
508
    /// have higher priority and need to be handled even if the `chan_sender` is not
509
    /// ready (whereas `control` messages are not read until the `chan_sender` sink
510
    /// is ready to accept cells).
511
    command: mpsc::UnboundedReceiver<CtrlCmd>,
512
    /// A oneshot sender that is used to alert other tasks when this reactor is
513
    /// finally dropped.
514
    ///
515
    /// It is a sender for Void because we never actually want to send anything here;
516
    /// we only want to generate canceled events.
517
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
518
    reactor_closed_tx: oneshot::Sender<void::Void>,
519
    /// A set of circuits that form a tunnel.
520
    ///
521
    /// Contains 1 or more circuits.
522
    ///
523
    /// Circuits may be added to this set throughout the lifetime of the reactor.
524
    ///
525
    /// Sometimes, the reactor will remove circuits from this set,
526
    /// for example if the `LINKED` message takes too long to arrive,
527
    /// or if congestion control negotiation fails.
528
    /// The reactor will continue running with the remaining circuits.
529
    /// It will shut down if *all* the circuits are removed.
530
    ///
531
    // TODO(conflux): document all the reasons why the reactor might
532
    // chose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
533
    circuits: ConfluxSet,
534
    /// An identifier for logging about this tunnel reactor.
535
    tunnel_id: TunnelId,
536
    /// Handlers, shared with `Circuit`.
537
    cell_handlers: CellHandlers,
538
    /// The time provider, used for conflux handshake timeouts.
539
    runtime: DynTimeProvider,
540
    /// The conflux handshake context, if there is an on-going handshake.
541
    ///
542
    /// Set to `None` if this is a single-path tunnel,
543
    /// or if none of the circuit legs from our conflux set
544
    /// are currently in the conflux handshake phase.
545
    #[cfg(feature = "conflux")]
546
    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
547
    /// A min-heap buffering all the out-of-order messages received so far.
548
    ///
549
    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
550
    /// on its size. We should make this participate in the memquota memory
551
    /// tracking system, somehow.
552
    #[cfg(feature = "conflux")]
553
    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
554
}
555

            
556
/// The context for an on-going conflux handshake.
557
#[cfg(feature = "conflux")]
558
struct ConfluxHandshakeCtx {
559
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
560
    answer: ConfluxLinkResultChannel,
561
    /// The number of legs that are currently doing the handshake.
562
    num_legs: usize,
563
    /// The handshake results we have collected so far.
564
    results: ConfluxHandshakeResult,
565
}
566

            
567
/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
568
#[derive(Debug)]
569
#[cfg(feature = "conflux")]
570
struct ConfluxHeapEntry {
571
    /// The leg id this message came from.
572
    leg_id: UniqId,
573
    /// The out of order message
574
    msg: OooRelayMsg,
575
}
576

            
577
#[cfg(feature = "conflux")]
578
impl Ord for ConfluxHeapEntry {
579
4
    fn cmp(&self, other: &Self) -> Ordering {
580
4
        self.msg.cmp(&other.msg)
581
4
    }
582
}
583

            
584
#[cfg(feature = "conflux")]
585
impl PartialOrd for ConfluxHeapEntry {
586
4
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
587
4
        Some(self.cmp(other))
588
4
    }
589
}
590

            
591
#[cfg(feature = "conflux")]
592
impl PartialEq for ConfluxHeapEntry {
593
    fn eq(&self, other: &Self) -> bool {
594
        self.msg == other.msg
595
    }
596
}
597

            
598
#[cfg(feature = "conflux")]
599
impl Eq for ConfluxHeapEntry {}
600

            
601
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
602
struct CellHandlers {
603
    /// A handler for a meta cell, together with a result channel to notify on completion.
604
    ///
605
    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
606
    ///
607
    /// Upon sending an EXTEND cell, the [`ControlHandler`] sets this handler
608
    /// to [`CircuitExtender`](circuit::extender::CircuitExtender).
609
    /// The handler is then used in [`Circuit::handle_meta_cell`] for handling
610
    /// all the meta cells received on the circuit that are not SENDMEs or TRUNCATE
611
    /// (which are handled separately) or conflux cells
612
    /// (which are handled by the conflux handlers).
613
    ///
614
    /// The handler is uninstalled after the receipt of the EXTENDED cell,
615
    /// so any subsequent EXTENDED cells will cause the circuit to be torn down.
616
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
617
    /// A handler for incoming stream requests.
618
    #[cfg(feature = "hs-service")]
619
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
620
}
621

            
622
impl Reactor {
623
    /// Create a new circuit reactor.
624
    ///
625
    /// The reactor will send outbound messages on `channel`, receive incoming
626
    /// messages on `input`, and identify this circuit by the channel-local
627
    /// [`CircId`] provided.
628
    ///
629
    /// The internal unique identifier for this circuit will be `unique_id`.
630
    #[allow(clippy::type_complexity, clippy::too_many_arguments)] // TODO
631
376
    pub(super) fn new(
632
376
        channel: Arc<Channel>,
633
376
        channel_id: CircId,
634
376
        unique_id: UniqId,
635
376
        input: CircuitRxReceiver,
636
376
        runtime: DynTimeProvider,
637
376
        memquota: CircuitAccount,
638
376
        padding_ctrl: PaddingController,
639
376
        padding_stream: PaddingEventStream,
640
376
        timeouts: Arc<dyn TimeoutEstimator + Send>,
641
376
    ) -> (
642
376
        Self,
643
376
        mpsc::UnboundedSender<CtrlMsg>,
644
376
        mpsc::UnboundedSender<CtrlCmd>,
645
376
        oneshot::Receiver<void::Void>,
646
376
        Arc<TunnelMutableState>,
647
376
    ) {
648
376
        let tunnel_id = TunnelId::next();
649
376
        let (control_tx, control_rx) = mpsc::unbounded();
650
376
        let (command_tx, command_rx) = mpsc::unbounded();
651
376
        let mutable = Arc::new(MutableState::default());
652

            
653
376
        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
654

            
655
376
        let cell_handlers = CellHandlers {
656
376
            meta_handler: None,
657
376
            #[cfg(feature = "hs-service")]
658
376
            incoming_stream_req_handler: None,
659
376
        };
660

            
661
376
        let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
662
376
        let circuit_leg = Circuit::new(
663
376
            runtime.clone(),
664
376
            channel,
665
376
            channel_id,
666
376
            unique_id,
667
376
            input,
668
376
            memquota,
669
376
            Arc::clone(&mutable),
670
376
            padding_ctrl,
671
376
            padding_stream,
672
376
            timeouts,
673
        );
674

            
675
376
        let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
676

            
677
376
        let reactor = Reactor {
678
376
            circuits,
679
376
            control: control_rx,
680
376
            command: command_rx,
681
376
            reactor_closed_tx,
682
376
            tunnel_id,
683
376
            cell_handlers,
684
376
            runtime,
685
376
            #[cfg(feature = "conflux")]
686
376
            conflux_hs_ctx: None,
687
376
            #[cfg(feature = "conflux")]
688
376
            ooo_msgs: Default::default(),
689
376
        };
690

            
691
376
        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
692
376
    }
693

            
694
    /// Launch the reactor, and run until the circuit closes or we
695
    /// encounter an error.
696
    ///
697
    /// Once this method returns, the circuit is dead and cannot be
698
    /// used again.
699
    #[instrument(level = "trace", skip_all)]
700
564
    pub async fn run(mut self) -> Result<()> {
701
        trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
702
        let result: Result<()> = loop {
703
            match self.run_once().await {
704
                Ok(()) => (),
705
                Err(ReactorError::Shutdown) => break Ok(()),
706
                Err(ReactorError::Err(e)) => break Err(e),
707
            }
708
        };
709

            
710
        // Log that the reactor stopped, possibly with the associated error as a report.
711
        // May log at a higher level depending on the error kind.
712
        const MSG: &str = "Tunnel reactor stopped";
713
        match &result {
714
            Ok(()) => trace!(tunnel_id = %self.tunnel_id, "{MSG}"),
715
            Err(e) => debug_report!(e, tunnel_id = %self.tunnel_id, "{MSG}"),
716
        }
717

            
718
        result
719
282
    }
720

            
721
    /// Helper for run: doesn't mark the circuit closed on finish.  Only
722
    /// processes one cell or control message.
723
    #[instrument(level = "trace", skip_all)]
724
9498
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
725
        // If all the circuits are closed, shut down the reactor
726
        if self.circuits.is_empty() {
727
            trace!(
728
                tunnel_id = %self.tunnel_id,
729
                "Tunnel reactor shutting down: all circuits have closed",
730
            );
731

            
732
            return Err(ReactorError::Shutdown);
733
        }
734

            
735
        // If this is a single path circuit, we need to wait until the first hop
736
        // is created before doing anything else
737
        let single_path_with_hops = self
738
            .circuits
739
            .single_leg_mut()
740
4882
            .is_ok_and(|leg| !leg.has_hops());
741
        if single_path_with_hops {
742
            self.wait_for_create().await?;
743

            
744
            return Ok(());
745
        }
746

            
747
        // Prioritize the buffered messages.
748
        //
749
        // Note: if any of the messages are ready to be handled,
750
        // this will block the reactor until we are done processing them
751
        //
752
        // TODO circpad: If this is a problem, we might want to re-order things so that we
753
        // prioritize padding instead.  On the other hand, this should be fixed by refactoring
754
        // circuit and tunnel reactors, so we might do well to just leave it alone for now.
755
        #[cfg(feature = "conflux")]
756
        self.try_dequeue_ooo_msgs().await?;
757

            
758
        let mut events = select_biased! {
759
            res = self.command.next() => {
760
                let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
761
                return ControlHandler::new(self).handle_cmd(cmd);
762
            },
763
            // Check whether we've got a control message pending.
764
            //
765
            // Note: unfortunately, reading from control here means we might start
766
            // handling control messages before our chan_senders are ready.
767
            // With the current design, this is inevitable: we can't know which circuit leg
768
            // a control message is meant for without first reading the control message from
769
            // the channel, and at that point, we can't know for sure whether that particular
770
            // circuit is ready for sending.
771
            ret = self.control.next() => {
772
                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
773
                smallvec![CircuitEvent::HandleControl(msg)]
774
            },
775
            res = self.circuits.next_circ_event(&self.runtime).fuse() => res?,
776
        };
777

            
778
        // Put the events into the order that we need to execute them in.
779
        //
780
        // (Yes, this _does_ have to be a stable sort.  Not all events may be freely re-ordered
781
        // with respect to one another.)
782
88
        events.sort_by_key(|a| a.order_within_batch());
783

            
784
        for event in events {
785
            let cmd = match event {
786
                CircuitEvent::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
787
                    RunOnceCmdInner::from_circuit_cmd(leg, cmd),
788
                )),
789
                CircuitEvent::HandleControl(ctrl) => ControlHandler::new(self)
790
                    .handle_msg(ctrl)?
791
                    .map(RunOnceCmd::Single),
792
                CircuitEvent::HandleCell { leg, cell } => {
793
                    let circ = self
794
                        .circuits
795
                        .leg_mut(leg)
796
                        .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
797

            
798
                    let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
799
                    if circ_cmds.is_empty() {
800
                        None
801
                    } else {
802
                        // TODO: we return RunOnceCmd::Multiple even if there's a single command.
803
                        //
804
                        // See the TODO on `Circuit::handle_cell`.
805
                        let cmd = RunOnceCmd::Multiple(
806
                            circ_cmds
807
                                .into_iter()
808
164
                                .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
809
                                .collect(),
810
                        );
811

            
812
                        Some(cmd)
813
                    }
814
                }
815
                CircuitEvent::RemoveLeg { leg, reason } => {
816
                    Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
817
                }
818
                CircuitEvent::PaddingAction { leg, padding_event } => {
819
                    cfg_if! {
820
                        if #[cfg(feature = "circ-padding")] {
821
                            Some(RunOnceCmdInner::PaddingAction { leg, padding_event }.into())
822
                        } else {
823
                            // If padding isn't enabled, we never generate a padding event,
824
                            // so we can be sure this case will never be called.
825
                            void::unreachable(padding_event.0);
826
                        }
827
                    }
828
                }
829
                CircuitEvent::ProtoViolation { err } => {
830
                    return Err(err.into());
831
                }
832
            };
833

            
834
            if let Some(cmd) = cmd {
835
                self.handle_run_once_cmd(cmd).await?;
836
            }
837
        }
838

            
839
        Ok(())
840
6280
    }
841

            
842
    /// Try to process the previously-out-of-order messages we might have buffered.
843
    #[cfg(feature = "conflux")]
844
    #[instrument(level = "trace", skip_all)]
845
8934
    async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
846
        // Check if we're ready to dequeue any of the previously out-of-order cells.
847
        while let Some(entry) = self.ooo_msgs.peek() {
848
            let should_pop = self.circuits.is_seqno_in_order(entry.msg.seqno);
849

            
850
            if !should_pop {
851
                break;
852
            }
853

            
854
            let entry = self.ooo_msgs.pop().expect("item just disappeared?!");
855

            
856
            let circ = self
857
                .circuits
858
                .leg_mut(entry.leg_id)
859
                .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
860
            let handlers = &mut self.cell_handlers;
861
            let cmd = circ
862
                .handle_in_order_relay_msg(
863
                    handlers,
864
                    entry.msg.hopnum,
865
                    entry.leg_id,
866
                    entry.msg.cell_counts_towards_windows,
867
                    entry.msg.streamid,
868
                    entry.msg.msg,
869
                )?
870
                .map(|cmd| {
871
                    RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id, cmd))
872
                });
873

            
874
            if let Some(cmd) = cmd {
875
                self.handle_run_once_cmd(cmd).await?;
876
            }
877
        }
878

            
879
        Ok(())
880
5956
    }
881

            
882
    /// Handle a [`RunOnceCmd`].
883
    #[instrument(level = "trace", skip_all)]
884
6993
    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
885
4662
        match cmd {
886
            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
887
            RunOnceCmd::Multiple(cmds) => {
888
                // While we know `sendable` is ready to accept *one* cell,
889
                // we can't be certain it will be able to accept *all* of the cells
890
                // that need to be sent here. This means we *may* end up buffering
891
                // in its underlying SometimesUnboundedSink! That is OK, because
892
                // RunOnceCmd::Multiple is only used for handling packed cells.
893
                for cmd in cmds {
894
                    self.handle_single_run_once_cmd(cmd).await?;
895
                }
896
            }
897
        }
898

            
899
        Ok(())
900
4662
    }
901

            
902
    /// Handle a [`RunOnceCmd`].
903
    #[instrument(level = "trace", skip_all)]
904
4662
    async fn handle_single_run_once_cmd(
905
4662
        &mut self,
906
4662
        cmd: RunOnceCmdInner,
907
6993
    ) -> StdResult<(), ReactorError> {
908
4662
        match cmd {
909
            RunOnceCmdInner::Send { leg, cell, done } => {
910
                // TODO: check the cc window
911
                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
912
                if let Some(done) = done {
913
                    // Don't care if the receiver goes away
914
                    let _ = done.send(res.clone());
915
                }
916
                res?;
917
            }
918
            #[cfg(feature = "send-control-msg")]
919
            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
920
                let cell: Result<Option<SendRelayCell>> =
921
                    self.prepare_msg_and_install_handler(msg, handler);
922

            
923
                match cell {
924
                    Ok(Some(cell)) => {
925
                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
926
                        let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
927
                        // don't care if receiver goes away.
928
                        let _ = done.send(outcome.clone());
929
                        outcome?;
930
                    }
931
                    Ok(None) => {
932
                        // don't care if receiver goes away.
933
                        let _ = done.send(Ok(()));
934
                    }
935
                    Err(e) => {
936
                        // don't care if receiver goes away.
937
                        let _ = done.send(Err(e.clone()));
938
                        return Err(e.into());
939
                    }
940
                }
941
            }
942
            RunOnceCmdInner::BeginStream {
943
                leg,
944
                cell,
945
                stream_id,
946
                hop,
947
                done,
948
            } => {
949
                let circ = self
950
                    .circuits
951
                    .leg_mut(leg)
952
                    .ok_or_else(|| internal!("leg disappeared?!"))?;
953
                let cell_hop = cell.hop.expect("missing hop in client SendRelayCell?!");
954
                let relay_format = circ
955
                    .hop_mut(cell_hop)
956
                    // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
957
                    .ok_or(Error::NoSuchHop)?
958
                    .relay_cell_format();
959

            
960
                let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
961
                // don't care if receiver goes away.
962
96
                let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
963
                outcome?;
964
            }
965
            RunOnceCmdInner::CloseStream {
966
                hop,
967
                sid,
968
                behav,
969
                reason,
970
                done,
971
            } => {
972
                let result = {
973
                    let (leg_id, hop_num) = self
974
                        .resolve_hop_location(hop)
975
                        .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
976
                    let leg = self
977
                        .circuits
978
                        .leg_mut(leg_id)
979
                        .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
980
                    Ok::<_, Bug>((leg, hop_num))
981
                };
982

            
983
                let (leg, hop_num) = match result {
984
                    Ok(x) => x,
985
                    Err(e) => {
986
                        if let Some(done) = done {
987
                            // don't care if the sender goes away
988
                            let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
989
                            let _ = done.send(Err(e.into()));
990
                        }
991
                        return Ok(());
992
                    }
993
                };
994

            
995
                let max_rtt = {
996
                    let hop = leg
997
                        .hop(hop_num)
998
                        .ok_or_else(|| internal!("the hop we resolved disappeared?!"))?;
999
                    let ccontrol = hop.ccontrol();
                    // Note: if we have no measurements for the RTT, this will be set to 0,
                    // and the timeout will be 2 * CBT.
                    ccontrol
                        .rtt()
                        .max_rtt_usec()
                        .map(|rtt| Duration::from_millis(u64::from(rtt)))
                        .unwrap_or_default()
                };
                // The length of the circuit up until the hop that has the half-streeam.
                //
                // +1, because HopNums are zero-based.
                let circ_len = usize::from(hop_num) + 1;
                // We double the CBT to account for rend circuits,
                // which are twice as long (otherwise we risk expiring
                // the rend half-streams too soon).
                let timeout = std::cmp::max(max_rtt, 2 * leg.estimate_cbt(circ_len));
                let expire_at = self.runtime.now() + timeout;
                let res: Result<()> = leg
                    .close_stream(hop_num, sid, behav, reason, expire_at)
                    .await;
                if let Some(done) = done {
                    // don't care if the sender goes away
                    let _ = done.send(res);
                }
            }
            RunOnceCmdInner::MaybeSendXon {
                rate,
                stream_id,
                hop,
            } => {
                let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
                    Ok(x) => x,
                    Err(NoJoinPointError) => {
                        // A stream tried to send an XON message message to the join point of
                        // a tunnel that has never had a join point. Currently in arti, only a
                        // `StreamTarget` asks us to send an XON message, and this tunnel
                        // originally created the `StreamTarget` to begin with. So this is a
                        // legitimate bug somewhere in the tunnel code.
                        return Err(
                            internal!(
                                "Could not send an XON message to a join point on a tunnel without a join point",
                            )
                            .into()
                        );
                    }
                };
                let Some(leg) = self.circuits.leg_mut(leg_id) else {
                    // The leg has disappeared. This is fine since the stream may have ended and
                    // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
                    // It is possible that is a bug and this is an incorrect leg number, but
                    // it's not currently possible to differentiate between an incorrect leg
                    // number and a tunnel leg that has been closed.
                    debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
                    return Ok(());
                };
                let Some(hop) = leg.hop_mut(hop_num) else {
                    // The hop has disappeared. This is fine since the circuit may have been
                    // been truncated while the `CtrlMsg::MaybeSendXon` message was queued.
                    // It is possible that is a bug and this is an incorrect hop number, but
                    // it's not currently possible to differentiate between an incorrect hop
                    // number and a circuit hop that has been removed.
                    debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
                    return Ok(());
                };
                let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
                    // Nothing to do.
                    return Ok(());
                };
                let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
                let cell = SendRelayCell {
                    hop: Some(hop_num),
                    early: false,
                    cell,
                };
                leg.send_relay_cell(cell).await?;
            }
            RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
                let leg = self
                    .circuits
                    .leg_mut(leg)
                    .ok_or_else(|| internal!("leg disappeared?!"))?;
                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
                // future which *should* resolve immediately
                let signals = leg.chan_sender.congestion_signals().await;
                leg.handle_sendme(hop, sendme, signals)?;
            }
            RunOnceCmdInner::FirstHopClockSkew { answer } => {
                let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
                // don't care if the sender goes away
                let _ = answer.send(res.map_err(Into::into));
            }
            RunOnceCmdInner::CleanShutdown => {
                trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
                return Err(ReactorError::Shutdown);
            }
            RunOnceCmdInner::RemoveLeg { leg, reason } => {
                debug!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
                let circ = self.circuits.remove(leg)?;
                let is_conflux_pending = circ.is_conflux_pending();
                // Drop the removed leg. This will cause it to close if it's not already closed.
                drop(circ);
                // If we reach this point, it means we have more than one leg
                // (otherwise the .remove() would've returned a Shutdown error),
                // so we expect there to be a ConfluxHandshakeContext installed.
                #[cfg(feature = "conflux")]
                if is_conflux_pending {
                    let (error, proto_violation): (_, Option<Error>) = match &reason {
                        RemoveLegReason::ConfluxHandshakeTimeout => {
                            (ConfluxHandshakeError::Timeout, None)
                        }
                        RemoveLegReason::ConfluxHandshakeErr(e) => {
                            (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
                        }
                        RemoveLegReason::ChannelClosed => {
                            (ConfluxHandshakeError::ChannelClosed, None)
                        }
                    };
                    self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
                    if let Some(e) = proto_violation {
                        tor_error::warn_report!(
                            e,
                            tunnel_id = %self.tunnel_id,
                            "Malformed conflux handshake, tearing down tunnel",
                        );
                        return Err(e.into());
                    }
                }
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
                // Note: on the client-side, the handshake is considered complete once the
                // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
                //
                // We're optimistic here, and declare the handshake a success *before*
                // sending the LINKED_ACK response. I think this is OK though,
                // because if the send_relay_cell() below fails, the reactor will shut
                // down anyway. OTOH, marking the handshake as complete slightly early
                // means that on the happy path, the circuit is marked as usable sooner,
                // instead of blocking on the sending of the LINKED_ACK.
                self.note_conflux_handshake_result(Ok(()), false)?;
                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
                res?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Link { circuits, answer } => {
                // Add the specified circuits to our conflux set,
                // and send a LINK cell down each unlinked leg.
                //
                // NOTE: this will block the reactor until all the cells are sent.
                self.handle_link_circuits(circuits, answer).await?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Enqueue { leg, msg } => {
                let entry = ConfluxHeapEntry { leg_id: leg, msg };
                self.ooo_msgs.push(entry);
            }
            #[cfg(feature = "circ-padding")]
            RunOnceCmdInner::PaddingAction { leg, padding_event } => {
                // TODO: If we someday move back to having a per-circuit reactor,
                // this event would logically belong there, not on the tunnel reactor.
                self.circuits.run_padding_event(leg, padding_event).await?;
            }
        }
        Ok(())
4662
    }
    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
    ///
    /// Returns an error if an unexpected `CtrlMsg` is received.
    #[instrument(level = "trace", skip_all)]
564
    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
        let msg = select_biased! {
            res = self.command.next() => {
                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
                match cmd {
                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
                    #[cfg(test)]
                    CtrlCmd::AddFakeHop {
                        relay_cell_format: format,
                        fwd_lasthop,
                        rev_lasthop,
                        peer_id,
                        params,
                        done,
                    } => {
                        let leg = self.circuits.single_leg_mut()?;
                        leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, &params, done);
                        return Ok(())
                    },
                    _ => {
                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
                    }
                }
            },
            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
        };
        match msg {
            CtrlMsg::Create {
                recv_created,
                handshake,
                settings,
                done,
            } => {
                // TODO(conflux): instead of crashing the reactor, it might be better
                // to send the error via the done channel instead
                let leg = self.circuits.single_leg_mut()?;
                leg.handle_create(recv_created, handshake, settings, done)
                    .await
            }
            _ => {
                trace!("reactor shutdown due to unexpected cell: {:?}", msg);
                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
            }
        }
376
    }
    /// Add the specified handshake result to our `ConfluxHandshakeContext`.
    ///
    /// If all the circuits we were waiting on have finished the conflux handshake,
    /// the `ConfluxHandshakeContext` is consumed, and the results we have collected
    /// are sent to the handshake initiator.
    #[cfg(feature = "conflux")]
    #[instrument(level = "trace", skip_all)]
84
    fn note_conflux_handshake_result(
84
        &mut self,
84
        res: StdResult<(), ConfluxHandshakeError>,
84
        reactor_is_closing: bool,
84
    ) -> StdResult<(), ReactorError> {
84
        let tunnel_complete = match self.conflux_hs_ctx.as_mut() {
84
            Some(conflux_ctx) => {
84
                conflux_ctx.results.push(res);
                // Whether all the legs have finished linking:
84
                conflux_ctx.results.len() == conflux_ctx.num_legs
            }
            None => {
                return Err(internal!("no conflux handshake context").into());
            }
        };
84
        if tunnel_complete || reactor_is_closing {
            // Time to remove the conflux handshake context
            // and extract the results we have collected
48
            let conflux_ctx = self.conflux_hs_ctx.take().expect("context disappeared?!");
108
            let success_count = conflux_ctx.results.iter().filter(|res| res.is_ok()).count();
48
            let leg_count = conflux_ctx.results.len();
48
            info!(
                tunnel_id = %self.tunnel_id,
                "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
            );
48
            send_conflux_outcome(conflux_ctx.answer, Ok(conflux_ctx.results))?;
            // We don't expect to receive any more handshake results,
            // at least not until we get another LinkCircuits control message,
            // which will install a new ConfluxHandshakeCtx with a channel
            // for us to send updates on
36
        }
80
        Ok(())
84
    }
    /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
    fn prepare_msg_and_install_handler(
        &mut self,
        msg: Option<AnyRelayMsgOuter>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<Option<SendRelayCell>> {
        let msg = msg
            .map(|msg| {
                let handlers = &mut self.cell_handlers;
                let handler = handler
                    .as_ref()
                    .or(handlers.meta_handler.as_ref())
                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
                // We should always have a precise HopLocation here so this should never fails but
                // in case we have a ::JointPoint, we'll notice.
                let hop = handler.expected_hop().hop_num().ok_or(bad_api_usage!(
                    "MsgHandler doesn't have a precise HopLocation"
                ))?;
                Ok::<_, crate::Error>(SendRelayCell {
                    hop: Some(hop),
                    early: false,
                    cell: msg,
                })
            })
            .transpose()?;
        if let Some(handler) = handler {
            self.cell_handlers.set_meta_handler(handler)?;
        }
        Ok(msg)
    }
    /// Handle a shutdown request.
64
    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
64
        trace!(
            tunnel_id = %self.tunnel_id,
            "reactor shutdown due to explicit request",
        );
64
        Err(ReactorError::Shutdown)
64
    }
    /// Handle a request to shutdown the reactor and return the only [`Circuit`] in this tunnel.
    ///
    /// Returns an error over the `answer` channel if the reactor has no circuits,
    /// or more than one circuit. The reactor will shut down regardless.
    #[cfg(feature = "conflux")]
64
    fn handle_shutdown_and_return_circuit(
64
        &mut self,
64
        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
64
    ) -> StdResult<(), ReactorError> {
        // Don't care if the receiver goes away
64
        let _ = answer.send(self.circuits.take_single_leg());
64
        self.handle_shutdown().map(|_| ())
64
    }
    /// Resolves a [`TargetHop`] to a [`HopLocation`].
    ///
    /// After resolving a `TargetHop::LastHop`,
    /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
    /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
    ///
    /// It's generally okay to hold on to a (possibly stale) `HopLocation`
    /// if you need a fixed hop position in the tunnel.
    /// For example if we open a stream to `TargetHop::LastHop`,
    /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
    /// as we don't want the stream position to change as the tunnel is extended or truncated.
    ///
    /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
    /// and the tunnel has no hops
    /// (either has no legs, or has legs which contain no hops).
204
    fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
204
        match hop {
60
            TargetHop::Hop(hop) => Ok(hop),
            TargetHop::LastHop => {
144
                if let Ok(leg) = self.circuits.single_leg() {
132
                    let leg_id = leg.unique_id();
                    // single-path tunnel
132
                    let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
132
                    Ok(HopLocation::Hop((leg_id, hop)))
12
                } else if !self.circuits.is_empty() {
                    // multi-path tunnel
12
                    Ok(HopLocation::JoinPoint)
                } else {
                    // no legs
                    Err(NoHopsBuiltError)
                }
            }
        }
204
    }
    /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
    ///
    /// After resolving a `HopLocation::JoinPoint`,
    /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
    ///
    /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
    /// need them,
    /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
    /// iterations as the primary leg may change from one iteration to the next.
    ///
    /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
    /// but it does not have a join point.
    #[instrument(level = "trace", skip_all)]
220
    fn resolve_hop_location(
220
        &self,
220
        hop: HopLocation,
220
    ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
220
        match hop {
208
            HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
            HopLocation::JoinPoint => {
12
                if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
12
                    Ok((leg_id, hop_num))
                } else {
                    // Attempted to get the join point of a non-multipath tunnel.
                    Err(NoJoinPointError)
                }
            }
        }
220
    }
    /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
    ///
    /// This is a helper function that basically calls both resolve_target_hop and
    /// resolve_hop_location back to back.
    ///
    /// It returns None on failure to resolve meaning that if you want more detailed error on why
    /// it failed, explicitly use the resolve_hop_location() and resolve_target_hop() functions.
60
    pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
60
        self.resolve_target_hop(hop)
60
            .ok()
90
            .and_then(|resolved| self.resolve_hop_location(resolved).ok())
60
    }
    /// Install or remove a padder at a given hop.
    #[cfg(feature = "circ-padding-manual")]
    fn set_padding_at_hop(
        &self,
        hop: HopLocation,
        padder: Option<super::circuit::padding::CircuitPadder>,
    ) -> Result<()> {
        let HopLocation::Hop((leg_id, hop_num)) = hop else {
            return Err(bad_api_usage!("Padding to the join point is not supported.").into());
        };
        let circ = self.circuits.leg(leg_id).ok_or(Error::NoSuchHop)?;
        circ.set_padding_at_hop(hop_num, padder)?;
        Ok(())
    }
    /// Does congestion control use stream SENDMEs for the given hop?
    ///
    /// Returns `None` if either the `leg` or `hop` don't exist.
    fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
        self.circuits.uses_stream_sendme(leg, hop)
    }
    /// Handle a request to link some extra circuits in the reactor's conflux set.
    ///
    /// The circuits are validated, and if they do not have the same length,
    /// or if they do not all have the same last hop, an error is returned on
    /// the `answer` channel, and the conflux handshake is *not* initiated.
    ///
    /// If validation succeeds, the circuits are added to this reactor's conflux set,
    /// and the conflux handshake is initiated (by sending a LINK cell on each leg).
    ///
    /// NOTE: this blocks the reactor main loop until all the cells are sent.
    #[cfg(feature = "conflux")]
    #[instrument(level = "trace", skip_all)]
60
    async fn handle_link_circuits(
60
        &mut self,
60
        circuits: Vec<Circuit>,
60
        answer: ConfluxLinkResultChannel,
90
    ) -> StdResult<(), ReactorError> {
        use tor_error::warn_report;
        if self.conflux_hs_ctx.is_some() {
            let err = internal!("conflux linking already in progress");
            send_conflux_outcome(answer, Err(err.into()))?;
            return Ok(());
        }
        let unlinked_legs = self.circuits.num_unlinked();
        // We need to send the LINK cell on each of the new circuits
        // and on each of the existing, unlinked legs from self.circuits.
        //
        // In reality, there can only be one such circuit
        // (the "initial" one from the previously single-path tunnel),
        // because any circuits that to complete the conflux handshake
        // get removed from the set.
        let num_legs = circuits.len() + unlinked_legs;
        // Note: add_legs validates `circuits`
60
        let res = async {
60
            self.circuits.add_legs(circuits, &self.runtime)?;
52
            self.circuits.link_circuits(&self.runtime).await
60
        }
        .await;
        if let Err(e) = res {
            warn_report!(e, "Failed to link conflux circuits");
            send_conflux_outcome(answer, Err(e))?;
        } else {
            // Save the channel, to notify the user of completion.
            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                answer,
                num_legs,
                results: Default::default(),
            });
        }
        Ok(())
60
    }
}
/// Notify the conflux handshake initiator of the handshake outcome.
///
/// Returns an error if the initiator has done away.
#[cfg(feature = "conflux")]
56
fn send_conflux_outcome(
56
    tx: ConfluxLinkResultChannel,
56
    res: Result<ConfluxHandshakeResult>,
56
) -> StdResult<(), ReactorError> {
56
    if tx.send(res).is_err() {
4
        tracing::warn!("conflux initiator went away before handshake completed?");
4
        return Err(ReactorError::Shutdown);
52
    }
52
    Ok(())
56
}
/// The tunnel does not have any hops.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("no hops have been built for this tunnel")]
pub(crate) struct NoHopsBuiltError;
/// The tunnel does not have a join point.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("the tunnel does not have a join point")]
pub(crate) struct NoJoinPointError;
impl CellHandlers {
    /// Try to install a given meta-cell handler to receive any unusual cells on
    /// this circuit, along with a result channel to notify on completion.
72
    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
72
        if self.meta_handler.is_none() {
72
            self.meta_handler = Some(handler);
72
            Ok(())
        } else {
            Err(Error::from(internal!(
                "Tried to install a meta-cell handler before the old one was gone."
            )))
        }
72
    }
    /// Try to install a given cell handler on this circuit.
    #[cfg(feature = "hs-service")]
60
    fn set_incoming_stream_req_handler(
60
        &mut self,
60
        handler: IncomingStreamRequestHandler,
60
    ) -> Result<()> {
60
        if self.incoming_stream_req_handler.is_none() {
48
            self.incoming_stream_req_handler = Some(handler);
48
            Ok(())
        } else {
12
            Err(Error::from(internal!(
12
                "Tried to install a BEGIN cell handler before the old one was gone."
12
            )))
        }
60
    }
}
#[cfg(test)]
mod test {
    // Tested in [`crate::client::circuit::test`].
}