1
//! `BridgeDescMgr` - downloads and caches bridges' router descriptors
2

            
3
use std::borrow::Cow;
4
use std::cmp::Ordering;
5
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
6
use std::fmt::{self, Debug, Display};
7
use std::num::NonZeroU8;
8
use std::panic::AssertUnwindSafe;
9
use std::sync::{Arc, Mutex, MutexGuard, Weak};
10

            
11
use async_trait::async_trait;
12
use derive_more::{Deref, DerefMut};
13
use educe::Educe;
14
use futures::FutureExt;
15
use futures::future;
16
use futures::select;
17
use futures::stream::{BoxStream, StreamExt};
18
use futures::task::SpawnError;
19
use tracing::{debug, info, trace};
20

            
21
use safelog::sensitive;
22
use tor_basic_utils::retry::RetryDelay;
23
use tor_checkable::{SelfSigned, Timebound};
24
use tor_circmgr::CircMgr;
25
use tor_error::{AbsRetryTime, HasRetryTime, RetryTime};
26
use tor_error::{ErrorKind, HasKind, error_report, internal};
27
use tor_guardmgr::bridge::{BridgeConfig, BridgeDesc};
28
use tor_guardmgr::bridge::{BridgeDescError, BridgeDescEvent, BridgeDescList, BridgeDescProvider};
29
use tor_netdoc::doc::routerdesc::RouterDesc;
30
use tor_rtcompat::{Runtime, SpawnExt as _};
31
use web_time_compat::{Duration, Instant, SystemTime};
32

            
33
use crate::event::FlagPublisher;
34
use crate::storage::CachedBridgeDescriptor;
35
use crate::{DirMgrStore, DynStore};
36

            
37
#[cfg(test)]
38
mod bdtest;
39

            
40
/// The key we use in all our data structures
///
/// This type alias saves typing, and would make it easier to change the
/// bridge descriptor manager to take and handle another way of identifying
/// the bridges it is working with.
type BridgeKey = BridgeConfig;
45

            
46
/// Active vs dormant state, as far as the bridge descriptor manager is concerned
///
/// This is usually derived in higher layers from `arti_client::DormantMode`,
/// whether `TorClient::bootstrap()` has been called, etc.
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
// TODO: These proliferating `Dormancy` enums should be centralized and unified with `TaskHandle`
//     https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/845#note_2853190
pub enum Dormancy {
    /// Dormant (inactive)
    ///
    /// Bridge descriptor downloads, or refreshes, will not be started.
    ///
    /// In-progress downloads will be stopped if possible,
    /// but they may continue until they complete (or fail).
    // TODO async task cancellation: actually cancel these in this case
    ///
    /// So a dormant BridgeDescMgr may still continue to
    /// change the return value from [`bridges()`](BridgeDescProvider::bridges)
    /// and continue to report [`BridgeDescEvent`]s.
    ///
    /// When the BridgeDescMgr is dormant,
    /// `bridges()` may return stale descriptors
    /// (that is, descriptors which ought to have been refetched and may no longer be valid),
    /// or stale errors
    /// (that is, errors which occurred some time ago,
    /// and which would normally have been retried by now).
    Dormant,

    /// Active
    ///
    /// Bridge descriptors will be downloaded as requested.
    ///
    /// When a bridge descriptor manager has been `Dormant`,
    /// it may continue to provide stale data (as described)
    /// for a while after it is made `Active`,
    /// until the required refreshes and retries have taken place (or failed).
    Active,
}
85

            
86
/// **Downloader and cache for bridges' router descriptors**
///
/// This is a handle which is cheap to clone and has internal mutability.
#[derive(Clone)]
pub struct BridgeDescMgr<R: Runtime, M = ()>
where
    M: Mockable<R>,
{
    /// The actual manager
    ///
    /// We have the `Arc` in here, rather than in our callers, because this
    /// makes the API nicer for them, and also because some of our tasks
    /// want a handle they can use to relock and modify the state.
    mgr: Arc<Manager<R, M>>,
}
101

            
102
/// Configuration for the `BridgeDescMgr`
///
/// Currently, the only way to make this is via its `Default` impl.
// TODO: there should be some way to override the defaults.  See #629 for considerations.
#[derive(Debug, Clone)]
pub struct BridgeDescDownloadConfig {
    /// How many bridge descriptor downloads to attempt in parallel?
    parallelism: NonZeroU8,

    /// Default/initial time to retry a failure to download a descriptor
    ///
    /// (This has the semantics of an initial delay for [`RetryDelay`],
    /// and is used unless there is more specific retry information for the particular failure.)
    retry: Duration,

    /// When a downloaded descriptor is going to expire, how soon in advance to refetch it?
    prefetch: Duration,

    /// Minimum interval between successive refetches of the descriptor for the same bridge
    ///
    /// This limits the download activity which can be caused by an errant bridge.
    ///
    /// If the descriptor's validity information is shorter than this, we will use
    /// it after it has expired (rather than treating the bridge as broken).
    min_refetch: Duration,

    /// Maximum interval between successive refetches of the descriptor for the same bridge
    ///
    /// This sets an upper bound on how old a descriptor we are willing to use.
    /// When this time expires, a refetch attempt will be started even if the
    /// descriptor is not going to expire soon.
    //
    // TODO: When this is configurable, we need to make sure we reject
    // configurations with max_refetch < min_refetch, or we may panic.
    max_refetch: Duration,
}
138

            
139
impl Default for BridgeDescDownloadConfig {
140
20
    fn default() -> Self {
141
20
        let secs = Duration::from_secs;
142
20
        BridgeDescDownloadConfig {
143
20
            parallelism: 4.try_into().expect("parallelism is zero"),
144
20
            retry: secs(30),
145
20
            prefetch: secs(1000),
146
20
            min_refetch: secs(3600),
147
20
            max_refetch: secs(3600 * 3), // matches C Tor behaviour
148
20
        }
149
20
    }
150
}
151

            
152
/// Mockable internal methods for within the `BridgeDescMgr`
///
/// Implemented for `()`, meaning "do not use mocks: use the real versions of everything".
///
/// This (`()`) is the default for the type parameter in
/// [`BridgeDescMgr`],
/// and it is the only publicly available implementation,
/// since this trait is sealed.
pub trait Mockable<R>: mockable::MockableAPI<R> {}
impl<R: Runtime> Mockable<R> for () {}
162

            
163
/// Private module which seals [`Mockable`]
/// by containing [`MockableAPI`](mockable::MockableAPI)
mod mockable {
    use super::*;

    /// Defines the actual mockable APIs
    ///
    /// Not nameable (and therefore not implementable)
    /// outside the `bridgedesc` module.
    #[async_trait]
    pub trait MockableAPI<R>: Clone + Send + Sync + 'static {
        /// Circuit manager
        type CircMgr: Send + Sync + 'static;

        /// Download this bridge's descriptor, and return it as a string
        ///
        /// Runs in a task.
        /// Called by `Manager::download_descriptor`, which handles parsing and validation.
        ///
        /// If `if_modified_since` is `Some`,
        /// should tolerate an HTTP 304 Not Modified and return `None` in that case.
        /// If `if_modified_since` is `None`, returning `Ok(None)` is forbidden.
        async fn download(
            self,
            runtime: &R,
            circmgr: &Self::CircMgr,
            bridge: &BridgeConfig,
            if_modified_since: Option<SystemTime>,
        ) -> Result<Option<String>, Error>;
    }
}
194
#[async_trait]
impl<R: Runtime> mockable::MockableAPI<R> for () {
    type CircMgr = Arc<CircMgr<R>>;

