1
//! Facilities to build circuits directly, instead of via a circuit manager.
2

            
3
use crate::path::{OwnedPath, TorPath};
4
use crate::timeouts::{self, Action};
5
use crate::{Error, Result};
6
use async_trait::async_trait;
7
use futures::Future;
8
use oneshot_fused_workaround as oneshot;
9
use std::sync::{
10
    Arc,
11
    atomic::{AtomicU32, Ordering},
12
};
13
use std::time::{Duration, Instant};
14
use tor_chanmgr::{ChanMgr, ChanProvenance, ChannelUsage};
15
use tor_error::into_internal;
16
use tor_guardmgr::GuardStatus;
17
use tor_linkspec::{IntoOwnedChanTarget, OwnedChanTarget, OwnedCircTarget};
18
use tor_netdir::params::NetParameters;
19
use tor_proto::ccparams::{self, AlgorithmType};
20
use tor_proto::client::circuit::{CircParameters, PendingClientTunnel};
21
use tor_proto::{CellCount, ClientTunnel, FlowCtrlParameters};
22
use tor_rtcompat::SpawnExt;
23
use tor_rtcompat::{Runtime, SleepProviderExt};
24
use tor_units::Percentage;
25
use tracing::instrument;
26

            
27
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
28
use tor_guardmgr::vanguards::VanguardMgr;
29

            
30
mod guardstatus;
31

            
32
pub(crate) use guardstatus::GuardStatusHandle;
33

            
34
/// Represents an objects that can be constructed in a circuit-like way.
35
///
36
/// This is only a separate trait for testing purposes, so that we can swap
37
/// our some other type when we're testing Builder.
38
///
39
/// TODO: I'd like to have a simpler testing strategy here; this one
40
/// complicates things a bit.
41
#[async_trait]
42
pub(crate) trait Buildable: Sized {
43
    /// Our equivalent to a tor_proto::Channel.
44
    type Chan: Send + Sync;
45

            
46
    /// Use a channel manager to open a new channel (or find an existing channel)
47
    /// to a provided [`OwnedChanTarget`].
48
    async fn open_channel<RT: Runtime>(
49
        chanmgr: &ChanMgr<RT>,
50
        ct: &OwnedChanTarget,
51
        guard_status: &GuardStatusHandle,
52
        usage: ChannelUsage,
53
    ) -> Result<Arc<Self::Chan>>;
54

            
55
    /// Launch a new one-hop circuit to a given relay, given only a
56
    /// channel target `ct` specifying that relay.
57
    ///
58
    /// (Since we don't have a CircTarget here, we can't extend the circuit
59
    /// to be multihop later on.)
60
    async fn create_chantarget<RT: Runtime>(
61
        chan: Arc<Self::Chan>,
62
        rt: &RT,
63
        ct: &OwnedChanTarget,
64
        params: CircParameters,
65
        timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
66
    ) -> Result<Self>;
67

            
68
    /// Launch a new circuit through a given relay, given a circuit target
69
    /// `ct` specifying that relay.
70
    async fn create<RT: Runtime>(
71
        chan: Arc<Self::Chan>,
72
        rt: &RT,
73
        ct: &OwnedCircTarget,
74
        params: CircParameters,
75
        timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
76
    ) -> Result<Self>;
77

            
78
    /// Extend this circuit-like object by one hop, to the location described
79
    /// in `ct`.
80
    async fn extend<RT: Runtime>(
81
        &self,
82
        rt: &RT,
83
        ct: &OwnedCircTarget,
84
        params: CircParameters,
85
    ) -> Result<()>;
86
}
87

            
88
/// Try to make a [`PendingClientTunnel`] to a given relay, and start its
89
/// reactor.
90
///
91
/// This is common code, shared by all the first-hop functions in the
92
/// implementation of `Buildable` for `ClientTunnel`.
93
#[instrument(level = "trace", skip_all)]
94
async fn create_common<RT: Runtime>(
95
    chan: Arc<tor_proto::channel::Channel>,
96
    timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
97
    rt: &RT,
98
) -> Result<PendingClientTunnel> {
99
    // Construct the (zero-hop) circuit.
100
    let (pending_tunnel, reactor) =
101
        chan.new_tunnel(timeouts)
102
            .await
103
            .map_err(|error| Error::Protocol {
104
                error,
105
                peer: None, // we don't blame the peer, because new_tunnel() does no networking.
106
                action: "initializing circuit",
107
                unique_id: None,
108
            })?;
109

            
110
    tracing::debug!("Spawning reactor...");
111

            
112
    rt.spawn(async {
113
        let _ = reactor.run().await;
114
    })
115
    .map_err(|e| Error::from_spawn("circuit reactor task", e))?;
116

            
117
    Ok(pending_tunnel)
118
}
119

            
120
#[async_trait]
121
impl Buildable for ClientTunnel {
122
    type Chan = tor_proto::channel::Channel;
123

            
124
    #[instrument(level = "trace", skip_all)]
125
    async fn open_channel<RT: Runtime>(
126
        chanmgr: &ChanMgr<RT>,
127
        target: &OwnedChanTarget,
128
        guard_status: &GuardStatusHandle,
129
        usage: ChannelUsage,
130
    ) -> Result<Arc<Self::Chan>> {
131
        // If we fail now, it's the guard's fault.
132
        guard_status.pending(GuardStatus::Failure);
133

            
134
        // Get or construct the channel.
135
        let result = chanmgr.get_or_launch(target, usage).await;
136

            
137
        // Report the clock skew if appropriate, and exit if there has been an error.
138
        match result {
139
            Ok((chan, ChanProvenance::NewlyCreated)) => {
140
                guard_status.skew(chan.clock_skew());
141
                Ok(chan)
142
            }
143
            Ok((chan, _)) => Ok(chan),
144
            Err(cause) => {
145
                if let Some(skew) = cause.clock_skew() {
146
                    guard_status.skew(skew);
147
                }
148
                Err(Error::Channel {
149
                    peer: target.to_logged(),
150
                    cause,
151
                })
152
            }
153
        }
154
    }
155

            
156
    #[instrument(level = "trace", skip_all)]
157
    async fn create_chantarget<RT: Runtime>(
158
        chan: Arc<Self::Chan>,
159
        rt: &RT,
160
        ct: &OwnedChanTarget,
161
        params: CircParameters,
162
        timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
163
    ) -> Result<Self> {
164
        let pending_tunnel = create_common(chan, timeouts, rt).await?;
165
        let unique_id = Some(pending_tunnel.peek_unique_id());
166
        pending_tunnel
167
            .create_firsthop_fast(params)
168
            .await
169
            .map_err(|error| Error::Protocol {
170
                peer: Some(ct.to_logged()),
171
                error,
172
                action: "running CREATE_FAST handshake",
173
                unique_id,
174
            })
175
    }
176
    #[instrument(level = "trace", skip_all)]
177
    async fn create<RT: Runtime>(
178
        chan: Arc<Self::Chan>,
179
        rt: &RT,
180
        ct: &OwnedCircTarget,
181
        params: CircParameters,
182
        timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
183
    ) -> Result<Self> {
184
        let pending_tunnel = create_common(chan, timeouts, rt).await?;
185
        let unique_id = Some(pending_tunnel.peek_unique_id());
186

            
187
        let handshake_res = pending_tunnel.create_firsthop(ct, params).await;
188
        handshake_res.map_err(|error| Error::Protocol {
189
            peer: Some(ct.to_logged()),
190
            error,
191
            action: "creating first hop",
192
            unique_id,
193
        })
194
    }
195
    async fn extend<RT: Runtime>(
196
        &self,
197
        _rt: &RT,
198
        ct: &OwnedCircTarget,
199
        params: CircParameters,
200
    ) -> Result<()> {
201
        let circ = self.as_single_circ().map_err(|error| Error::Protocol {
202
            peer: Some(ct.to_logged()),
203
            error,
204
            action: "extend tunnel",
205
            unique_id: Some(self.unique_id()),
206
        })?;
207

            
208
        let res = circ.extend(ct, params).await;
209
        res.map_err(|error| Error::Protocol {
210
            error,
211
            // We can't know who caused the error, since it may have been
212
            // the hop we were extending from, or the hop we were extending
213
            // to.
214
            peer: None,
215
            action: "extending circuit",
216
            unique_id: Some(self.unique_id()),
217
        })
218
    }
219
}
220

            
221
/// An implementation type for [`TunnelBuilder`].
222
///
223
/// A `TunnelBuilder` holds references to all the objects that are needed
224
/// to build circuits correctly.
225
///
226
/// In general, you should not need to construct or use this object yourself,
227
/// unless you are choosing your own paths.
228
struct Builder<R: Runtime, C: Buildable + Sync + Send + 'static> {
229
    /// The runtime used by this circuit builder.
