1
#![cfg_attr(docsrs, feature(doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_time_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45
#![allow(clippy::collapsible_if)] // See arti#2342
46
#![deny(clippy::unused_async)]
47
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
48

            
49
// TODO #1645 (either remove this, or decide to have it everywhere)
50
#![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
51

            
52
use build::TunnelBuilder;
53
use mgr::{AbstractTunnel, AbstractTunnelBuilder};
54
use tor_basic_utils::retry::RetryDelay;
55
use tor_chanmgr::ChanMgr;
56
use tor_dircommon::fallback::FallbackList;
57
use tor_error::{error_report, warn_report};
58
use tor_guardmgr::RetireCircuits;
59
use tor_linkspec::ChanTarget;
60
use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
61
use tor_proto::circuit::UniqId;
62
use tor_proto::client::circuit::CircParameters;
63
use tor_rtcompat::Runtime;
64

            
65
#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
66
use tor_linkspec::IntoOwnedChanTarget;
67

            
68
use futures::StreamExt;
69
use std::sync::{Arc, Mutex, Weak};
70
use tor_rtcompat::SpawnExt;
71
use tracing::{debug, info, instrument, trace, warn};
72
use web_time_compat::{Duration, Instant, InstantExt};
73

            
74
#[cfg(feature = "testing")]
75
pub use config::test_config::TestConfig;
76

            
77
pub mod build;
78
mod config;
79
mod err;
80
#[cfg(feature = "hs-common")]
81
pub mod hspool;
82
mod impls;
83
pub mod isolation;
84
mod mgr;
85
#[cfg(test)]
86
mod mocks;
87
mod preemptive;
88
pub mod timeouts;
89
mod tunnel;
90
mod usage;
91

            
92
// Can't apply `visibility` to modules.
93
cfg_if::cfg_if! {
94
    if #[cfg(feature = "experimental-api")] {
95
        pub mod path;
96
    } else {
97
        pub(crate) mod path;
98
    }
99
}
100

            
101
pub use err::Error;
102
pub use isolation::IsolationToken;
103
pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
104
pub use tunnel::{
105
    ClientDataTunnel, ClientDirTunnel, ClientOnionServiceDataTunnel, ClientOnionServiceDirTunnel,
106
    ClientOnionServiceIntroTunnel, ServiceOnionServiceDataTunnel, ServiceOnionServiceDirTunnel,
107
    ServiceOnionServiceIntroTunnel,
108
};
109
#[cfg(feature = "conflux")]
110
pub use tunnel::{
111
    ClientMultiPathDataTunnel, ClientMultiPathOnionServiceDataTunnel,
112
    ServiceMultiPathOnionServiceDataTunnel,
113
};
114
pub use usage::{TargetPort, TargetPorts};
115

            
116
pub use config::{
117
    CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
118
    PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
119
};
120

            
121
use crate::isolation::StreamIsolation;
122
use crate::mgr::TunnelProvenance;
123
use crate::preemptive::PreemptiveCircuitPredictor;
124
use usage::TargetTunnelUsage;
125

            
126
use safelog::sensitive as sv;
127
#[cfg(feature = "geoip")]
128
use tor_geoip::CountryCode;
129
pub use tor_guardmgr::{ExternalActivity, FirstHopId};
130
use tor_persist::StateMgr;
131
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
132

            
133
#[cfg(feature = "hs-common")]
134
use crate::hspool::{HsCircKind, HsCircStemKind};
135
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
136
use tor_guardmgr::vanguards::VanguardMgr;
137

            
138
/// A Result type as returned from this crate.
pub type Result<T> = std::result::Result<T, Error>;

/// Type alias for dynamic StorageHandle that can handle our timeout state.
///
/// (Used to persist Pareto-based circuit timeout estimates across restarts.)
type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;

