1
//! Module exposing types for representing circuits in the tunnel reactor.
2

            
3
pub(crate) mod circhop;
4
pub(super) mod extender;
5

            
6
use crate::channel::Channel;
7
use crate::circuit::cell_sender::CircuitCellSender;
8
use crate::circuit::celltypes::CreateResponse;
9
use crate::circuit::circhop::{HopSettings, ReactorStreamComponents};
10
use crate::circuit::create::{Create2Wrap, CreateFastWrap, CreateHandshakeWrap};
11
use crate::circuit::padding::CircPaddingDisposition;
12
use crate::circuit::{CircuitRxReceiver, UniqId};
13
use crate::client::circuit::handshake::{BoxedClientLayer, HandshakeRole};
14
use crate::client::circuit::padding::{
15
    self, PaddingController, PaddingEventStream, QueuedCellPaddingInfo,
16
};
17
use crate::client::circuit::{ClientCircChanMsg, MutableState, path};
18
use crate::client::reactor::MetaCellDisposition;
19
use crate::congestion::CongestionSignals;
20
use crate::congestion::sendme;
21
use crate::crypto::binding::CircuitBinding;
22
use crate::crypto::cell::{
23
    HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer,
24
    RelayCellBody,
25
};
26
use crate::crypto::handshake::fast::CreateFastClient;
27
use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
28
use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
29
use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
30
use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
31
use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
32
use crate::stream::msg_streamid;
33
use crate::streammap;
34
use crate::tunnel::TunnelScopedCircId;
35
use crate::util::err::ReactorError;
36
use crate::util::timeout::TimeoutEstimator;
37
use crate::{ClockSkew, Error, Result};
38

            
39
use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
40
use tor_cell::chancell::msg::{AnyChanMsg, HandshakeType, Relay};
41
use tor_cell::chancell::{AnyChanCell, ChanCmd, CircId};
42
use tor_cell::chancell::{BoxedCellBody, ChanMsg};
43
use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme, SendmeTag, Truncated};
44
use tor_cell::relaycell::{
45
    AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, StreamId, UnparsedRelayMsg,
46
};
47
use tor_error::{Bug, internal};
48
use tor_linkspec::RelayIds;
49
use tor_llcrypto::pk;
50
use web_time_compat::{Duration, Instant, SystemTime};
51

            
52
use futures::SinkExt as _;
53
use oneshot_fused_workaround as oneshot;
54
use tor_rtcompat::{DynTimeProvider, SleepProvider as _};
55
use tracing::{debug, instrument, trace, warn};
56

            
57
use super::{
58
    CellHandlers, CircuitHandshake, CloseStreamBehavior, ReactorResultChannel, SendRelayCell,
59
};
60

            
61
use crate::conflux::msghandler::ConfluxStatus;
62

            
63
use std::borrow::Borrow;
64
use std::pin::Pin;
65
use std::result::Result as StdResult;
66
use std::sync::Arc;
67

            
68
use extender::HandshakeAuxDataHandler;
69

            
70
#[cfg(feature = "hs-service")]
71
use {
72
    crate::circuit::CircHopSyncView,
73
    crate::client::stream::{InboundDataCmdChecker, IncomingStreamRequest},
74
    tor_cell::relaycell::msg::Begin,
75
};
76

            
77
#[cfg(feature = "conflux")]
78
use {
79
    crate::conflux::msghandler::{ConfluxAction, ConfluxCmd, ConfluxMsgHandler, OooRelayMsg},
80
    crate::tunnel::TunnelId,
81
};
82

            
83
#[cfg(not(feature = "flowctl-cc"))]
84
use crate::stream::STREAM_READER_BUFFER;
85

            
86
pub(super) use circhop::{CircHop, CircHopList};
87

            
88
/// A circuit "leg" from a tunnel.
89
///
90
/// Regular (non-multipath) circuits have a single leg.
91
/// Conflux (multipath) circuits have `N` (usually, `N = 2`).
92
pub(crate) struct Circuit {
93
    /// The time provider.
94
    runtime: DynTimeProvider,
95
    /// The channel this circuit is attached to.
96
    channel: Arc<Channel>,
97
    /// Sender object used to actually send cells.
98
    ///
99
    /// NOTE: Control messages could potentially add unboundedly to this, although that's
100
    ///       not likely to happen (and isn't triggereable from the network, either).
101
    pub(super) chan_sender: CircuitCellSender,
102
    /// Input stream, on which we receive ChanMsg objects from this circuit's
103
    /// channel.
104
    ///
105
    // TODO: could use a SPSC channel here instead.
106
    pub(super) input: CircuitRxReceiver,
107
    /// The cryptographic state for this circuit for inbound cells.
108
    /// This object is divided into multiple layers, each of which is
109
    /// shared with one hop of the circuit.
110
    crypto_in: InboundClientCrypt,
111
    /// The cryptographic state for this circuit for outbound cells.
112
    crypto_out: OutboundClientCrypt,
113
    /// List of hops state objects used by the reactor
114
    pub(super) hops: CircHopList,
115
    /// Mutable information about this circuit,
116
    /// shared with the reactor's `ConfluxSet`.
117
    mutable: Arc<MutableState>,
118
    /// This circuit's identifier on the upstream channel.
119
    channel_id: CircId,
120
    /// An identifier for logging about this reactor's circuit.
121
    unique_id: TunnelScopedCircId,
122
    /// A handler for conflux cells.
123
    ///
124
    /// Set once the conflux handshake is initiated by the reactor
125
    /// using [`Reactor::handle_link_circuits`](super::Reactor::handle_link_circuits).
126
    #[cfg(feature = "conflux")]
127
    conflux_handler: Option<ConfluxMsgHandler>,
128
    /// A padding controller to which padding-related events should be reported.
129
    padding_ctrl: PaddingController,
130
    /// An event stream telling us about padding-related events.
131
    //
132
    // TODO: it would be nice to have all of these streams wrapped in a single
133
    // SelectAll, but we can't really do that, since we need the ability to move them
134
    // from one conflux set to another, and a SelectAll doesn't let you actually
135
    // remove one of its constituent streams.  This issue might get solved along
136
    // with the rest of the next reactor refactoring.
137
    pub(super) padding_event_stream: PaddingEventStream,
138
    /// Current rules for blocking traffic, according to the padding controller.
139
    #[cfg(feature = "circ-padding")]
140
    padding_block: Option<padding::StartBlocking>,
141
    /// The circuit timeout estimator.
142
    ///
143
    /// Used for computing half-stream expiration.
144
    timeouts: Arc<dyn TimeoutEstimator>,
145
    /// Memory quota account
146
    #[allow(dead_code)] // Partly here to keep it alive as long as the circuit
147
    memquota: CircuitAccount,
148
}
149

            
150
/// A command to run in response to a circuit event.
151
///
152
/// Unlike `RunOnceCmdInner`, doesn't know anything about `UniqId`s.
153
/// The user of the `CircuitCmd`s is supposed to know the `UniqId`
154
/// of the circuit the `CircuitCmd` came from.
155
///
156
/// This type gets mapped to a `RunOnceCmdInner` in the circuit reactor.
157
#[derive(Debug, derive_more::From)]
158
pub(super) enum CircuitCmd {
159
    /// Send a RELAY cell on the circuit leg this command originates from.
160
    Send(SendRelayCell),
161
    /// Handle a SENDME message received on the circuit leg this command originates from.
162
    HandleSendMe {
163
        /// The hop number.
164
        hop: HopNum,
165
        /// The SENDME message to handle.
166
        sendme: Sendme,
167
    },
168
    /// Close the specified stream on the circuit leg this command originates from.
169
    CloseStream {
170
        /// The hop number.
171
        hop: HopNum,
172
        /// The ID of the stream to close.
173
        sid: StreamId,
174
        /// The stream-closing behavior.
175
        behav: CloseStreamBehavior,
176
        /// The reason for closing the stream.
177
        reason: streammap::TerminateReason,
178
    },
179
    /// Perform an action resulting from handling a conflux cell.
180
    #[cfg(feature = "conflux")]
181
    Conflux(ConfluxCmd),
182
    /// Perform a clean shutdown on this circuit.
183
    CleanShutdown,
184
    /// Enqueue an out-of-order cell in the reactor.
185
    #[cfg(feature = "conflux")]
186
    Enqueue(OooRelayMsg),
187
}
188

            
189
/// Build an `Err(Error::CircProto(..))` describing an unsupported cell
/// received on a client circuit.
///
/// This error will shut down the reactor.
///
/// Two forms are accepted:
///
///   * `unsupported_client_cell!(msg)`: no hop information in the message;
///   * `unsupported_client_cell!(msg, hopnum)`: the message additionally
///     names the hop (a `HopNum`) the cell came from.
///
/// Note: this is a macro rather than a function so that the `Err(..)` it
/// expands to takes on whatever `Result` type the call site needs, without
/// the caller having to `.map()` it.
macro_rules! unsupported_client_cell {
    ($msg:expr) => {{
        unsupported_client_cell!(@ $msg, "")
    }};

    ($msg:expr, $hopnum:expr) => {{
        let hop: HopNum = $hopnum;
        let suffix = format!(" from hop {}", hop.display());
        unsupported_client_cell!(@ $msg, suffix)
    }};

    // Internal rule: `$suffix` is spliced in right after the word "cell".
    (@ $msg:expr, $suffix:expr) => {
        Err(crate::Error::CircProto(format!(
            "Unexpected {} cell{} on client circuit",
            $msg.cmd(),
            $suffix,
        )))
    };
}
214

            
215
pub(super) use unsupported_client_cell;
216

            
217
impl Circuit {
218
    /// Create a new non-multipath circuit.
219
    #[allow(clippy::too_many_arguments)]
220
376
    pub(super) fn new(
221
376
        runtime: DynTimeProvider,
222
376
        channel: Arc<Channel>,
223
376
        channel_id: CircId,
224
376
        unique_id: TunnelScopedCircId,
225
376
        input: CircuitRxReceiver,
226
376
        memquota: CircuitAccount,
227
376
        mutable: Arc<MutableState>,
228
376
        padding_ctrl: PaddingController,
229
376
        padding_event_stream: PaddingEventStream,
230
376
        timeouts: Arc<dyn TimeoutEstimator>,
231
376
    ) -> Self {
232
376
        let chan_sender = CircuitCellSender::from_channel_sender(channel.sender());
233

            
234
376
        let crypto_out = OutboundClientCrypt::new();
235
376
        Circuit {
236
376
            runtime,
237
376
            channel,
238
376
            chan_sender,
239
376
            input,
240
376
            crypto_in: InboundClientCrypt::new(),
241
376
            hops: CircHopList::default(),
242
376
            unique_id,
243
376
            channel_id,
244
376
            crypto_out,
245
376
            mutable,
246
376
            #[cfg(feature = "conflux")]
247
376
            conflux_handler: None,
248
376
            padding_ctrl,
249
376
            padding_event_stream,
250
376
            #[cfg(feature = "circ-padding")]
251
376
            padding_block: None,
252
376
            timeouts,
253
376
            memquota,
254
376
        }
255
376
    }
256

            
257
    /// Return the process-unique identifier of this circuit.
258
18408
    pub(super) fn unique_id(&self) -> UniqId {
259
18408
        self.unique_id.unique_id()
260
18408
    }
261

            
262
    /// Return the shared mutable state of this circuit.
263
428
    pub(super) fn mutable(&self) -> &Arc<MutableState> {
264
428
        &self.mutable
265
428
    }
266

            
267
    /// Attach this circuit to a multipath tunnel.
    ///
    /// The circuit keeps its existing [`UniqId`] but is re-scoped under the
    /// given [`TunnelId`], and the supplied [`ConfluxMsgHandler`] is installed
    /// (replacing any handler that was there before).
    ///
    /// Once this is called, the circuit will be able to handle conflux cells.
    #[cfg(feature = "conflux")]
    pub(super) fn add_to_conflux_tunnel(
        &mut self,
        tunnel_id: TunnelId,
        conflux_handler: ConfluxMsgHandler,
    ) {
        let circ_id = self.unique_id.unique_id();
        self.unique_id = TunnelScopedCircId::new(tunnel_id, circ_id);
        self.conflux_handler = Some(conflux_handler);
    }
280

            
281
    /// Send a LINK cell to the specified hop, and record when it was sent.
    ///
    /// This must be called *after* a [`ConfluxMsgHandler`] is installed
    /// on the circuit with [`add_to_conflux_tunnel`](Self::add_to_conflux_tunnel);
    /// otherwise an internal error is returned and nothing is sent.
    ///
    /// After the cell has been sent, the handler is told about it using the
    /// wallclock time reported by `runtime`.
    #[cfg(feature = "conflux")]
    pub(super) async fn begin_conflux_link(
        &mut self,
        hop: HopNum,
        cell: AnyRelayMsgOuter,
        runtime: &tor_rtcompat::DynTimeProvider,
    ) -> Result<()> {
        use tor_rtcompat::SleepProvider as _;

        // Refuse to send anything unless a handler is already in place.
        let Some(_) = self.conflux_handler.as_ref() else {
            return Err(internal!(
                "tried to send LINK cell before installing a ConfluxMsgHandler?!"
            )
            .into());
        };

        let link = SendRelayCell {
            hop: Some(hop),
            early: false,
            cell,
        };
        self.send_relay_cell(link).await?;

        // Look the handler up again: sending needed `&mut self` in between.
        match self.conflux_handler.as_mut() {
            Some(handler) => {
                handler.note_link_sent(runtime.wallclock())?;
                Ok(())
            }
            None => Err(internal!("ConfluxMsgHandler disappeared?!").into()),
        }
    }
314

            
315
    /// Get the wallclock time when the handshake on this circuit is supposed to time out.
316
    ///
317
    /// Returns `None` if the handshake is not currently in progress.
318
7052
    pub(super) fn conflux_hs_timeout(&self) -> Option<SystemTime> {
319
        cfg_if::cfg_if! {
320
            if #[cfg(feature = "conflux")] {
321
8504
                self.conflux_handler.as_ref().map(|handler| handler.handshake_timeout())?
322
            } else {
323
                None
324
            }
325
        }
326
7052
    }
