1
//! Abstract code to manage a set of tunnels which has underlying circuit(s).
2
//!
3
//! This module implements the real logic for deciding when and how to
4
//! launch tunnels, and for which tunnels to hand out in response to
5
//! which requests.
6
//!
7
//! For testing and abstraction purposes, this module _does not_
8
//! actually know anything about tunnels _per se_.  Instead,
9
//! everything is handled using a set of traits that are internal to this
10
//! crate:
11
//!
12
//!  * [`AbstractTunnel`] is a view of a tunnel.
13
//!  * [`AbstractTunnelBuilder`] knows how to build an `AbstractTunnel`.
14
//!
15
//! Using these traits, the [`AbstractTunnelMgr`] object manages a set of
16
//! tunnels, launching them as necessary, and keeping track of the
17
//! restrictions on their use.
18

            
19
// TODO:
20
// - Testing
21
//    - Error from prepare_action()
22
//    - Error reported by restrict_mut?
23

            
24
use crate::config::CircuitTiming;
25
use crate::usage::{SupportedTunnelUsage, TargetTunnelUsage};
26
use crate::{DirInfo, Error, PathConfig, Result, timeouts};
27

            
28
use retry_error::RetryError;
29
use tor_async_utils::mpsc_channel_no_memquota;
30
use tor_basic_utils::retry::RetryDelay;
31
use tor_config::MutCfg;
32
use tor_error::{AbsRetryTime, HasRetryTime, debug_report, info_report, internal, warn_report};
33
#[cfg(feature = "vanguards")]
34
use tor_guardmgr::vanguards::VanguardMgr;
35
use tor_linkspec::CircTarget;
36
use tor_proto::circuit::UniqId;
37
use tor_proto::client::circuit::{CircParameters, Path};
38
use tor_rtcompat::{Runtime, SleepProviderExt};
39

            
40
use async_trait::async_trait;
41
use futures::channel::mpsc;
42
use futures::future::{FutureExt, Shared};
43
use futures::stream::{FuturesUnordered, StreamExt};
44
use oneshot_fused_workaround as oneshot;
45
use std::collections::HashMap;
46
use std::fmt::Debug;
47
use std::hash::Hash;
48
use std::panic::AssertUnwindSafe;
49
use std::sync::{self, Arc, Weak};
50
use tor_rtcompat::SpawnExt;
51
use tracing::{debug, instrument, trace, warn};
52
use web_time_compat::{Duration, Instant};
53
mod streams;
54

            
55
/// Alias to force use of RandomState, regardless of features enabled in `weak_tables`.
///
/// See <https://github.com/tov/weak-table-rs/issues/23> for discussion.
///
/// (We could probably get away with a weaker hash function in this case, since
/// the attacker _probably_ doesn't have control over our pointers.)
///
/// (Used below to hold weak references to in-progress `PendingEntry` values.)
type PtrWeakHashSet<T> = weak_table::PtrWeakHashSet<T, std::hash::RandomState>;
62

            
63
/// Description of how we got a tunnel.
64
#[non_exhaustive]
65
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
66
pub(crate) enum TunnelProvenance {
67
    /// This channel was newly launched, or was in progress and finished while
68
    /// we were waiting.
69
    NewlyCreated,
70
    /// This channel already existed when we asked for it.
71
    Preexisting,
72
}
73

            
74
/// An error returned when we cannot apply circuit restriction.
75
#[derive(Clone, Debug, thiserror::Error)]
76
#[non_exhaustive]
77
pub enum RestrictionFailed {
78
    /// Tried to restrict a specification, but the tunnel didn't support the
79
    /// requested usage.
80
    #[error("Specification did not support desired usage")]
81
    NotSupported,
82
}
83

            
84
/// Minimal abstract view of a tunnel.
///
/// From this module's point of view, tunnels are simply objects
/// with unique identities, and a possible closed-state.
#[async_trait]
pub(crate) trait AbstractTunnel: Debug {
    /// Type for a unique identifier for tunnels.
    type Id: Clone + Debug + Hash + Eq + Send + Sync;
    /// Return the unique identifier for this tunnel.
    ///
    /// # Requirements
    ///
    /// The values returned by this function are unique for distinct
    /// tunnels.
    fn id(&self) -> Self::Id;

    /// Return true if this tunnel is usable for some purpose.
    ///
    /// Reasons a tunnel might be unusable include being closed.
    fn usable(&self) -> bool;

    /// Return a [`Path`] object describing the only circuit in this tunnel.
    ///
    /// Returns an error if the tunnel has more than one circuit.
    fn single_path(&self) -> tor_proto::Result<Arc<Path>>;

    /// Return the number of hops in this tunnel.
    ///
    /// Returns an error if the tunnel is closed.
    ///
    /// NOTE: This function will currently return only the number of hops
    /// _currently_ in the tunnel. If there is an extend operation in progress,
    /// the currently pending hop may or may not be counted, depending on whether
    /// the extend operation finishes before this call is done.
    fn n_hops(&self) -> tor_proto::Result<usize>;

    /// Return true if this tunnel is closed and therefore unusable.
    fn is_closing(&self) -> bool;

    /// Return a process-unique identifier for this tunnel.
    fn unique_id(&self) -> UniqId;

    /// Extend the tunnel via the most appropriate handshake to a new `target` hop.
    async fn extend<T: CircTarget + Sync>(
        &self,
        target: &T,
        params: CircParameters,
    ) -> tor_proto::Result<()>;

    /// Return a time at which this tunnel is last known to be used,
    /// or None if it is in use right now (or has never been used).
    async fn last_known_to_be_used_at(&self) -> tor_proto::Result<Option<Instant>>;
}
137

            
138
/// A plan for an `AbstractCircBuilder` that can maybe be mutated by tests.
///
/// You should implement this trait using all default methods for all code that isn't test code.
pub(crate) trait MockablePlan {
    /// Add a reason string that was passed to `SleepProvider::block_advance()` to this object
    /// so that it knows what to pass to `::release_advance()`.
    ///
    /// (The default implementation discards the reason; only test code needs to keep it.)
    fn add_blocked_advance_reason(&mut self, _reason: String) {}
}
146

            
147
/// An object that knows how to build tunnels.
///
/// This creates tunnels in two phases. First, a plan is
/// made for how to build the tunnel. This planning phase should be
/// relatively fast, and must not suspend or block.  Its purpose is to
/// get an early estimate of which operations the tunnel will be able
/// to support when it's done.
///
/// Second, the tunnel is actually built, using the plan as input.
#[async_trait]
pub(crate) trait AbstractTunnelBuilder<R: Runtime>: Send + Sync {
    /// The tunnel type that this builder knows how to build.
    type Tunnel: AbstractTunnel + Send + Sync;
    /// An opaque type describing how a given tunnel will be built.
    /// It may represent some or all of a path — or it may not.
    //
    // TODO: It would be nice to have this parameterized on a lifetime,
    // and have that lifetime depend on the lifetime of the directory.
    // But I don't think that rust can do that.
    //
    // HACK(eta): I don't like the fact that `MockablePlan` is necessary here.
    type Plan: Send + Debug + MockablePlan;

    // TODO: I'd like to have a Dir type here to represent
    // create::DirInfo, but that would need to be parameterized too,
    // and would make everything complicated.

    /// Form a plan for how to build a new tunnel that supports `usage`.
    ///
    /// Return an opaque Plan object, and a new spec describing what
    /// the tunnel will actually support when it's built.  (For
    /// example, if the input spec requests a tunnel that connects to
    /// port 80, then "planning" the tunnel might involve picking an
    /// exit that supports port 80, and the resulting spec might be
    /// the exit's complete list of supported ports.)
    ///
    /// # Requirements
    ///
    /// The resulting Spec must support `usage`.
    fn plan_tunnel(
        &self,
        usage: &TargetTunnelUsage,
        dir: DirInfo<'_>,
    ) -> Result<(Self::Plan, SupportedTunnelUsage)>;

    /// Construct a tunnel according to a given plan.
    ///
    /// On success, return a spec describing what the tunnel can be used for,
    /// and the tunnel that was just constructed.
    ///
    /// This function should implement some kind of a timeout for
    /// tunnels that are taking too long.
    ///
    /// # Requirements
    ///
    /// The spec that this function returns _must_ support the usage
    /// that was originally passed to `plan_tunnel`.  It _must_ also
    /// contain the spec that was originally returned by
    /// `plan_tunnel`.
    async fn build_tunnel(&self, plan: Self::Plan) -> Result<(SupportedTunnelUsage, Self::Tunnel)>;

    /// Return a "parallelism factor" with which tunnels should be
    /// constructed for a given purpose.
    ///
    /// If this function returns N, then whenever we launch tunnels
    /// for this purpose, then we launch N in parallel.
    ///
    /// The default implementation returns 1.  The value of 0 is
    /// treated as if it were 1.
    fn launch_parallelism(&self, usage: &TargetTunnelUsage) -> usize {
        let _ = usage; // default implementation ignores this.
        1
    }

    /// Return a "parallelism factor" for which tunnels should be
    /// used for a given purpose.
    ///
    /// If this function returns N, then whenever we select among
    /// open tunnels for this purpose, we choose at random from the
    /// best N.
    ///
    /// The default implementation returns 1.  The value of 0 is
    /// treated as if it were 1.
    // TODO: Possibly this doesn't belong in this trait.
    fn select_parallelism(&self, usage: &TargetTunnelUsage) -> usize {
        let _ = usage; // default implementation ignores this.
        1
    }

    /// Return true if we are currently attempting to learn tunnel
    /// timeouts by building testing tunnels.
    fn learning_timeouts(&self) -> bool;

    /// Flush state to the state manager if we own the lock.
    ///
    /// Return `Ok(true)` if we saved, and `Ok(false)` if we didn't hold the lock.
    fn save_state(&self) -> Result<bool>;

    /// Return this builder's [`PathConfig`].
    fn path_config(&self) -> Arc<PathConfig>;

    /// Replace this builder's [`PathConfig`].
    // TODO: This is dead_code because we only call this for the CircuitBuilder specialization of
    // CircMgr, not from the generic version, because this trait doesn't provide guardmgr, which is
    // needed by the [`CircMgr::reconfigure`] function that would be the only caller of this. We
    // should add `guardmgr` to this trait, make [`CircMgr::reconfigure`] generic, and remove this
    // dead_code marking.
    #[allow(dead_code)]
    fn set_path_config(&self, new_config: PathConfig);

    /// Return a reference to this builder's timeout estimator.
    fn estimator(&self) -> &timeouts::Estimator;

    /// Return a reference to this builder's `VanguardMgr`.
    #[cfg(feature = "vanguards")]
    fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>>;

    /// Replace our state with a new owning state, assuming we have
    /// storage permission.
    fn upgrade_to_owned_state(&self) -> Result<()>;

    /// Reload persistent state from disk, if we don't have storage permission.
    fn reload_state(&self) -> Result<()>;

    /// Return a reference to this builder's `GuardMgr`.
    fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R>;

