1
//! Implement Tor's sort-of-Pareto estimator for circuit build timeouts.
2
//!
3
//! Our build times don't truly follow a
4
//! [Pareto](https://en.wikipedia.org/wiki/Pareto_distribution)
5
//! distribution; instead they seem to be closer to a
6
//! [Fréchet](https://en.wikipedia.org/wiki/Fr%C3%A9chet_distribution)
7
//! distribution.  But those are hard to work with, and we only care
8
//! about the right tail, so we're using Pareto instead.
9
//!
10
//! This estimator also includes several heuristics and kludges to
11
//! try to behave better on unreliable networks.
12
//! For more information on the exact algorithms and their rationales,
13
//! see [`path-spec.txt`](https://gitlab.torproject.org/tpo/core/torspec/-/blob/master/path-spec.txt).
14

            
15
use serde::{Deserialize, Serialize};
16
use std::collections::{BTreeMap, HashMap, VecDeque};
17
use std::time::Duration;
18
use tor_netdir::params::NetParameters;
19

            
20
use super::Action;
21
use tor_persist::JsonValue;
22

            
23
/// Maximum number of circuit build time observations kept in the history.
const TIME_HISTORY_LEN: usize = 1_000;

/// Default number of recent circuit outcomes (success vs. timeout) to keep.
const SUCCESS_HISTORY_DEFAULT_LEN: usize = 20;

/// Width, in milliseconds, of each bucket in the build-time histogram.
const BUCKET_WIDTH_MSEC: u32 = 10;
32

            
33
/// A circuit build time or timeout duration, measured in milliseconds.
34
///
35
/// Requires that we don't care about tracking timeouts above u32::MAX
36
/// milliseconds (about 49 days).
37
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
38
#[serde(transparent)]
39
struct MsecDuration(u32);
40

            
41
impl MsecDuration {
42
    /// Convert a Duration into a MsecDuration, saturating
43
    /// extra-high values to u32::MAX milliseconds.
44
5160
    fn new_saturating(d: &Duration) -> Self {
45
5160
        let msec = std::cmp::min(d.as_millis(), u128::from(u32::MAX)) as u32;
46
5160
        MsecDuration(msec)
47
5160
    }
48
}
49

            
50
/// Module to hold const assertions.
51
///
52
/// This is a separate module so we can change the clippy warnings on it.
53
#[allow(clippy::checked_conversions)]
54
mod assertion {
55
    // If this assertion is untrue, then we can't safely use u16 fields in
56
    // time_histogram.
57
    const _: () = assert!(super::TIME_HISTORY_LEN <= u16::MAX as usize);
58
}
59

            
60
/// A history of circuit timeout observations, used to estimate our
61
/// likely circuit timeouts.
62
#[derive(Debug, Clone)]
63
struct History {
64
    /// Our most recent observed circuit construction times.
65
    ///
66
    /// For the purpose of this estimator, a circuit counts as
67
    /// "constructed" when a certain "significant" hop (typically the third)
68
    /// is completed.
69
    time_history: BoundedDeque<MsecDuration>,
70

            
71
    /// A histogram representation of the values in [`History::time_history`].
72
    ///
73
    /// This histogram is implemented as a sparse map from the center
74
    /// value of each histogram bucket to the number of entries in
75
    /// that bucket.  It is completely derivable from time_history; we
76
    /// keep it separate here for efficiency.
77
    time_histogram: BTreeMap<MsecDuration, u16>,
78

            
79
    /// Our most recent circuit timeout statuses.
80
    ///
81
    /// Each `true` value represents a successfully completed circuit
82
    /// (all hops).  Each `false` value represents a circuit that
83
    /// timed out after having completed at least one hop.
84
    success_history: BoundedDeque<bool>,
85
}
86

            
87
impl History {
88
    /// Initialize a new empty `History` with no observations.
89
486
    fn new_empty() -> Self {
90
486
        History {
91
486
            time_history: BoundedDeque::new(TIME_HISTORY_LEN),
92
486
            time_histogram: BTreeMap::new(),
93
486
            success_history: BoundedDeque::new(SUCCESS_HISTORY_DEFAULT_LEN),
94
486
        }
95
486
    }
96

            
97
    /// Remove all observations from this `History`.
98
6
    fn clear(&mut self) {
99
6
        self.time_history.clear();
100
6
        self.time_histogram.clear();
101
6
        self.success_history.clear();
102
6
    }
103

            
104
    /// Change the number of successes to record in our success
105
    /// history to `n`.
106
8
    fn set_success_history_len(&mut self, n: usize) {
107
8
        self.success_history.set_max_len(n);
108
8
    }
109

            
110
    /// Change the number of circuit time observations to record in
111
    /// our time history to `n`.
112
    ///
113
    /// This is a testing-only function.
114
    ///
115
    /// # Limitations
116
    ///
117
    /// This method doesn't update time_histogram based on removed entries.
118
    /// That doesn't matter for the tests that use it,
119
    /// but if we ever try to use it in production, we'll need to fix that.
120
    #[cfg(test)]
121
2
    fn set_time_history_len(&mut self, n: usize) {
122
2
        self.time_history.set_max_len(n);
123
2
    }
124

            
125
    /// Construct a new `History` from an iterator representing a sparse
126
    /// histogram of values.
127
    ///
128
    /// The input must be a sequence of `(D,N)` tuples, where each `D`
129
    /// represents a circuit build duration, and `N` represents the
130
    /// number of observations with that duration.
131
    ///
132
    /// These observations are shuffled into a random order, then
133
    /// added to a new History.
134
470
    fn from_sparse_histogram<I>(iter: I) -> Self
135
470
    where
136
470
        I: Iterator<Item = (MsecDuration, u16)>,
137
    {
138
        use rand::seq::{IteratorRandom, SliceRandom};
139
470
        let mut rng = rand::rng();
140

            
141
        // We want to build a vector with the elements of the old histogram in
142
        // random order, but we want to defend ourselves against bogus inputs
143
        // that would take too much RAM.
144
470
        let mut observations = iter
145
470
            .take(TIME_HISTORY_LEN) // limit number of bins
146
1056
            .flat_map(|(dur, n)| std::iter::repeat_n(dur, n as usize))
147
470
            .choose_multiple(&mut rng, TIME_HISTORY_LEN);
148
        // IteratorRand::choose_multiple doesn't guarantee anything about the order of its output.
149
470
        observations.shuffle(&mut rng);
150

            
151
470
        let mut result = History::new_empty();
152
4486
        for obs in observations {
153
4016
            result.add_time(obs);
154
4016
        }
155

            
156
470
        result
157
470
    }
158

            
159
    /// Return an iterator yielding a sparse histogram of the circuit build
160
    /// time values in this `History`.
161
    ///
162
    /// Each histogram entry is a `(D,N)` tuple, where `D` is the
163
    /// center of a histogram bucket, and `N` is the number of
164
    /// observations in that bucket.
165
    ///
166
    /// Buckets with `N=0` are omitted.  Buckets are yielded in order.
167
756
    fn sparse_histogram(&self) -> impl Iterator<Item = (MsecDuration, u16)> + '_ {
168
2599
        self.time_histogram.iter().map(|(d, n)| (*d, *n))
169
756
    }
