1
//! Multi-hop paths over the Tor network.
2
//!
3
//! Right now, we only implement "client circuits" -- also sometimes
4
//! called "origin circuits".  A client circuit is one that is
5
//! constructed by this Tor instance, and used on its own behalf to
6
//! send data over the Tor network.
7
//!
8
//! Each circuit has multiple hops over the Tor network: each hop
9
//! knows only the hop before and the hop after.  The client shares a
10
//! separate set of keys with each hop.
11
//!
12
//! To build a circuit, first create a [crate::channel::Channel], then
13
//! call its [crate::channel::Channel::new_tunnel] method.  This yields
14
//! a [PendingClientTunnel] object that won't become live until you call
15
//! one of the methods
16
//! (typically [`PendingClientTunnel::create_firsthop`])
17
//! that extends it to its first hop.  After you've
18
//! done that, you can call [`ClientCirc::extend`] on the tunnel to
19
//! build it into a multi-hop tunnel.  Finally, you can use
20
//! [ClientTunnel::begin_stream] to get a Stream object that can be used
21
//! for anonymized data.
22
//!
23
//! # Implementation
24
//!
25
//! Each open circuit has a corresponding Reactor object that runs in
26
//! an asynchronous task, and manages incoming cells from the
27
//! circuit's upstream channel.  These cells are either RELAY cells or
28
//! DESTROY cells.  DESTROY cells are handled immediately.
29
//! RELAY cells are either for a particular stream, in which case they
30
//! get forwarded to a RawCellStream object, or for no particular stream,
31
//! in which case they are considered "meta" cells (like EXTENDED2)
32
//! that should only get accepted if something is waiting for them.
33
//!
34
//! # Limitations
35
//!
36
//! This is client-only.
37

            
38
pub(crate) mod halfcirc;
39

            
40
#[cfg(feature = "hs-common")]
41
pub mod handshake;
42
#[cfg(not(feature = "hs-common"))]
43
pub(crate) mod handshake;
44

            
45
pub(crate) mod padding;
46

            
47
pub(super) mod path;
48

            
49
use crate::channel::Channel;
50
use crate::circuit::circhop::{HopNegotiationType, HopSettings};
51
use crate::circuit::{CircuitRxReceiver, celltypes::*};
52
#[cfg(feature = "circ-padding-manual")]
53
use crate::client::CircuitPadder;
54
use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
55
use crate::client::reactor::{CircuitHandshake, CtrlCmd, CtrlMsg, Reactor};
56
use crate::crypto::cell::HopNum;
57
use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
58
use crate::memquota::CircuitAccount;
59
use crate::util::skew::ClockSkew;
60
use crate::{Error, Result};
61
use derive_deftly::Deftly;
62
use educe::Educe;
63
use path::HopDetail;
64
use tor_cell::chancell::{
65
    CircId,
66
    msg::{self as chanmsg},
67
};
68
use tor_error::{bad_api_usage, internal, into_internal};
69
use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
70
use tor_protover::named;
71
use tor_rtcompat::DynTimeProvider;
72

            
73
use crate::circuit::UniqId;
74

            
75
use super::{ClientTunnel, TargetHop};
76

            
77
use futures::channel::mpsc;
78
use oneshot_fused_workaround as oneshot;
79

            
80
use futures::FutureExt as _;
81
use std::collections::HashMap;
82
use std::sync::{Arc, Mutex};
83
use tor_memquota::derive_deftly_template_HasMemoryCost;
84

            
85
use crate::crypto::handshake::ntor::NtorPublicKey;
86

            
87
#[cfg(test)]
88
use crate::stream::{StreamMpscReceiver, StreamMpscSender};
89

            
90
pub use crate::crypto::binding::CircuitBinding;
91
pub use path::{Path, PathEntry};
92

            
93
/// The size of the buffer for communication between `ClientCirc` and its reactor.
94
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
95

            
96
// TODO: export this from the top-level instead (it's not client-specific).
97
pub use crate::circuit::CircParameters;
98

            
99
// TODO(relay): reexport this from somewhere else (it's not client-specific)
100
pub use crate::util::timeout::TimeoutEstimator;
101

            
102
/// A subclass of ChanMsg that can correctly arrive on a live client
103
/// circuit (one where a CREATED* has been received).
104
#[derive(Debug, Deftly)]
105
#[allow(unreachable_pub)] // Only `pub` with feature `testing`; otherwise, visible in crate
106
#[derive_deftly(HasMemoryCost)]
107
#[derive_deftly(RestrictedChanMsgSet)]
108
#[deftly(usage = "on an open client circuit")]
109
pub(super) enum ClientCircChanMsg {
110
    /// A relay cell telling us some kind of remote command from some
111
    /// party on the circuit.
112
    Relay(chanmsg::Relay),
113
    /// A cell telling us to destroy the circuit.
114
    Destroy(chanmsg::Destroy),
115
    // Note: RelayEarly is not valid for clients!
116
}
117

            
118
#[derive(Debug)]
119
/// A circuit that we have constructed over the Tor network.
120
///
121
/// # Circuit life cycle
122
///
123
/// `ClientCirc`s are created in an initially unusable state using [`Channel::new_tunnel`],
124
/// which returns a [`PendingClientTunnel`].  To get a real (one-hop) tunnel from
125
/// one of these, you invoke one of its `create_firsthop` methods (typically
126
/// [`create_firsthop_fast()`](PendingClientTunnel::create_firsthop_fast) or
127
/// [`create_firsthop()`](PendingClientTunnel::create_firsthop)).
128
/// Then, to add more hops to the circuit, you can call
129
/// [`extend()`](ClientCirc::extend) on it.
130
///
131
/// For higher-level APIs, see the `tor-circmgr` crate: the ones here in
132
/// `tor-proto` are probably not what you need.
133
///
134
/// After a circuit is created, it will persist until it is closed in one of
135
/// five ways:
136
///    1. A remote error occurs.
137
///    2. Some hop on the circuit sends a `DESTROY` message to tear down the
138
///       circuit.
139
///    3. The circuit's channel is closed.
140
///    4. Someone calls [`ClientTunnel::terminate`] on the tunnel owning the circuit.
141
///    5. The last reference to the `ClientCirc` is dropped. (Note that every stream
142
///       on a `ClientCirc` keeps a reference to it, which will in turn keep the
143
///       circuit from closing until all those streams have gone away.)
144
///
145
/// Note that in cases 1-4 the [`ClientCirc`] object itself will still exist: it
146
/// will just be unusable for most purposes.  Most operations on it will fail
147
/// with an error.
148
//
149
// Effectively, this struct contains two Arcs: one for `path` and one for
150
// `control` (which surely has something Arc-like in it).  We cannot unify
151
// these by putting a single Arc around the whole struct, and passing
152
// an Arc strong reference to the `Reactor`, because then `control` would
153
// not be dropped when the last user of the circuit goes away.  We could
154
// make the reactor have a weak reference but weak references are more
155
// expensive to dereference.
156
//
157
// Because of the above, cloning this struct is always going to involve
158
// two atomic refcount changes/checks.  Wrapping it in another Arc would
159
// be overkill.
160
//
161
pub struct ClientCirc {
162
    /// Mutable state shared with the `Reactor`.
163
    pub(super) mutable: Arc<TunnelMutableState>,
164
    /// A unique identifier for this circuit.
165
    unique_id: UniqId,
166
    /// Channel to send control messages to the reactor.
167
    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
168
    /// Channel to send commands to the reactor.
169
    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
170
    /// A future that resolves to Cancelled once the reactor is shut down,
171
    /// meaning that the circuit is closed.
172
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
173
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
174
    /// For testing purposes: the CircId, for use in peek_circid().
175
    #[cfg(test)]
176
    circid: CircId,
177
    /// Memory quota account
178
    pub(super) memquota: CircuitAccount,
179
    /// Time provider
180
    pub(super) time_provider: DynTimeProvider,
181
    /// Indicate if this reactor is a multi path or not. This is flagged at the very first
182
    /// LinkCircuit seen and never changed after.
183
    ///
184
    /// We can't just look at the number of legs because a multi path tunnel could have 1 leg only
185
    /// because the other(s) have collapsed.
186
    ///
187
    /// This is very important because it allows to make a quick efficient safety check by the
188
    /// circmgr higher level tunnel type without locking the mutable state or using the command
189
    /// channel.
190
    pub(super) is_multi_path: bool,
191
}
192

            
193
/// The mutable state of a tunnel, shared between [`ClientCirc`] and [`Reactor`].
194
///
195
/// NOTE(gabi): this mutex-inside-a-mutex might look suspicious,
196
/// but it is currently the best option we have for sharing
197
/// the circuit state with `ClientCirc` (and soon, with `ClientTunnel`).
198
/// In practice, these mutexes won't be accessed very often
199
/// (they're accessed for writing when a circuit is extended,
200
/// and for reading by the various `ClientCirc` APIs),
201
/// so they shouldn't really impact performance.
202
///
203
/// Alternatively, the circuit state information could be shared
204
/// outside the reactor through a channel (passed to the reactor via a `CtrlCmd`),
205
/// but in #1840 @opara notes that involves making the `ClientCirc` accessors
206
/// (`ClientCirc::path`, `ClientCirc::binding_key`, etc.)
207
/// asynchronous, which will significantly complicate their callsites,
208
/// which would in turn need to be made async too.
209
///
210
/// We should revisit this decision at some point, and decide whether an async API
211
/// would be preferable.
212
#[derive(Debug, Default)]
213
pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
214

            
215
impl TunnelMutableState {
216
    /// Add the [`MutableState`] of a circuit.
217
428
    pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
218
        #[allow(unused)] // unused in non-debug builds
219
428
        let state = self
220
428
            .0
221
428
            .lock()
222
428
            .expect("lock poisoned")
223
428
            .insert(unique_id, mutable);
224

            
225
428
        debug_assert!(state.is_none());
226
428
    }
227

            
228
    /// Remove the [`MutableState`] of a circuit.
229
34
    pub(super) fn remove(&self, unique_id: UniqId) {
230
        #[allow(unused)] // unused in non-debug builds
231
34
        let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
232

            
233
34
        debug_assert!(state.is_some());
234
34
    }
235

            
236
    /// Return a [`Path`] object describing all the circuits in this tunnel.
237
48
    fn all_paths(&self) -> Vec<Arc<Path>> {
238
48
        let lock = self.0.lock().expect("lock poisoned");
239
72
        lock.values().map(|mutable| mutable.path()).collect()
240
48
    }
241

            
242
    /// Return a list of [`Path`] objects describing the only circuit in this tunnel.
243
    ///
244
    /// Returns an error if the tunnel has more than one tunnel.
245
    //
246
    // TODO: replace Itertools::exactly_one() with a stdlib equivalent when there is one.
247
    //
248
    // See issue #48919 <https://github.com/rust-lang/rust/issues/48919>
249
    #[allow(unstable_name_collisions)]
250
48
    fn single_path(&self) -> Result<Arc<Path>> {
251
        use itertools::Itertools as _;
252

            
253
48
        self.all_paths().into_iter().exactly_one().map_err(|_| {
254
            bad_api_usage!("requested the single path of a multi-path tunnel?!").into()
255
        })
256
48
    }
257

            
258
    /// Return a description of the first hop of this circuit.
259
    ///
260
    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
261
    /// Returns `Ok(None)` if the specified circuit doesn't have any hops.
262
    fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
263
        let lock = self.0.lock().expect("lock poisoned");
264
        let mutable = lock
265
            .get(&unique_id)
266
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
267

            
268
        let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
269
            path::HopDetail::Relay(r) => r,
270
            #[cfg(feature = "hs-common")]
271
            path::HopDetail::Virtual => {
272
                panic!("somehow made a circuit with a virtual first hop.")
273
            }
274
        });
275

            
276
        Ok(first_hop)
277
    }
278

            
279
    /// Return the [`HopNum`] of the last hop of the specified circuit.
280
    ///
281
    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
282
    ///
283
    /// See [`MutableState::last_hop_num`].
284
    pub(super) fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
285
        let lock = self.0.lock().expect("lock poisoned");
286
        let mutable = lock
287
            .get(&unique_id)
288
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
289

            
290
        Ok(mutable.last_hop_num())
291
    }
292

            
293
    /// Return the number of hops in the specified circuit.
294
    ///
295
    /// See [`MutableState::n_hops`].
296
120
    fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
297
120
        let lock = self.0.lock().expect("lock poisoned");
298
120
        let mutable = lock
299
120
            .get(&unique_id)
300
120
            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
301

            
302
120
        Ok(mutable.n_hops())
303
120
    }
304
}
305

            
306
/// The mutable state of a circuit.
307
#[derive(Educe, Default)]
308
#[educe(Debug)]
309
pub(super) struct MutableState(Mutex<CircuitState>);
310

            
311
impl MutableState {
312
    /// Add a hop to the path of this circuit.
313
1020
    pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
314
1020
        let mut mutable = self.0.lock().expect("poisoned lock");
315
1020
        Arc::make_mut(&mut mutable.path).push_hop(peer_id);
316
1020
        mutable.binding.push(binding);
317
1020
    }
318

            
319
    /// Get a copy of the circuit's current [`path::Path`].
320
436
    pub(super) fn path(&self) -> Arc<path::Path> {
321
436
        let mutable = self.0.lock().expect("poisoned lock");
322
436
        Arc::clone(&mutable.path)
323
436
    }
324

            
325
    /// Return the cryptographic material used to prove knowledge of a shared
326
    /// secret with with `hop`.
327
    pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
328
        let mutable = self.0.lock().expect("poisoned lock");
329

            
330
        mutable.binding.get::<usize>(hop.into()).cloned().flatten()
331
        // NOTE: I'm not thrilled to have to copy this information, but we use
332
        // it very rarely, so it's not _that_ bad IMO.
333
    }
334

            
335
    /// Return a description of the first hop of this circuit.
336
    fn first_hop(&self) -> Option<HopDetail> {
337
        let mutable = self.0.lock().expect("poisoned lock");
338
        mutable.path.first_hop()
339
    }
340

            
341
    /// Return the [`HopNum`] of the last hop of this circuit.
342
    ///
343
    /// NOTE: This function will return the [`HopNum`] of the hop
344
    /// that is _currently_ the last. If there is an extend operation in progress,
345
    /// the currently pending hop may or may not be counted, depending on whether
346
    /// the extend operation finishes before this call is done.
347
    fn last_hop_num(&self) -> Option<HopNum> {
348
        let mutable = self.0.lock().expect("poisoned lock");
349
        mutable.path.last_hop_num()
350
    }
351

            
352
    /// Return the number of hops in this circuit.
353
    ///
354
    /// NOTE: This function will currently return only the number of hops
355
    /// _currently_ in the circuit. If there is an extend operation in progress,
356
    /// the currently pending hop may or may not be counted, depending on whether
357
    /// the extend operation finishes before this call is done.
358
120
    fn n_hops(&self) -> usize {
359
120
        let mutable = self.0.lock().expect("poisoned lock");
360
120
        mutable.path.n_hops()
361
120
    }
362
}
363

            
364
/// The shared state of a circuit.
365
#[derive(Educe, Default)]
366
#[educe(Debug)]
367
pub(super) struct CircuitState {
368
    /// Information about this circuit's path.
369
    ///
370
    /// This is stored in an Arc so that we can cheaply give a copy of it to
371
    /// client code; when we need to add a hop (which is less frequent) we use
372
    /// [`Arc::make_mut()`].
373
    path: Arc<path::Path>,
374

            
375
    /// Circuit binding keys [q.v.][`CircuitBinding`] information for each hop
376
    /// in the circuit's path.
377
    ///
378
    /// NOTE: Right now, there is a `CircuitBinding` for every hop.  There's a
379
    /// fair chance that this will change in the future, and I don't want other
380
    /// code to assume that a `CircuitBinding` _must_ exist, so I'm making this
381
    /// an `Option`.
382
    #[educe(Debug(ignore))]
383
    binding: Vec<Option<CircuitBinding>>,
384
}
385

            
386
/// A ClientCirc that needs to send a create cell and receive a created* cell.
387
///
388
/// To use one of these, call `create_firsthop_fast()` or `create_firsthop()`
389
/// to negotiate the cryptographic handshake with the first hop.
390
pub struct PendingClientTunnel {
391
    /// A oneshot receiver on which we'll receive a CREATED* cell,
392
    /// or a DESTROY cell.
393
    recvcreated: oneshot::Receiver<CreateResponse>,
394
    /// The ClientCirc object that we can expose on success.
395
    circ: ClientCirc,
396
}
397

            
398
impl ClientCirc {
399
    /// Convert this `ClientCirc` into a single circuit [`ClientTunnel`].
400
364
    pub fn into_tunnel(self) -> Result<ClientTunnel> {
401
364
        self.try_into()
402
364
    }
403

            
404
    /// Return a description of the first hop of this circuit.
405
    ///
406
    /// # Panics
407
    ///
408
    /// Panics if there is no first hop.  (This should be impossible outside of
409
    /// the tor-proto crate, but within the crate it's possible to have a
410
    /// circuit with no hops.)
411
    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
412
        Ok(self
413
            .mutable
414
            .first_hop(self.unique_id)
415
            .map_err(|_| Error::CircuitClosed)?
416
            .expect("called first_hop on an un-constructed circuit"))
417
    }
418

            
419
    /// Return a description of the last hop of the tunnel.
420
    ///
421
    /// Return None if the last hop is virtual.
422
    ///
423
    /// # Panics
424
    ///
425
    /// Panics if there is no last hop.  (This should be impossible outside of
426
    /// the tor-proto crate, but within the crate it's possible to have a
427
    /// circuit with no hops.)
428
    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
429
        let all_paths = self.all_paths();
430
        let path = all_paths.first().ok_or_else(|| {
431
            tor_error::bad_api_usage!("Called last_hop_info an an un-constructed tunnel")
432
        })?;
433
        Ok(path
434
            .hops()
435
            .last()
436
            .expect("Called last_hop an an un-constructed circuit")
437
            .as_chan_target()
438
            .map(OwnedChanTarget::from_chan_target))
439
    }
440

            
441
    /// Return the [`HopNum`] of the last hop of this circuit.
442
    ///
443
    /// Returns an error if there is no last hop.  (This should be impossible outside of the
444
    /// tor-proto crate, but within the crate it's possible to have a circuit with no hops.)
445
    ///
446
    /// NOTE: This function will return the [`HopNum`] of the hop
447
    /// that is _currently_ the last. If there is an extend operation in progress,
448
    /// the currently pending hop may or may not be counted, depending on whether
449
    /// the extend operation finishes before this call is done.
450
    pub fn last_hop_num(&self) -> Result<HopNum> {
451
        Ok(self
452
            .mutable
453
            .last_hop_num(self.unique_id)?
454
            .ok_or_else(|| internal!("no last hop index"))?)
455
    }
456

            
457
    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
458
    /// HopLocation with its id and hop number.
459
    ///
460
    /// Return an error if there is no last hop.
