1
#![cfg_attr(docsrs, feature(doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_time_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45
#![allow(clippy::collapsible_if)] // See arti#2342
46
#![deny(clippy::unused_async)]
47
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
48

            
49
// This clippy lint produces a false positive on `use strum`, below.
50
// Attempting to apply the lint to just the use statement fails to suppress
51
// this lint and instead produces another lint about a useless clippy attribute.
52
#![allow(clippy::single_component_path_imports)]
53

            
54
mod bootstrap;
55
pub mod config;
56
mod docid;
57
mod docmeta;
58
mod err;
59
mod event;
60
mod shared_ref;
61
mod state;
62
mod storage;
63

            
64
#[cfg(feature = "bridge-client")]
65
pub mod bridgedesc;
66
#[cfg(feature = "dirfilter")]
67
pub mod filter;
68

            
69
use crate::docid::{CacheUsage, ClientRequest, DocQuery};
70
use crate::err::BootstrapAction;
71
#[cfg(not(feature = "experimental-api"))]
72
use crate::shared_ref::SharedMutArc;
73
#[cfg(feature = "experimental-api")]
74
pub use crate::shared_ref::SharedMutArc;
75
use crate::storage::{DynStore, Store};
76
use bootstrap::AttemptId;
77
use event::DirProgress;
78
use postage::watch;
79
use scopeguard::ScopeGuard;
80
use tor_circmgr::CircMgr;
81
use tor_dirclient::SourceInfo;
82
use tor_dircommon::config::DirTolerance;
83
use tor_error::{info_report, into_internal, warn_report};
84
use tor_netdir::params::NetParameters;
85
use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};
86

            
87
use async_trait::async_trait;
88
use futures::stream::BoxStream;
89
use oneshot_fused_workaround as oneshot;
90
use tor_netdoc::doc::netstatus::ProtoStatuses;
91
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
92
use tor_rtcompat::{Runtime, SpawnExt};
93
use tracing::{debug, info, instrument, trace, warn};
94
use web_time_compat::SystemTimeExt;
95

            
96
use std::marker::PhantomData;
97
use std::sync::atomic::{AtomicBool, Ordering};
98
use std::sync::{Arc, Mutex};
99
use std::time::Duration;
100
use std::{collections::HashMap, sync::Weak};
101
use std::{fmt::Debug, time::SystemTime};
102

            
103
use crate::state::{DirState, NetDirChange};
104
pub use config::DirMgrConfig;
105
pub use docid::DocId;
106
pub use err::Error;
107
pub use event::{DirBlockage, DirBootstrapEvents, DirBootstrapStatus};
108
pub use storage::DocumentText;
109
pub use tor_dircommon::fallback::{FallbackDir, FallbackDirBuilder};
110
pub use tor_netdir::Timeliness;
111

            
112
/// Re-export of `strum` crate for use by an internal macro
113
use strum;
114

            
115
/// A Result as returned by this crate.
///
/// The error type is always this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
117

            
118
/// Storage manager used by [`DirMgr`] and
/// [`BridgeDescMgr`](bridgedesc::BridgeDescMgr)
///
/// Internally, this wraps up a sqlite database.
///
/// This is a handle, which is cheap to clone; clones share state.
#[derive(Clone)]
pub struct DirMgrStore<R: Runtime> {
    /// The actual store
    ///
    /// Kept behind a [`Mutex`] (rather than an `RwLock`) because the
    /// underlying `rusqlite::Connection` is not `Sync`; all clones of this
    /// handle share the same store.
    pub(crate) store: Arc<Mutex<crate::DynStore>>,

    /// Be parameterized by Runtime even though we don't use it right now
    // (This keeps room to use the runtime later without a breaking API change.)
    pub(crate) runtime: PhantomData<R>,
}
132

            
133
impl<R: Runtime> DirMgrStore<R> {
134
    /// Open the storage, according to the specified configuration
135
36
    pub fn new(config: &DirMgrConfig, runtime: R, offline: bool) -> Result<Self> {
136
36
        let store = Arc::new(Mutex::new(config.open_store(offline)?));
137
36
        drop(runtime);
138
36
        let runtime = PhantomData;
139
36
        Ok(DirMgrStore { store, runtime })
140
36
    }
141
}
142

            
143
/// Trait for DirMgr implementations
#[async_trait]
pub trait DirProvider: NetDirProvider {
    /// Try to change our configuration to `new_config`.
    ///
    /// Actual behavior will depend on the value of `how`.
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError>;

    /// Bootstrap a `DirProvider` that hasn't been bootstrapped yet.
    async fn bootstrap(&self) -> Result<()>;

    /// Return a stream of [`DirBootstrapStatus`] events to tell us about changes
    /// in the latest directory's bootstrap status.
    ///
    /// Note that this stream can be lossy: the caller will not necessarily
    /// observe every event on the stream
    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;

    /// Return a [`TaskHandle`] that can be used to manage the download process.
    ///
    /// By default there is no such handle; implementations that actually run
    /// a download task (such as `Arc<DirMgr>`) override this.
    fn download_task_handle(&self) -> Option<TaskHandle> {
        None
    }
}
170

            
171
// NOTE(eta): We can't implement this for Arc<DirMgr<R>> due to trait coherence rules, so instead
172
//            there's a blanket impl for Arc<T> in tor-netdir.
173
impl<R: Runtime> NetDirProvider for DirMgr<R> {
174
146
    fn netdir(&self, timeliness: Timeliness) -> tor_netdir::Result<Arc<NetDir>> {
175
        use tor_netdir::Error as NetDirError;
176
146
        let netdir = self.netdir.get().ok_or(NetDirError::NoInfo)?;
177
        let lifetime = match timeliness {
178
            Timeliness::Strict => netdir.lifetime().clone(),
179
            Timeliness::Timely => self
180
                .config
181
                .get()
182
                .tolerance
183
                .extend_lifetime(netdir.lifetime()),
184
            Timeliness::Unchecked => return Ok(netdir),
185
        };
186
        // TODO #2384 -- we have a runtime here; we should use it.
187
        let now = SystemTime::get();
188
        if lifetime.valid_after() > now {
189
            Err(NetDirError::DirNotYetValid)
190
        } else if lifetime.valid_until() < now {
191
            Err(NetDirError::DirExpired)
192
        } else {
193
            Ok(netdir)
194
        }
195
146
    }
196

            
197
190
    fn events(&self) -> BoxStream<'static, DirEvent> {
198
190
        Box::pin(self.events.subscribe())
199
190
    }