/// Key used to load timeout state information.
const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
146

            
147
/// Represents what we know about the Tor network.
///
/// This can either be a complete directory, or a list of fallbacks.
///
/// Not every DirInfo can be used to build every kind of circuit:
/// if you try to build a path with an inadequate DirInfo, you'll get a
/// NeedConsensus error.
///
/// This type is `Copy`: it only borrows its contents.
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub enum DirInfo<'a> {
    /// A list of fallbacks, for use when we don't know a network directory.
    Fallbacks(&'a FallbackList),
    /// A complete network directory
    Directory(&'a NetDir),
    /// No information: we can only build one-hop paths: and that, only if the
    /// guard manager knows some guards or fallbacks.
    Nothing,
}
165

            
166
impl<'a> From<&'a FallbackList> for DirInfo<'a> {
    /// Treat a borrowed fallback list as directory information.
    fn from(fallbacks: &'a FallbackList) -> Self {
        Self::Fallbacks(fallbacks)
    }
}
171
impl<'a> From<&'a NetDir> for DirInfo<'a> {
    /// Treat a borrowed network directory as directory information.
    fn from(netdir: &'a NetDir) -> Self {
        Self::Directory(netdir)
    }
}
176
impl<'a> DirInfo<'a> {
    /// Return a set of circuit parameters for this DirInfo.
    ///
    /// When we have a full directory, the parameters come from its consensus;
    /// otherwise (fallbacks or nothing) we use the protocol defaults.
    fn circ_params(&self, usage: &TargetTunnelUsage) -> Result<CircParameters> {
        use tor_netdir::params::NetParameters;
        // We use a common function for both cases here to be sure that
        // we look at the defaults from NetParameters code.
        let defaults = NetParameters::default();
        let net_params = match self {
            DirInfo::Directory(d) => d.params(),
            // Fallbacks and Nothing carry no consensus parameters.
            _ => &defaults,
        };
        match usage {
            // Onion-service circuits use their own parameter derivation.
            #[cfg(feature = "hs-common")]
            TargetTunnelUsage::HsCircBase { .. } => {
                build::onion_circparams_from_netparams(net_params)
            }
            _ => build::exit_circparams_from_netparams(net_params),
        }
    }
}
196

            
197
/// A Circuit Manager (CircMgr) manages a set of circuits, returning them
/// when they're suitable, and launching them if they don't already exist.
///
/// Right now, its notion of "suitable" is quite rudimentary: it just
/// believes in two kinds of circuits: Exit circuits, and directory
/// circuits.  Exit circuits are ones that were created to connect to
/// a set of ports; directory circuits were made to talk to directory caches.
///
/// This is a "handle"; clones of it share state.
//
// The handle is a thin newtype over an `Arc` of the real (mockable)
// implementation, `CircMgrInner`.
pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::TunnelBuilder<R>, R>>);
207

            
208
impl<R: Runtime> CircMgr<R> {
209
    /// Construct a new circuit manager.
210
    ///
211
    /// # Usage note
212
    ///
213
    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
214
30
    pub fn new<SM, CFG: CircMgrConfig>(
215
30
        config: &CFG,
216
30
        storage: SM,
217
30
        runtime: &R,
218
30
        chanmgr: Arc<ChanMgr<R>>,
219
30
        guardmgr: &tor_guardmgr::GuardMgr<R>,
220
30
    ) -> Result<Self>
221
30
    where
222
30
        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
223
    {
224
30
        Ok(Self(Arc::new(CircMgrInner::new(
225
30
            config, storage, runtime, chanmgr, guardmgr,
226
        )?)))
227
30
    }
228

            
229
    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
230
    /// launching it if necessary.
231
    #[instrument(level = "trace", skip_all)]
232
    pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<ClientDirTunnel> {
233
        let tunnel = self.0.get_or_launch_dir(netdir).await?;
234
        Ok(tunnel.into())
235
    }
236

            
237
    /// Return a circuit suitable for exiting to all of the provided
238
    /// `ports`, launching it if necessary.
239
    ///
240
    /// If the list of ports is empty, then the chosen circuit will
241
    /// still end at _some_ exit.
242
    #[instrument(level = "trace", skip_all)]
243
    pub async fn get_or_launch_exit(
244
        &self,
245
        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
246
        ports: &[TargetPort],
247
        isolation: StreamIsolation,
248
        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
249
        //             additive. The function should be refactored to be builder-like.
250
        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
251
    ) -> Result<ClientDataTunnel> {
252
        let tunnel = self
253
            .0
254
            .get_or_launch_exit(
255
                netdir,
256
                ports,
257
                isolation,
258
                #[cfg(feature = "geoip")]
259
                country_code,
260
            )
261
            .await?;
262
        Ok(tunnel.into())
263
    }
264

            
265
    /// Return a circuit to a specific relay, suitable for using for direct
266
    /// (one-hop) directory downloads.
267
    ///
268
    /// This could be used, for example, to download a descriptor for a bridge.
269
    #[cfg(feature = "specific-relay")]
270
    #[instrument(level = "trace", skip_all)]
271
    pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
272
        &self,
273
        target: T,
274
    ) -> Result<ClientDirTunnel> {
275
        let tunnel = self.0.get_or_launch_dir_specific(target).await?;
276
        Ok(tunnel.into())
277
    }
278

            
279
    /// Launch the periodic daemon tasks required by the manager to function properly.
280
    ///
281
    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
282
    //
283
    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
284
    #[instrument(level = "trace", skip_all)]
285
22
    pub fn launch_background_tasks<D, S>(
286
22
        self: &Arc<Self>,
287
22
        runtime: &R,
288
22
        dir_provider: &Arc<D>,
289
22
        state_mgr: S,
290
22
    ) -> Result<Vec<TaskHandle>>
291
22
    where
292
22
        D: NetDirProvider + 'static + ?Sized,
293
22
        S: StateMgr + std::marker::Send + 'static,
294
    {
295
22
        CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
296
22
    }
297

            
298
    /// Return true if `netdir` has enough information to be used for this
299
    /// circuit manager.
300
    ///
301
    /// (This will check whether the netdir is missing any primary guard
302
    /// microdescriptors)
303
    pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
304
        self.0.netdir_is_sufficient(netdir)
305
    }
306

            
307
    /// If `circ_id` is the unique identifier for a circuit that we're
308
    /// keeping track of, don't give it out for any future requests.
309
    pub fn retire_circ(&self, circ_id: &UniqId) {
310
        self.0.retire_circ(circ_id);
311
    }
312

            
313
    /// Record that a failure occurred on a circuit with a given guard, in a way
314
    /// that makes us unwilling to use that guard for future circuits.
315
    ///
316
    pub fn note_external_failure(
317
        &self,
318
        target: &impl ChanTarget,
319
        external_failure: ExternalActivity,
320
    ) {
321
        self.0.note_external_failure(target, external_failure);
322
    }
323

            
324
    /// Record that a success occurred on a circuit with a given guard, in a way
325
    /// that makes us possibly willing to use that guard for future circuits.
326
    pub fn note_external_success(
327
        &self,
328
        target: &impl ChanTarget,
329
        external_activity: ExternalActivity,
330
    ) {
331
        self.0.note_external_success(target, external_activity);
332
    }