170

            
171
    /// Return the center value for the bucket containing `time`.
172
8950
    fn bucket_center(time: MsecDuration) -> MsecDuration {
173
8950
        let idx = time.0 / BUCKET_WIDTH_MSEC;
174
8950
        let msec = (idx * BUCKET_WIDTH_MSEC) + (BUCKET_WIDTH_MSEC) / 2;
175
8950
        MsecDuration(msec)
176
8950
    }
177

            
178
    /// Increment the histogram bucket containing `time` by one.
179
8532
    fn inc_bucket(&mut self, time: MsecDuration) {
180
8532
        let center = History::bucket_center(time);
181
8532
        *self.time_histogram.entry(center).or_insert(0) += 1;
182
8532
    }
183

            
184
    /// Decrement the histogram bucket containing `time` by one, removing
185
    /// it if it becomes 0.
186
410
    fn dec_bucket(&mut self, time: MsecDuration) {
187
        use std::collections::btree_map::Entry;
188
410
        let center = History::bucket_center(time);
189
410
        match self.time_histogram.entry(center) {
190
2
            Entry::Vacant(_) => {
191
2
                // this is a bug.
192
2
            }
193
408
            Entry::Occupied(e) if e.get() <= &1 => {
194
4
                e.remove();
195
4
            }
196
404
            Entry::Occupied(mut e) => {
197
404
                *e.get_mut() -= 1;
198
404
            }
199
        }
200
410
    }
201

            
202
    /// Add `time` to our list of circuit build time observations, and
203
    /// adjust the histogram accordingly.
204
8516
    fn add_time(&mut self, time: MsecDuration) {
205
8516
        match self.time_history.push_back(time) {
206
8112
            None => {}
207
404
            Some(removed_time) => {
208
404
                // `removed_time` just fell off the end of the deque:
209
404
                // remove it from the histogram.
210
404
                self.dec_bucket(removed_time);
211
404
            }
212
        }
213
8516
        self.inc_bucket(time);
214
8516
    }
215

            
216
    /// Return the number of observations in our time history.
217
    ///
218
    /// This will always be `<= TIME_HISTORY_LEN`.
219
756
    fn n_times(&self) -> usize {
220
756
        self.time_history.len()
221
756
    }
222

            
223
    /// Record a success (true) or timeout (false) in our record of whether
224
    /// circuits timed out or not.
225
4932
    fn add_success(&mut self, succeeded: bool) {
226
4932
        self.success_history.push_back(succeeded);
227
4932
    }
228

            
229
    /// Return the number of timeouts recorded in our success history.
230
92
    fn n_recent_timeouts(&self) -> usize {
231
1224
        self.success_history.iter().filter(|x| !**x).count()
232
92
    }
233

            
234
    /// Helper: return the `n` most frequent histogram bins.
235
30
    fn n_most_frequent_bins(&self, n: usize) -> Vec<(MsecDuration, u16)> {
236
        use itertools::Itertools;
237
        // we use cmp::Reverse here so that we can use k_smallest as
238
        // if it were "k_largest".
239
        use std::cmp::Reverse;
240

            
241
        // We want the buckets that have the _largest_ counts; we want
242
        // to break ties in favor of the _smallest_ values.  So we
243
        // apply Reverse only to the counts before passing the tuples
244
        // to k_smallest.
245

            
246
30
        self.sparse_histogram()
247
1267
            .map(|(center, count)| (Reverse(count), center))
248
            // (k_smallest runs in O(n_bins * lg(n))
249
30
            .k_smallest(n)
250
99
            .map(|(Reverse(count), center)| (center, count))
251
30
            .collect()
252
30
    }