327

            
328
    /// Handle a [`CtrlMsg::AddFakeHop`](super::CtrlMsg::AddFakeHop) message.
    ///
    /// Test-only: appends a hop backed by `DummyCrypto` layers to the circuit
    /// and then reports success on `done`.
    ///
    /// Panics if `format` is not `RelayCellFormat::V0`, if the hop settings
    /// cannot be constructed, or if the hop cannot be added.
    #[cfg(test)]
    pub(super) fn handle_add_fake_hop(
        &mut self,
        format: RelayCellFormat,
        fwd_lasthop: bool,
        rev_lasthop: bool,
        dummy_peer_id: path::HopDetail,
        // TODO-CGO: Take HopSettings instead of CircParams.
        // (Do this after we've got the virtual-hop refactorings done for
        // virtual extending.)
        params: &crate::client::circuit::CircParameters,
        done: ReactorResultChannel<()>,
    ) {
        use crate::circuit::circhop::HopNegotiationType;
        use crate::client::circuit::test::DummyCrypto;
        use tor_protover::{Protocols, named};

        assert!(matches!(format, RelayCellFormat::V0));
        let _ = format; // TODO-CGO: remove this once we have CGO+hs implemented.

        // This is for testing only, so we'll assume full negotiation took place.
        let caps = [named::FLOWCTRL_CC].into_iter().collect::<Protocols>();
        let settings =
            HopSettings::from_params_and_caps(HopNegotiationType::Full, params, &caps)
                .expect("Can't construct HopSettings");

        let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
        let rev = Box::new(DummyCrypto::new(rev_lasthop));
        // The fake hop has no circuit binding.
        self.add_hop(dummy_peer_id, fwd, rev, None, &settings)
            .expect("could not add hop to circuit");

        // The requester may have gone away; that is not an error here.
        let _ = done.send(Ok(()));
    }
364

            
365
    /// Encode `msg` and encrypt it, returning the resulting cell
366
    /// and tag that should be expected for an authenticated SENDME sent
367
    /// in response to that cell.