333

            
334
    /// Return a stream of events about our estimated clock skew; these events
335
    /// are `None` when we don't have enough information to make an estimate,
336
    /// and `Some(`[`SkewEstimate`]`)` otherwise.
337
    ///
338
    /// Note that this stream can be lossy: if the estimate changes more than
339
    /// one before you read from the stream, you might only get the most recent
340
    /// update.
341
22
    pub fn skew_events(&self) -> ClockSkewEvents {
342
22
        self.0.skew_events()
343
22
    }
344

            
345
    /// Try to change our configuration settings to `new_config`.
346
    ///
347
    /// The actual behavior here will depend on the value of `how`.
348
    ///
349
    /// Returns whether any of the circuit pools should be cleared.
350
    #[instrument(level = "trace", skip_all)]
351
8
    pub fn reconfigure<CFG: CircMgrConfig>(
352
8
        &self,
353
8
        new_config: &CFG,
354
8
        how: tor_config::Reconfigure,
355
8
    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
356
8
        self.0.reconfigure(new_config, how)
357
8
    }
358

            
359
    /// Return an estimate-based delay for how long a given
360
    /// [`Action`](timeouts::Action) should be allowed to complete.
361
    ///
362
    /// Note that **you do not need to use this function** in order to get
363
    /// reasonable timeouts for the circuit-building operations provided by the
364
    /// `tor-circmgr` crate: those, unless specifically noted, always use these
365
    /// timeouts to cancel circuit operations that have taken too long.
366
    ///
367
    /// Instead, you should only use this function when you need to estimate how
368
    /// long some _other_ operation should take to complete.  For example, if
369
    /// you are sending a request over a 3-hop circuit and waiting for a reply,
370
    /// you might choose to wait for `estimate_timeout(Action::RoundTrip {
371
    /// length: 3 })`.
372
    ///
373
    /// Note also that this function returns a _timeout_ that the operation
374
    /// should be permitted to complete, not an estimated Duration that the
375
    /// operation _will_ take to complete. Timeouts are chosen to ensure that
376
    /// most operations will complete, but very slow ones will not.  So even if
377
    /// we expect that a circuit will complete in (say) 3 seconds, we might
378
    /// still allow a timeout of 4.5 seconds, to ensure that most circuits can
379
    /// complete.
380
    ///
381
    /// Estimate-based timeouts may change over time, given observations on the
382
    /// actual amount of time needed for circuits to complete building.  If not
383
    /// enough information has been gathered, a reasonable default will be used.
384
    pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
385
        self.0.estimate_timeout(timeout_action)
386
    }
387

            
388
    /// Return a reference to the associated CircuitBuilder that this CircMgr
389
    /// will use to create its circuits.
390
    #[cfg(feature = "experimental-api")]
391
    pub fn builder(&self) -> &TunnelBuilder<R> {
392
        CircMgrInner::builder(&self.0)
393
    }
394
}
395

            
396
/// Internal object used to implement CircMgr, which allows for mocking.
#[derive(Clone)]
pub(crate) struct CircMgrInner<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> {
    /// The underlying circuit manager object that implements our behavior.
    mgr: Arc<mgr::AbstractTunnelMgr<B, R>>,
    /// A preemptive circuit predictor, for, uh, building circuits preemptively.
    ///
    /// (Shared behind a `Mutex` since usage notifications and reconfiguration
    /// can come from different tasks.)
    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
}
404

            
405
impl<R: Runtime> CircMgrInner<TunnelBuilder<R>, R> {
    /// Construct a new circuit manager.
    ///
    /// # Usage note
    ///
    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
    //
    // The `Result` return is only meaningful when the cfg-gated vanguard setup
    // below can fail; hence the allow.
    #[allow(clippy::unnecessary_wraps)]
    pub(crate) fn new<SM, CFG: CircMgrConfig>(
        config: &CFG,
        storage: SM,
        runtime: &R,
        chanmgr: Arc<ChanMgr<R>>,
        guardmgr: &tor_guardmgr::GuardMgr<R>,
    ) -> Result<Self>
    where
        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
    {
        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
        let vanguardmgr = {
            // TODO(#1382): we need a way of checking if this arti instance
            // is running an onion service or not.
            //
            // Perhaps this information should be provided by CircMgrConfig.
            let has_onion_svc = false;
            VanguardMgr::new(
                config.vanguard_config(),
                runtime.clone(),
                storage.clone(),
                has_onion_svc,
            )?
        };

        // Handle for persisting Pareto timeout estimates.
        let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);

        let builder = build::TunnelBuilder::new(
            runtime.clone(),
            chanmgr,
            config.path_rules().clone(),
            storage_handle,
            guardmgr.clone(),
            #[cfg(all(feature = "vanguards", feature = "hs-common"))]
            vanguardmgr,
        );

        Ok(Self::new_generic(config, runtime, guardmgr, builder))
    }
}
452

            
453
impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
454
    /// Generic implementation for [`CircMgrInner::new`]
455
46
    pub(crate) fn new_generic<CFG: CircMgrConfig>(
456
46
        config: &CFG,
457
46
        runtime: &R,
458
46
        guardmgr: &tor_guardmgr::GuardMgr<R>,
459
46
        builder: B,
460
46
    ) -> Self {
461
46
        let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
462
46
            config.preemptive_circuits().clone(),
463
        )));
464

            
465
46
        guardmgr.set_filter(config.path_rules().build_guard_filter());
466

            
467
46
        let mgr =
468
46
            mgr::AbstractTunnelMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