253

            
254
    /// Return an estimator for the `X_m` of our Pareto distribution,
255
    /// by looking at the `n_modes` most frequently filled histogram
256
    /// bins.
257
    ///
258
    /// It is not a true `X_m` value, since there are definitely
259
    /// values less than this, but it seems to work as a decent
260
    /// heuristic.
261
    ///
262
    /// Return `None` if we have no observations.
263
24
    fn estimate_xm(&self, n_modes: usize) -> Option<u32> {
264
        // From path-spec:
265
        //   Tor clients compute the Xm parameter using the weighted
266
        //   average of the midpoints of the 'cbtnummodes' (10)
267
        //   most frequently occurring 10ms histogram bins.
268

            
269
        // The most frequently used bins.
270
24
        let bins = self.n_most_frequent_bins(n_modes);
271
        // Total number of observations in these bins.
272
24
        let n_observations: u16 = bins.iter().map(|(_, n)| n).sum();
273
        // Sum of all observations in these bins.
274
24
        let total_observations: u64 = bins
275
24
            .iter()
276
82
            .map(|(d, n)| u64::from(d.0 * u32::from(*n)))
277
24
            .sum();
278

            
279
24
        if n_observations == 0 {
280
6
            None
281
        } else {
282
18
            Some((total_observations / u64::from(n_observations)) as u32)
283
        }
284
24
    }
285

            
286
    /// Compute a maximum-likelihood pareto distribution based on this
287
    /// history, computing `X_m` based on the `n_modes` most frequent
288
    /// histograms.
289
    ///
290
    /// Return None if we have no observations.
291
20
    fn pareto_estimate(&self, n_modes: usize) -> Option<ParetoDist> {
292
20
        let xm = self.estimate_xm(n_modes)?;
293

            
294
        // From path-spec:
295
        //     alpha = n/(Sum_n{ln(MAX(Xm, x_i))} - n*ln(Xm))
296

            
297
16
        let n = self.time_history.len();
298
16
        let sum_of_log_observations: f64 = self
299
16
            .time_history
300
16
            .iter()
301
10068
            .map(|m| f64::from(std::cmp::max(m.0, xm)).ln())
302
16
            .sum();
303
16
        let sum_of_log_xm = (n as f64) * f64::from(xm).ln();
304

            
305
        // We're computing 1/alpha here, instead of alpha.  This avoids
306
        // division by zero, and has the advantage of being what our
307
        // quantile estimator actually needs.
308
16
        let inv_alpha = (sum_of_log_observations - sum_of_log_xm) / (n as f64);
309

            
310
16
        Some(ParetoDist {
311
16
            x_m: f64::from(xm),
312
16
            inv_alpha,
313
16
        })
314
20
    }