    /// Reconfigure this builder using the latest set of network parameters.
    ///
    /// (NOTE: for now, this only affects tunnel timeout estimation.)
    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters);
}
280

            
281
/// Enumeration to track the expiration state of a tunnel.
///
/// A tunnel can either be unused (at which point it should expire if it is
/// _still unused_ by a certain time), dirty (at which point it should
/// expire after a certain duration), or long-lived (at which point it
/// should expire after it has gone disused for long enough).
///
/// All tunnels start out "unused" and become "dirty" when their spec
/// is first restricted -- that is, when they are first handed out to be
/// used for a request.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ExpirationInfo {
    /// The tunnel has never been used, and has never been restricted for use with a request.
    Unused {
        /// A time when the tunnel was created.
        created: Instant,
    },

    /// The tunnel is not-long-lived; we will expire by waiting until a certain amount of time
    /// after it was first used.
    Dirty {
        /// The time at which this tunnel's spec was first restricted.
        dirty_since: Instant,
    },

    /// The tunnel is long-lived; we will expire by waiting until it has passed
    /// a certain amount of time without having any streams attached to it.
    LongLived {
        /// Last time at which the tunnel was checked and found not to have any streams.
        ///
        /// (This is a bit complicated: We have to be vague here, since we need
        /// an async check to find out that a tunnel is used, or when it actually
        /// became disused.)
        last_known_to_be_used_at: Instant,
    },
}
316

            
317
impl ExpirationInfo {
318
    /// Return an ExpirationInfo for a newly created tunnel.
319
92
    fn new(now: Instant) -> Self {
320
92
        ExpirationInfo::Unused { created: now }
321
92
    }
322

            
323
    /// Mark this ExpirationInfo as having been in-use at `now`.
324
    ///
325
    /// If `long_lived` is false, the associated tunnel should expire a certain amount of time
326
    /// after it was _first_ used.
327
    /// If `long_lived` is true, the associated tunnel should expire a certain amount of time
328
    /// after it was _last_ used.
329
448
    fn mark_used(&mut self, now: Instant, long_lived: bool) {
330
448
        if long_lived {
331
            *self = ExpirationInfo::LongLived {
332
                last_known_to_be_used_at: now,
333
            };
334
        } else {
335
448
            match self {
336
52
                ExpirationInfo::Unused { .. } => {
337
52
                    // This is our first time using this circuit; mark it dirty
338
52
                    *self = ExpirationInfo::Dirty { dirty_since: now };
339
52
                }
340
396
                ExpirationInfo::Dirty { .. } => {
341
396
                    // no need to update; we're tracking the time when the circuit _first_ became
342
396
                    // dirty, so further uses don't matter.
343
396
                }
344
                ExpirationInfo::LongLived { .. } => {
345
                    // shouldn't occur: we shouldn't be able to attach a stream with non-long-lived isolation
346
                    // to a tunnel marked as long-lived.  In this case we leave the timestamp alone.
347
                    // (If there were a bug here, it would be harmless, since we would
348
                    // correct the timestamp the next time we tried to expire the circuit.)
349
                }
350
            }
351
        }
352
448
    }
353

            
354
    /// Return an internal error if this ExpirationInfo is not marked as long-lived.
355
    fn check_long_lived(&self) -> Result<()> {
356
        match self {
357
            ExpirationInfo::Unused { .. } | ExpirationInfo::Dirty { .. } => Err(internal!(
358
                "Tunnel was not long-lived as expected. (Expiration status: {:?})",
359
                self
360
            )
361
            .into()),
362
            ExpirationInfo::LongLived { .. } => Ok(()),
363
        }
364
    }
365
}
366

            
367
/// Settings to determine when circuits are expired.
///
/// (These three durations correspond to the three `ExpirationInfo` states.)
#[derive(Clone, Debug)]
pub(crate) struct ExpirationParameters {
    /// Any unused circuit is expired this long after it was created.
    expire_unused_after: Duration,
    /// Any non long-lived dirty circuit is expired this long after it first becomes dirty.
    expire_dirty_after: Duration,
    /// Any long-lived circuit is expired after having been disused for this long.
    expire_disused_after: Duration,
}
377

            
378
/// An entry for an open tunnel held by an `AbstractTunnelMgr`.
#[derive(Debug, Clone)]
pub(crate) struct OpenEntry<T> {
    /// The supported usage for this tunnel.
    ///
    /// (Narrowed over time via `restrict_mut` as requests are attached.)
    spec: SupportedTunnelUsage,
    /// The tunnel under management.
    tunnel: Arc<T>,
    /// When does this tunnel expire?
    ///
    /// (Note that expired tunnels are removed from the manager,
    /// which does not actually close them until there are no more
    /// references to them.)
    expiration: ExpirationInfo,
}
392

            
393
impl<T: AbstractTunnel> OpenEntry<T> {
    /// Make a new OpenEntry for a given tunnel and spec.
    ///
    /// The tunnel is wrapped in an `Arc` so that it can be shared with callers.
    fn new(spec: SupportedTunnelUsage, tunnel: T, expiration: ExpirationInfo) -> Self {
        OpenEntry {
            spec,
            tunnel: tunnel.into(),
            expiration,
        }
    }

    /// Return true if the underlying tunnel can be used for `usage`.
    ///
    /// (Both conditions matter: the tunnel must still be open/usable, _and_
    /// its current spec must cover the requested usage.)
    pub(crate) fn supports(&self, usage: &TargetTunnelUsage) -> bool {
        self.tunnel.usable() && self.spec.supports(usage)
    }

    /// Change the underlying tunnel's permissible usage, based on its having
    /// been used for `usage` at time `now`.
    ///
    /// Return an error if the tunnel may not be used for `usage`.
    ///
    /// On success this also updates the expiration state, since restricting
    /// a spec is what marks a tunnel as "used".
    fn restrict_mut(&mut self, usage: &TargetTunnelUsage, now: Instant) -> Result<()> {
        self.spec.restrict_mut(usage)?;
        self.expiration.mark_used(now, self.spec.is_long_lived());
        Ok(())
    }