230
    runtime: R,
231
    /// A channel manager that this circuit builder uses to make channels.
232
    chanmgr: Arc<ChanMgr<R>>,
233
    /// An estimator to determine the correct timeouts for circuit building.
234
    timeouts: Arc<timeouts::Estimator>,
235
    /// We don't actually hold any clientcircs, so we need to put this
236
    /// type here so the compiler won't freak out.
237
    _phantom: std::marker::PhantomData<C>,
238
}
239

            
240
impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
241
    /// Construct a new [`Builder`].
242
58
    fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>, timeouts: timeouts::Estimator) -> Self {
243
58
        Builder {
244
58
            runtime,
245
58
            chanmgr,
246
58
            timeouts: Arc::new(timeouts),
247
58
            _phantom: std::marker::PhantomData,
248
58
        }
249
58
    }
250

            
251
    /// Build a circuit, without performing any timeout operations.
252
    ///
253
    /// After each hop is built, increments n_hops_built.  Make sure that
254
    /// `guard_status` has its pending status set correctly to correspond
255
    /// to a circuit failure at any given stage.
256
    ///
257
    /// Requires that `channel` is a channel to the first hop of `path`.
258
    ///
259
    /// (TODO: Find
260
    /// a better design there.)
261
    #[instrument(level = "trace", skip_all)]
262
16
    async fn build_notimeout(
263
16
        self: Arc<Self>,
264
16
        path: OwnedPath,
265
16
        channel: Arc<C::Chan>,
266
16
        params: CircParameters,
267
16
        start_time: Instant,
268
16
        n_hops_built: Arc<AtomicU32>,
269
16
        guard_status: Arc<GuardStatusHandle>,
270
16
    ) -> Result<C> {
271
16
        match path {
272
            OwnedPath::ChannelOnly(target) => {
273
                let timeouts = Arc::clone(&self.timeouts);
274
                let circ =
275
                    C::create_chantarget(channel, &self.runtime, &target, params, timeouts).await?;
276
                self.timeouts
277
                    .note_hop_completed(0, self.runtime.now() - start_time, true);
278
                n_hops_built.fetch_add(1, Ordering::SeqCst);
279
                Ok(circ)
280
            }
281
            OwnedPath::Normal(p) => {
282
                assert!(!p.is_empty());
283
                let n_hops = p.len() as u8;
284
                let timeouts = Arc::clone(&self.timeouts);
285
                // Each hop has its own circ parameters. This is for the first hop (CREATE).
286
                let circ =
287
                    C::create(channel, &self.runtime, &p[0], params.clone(), timeouts).await?;
288
                self.timeouts
289
                    .note_hop_completed(0, self.runtime.now() - start_time, n_hops == 0);
290
                // If we fail after this point, we can't tell whether it's
291
                // the fault of the guard or some later relay.
292
                guard_status.pending(GuardStatus::Indeterminate);
293
                n_hops_built.fetch_add(1, Ordering::SeqCst);
294
                let mut hop_num = 1;
295
                for relay in p[1..].iter() {
296
                    // Get the params per subsequent hop (EXTEND).
297
                    circ.extend(&self.runtime, relay, params.clone()).await?;
298
                    n_hops_built.fetch_add(1, Ordering::SeqCst);
299
                    self.timeouts.note_hop_completed(
300
                        hop_num,
301
                        self.runtime.now() - start_time,
302
                        hop_num == (n_hops - 1),
303
                    );
304
                    hop_num += 1;
305
                }
306
                Ok(circ)
307
            }
308
        }
309
12
    }
310

            
311
    /// Build a circuit from an [`OwnedPath`].
312
    #[instrument(level = "trace", skip_all)]