469

            
470
46
        CircMgrInner {
471
46
            mgr: Arc::new(mgr),
472
46
            predictor: preemptive,
473
46
        }
474
46
    }
475

            
476
    /// Launch the periodic daemon tasks required by the manager to function properly.
    ///
    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
    //
    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn launch_background_tasks<D, S>(
        self: &Arc<Self>,
        runtime: &R,
        dir_provider: &Arc<D>,
        state_mgr: S,
    ) -> Result<Vec<TaskHandle>>
    where
        D: NetDirProvider + 'static + ?Sized,
        S: StateMgr + std::marker::Send + 'static,
    {
        // Handles for the schedulable tasks; returned to the caller.
        let mut ret = vec![];

        // All daemon tasks receive only Weak references, so they do not keep
        // the circuit manager (or dir provider) alive on their own.
        runtime
            .spawn(Self::keep_circmgr_params_updated(
                dir_provider.events(),
                Arc::downgrade(self),
                Arc::downgrade(dir_provider),
            ))
            .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;

        let (sched, handle) = TaskSchedule::new(runtime.clone());
        ret.push(handle);

        runtime
            .spawn(Self::update_persistent_state(
                sched,
                Arc::downgrade(self),
                state_mgr,
            ))
            .map_err(|e| Error::from_spawn("persistent state updater", e))?;

        let (sched, handle) = TaskSchedule::new(runtime.clone());
        ret.push(handle);

        runtime
            .spawn(Self::continually_launch_timeout_testing_circuits(
                sched,
                Arc::downgrade(self),
                Arc::downgrade(dir_provider),
            ))
            .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;

        let (sched, handle) = TaskSchedule::new(runtime.clone());
        ret.push(handle);

        runtime
            .spawn(Self::continually_preemptively_build_circuits(
                sched,
                Arc::downgrade(self),
                Arc::downgrade(dir_provider),
            ))
            .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;

        // Give the guard manager access to the latest network directories.
        self.mgr
            .peek_builder()
            .guardmgr()
            .install_netdir_provider(&dir_provider.clone().upcast_arc())?;

        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
        {
            // The vanguard manager runs its own background task too.
            let () = self
                .mgr
                .peek_builder()
                .vanguardmgr()
                .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
        }

        Ok(ret)
    }
551

            
552
    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
553
    /// launching it if necessary.
554
    #[instrument(level = "trace", skip_all)]
555
    pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Tunnel>> {
556
        self.expire_circuits().await;
557
        let usage = TargetTunnelUsage::Dir;
558
        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
559
    }
560

            
561
    /// Return a circuit suitable for exiting to all of the provided
562
    /// `ports`, launching it if necessary.
563
    ///
564
    /// If the list of ports is empty, then the chosen circuit will
565
    /// still end at _some_ exit.
566
    #[instrument(level = "trace", skip_all)]
567
    pub(crate) async fn get_or_launch_exit(
568
        &self,
569
        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
570
        ports: &[TargetPort],
571
        isolation: StreamIsolation,
572
        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
573
        //             additive. The function should be refactored to be builder-like.
574
        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
575
    ) -> Result<Arc<B::Tunnel>> {
576
        self.expire_circuits().await;
577
        // TODO #2428: Shouldn't we look at runtime.now() instead?
578
        let time = Instant::get();
579
        {
580
            let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
581
            if ports.is_empty() {
582
                predictive.note_usage(None, time);
583
            } else {
584
                for port in ports.iter() {
585
                    predictive.note_usage(Some(*port), time);
586
                }
587
            }
588
        }
589
        let require_stability = ports.iter().any(|p| {
590
            self.mgr
591
                .peek_builder()
592
                .path_config()
593
                .long_lived_ports
594
                .contains(&p.port)
595
        });
596
        let ports = ports.iter().map(Clone::clone).collect();
597
        #[cfg(not(feature = "geoip"))]
598
        let country_code = None;
599
        let usage = TargetTunnelUsage::Exit {
600
            ports,
601
            isolation,
602
            country_code,
603
            require_stability,
604
        };
605
        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
606
    }
607

            
608
    /// Return a circuit to a specific relay, suitable for using for direct
    /// (one-hop) directory downloads.
    ///
    /// This could be used, for example, to download a descriptor for a bridge.
    #[cfg(feature = "specific-relay")]
    #[instrument(level = "trace", skip_all)]
    pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
        &self,
        target: T,
    ) -> Result<Arc<B::Tunnel>> {
        // Drop stale circuits before requesting a new one.
        self.expire_circuits().await;
        let usage = TargetTunnelUsage::DirSpecificTarget(target.to_owned());
        // No directory information is needed for a one-hop request to a
        // relay we already know how to reach.
        let (tunnel, _provenance) = self.mgr.get_or_launch(&usage, DirInfo::Nothing).await?;
        Ok(tunnel)
    }

            
626
    /// Try to change our configuration settings to `new_config`.
627
    ///
628
    /// The actual behavior here will depend on the value of `how`.
629
    ///
630
    /// Returns whether any of the circuit pools should be cleared.
631
    #[instrument(level = "trace", skip_all)]