    /// Find the "best" entry from a slice of OpenEntry for supporting
    /// a given `usage`.
    ///
    /// If `parallelism` is some N greater than 1, we pick randomly
    /// from the best `N` tunnels.
    ///
    /// # Requirements
    ///
    /// Requires that `ents` is nonempty, and that every element of `ents`
    /// supports `spec`.
    fn find_best<'a>(
        // we do not mutate `ents`, but to return `&mut Self` we must have a mutable borrow
        ents: &'a mut [&'a mut Self],
        usage: &TargetTunnelUsage,
        parallelism: usize,
    ) -> &'a mut Self {
        let _ = usage; // not yet used.
        use rand::seq::IndexedMutRandom as _;
        // Clamp so we never index past the end, and never pick from zero entries.
        let parallelism = parallelism.clamp(1, ents.len());
        // TODO: Actually look over the whole list to see which is better.
        let slice = &mut ents[0..parallelism];
        let mut rng = rand::rng();
        // The expect is safe: `parallelism >= 1`, so the slice is nonempty.
        slice.choose_mut(&mut rng).expect("Input list was empty")
    }

    /// Return true if this tunnel should be expired given that the current time is `now`,
    /// and the current settings are `params`.
    ///
    /// Each `ExpirationInfo` state uses its corresponding duration from `params`.
    /// Only the long-lived case yields an "uncertain" answer, since checking
    /// actual disuse requires an async call.
    fn should_expire(&self, now: Instant, params: &ExpirationParameters) -> ShouldExpire {
        match self.expiration {
            ExpirationInfo::Unused { created } => {
                ShouldExpire::certain(now, created + params.expire_unused_after)
            }
            ExpirationInfo::Dirty { dirty_since } => {
                ShouldExpire::certain(now, dirty_since + params.expire_dirty_after)
            }
            ExpirationInfo::LongLived {
                last_known_to_be_used_at,
            } => {
                ShouldExpire::uncertain(now, last_known_to_be_used_at + params.expire_disused_after)
            }
        }
    }
}
461

            
462
/// When should a tunnel expire?
///
/// Reflects possible uncertainty.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ShouldExpire {
    /// The tunnel should expire now.
    Now,
    /// The tunnel might expire now; we need to check.
    ///
    /// (This is the result we get when we know that this is a tunnel that should expire
    /// if it has gone for some duration D without having any streams on it,
    /// and that it definitely had a stream at time T.  It is now at least time T+D,
    /// but we don't know whether the tunnel has any streams in the intervening time.
    /// We need to call the async fn `last_known_to_be_used_at` to check.)
    PossiblyNow,
    /// The tunnel will not expire before the specified time.
    NotBefore(Instant),
}
480

            
481
impl ShouldExpire {
482
    /// Return a ShouldExpire reflecting an expiration that is known to be happening at `expiration`.
483
8
    fn certain(now: Instant, expiration: Instant) -> Self {
484
8
        if now >= expiration {
485
4
            ShouldExpire::Now
486
        } else {
487
4
            ShouldExpire::NotBefore(expiration)
488
        }
489
8
    }
490

            
491
    /// Return a ShouldExpire reflecting an expiration that is known to be no sooner than `expiration`,
492
    /// but possibly later.
493
    fn uncertain(now: Instant, expiration: Instant) -> Self {
494
        if now >= expiration {
495
            ShouldExpire::PossiblyNow
496
        } else {
497
            ShouldExpire::NotBefore(expiration)
498
        }
499
    }
500
}
501

            
502
/// A result type whose "Ok" value is the Id for a tunnel from B.
///
/// (Sent over channels to tell pending requests how a tunnel build ended.)
type PendResult<B, R> = Result<<<B as AbstractTunnelBuilder<R>>::Tunnel as AbstractTunnel>::Id>;
504

            
505
/// An in-progress tunnel request tracked by an `AbstractTunnelMgr`.
///
/// (In addition to tracking tunnels, `AbstractTunnelMgr` tracks
/// _requests_ for tunnels.  The manager uses these entries if it
/// finds that some tunnel created _after_ a request first launched
/// might meet the request's requirements.)
struct PendingRequest<B: AbstractTunnelBuilder<R>, R: Runtime> {
    /// Usage for the operation requested by this request
    usage: TargetTunnelUsage,
    /// A channel to use for telling this request about tunnels that it
    /// might like.
    notify: mpsc::Sender<PendResult<B, R>>,
}
518

            
519
impl<B: AbstractTunnelBuilder<R>, R: Runtime> PendingRequest<B, R> {
520
    /// Return true if this request would be supported by `spec`.
521
34
    fn supported_by(&self, spec: &SupportedTunnelUsage) -> bool {
522
34
        spec.supports(&self.usage)
523
34
    }
524
}
525

            
526
/// An entry for an under-construction in-progress tunnel tracked by
/// an `AbstractTunnelMgr`.
#[derive(Debug)]
struct PendingEntry<B: AbstractTunnelBuilder<R>, R: Runtime> {
    /// Specification that this tunnel will support, if every pending
    /// request that is waiting for it is attached to it.
    ///
    /// This spec becomes more and more restricted as more pending
    /// requests are waiting for this tunnel.
    ///
    /// This spec is contained by circ_spec, and must support the usage
    /// of every pending request that's waiting for this tunnel.
    tentative_assignment: sync::Mutex<SupportedTunnelUsage>,
    /// A shared future for requests to use when waiting for
    /// notification of this tunnel's success.
    ///
    /// (`Shared` lets multiple requests await the same oneshot result.)
    receiver: Shared<oneshot::Receiver<PendResult<B, R>>>,
}
543

            
544
impl<B: AbstractTunnelBuilder<R>, R: Runtime> PendingEntry<B, R> {
545
    /// Make a new PendingEntry that starts out supporting a given
546
    /// spec.  Return that PendingEntry, along with a Sender to use to
547
    /// report the result of building this tunnel.
548
140
    fn new(spec: &SupportedTunnelUsage) -> (Self, oneshot::Sender<PendResult<B, R>>) {
549
140
        let tentative_assignment = sync::Mutex::new(spec.clone());
550
140
        let (sender, receiver) = oneshot::channel();
551
140
        let receiver = receiver.shared();
552
140
        let entry = PendingEntry {
553
140
            tentative_assignment,
554
140
            receiver,
555
140
        };
556
140
        (entry, sender)
557
140
    }
558

            
559
    /// Return true if this tunnel's current tentative assignment
560
    /// supports `usage`.
561
54
    fn supports(&self, usage: &TargetTunnelUsage) -> bool {
562
54
        let assignment = self.tentative_assignment.lock().expect("poisoned lock");
563
54
        assignment.supports(usage)
564
54
    }
565

            
566
    /// Try to change the tentative assignment of this tunnel by
567
    /// restricting it for use with `usage`.
568
    ///
569
    /// Return an error if the current tentative assignment didn't
570
    /// support `usage` in the first place.
571
26
    fn tentative_restrict_mut(&self, usage: &TargetTunnelUsage) -> Result<()> {
572
26
        if let Ok(mut assignment) = self.tentative_assignment.lock() {
573
26
            assignment.restrict_mut(usage)?;
574
        }
575
26
        Ok(())
576
26
    }
577

            
578
    /// Find the best PendingEntry values from a slice for use with
579
    /// `usage`.
580
    ///
581
    /// # Requirements
582
    ///
583
    /// The `ents` slice must not be empty.  Every element of `ents`
584
    /// must support the given spec.
585
26
    fn find_best(ents: &[Arc<Self>], usage: &TargetTunnelUsage) -> Vec<Arc<Self>> {
586
        // TODO: Actually look over the whole list to see which is better.
587
26
        let _ = usage; // currently unused
588
26
        vec![Arc::clone(&ents[0])]
589
26
    }
590
}
591

            
592
/// Wrapper type to represent the state between planning to build a
/// tunnel and constructing it.
#[derive(Debug)]
struct TunnelBuildPlan<B: AbstractTunnelBuilder<R>, R: Runtime> {
    /// The Plan object returned by [`AbstractTunnelBuilder::plan_tunnel`].
    plan: B::Plan,
    /// A sender to notify any pending requests when this tunnel is done.
    sender: oneshot::Sender<PendResult<B, R>>,
    /// A strong entry to the PendingEntry for this tunnel build attempt.
    ///
    /// (Holding a strong reference keeps the entry alive in the manager's
    /// weak pending-set until the build finishes or is cancelled.)
    pending: Arc<PendingEntry<B, R>>,
}
603

            
604
/// The inner state of an [`AbstractTunnelMgr`].
///
/// (Kept behind a `sync::Mutex` inside [`AbstractTunnelMgr`]; see its
/// `tunnels` field.)
struct TunnelList<B: AbstractTunnelBuilder<R>, R: Runtime> {
    /// A map from tunnel ID to [`OpenEntry`] values for all managed
    /// open tunnels.
    ///
    /// A tunnel is added here from [`AbstractTunnelMgr::do_launch`] when we find
    /// that it completes successfully, and has not been cancelled.
    /// When we decide that such a tunnel should no longer be handed out for
    /// any new requests, we "retire" the tunnel by removing it from this map.
    #[allow(clippy::type_complexity)]
    open_tunnels: HashMap<<B::Tunnel as AbstractTunnel>::Id, OpenEntry<B::Tunnel>>,
    /// Weak-set of PendingEntry for tunnels that are being built.
    ///
    /// Because this set only holds weak references, and the only strong
    /// reference to the PendingEntry is held by the task building the tunnel,
    /// this set's members are lazily removed after the tunnel is either built
    /// or fails to build.
    ///
    /// This set is used for two purposes:
    ///
    /// 1. When a tunnel request finds that there is no open tunnel for its
    ///    purposes, it checks here to see if there is a pending tunnel that it
    ///    could wait for.
    /// 2. When a pending tunnel finishes building, it checks here to make sure
    ///    that it has not been cancelled. (Removing an entry from this set marks
    ///    it as cancelled.)
    ///
    /// An entry is added here in [`AbstractTunnelMgr::prepare_action`] when we
    /// decide that a tunnel needs to be launched.
    ///
    /// Later, in [`AbstractTunnelMgr::do_launch`], once the tunnel has finished
    /// (or failed), we remove the entry (by pointer identity).
    /// If we cannot find the entry, we conclude that the request has been
    /// _cancelled_, and so we discard any tunnel that was created.
    pending_tunnels: PtrWeakHashSet<Weak<PendingEntry<B, R>>>,
    /// Weak-set of PendingRequest for requests that are waiting for a
    /// tunnel to be built.
    ///
    /// Because this set only holds weak references, and the only
    /// strong reference to the PendingRequest is held by the task
    /// waiting for the tunnel to be built, this set's members are
    /// lazily removed after the request succeeds or fails.
    pending_requests: PtrWeakHashSet<Weak<PendingRequest<B, R>>>,
}
648

            
649
impl<B: AbstractTunnelBuilder<R>, R: Runtime> TunnelList<B, R> {
650
    /// Make a new empty `CircList`
651
98
    fn new() -> Self {
652
98
        TunnelList {
653
98
            open_tunnels: HashMap::new(),
654
98
            pending_tunnels: PtrWeakHashSet::new(),
655
98
            pending_requests: PtrWeakHashSet::new(),
656
98
        }
657
98
    }
658

            
659
    /// Add `e` to the list of open tunnels.
660
92
    fn add_open(&mut self, e: OpenEntry<B::Tunnel>) {
661
92
        let id = e.tunnel.id();
662
92
        self.open_tunnels.insert(id, e);
663
92
    }
664

            
665
    /// Find all the usable open tunnels that support `usage`.
666
    ///
667
    /// Return None if there are no such tunnels.
668
596
    fn find_open(&mut self, usage: &TargetTunnelUsage) -> Option<Vec<&mut OpenEntry<B::Tunnel>>> {
669
596
        let list = self.open_tunnels.values_mut();
670
596
        let v = SupportedTunnelUsage::find_supported(list, usage);
671
596
        if v.is_empty() { None } else { Some(v) }
672
596
    }
673

            
674
    /// Find an open tunnel by ID.
675
    ///
676
    /// Return None if no such tunnels exists in this list.
677
70
    fn get_open_mut(
678
70
        &mut self,
679
70
        id: &<B::Tunnel as AbstractTunnel>::Id,
680
70
    ) -> Option<&mut OpenEntry<B::Tunnel>> {
681
70
        self.open_tunnels.get_mut(id)
682
70
    }
683

            
684
    /// Extract an open tunnel by ID, removing it from this list.
685
    ///
686
    /// Return None if no such tunnel exists in this list.
687
8
    fn take_open(
688
8
        &mut self,
689
8
        id: &<B::Tunnel as AbstractTunnel>::Id,
690
8
    ) -> Option<OpenEntry<B::Tunnel>> {
691
8
        self.open_tunnels.remove(id)
692
8
    }
693

            
694
    /// Remove tunnels based on expiration times.
695
    ///
696
    /// We remove every unused tunnel that is set to expire by
697
    /// `unused_cutoff`, and every dirty tunnel that has been dirty
698
    /// since before `dirty_cutoff`.
699
    ///
700
    /// Return the next time at which anything will definitely expire,
701
    /// and a list of long-lived tunnels where we need to check their usage status
702
    /// before we can be sure if they are expired.
703
    #[must_use]
704
4
    fn expire_tunnels(
705
4
        &mut self,
706
4
        now: Instant,
707
4
        params: &ExpirationParameters,
708
4
    ) -> (Option<Instant>, Vec<Weak<B::Tunnel>>) {
709
4
        let mut need_check = Vec::new();
710
4
        let mut earliest_expiration = None;
711
4
        self.open_tunnels
712
8
            .retain(|_k, v| match v.should_expire(now, params) {
713
                // Expires now: Do not retain.
714
4
                ShouldExpire::Now => false,
715

            
716
                // Will expire at `when`: keep, but update `earliest_expiration`.
717
4
                ShouldExpire::NotBefore(when) => {
718
                    earliest_expiration = match earliest_expiration {
719
                        Some(t) if t < when => Some(t),
720
4
                        _ => Some(when),
721
                    };
722
4
                    true
723
                }
724

            
725
                // Need to check tunnel to see if/when it is disused.
726
                ShouldExpire::PossiblyNow => {
727
                    need_check.push(Arc::downgrade(&v.tunnel));
728
                    true
729
                }
730
8
            });
731
4
        (earliest_expiration, need_check)
732
4
    }
733

            
734
    /// Return the time when the tunnel with given `id`, should expire.
735
    ///
736
    /// Return None if no such tunnel exists.
737
    fn tunnel_should_expire(
738
        &mut self,
739
        id: &<B::Tunnel as AbstractTunnel>::Id,
740
        now: Instant,
741
        params: &ExpirationParameters,
742
    ) -> Option<ShouldExpire> {
743
        self.open_tunnels
744
            .get(id)
745
            .map(|v| v.should_expire(now, params))
746
    }
747

            
748
    /// Update the "last known to be in use" time of a long-lived tunnel with ID `id`,
    /// based on learning when it was last used.
    ///
    /// Expire the tunnel if appropriate.
    ///
    /// If the tunnel is still part of the map, return the next instant at which it might expire.
    ///
    /// Returns an error if the tunnel was present but was _not_ already marked as long-lived.
    fn update_long_lived_tunnel_last_used(
        &mut self,
        id: &<B::Tunnel as AbstractTunnel>::Id,
        now: Instant,
        params: &ExpirationParameters,
        disused_since: &tor_proto::Result<Option<Instant>>,
    ) -> crate::Result<Option<Instant>> {
        let Ok(disused_since) = disused_since else {
            // got an error looking up disused time: discard the circuit.
            //
            // NOTE(review): we still run `check_long_lived` on the discarded
            // entry, presumably so a non-long-lived tunnel is reported as an
            // error rather than silently dropped — confirm.
            let discard = self.take_open(id);
            if let Some(ent) = discard {
                ent.expiration.check_long_lived()?;
            }
            return Ok(None);
        };
        let Some(tun) = self.open_tunnels.get_mut(id) else {
            // Circuit isn't there. Return.
            return Ok(None);
        };
        tun.expiration.check_long_lived()?;
        // If the tunnel is currently in use (`disused_since` is None), treat
        // `now` as the moment it was last known to be in use.
        let last_known_in_use_at = disused_since.unwrap_or(now);

        tun.expiration.mark_used(last_known_in_use_at, true);
        match tun.should_expire(now, params) {
            // Expired (or we cannot rule it out): remove it from the map now.
            ShouldExpire::Now | ShouldExpire::PossiblyNow => {
                let _discard = self.take_open(id);
                Ok(None)
            }
            ShouldExpire::NotBefore(instant) => Ok(Some(instant)),
        }
    }
787

            
788
    /// Add `pending` to the set of in-progress tunnels.
789
136
    fn add_pending_tunnel(&mut self, pending: Arc<PendingEntry<B, R>>) {
790
136
        self.pending_tunnels.insert(pending);
791
136
    }
792

            
793
    /// Find all pending tunnels that support `usage`.
794
    ///
795
    /// If no such tunnels are currently being built, return None.
796
166
    fn find_pending_tunnels(
797
166
        &self,
798
166
        usage: &TargetTunnelUsage,
799
166
    ) -> Option<Vec<Arc<PendingEntry<B, R>>>> {
800
166
        let result: Vec<_> = self
801
166
            .pending_tunnels
802
166
            .iter()
803
166
            .filter(|p| p.supports(usage))
804
166
            .filter(|p| !matches!(p.receiver.peek(), Some(Err(_))))
805
166
            .collect();
806

            
807
166
        if result.is_empty() {
808
140
            None
809
        } else {
810
26
            Some(result)
811
        }
812
166
    }
813

            
814
    /// Return true if `circ` is still pending.
815
    ///
816
    /// A tunnel will become non-pending when finishes (successfully or not), or when it's
817
    /// removed from this list via `clear_all_tunnels()`.
818
52
    fn tunnel_is_pending(&self, circ: &Arc<PendingEntry<B, R>>) -> bool {
819
52
        self.pending_tunnels.contains(circ)
820
52
    }
821

            
822
    /// Construct and add a new entry to the set of request waiting
823
    /// for a tunnel.
824
    ///
825
    /// Return the request, and a new receiver stream that it should
826
    /// use for notification of possible tunnels to use.
827
150
    fn add_pending_request(&mut self, pending: &Arc<PendingRequest<B, R>>) {
828
150
        self.pending_requests.insert(Arc::clone(pending));
829
150
    }
830

            
831
    /// Return all pending requests that would be satisfied by a tunnel
832
    /// that supports `circ_spec`.
833
32
    fn find_pending_requests(
834
32
        &self,
835
32
        circ_spec: &SupportedTunnelUsage,
836
32
    ) -> Vec<Arc<PendingRequest<B, R>>> {
837
32
        self.pending_requests
838
32
            .iter()
839
34
            .filter(|pend| pend.supported_by(circ_spec))
840
32
            .collect()
841
32
    }
842

            
843
    /// Clear all pending and open tunnels.
844
    ///
845
    /// Calling `clear_all_tunnels` ensures that any request that is answered _after
846
    /// this method runs_ will receive a tunnels that was launched _after this
847
    /// method runs_.
848
    fn clear_all_tunnels(&mut self) {
849
        // Note that removing entries from pending_circs will also cause the
850
        // tunnel tasks to realize that they are cancelled when they
851
        // go to tell anybody about their results.
852
        self.pending_tunnels.clear();
853
        self.open_tunnels.clear();
854
    }
855
}
856

            
857
/// Timing information for tunnels that have been built but never used.
///
/// Currently taken from the network parameters (see the `From`
/// implementation below, which reads the
/// `unused_client_circ_timeout*` parameters).
struct UnusedTimings {
    /// Minimum lifetime of a tunnel created while learning
    /// tunnel timeouts.
    learning: Duration,
    /// Minimum lifetime of a tunnel created while not learning
    /// tunnel timeouts.
    not_learning: Duration,
}
868

            
869
// This isn't really fallible, given the definitions of the underlying
870
// types.
871
#[allow(clippy::fallible_impl_from)]
872
impl From<&tor_netdir::params::NetParameters> for UnusedTimings {
873
705
    fn from(v: &tor_netdir::params::NetParameters) -> Self {
874
        // These try_into() calls can't fail, so unwrap() can't panic.
875
        #[allow(clippy::unwrap_used)]
876
705
        UnusedTimings {
877
705
            learning: v
878
705
                .unused_client_circ_timeout_while_learning_cbt
879
705
                .try_into()
880
705
                .unwrap(),
881
705
            not_learning: v.unused_client_circ_timeout.try_into().unwrap(),
882
705
        }
883
705
    }
884
}
885

            
886
/// Abstract implementation for tunnel management.
///
/// The algorithm provided here is fairly simple. In its simplest form:
///
/// When somebody asks for a tunnel for a given operation: if we find
/// one open already, we return it.  If we find in-progress tunnels
/// that would meet our needs, we wait for one to finish (or for all
/// to fail).  And otherwise, we launch one or more tunnels to meet the
/// request's needs.
///
/// If this process fails, then we retry it, up to a timeout or a
/// numerical limit.
///
/// If a tunnel not previously considered for a given request
/// finishes before the request is satisfied, and if the tunnel would
/// satisfy the request, we try to give that tunnel as an answer to
/// that request even if it was not one of the tunnels that request
/// was waiting for.
pub(crate) struct AbstractTunnelMgr<B: AbstractTunnelBuilder<R>, R: Runtime> {
    /// Builder used to construct tunnels.
    builder: B,
    /// An asynchronous runtime to use for launching tasks and
    /// checking timeouts.
    runtime: R,
    /// A [`TunnelList`] to manage our list of tunnels, requests, and
    /// pending tunnels.
    tunnels: sync::Mutex<TunnelList<B, R>>,