313
16
    async fn build_owned(
314
16
        self: &Arc<Self>,
315
16
        path: OwnedPath,
316
16
        params: &CircParameters,
317
16
        guard_status: Arc<GuardStatusHandle>,
318
16
        usage: ChannelUsage,
319
16
    ) -> Result<C> {
320
        let action = Action::BuildCircuit { length: path.len() };
321
        let (timeout, abandon_timeout) = self.timeouts.timeouts(&action);
322

            
323
        // TODO: This is probably not the best way for build_notimeout to
324
        // tell us how many hops it managed to build, but at least it is
325
        // isolated here.
326
        let hops_built = Arc::new(AtomicU32::new(0));
327

            
328
        let self_clone = Arc::clone(self);
329
        let params = params.clone();
330

            
331
        // We open the channel separately from the rest of the circuit, since we don't want to count
332
        // it towards the circuit timeout.
333
        //
334
        // We don't need a separate timeout here, since ChanMgr already implements its own timeouts.
335
        let channel = C::open_channel(
336
            &self.chanmgr,
337
            path.first_hop_as_chantarget(),
338
            guard_status.as_ref(),
339
            usage,
340
        )
341
        .await?;
342

            
343
        let start_time = self.runtime.now();
344

            
345
        let circuit_future = self_clone.build_notimeout(
346
            path,
347
            channel,
348
            params,
349
            start_time,
350
            Arc::clone(&hops_built),
351
            guard_status,
352
        );
353

            
354
        match double_timeout(&self.runtime, circuit_future, timeout, abandon_timeout).await {
355
            Ok(circuit) => Ok(circuit),
356
            Err(Error::CircTimeout(unique_id)) => {
357
                let n_built = hops_built.load(Ordering::SeqCst);
358
                self.timeouts
359
                    .note_circ_timeout(n_built as u8, self.runtime.now() - start_time);
360
                Err(Error::CircTimeout(unique_id))
361
            }
362
            Err(e) => Err(e),
363
        }
364
16
    }
365

            
366
    /// Return a reference to this Builder runtime.
367
    pub(crate) fn runtime(&self) -> &R {
368
        &self.runtime
369
    }
370

            
371
    /// Return a reference to this Builder's timeout estimator.
372
    pub(crate) fn estimator(&self) -> &timeouts::Estimator {
373
        &self.timeouts
374
    }
375
}
376

            
377
/// A factory object to build circuits.
378
///
379
/// A `TunnelBuilder` holds references to all the objects that are needed
380
/// to build circuits correctly.
381
///
382
/// In general, you should not need to construct or use this object yourself,
383
/// unless you are choosing your own paths.
384
pub struct TunnelBuilder<R: Runtime> {
385
    /// The underlying [`Builder`] object
386
    builder: Arc<Builder<R, ClientTunnel>>,
387
    /// Configuration for how to choose paths for circuits.
388
    path_config: tor_config::MutCfg<crate::PathConfig>,
389
    /// State-manager object to use in storing current state.
390
    storage: crate::TimeoutStateHandle,
391
    /// Guard manager to tell us which guards nodes to use for the circuits
392
    /// we build.
393
    guardmgr: tor_guardmgr::GuardMgr<R>,
394
    /// The vanguard manager object used for HS circuits.
395
    #[cfg(all(feature = "vanguards", feature = "hs-common"))]
396
    vanguardmgr: Arc<VanguardMgr<R>>,
397
}
398

            
399
impl<R: Runtime> TunnelBuilder<R> {
400
    /// Construct a new [`TunnelBuilder`].
401
    // TODO: eventually I'd like to make this a public function, but
402
    // TimeoutStateHandle is private.
403
42
    pub(crate) fn new(
404
42
        runtime: R,
405
42
        chanmgr: Arc<ChanMgr<R>>,
406
42
        path_config: crate::PathConfig,
407
42
        storage: crate::TimeoutStateHandle,
408
42
        guardmgr: tor_guardmgr::GuardMgr<R>,
409
42
        #[cfg(all(feature = "vanguards", feature = "hs-common"))] vanguardmgr: VanguardMgr<R>,
410
42
    ) -> Self {
411
42
        let timeouts = timeouts::Estimator::from_storage(&storage);
412

            
413
42
        TunnelBuilder {
414
42
            builder: Arc::new(Builder::new(runtime, chanmgr, timeouts)),
415
42
            path_config: path_config.into(),
416
42
            storage,
417
42
            guardmgr,
418
42
            #[cfg(all(feature = "vanguards", feature = "hs-common"))]
419
42
            vanguardmgr: Arc::new(vanguardmgr),
420
42
        }
421
42
    }
422

            
423
    /// Return this builder's [`PathConfig`](crate::PathConfig).
424
8
    pub(crate) fn path_config(&self) -> Arc<crate::PathConfig> {
425
8
        self.path_config.get()
426
8
    }
427

            
428
    /// Replace this builder's [`PathConfig`](crate::PathConfig).
429
4
    pub(crate) fn set_path_config(&self, new_config: crate::PathConfig) {
430
4
        self.path_config.replace(new_config);
431
4
    }
432

            
433
    /// Flush state to the state manager if we own the lock.
434
    ///
435
    /// Return `Ok(true)` if we saved, and `Ok(false)` if we didn't hold the lock.
436
46
    pub(crate) fn save_state(&self) -> Result<bool> {
437
46
        if !self.storage.can_store() {
438
12
            return Ok(false);
439
34
        }
440
        // TODO: someday we'll want to only do this if there is something
441
        // changed.
442
34
        self.builder.timeouts.save_state(&self.storage)?;
443
34
        self.guardmgr.store_persistent_state()?;
444
34
        Ok(true)
445
46
    }
446

            
447
    /// Replace our state with a new owning state, assuming we have
448
    /// storage permission.
449
    pub(crate) fn upgrade_to_owned_state(&self) -> Result<()> {
450
        self.builder
451
            .timeouts
452
            .upgrade_to_owning_storage(&self.storage);
453
        self.guardmgr.upgrade_to_owned_persistent_state()?;
454
        Ok(())
455
    }
456

            
457
    /// Reload persistent state from disk, if we don't have storage permission.
458
    #[instrument(level = "trace", skip_all)]
459
    pub(crate) fn reload_state(&self) -> Result<()> {
460
        if !self.storage.can_store() {
461
            self.builder
462
                .timeouts
463
                .reload_readonly_from_storage(&self.storage);
464
        }
465
        self.guardmgr.reload_persistent_state()?;
466
        Ok(())
467
    }
468

            
469
    /// Reconfigure this builder using the latest set of network parameters.
470
    ///
471
    /// (NOTE: for now, this only affects circuit timeout estimation.)
472
    pub fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
473
        self.builder.timeouts.update_params(p);
474
    }
475

            
476
    /// Like `build`, but construct a new circuit from an [`OwnedPath`].
477
    #[instrument(level = "trace", skip_all)]
478
    pub(crate) async fn build_owned(
479
        &self,
480
        path: OwnedPath,
481
        params: &CircParameters,
482
        guard_status: Arc<GuardStatusHandle>,
483
        usage: ChannelUsage,
484
    ) -> Result<ClientTunnel> {
485
        self.builder
486
            .build_owned(path, params, guard_status, usage)
487
            .await
488
    }
489

            
490
    /// Try to construct a new circuit from a given path, using appropriate
491
    /// timeouts.
