1
//! Implementation for the primary directory state machine.
2
//!
3
//! There are three (active) states that a download can be in: looking
4
//! for a consensus ([`GetConsensusState`]), looking for certificates
5
//! to validate that consensus ([`GetCertsState`]), and looking for
6
//! microdescriptors ([`GetMicrodescsState`]).
7
//!
8
//! These states have no contact with the network, and are purely
9
//! reactive to other code that drives them.  See the
10
//! [`bootstrap`](crate::bootstrap) module for functions that actually
11
//! load or download directory information.
12

            
13
use std::collections::{HashMap, HashSet};
14
use std::fmt::Debug;
15
use std::mem;
16
use std::sync::{Arc, Mutex};
17
use std::time::{Duration, SystemTime};
18
use time::OffsetDateTime;
19
use tor_basic_utils::RngExt as _;
20
use tor_dircommon::retry::DownloadSchedule;
21
use tor_error::{internal, warn_report};
22
use tor_netdir::{MdReceiver, NetDir, PartialNetDir};
23
use tor_netdoc::doc::authcert::UncheckedAuthCert;
24
use tor_netdoc::doc::netstatus::{Lifetime, ProtoStatuses};
25
use tracing::{debug, warn};
26

            
27
use crate::event::DirProgress;
28

            
29
use crate::storage::DynStore;
30
use crate::{
31
    CacheUsage, ClientRequest, DirMgrConfig, DocId, DocumentText, Error, Readiness, Result,
32
    docmeta::{AuthCertMeta, ConsensusMeta},
33
    event,
34
};
35
use crate::{DocSource, SharedMutArc};
36
use tor_checkable::{ExternallySigned, SelfSigned, Timebound};
37
#[cfg(feature = "geoip")]
38
use tor_geoip::GeoipDb;
39
use tor_llcrypto::pk::rsa::RsaIdentity;
40
use tor_netdoc::doc::{
41
    microdesc::{MdDigest, Microdesc},
42
    netstatus::MdConsensus,
43
};
44
use tor_netdoc::{
45
    AllowAnnotations,
46
    doc::{
47
        authcert::{AuthCert, AuthCertKeyIds},
48
        microdesc::MicrodescReader,
49
        netstatus::{ConsensusFlavor, UnvalidatedMdConsensus},
50
    },
51
};
52
use tor_rtcompat::Runtime;
53

            
54
/// A change to the currently running `NetDir`, returned by the state machines in this module.
55
#[derive(Debug)]
56
pub(crate) enum NetDirChange<'a> {
57
    /// If the provided `NetDir` is suitable for use (i.e. the caller determines it can build
58
    /// circuits with it), replace the current `NetDir` with it.
59
    ///
60
    /// The caller must call `DirState::on_netdir_replaced` if the replace was successful.
61
    AttemptReplace {
62
        /// The netdir to replace the current one with, if it's usable.
63
        ///
64
        /// The `Option` is always `Some` when returned from the state machine; it's there
65
        /// so that the caller can call `.take()` to avoid cloning the netdir.
66
        netdir: &'a mut Option<NetDir>,
67
        /// The consensus metadata for this netdir.
68
        consensus_meta: &'a ConsensusMeta,
69
    },
70
    /// Add the provided microdescriptors to the current `NetDir`.
71
    AddMicrodescs(&'a mut Vec<Microdesc>),
72
    /// Replace the recommended set of subprotocols.
73
    SetRequiredProtocol {
74
        /// The time at which the protocol statuses were recommended
75
        timestamp: SystemTime,
76
        /// The recommended set of protocols.
77
        protos: Arc<ProtoStatuses>,
78
    },
79
}
80

            
81
/// A "state" object used to represent our progress in downloading a
82
/// directory.
83
///
84
/// These state objects are not meant to know about the network, or
85
/// how to fetch documents at all.  Instead, they keep track of what
86
/// information they are missing, and what to do when they get that
87
/// information.
88
///
89
/// Every state object has two possible transitions: "resetting", and
90
/// "advancing".  Advancing happens when a state has no more work to
91
/// do, and needs to transform into a different kind of object.
92
/// Resetting happens when this state needs to go back to an initial
93
/// state in order to start over -- either because of an error or
94
/// because the information it has downloaded is no longer timely.
95
pub(crate) trait DirState: Send {
96
    /// Return a human-readable description of this state.
97
    fn describe(&self) -> String;
98
    /// Return a list of the documents we're missing.
99
    ///
100
    /// If every document on this list were to be loaded or downloaded, then
101
    /// the state should either become "ready to advance", or "complete."
102
    ///
103
    /// This list should never _grow_ on a given state; only advancing
104
    /// or resetting the state should add new DocIds that weren't
105
    /// there before.
106
    fn missing_docs(&self) -> Vec<DocId>;
107
    /// Describe whether this state has reached `ready` status.
108
    fn is_ready(&self, ready: Readiness) -> bool;
109
    /// If the state object wants to make changes to the currently running `NetDir`,
110
    /// return the proposed changes.
111
16
    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
112
16
        None
113
16
    }
114
    /// Return true if this state can advance to another state via its
115
    /// `advance` method.
116
    fn can_advance(&self) -> bool;
117
    /// Add one or more documents from our cache; returns 'true' if there
118
    /// was any change in this state.
119
    ///
120
    /// Set `changed` to true if any semantic changes in this state were made.
121
    ///
122
    /// An error return does not necessarily mean that no data was added;
123
    /// partial successes are possible.
124
    fn add_from_cache(
125
        &mut self,
126
        docs: HashMap<DocId, DocumentText>,
127
        changed: &mut bool,
128
    ) -> Result<()>;
129

            
130
    /// Add information that we have just downloaded to this state.
131
    ///
132
    /// This method receives a copy of the original request, and should reject
133
    /// any documents that do not pertain to it.
134
    ///
135
    /// If `storage` is provided, then we should write any accepted documents
136
    /// into `storage` so they can be saved in a cache.
137
    ///
138
    /// Set `changed` to true if any semantic changes in this state were made.
139
    ///
140
    /// An error return does not necessarily mean that no data was added;
141
    /// partial successes are possible.
142
    fn add_from_download(
143
        &mut self,
144
        text: &str,
145
        request: &ClientRequest,
146
        source: DocSource,
147
        storage: Option<&Mutex<DynStore>>,
148
        changed: &mut bool,
149
    ) -> Result<()>;
150
    /// Return a summary of this state as a [`DirProgress`].
151
    fn bootstrap_progress(&self) -> event::DirProgress;
152
    /// Return a configuration for attempting downloads.
153
    fn dl_config(&self) -> DownloadSchedule;
154
    /// If possible, advance to the next state.
155
    fn advance(self: Box<Self>) -> Box<dyn DirState>;
156
    /// Return a time (if any) when downloaders should stop attempting to
157
    /// advance this state, and should instead reset it and start over.
158
    fn reset_time(&self) -> Option<SystemTime>;
159
    /// Reset this state and start over.
160
    fn reset(self: Box<Self>) -> Box<dyn DirState>;
161
}
162

            
163
/// An object that can provide a previous netdir for the bootstrapping state machines to use.
164
pub(crate) trait PreviousNetDir: Send + Sync + 'static + Debug {
165
    /// Get the previous netdir, if there still is one.
166
    fn get_netdir(&self) -> Option<Arc<NetDir>>;
