1
//! Conflux-related functionality
2

            
3
// TODO: replace Itertools::exactly_one() with a stdlib equivalent when there is one.
4
//
5
// See issue #48919 <https://github.com/rust-lang/rust/issues/48919>
6
#![allow(unstable_name_collisions)]
7

            
8
#[cfg(feature = "conflux")]
9
pub(crate) mod msghandler;
10

            
11
use std::pin::Pin;
12
use std::sync::atomic::{self, AtomicU64};
13
use std::sync::{Arc, Mutex};
14

            
15
use futures::{FutureExt as _, StreamExt, select_biased};
16
use itertools::Itertools;
17
use itertools::structs::ExactlyOneError;
18
use smallvec::{SmallVec, smallvec};
19
use tor_rtcompat::{SleepProvider as _, SleepProviderExt as _};
20
use tracing::{info, instrument, trace, warn};
21

            
22
use tor_cell::relaycell::AnyRelayMsgOuter;
23
use tor_error::{Bug, bad_api_usage, internal};
24
use tor_linkspec::HasRelayIds as _;
25

            
26
use crate::circuit::UniqId;
27
use crate::circuit::circhop::SendRelayCell;
28
use crate::client::circuit::TunnelMutableState;
29
#[cfg(feature = "circ-padding")]
30
use crate::client::circuit::padding::PaddingEvent;
31
use crate::client::circuit::path::HopDetail;
32
use crate::conflux::cmd_counts_towards_seqno;
33
use crate::conflux::msghandler::{ConfluxStatus, RemoveLegReason};
34
use crate::congestion::params::CongestionWindowParams;
35
use crate::crypto::cell::HopNum;
36
use crate::streammap;
37
use crate::tunnel::TunnelId;
38
use crate::util::err::ReactorError;
39
use crate::util::poll_all::PollAll;
40
use crate::util::tunnel_activity::TunnelActivity;
41

            
42
use super::circuit::CircHop;
43
use super::{Circuit, CircuitEvent};
44

            
45
#[cfg(feature = "conflux")]
46
use {
47
    crate::conflux::msghandler::ConfluxMsgHandler,
48
    msghandler::ClientConfluxMsgHandler,
49
    tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
50
    tor_cell::relaycell::msg::{ConfluxLink, ConfluxSwitch},
51
};
52

            
53
/// Inline capacity of the SmallVec that stores the legs of a conflux set.
///
/// Storing more legs than this is allowed, but makes the SmallVec spill to the heap.
///
/// Note: this value was picked arbitrarily and may not be suitable.
const MAX_CONFLUX_LEGS: usize = 16;

/// How many futures each circuit contributes to the per-circuit [`PollAll`]
/// future built in [`ConfluxSet::next_circ_event`].
///
/// Only used to size SmallVecs.
const NUM_CIRC_FUTURES: usize = 2;