632
8
    pub(crate) fn reconfigure<CFG: CircMgrConfig>(
633
8
        &self,
634
8
        new_config: &CFG,
635
8
        how: tor_config::Reconfigure,
636
8
    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
637
8
        let old_path_rules = self.mgr.peek_builder().path_config();
638
8
        let predictor = self.predictor.lock().expect("poisoned lock");
639
8
        let preemptive_circuits = predictor.config();
640
8
        if preemptive_circuits.initial_predicted_ports
641
8
            != new_config.preemptive_circuits().initial_predicted_ports
642
        {
643
            // This change has no effect, since the list of ports was _initial_.
644
            how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
645
8
        }
646

            
647
8
        if how == tor_config::Reconfigure::CheckAllOrNothing {
648
4
            return Ok(RetireCircuits::None);
649
4
        }
650

            
651
4
        let retire_because_of_guardmgr =
652
4
            self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
653

            
654
        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
655
4
        let retire_because_of_vanguardmgr = self
656
4
            .mgr
657
4
            .peek_builder()
658
4
            .vanguardmgr()
659
4
            .reconfigure(new_config.vanguard_config())?;
660

            
661
4
        let new_reachable = &new_config.path_rules().reachable_addrs;
662
4
        if new_reachable != &old_path_rules.reachable_addrs {
663
            let filter = new_config.path_rules().build_guard_filter();
664
            self.mgr.peek_builder().guardmgr().set_filter(filter);
665
4
        }
666

            
667
4
        let discard_all_circuits = !new_config
668
4
            .path_rules()
669
4
            .at_least_as_permissive_as(&old_path_rules)
670
4
            || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
671

            
672
        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
673
4
        let discard_all_circuits = discard_all_circuits
674
4
            || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
675

            
676
4
        self.mgr
677
4
            .peek_builder()
678
4
            .set_path_config(new_config.path_rules().clone());
679
4
        self.mgr
680
4
            .set_circuit_timing(new_config.circuit_timing().clone());
681
4
        predictor.set_config(new_config.preemptive_circuits().clone());
682

            
683
4
        if discard_all_circuits {
684
            // TODO(nickm): Someday, we might want to take a more lenient approach, and only
685
            // retire those circuits that do not conform to the new path rules,
686
            // or do not conform to the new guard configuration.
687
            info!("Path configuration has become more restrictive: retiring existing circuits.");
688
            self.retire_all_circuits();
689
            return Ok(RetireCircuits::All);
690
4
        }
691
4
        Ok(RetireCircuits::None)
692
8
    }
693

            
694
    /// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
695
    /// `circmgr` with the consensus parameters from `dirmgr`.
696
    ///
697
    /// Exit when `events` is closed, or one of `circmgr` or `dirmgr` becomes
698
    /// dangling.
699
    ///
700
    /// This is a daemon task: it runs indefinitely in the background.
701
    #[instrument(level = "trace", skip_all)]
702
26
    async fn keep_circmgr_params_updated<D>(
703
26
        mut events: impl futures::Stream<Item = DirEvent> + Unpin,
704
26
        circmgr: Weak<Self>,
705
26
        dirmgr: Weak<D>,
706
26
    ) where
707
26
        D: NetDirProvider + 'static + ?Sized,
708
26
    {
709
        use DirEvent::*;
710
        while let Some(event) = events.next().await {
711
            if matches!(event, NewConsensus) {
712
                if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
713
                    if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
714
                        cm.update_network_parameters(netdir.params());
715
                    }
716
                } else {
717
                    debug!("Circmgr or dirmgr has disappeared; task exiting.");
718
                    break;
719
                }
720
            }
721
        }
722
4
    }
723

            
724
    /// Reconfigure this circuit manager using the latest set of
    /// network parameters.
    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
        // Both the tunnel manager and its builder keep their own copies of
        // the parameters, so update each.
        self.mgr.update_network_parameters(p);
        self.mgr.peek_builder().update_network_parameters(p);
    }
730

            
731
    /// Run indefinitely, launching circuits as needed to get a good
    /// estimate for our circuit build timeouts.
    ///
    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
    ///
    /// This is a daemon task: it runs indefinitely in the background.
    #[instrument(level = "trace", skip_all)]
    async fn continually_launch_timeout_testing_circuits<D>(
        mut sched: TaskSchedule<R>,
        circmgr: Weak<Self>,
        dirmgr: Weak<D>,
    ) where
        D: NetDirProvider + 'static + ?Sized,
    {
        while sched.next().await.is_some() {
            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
                if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
                    if let Err(e) = cm
                        .launch_timeout_testing_circuit_if_appropriate(&netdir)
                        .await
                    {
                        warn_report!(e, "Problem launching a timeout testing circuit");
                    }
                    // Consensus-supplied interval before the next probe.
                    let delay = netdir
                        .params()
                        .cbt_testing_delay
                        .try_into()
                        .expect("Out-of-bounds value from BoundedInt32");

                    // Release our strong references while we sleep, so the
                    // managers can be dropped in the meantime.
                    drop((cm, dm));
                    sched.fire_in(delay);
                } else {
                    // wait for the provider to announce some event, which will probably be
                    // NewConsensus; this is therefore a decent yardstick for rechecking
                    let _ = dm.events().next().await;
                    sched.fire();
                }
            } else {
                // One of the managers is gone; stop the daemon task.
                return;
            }
        }
    }