167
}
168

            
169
impl PreviousNetDir for SharedMutArc<NetDir> {
170
    fn get_netdir(&self) -> Option<Arc<NetDir>> {
171
        self.get()
172
    }
173
}
174

            
175
/// Initial state: fetching or loading a consensus directory.
176
#[derive(Clone, Debug)]
177
pub(crate) struct GetConsensusState<R: Runtime> {
178
    /// How should we get the consensus from the cache, if at all?
179
    cache_usage: CacheUsage,
180

            
181
    /// If present, a time after which we want our consensus to have
182
    /// been published.
183
    //
184
    // TODO: This is not yet used everywhere it could be.  In the future maybe
185
    // it should be inserted into the DocId::LatestConsensus  alternative rather
186
    // than being recalculated in make_consensus_request,
187
    after: Option<SystemTime>,
188

            
189
    /// If present, our next state.
190
    ///
191
    /// (This is present once we have a consensus.)
192
    next: Option<GetCertsState<R>>,
193

            
194
    /// A list of RsaIdentity for the authorities that we believe in.
195
    ///
196
    /// No consensus can be valid unless it purports to be signed by
197
    /// more than half of these authorities.
198
    authority_ids: Vec<RsaIdentity>,
199

            
200
    /// A `Runtime` implementation.
201
    rt: R,
202
    /// The configuration of the directory manager. Used for download configuration
203
    /// purposes.
204
    config: Arc<DirMgrConfig>,
205
    /// If one exists, the netdir we're trying to update.
206
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,
207

            
208
    /// A filter that gets applied to directory objects before we use them.
209
    #[cfg(feature = "dirfilter")]
210
    filter: Arc<dyn crate::filter::DirFilter>,
211
}
212

            
213
impl<R: Runtime> GetConsensusState<R> {
214
    /// Create a new `GetConsensusState`, using the cache as per `cache_usage` and downloading as
215
    /// per the relevant sections of `config`. If `prev_netdir` is supplied, information from that
216
    /// directory may be used to complete the next one.
217
14
    pub(crate) fn new(
218
14
        rt: R,
219
14
        config: Arc<DirMgrConfig>,
220
14
        cache_usage: CacheUsage,
221
14
        prev_netdir: Option<Arc<dyn PreviousNetDir>>,
222
14
        #[cfg(feature = "dirfilter")] filter: Arc<dyn crate::filter::DirFilter>,
223
14
    ) -> Self {
224
14
        let authority_ids = config.authorities().v3idents().clone();
225
14
        let after = prev_netdir
226
14
            .as_ref()
227
14
            .and_then(|x| x.get_netdir())
228
14
            .map(|nd| nd.lifetime().valid_after());
229

            
230
14
        GetConsensusState {
231
14
            cache_usage,
232
14
            after,
233
14
            next: None,
234
14
            authority_ids,
235
14
            rt,
236
14
            config,
237
14
            prev_netdir,
238
14
            #[cfg(feature = "dirfilter")]
239
14
            filter,
240
14
        }
241
14
    }
242
}
243

            
244
impl<R: Runtime> DirState for GetConsensusState<R> {
245
8
    fn describe(&self) -> String {
246
8
        if self.next.is_some() {
247
2
            "About to fetch certificates."
248
        } else {
249
6
            match self.cache_usage {
250
                CacheUsage::CacheOnly => "Looking for a cached consensus.",
251
4
                CacheUsage::CacheOkay => "Looking for a consensus.",
252
2
                CacheUsage::MustDownload => "Downloading a consensus.",
253
            }
254
        }
255
8
        .to_string()
256
8
    }
257
4
    fn missing_docs(&self) -> Vec<DocId> {
258
4
        if self.can_advance() {
259
2
            return Vec::new();
260
2
        }
261
2
        let flavor = ConsensusFlavor::Microdesc;
262
2
        vec![DocId::LatestConsensus {
263
2
            flavor,
264
2
            cache_usage: self.cache_usage,
265
2
        }]
266
4
    }
267
4
    fn is_ready(&self, _ready: Readiness) -> bool {
268
4
        false
269
4
    }
270
10
    fn can_advance(&self) -> bool {
271
10
        self.next.is_some()
272
10
    }
273
2
    fn bootstrap_progress(&self) -> DirProgress {
274
2
        if let Some(next) = &self.next {
275
            next.bootstrap_progress()
276
        } else {
277
2
            DirProgress::NoConsensus { after: self.after }
278
        }
279
2
    }
280
2
    fn dl_config(&self) -> DownloadSchedule {
281
2
        self.config.schedule.retry_consensus()
282
2
    }
283
2
    fn add_from_cache(
284
2
        &mut self,
285
2
        docs: HashMap<DocId, DocumentText>,
286
2
        changed: &mut bool,
287
2
    ) -> Result<()> {
288
2
        let text = match docs.into_iter().next() {
289
            None => return Ok(()),
290
            Some((
291
                DocId::LatestConsensus {
292
                    flavor: ConsensusFlavor::Microdesc,
293
                    ..
294
                },
295
2
                text,
296
2
            )) => text,
297
            _ => return Err(Error::CacheCorruption("Not an md consensus")),
298
        };
299

            
300
2
        let source = DocSource::LocalCache;
301

            
302
2
        self.add_consensus_text(
303
2
            source,
304
2
            text.as_str().map_err(Error::BadUtf8InCache)?,
305
2
            None,
306
2
            changed,
307
        )?;
308
2
        Ok(())
309
2
    }
310
10
    fn add_from_download(
311
10
        &mut self,
312
10
        text: &str,
313
10
        request: &ClientRequest,
314
10
        source: DocSource,
315
10
        storage: Option<&Mutex<DynStore>>,
316
10
        changed: &mut bool,
317
10
    ) -> Result<()> {
318
10
        let requested_newer_than = match request {
319
10
            ClientRequest::Consensus(r) => r.last_consensus_date(),
320
            _ => None,
321
        };
322
10
        let meta = self.add_consensus_text(source, text, requested_newer_than, changed)?;
323

            
324
6
        if let Some(store) = storage {
325
2
            let mut w = store.lock().expect("Directory storage lock poisoned");
326
2
            w.store_consensus(meta, ConsensusFlavor::Microdesc, true, text)?;
327
4
        }
328
6
        Ok(())
329
10
    }
330
6
    fn advance(self: Box<Self>) -> Box<dyn DirState> {
331
6
        match self.next {
332
6
            Some(next) => Box::new(next),
333
            None => self,
334
        }
335
6
    }
336
2
    fn reset_time(&self) -> Option<SystemTime> {
337
2
        None
338
2
    }
339
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
340
        self
341
    }
342
}
343

            
344
impl<R: Runtime> GetConsensusState<R> {
345
    /// Helper: try to set the current consensus text from an input string
346
    /// `text`.  Refuse it if the authorities could never be correct, or if it
347
    /// is ill-formed.
348
    ///
349
    /// If `cutoff` is provided, treat any consensus older than `cutoff` as
350
    /// older-than-requested.
351
    ///
352
    /// Errors from this method are not fatal to the download process.
353
12
    fn add_consensus_text(
354
12
        &mut self,
355
12
        source: DocSource,
356
12
        text: &str,
357
12
        cutoff: Option<SystemTime>,
358
12
        changed: &mut bool,
359
12
    ) -> Result<&ConsensusMeta> {
360
        // Try to parse it and get its metadata.
361
10
        let (consensus_meta, unvalidated) = {
362
10
            let (signedval, remainder, parsed) =
363
12
                MdConsensus::parse(text).map_err(|e| Error::from_netdoc(source.clone(), e))?;
364
            #[cfg(feature = "dirfilter")]
365
10
            let parsed = self.filter.filter_consensus(parsed)?;
366
10
            let parsed = self.config.tolerance.extend_tolerance(parsed);
367
10
            let now = self.rt.wallclock();
368
10
            let timely = parsed.check_valid_at(&now)?;
369
10
            if let Some(cutoff) = cutoff {
370
                if timely.peek_lifetime().valid_after() < cutoff {
371
                    return Err(Error::Unwanted("consensus was older than requested"));
372
                }
373
10
            }
374
10
            let meta = ConsensusMeta::from_unvalidated(signedval, remainder, &timely);
375
10
            (meta, timely)
376
        };
377

            
378
        // Check out what authorities we believe in, and see if enough
379
        // of them are purported to have signed this consensus.
380
10
        let unvalidated = unvalidated.set_n_authorities(self.authority_ids.len());
381

            
382
10
        let id_refs: Vec<_> = self.authority_ids.iter().collect();
383
10
        if !unvalidated.authorities_are_correct(&id_refs[..]) {
384
2
            return Err(Error::UnrecognizedAuthorities);
385
8
        }
386
        // Yes, we've added the consensus.  That's a change.
387
8
        *changed = true;
388

            
389
        // Make a set of all the certificates we want -- the subset of
390
        // those listed on the consensus that we would indeed accept as
391
        // authoritative.
392
8
        let desired_certs = unvalidated
393
8
            .signing_cert_ids()
394
24
            .filter(|m| self.recognizes_authority(&m.id_fingerprint))
395
8
            .collect();
396

            
397
8
        self.next = Some(GetCertsState {
398
8
            cache_usage: self.cache_usage,
399
8
            consensus_source: source,
400
8
            consensus: GetCertsConsensus::Unvalidated(unvalidated),
401
8
            consensus_meta,
402
8
            missing_certs: desired_certs,
403
8
            certs: Vec::new(),
404
8
            rt: self.rt.clone(),
405
8
            config: self.config.clone(),
406
8
            prev_netdir: self.prev_netdir.take(),
407
8
            protocol_statuses: None,
408
8
            #[cfg(feature = "dirfilter")]
409
8
            filter: self.filter.clone(),
410
8
        });
411

            
412
        // Unwrap should be safe because `next` was just assigned
413
        #[allow(clippy::unwrap_used)]
414
8
        Ok(&self.next.as_ref().unwrap().consensus_meta)
415
12
    }
416

            
417
    /// Return true if `id` is an authority identity we recognize
418
24
    fn recognizes_authority(&self, id: &RsaIdentity) -> bool {
419
40
        self.authority_ids.iter().any(|auth| auth == id)
420
24
    }
421
}
422

            
423
/// One of two possible internal states for the consensus in a GetCertsState.
424
///
425
/// This inner object is advanced by `try_checking_sigs`.
426
#[derive(Clone, Debug)]
427
enum GetCertsConsensus {
428
    /// We have an unvalidated consensus; we haven't checked its signatures.
429
    Unvalidated(UnvalidatedMdConsensus),
430
    /// A validated consensus: the signatures are fine and we can advance.
431
    Validated(MdConsensus),
432
    /// We failed to validate the consensus, even after getting enough certificates.
433
    Failed,
434
}
435

            
436
/// Second state: fetching or loading authority certificates.
437
///
438
/// TODO: we should probably do what C tor does, and try to use the
439
/// same directory that gave us the consensus.
440
///
441
/// TODO SECURITY: This needs better handling for the DOS attack where
442
/// we are given a bad consensus signed with fictional certificates
443
/// that we can never find.
444
#[derive(Clone, Debug)]
445
struct GetCertsState<R: Runtime> {
446
    /// The cache usage we had in mind when we began.  Used to reset.
447
    cache_usage: CacheUsage,
448
    /// Where did we get our consensus?
449
    consensus_source: DocSource,
450
    /// The consensus that we are trying to validate, or an error if we've given
451
    /// up on validating it.
452
    consensus: GetCertsConsensus,
453
    /// Metadata for the consensus.
454
    consensus_meta: ConsensusMeta,
455
    /// A set of the certificate keypairs for the certificates we don't
456
    /// have yet.
457
    missing_certs: HashSet<AuthCertKeyIds>,
458
    /// A list of the certificates we've been able to load or download.
459
    certs: Vec<AuthCert>,
460

            
461
    /// A `Runtime` implementation.
462
    rt: R,
463
    /// The configuration of the directory manager. Used for download configuration
464
    /// purposes.
465
    config: Arc<DirMgrConfig>,
466
    /// If one exists, the netdir we're trying to update.
467
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,
468

            
469
    /// If present a set of protocols to install as our latest recommended set.
470
    protocol_statuses: Option<(SystemTime, Arc<ProtoStatuses>)>,
471

            
472
    /// A filter that gets applied to directory objects before we use them.
473
    #[cfg(feature = "dirfilter")]
474
    filter: Arc<dyn crate::filter::DirFilter>,
475
}
476

            
477
impl<R: Runtime> GetCertsState<R> {
478
    /// Handle a certificate result returned by `tor_netdoc`: checking it for timeliness
479
    /// and well-signedness.
480
    ///
481
    /// On success return the `AuthCert` and the string that represents it within the string `within`.
482
    /// On failure, return an error.
483
6
    fn check_parsed_certificate<'s>(
484
6
        &self,
485
6
        parsed: tor_netdoc::Result<UncheckedAuthCert>,
486
6
        source: &DocSource,
487
6
        within: &'s str,
488
6
    ) -> Result<(AuthCert, &'s str)> {
489
6
        let parsed = parsed.map_err(|e| Error::from_netdoc(source.clone(), e))?;
490
6
        let cert_text = parsed
491
6
            .within(within)
492
6
            .expect("Certificate was not in input as expected");
493
6
        let wellsigned = parsed.check_signature()?;
494
6
        let now = self.rt.wallclock();
495
6
        let timely_cert = self
496
6
            .config
497
6
            .tolerance
498
6
            .extend_tolerance(wellsigned)
499
6
            .check_valid_at(&now)?;
500
6
        Ok((timely_cert, cert_text))
501
6
    }