/// The number of circuit events we expect [`ConfluxSet::next_circ_event`]
/// to return: one per future, per leg.
const CIRC_EVENT_COUNT: usize = MAX_CONFLUX_LEGS * NUM_CIRC_FUTURES;
69

            
70
/// A set with one or more circuits.
71
///
72
/// ### Conflux set life cycle
73
///
74
/// Conflux sets are created by the reactor using [`ConfluxSet::new`].
75
///
76
/// Every `ConfluxSet` starts out as a single-path set consisting of a single 0-length circuit.
77
///
78
/// After constructing a `ConfluxSet`, the reactor will proceed to extend its (only) circuit.
79
/// At this point, the `ConfluxSet` will be a single-path set with a single n-length circuit.
80
///
81
/// The reactor can then turn the `ConfluxSet` into a multi-path set
82
/// (a multi-path set is a conflux set that contains more than 1 circuit).
83
/// This is done using [`ConfluxSet::add_legs`], in response to a `CtrlMsg` sent
84
/// by the reactor user (also referred to as the "conflux handshake initiator").
85
/// After that, the conflux set is said to be a multi-path set with multiple N-length circuits.
86
///
87
/// Circuits can be removed from the set using [`ConfluxSet::remove`].
88
///
89
/// The lifetime of a `ConfluxSet` is tied to the lifetime of the reactor.
90
/// When the reactor is dropped, its underlying `ConfluxSet` is dropped too.
91
/// This can happen on an explicit shutdown request, or if a fatal error occurs.
92
///
93
/// Conversely, the `ConfluxSet` can also trigger a reactor shutdown.
94
/// For example, if after being instructed to remove a circuit from the set
95
/// using [`ConfluxSet::remove`], the set is completely depleted,
96
/// the `ConfluxSet` will return a [`ReactorError::Shutdown`] error,
97
/// which will cause the reactor to shut down.
98
pub(super) struct ConfluxSet {
99
    /// The unique identifier of the tunnel this conflux set belongs to.
100
    ///
101
    /// Used for setting the internal [`TunnelId`] of [`Circuit`]s
102
    /// that gets used for logging purposes.
103
    tunnel_id: TunnelId,
104
    /// The circuits in this conflux set.
105
    legs: SmallVec<[Circuit; MAX_CONFLUX_LEGS]>,
106
    /// Tunnel state, shared with `ClientCirc`.
107
    ///
108
    /// Contains the [`MutableState`](super::MutableState) of each circuit in the set.
109
    mutable: Arc<TunnelMutableState>,
110
    /// The unique identifier of the primary leg
111
    primary_id: UniqId,
112
    /// The join point of the set, if this is a multi-path set.
113
    ///
114
    /// Initially the conflux set starts out as a single-path set with no join point.
115
    /// When it is converted to a multipath set using [`add_legs`](Self::add_legs),
116
    /// the join point is initialized to the last hop in the tunnel.
117
    //
118
    // TODO(#2017): for simplicity, we currently we force all legs to have the same length,
119
    // to ensure the HopNum of the join point is the same for all of them.
120
    //
121
    // In the future we might want to relax this restriction.
122
    join_point: Option<JoinPoint>,
123
    /// The nonce associated with the circuits from this set.
124
    #[cfg(feature = "conflux")]
125
    nonce: V1Nonce,
126
    /// The desired UX
127
    #[cfg(feature = "conflux")]
128
    desired_ux: V1DesiredUx,
129
    /// The absolute sequence number of the last cell delivered to a stream.
130
    ///
131
    /// A clone of this is shared with each [`ConfluxMsgHandler`] created.
132
    ///
133
    /// When a message is received on a circuit leg, the `ConfluxMsgHandler`
134
    /// of the leg compares the (leg-local) sequence number of the message
135
    /// with this sequence number to determine whether the message is in-order.
136
    ///
137
    /// If the message is in-order, the `ConfluxMsgHandler` instructs the circuit
138
    /// to deliver it to its corresponding stream.
139
    ///
140
    /// If the message is out-of-order, the `ConfluxMsgHandler` instructs the circuit
141
    /// to instruct the reactor to buffer the message.
142
    last_seq_delivered: Arc<AtomicU64>,
143
    /// Whether we have selected our initial primary leg,
144
    /// if this is a multipath conflux set.
145
    selected_init_primary: bool,
146
}
147

            
148
/// The conflux join point.
149
#[derive(Clone, derive_more::Debug)]
150
struct JoinPoint {
151
    /// The hop number.
152
    hop: HopNum,
153
    /// The [`HopDetail`] of the hop.
154
    detail: HopDetail,
155
    /// The stream map of the joint point, shared with each circuit leg.
156
    #[debug(skip)]
157
    streams: Arc<Mutex<streammap::StreamMap>>,
158
}
159

            
160
impl ConfluxSet {
161
    /// Create a new conflux set, consisting of a single leg.
162
    ///
163
    /// Returns the newly created set and a reference to its [`TunnelMutableState`].
164
376
    pub(super) fn new(
165
376
        tunnel_id: TunnelId,
166
376
        circuit_leg: Circuit,
167
376
    ) -> (Self, Arc<TunnelMutableState>) {
168
376
        let primary_id = circuit_leg.unique_id();
169
376
        let circ_mutable = Arc::clone(circuit_leg.mutable());
170
376
        let legs = smallvec![circuit_leg];
171
        // Note: the join point is only set for multi-path tunnels
172
376
        let join_point = None;
173

            
174
        // TODO(#2035): read this from the consensus/config.
175
        #[cfg(feature = "conflux")]
176
376
        let desired_ux = V1DesiredUx::NO_OPINION;
177

            
178
376
        let mutable = Arc::new(TunnelMutableState::default());
179
376
        mutable.insert(primary_id, circ_mutable);
180

            
181
376
        let set = Self {
182
376
            tunnel_id,
183
376
            legs,
184
376
            primary_id,
185
376
            join_point,
186
376
            mutable: mutable.clone(),
187
376
            #[cfg(feature = "conflux")]
188
376
            nonce: V1Nonce::new(&mut rand::rng()),
189
376
            #[cfg(feature = "conflux")]
190
376
            desired_ux,
191
376
            last_seq_delivered: Arc::new(AtomicU64::new(0)),
192
376
            selected_init_primary: false,
193
376
        };
194

            
195
376
        (set, mutable)
196
376
    }
197

            
198
    /// Remove and return the only leg of this conflux set.
199
    ///
200
    /// Returns an error if there is more than one leg in the set,
201
    /// or if called before any circuit legs are available.
202
    ///
203
    /// Calling this function will empty the [`ConfluxSet`].
204
64
    pub(super) fn take_single_leg(&mut self) -> Result<Circuit, Bug> {
205
64
        let circ = self
206
64
            .legs
207
64
            .iter()
208
64
            .exactly_one()
209
64
            .map_err(NotSingleLegError::from)?;
210
60
        let circ_id = circ.unique_id();
211

            
212
60
        debug_assert!(circ_id == self.primary_id);
213

            
214
60
        self.remove_unchecked(circ_id)
215
64
    }
216

            
217
    /// Return a reference to the only leg of this conflux set,
218
    /// along with the leg's ID.
219
    ///
220
    /// Returns an error if there is more than one leg in the set,
221
    /// or if called before any circuit legs are available.
222
144
    pub(super) fn single_leg(&self) -> Result<&Circuit, NotSingleLegError> {
223
144
        Ok(self.legs.iter().exactly_one()?)
224
144
    }
225

            
226
    /// Return a mutable reference to the only leg of this conflux set,
227
    /// along with the leg's ID.
228
    ///
229
    /// Returns an error if there is more than one leg in the set,
230
    /// or if called before any circuit legs are available.
231
7404
    pub(super) fn single_leg_mut(&mut self) -> Result<&mut Circuit, NotSingleLegError> {
232
7404
        Ok(self.legs.iter_mut().exactly_one()?)
233
7404
    }
234

            
235
    /// Return the primary leg of this conflux set.
236
    ///
237
    /// Returns an error if called before any circuit legs are available.
238
32
    pub(super) fn primary_leg_mut(&mut self) -> Result<&mut Circuit, Bug> {
239
        #[cfg(not(feature = "conflux"))]
240
        if self.legs.len() > 1 {
241
            return Err(internal!(
242
                "got multipath tunnel, but conflux feature is disabled?!"
243
            ));
244
        }
245

            
246
32
        if self.legs.is_empty() {
247
            Err(bad_api_usage!(
248
                "tried to get circuit leg before creating it?!"
249
            ))
250
        } else {
251
32
            let circ = self
252
32
                .leg_mut(self.primary_id)
253
32
                .ok_or_else(|| internal!("conflux set is empty?!"))?;
254

            
255
32
            Ok(circ)
256
        }
257
32
    }
258

            
259
    /// Return a reference to the leg of this conflux set with the given id.
260
1506
    pub(super) fn leg(&self, leg_id: UniqId) -> Option<&Circuit> {
261
2775
        self.legs.iter().find(|circ| circ.unique_id() == leg_id)
262
1506
    }
263

            
264
    /// Return a mutable reference to the leg of this conflux set with the given id.
265
5294
    pub(super) fn leg_mut(&mut self, leg_id: UniqId) -> Option<&mut Circuit> {
266
8617
        self.legs.iter_mut().find(|circ| circ.unique_id() == leg_id)
267
5294
    }
268

            
269
    /// Return the number of legs in this conflux set.
270
    pub(super) fn len(&self) -> usize {
271
        self.legs.len()
272
    }
273

            
274
    /// Return whether this conflux set is empty.
275
6348
    pub(super) fn is_empty(&self) -> bool {
276
6348
        self.legs.len() == 0
277
6348
    }
278

            
279
    /// Remove the specified leg from this conflux set.
280
    ///
281
    /// Returns an error if the given leg doesn't exist in the set.
282
    ///
283
    /// Returns an error instructing the reactor to perform a clean shutdown
284
    /// ([`ReactorError::Shutdown`]), tearing down the entire [`ConfluxSet`], if
285
    ///
286
    ///   * the set is depleted (empty) after removing the specified leg
287
    ///   * `leg` is currently the sending (primary) leg of this set
288
    ///   * the closed leg had the highest non-zero last_seq_recv/sent
289
    ///   * the closed leg had some in-progress data (inflight > cc_sendme_inc)
290
    ///
291
    /// We do not yet support resumption. See [2.4.3. Closing circuits] in prop329.
292
    ///
293
    /// [2.4.3. Closing circuits]: https://spec.torproject.org/proposals/329-traffic-splitting.html#243-closing-circuits
294
    #[instrument(level = "trace", skip_all)]
295
40
    pub(super) fn remove(&mut self, leg: UniqId) -> Result<Circuit, ReactorError> {
296
40
        let circ = self.remove_unchecked(leg)?;
297

            
298
40
        tracing::trace!(
299
            circ_id = %circ.unique_id(),
300
            "Circuit removed from conflux set"
301
        );
302

            
303
40
        self.mutable.remove(circ.unique_id());
304

            
305
40
        if self.legs.is_empty() {
306
            // TODO: log the tunnel ID
307
12
            tracing::debug!("Conflux set is now empty, tunnel reactor shutting down");
308

            
309
            // The last circuit in the set has just died, so the reactor should exit.
310
12
            return Err(ReactorError::Shutdown);
311
28
        }
312

            
313
28
        if leg == self.primary_id {
314
            // We have just removed our sending leg,
315
            // so it's time to close the entire conflux set.
316
12
            return Err(ReactorError::Shutdown);
317
16
        }
318

            
319
        cfg_if::cfg_if! {
320
            if #[cfg(feature = "conflux")] {
321
16
                self.remove_conflux(circ)
322
            } else {
323
                // Conflux is disabled, so we can't possibly continue running if the only
324
                // leg in the tunnel is gone.
325
                //
326
                // Technically this should be unreachable (because of the is_empty()
327
                // check above)
328
                Err(internal!("Multiple legs in single-path tunnel?!").into())
329
            }
330
        }
331
40
    }