    /// Actual code for downloading a descriptor document
    ///
    /// Builds a directory circuit to the bridge itself, opens a directory
    /// stream over it, and requests the router's own descriptor.
    async fn download(
        self,
        runtime: &R,
        circmgr: &Self::CircMgr,
        bridge: &BridgeConfig,
        _if_modified_since: Option<SystemTime>,
    ) -> Result<Option<String>, Error> {
        // TODO actually support _if_modified_since
        let tunnel = circmgr.get_or_launch_dir_specific(bridge).await?;
        let mut stream = tunnel
            .begin_dir_stream()
            .await
            .map_err(Error::StreamFailed)?;
        let request = tor_dirclient::request::RoutersOwnDescRequest::new();
        let response = tor_dirclient::send_request(runtime, &request, &mut stream, None)
            .await
            .map_err(|dce| match dce {
                tor_dirclient::Error::RequestFailed(re) => Error::RequestFailed(re),
                // We expect send_request to report failures as RequestFailed;
                // any other variant here is treated as an internal bug.
                _ => internal!(
                    "tor_dirclient::send_request gave non-RequestFailed {:?}",
                    dce
                )
                .into(),
            })?;
        let output = response.into_output_string()?;
        // We never pass if_modified_since yet, so a document is always returned.
        Ok(Some(output))
    }
}
227

            
228
/// The actual manager.
///
/// Shared (via `Arc`) between the public [`BridgeDescMgr`] handle
/// and the background tasks it spawns.
struct Manager<R: Runtime, M: Mockable<R>> {
    /// The mutable state
    state: Mutex<State>,

    /// Runtime, used for tasks and sleeping
    runtime: R,

    /// Circuit manager, used for creating circuits
    circmgr: M::CircMgr,

    /// Persistent state store
    store: Arc<Mutex<DynStore>>,

    /// Mock for testing, usually `()`
    mockable: M,
}
245

            
246
/// State: our downloaded descriptors (cache), and records of what we're doing
///
/// Various functions (both tasks and public entrypoints),
/// which generally start with a `Manager`,
/// lock the mutex and modify this.
///
/// Generally, the flow is:
///
///  * A public entrypoint, or task, obtains a [`StateGuard`].
///    It modifies the state to represent the callers' new requirements,
///    or things it has done, by updating the state,
///    preserving the invariants but disturbing the "liveness" (see below).
///
///  * [`StateGuard::drop`] calls [`State::process`].
///    This restores the liveness properties.
///
/// ### Possible states of a bridge:
///
/// A bridge can be in one of the following states,
/// represented by its presence in these particular data structures inside `State`:
///
///  * `running`/`queued`: newly added, no outcome yet.
///  * `current` + `running`/`queued`: we are fetching (or going to)
///  * `current = OK` + `refetch_schedule`: fetched OK, will refetch before expiry
///  * `current = Err` + `retry_schedule`: failed, will retry at some point
///
/// ### Invariants:
///
/// Can be disrupted in the middle of a principal function,
/// but should be restored on return.
///
/// * **Tracked**:
///   Each bridge appears at most once in
///   `running`, `queued`, `refetch_schedule` and `retry_schedule`.
///   We call such a bridge Tracked.
///
/// * **Current**
///   Every bridge in `current` is Tracked.
///   (But not every Tracked bridge is necessarily in `current`, yet.)
///
/// * **Schedules**
///   Every bridge in `refetch_schedule` or `retry_schedule` is also in `current`.
///
/// * **Input**:
///   Exactly each bridge that was passed to
///   the last call to [`set_bridges()`](BridgeDescMgr::set_bridges) is Tracked.
///   (If we encountered spawn failures, we treat this as trying to shut down,
///   so we cease attempts to get bridges, and discard the relevant state, violating this.)
///
/// * **Limit**:
///   `running` is capped at the effective parallelism: zero if we are dormant,
///   the configured parallelism otherwise.
///
/// ### Liveness properties:
///
/// These can be disrupted by any function which holds a [`StateGuard`].
/// Will be restored by [`process()`](State::process),
/// which is called when `StateGuard` is dropped.
///
/// Functions that take a `StateGuard` may disturb these invariants
/// and rely on someone else to restore them.
///
/// * **Running**:
///   If `queued` is nonempty, `running` is full.
///
/// * **Timeout**:
///   `earliest_timeout` is the earliest timeout in
///   either `retry_schedule` or `refetch_schedule`.
///   (Disturbances of this property which occur due to system time warps
///   are not necessarily detected and remedied in a timely way,
///   but will be remedied no later than after `max_refetch`.)
struct State {
    /// Our configuration
    config: Arc<BridgeDescDownloadConfig>,

    /// People who will be told when `current` changes.
    subscribers: FlagPublisher<BridgeDescEvent>,

    /// Our current idea of our output, which we give out handles onto.
    current: Arc<BridgeDescList>,

    /// Bridges whose descriptors we are currently downloading.
    running: HashMap<BridgeKey, RunningInfo>,

    /// Bridges which we want to download,
    /// but we're waiting for `running` to be less than `effective_parallelism()`.
    queued: VecDeque<QueuedEntry>,

    /// Are we dormant?
    dormancy: Dormancy,

    /// Bridges that we have a descriptor for,
    /// and when they should be refetched due to validity expiry.
    ///
    /// This is indexed by `SystemTime` because that helps avoid undesirable behaviors
    /// when the system clock changes.
    refetch_schedule: BinaryHeap<RefetchEntry<SystemTime, ()>>,

    /// Bridges that failed earlier, and when they should be retried.
    retry_schedule: BinaryHeap<RefetchEntry<Instant, RetryDelay>>,

