1
//! Functions to download or load directory objects, using the
2
//! state machines in the `states` module.
3

            
4
use std::num::NonZeroUsize;
5
use std::ops::Deref;
6
use std::{
7
    collections::HashMap,
8
    sync::{Arc, Weak},
9
    time::{Duration, SystemTime},
10
};
11

            
12
use crate::DirMgrConfig;
13
use crate::DocSource;
14
use crate::err::BootstrapAction;
15
use crate::state::{DirState, PoisonedState};
16
use crate::{
17
    DirMgr, DocId, DocQuery, DocumentText, Error, Readiness, Result,
18
    docid::{self, ClientRequest},
19
    upgrade_weak_ref,
20
};
21

            
22
use futures::FutureExt;
23
use futures::StreamExt;
24
use oneshot_fused_workaround as oneshot;
25
use tor_dirclient::DirResponse;
26
use tor_error::{info_report, warn_report};
27
use tor_rtcompat::Runtime;
28
use tor_rtcompat::scheduler::TaskSchedule;
29
use tracing::{debug, info, instrument, trace, warn};
30

            
31
use crate::storage::Store;
32
#[cfg(test)]
33
use std::sync::LazyLock;
34
#[cfg(test)]
35
use std::sync::Mutex;
36
use tor_circmgr::{CircMgr, DirInfo};
37
use tor_netdir::{NetDir, NetDirProvider as _};
38
use tor_netdoc::doc::netstatus::ConsensusFlavor;
39

            
40
/// Given a `Result<()>`, exit the current function if it is anything other than
/// `Ok(())` or a nonfatal error.
///
/// Errors whose `bootstrap_action()` is `BootstrapAction::Nonfatal` are
/// swallowed; every other error is returned from the *enclosing* function.
macro_rules! propagate_fatal_errors {
    ( $e:expr ) => {
        let v: Result<()> = $e;
        if let Err(e) = v {
            match e.bootstrap_action() {
                // Nonfatal: keep going.
                BootstrapAction::Nonfatal => {}
                // Fatal or Reset: abort the enclosing function.
                _ => return Err(e),
            }
        }
    };
}
53

            
54
/// Identifier for an attempt to bootstrap a directory.
///
/// Every time that we decide to download a new directory, _despite already
/// having one_, counts as a new attempt.
///
/// These are used to track the progress of each attempt independently.
#[derive(Copy, Clone, Debug, derive_more::Display, Eq, PartialEq, Ord, PartialOrd)]
#[display("{0}", id)]
pub(crate) struct AttemptId {
    /// Which attempt at downloading a directory is this?
    id: NonZeroUsize,
}

