1
//! Module exposing types for representing circuits in the tunnel reactor.
2

            
3
pub(crate) mod circhop;
4
pub(super) mod extender;
5

            
6
use crate::channel::Channel;
7
use crate::circuit::cell_sender::CircuitCellSender;
8
use crate::circuit::celltypes::CreateResponse;
9
use crate::circuit::circhop::HopSettings;
10
use crate::circuit::create::{Create2Wrap, CreateFastWrap, CreateHandshakeWrap};
11
use crate::circuit::padding::CircPaddingDisposition;
12
use crate::circuit::{CircuitRxReceiver, UniqId};
13
use crate::client::circuit::handshake::{BoxedClientLayer, HandshakeRole};
14
use crate::client::circuit::padding::{
15
    self, PaddingController, PaddingEventStream, QueuedCellPaddingInfo,
16
};
17
use crate::client::circuit::{ClientCircChanMsg, MutableState, path};
18
use crate::client::reactor::MetaCellDisposition;
19
use crate::congestion::CongestionSignals;
20
use crate::congestion::sendme;
21
use crate::crypto::binding::CircuitBinding;
22
use crate::crypto::cell::{
23
    HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer,
24
    RelayCellBody,
25
};
26
use crate::crypto::handshake::fast::CreateFastClient;
27
use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
28
use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
29
use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
30
use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
31
use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
32
use crate::stream::flow_ctrl::state::StreamRateLimit;
33
use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
34
use crate::stream::queue::{StreamQueueSender, stream_queue};
35
use crate::stream::{StreamMpscReceiver, msg_streamid};
36
use crate::streammap;
37
use crate::tunnel::TunnelScopedCircId;
38
use crate::util::err::ReactorError;
39
use crate::util::notify::NotifySender;
40
use crate::util::timeout::TimeoutEstimator;
41
use crate::{ClockSkew, Error, Result};
42

            
43
use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
44
use tor_cell::chancell::msg::{AnyChanMsg, HandshakeType, Relay};
45
use tor_cell::chancell::{AnyChanCell, ChanCmd, CircId};
46
use tor_cell::chancell::{BoxedCellBody, ChanMsg};
47
use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme, SendmeTag, Truncated};
48
use tor_cell::relaycell::{
49
    AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, StreamId, UnparsedRelayMsg,
50
};
51
use tor_error::{Bug, internal};
52
use tor_linkspec::RelayIds;
53
use tor_llcrypto::pk;
54
use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
55

            
56
use futures::SinkExt as _;
57
use oneshot_fused_workaround as oneshot;
58
use postage::watch;
59
use tor_rtcompat::{DynTimeProvider, SleepProvider as _};
60
use tracing::{debug, instrument, trace, warn};
61

            
62
use super::{
63
    CellHandlers, CircuitHandshake, CloseStreamBehavior, ReactorResultChannel, SendRelayCell,
64
};
65

            
66
use crate::conflux::msghandler::ConfluxStatus;
67

            
68
use std::borrow::Borrow;
69
use std::pin::Pin;
70
use std::result::Result as StdResult;
71
use std::sync::Arc;
72
use std::time::{Duration, Instant, SystemTime};
73

            
74
use extender::HandshakeAuxDataHandler;
75

            
76
#[cfg(feature = "hs-service")]
77
use {
78
    crate::circuit::CircHopSyncView,
79
    crate::client::stream::{InboundDataCmdChecker, IncomingStreamRequest},
80
    tor_cell::relaycell::msg::Begin,
81
};
82

            
83
#[cfg(feature = "conflux")]
84
use {
85
    crate::conflux::msghandler::{ConfluxAction, ConfluxCmd, ConfluxMsgHandler, OooRelayMsg},
86
    crate::tunnel::TunnelId,
87
};
88

            
89
#[cfg(not(feature = "flowctl-cc"))]
90
use crate::stream::STREAM_READER_BUFFER;
91

            
92
pub(super) use circhop::{CircHop, CircHopList};
93

            
94
/// A circuit "leg" from a tunnel.
95
///
96
/// Regular (non-multipath) circuits have a single leg.
97
/// Conflux (multipath) circuits have `N` (usually, `N = 2`).
98
pub(crate) struct Circuit {
99
    /// The time provider.
100
    runtime: DynTimeProvider,
101
    /// The channel this circuit is attached to.
102
    channel: Arc<Channel>,
103
    /// Sender object used to actually send cells.
104
    ///
105
    /// NOTE: Control messages could potentially add unboundedly to this, although that's
106
    ///       not likely to happen (and isn't triggereable from the network, either).
107
    pub(super) chan_sender: CircuitCellSender,
108
    /// Input stream, on which we receive ChanMsg objects from this circuit's
109
    /// channel.
110
    ///
111
    // TODO: could use a SPSC channel here instead.
112
    pub(super) input: CircuitRxReceiver,
113
    /// The cryptographic state for this circuit for inbound cells.
114
    /// This object is divided into multiple layers, each of which is
115
    /// shared with one hop of the circuit.
116
    crypto_in: InboundClientCrypt,
117
    /// The cryptographic state for this circuit for outbound cells.
118
    crypto_out: OutboundClientCrypt,
119
    /// List of hops state objects used by the reactor
120
    pub(super) hops: CircHopList,
121
    /// Mutable information about this circuit,
122
    /// shared with the reactor's `ConfluxSet`.
123
    mutable: Arc<MutableState>,
124
    /// This circuit's identifier on the upstream channel.
125
    channel_id: CircId,
126
    /// An identifier for logging about this reactor's circuit.
127
    unique_id: TunnelScopedCircId,
128
    /// A handler for conflux cells.
129
    ///
130
    /// Set once the conflux handshake is initiated by the reactor
131
    /// using [`Reactor::handle_link_circuits`](super::Reactor::handle_link_circuits).
132
    #[cfg(feature = "conflux")]
133
    conflux_handler: Option<ConfluxMsgHandler>,
134
    /// A padding controller to which padding-related events should be reported.
135
    padding_ctrl: PaddingController,
136
    /// An event stream telling us about padding-related events.
137
    //
138
    // TODO: it would be nice to have all of these streams wrapped in a single
139
    // SelectAll, but we can't really do that, since we need the ability to move them
140
    // from one conflux set to another, and a SelectAll doesn't let you actually
141
    // remove one of its constituent streams.  This issue might get solved along
142
    // with the the rest of the next reactor refactoring.
143
    pub(super) padding_event_stream: PaddingEventStream,
144
    /// Current rules for blocking traffic, according to the padding controller.
145
    #[cfg(feature = "circ-padding")]
146
    padding_block: Option<padding::StartBlocking>,
147
    /// The circuit timeout estimator.
148
    ///
149
    /// Used for computing half-stream expiration.
150
    timeouts: Arc<dyn TimeoutEstimator>,
151
    /// Memory quota account
152
    #[allow(dead_code)] // Partly here to keep it alive as long as the circuit
153
    memquota: CircuitAccount,
154
}
155

            
156
/// A command to run in response to a circuit event.
157
///
158
/// Unlike `RunOnceCmdInner`, doesn't know anything about `UniqId`s.
159
/// The user of the `CircuitCmd`s is supposed to know the `UniqId`
160
/// of the circuit the `CircuitCmd` came from.
161
///
162
/// This type gets mapped to a `RunOnceCmdInner` in the circuit reactor.
163
#[derive(Debug, derive_more::From)]
164
pub(super) enum CircuitCmd {
165
    /// Send a RELAY cell on the circuit leg this command originates from.
166
    Send(SendRelayCell),
167
    /// Handle a SENDME message received on the circuit leg this command originates from.
168
    HandleSendMe {
169
        /// The hop number.
170
        hop: HopNum,
171
        /// The SENDME message to handle.
172
        sendme: Sendme,
173
    },
174
    /// Close the specified stream on the circuit leg this command originates from.
175
    CloseStream {
176
        /// The hop number.
177
        hop: HopNum,
178
        /// The ID of the stream to close.
179
        sid: StreamId,
180
        /// The stream-closing behavior.
181
        behav: CloseStreamBehavior,
182
        /// The reason for closing the stream.
183
        reason: streammap::TerminateReason,
184
    },
185
    /// Perform an action resulting from handling a conflux cell.
186
    #[cfg(feature = "conflux")]
187
    Conflux(ConfluxCmd),
188
    /// Perform a clean shutdown on this circuit.
189
    CleanShutdown,
190
    /// Enqueue an out-of-order cell in the reactor.
191
    #[cfg(feature = "conflux")]
192
    Enqueue(OooRelayMsg),
193
}
194

            
195
/// Return a `CircProto` error for the specified unsupported cell.
196
///
197
/// This error will shut down the reactor.
198
///
199
/// Note: this is a macro to simplify usage (this way the caller doesn't
200
/// need to .map() the result to the appropriate type)
201
macro_rules! unsupported_client_cell {
202
    ($msg:expr) => {{
203
        unsupported_client_cell!(@ $msg, "")
204
    }};
205

            
206
    ($msg:expr, $hopnum:expr) => {{
207
        let hop: HopNum = $hopnum;
208
        let hop_display = format!(" from hop {}", hop.display());
209
        unsupported_client_cell!(@ $msg, hop_display)
210
    }};
211

            
212
    (@ $msg:expr, $hopnum_display:expr) => {
213
        Err(crate::Error::CircProto(format!(
214
            "Unexpected {} cell{} on client circuit",
215
            $msg.cmd(),
216
            $hopnum_display,
217
        )))
218
    };
219
}
220

            
221
pub(super) use unsupported_client_cell;
222

            
223
impl Circuit {
224
    /// Create a new non-multipath circuit.
225
    #[allow(clippy::too_many_arguments)]
226
376
    pub(super) fn new(
227
376
        runtime: DynTimeProvider,
228
376
        channel: Arc<Channel>,
229
376
        channel_id: CircId,
230
376
        unique_id: TunnelScopedCircId,
231
376
        input: CircuitRxReceiver,
232
376
        memquota: CircuitAccount,
233
376
        mutable: Arc<MutableState>,
234
376
        padding_ctrl: PaddingController,
235
376
        padding_event_stream: PaddingEventStream,
236
376
        timeouts: Arc<dyn TimeoutEstimator>,
237
376
    ) -> Self {
238
376
        let chan_sender = CircuitCellSender::from_channel_sender(channel.sender());
239

            
240
376
        let crypto_out = OutboundClientCrypt::new();
241
376
        Circuit {
242
376
            runtime,
243
376
            channel,
244
376
            chan_sender,
245
376
            input,
246
376
            crypto_in: InboundClientCrypt::new(),
247
376
            hops: CircHopList::default(),
248
376
            unique_id,
249
376
            channel_id,
250
376
            crypto_out,
251
376
            mutable,
252
376
            #[cfg(feature = "conflux")]
253
376
            conflux_handler: None,
254
376
            padding_ctrl,
255
376
            padding_event_stream,
256
376
            #[cfg(feature = "circ-padding")]
257
376
            padding_block: None,
258
376
            timeouts,
259
376
            memquota,
260
376
        }
261
376
    }
262

            
263
    /// Return the process-unique identifier of this circuit.
264
18384
    pub(super) fn unique_id(&self) -> UniqId {
265
18384
        self.unique_id.unique_id()
266
18384
    }
267

            
268
    /// Return the shared mutable state of this circuit.
269
428
    pub(super) fn mutable(&self) -> &Arc<MutableState> {
270
428
        &self.mutable
271
428
    }
272

            
273
    /// Add this circuit to a multipath tunnel, by associating it with a new [`TunnelId`],
274
    /// and installing a [`ConfluxMsgHandler`] on this circuit.
275
    ///
276
    /// Once this is called, the circuit will be able to handle conflux cells.
277
    #[cfg(feature = "conflux")]
278
104
    pub(super) fn add_to_conflux_tunnel(
279
104
        &mut self,
280
104
        tunnel_id: TunnelId,
281
104
        conflux_handler: ConfluxMsgHandler,
282
104
    ) {
283
104
        self.unique_id = TunnelScopedCircId::new(tunnel_id, self.unique_id.unique_id());
284
104
        self.conflux_handler = Some(conflux_handler);
285
104
    }
286

            
287
    /// Send a LINK cell to the specified hop.
288
    ///
289
    /// This must be called *after* a [`ConfluxMsgHandler`] is installed
290
    /// on the circuit with [`add_to_conflux_tunnel`](Self::add_to_conflux_tunnel).
291
    #[cfg(feature = "conflux")]
292
104
    pub(super) async fn begin_conflux_link(
293
104
        &mut self,
294
104
        hop: HopNum,
295
104
        cell: AnyRelayMsgOuter,
296
104
        runtime: &tor_rtcompat::DynTimeProvider,
297
156
    ) -> Result<()> {
298
        use tor_rtcompat::SleepProvider as _;
299

            
300
104
        if self.conflux_handler.is_none() {
301
            return Err(internal!(
302
                "tried to send LINK cell before installing a ConfluxMsgHandler?!"
303
            )
304
            .into());
305
104
        }
306

            
307
104
        let cell = SendRelayCell {
308
104
            hop: Some(hop),
309
104
            early: false,
310
104
            cell,
311
104
        };
312
104
        self.send_relay_cell(cell).await?;
313

            
314
104
        let Some(conflux_handler) = self.conflux_handler.as_mut() else {
315
            return Err(internal!("ConfluxMsgHandler disappeared?!").into());
316
        };
317

            
318
104
        Ok(conflux_handler.note_link_sent(runtime.wallclock())?)
319
104
    }
320

            
321
    /// Get the wallclock time when the handshake on this circuit is supposed to time out.
322
    ///
323
    /// Returns `None` if the handshake is not currently in progress.
324
7046
    pub(super) fn conflux_hs_timeout(&self) -> Option<SystemTime> {
325
        cfg_if::cfg_if! {
326
            if #[cfg(feature = "conflux")] {
327
8498
                self.conflux_handler.as_ref().map(|handler| handler.handshake_timeout())?
328
            } else {
329
                None
330
            }
331
        }
332
7046
    }