773

            
774
    /// If we need to launch a testing circuit to judge our circuit
    /// build timeouts, do so.
    ///
    /// # Note
    ///
    /// This function is invoked periodically from
    /// `continually_launch_timeout_testing_circuits`.
    async fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
        // Nothing to do unless the builder is still learning timeouts.
        if !self.mgr.peek_builder().learning_timeouts() {
            return Ok(());
        }
        // We expire any too-old circuits here, so they don't get
        // counted towards max_circs.
        self.expire_circuits().await;
        // `cbt_max_open_circuits_for_testing` is a BoundedInt32, so this
        // conversion to u64 cannot actually fail.
        let max_circs: u64 = netdir
            .params()
            .cbt_max_open_circuits_for_testing
            .try_into()
            .expect("Out-of-bounds result from BoundedInt32");
        if (self.mgr.n_tunnels() as u64) < max_circs {
            // Actually launch the circuit!
            let usage = TargetTunnelUsage::TimeoutTesting;
            let dirinfo = netdir.into();
            let mgr = Arc::clone(&self.mgr);
            debug!("Launching a circuit to test build times.");
            let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
            // We don't actually care when this circuit is done,
            // so it's okay to drop the Receiver without awaiting it.
            drop(receiver);
        }

        Ok(())
    }

    /// Run forever, periodically telling `circmgr` to update its persistent
    /// state.
    ///
    /// Exit when we notice that `circmgr` has been dropped.
    ///
    /// This is a daemon task: it runs indefinitely in the background.
    #[allow(clippy::cognitive_complexity)] // because of tracing
    #[instrument(level = "trace", skip_all)]
    async fn update_persistent_state<S>(
        mut sched: TaskSchedule<R>,
        circmgr: Weak<Self>,
        statemgr: S,
    ) where
        S: StateMgr + std::marker::Send,
    {
        while sched.next().await.is_some() {
            if let Some(circmgr) = Weak::upgrade(&circmgr) {
                use tor_persist::LockStatus::*;

                match statemgr.try_lock() {
                    Err(e) => {
                        error_report!(e, "Problem with state lock file");
                        break;
                    }
                    Ok(NewlyAcquired) => {
                        // We just acquired the lock: take ownership of the
                        // on-disk state before we start writing to it.
                        info!("We now own the lock on our state files.");
                        if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
                            error_report!(e, "Unable to upgrade to owned state files");
                            break;
                        }
                    }
                    Ok(AlreadyHeld) => {
                        // We already hold the lock: flush any unsaved state.
                        if let Err(e) = circmgr.store_persistent_state() {
                            error_report!(e, "Unable to flush circmgr state");
                            break;
                        }
                    }
                    Ok(NoLock) => {
                        // Somebody else holds the lock: we may only reload.
                        if let Err(e) = circmgr.reload_persistent_state() {
                            error_report!(e, "Unable to reload circmgr state");
                            break;
                        }
                    }
                }
            } else {
                debug!("Circmgr has disappeared; task exiting.");
                return;
            }
            // TODO(nickm): This delay is probably too small.
            //
            // Also, we probably don't even want a fixed delay here.  Instead,
            // we should be updating more frequently when the data is volatile
            // or has important info to save, and not at all when there are no
            // changes.
            sched.fire_in(Duration::from_secs(60));
        }

        debug!("State update task exiting (potentially due to handle drop).");
    }

    /// Switch from having an unowned persistent state to having an owned one.
869
    ///
870
    /// Requires that we hold the lock on the state files.
871
    pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
872
        self.mgr.peek_builder().upgrade_to_owned_state()?;
873
        Ok(())
874
    }
875

            
876
    /// Reload state from the state manager.
877
    ///
878
    /// We only call this method if we _don't_ have the lock on the state
879
    /// files.  If we have the lock, we only want to save.
880
    pub(crate) fn reload_persistent_state(&self) -> Result<()> {
881
        self.mgr.peek_builder().reload_state()?;
882
        Ok(())
883
    }
884

            
885
    /// Run indefinitely, launching circuits where the preemptive circuit
    /// predictor thinks it'd be a good idea to have them.
    ///
    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
    ///
    /// This is a daemon task: it runs indefinitely in the background.
    ///
    /// # Note
    ///
    /// This would be better handled entirely within `tor-circmgr`, like
    /// other daemon tasks.
    #[instrument(level = "trace", skip_all)]
    async fn continually_preemptively_build_circuits<D>(
        mut sched: TaskSchedule<R>,
        circmgr: Weak<Self>,
        dirmgr: Weak<D>,
    ) where
        D: NetDirProvider + 'static + ?Sized,
    {
        // On success we re-run after `base_delay`; on failure we back off
        // using randomized, growing delays from `retry`.
        let base_delay = Duration::from_secs(10);
        let mut retry = RetryDelay::from_duration(base_delay);

        while sched.next().await.is_some() {
            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
                if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
                    let result = cm
                        .launch_circuits_preemptively(DirInfo::Directory(&netdir))
                        .await;

                    let delay = match result {
                        Ok(()) => {
                            retry.reset();
                            base_delay
                        }
                        Err(_) => retry.next_delay(&mut rand::rng()),
                    };

                    sched.fire_in(delay);
                } else {
                    // wait for the provider to announce some event, which will probably be
                    // NewConsensus; this is therefore a decent yardstick for rechecking
                    let _ = dm.events().next().await;
                    sched.fire();
                }
            } else {
                // One of the managers has been dropped; stop the task.
                return;
            }
        }
    }

    /// Launch circuits preemptively, using the preemptive circuit predictor's
936
    /// predictions.
937
    ///
938
    /// # Note
939
    ///
940
    /// This function is invoked periodically from
941
    /// `continually_preemptively_build_circuits()`.
942
    #[allow(clippy::cognitive_complexity)]
943
    #[instrument(level = "trace", skip_all)]