368
4588
    fn encode_relay_cell(
369
4588
        crypto_out: &mut OutboundClientCrypt,
370
4588
        relay_format: RelayCellFormat,
371
4588
        hop: HopNum,
372
4588
        early: bool,
373
4588
        msg: AnyRelayMsgOuter,
374
4588
    ) -> Result<(AnyChanMsg, SendmeTag)> {
375
4588
        let mut body: RelayCellBody = msg
376
4588
            .encode(relay_format, &mut rand::rng())
377
4588
            .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
378
4588
            .into();
379
4588
        let cmd = if early {
380
72
            ChanCmd::RELAY_EARLY
381
        } else {
382
4516
            ChanCmd::RELAY
383
        };
384
4588
        let tag = crypto_out.encrypt(cmd, &mut body, hop)?;
385
4588
        let msg = Relay::from(BoxedCellBody::from(body));
386
4588
        let msg = if early {
387
72
            AnyChanMsg::RelayEarly(msg.into())
388
        } else {
389
4516
            AnyChanMsg::Relay(msg)
390
        };
391

            
392
4588
        Ok((msg, tag))
393
4588
    }
394

            
395
    /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
396
    ///
397
    /// If there is insufficient outgoing *circuit-level* or *stream-level*
398
    /// SENDME window, an error is returned instead.
399
    ///
400
    /// Does not check whether the cell is well-formed or reasonable.
401
    ///
402
    /// NOTE: the reactor should not call this function directly, only via
403
    /// [`ConfluxSet::send_relay_cell_on_leg`](super::conflux::ConfluxSet::send_relay_cell_on_leg),
404
    /// which will reroute the message, if necessary to the primary leg.
405
    #[instrument(level = "trace", skip_all)]
406
6882
    pub(super) async fn send_relay_cell(&mut self, msg: SendRelayCell) -> Result<()> {
407
        self.send_relay_cell_inner(msg, None).await
408
4588
    }
409

            
410
    /// As [`send_relay_cell`](Self::send_relay_cell), but takes an optional
411
    /// [`QueuedCellPaddingInfo`] in `padding_info`.
412
    ///
413
    /// If `padding_info` is None, `msg` must be non-padding: we report it as such to the
414
    /// padding controller.
415
    #[instrument(level = "trace", skip_all)]
416
4588
    async fn send_relay_cell_inner(
417
4588
        &mut self,
418
4588
        msg: SendRelayCell,
419
4588
        padding_info: Option<QueuedCellPaddingInfo>,
420
6882
    ) -> Result<()> {
421
        let SendRelayCell {
422
            hop,
423
            early,
424
            cell: msg,
425
4588
        } = msg;
426

            
427
        let is_conflux_link = msg.cmd() == RelayCmd::CONFLUX_LINK;
428
        if !is_conflux_link && self.is_conflux_pending() {
429
            // Note: it is the responsibility of the reactor user to wait until
430
            // at least one of the legs completes the handshake.
431
            return Err(internal!("tried to send cell on unlinked circuit").into());
432
        }
433

            
434
        trace!(circ_id = %self.unique_id, cell = ?msg, "sending relay cell");
435

            
436
        // Cloned, because we borrow mutably from self when we get the circhop.
437
        let runtime = self.runtime.clone();
438
        let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
439
        let stream_id = msg.stream_id();
440
        let hop = hop.expect("missing hop in client SendRelayCell?!");
441
        let circhop = self.hops.get_mut(hop).ok_or(Error::NoSuchHop)?;
442

            
443
        // We might be out of capacity entirely; see if we are about to hit a limit.
444
        //
445
        // TODO: If we ever add a notion of _recoverable_ errors below, we'll
446
        // need a way to restore this limit, and similarly for about_to_send().
447
        circhop.decrement_outbound_cell_limit()?;
448

            
449
        // We need to apply stream-level flow control *before* encoding the message.
450
        if c_t_w {
451
            if let Some(stream_id) = stream_id {
452
                circhop.about_to_send(stream_id, msg.msg())?;
453
            }
454
        }
455

            
456
        // Save the RelayCmd of the message before it gets consumed below.
457
        // We need this to tell our ConfluxMsgHandler about the cell we've just sent,
458
        // so that it can update its counters.
459
        let relay_cmd = msg.cmd();
460

            
461
        // NOTE(eta): Now that we've encrypted the cell, we *must* either send it or abort
462
        //            the whole circuit (e.g. by returning an error).
463
        let (msg, tag) = Self::encode_relay_cell(
464
            &mut self.crypto_out,
465
            circhop.relay_cell_format(),
466
            hop,
467
            early,
468
            msg,
469
        )?;
470
        // The cell counted for congestion control, inform our algorithm of such and pass down the
471
        // tag for authenticated SENDMEs.
472
        if c_t_w {
473
            circhop.ccontrol().note_data_sent(&runtime, &tag)?;
474
        }
475

            
476
        // Remember that we've enqueued this cell.
477
4588
        let padding_info = padding_info.or_else(|| self.padding_ctrl.queued_data(hop));
478

            
479
        self.send_msg(msg, padding_info).await?;
480

            
481
        #[cfg(feature = "conflux")]
482
        if let Some(conflux) = self.conflux_handler.as_mut() {
483
            conflux.note_cell_sent(relay_cmd);
484
        }
485

            
486
        Ok(())
487
4588
    }
488

            
489
    /// Helper: process a cell on a channel.  Most cells get ignored
490
    /// or rejected; a few get delivered to circuits.
491
    ///
492
    /// Return `CellStatus::CleanShutdown` if we should exit.
493
    ///
494
    // TODO: returning `Vec<CircuitCmd>` means we're unnecessarily
495
    // allocating a `Vec` here. Generally, the number of commands is going to be small
496
    // (usually 1, but > 1 when we start supporting packed cells).
497
    //
498
    // We should consider using smallvec instead. It might also be a good idea to have a
499
    // separate higher-level type splitting this out into Single(CircuitCmd),
500
    // and Multiple(SmallVec<[CircuitCmd; <capacity>]>).
501
512
    pub(super) fn handle_cell(
502
512
        &mut self,
503
512
        handlers: &mut CellHandlers,
504
512
        leg: UniqId,
505
512
        cell: ClientCircChanMsg,
506
512
    ) -> Result<Vec<CircuitCmd>> {
507
512
        trace!(circ_id = %self.unique_id, cell = ?cell, "handling cell");
508
        use ClientCircChanMsg::*;
509
512
        match cell {
510
500
            Relay(r) => self.handle_relay_cell(handlers, leg, r),
511
12
            Destroy(d) => {
512
12
                let reason = d.reason();
513
12
                debug!(
514
                    circ_id = %self.unique_id,
515
                    "Received DESTROY cell. Reason: {} [{}]",
516
                    reason.human_str(),
517
                    reason
518
                );
519

            
520
18
                self.handle_destroy_cell().map(|c| vec![c])
521
            }
522
        }
523
512
    }
524

            
525
    /// Decode `cell`, returning its corresponding hop number, tag,
526
    /// and decoded body.
527
500
    fn decode_relay_cell(
528
500
        &mut self,
529
500
        cell: Relay,
530
500
    ) -> Result<(HopNum, SendmeTag, RelayCellDecoderResult)> {
531
        // This is always RELAY, not RELAY_EARLY, so long as this code is client-only.
532
500
        let cmd = cell.cmd();
533
500
        let mut body = cell.into_relay_body().into();
534

            
535
        // Decrypt the cell. If it's recognized, then find the
536
        // corresponding hop.
537
500
        let (hopnum, tag) = self.crypto_in.decrypt(cmd, &mut body)?;
538

            
539
        // Decode the cell.
540
500
        let decode_res = self
541
500
            .hop_mut(hopnum)
542
500
            .ok_or_else(|| {
543
                Error::from(internal!(
544
                    "Trying to decode cell from nonexistent hop {:?}",
545
                    hopnum
546
                ))
547
            })?
548
500
            .decode(body.into())?;
549

            
550
500
        Ok((hopnum, tag, decode_res))
551
500
    }
552

            
553
    /// React to a Relay or RelayEarly cell.