333

            
334
    /// Handle a [`CtrlMsg::AddFakeHop`](super::CtrlMsg::AddFakeHop) message.
335
    #[cfg(test)]
336
948
    pub(super) fn handle_add_fake_hop(
337
948
        &mut self,
338
948
        format: RelayCellFormat,
339
948
        fwd_lasthop: bool,
340
948
        rev_lasthop: bool,
341
948
        dummy_peer_id: path::HopDetail,
342
948
        // TODO-CGO: Take HopSettings instead of CircParams.
343
948
        // (Do this after we've got the virtual-hop refactorings done for
344
948
        // virtual extending.)
345
948
        params: &crate::client::circuit::CircParameters,
346
948
        done: ReactorResultChannel<()>,
347
948
    ) {
348
        use tor_protover::{Protocols, named};
349

            
350
        use crate::client::circuit::test::DummyCrypto;
351

            
352
948
        assert!(matches!(format, RelayCellFormat::V0));
353
948
        let _ = format; // TODO-CGO: remove this once we have CGO+hs implemented.
354

            
355
948
        let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
356
948
        let rev = Box::new(DummyCrypto::new(rev_lasthop));
357
948
        let binding = None;
358

            
359
948
        let settings = HopSettings::from_params_and_caps(
360
            // This is for testing only, so we'll assume full negotiation took place.
361
948
            crate::circuit::circhop::HopNegotiationType::Full,
362
948
            params,
363
948
            &[named::FLOWCTRL_CC].into_iter().collect::<Protocols>(),
364
        )
365
948
        .expect("Can't construct HopSettings");
366
948
        self.add_hop(dummy_peer_id, fwd, rev, binding, &settings)
367
948
            .expect("could not add hop to circuit");
368
948
        let _ = done.send(Ok(()));
369
948
    }
370

            
371
    /// Encode `msg` and encrypt it, returning the resulting cell
372
    /// and tag that should be expected for an authenticated SENDME sent
373
    /// in response to that cell.
374
4594
    fn encode_relay_cell(
375
4594
        crypto_out: &mut OutboundClientCrypt,
376
4594
        relay_format: RelayCellFormat,
377
4594
        hop: HopNum,
378
4594
        early: bool,
379
4594
        msg: AnyRelayMsgOuter,
380
4594
    ) -> Result<(AnyChanMsg, SendmeTag)> {
381
4594
        let mut body: RelayCellBody = msg
382
4594
            .encode(relay_format, &mut rand::rng())
383
4594
            .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
384
4594
            .into();
385
4594
        let cmd = if early {
386
72
            ChanCmd::RELAY_EARLY
387
        } else {
388
4522
            ChanCmd::RELAY
389
        };
390
4594
        let tag = crypto_out.encrypt(cmd, &mut body, hop)?;
391
4594
        let msg = Relay::from(BoxedCellBody::from(body));
392
4594
        let msg = if early {
393
72
            AnyChanMsg::RelayEarly(msg.into())
394
        } else {
395
4522
            AnyChanMsg::Relay(msg)
396
        };
397

            
398
4594
        Ok((msg, tag))
399
4594
    }