492
    ///
493
    /// This circuit is _not_ automatically registered with any
494
    /// circuit manager; if you don't hang on it it, it will
495
    /// automatically go away when the last reference is dropped.
496
    #[instrument(level = "trace", skip_all)]
497
    pub async fn build(
498
        &self,
499
        path: &TorPath<'_>,
500
        params: &CircParameters,
501
        usage: ChannelUsage,
502
    ) -> Result<ClientTunnel> {
503
        let owned = path.try_into()?;
504
        self.build_owned(owned, params, Arc::new(None.into()), usage)
505
            .await
506
    }
507

            
508
    /// Return true if this builder is currently learning timeout info.
509
    pub(crate) fn learning_timeouts(&self) -> bool {
510
        self.builder.timeouts.learning_timeouts()
511
    }
512

            
513
    /// Return a reference to this builder's `GuardMgr`.
514
48
    pub(crate) fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R> {
515
48
        &self.guardmgr
516
48
    }
517

            
518
    /// Return a reference to this builder's `VanguardMgr`.
519
    #[cfg(all(feature = "vanguards", feature = "hs-common"))]
520
38
    pub(crate) fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>> {
521
38
        &self.vanguardmgr
522
38
    }
523

            
524
    /// Return a reference to this builder's runtime
525
    pub(crate) fn runtime(&self) -> &R {
526
        self.builder.runtime()
527
    }
528

            
529
    /// Return a reference to this builder's timeout estimator.
530
    pub(crate) fn estimator(&self) -> &timeouts::Estimator {
531
        self.builder.estimator()
532
    }
533
}
534

            
535
/// Build the Vegas congestion control algorithm from the network parameters
/// `inp` and the queue parameters `vegas_queue_params`.
///
/// # Panics
///
/// Panics if the Vegas parameter builder reports an error.
#[cfg(feature = "flowctl-cc")]
fn build_cc_vegas(
    inp: &NetParameters,
    vegas_queue_params: ccparams::VegasQueueParams,
) -> ccparams::Algorithm {
    let full_min_pct = Percentage::new(inp.cc_cwnd_full_minpct.as_percent().get() as u32);
    let vegas = ccparams::VegasParamsBuilder::default()
        .cell_in_queue_params(vegas_queue_params)
        .ss_cwnd_max(inp.cc_ss_max.into())
        .cwnd_full_gap(inp.cc_cwnd_full_gap.into())
        .cwnd_full_min_pct(full_min_pct)
        .cwnd_full_per_cwnd(inp.cc_cwnd_full_per_cwnd.into())
        .build()
        .expect("Unable to build Vegas params from NetParams");
    ccparams::Algorithm::Vegas(vegas)
}
554

            
555
/// Return the congestion control FixedWindow algorithm using the given network parameters.
556
fn build_cc_fixedwindow(inp: &NetParameters) -> ccparams::Algorithm {
557
    ccparams::Algorithm::FixedWindow(build_cc_fixedwindow_params(inp))
558
}
559

            
560
/// Return the parameters for the congestion control FixedWindow algorithm
561
/// using the given network parameters.
562
22
fn build_cc_fixedwindow_params(inp: &NetParameters) -> ccparams::FixedWindowParams {
563
22
    ccparams::FixedWindowParamsBuilder::default()
564
22
        .circ_window_start(inp.circuit_window.get() as u16)
565
22
        .circ_window_min(inp.circuit_window.lower() as u16)
566
22
        .circ_window_max(inp.circuit_window.upper() as u16)
567
22
        .build()
568
22
        .expect("Unable to build FixedWindow params from NetParams")
569
22
}
570

            
571
/// Return a new circuit parameter struct using the given network parameters and algorithm to use.
572
22
fn circparameters_from_netparameters(
573
22
    inp: &NetParameters,
574
22
    alg: ccparams::Algorithm,
575
22
) -> Result<CircParameters> {
576
22
    let cwnd_params = ccparams::CongestionWindowParamsBuilder::default()
577
22
        .cwnd_init(inp.cc_cwnd_init.into())
578
22
        .cwnd_inc_pct_ss(Percentage::new(
579
22
            inp.cc_cwnd_inc_pct_ss.as_percent().get() as u32
580
22
        ))
581
22
        .cwnd_inc(inp.cc_cwnd_inc.into())
582
22
        .cwnd_inc_rate(inp.cc_cwnd_inc_rate.into())
583
22
        .cwnd_min(inp.cc_cwnd_min.into())
584
22
        .cwnd_max(inp.cc_cwnd_max.into())
585
22
        .sendme_inc(inp.cc_sendme_inc.into())
586
22
        .build()
587
22
        .map_err(into_internal!(
588
            "Unable to build CongestionWindow params from NetParams"
589
        ))?;
590
22
    let rtt_params = ccparams::RoundTripEstimatorParamsBuilder::default()
591
22
        .ewma_cwnd_pct(Percentage::new(
592
22
            inp.cc_ewma_cwnd_pct.as_percent().get() as u32
593
22
        ))
594
22
        .ewma_max(inp.cc_ewma_max.into())
595
22
        .ewma_ss_max(inp.cc_ewma_ss.into())
596
22
        .rtt_reset_pct(Percentage::new(
597
22
            inp.cc_rtt_reset_pct.as_percent().get() as u32
598
22
        ))
599
22
        .build()
600
22
        .map_err(into_internal!("Unable to build RTT params from NetParams"))?;
601
22
    let ccontrol = ccparams::CongestionControlParamsBuilder::default()
602
22
        .alg(alg)
603
22
        .fixed_window_params(build_cc_fixedwindow_params(inp))
604
22
        .cwnd_params(cwnd_params)
605
22
        .rtt_params(rtt_params)
606
22
        .build()
607
22
        .map_err(into_internal!(
608
            "Unable to build CongestionControl params from NetParams"
609
        ))?;
610
22
    let flow_ctrl_params = FlowCtrlParameters {
611
22
        cc_xoff_client: CellCount::new(inp.cc_xoff_client.get_u32()),
612
22
        cc_xoff_exit: CellCount::new(inp.cc_xoff_exit.get_u32()),
613
22
        cc_xon_rate: CellCount::new(inp.cc_xon_rate.get_u32()),
614
22
        cc_xon_change_pct: inp.cc_xon_change_pct.get_u32(),
615
22
        cc_xon_ewma_cnt: inp.cc_xon_ewma_cnt.get_u32(),
616
22
    };
617
22
    Ok(CircParameters::new(
618
22
        inp.extend_by_ed25519_id.into(),
619
22
        ccontrol,
620
22
        flow_ctrl_params,
621
22
    ))
622
22
}
623

            
624
/// Extract a [`CircParameters`] from the [`NetParameters`] from a consensus for an exit circuit or
625
/// single onion service (when implemented).
626
22
pub fn exit_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
627
22
    let alg = match AlgorithmType::from(inp.cc_alg.get()) {
628
        #[cfg(feature = "flowctl-cc")]
629
22
        AlgorithmType::VEGAS => build_cc_vegas(
630
22
            inp,
631
22
            (
632
22
                inp.cc_vegas_alpha_exit.into(),
633
22
                inp.cc_vegas_beta_exit.into(),
634
22
                inp.cc_vegas_delta_exit.into(),
635
22
                inp.cc_vegas_gamma_exit.into(),
636
22
                inp.cc_vegas_sscap_exit.into(),
637
22
            )
638
22
                .into(),
639
        ),
640
        // Unrecognized, fallback to fixed window as in SENDME v0.
641
        _ => build_cc_fixedwindow(inp),
642
    };