944
    async fn launch_circuits_preemptively(
945
        &self,
946
        netdir: DirInfo<'_>,
947
    ) -> std::result::Result<(), err::PreemptiveCircError> {
948
        trace!("Checking preemptive circuit predictions.");
949
        let (circs, threshold) = {
950
            let path_config = self.mgr.peek_builder().path_config();
951
            let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
952
            let threshold = preemptive.config().disable_at_threshold;
953
            (preemptive.predict(&path_config), threshold)
954
        };
955

            
956
        if self.mgr.n_tunnels() >= threshold {
957
            return Ok(());
958
        }
959
        let mut n_created = 0_usize;
960
        let mut n_errors = 0_usize;
961

            
962
        let futures = circs
963
            .iter()
964
            .map(|usage| self.mgr.get_or_launch(usage, netdir));
965
        let results = futures::future::join_all(futures).await;
966
        for (i, result) in results.into_iter().enumerate() {
967
            match result {
968
                Ok((_, TunnelProvenance::NewlyCreated)) => {
969
                    debug!("Preeemptive circuit was created for {:?}", circs[i]);
970
                    n_created += 1;
971
                }
972
                Ok((_, TunnelProvenance::Preexisting)) => {
973
                    trace!("Circuit already existed created for {:?}", circs[i]);
974
                }
975
                Err(e) => {
976
                    warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
977
                    n_errors += 1;
978
                }
979
            }
980
        }
981

            
982
        if n_created > 0 || n_errors == 0 {
983
            // Either we successfully made a circuit, or we didn't have any
984
            // failures while looking for preexisting circuits.  Progress was
985
            // made, so there's no need to back off.
986
            Ok(())
987
        } else {
988
            // We didn't build any circuits and we hit at least one error:
989
            // We'll call this unsuccessful.
990
            Err(err::PreemptiveCircError)
991
        }
992
    }