400

            
401
    /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
402
    ///
403
    /// If there is insufficient outgoing *circuit-level* or *stream-level*
404
    /// SENDME window, an error is returned instead.
405
    ///
406
    /// Does not check whether the cell is well-formed or reasonable.
407
    ///
408
    /// NOTE: the reactor should not call this function directly, only via
409
    /// [`ConfluxSet::send_relay_cell_on_leg`](super::conflux::ConfluxSet::send_relay_cell_on_leg),
410
    /// which will reroute the message, if necessary to the primary leg.
411
    #[instrument(level = "trace", skip_all)]
412
6891
    pub(super) async fn send_relay_cell(&mut self, msg: SendRelayCell) -> Result<()> {
413
        self.send_relay_cell_inner(msg, None).await
414
4594
    }
415

            
416
    /// As [`send_relay_cell`](Self::send_relay_cell), but takes an optional
417
    /// [`QueuedCellPaddingInfo`] in `padding_info`.
418
    ///
419
    /// If `padding_info` is None, `msg` must be non-padding: we report it as such to the
420
    /// padding controller.
421
    #[instrument(level = "trace", skip_all)]
422
4594
    async fn send_relay_cell_inner(
423
4594
        &mut self,
424
4594
        msg: SendRelayCell,
425
4594
        padding_info: Option<QueuedCellPaddingInfo>,
426
6891
    ) -> Result<()> {
427
        let SendRelayCell {
428
            hop,
429
            early,
430
            cell: msg,
431
4594
        } = msg;
432

            
433
        let is_conflux_link = msg.cmd() == RelayCmd::CONFLUX_LINK;
434
        if !is_conflux_link && self.is_conflux_pending() {
435
            // Note: it is the responsibility of the reactor user to wait until
436
            // at least one of the legs completes the handshake.
437
            return Err(internal!("tried to send cell on unlinked circuit").into());
438
        }
439

            
440
        trace!(circ_id = %self.unique_id, cell = ?msg, "sending relay cell");
441

            
442
        // Cloned, because we borrow mutably from self when we get the circhop.
443
        let runtime = self.runtime.clone();
444
        let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
445
        let stream_id = msg.stream_id();
446
        let hop = hop.expect("missing hop in client SendRelayCell?!");
447
        let circhop = self.hops.get_mut(hop).ok_or(Error::NoSuchHop)?;
448

            
449
        // We might be out of capacity entirely; see if we are about to hit a limit.
450
        //
451
        // TODO: If we ever add a notion of _recoverable_ errors below, we'll
452
        // need a way to restore this limit, and similarly for about_to_send().
453
        circhop.decrement_outbound_cell_limit()?;
454

            
455
        // We need to apply stream-level flow control *before* encoding the message.
456
        if c_t_w {
457
            if let Some(stream_id) = stream_id {
458
                circhop.about_to_send(stream_id, msg.msg())?;
459
            }
460
        }
461

            
462
        // Save the RelayCmd of the message before it gets consumed below.
463
        // We need this to tell our ConfluxMsgHandler about the cell we've just sent,
464
        // so that it can update its counters.
465
        let relay_cmd = msg.cmd();
466

            
467
        // NOTE(eta): Now that we've encrypted the cell, we *must* either send it or abort
468
        //            the whole circuit (e.g. by returning an error).
469
        let (msg, tag) = Self::encode_relay_cell(
470
            &mut self.crypto_out,
471
            circhop.relay_cell_format(),
472
            hop,
473
            early,
474
            msg,
475
        )?;
476
        // The cell counted for congestion control, inform our algorithm of such and pass down the
477
        // tag for authenticated SENDMEs.
478
        if c_t_w {
479
            circhop.ccontrol().note_data_sent(&runtime, &tag)?;
480
        }
481

            
482
        // Remember that we've enqueued this cell.
483
4594
        let padding_info = padding_info.or_else(|| self.padding_ctrl.queued_data(hop));
484

            
485
        self.send_msg(msg, padding_info).await?;
486

            
487
        #[cfg(feature = "conflux")]
488
        if let Some(conflux) = self.conflux_handler.as_mut() {
489
            conflux.note_cell_sent(relay_cmd);
490
        }
491

            
492
        Ok(())
493
4594
    }
494

            
495
    /// Helper: process a cell on a channel.  Most cells get ignored
496
    /// or rejected; a few get delivered to circuits.
497
    ///
498
    /// Return `CellStatus::CleanShutdown` if we should exit.
499
    ///
500
    // TODO: returning `Vec<CircuitCmd>` means we're unnecessarily
501
    // allocating a `Vec` here. Generally, the number of commands is going to be small
502
    // (usually 1, but > 1 when we start supporting packed cells).
503
    //
504
    // We should consider using smallvec instead. It might also be a good idea to have a
505
    // separate higher-level type splitting this out into Single(CircuitCmd),
506
    // and Multiple(SmallVec<[CircuitCmd; <capacity>]>).
507
512
    pub(super) fn handle_cell(
508
512
        &mut self,
509
512
        handlers: &mut CellHandlers,
510
512
        leg: UniqId,
511
512
        cell: ClientCircChanMsg,
512
512
    ) -> Result<Vec<CircuitCmd>> {
513
512
        trace!(circ_id = %self.unique_id, cell = ?cell, "handling cell");
514
        use ClientCircChanMsg::*;
515
512
        match cell {
516
500
            Relay(r) => self.handle_relay_cell(handlers, leg, r),
517
12
            Destroy(d) => {
518
12
                let reason = d.reason();
519
12
                debug!(
520
                    circ_id = %self.unique_id,
521
                    "Received DESTROY cell. Reason: {} [{}]",
522
                    reason.human_str(),
523
                    reason
524
                );
525

            
526
18
                self.handle_destroy_cell().map(|c| vec![c])
527
            }
528
        }
529
512
    }
530

            
531
    /// Decode `cell`, returning its corresponding hop number, tag,
532
    /// and decoded body.
533
500
    fn decode_relay_cell(
534
500
        &mut self,
535
500
        cell: Relay,
536
500
    ) -> Result<(HopNum, SendmeTag, RelayCellDecoderResult)> {
537
        // This is always RELAY, not RELAY_EARLY, so long as this code is client-only.
538
500
        let cmd = cell.cmd();
539
500
        let mut body = cell.into_relay_body().into();
540

            
541
        // Decrypt the cell. If it's recognized, then find the
542
        // corresponding hop.
543
500
        let (hopnum, tag) = self.crypto_in.decrypt(cmd, &mut body)?;
544

            
545
        // Decode the cell.
546
500
        let decode_res = self
547
500
            .hop_mut(hopnum)
548
500
            .ok_or_else(|| {
549
                Error::from(internal!(
550
                    "Trying to decode cell from nonexistent hop {:?}",
551
                    hopnum
552
                ))
553
            })?
554
500
            .decode(body.into())?;
555

            
556
500
        Ok((hopnum, tag, decode_res))
557
500
    }
558

            
559
    /// React to a Relay or RelayEarly cell.