200

            
201
26
    fn params(&self) -> Arc<dyn AsRef<tor_netdir::params::NetParameters>> {
202
26
        if let Some(netdir) = self.netdir.get() {
203
            // We have a directory, so we'd like to give it out for its
204
            // parameters.
205
            //
206
            // We do this even if the directory is expired, since parameters
207
            // don't really expire on any plausible timescale.
208
            netdir
209
        } else {
210
            // We have no directory, so we'll give out the default parameters as
211
            // modified by the provided override_net_params configuration.
212
            //
213
26
            self.default_parameters
214
26
                .lock()
215
26
                .expect("Poisoned lock")
216
26
                .clone()
217
        }
218
        // TODO(nickm): If we felt extremely clever, we could add a third case
219
        // where, if we have a pending directory with a validated consensus, we
220
        // give out that consensus's network parameters even if we _don't_ yet
221
        // have a full directory.  That's significant refactoring, though, for
222
        // an unclear amount of benefit.
223
26
    }
224

            
225
22
    fn protocol_statuses(&self) -> Option<(SystemTime, Arc<ProtoStatuses>)> {
226
22
        self.protocols.lock().expect("Poisoned lock").clone()
227
22
    }
228
}
229

            
230
#[async_trait]
impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
    // Each of these methods simply delegates to the inherent method of the
    // same name on `DirMgr`; see those methods for the actual behavior.
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        DirMgr::reconfigure(self, new_config, how)
    }

    #[instrument(level = "trace", skip_all)]
    async fn bootstrap(&self) -> Result<()> {
        DirMgr::bootstrap(self).await
    }

    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
        Box::pin(DirMgr::bootstrap_events(self))
    }

    // Unlike the trait's default implementation, a `DirMgr` always has a
    // download task handle to hand out.
    fn download_task_handle(&self) -> Option<TaskHandle> {
        Some(self.task_handle.clone())
    }
}
253

            
254
/// A directory manager to download, fetch, and cache a Tor directory.
///
/// A DirMgr can operate in three modes:
///   * In **offline** mode, it only reads from the cache, and can
///     only read once.
///   * In **read-only** mode, it reads from the cache, but checks
///     whether it can acquire an associated lock file.  If it can, then
///     it enters read-write mode.  If not, it checks the cache
///     periodically for new information.
///   * In **read-write** mode, it knows that no other process will be
///     writing to the cache, and it takes responsibility for fetching
///     data from the network and updating the directory with new
///     directory information.
pub struct DirMgr<R: Runtime> {
    /// Configuration information: where to find directories, how to
    /// validate them, and so on.
    config: tor_config::MutCfg<DirMgrConfig>,
    /// Handle to our sqlite cache.
    // TODO(nickm): I'd like to use an rwlock, but that's not feasible, since
    // rusqlite::Connection isn't Sync.
    // TODO: is this handle still needed here, given that `DirMgrStore` also
    // carries one?
    store: Arc<Mutex<DynStore>>,
    /// Our latest sufficiently bootstrapped directory, if we have one.
    ///
    /// We use the RwLock so that we can give this out to a bunch of other
    /// users, and replace it once a new directory is bootstrapped.
    // TODO(eta): Eurgh! This is so many Arcs! (especially considering this
    //            gets wrapped in an Arc)
    netdir: Arc<SharedMutArc<NetDir>>,

    /// Our latest set of recommended protocols.
    protocols: Mutex<Option<(SystemTime, Arc<ProtoStatuses>)>>,

    /// A set of network parameters to hand out when we have no directory.
    default_parameters: Mutex<Arc<NetParameters>>,

    /// A publisher handle that we notify whenever the consensus changes.
    events: event::FlagPublisher<DirEvent>,

    /// A publisher handle that we notify whenever our bootstrapping status
    /// changes.
    send_status: Mutex<watch::Sender<event::DirBootstrapStatus>>,

    /// A receiver handle that gets notified whenever our bootstrapping status
    /// changes.
    ///
    /// We don't need to keep this drained, since `postage::watch` already knows
    /// to discard unread events.
    receive_status: DirBootstrapEvents,

    /// A circuit manager, if this DirMgr supports downloading.
    circmgr: Option<Arc<CircMgr<R>>>,

    /// Our asynchronous runtime.
    runtime: R,

    /// Whether or not we're operating in offline mode.
    offline: bool,

    /// If we're not in offline mode, stores whether or not the `DirMgr` has attempted
    /// to bootstrap yet or not.
    ///
    /// This exists in order to prevent starting two concurrent bootstrap tasks.
    ///
    /// (In offline mode, this does nothing.)
    bootstrap_started: AtomicBool,

    /// A filter that gets applied to directory objects before we use them.
    #[cfg(feature = "dirfilter")]
    filter: crate::filter::FilterConfig,

    /// A task schedule that can be used if we're bootstrapping.  If this is
    /// None, then there's currently a scheduled task in progress.
    task_schedule: Mutex<Option<TaskSchedule<R>>>,

    /// A task handle that we return to anybody who needs to manage our download process.
    task_handle: TaskHandle,
}
332

            
333
/// The possible origins of a document.
///
/// Used (for example) to report where we got a document from if it fails to
/// parse.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DocSource {
    /// We loaded the document from our cache.
    LocalCache,
    /// We fetched the document from a server.
    DirServer {
        /// Information about the server we fetched the document from.
        ///
        /// (`None` when the source is unknown or unrecorded.)
        source: Option<SourceInfo>,
    },
}
348

            
349
impl std::fmt::Display for DocSource {
350
2
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351
2
        match self {
352
            DocSource::LocalCache => write!(f, "local cache"),
353
2
            DocSource::DirServer { source: None } => write!(f, "directory server"),
354
            DocSource::DirServer { source: Some(info) } => write!(f, "directory server {}", info),
355
        }
356
2
    }
357
}
358

            
359
impl<R: Runtime> DirMgr<R> {
360
    /// Try to load the directory from disk, without launching any
361
    /// kind of update process.
362
    ///
363
    /// This function runs in **offline** mode: it will give an error
364
    /// if the result is not up-to-date, or not fully downloaded.
365
    ///
366
    /// In general, you shouldn't use this function in a long-running
367
    /// program; it's only suitable for command-line or batch tools.
368
    // TODO: I wish this function didn't have to be async or take a runtime.
369
    pub fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
370
        let store = DirMgrStore::new(&config, runtime.clone(), true)?;
371
        let dirmgr = Arc::new(Self::from_config(config, runtime, store, None, true)?);
372

            
373
        // TODO: add some way to return a directory that isn't up-to-date
374
        let attempt = AttemptId::next();
375
        trace!(%attempt, "Trying to load a full directory from cache");
376
        let outcome = dirmgr.load_directory(attempt);
377
        trace!(%attempt, "Load result: {outcome:?}");
378
        let _success = outcome?;
379

            
380
        dirmgr
381
            .netdir(Timeliness::Timely)
382
            .map_err(|_| Error::DirectoryNotPresent)
383
    }