    /// Earliest time from either `retry_schedule` or `refetch_schedule`
    ///
    /// `None` means "wait indefinitely".
    earliest_timeout: postage::watch::Sender<Option<Instant>>,
}
352

            
353
impl Debug for State {
354
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
355
        /// Helper to format one bridge entry somewhere
356
        fn fmt_bridge(
357
            f: &mut fmt::Formatter,
358
            b: &BridgeConfig,
359
            info: &(dyn Display + '_),
360
        ) -> fmt::Result {
361
            let info = info.to_string(); // fmt::Formatter doesn't enforce precision, so do this
362
            writeln!(f, "    {:80.80} | {}", info, b)
363
        }
364

            
365
        /// Helper to format one of the schedules
366
        fn fmt_schedule<TT: Ord + Copy + Debug, RD>(
367
            f: &mut fmt::Formatter,
368
            summary: &str,
369
            name: &str,
370
            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
371
        ) -> fmt::Result {
372
            writeln!(f, "  {}:", name)?;
373
            for b in schedule {
374
                fmt_bridge(f, &b.bridge, &format_args!("{} {:?}", summary, &b.when))?;
375
            }
376
            Ok(())
377
        }
378

            
379
        // We are going to have to go multi-line because of the bridge lines,
380
        // so do completely bespoke formatting rather than `std::fmt::DebugStruct`
381
        // or a derive.
382
        writeln!(f, "State {{")?;
383
        // We'd like to print earliest_timeout but watch::Sender::borrow takes &mut
384
        writeln!(f, "  earliest_timeout: ???, ..,")?;
385
        writeln!(f, "  current:")?;
386
        for (b, v) in &*self.current {
387
            fmt_bridge(
388
                f,
389
                b,
390
                &match v {
391
                    Err(e) => Cow::from(format!("C Err {}", e)),
392
                    Ok(_) => "C Ok".into(),
393
                },
394
            )?;
395
        }
396
        writeln!(f, "  running:")?;
397
        for b in self.running.keys() {
398
            fmt_bridge(f, b, &"R")?;
399
        }
400
        writeln!(f, "  queued:")?;
401
        for qe in &self.queued {
402
            fmt_bridge(f, &qe.bridge, &"Q")?;
403
        }
404
        fmt_schedule(f, "FS", "refetch_schedule", &self.refetch_schedule)?;
405
        fmt_schedule(f, "TS", "retry_schedule", &self.retry_schedule)?;
406
        write!(f, "}}")?;
407

            
408
        Ok(())
409
    }
410
}
411

            
412
/// Value of the entry in `running`
#[derive(Debug)]
struct RunningInfo {
    /// For cancelling downloads no longer wanted
    join: JoinHandle,

    /// If this previously failed, the persistent retry delay.
    retry_delay: Option<RetryDelay>,
}
421

            
422
/// Entry in `queued`
#[derive(Debug)]
struct QueuedEntry {
    /// The bridge to fetch
    bridge: BridgeKey,

    /// If this previously failed, the persistent retry delay.
    retry_delay: Option<RetryDelay>,
}
431

            
432
/// Entry in one of the `*_schedule`s
///
/// Implements `Ord` and `Eq` but *only looking at the refetch time*.
/// So don't deduplicate by `[Partial]Eq`, or use as a key in a map.
#[derive(Debug)]
struct RefetchEntry<TT, RD> {
    /// When we should requeue this bridge for fetching
    ///
    /// Either [`Instant`] (in `retry_schedule`) or [`SystemTime`] (in `refetch_schedule`).
    when: TT,

    /// The bridge to refetch
    bridge: BridgeKey,

    /// Retry delay
    ///
    /// `RetryDelay` if we previously failed (ie, if this is a retry entry);
    /// otherwise `()`.
    retry_delay: RD,
}
452

            
453
impl<TT: Ord, RD> Ord for RefetchEntry<TT, RD> {
454
122
    fn cmp(&self, other: &Self) -> Ordering {
455
122
        self.when.cmp(&other.when).reverse()
456
        // We don't care about the ordering of BridgeConfig or retry_delay.
457
        // Different BridgeConfig with the same fetch time will be fetched in "some order".
458
122
    }
459
}
460

            
461
impl<TT: Ord, RD> PartialOrd for RefetchEntry<TT, RD> {
462
122
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
463
122
        Some(self.cmp(other))
464
122
    }
465
}
466

            
467
impl<TT: Ord, RD> PartialEq for RefetchEntry<TT, RD> {
468
    fn eq(&self, other: &Self) -> bool {
469
        self.cmp(other) == Ordering::Equal
470
    }
471
}
472

            
473
impl<TT: Ord, RD> Eq for RefetchEntry<TT, RD> {}
474

            
475
/// Dummy task join handle
///
/// We would like to be able to cancel now-redundant downloads
/// using something like `tokio::task::JoinHandle::abort()`.
/// tor-rtcompat doesn't support that so we stub it for now.
///
/// Providing this stub means the place where the cancellation needs to take place
/// already has the appropriate call to our [`JoinHandle::abort`].
#[derive(Debug)]
struct JoinHandle;
485

            
486
impl JoinHandle {
    /// Would abort this async task, if we could do that.
    /// (Currently a no-op; see the type-level comment.)
    fn abort(&self) {}
}
490

            
491
impl<R: Runtime> BridgeDescMgr<R> {
    /// Create a new `BridgeDescMgr`
    ///
    /// This is the public constructor.
    //
    // TODO: That this constructor requires a DirMgr is rather odd.
    // In principle there is little reason why you need a DirMgr to make a BridgeDescMgr.
    // However, BridgeDescMgr needs a Store, and currently that is a private trait, and the
    // implementation is constructible only from the dirmgr's config.  This should probably be
    // tidied up somehow, at some point, perhaps by exposing `Store` and its configuration.
    pub fn new(
        config: &BridgeDescDownloadConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<tor_circmgr::CircMgr<R>>,
        dormancy: Dormancy,
    ) -> Result<Self, StartupError> {
        // Delegate to the mockable constructor, with `()` meaning "no mocks".
        Self::new_internal(runtime, circmgr, store.store, config, dormancy, ())
    }
}
511

            
512
/// If download was successful, what we obtained
///
/// Generated by `process_document`, from a downloaded (or cached) textual descriptor.
#[derive(Debug)]
struct Downloaded {
    /// The bridge descriptor, fully parsed and verified
    desc: BridgeDesc,

    /// When we should start a refresh for this descriptor
    ///
    /// This is derived from the expiry time
    /// (and clamped according to limits in the configuration).
    refetch: SystemTime,
}
526

            
527
impl<R: Runtime, M: Mockable<R>> BridgeDescMgr<R, M> {
    /// Actual constructor, which takes a mockable
    //
    // Allow passing `runtime` by value, which is usual API for this kind of setup function.
    #[allow(clippy::needless_pass_by_value)]
    fn new_internal(
        runtime: R,
        circmgr: M::CircMgr,
        store: Arc<Mutex<DynStore>>,
        config: &BridgeDescDownloadConfig,
        dormancy: Dormancy,
        mockable: M,
    ) -> Result<Self, StartupError> {
        /// Convenience alias
        fn default<T: Default>() -> T {
            Default::default()
        }

        let config = config.clone().into();
        // The watch channel links `State.earliest_timeout` (sender) to the
        // timeout task (receiver) spawned below.
        let (earliest_timeout, timeout_update) = postage::watch::channel();

        let state = Mutex::new(State {
            config,
            subscribers: default(),
            current: default(),
            running: default(),
            queued: default(),
            dormancy,
            retry_schedule: default(),
            refetch_schedule: default(),
            earliest_timeout,
        });
        let mgr = Arc::new(Manager {
            state,
            runtime: runtime.clone(),
            circmgr,
            store,
            mockable,
        });

        // Spawn the timeout task.  It holds only a `Weak` reference to the
        // manager, so it does not keep the manager alive on its own.
        runtime
            .spawn(timeout_task(
                runtime.clone(),
                Arc::downgrade(&mgr),
                timeout_update,
            ))
            .map_err(|cause| StartupError::Spawn {
                spawning: "timeout task",
                cause: cause.into(),
            })?;

        Ok(BridgeDescMgr { mgr })
    }