560
500
    fn handle_relay_cell(
561
500
        &mut self,
562
500
        handlers: &mut CellHandlers,
563
500
        leg: UniqId,
564
500
        cell: Relay,
565
500
    ) -> Result<Vec<CircuitCmd>> {
566
500
        let (hopnum, tag, decode_res) = self.decode_relay_cell(cell)?;
567

            
568
500
        if decode_res.is_padding() {
569
            self.padding_ctrl.decrypted_padding(hopnum)?;
570
500
        } else {
571
500
            self.padding_ctrl.decrypted_data(hopnum);
572
500
        }
573

            
574
        // Check whether we are allowed to receive more data for this circuit hop.
575
500
        self.hop_mut(hopnum)
576
500
            .ok_or_else(|| internal!("nonexistent hop {:?}", hopnum))?
577
500
            .decrement_inbound_cell_limit()?;
578

            
579
500
        let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
580

            
581
        // Decrement the circuit sendme windows, and see if we need to
582
        // send a sendme cell.
583
500
        let send_circ_sendme = if c_t_w {
584
96
            self.hop_mut(hopnum)
585
96
                .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?
586
96
                .ccontrol()
587
96
                .note_data_received()?
588
        } else {
589
404
            false
590
        };
591

            
592
500
        let mut circ_cmds = vec![];
593
        // If we do need to send a circuit-level SENDME cell, do so.
594
500
        if send_circ_sendme {
595
            // This always sends a V1 (tagged) sendme cell, and thereby assumes
596
            // that SendmeEmitMinVersion is no more than 1.  If the authorities
597
            // every increase that parameter to a higher number, this will
598
            // become incorrect.  (Higher numbers are not currently defined.)
599
            let sendme = Sendme::from(tag);
600
            let cell = AnyRelayMsgOuter::new(None, sendme.into());
601
            circ_cmds.push(CircuitCmd::Send(SendRelayCell {
602
                hop: Some(hopnum),
603
                early: false,
604
                cell,
605
            }));
606

            
607
            // Inform congestion control of the SENDME we are sending. This is a circuit level one.
608
            self.hop_mut(hopnum)
609
                .ok_or_else(|| {
610
                    Error::from(internal!(
611
                        "Trying to send SENDME to nonexistent hop {:?}",
612
                        hopnum
613
                    ))
614
                })?
615
                .ccontrol()
616
                .note_sendme_sent()?;
617
500
        }
618

            
619
500
        let (mut msgs, incomplete) = decode_res.into_parts();
620
932
        while let Some(msg) = msgs.next() {
621
500
            let msg_status = self.handle_relay_msg(handlers, hopnum, leg, c_t_w, msg)?;
622

            
623
152
            match msg_status {
624
280
                None => continue,
625
                Some(msg @ CircuitCmd::CleanShutdown) => {
626
                    for m in msgs {
627
                        debug!(
628
                            "{id}: Ignoring relay msg received after triggering shutdown: {m:?}",
629
                            id = self.unique_id
630
                        );
631
                    }
632
                    if let Some(incomplete) = incomplete {
633
                        debug!(
634
                            "{id}: Ignoring partial relay msg received after triggering shutdown: {:?}",
635
                            incomplete,
636
                            id = self.unique_id,
637
                        );
638
                    }
639
                    circ_cmds.push(msg);
640
                    return Ok(circ_cmds);
641
                }
642
152
                Some(msg) => {
643
152
                    circ_cmds.push(msg);
644
152
                }
645
            }
646
        }
647

            
648
432
        Ok(circ_cmds)
649
500
    }
650

            
651
    /// Handle a single incoming relay message.
652
500
    fn handle_relay_msg(
653
500
        &mut self,
654
500
        handlers: &mut CellHandlers,
655
500
        hopnum: HopNum,
656
500
        leg: UniqId,
657
500
        cell_counts_toward_windows: bool,
658
500
        msg: UnparsedRelayMsg,
659
500
    ) -> Result<Option<CircuitCmd>> {
660
        // If this msg wants/refuses to have a Stream ID, does it
661
        // have/not have one?
662
500
        let streamid = msg_streamid(&msg)?;
663

            
664
        // If this doesn't have a StreamId, it's a meta cell,
665
        // not meant for a particular stream.
666
500
        let Some(streamid) = streamid else {
667
232
            return self.handle_meta_cell(handlers, hopnum, msg);
668
        };
669

            
670
        #[cfg(feature = "conflux")]
671
268
        let msg = if let Some(conflux) = self.conflux_handler.as_mut() {
672
76
            match conflux.action_for_msg(hopnum, cell_counts_toward_windows, streamid, msg)? {
673
60
                ConfluxAction::Deliver(msg) => {
674
                    // The message either doesn't count towards the sequence numbers
675
                    // or is already well-ordered, so we're ready to handle it.
676

            
677
                    // It's possible that some of our buffered messages are now ready to be
678
                    // handled. We don't check that here, however, because that's handled
679
                    // by the reactor main loop.
680
60
                    msg
681
                }
682
16
                ConfluxAction::Enqueue(msg) => {
683
                    // Tell the reactor to enqueue this msg
684
16
                    return Ok(Some(CircuitCmd::Enqueue(msg)));
685
                }
686
            }
687
        } else {
688
            // If we don't have a conflux_handler, it means this circuit is not part of
689
            // a conflux tunnel, so we can just process the message.
690
192
            msg
691
        };
692

            
693
252
        self.handle_in_order_relay_msg(
694
252
            handlers,
695
252
            hopnum,
696
252
            leg,
697
252
            cell_counts_toward_windows,
698
252
            streamid,
699
252
            msg,
700
        )
701
500
    }
702

            
703
    /// Handle a single incoming relay message that is known to be in order.
704
268
    pub(super) fn handle_in_order_relay_msg(
705
268
        &mut self,
706
268
        handlers: &mut CellHandlers,
707
268
        hopnum: HopNum,
708
268
        leg: UniqId,
709
268
        cell_counts_toward_windows: bool,
710
268
        streamid: StreamId,
711
268
        msg: UnparsedRelayMsg,
712
268
    ) -> Result<Option<CircuitCmd>> {
713
268
        let now = self.runtime.now();
714

            
715
        #[cfg(feature = "conflux")]
716
268
        if let Some(conflux) = self.conflux_handler.as_mut() {
717
76
            conflux.inc_last_seq_delivered(&msg);
718
192
        }
719

            
720
268
        let path = self.mutable.path();
721

            
722
268
        let nonexistent_hop_err = || Error::CircProto("Cell from nonexistent hop!".into());
723
268
        let hop = self.hop_mut(hopnum).ok_or_else(nonexistent_hop_err)?;
724

            
725
268
        let hop_detail = path
726
268
            .iter()
727
268
            .nth(usize::from(hopnum))
728
268
            .ok_or_else(nonexistent_hop_err)?;
729

            
730
        // Returns the original message if it's an incoming stream request
731
        // that we need to handle.
732
268
        let res = hop.handle_msg(hop_detail, cell_counts_toward_windows, streamid, msg, now)?;
733

            
734
        // If it was an incoming stream request, we don't need to worry about
735
        // sending an XOFF as there's no stream data within this message.
736
264
        if let Some(msg) = res {
737
            cfg_if::cfg_if! {
738
                if #[cfg(feature = "hs-service")] {
739
48
                    return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum, leg);
740
                } else {
741
                    return Err(internal!("incoming stream not rejected, but hs-service feature is disabled?!").into());
742
                }
743
            }
744
216
        }
745

            
746
        // We may want to send an XOFF if the incoming buffer is too large.
747
216
        if let Some(cell) = hop.maybe_send_xoff(streamid)? {
748
            let cell = AnyRelayMsgOuter::new(Some(streamid), cell.into());
749
            let cell = SendRelayCell {
750
                hop: Some(hopnum),
751
                early: false,
752
                cell,
753
            };
754
            return Ok(Some(CircuitCmd::Send(cell)));
755
216
        }
756

            
757
216
        Ok(None)
758
268
    }
759

            
760
    /// Handle a conflux message coming from the specified hop.
761
    ///
762
    /// Returns an error if
763
    ///
764
    ///   * this is not a conflux circuit (i.e. it doesn't have a [`ConfluxMsgHandler`])
765
    ///   * this is a client circuit and the conflux message originated an unexpected hop
766
    ///   * the cell was sent in violation of the handshake protocol
767
    #[cfg(feature = "conflux")]
768
128
    fn handle_conflux_msg(
769
128
        &mut self,
770
128
        hop: HopNum,
771
128
        msg: UnparsedRelayMsg,
772
128
    ) -> Result<Option<ConfluxCmd>> {
773
128
        let Some(conflux_handler) = self.conflux_handler.as_mut() else {
774
            // If conflux is not enabled, tear down the circuit
775
            // (see 4.2.1. Cell Injection Side Channel Mitigations in prop329)
776
16
            return Err(Error::CircProto(format!(
777
16
                "Received {} cell from hop {} on non-conflux client circuit?!",
778
16
                msg.cmd(),
779
16
                hop.display(),
780
16
            )));
781
        };
782

            
783
112
        Ok(conflux_handler.handle_conflux_msg(msg, hop))
784
128
    }
785

            
786
    /// For conflux: return the sequence number of the last cell sent on this leg.
787
    ///
788
    /// Returns an error if this circuit is not part of a conflux set.
789
    #[cfg(feature = "conflux")]