    /// Configured information about when to expire tunnels and requests.
    circuit_timing: MutCfg<CircuitTiming>,

    /// Minimum lifetime of an unused tunnel.
    ///
    /// Derived from the network parameters.
    unused_timing: sync::Mutex<UnusedTimings>,
}
922

            
923
/// An action to take in order to satisfy a request for a tunnel.
///
/// Produced by [`AbstractTunnelMgr::prepare_action`] and consumed by
/// `take_action`.
enum Action<B: AbstractTunnelBuilder<R>, R: Runtime> {
    /// We found an open tunnel: return immediately.
    Open(Arc<B::Tunnel>),
    /// We found one or more pending tunnels: wait until one succeeds,
    /// or all fail.
    Wait(FuturesUnordered<Shared<oneshot::Receiver<PendResult<B, R>>>>),
    /// We should launch tunnels: here are the instructions for how
    /// to do so.
    Build(Vec<TunnelBuildPlan<B, R>>),
}
934

            
935
impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> AbstractTunnelMgr<B, R> {
936
    /// Construct a new AbstractTunnelMgr.
937
90
    pub(crate) fn new(builder: B, runtime: R, circuit_timing: CircuitTiming) -> Self {
938
90
        let circs = sync::Mutex::new(TunnelList::new());
939
90
        let dflt_params = tor_netdir::params::NetParameters::default();
940
90
        let unused_timing = (&dflt_params).into();
941
90
        AbstractTunnelMgr {
942
90
            builder,
943
90
            runtime,
944
90
            tunnels: circs,
945
90
            circuit_timing: circuit_timing.into(),
946
90
            unused_timing: sync::Mutex::new(unused_timing),
947
90
        }
948
90
    }
949

            
950
    /// Reconfigure this manager using the latest set of network parameters.
951
    pub(crate) fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
952
        let mut u = self
953
            .unused_timing
954
            .lock()
955
            .expect("Poisoned lock for unused_timing");
956
        *u = p.into();
957
    }
958

            
959
    /// Return this manager's [`CircuitTiming`].
960
608
    pub(crate) fn circuit_timing(&self) -> Arc<CircuitTiming> {
961
608
        self.circuit_timing.get()
962
608
    }
963

            
964
    /// Return this manager's [`CircuitTiming`].
965
4
    pub(crate) fn set_circuit_timing(&self, new_config: CircuitTiming) {
966
4
        self.circuit_timing.replace(new_config);
967
4
    }
968
    /// Return a circuit suitable for use with a given `usage`,
    /// creating that circuit if necessary, and restricting it
    /// under the assumption that it will be used for that spec.
    ///
    /// This is the primary entry point for AbstractTunnelMgr.
    ///
    /// Retries on failure, up to `request_max_retries` attempts (counted
    /// separately from internal "resets"), and gives up entirely once
    /// `request_timeout` has elapsed.
    #[allow(clippy::cognitive_complexity)] // TODO #2010: Refactor?
    #[instrument(level = "trace", skip_all)]
    pub(crate) async fn get_or_launch(
        self: &Arc<Self>,
        usage: &TargetTunnelUsage,
        dir: DirInfo<'_>,
    ) -> Result<(Arc<B::Tunnel>, TunnelProvenance)> {
        /// Largest number of "resets" that we will accept in this attempt.
        ///
        /// A "reset" is an internally generated error that does not represent a
        /// real problem; only a "whoops, got to try again" kind of a situation.
        /// For example, if we reconfigure in the middle of an attempt and need
        /// to re-launch the circuit, that counts as a "reset", since there was
        /// nothing actually _wrong_ with the circuit we were building.
        ///
        /// We accept more resets than we do real failures. However,
        /// we don't accept an unlimited number: we don't want to inadvertently
        /// permit infinite loops here. If we ever bump against this limit, we
        /// should not automatically increase it: we should instead figure out
        /// why it is happening and try to make it not happen.
        const MAX_RESETS: usize = 8;

        let circuit_timing = self.circuit_timing();
        let timeout_at = self.runtime.now() + circuit_timing.request_timeout;
        let max_tries = circuit_timing.request_max_retries;
        // We compute the maximum number of failures by dividing the maximum
        // number of circuits to attempt by the number that will be launched in
        // parallel for each iteration.
        let max_failures = usize::div_ceil(
            max_tries as usize,
            std::cmp::max(1, self.builder.launch_parallelism(usage)),
        );
        let mut retry_schedule = RetryDelay::from_msec(100);
        let mut retry_err = RetryError::<Box<Error>>::in_attempt_to("find or build a tunnel");
        let mut n_failures = 0;
        let mut n_resets = 0;
        for attempt_num in 1.. {
            // How much time is remaining?
            let remaining = match timeout_at.checked_duration_since(self.runtime.now()) {
                None => {
                    // Overall deadline already passed: record and stop retrying.
                    retry_err.push_timed(
                        Error::RequestTimeout,
                        self.runtime.now(),
                        Some(self.runtime.wallclock()),
                    );
                    break;
                }
                Some(t) => t,
            };
            let error = match self.prepare_action(usage, dir, true) {
                Ok(action) => {
                    // We successfully found an action: Take that action.
                    let outcome = self
                        .runtime
                        .timeout(remaining, Arc::clone(self).take_action(action, usage))
                        .await;
                    match outcome {
                        Ok(Ok(circ)) => return Ok(circ),
                        Ok(Err(e)) => {
                            debug!("Circuit attempt {} failed.", attempt_num);
                            Error::RequestFailed(e)
                        }
                        Err(_) => {
                            // We ran out of "remaining" time; there is nothing
                            // more to be done.
                            warn!("All tunnel attempts failed due to timeout");
                            retry_err.push_timed(
                                Error::RequestTimeout,
                                self.runtime.now(),
                                Some(self.runtime.wallclock()),
                            );
                            break;
                        }
                    }
                }
                Err(e) => {
                    // We couldn't pick the action!
                    debug_report!(
                        &e,
                        "Couldn't pick action for tunnel attempt {}",
                        attempt_num,
                    );
                    e
                }
            };
            // There's been an error.  See how long we wait before we retry.
            let now = self.runtime.now();
            let retry_time =
                error.abs_retry_time(now, || retry_schedule.next_delay(&mut rand::rng()));
            // Resets and real failures are counted against separate limits.
            let (count, count_limit) = if error.is_internal_reset() {
                (&mut n_resets, MAX_RESETS)
            } else {
                (&mut n_failures, max_failures)
            };
            // Record the error, flattening it if needed.
            match error {
                // Flatten nested RetryError, using mockable time for each error
                Error::RequestFailed(e) => {
                    retry_err.extend_from_retry_error(e);
                }
                e => retry_err.push_timed(e, now, Some(self.runtime.wallclock())),
            }
            *count += 1;
            // If we have reached our limit of this kind of problem, we're done.
            if *count >= count_limit {
                warn!("Reached circuit build retry limit, exiting...");
                break;
            }
            // Wait, or not, as appropriate.
            match retry_time {
                AbsRetryTime::Immediate => {}
                AbsRetryTime::Never => break,
                AbsRetryTime::At(t) => {
                    // Never sleep past the overall request deadline.
                    let remaining = timeout_at.saturating_duration_since(now);
                    let delay = t.saturating_duration_since(now);
                    trace!(?delay, "Waiting to retry...");
                    self.runtime.sleep(std::cmp::min(delay, remaining)).await;
                }
            }
        }
        warn!("Request failed");
        Err(Error::RequestFailed(retry_err))
    }
    /// Make sure a circuit exists, without actually asking for it.
    ///
    /// Make sure that there is a circuit (built or in-progress) that could be
    /// used for `usage`, and launch one or more circuits in a background task
    /// if there is not.
    // TODO: This should probably take some kind of parallelism parameter.
    #[cfg(test)]
    pub(crate) fn ensure_tunnel(
        self: &Arc<Self>,
        usage: &TargetTunnelUsage,
        dir: DirInfo<'_>,
    ) -> Result<()> {
        // Only the Build action needs any work here: Open and Wait already
        // imply a suitable (or in-progress) tunnel exists.
        if let Action::Build(plans) = self.prepare_action(usage, dir, false)? {
            for plan in plans {
                let _ignore_receiver = Arc::clone(self).spawn_launch(usage, plan);
            }
        }
        Ok(())
    }
    /// Choose which action we should take in order to provide a tunnel
    /// for a given `usage`.
    ///
    /// If `restrict_circ` is true, we restrict the spec of any
    /// circ we decide to use to mark that it _is_ being used for
    /// `usage`.
    ///
    /// (Holds the `tunnels` lock for the whole decision, so the choice
    /// between Open/Wait/Build is made atomically with respect to the list.)
    #[instrument(level = "trace", skip_all)]
    fn prepare_action(
        &self,
        usage: &TargetTunnelUsage,
        dir: DirInfo<'_>,
        restrict_circ: bool,
    ) -> Result<Action<B, R>> {
        let mut list = self.tunnels.lock().expect("poisoned lock");
        if let Some(mut open) = list.find_open(usage) {
            // We have open tunnels that meet the spec: return the best one.
            let parallelism = self.builder.select_parallelism(usage);
            let best = OpenEntry::find_best(&mut open, usage, parallelism);
            if restrict_circ {
                let now = self.runtime.now();
                best.restrict_mut(usage, now)?;
            }
            // TODO: If we have fewer tunnels here than our select
            // parallelism, perhaps we should launch more?
            return Ok(Action::Open(best.tunnel.clone()));
        }
        if let Some(pending) = list.find_pending_tunnels(usage) {
            // There are pending tunnels that could meet the spec.
            // Restrict them under the assumption that they could all
            // be used for this, and then wait until one is ready (or
            // all have failed)
            let best = PendingEntry::find_best(&pending, usage);
            if restrict_circ {
                for item in &best {
                    // TODO: Do we want to tentatively restrict _all_ of these?
                    // not clear to me.
                    item.tentative_restrict_mut(usage)?;
                }
            }
            let stream = best.iter().map(|item| item.receiver.clone()).collect();
            // TODO: if we have fewer tunnels here than our launch
            // parallelism, we might want to launch more.
            return Ok(Action::Wait(stream));
        }
        // Okay, we need to launch tunnels here.
        let parallelism = std::cmp::max(1, self.builder.launch_parallelism(usage));
        let mut plans = Vec::new();
        let mut last_err = None;
        for _ in 0..parallelism {
            match self.plan_by_usage(dir, usage) {
                Ok((pending, plan)) => {
                    // Register the attempt before we return the plan, so
                    // other requests can find and wait on it.
                    list.add_pending_tunnel(pending);
                    plans.push(plan);
                }
                Err(e) => {
                    debug!("Unable to make a plan for {:?}: {}", usage, e);
                    last_err = Some(e);
                }
            }
        }
        if !plans.is_empty() {
            Ok(Action::Build(plans))
        } else if let Some(last_err) = last_err {
            Err(last_err)
        } else {
            // we didn't even try to plan anything!
            Err(internal!("no plans were built, but no errors were found").into())
        }
    }
    /// Execute an action returned by pick-action, and return the
    /// resulting tunnel or error.
    #[allow(clippy::cognitive_complexity, clippy::type_complexity)] // TODO #2010: Refactor
    #[instrument(level = "trace", skip_all)]