332

            
333
    /// Decide whether the set can survive the removal of `circ`.
    ///
    /// `circ` must already have been taken out of `self.legs`.
    /// Returns `circ` if the remaining legs can carry on,
    /// or an error if the reactor needs to shut down.
    #[cfg(feature = "conflux")]
    fn remove_conflux(&self, circ: Circuit) -> Result<Circuit, ReactorError> {
        let Some(status) = circ.conflux_status() else {
            return Err(internal!("Found non-conflux circuit in conflux set?!").into());
        };

        // TODO(conflux): should the circmgr be notified about the leg removal?
        //
        // "For circuits that are unlinked, the origin SHOULD immediately relaunch a new leg when it
        // is closed, subject to the limits in [SIDE_CHANNELS]."

        // If we've reached this point and the conflux set is non-empty,
        // it means it's a multi-path set.
        //
        // Time to check if we need to tear down the entire set.
        match status {
            // This circuit hasn't yet begun the conflux handshake,
            // so we can safely remove it from the set
            ConfluxStatus::Unlinked => return Ok(circ),
            ConfluxStatus::Pending | ConfluxStatus::Linked => {}
        }

        let leg_seq_recv = circ.last_seq_recv()?;
        let leg_seq_sent = circ.last_seq_sent()?;

        // If the closed leg had the highest non-zero last_seq_recv/sent, close the set
        let recv_was_highest = self
            .max_last_seq_recv()
            .is_some_and(|max| leg_seq_recv > max);
        if recv_was_highest {
            return Err(ReactorError::Shutdown);
        }

        let sent_was_highest = self
            .max_last_seq_sent()
            .is_some_and(|max| leg_seq_sent > max);
        if sent_was_highest {
            return Err(ReactorError::Shutdown);
        }

        let hop = self.join_point_hop(&circ)?;
        let ccontrol = hop.ccontrol();
        let (Some(inflight), Some(cwnd)) = (ccontrol.inflight(), ccontrol.cwnd()) else {
            return Err(internal!(
                "Congestion control algorithm doesn't track inflight cells or cwnd?!"
            )
            .into());
        };

        // If data is in progress on the leg (inflight > cc_sendme_inc),
        // then all legs must be closed
        //
        // NOTE(review): the comparison below is `>=`, whereas the comment above
        // says `>`; confirm which of the two is intended.
        if inflight >= cwnd.params().sendme_inc() {
            return Err(ReactorError::Shutdown);
        }

        Ok(circ)
    }
396

            
397
    /// The largest relative last_seq_recv of any leg currently in the set.
    ///
    /// Legs whose last_seq_recv cannot be read are skipped;
    /// returns `None` if no leg yields a value.
    #[cfg(feature = "conflux")]
    fn max_last_seq_recv(&self) -> Option<u64> {
        self.legs
            .iter()
            .map(|leg| leg.last_seq_recv())
            .filter_map(Result::ok)
            .max()
    }
405

            
406
    /// The largest relative last_seq_sent of any leg currently in the set.
    ///
    /// Legs whose last_seq_sent cannot be read are skipped;
    /// returns `None` if no leg yields a value.
    #[cfg(feature = "conflux")]
    fn max_last_seq_sent(&self) -> Option<u64> {
        self.legs
            .iter()
            .map(|leg| leg.last_seq_sent())
            .filter_map(Result::ok)
            .max()
    }
414

            
415
    /// Get the [`CircHop`] of the join point on the specified `circ`,
416
    /// returning an error if this is a single path conflux set.
417
2468
    fn join_point_hop<'c>(&self, circ: &'c Circuit) -> Result<&'c CircHop, Bug> {
418
2468
        let Some(join_point) = self.join_point.as_ref().map(|p| p.hop) else {
419
            return Err(internal!("No join point on conflux tunnel?!"));
420
        };
421

            
422
2468
        circ.hop(join_point)
423
2468
            .ok_or_else(|| internal!("Conflux join point disappeared?!"))
424
2468
    }
425

            
426
    /// Return an iterator of all circuits in the conflux set.
427
120
    fn circuits(&self) -> impl Iterator<Item = &Circuit> {
428
120
        self.legs.iter()
429
120
    }
430

            
431
    /// Return the most active [`TunnelActivity`] for any leg of this `ConfluxSet`.
432
    pub(super) fn tunnel_activity(&self) -> TunnelActivity {
433
        self.circuits()
434
            .map(|c| c.hops.tunnel_activity())
435
            .max()
436
            .unwrap_or_else(TunnelActivity::never_used)
437
    }