790
48
    pub(super) fn last_seq_sent(&self) -> Result<u64> {
791
48
        let handler = self
792
48
            .conflux_handler
793
48
            .as_ref()
794
48
            .ok_or_else(|| internal!("tried to get last_seq_sent of non-conflux circ"))?;
795

            
796
48
        Ok(handler.last_seq_sent())
797
48
    }
798

            
799
    /// For conflux: set the sequence number of the last cell sent on this leg.
800
    ///
801
    /// Returns an error if this circuit is not part of a conflux set.
802
    #[cfg(feature = "conflux")]
803
8
    pub(super) fn set_last_seq_sent(&mut self, n: u64) -> Result<()> {
804
8
        let handler = self
805
8
            .conflux_handler
806
8
            .as_mut()
807
8
            .ok_or_else(|| internal!("tried to get last_seq_sent of non-conflux circ"))?;
808

            
809
8
        handler.set_last_seq_sent(n);
810
8
        Ok(())
811
8
    }
812

            
813
    /// For conflux: return the sequence number of the last cell received on this leg.
814
    ///
815
    /// Returns an error if this circuit is not part of a conflux set.
816
    #[cfg(feature = "conflux")]
817
32
    pub(super) fn last_seq_recv(&self) -> Result<u64> {
818
32
        let handler = self
819
32
            .conflux_handler
820
32
            .as_ref()
821
32
            .ok_or_else(|| internal!("tried to get last_seq_recv of non-conflux circ"))?;
822

            
823
32
        Ok(handler.last_seq_recv())
824
32
    }
825

            
826
    /// A helper for handling incoming stream requests.
827
    ///
828
    // TODO: can we make this a method on CircHop to avoid the double HopNum lookup?
829
    #[cfg(feature = "hs-service")]
830
48
    fn handle_incoming_stream_request(
831
48
        &mut self,
832
48
        handlers: &mut CellHandlers,
833
48
        msg: UnparsedRelayMsg,
834
48
        stream_id: StreamId,
835
48
        hop_num: HopNum,
836
48
        leg: UniqId,
837
48
    ) -> Result<Option<CircuitCmd>> {
838
        use tor_cell::relaycell::msg::EndReason;
839
        use tor_error::into_internal;
840
        use tor_log_ratelim::log_ratelim;
841

            
842
        use crate::client::circuit::CIRCUIT_BUFFER_SIZE;
843
        use crate::stream::incoming::StreamReqInfo;
844

            
845
        // We need to construct this early so that we don't double-borrow &mut self
846

            
847
48
        let Some(handler) = handlers.incoming_stream_req_handler.as_mut() else {
848
            return Err(Error::CircProto(
849
                "Cannot handle BEGIN cells on this circuit".into(),
850
            ));
851
        };
852

            
853
        // The handler's hop_num is only ever set to None for relays.
854
48
        let expected_hop_num = handler
855
48
            .hop_num
856
48
            .ok_or_else(|| internal!("Handler HopNum is None in client impl?!"))?;
857

            
858
48
        if hop_num != expected_hop_num {
859
12
            return Err(Error::CircProto(format!(
860
12
                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
861
12
                expected_hop_num.display(),
862
12
                msg.cmd(),
863
12
                hop_num.display()
864
12
            )));
865
36
        }
866

            
867
36
        let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
868

            
869
        // TODO: we've already looked up the `hop` in handle_relay_cell, so we shouldn't
870
        // have to look it up again! However, we can't pass the `&mut hop` reference from
871
        // `handle_relay_cell` to this function, because that makes Rust angry (we'd be
872
        // borrowing self as mutable more than once).
873
        //
874
        // TODO: we _could_ use self.hops.get_mut(..) instead self.hop_mut(..) inside
875
        // handle_relay_cell to work around the problem described above
876
36
        let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
877

            
878
36
        if message_closes_stream {
879
            hop.ending_msg_received(stream_id)?;
880

            
881
            return Ok(None);
882
36
        }
883

            
884
36
        let begin = msg
885
36
            .decode::<Begin>()
886
36
            .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
887
36
            .into_msg();
888

            
889
36
        let req = IncomingStreamRequest::Begin(begin);
890

            
891
        {
892
            use crate::client::stream::IncomingStreamRequestDisposition::*;
893

            
894
36
            let ctx = crate::client::stream::IncomingStreamRequestContext { request: &req };
895
            // IMPORTANT: super::syncview::CircHopSyncView::n_open_streams() (called via disposition() below)
896
            // accesses the stream map mutexes!
897
            //
898
            // This means it's very important not to call this function while any of the hop's
899
            // stream map mutex is held.
900
36
            let view = CircHopSyncView::new(hop.outbound());
901

            
902
36
            match handler.filter.as_mut().disposition(&ctx, &view)? {
903
36
                Accept => {}
904
                CloseCircuit => return Ok(Some(CircuitCmd::CleanShutdown)),
905
                RejectRequest(end) => {
906
                    let end_msg = AnyRelayMsgOuter::new(Some(stream_id), end.into());
907
                    let cell = SendRelayCell {
908
                        hop: Some(hop_num),
909
                        early: false,
910
                        cell: end_msg,
911
                    };
912
                    return Ok(Some(CircuitCmd::Send(cell)));
913
                }
914
            }
915
        }
916

            
917
        // TODO: Sadly, we need to look up `&mut hop` yet again,
918
        // since we needed to pass `&self.hops` by reference to our filter above. :(
919
36
        let hop = self.hops.get_mut(hop_num).ok_or(Error::CircuitClosed)?;
920
36
        let relay_cell_format = hop.relay_cell_format();
921

            
922
36
        let memquota = StreamAccount::new(&self.memquota)?;
923

            
924
36
        let (sender, receiver) = stream_queue(
925
            #[cfg(not(feature = "flowctl-cc"))]
926
            STREAM_READER_BUFFER,
927
36
            &memquota,
928
36
            self.chan_sender.time_provider(),
929
        )?;
930

            
931
36
        let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(
932
36
            self.chan_sender.time_provider().clone(),
933
36
            memquota.as_raw_account(),
934
        )?;
935

            
936
36
        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
937

            
938
        // A channel for the reactor to request a new drain rate from the reader.
939
        // Typically this notification will be sent after an XOFF is sent so that the reader can
940
        // send us a new drain rate when the stream data queue becomes empty.
941
36
        let mut drain_rate_request_tx = NotifySender::new_typed();
942
36
        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
943

            
944
36
        let cmd_checker = InboundDataCmdChecker::new_connected();
945
36
        hop.add_ent_with_id(
946
36
            sender,
947
36
            msg_rx,
948
36
            rate_limit_tx,
949
36
            drain_rate_request_tx,
950
36
            stream_id,
951
36
            cmd_checker,
952
        )?;
953

            
954
36
        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
955
36
            req,
956
36
            stream_id,
957
36
            hop: Some((leg, hop_num).into()),
958
36
            msg_tx,
959
36
            receiver,
960
36
            rate_limit_stream: rate_limit_rx,
961
36
            drain_rate_request_stream: drain_rate_request_rx,
962
36
            memquota,
963
36
            relay_cell_format,
964
36
        });
965

            
966
36
        log_ratelim!("Delivering message to incoming stream handler"; outcome);
967

            
968
36
        if let Err(e) = outcome {
969
            if e.is_full() {
970
                // The IncomingStreamRequestHandler's stream is full; it isn't
971
                // handling requests fast enough. So instead, we reply with an
972
                // END cell.
973
                let end_msg = AnyRelayMsgOuter::new(
974
                    Some(stream_id),
975
                    End::new_with_reason(EndReason::RESOURCELIMIT).into(),
976
                );
977

            
978
                let cell = SendRelayCell {
979
                    hop: Some(hop_num),
980
                    early: false,
981
                    cell: end_msg,
982
                };
983
                return Ok(Some(CircuitCmd::Send(cell)));
984
            } else if e.is_disconnected() {
985
                // The IncomingStreamRequestHandler's stream has been dropped.
986
                // In the Tor protocol as it stands, this always means that the
987
                // circuit itself is out-of-use and should be closed. (See notes
988
                // on `allow_stream_requests.`)
989
                //
990
                // Note that we will _not_ reach this point immediately after
991
                // the IncomingStreamRequestHandler is dropped; we won't hit it
992
                // until we next get an incoming request.  Thus, if we do later
993
                // want to add early detection for a dropped
994
                // IncomingStreamRequestHandler, we need to do it elsewhere, in
995
                // a different way.
996
                debug!(
997
                    circ_id = %self.unique_id,
998
                    "Incoming stream request receiver dropped",
999
                );
                // This will _cause_ the circuit to get closed.
                return Err(Error::CircuitClosed);
            } else {
                // There are no errors like this with the current design of
                // futures::mpsc, but we shouldn't just ignore the possibility
                // that they'll be added later.
                return Err(Error::from((into_internal!(
                    "try_send failed unexpectedly"
                ))(e)));
            }
36
        }