    /// Consistency check convenience wrapper
    #[cfg(test)]
    fn check_consistency<'i, I>(&self, input_bridges: Option<I>)
    where
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        self.mgr
            .lock_only()
            .check_consistency(&self.mgr.runtime, input_bridges);
    }

    /// Set whether this `BridgeDescMgr` is active
    // TODO this should instead be handled by a central mechanism; see TODO on Dormancy
    pub fn set_dormancy(&self, dormancy: Dormancy) {
        // `lock_then_process` runs `State::process` on drop, so the new
        // dormancy takes effect (e.g. launching queued downloads) immediately.
        self.mgr.lock_then_process().dormancy = dormancy;
    }
}
598

            
599
impl<R: Runtime, M: Mockable<R>> BridgeDescProvider for BridgeDescMgr<R, M> {
    fn bridges(&self) -> Arc<BridgeDescList> {
        self.mgr.lock_only().current.clone()
    }

    fn events(&self) -> BoxStream<'static, BridgeDescEvent> {
        let stream = self.mgr.lock_only().subscribers.subscribe();
        Box::pin(stream) as _
    }

    fn set_bridges(&self, new_bridges: &[BridgeConfig]) {
        /// Helper: Called for each bridge that is currently Tracked.
        ///
        /// Checks if `new_bridges` has `bridge`.  If so, removes it from `new_bridges`,
        /// and returns `true`, indicating that this bridge should be kept.
        ///
        /// If not, returns `false`, indicating that this bridge should be removed,
        /// and logs a message.
        fn note_found_keep_p(
            new_bridges: &mut HashSet<BridgeKey>,
            bridge: &BridgeKey,
            was_state: &str,
        ) -> bool {
            let keep = new_bridges.remove(bridge);
            if !keep {
                debug!(r#"forgetting bridge ({}) "{}""#, was_state, bridge);
            }
            keep
        }

        /// Helper: filters `*_schedule` so that it contains only things in `new_bridges`,
        /// removing them as we go.
        fn filter_schedule<TT: Ord + Copy, RD>(
            new_bridges: &mut HashSet<BridgeKey>,
            schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
            was_state: &str,
        ) {
            schedule.retain(|b| note_found_keep_p(new_bridges, &b.bridge, was_state));
        }

        let mut state = self.mgr.lock_then_process();
        let state = &mut **state;

        // We go through our own data structures, comparing them with `new_bridges`.
        // Entries in our own structures that aren't in `new_bridges` are removed.
        // Entries that *are* are removed from `new_bridges`.
        // Eventually `new_bridges` is just the list of new bridges to *add*.
        let mut new_bridges: HashSet<_> = new_bridges.iter().cloned().collect();

        // Is there anything in `current` that ought to be deleted?
        if state.current.keys().any(|b| !new_bridges.contains(b)) {
            // Found a bridge in `current` but not `new`.
            // We need to remove it (and any others like it) from `current`.
            //
            // Disturbs the invariant *Schedules*:
            // After this maybe the schedules have entries they shouldn't.
            let current: BridgeDescList = state
                .current
                .iter()
                .filter(|(b, _)| new_bridges.contains(&**b))
                .map(|(b, v)| (b.clone(), v.clone()))
                .collect();
            state.set_current_and_notify(current);
        } else {
            // Nothing is being removed, so we can keep `current`.
        }
        // Bridges being newly requested will be added to `current`
        // later, after they have been fetched.

        // Is there anything in running we should abort?
        state.running.retain(|b, ri| {
            let keep = note_found_keep_p(&mut new_bridges, b, "was downloading");
            if !keep {
                ri.join.abort();
            }
            keep
        });

        // Is there anything in queued we should forget about?
        state
            .queued
            .retain(|qe| note_found_keep_p(&mut new_bridges, &qe.bridge, "was queued"));

        // Restore the invariant *Schedules*, that the schedules contain only things in current,
        // by removing the same things from the schedules that we earlier removed from current.
        filter_schedule(
            &mut new_bridges,
            &mut state.retry_schedule,
            "previously failed",
        );
        filter_schedule(
            &mut new_bridges,
            &mut state.refetch_schedule,
            "previously downloaded",
        );

        // OK now we have the list of bridges to add (if any).
        state.queued.extend(new_bridges.into_iter().map(|bridge| {
            debug!(r#" added bridge, queueing for download "{}""#, &bridge);
            QueuedEntry {
                bridge,
                retry_delay: None,
            }
        }));

        // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
        // to make further progress and restore the liveness properties.
    }
}
708

            
709
impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
    /// Obtain a lock on state, for functions that want to disrupt liveness properties
    ///
    /// When `StateGuard` is dropped, the liveness properties will be restored
    /// by making whatever progress is required.
    ///
    /// See [`State`].
    fn lock_then_process<'s>(self: &'s Arc<Self>) -> StateGuard<'s, R, M> {
        StateGuard {
            state: self.lock_only(),
            mgr: self,
        }
    }

    /// Obtains the lock on state.
    ///
    /// Caller ought not to modify state
    /// so as to invalidate invariants or liveness properties.
    /// Callers which are part of the algorithms in this crate
    /// ought to consider [`lock_then_process`](Manager::lock_then_process) instead.
    fn lock_only(&self) -> MutexGuard<State> {
        // Panicking here is deliberate: a poisoned mutex means another task
        // panicked while holding the state lock, so the state may be invalid.
        self.state.lock().expect("bridge desc manager poisoned")
    }
}
733

            
734
/// Writeable reference to [`State`], entitling the holder to disrupt liveness properties.
///
/// The holder must still maintain the invariants.
///
/// Obtained from [`Manager::lock_then_process`].  See [`State`].
#[derive(Educe, Deref, DerefMut)]
#[educe(Debug)]
struct StateGuard<'s, R: Runtime, M: Mockable<R>> {
    /// Reference to the mutable state
    #[deref]
    #[deref_mut]
    state: MutexGuard<'s, State>,

    /// Reference to the outer container
    ///
    /// Allows the holder to obtain a `'static` (owned) handle `Arc<Manager>`,
    /// for use by spawned tasks.
    #[educe(Debug(ignore))]
    mgr: &'s Arc<Manager<R, M>>,
}
754

            
755
impl<R: Runtime, M: Mockable<R>> Drop for StateGuard<'_, R, M> {
    /// Restore the liveness properties on unlock, by running [`State::process`].
    fn drop(&mut self) {
        self.state.process(self.mgr);
    }
}
760

            
761
impl State {
762
    /// Ensure progress is made, by restoring all the liveness invariants
763
    ///
764
    /// This includes launching circuits as needed.
765
166
    fn process<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
766
        // Restore liveness property *Running*
767
166
        self.consider_launching(mgr);
768

            
769
166
        let now_wall = mgr.runtime.wallclock();
770

            
771
        // Mitigate clock warping
772
        //
773
        // If the earliest `SystemTime` is more than `max_refetch` away,
774
        // the clock must have warped.  If that happens we clamp
775
        // them all to `max_refetch`.
776
        //
777
        // (This is not perfect but will mitigate the worst effects by ensuring
778
        // that we do *something* at least every `max_refetch`, in the worst case,
779
        // other than just getting completely stuck.)
780
166
        let max_refetch_wall = now_wall + self.config.max_refetch;
781
166
        if self
782
166
            .refetch_schedule
783
166
            .peek()
784
166
            .map(|re| re.when > max_refetch_wall)
785
166
            == Some(true)
786
        {
787
            info!("bridge descriptor manager: clock warped, clamping refetch times");
788
            self.refetch_schedule = self
789
                .refetch_schedule
790
                .drain()
791
                .map(|mut re| {
792
                    re.when = max_refetch_wall;
793
                    re
794
                })
795
                .collect();
796
166
        }
797

            
798
        // Restore liveness property *Timeout**
799
        // postage::watch will tell up the timeout task about the new wake-up time.
800
166
        let new_earliest_timeout = [
801
            // First retry.  These are std Instant.
802
166
            self.retry_schedule.peek().map(|re| re.when),
803
            // First refetch.  These are SystemTime, so we must convert them.
804
166
            self.refetch_schedule.peek().map(|re| {
805
                // If duration_since gives Err, that means when is before now,
806
                // ie we should not be waiting: the wait duration should be 0.
807
106
                let wait = re.when.duration_since(now_wall).unwrap_or_default();
808

            
809
106
                mgr.runtime.now() + wait
810
106
            }),
811
        ]
812
166
        .into_iter()
813
166
        .flatten()
814
166
        .min();
815
166
        *self.earliest_timeout.borrow_mut() = new_earliest_timeout;
816
166
    }