438

            
439
    /// Add legs to this conflux set.
    ///
    /// Returns an error if `legs` is empty, or if any of the legs is invalid.
    ///
    /// A leg is considered valid if
    ///
    ///   * the circuit has the same length as all the other circuits in the set
    ///   * its last hop is equal to the designated join point
    ///   * the circuit negotiated a suitable congestion control algorithm
    ///     (`Vegas`; `FixedWindow` is rejected)
    ///   * the circuit has no streams attached to any of its hops
    ///   * the circuit is not already part of a conflux set
    ///
    /// If validation fails, the set is left unchanged:
    /// no legs are added, and an existing join point is preserved.
    /// Errors that occur after validation (while initializing the legs)
    /// may leave the new legs in the set.
    ///
    /// Note: the circuits will not begin linking until
    /// [`link_circuits`](Self::link_circuits) is called.
    ///
    /// IMPORTANT: this function does not prevent the construction of conflux sets
    /// where the circuit legs share guard or middle relays. It is the responsibility
    /// of the caller to enforce the following invariant from prop354:
    ///
    /// "If building a conflux leg: Reject any circuits that have the same Guard as the other conflux
    /// "leg(s) in the current conflux set, EXCEPT when one of the primary Guards is also the chosen
    /// "Exit of this conflux set (in which case, re-use the non-Exit Guard)."
    ///
    /// This is because at this level we don't actually know which relays are the guards,
    /// so we can't know if the join point happens to be one of the Guard + Exit relays.
    #[cfg(feature = "conflux")]
    pub(super) fn add_legs(
        &mut self,
        legs: Vec<Circuit>,
        runtime: &tor_rtcompat::DynTimeProvider,
    ) -> Result<(), Bug> {
        if legs.is_empty() {
            return Err(bad_api_usage!("asked to add empty leg list to conflux set"));
        }

        // Work on a copy of the join point rather than take()-ing it out of `self`:
        // the validation below can fail, and an early return must not strip
        // an existing multi-path set of its join point.
        let join_point = match self.join_point.clone() {
            Some(p) => {
                // Preserve the existing join point, if there is one.
                p
            }
            None => {
                // No join point yet: derive it from the last hop of the first leg.
                let (hop, detail, streams) = (|| {
                    let first_leg = self.circuits().next()?;
                    let first_leg_path = first_leg.path();
                    let all_hops = first_leg_path.all_hops();
                    let hop_num = first_leg.last_hop_num()?;
                    let detail = all_hops.last()?;
                    let hop = first_leg.hop(hop_num)?;
                    let streams = Arc::clone(hop.stream_map());
                    Some((hop_num, detail.clone(), streams))
                })()
                .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;

                JoinPoint {
                    hop,
                    detail,
                    streams,
                }
            }
        };

        // Check two HopDetails for equality.
        //
        // Returns an error if both of the hops are virtual.
        let hops_eq = |h1: &HopDetail, h2: &HopDetail| {
            match (h1, h2) {
                (HopDetail::Relay(t1), HopDetail::Relay(t2)) => Ok(t1.same_relay_ids(t2)),
                #[cfg(feature = "hs-common")]
                (HopDetail::Virtual, HopDetail::Virtual) => {
                    // TODO(#2016): support onion service conflux
                    Err(internal!("onion service conflux not supported"))
                }
                _ => Ok(false),
            }
        };

        // A leg is considered valid if
        //
        //   * the circuit has the expected length
        //     (the length of the first circuit we added to the set)
        //   * its last hop is equal to the designated join point
        //     (the last hop of the first circuit we added)
        //   * the circuit has no streams attached to any of its hops
        //   * the circuit is not already part of a conflux tunnel
        //
        // Returns an error if any hops are virtual.
        let leg_is_valid = |leg: &Circuit| -> Result<bool, Bug> {
            use crate::ccparams::Algorithm;

            let path = leg.path();
            let Some(last_hop) = path.all_hops().last() else {
                // A circuit with no hops is invalid
                return Ok(false);
            };

            // TODO: this sort of duplicates the check above.
            // The difference is that above we read the hop detail
            // information from the circuit Path, whereas here we get
            // the actual last CircHop of the circuit.
            let Some(last_hop_num) = leg.last_hop_num() else {
                // A circuit with no hops is invalid
                return Ok(false);
            };

            let circhop = leg
                .hop(last_hop_num)
                .ok_or_else(|| internal!("hop disappeared?!"))?;

            // Ensure we negotiated a suitable cc algorithm
            let is_cc_suitable = match circhop.ccontrol().algorithm() {
                Algorithm::FixedWindow(_) => false,
                Algorithm::Vegas(_) => true,
            };

            if !is_cc_suitable {
                return Ok(false);
            }

            Ok(last_hop_num == join_point.hop
                && hops_eq(last_hop, &join_point.detail)?
                && !leg.has_streams()
                && leg.conflux_status().is_none())
        };

        for leg in &legs {
            if !leg_is_valid(leg)? {
                return Err(bad_api_usage!("one more more conflux circuits are invalid"));
            }
        }

        // All legs are valid: commit the join point
        // (either the newly selected one, or the one that was already there).
        self.join_point = Some(join_point.clone());

        // The legs are valid, so add them to the set.
        for circ in legs {
            let mutable = Arc::clone(circ.mutable());
            let unique_id = circ.unique_id();
            self.legs.push(circ);
            // Merge the mutable state of the circuit into our tunnel state.
            self.mutable.insert(unique_id, mutable);
        }

        let cwnd_params = self.cwnd_params()?;
        for circ in self.legs.iter_mut() {
            // The circuits that have a None status don't know they're part of
            // a multi-path tunnel yet. They need to be initialized with a
            // conflux message handler, and have their join point fixed up
            // to share a stream map with the join point on all the other circuits.
            if circ.conflux_status().is_none() {
                let handler = Box::new(ClientConfluxMsgHandler::new(
                    join_point.hop,
                    self.nonce,
                    Arc::clone(&self.last_seq_delivered),
                    cwnd_params,
                    runtime.clone(),
                ));
                let conflux_handler =
                    ConfluxMsgHandler::new(handler, Arc::clone(&self.last_seq_delivered));

                circ.add_to_conflux_tunnel(self.tunnel_id, conflux_handler);

                // Ensure the stream map of the last hop is shared by all the legs
                let last_hop = circ
                    .hop_mut(join_point.hop)
                    .ok_or_else(|| bad_api_usage!("asked to join circuit with no hops"))?;
                last_hop.set_stream_map(Arc::clone(&join_point.streams))?;
            }
        }

        Ok(())
    }
609

            
610
    /// Read the [`CongestionWindowParams`] of the join point
    /// on the primary leg.
    ///
    /// Fails if the primary leg cannot be found,
    /// if the set has no join point or the join point hop
    /// does not exist on the primary leg,
    /// or if the congestion control algorithm
    /// doesn't have a congestion control window object.
    ///
    // TODO: this function is a bit of a hack. In reality, we only
    // need the cc_cwnd_init parameter (for SWITCH seqno validation).
    // The fact that we obtain it from the cc params of the join point
    // is an implementation detail (it's a workaround for the fact that
    // at this point, these params can only be obtained from a CircHop)
    #[cfg(feature = "conflux")]
    fn cwnd_params(&self) -> Result<CongestionWindowParams, Bug> {
        let Some(primary_leg) = self.leg(self.primary_id) else {
            return Err(internal!("no primary leg?!"));
        };

        let join_hop = self.join_point_hop(primary_leg)?;
        let ccontrol = join_hop.ccontrol();

        match ccontrol.cwnd() {
            Some(cwnd) => Ok(*cwnd.params()),
            None => Err(internal!(
                "congestion control algorithm does not track the cwnd?!"
            )),
        }
    }