643
22
    circparameters_from_netparameters(inp, alg)
644
22
}
645

            
646
/// Extract a [`CircParameters`] from the [`NetParameters`] from a consensus for an onion circuit
647
/// which also includes an onion service with Vanguard.
648
pub fn onion_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
649
    let alg = match AlgorithmType::from(inp.cc_alg.get()) {
650
        #[cfg(feature = "flowctl-cc")]
651
        AlgorithmType::VEGAS => {
652
            // NOTE: At the time of writing, we don't yet support cc negotiation for onion services.
653
            // See `HopSettings::onion_circparams_from_netparams()` where we use a fallback
654
            // algorithm for HsV3 circuits instead, and see arti#2037.
655
            build_cc_vegas(
656
                inp,
657
                (
658
                    inp.cc_vegas_alpha_onion.into(),
659
                    inp.cc_vegas_beta_onion.into(),
660
                    inp.cc_vegas_delta_onion.into(),
661
                    inp.cc_vegas_gamma_onion.into(),
662
                    inp.cc_vegas_sscap_onion.into(),
663
                )
664
                    .into(),
665
            )
666
        }
667
        // Unrecognized, fallback to fixed window as in SENDME v0.
668
        _ => build_cc_fixedwindow(inp),
669
    };
670
    circparameters_from_netparameters(inp, alg)
671
}
672

            
673
/// Helper function: spawn a future as a background task, and run it with
674
/// two separate timeouts.
675
///
676
/// If the future does not complete by `timeout`, then return a
677
/// timeout error immediately, but keep running the future in the
678
/// background.
679
///
680
/// If the future does not complete by `abandon`, then abandon the
681
/// future completely.
682
32
async fn double_timeout<R, F, T>(
683
32
    runtime: &R,
684
32
    fut: F,
685
32
    timeout: Duration,
686
32
    abandon: Duration,
687
32
) -> Result<T>
688
32
where
689
32
    R: Runtime,
690
32
    F: Future<Output = Result<T>> + Send + 'static,
691
32
    T: Send + 'static,