384

            
385
    /// Return a current netdir, either loading it or bootstrapping it
386
    /// as needed.
387
    ///
388
    /// Like load_once, but will try to bootstrap (or wait for another
389
    /// process to bootstrap) if we don't have an up-to-date
390
    /// bootstrapped directory.
391
    ///
392
    /// In general, you shouldn't use this function in a long-running
393
    /// program; it's only suitable for command-line or batch tools.
394
    pub async fn load_or_bootstrap_once(
395
        config: DirMgrConfig,
396
        runtime: R,
397
        store: DirMgrStore<R>,
398
        circmgr: Arc<CircMgr<R>>,
399
    ) -> Result<Arc<NetDir>> {
400
        let dirmgr = DirMgr::bootstrap_from_config(config, runtime, store, circmgr).await?;
401
        dirmgr
402
            .timely_netdir()
403
            .map_err(|_| Error::DirectoryNotPresent)
404
    }
405

            
406
    /// Create a new `DirMgr` in online mode, but don't bootstrap it yet.
407
    ///
408
    /// The `DirMgr` can be bootstrapped later with `bootstrap`.
409
22
    pub fn create_unbootstrapped(
410
22
        config: DirMgrConfig,
411
22
        runtime: R,
412
22
        store: DirMgrStore<R>,
413
22
        circmgr: Arc<CircMgr<R>>,
414
22
    ) -> Result<Arc<Self>> {
415
22
        Ok(Arc::new(DirMgr::from_config(
416
22
            config,
417
22
            runtime,
418
22
            store,
419
22
            Some(circmgr),
420
            false,
421
        )?))
422
22
    }
423

            
424
    /// Bootstrap a `DirMgr` created in online mode that hasn't been bootstrapped yet.
    ///
    /// This function will not return until the directory is bootstrapped enough to build circuits.
    /// It will also launch a background task that fetches any missing information, and that
    /// replaces the directory when a new one is available.
    ///
    /// This function is intended to be used together with `create_unbootstrapped`. There is no
    /// need to call this function otherwise.
    ///
    /// If bootstrapping has already successfully taken place, returns early with success.
    ///
    /// # Errors
    ///
    /// Returns an error if bootstrapping fails. If the error is [`Error::CantAdvanceState`],
    /// it may be possible to successfully bootstrap later on by calling this function again.
    ///
    /// # Panics
    ///
    /// Panics if the `DirMgr` passed to this function was not created in online mode, such as
    /// via `load_once`.
    // NOTE(review): the offline case below actually returns `Error::OfflineMode`
    // rather than panicking; the "Panics" section above may be stale — confirm.
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
    #[instrument(level = "trace", skip_all)]
    pub async fn bootstrap(self: &Arc<Self>) -> Result<()> {
        if self.offline {
            return Err(Error::OfflineMode);
        }

        // The semantics of this are "attempt to replace a 'false' value with 'true'.
        // If the value in bootstrap_started was not 'false' when the attempt was made, returns
        // `Err`; this means another bootstrap attempt is in progress or has completed, so we
        // return early.

        // NOTE(eta): could potentially weaken the `Ordering` here in future
        if self
            .bootstrap_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            debug!("Attempted to bootstrap twice; ignoring.");
            return Ok(());
        }

        // Use a RAII guard to reset `bootstrap_started` to `false` if we return early without
        // completing bootstrap.
        let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
            v.store(false, Ordering::SeqCst);
        });

        // Take the task schedule; if it is absent, a previous bootstrap's
        // background task still holds it, so we bail out early.
        let schedule = {
            let sched = self.task_schedule.lock().expect("poisoned lock").take();
            match sched {
                Some(sched) => sched,
                None => {
                    debug!("Attempted to bootstrap twice; ignoring.");
                    return Ok(());
                }
            }
        };

        // Try to load from the cache.
        let attempt_id = AttemptId::next();
        trace!(attempt=%attempt_id, "Starting to bootstrap directory");
        let have_directory = self.load_directory(attempt_id)?;

        // If the cache did not give us a usable directory, set up a oneshot
        // channel so the background task can tell us when one is ready.
        let (mut sender, receiver) = if have_directory {
            info!("Loaded a good directory from cache.");
            (None, None)
        } else {
            info!("Didn't get usable directory from cache.");
            let (sender, receiver) = oneshot::channel();
            (Some(sender), Some(receiver))
        };

        // Whether we loaded or not, we now start downloading.
        let dirmgr_weak = Arc::downgrade(self);
        self.runtime
            .spawn(async move {
                // Use an RAII guard to make sure that when this task exits, the
                // TaskSchedule object is put back.
                //
                // TODO(nick): Putting the schedule back isn't actually useful
                // if the task exits _after_ we've bootstrapped for the first
                // time, because of how bootstrap_started works.
                let mut schedule = scopeguard::guard(schedule, |schedule| {
                    if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
                        *dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
                    }
                });

                // Don't warn when these are Error::ManagerDropped: that
                // means that the DirMgr has been shut down.
                if let Err(e) =
                    Self::reload_until_owner(&dirmgr_weak, &mut schedule, attempt_id, &mut sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while waiting for bootstrap",),
                    }
                } else if let Err(e) =
                    Self::download_forever(dirmgr_weak.clone(), &mut schedule, attempt_id, sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while downloading"),
                    }
                }
            })
            .map_err(|e| Error::from_spawn("directory updater task", e))?;

        // If we didn't already have a directory, wait for the background task
        // to report that we have enough to build circuits.
        if let Some(receiver) = receiver {
            match receiver.await {
                Ok(()) => {
                    info!("We have enough information to build circuits.");
                    // Disarm the RAII guard, since we succeeded.  Now bootstrap_started will remain true.
                    let _ = ScopeGuard::into_inner(reset_bootstrap_started);
                }
                Err(_) => {
                    warn!("Bootstrapping task exited before finishing.");
                    return Err(Error::CantAdvanceState);
                }
            }
        }
        Ok(())
    }
550

            
551
    /// Returns `true` if a bootstrap attempt is in progress, or successfully completed.
    ///
    /// (Reads the same flag that [`DirMgr::bootstrap`] sets with a
    /// compare-exchange; SeqCst ordering matches the writers.)
    pub fn bootstrap_started(&self) -> bool {
        self.bootstrap_started.load(Ordering::SeqCst)
    }