636

            
637
    /// Try to update the primary leg based on the configured desired UX,
    /// if needed.
    ///
    /// Returns `Ok(None)` if nothing changed: the set is not multi-path,
    /// it is not time to switch, no leg is suitable,
    /// or the selected leg is already the primary leg.
    ///
    /// Returns the SWITCH cell to send on the (new) primary leg,
    /// addressed to the join point, if we switched primary leg.
    ///
    /// Returns an error if the sequence number delta between the old and the new
    /// primary leg is negative or does not fit in a `u32`.
    #[cfg(feature = "conflux")]
    pub(super) fn maybe_update_primary_leg(&mut self) -> crate::Result<Option<SendRelayCell>> {
        use tor_error::into_internal;

        let Some(join_point) = self.join_point.as_ref() else {
            // Return early if this is not a multi-path tunnel
            return Ok(None);
        };

        let join_point = join_point.hop;

        if !self.should_update_primary_leg() {
            // Nothing to do
            return Ok(None);
        }

        let Some(new_primary_id) = self.select_primary_leg()? else {
            // None of the legs satisfy our UX requirements, continue using the existing one.
            return Ok(None);
        };

        // Check that the newly selected leg is actually different from the previous
        if self.primary_id == new_primary_id {
            // The primary leg stays the same, nothing to do.
            return Ok(None);
        }

        let prev_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;
        self.primary_id = new_primary_id;
        let new_last_seq_sent = self.primary_leg_mut()?.last_seq_sent()?;

        // The previous primary leg is expected to be at least as far along as the new one.
        // Use a checked subtraction so that a violation of that expectation
        // is reported as an internal error, instead of panicking (debug builds)
        // or silently wrapping around (release builds).
        let seqno_delta = prev_last_seq_sent
            .checked_sub(new_last_seq_sent)
            .ok_or_else(|| internal!("Seqno delta for switch is negative?!"))?;

        // If this fails, it means we haven't updated our primary leg in a very long time.
        //
        // TODO(#2036): there are currently no safeguards to prevent us from staying
        // on the same leg for "too long". Perhaps we should design should_update_primary_leg()
        // such that it forces us to switch legs periodically, to prevent the seqno delta from
        // getting too big?
        let seqno_delta = u32::try_from(seqno_delta).map_err(into_internal!(
            "Seqno delta for switch does not fit in u32?!"
        ))?;

        // We need to carry the last_seq_sent over to the next leg
        // (the next cell sent will have seqno = prev_last_seq_sent + 1)
        self.primary_leg_mut()?
            .set_last_seq_sent(prev_last_seq_sent)?;

        let switch = ConfluxSwitch::new(seqno_delta);
        let cell = AnyRelayMsgOuter::new(None, switch.into());
        Ok(Some(SendRelayCell {
            hop: Some(join_point),
            early: false,
            cell,
        }))
    }
696

            
697
    /// Decide whether the primary leg should be re-evaluated right now.
    ///
    /// If no initial primary leg has been chosen yet, this tries to choose one
    /// (see `maybe_select_init_primary`) and returns `false`,
    /// whether or not that attempt succeeded.
    /// Otherwise, returns `true` only if the set has at least two legs.
    #[cfg(feature = "conflux")]
    fn should_update_primary_leg(&mut self) -> bool {
        if self.selected_init_primary {
            // TODO(conflux-tuning): if it turns out we switch legs too frequently,
            // we might want to implement some sort of rate-limiting here
            // (see c-tor's conflux_can_switch).

            // With fewer than 2 legs, there is no other leg to switch to.
            self.legs.len() >= 2
        } else {
            // We are still waiting for an initial primary leg:
            // try to pick one, but don't attempt a switch on this call.
            self.maybe_select_init_primary();
            false
        }
    }
717

            
718
    /// Pick the leg that best matches the configured desired UX.
    ///
    /// Every desired UX value is currently served by the min-RTT algorithm;
    /// only `HIGH_THROUGHPUT` additionally requires the leg to be ready to send.
    /// Unrecognized values are logged and handled like `MIN_LATENCY`.
    ///
    /// Returns `None` if no suitable leg was found.
    #[cfg(feature = "conflux")]
    fn select_primary_leg(&self) -> Result<Option<UniqId>, Bug> {
        let check_can_send = match self.desired_ux {
            V1DesiredUx::HIGH_THROUGHPUT => true,
            // TODO(conflux-tuning): add support for low-memory algorithms
            // (for now, the LOW_MEM_* values behave like MIN_LATENCY)
            V1DesiredUx::NO_OPINION
            | V1DesiredUx::MIN_LATENCY
            | V1DesiredUx::LOW_MEM_LATENCY
            | V1DesiredUx::LOW_MEM_THROUGHPUT => false,
            _ => {
                // Default to MIN_RTT if we don't recognize the desired UX value
                warn!(
                    tunnel_id = %self.tunnel_id,
                    "Ignoring unrecognized conflux desired UX {}, using MIN_LATENCY",
                    self.desired_ux
                );
                false
            }
        };

        self.select_primary_leg_min_rtt(check_can_send)
    }
743

            
744
    /// Choose the initial primary leg, if possible.
    ///
    /// Only legs that already have an initial RTT measurement are considered;
    /// among those, the leg with the lowest initial RTT becomes the primary.
    /// If no leg has a measurement yet, this is a no-op.
    #[cfg(feature = "conflux")]
    fn maybe_select_init_primary(&mut self) {
        let fastest = self
            .legs
            .iter()
            // Skip the legs that don't have an initial RTT yet.
            .filter_map(|circ| Some((circ, circ.init_rtt()?)))
            .min_by_key(|&(_, init_rtt)| init_rtt)
            .map(|(circ, _)| circ.unique_id());

        let Some(fastest) = fastest else {
            // No measurements yet, we'll try again later.
            return;
        };

        self.primary_id = fastest;
        self.selected_init_primary = true;
    }
760

            
761
    /// Find the leg with the lowest RTT.
    ///
    /// The RTT of a leg is the EWMA RTT reported by the congestion control
    /// of its join point hop. If there is no EWMA RTT, the initial RTT of the leg
    /// is used instead (in microseconds, saturating at `u32::MAX`).
    /// If several legs have the same RTT, the one that comes first in the set wins.
    ///
    /// If `check_can_send` is true, legs whose join point is not ready to send
    /// are not considered.
    ///
    /// Returns `None` if no suitable leg was found.
    ///
    /// Returns an error if the join point hop of a leg can't be obtained,
    /// or if a considered leg has neither an EWMA RTT nor an initial RTT.
    #[cfg(feature = "conflux")]
    fn select_primary_leg_min_rtt(&self, check_can_send: bool) -> Result<Option<UniqId>, Bug> {
        // The best candidate so far, along with its RTT in microseconds.
        let mut best: Option<(UniqId, u32)> = None;

        for circ in &self.legs {
            let leg_id = circ.unique_id();
            let join_point = self.join_point_hop(circ)?;
            let ccontrol = join_point.ccontrol();

            if check_can_send && !ccontrol.can_send() {
                continue;
            }

            let leg_rtt = match ccontrol.rtt().ewma_rtt_usec() {
                Some(ewma_rtt) => ewma_rtt,
                None => {
                    // No EWMA RTT yet, fall back to the initial RTT of this leg.
                    let init_rtt = circ.init_rtt().ok_or_else(|| {
                        internal!("attempted to select primary leg before handshake completed?!")
                    })?;

                    u32::try_from(init_rtt.as_micros()).unwrap_or(u32::MAX)
                }
            };

            match best {
                // The current best is at least as good: keep it
                // (this is what makes the earliest leg win ties).
                Some((_, best_rtt)) if best_rtt <= leg_rtt => {}
                _ => best = Some((leg_id, leg_rtt)),
            }
        }

        Ok(best.map(|(leg_id, _)| leg_id))
    }