315
}
316

            
317
/// A Pareto distribution, for use in estimating timeouts.
///
/// Values are represented by a number of milliseconds.
#[derive(Debug)]
struct ParetoDist {
    /// The lower bound for the pareto distribution.
    x_m: f64,
    /// The inverse of the alpha parameter in the pareto distribution.
    ///
    /// (We store 1/alpha rather than alpha to save a step in
    /// [`ParetoDist::quantile`].)
    inv_alpha: f64,
}
329

            
330
impl ParetoDist {
331
    /// Compute an inverse CDF for this distribution.
332
    ///
333
    /// Given a `q` value between 0 and 1, compute a distribution `v`
334
    /// value such that `q` of the Pareto Distribution is expected to
335
    /// be less than `v`.
336
    ///
337
    /// If `q` is out of bounds, it is clamped to [0.0, 1.0].
338
32
    fn quantile(&self, q: f64) -> f64 {
339
32
        let q = q.clamp(0.0, 1.0);
340
32
        self.x_m / ((1.0 - q).powf(self.inv_alpha))
341
32
    }
342
}
343

            
344
/// A set of parameters determining the behavior of a ParetoTimeoutEstimator.
///
/// These are typically derived from a set of consensus parameters.
#[derive(Clone, Debug)]
pub(crate) struct Params {
    /// Should we use our estimates when deciding on circuit timeouts?
    ///
    /// When this is false, our timeouts are fixed to the default.
    use_estimates: bool,
    /// How many observations must we have made before we can use our
    /// Pareto estimators to guess a good set of timeouts?
    min_observations: u16,
    /// Which hop is the "significant hop" we should use when recording circuit
    /// build times?  (Watch out! This is zero-indexed.)
    significant_hop: u8,
    /// A quantile (in range [0.0,1.0]) describing a point in the
    /// Pareto distribution to use when determining when a circuit
    /// should be treated as having "timed out".
    ///
    /// (A "timed out" circuit continues building for measurement
    /// purposes, but can't be used for traffic.)
    timeout_quantile: f64,
    /// A quantile (in range [0.0,1.0]) describing a point in the Pareto
    /// distribution to use when determining when a circuit should be
    /// "abandoned".
    ///
    /// (An "abandoned" circuit is stopped entirely, and not included
    /// in measurements.)
    abandon_quantile: f64,
    /// Default values to return from the `timeouts` function when we
    /// have no observations.
    default_thresholds: (Duration, Duration),
    /// Number of histogram buckets to use when determining the Xm estimate.
    ///
    /// (See [`History::estimate_xm`] for details.)
    n_modes_for_xm: usize,
    /// How many entries do we record in our success/timeout history?
    success_history_len: usize,
    /// How many timeouts should we allow in our success/timeout history
    /// before we assume that the network has changed in a way that makes
    /// our estimates completely wrong?
    reset_after_timeouts: usize,
    /// Minimum base timeout to ever infer or return.
    min_timeout: Duration,
}
389

            
390
impl Default for Params {
391
476
    fn default() -> Self {
392
476
        Params {
393
476
            use_estimates: true,
394
476
            min_observations: 100,
395
476
            significant_hop: 2,
396
476
            timeout_quantile: 0.80,
397
476
            abandon_quantile: 0.99,
398
476
            default_thresholds: (Duration::from_secs(60), Duration::from_secs(60)),
399
476
            n_modes_for_xm: 10,
400
476
            success_history_len: SUCCESS_HISTORY_DEFAULT_LEN,
401
476
            reset_after_timeouts: 18,
402
476
            min_timeout: Duration::from_millis(10),
403
476
        }
404
476
    }
405
}
406

            
407
impl From<&NetParameters> for Params {
408
6
    fn from(p: &NetParameters) -> Params {
409
        // Because of the underlying bounds, the "unwrap_or_else"
410
        // conversions here should be impossible, and the "as"
411
        // conversions should always be in-range.
412

            
413
6
        let timeout = p
414
6
            .cbt_initial_timeout
415
6
            .try_into()
416
6
            .unwrap_or_else(|_| Duration::from_secs(60));
417
6
        let learning_disabled: bool = p.cbt_learning_disabled.into();
418
        Params {
419
6
            use_estimates: !learning_disabled,
420
6
            min_observations: p.cbt_min_circs_for_estimate.get() as u16,
421
            significant_hop: 2,
422
6
            timeout_quantile: p.cbt_timeout_quantile.as_fraction(),
423
6
            abandon_quantile: p.cbt_abandon_quantile.as_fraction(),
424
6
            default_thresholds: (timeout, timeout),
425
6
            n_modes_for_xm: p.cbt_num_xm_modes.get() as usize,
426
6
            success_history_len: p.cbt_success_count.get() as usize,
427
6
            reset_after_timeouts: p.cbt_max_timeouts.get() as usize,
428
6
            min_timeout: p
429
6
                .cbt_min_timeout
430
6
                .try_into()
431
6
                .unwrap_or_else(|_| Duration::from_millis(10)),
432
        }
433
6
    }
434
}
435

            
436
/// Tor's default circuit build timeout estimator.
437
///
438
/// This object records a set of observed circuit build times, and
439
/// uses it to determine good values for how long we should allow
440
/// circuits to build.
441
///
442
/// For full details of the algorithms used, see
443
/// [`path-spec.txt`](https://gitlab.torproject.org/tpo/core/torspec/-/blob/master/path-spec.txt).
444
pub(crate) struct ParetoTimeoutEstimator {
445
    /// Our observations for circuit build times and success/failure
446
    /// history.
447
    history: History,
448

            
449
    /// Our most recent timeout estimate, if we have one that is
450
    /// up-to-date.
451
    ///
452
    /// (We reset this to None whenever we get a new observation.)
453
    timeouts: Option<(Duration, Duration)>,
454

            
455
    /// The timeouts that we use when we do not have sufficient observations
456
    /// to conclude anything about our circuit build times.
457
    ///
458
    /// These start out as `p.default_thresholds`, but can be adjusted
459
    /// depending on how many timeouts we've been seeing.
460
    fallback_timeouts: (Duration, Duration),
461

            
462
    /// A set of parameters to use in computing circuit build timeout
463
    /// estimates.
464
    p: Params,
465
}
466

            
467
impl Default for ParetoTimeoutEstimator {
468
6
    fn default() -> Self {
469
6
        Self::from_history(History::new_empty())
470
6
    }
471
}
472

            
473
/// An object used to serialize our timeout history for persistent state.
474
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
475
#[serde(default)]
476
pub(crate) struct ParetoTimeoutState {
477
    /// A version field used to help encoding and decoding.
478
    #[allow(dead_code)]
479
    version: usize,
480
    /// A record of observed timeouts, as returned by `sparse_histogram()`.
481
    histogram: Vec<(MsecDuration, u16)>,
482
    /// The current timeout estimate: kept for reference.
483
    current_timeout: Option<MsecDuration>,
484

            
485
    /// Fields from the state file that was used to make this `ParetoTimeoutState` that
486
    /// this version of Arti doesn't understand.
487
    #[serde(flatten)]
488
    unknown_fields: HashMap<String, JsonValue>,
489
}
490

            
491
impl ParetoTimeoutState {
492
    /// Return the latest base timeout estimate, as recorded in this state.
493
184
    pub(crate) fn latest_estimate(&self) -> Option<Duration> {
494
184
        self.current_timeout
495
186
            .map(|m| Duration::from_millis(m.0.into()))
496
184
    }
497
}
498

            
499
impl ParetoTimeoutEstimator {
500
    /// Construct a new ParetoTimeoutEstimator from the provided history
501
    /// object.
502
474
    fn from_history(history: History) -> Self {
503
474
        let p = Params::default();
504
474
        ParetoTimeoutEstimator {
505
474
            history,
506
474
            timeouts: None,
507
474
            fallback_timeouts: p.default_thresholds,
508
474
            p,
509
474
        }
510
474
    }
511

            
512
    /// Create a new ParetoTimeoutEstimator based on a loaded
513
    /// ParetoTimeoutState.
514
468
    pub(crate) fn from_state(state: ParetoTimeoutState) -> Self {
515
468
        let history = History::from_sparse_histogram(state.histogram.into_iter());
516
468
        Self::from_history(history)
517
468
    }
518

            
519
    /// Compute an unscaled basic pair of timeouts for a circuit of
520
    /// the "normal" length.
521
    ///
522
    /// Return a cached value if we have no observations since the
523
    /// last time this function was called.
524
758
    fn base_timeouts(&mut self) -> (Duration, Duration) {
525
758
        if let Some(x) = self.timeouts {
526
            // Great; we have a cached value.
527
14
            return x;
528
744
        }
529

            
530
744
        if self.history.n_times() < self.p.min_observations as usize {
531
            // We don't have enough values to estimate.
532
728
            return self.fallback_timeouts;
533
16
        }
534

            
535
        // Here we're going to compute the timeouts, cache them, and
536
        // return them.
537
16
        let dist = match self.history.pareto_estimate(self.p.n_modes_for_xm) {
538
14
            Some(dist) => dist,
539
            None => {
540
2
                return self.fallback_timeouts;
541
            }
542
        };
543
14
        let timeout_threshold = dist.quantile(self.p.timeout_quantile);
544
14
        let abandon_threshold = dist
545
14
            .quantile(self.p.abandon_quantile)
546
14
            .max(timeout_threshold);
547

            
548
14
        let timeouts = (
549
14
            Duration::from_secs_f64(timeout_threshold / 1000.0).max(self.p.min_timeout),
550
14
            Duration::from_secs_f64(abandon_threshold / 1000.0).max(self.p.min_timeout),
551
14
        );
552
14
        self.timeouts = Some(timeouts);
553

            
554
14
        timeouts
555
758
    }
556
}
557

            
558
impl super::TimeoutEstimator for ParetoTimeoutEstimator {
559
4
    fn update_params(&mut self, p: &NetParameters) {
560
4
        let parameters = p.into();
561
4
        self.p = parameters;
562
4
        let new_success_len = self.p.success_history_len;
563
4
        self.history.set_success_history_len(new_success_len);
564
4
    }
565

            
566
4440
    fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
567
4440
        if hop == self.p.significant_hop {
568
4440
            let time = MsecDuration::new_saturating(&delay);
569
4440
            self.history.add_time(time);
570
4440
            self.timeouts.take();
571
4440
        }
572
4440
        if is_last {
573
4440
            tracing::trace!(%hop, ?delay, "Circuit creation success");
574
4440
            self.history.add_success(true);
575
        }
576
4440
    }