502

            
503
    /// If we have enough certificates, and we have not yet checked the
504
    /// signatures on the consensus, try checking them.
505
    ///
506
    /// If the consensus is valid, remove the unvalidated consensus from `self`
507
    /// and put the validated consensus there instead.
508
    ///
509
    /// If the consensus is invalid, throw it out set a blocking error.
510
4
    fn try_checking_sigs(&mut self) -> Result<()> {
511
        use GetCertsConsensus as C;
512
        // Temporary value; we'll replace the consensus field with something
513
        // better before the method returns.
514
4
        let mut consensus = C::Failed;
515
4
        std::mem::swap(&mut consensus, &mut self.consensus);
516

            
517
4
        let unvalidated = match consensus {
518
4
            C::Unvalidated(uv) if uv.key_is_correct(&self.certs[..]).is_ok() => uv,
519
            _ => {
520
                // nothing to check at this point.  Either we already checked the consensus, or we don't yet have enough certificates.
521
2
                self.consensus = consensus;
522
2
                return Ok(());
523
            }
524
        };
525

            
526
2
        let (new_consensus, outcome) = match unvalidated.check_signature(&self.certs[..]) {
527
2
            Ok(validated) => (C::Validated(validated), Ok(())),
528
            Err(cause) => (
529
                C::Failed,
530
                Err(Error::ConsensusInvalid {
531
                    source: self.consensus_source.clone(),
532
                    cause,
533
                }),
534
            ),
535
        };
536
2
        self.consensus = new_consensus;
537

            
538
        // Update our protocol recommendations if we have a validated consensus,
539
        // and if we haven't already updated our recommendations.
540
2
        if let GetCertsConsensus::Validated(v) = &self.consensus {
541
2
            if self.protocol_statuses.is_none() {
542
2
                let protoset: &Arc<ProtoStatuses> = v.protocol_statuses();
543
2
                self.protocol_statuses = Some((
544
2
                    self.consensus_meta.lifetime().valid_after(),
545
2
                    Arc::clone(protoset),
546
2
                ));
547
2
            }
548
        }
549

            
550
2
        outcome
551
4
    }
552
}
553

            
554
impl<R: Runtime> DirState for GetCertsState<R> {
555
4
    fn describe(&self) -> String {
556
        use GetCertsConsensus as C;
557
4
        match &self.consensus {
558
            C::Unvalidated(_) => {
559
4
                let total = self.certs.len() + self.missing_certs.len();
560
4
                format!(
561
4
                    "Downloading certificates for consensus (we are missing {}/{}).",
562
4
                    self.missing_certs.len(),
563
                    total
564
                )
565
            }
566
            C::Validated(_) => "Validated consensus; about to get microdescriptors".to_string(),
567
            C::Failed => "Failed to validate consensus".to_string(),
568
        }
569
4
    }
570
10
    fn missing_docs(&self) -> Vec<DocId> {
571
10
        self.missing_certs
572
10
            .iter()
573
12
            .map(|id| DocId::AuthCert(*id))
574
10
            .collect()
575
10
    }
576
4
    fn is_ready(&self, _ready: Readiness) -> bool {
577
4
        false
578
4
    }
579
6
    fn can_advance(&self) -> bool {
580
6
        matches!(self.consensus, GetCertsConsensus::Validated(_))
581
6
    }
582
4
    fn bootstrap_progress(&self) -> DirProgress {
583
4
        let n_certs = self.certs.len();
584
4
        let n_missing_certs = self.missing_certs.len();
585
4
        let total_certs = n_missing_certs + n_certs;
586
4
        DirProgress::FetchingCerts {
587
4
            lifetime: self.consensus_meta.lifetime().clone(),
588
4
            usable_lifetime: self
589
4
                .config
590
4
                .tolerance
591
4
                .extend_lifetime(self.consensus_meta.lifetime()),
592
4

            
593
4
            n_certs: (n_certs as u16, total_certs as u16),
594
4
        }
595
4
    }
596
2
    fn dl_config(&self) -> DownloadSchedule {
597
2
        self.config.schedule.retry_certs()
598
2
    }
599
2
    fn add_from_cache(
600
2
        &mut self,
601
2
        docs: HashMap<DocId, DocumentText>,
602
2
        changed: &mut bool,
603
2
    ) -> Result<()> {
604
        // Here we iterate over the documents we want, taking them from
605
        // our input and remembering them.
606
2
        let source = DocSource::LocalCache;
607
2
        let mut nonfatal_error = None;
608
4
        for id in &self.missing_docs() {
609
4
            if let Some(cert) = docs.get(id) {
610
2
                let text = cert.as_str().map_err(Error::BadUtf8InCache)?;
611
2
                let parsed = AuthCert::parse(text);
612
2
                match self.check_parsed_certificate(parsed, &source, text) {
613
2
                    Ok((cert, _text)) => {
614
2
                        self.missing_certs.remove(&cert.key_ids());
615
2
                        self.certs.push(cert);
616
2
                        *changed = true;
617
2
                    }
618
                    Err(e) => {
619
                        nonfatal_error.get_or_insert(e);
620
                    }
621
                }
622
2
            }
623
        }
624
2
        if *changed {
625
2
            self.try_checking_sigs()?;
626
        }
627
2
        opt_err_to_result(nonfatal_error)
628
2
    }
629
4
    fn add_from_download(
630
4
        &mut self,
631
4
        text: &str,
632
4
        request: &ClientRequest,
633
4
        source: DocSource,
634
4
        storage: Option<&Mutex<DynStore>>,
635
4
        changed: &mut bool,
636
4
    ) -> Result<()> {
637
4
        let asked_for: HashSet<_> = match request {
638
4
            ClientRequest::AuthCert(a) => a.keys().collect(),
639
            _ => return Err(internal!("expected an AuthCert request").into()),
640
        };
641

            
642
4
        let mut nonfatal_error = None;
643
4
        let mut newcerts = Vec::new();
644
4
        for cert in
645
4
            AuthCert::parse_multiple(text).map_err(|e| Error::from_netdoc(source.clone(), e))?
646
        {
647
4
            match self.check_parsed_certificate(cert, &source, text) {
648
4
                Ok((cert, cert_text)) => {
649
4
                    newcerts.push((cert, cert_text));
650
4
                }
651
                Err(e) => {
652
                    warn_report!(e, "Problem with certificate received from {}", &source);
653
                    nonfatal_error.get_or_insert(e);
654
                }
655
            }
656
        }
657

            
658
        // Now discard any certs we didn't ask for.
659
4
        let len_orig = newcerts.len();
660
4
        newcerts.retain(|(cert, _)| asked_for.contains(&cert.key_ids()));
661
4
        if newcerts.len() != len_orig {
662
2
            warn!(
663
                "Discarding certificates from {} that we didn't ask for.",
664
                source
665
            );
666
2
            nonfatal_error.get_or_insert(Error::Unwanted("Certificate we didn't request"));
667
2
        }
668

            
669
        // We want to exit early if we aren't saving any certificates.
670
4
        if newcerts.is_empty() {
671
2
            return opt_err_to_result(nonfatal_error);
672
2
        }
673

            
674
2
        if let Some(store) = storage {
675
            // Write the certificates to the store.
676
2
            let v: Vec<_> = newcerts[..]
677
2
                .iter()
678
2
                .map(|(cert, s)| (AuthCertMeta::from_authcert(cert), *s))
679
2
                .collect();
680
2
            let mut w = store.lock().expect("Directory storage lock poisoned");
681
2
            w.store_authcerts(&v[..])?;
682
        }
683

            
684
        // Remember the certificates in this state, and remove them
685
        // from our list of missing certs.
686
4
        for (cert, _) in newcerts {
687
2
            let ids = cert.key_ids();
688
2
            if self.missing_certs.contains(&ids) {
689
2
                self.missing_certs.remove(&ids);
690
2
                self.certs.push(cert);
691
2
                *changed = true;
692
2
            }
693
        }
694

            
695
2
        if *changed {
696
2
            self.try_checking_sigs()?;
697
        }
698
2
        opt_err_to_result(nonfatal_error)
699
4
    }
700

            
701
2
    fn advance(self: Box<Self>) -> Box<dyn DirState> {
702
        use GetCertsConsensus::*;
703
2
        match self.consensus {
704
2
            Validated(validated) => Box::new(GetMicrodescsState::new(
705
2
                self.cache_usage,
706
2
                validated,
707
2
                self.consensus_meta,
708
2
                self.rt,
709
2
                self.config,
710
2
                self.prev_netdir,
711
2
                #[cfg(feature = "dirfilter")]
712
2
                self.filter,
713
2
            )),
714
            _ => self,
715
        }
716
2
    }
717

            
718
    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
719
        self.protocol_statuses.as_ref().map(|(timestamp, protos)| {
720
            NetDirChange::SetRequiredProtocol {
721
                timestamp: *timestamp,
722
                protos: Arc::clone(protos),
723
            }
724
        })
725
    }