461
    pub fn last_hop(&self) -> Result<TargetHop> {
462
        let hop_num = self
463
            .mutable
464
            .last_hop_num(self.unique_id)?
465
            .ok_or_else(|| bad_api_usage!("no last hop"))?;
466
        Ok((self.unique_id, hop_num).into())
467
    }
468

            
469
    /// Return a list of [`Path`] objects describing all the circuits in this tunnel.
470
    ///
471
    /// Note that these `Path`s are not automatically updated if the underlying
472
    /// circuits are extended.
473
    pub fn all_paths(&self) -> Vec<Arc<Path>> {
474
        self.mutable.all_paths()
475
    }
476

            
477
    /// Return a list of [`Path`] objects describing the only circuit in this tunnel.
478
    ///
479
    /// Returns an error if the tunnel has more than one tunnel.
480
48
    pub fn single_path(&self) -> Result<Arc<Path>> {
481
48
        self.mutable.single_path()
482
48
    }
483

            
484
    /// Return the time at which this circuit last had any open streams.
485
    ///
486
    /// Returns `None` if this circuit has never had any open streams,
487
    /// or if it currently has open streams.
488
    ///
489
    /// NOTE that the Instant returned by this method is not affected by
490
    /// any runtime mocking; it is the output of an ordinary call to
491
    /// `Instant::now()`.
492
    pub async fn disused_since(&self) -> Result<Option<std::time::Instant>> {
493
        let (tx, rx) = oneshot::channel();
494
        self.command
495
            .unbounded_send(CtrlCmd::GetTunnelActivity { sender: tx })
496
            .map_err(|_| Error::CircuitClosed)?;
497

            
498
        Ok(rx.await.map_err(|_| Error::CircuitClosed)?.disused_since())
499
    }
500

            
501
    /// Get the clock skew claimed by the first hop of the circuit.
502
    ///
503
    /// See [`Channel::clock_skew()`].
504
    pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
505
        let (tx, rx) = oneshot::channel();
506

            
507
        self.control
508
            .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
509
            .map_err(|_| Error::CircuitClosed)?;
510

            
511
        Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
512
    }
513

            
514
    /// Return a reference to this circuit's memory quota account
515
96
    pub fn mq_account(&self) -> &CircuitAccount {
516
96
        &self.memquota
517
96
    }