36
        Ok(None)
48
    }
    /// Helper: process a destroy cell.
    #[allow(clippy::unnecessary_wraps)]
12
    fn handle_destroy_cell(&mut self) -> Result<CircuitCmd> {
        // I think there is nothing more to do here.
12
        Ok(CircuitCmd::CleanShutdown)
12
    }
    /// Handle a [`CtrlMsg::Create`](super::CtrlMsg::Create) message.
48
    pub(super) async fn handle_create(
48
        &mut self,
48
        recv_created: oneshot::Receiver<CreateResponse>,
48
        handshake: CircuitHandshake,
48
        settings: HopSettings,
48
        done: ReactorResultChannel<()>,
72
    ) -> StdResult<(), ReactorError> {
48
        let ret = match handshake {
12
            CircuitHandshake::CreateFast => self.create_firsthop_fast(recv_created, settings).await,
            CircuitHandshake::Ntor {
12
                public_key,
12
                ed_identity,
            } => {
12
                self.create_firsthop_ntor(recv_created, ed_identity, public_key, settings)
12
                    .await
            }
24
            CircuitHandshake::NtorV3 { public_key } => {
24
                self.create_firsthop_ntor_v3(recv_created, public_key, settings)
24
                    .await
            }
        };
48
        let _ = done.send(ret); // don't care if sender goes away
        // TODO: maybe we don't need to flush here?
        // (we could let run_once() handle all the flushing)
48
        self.chan_sender.flush().await?;
48
        Ok(())
48
    }
    /// Helper: create the first hop of a circuit.
    ///
    /// This is parameterized not just on the RNG, but a wrapper object to
    /// build the right kind of create cell, and a handshake object to perform
    /// the cryptographic handshake.
48
    async fn create_impl<H, W, M>(
48
        &mut self,
48
        recvcreated: oneshot::Receiver<CreateResponse>,
48
        wrap: &W,
48
        key: &H::KeyType,
48
        mut settings: HopSettings,
48
        msg: &M,
48
    ) -> Result<()>
48
    where
48
        H: ClientHandshake + HandshakeAuxDataHandler,
48
        W: CreateHandshakeWrap,
48
        H::KeyGen: KeyGenerator,
48
        M: Borrow<H::ClientAuxData>,
48
    {
        // We don't need to shut down the circuit on failure here, since this
        // function consumes the PendingClientCirc and only returns
        // a ClientCirc on success.
48
        let (state, msg) = {
            // done like this because holding the RNG across an await boundary makes the future
            // non-Send
48
            let mut rng = rand::rng();
48
            H::client1(&mut rng, key, msg)?
        };
48
        let create_cell = wrap.to_chanmsg(msg);
48
        trace!(
            circ_id = %self.unique_id,
            create = %create_cell.cmd(),
            "Extending to hop 1",
        );
48
        self.send_msg(create_cell, None).await?;
48
        let reply = recvcreated
48
            .await
48
            .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
48
        let relay_handshake = wrap.decode_chanmsg(reply)?;
48
        let (server_msg, keygen) = H::client2(state, relay_handshake)?;
48
        H::handle_server_aux_data(&mut settings, &server_msg)?;
48
        let BoxedClientLayer { fwd, back, binding } = settings
48
            .relay_crypt_protocol()
48
            .construct_client_layers(HandshakeRole::Initiator, keygen)?;
48
        trace!(circ_id = %self.unique_id, "Handshake complete; circuit created.");
48
        let peer_id = self.channel.target().clone();
48
        self.add_hop(
48
            path::HopDetail::Relay(peer_id),
48
            fwd,
48
            back,
48
            binding,
48
            &settings,
        )?;
48
        Ok(())
48
    }
    /// Use the (questionable!) CREATE_FAST handshake to connect to the
    /// first hop of this circuit.
    ///
    /// There's no authentication in CREATE_FAST,
    /// so we don't need to know whom we're connecting to: we're just
    /// connecting to whichever relay the channel is for.
12
    async fn create_firsthop_fast(
12
        &mut self,
12
        recvcreated: oneshot::Receiver<CreateResponse>,
12
        settings: HopSettings,
18
    ) -> Result<()> {
        // In a CREATE_FAST handshake, we can't negotiate a format other than this.
12
        let wrap = CreateFastWrap;
12
        self.create_impl::<CreateFastClient, _, _>(recvcreated, &wrap, &(), settings, &())
12
            .await
12
    }
    /// Use the ntor handshake to connect to the first hop of this circuit.
    ///
    /// Note that the provided keys must match the channel's target,
    /// or the handshake will fail.
12
    async fn create_firsthop_ntor(
12
        &mut self,
12
        recvcreated: oneshot::Receiver<CreateResponse>,
12
        ed_identity: pk::ed25519::Ed25519Identity,
12
        pubkey: NtorPublicKey,
12
        settings: HopSettings,
18
    ) -> Result<()> {
        // Exit now if we have an Ed25519 or RSA identity mismatch.
12
        let target = RelayIds::builder()
12
            .ed_identity(ed_identity)
12
            .rsa_identity(pubkey.id)
12
            .build()
12
            .expect("Unable to build RelayIds");
12
        self.channel.check_match(&target)?;
12
        let wrap = Create2Wrap {
12
            handshake_type: HandshakeType::NTOR,
12
        };
12
        self.create_impl::<NtorClient, _, _>(recvcreated, &wrap, &pubkey, settings, &())
12
            .await
12
    }
    /// Use the ntor-v3 handshake to connect to the first hop of this circuit.
    ///
    /// Note that the provided key must match the channel's target,
    /// or the handshake will fail.
24
    async fn create_firsthop_ntor_v3(
24
        &mut self,
24
        recvcreated: oneshot::Receiver<CreateResponse>,
24
        pubkey: NtorV3PublicKey,
24
        settings: HopSettings,
36
    ) -> Result<()> {
        // Exit now if we have a mismatched key.
24
        let target = RelayIds::builder()
24
            .ed_identity(pubkey.id)
24
            .build()
24
            .expect("Unable to build RelayIds");
24
        self.channel.check_match(&target)?;
        // Set the client extensions.
24
        let client_extensions = settings.circuit_request_extensions()?;
24
        let wrap = Create2Wrap {
24
            handshake_type: HandshakeType::NTOR_V3,
24
        };
24
        self.create_impl::<NtorV3Client, _, _>(
24
            recvcreated,
24
            &wrap,
24
            &pubkey,
24
            settings,
24
            &client_extensions,
24
        )
24
        .await
24
    }
    /// Add a hop to the end of this circuit.
    ///
    /// Will return an error if the circuit already has [`u8::MAX`] hops.
1020
    pub(super) fn add_hop(
1020
        &mut self,
1020
        peer_id: path::HopDetail,
1020
        fwd: Box<dyn OutboundClientLayer + 'static + Send>,
1020
        rev: Box<dyn InboundClientLayer + 'static + Send>,
1020
        binding: Option<CircuitBinding>,
1020
        settings: &HopSettings,
1020
    ) -> StdResult<(), Bug> {
1020
        let hop_num = self.hops.len();
1020
        debug_assert_eq!(hop_num, usize::from(self.num_hops()));
        // There are several places in the code that assume that a `usize` hop number
        // can be cast or converted to a `u8` hop number,
        // so this check is important to prevent panics or incorrect behaviour.
1020
        if hop_num == usize::from(u8::MAX) {
            return Err(internal!(
                "cannot add more hops to a circuit with `u8::MAX` hops"
            ));
1020
        }
1020
        let hop_num = (hop_num as u8).into();
1020
        let hop = CircHop::new(self.unique_id, hop_num, settings);
1020
        self.hops.push(hop);
1020
        self.crypto_in.add_layer(rev);
1020
        self.crypto_out.add_layer(fwd);
1020
        self.mutable.add_hop(peer_id, binding);
1020
        Ok(())
1020
    }
    /// Handle a RELAY cell on this circuit with stream ID 0.
    ///
    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
    /// This function returns a `CircProto` error if `msg` is an unsupported,
    /// unexpected, or otherwise invalid message:
    ///
    ///   * unexpected messages are rejected by returning an error using
    ///     [`unsupported_client_cell`]
    ///   * SENDME/TRUNCATED messages are rejected if they don't parse
    ///   * SENDME authentication tags are validated inside [`Circuit::handle_sendme`]
    ///   * conflux cells are handled in the client [`ConfluxMsgHandler`]
    ///
    /// The error is propagated all the way up to [`Circuit::handle_cell`],
    /// and eventually ends up being returned from the reactor's `run_once` function,
    /// causing it to shut down.
    #[allow(clippy::cognitive_complexity)]