817

            
818
    /// Launch download attempts if we can
819
    ///
820
    /// Specifically: if we have things in `queued`, and `running` is shorter than
821
    /// `effective_parallelism()`, we launch task(s) to attempt download(s).
822
    ///
823
    /// Restores liveness invariant *Running*.
824
    ///
825
    /// Idempotent.  Forms part of `process`.
826
    #[allow(clippy::blocks_in_conditions)]
827
166
    fn consider_launching<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
828
166
        let mut to_remove = vec![];
829

            
830
258
        while self.running.len() < self.effective_parallelism() {
831
            let QueuedEntry {
832
92
                bridge,
833
92
                retry_delay,
834
210
            } = match self.queued.pop_front() {
835
92
                Some(qe) => qe,
836
118
                None => break,
837
            };
838
92
            match mgr
839
92
                .runtime
840
92
                .spawn({
841
92
                    let config = self.config.clone();
842
92
                    let bridge = bridge.clone();
843
92
                    let inner = mgr.clone();
844
92
                    let mockable = inner.mockable.clone();
845

            
846
                    // The task which actually downloads a descriptor.
847
92
                    async move {
848
92
                        let got =
849
92
                            AssertUnwindSafe(inner.download_descriptor(mockable, &bridge, &config))
850
92
                                .catch_unwind()
851
92
                                .await
852
92
                                .unwrap_or_else(|_| {
853
                                    Err(internal!("download descriptor task panicked!").into())
854
                                });
855
92
                        match &got {
856
28
                            Ok(_) => debug!(r#"download succeeded for "{}""#, bridge),
857
64
                            Err(err) => debug!(r#"download failed for "{}": {}"#, bridge, err),
858
                        };
859
92
                        let mut state = inner.lock_then_process();
860
92
                        state.record_download_outcome(bridge, got);
861
                        // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
862
                        // to make further progress and restore the liveness properties.
863
92
                    }
864
                })
865
92
                .map(|()| JoinHandle)
866
            {
867
92
                Ok(join) => {
868
92
                    self.running
869
92
                        .insert(bridge, RunningInfo { join, retry_delay });
870
92
                }
871
                Err(_) => {
872
                    // Spawn failed.
873
                    //
874
                    // We are going to forget about this bridge.
875
                    // And we're going to do that without notifying anyone.
876
                    // We *do* want to remove it from `current` because simply forgetting
877
                    // about a refetch could leave expired data there.
878
                    // We amortize this, so we don't do a lot of O(n^2) work on shutdown.
879
                    to_remove.push(bridge);
880
                }
881
            }
882
        }
883

            
884
166
        if !to_remove.is_empty() {
885
            self.modify_current(|current| {
886
                for bridge in to_remove {
887
                    current.remove(&bridge);
888
                }
889
            });
890
166
        }
891
166
    }
892

            
893
    /// Modify `current` and notify subscribers
894
    ///
895
    /// Helper function which modifies only `current`, not any of the rest of the state.
896
    /// it is the caller's responsibility to ensure that the invariants are upheld.
897
    ///
898
    /// The implementation actually involves cloning `current`,
899
    /// so it is best to amortize calls to this function.
900
92
    fn modify_current<T, F: FnOnce(&mut BridgeDescList) -> T>(&mut self, f: F) -> T {
901
92
        let mut current = (*self.current).clone();
902
92
        let r = f(&mut current);
903
92
        self.set_current_and_notify(current);
904
92
        r
905
92
    }
906

            
907
    /// Set `current` to a value and notify
908
    ///
909
    /// Helper function which modifies only `current`, not any of the rest of the state.
910
    /// it is the caller's responsibility to ensure that the invariants are upheld.
911
108
    fn set_current_and_notify<BDL: Into<Arc<BridgeDescList>>>(&mut self, new: BDL) {
912
108
        self.current = new.into();
913
108
        self.subscribers.publish(BridgeDescEvent::SomethingChanged);
914
108
    }
915

            
916
    /// Obtain the currently-desired level of parallelism
917
    ///
918
    /// Helper function.  The return value depends the mutable state and also the `config`.
919
    ///
920
    /// This is how we implement dormancy.
921
350
    fn effective_parallelism(&self) -> usize {
922
350
        match self.dormancy {
923
342
            Dormancy::Active => usize::from(u8::from(self.config.parallelism)),
924
8
            Dormancy::Dormant => 0,
925
        }
926
350
    }
927
}
928

            
929
impl<R: Runtime, M: Mockable<R>> StateGuard<'_, R, M> {
930
    /// Record a download outcome.
931
    ///
932
    /// Final act of the descriptor download task.
933
    /// `got` is from [`download_descriptor`](Manager::download_descriptor).
934
92
    fn record_download_outcome(&mut self, bridge: BridgeKey, got: Result<Downloaded, Error>) {
935
92
        let RunningInfo { retry_delay, .. } = match self.running.remove(&bridge) {
936
92
            Some(ri) => ri,
937
            None => {
938
                debug!("bridge descriptor download completed for no-longer-configured bridge");
939
                return;
940
            }
941
        };
942

            
943
92
        let insert = match got {
944
28
            Ok(Downloaded { desc, refetch }) => {
945
                // Successful download.  Schedule the refetch, and we'll insert Ok.
946

            
947
28
                self.refetch_schedule.push(RefetchEntry {
948
28
                    when: refetch,
949
28
                    bridge: bridge.clone(),
950
28
                    retry_delay: (),
951
28
                });
952

            
953
28
                Ok(desc)
954
            }
955
64
            Err(err) => {
956
                // Failed.  Schedule the retry, and we'll insert Err.
957

            
958
64
                let mut retry_delay =
959
64
                    retry_delay.unwrap_or_else(|| RetryDelay::from_duration(self.config.retry));
960

            
961
64
                let retry = err.retry_time();
962
                // We retry at least as early as
963
64
                let now = self.mgr.runtime.now();
964
64
                let retry = retry.absolute(now, || retry_delay.next_delay(&mut rand::rng()));
965
                // Retry at least as early as max_refetch.  That way if a bridge is
966
                // misconfigured we will see it be fixed eventually.
967
64
                let retry = {
968
64
                    let earliest = now;
969
64
                    let latest = || now + self.config.max_refetch;
970
64
                    match retry {
971
                        AbsRetryTime::Immediate => earliest,
972
                        AbsRetryTime::Never => latest(),
973
64
                        AbsRetryTime::At(i) => i.clamp(earliest, latest()),
974
                    }
975
                };
976
64
                self.retry_schedule.push(RefetchEntry {
977
64
                    when: retry,
978
64
                    bridge: bridge.clone(),
979
64
                    retry_delay,
980
64
                });
981

            
982
64
                Err(Box::new(err) as _)
983
            }
984
        };
985

            
986
92
        self.modify_current(|current| current.insert(bridge, insert));
987
92
    }