692
32
{
693
32
    let (snd, rcv) = oneshot::channel();
694
32
    let rt = runtime.clone();
695
    // We create these futures now, since we want them to look at the current
696
    // time when they decide when to expire.
697
32
    let inner_timeout_future = rt.timeout(abandon, fut);
698
32
    let outer_timeout_future = rt.timeout(timeout, rcv);
699

            
700
32
    runtime
701
32
        .spawn(async move {
702
32
            let result = inner_timeout_future.await;
703
32
            let _ignore_cancelled_error = snd.send(result);
704
32
        })
705
32
        .map_err(|e| Error::from_spawn("circuit construction task", e))?;
706

            
707
32
    let outcome = outer_timeout_future.await;
708
    // 4 layers of error to collapse:
709
    //     One from the receiver being cancelled.
710
    //     One from the outer timeout.
711
    //     One from the inner timeout.
712
    //     One from the actual future's result.
713
    //
714
    // (Technically, we could refrain from unwrapping the future's result,
715
    // but doing it this way helps make it more certain that we really are
716
    // collapsing all the layers into one.)
717
32
    outcome
718
32
        .map_err(|_| Error::CircTimeout(None))??
719
16
        .map_err(|_| Error::CircTimeout(None))?
720
32
}
721

            
722
#[cfg(test)]
723
mod test {
724
    // @@ begin test lint list maintained by maint/add_warning @@
725
    #![allow(clippy::bool_assert_comparison)]
726
    #![allow(clippy::clone_on_copy)]
727
    #![allow(clippy::dbg_macro)]
728
    #![allow(clippy::mixed_attributes_style)]
729
    #![allow(clippy::print_stderr)]
730
    #![allow(clippy::print_stdout)]
731
    #![allow(clippy::single_char_pattern)]
732
    #![allow(clippy::unwrap_used)]
733
    #![allow(clippy::unchecked_time_subtraction)]
734
    #![allow(clippy::useless_vec)]
735
    #![allow(clippy::needless_pass_by_value)]
736
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
737
    use super::*;
738
    use crate::timeouts::TimeoutEstimator;
739
    use futures::FutureExt;
740
    use std::sync::Mutex;
741
    use tor_chanmgr::ChannelUsage as CU;
742
    use tor_linkspec::ChanTarget;
743
    use tor_linkspec::{HasRelayIds, RelayIdType, RelayIds};
744
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
745
    use tor_memquota::ArcMemoryQuotaTrackerExt as _;
746
    use tor_proto::memquota::ToplevelAccount;
747
    use tor_rtcompat::SleepProvider;
748
    use tracing::trace;
749

            
750
    /// Make a new nonfunctional `Arc<GuardStatusHandle>`
751
    fn gs() -> Arc<GuardStatusHandle> {
752
        Arc::new(None.into())
753
    }
754

            
755
    #[test]
    // Re-enabled after work from eta, discussed in arti#149
    fn test_double_timeout() {
        let timeout = Duration::from_secs(1);
        let abandon = Duration::from_secs(10);
        /// Return true if `actual` lies within `[expected, expected + 0.5sec]`.
        fn duration_close_to(actual: Duration, expected: Duration) -> bool {
            actual >= expected && actual <= expected + Duration::from_millis(500)
        }

        tor_rtmock::MockRuntime::test_with_various(|rto| async move {
            // Case 1: the future is ready immediately.
            let ready = double_timeout(&rto, async { Ok(3_u32) }, timeout, abandon).await;
            assert!(ready.is_ok());
            assert_eq!(ready.unwrap(), 3_u32);

            trace!("acquiesce after test1");
            #[allow(clippy::clone_on_copy)]
            #[allow(deprecated)] // TODO #1885
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Case 2: the future becomes ready after a short delay.
            let rt_clone = rt.clone();
            // (We only want the short delay to fire, not any of the other timeouts.)
            rt_clone.block_advance("manually controlling advances");
            let short_job = async move {
                let nap = rt_clone.sleep(Duration::from_millis(100));
                rt_clone.allow_one_advance(Duration::from_millis(100));
                nap.await;
                Ok(4_u32)
            };
            let delayed = rt
                .wait_for(double_timeout(&rt, short_job, timeout, abandon))
                .await;
            assert!(delayed.is_ok());
            assert_eq!(delayed.unwrap(), 4_u32);

            trace!("acquiesce after test2");
            #[allow(clippy::clone_on_copy)]
            #[allow(deprecated)] // TODO #1885
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Case 3: the future outlives the first timeout; check that it
            // nevertheless keeps running after the caller saw the timeout.
            let rt_clone = rt.clone();
            let (done_tx, done_rx) = oneshot::channel();
            let start = rt.now();
            rt.block_advance("manually controlling advances");
            let slow_job = async move {
                let nap = rt_clone.sleep(Duration::from_secs(2));
                rt_clone.allow_one_advance(Duration::from_secs(2));
                nap.await;
                done_tx.send(()).unwrap();
                Ok(4_u32)
            };
            let timed_out = rt
                .wait_for(double_timeout(&rt, slow_job, timeout, abandon))
                .await;
            assert!(matches!(timed_out, Err(Error::CircTimeout(_))));
            let end = rt.now();
            assert!(duration_close_to(end - start, Duration::from_secs(1)));
            let waited = rt.wait_for(done_rx).await;
            assert_eq!(waited, Ok(()));

            trace!("acquiesce after test3");
            #[allow(clippy::clone_on_copy)]
            #[allow(deprecated)] // TODO #1885
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Case 4: the future times out and then gets abandoned.
            let rt_clone = rt.clone();
            rt.block_advance("manually controlling advances");
            let (done_tx, done_rx) = oneshot::channel();
            let start = rt.now();
            // Let it hit the first timeout...
            rt.allow_one_advance(Duration::from_secs(1));
            let hopeless_job = async move {
                rt_clone.sleep(Duration::from_secs(30)).await;
                done_tx.send(()).unwrap();
                Ok(4_u32)
            };
            let timed_out = rt
                .wait_for(double_timeout(&rt, hopeless_job, timeout, abandon))
                .await;
            assert!(matches!(timed_out, Err(Error::CircTimeout(_))));
            let end = rt.now();
            // ...and let it hit the second, too.
            rt.allow_one_advance(Duration::from_secs(9));
            let waited = rt.wait_for(done_rx).await;
            assert!(waited.is_err());
            let end2 = rt.now();
            assert!(duration_close_to(end - start, Duration::from_secs(1)));
            assert!(duration_close_to(end2 - start, Duration::from_secs(10)));
        });
    }
862

            
863
    /// Get a pair of timeouts that we've encoded as an Ed25519 identity.
864
    ///
865
    /// In our FakeCircuit code below, the first timeout is the amount of
866
    /// time that we should sleep while building a hop to this key,
867
    /// and the second timeout is the length of time-advance we should allow
868
    /// after the hop is built.
869
    ///
870
    /// (This is pretty silly, but it's good enough for testing.)
871
    fn timeouts_from_key(id: &Ed25519Identity) -> (Duration, Duration) {
872
        let mut be = [0; 8];
873
        be[..].copy_from_slice(&id.as_bytes()[0..8]);
874
        let dur = u64::from_be_bytes(be);
875
        be[..].copy_from_slice(&id.as_bytes()[8..16]);
876
        let dur2 = u64::from_be_bytes(be);
877
        (Duration::from_millis(dur), Duration::from_millis(dur2))
878
    }
879
    /// Encode a pair of timeouts as an Ed25519 identity.
880
    ///
881
    /// In our FakeCircuit code below, the first timeout is the amount of
882
    /// time that we should sleep while building a hop to this key,
883
    /// and the second timeout is the length of time-advance we should allow
884
    /// after the hop is built.
885
    ///
886
    /// (This is pretty silly but it's good enough for testing.)
887
    fn key_from_timeouts(d1: Duration, d2: Duration) -> Ed25519Identity {
888
        let mut bytes = [0; 32];
889
        let dur = (d1.as_millis() as u64).to_be_bytes();
890
        bytes[0..8].copy_from_slice(&dur);
891
        let dur = (d2.as_millis() as u64).to_be_bytes();
892
        bytes[8..16].copy_from_slice(&dur);
893
        bytes.into()
894
    }
895

            
896
    /// As [`timeouts_from_key`], but first extract the relevant key from the
897
    /// OwnedChanTarget.
898
    fn timeouts_from_chantarget<CT: ChanTarget>(ct: &CT) -> (Duration, Duration) {
899
        // Extracting the Ed25519 identity should always succeed in this case:
900
        // we put it there ourselves!
901
        let ed_id = ct
902
            .identity(RelayIdType::Ed25519)
903
            .expect("No ed25519 key was present for fake ChanTarget‽")
904
            .try_into()
905
            .expect("ChanTarget provided wrong key type");
906
        timeouts_from_key(ed_id)
907
    }
908

            
909
    /// Replacement type for circuit, to implement buildable.
910
    #[derive(Debug, Clone)]
911
    struct FakeCirc {
912
        hops: Vec<RelayIds>,
913
        onehop: bool,
914
    }
915
    #[async_trait]
916
    impl Buildable for Mutex<FakeCirc> {
917
        type Chan = ();
918

            
919
        async fn open_channel<RT: Runtime>(
920
            _chanmgr: &ChanMgr<RT>,
921
            _ct: &OwnedChanTarget,
922
            _guard_status: &GuardStatusHandle,
923
            _usage: ChannelUsage,
924
        ) -> Result<Arc<Self::Chan>> {
925
            Ok(Arc::new(()))
926
        }
927

            
928
        async fn create_chantarget<RT: Runtime>(
929
            _: Arc<Self::Chan>,
930
            rt: &RT,
931
            ct: &OwnedChanTarget,
932
            _: CircParameters,
933
            _timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
934
        ) -> Result<Self> {
935
            let (d1, d2) = timeouts_from_chantarget(ct);
936
            rt.sleep(d1).await;
937
            if !d2.is_zero() {
938
                rt.allow_one_advance(d2);
939
            }
940

            
941
            let c = FakeCirc {
942
                hops: vec![RelayIds::from_relay_ids(ct)],
943
                onehop: true,
944
            };
945
            Ok(Mutex::new(c))
946
        }
947
        async fn create<RT: Runtime>(
948
            _: Arc<Self::Chan>,
949
            rt: &RT,
950
            ct: &OwnedCircTarget,
951
            _: CircParameters,
952
            _timeouts: Arc<dyn tor_proto::client::circuit::TimeoutEstimator>,
953
        ) -> Result<Self> {
954
            let (d1, d2) = timeouts_from_chantarget(ct);
955
            rt.sleep(d1).await;
956
            if !d2.is_zero() {
957
                rt.allow_one_advance(d2);
958
            }
959

            
960
            let c = FakeCirc {
961
                hops: vec![RelayIds::from_relay_ids(ct)],
962
                onehop: false,
963
            };
964
            Ok(Mutex::new(c))
965
        }
966
        async fn extend<RT: Runtime>(
967
            &self,
968
            rt: &RT,
969
            ct: &OwnedCircTarget,
970
            _: CircParameters,
971
        ) -> Result<()> {
972
            let (d1, d2) = timeouts_from_chantarget(ct);
973
            rt.sleep(d1).await;
974
            if !d2.is_zero() {
975
                rt.allow_one_advance(d2);
976
            }
977

            
978
            {
979
                let mut c = self.lock().unwrap();
980
                c.hops.push(RelayIds::from_relay_ids(ct));
981
            }
982
            Ok(())
983
        }
984
    }
985

            
986
    /// Fake implementation of TimeoutEstimator that just records its inputs.
987
    struct TimeoutRecorder<R> {
988
        runtime: R,
989
        hist: Vec<(bool, u8, Duration)>,
990
        // How much advance to permit after being told of a timeout?
991
        on_timeout: Duration,
992
        // How much advance to permit after being told of a success?
993
        on_success: Duration,
994

            
995
        snd_success: Option<oneshot::Sender<()>>,
996
        rcv_success: Option<oneshot::Receiver<()>>,
997
    }
998

            
999
    impl<R> TimeoutRecorder<R> {
        fn new(runtime: R) -> Self {
            Self::with_delays(runtime, Duration::from_secs(0), Duration::from_secs(0))
        }
        fn with_delays(runtime: R, on_timeout: Duration, on_success: Duration) -> Self {
            let (snd_success, rcv_success) = oneshot::channel();
            Self {
                runtime,
                hist: Vec::new(),
                on_timeout,
                on_success,
                rcv_success: Some(rcv_success),
                snd_success: Some(snd_success),
            }
        }
    }
    impl<R: Runtime> TimeoutEstimator for Arc<Mutex<TimeoutRecorder<R>>> {
        /// Record a success, but only for the last hop of a circuit; then signal the
        /// success channel and permit the configured `on_success` advance, if any.
        fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
            if is_last {
                // Gather what we need under the lock, and release it before touching the clock.
                let (rt, advance) = {
                    let mut recorder = self.lock().unwrap();
                    recorder.hist.push((true, hop, delay));
                    let _ = recorder.snd_success.take().unwrap().send(());
                    (recorder.runtime.clone(), recorder.on_success)
                };
                if !advance.is_zero() {
                    rt.allow_one_advance(advance);
                }
            }
        }
        /// Record a timeout, then permit the configured `on_timeout` advance, if any.
        fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
            // Gather what we need under the lock, and release it before touching the clock.
            let (rt, advance) = {
                let mut recorder = self.lock().unwrap();
                recorder.hist.push((false, hop, delay));
                (recorder.runtime.clone(), recorder.on_timeout)
            };
            if !advance.is_zero() {
                rt.allow_one_advance(advance);
            }
        }
        /// Always answer with a 3 second timeout and a 100 second abandon time.
        fn timeouts(&mut self, _action: &Action) -> (Duration, Duration) {
            let timeout = Duration::from_secs(3);
            let abandon = Duration::from_secs(100);
            (timeout, abandon)
        }
        /// This fake never claims to be learning timeouts.
        fn learning_timeouts(&self) -> bool {
            false
        }
        /// Parameter updates are ignored.
        fn update_params(&mut self, _params: &tor_netdir::params::NetParameters) {}
        /// There is no persistent state to build.
        fn build_state(&mut self) -> Option<crate::timeouts::pareto::ParetoTimeoutState> {
            None
        }
    }
    /// Testing only: create a bogus circuit target with `id` as its Ed25519 identity.
    ///
    /// The RSA identity, ntor onion key and (empty) protocol list are fixed dummy values.
    fn circ_t(id: Ed25519Identity) -> OwnedCircTarget {
        let mut target = OwnedCircTarget::builder();
        {
            let chan = target.chan_target();
            chan.ed_identity(id).rsa_identity([0x20; 20].into());
        }
        target
            .ntor_onion_key([0x33; 32].into())
            .protocols("".parse().unwrap())
            .build()
            .unwrap()
    }
    /// Testing only: create a bogus channel target with `id` as its Ed25519 identity
    /// and a fixed dummy RSA identity.
    fn chan_t(id: Ed25519Identity) -> OwnedChanTarget {
        let mut target = OwnedChanTarget::builder();
        target.ed_identity(id).rsa_identity([0x20; 20].into());
        target.build().unwrap()
    }
    /// Shared harness: build `path` with a `Builder<_, Mutex<FakeCirc>>` on the mock
    /// runtime `rt`, and report what happened.
    ///
    /// * `advance_initial` is the first time-advance that the mock clock is permitted.
    /// * `advance_on_timeout`, if `Some((d1, d2))`, creates the recorder via
    ///   `TimeoutRecorder::with_delays(rt, d1, d2)` and additionally spawns a task that
    ///   waits for the recorder's success notification; if `None`, `TimeoutRecorder::new`
    ///   is used and no such task is spawned.
    /// * `usage` is passed through to `build_owned`.
    ///
    /// Returns the build outcome (a clone of the resulting `FakeCirc`, or the error)
    /// together with a clone of the recorder's history.
    async fn run_builder_test(
        rt: tor_rtmock::MockRuntime,
        advance_initial: Duration,
        path: OwnedPath,
        advance_on_timeout: Option<(Duration, Duration)>,
        usage: ChannelUsage,
    ) -> (Result<FakeCirc>, Vec<(bool, u8, Duration)>) {
        // A channel manager is required by `Builder::new`; it is built with defaults
        // and a no-op memory quota account.
        let chanmgr = Arc::new(
            ChanMgr::new(
                rt.clone(),
                Default::default(),
                Default::default(),
                &Default::default(),
                ToplevelAccount::new_noop(),
            )
            .unwrap(),
        );
        // always has 3 second timeout, 100 second abandon.
        let timeouts = match advance_on_timeout {
            Some((d1, d2)) => TimeoutRecorder::with_delays(rt.clone(), d1, d2),
            None => TimeoutRecorder::new(rt.clone()),
        };
        // Shared, so that we can still read the history after handing a clone to the builder.
        let timeouts = Arc::new(Mutex::new(timeouts));
        let builder: Builder<_, Mutex<FakeCirc>> = Builder::new(
            rt.clone(),
            chanmgr,
            timeouts::Estimator::new(Arc::clone(&timeouts)),
        );
        // From here on the clock only moves when an advance is explicitly permitted.
        rt.block_advance("manually controlling advances");
        rt.allow_one_advance(advance_initial);
        // Run the build itself as a separate task on the mock runtime.
        let outcome = rt.spawn_join("build-owned", async move {
            let arcbuilder = Arc::new(builder);
            let params = exit_circparams_from_netparams(&NetParameters::default())?;
            arcbuilder.build_owned(path, &params, gs(), usage).await
        });
        // Now we wait for a success to finally, finally be reported.
        if advance_on_timeout.is_some() {
            // Take the receiver inside its own block so the lock guard is dropped right away.
            let receiver = { timeouts.lock().unwrap().rcv_success.take().unwrap() };
            rt.spawn_identified("receiver", async move {
                receiver.await.unwrap();
            });
        }
        rt.advance_until_stalled().await;
        // Replace a successful `Mutex<FakeCirc>` with a clone of the `FakeCirc` inside it.
        let circ = outcome.map(|m| Ok(m?.lock().unwrap().clone())).await;
        let timeouts = timeouts.lock().unwrap().hist.clone();
        (circ, timeouts)
    }
    /// A one-hop (channel-only) path that takes 100ms builds successfully,
    /// and is reported as a single success at hop 0.
    #[test]
    fn build_onehop() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let hop_delay = Duration::from_millis(100);
            let id_100ms = key_from_timeouts(hop_delay, Duration::from_millis(0));
            let (outcome, timeouts) = run_builder_test(
                rt,
                hop_delay,
                OwnedPath::ChannelOnly(chan_t(id_100ms)),
                None,
                CU::UserTraffic,
            )
            .await;

            let circ = outcome.unwrap();
            assert!(circ.onehop);
            assert_eq!(circ.hops.len(), 1);
            assert!(circ.hops[0].same_relay_ids(&chan_t(id_100ms)));

            assert_eq!(timeouts.len(), 1);
            let (succeeded, hop, elapsed) = timeouts[0];
            assert!(succeeded); // success
            assert_eq!(hop, 0); // one-hop
            assert_eq!(elapsed, hop_delay);
        });
    }
    /// A three-hop path whose hops take 100ms, 200ms and 300ms builds successfully,
    /// and is reported as a single success at hop 2 after 600ms in total.
    #[test]
    fn build_threehop() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let ms = Duration::from_millis;
            let id_100ms = key_from_timeouts(ms(100), ms(200));
            let id_200ms = key_from_timeouts(ms(200), ms(300));
            let id_300ms = key_from_timeouts(ms(300), ms(0));
            let ids = [id_100ms, id_200ms, id_300ms];

            let path = OwnedPath::Normal(ids.iter().map(|id| circ_t(*id)).collect());
            let (outcome, timeouts) =
                run_builder_test(rt, ms(100), path, None, CU::UserTraffic).await;

            let circ = outcome.unwrap();
            assert!(!circ.onehop);
            assert_eq!(circ.hops.len(), 3);
            for (hop, id) in circ.hops.iter().zip(ids.iter()) {
                assert!(hop.same_relay_ids(&chan_t(*id)));
            }

            assert_eq!(timeouts.len(), 1);
            let (succeeded, hop, elapsed) = timeouts[0];
            assert!(succeeded); // success
            assert_eq!(hop, 2); // three-hop
            assert_eq!(elapsed, ms(600));
        });
    }
    /// A path whose third hop would take an hour fails with a circuit timeout,
    /// and exactly one timeout is recorded, after 3 seconds.
    #[test]
    fn build_huge_timeout() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let ms = Duration::from_millis;
            let id_100ms = key_from_timeouts(ms(100), ms(200));
            let id_200ms = key_from_timeouts(ms(200), ms(2700));
            let id_hour = key_from_timeouts(Duration::from_secs(3600), Duration::from_secs(0));

            let hops = vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_hour)];
            let (outcome, timeouts) =
                run_builder_test(rt, ms(100), OwnedPath::Normal(hops), None, CU::UserTraffic)
                    .await;
            assert!(matches!(outcome, Err(Error::CircTimeout(_))));

            assert_eq!(timeouts.len(), 1);
            let (succeeded, _hop, elapsed) = timeouts[0];
            assert!(!succeeded); // timeout
            // BUG: Sometimes this is 1 and sometimes this is 2.
            // assert_eq!(timeouts[0].1, 2); // at third hop.
            assert_eq!(elapsed, ms(3000));
        });
    }
    /// A path whose third hop takes 3 seconds times out from the caller's point of view,
    /// yet a success is recorded afterwards as well: the history holds a timeout at
    /// 3 seconds followed by a success at hop 2.
    #[test]
    fn build_modest_timeout() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let ms = Duration::from_millis;
            let id_100ms = key_from_timeouts(ms(100), ms(200));
            let id_200ms = key_from_timeouts(ms(200), ms(2700));
            let id_3sec = key_from_timeouts(ms(3000), ms(0));

            // Advances to permit after a reported timeout and a reported success, respectively.
            let timeout_advance = (ms(4000), Duration::from_secs(0));

            let hops = vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_3sec)];
            let (outcome, timeouts) = run_builder_test(
                rt.clone(),
                ms(100),
                OwnedPath::Normal(hops),
                Some(timeout_advance),
                CU::UserTraffic,
            )
            .await;
            assert!(matches!(outcome, Err(Error::CircTimeout(_))));

            assert_eq!(timeouts.len(), 2);
            let (first_succeeded, _first_hop, first_elapsed) = timeouts[0];
            assert!(!first_succeeded); // timeout
            // BUG: Sometimes this is 1 and sometimes this is 2.
            //assert_eq!(timeouts[0].1, 2); // at third hop.
            assert_eq!(first_elapsed, ms(3000));

            let (second_succeeded, second_hop, _second_elapsed) = timeouts[1];
            assert!(second_succeeded); // success
            assert_eq!(second_hop, 2); // three-hop
            // BUG: This timer is not always reliable, due to races.
            //assert_eq!(timeouts[1].2, Duration::from_millis(3300));
        });
    }
}