554
500
    fn handle_relay_cell(
555
500
        &mut self,
556
500
        handlers: &mut CellHandlers,
557
500
        leg: UniqId,
558
500
        cell: Relay,
559
500
    ) -> Result<Vec<CircuitCmd>> {
560
500
        let (hopnum, tag, decode_res) = self.decode_relay_cell(cell)?;
561

            
562
500
        if decode_res.is_padding() {
563
            self.padding_ctrl.decrypted_padding(hopnum)?;
564
500
        } else {
565
500
            self.padding_ctrl.decrypted_data(hopnum);
566
500
        }
567

            
568
        // Check whether we are allowed to receive more data for this circuit hop.
569
500
        self.hop_mut(hopnum)
570
500
            .ok_or_else(|| internal!("nonexistent hop {:?}", hopnum))?
571
500
            .decrement_inbound_cell_limit()?;
572

            
573
500
        let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
574

            
575
        // Decrement the circuit sendme windows, and see if we need to
576
        // send a sendme cell.
577
500
        let send_circ_sendme = if c_t_w {
578
96
            self.hop_mut(hopnum)
579
96
                .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?
580
96
                .ccontrol()
581
96
                .note_data_received()?
582
        } else {
583
404
            false
584
        };
585

            
586
500
        let mut circ_cmds = vec![];
587
        // If we do need to send a circuit-level SENDME cell, do so.
588
500
        if send_circ_sendme {
589
            // This always sends a V1 (tagged) sendme cell, and thereby assumes
590
            // that SendmeEmitMinVersion is no more than 1.  If the authorities
591
            // every increase that parameter to a higher number, this will
592
            // become incorrect.  (Higher numbers are not currently defined.)
593
            let sendme = Sendme::from(tag);
594
            let cell = AnyRelayMsgOuter::new(None, sendme.into());
595
            circ_cmds.push(CircuitCmd::Send(SendRelayCell {
596
                hop: Some(hopnum),
597
                early: false,
598
                cell,
599
            }));
600

            
601
            // Inform congestion control of the SENDME we are sending. This is a circuit level one.
602
            self.hop_mut(hopnum)
603
                .ok_or_else(|| {
604
                    Error::from(internal!(
605
                        "Trying to send SENDME to nonexistent hop {:?}",
606
                        hopnum
607
                    ))
608
                })?
609
                .ccontrol()
610
                .note_sendme_sent()?;
611
500
        }
612

            
613
500
        let (mut msgs, incomplete) = decode_res.into_parts();
614
932
        while let Some(msg) = msgs.next() {
615
500
            let msg_status = self.handle_relay_msg(handlers, hopnum, leg, c_t_w, msg)?;
616

            
617
152
            match msg_status {
618
280
                None => continue,
619
                Some(msg @ CircuitCmd::CleanShutdown) => {
620
                    for m in msgs {
621
                        debug!(
622
                            "{id}: Ignoring relay msg received after triggering shutdown: {m:?}",
623
                            id = self.unique_id
624
                        );
625
                    }
626
                    if let Some(incomplete) = incomplete {
627
                        debug!(
628
                            "{id}: Ignoring partial relay msg received after triggering shutdown: {:?}",
629
                            incomplete,
630
                            id = self.unique_id,
631
                        );
632
                    }
633
                    circ_cmds.push(msg);
634
                    return Ok(circ_cmds);
635
                }
636
152
                Some(msg) => {
637
152
                    circ_cmds.push(msg);
638
152
                }
639
            }
640
        }
641

            
642
432
        Ok(circ_cmds)
643
500
    }
644

            
645
    /// Handle a single incoming relay message.
646
500
    fn handle_relay_msg(
647
500
        &mut self,
648
500
        handlers: &mut CellHandlers,
649
500
        hopnum: HopNum,
650
500
        leg: UniqId,
651
500
        cell_counts_toward_windows: bool,
652
500
        msg: UnparsedRelayMsg,
653
500
    ) -> Result<Option<CircuitCmd>> {
654
        // If this msg wants/refuses to have a Stream ID, does it
655
        // have/not have one?
656
500
        let streamid = msg_streamid(&msg)?;
657

            
658
        // If this doesn't have a StreamId, it's a meta cell,
659
        // not meant for a particular stream.
660
500
        let Some(streamid) = streamid else {
661
232
            return self.handle_meta_cell(handlers, hopnum, msg);
662
        };
663

            
664
        #[cfg(feature = "conflux")]
665
268
        let msg = if let Some(conflux) = self.conflux_handler.as_mut() {
666
76
            match conflux.action_for_msg(hopnum, cell_counts_toward_windows, streamid, msg)? {
667
60
                ConfluxAction::Deliver(msg) => {
668
                    // The message either doesn't count towards the sequence numbers
669
                    // or is already well-ordered, so we're ready to handle it.
670

            
671
                    // It's possible that some of our buffered messages are now ready to be
672
                    // handled. We don't check that here, however, because that's handled
673
                    // by the reactor main loop.
674
60
                    msg
675
                }
676
16
                ConfluxAction::Enqueue(msg) => {
677
                    // Tell the reactor to enqueue this msg
678
16
                    return Ok(Some(CircuitCmd::Enqueue(msg)));
679
                }
680
            }
681
        } else {
682
            // If we don't have a conflux_handler, it means this circuit is not part of
683
            // a conflux tunnel, so we can just process the message.
684
192
            msg
685
        };
686

            
687
252
        self.handle_in_order_relay_msg(
688
252
            handlers,
689
252
            hopnum,
690
252
            leg,
691
252
            cell_counts_toward_windows,
692
252
            streamid,
693
252
            msg,
694
        )
695
500
    }
696

            
697
    /// Handle a single incoming relay message that is known to be in order.
698
268
    pub(super) fn handle_in_order_relay_msg(
699
268
        &mut self,
700
268
        handlers: &mut CellHandlers,
701
268
        hopnum: HopNum,
702
268
        leg: UniqId,
703
268
        cell_counts_toward_windows: bool,
704
268
        streamid: StreamId,
705
268
        msg: UnparsedRelayMsg,
706
268
    ) -> Result<Option<CircuitCmd>> {
707
268
        let now = self.runtime.now();
708

            
709
        #[cfg(feature = "conflux")]
710
268
        if let Some(conflux) = self.conflux_handler.as_mut() {
711
76
            conflux.inc_last_seq_delivered(&msg);
712
192
        }
713

            
714
268
        let path = self.mutable.path();
715

            
716
268
        let nonexistent_hop_err = || Error::CircProto("Cell from nonexistent hop!".into());
717
268
        let hop = self.hop_mut(hopnum).ok_or_else(nonexistent_hop_err)?;
718

            
719
268
        let hop_detail = path
720
268
            .iter()
721
268
            .nth(usize::from(hopnum))
722
268
            .ok_or_else(nonexistent_hop_err)?;
723

            
724
        // Returns the original message if it's an incoming stream request
725
        // that we need to handle.
726
268
        let res = hop.handle_msg(hop_detail, cell_counts_toward_windows, streamid, msg, now)?;
727

            
728
        // If it was an incoming stream request, we don't need to worry about
729
        // sending an XOFF as there's no stream data within this message.
730
264
        if let Some(msg) = res {
731
            cfg_if::cfg_if! {
732
                if #[cfg(feature = "hs-service")] {
733
48
                    return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum, leg);
734
                } else {
735
                    return Err(internal!("incoming stream not rejected, but hs-service feature is disabled?!").into());
736
                }
737
            }
738
216
        }
739

            
740
        // We may want to send an XOFF if the incoming buffer is too large.
741
216
        if let Some(cell) = hop.maybe_send_xoff(streamid)? {
742
            let cell = AnyRelayMsgOuter::new(Some(streamid), cell.into());
743
            let cell = SendRelayCell {
744
                hop: Some(hopnum),
745
                early: false,
746
                cell,
747
            };
748
            return Ok(Some(CircuitCmd::Send(cell)));
749
216
        }
750

            
751
216
        Ok(None)
752
268
    }
753

            
754
    /// Dispatch a conflux message received from `hop` to the conflux handler.
    ///
    /// Returns an error if
    ///
    ///   * this is not a conflux circuit (i.e. it doesn't have a [`ConfluxMsgHandler`])
    ///   * this is a client circuit and the conflux message originated from an unexpected hop
    ///   * the cell was sent in violation of the handshake protocol
    #[cfg(feature = "conflux")]
    fn handle_conflux_msg(
        &mut self,
        hop: HopNum,
        msg: UnparsedRelayMsg,
    ) -> Result<Option<ConfluxCmd>> {
        match self.conflux_handler.as_mut() {
            Some(conflux_handler) => Ok(conflux_handler.handle_conflux_msg(msg, hop)),
            // If conflux is not enabled, tear down the circuit
            // (see 4.2.1. Cell Injection Side Channel Mitigations in prop329)
            None => Err(Error::CircProto(format!(
                "Received {} cell from hop {} on non-conflux client circuit?!",
                msg.cmd(),
                hop.display(),
            ))),
        }
    }