993

            
994
    /// Create and return a new (typically anonymous) onion circuit stem
    /// of type `stem_kind`.
    ///
    /// If `circ_kind` is provided, we apply additional rules to make sure
    /// that this will be usable as a stem for the given kind of onion service circuit.
    /// Otherwise, we pick a stem that will probably be useful in general.
    ///
    /// This circuit is guaranteed not to have been used for any traffic
    /// previously, and it will not be given out for any other requests in the
    /// future unless explicitly re-registered with a circuit manager.
    ///
    /// If `planned_target` is provided, then the circuit will be built so that
    /// it does not share any family members with the provided target.  (The
    /// circuit _will not be_ extended to that target itself!)
    ///
    /// Used to implement onion service clients and services.
    #[cfg(feature = "hs-common")]
    #[instrument(level = "trace", skip_all)]
    pub(crate) async fn launch_hs_unmanaged<T>(
        &self,
        planned_target: Option<T>,
        dir: &NetDir,
        stem_kind: HsCircStemKind,
        circ_kind: Option<HsCircKind>,
    ) -> Result<B::Tunnel>
    where
        T: IntoOwnedChanTarget,
    {
        // Describe the stem we want, recording any target that the new
        // circuit must stay compatible with.
        let compatible_with_target = planned_target.map(IntoOwnedChanTarget::to_owned);
        let usage = TargetTunnelUsage::HsCircBase {
            compatible_with_target,
            stem_kind,
            circ_kind,
        };
        let (_, tunnel) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
        Ok(tunnel)
    }
    /// Return true if `netdir` has enough information to be used for this
    /// circuit manager.
    ///
    /// (This will check whether the netdir is missing any primary guard
    /// microdescriptors)
    pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
        // The guard manager is the authority on which descriptors we need.
        let guardmgr = self.mgr.peek_builder().guardmgr();
        guardmgr.netdir_is_sufficient(netdir)
    }
    /// Internal implementation for [`CircMgr::estimate_timeout`].
    pub(crate) fn estimate_timeout(
        &self,
        timeout_action: &timeouts::Action,
    ) -> std::time::Duration {
        // The estimator also reports an "abandon" cutoff, which we don't
        // need here; take only the timeout itself.
        self.mgr
            .peek_builder()
            .estimator()
            .timeouts(timeout_action)
            .0
    }
    /// Internal implementation for [`CircMgr::builder`].
    pub(crate) fn builder(&self) -> &B {
        // Borrow the tunnel builder from the underlying manager.
        self.mgr.peek_builder()
    }
    /// Flush state to the state manager, if there is any unsaved state and
    /// we have the lock.
    ///
    /// Return true if we saved something; false if we didn't have the lock.
    pub(crate) fn store_persistent_state(&self) -> Result<bool> {
        // Delegate to the builder, which owns the persistent-state handling.
        self.mgr.peek_builder().save_state()
    }
    /// Expire every circuit that has been dirty for too long.
    ///
    /// Expired circuits are not closed while they still have users,
    /// but they are no longer given out for new requests.
    async fn expire_circuits(&self) {
        // TODO: I would prefer not to call this at every request, but
        // it should be fine for now.  (At some point we may no longer
        // need this, or might not need to call it so often, now that
        // our circuit expiration runs on scheduled timers via
        // spawn_expiration_task.)
        //
        // Take the current time from the manager's runtime clock.
        let now = self.mgr.peek_runtime().now();
        // TODO: Use return value here to implement the above TODO.
        let _next_expiration = self.mgr.expire_tunnels(now).await;
    }
    /// Mark every circuit that we have launched so far as unsuitable for
    /// any future requests.  This won't close existing circuits that have
    /// streams attached to them, but it will prevent any future streams from
    /// being attached.
    ///
    /// TODO: we may want to expose this eventually.  If we do, we should
    /// be very clear that you don't want to use it haphazardly.
    pub(crate) fn retire_all_circuits(&self) {
        self.mgr.retire_all_tunnels();
    }
    /// If `circ_id` is the unique identifier for a circuit that we're
    /// keeping track of, don't give it out for any future requests.
    pub(crate) fn retire_circ(&self, circ_id: &<B::Tunnel as AbstractTunnel>::Id) {
        // Remove the tunnel from the manager's table, deliberately discarding
        // the removed handle (if any).
        let _ = self.mgr.take_tunnel(circ_id);
    }
    /// Return a stream of events about our estimated clock skew; these events
    /// are `None` when we don't have enough information to make an estimate,
    /// and `Some(`[`SkewEstimate`]`)` otherwise.
    ///
    /// Note that this stream can be lossy: if the estimate changes more than
    /// one before you read from the stream, you might only get the most recent
    /// update.
    pub(crate) fn skew_events(&self) -> ClockSkewEvents {
        // The skew estimate is maintained by the guard manager.
        self.mgr.peek_builder().guardmgr().skew_events()
    }
    /// Record that a failure occurred on a circuit with a given guard, in a way
    /// that makes us unwilling to use that guard for future circuits.
    pub(crate) fn note_external_failure(
        &self,
        target: &impl ChanTarget,
        external_failure: ExternalActivity,
    ) {
        // Guard status bookkeeping lives in the guard manager.
        let guardmgr = self.mgr.peek_builder().guardmgr();
        guardmgr.note_external_failure(target, external_failure);
    }
    /// Record that a success occurred on a circuit with a given guard, in a way
    /// that makes us possibly willing to use that guard for future circuits.
    pub(crate) fn note_external_success(
        &self,
        target: &impl ChanTarget,
        external_activity: ExternalActivity,
    ) {
        // Guard status bookkeeping lives in the guard manager.
        let guardmgr = self.mgr.peek_builder().guardmgr();
        guardmgr.note_external_success(target, external_activity);
    }
}
impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
    fn drop(&mut self) {
        // Make a best-effort attempt to flush any unsaved persistent state
        // on shutdown; `drop` can't fail, so failures are only logged.
        match self.store_persistent_state() {
            Ok(true) => info!("Flushed persistent state at exit."),
            Ok(false) => debug!("Lock not held; no state to flush."),
            Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
        }
    }
}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use mocks::FakeBuilder;
    use tor_guardmgr::GuardMgr;
    use tor_linkspec::OwnedChanTarget;
    use tor_netdir::testprovider::TestNetDirProvider;
    use tor_persist::TestingStateMgr;

    use super::*;

    #[test]
    fn get_params() {
        use tor_netdir::{MdReceiver, PartialNetDir};
        use tor_netdoc::doc::netstatus::NetParams;
        // If it's just fallbackdir, we get the default parameters.
        let fb = FallbackList::from([]);
        let di: DirInfo<'_> = (&fb).into();
        let p1 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
        assert!(!p1.extend_by_ed25519_id);

        // Now try with a directory and configured parameters.
        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
        let mut params = NetParams::default();
        params.set("circwindow".into(), 100);
        params.set("ExtendByEd25519ID".into(), 1);
        let mut dir = PartialNetDir::new(consensus, Some(&params));
        for m in microdescs {
            dir.add_microdesc(m);
        }
        let netdir = dir.unwrap_if_sufficient().unwrap();
        let di: DirInfo<'_> = (&netdir).into();
        let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
        assert!(p2.extend_by_ed25519_id);

        // Now try with a bogus circwindow value.
        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
        let mut params = NetParams::default();
        params.set("circwindow".into(), 100_000);
        params.set("ExtendByEd25519ID".into(), 1);
        let mut dir = PartialNetDir::new(consensus, Some(&params));
        for m in microdescs {
            dir.add_microdesc(m);
        }
        let netdir = dir.unwrap_if_sufficient().unwrap();
        let di: DirInfo<'_> = (&netdir).into();
        let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
        assert!(p2.extend_by_ed25519_id);
    }

    /// Build a `CircMgrInner` over a fake builder, with its background
    /// tasks launched on `runtime`.
    fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
        let config = crate::config::test_config::TestConfig::default();
        let statemgr = TestingStateMgr::new();
        let guardmgr =
            GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
        let builder = FakeBuilder::new(
            &runtime,
            statemgr.clone(),
            &tor_guardmgr::TestConfig::default(),
        );
        let circmgr = Arc::new(CircMgrInner::new_generic(
            &config, &runtime, &guardmgr, builder,
        ));
        let netdir = Arc::new(TestNetDirProvider::new());
        CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
            .expect("launch CircMgrInner background tasks");
        circmgr
    }

    #[test]
    #[cfg(feature = "hs-common")]
    fn test_launch_hs_unmanaged() {
        tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
            let circmgr = make_circmgr(runtime.clone());
            let netdir = tor_netdir::testnet::construct_netdir()
                .unwrap_if_sufficient()
                .unwrap();

            let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
            // (Fixed typo in the task label: "unamanged" -> "unmanaged".)
            runtime.spawn_identified("launch_hs_unmanaged", async move {
                ret_tx
                    .send(
                        circmgr
                            .launch_hs_unmanaged::<OwnedChanTarget>(
                                None,
                                &netdir,
                                HsCircStemKind::Naive,
                                None,
                            )
                            .await,
                    )
                    .unwrap();
            });
            runtime.advance_by(Duration::from_millis(60)).await;
            ret_rx.await.unwrap().unwrap();
        });
    }
}