555

            
556
    /// Return a new directory manager from a given configuration,
557
    /// bootstrapping from the network as necessary.
558
    #[instrument(level = "trace", skip_all)]
559
    pub async fn bootstrap_from_config(
560
        config: DirMgrConfig,
561
        runtime: R,
562
        store: DirMgrStore<R>,
563
        circmgr: Arc<CircMgr<R>>,
564
    ) -> Result<Arc<Self>> {
565
        let dirmgr = Self::create_unbootstrapped(config, runtime, store, circmgr)?;
566

            
567
        dirmgr.bootstrap().await?;
568

            
569
        Ok(dirmgr)
570
    }
571

            
572
    /// Try forever to either lock the storage (and thereby become the
    /// owner), or to reload the database.
    ///
    /// If we begin to have a bootstrapped directory, send a
    /// message using `on_complete`.
    ///
    /// If we eventually become the owner, return Ok().
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor?
    async fn reload_until_owner(
        weak: &Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        attempt_id: AttemptId,
        on_complete: &mut Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        let mut logged = false;
        let mut bootstrapped;
        {
            // Scope the upgraded Arc tightly: we must not keep the DirMgr
            // alive between iterations, or it could never be dropped.
            let dirmgr = upgrade_weak_ref(weak)?;
            bootstrapped = dirmgr.netdir.get().is_some();
        }

        loop {
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to take ownership of the directory cache lock");
                if dirmgr.try_upgrade_to_readwrite()? {
                    // We now own the lock!  (Maybe we owned it before; the
                    // upgrade_to_readwrite() function is idempotent.)  We can
                    // do our own bootstrapping.
                    if logged {
                        info!(
                            "The previous owning process has given up the lock. We are now in charge of managing the directory."
                        );
                    }
                    return Ok(());
                }
            }

            // Log the "somebody else owns the cache" situation only once.
            if !logged {
                logged = true;
                if bootstrapped {
                    info!("Another process is managing the directory. We'll use its cache.");
                } else {
                    info!(
                        "Another process is bootstrapping the directory. Waiting till it finishes or exits."
                    );
                }
            }

            // We don't own the lock.  Somebody else owns the cache.  They
            // should be updating it.  Wait a bit, then try again.
            // (Poll less often once we already have a usable directory.)
            let pause = if bootstrapped {
                std::time::Duration::new(120, 0)
            } else {
                std::time::Duration::new(5, 0)
            };
            schedule.sleep(pause).await?;
            // TODO: instead of loading the whole thing we should have a
            // database entry that says when the last update was, or use
            // our state functions.
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to load from the directory cache");
                if dirmgr.load_directory(attempt_id)? {
                    // Successfully loaded a bootstrapped directory.
                    if let Some(send_done) = on_complete.take() {
                        let _ = send_done.send(());
                    }
                    if !bootstrapped {
                        info!("The directory is now bootstrapped.");
                    }
                    bootstrapped = true;
                }
            }
        }
    }
648

            
649
    /// Try to fetch our directory info and keep it updated, indefinitely.
    ///
    /// If we begin to have a bootstrapped directory, send a
    /// message using `on_complete`.
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor?
    #[instrument(level = "trace", skip_all)]
    async fn download_forever(
        weak: Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        mut attempt_id: AttemptId,
        mut on_complete: Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        // Start the download state machine from scratch: first fetch a consensus.
        let mut state: Box<dyn DirState> = {
            let dirmgr = upgrade_weak_ref(&weak)?;
            Box::new(state::GetConsensusState::new(
                dirmgr.runtime.clone(),
                dirmgr.config.get(),
                CacheUsage::CacheOkay,
                Some(dirmgr.netdir.clone()),
                #[cfg(feature = "dirfilter")]
                dirmgr
                    .filter
                    .clone()
                    .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
            ))
        };

        trace!("Entering download loop.");

        // Outer loop: one iteration per complete bootstrap attempt
        // (including all its retries).
        loop {
            let mut usable = false;

            let retry_config = {
                let dirmgr = upgrade_weak_ref(&weak)?;
                // TODO(nickm): instead of getting this every time we loop, it
                // might be a good idea to refresh it with each attempt, at
                // least at the point of checking the number of attempts.
                dirmgr.config.get().schedule.retry_bootstrap()
            };
            let mut retry_delay = retry_config.schedule();

            // Inner loop: retry the download with backoff until it succeeds,
            // becomes usable, hits a fatal error, or we run out of attempts.
            'retry_attempt: for try_num in retry_config.attempts() {
                trace!(attempt=%attempt_id, ?try_num, "Trying to download a directory.");
                let outcome = bootstrap::download(
                    Weak::clone(&weak),
                    &mut state,
                    schedule,
                    attempt_id,
                    &mut on_complete,
                )
                .await;
                trace!(attempt=%attempt_id, ?try_num, ?outcome, "Download is over.");

                if let Err(err) = outcome {
                    // Even a failed download may have left us with a usable
                    // (if incomplete) directory.
                    if state.is_ready(Readiness::Usable) {
                        usable = true;
                        info_report!(
                            err,
                            "Unable to completely download a directory. (Nevertheless, the directory is usable, so we'll pause for now)"
                        );
                        break 'retry_attempt;
                    }

                    match err.bootstrap_action() {
                        BootstrapAction::Nonfatal => {
                            // Nonfatal errors are handled inside
                            // bootstrap::download; seeing one here is a bug.
                            return Err(into_internal!(
                                "Nonfatal error should not have propagated here"
                            )(err)
                            .into());
                        }
                        BootstrapAction::Reset => {}
                        BootstrapAction::Fatal => return Err(err),
                    }

                    // Back off, note the reset, and restart the state machine.
                    let delay = retry_delay.next_delay(&mut rand::rng());
                    warn_report!(
                        err,
                        "Unable to download a usable directory. (We will restart in {})",
                        humantime::format_duration(delay),
                    );
                    {
                        let dirmgr = upgrade_weak_ref(&weak)?;
                        dirmgr.note_reset(attempt_id);
                    }
                    schedule.sleep(delay).await?;
                    state = state.reset();
                } else {
                    info!(attempt=%attempt_id, "Directory is complete.");
                    usable = true;
                    break 'retry_attempt;
                }
            }

            if !usable {
                // we ran out of attempts.
                warn!(
                    "We failed {} times to bootstrap a directory. We're going to give up.",
                    retry_config.n_attempts()
                );
                return Err(Error::CantAdvanceState);
            } else {
                // Report success, if appropriate.
                if let Some(send_done) = on_complete.take() {
                    let _ = send_done.send(());
                }
            }

            // Sleep until the state machine wants to be reset (e.g. to fetch
            // a fresh consensus); if it never does, we're done.
            let reset_at = state.reset_time();
            match reset_at {
                Some(t) => {
                    trace!("Sleeping until {}", time::OffsetDateTime::from(t));
                    schedule.sleep_until_wallclock(t).await?;
                }
                None => return Ok(()),
            }
            attempt_id = bootstrap::AttemptId::next();
            trace!(attempt=%attempt_id, "Beginning new attempt to bootstrap directory");
            state = state.reset();
        }
    }