807

            
808
    /// Check whether the hop `join_hop` of the specified `circuit`
    /// is blocked on congestion control.
    ///
    /// Returns `true` if the congestion control of that hop says it can't send,
    /// and `false` otherwise.
    ///
    /// Returns an error if the join point hop doesn't exist on the specified circuit.
    #[cfg(feature = "conflux")]
    fn is_join_point_blocked_on_cc(join_hop: HopNum, circuit: &Circuit) -> Result<bool, Bug> {
        let Some(join_circhop) = circuit.hop(join_hop) else {
            return Err(internal!(
                "Join point hop {} not found on circuit {}?!",
                join_hop.display(),
                circuit.unique_id(),
            ));
        };

        let can_send = join_circhop.ccontrol().can_send();
        Ok(!can_send)
    }
828

            
829
    /// Tells [`next_circ_event`](Self::next_circ_event) whether it must
    /// avoid polling the join point streams entirely.
    ///
    /// The join point streams are skipped only if the join point of the primary leg
    /// is blocked on congestion control, and either
    ///
    ///   * the desired UX is not `HIGH_THROUGHPUT`
    ///     (so we won't switch to another leg because of it), or
    ///   * the join point is blocked on congestion control on *all* the legs.
    ///
    /// Always returns `false` for single-path sets (which have no join point).
    ///
    /// Returns an error if the primary leg is missing from the set,
    /// or if the join point hop doesn't exist on one of the inspected legs.
    #[cfg(feature = "conflux")]
    fn should_skip_join_point(&self) -> Result<bool, Bug> {
        let Some(join_point @ (_, join_hop)) = self.primary_join_point() else {
            // Single-path, there is no join point
            return Ok(false);
        };

        let primary = self
            .leg(self.primary_id)
            .ok_or_else(|| internal!("primary leg disappeared?!"))?;

        if !Self::is_join_point_blocked_on_cc(join_hop, primary)? {
            // Easy, we can just carry on
            return Ok(false);
        }

        // The primary *is* blocked on cc. Whether we can still poll the
        // join point streams depends on the desired UX.
        if self.desired_ux != V1DesiredUx::HIGH_THROUGHPUT {
            // We can't switch legs because we're not using the high throughput
            // algorithm, so we must stop reading the join point streams.
            //
            // Note: with HIGH_THROUGHPUT it's okay to continue reading
            // from the edge connection, because maybe_update_primary_leg()
            // will select a new, non-blocked primary leg, just before sending.
            trace!(
                tunnel_id = %self.tunnel_id,
                join_point = ?join_point,
                reason = "sending leg blocked on congestion control",
                "Pausing join point stream reads"
            );

            return Ok(true);
        }

        // The desired UX is HIGH_THROUGHPUT, which means we can switch
        // to an unblocked leg before sending any cells over the join point,
        // as long as there are some unblocked legs.
        //
        // Look for the first leg that is either not blocked or fails the check,
        // without inspecting the legs that come after it.
        let first_not_blocked = self
            .legs
            .iter()
            .map(|leg| Self::is_join_point_blocked_on_cc(join_hop, leg))
            .find(|blocked| !matches!(blocked, Ok(true)))
            .transpose()?;
        let all_blocked_on_cc = first_not_blocked.is_none();

        if all_blocked_on_cc {
            // All legs are blocked on cc, so we must stop reading from
            // the join point streams for now.
            trace!(
                tunnel_id = %self.tunnel_id,
                join_point = ?join_point,
                reason = "all legs blocked on congestion control",
                "Pausing join point stream reads"
            );
        }

        // If at least one leg is not blocked, we can continue reading
        // from the join point streams.
        Ok(all_blocked_on_cc)
    }
904

            
905
    /// Returns the next ready [`CircuitEvent`],
906
    /// obtained from processing the incoming/outgoing messages on all the circuits in this set.
907
    ///
908
    /// Will return an error if there are no circuits in this set,
909
    /// or other internal errors occur.
910
    ///
911
    /// This is cancellation-safe.
912
    #[allow(clippy::unnecessary_wraps)] // Can return Err if conflux is enabled
913
    #[instrument(level = "trace", skip_all)]
914
5960
    pub(super) async fn next_circ_event(
915
5960
        &mut self,
916
5960
        runtime: &tor_rtcompat::DynTimeProvider,
917
8774
    ) -> Result<SmallVec<[CircuitEvent; CIRC_EVENT_COUNT]>, crate::Error> {
918
        // Avoid polling the streams on the join point if our primary
919
        // leg is blocked on cc
920
        cfg_if::cfg_if! {
921
            if #[cfg(feature = "conflux")] {
922
                let mut should_poll_join_point = !self.should_skip_join_point()?;
923
            } else {
924
                let mut should_poll_join_point = true;
925
            }
926
        };
927
        let join_point = self.primary_join_point().map(|join_point| join_point.1);
928

            
929
        // Each circuit leg has a PollAll future (see poll_all_circ below)
930
        // that drives two futures: one that reads from input channel,
931
        // and another drives the application streams.
932
        //
933
        // *This* PollAll drives the PollAll futures of all circuit legs in lockstep,
934
        // ensuring they all get a chance to make some progress on every reactor iteration.
935
        //
936
        // IMPORTANT: if you want to push additional futures into this,
937
        // bear in mind that the ordering matters!
938
        // If multiple futures resolve at the same time, their results will be processed
939
        // in the order their corresponding futures were inserted into `PollAll`.
940
        // So if futures A and B resolve at the same time, and future A was pushed
941
        // into `PollAll` before future B, the result of future A will come
942
        // before future B's result in the result list returned by poll_all.await.
943
        //
944
        // This means that the events corresponding to the first circuit in the tunnel
945
        // will be executed first, followed by the events issued by the next circuit,
946
        // and so on.
947
        //
948
        let mut poll_all =
949
            PollAll::<MAX_CONFLUX_LEGS, SmallVec<[CircuitEvent; NUM_CIRC_FUTURES]>>::new();