988
}
989

            
990
impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
991
    /// Downloads a descriptor.
992
    ///
993
    /// The core of the descriptor download task
994
    /// launched by `State::consider_launching`.
995
    ///
996
    /// Uses Mockable::download to actually get the document.
997
    /// So most of this function is parsing and checking.
998
    ///
999
    /// The returned value is precisely the `got` input to
    /// [`record_download_outcome`](StateGuard::record_download_outcome).
92
    async fn download_descriptor(
92
        &self,
92
        mockable: M,
92
        bridge: &BridgeConfig,
92
        config: &BridgeDescDownloadConfig,
92
    ) -> Result<Downloaded, Error> {
        // convenience alias, capturing the usual parameters from our variables.
92
        let process_document = |text| process_document(&self.runtime, config, text);
116
        let store = || {
116
            self.store
116
                .lock()
116
                .map_err(|_| internal!("bridge descriptor store poisoned"))
116
        };
92
        let cache_entry: Option<CachedBridgeDescriptor> = (|| store()?.lookup_bridgedesc(bridge))()
92
            .unwrap_or_else(|err| {
                error_report!(
                    err,
                    r#"bridge descriptor cache lookup failed, for "{}""#,
                    sensitive(bridge),
                );
                None
            });
92
        let now = self.runtime.wallclock();
92
        let cached_good: Option<Downloaded> = if let Some(cached) = &cache_entry {
16
            if cached.fetched > now {
                // was fetched "in the future"
                None
            } else {
                // let's see if it's any use
16
                match process_document(&cached.document) {
4
                    Err(err) => {
                        // We had a doc in the cache but our attempt to use it failed
                        // We wouldn't have written a bad cache entry.
                        // So one of the following must be true:
                        //  * We were buggy or are stricter now or something
                        //  * The document was valid but its validity time has expired
                        // In any case we can't reuse it.
                        // (This happens in normal operation, when a document expires.)
4
                        trace!(r#"cached document for "{}" invalid: {}"#, &bridge, err);
4
                        None
                    }
12
                    Ok(got) => {
                        // The cached document looks valid.
                        // But how long ago did we fetch it?
                        // We need to enforce max_refresh even for still-valid documents.
12
                        if now.duration_since(cached.fetched).ok() <= Some(config.max_refetch) {
                            // Was fetched recently, too.  We can just reuse it.
4
                            return Ok(got);
8
                        }
8
                        Some(got)
                    }
                }
            }
        } else {
76
            None
        };
        // If cached_good is Some, we found a plausible cache entry; if we got here, it was
        // past its max_refresh.  So in that case we want to send a request with
        // if-modified-since.  If we get Not Modified, we can reuse it (and update the fetched time).
88
        let if_modified_since = cached_good
88
            .as_ref()
88
            .map(|got| got.desc.as_ref().published());
88
        debug!(
            r#"starting download for "{}"{}"#,
            bridge,
            match if_modified_since {
                Some(ims) => format!(
                    " if-modified-since {}",
                    humantime::format_rfc3339_seconds(ims),
                ),
                None => "".into(),
            }
        );
88
        let text = mockable
88
            .clone()
88
            .download(&self.runtime, &self.circmgr, bridge, if_modified_since)
88
            .await?;
24
        let (document, got) = if let Some(text) = text {
20
            let got = process_document(&text)?;
20
            (text, got)
4
        } else if let Some(cached) = cached_good {
4
            (
4
                cache_entry
4
                    .expect("cached_good but not cache_entry")
4
                    .document,
4
                cached,
4
            )
        } else {
            return Err(internal!("download gave None but no if-modified-since").into());
        };
        // IEFI catches cache store errors, which we log but don't do anything else with
24
        (|| {
24
            let cached = CachedBridgeDescriptor {
24
                document,
24
                fetched: now, // this is from before we started the fetch, which is correct
24
            };
            // Calculate when the cache should forget about this.
            // We want to add a bit of slop for the purposes of mild clock skew handling,
            // etc., and the prefetch time is a good proxy for that.
24
            let until = got
24
                .refetch
24
                .checked_add(config.prefetch)
24
                .unwrap_or(got.refetch /*uh*/);
24
            store()?.store_bridgedesc(bridge, cached, until)?;
24
            Ok(())
        })()
24
        .unwrap_or_else(|err: crate::Error| {
            error_report!(err, "failed to cache downloaded bridge descriptor",);
        });
24
        Ok(got)
92
    }
}
/// Processes and analyses a textual descriptor document into a `Downloaded`
///
/// Parses it, checks the signature, checks the document validity times,
/// and if that's all good, calculates when will want to refetch it.
64
fn process_document<R: Runtime>(
64
    runtime: &R,
64
    config: &BridgeDescDownloadConfig,
64
    text: &str,
64
) -> Result<Downloaded, Error> {
64
    let desc = RouterDesc::parse(text)?;
    // We *could* just trust this because we have trustworthy provenance
    // we know that the channel machinery authenticated the identity keys in `bridge`.
    // But let's do some cross-checking anyway.
    // `check_signature` checks the self-signature.
60
    let desc = desc.check_signature().map_err(Arc::new)?;
56
    let now = runtime.wallclock();
56
    desc.is_valid_at(&now)?;
    // Justification that use of "dangerously" is correct:
    // 1. We have checked this just above, so it is valid now.
    // 2. We are extracting the timeout and implement our own refetch logic using expires.
48
    let (desc, (_, expires)) = desc.dangerously_into_parts();
    // Our refetch schedule, and enforcement of descriptor expiry, is somewhat approximate.
    // The following situations can result in a nominally-expired descriptor being used:
    //
    // 1. We primarily enforce the timeout by looking at the expiry time,
    //    subtracting a configured constant, and scheduling the start of a refetch then.
    //    If it takes us longer to do the retry, than the prefetch constant,
    //    we'll still be providing the old descriptor to consumers in the meantime.
    //
    // 2. We apply a minimum time before we will refetch a descriptor.
    //    So if the validity time is unreasonably short, we'll use it beyond that time.
    //
    // 3. Clock warping could confuse this algorithm.  This is inevitable because we
    //    are relying on calendar times (SystemTime) in the descriptor, and because
    //    we don't have a mechanism for being told about clock warps rather than the
    //    passage of time.
    //
    // We think this is all OK given that a bridge descriptor is used for trying to
    // connect to the bridge itself.  In particular, we don't want to completely trust
    // bridges to control our retry logic.
48
    let refetch = match expires {
48
        Some(expires) => expires
48
            .checked_sub(config.prefetch)
48
            .ok_or(Error::ExtremeValidityTime)?,
        None => now
            .checked_add(config.max_refetch)
            .ok_or(Error::ExtremeValidityTime)?,
    };
48
    let refetch = refetch.clamp(now + config.min_refetch, now + config.max_refetch);
48
    let desc = BridgeDesc::new(Arc::new(desc));
48
    Ok(Downloaded { desc, refetch })
64
}
/// Task which waits for the timeout, and requeues bridges that need to be refetched
///
/// This task's job is to execute the wakeup instructions provided via `updates`.
///
/// `updates` is the receiving end of [`State`]'s `earliest_timeout`,
/// which is maintained to be the earliest time any of the schedules says we should wake up
/// (liveness property *Timeout*).
16
async fn timeout_task<R: Runtime, M: Mockable<R>>(
16
    runtime: R,
16
    inner: Weak<Manager<R, M>>,
16
    update: postage::watch::Receiver<Option<Instant>>,
16
) {
    /// Requeue things in `*_schedule` whose time for action has arrived
    ///
    /// `retry_delay_map` converts `retry_delay` from the schedule (`RetryDelay` or `()`)
    /// into the `Option` which appears in [`QueuedEntry`].
    ///
    /// Helper function.  Idempotent.
44
    fn requeue_as_required<TT: Ord + Copy + Debug, RD, RDM: Fn(RD) -> Option<RetryDelay>>(
44
        queued: &mut VecDeque<QueuedEntry>,
44
        schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
44
        now: TT,
44
        retry_delay_map: RDM,
44
    ) {
92
        while let Some(ent) = schedule.peek() {
54
            if ent.when > now {
6
                break;
48
            }
48
            let re = schedule.pop().expect("schedule became empty!");
48
            let bridge = re.bridge;
48
            let retry_delay = retry_delay_map(re.retry_delay);
48
            queued.push_back(QueuedEntry {
48
                bridge,
48
                retry_delay,
48
            });
        }
44
    }
16
    let mut next_wakeup = Some(runtime.now());
16
    let mut update = update.fuse();
    loop {
114
        select! {
            // Someone modified the schedules, and sent us a new earliest timeout
114
            changed = update.next() => {
                // changed is Option<Option< >>.
                // The outer Option is from the Stream impl for watch::Receiver - None means EOF.
                // The inner Option is Some(wakeup_time), or None meaning "wait indefinitely"
88
                next_wakeup = if let Some(changed) = changed {
76
                    changed
                } else {
                    // Oh, actually, the watch::Receiver is EOF - we're to shut down
12
                    break
                }
            },
            // Wait until the specified earliest wakeup time
92
            () = async {
92
                if let Some(next_wakeup) = next_wakeup {
70
                    let now = runtime.now();
70
                    if next_wakeup > now {
46
                        let duration = next_wakeup - now;
46
                        runtime.sleep(duration).await;
24
                    }
                } else {
                    #[allow(clippy::semicolon_if_nothing_returned)] // rust-clippy/issues/9729
22
                    { future::pending().await }
                }
114
            }.fuse() => {
                // We have reached the pre-programmed time.  Check what needs doing.
26
                let inner = if let Some(i) = inner.upgrade() { i } else { break; };
22
                let mut state = inner.lock_then_process();
22
                let state = &mut **state; // Do the DerefMut once so we can borrow fields
22
                requeue_as_required(
22
                    &mut state.queued,
22
                    &mut state.refetch_schedule,
22
                    runtime.wallclock(),
                    |()| None,
                );
22
                requeue_as_required(
22
                    &mut state.queued,
22
                    &mut state.retry_schedule,
22
                    runtime.now(),
                    Some,
                );
                // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
                // to make further progress and restore the liveness properties.
            }
        }
    }
16
}
/// Error which occurs during bridge descriptor manager startup
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum StartupError {
    /// No circuit manager in the directory manager
    ///
    /// The bridge descriptor manager downloads descriptors through circuits,
    /// so it cannot be built from a directory manager lacking a circuit manager.
    #[error(
        "tried to create bridge descriptor manager from directory manager with no circuit manager"
    )]
    MissingCircMgr,
    /// Unable to spawn task
    //
    // TODO lots of our Errors have a variant exactly like this.
    // Maybe we should make a struct tor_error::SpawnError.
    #[error("Unable to spawn {spawning}")]
    Spawn {
        /// What we were trying to spawn.
        spawning: &'static str,
        /// What happened when we tried to spawn it.
        #[source]
        cause: Arc<SpawnError>,
    },
}
impl HasKind for StartupError {
    fn kind(&self) -> ErrorKind {
        use ErrorKind as EK;
        use StartupError as SE;
        match self {
            // Spawn failures report the spawn error's own kind.
            SE::Spawn { cause, .. } => cause.kind(),
            // A missing circuit manager is a programming error on our side.
            SE::MissingCircMgr => EK::Internal,
        }
    }
}
/// An error which occurred trying to obtain the descriptor for a particular bridge
///
/// Instances of this are stored (boxed) in the published `BridgeDescList`,
/// so consumers can see why a particular bridge's descriptor is missing.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// Couldn't establish a circuit to the bridge
    #[error("Failed to establish circuit")]
    CircuitFailed(#[from] tor_circmgr::Error),
    /// Couldn't establish a directory stream to the bridge
    #[error("Failed to establish directory stream")]
    StreamFailed(#[source] tor_circmgr::Error),
    /// Directory request failed
    #[error("Directory request failed")]
    RequestFailed(#[from] tor_dirclient::RequestFailedError),
    /// Failed to parse descriptor in response
    #[error("Failed to parse descriptor in response")]
    ParseFailed(#[from] tor_netdoc::Error),
    /// Signature check failed
    ///
    /// (Wrapped in `Arc` because `signature::Error` is not `Clone`.)
    #[error("Signature check failed")]
    SignatureCheckFailed(#[from] Arc<signature::Error>),
    /// Obtained descriptor but it is outside its validity time
    #[error("Descriptor is outside its validity time, as supplied")]
    BadValidityTime(#[from] tor_checkable::TimeValidityError),
    /// A bridge descriptor has very extreme validity times
    /// such that our refetch time calculations overflow.
    #[error("Descriptor validity time range is too extreme for us to cope with")]
    ExtremeValidityTime,
    /// There was a programming error somewhere in our code, or the calling code.
    #[error("Programming error")]
    Bug(#[from] tor_error::Bug),
    /// Error used for testing
    #[cfg(test)]
    #[error("Error for testing, {0:?}, retry at {1:?}")]
    TestError(&'static str, RetryTime),
}
impl HasKind for Error {
    fn kind(&self) -> ErrorKind {
        use Error as E;
        use ErrorKind as EK;
        // Kind we report when the bridge itself misbehaves at the protocol level.
        let protocol_violation = EK::TorAccessFailed;
        match self {
            // We trust that tor_circmgr returns TorAccessFailed when it ought to.
            E::CircuitFailed(e) => e.kind(),
            E::StreamFailed(e) => e.kind(),
            E::RequestFailed(e) => e.kind(),
            E::ParseFailed(..) | E::SignatureCheckFailed(..) | E::ExtremeValidityTime => {
                protocol_violation
            }
            E::BadValidityTime(..) => EK::ClockSkew,
            E::Bug(e) => e.kind(),
            #[cfg(test)]
            E::TestError(..) => EK::Internal,
        }
    }
}
impl HasRetryTime for Error {
64
    fn retry_time(&self) -> RetryTime {
        use Error as E;
        use RetryTime as R;
64
        match self {
            // Errors with their own retry times
            E::CircuitFailed(e) => e.retry_time(),
            // Remote misbehavior, maybe the network is being strange?
            E::StreamFailed(..) => R::AfterWaiting,
            E::RequestFailed(..) => R::AfterWaiting,
            // Remote misconfiguration, detected *after* we successfully made the channel
            // (so not a network problem).  We'll say "never" for RetryTime,
            // even though actually we will in fact retry in at most `max_refetch`.
            E::ParseFailed(..) => R::Never,
            E::SignatureCheckFailed(..) => R::Never,
            E::BadValidityTime(..) => R::Never,
            E::ExtremeValidityTime => R::Never,
            // Probably, things are broken here, rather than remotely.
            E::Bug(..) => R::Never,
            #[cfg(test)]
64
            E::TestError(_, retry) => *retry,
        }
64
    }
}
// Marker impl: lets our `Error` be stored (boxed) as the per-bridge error
// in the published `BridgeDescList` (see `record_download_outcome`).
impl BridgeDescError for Error {}
impl State {
    /// Consistency check (for testing)
    ///
    /// `input` should be what was passed to `set_bridges` (or `None` if not known).
    ///
    /// Does not make any changes.
    /// Only takes `&mut` because postage::watch::Sender::borrow` wants it.
    #[cfg(test)]
92
    fn check_consistency<'i, R, I>(&mut self, runtime: &R, input: Option<I>)
92
    where
92
        R: Runtime,
92
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        /// Where we found a thing was Tracked
        #[derive(Debug, Clone, Copy, Eq, PartialEq)]
        enum Where {
            /// Found in `running`
            Running,
            /// Found in `queued`
            Queued,
            /// Found in the schedule `sch`
            Schedule {
                sch_name: &'static str,
                /// Starts out as `false`, set to `true` when we find this in `current`
                found_in_current: bool,
            },
        }
        /// Records the expected input from `input`, and what we have found so far
        struct Tracked {
            /// Were we told what the last `set_bridges` call got as input?
            known_input: bool,
            /// `Some` means we have seen this bridge in one our records (other than `current`)
            tracked: HashMap<BridgeKey, Option<Where>>,
            /// Earliest instant found in any schedule
            earliest: Option<Instant>,
        }
92
        let mut tracked = if let Some(input) = input {
520
            let tracked = input.into_iter().map(|b| (b.clone(), None)).collect();
92
            Tracked {
92
                tracked,
92
                known_input: true,
92
                earliest: None,
92
            }
        } else {
            Tracked {
                tracked: HashMap::new(),
                known_input: false,
                earliest: None,
            }
        };
        impl Tracked {
            /// Note that `bridge` is Tracked
520
            fn note(&mut self, where_: Where, b: &BridgeKey) {
520
                match self.tracked.get(b) {
                    // Invariant *Tracked* - ie appears at most once
                    Some(Some(prev_where)) => {
                        panic!("duplicate {:?} {:?} {:?}", prev_where, where_, b);
                    }
                    // Invariant *Input (every tracked bridge is was in input)*
                    None if self.known_input => {
                        panic!("unexpected {:?} {:?}", where_, b);
                    }
                    // OK, we've not seen it before, note it as being here
520
                    _ => {
520
                        self.tracked.insert(b.clone(), Some(where_));
520
                    }
                }
520
            }
        }
        /// Walk `schedule` and update `tracked` (including `tracked.earliest`)
        ///
        /// Check invariant *Tracked* and *Schedule* wrt this schedule.
        #[cfg(test)]
184
        fn walk_sch<TT: Ord + Copy + Debug, RD, CT: Fn(TT) -> Instant>(
184
            tracked: &mut Tracked,
184
            sch_name: &'static str,
184
            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
184
            conv_time: CT,
184
        ) {
184
            let where_ = Where::Schedule {
184
                sch_name,
184
                found_in_current: false,
184
            };
184
            if let Some(first) = schedule.peek() {
                // Of course this is a heap, so this ought to be a wasteful scan,
                // but, indirectly,this tests our implementation of `Ord` for `RefetchEntry`.
600
                for re in schedule {
446
                    tracked.note(where_, &re.bridge);
446
                }
154
                let scanned = schedule
154
                    .iter()
154
                    .map(|re| re.when)
154
                    .min()
154
                    .expect("schedule empty!");
154
                assert_eq!(scanned, first.when);
154
                tracked.earliest = Some(
154
                    [tracked.earliest, Some(conv_time(scanned))]
154
                        .into_iter()
154
                        .flatten()
154
                        .min()
154
                        .expect("flatten of chain Some was empty"),
154
                );
30
            }
184
        }
        // *Timeout* (prep)
        //
        // This will fail if there is clock skew, but won't mind if
        // the earliest refetch time is in the past.
92
        let now_wall = runtime.wallclock();
92
        let now_mono = runtime.now();
92
        let adj_wall = |wallclock: SystemTime| {
            // Good grief what a palaver!
78
            if let Ok(ahead) = wallclock.duration_since(now_wall) {
66
                now_mono + ahead
12
            } else if let Ok(behind) = now_wall.duration_since(wallclock) {
12
                now_mono
12
                    .checked_sub(behind)
12
                    .expect("time subtraction underflow")
            } else {
                panic!("times should be totally ordered!")
            }
78
        };
        // *Tracked*
        //
        // We walk our data structures in turn
92
        for b in self.running.keys() {
66
            tracked.note(Where::Running, b);
66
        }
100
        for qe in &self.queued {
8
            tracked.note(Where::Queued, &qe.bridge);
8
        }
92
        walk_sch(&mut tracked, "refetch", &self.refetch_schedule, adj_wall);
92
        walk_sch(&mut tracked, "retry", &self.retry_schedule, |t| t);
        // *Current*
476
        for b in self.current.keys() {
476
            let found = tracked
476
                .tracked
476
                .get_mut(b)
476
                .and_then(Option::as_mut)
476
                .unwrap_or_else(|| panic!("current but untracked {:?}", b));
            if let Where::Schedule {
446
                found_in_current, ..
476
            } = found
446
            {
446
                *found_in_current = true;
446
            }
        }
        // *Input (sense: every input bridge is tracked)*
        //
        // (Will not cope if spawn ever failed, since that violates the invariant.)
612
        for (b, where_) in &tracked.tracked {
520
            match where_ {
                None => panic!("missing {}", &b),
                Some(Where::Schedule {
446
                    sch_name,
446
                    found_in_current,
                }) => {
446
                    assert!(found_in_current, "not-Schedule {} {}", &b, sch_name);
                }
74
                _ => {}
            }
        }
        // *Limit*
92
        let parallelism = self.effective_parallelism();
92
        assert!(self.running.len() <= parallelism);
        // *Running*
92
        assert!(self.running.len() == parallelism || self.queued.is_empty());
        // *Timeout* (final)
92
        assert_eq!(tracked.earliest, *self.earliest_timeout.borrow());
92
    }
}