769

            
770
    /// Get a reference to the circuit manager, if we have one.
771
2
    fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
772
2
        self.circmgr.clone().ok_or(Error::NoDownloadSupport)
773
2
    }
774

            
775
    /// Try to change our configuration to `new_config`.
    ///
    /// Actual behavior will depend on the value of `how`.
    pub fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        let config = self.config.get();
        // We don't support changing these: doing so basically would require us
        // to abort all our in-progress downloads, since they might be based on
        // no-longer-viable information.
        // NOTE: keep this in sync with the behaviour of `DirMgrConfig::update_from_config`
        if new_config.cache_dir != config.cache_dir {
            how.cannot_change("storage.cache_dir")?;
        }
        if new_config.cache_trust != config.cache_trust {
            how.cannot_change("storage.permissions")?;
        }
        if new_config.authorities() != config.authorities() {
            how.cannot_change("network.authorities")?;
        }

        // In check-only mode we stop here: every un-changeable option has been
        // vetted above, and nothing may actually be applied.
        if how == tor_config::Reconfigure::CheckAllOrNothing {
            return Ok(());
        }

        // Record whether the parameter overrides changed *before* we replace
        // the stored configuration below; afterwards the comparison would
        // always be false.
        let params_changed = new_config.override_net_params != config.override_net_params;

        self.config
            .map_and_replace(|cfg| cfg.update_from_config(new_config));

        if params_changed {
            let _ignore_err = self.netdir.mutate(|netdir| {
                netdir.replace_overridden_parameters(&new_config.override_net_params);
                Ok(())
            });
            {
                // Also refresh the defaults used when no netdir is present.
                let mut params = self.default_parameters.lock().expect("lock failed");
                *params = Arc::new(NetParameters::from_map(&new_config.override_net_params));
            }

            // (It's okay to ignore the error, since it just means that there
            // was no current netdir.)
            self.events.publish(DirEvent::NewConsensus);
        }

        Ok(())
    }
824

            
825
    /// Return a stream of [`DirBootstrapStatus`] events to tell us about changes
    /// in the latest directory's bootstrap status.
    ///
    /// Note that this stream can be lossy: the caller will not necessarily
    /// observe every event on the stream
    pub fn bootstrap_events(&self) -> event::DirBootstrapEvents {
        // The receiver is cheaply cloneable; each caller gets an independent
        // view of the same watch channel.
        self.receive_status.clone()
    }
833

            
834
    /// Replace the latest status with `progress` and broadcast to anybody
835
    /// watching via a [`DirBootstrapEvents`] stream.
836
30
    fn update_progress(&self, attempt_id: AttemptId, progress: DirProgress) {
837
        // TODO(nickm): can I kill off this lock by having something else own the sender?
838
30
        let mut sender = self.send_status.lock().expect("poisoned lock");
839
30
        let mut status = sender.borrow_mut();
840

            
841
30
        status.update_progress(attempt_id, progress);
842
30
    }
843

            
844
    /// Update our status tracker to note that some number of errors has
845
    /// occurred.
846
    fn note_errors(&self, attempt_id: AttemptId, n_errors: usize) {
847
        if n_errors == 0 {
848
            return;
849
        }
850
        let mut sender = self.send_status.lock().expect("poisoned lock");
851
        let mut status = sender.borrow_mut();
852

            
853
        status.note_errors(attempt_id, n_errors);
854
    }
855

            
856
    /// Update our status tracker to note that we've needed to reset our download attempt.
857
    fn note_reset(&self, attempt_id: AttemptId) {
858
        let mut sender = self.send_status.lock().expect("poisoned lock");
859
        let mut status = sender.borrow_mut();
860

            
861
        status.note_reset(attempt_id);
862
    }
863

            
864
    /// Try to make this a directory manager with read-write access to its
865
    /// storage.
866
    ///
867
    /// Return true if we got the lock, or if we already had it.
868
    ///
869
    /// Return false if another process has the lock
870
    fn try_upgrade_to_readwrite(&self) -> Result<bool> {
871
        self.store
872
            .lock()
873
            .expect("Directory storage lock poisoned")
874
            .upgrade_to_readwrite()
875
    }
876

            
877
    /// Return a reference to the store, if it is currently read-write.
    #[cfg(test)]
    fn store_if_rw(&self) -> Option<&Mutex<DynStore>> {
        let readonly = self
            .store
            .lock()
            .expect("Directory storage lock poisoned")
            .is_readonly();
        // A race-condition is possible here, but I believe it's harmless.
        (!readonly).then_some(&self.store)
    }
888

            
889
    /// Construct a DirMgr from a DirMgrConfig.
    ///
    /// If `offline` is set, opens the SQLite store read-only and sets the offline flag in the
    /// returned manager.
    #[allow(clippy::unnecessary_wraps)] // API compat and future-proofing
    fn from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Option<Arc<CircMgr<R>>>,
        offline: bool,
    ) -> Result<Self> {
        // Start with an empty, shared netdir slot and a fresh event publisher.
        let netdir = Arc::new(SharedMutArc::new());
        let events = event::FlagPublisher::new();
        // Default network parameters come from the configured overrides until
        // a real consensus supplies values.
        let default_parameters = NetParameters::from_map(&config.override_net_params);
        let default_parameters = Mutex::new(Arc::new(default_parameters));

        // Watch channel used to broadcast bootstrap-status updates.
        let (send_status, receive_status) = postage::watch::channel();
        let send_status = Mutex::new(send_status);
        let receive_status = DirBootstrapEvents {
            inner: receive_status,
        };
        #[cfg(feature = "dirfilter")]
        let filter = config.extensions.filter.clone();

        // We create these early so the client code can access task_handle before bootstrap() returns.
        let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
        let task_schedule = Mutex::new(Some(task_schedule));

        // We load the cached protocol recommendations unconditionally: the caller needs them even
        // if it does not try to load the rest of the cache.
        let protocols = {
            let store = store.store.lock().expect("lock poisoned");
            store
                .cached_protocol_recommendations()?
                .map(|(t, p)| (t, Arc::new(p)))
        };

        Ok(DirMgr {
            config: config.into(),
            store: store.store,
            netdir,
            protocols: Mutex::new(protocols),
            default_parameters,
            events,
            send_status,
            receive_status,
            circmgr,
            runtime,
            offline,
            // Nobody has called bootstrap() yet.
            bootstrap_started: AtomicBool::new(false),
            #[cfg(feature = "dirfilter")]
            filter,
            task_schedule,
            task_handle,
        })
    }