779

            
780
    /// For conflux: report the sequence number of the last cell sent on this leg.
    ///
    /// Fails with an internal error if this circuit has no conflux handler,
    /// that is, if it is not part of a conflux set.
    #[cfg(feature = "conflux")]
    pub(super) fn last_seq_sent(&self) -> Result<u64> {
        let Some(handler) = self.conflux_handler.as_ref() else {
            return Err(internal!("tried to get last_seq_sent of non-conflux circ").into());
        };

        Ok(handler.last_seq_sent())
    }
792

            
793
    /// For conflux: set the sequence number of the last cell sent on this leg to `n`.
    ///
    /// # Errors
    ///
    /// Returns an internal error if this circuit has no conflux handler,
    /// that is, if it is not part of a conflux set.
    #[cfg(feature = "conflux")]
    pub(super) fn set_last_seq_sent(&mut self, n: u64) -> Result<()> {
        // The message names the operation that actually failed (a *set*),
        // so that bug reports don't point at the getter.
        let handler = self
            .conflux_handler
            .as_mut()
            .ok_or_else(|| internal!("tried to set last_seq_sent of non-conflux circ"))?;

        handler.set_last_seq_sent(n);
        Ok(())
    }
806

            
807
    /// For conflux: report the sequence number of the last cell received on this leg.
    ///
    /// Fails with an internal error if this circuit has no conflux handler,
    /// that is, if it is not part of a conflux set.
    #[cfg(feature = "conflux")]
    pub(super) fn last_seq_recv(&self) -> Result<u64> {
        let Some(handler) = self.conflux_handler.as_ref() else {
            return Err(internal!("tried to get last_seq_recv of non-conflux circ").into());
        };

        Ok(handler.last_seq_recv())
    }
819

            
820
    /// Helper: deal with a message that a hop handed back as an incoming stream request.
    ///
    /// The steps are, in order:
    ///
    ///   * reject the message with a `CircProto` error if no incoming stream
    ///     request handler is installed, or if the message came from a hop
    ///     other than the one the handler expects;
    ///   * if the handler's command checker says the message closes the stream,
    ///     note that on the hop and return `None`;
    ///   * decode the message as a `Begin`, and ask the handler's filter what to do
    ///     with it (accept it, close the circuit, or reject it with an END);
    ///   * on acceptance, register the stream on the hop and try to deliver the
    ///     request to the handler's channel.
    ///
    /// If the handler's channel is full, we reply with an END (`RESOURCELIMIT`);
    /// if it has been dropped, we return `Error::CircuitClosed`.
    ///
    // TODO: can we make this a method on CircHop to avoid the double HopNum lookup?
    #[cfg(feature = "hs-service")]
    fn handle_incoming_stream_request(
        &mut self,
        handlers: &mut CellHandlers,
        msg: UnparsedRelayMsg,
        stream_id: StreamId,
        hop_num: HopNum,
        leg: UniqId,
    ) -> Result<Option<CircuitCmd>> {
        use tor_cell::relaycell::msg::EndReason;
        use tor_error::into_internal;
        use tor_log_ratelim::log_ratelim;

        use crate::client::stream::{
            IncomingStreamRequestContext, IncomingStreamRequestDisposition as Disposition,
        };
        use crate::stream::incoming::StreamReqInfo;

        let handler = handlers
            .incoming_stream_req_handler
            .as_mut()
            .ok_or_else(|| Error::CircProto("Cannot handle BEGIN cells on this circuit".into()))?;

        // The handler's hop_num is only ever set to None for relays.
        let Some(expected_hop_num) = handler.hop_num else {
            return Err(internal!("Handler HopNum is None in client impl?!").into());
        };

        if hop_num != expected_hop_num {
            return Err(Error::CircProto(format!(
                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
                expected_hop_num.display(),
                msg.cmd(),
                hop_num.display()
            )));
        }

        let closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;

        // TODO: we've already looked up the `hop` in handle_relay_cell, so we shouldn't
        // have to look it up again! However, we can't pass the `&mut hop` reference from
        // `handle_relay_cell` to this function, because that makes Rust angry (we'd be
        // borrowing self as mutable more than once).
        //
        // TODO: we _could_ use self.hops.get_mut(..) instead self.hop_mut(..) inside
        // handle_relay_cell to work around the problem described above
        let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;

        if closes_stream {
            hop.ending_msg_received(stream_id)?;
            return Ok(None);
        }

        let begin = msg
            .decode::<Begin>()
            .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
            .into_msg();
        let req = IncomingStreamRequest::Begin(begin);

        {
            let ctx = IncomingStreamRequestContext { request: &req };
            // IMPORTANT: super::syncview::CircHopSyncView::n_open_streams() (called via disposition() below)
            // accesses the stream map mutexes!
            //
            // This means it's very important not to call this function while any of the hop's
            // stream map mutex is held.
            let view = CircHopSyncView::new(hop.outbound());

            match handler.filter.as_mut().disposition(&ctx, &view)? {
                Disposition::Accept => {}
                Disposition::CloseCircuit => return Ok(Some(CircuitCmd::CleanShutdown)),
                Disposition::RejectRequest(end) => {
                    let cell = SendRelayCell {
                        hop: Some(hop_num),
                        early: false,
                        cell: AnyRelayMsgOuter::new(Some(stream_id), end.into()),
                    };
                    return Ok(Some(CircuitCmd::Send(cell)));
                }
            }
        }

        // TODO: Sadly, we need to look up `&mut hop` yet again,
        // since we needed to pass `&self.hops` by reference to our filter above. :(
        let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
        let relay_cell_format = hop.relay_cell_format();

        let memquota = StreamAccount::new(&self.memquota)?;

        let stream_components = hop.add_ent_with_id(
            self.chan_sender.time_provider(),
            stream_id,
            InboundDataCmdChecker::new_connected(),
            &memquota,
        )?;

        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
            req,
            stream_id,
            hop: Some((leg, hop_num).into()),
            stream_components,
            memquota,
            relay_cell_format,
        });

        log_ratelim!("Delivering message to incoming stream handler"; outcome);

        // Nothing more to do if the request was delivered.
        let Err(e) = outcome else {
            return Ok(None);
        };

        if e.is_full() {
            // The IncomingStreamRequestHandler's stream is full; it isn't
            // handling requests fast enough. So instead, we reply with an
            // END cell.
            let end_msg = AnyRelayMsgOuter::new(
                Some(stream_id),
                End::new_with_reason(EndReason::RESOURCELIMIT).into(),
            );
            let cell = SendRelayCell {
                hop: Some(hop_num),
                early: false,
                cell: end_msg,
            };
            Ok(Some(CircuitCmd::Send(cell)))
        } else if e.is_disconnected() {
            // The IncomingStreamRequestHandler's stream has been dropped.
            // In the Tor protocol as it stands, this always means that the
            // circuit itself is out-of-use and should be closed. (See notes
            // on `allow_stream_requests.`)
            //
            // Note that we will _not_ reach this point immediately after
            // the IncomingStreamRequestHandler is dropped; we won't hit it
            // until we next get an incoming request.  Thus, if we do later
            // want to add early detection for a dropped
            // IncomingStreamRequestHandler, we need to do it elsewhere, in
            // a different way.
            debug!(
                circ_id = %self.unique_id,
                "Incoming stream request receiver dropped",
            );
            // This will _cause_ the circuit to get closed.
            Err(Error::CircuitClosed)
        } else {
            // There are no errors like this with the current design of
            // futures::mpsc, but we shouldn't just ignore the possibility
            // that they'll be added later.
            Err(Error::from((into_internal!(
                "try_send failed unexpectedly"
            ))(e)))
        }
    }
982

            
983
    /// Helper: process a destroy cell.
984
    #[allow(clippy::unnecessary_wraps)]
985
12
    fn handle_destroy_cell(&mut self) -> Result<CircuitCmd> {
986
        // I think there is nothing more to do here.
987
12
        Ok(CircuitCmd::CleanShutdown)
988
12
    }
989

            
990
    /// Handle a [`CtrlMsg::Create`](super::CtrlMsg::Create) message.