726

            
727
2
    fn reset_time(&self) -> Option<SystemTime> {
728
2
        Some(
729
2
            self.consensus_meta.lifetime().valid_until()
730
2
                + self.config.tolerance.post_valid_tolerance(),
731
2
        )
732
2
    }
733
2
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
734
2
        let cache_usage = if self.cache_usage == CacheUsage::CacheOnly {
735
            // Cache only means we can't ever download.
736
            CacheUsage::CacheOnly
737
        } else {
738
            // If we reset in this state, we should always go to "must
739
            // download": Either we've failed to get the certs we needed, or we
740
            // have found that the consensus wasn't valid.  Either case calls
741
            // for a fresh consensus download attempt.
742
2
            CacheUsage::MustDownload
743
        };
744

            
745
2
        Box::new(GetConsensusState::new(
746
2
            self.rt,
747
2
            self.config,
748
2
            cache_usage,
749
2
            self.prev_netdir,
750
2
            #[cfg(feature = "dirfilter")]
751
2
            self.filter,
752
2
        ))
753
2
    }
754
}
755

            
756
/// Final state: we're fetching or loading microdescriptors
757
#[derive(Debug, Clone)]
758
struct GetMicrodescsState<R: Runtime> {
759
    /// How should we get the consensus from the cache, if at all?
760
    cache_usage: CacheUsage,
761
    /// Total number of microdescriptors listed in the consensus.
762
    n_microdescs: usize,
763
    /// The current status of our netdir.
764
    partial: PendingNetDir,
765
    /// Metadata for the current consensus.
766
    meta: ConsensusMeta,
767
    /// A pending list of microdescriptor digests whose
768
    /// "last-listed-at" times we should update.
769
    newly_listed: Vec<MdDigest>,
770
    /// A time after which we should try to replace this directory and
771
    /// find a new one.  Since this is randomized, we only compute it
772
    /// once.
773
    reset_time: SystemTime,
774

            
775
    /// A `Runtime` implementation.
776
    rt: R,
777
    /// The configuration of the directory manager. Used for download configuration
778
    /// purposes.
779
    config: Arc<DirMgrConfig>,
780
    /// If one exists, the netdir we're trying to update.
781
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,
782

            
783
    /// A filter that gets applied to directory objects before we use them.
784
    #[cfg(feature = "dirfilter")]
785
    filter: Arc<dyn crate::filter::DirFilter>,
786
}
787

            
788
/// Information about a network directory that might not be ready to become _the_ current network
789
/// directory.
790
#[derive(Debug, Clone)]
791
enum PendingNetDir {
792
    /// A NetDir for which we have a consensus, but not enough microdescriptors.
793
    Partial(PartialNetDir),
794
    /// A NetDir we're either trying to get our caller to replace, or that the caller
795
    /// has already taken from us.
796
    ///
797
    /// After the netdir gets taken, the `collected_microdescs` and `missing_microdescs`
798
    /// fields get used. Before then, we just do operations on the netdir.
799
    Yielding {
800
        /// The actual netdir. This starts out as `Some`, but our caller can `take()` it
801
        /// from us.
802
        netdir: Option<NetDir>,
803
        /// Microdescs we have collected in order to yield to our caller.
804
        collected_microdescs: Vec<Microdesc>,
805
        /// Which microdescs we need for the netdir that either is or used to be in `netdir`.
806
        ///
807
        /// NOTE(eta): This MUST always match the netdir's own idea of which microdescs we need.
808
        ///            We do this by copying the netdir's missing microdescs into here when we
809
        ///            instantiate it.
810
        ///            (This code assumes that it doesn't add more needed microdescriptors later!)
811
        missing_microdescs: HashSet<MdDigest>,
812
        /// The time at which we should renew this netdir, assuming we have
813
        /// driven it to a "usable" state.
814
        replace_dir_time: SystemTime,
815
    },
816
    /// A dummy value, so we can use `mem::replace`.
817
    Dummy,
818
}
819

            
820
impl MdReceiver for PendingNetDir {
821
6
    fn missing_microdescs(&self) -> Box<dyn Iterator<Item = &MdDigest> + '_> {
822
6
        match self {
823
4
            PendingNetDir::Partial(partial) => partial.missing_microdescs(),
824
            PendingNetDir::Yielding {
825
2
                netdir,
826
2
                missing_microdescs,
827
                ..
828
            } => {
829
2
                if let Some(nd) = netdir.as_ref() {
830
                    nd.missing_microdescs()
831
                } else {
832
2
                    Box::new(missing_microdescs.iter())
833
                }
834
            }
835
            PendingNetDir::Dummy => unreachable!(),
836
        }
837
6
    }
838

            
839
8
    fn add_microdesc(&mut self, md: Microdesc) -> bool {
840
8
        match self {
841
8
            PendingNetDir::Partial(partial) => partial.add_microdesc(md),
842
            PendingNetDir::Yielding {
843
                netdir,
844
                missing_microdescs,
845
                collected_microdescs,
846
                ..
847
            } => {
848
                let wanted = missing_microdescs.remove(md.digest());
849
                if let Some(nd) = netdir.as_mut() {
850
                    let nd_wanted = nd.add_microdesc(md);
851
                    // This shouldn't ever happen; if it does, our invariants are violated.
852
                    debug_assert_eq!(wanted, nd_wanted);
853
                    nd_wanted
854
                } else {
855
                    collected_microdescs.push(md);
856
                    wanted
857
                }
858
            }
859
            PendingNetDir::Dummy => unreachable!(),
860
        }
861
8
    }
862

            
863
14
    fn n_missing(&self) -> usize {
864
14
        match self {
865
12
            PendingNetDir::Partial(partial) => partial.n_missing(),
866
            PendingNetDir::Yielding {
867
2
                netdir,
868
2
                missing_microdescs,
869
                ..
870
            } => {
871
2
                if let Some(nd) = netdir.as_ref() {
872
                    // This shouldn't ever happen; if it does, our invariants are violated.
873
                    debug_assert_eq!(nd.n_missing(), missing_microdescs.len());
874
                    nd.n_missing()
875
                } else {
876
2
                    missing_microdescs.len()
877
                }
878
            }
879
            PendingNetDir::Dummy => unreachable!(),
880
        }
881
14
    }