950

            
951
        for leg in &mut self.legs {
952
            let unique_id = leg.unique_id();
953
5628
            let tunnel_id = self.tunnel_id;
954
            let runtime = runtime.clone();
955

            
956
            // Garbage-collect all halfstreams that have expired.
957
            //
958
            // Note: this will iterate over the closed streams of all hops.
959
            // If we think this will cause perf issues, one idea would be to make
960
            // StreamMap::closed_streams into a min-heap, and add a branch to the
961
            // select_biased! below to sleep until the first expiry is due
962
            // (but my gut feeling is that iterating is cheaper)
963
            leg.remove_expired_halfstreams(runtime.now());
964

            
965
            // The client SHOULD abandon and close circuit if the LINKED message takes too long to
966
            // arrive. This timeout MUST be no larger than the normal SOCKS/stream timeout in use for
967
            // RELAY_BEGIN, but MAY be the Circuit Build Timeout value, instead. (The C-Tor
968
            // implementation currently uses Circuit Build Timeout).
969
            let conflux_hs_timeout = leg.conflux_hs_timeout();
970

            
971
            let mut poll_all_circ = PollAll::<NUM_CIRC_FUTURES, CircuitEvent>::new();
972

            
973
524
            let input = leg.input.next().map(move |res| match res {
974
512
                Some(msg) => match msg.try_into() {
975
512
                    Ok(cell) => CircuitEvent::HandleCell {
976
512
                        leg: unique_id,
977
512
                        cell,
978
512
                    },
979
                    // A message outside our restricted set is either a fatal internal error or
980
                    // a protocol violation somehow so shutdown.
981
                    //
982
                    // TODO(relay): We have this spec ticket open about this behavior:
983
                    // https://gitlab.torproject.org/tpo/core/torspec/-/issues/385. It is plausible
984
                    // that we decide to either keep this circuit close behavior or close the
985
                    // entire channel in this case. Resolution of the above ticket needs to fix
986
                    // this part.
987
                    Err(e) => CircuitEvent::ProtoViolation { err: e },
988
                },
989
12
                None => CircuitEvent::RemoveLeg {
990
12
                    leg: unique_id,
991
12
                    reason: RemoveLegReason::ChannelClosed,
992
12
                },
993
524
            });
994
            poll_all_circ.push(input);
995

            
996
            // This future resolves when the chan_sender sink (i.e. the outgoing TCP connection)
997
            // becomes ready. We need it inside the next_ready_stream future below,
998
            // to prevent reading from the application streams before we are ready to send.
999
7110
            let chan_ready_fut = futures::future::poll_fn(|cx| {
                use futures::Sink as _;
                // Ensure the chan sender sink is ready before polling the ready streams.
7110
                Pin::new(&mut leg.chan_sender).poll_ready(cx)
7110
            });
            let exclude_hop = if should_poll_join_point {
                // Avoid polling the join point more than once per reactor loop.
                should_poll_join_point = false;
                None
            } else {
                join_point
            };
            let mut ready_streams = leg.hops.ready_streams_iterator(exclude_hop);
7078
            let next_ready_stream = async move {
                // Avoid polling the application streams if the outgoing sink is blocked
7078
                let _ = chan_ready_fut.await;
7078
                match ready_streams.next().await {
4234
                    Some(x) => x,
                    None => {
                        info!(circ_id=%unique_id, "no ready streams (maybe blocked on cc?)");
                        // There are no ready streams (for example, they may all be
                        // blocked due to congestion control), so there is nothing
                        // to do.
                        // We await an infinitely pending future so that we don't
                        // immediately return a `None` in the `select_biased!` below.
                        // We'd rather wait on `input.next()` than immediately return with
                        // no `CircuitEvent`, which could put the reactor into a spin loop.
                        let () = std::future::pending().await;
                        unreachable!();
                    }
                }
4234
            };
            poll_all_circ.push(next_ready_stream.map(move |cmd| CircuitEvent::RunCmd {
4234
                leg: unique_id,
4234
                cmd,
4234
            }));
            let mut next_padding_event_fut = leg.padding_event_stream.next();
            // This selects between 3 events that cannot be handled concurrently.
            //
            // If the conflux handshake times out, we need to remove the circuit leg
            // (any pending padding events or application stream data should be discarded;
            // in fact, there shouldn't even be any open streams on circuits that are
            // in the conflux handshake phase).
            //
            // If there's a padding event, we need to handle it immediately,
            // because it might tell us to start blocking the chan_sender sink,
            // which, in turn, means we need to stop trying to read from the application streams.
            poll_all.push(
7078
                async move {
7078
                    let conflux_hs_timeout = if let Some(timeout) = conflux_hs_timeout {
                        // TODO: ask Diziet if we can have a sleep_until_instant() function
120
                        Box::pin(runtime.sleep_until_wallclock(timeout))
120
                            as Pin<Box<dyn Future<Output = ()> + Send>>
                    } else {
6958
                        Box::pin(std::future::pending())
                    };
7078
                    select_biased! {
7078
                        () = conflux_hs_timeout.fuse() => {
4
                            warn!(
                                tunnel_id = %tunnel_id,
                                circ_id = %unique_id,
                                "Conflux handshake timed out on circuit"
                            );
                            // Conflux handshake has timed out, time to remove this circuit leg,
                            // and notify the handshake initiator.
4
                            smallvec![CircuitEvent::RemoveLeg {
                                leg: unique_id,
                                reason: RemoveLegReason::ConfluxHandshakeTimeout,
                            }]
                        }
                        padding_event = next_padding_event_fut => {
                            smallvec![CircuitEvent::PaddingAction {
                                leg: unique_id,
                                padding_event:
                                    padding_event.expect("PaddingEventStream, surprisingly, was terminated!"),
                            }]
                        }
7078
                        ret = poll_all_circ.fuse() => ret,
                    }
4744
                }
            );
        }
        // Flatten the nested SmallVecs to simplify the calling code
        // (which will handle all the returned events sequentially).
        Ok(poll_all.await.into_iter().flatten().collect())
4722
    }
    /// The join point on the current primary leg.
11320
    pub(super) fn primary_join_point(&self) -> Option<(UniqId, HopNum)> {
11320
        self.join_point
11320
            .as_ref()
12806
            .map(|join_point| (self.primary_id, join_point.hop))
11320
    }
    /// Whether congestion control uses stream SENDMEs for the hop `hop` of leg `leg`.
    ///
    /// Returns `None` if the `leg` is not in this set,
    /// or if the leg itself has no answer for `hop`.
    pub(super) fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
        self.leg(leg)
            .and_then(|circ| circ.uses_stream_sendme(hop))
    }
    /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
    ///
    /// See [`Circuit::send_relay_cell`].
    #[instrument(level = "trace", skip_all)]