991
48
    pub(super) async fn handle_create(
992
48
        &mut self,
993
48
        recv_created: oneshot::Receiver<CreateResponse>,
994
48
        handshake: CircuitHandshake,
995
48
        settings: HopSettings,
996
48
        done: ReactorResultChannel<()>,
997
72
    ) -> StdResult<(), ReactorError> {
998
48
        let ret = match handshake {
999
12
            CircuitHandshake::CreateFast => self.create_firsthop_fast(recv_created, settings).await,
            CircuitHandshake::Ntor {
12
                public_key,
12
                ed_identity,
            } => {
12
                self.create_firsthop_ntor(recv_created, ed_identity, public_key, settings)
12
                    .await
            }
24
            CircuitHandshake::NtorV3 { public_key } => {
24
                self.create_firsthop_ntor_v3(recv_created, public_key, settings)
24
                    .await
            }
        };
48
        let _ = done.send(ret); // don't care if sender goes away
        // TODO: maybe we don't need to flush here?
        // (we could let run_once() handle all the flushing)
48
        self.chan_sender.flush().await?;
48
        Ok(())
48
    }
    /// Helper: create the first hop of a circuit.
    ///
    /// This is parameterized not just on the RNG, but a wrapper object to
    /// build the right kind of create cell, and a handshake object to perform
    /// the cryptographic handshake.
48
    async fn create_impl<H, W, M>(
48
        &mut self,
48
        recvcreated: oneshot::Receiver<CreateResponse>,
48
        wrap: &W,
48
        key: &H::KeyType,
48
        mut settings: HopSettings,
48
        msg: &M,
48
    ) -> Result<()>
48
    where
48
        H: ClientHandshake + HandshakeAuxDataHandler,
48
        W: CreateHandshakeWrap,
48
        H::KeyGen: KeyGenerator,
48
        M: Borrow<H::ClientAuxData>,
48
    {
        // We don't need to shut down the circuit on failure here, since this
        // function consumes the PendingClientCirc and only returns
        // a ClientCirc on success.
48
        let (state, msg) = H::client1(&mut rand::rng(), key, msg)?;
48
        let create_cell = wrap.to_chanmsg(msg);
48
        trace!(
            circ_id = %self.unique_id,
            create = %create_cell.cmd(),
            "Extending to hop 1",
        );
48
        self.send_msg(create_cell, None).await?;
48
        let reply = recvcreated
48
            .await
48
            .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
48
        let relay_handshake = wrap.decode_chanmsg(reply)?;
48
        let (server_msg, keygen) = H::client2(state, relay_handshake)?;
48
        H::handle_server_aux_data(&mut settings, &server_msg)?;
48
        let BoxedClientLayer { fwd, back, binding } = settings
48
            .relay_crypt_protocol()
48
            .construct_client_layers(HandshakeRole::Initiator, keygen)?;
48
        trace!(circ_id = %self.unique_id, "Handshake complete; circuit created.");
48
        let peer_id = self.channel.target().clone();
48
        self.add_hop(
48
            path::HopDetail::Relay(peer_id),
48
            fwd,
48
            back,
48
            binding,
48
            &settings,
        )?;
48
        Ok(())
48
    }
    /// Use the (questionable!) CREATE_FAST handshake to connect to the
    /// first hop of this circuit.
    ///
    /// There's no authentication in CREATE_FAST,
    /// so we don't need to know whom we're connecting to: we're just
    /// connecting to whichever relay the channel is for.
12
    async fn create_firsthop_fast(
12
        &mut self,
12
        recvcreated: oneshot::Receiver<CreateResponse>,
12
        settings: HopSettings,
18
    ) -> Result<()> {
        // In a CREATE_FAST handshake, we can't negotiate a format other than this.
12
        let wrap = CreateFastWrap;
12
        self.create_impl::<CreateFastClient, _, _>(recvcreated, &wrap, &(), settings, &())
12
            .await
12
    }
    /// Use the ntor handshake to connect to the first hop of this circuit.
    ///
    /// Note that the provided keys must match the channel's target,
    /// or the handshake will fail.
12
    async fn create_firsthop_ntor(
12
        &mut self,
12
        recvcreated: oneshot::Receiver<CreateResponse>,
12
        ed_identity: pk::ed25519::Ed25519Identity,
12
        pubkey: NtorPublicKey,
12
        settings: HopSettings,
18
    ) -> Result<()> {
        // Exit now if we have an Ed25519 or RSA identity mismatch.
12
        let target = RelayIds::builder()
12
            .ed_identity(ed_identity)
12
            .rsa_identity(pubkey.id)
12
            .build()
12
            .expect("Unable to build RelayIds");
12
        self.channel.check_match(&target)?;
12
        let wrap = Create2Wrap {
12
            handshake_type: HandshakeType::NTOR,
12
        };
12
        self.create_impl::<NtorClient, _, _>(recvcreated, &wrap, &pubkey, settings, &())
12
            .await
12
    }
    /// Use the ntor-v3 handshake to connect to the first hop of this circuit.
    ///
    /// Note that the provided key must match the channel's target,
    /// or the handshake will fail.
24
    async fn create_firsthop_ntor_v3(
24
        &mut self,
24
        recvcreated: oneshot::Receiver<CreateResponse>,
24
        pubkey: NtorV3PublicKey,
24
        settings: HopSettings,
36
    ) -> Result<()> {
        // Exit now if we have a mismatched key.
24
        let target = RelayIds::builder()
24
            .ed_identity(pubkey.id)
24
            .build()
24
            .expect("Unable to build RelayIds");
24
        self.channel.check_match(&target)?;
        // Set the client extensions.
24
        let client_extensions = settings.circuit_request_extensions()?;
24
        let wrap = Create2Wrap {
24
            handshake_type: HandshakeType::NTOR_V3,
24
        };
24
        self.create_impl::<NtorV3Client, _, _>(
24
            recvcreated,
24
            &wrap,
24
            &pubkey,
24
            settings,
24
            &client_extensions,
24
        )
24
        .await
24
    }
    /// Add a hop to the end of this circuit.
    ///
    /// Will return an error if the circuit already has [`u8::MAX`] hops.
1020
    pub(super) fn add_hop(
1020
        &mut self,
1020
        peer_id: path::HopDetail,
1020
        fwd: Box<dyn OutboundClientLayer + 'static + Send>,
1020
        rev: Box<dyn InboundClientLayer + 'static + Send>,
1020
        binding: Option<CircuitBinding>,
1020
        settings: &HopSettings,
1020
    ) -> StdResult<(), Bug> {
1020
        let hop_num = self.hops.len();
1020
        debug_assert_eq!(hop_num, usize::from(self.num_hops()));
        // There are several places in the code that assume that a `usize` hop number
        // can be cast or converted to a `u8` hop number,
        // so this check is important to prevent panics or incorrect behaviour.
1020
        if hop_num == usize::from(u8::MAX) {
            return Err(internal!(
                "cannot add more hops to a circuit with `u8::MAX` hops"
            ));
1020
        }
1020
        let hop_num = (hop_num as u8).into();
1020
        let hop = CircHop::new(self.unique_id, hop_num, settings);
1020
        self.hops.push(hop);
1020
        self.crypto_in.add_layer(rev);
1020
        self.crypto_out.add_layer(fwd);
1020
        self.mutable.add_hop(peer_id, binding);
1020
        Ok(())
1020
    }
    /// Handle a RELAY cell on this circuit with stream ID 0.
    ///
    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
    /// This function returns a `CircProto` error if `msg` is an unsupported,
    /// unexpected, or otherwise invalid message:
    ///
    ///   * unexpected messages are rejected by returning an error using
    ///     [`unsupported_client_cell`]
    ///   * SENDME/TRUNCATED messages are rejected if they don't parse
    ///   * SENDME authentication tags are validated inside [`Circuit::handle_sendme`]
    ///   * conflux cells are handled in the client [`ConfluxMsgHandler`]
    ///
    /// The error is propagated all the way up to [`Circuit::handle_cell`],
    /// and eventually ends up being returned from the reactor's `run_once` function,
    /// causing it to shut down.
    #[allow(clippy::cognitive_complexity)]