532
    async fn take_action(
532
        self: Arc<Self>,
532
        act: Action<B, R>,
532
        usage: &TargetTunnelUsage,
532
    ) -> std::result::Result<(Arc<B::Tunnel>, TunnelProvenance), RetryError<Box<Error>>> {
        /// Store the error `err` into `retry_err`, as appropriate.
80
        fn record_error<R: Runtime>(
80
            retry_err: &mut RetryError<Box<Error>>,
80
            source: streams::Source,
80
            building: bool,
80
            mut err: Error,
80
            runtime: &R,
80
        ) {
80
            if source == streams::Source::Right {
                // We don't care about this error, since it is from neither a tunnel we launched
                // nor one that we're waiting on.
                return;
80
            }
80
            if !building {
8
                // We aren't building our own tunnels, so our errors are
8
                // secondary reports of other tunnels' failures.
8
                err = Error::PendingFailed(Box::new(err));
72
            }
80
            retry_err.push_timed(err, runtime.now(), Some(runtime.wallclock()));
80
        }
        /// Return a string describing what it means, within the context of this
        /// function, to have gotten an answer from `source`.
        fn describe_source(building: bool, source: streams::Source) -> &'static str {
            match (building, source) {
                (_, streams::Source::Right) => "optimistic advice",
                (true, streams::Source::Left) => "tunnel we're building",
                (false, streams::Source::Left) => "pending tunnel",
            }
        }
        // Get or make a stream of futures to wait on.
532
        let (building, wait_on_stream) = match act {
            Action::Open(c) => {
                // There's already a perfectly good open tunnel; we can return
                // it now.
                trace!("Returning existing tunnel.");
                return Ok((c, TunnelProvenance::Preexisting));
            }
            Action::Wait(f) => {
                // There is one or more pending tunnel that we're waiting for.
                // If any succeeds, we try to use it.  If they all fail, we
                // fail.
                trace!("Waiting for tunnel.");
                (false, f)
            }
            Action::Build(plans) => {
                // We're going to launch one or more tunnels in parallel.  We
                // report success if any succeeds, and failure of they all fail.
                trace!("Building new tunnel.");
                let futures = FuturesUnordered::new();
                for plan in plans {
                    let self_clone = Arc::clone(&self);
                    // (This is where we actually launch tunnels.)
                    futures.push(self_clone.spawn_launch(usage, plan));
                }
                (true, futures)
            }
        };
        // Insert ourself into the list of pending requests, and make a
        // stream for us to listen on for notification from pending tunnels
        // other than those we are pending on.
        let (pending_request, additional_stream) = {
            // We don't want this queue to participate in memory quota tracking.
            // There isn't any tunnel yet, so there wouldn't be anything to account it to.
            // If this queue has the oldest data, probably the whole system is badly broken.
            // Tearing down the whole tunnel manager won't help.
            let (send, recv) = mpsc_channel_no_memquota(8);
            let pending = Arc::new(PendingRequest {
                usage: usage.clone(),
                notify: send,
            });
            let mut list = self.tunnels.lock().expect("poisoned lock");
            list.add_pending_request(&pending);
            (pending, recv)
        };
        // We use our "select_biased" stream combiner here to ensure that:
        //   1) Circuits from wait_on_stream (the ones we're pending on) are
        //      preferred.
        //   2) We exit this function when those tunnels are exhausted.
        //   3) We still get notified about other tunnels that might meet our
        //      interests.
        //
        // The events from Left stream are the oes that we explicitly asked for,
        // so we'll treat errors there as real problems.  The events from the
        // Right stream are ones that we got opportunistically told about; it's
        // not a big deal if those fail.
        let mut incoming = streams::select_biased(wait_on_stream, additional_stream.map(Ok));
        let mut retry_error = RetryError::in_attempt_to("wait for tunnels");
        while let Some((src, id)) = incoming.next().await {
            match id {
                Ok(Ok(ref id)) => {
                    // Great, we have a tunnel . See if we can use it!
                    let mut list = self.tunnels.lock().expect("poisoned lock");
                    if let Some(ent) = list.get_open_mut(id) {
                        let now = self.runtime.now();
                        match ent.restrict_mut(usage, now) {
                            Ok(()) => {
                                // Great, this will work.  We drop the
                                // pending request now explicitly to remove
                                // it from the list.
                                drop(pending_request);
                                if matches!(ent.expiration, ExpirationInfo::Unused { .. }) {
                                    let try_to_expire_after = if ent.spec.is_long_lived() {
                                        self.circuit_timing().disused_circuit_timeout
                                    } else {
                                        self.circuit_timing().max_dirtiness
                                    };
                                    // Since this tunnel hasn't been used yet, schedule an
                                    // expiration task `try_to_expire_after` from now.
                                    spawn_expiration_task(
                                        &self.runtime,
                                        Arc::downgrade(&self),
                                        ent.tunnel.id(),
                                        now + try_to_expire_after,
                                    );
                                }
                                return Ok((ent.tunnel.clone(), TunnelProvenance::NewlyCreated));
                            }
                            Err(e) => {
                                // In this case, a `UsageMismatched` error just means that we lost the race
                                // to restrict this tunnel.
                                let e = match e {
                                    Error::UsageMismatched(e) => Error::LostUsabilityRace(e),
                                    x => x,
                                };
                                if src == streams::Source::Left {
                                    info_report!(
                                        &e,
                                        "{} suggested we use {:?}, but restrictions failed",
                                        describe_source(building, src),
                                        id,
                                    );
                                } else {
                                    debug_report!(
                                        &e,
                                        "{} suggested we use {:?}, but restrictions failed",
                                        describe_source(building, src),
                                        id,
                                    );
                                }
                                record_error(&mut retry_error, src, building, e, &self.runtime);
                                continue;
                            }
                        }
                    }
                }
                Ok(Err(ref e)) => {
                    debug!("{} sent error {:?}", describe_source(building, src), e);
                    record_error(&mut retry_error, src, building, e.clone(), &self.runtime);
                }
                Err(oneshot::Canceled) => {
                    debug!(
                        "{} went away (Canceled), quitting take_action right away",
                        describe_source(building, src)
                    );
                    record_error(
                        &mut retry_error,
                        src,
                        building,
                        Error::PendingCanceled,
                        &self.runtime,
                    );
                    return Err(retry_error);
                }
            }
            debug!(
                "While waiting on tunnel: {:?} from {}",
                id,
                describe_source(building, src)
            );
        }
        // Nothing worked.  We drop the pending request now explicitly
        // to remove it from the list.  (We could just let it get dropped
        // implicitly, but that's a bit confusing.)
        drop(pending_request);
        Err(retry_error)
528
    }
    /// Given a directory and usage, compute the necessary objects to
    /// build a tunnel: A [`PendingEntry`] to keep track of the in-process
    /// tunnel, and a [`TunnelBuildPlan`] that we'll give to the thread
    /// that will build the tunnel.
    ///
    /// The caller should probably add the resulting `PendingEntry` to
    /// `self.circs`.
    ///
    /// This is an internal function that we call when we're pretty sure
    /// we want to build a tunnel.
    #[allow(clippy::type_complexity)]
148
    fn plan_by_usage(
148
        &self,
148
        dir: DirInfo<'_>,
148
        usage: &TargetTunnelUsage,
148
    ) -> Result<(Arc<PendingEntry<B, R>>, TunnelBuildPlan<B, R>)> {
148
        let (plan, bspec) = self.builder.plan_tunnel(usage, dir)?;
140
        let (pending, sender) = PendingEntry::new(&bspec);
140
        let pending = Arc::new(pending);
140
        let plan = TunnelBuildPlan {
140
            plan,
140
            sender,
140
            pending: Arc::clone(&pending),
140
        };
140
        Ok((pending, plan))
148
    }
    /// Launch a managed tunnel for a target usage, without checking
    /// whether one already exists or is pending.
    ///
    /// Return a listener that will be informed when the tunnel is done.
    #[instrument(level = "trace", skip_all)]