577

            
578
78
    fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
579
        // Only record this timeout if we have seen some network activity since
580
        // we launched the circuit.
581
78
        let have_seen_recent_activity =
582
78
            if let Some(last_traffic) = tor_proto::time_since_last_incoming_traffic() {
583
                last_traffic < delay
584
            } else {
585
                // TODO: Is this the correct behavior in this case?
586
78
                true
587
            };
588

            
589
78
        tracing::trace!(%hop, ?delay, %have_seen_recent_activity, "Circuit timeout");
590

            
591
78
        if hop > 0 && have_seen_recent_activity {
592
78
            self.history.add_success(false);
593
78
            if self.history.n_recent_timeouts() > self.p.reset_after_timeouts {
594
4
                tracing::debug!("Multiple connections failed, resetting timeouts...");
595
4
                let base_timeouts = self.base_timeouts();
596
4
                self.history.clear();
597
4
                self.timeouts.take();
598
                // If we already had a timeout that was at least the
599
                // length of our fallback timeouts, we should double
600
                // those fallback timeouts, up to a maximum.
601
4
                if base_timeouts.0 >= self.fallback_timeouts.0 {
602
                    /// Largest value we'll allow a fallback timeout
603
                    /// (the one we return when we have insufficient data)
604
                    /// to reach.
605
                    ///
606
                    /// TODO: This is a ridiculous over-estimate.
607
                    const MAX_FALLBACK_TIMEOUT: Duration = Duration::from_secs(7200);
608
2
                    self.fallback_timeouts.0 =
609
2
                        (self.fallback_timeouts.0 * 2).min(MAX_FALLBACK_TIMEOUT);
610
2
                    self.fallback_timeouts.1 =
611
2
                        (self.fallback_timeouts.1 * 2).min(MAX_FALLBACK_TIMEOUT);
612
2
                }
613
74
            }
614
        }
615
78
    }