232
    fn handle_meta_cell(
232
        &mut self,
232
        handlers: &mut CellHandlers,
232
        hopnum: HopNum,
232
        msg: UnparsedRelayMsg,
232
    ) -> Result<Option<CircuitCmd>> {
        // SENDME cells and TRUNCATED get handled internally by the circuit.
        // TODO: This pattern (Check command, try to decode, map error) occurs
        // several times, and would be good to extract simplify. Such
        // simplification is obstructed by a couple of factors: First, that
        // there is not currently a good way to get the RelayCmd from _type_ of
        // a RelayMsg.  Second, that decode() [correctly] consumes the
        // UnparsedRelayMsg.  I tried a macro-based approach, and didn't care
        // for it. -nickm
232
        if msg.cmd() == RelayCmd::SENDME {
44
            let sendme = msg
44
                .decode::<Sendme>()
44
                .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
44
                .into_msg();
44
            return Ok(Some(CircuitCmd::HandleSendMe {
44
                hop: hopnum,
44
                sendme,
44
            }));
188
        }
188
        if msg.cmd() == RelayCmd::TRUNCATED {
            let truncated = msg
                .decode::<Truncated>()
                .map_err(|e| Error::from_bytes_err(e, "truncated message"))?
                .into_msg();
            let reason = truncated.reason();
            debug!(
                circ_id = %self.unique_id,
                "Truncated from hop {}. Reason: {} [{}]",
                hopnum.display(),
                reason.human_str(),
                reason
            );
            return Ok(Some(CircuitCmd::CleanShutdown));
188
        }
188
        if msg.cmd() == RelayCmd::DROP {
            cfg_if::cfg_if! {
                if #[cfg(feature = "circ-padding")] {
                    return Ok(None);
                } else {
                    use crate::util::err::ExcessPadding;
                    return Err(Error::ExcessPadding(ExcessPadding::NoPaddingNegotiated, hopnum));
                }
            }
188
        }
188
        trace!(circ_id = %self.unique_id, cell = ?msg, "Received meta-cell");
        #[cfg(feature = "conflux")]
60
        if matches!(
188
            msg.cmd(),
            RelayCmd::CONFLUX_LINK
                | RelayCmd::CONFLUX_LINKED
                | RelayCmd::CONFLUX_LINKED_ACK
                | RelayCmd::CONFLUX_SWITCH
        ) {
128
            let cmd = self.handle_conflux_msg(hopnum, msg)?;
112
            return Ok(cmd.map(CircuitCmd::from));
60
        }
60
        if self.is_conflux_pending() {
            warn!(
                circ_id = %self.unique_id,
                "received unexpected cell {msg:?} on unlinked conflux circuit",
            );
            return Err(Error::CircProto(
                "Received unexpected cell on unlinked circuit".into(),
            ));
60
        }
        // For all other command types, we'll only get them in response
        // to another command, which should have registered a responder.
        //
        // TODO: should the conflux state machine be a meta cell handler?
        // We'd need to add support for multiple meta handlers, and change the
        // MetaCellHandler API to support returning Option<RunOnceCmdInner>
        // (because some cells will require sending a response)
60
        if let Some(mut handler) = handlers.meta_handler.take() {
            // The handler has a TargetHop so we do a quick convert for equality check.
60
            if handler.expected_hop() == (self.unique_id(), hopnum).into() {
                // Somebody was waiting for a message -- maybe this message
48
                let ret = handler.handle_msg(msg, self);
48
                trace!(
                    circ_id = %self.unique_id,
                    result = ?ret,
                    "meta handler completed",
                );
24
                match ret {
                    #[cfg(feature = "send-control-msg")]
                    Ok(MetaCellDisposition::Consumed) => {
                        handlers.meta_handler = Some(handler);
                        Ok(None)
                    }
24
                    Ok(MetaCellDisposition::ConversationFinished) => Ok(None),
                    #[cfg(feature = "send-control-msg")]
                    Ok(MetaCellDisposition::CloseCirc) => Ok(Some(CircuitCmd::CleanShutdown)),
24
                    Err(e) => Err(e),
                }
            } else {
                // Somebody wanted a message from a different hop!  Put this
                // one back.
12
                handlers.meta_handler = Some(handler);
12
                unsupported_client_cell!(msg, hopnum)
            }
        } else {
            // No need to call shutdown here, since this error will
            // propagate to the reactor shut it down.
            unsupported_client_cell!(msg)
        }
232
    }
    /// Handle a RELAY_SENDME cell on this circuit with stream ID 0.
    #[instrument(level = "trace", skip_all)]
44
    pub(super) fn handle_sendme(
44
        &mut self,
44
        hopnum: HopNum,
44
        msg: Sendme,
44
        signals: CongestionSignals,
44
    ) -> Result<Option<CircuitCmd>> {
        // Cloned, because we borrow mutably from self when we get the circhop.
44
        let runtime = self.runtime.clone();
        // No need to call "shutdown" on errors in this function;
        // it's called from the reactor task and errors will propagate there.
44
        let hop = self
44
            .hop_mut(hopnum)
44
            .ok_or_else(|| Error::CircProto(format!("Couldn't find hop {}", hopnum.display())))?;
44
        let tag = msg.into_sendme_tag().ok_or_else(||
                // Versions of Tor <=0.3.5 would omit a SENDME tag in this case;
                // but we don't support those any longer.
                 Error::CircProto("missing tag on circuit sendme".into()))?;
        // Update the CC object that we received a SENDME along with possible congestion signals.
44
        hop.ccontrol()
44
            .note_sendme_received(&runtime, tag, signals)?;
40
        Ok(None)
44
    }
    /// Send a message onto the circuit's channel.
    ///
    /// If the channel is ready to accept messages, it will be sent immediately. If not, the message
    /// will be enqueued for sending at a later iteration of the reactor loop.
    ///
    /// `info` is the status returned from the padding controller when we told it we were queueing
    /// this data.  It should be provided whenever possible.
    ///
    /// # Note
    ///
    /// Making use of the enqueuing capabilities of this function is discouraged! You should first
    /// check whether the channel is ready to receive messages (`self.channel.poll_ready`), and
    /// ideally use this to implement backpressure (such that you do not read from other sources
    /// that would send here while you know you're unable to forward the messages on).
    #[instrument(level = "trace", skip_all)]
4636
    async fn send_msg(
4636
        &mut self,
4636
        msg: AnyChanMsg,
4636
        info: Option<QueuedCellPaddingInfo>,
6954
    ) -> Result<()> {
        let cell = AnyChanCell::new(Some(self.channel_id), msg);
        // Note: this future is always `Ready`, so await won't block.
        Pin::new(&mut self.chan_sender)
            .send_unbounded((cell, info))
            .await?;
        Ok(())
4636
    }
    /// Remove all halfstreams that are expired at `now`.
7052
    pub(super) fn remove_expired_halfstreams(&mut self, now: Instant) {
7052
        self.hops.remove_expired_halfstreams(now);
7052
    }
    /// Return a reference to the hop corresponding to `hopnum`, if there is one.
4106
    pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
4106
        self.hops.hop(hopnum)
4106
    }
    /// Return a mutable reference to the hop corresponding to `hopnum`, if there is one.
1788
    pub(super) fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
1788
        self.hops.get_mut(hopnum)
1788
    }
    /// Begin a stream with the provided hop in this circuit.
    // TODO: see if there's a way that we can clean this up
    #[allow(clippy::too_many_arguments)]
96
    pub(super) fn begin_stream(
96
        &mut self,
96
        hop_num: HopNum,
96
        message: AnyRelayMsg,
96
        time_prov: &DynTimeProvider,
96
        cmd_checker: AnyCmdChecker,
96
        memquota: &StreamAccount,
96
    ) -> Result<(SendRelayCell, StreamId, ReactorStreamComponents)> {
96
        let Some(hop) = self.hop_mut(hop_num) else {
            return Err(internal!(
                "{}: Attempting to send a BEGIN cell to an unknown hop {hop_num:?}",
                self.unique_id,
            )
            .into());
        };
96
        hop.begin_stream(message, time_prov, cmd_checker, memquota)
96
    }
    /// Close the specified stream
    #[instrument(level = "trace", skip_all)]