946

            
947
    /// Load the latest non-pending non-expired directory from the
    /// cache, if it is newer than the one we have.
    ///
    /// Return false if there is no such consensus.
    fn load_directory(self: &Arc<Self>, attempt_id: AttemptId) -> Result<bool> {
        // Begin from a fresh "get consensus" state that may only consult the
        // cache, never the network.
        let state = state::GetConsensusState::new(
            self.runtime.clone(),
            self.config.get(),
            CacheUsage::CacheOnly,
            None,
            #[cfg(feature = "dirfilter")]
            self.filter
                .clone()
                .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
        );
        // Drive the state machine from the cache; the resulting state itself
        // is not interesting, only whether a netdir was installed.
        let _ = bootstrap::load(self, Box::new(state), attempt_id)?;

        Ok(self.netdir.get().is_some())
    }
966

            
967
    /// Return a new asynchronous stream that will receive notification
    /// whenever the consensus has changed.
    ///
    /// Multiple events may be batched up into a single item: each time
    /// this stream yields an event, all you can assume is that the event has
    /// occurred at least once.
    pub fn events(&self) -> impl futures::Stream<Item = DirEvent> + use<R> {
        self.events.subscribe()
    }
976

            
977
    /// Try to load the text of a single document described by `doc` from
978
    /// storage.
979
6
    pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
980
        use itertools::Itertools;
981
6
        let mut result = HashMap::new();
982
6
        let query: DocQuery = (*doc).into();
983
6
        let store = self.store.lock().expect("store lock poisoned");
984
6
        query.load_from_store_into(&mut result, &**store)?;
985
6
        let item = result.into_iter().at_most_one().map_err(|_| {
986
            Error::CacheCorruption("Found more than one entry in storage for given docid")
987
        })?;
988
6
        if let Some((docid, doctext)) = item {
989
4
            if &docid != doc {
990
                return Err(Error::CacheCorruption(
991
                    "Item from storage had incorrect docid.",
992
                ));
993
4
            }
994
4
            Ok(Some(doctext))
995
        } else {
996
2
            Ok(None)
997
        }
998
6
    }
999

            
    /// Load the text for a collection of documents.
    ///
    /// If many of the documents have the same type, this can be more
    /// efficient than calling [`text`](Self::text).
2
    pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
2
    where
2
        T: IntoIterator<Item = DocId>,
    {
2
        let partitioned = docid::partition_by_type(docs);
2
        let mut result = HashMap::new();
2
        let store = self.store.lock().expect("store lock poisoned");
6
        for (_, query) in partitioned.into_iter() {
6
            query.load_from_store_into(&mut result, &**store)?;
        }
2
        Ok(result)
2
    }
    /// Given a request we sent and the response we got from a
    /// directory server, see whether we should expand that response
    /// into "something larger".
    ///
    /// Currently, this handles expanding consensus diffs, and nothing
    /// else.  We do it at this stage of our downloading operation
    /// because it requires access to the store.
    fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
        if let ClientRequest::Consensus(req) = req {
            if tor_consdiff::looks_like_diff(&text) {
                // A diff is only acceptable if we advertised at least one old
                // consensus digest in the request.
                if let Some(old_d) = req.old_consensus_digests().next() {
                    let db_val = {
                        // Hold the storage lock only for the lookup itself.
                        let s = self.store.lock().expect("Directory storage lock poisoned");
                        s.consensus_by_sha3_digest_of_signed_part(old_d)?
                    };
                    if let Some((old_consensus, meta)) = db_val {
                        info!("Applying a consensus diff");
                        let new_consensus = tor_consdiff::apply_diff(
                            old_consensus.as_str()?,
                            &text,
                            Some(*meta.sha3_256_of_signed()),
                        )?;
                        // Verify that the patched text hashes to the digest
                        // the diff claimed it would produce.
                        new_consensus.check_digest()?;
                        return Ok(new_consensus.to_string());
                    }
                }
                // Looked like a diff, but either we didn't ask for one or the
                // base consensus is no longer in storage.
                return Err(Error::Unwanted(
                    "Received a consensus diff we did not ask for",
                ));
            }
        }
        // Not a consensus diff: return the text unchanged.
        Ok(text)
    }
    /// If `state` has netdir changes to apply, apply them to our netdir.
    #[allow(clippy::cognitive_complexity)]
    fn apply_netdir_changes(
        self: &Arc<Self>,
        state: &mut Box<dyn DirState>,
        store: &mut dyn Store,
    ) -> Result<()> {
        if let Some(change) = state.get_netdir_change() {
            match change {
                NetDirChange::AttemptReplace {
                    netdir,
                    consensus_meta,
                } => {
                    // Check the new netdir is sufficient, if we have a circmgr.
                    // (Unwraps are fine because the `Option` is `Some` until we take it.)
                    if let Some(ref cm) = self.circmgr {
                        if !cm
                            .netdir_is_sufficient(netdir.as_ref().expect("AttemptReplace had None"))
                        {
                            debug!("Got a new NetDir, but it doesn't have enough guards yet.");
                            return Ok(());
                        }
                    }
                    // Refuse to replace our netdir with one whose consensus
                    // became valid earlier than the current one's.
                    let is_stale = {
                        // Done inside a block to not hold a long-lived copy of the NetDir.
                        self.netdir
                            .get()
                            .map(|x| {
                                x.lifetime().valid_after()
                                    > netdir
                                        .as_ref()
                                        .expect("AttemptReplace had None")
                                        .lifetime()
                                        .valid_after()
                            })
                            .unwrap_or(false)
                    };
                    if is_stale {
                        warn!("Got a new NetDir, but it's older than the one we currently have!");
                        return Err(Error::NetDirOlder);
                    }
                    // Install the new netdir (with our parameter overrides),
                    // then announce it to subscribers.
                    let cfg = self.config.get();
                    let mut netdir = netdir.take().expect("AttemptReplace had None");
                    netdir.replace_overridden_parameters(&cfg.override_net_params);
                    self.netdir.replace(netdir);
                    self.events.publish(DirEvent::NewConsensus);
                    self.events.publish(DirEvent::NewDescriptors);
                    info!("Marked consensus usable.");
                    if !store.is_readonly() {
                        store.mark_consensus_usable(consensus_meta)?;
                        // Now that a consensus is usable, older consensuses may
                        // need to expire.
                        store.expire_all(&crate::storage::EXPIRATION_DEFAULTS)?;
                    }
                    Ok(())
                }
                NetDirChange::AddMicrodescs(mds) => {
                    // Merge freshly-downloaded microdescriptors into the
                    // current netdir, then announce them.
                    self.netdir.mutate(|netdir| {
                        for md in mds.drain(..) {
                            netdir.add_microdesc(md);
                        }
                        Ok(())
                    })?;
                    self.events.publish(DirEvent::NewDescriptors);
                    Ok(())
                }
                NetDirChange::SetRequiredProtocol { timestamp, protos } => {
                    // Persist the recommendation when writable, cache it in
                    // memory, and notify subscribers.
                    if !store.is_readonly() {
                        store.update_protocol_recommendations(timestamp, protos.as_ref())?;
                    }
                    let mut pr = self.protocols.lock().expect("Poisoned lock");
                    *pr = Some((timestamp, protos));
                    self.events.publish(DirEvent::NewProtocolRecommendation);
                    Ok(())
                }
            }
        } else {
            // No pending changes: nothing to do.
            Ok(())
        }
    }
}
/// A degree of readiness for a given directory state object.
#[derive(Debug, Copy, Clone)]
enum Readiness {
    /// There is no more information to download.
    Complete,
    /// There is more information to download, but we don't need to
    /// have it in order to treat the directory as usable.
    Usable,
}
/// Try to upgrade a weak reference to a DirMgr, and give an error on
/// failure.
24
fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
24
    Weak::upgrade(weak).ok_or(Error::ManagerDropped)