4
    pub(crate) fn launch_by_usage(
4
        self: &Arc<Self>,
4
        usage: &TargetTunnelUsage,
4
        dir: DirInfo<'_>,
4
    ) -> Result<Shared<oneshot::Receiver<PendResult<B, R>>>> {
4
        let (pending, plan) = self.plan_by_usage(dir, usage)?;
4
        self.tunnels
4
            .lock()
4
            .expect("Poisoned lock for tunnel list")
4
            .add_pending_tunnel(pending);
4
        Ok(Arc::clone(self).spawn_launch(usage, plan))
4
    }
    /// Spawn a background task to launch a tunnel, and report its status.
    ///
    /// The `usage` argument is the usage from the original request that made
    /// us build this tunnel.
    ///
    /// Returns a shared receiver on which every interested caller can await
    /// the outcome of the build.
    #[instrument(level = "trace", skip_all)]
    fn spawn_launch(
        self: Arc<Self>,
        usage: &TargetTunnelUsage,
        plan: TunnelBuildPlan<B, R>,
    ) -> Shared<oneshot::Receiver<PendResult<B, R>>> {
        let _ = usage; // Currently unused.
        let TunnelBuildPlan {
            mut plan,
            sender,
            pending,
        } = plan;
        let request_loyalty = self.circuit_timing().request_loyalty;
        // Clone the shared receiver now, before `pending` is moved into the
        // spawned task below.
        let wait_on_future = pending.receiver.clone();
        let runtime = self.runtime.clone();
        let runtime_copy = self.runtime.clone();
        // Random ID used only to label the block_advance reason below.
        let tid = rand::random::<u64>();
        // We release this block when the tunnel builder task terminates.
        let reason = format!("tunnel builder task {}", tid);
        runtime.block_advance(reason.clone());
        // During tests, the `FakeBuilder` will need to release the block in order to fake a timeout
        // correctly.
        plan.add_blocked_advance_reason(reason);
        runtime
            .spawn(async move {
                let self_clone = Arc::clone(&self);
                // Catch panics from do_launch so we can report them to any
                // waiting request before re-raising them in this task.
                let future = AssertUnwindSafe(self_clone.do_launch(plan, pending)).catch_unwind();
                let (new_spec, reply) = match future.await {
                    Ok(x) => x, // Success or regular failure
                    Err(e) => {
                        // Okay, this is a panic.  We have to tell the calling
                        // thread about it, then exit this tunnel builder task.
                        let _ = sender.send(Err(internal!("tunnel build task panicked").into()));
                        std::panic::panic_any(e);
                    }
                };
                // Tell anybody who was listening about it that this
                // tunnel is now usable or failed.
                //
                // (We ignore any errors from `send`: That just means that nobody
                // was waiting for this tunnel.)
                let _ = sender.send(reply.clone());
                if let Some(new_spec) = new_spec {
                    // Wait briefly before we notify opportunistically.  This
                    // delay will give the tunnels that were originally
                    // specifically intended for a request a little more time
                    // to finish, before we offer it this tunnel instead.
                    let sl = runtime_copy.sleep(request_loyalty);
                    runtime_copy.allow_one_advance(request_loyalty);
                    sl.await;
                    // Collect matching requests under the lock, but notify
                    // them after releasing it.
                    let pending = {
                        let list = self.tunnels.lock().expect("poisoned lock");
                        list.find_pending_requests(&new_spec)
                    };
                    for pending_request in pending {
                        let _ = pending_request.notify.clone().try_send(reply.clone());
                    }
                }
                runtime_copy.release_advance(format!("tunnel builder task {}", tid));
            })
            .expect("Couldn't spawn tunnel-building task");
        wait_on_future
    }
    /// Run in the background to launch a tunnel. Return a 2-tuple of the new
    /// tunnel spec and the outcome that should be sent to the initiator.
    ///
    /// The spec is `Some` only if the tunnel was built successfully _and_
    /// added to our list of open tunnels, so the caller can offer it to
    /// other pending requests.
    #[instrument(level = "trace", skip_all)]
    async fn do_launch(
        self: Arc<Self>,
        plan: <B as AbstractTunnelBuilder<R>>::Plan,
        pending: Arc<PendingEntry<B, R>>,
    ) -> (Option<SupportedTunnelUsage>, PendResult<B, R>) {
        let outcome = self.builder.build_tunnel(plan).await;
        match outcome {
            Err(e) => (None, Err(e)),
            Ok((new_spec, tunnel)) => {
                let id = tunnel.id();
                // Schedule a task that will expire this tunnel if it goes
                // unused for too long.
                let use_duration = self.pick_use_duration();
                let now = self.runtime.now();
                let exp_inst = now + use_duration;
                let runtime_copy = self.runtime.clone();
                spawn_expiration_task(&runtime_copy, Arc::downgrade(&self), tunnel.id(), exp_inst);
                // I used to call restrict_mut here, but now I'm not so
                // sure. Doing restrict_mut makes sure that this
                // tunnel will be suitable for the request that asked
                // for us in the first place, but that should be
                // ensured anyway by our tracking its tentative
                // assignment.
                //
                // new_spec.restrict_mut(&usage_copy).unwrap();
                let use_before = ExpirationInfo::new(now);
                let open_ent = OpenEntry::new(new_spec.clone(), tunnel, use_before);
                {
                    let mut list = self.tunnels.lock().expect("poisoned lock");
                    // Finally, before we return this tunnel, we need to make
                    // sure that this pending tunnel is still pending.  (If it
                    // is not pending, then it was cancelled through a call to
                    // `retire_all_tunnels`, and the configuration that we used
                    // to launch it is now sufficiently outdated that we should
                    // no longer give this tunnel to a client.)
                    if list.tunnel_is_pending(&pending) {
                        list.add_open(open_ent);
                        // We drop our reference to 'pending' here:
                        // this should make all the weak references to
                        // the `PendingEntry` become dangling.
                        drop(pending);
                        (Some(new_spec), Ok(id))
                    } else {
                        // This tunnel is no longer pending! It must have been cancelled, probably
                        // by a call to retire_all_tunnels()
                        drop(pending); // ibid
                        (None, Err(Error::CircCanceled))
                    }
                }
            }
        }
    }
    /// Return the currently configured expiration parameters.
4
    fn expiration_params(&self) -> ExpirationParameters {
4
        let expire_unused_after = self.pick_use_duration();
4
        let expire_dirty_after = self.circuit_timing().max_dirtiness;
4
        let expire_disused_after = self.circuit_timing().disused_circuit_timeout;
4
        ExpirationParameters {
4
            expire_unused_after,
4
            expire_dirty_after,
4
            expire_disused_after,
4
        }
4
    }
    /// Plan and launch a new tunnel to a given target, bypassing our managed
    /// pool of tunnels.
    ///
    /// This method will always return a new tunnel, and never return a tunnel
    /// that this CircMgr gives out for anything else.
    ///
    /// The new tunnel will participate in the guard and timeout apparatus as
    /// appropriate; no retry attempt will be made if the tunnel fails.
    #[cfg(feature = "hs-common")]
    #[instrument(level = "trace", skip_all)]
    pub(crate) async fn launch_unmanaged(
        &self,
        usage: &TargetTunnelUsage,
        dir: DirInfo<'_>,
    ) -> Result<(SupportedTunnelUsage, B::Tunnel)> {
        let (pending, plan) = self.plan_by_usage(dir, usage)?;
        // This tunnel is unmanaged, so we never register the pending entry.
        drop(pending);
        self.builder.build_tunnel(plan.plan).await
    }
    /// Remove the tunnel with a given `id` from this manager.
    ///
    /// After this function is called, that tunnel will no longer be handed
    /// out to any future requests.
    ///
    /// Return None if we have no tunnel with the given ID.
8
    pub(crate) fn take_tunnel(
8
        &self,
8
        id: &<B::Tunnel as AbstractTunnel>::Id,
8
    ) -> Option<Arc<B::Tunnel>> {
8
        let mut list = self.tunnels.lock().expect("poisoned lock");
8
        list.take_open(id).map(|e| e.tunnel)
8
    }
    /// Remove all open and pending tunnels from this manager, to ensure
    /// they can't be given out for any more requests.
    ///
    /// Calling `retire_all_tunnels` ensures that any tunnel request that gets
    /// an answer _after this method runs_ will receive a tunnel that was
    /// launched _after this method runs_.
    ///
    /// We call this method when our configuration changes in such a way
    /// that we want to make sure that any new (or pending) requests will
    /// receive tunnels that are built using the new configuration.
    //
    // For more information, see documentation on [`CircuitList::open_circs`],
    // [`CircuitList::pending_circs`], and comments in `do_launch`.
    pub(crate) fn retire_all_tunnels(&self) {
        self.tunnels
            .lock()
            .expect("poisoned lock")
            .clear_all_tunnels();
    }
    /// Expire tunnels according to the rules in `config` and the
    /// current time `now`.
    ///
    /// Expired tunnels will not be automatically closed, but they will
    /// no longer be given out for new tunnels.
    ///
    /// Return the earliest time at which any current tunnel will expire.
    pub(crate) async fn expire_tunnels(&self, now: Instant) -> Option<Instant> {
        let expiration_params = self.expiration_params();
        // While holding the lock, we call TunnelList::expire_tunnels.
        // That function will expire what it can, and return a list of the tunnels for which
        // we need to call `disused_since`.
        let (mut earliest_expiration, need_to_check) = {
            let mut list = self.tunnels.lock().expect("poisoned lock");
            list.expire_tunnels(now, &expiration_params)
        };
        // Now we've dropped the lock, and can do async checks.
        let mut last_known_usage = Vec::new();
        for tunnel in need_to_check {
            let Some(tunnel) = Weak::upgrade(&tunnel) else {
                continue; // The tunnel is already gone.
            };
            last_known_usage.push((tunnel.id(), tunnel.last_known_to_be_used_at().await));
        }
        // Now get the lock again, and tell the list what we learned.
        //
        // Note that if this function is called twice simultaneously, in some corner cases, we might
        // decide to expire something twice.  That's okay.
        {
            let mut list = self.tunnels.lock().expect("poisoned lock");
            for (id, disused_since) in last_known_usage {
                match list.update_long_lived_tunnel_last_used(
                    &id,
                    now,
                    &expiration_params,
                    &disused_since,
                ) {
                    Ok(Some(may_expire)) => {
                        // Keep the earliest of the candidate expiration times.
                        earliest_expiration = match earliest_expiration {
                            Some(exp) if exp < may_expire => Some(exp),
                            _ => Some(may_expire),
                        };
                    }
                    Ok(None) => {}
                    Err(e) => warn_report!(e, "Error while updating status on tunnel {:?}", id),
                }
            }
        }
        earliest_expiration
    }
    /// Consider expiring the tunnel with given tunnel `id`,
    /// according to the rules in `config` and the current time `now`.
    ///
    /// Returns None if the tunnel is expired (or no longer tracked);
    /// otherwise returns the next time at which the tunnel may expire.
    pub(crate) async fn consider_expiring_tunnel(
        &self,
        tun_id: &<B::Tunnel as AbstractTunnel>::Id,
        now: Instant,
    ) -> Result<Option<Instant>> {
        let expiration_params = self.expiration_params();
        // With the lock, call TunnelList::tunnel_should_expire, and expire it (or don't)
        // if the decision is obvious.
        let tunnel = {
            let mut list: sync::MutexGuard<'_, TunnelList<B, R>> =
                self.tunnels.lock().expect("poisoned lock");
            let Some(should_expire) = list.tunnel_should_expire(tun_id, now, &expiration_params)
            else {
                return Ok(None);
            };
            match should_expire {
                ShouldExpire::Now => {
                    // Expire immediately by removing the tunnel from the list.
                    let _discard = list.take_open(tun_id);
                    return Ok(None);
                }
                ShouldExpire::NotBefore(t) => return Ok(Some(t)),
                ShouldExpire::PossiblyNow => {
                    let Some(tunnel_ent) = list.get_open_mut(tun_id) else {
                        return Ok(None);
                    };
                    // Keep a reference so we can do the async check below
                    // without holding the lock.
                    Arc::clone(&tunnel_ent.tunnel)
                }
            }
        };
        // If we get here, then we have a long-lived tunnel for which we need to check `disused_since`
        // (an async operation, so we must not hold the lock while awaiting it).
        let last_known_in_use_at = tunnel.last_known_to_be_used_at().await;
        // Now we tell the TunnelList what we learned.
        {
            let mut list: sync::MutexGuard<'_, TunnelList<B, R>> =
                self.tunnels.lock().expect("poisoned lock");
            list.update_long_lived_tunnel_last_used(
                tun_id,
                now,
                &expiration_params,
                &last_known_in_use_at,
            )
        }
    }
    /// Return the number of open tunnels held by this tunnel manager.
24
    pub(crate) fn n_tunnels(&self) -> usize {
24
        let list = self.tunnels.lock().expect("poisoned lock");
24
        list.open_tunnels.len()
24
    }
    /// Return the number of pending tunnels tracked by this tunnel manager.
    #[cfg(test)]
    pub(crate) fn n_pending_tunnels(&self) -> usize {
        self.tunnels
            .lock()
            .expect("poisoned lock")
            .pending_tunnels
            .len()
    }
    /// Get a reference to this manager's [`Runtime`], which it uses for
    /// spawning tasks and for timekeeping.
    pub(crate) fn peek_runtime(&self) -> &R {
        &self.runtime
    }
    /// Get a reference to this manager's [`AbstractTunnelBuilder`], which it
    /// uses to plan and build tunnels.
    pub(crate) fn peek_builder(&self) -> &B {
        &self.builder
    }
    /// Pick a duration by when a new tunnel should expire from now
    /// if it has not yet been used