232
    fn handle_meta_cell(
232
        &mut self,
232
        handlers: &mut CellHandlers,
232
        hopnum: HopNum,
232
        msg: UnparsedRelayMsg,
232
    ) -> Result<Option<CircuitCmd>> {
        // SENDME cells and TRUNCATED get handled internally by the circuit.
        // TODO: This pattern (Check command, try to decode, map error) occurs
        // several times, and would be good to extract simplify. Such
        // simplification is obstructed by a couple of factors: First, that
        // there is not currently a good way to get the RelayCmd from _type_ of
        // a RelayMsg.  Second, that decode() [correctly] consumes the
        // UnparsedRelayMsg.  I tried a macro-based approach, and didn't care
        // for it. -nickm
232
        if msg.cmd() == RelayCmd::SENDME {
44
            let sendme = msg
44
                .decode::<Sendme>()
44
                .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
44
                .into_msg();
44
            return Ok(Some(CircuitCmd::HandleSendMe {
44
                hop: hopnum,
44
                sendme,
44
            }));
188
        }
188
        if msg.cmd() == RelayCmd::TRUNCATED {
            let truncated = msg
                .decode::<Truncated>()
                .map_err(|e| Error::from_bytes_err(e, "truncated message"))?
                .into_msg();
            let reason = truncated.reason();
            debug!(
                circ_id = %self.unique_id,
                "Truncated from hop {}. Reason: {} [{}]",
                hopnum.display(),
                reason.human_str(),
                reason
            );
            return Ok(Some(CircuitCmd::CleanShutdown));
188
        }
188
        if msg.cmd() == RelayCmd::DROP {
            cfg_if::cfg_if! {
                if #[cfg(feature = "circ-padding")] {
                    return Ok(None);
                } else {
                    use crate::util::err::ExcessPadding;
                    return Err(Error::ExcessPadding(ExcessPadding::NoPaddingNegotiated, hopnum));
                }
            }
188
        }
188
        trace!(circ_id = %self.unique_id, cell = ?msg, "Received meta-cell");
        #[cfg(feature = "conflux")]
60
        if matches!(
188
            msg.cmd(),
            RelayCmd::CONFLUX_LINK
                | RelayCmd::CONFLUX_LINKED
                | RelayCmd::CONFLUX_LINKED_ACK
                | RelayCmd::CONFLUX_SWITCH
        ) {
128
            let cmd = self.handle_conflux_msg(hopnum, msg)?;
112
            return Ok(cmd.map(CircuitCmd::from));
60
        }
60
        if self.is_conflux_pending() {
            warn!(
                circ_id = %self.unique_id,
                "received unexpected cell {msg:?} on unlinked conflux circuit",
            );
            return Err(Error::CircProto(
                "Received unexpected cell on unlinked circuit".into(),
            ));
60
        }
        // For all other command types, we'll only get them in response
        // to another command, which should have registered a responder.
        //
        // TODO: should the conflux state machine be a meta cell handler?
        // We'd need to add support for multiple meta handlers, and change the
        // MetaCellHandler API to support returning Option<RunOnceCmdInner>
        // (because some cells will require sending a response)
60
        if let Some(mut handler) = handlers.meta_handler.take() {
            // The handler has a TargetHop so we do a quick convert for equality check.
60
            if handler.expected_hop() == (self.unique_id(), hopnum).into() {
                // Somebody was waiting for a message -- maybe this message
48
                let ret = handler.handle_msg(msg, self);
48
                trace!(
                    circ_id = %self.unique_id,
                    result = ?ret,
                    "meta handler completed",
                );
24
                match ret {
                    #[cfg(feature = "send-control-msg")]
                    Ok(MetaCellDisposition::Consumed) => {
                        handlers.meta_handler = Some(handler);
                        Ok(None)
                    }
24
                    Ok(MetaCellDisposition::ConversationFinished) => Ok(None),
                    #[cfg(feature = "send-control-msg")]
                    Ok(MetaCellDisposition::CloseCirc) => Ok(Some(CircuitCmd::CleanShutdown)),
24
                    Err(e) => Err(e),
                }
            } else {
                // Somebody wanted a message from a different hop!  Put this
                // one back.
12
                handlers.meta_handler = Some(handler);
12
                unsupported_client_cell!(msg, hopnum)
            }
        } else {
            // No need to call shutdown here, since this error will
            // propagate to the reactor shut it down.
            unsupported_client_cell!(msg)
        }
232
    }
    /// Handle a RELAY_SENDME cell on this circuit with stream ID 0.
    #[instrument(level = "trace", skip_all)]
44
    pub(super) fn handle_sendme(
44
        &mut self,
44
        hopnum: HopNum,
44
        msg: Sendme,
44
        signals: CongestionSignals,
44
    ) -> Result<Option<CircuitCmd>> {
        // Cloned, because we borrow mutably from self when we get the circhop.
44
        let runtime = self.runtime.clone();
        // No need to call "shutdown" on errors in this function;
        // it's called from the reactor task and errors will propagate there.
44
        let hop = self
44
            .hop_mut(hopnum)
44
            .ok_or_else(|| Error::CircProto(format!("Couldn't find hop {}", hopnum.display())))?;
44
        let tag = msg.into_sendme_tag().ok_or_else(||
                // Versions of Tor <=0.3.5 would omit a SENDME tag in this case;
                // but we don't support those any longer.
                 Error::CircProto("missing tag on circuit sendme".into()))?;
        // Update the CC object that we received a SENDME along with possible congestion signals.
44
        hop.ccontrol()
44
            .note_sendme_received(&runtime, tag, signals)?;
40
        Ok(None)
44
    }
    /// Send a message onto the circuit's channel.
    ///
    /// If the channel is ready to accept messages, it will be sent immediately. If not, the message
    /// will be enqueued for sending at a later iteration of the reactor loop.
    ///
    /// `info` is the status returned from the padding controller when we told it we were queueing
    /// this data.  It should be provided whenever possible.
    ///
    /// # Note
    ///
    /// Making use of the enqueuing capabilities of this function is discouraged! You should first
    /// check whether the channel is ready to receive messages (`self.channel.poll_ready`), and
    /// ideally use this to implement backpressure (such that you do not read from other sources
    /// that would send here while you know you're unable to forward the messages on).
    #[instrument(level = "trace", skip_all)]
4642
    async fn send_msg(
4642
        &mut self,
4642
        msg: AnyChanMsg,
4642
        info: Option<QueuedCellPaddingInfo>,
6963
    ) -> Result<()> {
        let cell = AnyChanCell::new(Some(self.channel_id), msg);
        // Note: this future is always `Ready`, so await won't block.
        Pin::new(&mut self.chan_sender)
            .send_unbounded((cell, info))
            .await?;
        Ok(())
4642
    }
    /// Remove all halfstreams that are expired at `now`.
7046
    pub(super) fn remove_expired_halfstreams(&mut self, now: Instant) {
7046
        self.hops.remove_expired_halfstreams(now);
7046
    }
    /// Return a reference to the hop corresponding to `hopnum`, if there is one.
4112
    pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
4112
        self.hops.hop(hopnum)
4112
    }
    /// Return a mutable reference to the hop corresponding to `hopnum`, if there is one.
1794
    pub(super) fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
1794
        self.hops.get_mut(hopnum)
1794
    }
    /// Begin a stream with the provided hop in this circuit.
    // TODO: see if there's a way that we can clean this up
    #[allow(clippy::too_many_arguments)]