882
}
883

            
884
impl PendingNetDir {
885
    /// If this PendingNetDir is Partial and could not be partial, upgrade it.
886
10
    fn upgrade_if_necessary(&mut self) {
887
10
        if matches!(self, PendingNetDir::Partial(..)) {
888
10
            match mem::replace(self, PendingNetDir::Dummy) {
889
10
                PendingNetDir::Partial(p) => match p.unwrap_if_sufficient() {
890
2
                    Ok(nd) => {
891
2
                        let missing: HashSet<_> = nd.missing_microdescs().copied().collect();
892
2
                        let replace_dir_time = pick_download_time(nd.lifetime());
893
2
                        debug!(
894
                            "Consensus now usable, with {} microdescriptors missing. \
895
                                The current consensus is fresh until {}, and valid until {}. \
896
                                I've picked {} as the earliest time to replace it.",
897
                            missing.len(),
898
                            OffsetDateTime::from(nd.lifetime().fresh_until()),
899
                            OffsetDateTime::from(nd.lifetime().valid_until()),
900
                            OffsetDateTime::from(replace_dir_time)
901
                        );
902
2
                        *self = PendingNetDir::Yielding {
903
2
                            netdir: Some(nd),
904
2
                            collected_microdescs: vec![],
905
2
                            missing_microdescs: missing,
906
2
                            replace_dir_time,
907
2
                        };
908
                    }
909
8
                    Err(p) => {
910
8
                        *self = PendingNetDir::Partial(p);
911
8
                    }
912
                },
913
                _ => unreachable!(),
914
            }
915
        }
916
10
        assert!(!matches!(self, PendingNetDir::Dummy));
917
10
    }
918
}
919

            
920
impl<R: Runtime> GetMicrodescsState<R> {
921
    /// Create a new [`GetMicrodescsState`] from a provided
922
    /// microdescriptor consensus.
923
6
    fn new(
924
6
        cache_usage: CacheUsage,
925
6
        consensus: MdConsensus,
926
6
        meta: ConsensusMeta,
927
6
        rt: R,
928
6
        config: Arc<DirMgrConfig>,
929
6
        prev_netdir: Option<Arc<dyn PreviousNetDir>>,
930
6
        #[cfg(feature = "dirfilter")] filter: Arc<dyn crate::filter::DirFilter>,
931
6
    ) -> Self {
932
6
        let reset_time =
933
6
            consensus.lifetime().valid_until() + config.tolerance.post_valid_tolerance();
934
6
        let n_microdescs = consensus.relays().len();
935

            
936
6
        let params = &config.override_net_params;
937
        #[cfg(not(feature = "geoip"))]
938
        let mut partial_dir = PartialNetDir::new(consensus, Some(params));
939
        // TODO(eta): Make this embedded database configurable using the `DirMgrConfig`.
940
        #[cfg(feature = "geoip")]
941
6
        let mut partial_dir =
942
6
            PartialNetDir::new_with_geoip(consensus, Some(params), &GeoipDb::new_embedded());
943

            
944
6
        if let Some(old_dir) = prev_netdir.as_ref().and_then(|x| x.get_netdir()) {
945
            partial_dir.fill_from_previous_netdir(old_dir);
946
6
        }
947

            
948
        // Always upgrade at least once: otherwise, we won't notice we're ready unless we
949
        // add a microdescriptor.
950
6
        let mut partial = PendingNetDir::Partial(partial_dir);
951
6
        partial.upgrade_if_necessary();
952

            
953
6
        GetMicrodescsState {
954
6
            cache_usage,
955
6
            n_microdescs,
956
6
            partial,
957
6
            meta,
958
6
            newly_listed: Vec::new(),
959
6
            reset_time,
960
6
            rt,
961
6
            config,
962
6
            prev_netdir,
963
6

            
964
6
            #[cfg(feature = "dirfilter")]
965
6
            filter,
966
6
        }
967
6
    }
968

            
969
    /// Add a bunch of microdescriptors to the in-progress netdir.
970
4
    fn register_microdescs<I>(&mut self, mds: I, _source: &DocSource, changed: &mut bool)
971
4
    where
972
4
        I: IntoIterator<Item = Microdesc>,
973
    {
974
        #[cfg(feature = "dirfilter")]
975
4
        let mds: Vec<Microdesc> = mds
976
4
            .into_iter()
977
8
            .filter_map(|m| self.filter.filter_md(m).ok())
978
4
            .collect();
979
4
        let is_partial = matches!(self.partial, PendingNetDir::Partial(..));
980
12
        for md in mds {
981
8
            if is_partial {
982
8
                self.newly_listed.push(*md.digest());
983
8
            }
984
8
            self.partial.add_microdesc(md);
985
8
            *changed = true;
986
        }
987
4
        self.partial.upgrade_if_necessary();
988
4
    }
989
}
990

            
991
impl<R: Runtime> DirState for GetMicrodescsState<R> {
992
4
    fn describe(&self) -> String {
993
4
        format!(
994
4
            "Downloading microdescriptors (we are missing {}).",
995
4
            self.partial.n_missing()
996
        )
997
4
    }
998
6
    fn missing_docs(&self) -> Vec<DocId> {
999
6
        self.partial
6
            .missing_microdescs()
14
            .map(|d| DocId::Microdesc(*d))
6
            .collect()
6
    }
2
    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
2
        match self.partial {
            PendingNetDir::Yielding {
2
                ref mut netdir,
2
                ref mut collected_microdescs,
                ..
            } => {
2
                if netdir.is_some() {
2
                    Some(NetDirChange::AttemptReplace {
2
                        netdir,
2
                        consensus_meta: &self.meta,
2
                    })
                } else {
                    collected_microdescs
                        .is_empty()
                        .then_some(NetDirChange::AddMicrodescs(collected_microdescs))
                }
            }
            _ => None,
        }
2
    }
18
    fn is_ready(&self, ready: Readiness) -> bool {
18
        match ready {
6
            Readiness::Complete => self.partial.n_missing() == 0,
            Readiness::Usable => {
                // We're "usable" if the calling code thought our netdir was usable enough to
                // steal it.
2
                matches!(self.partial, PendingNetDir::Yielding { ref netdir, .. } if netdir.is_none())
            }
        }
18
    }
4
    fn can_advance(&self) -> bool {
4
        false
4
    }
4
    fn bootstrap_progress(&self) -> DirProgress {
4
        let n_present = self.n_microdescs - self.partial.n_missing();
4
        DirProgress::Validated {
4
            lifetime: self.meta.lifetime().clone(),
4
            usable_lifetime: self.config.tolerance.extend_lifetime(self.meta.lifetime()),
4
            n_mds: (n_present as u32, self.n_microdescs as u32),
4
            usable: self.is_ready(Readiness::Usable),
4
        }
4
    }
2
    fn dl_config(&self) -> DownloadSchedule {
2
        self.config.schedule.retry_microdescs()
2
    }
2
    fn add_from_cache(
2
        &mut self,
2
        docs: HashMap<DocId, DocumentText>,
2
        changed: &mut bool,
2
    ) -> Result<()> {
2
        let mut microdescs = Vec::new();
4
        for (id, text) in docs {
2
            if let DocId::Microdesc(digest) = id {
2
                if let Ok(md) = Microdesc::parse(text.as_str().map_err(Error::BadUtf8InCache)?) {
2
                    if md.digest() == &digest {
2
                        microdescs.push(md);
2
                        continue;
                    }
                }
                warn!("Found a mismatched microdescriptor in cache; ignoring");
            }
        }
2
        self.register_microdescs(microdescs, &DocSource::LocalCache, changed);
2
        Ok(())
2
    }
2
    fn add_from_download(
2
        &mut self,
2
        text: &str,
2
        request: &ClientRequest,
2
        source: DocSource,
2
        storage: Option<&Mutex<DynStore>>,
2
        changed: &mut bool,
2
    ) -> Result<()> {
2
        let requested: HashSet<_> = if let ClientRequest::Microdescs(req) = request {
2
            req.digests().collect()
        } else {
            return Err(internal!("expected a microdesc request").into());
        };
2
        let mut new_mds = Vec::new();
2
        let mut nonfatal_err = None;
6
        for anno in MicrodescReader::new(text, &AllowAnnotations::AnnotationsNotAllowed)
2
            .map_err(|e| Error::from_netdoc(source.clone(), e))?
        {
6
            let anno = match anno {
                Err(e) => {
                    nonfatal_err.get_or_insert_with(|| Error::from_netdoc(source.clone(), e));
                    continue;
                }
6
                Ok(a) => a,
            };
6
            let txt = anno
6
                .within(text)
6
                .expect("microdesc not from within text as expected");
6
            let md = anno.into_microdesc();
6
            if !requested.contains(md.digest()) {
                warn!(
                    "Received microdescriptor from {} we did not ask for: {:?}",
                    source,
                    md.digest()
                );
                nonfatal_err.get_or_insert(Error::Unwanted("un-requested microdescriptor"));
                continue;
6
            }
6
            new_mds.push((txt, md));
        }
2
        let mark_listed = self.meta.lifetime().valid_after();
2
        if let Some(store) = storage {
2
            let mut s = store
2
                .lock()
                //.get_mut()
2
                .expect("Directory storage lock poisoned");
2
            if !self.newly_listed.is_empty() {
2
                s.update_microdescs_listed(&self.newly_listed, mark_listed)?;
2
                self.newly_listed.clear();
            }
2
            if !new_mds.is_empty() {
2
                s.store_microdescs(
2
                    &new_mds
2
                        .iter()
6
                        .map(|(text, md)| (*text, md.digest()))
2
                        .collect::<Vec<_>>(),
2
                    mark_listed,
                )?;
            }
        }
2
        self.register_microdescs(new_mds.into_iter().map(|(_, md)| md), &source, changed);
2
        opt_err_to_result(nonfatal_err)
2
    }
    fn advance(self: Box<Self>) -> Box<dyn DirState> {
        self
    }