518

            
519
    /// Return the cryptographic material used to prove knowledge of a shared
    /// secret with `hop`.
    ///
    /// See [`CircuitBinding`] for more information on how this is used.
    ///
    /// Return None if we have no circuit binding information for the hop, or if
    /// the hop does not exist.
    ///
    /// Returns [`Error::CircuitClosed`] if the request can not be delivered
    /// to the reactor, or if no reply is received.
    #[cfg(feature = "hs-service")]
    pub async fn binding_key(&self, hop: TargetHop) -> Result<Option<CircuitBinding>> {
        let (done, done_rx) = oneshot::channel();
        let request = CtrlCmd::GetBindingKey { hop, done };
        if self.command.unbounded_send(request).is_err() {
            return Err(Error::CircuitClosed);
        }

        match done_rx.await {
            Ok(reply) => reply,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
536

            
537
    /// Extend the circuit, via the most appropriate circuit extension handshake,
538
    /// to the chosen `target` hop.
539
    pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
540
    where
541
        Tg: CircTarget,
542
    {
543
        #![allow(deprecated)]
544

            
545
        // For now we use the simplest decision-making mechanism:
546
        // we use ntor_v3 whenever it is present; and otherwise we use ntor.
547
        //
548
        // This behavior is slightly different from C tor, which uses ntor v3
549
        // only whenever it want to send any extension in the circuit message.
550
        // But thanks to congestion control (named::FLOWCTRL_CC), we'll _always_
551
        // want to use an extension if we can, and so it doesn't make too much
552
        // sense to detect the case where we have no extensions.
553
        //
554
        // (As of April 2025, RELAY_NTORV3 is not yet listed as Required for relays
555
        // on the tor network, and so we cannot simply assume that everybody has it.)
556
        if target
557
            .protovers()
558
            .supports_named_subver(named::RELAY_NTORV3)
559
        {
560
            self.extend_ntor_v3(target, params).await
561
        } else {
562
            self.extend_ntor(target, params).await
563
        }
564
    }
565

            
566
    /// Extend the circuit via the ntor handshake to a new target last
567
    /// hop.
568
    #[deprecated(since = "1.6.1", note = "Use extend instead.")]
569
60
    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
570
60
    where
571
60
        Tg: CircTarget,
572
60
    {
573
60
        let key = NtorPublicKey {
574
60
            id: *target
575
60
                .rsa_identity()
576
60
                .ok_or(Error::MissingId(RelayIdType::Rsa))?,
577
60
            pk: *target.ntor_onion_key(),
578
        };
579
60
        let mut linkspecs = target
580
60
            .linkspecs()
581
60
            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
582
60
        if !params.extend_by_ed25519_id {
583
            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
584
60
        }
585

            
586
60
        let (tx, rx) = oneshot::channel();
587

            
588
60
        let peer_id = OwnedChanTarget::from_chan_target(target);
589
60
        let settings = HopSettings::from_params_and_caps(
590
60
            HopNegotiationType::None,
591
60
            &params,
592
60
            target.protovers(),
593
        )?;
594
60
        self.control
595
60
            .unbounded_send(CtrlMsg::ExtendNtor {
596
60
                peer_id,
597
60
                public_key: key,
598
60
                linkspecs,
599
60
                settings,
600
60
                done: tx,
601
60
            })
602
60
            .map_err(|_| Error::CircuitClosed)?;
603

            
604
60
        rx.await.map_err(|_| Error::CircuitClosed)??;
605

            
606
12
        Ok(())
607
60
    }
608

            
609
    /// Extend the circuit via the ntor handshake to a new target last
610
    /// hop.
611
    #[deprecated(since = "1.6.1", note = "Use extend instead.")]
612
12
    pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
613
12
    where
614
12
        Tg: CircTarget,
615
12
    {
616
12
        let key = NtorV3PublicKey {
617
12
            id: *target
618
12
                .ed_identity()
619
12
                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
620
12
            pk: *target.ntor_onion_key(),
621
        };
622
12
        let mut linkspecs = target
623
12
            .linkspecs()
624
12
            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
625
12
        if !params.extend_by_ed25519_id {
626
            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
627
12
        }
628

            
629
12
        let (tx, rx) = oneshot::channel();
630

            
631
12
        let peer_id = OwnedChanTarget::from_chan_target(target);
632
12
        let settings = HopSettings::from_params_and_caps(
633
12
            HopNegotiationType::Full,
634
12
            &params,
635
12
            target.protovers(),
636
        )?;
637
12
        self.control
638
12
            .unbounded_send(CtrlMsg::ExtendNtorV3 {
639
12
                peer_id,
640
12
                public_key: key,
641
12
                linkspecs,
642
12
                settings,
643
12
                done: tx,
644
12
            })
645
12
            .map_err(|_| Error::CircuitClosed)?;
646

            
647
12
        rx.await.map_err(|_| Error::CircuitClosed)??;
648

            
649
12
        Ok(())
650
12
    }
651

            
652
    /// Extend this circuit by a single, "virtual" hop.
    ///
    /// A virtual hop does not correspond to an actual network connection
    /// between separate hosts (such as Relays): it only adds a layer of
    /// cryptography.
    ///
    /// Onion services are implemented with this: the client and the service
    /// each build a circuit to a single rendezvous point, and tell the
    /// rendezvous point to relay traffic between their two circuits.  Having
    /// completed a [`handshake`] out of band[^1], the parties each extend their
    /// circuits by a single "virtual" encryption hop that represents their
    /// shared cryptographic context.
    ///
    /// Once a circuit has been extended in this way, it is an error to try to
    /// extend it in any other way.
    ///
    /// [^1]: Technically, the handshake is only _mostly_ out of band: the
    ///     client sends their half of the handshake in an `INTRODUCE1` message,
    ///     and the service's response is inline in its `RENDEZVOUS2` message.
    //
    // TODO hs: we should try to enforce the "you can't extend a circuit again
    // once it has been extended this way" property, either with internal
    // state, or with some kind of a type state pattern.
    #[cfg(feature = "hs-common")]
    pub async fn extend_virtual(
        &self,
        protocol: handshake::RelayProtocol,
        role: handshake::HandshakeRole,
        seed: impl handshake::KeyGenerator,
        params: &CircParameters,
        capabilities: &tor_protover::Protocols,
    ) -> Result<()> {
        use self::handshake::BoxedClientLayer;

        // TODO CGO: Possibly refactor this match into a separate method when we revisit this.
        let negotiation_type = match protocol {
            handshake::RelayProtocol::HsV3 => HopNegotiationType::HsV3,
        };

        let layers = handshake::RelayCryptLayerProtocol::from(protocol)
            .construct_client_layers(role, seed)?;
        let BoxedClientLayer { fwd, back, binding } = layers;

        let settings = HopSettings::from_params_and_caps(negotiation_type, params, capabilities)?;
        let (done, done_rx) = oneshot::channel();
        let request = CtrlCmd::ExtendVirtual {
            cell_crypto: (fwd, back, binding),
            settings,
            done,
        };

        if self.command.unbounded_send(request).is_err() {
            return Err(Error::CircuitClosed);
        }

        match done_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
709

            
710
    /// Install the given [`CircuitPadder`] at the listed `hop` of this circuit.
    ///
    /// Any padder previously installed at that hop is replaced.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn start_padding_at_hop(&self, hop: HopNum, padder: CircuitPadder) -> Result<()> {
        let location = crate::HopLocation::Hop((self.unique_id, hop));
        self.set_padder_impl(location, Some(padder)).await
    }
718

            
719
    /// Uninstall the [`CircuitPadder`], if any, at the listed `hop` of this circuit.
    ///
    /// This is a no-op if no padder was installed there.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn stop_padding_at_hop(&self, hop: HopNum) -> Result<()> {
        let location = crate::HopLocation::Hop((self.unique_id, hop));
        self.set_padder_impl(location, None).await
    }
727

            
728
    /// Helper: ask the reactor to replace the padder at `hop` with the
    /// provided `padder`, or with `None`.
    ///
    /// Returns [`Error::CircuitClosed`] if the request can not be delivered
    /// to the reactor, or if no reply is received.
    #[cfg(feature = "circ-padding-manual")]
    pub(super) async fn set_padder_impl(
        &self,
        hop: crate::HopLocation,
        padder: Option<CircuitPadder>,
    ) -> Result<()> {
        let (sender, receiver) = oneshot::channel();
        let request = CtrlCmd::SetPadder {
            hop,
            padder,
            sender,
        };
        if self.command.unbounded_send(request).is_err() {
            return Err(Error::CircuitClosed);
        }
        match receiver.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
746

            
747
    /// Return true if this circuit is closed and therefore unusable.
748
128
    pub fn is_closing(&self) -> bool {
749
128
        self.control.is_closed()
750
128
    }
751

            
752
    /// Return a process-unique identifier for this circuit.
753
144
    pub fn unique_id(&self) -> UniqId {
754
144
        self.unique_id
755
144
    }
756

            
757
    /// Return the number of hops in this circuit.
758
    ///
759
    /// NOTE: This function will currently return only the number of hops
760
    /// _currently_ in the circuit. If there is an extend operation in progress,
761
    /// the currently pending hop may or may not be counted, depending on whether
762
    /// the extend operation finishes before this call is done.
763
120
    pub fn n_hops(&self) -> Result<usize> {
764
120
        self.mutable
765
120
            .n_hops(self.unique_id)
766
120
            .map_err(|_| Error::CircuitClosed)
767
120
    }
768

            
769
    /// Return a future that will resolve once this circuit has closed.
770
    ///
771
    /// Note that this method does not itself cause the circuit to shut down.
772
    ///
773
    /// TODO: Perhaps this should return some kind of status indication instead
774
    /// of just ()
775
    pub fn wait_for_close(
776
        &self,
777
    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
778
        self.reactor_closed_rx.clone().map(|_| ())
779
    }
780
}
781

            
782
impl PendingClientTunnel {
783
    /// Instantiate a new circuit object: used from Channel::new_tunnel().
784
    ///
785
    /// Does not send a CREATE* cell on its own.
786
    #[allow(clippy::too_many_arguments)]
787
376
    pub(crate) fn new(
788
376
        id: CircId,
789
376
        channel: Arc<Channel>,
790
376
        createdreceiver: oneshot::Receiver<CreateResponse>,
791
376
        input: CircuitRxReceiver,
792
376
        unique_id: UniqId,
793
376
        runtime: DynTimeProvider,
794
376
        memquota: CircuitAccount,
795
376
        padding_ctrl: PaddingController,
796
376
        padding_stream: PaddingEventStream,
797
376
        timeouts: Arc<dyn TimeoutEstimator>,
798
376
    ) -> (PendingClientTunnel, crate::client::reactor::Reactor) {
799
376
        let time_provider = channel.time_provider().clone();
800
376
        let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) = Reactor::new(
801
376
            channel,
802
376
            id,
803
376
            unique_id,
804
376
            input,
805
376
            runtime,
806
376
            memquota.clone(),
807
376
            padding_ctrl,
808
376
            padding_stream,
809
376
            timeouts,
810
376
        );
811

            
812
376
        let circuit = ClientCirc {
813
376
            mutable,
814
376
            unique_id,
815
376
            control: control_tx,
816
376
            command: command_tx,
817
376
            reactor_closed_rx: reactor_closed_rx.shared(),
818
376
            #[cfg(test)]
819
376
            circid: id,
820
376
            memquota,
821
376
            time_provider,
822
376
            is_multi_path: false,
823
376
        };
824

            
825
376
        let pending = PendingClientTunnel {
826
376
            recvcreated: createdreceiver,
827
376
            circ: circuit,
828
376
        };
829
376
        (pending, reactor)
830
376
    }
831

            
832
    /// Extract the process-unique identifier for this pending circuit.
833
    pub fn peek_unique_id(&self) -> UniqId {
834
        self.circ.unique_id
835
    }
836

            
837
    /// Use the (questionable!) CREATE_FAST handshake to connect to the
838
    /// first hop of this circuit.
839
    ///
840
    /// There's no authentication in CRATE_FAST,
841
    /// so we don't need to know whom we're connecting to: we're just
842
    /// connecting to whichever relay the channel is for.
843
18
    pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<ClientTunnel> {
844
        // We know nothing about this relay, so we assume it supports no protocol capabilities at all.
845
        //
846
        // TODO: If we had a consensus, we could assume it supported all required-relay-protocols.
847
        // TODO prop364: When we implement CreateOneHop, we will want a Protocols argument here.
848
12
        let protocols = tor_protover::Protocols::new();
849
12
        let settings =
850
12
            HopSettings::from_params_and_caps(HopNegotiationType::None, &params, &protocols)?;
851
12
        let (tx, rx) = oneshot::channel();
852
12
        self.circ
853
12
            .control
854
12
            .unbounded_send(CtrlMsg::Create {
855
12
                recv_created: self.recvcreated,
856
12
                handshake: CircuitHandshake::CreateFast,
857
12
                settings,
858
12
                done: tx,
859
12
            })
860
12
            .map_err(|_| Error::CircuitClosed)?;
861

            
862
12
        rx.await.map_err(|_| Error::CircuitClosed)??;
863

            
864
12
        self.circ.into_tunnel()
865
12
    }
866

            
867
    /// Use the most appropriate handshake to connect to the first hop of this circuit.
868
    ///
869
    /// Note that the provided 'target' must match the channel's target,
870
    /// or the handshake will fail.
871
    pub async fn create_firsthop<Tg>(
872
        self,
873
        target: &Tg,
874
        params: CircParameters,
875
    ) -> Result<ClientTunnel>
876
    where
877
        Tg: tor_linkspec::CircTarget,
878
    {
879
        #![allow(deprecated)]
880
        // (See note in ClientCirc::extend.)
881
        if target
882
            .protovers()
883
            .supports_named_subver(named::RELAY_NTORV3)
884
        {
885
            self.create_firsthop_ntor_v3(target, params).await
886
        } else {
887
            self.create_firsthop_ntor(target, params).await
888
        }
889
    }
890

            
891
    /// Use the ntor handshake to connect to the first hop of this circuit.
892
    ///
893
    /// Note that the provided 'target' must match the channel's target,
894
    /// or the handshake will fail.
895
    #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
896
12
    pub async fn create_firsthop_ntor<Tg>(
897
12
        self,
898
12
        target: &Tg,
899
12
        params: CircParameters,
900
12
    ) -> Result<ClientTunnel>
901
12
    where
902
12
        Tg: tor_linkspec::CircTarget,
903
12
    {
904
12
        let (tx, rx) = oneshot::channel();
905
12
        let settings = HopSettings::from_params_and_caps(
906
12
            HopNegotiationType::None,
907
12
            &params,
908
12
            target.protovers(),
909
        )?;
910

            
911
12
        self.circ
912
12
            .control
913
12
            .unbounded_send(CtrlMsg::Create {
914
12
                recv_created: self.recvcreated,
915
                handshake: CircuitHandshake::Ntor {
916
                    public_key: NtorPublicKey {
917
12
                        id: *target
918
12
                            .rsa_identity()
919
12
                            .ok_or(Error::MissingId(RelayIdType::Rsa))?,
920
12
                        pk: *target.ntor_onion_key(),
921
                    },
922
12
                    ed_identity: *target
923
12
                        .ed_identity()
924
12
                        .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
925
                },
926
12
                settings,
927
12
                done: tx,
928
            })
929
12
            .map_err(|_| Error::CircuitClosed)?;
930

            
931
12
        rx.await.map_err(|_| Error::CircuitClosed)??;
932

            
933
12
        self.circ.into_tunnel()
934
12
    }
935

            
936
    /// Use the ntor_v3 handshake to connect to the first hop of this circuit.
937
    ///
938
    /// Assumes that the target supports ntor_v3. The caller should verify
939
    /// this before calling this function, e.g. by validating that the target
940
    /// has advertised ["Relay=4"](https://spec.torproject.org/tor-spec/subprotocol-versioning.html#relay).
941
    ///
942
    /// Note that the provided 'target' must match the channel's target,
943
    /// or the handshake will fail.
944
    #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
945
24
    pub async fn create_firsthop_ntor_v3<Tg>(
946
24
        self,
947
24
        target: &Tg,
948
24
        params: CircParameters,
949
24
    ) -> Result<ClientTunnel>
950
24
    where
951
24
        Tg: tor_linkspec::CircTarget,
952
24
    {
953
24
        let settings = HopSettings::from_params_and_caps(
954
24
            HopNegotiationType::Full,
955
24
            &params,
956
24
            target.protovers(),
957
        )?;
958
24
        let (tx, rx) = oneshot::channel();
959

            
960
24
        self.circ
961
24
            .control
962
24
            .unbounded_send(CtrlMsg::Create {
963
24
                recv_created: self.recvcreated,
964
                handshake: CircuitHandshake::NtorV3 {
965
                    public_key: NtorV3PublicKey {
966
24
                        id: *target
967
24
                            .ed_identity()
968
24
                            .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
969
24
                        pk: *target.ntor_onion_key(),
970
                    },
971
                },
972
24
                settings,
973
24
                done: tx,
974
            })
975
24
            .map_err(|_| Error::CircuitClosed)?;
976

            
977
24
        rx.await.map_err(|_| Error::CircuitClosed)??;
978

            
979
24
        self.circ.into_tunnel()
980
24
    }
981
}
982

            
983
#[cfg(test)]
984
pub(crate) mod test {
985
    // @@ begin test lint list maintained by maint/add_warning @@
986
    #![allow(clippy::bool_assert_comparison)]
987
    #![allow(clippy::clone_on_copy)]
988
    #![allow(clippy::dbg_macro)]
989
    #![allow(clippy::mixed_attributes_style)]
990
    #![allow(clippy::print_stderr)]
991
    #![allow(clippy::print_stdout)]
992
    #![allow(clippy::single_char_pattern)]
993
    #![allow(clippy::unwrap_used)]
994
    #![allow(clippy::unchecked_time_subtraction)]
995
    #![allow(clippy::useless_vec)]
996
    #![allow(clippy::needless_pass_by_value)]
997
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
998

            
999
    use super::*;
    use crate::channel::test::{CodecResult, new_reactor};
    use crate::circuit::CircuitRxSender;
    use crate::client::circuit::padding::new_padding;
    use crate::client::stream::DataStream;
    #[cfg(feature = "hs-service")]
    use crate::client::stream::IncomingStreamRequestFilter;
    use crate::congestion::params::CongestionControlParams;
    use crate::congestion::test_utils::params::build_cc_vegas_params;
    use crate::crypto::cell::RelayCellBody;
    use crate::crypto::handshake::ntor_v3::NtorV3Server;
    use crate::memquota::SpecificAccount as _;
    use crate::stream::flow_ctrl::params::FlowCtrlParameters;
    use crate::util::DummyTimeoutEstimator;
    use assert_matches::assert_matches;
    use chanmsg::{AnyChanMsg, Created2, CreatedFast};
    use futures::channel::mpsc::{Receiver, Sender};
    use futures::io::{AsyncReadExt, AsyncWriteExt};
    use futures::sink::SinkExt;
    use futures::stream::StreamExt;
    use hex_literal::hex;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Debug;
    use std::time::Duration;
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanCell, ChanCmd, msg as chanmsg};
    use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
    use tor_cell::relaycell::msg::SendmeTag;
    use tor_cell::relaycell::{
        AnyRelayMsgOuter, RelayCellFormat, RelayCmd, StreamId, msg as relaymsg, msg::AnyRelayMsg,
    };
    use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
    use tor_linkspec::OwnedCircTarget;
    use tor_memquota::HasMemoryCost;
    use tor_rtcompat::Runtime;
    use tor_rtcompat::SpawnExt;
    use tracing::trace;
    use tracing_test::traced_test;
    #[cfg(feature = "conflux")]
    use {
        crate::client::reactor::ConfluxHandshakeResult,
        crate::util::err::ConfluxHandshakeError,
        futures::future::FusedFuture,
        futures::lock::Mutex as AsyncMutex,
        std::pin::Pin,
        std::result::Result as StdResult,
        tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
        tor_cell::relaycell::msg::ConfluxLink,
        tor_rtmock::MockRuntime,
    };
    impl PendingClientTunnel {
        /// Testing only: Return the circuit ID of this pending circuit.
        pub(crate) fn peek_circid(&self) -> CircId {
            let circ = &self.circ;
            circ.circid
        }
    }
    impl ClientCirc {
        /// Testing only: Return the circuit ID of this circuit.
        pub(crate) fn peek_circid(&self) -> CircId {
            let Self { circid, .. } = self;
            *circid
        }
    }
    impl ClientTunnel {
        /// Testing only: Resolve `TargetHop::LastHop` on this tunnel's
        /// single circuit, and return the answer as a `TargetHop::Hop`.
        ///
        /// Panics if this tunnel is not a single circuit, or if no
        /// successful answer comes back.
        pub(crate) async fn resolve_last_hop(&self) -> TargetHop {
            let (done, answer) = oneshot::channel();
            let request = CtrlCmd::ResolveTargetHop {
                hop: TargetHop::LastHop,
                done,
            };
            // A failed send is deliberately ignored here: it shows up as a
            // panic on the `unwrap()` of the answer below.
            let _ = self
                .as_single_circ()
                .unwrap()
                .command
                .unbounded_send(request);
            let resolved = answer.await.unwrap().unwrap();
            TargetHop::Hop(resolved)
        }
    }
    /// Encode `msg` (for stream `id`, if any) as a V0 relay cell, and wrap
    /// it in a RELAY channel message.
    fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> AnyChanMsg {
        // TODO #1947: test other formats.
        let format = RelayCellFormat::V0;
        let outer = AnyRelayMsgOuter::new(id, msg);
        let mut rng = testing_rng();
        let encoded: BoxedCellBody = outer.encode(format, &mut rng).unwrap();
        AnyChanMsg::Relay(chanmsg::Relay::from(encoded))
    }
    // Example relay IDs and keys
    //
    // These are shared by `example_target`, `example_ntor_key`, and
    // `example_ntor_v3_key` below, so that the simulated client and the
    // simulated relay agree on the relay's identity and onion key.

    /// Secret onion key passed to the example ntor and ntor_v3 secret keys.
    const EXAMPLE_SK: [u8; 32] =
        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
    /// Public onion key used by the example target and the example secret keys.
    // NOTE(review): presumably the public half of EXAMPLE_SK -- confirm.
    const EXAMPLE_PK: [u8; 32] =
        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
    /// Ed25519 identity of the example relay.
    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
    /// RSA identity of the example relay.
    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
    /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing
    ///
    /// `buffer` is passed straight through to [`crate::fake_mpsc`].
    // NOTE(review): the enclosing `test` module is already `#[cfg(test)]`,
    // so this attribute looks redundant -- confirm before removing.
    #[cfg(test)]
    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
        buffer: usize,
    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
        crate::fake_mpsc(buffer)
    }
    /// Return an example OwnedCircTarget that can be used for an ntor handshake.
    ///
    /// Its identities and onion key come from the `EXAMPLE_*` constants,
    /// and it advertises the protocols "FlowCtrl=1-2".
    fn example_target() -> OwnedCircTarget {
        let mut target = OwnedCircTarget::builder();
        target
            .chan_target()
            .ed_identity(EXAMPLE_ED_ID.into())
            .rsa_identity(EXAMPLE_RSA_ID.into());
        let configured = target
            .ntor_onion_key(EXAMPLE_PK.into())
            .protocols("FlowCtrl=1-2".parse().unwrap());
        configured.build().unwrap()
    }
    /// Return the ntor secret key built from the `EXAMPLE_*` constants,
    /// for use by the simulated relay.
    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
        use crate::crypto::handshake::ntor::NtorSecretKey;

        NtorSecretKey::new(
            EXAMPLE_SK.into(),
            EXAMPLE_PK.into(),
            EXAMPLE_RSA_ID.into(),
        )
    }
    /// Return the ntor_v3 secret key built from the `EXAMPLE_*` constants,
    /// for use by the simulated relay.
    fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
        use crate::crypto::handshake::ntor_v3::NtorV3SecretKey;

        NtorV3SecretKey::new(
            EXAMPLE_SK.into(),
            EXAMPLE_PK.into(),
            EXAMPLE_ED_ID.into(),
        )
    }
    /// Build a fake channel with `new_reactor`, and spawn its reactor on `rt`.
    ///
    /// Returns the channel along with the cell receiver and the codec-result
    /// sender produced by `new_reactor`.
    fn working_fake_channel<R: Runtime>(
        rt: &R,
    ) -> (Arc<Channel>, Receiver<AnyChanCell>, Sender<CodecResult>) {
        let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
        // The reactor's exit status is of no interest to the tests.
        let reactor_task = async {
            let _ignore = chan_reactor.run().await;
        };
        rt.spawn(reactor_task).unwrap();
        (channel, rx, tx)
    }
    /// Which handshake type to use.
    #[derive(Copy, Clone)]
    enum HandshakeType {
        /// The CREATE_FAST handshake (only usable for creating, not extending).
        Fast,
        /// The ntor handshake.
        Ntor,
        /// The ntor_v3 handshake.
        NtorV3,
    }
    /// Build a one-hop circuit over a fake channel using `handshake_type`,
    /// with a simulated relay answering the create cell, and check that the
    /// resulting circuit has exactly one hop.
    ///
    /// `with_cc` only affects the `NtorV3` case: there, the client uses the
    /// vegas congestion-control parameters, and the simulated relay insists
    /// on seeing a CC request and answers with a CC response.
    #[allow(deprecated)]
    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
        // We want to try progressing from a pending circuit to a circuit
        // via the handshake selected by `handshake_type`.
        use crate::crypto::handshake::{ServerHandshake, fast::CreateFastServer, ntor::NtorServer};
        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let circid = CircId::new(128).unwrap();
        let (created_send, created_recv) = oneshot::channel();
        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
        let unique_id = UniqId::new(23, 17);
        let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
        let (pending, reactor) = PendingClientTunnel::new(
            circid,
            chan,
            created_recv,
            circmsg_recv,
            unique_id,
            DynTimeProvider::new(rt.clone()),
            CircuitAccount::new_noop(),
            padding_ctrl,
            padding_stream,
            Arc::new(DummyTimeoutEstimator),
        );
        rt.spawn(async {
            let _ignore = reactor.run().await;
        })
        .unwrap();
        // Future to pretend to be a relay on the other end of the circuit.
        let simulate_relay_fut = async move {
            let mut rng = testing_rng();
            let create_cell = rx.next().await.unwrap();
            assert_eq!(create_cell.circid(), Some(circid));
            // Run the server side of whichever handshake the client used,
            // and wrap its reply in the matching CREATED* response.
            let reply = match handshake_type {
                HandshakeType::Fast => {
                    let cf = match create_cell.msg() {
                        AnyChanMsg::CreateFast(cf) => cf,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = CreateFastServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[()],
                        cf.handshake(),
                    )
                    .unwrap();
                    CreateResponse::CreatedFast(CreatedFast::new(rep))
                }
                HandshakeType::Ntor => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
                HandshakeType::NtorV3 => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    // Choose how the relay answers the client's extensions:
                    // with a CC response if `with_cc`, else with none.
                    let mut reply_fn = if with_cc {
                        |client_exts: &[CircRequestExt]| {
                            let _ = client_exts
                                .iter()
                                .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
                                .expect("Client failed to request CC");
                            // This needs to be aligned to test_utils params
                            // value due to validation that needs it in range.
                            Some(vec![CircResponseExt::CcResponse(
                                extend_ext::CcResponse::new(31),
                            )])
                        }
                    } else {
                        |_: &_| Some(vec![])
                    };
                    let (_, rep) = NtorV3Server::server(
                        &mut rng,
                        &mut reply_fn,
                        &[example_ntor_v3_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
            };
            created_send.send(reply).unwrap();
        };
        // Future to pretend to be a client.
        let client_fut = async move {
            let target = example_target();
            let params = CircParameters::default();
            let ret = match handshake_type {
                HandshakeType::Fast => {
                    trace!("doing fast create");
                    pending.create_firsthop_fast(params).await
                }
                HandshakeType::Ntor => {
                    trace!("doing ntor create");
                    pending.create_firsthop_ntor(&target, params).await
                }
                HandshakeType::NtorV3 => {
                    let params = if with_cc {
                        // Setup CC vegas parameters.
                        CircParameters::new(
                            true,
                            build_cc_vegas_params(),
                            FlowCtrlParameters::defaults_for_tests(),
                        )
                    } else {
                        params
                    };
                    trace!("doing ntor_v3 create");
                    pending.create_firsthop_ntor_v3(&target, params).await
                }
            };
            trace!("create done: result {:?}", ret);
            ret
        };
        // Run the client and the simulated relay concurrently.
        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
        let _circ = circ.unwrap();
        // Phew!  We've built a circuit!  Let's make sure it has one hop.
        assert_eq!(_circ.n_hops().unwrap(), 1);
    }
    // Create a one-hop circuit with the CREATE_FAST handshake, on every runtime.
    #[traced_test]
    #[test]
    fn test_create_fast() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Fast, false).await;
        });
    }
    // Create a one-hop circuit with the ntor handshake, on every runtime.
    #[traced_test]
    #[test]
    fn test_create_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Ntor, false).await;
        });
    }
    // Create a one-hop circuit with the ntor_v3 handshake (no congestion
    // control), on every runtime.
    #[traced_test]
    #[test]
    fn test_create_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, false).await;
        });
    }
    // Create a one-hop circuit with the ntor_v3 handshake and congestion
    // control negotiation, on every runtime.  Only built with "flowctl-cc".
    #[traced_test]
    #[test]
    #[cfg(feature = "flowctl-cc")]
    fn test_create_ntor_v3_with_cc() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, true).await;
        });
    }
    // An encryption layer that doesn't do any crypto.   Can be used
    // as inbound or outbound, but not both at once.
    //
    // Instead of real authentication tags, it hands out tags derived from
    // a counter (see `next_tag`).
    pub(crate) struct DummyCrypto {
        // Buffer holding the most recently produced tag; `next_tag`
        // overwrites its first four bytes with the counter.
        counter_tag: [u8; 20],
        // Counter used for the next tag; incremented after each tag.
        counter: u32,
        // If true, `decrypt_inbound` reports a tag for each inbound cell;
        // otherwise it reports `None`.
        lasthop: bool,
    }
    impl DummyCrypto {
        /// Produce the next tag.
        ///
        /// The current counter value is written, least-significant byte
        /// first, into the first four bytes of `counter_tag`; the counter is
        /// then incremented and the whole buffer is returned as the tag.
        fn next_tag(&mut self) -> SendmeTag {
            let counter_bytes = self.counter.to_le_bytes();
            self.counter_tag[..4].copy_from_slice(&counter_bytes);
            self.counter += 1;
            self.counter_tag.into()
        }
    }
    // Outbound direction: cells are never modified.
    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
        // Leave the cell untouched, and return the next counter-based tag.
        fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
            self.next_tag()
        }
        // Deliberately a no-op: this layer performs no encryption.
        fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
    }
    // Inbound direction: cells are never modified.
    impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
        // Leave the cell untouched.  If this layer was built with `lasthop`
        // set, report the next counter-based tag; otherwise report `None`.
        fn decrypt_inbound(
            &mut self,
            _cmd: ChanCmd,
            _cell: &mut RelayCellBody,
        ) -> Option<SendmeTag> {
            self.lasthop.then(|| self.next_tag())
        }
    }
    impl DummyCrypto {
        pub(crate) fn new(lasthop: bool) -> Self {
            DummyCrypto {
                counter_tag: [0; 20],
                counter: 0,
                lasthop,
            }
        }
    }
    // Helper: set up a circuit with no encryption, with one fake hop per
    // entry in `hops`, where the next inbound message seems to come from
    // hop next_msg_from.
    //
    // Returns the tunnel, plus the sender used to inject inbound circuit
    // messages into its reactor.
    async fn newtunnel_ext<R: Runtime>(
        rt: &R,
        unique_id: UniqId,
        chan: Arc<Channel>,
        hops: Vec<path::HopDetail>,
        next_msg_from: HopNum,
        params: CircParameters,
    ) -> (ClientTunnel, CircuitRxSender) {
        let circid = CircId::new(128).unwrap();
        let (_created_send, created_recv) = oneshot::channel();
        let (circmsg_send, circmsg_recv) = fake_mpsc(64);
        let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
        let (pending, reactor) = PendingClientTunnel::new(
            circid,
            chan,
            created_recv,
            circmsg_recv,
            unique_id,
            DynTimeProvider::new(rt.clone()),
            CircuitAccount::new_noop(),
            padding_ctrl,
            padding_stream,
            Arc::new(DummyTimeoutEstimator),
        );
        rt.spawn(async {
            let _ignore = reactor.run().await;
        })
        .unwrap();
        // No create handshake is performed: take the circuit out of the
        // pending tunnel and add the hops directly.
        let PendingClientTunnel {
            circ,
            recvcreated: _,
        } = pending;
        // TODO #1067: Support other formats
        let relay_cell_format = RelayCellFormat::V0;
        // NOTE(review): this subtraction underflows if `hops` is empty; the
        // callers visible in this file always pass three hops.
        let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
        for (idx, peer_id) in hops.into_iter().enumerate() {
            let (tx, rx) = oneshot::channel();
            let idx = idx as u8;
            // The final hop is the last hop in the forward direction; hop
            // `next_msg_from` is treated as the last hop in the reverse
            // direction.
            circ.command
                .unbounded_send(CtrlCmd::AddFakeHop {
                    relay_cell_format,
                    fwd_lasthop: idx == last_hop_num,
                    rev_lasthop: idx == u8::from(next_msg_from),
                    peer_id,
                    params: params.clone(),
                    done: tx,
                })
                .unwrap();
            // Wait for each hop to be added before sending the next one.
            rx.await.unwrap().unwrap();
        }
        (circ.into_tunnel().unwrap(), circmsg_send)
    }
    // Helper: set up a 3-hop circuit with no encryption, where the
    // next inbound message seems to come from hop 2 (the last hop).
    //
    // All three hops share the same dummy identities.  This is
    // `newtunnel_ext` with default parameters, with the tunnel wrapped
    // in an `Arc`.
    async fn newtunnel<R: Runtime>(
        rt: &R,
        chan: Arc<Channel>,
    ) -> (Arc<ClientTunnel>, CircuitRxSender) {
        let hops = std::iter::repeat_with(|| {
            let peer_id = tor_linkspec::OwnedChanTarget::builder()
                .ed_identity([4; 32].into())
                .rsa_identity([5; 20].into())
                .build()
                .expect("Could not construct fake hop");
            path::HopDetail::Relay(peer_id)
        })
        .take(3)
        .collect();
        let unique_id = UniqId::new(23, 17);
        let (tunnel, circmsg_send) = newtunnel_ext(
            rt,
            unique_id,
            chan,
            hops,
            2.into(),
            CircParameters::default(),
        )
        .await;
        (Arc::new(tunnel), circmsg_send)
    }
    /// Build `n` distinct [`path::HopDetail`]s,
    /// whose dummy identities are numbered starting from `start_idx`.
    ///
    /// Hop `idx` gets an Ed25519 identity filled with `idx + start_idx`
    /// and an RSA identity filled with `idx + start_idx + 1`.
    fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
        let mut hops = Vec::with_capacity(usize::from(n));
        for idx in 0..n {
            let id_byte = idx + start_idx;
            let peer_id = tor_linkspec::OwnedChanTarget::builder()
                .ed_identity([id_byte; 32].into())
                .rsa_identity([id_byte + 1; 20].into())
                .build()
                .expect("Could not construct fake hop");
            hops.push(path::HopDetail::Relay(peer_id));
        }
        hops
    }
    /// Extend a fake 3-hop circuit by one hop using `handshake_type`, with a
    /// simulated relay answering the EXTEND2 message, and check that the
    /// circuit and its path accessors then report four hops.
    ///
    /// Panics if `handshake_type` is `Fast`, which cannot be used to extend.
    #[allow(deprecated)]
    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
        use crate::crypto::handshake::{ServerHandshake, ntor::NtorServer};
        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let (tunnel, mut sink) = newtunnel(rt, chan).await;
        let circ = Arc::new(tunnel.as_single_circ().unwrap());
        let circid = circ.peek_circid();
        let params = CircParameters::default();
        // Future to pretend to be the client, extending the circuit.
        let extend_fut = async move {
            let target = example_target();
            match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
            };
            circ // gotta keep the circ alive, or the reactor would exit.
        };
        // Future to pretend to be the relay answering the extend request.
        let reply_fut = async move {
            // We've disabled encryption on this circuit, so we can just
            // read the extend2 cell.
            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
            assert_eq!(id, Some(circid));
            let rmsg = match chmsg {
                AnyChanMsg::RelayEarly(r) => {
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                        .unwrap()
                }
                other => panic!("{:?}", other),
            };
            let e2 = match rmsg.msg() {
                AnyRelayMsg::Extend2(e2) => e2,
                other => panic!("{:?}", other),
            };
            let mut rng = testing_rng();
            // Run the server side of the requested handshake on the
            // handshake data carried in the EXTEND2 message.
            let reply = match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => {
                    let (_keygen, reply) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
                HandshakeType::NtorV3 => {
                    let (_keygen, reply) = NtorV3Server::server(
                        &mut rng,
                        &mut |_: &[CircRequestExt]| Some(vec![]),
                        &[example_ntor_v3_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
            };
            // Send the reply back to the client as an EXTENDED2 message.
            let extended2 = relaymsg::Extended2::new(reply).into();
            sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
            (sink, rx) // gotta keep the sink and receiver alive, or the reactor will exit.
        };
        let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
        // Did we really add another hop?
        assert_eq!(circ.n_hops().unwrap(), 4);
        // Do the path accessors report a reasonable outcome?
        {
            // First via `all_hops()`, keeping only the relay hops.
            let path = circ.single_path().unwrap();
            let path = path
                .all_hops()
                .filter_map(|hop| match hop {
                    path::HopDetail::Relay(r) => Some(r),
                    #[cfg(feature = "hs-common")]
                    path::HopDetail::Virtual => None,
                })
                .collect::<Vec<_>>();
            assert_eq!(path.len(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
        }
        {
            // Then via `n_hops()` and `hops()`.
            let path = circ.single_path().unwrap();
            assert_eq!(path.n_hops(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(
                path.hops()[3].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
            assert_ne!(
                path.hops()[0].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
        }
    }
    // Extend a circuit with the ntor handshake, on every runtime.
    #[traced_test]
    #[test]
    fn test_extend_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_extend(&rt, HandshakeType::Ntor).await;
        });
    }
    // Extend a circuit with the ntor_v3 handshake, on every runtime.
    #[traced_test]
    #[test]
    fn test_extend_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_extend(&rt, HandshakeType::NtorV3).await;
        });
    }
    /// Try to extend a fake 3-hop circuit with ntor, answering the EXTEND2
    /// message with `bad_reply`, which will seem to come from `reply_hop`.
    ///
    /// Checks that the circuit still has three hops and that the extend
    /// attempt failed, and returns the resulting error.
    #[allow(deprecated)]
    async fn bad_extend_test_impl<R: Runtime>(
        rt: &R,
        reply_hop: HopNum,
        bad_reply: AnyChanMsg,
    ) -> Error {
        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let hops = std::iter::repeat_with(|| {
            let peer_id = tor_linkspec::OwnedChanTarget::builder()
                .ed_identity([4; 32].into())
                .rsa_identity([5; 20].into())
                .build()
                .expect("Could not construct fake hop");
            path::HopDetail::Relay(peer_id)
        })
        .take(3)
        .collect();
        let unique_id = UniqId::new(23, 17);
        let (tunnel, mut sink) = newtunnel_ext(
            rt,
            unique_id,
            chan,
            hops,
            reply_hop,
            CircParameters::default(),
        )
        .await;
        let params = CircParameters::default();
        let target = example_target();
        // Task to pretend to be the relay: it returns the sink when done, so
        // that the sink stays alive until we have awaited the task.
        let reply_task_handle = rt
            .spawn_with_handle(async move {
                // Wait for a cell, and make sure it's EXTEND2.
                let (_circid, chanmsg) = rx.next().await.unwrap().into_circid_and_msg();
                let AnyChanMsg::RelayEarly(relay_early) = chanmsg else {
                    panic!("unexpected message {chanmsg:?}");
                };
                let relaymsg = UnparsedRelayMsg::from_singleton_body(
                    RelayCellFormat::V0,
                    relay_early.into_relay_body(),
                )
                .unwrap();
                assert_eq!(relaymsg.cmd(), RelayCmd::EXTEND2);
                // Send back the "bad_reply."
                sink.send(bad_reply).await.unwrap();
                sink
            })
            .unwrap();
        let outcome = tunnel
            .as_single_circ()
            .unwrap()
            .extend_ntor(&target, params)
            .await;
        let _sink = reply_task_handle.await;
        // The failed extend must not have added a hop.
        assert_eq!(tunnel.n_hops().unwrap(), 3);
        assert!(outcome.is_err());
        outcome.unwrap_err()
    }
    // An EXTENDED2 reply delivered with a reply hop of 1 must make the extend
    // attempt fail with `Error::CircuitClosed`.
    #[traced_test]
    #[test]
    fn bad_extend_wronghop() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let reply = rmsg_to_ccmsg(None, relaymsg::Extended2::new(vec![]).into());
            let error = bad_extend_test_impl(&rt, 1.into(), reply).await;
            // This case is reported as `Error::CircuitClosed`, since a message
            // sent from the wrong hop won't even be delivered to the extend
            // code's meta-handler.  Instead the unexpected message causes the
            // circuit to get torn down.
            if !matches!(error, Error::CircuitClosed) {
                panic!("got other error: {}", error);
            }
        });
    }
    // Answering EXTEND2 with an EXTENDED message (rather than EXTENDED2) must
    // fail with a `BytesErr` about the "extended2 message".
    #[traced_test]
    #[test]
    fn bad_extend_wrongtype() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let reply = rmsg_to_ccmsg(None, relaymsg::Extended::new(vec![7; 200]).into());
            let error = bad_extend_test_impl(&rt, 2.into(), reply).await;
            let is_expected = matches!(
                error,
                Error::BytesErr {
                    err: tor_bytes::Error::InvalidMessage(_),
                    object: "extended2 message",
                }
            );
            if !is_expected {
                panic!("{:?}", error);
            }
        });
    }
    // Answering EXTEND2 with a DESTROY cell must fail with
    // `Error::CircuitClosed`.
    #[traced_test]
    #[test]
    fn bad_extend_destroy() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let destroy = chanmsg::Destroy::new(4.into());
            let error = bad_extend_test_impl(&rt, 2.into(), AnyChanMsg::Destroy(destroy)).await;
            if !matches!(error, Error::CircuitClosed) {
                panic!("{:?}", error);
            }
        });
    }
    // An EXTENDED2 whose handshake body is 256 bytes of filler must be
    // rejected with `Error::BadCircHandshakeAuth`.
    #[traced_test]
    #[test]
    fn bad_extend_crypto() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let bogus_handshake = vec![99; 256];
            let reply = rmsg_to_ccmsg(None, relaymsg::Extended2::new(bogus_handshake).into());
            let error = bad_extend_test_impl(&rt, 2.into(), reply).await;
            assert_matches!(error, Error::BadCircHandshakeAuth);
        });
    }
    // Open a BEGINDIR stream over a fake tunnel and run a small request/reply
    // exchange against a hand-written relay side.
    //
    // The client half writes a request and reads the reply until EOF; the
    // relay half checks each cell it receives and answers with CONNECTED,
    // DATA and END.  Every unexpected message panics with a description of
    // what was expected and what actually arrived.
    #[traced_test]
    #[test]
    fn begindir() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
            let circ = tunnel.as_single_circ().unwrap();
            let circid = circ.peek_circid();
            let begin_and_send_fut = async move {
                // Here we'll say we've got a circuit, and we want to
                // make a simple BEGINDIR request with it.
                let mut stream = tunnel.begin_dir_stream().await.unwrap();
                stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
                stream.flush().await.unwrap();
                let mut buf = [0_u8; 1024];
                let n = stream.read(&mut buf).await.unwrap();
                assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
                // After the END, the next read must report end-of-stream.
                let n = stream.read(&mut buf).await.unwrap();
                assert_eq!(n, 0);
                stream
            };
            let reply_fut = async move {
                // We've disabled encryption on this circuit, so we can just
                // read the begindir cell.
                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(id, Some(circid));
                let AnyChanMsg::Relay(relay) = chmsg else {
                    panic!("expected a RELAY cell carrying BEGINDIR, got {chmsg:?}");
                };
                let rmsg =
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay.into_relay_body())
                        .unwrap();
                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                assert_matches!(rmsg, AnyRelayMsg::BeginDir(_));
                // Reply with a Connected cell to indicate success.
                let connected = relaymsg::Connected::new_empty().into();
                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                // Now read a DATA cell...
                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(id, Some(circid));
                let AnyChanMsg::Relay(relay) = chmsg else {
                    panic!("expected a RELAY cell carrying DATA, got {chmsg:?}");
                };
                let rmsg =
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay.into_relay_body())
                        .unwrap();
                let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(streamid_2, streamid);
                let AnyRelayMsg::Data(d) = rmsg else {
                    panic!("expected a DATA message, got {rmsg:?}");
                };
                assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
                // Write another data cell in reply!
                let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
                    .unwrap()
                    .into();
                sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
                // Send an END cell to say that the conversation is over.
                let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
                sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
                (rx, sink) // gotta keep these alive, or the reactor will exit.
            };
            let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
        });
    }
    // Test: close a stream, either by dropping it or by calling AsyncWriteExt::close.
    //
    // Whichever way is chosen, the fake exit answers the BEGIN with a
    // CONNECTED and then requires the next message it receives to be an END.
    fn close_stream_helper(by_drop: bool) {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
            let stream_fut = async move {
                let stream = tunnel
                    .begin_stream("www.example.com", 80, None)
                    .await
                    .unwrap();
                let (reader, mut writer) = stream.split();
                let kept_reader = if by_drop {
                    // Drop the writer and the reader, which should close the stream.
                    drop(reader);
                    drop(writer);
                    None
                } else {
                    // Call close on the writer, while keeping the reader alive.
                    writer.close().await.unwrap();
                    Some(reader)
                };
                // Hand the tunnel back as well, to keep the circuit alive.
                (kept_reader, tunnel)
            };
            let handler_fut = async {
                // The first message from the client is the BEGIN.
                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let AnyChanMsg::Relay(relay) = msg else {
                    panic!("{:?}", msg);
                };
                let begin =
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay.into_relay_body())
                        .unwrap();
                let (streamid, begin) = begin.into_streamid_and_msg();
                assert_eq!(begin.cmd(), RelayCmd::BEGIN);
                // Answer it with a CONNECTED.
                let connected =
                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                // Closing the stream must produce an END.
                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let AnyChanMsg::Relay(relay) = msg else {
                    panic!("{:?}", msg);
                };
                let end =
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay.into_relay_body())
                        .unwrap();
                let (_, end) = end.into_streamid_and_msg();
                assert_eq!(end.cmd(), RelayCmd::END);
                (rx, sink) // keep these alive or the reactor will exit.
            };
            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
        });
    }
    // Close a stream by dropping both of its halves.
    #[traced_test]
    #[test]
    fn drop_stream() {
        let by_drop = true;
        close_stream_helper(by_drop);
    }
    // Close a stream by calling `close` on its writer while the reader is
    // kept alive.
    #[traced_test]
    #[test]
    fn close_stream() {
        let by_drop = false;
        close_stream_helper(by_drop);
    }
    // DATA arriving on a closed ("half") stream is ignored until the
    // half-stream expires; once it has expired, the same DATA is a protocol
    // violation that closes the tunnel.
    #[traced_test]
    #[test]
    fn expire_halfstreams() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
            let client_fut = async move {
                let stream = tunnel
                    .begin_stream("www.example.com", 80, None)
                    .await
                    .unwrap();
                let (reader, mut writer) = stream.split();
                // Close the stream
                writer.close().await.unwrap();
                (Some(reader), tunnel)
            };
            let exit_fut = async {
                // The first message from the client is the BEGIN.
                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let AnyChanMsg::Relay(relay) = msg else {
                    panic!("{:?}", msg);
                };
                let begin =
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay.into_relay_body())
                        .unwrap();
                let (streamid, begin) = begin.into_streamid_and_msg();
                assert_eq!(begin.cmd(), RelayCmd::BEGIN);
                // Answer it with a CONNECTED.
                let connected =
                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                (rx, streamid, sink) // keep these alive or the reactor will exit.
            };
            let ((_opt_reader, tunnel), (_rx, streamid, mut sink)) =
                futures::join!(client_fut, exit_fut);

            // Progress all futures to ensure the reactor has a chance to notice
            // we closed the stream.
            rt.progress_until_stalled().await;
            // The tunnel should remain open
            assert!(!tunnel.is_closed());

            // Write some more data on the half-stream.
            // The half-stream hasn't expired yet, so it will simply be ignored.
            let early_data = relaymsg::Data::new(b"hello").unwrap();
            let early_cell = rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(early_data));
            sink.send(early_cell).await.unwrap();
            rt.progress_until_stalled().await;
            // This was not a protocol violation, so the tunnel is still alive.
            assert!(!tunnel.is_closed());

            // Advance the time to cause the half-streams to get garbage collected.
            //
            // Advancing it by 2 * CBT ought to be enough, because the RTT estimator
            // won't yet have an estimate for the max_rtt.
            let stream_timeout = DummyTimeoutEstimator.circuit_build_timeout(3);
            rt.advance_by(2 * stream_timeout).await;

            // Sending this cell is a protocol violation now
            // that the half-stream expired.
            let late_data = relaymsg::Data::new(b"hello").unwrap();
            let late_cell = rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(late_data));
            sink.send(late_cell).await.unwrap();
            rt.progress_until_stalled().await;
            // The tunnel shut down because of the proto violation.
            assert!(tunnel.is_closed());
        });
    }
    // Set up a circuit and stream that expects some incoming SENDMEs.
    //
    // A client future opens a stream on a fresh tunnel and writes `n_to_send`
    // zero bytes to it.  A fake relay future answers the BEGIN with a
    // CONNECTED and then reads DATA cells until `n_to_send` bytes have
    // arrived.  Any unexpected message panics with a description of what
    // was received.
    //
    // Returns, in order:
    //  - the tunnel,
    //  - the open data stream,
    //  - the sender returned by `newtunnel` (for injecting cells),
    //  - the stream ID seen in the BEGIN,
    //  - the number of DATA cells that were read,
    //  - the receiver returned by `working_fake_channel`,
    //  - the third element returned by `working_fake_channel`.
    async fn setup_incoming_sendme_case<R: Runtime>(
        rt: &R,
        n_to_send: usize,
    ) -> (
        Arc<ClientTunnel>,
        DataStream,
        CircuitRxSender,
        Option<StreamId>,
        usize,
        Receiver<AnyChanCell>,
        Sender<CodecResult>,
    ) {
        let (chan, mut rx, sink2) = working_fake_channel(rt);
        let (tunnel, mut sink) = newtunnel(rt, chan).await;
        let circid = tunnel.as_single_circ().unwrap().peek_circid();
        let begin_and_send_fut = {
            let tunnel = tunnel.clone();
            async move {
                // Take our circuit and make a stream on it.
                let mut stream = tunnel
                    .begin_stream("www.example.com", 443, None)
                    .await
                    .unwrap();
                // Write `n_to_send` bytes, at most one buffer's worth at a time.
                let junk = [0_u8; 1024];
                let mut remaining = n_to_send;
                while remaining > 0 {
                    let n = remaining.min(junk.len());
                    stream.write_all(&junk[..n]).await.unwrap();
                    remaining -= n;
                }
                stream.flush().await.unwrap();
                stream
            }
        };
        let receive_fut = async move {
            // Read the begin cell.
            let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
            let AnyChanMsg::Relay(relay) = chmsg else {
                panic!("expected a RELAY cell carrying BEGIN, got {chmsg:?}");
            };
            let rmsg =
                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay.into_relay_body())
                    .unwrap();
            let (streamid, rmsg) = rmsg.into_streamid_and_msg();
            assert_matches!(rmsg, AnyRelayMsg::Begin(_));
            // Reply with a connected cell...
            let connected = relaymsg::Connected::new_empty().into();
            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
            // Now read bytes from the stream until we have them all.
            let mut bytes_received = 0_usize;
            let mut cells_received = 0_usize;
            while bytes_received < n_to_send {
                // Read a data cell, and remember how much we got.
                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(id, Some(circid));
                let AnyChanMsg::Relay(relay) = chmsg else {
                    panic!("expected a RELAY cell carrying DATA, got {chmsg:?}");
                };
                let rmsg = AnyRelayMsgOuter::decode_singleton(
                    RelayCellFormat::V0,
                    relay.into_relay_body(),
                )
                .unwrap();
                let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(streamid2, streamid);
                let AnyRelayMsg::Data(dat) = rmsg else {
                    panic!("expected a DATA message, got {rmsg:?}");
                };
                cells_received += 1;
                bytes_received += dat.as_ref().len();
            }
            (sink, streamid, cells_received, rx)
        };
        let (stream, (sink, streamid, cells_received, rx)) =
            futures::join!(begin_and_send_fut, receive_fut);
        (tunnel, stream, sink, streamid, cells_received, rx, sink2)
    }
    // After sending 301 DATA cells, a circuit-level SENDME carrying the first
    // expected tag (plus a stream-level SENDME) must be accepted, and the
    // send window reported for hop 2 must grow from 1000 - 301 to 1000 - 201.
    #[traced_test]
    #[test]
    fn accept_valid_sendme() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let n_to_send = 300 * 498 + 3;
            let (tunnel, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
                setup_incoming_sendme_case(&rt, n_to_send).await;
            let circ = tunnel.as_single_circ().unwrap();
            assert_eq!(cells_received, 301);

            // Make sure that the circuit is indeed expecting the right sendmes
            {
                let (done, reply) = oneshot::channel();
                let query = CtrlCmd::QuerySendWindow {
                    hop: 2.into(),
                    leg: tunnel.unique_id(),
                    done,
                };
                circ.command.unbounded_send(query).unwrap();
                let (window, tags) = reply.await.unwrap().unwrap();
                assert_eq!(window, 1000 - 301);
                assert_eq!(tags.len(), 3);
                let expected_tags = [
                    // 100
                    hex!("6400000000000000000000000000000000000000"),
                    // 200
                    hex!("c800000000000000000000000000000000000000"),
                    // 300
                    hex!("2c01000000000000000000000000000000000000"),
                ];
                for (idx, expected) in expected_tags.into_iter().enumerate() {
                    assert_eq!(tags[idx], SendmeTag::from(expected));
                }
            }

            // make and send a circuit-level sendme.
            let c_sendme =
                relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000")).into();
            sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
            // Make and send a stream-level sendme.
            let s_sendme = relaymsg::Sendme::new_empty().into();
            sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
            // Keep the sender alive for the rest of the test.
            let _sink = sink;
            rt.advance_until_stalled().await;

            // Now make sure that the circuit is still happy, and its
            // window is updated.
            {
                let (done, reply) = oneshot::channel();
                let query = CtrlCmd::QuerySendWindow {
                    hop: 2.into(),
                    leg: tunnel.unique_id(),
                    done,
                };
                circ.command.unbounded_send(query).unwrap();
                let (window, _tags) = reply.await.unwrap().unwrap();
                assert_eq!(window, 1000 - 201);
            }
        });
    }
    // A circuit-level SENDME carrying a tag that was never expected must
    // cause the tunnel to be closed.
    #[traced_test]
    #[test]
    fn invalid_circ_sendme() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // Same setup as accept_valid_sendme() test above but try giving
            // a sendme with the wrong tag.
            let n_to_send = 300 * 498 + 3;
            let (tunnel, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
                setup_incoming_sendme_case(&rt, n_to_send).await;

            // make and send a circuit-level sendme with a bad tag.
            let c_sendme =
                relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF")).into();
            sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
            // Keep the sender alive for the rest of the test.
            let _sink = sink;

            // Check whether the reactor dies as a result of receiving invalid data.
            rt.advance_until_stalled().await;
            assert!(tunnel.is_closed());
        });
    }
    // Check that several busy streams sharing one tunnel each get a
    // reasonable share of the cells that reach the channel.
    //
    // `N_STREAMS` streams each try to write `N_BYTES` bytes.  The fake relay
    // side stops after reading `N_BYTES` bytes in total, and every stream
    // must have contributed at least `MIN_EXPECTED_BYTES_PER_STREAM` of them.
    #[traced_test]
    #[test]
    fn test_busy_stream_fairness() {
        // Number of streams to use.
        const N_STREAMS: usize = 3;
        // Number of cells (roughly) for each stream to send.
        const N_CELLS: usize = 20;
        // Number of bytes that *each* stream will send, and that we'll read
        // from the channel.
        const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
        // Ignoring cell granularity, with perfect fairness we'd expect
        // `N_BYTES/N_STREAMS` bytes from each stream.
        //
        // We currently allow for up to a full cell less than that.  This is
        // somewhat arbitrary and can be changed as needed, since we don't
        // provide any specific fairness guarantees.
        const MIN_EXPECTED_BYTES_PER_STREAM: usize =
            N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
            // Run clients in a single task, doing our own round-robin
            // scheduling of writes to the reactor. Conversely, if we were to
            // put each client in its own task, we would be at the mercy of
            // how fairly the runtime schedules the client tasks, which is outside
            // the scope of this test.
            rt.spawn({
                // Clone the circuit to keep it alive after writers have
                // finished with it.
                let tunnel = tunnel.clone();
                async move {
                    // One writer: its stream plus the bytes it still has to send.
                    struct Client {
                        stream: DataStream,
                        to_write: &'static [u8],
                    }
                    let mut clients = VecDeque::new();
                    for _ in 0..N_STREAMS {
                        clients.push_back(Client {
                            stream: tunnel
                                .begin_stream("www.example.com", 80, None)
                                .await
                                .unwrap(),
                            to_write: &[0_u8; N_BYTES][..],
                        });
                    }
                    // Round-robin: one `write` per client per turn.
                    while let Some(mut client) = clients.pop_front() {
                        if client.to_write.is_empty() {
                            // Client is done. Don't put back in queue.
                            continue;
                        }
                        let written = client.stream.write(client.to_write).await.unwrap();
                        client.to_write = &client.to_write[written..];
                        clients.push_back(client);
                    }
                }
            })
            .unwrap();
            let channel_handler_fut = async {
                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
                let mut total_bytes_received = 0;
                loop {
                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                    let AnyChanMsg::Relay(relay) = msg else {
                        panic!("Unexpected chanmsg: {msg:?}");
                    };
                    let rmsg = AnyRelayMsgOuter::decode_singleton(
                        RelayCellFormat::V0,
                        relay.into_relay_body(),
                    )
                    .unwrap();
                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                    match rmsg.cmd() {
                        RelayCmd::BEGIN => {
                            // Add an entry for this stream.
                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
                            assert_eq!(prev, None);
                            // Reply with a CONNECTED.
                            let connected = relaymsg::Connected::new_with_addr(
                                "10.0.0.1".parse().unwrap(),
                                1234,
                            )
                            .into();
                            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                        }
                        RelayCmd::DATA => {
                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
                            let nbytes = data_msg.as_ref().len();
                            total_bytes_received += nbytes;
                            let streamid = streamid.unwrap();
                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
                            *stream_bytes += nbytes;
                            // Stop once one stream's worth of bytes has arrived in total.
                            if total_bytes_received >= N_BYTES {
                                break;
                            }
                        }
                        RelayCmd::END => {
                            // Stream is done. If fair scheduling is working as
                            // expected we *probably* shouldn't get here, but we
                            // can ignore it and save the failure until we
                            // actually have the final stats.
                            continue;
                        }
                        other => {
                            panic!("Unexpected command {other:?}");
                        }
                    }
                }
                // Return our stats, along with the `rx` and `sink` to keep the
                // reactor alive (since clients could still be writing).
                (total_bytes_received, stream_bytes_received, rx, sink)
            };
            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
                channel_handler_fut.await;
            assert_eq!(stream_bytes_received.len(), N_STREAMS);
            for (sid, stream_bytes) in stream_bytes_received {
                assert!(
                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} streams came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
                );
            }
        });
    }
    // `CircParameters::default()` has `extend_by_ed25519_id` set, and the
    // field can be cleared by plain assignment.
    #[test]
    fn basic_params() {
        use super::CircParameters;
        let mut params = CircParameters::default();
        assert!(params.extend_by_ed25519_id);
        params.extend_by_ed25519_id = false;
        assert!(!params.extend_by_ed25519_id);
    }
    // Stream-request filter used by the `hs-service` stream tests: its
    // `disposition` ignores both arguments and always answers `Accept`.
    #[cfg(feature = "hs-service")]
    struct AllowAllStreamsFilter;
    #[cfg(feature = "hs-service")]
    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
        fn disposition(
            &mut self,
            _ctx: &crate::client::stream::IncomingStreamRequestContext<'_>,
            _circ: &crate::circuit::CircHopSyncView<'_>,
        ) -> Result<crate::client::stream::IncomingStreamRequestDisposition> {
            use crate::client::stream::IncomingStreamRequestDisposition as Disposition;
            Ok(Disposition::Accept)
        }
    }
    // Calling `allow_stream_requests` a second time on the same tunnel, while
    // the first result is still alive, must return an error.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn allow_stream_requests_twice() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (tunnel, _send) = newtunnel(&rt, chan).await;

            // The first registration is expected to succeed.
            let first_hop = tunnel.resolve_last_hop().await;
            let _incoming = tunnel
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    first_hop,
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();

            // The second one is attempted while `_incoming` is still in scope.
            let second_hop = tunnel.resolve_last_hop().await;
            let incoming = tunnel
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    second_hop,
                    AllowAllStreamsFilter,
                )
                .await;
            // There can only be one IncomingStream at a time on any given circuit.
            assert!(incoming.is_err());
        });
    }
    // Accept one incoming stream request: a simulated client sends a BEGIN
    // followed by a DATA cell, and the simulated service accepts the stream
    // and must read exactly `TEST_DATA` from it.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn allow_stream_requests() {
        use tor_cell::relaycell::msg::BeginFlags;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            const TEST_DATA: &[u8] = b"ping";
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut send) = newtunnel(&rt, chan).await;
            let rfmt = RelayCellFormat::V0;
            // A helper channel for coordinating the "client"/"service" interaction
            let (ready_tx, ready_rx) = oneshot::channel();
            let last_hop = tunnel.resolve_last_hop().await;
            let mut incoming = tunnel
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    last_hop,
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();
            let simulate_service = async move {
                let request = incoming.next().await.unwrap();
                let mut data_stream = request
                    .accept_data(relaymsg::Connected::new_empty())
                    .await
                    .unwrap();
                // Notify the client task we're ready to accept DATA cells
                ready_tx.send(()).unwrap();
                // Read the data the client sent us
                let mut received = [0_u8; TEST_DATA.len()];
                data_stream.read_exact(&mut received).await.unwrap();
                assert_eq!(&received, TEST_DATA);
                tunnel
            };
            let simulate_client = async move {
                // Both cells of this exchange use the same stream ID.
                let stream_id = StreamId::new(12);
                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let begin_body: BoxedCellBody =
                    AnyRelayMsgOuter::new(stream_id, AnyRelayMsg::Begin(begin))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                // Pretend to be a client at the other end of the circuit sending a begin cell
                send.send(AnyChanMsg::Relay(chanmsg::Relay::from(begin_body)))
                    .await
                    .unwrap();
                // Wait until the service is ready to accept data
                // TODO: we shouldn't need to wait! This is needed because the service will reject
                // any DATA cells that aren't associated with a known stream. We need to wait until
                // the service receives our BEGIN cell (and the reactor updates hop.map with the
                // new stream).
                ready_rx.await.unwrap();
                // Now send some data along the newly established stream.
                let data = relaymsg::Data::new(TEST_DATA).unwrap();
                let data_body: BoxedCellBody =
                    AnyRelayMsgOuter::new(stream_id, AnyRelayMsg::Data(data))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                send.send(AnyChanMsg::Relay(chanmsg::Relay::from(data_body)))
                    .await
                    .unwrap();
                send
            };
            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
    /// Check that a BEGIN is accepted on a stream ID whose previous BEGIN was rejected.
    ///
    /// The simulated client sends two identical BEGIN cells (both with stream ID 12).
    /// The simulated service rejects the first incoming stream with an END,
    /// accepts the second one, and then reads `TEST_DATA` from it.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn accept_stream_after_reject() {
        use tor_cell::relaycell::msg::AnyRelayMsg;
        use tor_cell::relaycell::msg::BeginFlags;
        use tor_cell::relaycell::msg::EndReason;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            const TEST_DATA: &[u8] = b"ping";
            const STREAM_COUNT: usize = 2;
            let rfmt = RelayCellFormat::V0;
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut send) = newtunnel(&rt, chan).await;
            // A helper channel for coordinating the "client"/"service" interaction
            let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);
            let mut incoming = tunnel
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    tunnel.resolve_last_hop().await,
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();
            let simulate_service = async move {
                // Process 2 incoming streams
                for i in 0..STREAM_COUNT {
                    let stream = incoming.next().await.unwrap();
                    // Reject the first one
                    if i == 0 {
                        stream
                            .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
                            .await
                            .unwrap();
                        // Notify the client
                        tx.send(()).await.unwrap();
                        continue;
                    }
                    // Accept the second one
                    let mut data_stream = stream
                        .accept_data(relaymsg::Connected::new_empty())
                        .await
                        .unwrap();
                    // Notify the client task we're ready to accept DATA cells
                    tx.send(()).await.unwrap();
                    // Read the data the client sent us
                    let mut buf = [0_u8; TEST_DATA.len()];
                    data_stream.read_exact(&mut buf).await.unwrap();
                    assert_eq!(&buf, TEST_DATA);
                }
                // Return the tunnel so it stays alive until both tasks are done
                tunnel
            };
            let simulate_client = async move {
                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let begin_msg = chanmsg::Relay::from(body);
                // Pretend to be a client at the other end of the circuit sending 2 identical begin
                // cells (the first one will be rejected by the test service).
                for _ in 0..STREAM_COUNT {
                    send.send(AnyChanMsg::Relay(begin_msg.clone()))
                        .await
                        .unwrap();
                    // Wait until the service signals that it has handled this BEGIN
                    // (it rejects the first one and accepts the second one)
                    rx.next().await.unwrap();
                }
                // Now send some data along the newly established circuit..
                let data = relaymsg::Data::new(TEST_DATA).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let data_msg = chanmsg::Relay::from(body);
                send.send(AnyChanMsg::Relay(data_msg)).await.unwrap();
                // Return the sender so it is not dropped before the service task finishes
                send
            };
            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
    /// Check that a BEGIN cell arriving from a hop other than the one registered
    /// with `allow_stream_requests` does not yield an incoming stream.
    ///
    /// The tunnel is configured to accept stream requests from hop `EXPECTED_HOP`,
    /// and the test then asserts that the incoming-stream receiver yields `None`
    /// after a BEGIN cell is injected.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn incoming_stream_bad_hop() {
        use tor_cell::relaycell::msg::BeginFlags;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            /// Expect the originator of the BEGIN cell to be hop 1.
            const EXPECTED_HOP: u8 = 1;
            let rfmt = RelayCellFormat::V0;
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut send) = newtunnel(&rt, chan).await;
            // Expect to receive incoming streams from hop EXPECTED_HOP
            let mut incoming = tunnel
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    // Build the precise HopLocation with the underlying circuit.
                    (
                        tunnel.as_single_circ().unwrap().unique_id(),
                        EXPECTED_HOP.into(),
                    )
                        .into(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();
            let simulate_service = async move {
                // The originator of the cell is actually the last hop on the circuit, not hop 1,
                // so we expect the reactor to shut down.
                assert!(incoming.next().await.is_none());
                // Return the tunnel so it stays alive until both tasks are done
                tunnel
            };
            let simulate_client = async move {
                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let begin_msg = chanmsg::Relay::from(body);
                // Pretend to be a client at the other end of the circuit sending a begin cell
                send.send(AnyChanMsg::Relay(begin_msg)).await.unwrap();
                // Return the sender so it is not dropped before the service task finishes
                send
            };
            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
    /// Check that the reactor refuses to link circuits that do not form a valid conflux set.
    ///
    /// For each invalid tunnel setup, the link request must fail with an error
    /// whose source mentions that the conflux circuits are invalid.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn multipath_circ_validation() {
        // Needed for calling `source()` on the handshake error
        use std::error::Error as _;
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let params = CircParameters::default();
            let invalid_tunnels = [
                // Circuits that share no hops
                setup_bad_conflux_tunnel(&rt).await,
                // Circuits with identical hops, but built with the default `CircParameters`
                setup_conflux_tunnel(&rt, true, params).await,
            ];
            for tunnel in invalid_tunnels {
                // Keep the tunnel and the circuit contexts alive until the answer arrives
                let TestTunnelCtx {
                    tunnel: _tunnel,
                    circs: _circs,
                    conflux_link_rx,
                } = tunnel;
                let conflux_hs_err = conflux_link_rx.await.unwrap().unwrap_err();
                let err_src = conflux_hs_err.source().unwrap();
                // In the first setup the two circuits don't end in the same hop
                // (no join point), so the reactor will refuse to link them.
                //
                // NOTE(review): the second setup uses identical hops, so presumably
                // it is rejected because of its default circuit parameters rather
                // than a missing join point -- confirm against the reactor's validation.
                assert!(
                    err_src
                        .to_string()
                        .contains("one more more conflux circuits are invalid")
                );
            }
        });
    }
    /// The channel and circuit handles of a single test circuit.
    ///
    /// Built by `setup_conflux_tunnel` from the values returned by
    /// `working_fake_channel` and `newtunnel_ext`.
    //
    // TODO: this structure could be reused for the other tests,
    // to address nickm's comment:
    // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3005#note_3202362
    #[derive(Debug)]
    #[allow(unused)]
    #[cfg(feature = "conflux")]
    struct TestCircuitCtx {
        /// Receiver returned by `working_fake_channel`.
        ///
        /// The tests read the cells sent by the circuit under test
        /// (such as the LINK cell) from here.
        chan_rx: Receiver<AnyChanCell>,
        /// Sender returned by `working_fake_channel`.
        ///
        /// NOTE(review): presumably used for injecting cells (or errors)
        /// into the fake channel -- confirm against `working_fake_channel`.
        chan_tx: Sender<std::result::Result<AnyChanCell, Error>>,
        /// Sender returned by `newtunnel_ext`.
        ///
        /// The tests use it to deliver messages to the circuit,
        /// pretending they came from the other end.
        circ_tx: CircuitRxSender,
        /// The unique ID reported by the tunnel this circuit was created with.
        unique_id: UniqId,
    }
    /// A test tunnel that was asked to link its circuits,
    /// as returned by `setup_conflux_tunnel`.
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct TestTunnelCtx {
        /// The tunnel that received the `CtrlMsg::LinkCircuits` request.
        tunnel: Arc<ClientTunnel>,
        /// The test context of each circuit, in the order they were created.
        circs: Vec<TestCircuitCtx>,
        /// Channel on which the answer to the link request is delivered.
        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
    }
    /// Receive the next cell from `rx` and return the CONFLUX_LINK message it carries.
    ///
    /// # Panics
    ///
    /// Panics if the channel is closed, if the cell is not a RELAY cell,
    /// if its body cannot be decoded as a single V0 relay message,
    /// if the message is not a CONFLUX_LINK, or if it has a stream ID.
    #[cfg(feature = "conflux")]
    async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
        // Block until the next cell shows up on the channel.
        let cell = rx.next().await.unwrap();
        let (_circid, chan_msg) = cell.into_circid_and_msg();
        // Only RELAY cells are acceptable here.
        let relay_body = match chan_msg {
            AnyChanMsg::Relay(relay) => relay.into_relay_body(),
            unexpected => panic!("{:?}", unexpected),
        };
        let decoded =
            AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay_body).unwrap();
        match decoded.into_streamid_and_msg() {
            (stream_id, AnyRelayMsg::ConfluxLink(link)) => {
                // LINK must not be associated with a stream.
                assert!(stream_id.is_none());
                link
            }
            (_, other) => panic!("unexpected relay message {other:?}"),
        }
    }
    /// Create two test circuits and ask the first one to link with the second.
    ///
    /// Two tunnels are built with `newtunnel_ext`, each over its own fake channel.
    /// The circuit of the second tunnel is extracted using
    /// `CtrlCmd::ShutdownAndReturnCircuit`, and then handed to the reactor
    /// of the first tunnel in a `CtrlMsg::LinkCircuits` message.
    ///
    /// If `same_hops` is true, both circuits are built from the same hop details;
    /// otherwise the second circuit gets a different set of hops.
    /// Both circuits are built with `params`.
    ///
    /// This function does not wait for the link request to be answered:
    /// the answer is delivered on the `conflux_link_rx` of the returned context.
    #[cfg(feature = "conflux")]
    async fn setup_conflux_tunnel(
        rt: &MockRuntime,
        same_hops: bool,
        params: CircParameters,
    ) -> TestTunnelCtx {
        let hops1 = hop_details(3, 0);
        let hops2 = if same_hops {
            hops1.clone()
        } else {
            hop_details(3, 10)
        };
        let (chan1, rx1, chan_sink1) = working_fake_channel(rt);
        let (mut tunnel1, sink1) = newtunnel_ext(
            rt,
            UniqId::new(1, 3),
            chan1,
            hops1,
            2.into(),
            params.clone(),
        )
        .await;
        let (chan2, rx2, chan_sink2) = working_fake_channel(rt);
        let (tunnel2, sink2) =
            newtunnel_ext(rt, UniqId::new(2, 4), chan2, hops2, 2.into(), params).await;
        // Take the circuit out of the second tunnel's reactor,
        // so it can be handed over to the first tunnel
        let (answer_tx, answer_rx) = oneshot::channel();
        tunnel2
            .as_single_circ()
            .unwrap()
            .command
            .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
            .unwrap();
        let circuit = answer_rx.await.unwrap().unwrap();
        // The circuit should be shutting down its reactor
        rt.advance_until_stalled().await;
        assert!(tunnel2.is_closed());
        let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
        // Tell the first circuit to link with the second and form a multipath tunnel
        tunnel1
            .as_single_circ()
            .unwrap()
            .control
            .unbounded_send(CtrlMsg::LinkCircuits {
                circuits: vec![circuit],
                answer: conflux_link_tx,
            })
            .unwrap();
        let circ_ctx1 = TestCircuitCtx {
            chan_rx: rx1,
            chan_tx: chan_sink1,
            circ_tx: sink1,
            unique_id: tunnel1.unique_id(),
        };
        let circ_ctx2 = TestCircuitCtx {
            chan_rx: rx2,
            chan_tx: chan_sink2,
            circ_tx: sink2,
            unique_id: tunnel2.unique_id(),
        };
        // TODO(conflux): nothing currently sets this,
        // so we need to manually set it.
        //
        // Instead of doing this, we should have a ClientCirc
        // API that sends CtrlMsg::LinkCircuits and sets this to true
        tunnel1.circ.is_multi_path = true;
        TestTunnelCtx {
            tunnel: Arc::new(tunnel1),
            circs: vec![circ_ctx1, circ_ctx2],
            conflux_link_rx,
        }
    }
    /// Set up a conflux tunnel whose two circuits share the same hops.
    ///
    /// The circuits are built with the given congestion control parameters
    /// and the test defaults for flow control.
    #[cfg(feature = "conflux")]
    async fn setup_good_conflux_tunnel(
        rt: &MockRuntime,
        cc_params: CongestionControlParams,
    ) -> TestTunnelCtx {
        // Both test circuits are identical, which means they also share their guards.
        // Strictly speaking this breaks the conflux set rule from prop354.
        // That is acceptable in a test, but production callers will have to
        // prevent guard reuse (unless one of the guards happens to be Guard + Exit).
        const SAME_HOPS: bool = true;
        let params = CircParameters::new(
            true,
            cc_params,
            FlowCtrlParameters::defaults_for_tests(),
        );
        setup_conflux_tunnel(rt, SAME_HOPS, params).await
    }
    /// Set up a conflux tunnel whose two circuits have no hops in common.
    ///
    /// Such circuits cannot end in the same hop (there is no join point),
    /// so the reactor is expected to refuse to link them.
    #[cfg(feature = "conflux")]
    async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
        const SAME_HOPS: bool = false;
        let flow_ctrl = FlowCtrlParameters::defaults_for_tests();
        let cc = build_cc_vegas_params();
        let params = CircParameters::new(true, cc, flow_ctrl);
        setup_conflux_tunnel(rt, SAME_HOPS, params).await
    }
    /// A LINKED cell received on a tunnel that never started a conflux
    /// handshake must cause the tunnel to close.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn reject_conflux_linked_before_hs() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // The channel halves are unused, but must outlive the test body.
            let (chan, _chan_rx, _chan_sink) = working_fake_channel(&rt);
            let (tunnel, mut circ_tx) = newtunnel(&rt, chan).await;
            let payload =
                V1LinkPayload::new(V1Nonce::new(&mut testing_rng()), V1DesiredUx::NO_OPINION);
            // Deliver the unsolicited LINKED cell to the circuit...
            let unsolicited = rmsg_to_ccmsg(None, relaymsg::ConfluxLinked::new(payload).into());
            circ_tx.send(unsolicited).await.unwrap();
            // ...and give the reactor a chance to process it.
            rt.advance_until_stalled().await;
            assert!(tunnel.is_closed());
        });
    }
    /// Check that a conflux handshake that is never answered on one leg times out.
    ///
    /// The first leg gets a LINKED reply, the second leg gets nothing.
    /// After advancing the mock clock, the handshake result must be `Ok`
    /// for the first leg and `ConfluxHandshakeError::Timeout` for the second.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn conflux_hs_timeout() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let TestTunnelCtx {
                tunnel: _tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
            let [mut circ1, _circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
            // Wait for the LINK cell
            let link = await_link_payload(&mut circ1.chan_rx).await;
            // Send a LINKED cell on the first leg...
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            circ1
                .circ_tx
                .send(rmsg_to_ccmsg(None, linked))
                .await
                .unwrap();
            // Do nothing, and wait for the handshake to timeout on the second leg
            rt.advance_by(Duration::from_secs(60)).await;
            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
            // Get the handshake results of each circuit
            let [res1, res2]: [StdResult<(), ConfluxHandshakeError>; 2] =
                conflux_hs_res.try_into().unwrap();
            assert!(res1.is_ok());
            let err = res2.unwrap_err();
            assert_matches!(err, ConfluxHandshakeError::Timeout);
        });
    }
    /// Check that bogus responses to a conflux handshake are reported as
    /// protocol violations, and that they cause the tunnel to close.
    ///
    /// Each bad cell is sent on the second leg of a freshly set up tunnel,
    /// and the resulting handshake error is compared with the expected message.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn conflux_bad_hs() {
        use crate::util::err::ConfluxHandshakeError;
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // A payload with a freshly generated nonce
            // (the mismatched-nonce case below relies on it differing
            // from the nonce the client sent in its LINK cell)
            let nonce = V1Nonce::new(&mut testing_rng());
            let bad_link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
            //let extended2 = relaymsg::Extended2::new(vec![]).into();
            // Pairs of (bad cell to send, expected error message)
            let bad_hs_responses = [
                (
                    rmsg_to_ccmsg(
                        None,
                        relaymsg::ConfluxLinked::new(bad_link_payload.clone()).into(),
                    ),
                    "Received CONFLUX_LINKED cell with mismatched nonce",
                ),
                (
                    rmsg_to_ccmsg(None, relaymsg::ConfluxLink::new(bad_link_payload).into()),
                    "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
                ),
                (
                    rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
                    "Received CONFLUX_SWITCH on unlinked circuit?!",
                ),
                // TODO: this currently causes the reactor to shut down immediately,
                // without sending a response on the handshake channel
                /*
                (
                    rmsg_to_ccmsg(None, extended2),
                    "Received CONFLUX_LINKED cell with mismatched nonce",
                ),
                */
            ];
            for (bad_cell, expected_err) in bad_hs_responses {
                let TestTunnelCtx {
                    tunnel,
                    circs,
                    conflux_link_rx,
                } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
                let [mut _circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
                // Respond with a bogus cell on one of the legs
                circ2.circ_tx.send(bad_cell).await.unwrap();
                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
                // Get the handshake results (the handshake results are reported early,
                // without waiting for the second circuit leg's handshake to timeout,
                // because this is a protocol violation causing the entire tunnel to shut down)
                let [res2]: [StdResult<(), ConfluxHandshakeError>; 1] =
                    conflux_hs_res.try_into().unwrap();
                match res2.unwrap_err() {
                    ConfluxHandshakeError::Link(Error::CircProto(e)) => {
                        assert_eq!(e, expected_err);
                    }
                    e => panic!("unexpected error: {e:?}"),
                }
                assert!(tunnel.is_closed());
            }
        });
    }
    /// Conflux cells (LINKED, LINK, SWITCH) delivered to a tunnel that is
    /// not taking part in any conflux handshake must cause it to close.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn unexpected_conflux_cell() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let payload =
                V1LinkPayload::new(V1Nonce::new(&mut testing_rng()), V1DesiredUx::NO_OPINION);
            // One cell of each conflux type the tunnel must not tolerate.
            let unexpected_cells = [
                rmsg_to_ccmsg(None, relaymsg::ConfluxLinked::new(payload.clone()).into()),
                rmsg_to_ccmsg(None, relaymsg::ConfluxLink::new(payload.clone()).into()),
                rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
            ];
            for unexpected in unexpected_cells {
                // Every cell gets its own, fresh tunnel.
                let (chan, _chan_rx, _chan_sink) = working_fake_channel(&rt);
                let (tunnel, mut circ_tx) = newtunnel(&rt, chan).await;
                circ_tx.send(unexpected).await.unwrap();
                rt.advance_until_stalled().await;
                // We can only check that the tunnel closed, not why it closed:
                // the reactor merely logs the error before exiting.
                assert!(tunnel.is_closed());
            }
        });
    }
    /// Check that a second LINKED cell on an already linked leg
    /// causes the tunnel to be torn down.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn conflux_bad_linked() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // The handshake result is not inspected by this test
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx: _,
            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
            // Wait for the LINK cell, so we can echo its payload back
            let link = await_link_payload(&mut circ1.chan_rx).await;
            // Send a LINKED cell on the first leg...
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            circ1
                .circ_tx
                .send(rmsg_to_ccmsg(None, linked))
                .await
                .unwrap();
            // ...and two LINKED cells on the second
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            circ2
                .circ_tx
                .send(rmsg_to_ccmsg(None, linked))
                .await
                .unwrap();
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            circ2
                .circ_tx
                .send(rmsg_to_ccmsg(None, linked))
                .await
                .unwrap();
            rt.advance_until_stalled().await;
            // Receiving a LINKED cell on an already linked leg causes
            // the tunnel to be torn down
            assert!(tunnel.is_closed());
        });
    }
    /// Check that invalid SWITCH cells received on a linked tunnel
    /// cause the tunnel to shut down.
    ///
    /// Each bad SWITCH cell is tested on a freshly linked tunnel.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn conflux_bad_switch() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let cc_vegas_params = build_cc_vegas_params();
            let cwnd_init = cc_vegas_params.cwnd_params().cwnd_init();
            let bad_switch = [
                // SWITCH cells with seqno = 0 are not allowed
                relaymsg::ConfluxSwitch::new(0),
                // SWITCH cells with seqno > cc_init_cwnd are not allowed
                // on tunnels that have not received any data
                relaymsg::ConfluxSwitch::new(cwnd_init + 1),
            ];
            for bad_cell in bad_switch {
                let TestTunnelCtx {
                    tunnel,
                    circs,
                    conflux_link_rx,
                } = setup_good_conflux_tunnel(&rt, cc_vegas_params.clone()).await;
                let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
                // Wait for the LINK cell, so we can echo its payload back
                let link = await_link_payload(&mut circ1.chan_rx).await;
                // Send a LINKED cell on both legs
                for circ in [&mut circ1, &mut circ2] {
                    let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
                    circ.circ_tx
                        .send(rmsg_to_ccmsg(None, linked))
                        .await
                        .unwrap();
                }
                // The handshake must have succeeded on every leg
                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
                assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
                // Now send a bad SWITCH cell on the first leg.
                // This will cause the tunnel reactor to shut down.
                let msg = rmsg_to_ccmsg(None, bad_cell.clone().into());
                circ1.circ_tx.send(msg).await.unwrap();
                // The tunnel should be shutting down
                rt.advance_until_stalled().await;
                assert!(tunnel.is_closed());
            }
        });
    }
    /// Check that two consecutive SWITCH cells on the same leg
    /// cause the tunnel to shut down.
    ///
    /// The first SWITCH cell must leave the tunnel open;
    /// the second one, sent right after it on the same leg, must close it.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn conflux_consecutive_switch() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
            // Wait for the LINK cell, so we can echo its payload back
            let link = await_link_payload(&mut circ1.chan_rx).await;
            // Send a LINKED cell on both legs
            for circ in [&mut circ1, &mut circ2] {
                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
                circ.circ_tx
                    .send(rmsg_to_ccmsg(None, linked))
                    .await
                    .unwrap();
            }
            // The handshake must have succeeded on every leg
            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
            assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
            // Send a valid SWITCH cell on the first leg.
            let switch1 = relaymsg::ConfluxSwitch::new(10);
            let msg = rmsg_to_ccmsg(None, switch1.into());
            circ1.circ_tx.send(msg).await.unwrap();
            // The tunnel should not be shutting down
            rt.advance_until_stalled().await;
            assert!(!tunnel.is_closed());
            // Send another valid SWITCH cell on the same leg.
            let switch2 = relaymsg::ConfluxSwitch::new(12);
            let msg = rmsg_to_ccmsg(None, switch2.into());
            circ1.circ_tx.send(msg).await.unwrap();
            // The tunnel should now be shutting down
            // (consecutive switches are not allowed)
            rt.advance_until_stalled().await;
            assert!(tunnel.is_closed());
        });
    }
    // This test ensures CtrlCmd::ShutdownAndReturnCircuit returns an
    // error when called on a multi-path tunnel
    // (and that the tunnel reactor shuts down nonetheless)
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn shutdown_and_return_circ_multipath() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // The handshake result is not inspected by this test
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx: _,
            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
            // Let the reactor process the link request sent during setup
            rt.progress_until_stalled().await;
            let (answer_tx, answer_rx) = oneshot::channel();
            tunnel
                .circ
                .command
                .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
                .unwrap();
            // map explicitly returns () for clarity
            #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
            let err = answer_rx
                .await
                .unwrap()
                .map(|_| {
                    // Map to () so we can call unwrap
                    // (Circuit doesn't impl debug)
                    ()
                })
                .unwrap_err();
            const MSG: &str = "not a single leg conflux set (got at least 2 elements when exactly one was expected)";
            assert!(err.to_string().contains(MSG), "{err}");
            // The tunnel reactor should be shutting down,
            // regardless of the error
            rt.progress_until_stalled().await;
            assert!(tunnel.is_closed());
            // Keep circs alive, to prevent the reactor
            // from shutting down prematurely
            drop(circs);
        });
    }
    /// An endpoint taking part in a conflux test.
    ///
    /// `I` is the type of the iterator yielding the RTT delays
    /// used by the mock exit (see [`ConfluxExitState`]).
    #[cfg(feature = "conflux")]
    #[derive(Debug)]
    enum ConfluxTestEndpoint<I: Iterator<Item = Option<Duration>>> {
        /// Pretend to be an exit relay.
        Relay(ConfluxExitState<I>),
        /// Client task.
        Client {
            /// Channel for receiving the outcome of the conflux handshakes.
            conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
            /// The tunnel reactor handle
            tunnel: Arc<ClientTunnel>,
            /// Data to send on a stream.
            send_data: Vec<u8>,
            /// Data we expect to receive on a stream.
            recv_data: Vec<u8>,
        },
    }
    /// Structure for returning the sinks, channels, etc. that must stay
    /// alive until the test is complete.
    ///
    /// The fields are never read (hence the `allow(unused)`):
    /// they only exist to keep the values from being dropped.
    #[allow(unused, clippy::large_enum_variant)]
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    enum ConfluxEndpointResult {
        /// The tunnel and the data stream of an endpoint.
        ///
        /// NOTE(review): presumably returned by the client endpoint -- confirm.
        Circuit {
            /// The tunnel handle.
            tunnel: Arc<ClientTunnel>,
            /// The data stream opened on the tunnel.
            stream: DataStream,
        },
        /// The circuit context of a mock exit endpoint
        /// (this is what `run_mock_conflux_exit` returns).
        Relay {
            /// The test context of the circuit the mock exit was running on.
            circ: TestCircuitCtx,
        },
    }
    /// Stream data, shared by all the mock exit endpoints.
    ///
    /// A new state starts out with no data received and all the flags unset
    /// (see [`ConfluxStreamState::new`]).
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct ConfluxStreamState {
        /// The data received so far on this stream (at the exit).
        data_recvd: Vec<u8>,
        /// The total amount of data we expect to receive on this stream.
        expected_data_len: usize,
        /// Whether we have seen a BEGIN cell yet.
        begin_recvd: bool,
        /// Whether we have seen an END cell yet.
        end_recvd: bool,
        /// Whether we have sent an END cell yet.
        end_sent: bool,
    }
    #[cfg(feature = "conflux")]
    impl ConfluxStreamState {
        /// Create the state of a stream on which nothing has happened yet,
        /// and which is expected to carry `expected_data_len` bytes in total.
        fn new(expected_data_len: usize) -> Self {
            Self {
                expected_data_len,
                // Nothing has arrived so far...
                data_recvd: Vec::new(),
                // ...and no BEGIN or END has been seen or sent.
                begin_recvd: false,
                end_recvd: false,
                end_sent: false,
            }
        }
    }
    /// An object describing a SWITCH cell that we expect to receive
    /// in the mock exit.
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct ExpectedSwitch {
        /// The number of cells we've seen on this leg so far,
        /// up to and including the SWITCH.
        cells_so_far: usize,
        /// The expected seqno in the SWITCH cell.
        seqno: u32,
    }
    /// Object dispatching cells for delivery on the appropriate
    /// leg in a multipath tunnel.
    ///
    /// Used to send out-of-order cells from the mock exit
    /// to the client under test.
    #[cfg(feature = "conflux")]
    struct CellDispatcher {
        /// Channels on which to send the [`CellToSend`] commands,
        /// keyed by the unique ID of the leg.
        leg_tx: HashMap<UniqId, mpsc::Sender<CellToSend>>,
        /// The list of cells to send, in sending order,
        /// each paired with the unique ID of the leg it should be sent on.
        cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
    }
    #[cfg(feature = "conflux")]
    impl CellDispatcher {
        /// Dispatch all the cells, in order, to the legs they are destined for.
        ///
        /// Each cell is handed to the task of its leg as a [`CellToSend`],
        /// and the next cell is only dispatched once that task
        /// has confirmed the previous one was sent.
        ///
        /// # Panics
        ///
        /// Panics if a cell is destined for an unknown leg, if the channel of
        /// a leg is closed, or if a leg goes away without confirming the send.
        async fn run(mut self) {
            // Drain front-to-back: this keeps the sending order, and avoids
            // shifting the whole list on every iteration (as `remove(0)` would).
            for (circ_id, cell) in self.cells_to_send.drain(..) {
                let cell_tx = self.leg_tx.get_mut(&circ_id).unwrap();
                let (done_tx, done_rx) = oneshot::channel();
                cell_tx.send(CellToSend { done_tx, cell }).await.unwrap();
                // Wait for the cell to be sent before sending the next one.
                let () = done_rx.await.unwrap();
            }
        }
    }
    /// A cell for the mock exit to send on one of its legs.
    ///
    /// Created by [`CellDispatcher::run`], which waits on the receiving
    /// end of `done_tx` before dispatching the next cell.
    #[cfg(feature = "conflux")]
    #[derive(Debug)]
    struct CellToSend {
        /// Channel for notifying the control task that the cell was sent.
        done_tx: oneshot::Sender<()>,
        /// The cell to send.
        cell: AnyRelayMsg,
    }
    /// The state of a mock exit.
    ///
    /// `I` is the type of the iterator yielding the RTT delays
    /// (see `rtt_delays`).
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct ConfluxExitState<I: Iterator<Item = Option<Duration>>> {
        /// The runtime, shared by the test client and mock exit tasks.
        ///
        /// The mutex prevents the client and mock exit tasks from calling
        /// functions like [`MockRuntime::advance_until_stalled`]
        /// or [`MockRuntime::progress_until_stalled`] concurrently,
        /// as this is not supported by the mock runtime.
        runtime: Arc<AsyncMutex<MockRuntime>>,
        /// The client view of the tunnel.
        tunnel: Arc<ClientTunnel>,
        /// The circuit test context.
        circ: TestCircuitCtx,
        /// The RTT delay to introduce just before each SENDME.
        ///
        /// Used to trigger the client to send a SWITCH.
        rtt_delays: I,
        /// State of the (only) expected stream on this tunnel,
        /// shared by all the mock exit endpoints.
        stream_state: Arc<Mutex<ConfluxStreamState>>,
        /// The number of cells after which to expect a SWITCH
        /// cell from the client.
        expect_switch: Vec<ExpectedSwitch>,
        /// Channel for receiving notifications from the other leg.
        event_rx: mpsc::Receiver<MockExitEvent>,
        /// Channel for sending notifications to the other leg.
        event_tx: mpsc::Sender<MockExitEvent>,
        /// Whether this circuit leg should act as the primary (sending) leg.
        is_sending_leg: bool,
        /// A channel for receiving cells to send on this stream.
        cells_rx: mpsc::Receiver<CellToSend>,
    }
    /// Play the exit side of a successful conflux handshake.
    ///
    /// Waits for the LINK cell from the client on `rx`, optionally advances
    /// the mock clock by `init_rtt_delay`, replies on `sink` with a LINKED cell
    /// echoing the LINK payload, and finally expects a LINKED_ACK on `rx`.
    ///
    /// # Panics
    ///
    /// Panics if any of the received cells is not the expected one.
    #[cfg(feature = "conflux")]
    async fn good_exit_handshake(
        runtime: &Arc<AsyncMutex<MockRuntime>>,
        init_rtt_delay: Option<Duration>,
        rx: &mut Receiver<ChanCell<AnyChanMsg>>,
        sink: &mut CircuitRxSender,
    ) {
        // The client opens the handshake with a LINK cell.
        let link = await_link_payload(rx).await;
        // An artificial delay lets one circuit end up with
        // a better initial RTT than the other.
        if let Some(delay) = init_rtt_delay {
            runtime.lock().await.advance_by(delay).await;
        }
        // Answer with a LINKED cell carrying the same payload.
        let reply = rmsg_to_ccmsg(
            None,
            relaymsg::ConfluxLinked::new(link.payload().clone()).into(),
        );
        sink.send(reply).await.unwrap();
        // The client is expected to conclude with a LINKED_ACK.
        let ack_cell = rx.next().await.unwrap();
        let (_circid, chan_msg) = ack_cell.into_circid_and_msg();
        let relay_body = match chan_msg {
            AnyChanMsg::Relay(relay) => relay.into_relay_body(),
            unexpected => panic!("{unexpected:?}"),
        };
        let (_stream_id, rmsg) =
            AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay_body)
                .unwrap()
                .into_streamid_and_msg();
        assert_matches!(rmsg, AnyRelayMsg::ConfluxLinkedAck(_));
    }
    /// An event sent by one mock conflux leg to another.
    ///
    /// These events travel over the `event_tx`/`event_rx` channels
    /// of [`ConfluxExitState`].
    //
    // NOTE(review): unlike the neighbouring conflux test helpers, this type
    // is not gated on `cfg(feature = "conflux")` -- confirm whether that is
    // intentional.
    #[derive(Copy, Clone, Debug)]
    enum MockExitEvent {
        /// Inform the other leg we are done.
        Done,
        /// Inform the other leg a stream was opened.
        BeginRecvd(StreamId),
    }
    /// Run the mock exit for a single leg of a conflux tunnel.
    ///
    /// The task loops, reacting to whichever of the following is ready first:
    ///
    ///   * a cell from the client on this leg (`circ.chan_rx`),
    ///   * an event from the other leg's task (`event_rx`),
    ///   * a cell the test wants this leg to send (`cells_rx`); this source
    ///     is only polled once the stream ID is known and until `cells_rx`
    ///     is exhausted.
    ///
    /// Cells from the client are handled as follows:
    ///
    ///   * BEGIN: reply with CONNECTED and tell the other leg via
    ///     `MockExitEvent::BeginRecvd` (a second BEGIN is a test failure),
    ///   * DATA: append the payload to the shared `stream_state.data_recvd`,
    ///     and send a circuit-level SENDME for every 31st DATA cell
    ///     received on this leg,
    ///   * CONFLUX_SWITCH: check it against the next entry of `expect_switch`,
    ///   * END: record it in the shared state and leave the loop
    ///     (a second END is a test failure),
    ///   * anything else is a test failure.
    ///
    /// The loop also ends when the shared state says the stream was begun (or
    /// ended), all the expected data was received and there is nothing left
    /// to write, or when the other leg reports `Done` (or drops its sender).
    ///
    /// On exit, the leg marked `is_sending_leg` sends an END cell unless an END
    /// was already received, the other leg is notified with `Done`,
    /// and we assert that every expected SWITCH cell was seen.
    ///
    /// Returns `ConfluxEndpointResult::Relay`, handing back the circuit context
    /// so that its channels stay alive for as long as the caller holds it.
    #[cfg(feature = "conflux")]
    async fn run_mock_conflux_exit<I: Iterator<Item = Option<Duration>>>(
        state: ConfluxExitState<I>,
    ) -> ConfluxEndpointResult {
        let ConfluxExitState {
            runtime,
            tunnel,
            mut circ,
            rtt_delays,
            stream_state,
            mut expect_switch,
            mut event_tx,
            mut event_rx,
            is_sending_leg,
            mut cells_rx,
        } = state;
        let mut rtt_delays = rtt_delays.into_iter();
        // Expect the client to open a stream, and de-multiplex the received stream data
        let stream_len = stream_state.lock().unwrap().expected_data_len;
        // The number of DATA cells received on this leg (used for SENDME pacing).
        let mut data_cells_received = 0_usize;
        // The total number of cells received on this leg
        // (checked against `ExpectedSwitch::cells_so_far`).
        let mut cell_count = 0_usize;
        // The SENDME tags obtained from the last QuerySendWindow, not yet used.
        let mut tags = vec![];
        // The ID of the stream opened by the client, once known.
        let mut streamid = None;
        // Set once `cells_rx` has been exhausted.
        let mut done_writing = false;
        loop {
            // Note: the lock on the shared stream state is released at the end
            // of this block, before any of the `.await` points below.
            let should_exit = {
                let stream_state = stream_state.lock().unwrap();
                let done_reading = stream_state.data_recvd.len() >= stream_len;
                (stream_state.begin_recvd || stream_state.end_recvd) && done_reading && done_writing
            };
            if should_exit {
                break;
            }
            use futures::select;
            // Only start reading from the dispatcher channel after the stream is open
            // and we're ready to start sending cells.
            //
            // (Otherwise we use a future that never completes,
            // so this `select!` branch is effectively disabled.)
            let mut next_cell = if streamid.is_some() && !done_writing {
                Box::pin(cells_rx.next().fuse())
                    as Pin<Box<dyn FusedFuture<Output = Option<CellToSend>> + Send>>
            } else {
                Box::pin(std::future::pending().fuse())
            };
            // Wait for the BEGIN cell to arrive, or for the transfer to complete
            // (we need to bail if the other leg already completed);
            let res = select! {
                res = circ.chan_rx.next() => {
                    res.unwrap()
                },
                res = event_rx.next() => {
                    // The other leg's sender was dropped: nothing more to do.
                    let Some(event) = res else {
                        break;
                    };
                    match event {
                        MockExitEvent::Done => {
                            break;
                        },
                        MockExitEvent::BeginRecvd(id) => {
                            // The stream is now open (the other leg received the BEGIN),
                            // so we're ready to start reading cells from the cell dispatcher.
                            streamid = Some(id);
                            continue;
                        },
                    }
                }
                res = next_cell => {
                    if let Some(cell_to_send) = res {
                        let CellToSend { cell, done_tx } = cell_to_send;
                        // SWITCH cells don't have a stream ID
                        let streamid = if matches!(cell, AnyRelayMsg::ConfluxSwitch(_)) {
                            None
                        } else {
                            streamid
                        };
                        circ.circ_tx
                            .send(rmsg_to_ccmsg(streamid, cell))
                            .await
                            .unwrap();
                        // Let the mock runtime run until it stalls before
                        // reporting back that this cell was sent.
                        runtime.lock().await.advance_until_stalled().await;
                        done_tx.send(()).unwrap();
                    } else {
                        // The sender side of `cells_rx` is gone:
                        // there is nothing left for this leg to write.
                        done_writing = true;
                    }
                    continue;
                }
            };
            // If we get here, `res` is a cell received from the client on this leg.
            let (_id, chmsg) = res.into_circid_and_msg();
            cell_count += 1;
            let rmsg = match chmsg {
                AnyChanMsg::Relay(r) => {
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                        .unwrap()
                }
                other => panic!("{:?}", other),
            };
            let (new_streamid, rmsg) = rmsg.into_streamid_and_msg();
            // Remember the stream ID of the first cell we see, unless we
            // already learned it (from an earlier cell, or from the other leg).
            if streamid.is_none() {
                streamid = new_streamid;
            }
            let begin_recvd = stream_state.lock().unwrap().begin_recvd;
            let end_recvd = stream_state.lock().unwrap().end_recvd;
            match rmsg {
                AnyRelayMsg::Begin(_) if begin_recvd => {
                    panic!("client tried to open two streams?!");
                }
                AnyRelayMsg::Begin(_) if !begin_recvd => {
                    stream_state.lock().unwrap().begin_recvd = true;
                    // Reply with a connected cell...
                    let connected = relaymsg::Connected::new_empty().into();
                    circ.circ_tx
                        .send(rmsg_to_ccmsg(streamid, connected))
                        .await
                        .unwrap();
                    // Tell the other leg we received a BEGIN cell
                    event_tx
                        .send(MockExitEvent::BeginRecvd(streamid.unwrap()))
                        .await
                        .unwrap();
                }
                AnyRelayMsg::End(_) if !end_recvd => {
                    stream_state.lock().unwrap().end_recvd = true;
                    break;
                }
                AnyRelayMsg::End(_) if end_recvd => {
                    panic!("received two END cells for the same stream?!");
                }
                AnyRelayMsg::ConfluxSwitch(cell) => {
                    // Ensure we got the SWITCH after the expected number of cells
                    // (note: this panics if we weren't expecting any more SWITCH cells)
                    let expected = expect_switch.remove(0);
                    assert_eq!(expected.cells_so_far, cell_count);
                    assert_eq!(expected.seqno, cell.seqno());
                    // To keep the tests simple, we don't handle out of order cells,
                    // and simply sort the received data at the end.
                    // This ensures all the data was actually received,
                    // but it doesn't check the order in which it arrived.
                    // The only check on the SWITCH seqnos is the comparison
                    // with the values from `expect_switch` above.
                    continue;
                }
                AnyRelayMsg::Data(dat) => {
                    data_cells_received += 1;
                    stream_state
                        .lock()
                        .unwrap()
                        .data_recvd
                        .extend_from_slice(dat.as_ref());
                    // Send a circuit-level SENDME for every 31st DATA cell received on this leg.
                    let is_next_cell_sendme = data_cells_received.is_multiple_of(31);
                    if is_next_cell_sendme {
                        if tags.is_empty() {
                            // Important: we need to make sure all the SENDMEs
                            // we sent so far have been processed by the reactor
                            // (otherwise the next QuerySendWindow call
                            // might return an outdated list of tags!)
                            runtime.lock().await.advance_until_stalled().await;
                            let (tx, rx) = oneshot::channel();
                            tunnel
                                .circ
                                .command
                                .unbounded_send(CtrlCmd::QuerySendWindow {
                                    hop: 2.into(),
                                    leg: circ.unique_id,
                                    done: tx,
                                })
                                .unwrap();
                            // Get a fresh batch of tags.
                            let (_window, new_tags) = rx.await.unwrap().unwrap();
                            tags = new_tags;
                        }
                        // Use up the oldest tag we have
                        // (note: this panics if the reactor returned no tags).
                        let tag = tags.remove(0);
                        // Introduce an artificial delay, to make one circ have worse RTT
                        // than the other, and thus trigger a SWITCH
                        if let Some(rtt_delay) = rtt_delays.next().flatten() {
                            runtime.lock().await.advance_by(rtt_delay).await;
                        }
                        // Make and send a circuit-level SENDME
                        let sendme = relaymsg::Sendme::from(tag).into();
                        circ.circ_tx
                            .send(rmsg_to_ccmsg(None, sendme))
                            .await
                            .unwrap();
                    }
                }
                _ => panic!("unexpected message {rmsg:?} on leg {}", circ.unique_id),
            }
        }
        let end_recvd = stream_state.lock().unwrap().end_recvd;
        // Close the stream if the other endpoint hasn't already done so
        if is_sending_leg && !end_recvd {
            let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
            circ.circ_tx
                .send(rmsg_to_ccmsg(streamid, end))
                .await
                .unwrap();
            stream_state.lock().unwrap().end_sent = true;
        }
        // This is allowed to fail, because the other leg might have exited first.
        let _ = event_tx.send(MockExitEvent::Done).await;
        // Ensure we received all the switch cells we were expecting
        assert!(
            expect_switch.is_empty(),
            "expect_switch = {expect_switch:?}"
        );
        ConfluxEndpointResult::Relay { circ }
    }
    /// Run the client side of a conflux test.
    ///
    /// Waits for the conflux handshake to complete (asserting that its result
    /// has exactly 2 entries), then opens a stream to `www.example.com:443`
    /// over `tunnel`, writes and flushes `send_data`, and reads from the stream
    /// until EOF, asserting that what was read is exactly `recv_data`.
    ///
    /// Returns `ConfluxEndpointResult::Circuit`, handing the tunnel and the
    /// stream back to the caller.
    #[cfg(feature = "conflux")]
    async fn run_conflux_client(
        tunnel: Arc<ClientTunnel>,
        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
        send_data: Vec<u8>,
        recv_data: Vec<u8>,
    ) -> ConfluxEndpointResult {
        // Once this resolves, all circuit legs have completed the conflux
        // handshake, which means we have a multipath tunnel.
        let handshake_outcome = conflux_link_rx.await.unwrap().unwrap();
        assert_eq!(handshake_outcome.len(), 2);

        // The tunnel is linked, so it is ready to carry a stream.
        let mut stream = tunnel
            .begin_stream("www.example.com", 443, None)
            .await
            .unwrap();

        // Send everything we have to send...
        stream.write_all(&send_data).await.unwrap();
        stream.flush().await.unwrap();

        // ...and then read until the other side closes the stream.
        let mut received = Vec::<u8>::new();
        let n_read = stream.read_to_end(&mut received).await.unwrap();
        assert_eq!(n_read, recv_data.len());
        assert_eq!(recv_data, received);

        ConfluxEndpointResult::Circuit { tunnel, stream }
    }
    /// Run a conflux test endpoint to completion.
    ///
    /// Dispatches to [`run_conflux_client`] for a client endpoint,
    /// and to [`run_mock_conflux_exit`] for a mock relay endpoint,
    /// returning whatever the chosen runner returns.
    #[cfg(feature = "conflux")]
    async fn run_conflux_endpoint<I: Iterator<Item = Option<Duration>>>(
        endpoint: ConfluxTestEndpoint<I>,
    ) -> ConfluxEndpointResult {
        match endpoint {
            ConfluxTestEndpoint::Client {
                tunnel,
                conflux_link_rx,
                send_data,
                recv_data,
            } => {
                let client = run_conflux_client(tunnel, conflux_link_rx, send_data, recv_data);
                client.await
            }
            ConfluxTestEndpoint::Relay(exit_state) => {
                let exit = run_mock_conflux_exit(exit_state);
                exit.await
            }
        }
    }
    // In this test, a `ConfluxTestEndpoint::Client` task creates a multipath tunnel
    // with 2 legs, opens a stream and sends 300 DATA cells on it.
    //
    // The test spawns two `ConfluxTestEndpoint::Relay` tasks (one for each leg),
    // which mock the behavior of an exit. The two relay tasks introduce
    // artificial delays before each SENDME sent to the client,
    // in order to trigger it to switch its sending leg predictably.
    //
    // The mock exit does not send any data on the stream.
    //
    // This test checks that the client sends SWITCH cells at the right time,
    // and that all the data it sent over the stream arrived at the exit.
    //
    // Note, however, that it doesn't check that the client sends the data in
    // the right order. For simplicity, the test concatenates the data received
    // on both legs, sorts it, and then compares it against the (sorted) data sent
    // by the client (TODO: improve this)
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn multipath_client_to_exit() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            /// The number of data cells to send.
            const NUM_CELLS: usize = 300;
            /// 498 bytes per DATA cell.
            const CELL_SIZE: usize = 498;
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
            let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
            // The stream data we're going to send over the conflux tunnel
            let mut send_data = (0..255_u8)
                .cycle()
                .take(NUM_CELLS * CELL_SIZE)
                .collect::<Vec<_>>();
            // State shared by the two mock relay tasks
            // (this is where the data received on both legs is accumulated).
            let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));
            let mut tasks = vec![];
            // Channels used by the mock relays to notify each other
            // of various events.
            let (tx1, rx1) = mpsc::channel(1);
            let (tx2, rx2) = mpsc::channel(1);
            // The 9 RTT delays to insert before each of the 9 SENDMEs
            // the exit will end up sending.
            //
            // Note: the first delay is the init_rtt delay (measured during the conflux HS).
            let circ1_rtt_delays = [
                // Initially, circ1 has better RTT, so we will start on this leg.
                Some(Duration::from_millis(100)),
                // But then its RTT takes a turn for the worse,
                // triggering a switch after the first SENDME is processed
                // (this happens after sending 123 DATA cells).
                Some(Duration::from_millis(500)),
                Some(Duration::from_millis(700)),
                Some(Duration::from_millis(900)),
                Some(Duration::from_millis(1100)),
                Some(Duration::from_millis(1300)),
                Some(Duration::from_millis(1500)),
                Some(Duration::from_millis(1700)),
                Some(Duration::from_millis(1900)),
                Some(Duration::from_millis(2100)),
            ]
            .into_iter();
            let circ2_rtt_delays = [
                Some(Duration::from_millis(200)),
                Some(Duration::from_millis(400)),
                Some(Duration::from_millis(600)),
                Some(Duration::from_millis(800)),
                Some(Duration::from_millis(1000)),
                Some(Duration::from_millis(1200)),
                Some(Duration::from_millis(1400)),
                Some(Duration::from_millis(1600)),
                Some(Duration::from_millis(1800)),
                Some(Duration::from_millis(2000)),
            ]
            .into_iter();
            let expected_switches1 = vec![ExpectedSwitch {
                // We start on this leg, and receive a BEGIN cell,
                // followed by (4 * 31 - 1) = 123 DATA cells.
                // Then it becomes blocked on CC, then finally the reactor
                // realizes it has some SENDMEs to process, and
                // then as a result of the new RTT measurement, we switch to circ1,
                // and then finally we switch back here, and get another SWITCH
                // as the 126th cell.
                //
                // NOTE(review): since these are the expectations for circ1,
                // "we switch to circ1" above presumably refers to the
                // *other* leg (circ2) -- TODO confirm.
                cells_so_far: 126,
                // Leg 2 switches back to this leg after the 249th cell
                // (just before sending the 250th one):
                // seqno = 125 carried over from leg 1 (see the seqno of the
                // SWITCH expected on leg 2 below), plus 1 SWITCH, plus
                // 4 * 31 = 124 DATA cells after which the RTT of the first leg
                // is deemed favorable again.
                //
                // 249 - 125 (last_seq_sent of leg 1) = 124
                seqno: 124,
            }];
            let expected_switches2 = vec![ExpectedSwitch {
                // The SWITCH is the first cell we received after the conflux HS
                // on this leg.
                cells_so_far: 1,
                // See explanation on the ExpectedSwitch from circ1 above.
                seqno: 125,
            }];
            let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
            // Drop the senders and close the channels,
            // we have nothing to send in this test.
            let (_, cells_rx1) = mpsc::channel(1);
            let (_, cells_rx2) = mpsc::channel(1);
            // Note: the event channels are cross-wired: each relay sends on
            // its own `tx`, and listens on the receiver paired with the other relay's `tx`.
            let relay1 = ConfluxExitState {
                runtime: Arc::clone(&relay_runtime),
                tunnel: Arc::clone(&tunnel),
                circ: circ1,
                rtt_delays: circ1_rtt_delays,
                stream_state: Arc::clone(&stream_state),
                expect_switch: expected_switches1,
                event_tx: tx1,
                event_rx: rx2,
                is_sending_leg: true,
                cells_rx: cells_rx1,
            };
            let relay2 = ConfluxExitState {
                runtime: Arc::clone(&relay_runtime),
                tunnel: Arc::clone(&tunnel),
                circ: circ2,
                rtt_delays: circ2_rtt_delays,
                stream_state: Arc::clone(&stream_state),
                expect_switch: expected_switches2,
                event_tx: tx2,
                event_rx: rx1,
                is_sending_leg: false,
                cells_rx: cells_rx2,
            };
            for mut mock_relay in [relay1, relay2] {
                let leg = mock_relay.circ.unique_id;
                // Do the conflux handshake
                //
                // We do this outside of run_conflux_endpoint,
                // to avoid running both handshakes concurrently
                // (this gives more predictable RTT delays:
                // if both handshake tasks run at once, they race
                // to advance the mock runtime's clock)
                //
                // Note: this consumes the first entry of `rtt_delays`
                // (the init_rtt delay).
                good_exit_handshake(
                    &relay_runtime,
                    mock_relay.rtt_delays.next().flatten(),
                    &mut mock_relay.circ.chan_rx,
                    &mut mock_relay.circ.circ_tx,
                )
                .await;
                let relay = ConfluxTestEndpoint::Relay(mock_relay);
                tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
            }
            tasks.push(rt.spawn_join(
                "client task".to_string(),
                run_conflux_endpoint(ConfluxTestEndpoint::Client {
                    tunnel,
                    conflux_link_rx,
                    // We need to keep our copy, to compare it
                    // with the data received by the exit below.
                    send_data: send_data.clone(),
                    // The mock exit doesn't send any stream data in this test.
                    recv_data: vec![],
                }),
            ));
            // Wait for all the tasks to complete
            let _sinks = futures::future::join_all(tasks).await;
            let mut stream_state = stream_state.lock().unwrap();
            assert!(stream_state.begin_recvd);
            // The data may have arrived out of order across the two legs,
            // so we can only compare the sorted contents.
            stream_state.data_recvd.sort();
            send_data.sort();
            assert_eq!(stream_state.data_recvd, send_data);
        });
    }
    /// Run one exit-to-client conflux scenario, returning the shared stream state.
    ///
    /// In this scenario, a `ConfluxTestEndpoint::Client` task uses the 2-leg
    /// multipath `tunnel` to open a stream, write `send_data` on it, and read
    /// from the stream until the stream is closed, checking that it read
    /// exactly `recv_data`.
    ///
    /// Two `ConfluxTestEndpoint::Relay` tasks are spawned (one for each leg),
    /// which mock the behavior of an exit. A `CellDispatcher` task feeds them
    /// the `cells_to_send`, one at a time, each on the leg it is paired with.
    /// This makes it possible to deliver DATA and SWITCH cells such that some
    /// cells arrive out of order, forcing the client to buffer some cells, and
    /// then reorder them when the missing cells finally arrive.
    ///
    /// The mock relays expect no SWITCH cells from the client. The second leg
    /// is the one marked as the sending leg (the one that closes the stream).
    ///
    /// The returned state was created with `send_data.len()` as the amount of
    /// data the mock exit expects to receive from the client.
    ///
    /// Panics if `tunnel` doesn't have exactly 2 circuit legs.
    #[cfg(feature = "conflux")]
    async fn run_multipath_exit_to_client_test(
        rt: MockRuntime,
        tunnel: TestTunnelCtx,
        cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
        send_data: Vec<u8>,
        recv_data: Vec<u8>,
    ) -> Arc<Mutex<ConfluxStreamState>> {
        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx,
        } = tunnel;
        let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
        let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));
        let mut tasks = vec![];
        let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
        // Channels used by the dispatcher to hand each leg the cells it must send.
        let (cells_tx1, cells_rx1) = mpsc::channel(1);
        let (cells_tx2, cells_rx2) = mpsc::channel(1);
        let dispatcher = CellDispatcher {
            leg_tx: [(circ1.unique_id, cells_tx1), (circ2.unique_id, cells_tx2)]
                .into_iter()
                .collect(),
            cells_to_send,
        };
        // Channels used by the mock relays to notify each other
        // of various events.
        //
        // Note: these are cross-wired: each relay sends on its own `tx`,
        // and listens on the receiver paired with the other relay's `tx`.
        let (tx1, rx1) = mpsc::channel(1);
        let (tx2, rx2) = mpsc::channel(1);
        let relay1 = ConfluxExitState {
            runtime: Arc::clone(&relay_runtime),
            tunnel: Arc::clone(&tunnel),
            circ: circ1,
            rtt_delays: [].into_iter(),
            stream_state: Arc::clone(&stream_state),
            // Expect no SWITCH cells from the client
            expect_switch: vec![],
            event_tx: tx1,
            event_rx: rx2,
            is_sending_leg: false,
            cells_rx: cells_rx1,
        };
        let relay2 = ConfluxExitState {
            runtime: Arc::clone(&relay_runtime),
            tunnel: Arc::clone(&tunnel),
            circ: circ2,
            rtt_delays: [].into_iter(),
            stream_state: Arc::clone(&stream_state),
            // Expect no SWITCH cells from the client
            expect_switch: vec![],
            event_tx: tx2,
            event_rx: rx1,
            is_sending_leg: true,
            cells_rx: cells_rx2,
        };
        // Run the cell dispatcher, which tells each exit leg task
        // what cells to write.
        //
        // This enables us to write out-of-order cells deterministically.
        rt.spawn(dispatcher.run()).unwrap();
        for mut mock_relay in [relay1, relay2] {
            let leg = mock_relay.circ.unique_id;
            // Do the conflux handshakes one after the other, before spawning
            // the relay tasks. There are no RTT delays configured here,
            // so no artificial delay is introduced.
            good_exit_handshake(
                &relay_runtime,
                mock_relay.rtt_delays.next().flatten(),
                &mut mock_relay.circ.chan_rx,
                &mut mock_relay.circ.circ_tx,
            )
            .await;
            let relay = ConfluxTestEndpoint::Relay(mock_relay);
            tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
        }
        tasks.push(rt.spawn_join(
            "client task".to_string(),
            run_conflux_endpoint(ConfluxTestEndpoint::Client {
                tunnel,
                conflux_link_rx,
                // We own `send_data` and no longer need it here,
                // so it can be moved into the client task as-is.
                send_data,
                recv_data,
            }),
        ));
        // Wait for all the tasks to complete
        let _sinks = futures::future::join_all(tasks).await;
        stream_state
    }
    // In this test, the mock exit sends data to the client over a multipath
    // tunnel with 2 legs, switching between the legs such that some of the
    // cells reach the client out of order.
    //
    // Each scenario is a list of (leg, cell) pairs, in the order in which
    // the cells are handed to the mock exit legs for sending.
    // The client is expected to read exactly `TO_SEND` from the stream,
    // and to not send any stream data of its own.
    #[traced_test]
    #[test]
    #[cfg(feature = "conflux")]
    fn multipath_exit_to_client() {
        // The data we expect the client to read from the stream
        const TO_SEND: &[u8] =
            b"But something about Buster Friendly irritated John Isidore, one specific thing";
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // The indices of the tunnel legs.
            const CIRC1: usize = 0;
            const CIRC2: usize = 1;
            // The client receives the following cells, in the order indicated
            // by the t0-t8 "timestamps" (where C = CONNECTED, D = DATA, E = END,
            // S = SWITCH):
            //
            //  Leg 1 (CIRC1):   -----------D--------------------- D -- D -- C
            //                              |                      |    |    | \
            //                              |                      |    |    |  v
            //                              |                      |    |    | client
            //                              |                      |    |    |  ^
            //                              |                      |    |    |/
            //  Leg 2 (CIRC2): E - D -- D --\--- D* -- S (seqno=4)-/----/----/
            //                 |   |    |   |    |       |         |    |    |
            //                 |   |    |   |    |       |         |    |    |
            //                 |   |    |   |    |       |         |    |    |
            //  Time:          t8  t7   t6  t5   t4      t3        t2   t1  t0
            //
            //
            //  The cells marked with * are out of order.
            //
            // Note: t0 is the time when the client receives the first cell,
            // and t8 is the time when it receives the last one.
            // In other words, this test simulates a mock exit that "sent" the cells
            // in the order t0, t1, t2, t5, t3, t4, t6, t7, t8
            // (the cell received at t5 was "sent" on leg 1 before the SWITCH,
            // which is why the SWITCH has seqno = 4).
            let simple_switch = vec![
                (CIRC1, relaymsg::Data::new(&TO_SEND[0..5]).unwrap().into()),
                (CIRC1, relaymsg::Data::new(&TO_SEND[5..10]).unwrap().into()),
                // Switch to sending on the second leg
                (CIRC2, relaymsg::ConfluxSwitch::new(4).into()),
                // An out of order cell!
                (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
                // The missing cell (as indicated by seqno = 4 from the switch cell above)
                // is finally arriving on leg1
                (CIRC1, relaymsg::Data::new(&TO_SEND[10..20]).unwrap().into()),
                (CIRC2, relaymsg::Data::new(&TO_SEND[30..40]).unwrap().into()),
                (CIRC2, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
            ];
            //  Leg 1 (CIRC1): ---------------- D  ------D* --- S(seqno = 3) -- D - D ---------------------------- C
            //                                  |        |          |           |   |                              | \
            //                                  |        |          |           |   |                              |  v
            //                                  |        |          |           |   |                              |  client
            //                                  |        |          |           |   |                              |  ^
            //                                  |        |          |           |   |                              | /
            //  Leg 2 (CIRC2): E - S(seqno = 2) \ -- D --\----------\---------- \ --\--- D* -- D* - S(seqno = 3) --/
            //                 |        |       |    |   |          |           |   |    |     |         |         |
            //                 |        |       |    |   |          |           |   |    |     |         |         |
            //                 |        |       |    |   |          |           |   |    |     |         |         |
            //  Time:          t11      t10     t9   t8  t7         t6          t5  t4   t3    t2        t1        t0
            //  =====================================================================================================
            //  Leg 1 LSR:      8        8      8 7  7   7          6           3   2    1      1        1         1
            //  Leg 2 LSR:      9        8      6 6  6   5          5           5   5    5      4        3         0
            //  LSD:            9        8      8 7  6   5          5       5   3   2    1      1        1         1
            //                                    ^ OOO cell is delivered   ^ the OOO cells are delivered to the stream
            //
            //
            //  (LSR = last seq received, LSD = last seq delivered, both from the client's POV)
            //
            //
            // The client keeps track of the `last_seqno_received` (LSR) on each leg.
            // This is incremented for each cell that counts towards the seqnos (BEGIN, DATA, etc.)
            // that is received on the leg. The client also tracks the `last_seqno_delivered` (LSD),
            // which is the seqno of the last cell delivered to a stream
            // (this is global for the whole tunnel, whereas the LSR is different for each leg).
            //
            // When switching to leg `N`, the seqno in the switch is, from the POV of the sender,
            // the delta between the absolute seqno (i.e. the total number of cells[^1] sent)
            // and the value of this absolute seqno when leg `N` was last used.
            //
            // At the time of the first SWITCH from `t1`, the exit "sent" 3 cells:
            // a `CONNECTED` cell, which was received by the client at `t0`, and 2 `DATA` cells that
            // haven't been received yet. At this point, the exit decides to switch to leg 2,
            // on which it hasn't sent any cells yet, so the seqno is set to `3 - 0 = 3`.
            //
            // At `t6`, when the exit sends the second switch (leg 2 -> leg 1), it has "sent" 6 cells
            // (`C` plus the data cells that are received at `t1 - 5` and `t8`).
            // The seqno is `6 - 3 = 3`, because when it last sent on leg 1,
            // the absolute seqno was `3`.
            //
            // At `t10`, the absolute seqno is 8 (8 qualifying cells have been sent so far).
            // When the exit last sent on leg 2 (which we are switching to),
            // the absolute seqno was `6`, so the `SWITCH` cell will have `8 - 6 = 2` as the seqno.
            //
            // [^1]: only counting the cells that count towards sequence numbers
            let multiple_switches = vec![
                // Immediately switch to sending on the second leg
                // (indicating that we've already sent 3 cells, including the CONNECTED)
                (CIRC2, relaymsg::ConfluxSwitch::new(3).into()),
                // Two out of order cells!
                (CIRC2, relaymsg::Data::new(&TO_SEND[15..20]).unwrap().into()),
                (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
                // The missing cells finally arrive on the first leg
                (CIRC1, relaymsg::Data::new(&TO_SEND[0..10]).unwrap().into()),
                (CIRC1, relaymsg::Data::new(&TO_SEND[10..15]).unwrap().into()),
                // Switch back to the first leg
                (CIRC1, relaymsg::ConfluxSwitch::new(3).into()),
                // OOO cell
                (CIRC1, relaymsg::Data::new(&TO_SEND[31..40]).unwrap().into()),
                // Missing cell is received
                (CIRC2, relaymsg::Data::new(&TO_SEND[30..31]).unwrap().into()),
                // The remaining cells are in-order
                (CIRC1, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
                // Switch right after we've sent all the data we had to send
                (CIRC2, relaymsg::ConfluxSwitch::new(2).into()),
            ];
            // TODO: give these tests the ability to control when END cells are sent
            // (currently we have to ensure that is_sending_leg is set to true
            // on the leg that ends up sending the last data cell).
            //
            // TODO: test the edge cases
            let tests = [simple_switch, multiple_switches];
            for cells_to_send in tests {
                // Each scenario gets its own, freshly built tunnel.
                let tunnel = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
                assert_eq!(tunnel.circs.len(), 2);
                let circ_ids = [tunnel.circs[0].unique_id, tunnel.circs[1].unique_id];
                // Replace the leg indices (CIRC1, CIRC2) with the actual
                // unique IDs of the legs of this tunnel.
                let cells_to_send = cells_to_send
                    .into_iter()
                    .map(|(i, cell)| (circ_ids[i], cell))
                    .collect();
                // The client won't be sending any DATA cells on this stream
                let send_data = vec![];
                let stream_state = run_multipath_exit_to_client_test(
                    rt.clone(),
                    tunnel,
                    cells_to_send,
                    // Not needed afterwards, so we can hand it over without cloning it.
                    send_data,
                    TO_SEND.into(),
                )
                .await;
                let stream_state = stream_state.lock().unwrap();
                assert!(stream_state.begin_recvd);
                // We don't expect the client to have sent anything
                assert!(stream_state.data_recvd.is_empty());
            }
        });
    }
    #[traced_test]
    #[test]
    #[cfg(all(feature = "conflux", feature = "hs-service"))]
    fn conflux_incoming_stream() {
        // Sets up a two-leg conflux tunnel, completes the link handshake on
        // both legs, and then checks that asking the tunnel to accept incoming
        // stream requests fails with a "multi-path tunnel" error.
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            use std::error::Error as _;

            const EXPECTED_HOP: u8 = 1;

            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
            // The assignment panics unless there are exactly two legs.
            let [mut leg_a, mut leg_b]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

            // Answer each leg with a LINKED message that carries the payload
            // obtained from the first leg's channel receiver.
            let link = await_link_payload(&mut leg_a.chan_rx).await;
            for leg in [&mut leg_a, &mut leg_b] {
                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
                let reply = rmsg_to_ccmsg(None, linked);
                leg.circ_tx.send(reply).await.unwrap();
            }

            // Every per-leg handshake result must be a success.
            let link_results = conflux_link_rx.await.unwrap().unwrap();
            assert!(!link_results.iter().any(|outcome| outcome.is_err()));

            // TODO(#2002): we don't currently support conflux for onion services
            let failure = tunnel
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    (tunnel.circ.unique_id(), EXPECTED_HOP.into()).into(),
                    AllowAllStreamsFilter,
                )
                .await
                // IncomingStream doesn't impl Debug, so we need to map to a different type
                .map(|_| ())
                .unwrap_err();

            // The interesting message lives on the error's source, not on the
            // top-level error itself.
            let source_text = failure.source().unwrap().to_string();
            assert!(
                source_text.contains("Cannot allow stream requests on a multi-path tunnel"),
                "{source_text}"
            );
        });
    }
    #[test]
    fn client_circ_chan_msg() {
        // Checks which channel messages can be converted into a
        // `ClientCircChanMsg` and which are rejected by the conversion.
        use tor_cell::chancell::msg::{self, AnyChanMsg};

        // Reports whether `chan_msg` converts into a `ClientCircChanMsg`.
        fn converts(chan_msg: AnyChanMsg) -> bool {
            ClientCircChanMsg::try_from(chan_msg).is_ok()
        }

        // DESTROY converts.
        assert!(converts(msg::Destroy::new(2.into()).into()));
        // CREATED_FAST and CREATED2 are rejected.
        assert!(!converts(
            msg::CreatedFast::new(&b"guaranteed in this world"[..]).into()
        ));
        assert!(!converts(msg::Created2::new(&b"and the next"[..]).into()));
        // RELAY converts, but RELAY_EARLY does not.
        assert!(converts(
            msg::Relay::new(&b"guaranteed guaranteed"[..]).into()
        ));
        assert!(!converts(msg::AnyChanMsg::RelayEarly(
            msg::Relay::new(&b"for the world and its mother"[..]).into(),
        )));
        // VERSIONS is rejected.
        assert!(!converts(msg::Versions::new([1, 2, 3]).unwrap().into()));
    }
}