24
}
/// Given a time `now`, and an amount of tolerated clock skew `tolerance`,
/// return the age of the oldest consensus that we should request at that time.
8
pub(crate) fn default_consensus_cutoff(
8
    now: SystemTime,
8
    tolerance: &DirTolerance,
8
) -> Result<SystemTime> {
    /// We _always_ allow at least this much age in our consensuses, to account
    /// for the fact that consensuses have some lifetime.
    const MIN_AGE_TO_ALLOW: Duration = Duration::from_secs(3 * 3600);
8
    let allow_skew = std::cmp::max(MIN_AGE_TO_ALLOW, tolerance.post_valid_tolerance());
8
    let cutoff = time::OffsetDateTime::from(now - allow_skew);
    // We now round cutoff to the next hour, so that we aren't leaking our exact
    // time to the directory cache.
    //
    // With the time crate, it's easier to calculate the "next hour" by rounding
    // _down_ then adding an hour; rounding up would sometimes require changing
    // the date too.
8
    let (h, _m, _s) = cutoff.to_hms();
8
    let cutoff = cutoff.replace_time(
8
        time::Time::from_hms(h, 0, 0)
8
            .map_err(tor_error::into_internal!("Failed clock calculation"))?,
    );
8
    let cutoff = cutoff + Duration::from_secs(3600);
8
    Ok(cutoff.into())
8
}
/// Return a list of the protocols [supported](tor_protover::doc_supported) by this crate
/// when running as a client.
422
pub fn supported_client_protocols() -> tor_protover::Protocols {
    use tor_protover::named::*;
    // WARNING: REMOVING ELEMENTS FROM THIS LIST CAN BE DANGEROUS!
    // SEE [`tor_protover::doc_changing`]
422
    [
422
        //
422
        DIRCACHE_CONSDIFF,
422
    ]
422
    .into_iter()
422
    .collect()
422
}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::docmeta::{AuthCertMeta, ConsensusMeta};
    use std::time::Duration;
    use tempfile::TempDir;
    use tor_basic_utils::test_rng::testing_rng;
    use tor_netdoc::doc::netstatus::ConsensusFlavor;
    use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};
    use tor_rtcompat::SleepProvider;
    #[test]
    fn protocols() {
        // The client protocol list should contain exactly DirCache=2
        // (consensus diffs) and nothing else.
        let expected: tor_protover::Protocols = "DirCache=2".parse().unwrap();
        assert_eq!(supported_client_protocols(), expected);
    }
    /// Build a DirMgr backed by a fresh temporary directory, for use in tests.
    ///
    /// The TempDir is returned so that it outlives the manager's storage.
    pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
        let tempdir = TempDir::new().unwrap();
        let config = DirMgrConfig {
            cache_dir: tempdir.path().into(),
            ..Default::default()
        };
        let store = DirMgrStore::new(&config, runtime.clone(), false).unwrap();
        let mgr = DirMgr::from_config(config, runtime, store, None, false).unwrap();
        (tempdir, mgr)
    }
    #[test]
    fn failing_accessors() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            // With no circmgr and no netdir loaded, both accessors must fail.
            assert!(matches!(mgr.circmgr(), Err(_)));
            assert!(matches!(mgr.netdir(Timeliness::Unchecked), Err(_)));
        });
    }
    /// Exercise `text()` and `texts()` against a store seeded with fake
    /// microdescs, router descs, authcerts, and a consensus.
    #[test]
    fn load_and_store_internals() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);

            // Seed the storage with a bunch of junk.
            let d1 = [5_u8; 32];
            let d2 = [7; 32];
            let d3 = [42; 32];
            let d4 = [99; 20];
            let d5 = [12; 20];
            let certid1 = AuthCertKeyIds {
                id_fingerprint: d4.into(),
                sk_fingerprint: d5.into(),
            };
            let certid2 = AuthCertKeyIds {
                id_fingerprint: d5.into(),
                sk_fingerprint: d4.into(),
            };

            {
                let mut store = mgr.store.lock().unwrap();

                store
                    .store_microdescs(
                        &[
                            ("Fake micro 1", &d1),
                            ("Fake micro 2", &d2),
                            ("Fake micro 3", &d3),
                        ],
                        now,
                    )
                    .unwrap();

                #[cfg(feature = "routerdesc")]
                store
                    .store_routerdescs(&[("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
                    .unwrap();

                store
                    .store_authcerts(&[
                        (
                            AuthCertMeta::new(certid1, now, tomorrow),
                            "Fake certificate one",
                        ),
                        (
                            AuthCertMeta::new(certid2, now, tomorrow),
                            "Fake certificate two",
                        ),
                    ])
                    .unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    [102; 32],
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Try to get it with text().
            let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
            assert_eq!(t1.as_str(), Ok("Fake micro 1"));

            let t2 = mgr
                .text(&DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                })
                .unwrap()
                .unwrap();
            assert_eq!(t2.as_str(), Ok("Fake consensus!"));

            // A digest we never stored should come back as None.
            let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
            assert!(t3.is_none());

            // Now try texts()
            let d_bogus = DocId::Microdesc([255; 32]);
            let res = mgr
                .texts(vec![
                    DocId::Microdesc(d2),
                    DocId::Microdesc(d3),
                    d_bogus,
                    DocId::AuthCert(certid2),
                    #[cfg(feature = "routerdesc")]
                    DocId::RouterDesc(d5),
                ])
                .unwrap();
            assert_eq!(
                res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
                Ok("Fake micro 2")
            );
            assert_eq!(
                res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
                Ok("Fake micro 3")
            );
            // Unknown ids are simply absent from the result map.
            assert!(!res.contains_key(&d_bogus));
            assert_eq!(
                res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
                Ok("Fake certificate two")
            );
            #[cfg(feature = "routerdesc")]
            assert_eq!(
                res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
                Ok("Fake rd2")
            );
        });
    }
    /// Check the consensus requests we build from an empty store and from a
    /// store that already holds a consensus.
    #[test]
    fn make_consensus_request() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);
            let config = DirMgrConfig::default();

            // Try with an empty store.
            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let tolerance = DirTolerance::default().post_valid_tolerance();
            match req {
                ClientRequest::Consensus(r) => {
                    // No cached consensus means no diff-base digests, and a
                    // "last consensus" date derived from the skew tolerance,
                    // rounded to at most an hour.
                    assert_eq!(r.old_consensus_digests().count(), 0);
                    let date = r.last_consensus_date().unwrap();
                    assert!(date >= now - tolerance);
                    assert!(date <= now - tolerance + Duration::from_secs(3600));
                }
                _ => panic!("Wrong request type"),
            }

            // Add a fake consensus record.
            let d_prev = [42; 32];
            {
                let mut store = mgr.store.lock().unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    d_prev,
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Now try again.
            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            match req {
                ClientRequest::Consensus(r) => {
                    // With a cached consensus, we advertise its digest as a
                    // diff base and its valid-after time as the last date.
                    let ds: Vec<_> = r.old_consensus_digests().collect();
                    assert_eq!(ds.len(), 1);
                    assert_eq!(ds[0], &d_prev);
                    assert_eq!(r.last_consensus_date(), Some(now));
                }
                _ => panic!("Wrong request type"),
            }
        });
    }
    /// Check the requests we build for authcerts, microdescs, and router
    /// descriptors, including how large id lists are split across requests.
    #[test]
    fn make_other_requests() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            use rand::Rng;
            let (_tempdir, mgr) = new_mgr(rt);
            let certid1 = AuthCertKeyIds {
                id_fingerprint: [99; 20].into(),
                sk_fingerprint: [100; 20].into(),
            };
            let mut rng = testing_rng();
            #[cfg(feature = "routerdesc")]
            let rd_ids: Vec<DocId> = (0..1000).map(|_| DocId::RouterDesc(rng.random())).collect();
            let md_ids: Vec<DocId> = (0..1000).map(|_| DocId::Microdesc(rng.random())).collect();
            let config = DirMgrConfig::default();

            // Try an authcert.
            let query = DocId::AuthCert(certid1);
            let store = mgr.store.lock().unwrap();
            let reqs =
                bootstrap::make_requests_for_documents(&mgr.runtime, &[query], &**store, &config)
                    .unwrap();
            assert_eq!(reqs.len(), 1);
            let req = &reqs[0];
            if let ClientRequest::AuthCert(r) = req {
                assert_eq!(r.keys().next(), Some(&certid1));
            } else {
                panic!();
            }

            // Try a bunch of mds.
            // 1000 ids are split into two requests.
            let reqs =
                bootstrap::make_requests_for_documents(&mgr.runtime, &md_ids, &**store, &config)
                    .unwrap();
            assert_eq!(reqs.len(), 2);
            assert!(matches!(reqs[0], ClientRequest::Microdescs(_)));

            // Try a bunch of rds.
            #[cfg(feature = "routerdesc")]
            {
                let reqs = bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &rd_ids,
                    &**store,
                    &config,
                )
                .unwrap();
                assert_eq!(reqs.len(), 2);
                assert!(matches!(reqs[0], ClientRequest::RouterDescs(_)));
            }
        });
    }
    #[test]
    fn expand_response() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let one_day = Duration::from_secs(86400);
            let config = DirMgrConfig::default();
            let (_tempdir, mgr) = new_mgr(rt);

            // Case 1: a simple microdescriptor request.  The response text
            // should pass through expansion unchanged.
            let md_query = DocId::Microdesc([99; 32]);
            let requests = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[md_query],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let result = mgr.expand_response_text(&requests[0], "ABC".to_string());
            assert_eq!(&result.unwrap(), "ABC");

            // Case 2: a consensus request made while the cache is empty,
            // answered with text that is not a diff: again, the text should
            // come back unchanged.
            let consensus_query = DocId::LatestConsensus {
                flavor: ConsensusFlavor::Microdesc,
                cache_usage: CacheUsage::CacheOkay,
            };
            let requests = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[consensus_query],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let result = mgr.expand_response_text(&requests[0], "DEF".to_string());
            assert_eq!(&result.unwrap(), "DEF");

            // Seed the store with consensus metadata and body text, so that
            // subsequent requests can meaningfully ask for a diff against it.
            {
                let mut store = mgr.store.lock().unwrap();
                let d_in = [0x99; 32]; // This one, we can fake.
                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, now + one_day, now + 2 * one_day).unwrap(),
                    d_in,
                    d_in,
                );
                store
                    .store_consensus(
                        &cmeta,
                        ConsensusFlavor::Microdesc,
                        false,
                        "line 1\nline2\nline 3\n",
                    )
                    .unwrap();
            }

            // Case 3: a diff is now possible, but the response still isn't
            // one: the text should once more pass through unchanged.
            let requests = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[consensus_query],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let result = mgr.expand_response_text(&requests[0], "hello".to_string());
            assert_eq!(&result.unwrap(), "hello");

            // Case 4: an actual diff against the stored consensus.  Expansion
            // applies the diff and verifies the resulting digest.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
2c
replacement line
.
".to_string();
            let result = mgr.expand_response_text(&requests[0], diff);
            assert_eq!(result.unwrap(), "line 1\nreplacement line\nline 3\n");

            // Case 5: a diff whose declared output digest doesn't match what
            // the diff actually produces must be rejected.
            let bad_diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
2c
replacement line
.
".to_string();
            let result = mgr.expand_response_text(&requests[0], bad_diff);
            assert!(result.is_err());
        });
    }
}