4412
    pub(super) async fn send_relay_cell_on_leg(
4412
        &mut self,
4412
        msg: SendRelayCell,
4412
        leg: Option<UniqId>,
6618
    ) -> crate::Result<()> {
        let conflux_join_point = self.join_point.as_ref().map(|join_point| join_point.hop);
        let leg = if let Some(join_point) = conflux_join_point {
            let hop = msg.hop.expect("missing hop in client SendRelayCell?!");
            // Conflux circuits always send multiplexed relay commands to
            // to the last hop (the join point).
            if cmd_counts_towards_seqno(msg.cell.cmd()) {
                if hop != join_point {
                    // For leaky pipe, we must continue using the original leg
                    leg
                } else {
4412
                    let old_primary_leg = self.primary_id;
                    // Check if it's time to switch our primary leg.
                    #[cfg(feature = "conflux")]
                    if let Some(switch_cell) = self.maybe_update_primary_leg()? {
                        trace!(
                            old = ?old_primary_leg,
                            new = ?self.primary_id,
                            "Switching primary conflux leg..."
                        );
                        self.primary_leg_mut()?.send_relay_cell(switch_cell).await?;
                    }
                    // Use the possibly updated primary leg
                    Some(self.primary_id)
                }
            } else {
                // Non-multiplexed commands go on their original
                // circuit and hop
                leg
            }
        } else {
            // If there is no join point, it means this is not
            // a multi-path tunnel, so we continue using
            // the leg_id/hop the cmd came from.
            leg
        };
        let leg = leg.unwrap_or(self.primary_id);
        let circ = self
            .leg_mut(leg)
            .ok_or_else(|| internal!("leg disappeared?!"))?;
        circ.send_relay_cell(msg).await
4412
    }
    /// Begin the conflux handshake on each unlinked leg, by sending it a LINK cell.
    ///
    /// Legs whose conflux status is anything other than `Unlinked` are left alone.
    ///
    /// Returns an error if this set has no join point,
    /// or as soon as starting the handshake fails on one of the legs
    /// (in which case the remaining legs are not attempted).
    #[cfg(feature = "conflux")]
    pub(super) async fn link_circuits(
        &mut self,
        runtime: &tor_rtcompat::DynTimeProvider,
    ) -> crate::Result<()> {
        let Some((_leg_id, join_point)) = self.primary_join_point() else {
            return Err(internal!("no join point when trying to send LINK").into());
        };

        // All the legs are linked using the same nonce and desired UX.
        let (nonce, desired_ux) = (self.nonce, self.desired_ux);

        // Link all the circuits that haven't started the conflux handshake yet.
        for circ in &mut self.legs {
            // TODO: it is an internal error if any of the legs don't have a conflux handler
            // (i.e. if conflux_status() returns None)
            if circ.conflux_status() != Some(ConfluxStatus::Unlinked) {
                continue;
            }

            let link = ConfluxLink::new(V1LinkPayload::new(nonce, desired_ux));
            let cell = AnyRelayMsgOuter::new(None, link.into());

            circ.begin_conflux_link(join_point, cell, runtime).await?;
        }

        // TODO(conflux): the caller should take care to not allow opening streams
        // until the conflux set is ready (i.e. until at least one of the legs completes
        // the handshake).
        //
        // We will probably need a channel for notifying the caller
        // of handshake completion/conflux set readiness
        Ok(())
    }
    /// Count the legs that are unlinked or non-conflux,
    /// i.e. the legs that have no conflux status at all,
    /// or whose conflux status is `Unlinked`.
    #[cfg(feature = "conflux")]
    pub(super) fn num_unlinked(&self) -> usize {
        self.circuits()
            .filter(|circ| {
                matches!(
                    circ.conflux_status(),
                    None | Some(ConfluxStatus::Unlinked)
                )
            })
            .count()
    }
    /// Check if the specified sequence number is the sequence number of the
    /// next message we're expecting to handle.
36
    pub(super) fn is_seqno_in_order(&self, seq_recv: u64) -> bool {
36
        let last_seq_delivered = self.last_seq_delivered.load(atomic::Ordering::Acquire);
36
        seq_recv == last_seq_delivered + 1
36
    }
    /// Remove the circuit leg with the specified `UniqId` from this conflux set.
    ///
    /// Unlike [`ConfluxSet::remove`], this function does not check
    /// if the removal of the leg ought to trigger a reactor shutdown.
    ///
    /// Returns an error if the leg doesn't exit in the conflux set.
100
    fn remove_unchecked(&mut self, circ_id: UniqId) -> Result<Circuit, Bug> {
100
        let idx = self
100
            .legs
100
            .iter()
166
            .position(|circ| circ.unique_id() == circ_id)
100
            .ok_or_else(|| internal!("leg {circ_id:?} not found in conflux set"))?;
100
        Ok(self.legs.remove(idx))
100
    }
    /// Apply the circuit-padding event `padding_event` to the circuit `circ_id`.
    ///
    /// If the circuit is not in this set, the event is ignored.
    ///
    /// Returns an error if sending padding on the circuit fails.
    #[cfg(feature = "circ-padding")]
    pub(super) async fn run_padding_event(
        &mut self,
        circ_id: UniqId,
        padding_event: PaddingEvent,
    ) -> crate::Result<()> {
        // If there is no such circuit, it must have gone away
        // after generating this event, so there is nothing to do.
        if let Some(circ) = self.leg_mut(circ_id) {
            match padding_event {
                PaddingEvent::SendPadding(send_padding) => {
                    circ.send_padding(send_padding).await?;
                }
                PaddingEvent::StartBlocking(start_blocking) => {
                    circ.start_blocking_for_padding(start_blocking);
                }
                PaddingEvent::StopBlocking => {
                    circ.stop_blocking_for_padding();
                }
            }
        }

        Ok(())
    }
}
/// An error returned when a method is expecting a single-leg conflux circuit,
/// but it is not single-leg.
///
/// This is a thin wrapper around the underlying [`Bug`],
/// which is also exposed as the source of this error.
#[derive(Clone, Debug, derive_more::Display, thiserror::Error)]
pub(super) struct NotSingleLegError(#[source] Bug);
impl From<NotSingleLegError> for Bug {
4
    fn from(e: NotSingleLegError) -> Self {
4
        e.0
4
    }
}
impl From<NotSingleLegError> for crate::Error {
    fn from(e: NotSingleLegError) -> Self {
        Self::from(e.0)
    }
}
impl From<NotSingleLegError> for ReactorError {
    fn from(e: NotSingleLegError) -> Self {
        Self::from(e.0)
    }
}
impl<I: Iterator> From<ExactlyOneError<I>> for NotSingleLegError {
1466
    fn from(e: ExactlyOneError<I>) -> Self {
        // TODO: cannot wrap the ExactlyOneError with into_bad_api_usage
        // because it's not Send + Sync
1466
        Self(bad_api_usage!("not a single leg conflux set ({e})"))
1466
    }
}
#[cfg(test)]
mod test {
    // This module is intentionally empty.
    //
    // Tested in [`crate::client::circuit::test`].
}