impl AttemptId {
    /// Return a new unused `AttemptId` that will be greater than any previous
    /// one.
    ///
    /// # Panics
    ///
    /// Panics if we have exhausted the possible space of `AttemptId`s.
    pub(crate) fn next() -> Self {
        use std::sync::atomic::{AtomicUsize, Ordering};
        /// atomic used to generate the next attempt.
        static NEXT: AtomicUsize = AtomicUsize::new(1);
        // Relaxed ordering suffices: we only need each fetched value to be
        // unique, not synchronized with any other memory operations.
        let id = NEXT.fetch_add(1, Ordering::Relaxed);
        // `NEXT` starts at 1, so this conversion to NonZeroUsize can only
        // fail after the counter wraps around to zero.
        let id = id.try_into().expect("Allocated too many AttemptIds");
        Self { id }
    }
}
83

            
84
/// If there were errors from a peer in `outcome`, record those errors by
/// marking the circuit (if any) as needing retirement, and noting the peer
/// (if any) as having failed.
fn note_request_outcome<R: Runtime>(
    circmgr: &CircMgr<R>,
    outcome: &tor_dirclient::Result<tor_dirclient::DirResponse>,
) {
    use tor_dirclient::{Error::RequestFailed, RequestFailedError};
    // Extract an error and a source from this outcome, if there is one.
    //
    // This is complicated because DirResponse can encapsulate the notion of
    // a response that failed part way through a download: in that case, it
    // has some data, and also an error.
    let (err, source) = match outcome {
        Ok(req) => {
            // An "Ok" response may still carry a partial-download error;
            // only report it if we also know which cache answered us.
            if let (Some(e), Some(source)) = (req.error(), req.source()) {
                (
                    RequestFailed(RequestFailedError {
                        error: e.clone(),
                        source: Some(source.clone()),
                    }),
                    source,
                )
            } else {
                // Fully successful response: nothing to record here.
                return;
            }
        }
        // An outright failure that names the cache which produced it.
        Err(
            error @ RequestFailed(RequestFailedError {
                source: Some(source),
                ..
            }),
        ) => (error.clone(), source),
        // Errors without an identifiable source can't be blamed on any cache.
        _ => return,
    };

    note_cache_error(circmgr, source, &err.into());
}
122

            
123
/// Record that a problem has occurred because of a failure in an answer from `source`.
fn note_cache_error<R: Runtime>(
    circmgr: &CircMgr<R>,
    source: &tor_dirclient::SourceInfo,
    problem: &Error,
) {
    use tor_circmgr::ExternalActivity;

    // Only errors that actually implicate the cache should be held
    // against it.
    if !problem.indicates_cache_failure() {
        return;
    }

    // Does the error here tell us whom to really blame?  If so, blame them
    // instead.
    //
    // (This can happen if we notice a problem while downloading a certificate,
    // but the real problem is that the consensus was no good.)
    let real_source = match problem {
        Error::NetDocError {
            source: DocSource::DirServer { source: Some(info) },
            ..
        } => info,
        _ => source,
    };

    info_report!(problem, "Marking {:?} as failed", real_source);
    // Record the failure against whichever cache is really to blame...
    circmgr.note_external_failure(real_source.cache_id(), ExternalActivity::DirCache);
    // ...but retire the circuit that carried this particular request.
    circmgr.retire_circ(source.unique_circ_id());
}
152

            
153
/// Record that `source` has successfully given us some directory info.
154
fn note_cache_success<R: Runtime>(circmgr: &CircMgr<R>, source: &tor_dirclient::SourceInfo) {
155
    use tor_circmgr::ExternalActivity;
156

            
157
    trace!("Marking {:?} as successful", source);
158
    circmgr.note_external_success(source.cache_id(), ExternalActivity::DirCache);
159
}
160

            
161
/// Load every document in `missing` and try to apply it to `state`.
///
/// Documents are loaded from `dirmgr`'s store in bounded-size chunks;
/// `changed` is handed to `add_from_cache` so the state can record
/// whether it was modified.
fn load_and_apply_documents<R: Runtime>(
    missing: &[DocId],
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    changed: &mut bool,
) -> Result<()> {
    /// How many documents will we try to load at once?  We try to keep this from being too large,
    /// to avoid excessive RAM usage.
    ///
    /// TODO: we may well want to tune this.
    const CHUNK_SIZE: usize = 256;
    for chunk in missing.chunks(CHUNK_SIZE) {
        // Hold the store lock only while loading this chunk; release it
        // before handing the documents to the state machine.
        let documents = {
            let store = dirmgr.store.lock().expect("store lock poisoned");
            load_documents_from_store(chunk, &**store)?
        };

        state.add_from_cache(documents, changed)?;
    }

    Ok(())
}
184

            
185
/// Load a set of documents from a `Store`, returning all documents found in the store.
186
/// Note that this may be less than the number of documents in `missing`.
187
12
fn load_documents_from_store(
188
12
    missing: &[DocId],
189
12
    store: &dyn Store,
190
12
) -> Result<HashMap<DocId, DocumentText>> {
191
12
    let mut loaded = HashMap::new();
192
12
    for query in docid::partition_by_type(missing.iter().copied()).values() {
193
12
        query.load_from_store_into(&mut loaded, store)?;
194
    }
195
12
    Ok(loaded)
196
12
}
197

            
198
/// Construct an appropriate ClientRequest to download a consensus
/// of the given flavor.
///
/// Consults `store` for the latest cached consensus (so the request can
/// declare what we already have), and `config`'s tolerance settings to
/// pick cutoff dates and permitted clock skew.
pub(crate) fn make_consensus_request(
    now: SystemTime,
    flavor: ConsensusFlavor,
    store: &dyn Store,
    config: &DirMgrConfig,
) -> Result<ClientRequest> {
    let mut request = tor_dirclient::request::ConsensusRequest::new(flavor);

    // Lower bound on the "last consensus" date, derived from `now` and our
    // configured tolerance.
    let default_cutoff = crate::default_consensus_cutoff(now, &config.tolerance)?;

    match store.latest_consensus_meta(flavor) {
        Ok(Some(meta)) => {
            // We already have a consensus: ask only for something newer, and
            // tell the cache the digest of the one we hold.
            let valid_after = meta.lifetime().valid_after();
            request.set_last_consensus_date(std::cmp::max(valid_after, default_cutoff));
            request.push_old_consensus_digest(*meta.sha3_256_of_signed());
        }
        latest => {
            if let Err(e) = latest {
                warn_report!(e, "Error loading directory metadata");
            }
            // If we don't have a consensus, then request one that's
            // "reasonably new".  That way, if our clock is set far in the
            // future, we won't download stuff we can't use.
            request.set_last_consensus_date(default_cutoff);
        }
    }

    request.set_skew_limit(
        // If we are _fast_ by at least this much, then any valid directory will
        // seem to be at least this far in the past.
        config.tolerance.post_valid_tolerance(),
        // If we are _slow_ by this much, then any valid directory will seem to
        // be at least this far in the future.
        config.tolerance.pre_valid_tolerance(),
    );

    Ok(ClientRequest::Consensus(request))
}
238

            
239
/// Construct a set of `ClientRequest`s in order to fetch the documents in `docs`.
pub(crate) fn make_requests_for_documents<R: Runtime>(
    rt: &R,
    docs: &[DocId],
    store: &dyn Store,
    config: &DirMgrConfig,
) -> Result<Vec<ClientRequest>> {
    let mut res = Vec::new();
    // Partition the DocIds by type, then split each group into
    // individually downloadable queries.
    for q in docid::partition_by_type(docs.iter().copied())
        .into_iter()
        .flat_map(|(_, x)| x.split_for_download().into_iter())
    {
        match q {
            DocQuery::LatestConsensus { flavor, .. } => {
                res.push(make_consensus_request(
                    rt.wallclock(),
                    flavor,
                    store,
                    config,
                )?);
            }
            DocQuery::AuthCert(ids) => {
                res.push(ClientRequest::AuthCert(ids.into_iter().collect()));
            }
            DocQuery::Microdesc(ids) => {
                res.push(ClientRequest::Microdescs(ids.into_iter().collect()));
            }
            #[cfg(feature = "routerdesc")]
            DocQuery::RouterDesc(ids) => {
                res.push(ClientRequest::RouterDescs(ids.into_iter().collect()));
            }
        }
    }
    Ok(res)
}
274

            
275
/// Launch a single client request and get an associated response.
276
#[instrument(level = "trace", skip_all)]
277
async fn fetch_single<R: Runtime>(
278
    rt: &R,
279
    request: ClientRequest,
280
    current_netdir: Option<&NetDir>,
281
    circmgr: Arc<CircMgr<R>>,
282
) -> Result<(ClientRequest, DirResponse)> {
283
    let dirinfo: DirInfo = match current_netdir {
284
        Some(netdir) => netdir.into(),
285
        None => tor_circmgr::DirInfo::Nothing,
286
    };
287
    let outcome =
288
        tor_dirclient::get_resource(request.as_requestable(), dirinfo, rt, circmgr.clone()).await;
289

            
290
    note_request_outcome(&circmgr, &outcome);
291

            
292
    let resource = outcome?;
293
    Ok((request, resource))
294
}
295

            
296
/// Testing helper: if this is nonempty, then we return it in place of any
/// response to fetch_multiple.
///
/// Note that only one test uses this: otherwise there would be a race
/// condition. :p
#[cfg(test)]
static CANNED_RESPONSE: LazyLock<Mutex<Vec<String>>> = LazyLock::new(|| Mutex::new(vec![]));
303

            
304
/// Launch a set of download requests for a set of missing objects in
/// `missing`, and return each request along with the response it received.
///
/// Don't launch more than `parallelism` requests at once.
#[allow(clippy::cognitive_complexity)] // TODO: maybe refactor?
#[instrument(level = "trace", skip_all)]
async fn fetch_multiple<R: Runtime>(
    dirmgr: Arc<DirMgr<R>>,
    attempt_id: AttemptId,
    missing: &[DocId],
    parallelism: usize,
) -> Result<Vec<(ClientRequest, DirResponse)>> {
    // Build the requests while holding the store lock, then drop it before
    // doing any network work.
    let requests = {
        let store = dirmgr.store.lock().expect("store lock poisoned");
        make_requests_for_documents(&dirmgr.runtime, missing, &**store, &dirmgr.config.get())?
    };

    trace!(attempt=%attempt_id, "Launching {} requests for {} documents",
           requests.len(), missing.len());

    // Test-only shortcut: if a canned response is configured, answer every
    // request with it instead of touching the network.
    #[cfg(test)]
    {
        let m = CANNED_RESPONSE.lock().expect("Poisoned mutex");
        if !m.is_empty() {
            return Ok(requests
                .into_iter()
                .zip(m.iter().map(DirResponse::from_get_body))
                .collect());
        }
    }

    let circmgr = dirmgr.circmgr()?;
    // Only use timely directories for bootstrapping directories; otherwise, we'll try fallbacks.
    let netdir = dirmgr.netdir(tor_netdir::Timeliness::Timely).ok();

    // TODO: instead of waiting for all the queries to finish, we
    // could stream the responses back or something.
    let responses: Vec<Result<(ClientRequest, DirResponse)>> = futures::stream::iter(requests)
        .map(|query| fetch_single(&dirmgr.runtime, query, netdir.as_deref(), circmgr.clone()))
        .buffer_unordered(parallelism)
        .collect()
        .await;

    // Keep only the requests that succeeded with HTTP status 200; log the rest.
    let mut useful_responses = Vec::new();
    for r in responses {
        // TODO: on some error cases we might want to stop using this source.
        match r {
            Ok((request, response)) => {
                if response.status_code() == 200 {
                    useful_responses.push((request, response));
                } else {
                    trace!(
                        "cache declined request; reported status {:?}",
                        response.status_code()
                    );
                }
            }
            Err(e) => warn_report!(e, "error while downloading"),
        }
    }

    trace!(attempt=%attempt_id, "received {} useful responses from our requests.", useful_responses.len());

    Ok(useful_responses)
}
369

            
370
/// Try to update `state` by loading cached information from `dirmgr`.
///
/// Sets `*changed_out` to true (and reports bootstrap progress) if the
/// state recorded any change, even when the load itself returned an error.
fn load_once<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    attempt_id: AttemptId,
    changed_out: &mut bool,
) -> Result<()> {
    let missing = state.missing_docs();
    let mut changed = false;
    let outcome: Result<()> = if missing.is_empty() {
        trace!("Found no missing documents; can't advance current state");
        Ok(())
    } else {
        trace!(
            "Found {} missing documents; trying to load them",
            missing.len()
        );

        load_and_apply_documents(&missing, dirmgr, state, &mut changed)
    };

    // We have to update the status here regardless of the outcome, if we got
    // any information: even if there was an error, we might have received
    // partial information that changed our status.
    if changed {
        dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        *changed_out = true;
    }

    outcome
}
401

            
402
/// Try to load as much state as possible for a provided `state` from the
/// cache in `dirmgr`, advancing the state to the extent possible.
///
/// No downloads are performed; the provided state will not be reset.
#[allow(clippy::cognitive_complexity)] // TODO: Refactor? Somewhat due to tracing.
pub(crate) fn load<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    mut state: Box<dyn DirState>,
    attempt_id: AttemptId,
) -> Result<Box<dyn DirState>> {
    // Counts consecutive iterations in which something changed but the state
    // did not advance; used to catch a stuck state machine.
    let mut safety_counter = 0_usize;
    loop {
        trace!(attempt=%attempt_id, state=%state.describe(), "Loading from cache");
        let mut changed = false;
        let outcome = load_once(dirmgr, &mut state, attempt_id, &mut changed);
        {
            let mut store = dirmgr.store.lock().expect("store lock poisoned");
            dirmgr.apply_netdir_changes(&mut state, &mut **store)?;
            dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        }
        trace!(attempt=%attempt_id, ?outcome, "Load operation completed.");

        if let Err(e) = outcome {
            match e.bootstrap_action() {
                // Recoverable: log and keep trying to advance from the cache.
                BootstrapAction::Nonfatal => {
                    debug!("Recoverable error loading from cache: {}", e);
                }
                BootstrapAction::Fatal | BootstrapAction::Reset => {
                    return Err(e);
                }
            }
        }

        if state.can_advance() {
            state = state.advance();
            trace!(attempt=%attempt_id, state=state.describe(), "State has advanced.");
            safety_counter = 0;
        } else {
            if !changed {
                // TODO: Are there more nonfatal errors that mean we should
                // break?
                trace!(attempt=%attempt_id, state=state.describe(), "No state advancement after load; nothing more to find in the cache.");
                break;
            }
            safety_counter += 1;
            assert!(
                safety_counter < 100,
                "Spent 100 iterations in the same state: this is a bug"
            );
        }
    }

    Ok(state)
}
456

            
457
/// Helper: Make a set of download attempts for the current directory state,
/// and on success feed their results into the state object.
///
/// This can launch one or more download requests, but will not launch more
/// than `parallelism` requests at a time.
#[allow(clippy::cognitive_complexity)] // TODO: Refactor?
#[instrument(level = "trace", skip_all)]
async fn download_attempt<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    parallelism: usize,
    attempt_id: AttemptId,
) -> Result<()> {
    let missing = state.missing_docs();
    let fetched = fetch_multiple(Arc::clone(dirmgr), attempt_id, &missing, parallelism).await?;
    // Count per-source errors as we go; reported in bulk at the end.
    let mut n_errors = 0;
    for (client_req, dir_response) in fetched {
        let source = dir_response.source().cloned();
        // Directory responses must be valid UTF-8; if not, blame the cache
        // that sent them and move on to the next response.
        let text = match String::from_utf8(dir_response.into_output_unchecked())
            .map_err(Error::BadUtf8FromDirectory)
        {
            Ok(t) => t,
            Err(e) => {
                if let Some(source) = source {
                    n_errors += 1;
                    note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
                }
                continue;
            }
        };
        match dirmgr.expand_response_text(&client_req, text) {
            Ok(text) => {
                let doc_source = DocSource::DirServer {
                    source: source.clone(),
                };
                let mut changed = false;
                let outcome = state.add_from_download(
                    &text,
                    &client_req,
                    doc_source,
                    Some(&dirmgr.store),
                    &mut changed,
                );

                if !changed {
                    // An unchanged state should only happen when the
                    // download failed to apply.
                    debug_assert!(outcome.is_err());
                }

                // Attribute the outcome to the cache that answered us.
                if let Some(source) = source {
                    if let Err(e) = &outcome {
                        n_errors += 1;
                        note_cache_error(dirmgr.circmgr()?.deref(), &source, e);
                    } else {
                        note_cache_success(dirmgr.circmgr()?.deref(), &source);
                    }
                }

                if let Err(e) = &outcome {
                    dirmgr.note_errors(attempt_id, 1);
                    warn_report!(e, "error while adding directory info");
                }
                propagate_fatal_errors!(outcome);
            }
            Err(e) => {
                warn_report!(e, "Error when expanding directory text");
                if let Some(source) = source {
                    n_errors += 1;
                    note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
                }
                propagate_fatal_errors!(Err(e));
            }
        }
    }
    if n_errors != 0 {
        dirmgr.note_errors(attempt_id, n_errors);
    }
    dirmgr.update_progress(attempt_id, state.bootstrap_progress());

    Ok(())
}
537

            
538
/// Download information into a DirState state machine until it is
/// ["complete"](Readiness::Complete), or until we hit a non-recoverable error.
///
/// Use `dirmgr` to load from the cache or to launch downloads.
///
/// Keep resetting the state as needed.
///
/// The first time that the state becomes ["usable"](Readiness::Usable), notify
/// the sender in `on_usable`.
#[allow(clippy::cognitive_complexity)] // TODO: Refactor!
#[instrument(level = "trace", skip_all)]
pub(crate) async fn download<R: Runtime>(
    dirmgr: Weak<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    schedule: &mut TaskSchedule<R>,
    attempt_id: AttemptId,
    on_usable: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
    let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();

    trace!(attempt=%attempt_id, state=%state.describe(), "Trying to download directory material.");

    'next_state: loop {
        let retry_config = state.dl_config();
        let parallelism = retry_config.parallelism();

        // In theory this could be inside the loop below maybe?  If we
        // want to drop the restriction that the missing() members of a
        // state must never grow, then we'll need to move it inside.
        let mut now = {
            // Upgrade the weak reference only for as long as we need it,
            // so we don't keep the DirMgr alive across awaits.
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            let mut changed = false;
            trace!(attempt=%attempt_id, state=%state.describe(),"Attempting to load directory information from cache.");
            let load_result = load_once(&dirmgr, state, attempt_id, &mut changed);
            trace!(attempt=%attempt_id, state=%state.describe(), outcome=?load_result, "Load attempt complete.");
            if let Err(e) = &load_result {
                // If the load failed but the error can be blamed on a directory
                // cache, do so.
                if let Some(source) = e.responsible_cache() {
                    dirmgr.note_errors(attempt_id, 1);
                    note_cache_error(dirmgr.circmgr()?.deref(), source, e);
                }
            }
            propagate_fatal_errors!(load_result);
            dirmgr.runtime.wallclock()
        };

        // Apply any netdir changes that the state gives us.
        // TODO(eta): Consider deprecating state.is_ready().
        {
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            let mut store = dirmgr.store.lock().expect("store lock poisoned");
            dirmgr.apply_netdir_changes(state, &mut **store)?;
            dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        }
        // Skip the downloads if we can...
        if state.can_advance() {
            advance(state);
            trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
            continue 'next_state;
        }
        if state.is_ready(Readiness::Complete) {
            trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
            return Ok(());
        }

        // If we reach this time without advancing, the directory we are
        // fetching is stale and the whole state must be reset.
        let reset_time = no_more_than_a_week_from(runtime.wallclock(), state.reset_time());

        let mut retry = retry_config.schedule();
        let mut delay = None;

        // Make several attempts to fetch whatever we're missing,
        // until either we can advance, or we've got a complete
        // document, or we run out of tries, or we run out of time.
        'next_attempt: for attempt in retry_config.attempts() {
            // We wait at the start of this loop, on all attempts but the first.
            // This ensures that we always wait between attempts, but not after
            // the final attempt.
            let next_delay = retry.next_delay(&mut rand::rng());
            if let Some(delay) = delay.replace(next_delay) {
                let time_until_reset = {
                    reset_time
                        .duration_since(now)
                        .unwrap_or(Duration::from_secs(0))
                };
                // Never sleep past the reset time.
                let real_delay = delay.min(time_until_reset);
                debug!(attempt=%attempt_id, "Waiting {:?} for next download attempt...", real_delay);
                schedule.sleep(real_delay).await?;

                now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
                if now >= reset_time {
                    info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
                    reset(state);
                    continue 'next_state;
                }
            }

            info!(attempt=%attempt_id, "{}: {}", attempt + 1, state.describe());
            // Recompute the reset time: the state may have moved it.
            let reset_time = no_more_than_a_week_from(now, state.reset_time());

            now = {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                // Race the download against the reset deadline.
                futures::select_biased! {
                    outcome = download_attempt(&dirmgr, state, parallelism.into(), attempt_id).fuse() => {
                        if let Err(e) = outcome {
                            warn_report!(e, attempt=%attempt_id, "Error while downloading.");
                            propagate_fatal_errors!(Err(e));
                            continue 'next_attempt;
                        } else {
                            trace!(attempt=%attempt_id, "Successfully downloaded some information.");
                        }
                    }
                    _ = schedule.sleep_until_wallclock(reset_time).fuse() => {
                        // We need to reset. This can happen if (for
                        // example) we're downloading the last few
                        // microdescriptors on a consensus that now
                        // we're ready to replace.
                        info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
                        reset(state);
                        continue 'next_state;
                    },
                };
                dirmgr.runtime.wallclock()
            };

            // Apply any netdir changes that the state gives us.
            // TODO(eta): Consider deprecating state.is_ready().
            {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                let mut store = dirmgr.store.lock().expect("store lock poisoned");
                let outcome = dirmgr.apply_netdir_changes(state, &mut **store);
                dirmgr.update_progress(attempt_id, state.bootstrap_progress());
                propagate_fatal_errors!(outcome);
            }

            // Exit if there is nothing more to download.
            if state.is_ready(Readiness::Complete) {
                trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
                return Ok(());
            }

            // Report usable-ness if appropriate.
            if on_usable.is_some() && state.is_ready(Readiness::Usable) {
                trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Usable.");
                // Unwrap should be safe due to parent `.is_some()` check
                #[allow(clippy::unwrap_used)]
                let _ = on_usable.take().unwrap().send(());
            }

            if state.can_advance() {
                // We have enough info to advance to another state.
                advance(state);
                trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
                continue 'next_state;
            }
        }

        // We didn't advance the state, after all the retries.
        warn!(n_attempts=retry_config.n_attempts(),
              state=%state.describe(),
              "Unable to advance downloading state");
        return Err(Error::CantAdvanceState);
    }
}
702

            
703
/// Replace `state` with `state.reset()`.
704
fn reset(state: &mut Box<dyn DirState>) {
705
    let cur_state = std::mem::replace(state, Box::new(PoisonedState));
706
    *state = cur_state.reset();
707
}
708

            
709
/// Replace `state` with `state.advance()`.
710
4
fn advance(state: &mut Box<dyn DirState>) {
711
4
    let cur_state = std::mem::replace(state, Box::new(PoisonedState));
712
4
    *state = cur_state.advance();
713
4
}
714

            
715
/// Helper: Clamp `v` so that it is no more than one week from `now`.
///
/// If `v` is absent, return the time that's one week from now.
///
/// We use this to determine a reset time when no reset time is
/// available, or when it is too far in the future.
fn no_more_than_a_week_from(now: SystemTime, v: Option<SystemTime>) -> SystemTime {
    // The ceiling we clamp to: exactly seven days after `now`.
    let one_week_later = now + Duration::new(86400 * 7, 0);
    v.map_or(one_week_later, |t| t.min(one_week_later))
}
728

            
729
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::storage::DynStore;
    use crate::test::new_mgr;
    use std::sync::Mutex;
    use tor_dircommon::retry::DownloadSchedule;
    use tor_netdoc::doc::microdesc::MdDigest;
    use tor_rtcompat::SleepProvider;

    // Exercise the clamping behavior of `no_more_than_a_week_from`:
    // `None` and far-future times clamp to now + 7 days; nearer times
    // (including times in the past) pass through unchanged.
    #[test]
    fn week() {
        let now = SystemTime::now();
        let one_day = Duration::new(86400, 0);

        assert_eq!(no_more_than_a_week_from(now, None), now + one_day * 7);
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + one_day)),
            now + one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now - one_day)),
            now - one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + 30 * one_day)),
            now + one_day * 7
        );
    }

    /// A fake implementation of DirState that just wants a fixed set
    /// of microdescriptors.  It doesn't care if it gets them: it just
    /// wants to be told that the IDs exist.
    #[derive(Debug, Clone)]
    struct DemoState {
        /// True once we have advanced past the first phase (`new1`)
        /// into the second phase (`new2`).
        second_time_around: bool,
        /// Map from each wanted microdescriptor digest to whether we
        /// have been told that digest exists yet.
        got_items: HashMap<MdDigest, bool>,
    }

    // Constants from Lou Reed
    //
    // Each is exactly 32 ASCII bytes, so it can double as a fake
    // microdescriptor digest (`MdDigest`).
    const H1: MdDigest = *b"satellite's gone up to the skies";
    const H2: MdDigest = *b"things like that drive me out of";
    const H3: MdDigest = *b"my mind i watched it for a littl";
    const H4: MdDigest = *b"while i like to watch things on ";
    const H5: MdDigest = *b"TV Satellite of love Satellite--";

    impl DemoState {
        /// Phase-1 state: wants H1 and H2, neither yet seen.
        fn new1() -> Self {
            DemoState {
                second_time_around: false,
                got_items: vec![(H1, false), (H2, false)].into_iter().collect(),
            }
        }
        /// Phase-2 state: wants H3, H4, and H5, none yet seen.
        fn new2() -> Self {
            DemoState {
                second_time_around: true,
                got_items: vec![(H3, false), (H4, false), (H5, false)]
                    .into_iter()
                    .collect(),
            }
        }
        /// Return how many of the wanted items have been marked as seen.
        fn n_ready(&self) -> usize {
            self.got_items.values().filter(|x| **x).count()
        }
    }

    impl DirState for DemoState {
        fn describe(&self) -> String {
            format!("{:?}", &self)
        }
        fn bootstrap_progress(&self) -> crate::event::DirProgress {
            crate::event::DirProgress::default()
        }
        // Phase 1 is never "ready"; in phase 2, Complete requires every
        // item and Usable tolerates one missing item.
        fn is_ready(&self, ready: Readiness) -> bool {
            match (ready, self.second_time_around) {
                (_, false) => false,
                (Readiness::Complete, true) => self.n_ready() == self.got_items.len(),
                (Readiness::Usable, true) => self.n_ready() >= self.got_items.len() - 1,
            }
        }
        // We can advance only out of phase 1, and only once every
        // phase-1 item has been seen.
        fn can_advance(&self) -> bool {
            if self.second_time_around {
                false
            } else {
                self.n_ready() == self.got_items.len()
            }
        }
        fn missing_docs(&self) -> Vec<DocId> {
            self.got_items
                .iter()
                .filter_map(|(id, have)| {
                    if *have {
                        None
                    } else {
                        Some(DocId::Microdesc(*id))
                    }
                })
                .collect()
        }
        // Mark as seen any wanted-but-unseen digest that appears among
        // the cache keys; the document text itself is ignored.
        fn add_from_cache(
            &mut self,
            docs: HashMap<DocId, DocumentText>,
            changed: &mut bool,
        ) -> Result<()> {
            for id in docs.keys() {
                if let DocId::Microdesc(id) = id {
                    if self.got_items.get(id) == Some(&false) {
                        self.got_items.insert(*id, true);
                        *changed = true;
                    }
                }
            }
            Ok(())
        }
        // Parse the "download" body as whitespace-separated hex tokens;
        // each token that decodes to a wanted 32-byte digest is marked
        // as seen.  Malformed tokens are silently skipped.
        fn add_from_download(
            &mut self,
            text: &str,
            _request: &ClientRequest,
            _source: DocSource,
            _storage: Option<&Mutex<DynStore>>,
            changed: &mut bool,
        ) -> Result<()> {
            for token in text.split_ascii_whitespace() {
                if let Ok(v) = hex::decode(token) {
                    if let Ok(id) = v.try_into() {
                        if self.got_items.get(&id) == Some(&false) {
                            self.got_items.insert(id, true);
                            *changed = true;
                        }
                    }
                }
            }
            Ok(())
        }
        fn dl_config(&self) -> DownloadSchedule {
            DownloadSchedule::default()
        }
        fn advance(self: Box<Self>) -> Box<dyn DirState> {
            if self.can_advance() {
                Box::new(Self::new2())
            } else {
                self
            }
        }
        // No scheduled reset time: callers fall back to their default
        // (see `no_more_than_a_week_from`).
        fn reset_time(&self) -> Option<SystemTime> {
            None
        }
        fn reset(self: Box<Self>) -> Box<dyn DirState> {
            Box::new(Self::new1())
        }
    }

    #[test]
    fn all_in_cache() {
        // Let's try bootstrapping when everything is in the cache.
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let (_tempdir, mgr) = new_mgr(rt.clone());
            let (mut schedule, _handle) = TaskSchedule::new(rt);

            // Pre-seed the store with all five digests, so neither phase
            // should need to download anything.
            {
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3, H4, H5] {
                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
                }
            }
            let mgr = Arc::new(mgr);
            let attempt_id = AttemptId::next();

            // Try just a load.
            let state = Box::new(DemoState::new1());
            let result = super::load(&mgr, state, attempt_id).unwrap();
            assert!(result.is_ready(Readiness::Complete));

            // Try a bootstrap that could (but won't!) download.
            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());

            let mut on_usable = None;
            super::download(
                Arc::downgrade(&mgr),
                &mut state,
                &mut schedule,
                attempt_id,
                &mut on_usable,
            )
            .await
            .unwrap();
            assert!(state.is_ready(Readiness::Complete));
        });
    }

    #[test]
    fn partly_in_cache() {
        // Let's try bootstrapping with all of phase1 and part of
        // phase 2 in cache.
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let (_tempdir, mgr) = new_mgr(rt.clone());
            let (mut schedule, _handle) = TaskSchedule::new(rt);

            {
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3] {
                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
                }
            }
            {
                // NOTE(review): CANNED_RESPONSE appears to be a test-only
                // static (plausibly the `LazyLock`/`Mutex` imported under
                // cfg(test) at the top of this file) that the stubbed
                // download path reads from — confirm where it is declared.
                let mut resp = CANNED_RESPONSE.lock().unwrap();
                // H4 and H5.
                *resp = vec![
                    "7768696c652069206c696b6520746f207761746368207468696e6773206f6e20
                     545620536174656c6c697465206f66206c6f766520536174656c6c6974652d2d"
                        .to_owned(),
                ];
            }
            let mgr = Arc::new(mgr);
            let mut on_usable = None;
            let attempt_id = AttemptId::next();

            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
            super::download(
                Arc::downgrade(&mgr),
                &mut state,
                &mut schedule,
                attempt_id,
                &mut on_usable,
            )
            .await
            .unwrap();
            assert!(state.is_ready(Readiness::Complete));
        });
    }
}