96
    pub(super) fn begin_stream(
96
        &mut self,
96
        hop_num: HopNum,
96
        message: AnyRelayMsg,
96
        sender: StreamQueueSender,
96
        rx: StreamMpscReceiver<AnyRelayMsg>,
96
        rate_limit_notifier: watch::Sender<StreamRateLimit>,
96
        drain_rate_requester: NotifySender<DrainRateRequest>,
96
        cmd_checker: AnyCmdChecker,
96
    ) -> StdResult<Result<(SendRelayCell, StreamId)>, Bug> {
96
        let Some(hop) = self.hop_mut(hop_num) else {
            return Err(internal!(
                "{}: Attempting to send a BEGIN cell to an unknown hop {hop_num:?}",
                self.unique_id,
            ));
        };
96
        Ok(hop.begin_stream(
96
            message,
96
            sender,
96
            rx,
96
            rate_limit_notifier,
96
            drain_rate_requester,
96
            cmd_checker,
96
        ))
96
    }
    /// Close the specified stream
    #[instrument(level = "trace", skip_all)]
70
    pub(super) async fn close_stream(
70
        &mut self,
70
        hop_num: HopNum,
70
        sid: StreamId,
70
        behav: CloseStreamBehavior,
70
        reason: streammap::TerminateReason,
70
        expiry: Instant,
105
    ) -> Result<()> {
        if let Some(hop) = self.hop_mut(hop_num) {
            let res = hop.close_stream(sid, behav, reason, expiry)?;
            if let Some(cell) = res {
                self.send_relay_cell(cell).await?;
            }
        }
        Ok(())
70
    }
    /// Returns true if there are any streams on this circuit
    ///
    /// Important: this function locks the stream map of its each of the [`CircHop`]s
    /// in this circuit, so it must **not** be called from any function where the
    /// stream map lock is held.
52
    pub(super) fn has_streams(&self) -> bool {
52
        self.hops.has_streams()
52
    }
    /// The number of hops in this circuit.
1272
    pub(super) fn num_hops(&self) -> u8 {
        // `Circuit::add_hop` checks to make sure that we never have more than `u8::MAX` hops,
        // so `self.hops.len()` should be safe to cast to a `u8`.
        // If that assumption is violated,
        // we choose to panic rather than silently use the wrong hop due to an `as` cast.
1272
        self.hops
1272
            .len()
1272
            .try_into()
1272
            .expect("`hops.len()` has more than `u8::MAX` hops")
1272
    }
    /// Check whether this circuit has any hops.
4886
    pub(super) fn has_hops(&self) -> bool {
4886
        !self.hops.is_empty()
4886
    }
    /// Get the `HopNum` of the last hop, if this circuit is non-empty.
    ///
    /// Returns `None` if the circuit has no hops.
252
    pub(super) fn last_hop_num(&self) -> Option<HopNum> {
252
        let num_hops = self.num_hops();
252
        if num_hops == 0 {
            // asked for the last hop, but there are no hops
            return None;
252
        }
252
        Some(HopNum::from(num_hops - 1))
252
    }
    /// Get the path of the circuit.
    ///
    /// **Warning:** Do not call while already holding the [`Self::mutable`] lock.
120
    pub(super) fn path(&self) -> Arc<path::Path> {
120
        self.mutable.path()
120
    }
    /// Return a ClockSkew declaring how much clock skew the other side of this channel
    /// claimed that we had when we negotiated the connection.
    pub(super) fn clock_skew(&self) -> ClockSkew {
        self.channel.clock_skew()
    }
    /// Does congestion control use stream SENDMEs for the given `hop`?
    ///
    /// Returns `None` if `hop` doesn't exist.
    pub(super) fn uses_stream_sendme(&self, hop: HopNum) -> Option<bool> {
        let hop = self.hop(hop)?;
        Some(hop.ccontrol().uses_stream_sendme())
    }
    /// Returns whether this is a conflux circuit that is not linked yet.
4566
    pub(super) fn is_conflux_pending(&self) -> bool {
4566
        let Some(status) = self.conflux_status() else {
3266
            return false;
        };
1300
        status != ConfluxStatus::Linked
4566
    }
    /// Returns the conflux status of this circuit.
    ///
    /// Returns `None` if this is not a conflux circuit.
4902
    pub(super) fn conflux_status(&self) -> Option<ConfluxStatus> {
        cfg_if::cfg_if! {
            if #[cfg(feature = "conflux")] {
4902
                self.conflux_handler
4902
                    .as_ref()
5612
                    .map(|handler| handler.status())
            } else {
                None
            }
        }
4902
    }
    /// Returns initial RTT on this leg, measured in the conflux handshake.
    #[cfg(feature = "conflux")]
1512
    pub(super) fn init_rtt(&self) -> Option<Duration> {
1512
        self.conflux_handler
1512
            .as_ref()
2268
            .map(|handler| handler.init_rtt())?
1512
    }
    /// Start or stop padding at the given hop.
    ///
    /// Replaces any previous padder at that hop.
    ///
    /// Return an error if that hop doesn't exist.
    #[cfg(feature = "circ-padding-manual")]
    pub(super) fn set_padding_at_hop(
        &self,
        hop: HopNum,
        padder: Option<padding::CircuitPadder>,
    ) -> Result<()> {
        if self.hop(hop).is_none() {
            return Err(Error::NoSuchHop);
        }
        self.padding_ctrl.install_padder_padding_at_hop(hop, padder);
        Ok(())
    }
    /// Determine how exactly to handle a request to handle padding.
    ///
    /// This is fairly complicated; see the maybenot documentation for more information.
    ///
    /// ## Limitations
    ///
    /// In our current padding implementation, a circuit is either blocked or not blocked:
    /// we do not keep track of which hop is actually doing the blocking.
    #[cfg(feature = "circ-padding")]
    fn padding_disposition(&self, send_padding: &padding::SendPadding) -> CircPaddingDisposition {
        crate::circuit::padding::padding_disposition(
            send_padding,
            &self.chan_sender,
            self.padding_block.as_ref(),
        )
    }
    /// Handle a request from our padding subsystem to send a padding packet.
    #[cfg(feature = "circ-padding")]
    pub(super) async fn send_padding(&mut self, send_padding: padding::SendPadding) -> Result<()> {
        use CircPaddingDisposition::*;
        let target_hop = send_padding.hop;
        match self.padding_disposition(&send_padding) {
            QueuePaddingNormally => {
                let queue_info = self.padding_ctrl.queued_padding(target_hop, send_padding);
                self.queue_padding_cell_for_hop(target_hop, queue_info)
                    .await?;
            }
            QueuePaddingAndBypass => {
                let queue_info = self.padding_ctrl.queued_padding(target_hop, send_padding);
                self.queue_padding_cell_for_hop(target_hop, queue_info)
                    .await?;
            }
            TreatQueuedCellAsPadding => {
                self.padding_ctrl
                    .replaceable_padding_already_queued(target_hop, send_padding);
            }
        }
        Ok(())
    }
    /// Generate and encrypt a padding cell, and send it to a targeted hop.
    ///
    /// Ignores any padding-based blocking.
    #[cfg(feature = "circ-padding")]
    async fn queue_padding_cell_for_hop(
        &mut self,
        target_hop: HopNum,
        queue_info: Option<QueuedCellPaddingInfo>,
    ) -> Result<()> {
        use tor_cell::relaycell::msg::Drop as DropMsg;
        let msg = SendRelayCell {
            hop: Some(target_hop),
            // TODO circpad: we will probably want padding machines that can send EARLY cells.
            early: false,
            cell: AnyRelayMsgOuter::new(None, DropMsg::default().into()),
        };
        self.send_relay_cell_inner(msg, queue_info).await
    }
    /// Enable padding-based blocking,
    /// or change the rule for padding-based blocking to the one in `block`.
    #[cfg(feature = "circ-padding")]
    pub(super) fn start_blocking_for_padding(&mut self, block: padding::StartBlocking) {
        self.chan_sender.start_blocking();
        self.padding_block = Some(block);
    }
    /// Disable padding-based blocking.
    #[cfg(feature = "circ-padding")]
    pub(super) fn stop_blocking_for_padding(&mut self) {
        self.chan_sender.stop_blocking();
        self.padding_block = None;
    }
    /// The estimated circuit build timeout for a circuit of the specified length.
70
    pub(super) fn estimate_cbt(&self, length: usize) -> Duration {
70
        self.timeouts.circuit_build_timeout(length)
70
    }
}
impl Drop for Circuit {
324
    fn drop(&mut self) {
324
        let _ = self.channel.close_circuit(self.channel_id);
324
    }
}