2
    fn reset_time(&self) -> Option<SystemTime> {
        // TODO(nickm): The reset logic is a little wonky here: we don't truly
        // want to _reset_ this state at `replace_dir_time`.  In fact, we ought
        // to be able to have multiple states running in parallel: one filling
        // in the mds for an old consensus, and one trying to fetch a better
        // one.  That's likely to require some amount of refactoring of the
        // bootstrap code.
        Some(match self.partial {
            // If the client has taken a completed netdir, the netdir is now
            // usable: We can reset our download attempt when we choose to try
            // to replace this directory.
            PendingNetDir::Yielding {
                replace_dir_time,
                netdir: None,
                ..
            } => replace_dir_time,
            // We don't have a completed netdir: Keep trying to fill this one in
            // until it is _definitely_ unusable.  (Our clock might be skewed;
            // there might be no up-to-date consensus.)
2
            _ => self.reset_time,
        })
2
    }
2
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
2
        let cache_usage = if self.cache_usage == CacheUsage::CacheOnly {
            // Cache only means we can't ever download.
            CacheUsage::CacheOnly
2
        } else if self.is_ready(Readiness::Usable) {
            // If we managed to bootstrap a usable consensus, then we won't
            // accept our next consensus from the cache.
            CacheUsage::MustDownload
        } else {
            // If we didn't manage to bootstrap a usable consensus, then we can
            // indeed try again with the one in the cache.
            // TODO(nickm) is this right?
2
            CacheUsage::CacheOkay
        };
2
        Box::new(GetConsensusState::new(
2
            self.rt,
2
            self.config,
2
            cache_usage,
2
            self.prev_netdir,
2
            #[cfg(feature = "dirfilter")]
2
            self.filter,
2
        ))