56
    fn pick_use_duration(&self) -> Duration {
56
        let timings = self
56
            .unused_timing
56
            .lock()
56
            .expect("Poisoned lock for unused_timing");
56
        if self.builder.learning_timeouts() {
            timings.learning
        } else {
            // TODO: In Tor, this calculation also depends on
            // stuff related to predicted ports and channel
            // padding.
            use tor_basic_utils::RngExt as _;
56
            let mut rng = rand::rng();
56
            rng.gen_range_checked(timings.not_learning..=timings.not_learning * 2)
56
                .expect("T .. 2x T turned out to be an empty duration range?!")
        }
56
    }
}
/// Spawn an expiration task that expires a tunnel at given instant.
///
/// When the timeout occurs, if the tunnel manager is still present,
/// the task will ask the manager to expire the tunnel, if the tunnel
/// is ready to expire.
///
/// The manager is held only weakly, so this task never keeps it alive;
/// the task exits as soon as the manager has been dropped.
//
// TODO: It would be good to do away with this function entirely, and have a smarter expiration
// function.  This one only exists because there is not an "expire some circuits" background task.
fn spawn_expiration_task<B, R>(
    runtime: &R,
    circmgr: Weak<AbstractTunnelMgr<B, R>>,
    circ_id: <<B as AbstractTunnelBuilder<R>>::Tunnel as AbstractTunnel>::Id,
    exp_inst: Instant,
) where
    R: Runtime,
    B: 'static + AbstractTunnelBuilder<R>,
{
    let now = runtime.now();
    let rt_copy = runtime.clone();
    let mut duration = exp_inst.saturating_duration_since(now);
    // NOTE: Once there was an optimization here that ran the expiration immediately if
    // `duration` was zero.
    // I discarded that optimization when I made `consider_expiring_tunnel` async,
    // since we really want this function _not_ to be async,
    // because we run it in contexts where we hold a Mutex on the tunnel list.
    // Spawn a timer expiration task with given expiration instant.
    if let Err(e) = runtime.spawn(async move {
        loop {
            rt_copy.sleep(duration).await;
            // If the manager is gone, there is nothing left to expire.
            let cm = if let Some(cm) = Weak::upgrade(&circmgr) {
                cm
            } else {
                return;
            };
            match cm.consider_expiring_tunnel(&circ_id, exp_inst).await {
                // Expired (or no longer tracked): our work here is done.
                Ok(None) => return,
                Ok(Some(when)) => {
                    // Not yet expired: sleep until the next candidate time.
                    duration = when.saturating_duration_since(rt_copy.now());
                }
                Err(e) => {
                    warn_report!(
                        e,
                        "Error while considering expiration for tunnel {:?}",
                        circ_id
                    );
                    return;
                }
            }
        }
    }) {
        warn_report!(e, "Unable to launch expiration task");
    }
}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::isolation::test::{IsolationTokenEq, assert_isoleq};
    use crate::mocks::{FakeBuilder, FakeCirc, FakeId, FakeOp};
    use crate::usage::{ExitPolicy, SupportedTunnelUsage};
    use crate::{
        Error, IsolationToken, StreamIsolation, TargetPort, TargetPorts, TargetTunnelUsage,
    };
    use std::sync::LazyLock;
    use tor_dircommon::fallback::FallbackList;
    use tor_guardmgr::TestConfig;
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
    use tor_netdir::testnet;
    use tor_persist::TestingStateMgr;
    use tor_rtcompat::SleepProvider;
    use tor_rtmock::MockRuntime;
    use web_time_compat::InstantExt;
    #[allow(deprecated)] // TODO #1885
    use tor_rtmock::MockSleepRuntime;
    static FALLBACKS_EMPTY: LazyLock<FallbackList> = LazyLock::new(|| [].into());
    fn di() -> DirInfo<'static> {
        (&*FALLBACKS_EMPTY).into()
    }
    /// Convert a [`TargetTunnelUsage`] into the [`SupportedTunnelUsage`]
    /// that a tunnel built for it would advertise.
    ///
    /// Only the `Exit` variant is handled; other variants panic via
    /// `unimplemented!`.
    fn target_to_spec(target: &TargetTunnelUsage) -> SupportedTunnelUsage {
        match target {
            TargetTunnelUsage::Exit {
                ports,
                isolation,
                country_code,
                require_stability,
            } => SupportedTunnelUsage::Exit {
                policy: ExitPolicy::from_target_ports(&TargetPorts::from(&ports[..])),
                isolation: Some(isolation.clone()),
                country_code: country_code.clone(),
                all_relays_stable: *require_stability,
            },
            _ => unimplemented!(),
        }
    }
    impl<U: PartialEq> IsolationTokenEq for OpenEntry<U> {
        /// Compare two entries field-by-field, using `isol_eq` for the spec.
        fn isol_eq(&self, other: &Self) -> bool {
            let same_spec = self.spec.isol_eq(&other.spec);
            let same_tunnel = self.tunnel == other.tunnel;
            let same_expiration = self.expiration == other.expiration;
            same_spec && same_tunnel && same_expiration
        }
    }
    impl<U: PartialEq> IsolationTokenEq for &mut OpenEntry<U> {
        /// Compare two entries field-by-field, using `isol_eq` for the spec.
        fn isol_eq(&self, other: &Self) -> bool {
            let same_spec = self.spec.isol_eq(&other.spec);
            let same_tunnel = self.tunnel == other.tunnel;
            let same_expiration = self.expiration == other.expiration;
            same_spec && same_tunnel && same_expiration
        }
    }
    /// Construct a `FakeBuilder` backed by fresh testing state and a
    /// default guard configuration.
    fn make_builder<R: Runtime>(runtime: &R) -> FakeBuilder<R> {
        FakeBuilder::new(runtime, TestingStateMgr::new(), &TestConfig::default())
    }
    /// End-to-end exercise of the manager: launching, reuse of an existing
    /// tunnel, pending-tunnel deduplication, removal via `take_tunnel`, and
    /// `launch_by_usage`.
    #[test]
    fn basic_tests() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let builder = make_builder(&rt);
            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let webports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
            // Check initialization.
            assert_eq!(mgr.n_tunnels(), 0);
            assert!(mgr.peek_builder().script.lock().unwrap().is_empty());
            // Launch a tunnel; make sure we get it.
            let c1 = rt.wait_for(mgr.get_or_launch(&webports, di())).await;
            let c1 = c1.unwrap().0;
            assert_eq!(mgr.n_tunnels(), 1);
            // Make sure we get the one we already made if we ask for it.
            let port80 = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
            let c2 = mgr.get_or_launch(&port80, di()).await;
            let c2 = c2.unwrap().0;
            assert!(FakeCirc::eq(&c1, &c2));
            assert_eq!(mgr.n_tunnels(), 1);
            // Now try launching two tunnels "at once" to make sure that our
            // pending-tunnel code works.
            let dnsport = TargetTunnelUsage::new_from_ipv4_ports(&[53]);
            let dnsport_restrict = TargetTunnelUsage::Exit {
                ports: vec![TargetPort::ipv4(53)],
                isolation: StreamIsolation::builder().build().unwrap(),
                country_code: None,
                require_stability: false,
            };
            let (c3, c4) = rt
                .wait_for(futures::future::join(
                    mgr.get_or_launch(&dnsport, di()),
                    mgr.get_or_launch(&dnsport_restrict, di()),
                ))
                .await;
            let c3 = c3.unwrap().0;
            let c4 = c4.unwrap().0;
            // Both requests must share one newly built tunnel.
            assert!(!FakeCirc::eq(&c1, &c3));
            assert!(FakeCirc::eq(&c3, &c4));
            assert_eq!(c3.id(), c4.id());
            assert_eq!(mgr.n_tunnels(), 2);
            // Now we're going to remove c3 from consideration.  It's the
            // same as c4, so removing c4 will give us None.
            let c3_taken = mgr.take_tunnel(&c3.id()).unwrap();
            let now_its_gone = mgr.take_tunnel(&c4.id());
            assert!(FakeCirc::eq(&c3_taken, &c3));
            assert!(now_its_gone.is_none());
            assert_eq!(mgr.n_tunnels(), 1);
            // Having removed them, let's launch another dnsport and make
            // sure we get a different tunnel.
            let c5 = rt.wait_for(mgr.get_or_launch(&dnsport, di())).await;
            let c5 = c5.unwrap().0;
            assert!(!FakeCirc::eq(&c3, &c5));
            assert!(!FakeCirc::eq(&c4, &c5));
            assert_eq!(mgr.n_tunnels(), 2);
            // Now try launch_by_usage.
            let prev = mgr.n_pending_tunnels();
            assert!(mgr.launch_by_usage(&dnsport, di()).is_ok());
            assert_eq!(mgr.n_pending_tunnels(), prev + 1);
            // TODO: Actually make sure that launch_by_usage launched
            // the right thing.
        });
    }
    /// The builder fails once and then times out entirely; the request as a
    /// whole must report failure.
    #[test]
    fn request_timeout() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
            // This will fail once, and then completely time out.  The
            // result will be a failure.
            let builder = make_builder(&rt);
            builder.set(&ports, vec![FakeOp::Fail, FakeOp::Timeout]);
            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let c1 = mgr
                .peek_runtime()
                .wait_for(mgr.get_or_launch(&ports, di()))
                .await;
            assert!(matches!(c1, Err(Error::RequestFailed(_))));
        });
    }
    /// A slower timeout case: the first attempt is delayed just under the
    /// timeout, and the second cannot even be planned; the request must
    /// still report failure.
    #[test]
    fn request_timeout2() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            // Now try a more complicated case: we'll try to get things so
            // that we wait for a little over our predicted time because
            // of our wait-for-next-action logic.
            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
            let builder = make_builder(&rt);
            builder.set(
                &ports,
                vec![
                    FakeOp::Delay(Duration::from_millis(60_000 - 25)),
                    FakeOp::NoPlan,
                ],
            );
            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let c1 = mgr
                .peek_runtime()
                .wait_for(mgr.get_or_launch(&ports, di()))
                .await;
            assert!(matches!(c1, Err(Error::RequestFailed(_))));
        });
    }
    /// If planning fails more times than the retry limit allows, the
    /// request must report failure.
    #[test]
    fn request_unplannable() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
            // This will fail at the planning stage, a lot.
            let builder = make_builder(&rt);
            builder.set(&ports, vec![FakeOp::NoPlan; 2000]);
            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;
            assert!(matches!(c1, Err(Error::RequestFailed(_))));
        });
    }
    /// If building fails more times than the retry limit allows, the
    /// request must report failure.
    #[test]
    fn request_fails_too_much() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
            // This will fail 1000 times, which is above the retry limit.
            let builder = make_builder(&rt);
            builder.set(&ports, vec![FakeOp::Fail; 1000]);
            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;
            assert!(matches!(c1, Err(Error::RequestFailed(_))));
        });
    }
    #[test]
    fn request_wrong_spec() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
            // Arrange for the first build attempt to hand back a tunnel
            // whose spec doesn't match the request.  (A real tunnel
            // builder should never actually _do_ that, but it's something
            // we code for.)
            let builder = make_builder(&rt);
            let bogus_spec = target_to_spec(&TargetTunnelUsage::new_from_ipv4_ports(&[22]));
            builder.set(&ports, vec![FakeOp::WrongSpec(bogus_spec)]);
            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            // Despite the bad first attempt, the request should still
            // succeed in the end.
            let outcome = rt.wait_for(mgr.get_or_launch(&ports, di())).await;
            assert!(outcome.is_ok());
        });
    }
    #[test]
    fn request_retried() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
            // Two failures followed by a success: the overall result of
            // the request should be a success.
            let builder = make_builder(&rt);
            builder.set(&ports, vec![FakeOp::Fail, FakeOp::Fail]);
            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            // This test doesn't exercise any timeout behaviour.
            rt.block_advance("test doesn't require advancing");
            // Futures are lazy, so nothing runs until we wait on the join.
            let first = mgr.get_or_launch(&ports, di());
            let second = mgr.get_or_launch(&ports, di());
            let (r1, r2) = rt.wait_for(futures::future::join(first, second)).await;
            let (t1, _) = r1.unwrap();
            let (t2, _) = r2.unwrap();
            // Both concurrent requests end up with the same tunnel.
            assert!(FakeCirc::eq(&t1, &t2));
        });
    }
    #[test]
    fn isolated() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let builder = make_builder(&rt);
            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            // Set our isolation so that iso1 and iso2 can't share a tunnel,
            // but no_iso can share a tunnel with either.
            let iso1 = TargetTunnelUsage::Exit {
                ports: vec![TargetPort::ipv4(443)],
                isolation: StreamIsolation::builder()
                    .owner_token(IsolationToken::new())
                    .build()
                    .unwrap(),
                country_code: None,
                require_stability: false,
            };
            let iso2 = TargetTunnelUsage::Exit {
                ports: vec![TargetPort::ipv4(443)],
                isolation: StreamIsolation::builder()
                    .owner_token(IsolationToken::new())
                    .build()
                    .unwrap(),
                country_code: None,
                require_stability: false,
            };
            let no_iso1 = TargetTunnelUsage::new_from_ipv4_ports(&[443]);
            let no_iso2 = no_iso1.clone();
            // We're going to try launching these tunnels in 24 different
            // orders, to make sure that the outcome is correct each time.
            use itertools::Itertools;
            let timeouts: Vec<_> = [0_u64, 2, 4, 6]
                .iter()
                .map(|d| Duration::from_millis(*d))
                .collect();
            for delays in timeouts.iter().permutations(4) {
                let d1 = delays[0];
                let d2 = delays[1];
                let d3 = delays[2];
                // Bugfix: this previously read `delays[2]`, duplicating d3 and
                // leaving the fourth delay unused, so the four requests were
                // never actually tried in all 24 distinct orders.
                let d4 = delays[3];
                let (c_iso1, c_iso2, c_no_iso1, c_no_iso2) = rt
                    .wait_for(futures::future::join4(
                        async {
                            rt.sleep(*d1).await;
                            mgr.get_or_launch(&iso1, di()).await
                        },
                        async {
                            rt.sleep(*d2).await;
                            mgr.get_or_launch(&iso2, di()).await
                        },
                        async {
                            rt.sleep(*d3).await;
                            mgr.get_or_launch(&no_iso1, di()).await
                        },
                        async {
                            rt.sleep(*d4).await;
                            mgr.get_or_launch(&no_iso2, di()).await
                        },
                    ))
                    .await;
                let c_iso1 = c_iso1.unwrap().0;
                let c_iso2 = c_iso2.unwrap().0;
                let c_no_iso1 = c_no_iso1.unwrap().0;
                let c_no_iso2 = c_no_iso2.unwrap().0;
                // The two isolated streams must not share a tunnel with
                // each other or with the non-isolated ones...
                assert!(!FakeCirc::eq(&c_iso1, &c_iso2));
                assert!(!FakeCirc::eq(&c_iso1, &c_no_iso1));
                assert!(!FakeCirc::eq(&c_iso1, &c_no_iso2));
                assert!(!FakeCirc::eq(&c_iso2, &c_no_iso1));
                assert!(!FakeCirc::eq(&c_iso2, &c_no_iso2));
                // ...but the two non-isolated streams may share one.
                assert!(FakeCirc::eq(&c_no_iso1, &c_no_iso2));
            }
        });
    }
    #[test]
    fn opportunistic() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            // The first request will time out completely, but we launch a
            // second request shortly after it.  That second request should
            // succeed, and notify the first one too.
            let narrow = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
            let wide = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
            let builder = make_builder(&rt);
            builder.set(&narrow, vec![FakeOp::Timeout]);
            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            // Since `wide` covers more ports than `narrow`, the second
            // request will have to launch a new tunnel.
            let (r1, r2) = rt
                .wait_for(futures::future::join(
                    mgr.get_or_launch(&narrow, di()),
                    async {
                        rt.sleep(Duration::from_millis(100)).await;
                        mgr.get_or_launch(&wide, di()).await
                    },
                ))
                .await;
            // Both requests should succeed, and share a single tunnel.
            match (r1, r2) {
                (Ok((t1, _)), Ok((t2, _))) => assert!(FakeCirc::eq(&t1, &t2)),
                _ => panic!(),
            }
        });
    }
    #[test]
    fn prebuild() {
        MockRuntime::test_with_various(|rt| async move {
            // Use ensure_tunnel() to pre-build a tunnel supporting both
            // ports, then launch two narrower requests that should both
            // end up using it.
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let builder = make_builder(&rt);
            let mgr = Arc::new(AbstractTunnelMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let both_ports = TargetTunnelUsage::new_from_ipv4_ports(&[80, 443]);
            let port_80 = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
            let port_443 = TargetTunnelUsage::new_from_ipv4_ports(&[443]);
            let ensured = mgr.ensure_tunnel(&both_ports, di());
            let (r1, r2) = rt
                .wait_for(futures::future::join(
                    async {
                        rt.sleep(Duration::from_millis(10)).await;
                        mgr.get_or_launch(&port_80, di()).await
                    },
                    async {
                        rt.sleep(Duration::from_millis(50)).await;
                        mgr.get_or_launch(&port_443, di()).await
                    },
                ))
                .await;
            assert!(ensured.is_ok());
            let t1 = r1.unwrap().0;
            let t2 = r2.unwrap().0;
            // Had these been launched separately, they wouldn't share a
            // tunnel; the prebuilt one lets them.
            assert!(FakeCirc::eq(&t1, &t2));
        });
    }
    #[test]
    fn expiration() {
        MockRuntime::test_with_various(|rt| async move {
            use crate::config::CircuitTimingBuilder;
            // Now let's make some tunnels -- one dirty, one clean, and
            // make sure that one expires and one doesn't.
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let builder = make_builder(&rt);
            // Configure tunnels to expire 15 seconds after they first
            // become dirty (i.e. after first use).
            let circuit_timing = CircuitTimingBuilder::default()
                .max_dirtiness(Duration::from_secs(15))
                .build()
                .unwrap();
            let mgr = Arc::new(AbstractTunnelMgr::new(builder, rt.clone(), circuit_timing));
            let imap = TargetTunnelUsage::new_from_ipv4_ports(&[993]);
            let pop = TargetTunnelUsage::new_from_ipv4_ports(&[995]);
            // ensure_tunnel() builds the imap tunnel but leaves it clean;
            // get_or_launch() makes the pop tunnel dirty right away.
            let ok = mgr.ensure_tunnel(&imap, di());
            let pop1 = rt.wait_for(mgr.get_or_launch(&pop, di())).await;
            assert!(ok.is_ok());
            let pop1 = pop1.unwrap().0;
            // Advance 45 seconds in total (in two steps), well past the
            // 15-second max dirtiness for the pop tunnel.
            rt.advance(Duration::from_secs(30)).await;
            rt.advance(Duration::from_secs(15)).await;
            // Only now does the imap tunnel become dirty, so it is newer
            // than the expiration cutoff computed below.
            let imap1 = rt.wait_for(mgr.get_or_launch(&imap, di())).await.unwrap().0;
            // This should expire the pop tunnel, since it came from
            // get_or_launch() [which marks the tunnel as being
            // used].  It should not expire the imap tunnel, since
            // it was not dirty until 15 seconds after the cutoff.
            let now = rt.now();
            mgr.expire_tunnels(now).await;
            let (pop2, imap2) = rt
                .wait_for(futures::future::join(
                    mgr.get_or_launch(&pop, di()),
                    mgr.get_or_launch(&imap, di()),
                ))
                .await;
            let pop2 = pop2.unwrap().0;
            let imap2 = imap2.unwrap().0;
            // pop was rebuilt (the old one expired); imap is unchanged.
            assert!(!FakeCirc::eq(&pop2, &pop1));
            assert!(FakeCirc::eq(&imap2, &imap1));
        });
    }
    /// Returns three exit policies; one that permits nothing, one that permits ports 80
    /// and 443 only, and one that permits all ports.
    fn get_exit_policies() -> (ExitPolicy, ExitPolicy, ExitPolicy) {
        // FIXME(eta): the below is copypasta; would be nice to have a better way of
        //             constructing ExitPolicy objects for testing maybe
        let network = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
        // In the test network, nodes with ID 0x0a through 0x13 and 0x1e
        // through 0x27 are exits.  Odd-numbered ones allow only ports 80
        // and 443; even-numbered ones allow all ports.
        let policy_of = |id: Ed25519Identity| {
            let relay = network.by_id(&id).unwrap();
            ExitPolicy::from_relay(&relay)
        };
        (
            policy_of([0x05; 32].into()), // not an exit: permits nothing
            policy_of([0x11; 32].into()), // web exit: ports 80 and 443 only
            policy_of([0x20; 32].into()), // full exit: permits all ports
        )
    }
    #[test]
    fn test_find_supported() {
        // Build three open entries whose exit policies permit nothing, web
        // ports only, and everything, then check which entries
        // find_supported() hands back for various target usages.
        let (ep_none, ep_web, ep_full) = get_exit_policies();
        let fake_circ = FakeCirc { id: FakeId::next() };
        let expiration = ExpirationInfo::Unused {
            created: Instant::get(),
        };
        let mut entry_none = OpenEntry::new(
            SupportedTunnelUsage::Exit {
                policy: ep_none,
                isolation: None,
                country_code: None,
                all_relays_stable: true,
            },
            fake_circ.clone(),
            expiration.clone(),
        );
        // `_c` clones exist only so we can state expected results while the
        // originals are mutably borrowed by find_supported().
        let mut entry_none_c = entry_none.clone();
        let mut entry_web = OpenEntry::new(
            SupportedTunnelUsage::Exit {
                policy: ep_web,
                isolation: None,
                country_code: None,
                all_relays_stable: true,
            },
            fake_circ.clone(),
            expiration.clone(),
        );
        let mut entry_web_c = entry_web.clone();
        let mut entry_full = OpenEntry::new(
            SupportedTunnelUsage::Exit {
                policy: ep_full,
                isolation: None,
                country_code: None,
                all_relays_stable: true,
            },
            fake_circ,
            expiration,
        );
        let mut entry_full_c = entry_full.clone();
        let usage_web = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
        let empty: Vec<&mut OpenEntry<FakeCirc>> = vec![];
        // A no-exit entry can never support a web request.
        assert_isoleq!(
            SupportedTunnelUsage::find_supported(vec![&mut entry_none].into_iter(), &usage_web),
            empty
        );
        // HACK(eta): We have to faff around with clones and such because
        //            `abstract_spec_find_supported` has a silly signature that involves `&mut`
        //            refs, which we can't have more than one of.
        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none, &mut entry_web].into_iter(),
                &usage_web,
            ),
            vec![&mut entry_web_c]
        );
        // Both the web and the full exit support port 80.
        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
                &usage_web,
            ),
            vec![&mut entry_web_c, &mut entry_full_c]
        );
        // Test preemptive tunnel usage:
        let usage_preemptive_web = TargetTunnelUsage::Preemptive {
            port: Some(TargetPort::ipv4(80)),
            circs: 2,
            require_stability: false,
        };
        let usage_preemptive_dns = TargetTunnelUsage::Preemptive {
            port: None,
            circs: 2,
            require_stability: false,
        };
        // shouldn't return anything unless there are >=2 tunnels
        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none].into_iter(),
                &usage_preemptive_web
            ),
            empty
        );
        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none].into_iter(),
                &usage_preemptive_dns
            ),
            empty
        );
        // Only one of these two entries supports port 80, so a preemptive
        // request for 2 port-80 tunnels is still unmet...
        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none, &mut entry_web].into_iter(),
                &usage_preemptive_web
            ),
            empty
        );
        // ...but a port-less (DNS) preemptive request counts both entries.
        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none, &mut entry_web].into_iter(),
                &usage_preemptive_dns
            ),
            vec![&mut entry_none_c, &mut entry_web_c]
        );
        // With two port-80-capable entries present, the preemptive web
        // request is satisfied by exactly those two.
        assert_isoleq!(
            SupportedTunnelUsage::find_supported(
                vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
                &usage_preemptive_web
            ),
            vec![&mut entry_web_c, &mut entry_full_c]
        );
    }
    #[test]
    fn test_circlist_preemptive_target_circs() {
        MockRuntime::test_with_various(|rt| async move {
            #[allow(deprecated)] // TODO #1885
            let rt = MockSleepRuntime::new(rt);
            let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
            let dirinfo = DirInfo::Directory(&netdir);
            let builder = make_builder(&rt);
            // A preemptive usage with `circs: n` is only satisfied once the
            // list holds at least n matching open tunnels.
            for &wanted in [2, 8].iter() {
                let mut list = TunnelList::<FakeBuilder<MockRuntime>, MockRuntime>::new();
                let target = TargetTunnelUsage::Preemptive {
                    port: Some(TargetPort::ipv4(80)),
                    circs: wanted,
                    require_stability: false,
                };
                for _ in 0..wanted {
                    // Not enough tunnels yet: the preemptive target is unmet.
                    assert!(list.find_open(&target).is_none());
                    let usage = TargetTunnelUsage::new_from_ipv4_ports(&[80]);
                    let (plan, _) = builder.plan_tunnel(&usage, dirinfo).unwrap();
                    let (spec, circ) = rt.wait_for(builder.build_tunnel(plan)).await.unwrap();
                    list.add_open(OpenEntry::new(
                        spec,
                        circ,
                        ExpirationInfo::new(rt.now() + Duration::from_secs(60)),
                    ));
                }
                // With `wanted` tunnels open, the target is now met.
                assert!(list.find_open(&target).is_some());
            }
        });
    }
}