616

            
617
34
    fn timeouts(&mut self, action: &Action) -> (Duration, Duration) {
618
34
        let (base_t, base_a) = if self.p.use_estimates {
619
34
            self.base_timeouts()
620
        } else {
621
            // If we aren't using this estimator, then just return the
622
            // default thresholds from our parameters.
623
            return self.p.default_thresholds;
624
        };
625

            
626
34
        let reference_action = Action::BuildCircuit {
627
34
            length: self.p.significant_hop as usize + 1,
628
34
        };
629
34
        debug_assert!(reference_action.timeout_scale() > 0);
630

            
631
34
        let multiplier =
632
34
            (action.timeout_scale() as f64) / (reference_action.timeout_scale() as f64);
633

            
634
        // TODO-SPEC The spec doesn't define any of this
635
        // action-based-multiplier stuff.  Tor doesn't multiply the
636
        // abandon timeout.
637
        use super::mul_duration_f64_saturating as mul;
638
34
        (mul(base_t, multiplier), mul(base_a, multiplier))
639
34
    }
640

            
641
8
    fn learning_timeouts(&self) -> bool {
642
8
        self.p.use_estimates && self.history.n_times() < usize::from(self.p.min_observations)
643
8
    }
644

            
645
720
    fn build_state(&mut self) -> Option<ParetoTimeoutState> {
646
720
        let cur_timeout = MsecDuration::new_saturating(&self.base_timeouts().0);
647
720
        Some(ParetoTimeoutState {
648
720
            version: 1,
649
720
            histogram: self.history.sparse_histogram().collect(),
650
720
            current_timeout: Some(cur_timeout),
651
720
            unknown_fields: Default::default(),
652
720
        })
653
720
    }
654
}
655

            
656
/// A wrapper around `VecDeque<T>` that holds no more than a fixed number of
/// entries.
///
/// Pushing onto a full `BoundedDeque` evicts and returns its oldest entry.
#[derive(Clone, Debug)]
struct BoundedDeque<T> {
    /// The underlying `VecDeque`.
    ///
    /// We could use a `SmallVec` or an array instead,
    /// but that would require reimplementing more of `VecDeque`.
    inner: VecDeque<T>,