64
    pub(super) async fn close_stream(
64
        &mut self,
64
        hop_num: HopNum,
64
        sid: StreamId,
64
        behav: CloseStreamBehavior,
64
        reason: streammap::TerminateReason,
64
        expiry: Instant,
96
    ) -> Result<()> {
        if let Some(hop) = self.hop_mut(hop_num) {
            let res = hop.close_stream(sid, behav, reason, expiry)?;
            if let Some(cell) = res {
                self.send_relay_cell(cell).await?;
            }
        }
        Ok(())
64
    }
    /// Returns true if there are any streams on this circuit
    ///
    /// Important: this function locks the stream map of its each of the [`CircHop`]s
    /// in this circuit, so it must **not** be called from any function where the
    /// stream map lock is held.
52
    pub(super) fn has_streams(&self) -> bool {
52
        self.hops.has_streams()
52
    }
    /// The number of hops in this circuit.
1272
    pub(super) fn num_hops(&self) -> u8 {
        // `Circuit::add_hop` checks to make sure that we never have more than `u8::MAX` hops,
        // so `self.hops.len()` should be safe to cast to a `u8`.
        // If that assumption is violated,
        // we choose to panic rather than silently use the wrong hop due to an `as` cast.
1272
        self.hops
1272
            .len()
1272
            .try_into()
1272
            .expect("`hops.len()` has more than `u8::MAX` hops")
1272
    }
    /// Check whether this circuit has any hops.
4878
    pub(super) fn has_hops(&self) -> bool {
4878
        !self.hops.is_empty()
4878
    }
    /// Get the `HopNum` of the last hop, if this circuit is non-empty.
    ///
    /// Returns `None` if the circuit has no hops.
252
    pub(super) fn last_hop_num(&self) -> Option<HopNum> {
252
        let num_hops = self.num_hops();
252
        if num_hops == 0 {
            // asked for the last hop, but there are no hops
            return None;
252
        }
252
        Some(HopNum::from(num_hops - 1))
252
    }
    /// Get the path of the circuit.
    ///
    /// **Warning:** Do not call while already holding the [`Self::mutable`] lock.
120
    pub(super) fn path(&self) -> Arc<path::Path> {
120
        self.mutable.path()
120
    }
    /// Return the clock skew reported by this circuit's channel.
    ///
    /// That is: how much clock skew the other side of the channel
    /// claimed that we had when we negotiated the connection.
    pub(super) fn clock_skew(&self) -> ClockSkew {
        self.channel.clock_skew()
    }
    /// Report whether congestion control at `hop` uses stream SENDMEs.
    ///
    /// Returns `None` if `hop` doesn't exist.
    pub(super) fn uses_stream_sendme(&self, hop: HopNum) -> Option<bool> {
        self.hop(hop)
            .map(|circ_hop| circ_hop.ccontrol().uses_stream_sendme())
    }
    /// Returns whether this is a conflux circuit that is not linked yet.
4560
    pub(super) fn is_conflux_pending(&self) -> bool {
4560
        let Some(status) = self.conflux_status() else {
3260
            return false;
        };
1300
        status != ConfluxStatus::Linked
4560
    }
    /// Returns the conflux status of this circuit.
    ///
    /// Returns `None` if this is not a conflux circuit.
4896
    pub(super) fn conflux_status(&self) -> Option<ConfluxStatus> {
        cfg_if::cfg_if! {
            if #[cfg(feature = "conflux")] {
4896
                self.conflux_handler
4896
                    .as_ref()
5606
                    .map(|handler| handler.status())
            } else {
                None
            }
        }
4896
    }
    /// Return the initial RTT on this leg, measured in the conflux handshake.
    ///
    /// Returns `None` if this circuit has no conflux handler,
    /// or if the handler has no initial RTT to report.
    #[cfg(feature = "conflux")]
    pub(super) fn init_rtt(&self) -> Option<Duration> {
        self.conflux_handler
            .as_ref()
            .and_then(|handler| handler.init_rtt())
    }
    /// Install `padder` as the padder for the given `hop`, or remove it if `padder` is `None`.
    ///
    /// Any padder previously installed at that hop is replaced.
    ///
    /// # Errors
    ///
    /// Returns [`Error::NoSuchHop`] if `hop` doesn't exist on this circuit.
    #[cfg(feature = "circ-padding-manual")]
    pub(super) fn set_padding_at_hop(
        &self,
        hop: HopNum,
        padder: Option<padding::CircuitPadder>,
    ) -> Result<()> {
        // Refuse to touch the padding controller for a hop we don't have.
        let Some(_) = self.hop(hop) else {
            return Err(Error::NoSuchHop);
        };

        self.padding_ctrl.install_padder_padding_at_hop(hop, padder);
        Ok(())
    }
    /// Decide how a request to send padding should be handled.
    ///
    /// The decision is made from the request itself, the state of our channel sender,
    /// and the padding-based blocking rule currently in effect (if any).
    ///
    /// This is fairly complicated; see the maybenot documentation for more information.
    ///
    /// ## Limitations
    ///
    /// In our current padding implementation, a circuit is either blocked or not blocked:
    /// we do not keep track of which hop is actually doing the blocking.
    #[cfg(feature = "circ-padding")]
    fn padding_disposition(&self, send_padding: &padding::SendPadding) -> CircPaddingDisposition {
        let sender = &self.chan_sender;
        let active_block = self.padding_block.as_ref();
        crate::circuit::padding::padding_disposition(send_padding, sender, active_block)
    }
    /// Handle a request from our padding subsystem to send a padding packet.
    ///
    /// Depending on the computed [`CircPaddingDisposition`], this either queues a new
    /// padding cell for the target hop, or tells the padding controller that
    /// replaceable padding is already queued.
    ///
    /// # Errors
    ///
    /// Returns an error if queueing the padding cell fails.
    #[cfg(feature = "circ-padding")]
    pub(super) async fn send_padding(&mut self, send_padding: padding::SendPadding) -> Result<()> {
        use CircPaddingDisposition::*;

        let hop = send_padding.hop;
        match self.padding_disposition(&send_padding) {
            // Both of these dispositions are handled the same way here:
            // register the padding with the controller, then queue a fresh padding cell.
            QueuePaddingNormally | QueuePaddingAndBypass => {
                let info = self.padding_ctrl.queued_padding(hop, send_padding);
                self.queue_padding_cell_for_hop(hop, info).await
            }
            // Nothing new is queued; only the padding controller is informed.
            TreatQueuedCellAsPadding => {
                self.padding_ctrl
                    .replaceable_padding_already_queued(hop, send_padding);
                Ok(())
            }
        }
    }
    /// Build a padding cell addressed to `target_hop` and hand it off for sending.
    ///
    /// The cell carries a default `Drop` relay message, and is sent via
    /// `send_relay_cell_inner` together with `queue_info`.
    ///
    /// Ignores any padding-based blocking.
    #[cfg(feature = "circ-padding")]
    async fn queue_padding_cell_for_hop(
        &mut self,
        target_hop: HopNum,
        queue_info: Option<QueuedCellPaddingInfo>,
    ) -> Result<()> {
        use tor_cell::relaycell::msg::Drop as DropMsg;

        let padding_msg = AnyRelayMsgOuter::new(None, DropMsg::default().into());
        let padding_cell = SendRelayCell {
            hop: Some(target_hop),
            // TODO circpad: we will probably want padding machines that can send EARLY cells.
            early: false,
            cell: padding_msg,
        };

        self.send_relay_cell_inner(padding_cell, queue_info).await
    }
    /// Turn on padding-based blocking using the rule in `block`.
    ///
    /// If padding-based blocking was already enabled, `block` replaces the previous rule.
    #[cfg(feature = "circ-padding")]
    pub(super) fn start_blocking_for_padding(&mut self, block: padding::StartBlocking) {
        // Tell the channel sender to start blocking...
        self.chan_sender.start_blocking();
        // ...and remember the rule that is now in force.
        self.padding_block = Some(block);
    }
    /// Turn off padding-based blocking and forget the blocking rule, if there was one.
    #[cfg(feature = "circ-padding")]
    pub(super) fn stop_blocking_for_padding(&mut self) {
        // Tell the channel sender to stop blocking...
        self.chan_sender.stop_blocking();
        // ...and clear the stored rule.
        self.padding_block = None;
    }
    /// The estimated circuit build timeout for a circuit of the specified length.
64
    pub(super) fn estimate_cbt(&self, length: usize) -> Duration {
64
        self.timeouts.circuit_build_timeout(length)
64
    }
}
impl Drop for Circuit {
324
    fn drop(&mut self) {
324
        let _ = self.channel.close_circuit(self.channel_id);
324
    }
}