2
    }
}
/// Choose a random download time to replace a consensus whose lifetime
/// is `lifetime`.
202
fn pick_download_time(lifetime: &Lifetime) -> SystemTime {
202
    let (lowbound, uncertainty) = client_download_range(lifetime);
202
    lowbound + rand::rng().gen_range_infallible(..=uncertainty)
202
}
/// Based on the lifetime for a consensus, return the time range during which
/// clients should fetch the next one.
204
fn client_download_range(lt: &Lifetime) -> (SystemTime, Duration) {
204
    let valid_after = lt.valid_after();
204
    let valid_until = lt.valid_until();
204
    let voting_interval = lt.voting_period();
204
    let whole_lifetime = valid_until
204
        .duration_since(valid_after)
204
        .expect("valid-after must precede valid-until");
    // From dir-spec:
    // "This time is chosen uniformly at random from the interval
    // between the time 3/4 into the first interval after the
    // consensus is no longer fresh, and 7/8 of the time remaining
    // after that before the consensus is invalid."
204
    let lowbound = voting_interval + (voting_interval * 3) / 4;
204
    let remainder = whole_lifetime
204
        .checked_sub(lowbound)
204
        .expect("Arithmetic did not work as expected");
204
    let uncertainty = (remainder * 7) / 8;
204
    (valid_after + lowbound, uncertainty)
204
}
/// If `err` is some, return `Err(err)`.  Otherwise return Ok(()).
8
fn opt_err_to_result(e: Option<Error>) -> Result<()> {
8
    match e {
2
        Some(e) => Err(e),
6
        None => Ok(()),
    }
8
}
/// A dummy state implementation, used when we need to temporarily write a
/// placeholder into a box.
///
/// This type carries no data.  Every `DirState` method implemented for it
/// below is a stub that panics when called.
#[derive(Clone, Debug)]
pub(crate) struct PoisonedState;
// Every method here is a deliberate stub: `PoisonedState` is only a
// placeholder, so each body is `unimplemented!()` and panics if reached.
impl DirState for PoisonedState {
    fn describe(&self) -> String {
        unimplemented!()
    }
    fn missing_docs(&self) -> Vec<DocId> {
        unimplemented!()
    }
    fn is_ready(&self, _ready: Readiness) -> bool {
        unimplemented!()
    }
    fn can_advance(&self) -> bool {
        unimplemented!()
    }
    fn add_from_cache(
        &mut self,
        _docs: HashMap<DocId, DocumentText>,
        _changed: &mut bool,
    ) -> Result<()> {
        unimplemented!()
    }
    fn add_from_download(
        &mut self,
        _text: &str,
        _request: &ClientRequest,
        _source: DocSource,
        _storage: Option<&Mutex<DynStore>>,
        _changed: &mut bool,
    ) -> Result<()> {
        unimplemented!()
    }
    fn bootstrap_progress(&self) -> event::DirProgress {
        unimplemented!()
    }
    fn dl_config(&self) -> DownloadSchedule {
        unimplemented!()
    }
    fn advance(self: Box<Self>) -> Box<dyn DirState> {
        unimplemented!()
    }
    fn reset_time(&self) -> Option<SystemTime> {
        unimplemented!()
    }
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
        unimplemented!()
    }
}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    #![allow(clippy::cognitive_complexity)]
    use super::*;
    use std::convert::TryInto;
    use std::sync::Arc;
    use tempfile::TempDir;
    use time::macros::datetime;
    use tor_dircommon::{
        authority::{AuthorityContacts, AuthorityContactsBuilder},
        config::{DownloadScheduleConfig, NetworkConfig},
    };
    use tor_netdoc::doc::authcert::AuthCertKeyIds;
    use tor_rtcompat::RuntimeSubstExt as _;
    use tor_rtmock::simple_time::SimpleMockTimeProvider;
    /// Check the download window computed for a consensus with a one-hour
    /// voting period and a three-hour lifetime, then sample from it.
    #[test]
    fn download_schedule() {
        let valid_after = datetime!(2008-08-02 20:00 UTC).into();
        let fresh_until = datetime!(2008-08-02 21:00 UTC).into();
        let valid_until = datetime!(2008-08-02 23:00 UTC).into();
        let lifetime = Lifetime::new(valid_after, fresh_until, valid_until).unwrap();

        // The window opens 1.75 voting periods after valid-after, and spans
        // 7/8 of the 75 minutes that remain.
        let want_start: SystemTime = datetime!(2008-08-02 21:45 UTC).into();
        let want_width = Duration::from_millis((75 * 60 * 1000) * 7 / 8);

        let (start, width) = client_download_range(&lifetime);
        assert_eq!(start, want_start);
        assert_eq!(width, want_width);

        // Every sampled time must land inside the window.
        for _ in 0..100 {
            let picked = pick_download_time(&lifetime);
            assert!(picked > valid_after);
            assert!(picked >= want_start);
            assert!(picked < valid_until);
            assert!(picked <= want_start + want_width);
        }
    }
    /// Makes a storage backed by a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the store so that the caller can
    /// keep it alive for as long as the store is in use.
    fn temp_store() -> (TempDir, Mutex<DynStore>) {
        let tempdir = TempDir::new().unwrap();
        let mistrust = fs_mistrust::Mistrust::new_dangerously_trust_everyone();
        let sqlite =
            crate::storage::SqliteStore::from_path_and_mistrust(tempdir.path(), &mistrust, false)
                .unwrap();
        let store: DynStore = Box::new(sqlite);

        (tempdir, Mutex::new(store))
    }
    /// Wrap `rt` so that its sleep provider and coarse time provider are both
    /// a mock time provider created from the wallclock time `now`.
    fn make_time_shifted_runtime(now: SystemTime, rt: impl Runtime) -> impl Runtime {
        let clock = SimpleMockTimeProvider::from_wallclock(now);
        let with_sleep = rt.with_sleep_provider(clock.clone());
        with_sleep.with_coarse_time_provider(clock)
    }
    /// Build a `DirMgrConfig` for tests, with no fallback caches.
    ///
    /// If `authorities` is provided, it replaces the network configuration's
    /// authority list.  All other settings are left at their defaults.
    fn make_dirmgr_config(authorities: Option<AuthorityContactsBuilder>) -> Arc<DirMgrConfig> {
        let mut network = NetworkConfig::builder();
        network.set_fallback_caches(vec![]);
        if let Some(custom) = authorities {
            *network.authorities() = custom;
        }

        Arc::new(DirMgrConfig {
            cache_dir: "/we_will_never_use_this/".into(),
            network: network.build().unwrap(),
            ..Default::default()
        })
    }
    // Test data, embedded at compile time from the crate's `testdata`
    // directory: two microdescriptor consensuses and three authority
    // certificates.
    const CONSENSUS: &str = include_str!("../testdata/mdconsensus1.txt");
    const CONSENSUS2: &str = include_str!("../testdata/mdconsensus2.txt");
    const AUTHCERT_5696: &str = include_str!("../testdata/cert-5696.txt");
    const AUTHCERT_5A23: &str = include_str!("../testdata/cert-5A23.txt");
    // NOTE(review): presumably kept for completeness of the test data even
    // though nothing reads it at present -- confirm.
    #[allow(unused)]
    const AUTHCERT_7C47: &str = include_str!("../testdata/cert-7C47.txt");
    /// The fixed wallclock time at which these tests pretend to run.
    fn test_time() -> SystemTime {
        SystemTime::from(datetime!(2020-08-07 12:42:45 UTC))
    }
    /// Parse `s` as a hexadecimal RSA identity, panicking if it is not valid.
    fn rsa(s: &str) -> RsaIdentity {
        RsaIdentity::from_hex(s).unwrap()
    }
    /// Build an authority list containing the two v3 identities that these
    /// tests treat as recognized authorities.
    fn test_authorities() -> AuthorityContactsBuilder {
        let mut builder = AuthorityContacts::builder();
        for fingerprint in [
            "5696AB38CB3852AFA476A5C07B2D4788963D5567",
            "5A23BA701776C9C1AB1C06E734E92AB3D5350D64",
        ] {
            builder.v3idents().push(rsa(fingerprint));
        }
        builder
    }
    /// Key identifiers (identity and signing-key fingerprints) for the
    /// "5696" test authority.
    fn authcert_id_5696() -> AuthCertKeyIds {
        AuthCertKeyIds {
            id_fingerprint: rsa("5696ab38cb3852afa476a5c07b2d4788963d5567"),
            sk_fingerprint: rsa("f6ed4aa64d83caede34e19693a7fcf331aae8a6a"),
        }
    }
    /// Key identifiers (identity and signing-key fingerprints) for the
    /// "5A23" test authority.
    fn authcert_id_5a23() -> AuthCertKeyIds {
        AuthCertKeyIds {
            id_fingerprint: rsa("5a23ba701776c9c1ab1c06e734e92ab3d5350d64"),
            sk_fingerprint: rsa("d08e965cc6dcb6cb6ed776db43e616e93af61177"),
        }
    }
    /// Key identifiers (identity and signing-key fingerprints) for the
    /// "7C47" identity.
    ///
    /// Remember, we're saying that we don't recognize this one as an
    /// authority: it is absent from `test_authorities()`.
    fn authcert_id_7c47() -> AuthCertKeyIds {
        AuthCertKeyIds {
            id_fingerprint: rsa("7C47DCB4A90E2C2B7C7AD27BD641D038CF5D7EBE"),
            sk_fingerprint: rsa("D3C013E0E6C82E246090D1C0798B75FCB7ACF120"),
        }
    }
    /// Parse the bundled test microdescriptors, returning the text of each
    /// one keyed by its digest.
    ///
    /// Panics if the test data does not parse.
    fn microdescs() -> HashMap<MdDigest, String> {
        const MICRODESCS: &str = include_str!("../testdata/microdescs.txt");

        let reader =
            MicrodescReader::new(MICRODESCS, &AllowAnnotations::AnnotationsNotAllowed).unwrap();
        let mut by_digest = HashMap::new();
        for item in reader {
            let anno = item.unwrap();
            // Grab the source text before `anno` is consumed.
            let body = anno.within(MICRODESCS).unwrap();
            let md = anno.into_microdesc();
            by_digest.insert(*md.digest(), body.to_owned());
        }
        by_digest
    }
    /// Exercise `GetConsensusState`: its initial properties, its rejection of
    /// junk and of a consensus signed by unrecognized authorities, and its
    /// acceptance of a good consensus from both a download and the cache.
    #[test]
    fn get_consensus_state() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let rt = make_time_shifted_runtime(test_time(), rt);
            let cfg = make_dirmgr_config(None);
            let (_tempdir, store) = temp_store();
            let mut state = GetConsensusState::new(
                rt.clone(),
                cfg,
                CacheUsage::CacheOkay,
                None,
                #[cfg(feature = "dirfilter")]
                Arc::new(crate::filter::NilFilter),
            );
            // Is description okay?
            assert_eq!(&state.describe(), "Looking for a consensus.");
            // Basic properties: without a consensus it is not ready to advance.
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));
            // Basic properties: it doesn't want to reset.
            assert!(state.reset_time().is_none());
            // Its starting DirStatus is "fetching a consensus".
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching a consensus"
            );
            // Download configuration is simple: only 1 request can be done in
            // parallel.  It uses a consensus retry schedule.
            let retry = state.dl_config();
            assert_eq!(retry, DownloadScheduleConfig::default().retry_consensus());
            // Do we know what we want?
            let docs = state.missing_docs();
            assert_eq!(docs.len(), 1);
            let docid = docs[0];
            assert!(matches!(
                docid,
                DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                }
            ));
            let source = DocSource::DirServer { source: None };
            // Now suppose that we get some complete junk from a download.
            let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
            let req = crate::docid::ClientRequest::Consensus(req);
            let mut changed = false;
            let outcome = state.add_from_download(
                "this isn't a consensus",
                &req,
                source.clone(),
                Some(&store),
                &mut changed,
            );
            assert!(matches!(outcome, Err(Error::NetDocError { .. })));
            assert!(!changed);
            // make sure it wasn't stored...
            assert!(
                store
                    .lock()
                    .unwrap()
                    .latest_consensus(ConsensusFlavor::Microdesc, None)
                    .unwrap()
                    .is_none()
            );
            // Now try again, with a real consensus... but the wrong authorities.
            let mut changed = false;
            let outcome = state.add_from_download(
                CONSENSUS,
                &req,
                source.clone(),
                Some(&store),
                &mut changed,
            );
            assert!(matches!(outcome, Err(Error::UnrecognizedAuthorities)));
            assert!(!changed);
            // Again, nothing should have been stored.
            assert!(
                store
                    .lock()
                    .unwrap()
                    .latest_consensus(ConsensusFlavor::Microdesc, None)
                    .unwrap()
                    .is_none()
            );
            // Great. Change the receiver to use a configuration where these test
            // authorities are recognized.
            let cfg = make_dirmgr_config(Some(test_authorities()));
            let mut state = GetConsensusState::new(
                rt.clone(),
                cfg,
                CacheUsage::CacheOkay,
                None,
                #[cfg(feature = "dirfilter")]
                Arc::new(crate::filter::NilFilter),
            );
            let mut changed = false;
            let outcome =
                state.add_from_download(CONSENSUS, &req, source, Some(&store), &mut changed);
            assert!(outcome.is_ok());
            assert!(changed);
            // This time the consensus should have been stored.
            assert!(
                store
                    .lock()
                    .unwrap()
                    .latest_consensus(ConsensusFlavor::Microdesc, None)
                    .unwrap()
                    .is_some()
            );
            // And with that, we should be asking for certificates
            assert!(state.can_advance());
            assert_eq!(&state.describe(), "About to fetch certificates.");
            assert_eq!(state.missing_docs(), Vec::new());
            let next = Box::new(state).advance();
            assert_eq!(
                &next.describe(),
                "Downloading certificates for consensus (we are missing 2/2)."
            );
            // Try again, but this time get the state from the cache.
            let cfg = make_dirmgr_config(Some(test_authorities()));
            let mut state = GetConsensusState::new(
                rt,
                cfg,
                CacheUsage::CacheOkay,
                None,
                #[cfg(feature = "dirfilter")]
                Arc::new(crate::filter::NilFilter),
            );
            let text: crate::storage::InputString = CONSENSUS.to_owned().into();
            let map = vec![(docid, text.into())].into_iter().collect();
            let mut changed = false;
            let outcome = state.add_from_cache(map, &mut changed);
            assert!(outcome.is_ok());
            assert!(changed);
            assert!(state.can_advance());
        });
    }
    /// Exercise the certificate-fetching state.
    ///
    /// Checks the state's basic properties, feeds it one certificate from the
    /// cache and one from a download, and then verifies that it can advance to
    /// the microdescriptor state and that a reset goes back to looking for a
    /// consensus.
    #[test]
    fn get_certs_state() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            /// Construct a GetCertsState with our test data
            fn new_getcerts_state(rt: impl Runtime) -> Box<dyn DirState> {
                let rt = make_time_shifted_runtime(test_time(), rt);
                let cfg = make_dirmgr_config(Some(test_authorities()));
                let mut state = GetConsensusState::new(
                    rt,
                    cfg,
                    CacheUsage::CacheOkay,
                    None,
                    #[cfg(feature = "dirfilter")]
                    Arc::new(crate::filter::NilFilter),
                );
                let source = DocSource::DirServer { source: None };
                let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
                let req = crate::docid::ClientRequest::Consensus(req);
                let mut changed = false;
                // Use `expect` rather than `assert!(.. .is_ok())` so that a
                // failure reports the underlying error.
                state
                    .add_from_download(CONSENSUS, &req, source, None, &mut changed)
                    .expect("test consensus should be accepted");
                Box::new(state).advance()
            }

            let (_tempdir, store) = temp_store();
            let mut state = new_getcerts_state(rt.clone());

            // Basic properties: description, status, reset time.
            assert_eq!(
                &state.describe(),
                "Downloading certificates for consensus (we are missing 2/2)."
            );
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));
            let consensus_expires: SystemTime = datetime!(2020-08-07 12:43:20 UTC).into();
            let post_valid_tolerance = crate::DirTolerance::default().post_valid_tolerance();
            assert_eq!(
                state.reset_time(),
                Some(consensus_expires + post_valid_tolerance)
            );
            let retry = state.dl_config();
            assert_eq!(retry, DownloadScheduleConfig::default().retry_certs());

            // Bootstrap status okay?
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching authority certificates (0/2)"
            );

            // Check that we get the right list of missing docs.
            let missing = state.missing_docs();
            assert_eq!(missing.len(), 2); // We are missing two certificates.
            assert!(missing.contains(&DocId::AuthCert(authcert_id_5696())));
            assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
            // we don't ask for this one because we don't recognize its authority
            assert!(!missing.contains(&DocId::AuthCert(authcert_id_7c47())));

            // Add one from the cache; make sure the list is still right
            let text1: crate::storage::InputString = AUTHCERT_5696.to_owned().into();
            let docs = vec![(DocId::AuthCert(authcert_id_5696()), text1.into())]
                .into_iter()
                .collect();
            let mut changed = false;
            // No error, and something changed.
            state
                .add_from_cache(docs, &mut changed)
                .expect("cached certificate should be accepted");
            assert!(changed);
            assert!(!state.can_advance()); // But we aren't done yet.
            let missing = state.missing_docs();
            assert_eq!(missing.len(), 1); // Now we're only missing one!
            assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching authority certificates (1/2)"
            );

            // Now try to add the other from a download ... but fail
            // because we didn't ask for it.
            let source = DocSource::DirServer { source: None };
            let mut req = tor_dirclient::request::AuthCertRequest::new();
            req.push(authcert_id_5696()); // it's the wrong id.
            let req = ClientRequest::AuthCert(req);
            let mut changed = false;
            let outcome = state.add_from_download(
                AUTHCERT_5A23,
                &req,
                source.clone(),
                Some(&store),
                &mut changed,
            );
            assert!(
                matches!(outcome, Err(Error::Unwanted(_))),
                "unexpected outcome: {outcome:?}"
            );
            assert!(!changed);
            let missing2 = state.missing_docs();
            assert_eq!(missing, missing2); // No change.
            // The rejected certificate must not have been written to the store.
            assert!(
                store
                    .lock()
                    .unwrap()
                    .authcerts(&[authcert_id_5a23()])
                    .unwrap()
                    .is_empty()
            );

            // Now try to add the other from a download ... for real!
            let mut req = tor_dirclient::request::AuthCertRequest::new();
            req.push(authcert_id_5a23()); // Right idea this time!
            let req = ClientRequest::AuthCert(req);
            let mut changed = false;
            // No error, _and_ something changed!
            state
                .add_from_download(AUTHCERT_5A23, &req, source, Some(&store), &mut changed)
                .expect("requested certificate should be accepted");
            assert!(changed);
            let missing3 = state.missing_docs();
            assert!(missing3.is_empty());
            assert!(state.can_advance());
            // This time the certificate should have been stored.
            assert!(
                !store
                    .lock()
                    .unwrap()
                    .authcerts(&[authcert_id_5a23()])
                    .unwrap()
                    .is_empty()
            );

            let next = state.advance();
            assert_eq!(
                &next.describe(),
                "Downloading microdescriptors (we are missing 6)."
            );

            // If we start from scratch and reset, we're back in GetConsensus.
            let state = new_getcerts_state(rt);
            let state = state.reset();
            assert_eq!(&state.describe(), "Downloading a consensus.");

            // TODO: I'd like even more tests to make sure that we never
            // accept a certificate for an authority we don't believe in.
        });
    }
    /// Exercise the microdescriptor-fetching state.
    ///
    /// Checks the state's basic properties, feeds it one microdescriptor from
    /// the cache and the remaining three from a download, and then verifies
    /// that a replacement netdir is offered and the downloaded documents are
    /// stored.
    #[test]
    fn get_microdescs_state() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            /// Construct a GetMicrodescsState with our test data
            fn new_getmicrodescs_state(rt: impl Runtime) -> GetMicrodescsState<impl Runtime> {
                let rt = make_time_shifted_runtime(test_time(), rt);
                let cfg = make_dirmgr_config(Some(test_authorities()));
                let (signed, rest, consensus) = MdConsensus::parse(CONSENSUS2).unwrap();
                // The test skips timeliness and signature validation here.
                let consensus = consensus
                    .dangerously_assume_timely()
                    .dangerously_assume_wellsigned();
                let meta = ConsensusMeta::from_consensus(signed, rest, &consensus);
                GetMicrodescsState::new(
                    CacheUsage::CacheOkay,
                    consensus,
                    meta,
                    rt,
                    cfg,
                    None,
                    #[cfg(feature = "dirfilter")]
                    Arc::new(crate::filter::NilFilter),
                )
            }
            /// Decode an unpadded base64 string into a microdescriptor digest.
            ///
            /// Panics if the input is not valid base64 or has the wrong length.
            fn d64(s: &str) -> MdDigest {
                use base64ct::{Base64Unpadded, Encoding as _};
                Base64Unpadded::decode_vec(s).unwrap().try_into().unwrap()
            }

            // If we start from scratch and reset, we're back in GetConsensus.
            let state = new_getmicrodescs_state(rt.clone());
            let state = Box::new(state).reset();
            assert_eq!(&state.describe(), "Looking for a consensus.");

            // Check the basics.
            let mut state = new_getmicrodescs_state(rt.clone());
            assert_eq!(
                &state.describe(),
                "Downloading microdescriptors (we are missing 4)."
            );
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));
            {
                // The reset time is only checked against a range here, not an
                // exact value.
                let reset_time = state.reset_time().unwrap();
                let fresh_until: SystemTime = datetime!(2021-10-27 21:27:00 UTC).into();
                let valid_until: SystemTime = datetime!(2021-10-27 21:27:20 UTC).into();
                assert!(reset_time >= fresh_until);
                assert!(reset_time <= valid_until + state.config.tolerance.post_valid_tolerance());
            }
            let retry = state.dl_config();
            assert_eq!(retry, DownloadScheduleConfig::default().retry_microdescs());
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching microdescriptors (0/4)"
            );

            // Now check whether we're missing all the right microdescs.
            let missing = state.missing_docs();
            let md_text = microdescs();
            assert_eq!(missing.len(), 4);
            assert_eq!(md_text.len(), 4);
            let md1 = d64("LOXRj8YZP0kwpEAsYOvBZWZWGoWv5b/Bp2Mz2Us8d8g");
            let md2 = d64("iOhVp33NyZxMRDMHsVNq575rkpRViIJ9LN9yn++nPG0");
            let md3 = d64("/Cd07b3Bl0K0jX2/1cAvsYXJJMi5d8UBU+oWKaLxoGo");
            let md4 = d64("z+oOlR7Ga6cg9OoC/A3D3Ey9Rtc4OldhKlpQblMfQKo");
            for md_digest in [md1, md2, md3, md4] {
                assert!(missing.contains(&DocId::Microdesc(md_digest)));
                assert!(md_text.contains_key(&md_digest));
            }

            // Try adding a microdesc from the cache.
            let (_tempdir, store) = temp_store();
            let doc1: crate::storage::InputString = md_text.get(&md1).unwrap().clone().into();
            let docs = vec![(DocId::Microdesc(md1), doc1.into())]
                .into_iter()
                .collect();
            let mut changed = false;
            // Successfully loaded one MD.  Use `expect` so that a failure
            // reports the underlying error.
            state
                .add_from_cache(docs, &mut changed)
                .expect("cached microdescriptor should be accepted");
            assert!(changed);
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));

            // Now we should be missing 3.
            let missing = state.missing_docs();
            assert_eq!(missing.len(), 3);
            assert!(!missing.contains(&DocId::Microdesc(md1)));
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching microdescriptors (1/4)"
            );

            // Try adding the rest as if from a download.
            let mut req = tor_dirclient::request::MicrodescRequest::new();
            let mut response = String::new();
            for md_digest in [md2, md3, md4] {
                response.push_str(md_text.get(&md_digest).unwrap());
                req.push(md_digest);
            }
            let req = ClientRequest::Microdescs(req);
            let source = DocSource::DirServer { source: None };
            let mut changed = false;
            // Successfully loaded MDs.
            state
                .add_from_download(
                    response.as_str(),
                    &req,
                    source,
                    Some(&store),
                    &mut changed,
                )
                .expect("downloaded microdescriptors should be accepted");
            assert!(changed);
            match state.get_netdir_change().unwrap() {
                NetDirChange::AttemptReplace { netdir, .. } => {
                    assert!(netdir.take().is_some());
                }
                x => panic!("wrong netdir change: {:?}", x),
            }
            assert!(state.is_ready(Readiness::Complete));
            assert!(state.is_ready(Readiness::Usable));
            // The three downloaded microdescriptors should now be in the store.
            assert_eq!(
                store
                    .lock()
                    .unwrap()
                    .microdescs(&[md2, md3, md4])
                    .unwrap()
                    .len(),
                3
            );

            let missing = state.missing_docs();
            assert!(missing.is_empty());
        });
    }
}