    /// The maximum number of elements to permit.
    limit: usize,
}
668
impl<T> BoundedDeque<T> {
669
    /// Construct a new empty `BoundedDeque`, limited to `limit` entries.
670
972
    fn new(limit: usize) -> Self {
671
972
        Self {
672
972
            inner: VecDeque::with_capacity(limit),
673
972
            limit,
674
972
        }
675
972
    }
676

            
677
    /// Remove every entry from this `BoundedDeque`.
678
12
    fn clear(&mut self) {
679
12
        self.inner.clear();
680
12
    }
681

            
682
    /// Return the number of entries in this `BoundedDeque`.
683
14226
    fn len(&self) -> usize {
684
14226
        self.inner.len()
685
14226
    }
686

            
687
    /// Add a new entry to the back of this `BoundedDeque`.
688
    ///
689
    /// If the deque was at its limit, pop and return the entry at the front.
690
13448
    fn push_back(&mut self, item: T) -> Option<T> {
691
13448
        if self.limit == 0 {
692
            return None;
693
13448
        }
694
13448
        let removed = if self.len() == self.limit {
695
5072
            self.inner.pop_front()
696
        } else {
697
8376
            None
698
        };
699
13448
        self.inner.push_back(item);
700
13448
        removed
701
13448
    }
702

            
703
    /// Return an iterator over the entries in this `BoundedDeque`, from front to back.
704
108
    fn iter(&self) -> impl Iterator<Item = &T> {
705
108
        self.inner.iter()
706
108
    }
707

            
708
    /// Replace the maximum number of observations in this `BoundedDeque`.
709
    ///
710
    /// Unlike the equivalent method in the old BoundedVecDeque crate,
711
    /// if the new limit is smaller than the previous limit,
712
    /// this method will remove the _oldest_ items from the queue
713
    /// - that is, the ones from the front.
714
10
    fn set_max_len(&mut self, new_limit: usize) {
715
10
        if new_limit < self.limit {
716
4
            let n_to_drain = self.inner.len().saturating_sub(new_limit);
717
4
            self.inner.drain(0..n_to_drain);
718
4
            self.inner.shrink_to_fit();
719
6
        }
720
10
        self.limit = new_limit;
721
10
    }
722
}
723

            
724
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::timeouts::TimeoutEstimator;
    use tor_basic_utils::RngExt as _;
    use tor_basic_utils::test_rng::testing_rng;

    /// Return an action to build a 3-hop circuit.
    fn b3() -> Action {
        Action::BuildCircuit { length: 3 }
    }

    impl From<u32> for MsecDuration {
        fn from(v: u32) -> Self {
            Self(v)
        }
    }

    #[test]
    fn ms_partial_cmp() {
        #![allow(clippy::eq_op)]
        let myriad: MsecDuration = 10_000.into();
        let lakh: MsecDuration = 100_000.into();
        let crore: MsecDuration = 10_000_000.into();

        assert!(myriad < lakh);
        assert!(myriad == myriad);
        assert!(crore > lakh);
        assert!(crore >= crore);
        assert!(crore <= crore);
    }

    #[test]
    fn history_lowlev() {
        assert_eq!(History::bucket_center(1.into()), 5.into());
        assert_eq!(History::bucket_center(903.into()), 905.into());
        assert_eq!(History::bucket_center(0.into()), 5.into());
        assert_eq!(History::bucket_center(u32::MAX.into()), 4294967295.into());

        let mut h = History::new_empty();
        h.inc_bucket(7.into());
        h.inc_bucket(8.into());
        h.inc_bucket(9.into());
        h.inc_bucket(10.into());
        h.inc_bucket(11.into());
        h.inc_bucket(12.into());
        h.inc_bucket(13.into());
        h.inc_bucket(299.into());
        assert_eq!(h.time_histogram.get(&5.into()), Some(&3));
        assert_eq!(h.time_histogram.get(&15.into()), Some(&4));
        assert_eq!(h.time_histogram.get(&25.into()), None);
        assert_eq!(h.time_histogram.get(&295.into()), Some(&1));

        h.dec_bucket(299.into());
        h.dec_bucket(24.into());
        h.dec_bucket(12.into());

        assert_eq!(h.time_histogram.get(&15.into()), Some(&3));
        assert_eq!(h.time_histogram.get(&25.into()), None);
        assert_eq!(h.time_histogram.get(&295.into()), None);

        h.add_success(true);
        h.add_success(false);
        assert_eq!(h.success_history.len(), 2);

        h.clear();
        assert_eq!(h.time_histogram.len(), 0);
        assert_eq!(h.time_history.len(), 0);
        assert_eq!(h.success_history.len(), 0);
    }

    #[test]
    fn time_observation_management() {
        let mut h = History::new_empty();
        h.set_time_history_len(8); // to make it easier to overflow.

        h.add_time(300.into());
        h.add_time(500.into());
        h.add_time(542.into());
        h.add_time(305.into());
        h.add_time(543.into());
        h.add_time(307.into());

        assert_eq!(h.n_times(), 6);
        let v = h.n_most_frequent_bins(10);
        assert_eq!(&v[..], [(305.into(), 3), (545.into(), 2), (505.into(), 1)]);
        let v = h.n_most_frequent_bins(2);
        assert_eq!(&v[..], [(305.into(), 3), (545.into(), 2)]);

        let v: Vec<_> = h.sparse_histogram().collect();
        assert_eq!(&v[..], [(305.into(), 3), (505.into(), 1), (545.into(), 2)]);

        h.add_time(212.into());
        h.add_time(203.into());
        // Now the history is full, so these replace the two oldest elements.
        h.add_time(617.into());
        h.add_time(413.into());

        assert_eq!(h.n_times(), 8);

        let v: Vec<_> = h.sparse_histogram().collect();
        assert_eq!(
            &v[..],
            [
                (205.into(), 1),
                (215.into(), 1),
                (305.into(), 2),
                (415.into(), 1),
                (545.into(), 2),
                (615.into(), 1)
            ]
        );

        let h2 = History::from_sparse_histogram(v.clone().into_iter());
        let v2: Vec<_> = h2.sparse_histogram().collect();
        assert_eq!(v, v2);
    }

    #[test]
    fn success_observation_mechanism() {
        let mut h = History::new_empty();
        h.set_success_history_len(20);

        assert_eq!(h.n_recent_timeouts(), 0);
        h.add_success(true);
        assert_eq!(h.n_recent_timeouts(), 0);
        h.add_success(false);
        assert_eq!(h.n_recent_timeouts(), 1);
        for _ in 0..200 {
            h.add_success(false);
        }
        assert_eq!(h.n_recent_timeouts(), 20);
        h.add_success(true);
        h.add_success(true);
        h.add_success(true);
        assert_eq!(h.n_recent_timeouts(), 20 - 3);

        h.set_success_history_len(10);
        assert_eq!(h.n_recent_timeouts(), 10 - 3);
    }

    #[test]
    fn xm_calculation() {
        let mut h = History::new_empty();
        assert_eq!(h.estimate_xm(2), None);

        for n in &[300, 500, 542, 305, 543, 307, 212, 203, 617, 413] {
            h.add_time(MsecDuration(*n));
        }

        let v = h.n_most_frequent_bins(2);
        assert_eq!(&v[..], [(305.into(), 3), (545.into(), 2)]);
        let est = (305 * 3 + 545 * 2) / 5;
        assert_eq!(h.estimate_xm(2), Some(est));
        assert_eq!(est, 401);
    }

    #[test]
    fn pareto_estimate() {
        let mut h = History::new_empty();
        assert!(h.pareto_estimate(2).is_none());

        for n in &[300, 500, 542, 305, 543, 307, 212, 203, 617, 413] {
            h.add_time(MsecDuration(*n));
        }
        let expected_log_sum: f64 = [401, 500, 542, 401, 543, 401, 401, 401, 617, 413]
            .iter()
            .map(|x| f64::from(*x).ln())
            .sum();
        let expected_log_xm: f64 = (401_f64).ln() * 10.0;
        let expected_alpha = 10.0 / (expected_log_sum - expected_log_xm);
        let expected_inv_alpha = 1.0 / expected_alpha;

        let p = h.pareto_estimate(2).unwrap();

        // We can't do "eq" with floats, so we'll do "very close".
        assert!((401.0 - p.x_m).abs() < 1.0e-9);
        assert!((expected_inv_alpha - p.inv_alpha).abs() < 1.0e-9);

        let q60 = p.quantile(0.60);
        let q99 = p.quantile(0.99);

        // Compare against reference values rounded to three decimal places.
        // Take the absolute difference: without `.abs()`, any value below
        // the reference would pass.
        assert!((q60 - 451.127).abs() < 0.001);
        assert!((q99 - 724.841).abs() < 0.001);
    }

    #[test]
    fn pareto_estimate_timeout() {
        let mut est = ParetoTimeoutEstimator::default();

        assert_eq!(
            est.timeouts(&b3()),
            (Duration::from_secs(60), Duration::from_secs(60))
        );
        // Set the parameters up to mimic the situation in
        // `pareto_estimate` above.
        est.p.min_observations = 0;
        est.p.n_modes_for_xm = 2;
        assert_eq!(
            est.timeouts(&b3()),
            (Duration::from_secs(60), Duration::from_secs(60))
        );

        for msec in &[300, 500, 542, 305, 543, 307, 212, 203, 617, 413] {
            let d = Duration::from_millis(*msec);
            est.note_hop_completed(2, d, true);
        }

        let t = est.timeouts(&b3());
        assert_eq!(t.0.as_micros(), 493_169);
        assert_eq!(t.1.as_micros(), 724_841);

        let t2 = est.timeouts(&b3());
        assert_eq!(t2, t);

        let t2 = est.timeouts(&Action::BuildCircuit { length: 4 });
        assert_eq!(t2.0, t.0.mul_f64(10.0 / 6.0));
        assert_eq!(t2.1, t.1.mul_f64(10.0 / 6.0));
    }

    #[test]
    fn pareto_estimate_clear() {
        let mut est = ParetoTimeoutEstimator::default();

        // Set the parameters up to mimic the situation in
        // `pareto_estimate` above.
        let params = NetParameters::from_map(&"cbtmincircs=1 cbtnummodes=2".parse().unwrap());
        est.update_params(&params);

        assert_eq!(est.timeouts(&b3()).0.as_micros(), 60_000_000);
        assert!(est.learning_timeouts());

        for msec in &[300, 500, 542, 305, 543, 307, 212, 203, 617, 413] {
            let d = Duration::from_millis(*msec);
            est.note_hop_completed(2, d, true);
        }
        assert_ne!(est.timeouts(&b3()).0.as_micros(), 60_000_000);
        assert!(!est.learning_timeouts());
        assert_eq!(est.history.n_recent_timeouts(), 0);

        // After 18 timeouts we're still getting real estimates...
        for _ in 0..18 {
            est.note_circ_timeout(2, Duration::from_secs(2000));
        }
        assert_ne!(est.timeouts(&b3()).0.as_micros(), 60_000_000);

        // ... but one more timeout triggers a reset to the default.
        est.note_circ_timeout(2, Duration::from_secs(2000));
        assert_eq!(est.timeouts(&b3()).0.as_micros(), 60_000_000);

        // And if we keep failing a bunch more times, the timeout doubles.
        for _ in 0..20 {
            est.note_circ_timeout(2, Duration::from_secs(2000));
        }
        assert_eq!(est.timeouts(&b3()).0.as_micros(), 120_000_000);
    }

    #[test]
    fn default_params() {
        let p1 = Params::default();
        let p2 = Params::from(&tor_netdir::params::NetParameters::default());
        // discount version of derive(eq)
        assert_eq!(format!("{:?}", p1), format!("{:?}", p2));
    }

    #[test]
    fn state_conversion() {
        // We have tests elsewhere for converting to and from
        // histograms, so all we really need to do here is make sure
        // that the histogram conversion happens.
        let mut est = ParetoTimeoutEstimator::default();
        let mut rng = testing_rng();
        for _ in 0..1000 {
            let d = Duration::from_millis(rng.gen_range_checked(10..3_000).unwrap());
            est.note_hop_completed(2, d, true);
        }
        let state = est.build_state().unwrap();
        assert_eq!(state.version, 1);
        assert!(state.current_timeout.is_some());
        let mut est2 = ParetoTimeoutEstimator::from_state(state);
        let act = Action::BuildCircuit { length: 3 };
        // This isn't going to be exact, since we're recording histogram bins
        // instead of exact timeouts.
        let ms1 = est.timeouts(&act).0.as_millis() as i32;
        let ms2 = est2.timeouts(&act).0.as_millis() as i32;
        assert!((ms1 - ms2).abs() < 50);
    }

    #[test]
    fn validate_iterator_choose_multiple() {
        // The documentation for IteratorRandom::choose_multiple says that it
        // returns fewer than N elements if the iterator has fewer than N elements.
        // But rand has changed behavior in the past, so let's make sure this doesn't
        // change in the future.
        use rand::seq::IteratorRandom as _;
        let mut rng = testing_rng();
        let mut ten_elements = (1..=10).choose_multiple(&mut rng, 100);
        ten_elements.sort();
        assert_eq!(ten_elements.len(), 10);
        assert_eq!(ten_elements, (1..=10).collect::<Vec<_>>());
    }

    // TODO: add tests from Tor.
}