1
//! Main implementation of the connection functionality
2

            
3
use std::collections::HashMap;
4
use std::fmt::Debug;
5
use std::marker::PhantomData;
6
use std::sync::Arc;
7

            
8
use async_trait::async_trait;
9
use educe::Educe;
10
use futures::{AsyncRead, AsyncWrite};
11
use itertools::Itertools;
12
use rand::RngExt;
13
use tor_bytes::Writeable;
14
use tor_cell::relaycell::hs::IntroduceAckStatus;
15
use tor_cell::relaycell::hs::intro_payload::{self, IntroduceHandshakePayload};
16
use tor_cell::relaycell::hs::pow::ProofOfWork;
17
use tor_cell::relaycell::msg::{AnyRelayMsg, Introduce1, Rendezvous2};
18
use tor_circmgr::build::onion_circparams_from_netparams;
19
use tor_circmgr::{
20
    ClientOnionServiceDataTunnel, ClientOnionServiceDirTunnel, ClientOnionServiceIntroTunnel,
21
};
22
use tor_dirclient::SourceInfo;
23
use tor_error::{Bug, debug_report, warn_report};
24
use tor_hscrypto::Subcredential;
25
use tor_hscrypto::time::TimePeriod;
26
use tor_proto::TargetHop;
27
use tor_proto::client::circuit::handshake::hs_ntor::{self, HsNtorHkdfKeyGenerator};
28
use tracing::{debug, instrument, trace, warn};
29
use web_time_compat::{Duration, Instant, SystemTime};
30

            
31
use retry_error::RetryError;
32
use safelog::{DispRedacted, Sensitive};
33
use tor_cell::relaycell::RelayMsg;
34
use tor_cell::relaycell::hs::{
35
    AuthKeyType, EstablishRendezvous, IntroduceAck, RendezvousEstablished,
36
};
37
use tor_checkable::{Timebound, timed::TimerangeBound};
38
use tor_circmgr::hspool::HsCircPool;
39
use tor_circmgr::timeouts::Action as TimeoutsAction;
40
use tor_dirclient::request::Requestable as _;
41
use tor_error::{HasRetryTime as _, RetryTime};
42
use tor_error::{internal, into_internal};
43
use tor_hscrypto::RendCookie;
44
use tor_hscrypto::pk::{HsBlindId, HsId, HsIdKey};
45
use tor_linkspec::{CircTarget, HasRelayIds, OwnedCircTarget, RelayId};
46
use tor_llcrypto::pk::ed25519::Ed25519Identity;
47
use tor_netdir::{NetDir, Relay};
48
use tor_netdoc::doc::hsdesc::{HsDesc, IntroPointDesc};
49
use tor_proto::client::circuit::{CircParameters, handshake};
50
use tor_proto::{MetaCellDisposition, MsgHandler};
51
use tor_rtcompat::{Runtime, SleepProviderExt as _, TimeoutError};
52

            
53
use crate::Config;
54
use crate::err::RendPtIdentityForError;
55
use crate::pow::HsPowClient;
56
use crate::proto_oneshot;
57
use crate::relay_info::ipt_to_circtarget;
58
use crate::state::MockableConnectorData;
59
use crate::{ConnError, DescriptorError, DescriptorErrorDetail};
60
use crate::{FailedAttemptError, IntroPtIndex, rend_pt_identity_for_error};
61
use crate::{HsClientConnector, HsClientSecretKeys};
62

            
63
use ConnError as CE;
64
use FailedAttemptError as FAE;
65

            
66
/// Given `R, M` where `M: MocksForConnect<M>`, expand to the mockable data tunnel type
// This is quite annoying.  But the alternative is to write out `<... as // ...>`
// each time, since otherwise the compiler complains about ambiguous associated types.
macro_rules! DataTunnel{ { $R:ty, $M:ty } => {
    <<$M as MocksForConnect<$R>>::HsCircPool as MockableCircPool<$R>>::DataTunnel
} }
72

            
73
/// Information about a hidden service, including our connection history
///
/// This is the per-service mutable state threaded through `connect`,
/// updated as connection attempts succeed or fail.
#[derive(Default, Educe)]
#[educe(Debug)]
// This type is actually crate-private, since it isn't re-exported, but it must
// be `pub` because it appears as a default for a type parameter in HsClientConnector.
pub struct Data {
    /// The latest known onion service descriptor for this service.
    desc: DataHsDesc,
    /// Information about the latest status of trying to connect to this service
    /// through each of its introduction points.
    ipts: DataIpts,
    /// Information about the requery period of each HsDir we have recently queried.
    ///
    /// Each entry represents an HsDir that we cannot requery until
    /// its specified timestamp elapses.
    ///
    /// Any HsDir that does not have an entry in this map can be requeried.
    hsdirs: DataHsDirs,
}
92

            
93
/// An onion service descriptor and the time period it was fetched for.
#[derive(Debug)]
struct HsDescForTp {
    /// The TP this descriptor is for.
    ///
    /// Used for determining whether a newly fetched descriptor
    /// is for the same time period as this one.
    time_period: TimePeriod,
    /// The descriptor, still wrapped in its validity-time bound.
    desc: TimerangeBound<HsDesc>,
}
104

            
105
/// Part of `Data` that relates to our information about the HsDir requery periods
///
/// Maps each recently-queried HsDir to the earliest time at which
/// we may query it again.
type DataHsDirs = HashMap<RelayIdForRequeryPeriod, SystemTime>;
107

            
108
/// Marker type, to make typed HsDir [`RelayIdFor`] keys
/// (see [`DataHsDirs`])
#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Debug)]
struct RequeryPeriodMap;
111

            
112
/// Lookup key for looking up and recording HsDir requery times
type RelayIdForRequeryPeriod = RelayIdFor<RequeryPeriodMap>;
114

            
115
/// Part of `Data` that relates to the HS descriptor
///
/// `None` until we have successfully fetched a descriptor at least once.
type DataHsDesc = Option<HsDescForTp>;
117

            
118
/// Part of `Data` that relates to our information about introduction points
type DataIpts = HashMap<RelayIdForExperience, IptExperience>;
120

            
121
/// How things went last time we tried to use this introduction point
///
/// Neither this data structure, nor [`Data`], is responsible for arranging that we expire this
/// information eventually.  If we keep reconnecting to the service, we'll retain information
/// about each IPT indefinitely, at least so long as they remain listed in the descriptors we
/// receive.
///
/// Expiry of unused data is handled by `state.rs`, according to `last_used` in `ServiceState`.
///
/// Choosing which IPT to prefer is done by obtaining an `IptSortKey`
/// (from this and other information).
//
// Don't impl Ord for IptExperience.  We obtain `Option<&IptExperience>` from our
// data structure, and if IptExperience were Ord then Option<&IptExperience> would be Ord
// but it would be the wrong sort order: it would always prefer None, ie untried IPTs.
#[derive(Debug)]
struct IptExperience {
    /// How long it took us to get whatever outcome occurred
    ///
    /// We prefer fast successes to slow ones.
    /// Then, we prefer failures with earlier `RetryTime`,
    /// and, lastly, faster failures to slower ones.
    duration: Duration,

    /// What happened and when we might try again
    ///
    /// Note that we don't actually *enforce* the `RetryTime` here, just sort by it
    /// using `RetryTime::loose_cmp`.
    ///
    /// We *do* return an error that is itself `HasRetryTime` and expect our callers
    /// to honour that.
    outcome: Result<(), RetryTime>,
}
154

            
155
/// Actually make a HS connection, updating our recorded state as necessary
156
///
157
/// `connector` is provided only for obtaining the runtime and netdir (and `mock_for_state`).
158
/// Obviously, `connect` is not supposed to go looking in `services`.
159
///
160
/// This function handles all necessary retrying of fallible operations,
161
/// (and, therefore, must also limit the total work done for a particular call).
162
///
163
/// This function has a minimum of functionality, since it is the boundary
164
/// between "mock connection, used for testing `state.rs`" and
165
/// "mock circuit and netdir, used for testing `connect.rs`",
166
/// so it is not, itself, unit-testable.
167
#[instrument(level = "trace", skip_all)]
168
pub(crate) async fn connect<R: Runtime>(
169
    connector: &HsClientConnector<R>,
170
    netdir: Arc<NetDir>,
171
    config: Arc<Config>,
172
    hsid: HsId,
173
    data: &mut Data,
174
    secret_keys: HsClientSecretKeys,
175
) -> Result<ClientOnionServiceDataTunnel, ConnError> {
176
    Context::new(
177
        &connector.runtime,
178
        &*connector.circpool,
179
        netdir,
180
        config,
181
        hsid,
182
        secret_keys,
183
        (),
184
    )?
185
    .connect(data)
186
    .await
187
}
188

            
189
/// Common context for a single request to connect to a hidden service
///
/// This saves on passing this same set of (immutable) values (or subsets thereof)
/// to each method in the principal functional code, everywhere.
/// It also provides a convenient type to be `Self`.
///
/// Its lifetime is one request to make a new client circuit to a hidden service,
/// including all the retries and timeouts.
struct Context<'c, R: Runtime, M: MocksForConnect<R>> {
    /// Runtime
    runtime: &'c R,
    /// Circpool
    circpool: &'c M::HsCircPool,
    /// Netdir
    //
    // TODO holding onto the netdir for the duration of our attempts is not ideal
    // but doing better is fairly complicated.  See discussions here:
    //   https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1228#note_2910545
    //   https://gitlab.torproject.org/tpo/core/arti/-/issues/884
    netdir: Arc<NetDir>,
    /// Configuration
    config: Arc<Config>,
    /// Secret keys to use
    secret_keys: HsClientSecretKeys,
    /// HS ID (kept redacted for display/logging)
    hsid: DispRedacted<HsId>,
    /// Blinded HS ID, derived in `Context::new` for the current time period
    hs_blind_id: HsBlindId,
    /// The subcredential to use during this time period
    subcredential: Subcredential,
    /// Mock data
    mocks: M,
}
222

            
223
/// Details of an established rendezvous point
///
/// Intermediate value for progress during a connection attempt.
struct Rendezvous<'r, R: Runtime, M: MocksForConnect<R>> {
    /// RPT as a `Relay`
    rend_relay: Relay<'r>,
    /// Rendezvous circuit
    rend_tunnel: DataTunnel!(R, M),
    /// Rendezvous cookie
    rend_cookie: RendCookie,

    /// Receiver that will give us the RENDEZVOUS2 message.
    ///
    /// The sending end is owned by the handler
    /// which receives control messages on the rendezvous circuit,
    /// and which was installed when we sent `ESTABLISH_RENDEZVOUS`.
    ///
    /// (`RENDEZVOUS2` is the message containing the onion service's side of the handshake.)
    rend2_rx: proto_oneshot::Receiver<Rendezvous2>,

    /// Dummy, to placate compiler
    ///
    /// Covariant without dropck or interfering with Send/Sync will do fine.
    marker: PhantomData<fn() -> (R, M)>,
}
248

            
249
/// Random value used as part of IPT selection (tie-breaker in [`IptSortKey`])
type IptSortRand = u32;
251

            
252
/// Details of an apparently-useable introduction point
///
/// Intermediate value for progress during a connection attempt.
struct UsableIntroPt<'i> {
    /// Index in HS descriptor
    intro_index: IntroPtIndex,
    /// IPT descriptor
    intro_desc: &'i IntroPointDesc,
    /// IPT `CircTarget`
    intro_target: OwnedCircTarget,
    /// Random value used as part of IPT selection
    sort_rand: IptSortRand,
}
265

            
266
/// Lookup key for looking up and recording information about a relay
///
/// Used to identify a relay when looking to see what happened last time we used it,
/// and storing that information after we tried it.
///
/// We store the experience information under an arbitrary one of the relay's identities,
/// as returned by the `HasRelayIds::identities().next()`.
/// When we do lookups, we check all the relay's identities to see if we find
/// anything relevant.
/// If relay identities permute in strange ways, whether we find our previous
/// knowledge about them is not particularly well defined, but that's fine.
///
/// While this is, structurally, a relay identity, it is not suitable for other purposes.
#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug)]
struct RelayIdFor<K> {
    /// The relay id
    inner: RelayId,

    /// Phantom data to allow parameterizing over `K`
    ///
    /// `K` is a marker type that represents the kind of map
    /// this key will be used in, so keys for different maps
    /// cannot be confused with one another.
    marker: PhantomData<K>,
}
290

            
291
/// Marker type, to make typed IPT experience [`RelayIdFor`] keys
/// (see [`DataIpts`])
#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Debug)]
struct IptExperienceMap;
294

            
295
/// Lookup key for looking up and recording our IPT use experiences
type RelayIdForExperience = RelayIdFor<IptExperienceMap>;
297

            
298
/// Details of an apparently-successful INTRODUCE exchange
///
/// Intermediate value for progress during a connection attempt.
struct Introduced<R: Runtime, M: MocksForConnect<R>> {
    /// End-to-end crypto NTORv3 handshake with the service
    ///
    /// Created as part of generating our `INTRODUCE1`,
    /// and then used when processing `RENDEZVOUS2`.
    handshake_state: hs_ntor::HsNtorClientState,

    /// Dummy, to placate compiler
    ///
    /// `R` and `M` only used for getting to mocks.
    /// Covariant without dropck or interfering with Send/Sync will do fine.
    marker: PhantomData<fn() -> (R, M)>,
}
314

            
315
impl<K> RelayIdFor<K> {
316
    /// Create a new key for use with `T`
317
912
    fn new(inner: RelayId) -> Self {
318
912
        Self {
319
912
            inner,
320
912
            marker: Default::default(),
321
912
        }
322
912
    }
323

            
324
    /// Identities to use to try to find previous experience information about this IPT
325
490
    fn for_lookup<T: HasRelayIds>(ids: &T) -> impl Iterator<Item = Self> + '_ {
326
728
        ids.identities().map(|id| RelayIdFor::new(id.to_owned()))
327
490
    }
328

            
329
    /// Identity to use to store previous experience information about this IPT
330
184
    fn for_store<T: HasRelayIds>(ids: &T) -> Result<Self, Bug> {
331
184
        let id = ids
332
184
            .identities()
333
184
            .next()
334
184
            .ok_or_else(|| internal!("introduction point relay with no identities"))?
335
184
            .to_owned();
336
184
        Ok(RelayIdFor::new(id))
337
184
    }
338
}
339

            
340
/// Sort key for an introduction point, for selecting the best IPTs to try first
///
/// Ordering is most preferable first.
///
/// We use this to sort our `UsableIpt`s using `.sort_by_key`.
/// (This implementation approach ensures that we obey all the usual ordering invariants.)
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)]
struct IptSortKey {
    /// Sort by how preferable the experience was
    //
    // Field order matters: derived Ord compares `outcome` first.
    outcome: IptSortKeyOutcome,
    /// Failing that, choose randomly
    sort_rand: IptSortRand,
}
353

            
354
/// Component of the [`IptSortKey`] representing outcome of our last attempt, if any
///
/// This is the main thing we use to decide which IPTs to try first.
/// It is calculated for each IPT
/// (via `.sort_by_key`, so repeatedly - it should therefore be cheap to make.)
///
/// Ordering is most preferable first.
/// (Variant order matters: derived Ord ranks earlier variants first.)
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)]
enum IptSortKeyOutcome {
    /// Prefer successes
    Success {
        /// Prefer quick ones
        duration: Duration,
    },
    /// Failing that, try one we don't know to have failed
    Untried,
    /// Failing that, it'll have to be ones that didn't work last time
    Failed {
        /// Prefer failures with an earlier retry time
        retry_time: tor_error::LooseCmpRetryTime,
        /// Failing that, prefer quick failures (rather than slow ones eg timeouts)
        duration: Duration,
    },
}
378

            
379
impl From<Option<&IptExperience>> for IptSortKeyOutcome {
380
148
    fn from(experience: Option<&IptExperience>) -> IptSortKeyOutcome {
381
        use IptSortKeyOutcome as O;
382
148
        match experience {
383
18
            None => O::Untried,
384
130
            Some(IptExperience { duration, outcome }) => match outcome {
385
2
                Ok(()) => O::Success {
386
2
                    duration: *duration,
387
2
                },
388
128
                Err(retry_time) => O::Failed {
389
128
                    retry_time: (*retry_time).into(),
390
128
                    duration: *duration,
391
128
                },
392
            },
393
        }
394
148
    }
395
}
396

            
397
/// Token indicating that a descriptor fetch is wanted
/// (passed as `Some(RefetchDescriptor)` to `descriptor_ensure` to force a refetch)
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
struct RefetchDescriptor;
400

            
401
impl<'c, R: Runtime, M: MocksForConnect<R>> Context<'c, R, M> {
402
    /// Make a new `Context` from the input data
    ///
    /// Derives the blinded HS ID and subcredential for the netdir's current
    /// time period, and stores them for use throughout this connection attempt.
    ///
    /// Returns `ConnError::InvalidHsId` if `hsid` cannot be interpreted as an
    /// HS identity key; key-blinding failures are reported as internal errors.
    fn new(
        runtime: &'c R,
        circpool: &'c M::HsCircPool,
        netdir: Arc<NetDir>,
        config: Arc<Config>,
        hsid: HsId,
        secret_keys: HsClientSecretKeys,
        mocks: M,
    ) -> Result<Self, ConnError> {
        let time_period = netdir.hs_time_period();
        let (hs_blind_id_key, subcredential) = HsIdKey::try_from(hsid)
            .map_err(|_| CE::InvalidHsId)?
            .compute_blinded_key(time_period)
            .map_err(
                // TODO HS what on earth do these errors mean, in practical terms ?
                // In particular, we'll want to convert them to a ConnError variant,
                // but what ErrorKind should they have ?
                into_internal!("key blinding error, don't know how to handle"),
            )?;
        let hs_blind_id = hs_blind_id_key.id();

        Ok(Context {
            netdir,
            config,
            hsid: DispRedacted(hsid),
            hs_blind_id,
            subcredential,
            circpool,
            runtime,
            secret_keys,
            mocks,
        })
    }
436

            
437
    /// Actually make a HS connection, updating our recorded state as necessary
    ///
    /// Called by the `connect` function in this module.
    ///
    /// This function handles all necessary retrying of fallible operations,
    /// (and, therefore, must also limit the total work done for a particular call).
    ///
    /// If the introduction attempt fails with an INTRODUCE_NACK,
    /// the descriptor is refetched and the whole attempt retried once more.
    #[instrument(level = "trace", skip_all)]
    async fn connect(&self, data: &mut Data) -> Result<DataTunnel!(R, M), ConnError> {
        // This function must do the following, retrying as appropriate.
        //  - Look up the onion descriptor in the state.
        //  - Download the onion descriptor if one isn't there.
        //  - In parallel:
        //    - Pick a rendezvous point from the netdirprovider and launch a
        //      rendezvous circuit to it. Then send ESTABLISH_RENDEZVOUS.
        //    - Pick a number of introduction points (1 or more) and try to
        //      launch circuits to them.
        //  - On a circuit to an introduction point, send an INTRODUCE1 cell.
        //  - Wait for a RENDEZVOUS2 cell on the rendezvous circuit
        //  - Add a virtual hop to the rendezvous circuit.
        //  - Return the rendezvous circuit.

        let mocks = self.mocks.clone();

        let desc = self
            .descriptor_ensure(&mut data.desc, &mut data.hsdirs, None)
            .await?;

        mocks.test_got_desc(desc);

        let tunnel = match self.intro_rend_connect(desc, &mut data.ipts).await {
            Ok(tunnel) => tunnel,
            Err(e) => {
                // Did a particular failed attempt end in the service NACKing our
                // INTRODUCE1?  (That suggests our cached descriptor is stale.)
                let is_intro_nack = |e| {
                    if let FAE::IntroductionFailed { status, .. } = e {
                        status == IntroduceAckStatus::NOT_RECOGNIZED
                    } else {
                        false
                    }
                };

                let retry = if let CE::Failed(ref errors) = e {
                    // If any of the errors are an INTRODUCE_NACK,
                    // then it's worth retrying one more time
                    // with a fresh descriptor.
                    errors
                        .clone()
                        .into_iter()
                        .any(is_intro_nack)
                        .then_some(RefetchDescriptor)
                } else {
                    None
                };

                if let Some(RefetchDescriptor) = retry {
                    debug!(
                        "Introduction to {} NACKed, refetching descriptor and retrying",
                        &self.hsid,
                    );
                    // Refetch the descriptor and try one more time
                    let desc = self
                        .descriptor_ensure(&mut data.desc, &mut data.hsdirs, retry)
                        .await?;
                    mocks.test_got_desc(desc);
                    self.intro_rend_connect(desc, &mut data.ipts).await?
                } else {
                    return Err(e);
                }
            }
        };

        mocks.test_got_tunnel(&tunnel);

        Ok(tunnel)
    }
511

            
512
    /// Ensure that `Data.desc` contains the HS descriptor
    ///
    /// If we have a previously-downloaded descriptor, which is still valid,
    /// just returns a reference to it.
    ///
    /// Otherwise, tries to obtain the descriptor by downloading it from hsdir(s).
    ///
    /// If `refetch` is `Some(RefetchDescriptor)`, a new descriptor will be
    /// fetched from the hsdir(s) even if we have a cached valid one
    /// (though the cached one is retained if its revision counter is higher).
    ///
    /// Does all necessary retries and timeouts.
    /// Returns an error if no valid descriptor could be found.
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
    #[instrument(level = "trace", skip_all)]
    async fn descriptor_ensure<'d>(
        &self,
        data: &'d mut DataHsDesc,
        recent_hsdirs: &'d mut DataHsDirs,
        refetch: Option<RefetchDescriptor>,
    ) -> Result<&'d HsDesc, CE> {
        // Maximum number of hsdir connection and retrieval attempts we'll make
        let max_total_attempts = self
            .config
            .retry
            .hs_desc_fetch_attempts()
            .try_into()
            // User specified a very large u32.  We must be downcasting it to 16bit!
            // let's give them as many retries as we can manage.
            .unwrap_or(usize::MAX);

        // This `now` is captured by `unwrap_valid_desc`, below.
        let now = self.runtime.wallclock();
        // Re-extract the cached descriptor we already know to be Some and timely.
        // (The expects restate invariants established before each call site.)
        let unwrap_valid_desc = |data: &'d mut DataHsDesc| -> &'d HsDesc {
            data.as_ref()
                .expect("Some but now None")
                .desc
                .as_ref()
                .check_valid_at(&now)
                .expect("Ok but now Err")
        };

        // We retain a previously obtained descriptor precisely until its lifetime expires,
        // or until we refetch a more recent one
        // as a result of an `intro_rend_connect()` failure caused by introduce NACK.
        //
        // When it expires, we discard it completely and try to obtain a new one.
        //
        // We only replace our cached descriptor if the new one has a higher revision counter.
        //
        // TODO SPEC: Discuss HS descriptor lifetime and expiry client behaviour
        let now = self.runtime.wallclock();

        let stored_revision = data.as_ref().and_then(|previously| {
            if let Ok(desc) = previously.desc.as_ref().check_valid_at(&now) {
                // Ideally we would just return desc but that confuses borrowck,
                // so we have to use unwrap_valid_desc() each time
                // we need the known-to-be-Some descriptor instead.
                //
                // https://github.com/rust-lang/rust/issues/51545
                Some((desc.revision(), previously.time_period))
            } else {
                // Seems to be not valid now.  Try to fetch a fresh one.
                None
            }
        });

        match (stored_revision, refetch) {
            (Some(_), None) => {
                // Our cached descriptor is still timely,
                // and we don't need to fetch a new one.
                return Ok(unwrap_valid_desc(data));
            }
            (None, _) => {
                // We don't have a timely descriptor,
                // so ignore the requery_interval,
                // and reach out to all HsDirs
                recent_hsdirs.clear();
            }
            (_, Some(RefetchDescriptor)) => {
                // We have been asked to try to fetch a new descriptor.
                // We will only reach out to the HsDirs that are
                // not within the `hs_dir_requery_interval`
            }
        }

        // First, filter out any HsDirs that we *can* requery
        // (i.e. whose requery timestamp has already elapsed)
        recent_hsdirs.retain(|_hsdir, requery| *requery > now);

        let working_tp = self.netdir.hs_time_period();
        let hs_dirs = self.netdir.hs_dirs_download(
            self.hs_blind_id,
            working_tp,
            &mut self.mocks.thread_rng(),
        )?;

        trace!(
            "HS desc fetch for {}, for period {}, using {} hsdirs",
            &self.hsid,
            working_tp,
            hs_dirs.len()
        );

        let hs_dirs = hs_dirs
            .into_iter()
            .filter(|hsdir| {
                // Skip over any HsDirs that we are not allowed to requery right now
                let should_skip = recent_hsdirs.keys().any(|recent| {
                    RelayIdForRequeryPeriod::for_lookup(hsdir).any(|id| id == *recent)
                });

                !should_skip
            })
            .collect::<Vec<_>>();

        if hs_dirs.is_empty() {
            warn!(
                "Tried to fetch HS desc for {}, for period {}, but all hsdirs are rate-limited",
                &self.hsid, working_tp,
            );

            if stored_revision.is_none() {
                // We can't fetch a new descriptor, and we don't have a cached one.
                return Err(CE::NoUsableHsDirs);
            } else {
                // Return our cached descriptor
                return Ok(unwrap_valid_desc(data));
            }
        }

        // We might consider launching requests to multiple HsDirs in parallel.
        //   https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1118#note_2894463
        // But C Tor doesn't and our HS experts don't consider that important:
        //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914436
        // (Additionally, making multiple HSDir requests at once may make us
        // more vulnerable to traffic analysis.)
        let mut attempts = hs_dirs.iter().cycle().take(max_total_attempts);
        let mut errors = RetryError::in_attempt_to("retrieve hidden service descriptor");
        let desc = loop {
            let relay = match attempts.next() {
                Some(relay) => relay,
                None => {
                    // Attempt budget exhausted.
                    return Err(if errors.is_empty() {
                        CE::NoHsDirs
                    } else {
                        CE::DescriptorDownload(errors)
                    });
                }
            };
            let hsdir_for_error: Sensitive<Ed25519Identity> = (*relay.id()).into();

            let hsdir = RelayIdForRequeryPeriod::for_store(relay)?;
            // Ensure we wait at least hs_dir_requery_interval() until we try to
            // fetch from this HsDir again
            recent_hsdirs.insert(hsdir, now + self.config.retry.hs_dir_requery_interval());

            match self.descriptor_fetch_attempt(relay).await {
                Ok(desc) => break desc,
                Err(error) => {
                    if error.should_report_as_suspicious() {
                        // Note that not every protocol violation is suspicious:
                        // we only warn on the protocol violations that look like attempts
                        // to do a traffic tagging attack via hsdir inflation.
                        // (See proposal 360.)
                        warn_report!(
                            &error,
                            "Suspicious failure while downloading hsdesc for {} from relay {}",
                            &self.hsid,
                            relay.display_relay_ids(),
                        );
                    } else {
                        debug_report!(
                            &error,
                            "failed hsdir desc fetch for {} from {}/{}",
                            &self.hsid,
                            &relay.id(),
                            &relay.rsa_id()
                        );
                    }
                    errors.push_timed(
                        tor_error::Report(DescriptorError {
                            hsdir: hsdir_for_error,
                            error,
                        }),
                        self.runtime.now(),
                        Some(self.runtime.wallclock()),
                    );
                }
            }
        };

        // If our existing descriptor is newer than the one we have just fetched,
        // we should retain it.
        if let Some(stored_revision) = stored_revision {
            // It is safe to dangerously_assume_timely,
            // as descriptor_fetch_attempt has already checked the timeliness of the descriptor.
            let new_desc = desc.as_ref().dangerously_assume_timely();

            // Revision counters are monotonically increasing within a given time period.
            // If our newly fetched descriptor has the same HsBlindId as our cached one,
            // it means they are both used for the same time period,
            // and so we should only update our cache if the new descriptor is more recent
            // (i.e. it has a higher revision counter).
            if stored_revision >= (new_desc.revision(), working_tp) {
                // Our cached descriptor is still timely, and has a higher revision counter
                // than the one we've just fetched, so we retain it.
                return Ok(unwrap_valid_desc(data));
            }
        }

        // Store the bounded value in the cache for reuse,
        // but return a reference to the unwrapped `HsDesc`.
        //
        // The `HsDesc` must be owned by `data.desc`,
        // so first add it to `data.desc`,
        // and then dangerously_assume_timely to get a reference out again.
        //
        // It is safe to dangerously_assume_timely,
        // as descriptor_fetch_attempt has already checked the timeliness of the descriptor.
        let desc = HsDescForTp {
            time_period: working_tp,
            desc,
        };
        let ret = data.insert(desc);
        Ok(ret.desc.as_ref().dangerously_assume_timely())
    }
736

            
737
    /// Make one attempt to fetch the descriptor from a specific hsdir
    ///
    /// No timeout
    ///
    /// On success, returns the descriptor.
    ///
    /// While the returned descriptor is `TimerangeBound`, its validity at the current time *has*
    /// been checked.
    #[instrument(level = "trace", skip_all)]
    async fn descriptor_fetch_attempt(
        &self,
        hsdir: &Relay<'_>,
    ) -> Result<TimerangeBound<HsDesc>, DescriptorErrorDetail> {
        // Upper bound on the descriptor size, taken from the consensus network parameters.
        let max_len: usize = self
            .netdir
            .params()
            .hsdir_max_desc_size
            .get()
            .try_into()
            .map_err(into_internal!("BoundedInt was not truly bounded!"))?;
        let request = {
            let mut r = tor_dirclient::request::HsDescDownloadRequest::new(self.hs_blind_id);
            r.set_max_len(max_len);
            r
        };
        trace!(
            "hsdir for {}, trying {}/{}, request {:?} (http request {:?})",
            &self.hsid,
            &hsdir.id(),
            &hsdir.rsa_id(),
            &request,
            request.debug_request()
        );

        // Obtain a directory circuit ending at this hsdir from the pool.
        let circuit = self
            .circpool
            .m_get_or_launch_dir(&self.netdir, OwnedCircTarget::from_circ_target(hsdir))
            .await?;
        let n_hops = circuit.m_num_hops()?;
        // One round trip over a circuit of this length, per the timeout estimator.
        let timeout_roundtrip =
            self.estimate_timeout(&[(1, TimeoutsAction::RoundTrip { length: n_hops })]);

        let source: Option<SourceInfo> = circuit
            .m_source_info()
            .map_err(into_internal!("Couldn't get SourceInfo for circuit"))?;

        let mut stream = self
            .runtime
            // NOTE: In fact this timeout is overkill: this operation should succeed immediately,
            // since we always send BEGINDIR messages optimistically (without waiting for a reply).
            // But since our code is complex, and since it could become possible for this to block
            // if the circuit is saturated or we implement proposal 367 or something,
            // we may as well have _some_ timeout here.
            .timeout(timeout_roundtrip, circuit.m_begin_dir_stream())
            .await?
            .map_err(DescriptorErrorDetail::Circuit)?;

        // Perform the HTTP-over-BEGINDIR exchange, with its own round-trip timeout.
        let request_future =
            tor_dirclient::send_request(self.runtime, &request, &mut stream, source);
        let response = self
            .runtime
            .timeout(timeout_roundtrip, request_future)
            .await?
            .map_err(|dir_error| match dir_error {
                tor_dirclient::Error::RequestFailed(rfe) => DescriptorErrorDetail::from(rfe.error),
                tor_dirclient::Error::CircMgr(ce) => into_internal!(
                    "tor-dirclient complains about circmgr going wrong but we gave it a stream"
                )(ce)
                .into(),
                other => into_internal!(
                    "tor-dirclient gave unexpected error, tor-hsclient code needs updating"
                )(other)
                .into(),
            })?;

        let desc_text = response.into_output_string().map_err(|rfe| rfe.error)?;
        // Optional client descriptor-decryption key (restricted discovery).
        let hsc_desc_enc = self.secret_keys.keys.ks_hsc_desc_enc.as_ref();

        let now = self.runtime.wallclock();

        // Parse, decrypt, and check the descriptor's validity at the current time.
        HsDesc::parse_decrypt_validate(
            &desc_text,
            &self.hs_blind_id,
            now,
            &self.subcredential,
            hsc_desc_enc,
        )
        .map_err(DescriptorErrorDetail::from)
    }
826

            
827
    /// Given the descriptor, try to connect to service
    ///
    /// Does all necessary retries, timeouts, etc.
    async fn intro_rend_connect(
        &self,
        desc: &HsDesc,
        data: &mut DataIpts,
    ) -> Result<DataTunnel!(R, M), CE> {
        // Maximum number of rendezvous/introduction attempts we'll make
        let max_total_attempts = self
            .config
            .retry
            .hs_intro_rend_attempts()
            .try_into()
            // User specified a very large u32.  We must be downcasting it to 16bit!
            // let's give them as many retries as we can manage.
            .unwrap_or(usize::MAX);

        // We can't reliably distinguish IPT failure from RPT failure, so we iterate over IPTs
        // (best first) and each time use a random RPT.

        // We limit the number of rendezvous establishment attempts, separately, since we don't
        // try to talk to the intro pt until we've established the rendezvous circuit.
        let mut rend_attempts = 0..max_total_attempts;

        // But, we put all the errors into the same bucket, since we might have a mixture.
        let mut errors = RetryError::in_attempt_to("make circuit to hidden service");

        // Note that IntroPtIndex is *not* the index into this Vec.
        // It is the index into the original list of introduction points in the descriptor.
        let mut usable_intros: Vec<UsableIntroPt> = desc
            .intro_points()
            .iter()
            .enumerate()
            .map(|(intro_index, intro_desc)| {
                let intro_index = intro_index.into();
                let intro_target = ipt_to_circtarget(intro_desc, &self.netdir)
                    .map_err(|error| FAE::UnusableIntro { error, intro_index })?;
                // Lack of TAIT means this clone
                let intro_target = OwnedCircTarget::from_circ_target(&intro_target);
                Ok::<_, FailedAttemptError>(UsableIntroPt {
                    intro_index,
                    intro_desc,
                    intro_target,
                    sort_rand: self.mocks.thread_rng().random(),
                })
            })
            .filter_map(|entry| match entry {
                Ok(y) => Some(y),
                Err(e) => {
                    // Unusable IPTs are recorded as errors, not fatal: others may still work.
                    errors.push_timed(e, self.runtime.now(), Some(self.runtime.wallclock()));
                    None
                }
            })
            .collect_vec();

        // Delete experience information for now-unlisted intro points
        // Otherwise, as the IPTs change `Data` might grow without bound,
        // if we keep reconnecting to the same HS.
        data.retain(|k, _v| {
            usable_intros
                .iter()
                .any(|ipt| RelayIdForExperience::for_lookup(&ipt.intro_target).any(|id| &id == k))
        });

        // Join with existing state recording our experiences,
        // sort by descending goodness, and then randomly
        // (so clients without any experience don't all pile onto the same, first, IPT)
        usable_intros.sort_by_key(|ipt: &UsableIntroPt| {
            let experience =
                RelayIdForExperience::for_lookup(&ipt.intro_target).find_map(|id| data.get(&id));
            IptSortKey {
                outcome: experience.into(),
                sort_rand: ipt.sort_rand,
            }
        });
        self.mocks.test_got_ipts(&usable_intros);

        // Cycle through the sorted IPTs, bounded by the overall attempt limit.
        let mut intro_attempts = usable_intros.iter().cycle().take(max_total_attempts);

        // We retain a rendezvous we managed to set up in here.  That way if we created it, and
        // then failed before we actually needed it, we can reuse it.
        // If we exit with an error, we will waste it - but because we isolate things we do
        // for different services, it wouldn't be reusable anyway.
        let mut saved_rendezvous = None;

        // If we are using proof-of-work DoS mitigation, this chooses an
        // algorithm and initial effort, and adjusts that effort when we retry.
        let mut pow_client = HsPowClient::new(&self.hs_blind_id, desc);

        // We might consider making multiple INTRODUCE attempts to different
        // IPTs in parallel, and somehow aggregating the errors and
        // experiences.
        // However our HS experts don't consider that important:
        //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914438
        // Parallelizing our HsCircPool circuit building would likely have
        // greater impact. (See #1149.)
        loop {
            // When did we start doing things that depended on the IPT?
            //
            // Used for recording our experience with the selected IPT
            let mut ipt_use_started = None::<Instant>;

            // Error handling inner async block (analogous to an IEFE):
            //  * Ok(Some()) means this attempt succeeded
            //  * Ok(None) means all attempts exhausted
            //  * Err(error) means this attempt failed
            //
            let outcome = async {
                // We establish a rendezvous point first.  Although it appears from reading
                // this code that this means we serialise establishment of the rendezvous and
                // introduction circuits, this isn't actually the case.  The circmgr maintains
                // a pool of circuits.  What actually happens in the "standing start" case is
                // that we obtain a circuit for rendezvous from the circmgr's pool, expecting
                // one to be available immediately; the circmgr will then start to build a new
                // one to replenish its pool, and that happens in parallel with the work we do
                // here - but in arrears.  If the circmgr pool is empty, then we must wait.
                //
                // Perhaps this should be parallelised here.  But that's really what the pool
                // is for, since we expect building the rendezvous circuit and building the
                // introduction circuit to take about the same length of time.
                //
                // We *do* serialise the ESTABLISH_RENDEZVOUS exchange, with the
                // building of the introduction circuit.  That could be improved, at the cost
                // of some additional complexity here.
                //
                // Our HS experts don't consider it important to increase the parallelism:
                //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914444
                //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914445
                if saved_rendezvous.is_none() {
                    debug!("hs conn to {}: setting up rendezvous point", &self.hsid);
                    // Establish a rendezvous circuit.
                    let Some(_): Option<usize> = rend_attempts.next() else {
                        return Ok(None);
                    };

                    saved_rendezvous = Some(self.establish_rendezvous().await?);
                }

                let Some(ipt) = intro_attempts.next() else {
                    return Ok(None);
                };
                let intro_index = ipt.intro_index;
                let is_single_onion_service = desc.is_single_onion_service();

                // PoW failure is non-fatal: we fall back to introducing without a solution.
                let proof_of_work = match pow_client.solve().await {
                    Ok(solution) => solution,
                    Err(e) => {
                        debug!(
                            "failing to compute proof-of-work, trying without. ({:?})",
                            e
                        );
                        None
                    }
                };

                // We record how long things take, starting from here,
                // as a statistic we'll use for the IPT in future.
                // This is stored in a variable outside this async block,
                // so that the outcome handling can use it.
                ipt_use_started = Some(self.runtime.now());

                // No `Option::get_or_try_insert_with`, or we'd avoid this expect()
                let rend_pt_for_error = rend_pt_identity_for_error(
                    &saved_rendezvous
                        .as_ref()
                        .expect("just made Some")
                        .rend_relay,
                );
                debug!(
                    "hs conn to {}: RPT {}",
                    &self.hsid,
                    rend_pt_for_error.as_inner()
                );
                let (rendezvous, introduced) =
                    self.exchange_introduce(ipt, &mut saved_rendezvous, proof_of_work)
                    .await
                    // TODO: Maybe try, once, to extend-and-reuse the intro circuit.
                    //
                    // If the introduction fails, the introduction circuit is in principle
                    // still usable.  We believe that in this case, C Tor extends the intro
                    // circuit by one hop to the next IPT to try.  That saves on building a
                    // whole new 3-hop intro circuit.  However, our HS experts tell us that
                    // if introduction fails at one IPT it is likely to fail at the others too,
                    // so that optimisation might reduce our network impact and time to failure,
                    // but isn't likely to improve our chances of success.
                    //
                    // However, it's not clear whether this approach risks contaminating
                    // the 2nd attempt with some fault relating to the introduction point.
                    // The 1st ipt might also gain more knowledge about which HS we're talking to.
                    //
                    // TODO SPEC: Discuss extend-and-reuse HS intro circuit after nack
                    ?;
                #[allow(unused_variables)] // it's *supposed* to be unused
                let saved_rendezvous = (); // don't use `saved_rendezvous` any more, use rendezvous
                let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
                let circ = self.complete_rendezvous(ipt, rendezvous, introduced, is_single_onion_service)
                    .await?;
                debug!(
                    "hs conn to {}: RPT {} IPT {}: success",
                    &self.hsid,
                    rend_pt.as_inner(),
                    intro_index,
                );
                Ok::<_, FAE>(Some((intro_index, circ)))
            }
            .await;
            // Store the experience `outcome` we had with IPT `intro_index`, in `data`
            #[allow(clippy::unused_unit)] // -> () is here for error handling clarity
            let mut store_experience = |intro_index, outcome| -> () {
                (|| {
                    let ipt = usable_intros
                        .iter()
                        .find(|ipt| ipt.intro_index == intro_index)
                        .ok_or_else(|| internal!("IPT not found by index"))?;
                    let id = RelayIdForExperience::for_store(&ipt.intro_target)?;
                    let started = ipt_use_started.ok_or_else(|| {
                        internal!("trying to record IPT use but no IPT start time noted")
                    })?;
                    let duration = self
                        .runtime
                        .now()
                        .checked_duration_since(started)
                        .ok_or_else(|| internal!("clock overflow calculating IPT use duration"))?;
                    data.insert(id, IptExperience { duration, outcome });
                    Ok::<_, Bug>(())
                })()
                // Experience recording is best-effort: a Bug here is logged, not propagated.
                .unwrap_or_else(|e| warn_report!(e, "error recording HS IPT use experience"));
            };
            match outcome {
                Ok(Some((intro_index, y))) => {
                    // Record successful outcome in Data
                    store_experience(intro_index, Ok(()));
                    return Ok(y);
                }
                Ok(None) => return Err(CE::Failed(errors)),
                Err(error) => {
                    debug_report!(&error, "hs conn to {}: attempt failed", &self.hsid);
                    // Record error outcome in Data, if in fact we involved the IPT
                    // at all.  The IPT information is retrieved from `error`,
                    // since only some of the errors implicate the introduction point.
                    if let Some(intro_index) = error.intro_index() {
                        store_experience(intro_index, Err(error.retry_time()));
                    }
                    errors.push_timed(error, self.runtime.now(), Some(self.runtime.wallclock()));
                    // If we are using proof-of-work DoS mitigation, try harder next time
                    pow_client.increase_effort();
                }
            }
        }
    }
    /// Make one attempt to establish a rendezvous circuit
    ///
    /// This doesn't really depend on anything,
    /// other than (obviously) the isolation implied by our circuit pool.
    /// In particular it doesn't depend on the introduction point.
    ///
    /// Applies timeouts as appropriate.
    #[instrument(level = "trace", skip_all)]
    async fn establish_rendezvous(&'c self) -> Result<Rendezvous<'c, R, M>, FAE> {
        // Obtain a rendezvous circuit (and the relay it ends at) from the pool.
        let (rend_tunnel, rend_relay) = self
            .circpool
            .m_get_or_launch_client_rend(&self.netdir)
            .await
            .map_err(|error| FAE::RendezvousCircuitObtain { error })?;
        let rend_pt = rend_pt_identity_for_error(&rend_relay);
        let rend_cookie: RendCookie = self.mocks.thread_rng().random();
        let message = EstablishRendezvous::new(rend_cookie);
        // Two oneshots: one for RENDEZVOUS_ESTABLISHED (awaited here),
        // one for RENDEZVOUS2 (awaited later, by the rendezvous-completion code).
        let (rend_established_tx, rend_established_rx) = proto_oneshot::channel();
        let (rend2_tx, rend2_rx) = proto_oneshot::channel();
        /// Handler which expects `RENDEZVOUS_ESTABLISHED` and then
        /// `RENDEZVOUS2`.   Returns each message via the corresponding `oneshot`.
        struct Handler {
            /// Sender for a RENDEZVOUS_ESTABLISHED message.
            rend_established_tx: proto_oneshot::Sender<RendezvousEstablished>,
            /// Sender for a RENDEZVOUS2 message.
            rend2_tx: proto_oneshot::Sender<Rendezvous2>,
        }
        impl MsgHandler for Handler {
            fn handle_msg(
                &mut self,
                msg: AnyRelayMsg,
            ) -> Result<MetaCellDisposition, tor_proto::Error> {
                // The first message we expect is a RENDEZVOUS_ESTABLISHED.
                if self.rend_established_tx.still_expected() {
                    self.rend_established_tx
                        .deliver_expected_message(msg, MetaCellDisposition::Consumed)
                } else {
                    self.rend2_tx
                        .deliver_expected_message(msg, MetaCellDisposition::ConversationFinished)
                }
            }
        }
        debug!(
            "hs conn to {}: RPT {}: sending ESTABLISH_RENDEZVOUS",
            &self.hsid,
            rend_pt.as_inner(),
        );
        let failed_map_err = |error| FAE::RendezvousEstablish {
            error,
            rend_pt: rend_pt.clone(),
        };
        let handler = Handler {
            rend_established_tx,
            rend2_tx,
        };
        let num_hops = rend_tunnel
            .m_num_own_hops()
            .map_err(|error| FAE::RendezvousCircuitObtain { error })?;
        let timeout_roundtrip =
            self.estimate_timeout(&[(1, TimeoutsAction::RoundTrip { length: num_hops })]);
        // TODO(conflux) This error handling is horrible. Problem is that this Mock system requires
        // to send back a tor_circmgr::Error while our reply handler requires a tor_proto::Error.
        // And unifying both is hard here considering it needs to be converted to yet another Error
        // type "FAE" so we have to do these hoops and jumps.
        rend_tunnel
            .m_start_conversation_last_hop(Some(message.into()), handler)
            .await
            .map_err(|e| {
                let proto_error = match e {
                    tor_circmgr::Error::Protocol { error, .. } => error,
                    _ => tor_proto::Error::CircuitClosed,
                };
                FAE::RendezvousEstablish {
                    error: proto_error,
                    rend_pt: rend_pt.clone(),
                }
            })?;
        // `start_conversation` returns as soon as the control message has been sent.
        // We need to obtain the RENDEZVOUS_ESTABLISHED message, which is "returned" via the oneshot.
        let _: RendezvousEstablished = self
            .runtime
            .timeout(timeout_roundtrip, rend_established_rx.recv(failed_map_err))
            .await
            .map_err(
                |_timeout: tor_rtcompat::TimeoutError| FAE::RendezvousEstablishTimeout {
                    rend_pt: rend_pt.clone(),
                },
            )??;
        debug!(
            "hs conn to {}: RPT {}: got RENDEZVOUS_ESTABLISHED",
            &self.hsid,
            rend_pt.as_inner(),
        );
        // Hand back everything the caller needs to complete the rendezvous later,
        // including the still-pending RENDEZVOUS2 receiver.
        Ok(Rendezvous {
            rend_tunnel,
            rend_cookie,
            rend_relay,
            rend2_rx,
            marker: PhantomData,
        })
    }
    /// Attempt (once) to send an INTRODUCE1 and wait for the INTRODUCE_ACK
    ///
    /// `take`s the input `rendezvous` (but only takes it if it gets that far)
    /// and, if successful, returns it.
    /// (This arranges that the rendezvous is "used up" precisely if
    /// we sent its secret somewhere.)
    ///
    /// Although this function handles the `Rendezvous`,
    /// nothing in it actually involves the rendezvous point.
    /// So if there's a failure, it's purely to do with the introduction point.
    ///
    /// Applies timeouts as appropriate.
    #[allow(clippy::cognitive_complexity, clippy::type_complexity)] // TODO: Refactor
    #[instrument(level = "trace", skip_all)]
    async fn exchange_introduce(
        &'c self,
        ipt: &UsableIntroPt<'_>,
        rendezvous: &mut Option<Rendezvous<'c, R, M>>,
        proof_of_work: Option<ProofOfWork>,
    ) -> Result<(Rendezvous<'c, R, M>, Introduced<R, M>), FAE> {
        let intro_index = ipt.intro_index;
        debug!(
            "hs conn to {}: IPT {}: obtaining intro circuit",
            &self.hsid, intro_index,
        );
        let intro_circ = self
            .circpool
            .m_get_or_launch_intro(
                &self.netdir,
                ipt.intro_target.clone(), // &OwnedCircTarget isn't CircTarget apparently
            )
            .await
            .map_err(|error| FAE::IntroductionCircuitObtain { error, intro_index })?;
        // From here on, the rendezvous is "used up": its cookie is about to be sent out.
        let rendezvous = rendezvous.take().ok_or_else(|| internal!("no rend"))?;
        let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
        debug!(
            "hs conn to {}: RPT {} IPT {}: making introduction",
            &self.hsid,
            rend_pt.as_inner(),
            intro_index,
        );
        // Now we construct an introduce1 message and perform the first part of the
        // rendezvous handshake.
        //
        // This process is tricky because the header of the INTRODUCE1 message
        // -- which depends on the IntroPt configuration -- is authenticated as
        // part of the HsDesc handshake.
        // Construct the header, since we need it as input to our encryption.
        let intro_header = {
            let ipt_sid_key = ipt.intro_desc.ipt_sid_key();
            // A dummy INTRODUCE1 with an empty body: only its encoded header matters here.
            let intro1 = Introduce1::new(
                AuthKeyType::ED25519_SHA3_256,
                ipt_sid_key.as_bytes().to_vec(),
                vec![],
            );
            let mut header = vec![];
            intro1
                .encode_onto(&mut header)
                .map_err(into_internal!("couldn't encode intro1 header"))?;
            header
        };
        // Construct the introduce payload, which tells the onion service how to find
        // our rendezvous point.  (We could do this earlier if we wanted.)
        let intro_payload = {
            let onion_key =
                intro_payload::OnionKey::NtorOnionKey(*rendezvous.rend_relay.ntor_onion_key());
            let linkspecs = rendezvous
                .rend_relay
                .linkspecs()
                .map_err(into_internal!("Couldn't encode link specifiers"))?;
            let payload = IntroduceHandshakePayload::new(
                rendezvous.rend_cookie,
                onion_key,
                linkspecs,
                proof_of_work,
            );
            let mut encoded = vec![];
            payload
                .write_onto(&mut encoded)
                .map_err(into_internal!("Couldn't encode introduce1 payload"))?;
            encoded
        };
        // Perform the cryptographic handshake with the onion service.
        let service_info = hs_ntor::HsNtorServiceInfo::new(
            ipt.intro_desc.svc_ntor_key().clone(),
            ipt.intro_desc.ipt_sid_key().clone(),
            self.subcredential,
        );
        let handshake_state =
            hs_ntor::HsNtorClientState::new(&mut self.mocks.thread_rng(), service_info);
        let encrypted_body = handshake_state
            .client_send_intro(&intro_header, &intro_payload)
            .map_err(into_internal!("can't begin hs-ntor handshake"))?;
        // Build our actual INTRODUCE1 message.
        let intro1_real = Introduce1::new(
            AuthKeyType::ED25519_SHA3_256,
            ipt.intro_desc.ipt_sid_key().as_bytes().to_vec(),
            encrypted_body,
        );
        /// Handler which expects just `INTRODUCE_ACK`
        struct Handler {
            /// Sender for `INTRODUCE_ACK`
            intro_ack_tx: proto_oneshot::Sender<IntroduceAck>,
        }
        impl MsgHandler for Handler {
            fn handle_msg(
                &mut self,
                msg: AnyRelayMsg,
            ) -> Result<MetaCellDisposition, tor_proto::Error> {
                self.intro_ack_tx
                    .deliver_expected_message(msg, MetaCellDisposition::ConversationFinished)
            }
        }
        let failed_map_err = |error| FAE::IntroductionExchange { error, intro_index };
        let (intro_ack_tx, intro_ack_rx) = proto_oneshot::channel();
        let handler = Handler { intro_ack_tx };
        let num_hops = intro_circ
            .m_num_hops()
            .map_err(|error| FAE::IntroductionCircuitObtain { error, intro_index })?;
        // NOTE: Should we allow this to be longer in case the introduction point is grievously
        // overloaded?
        let timeout_roundtrip =
            self.estimate_timeout(&[(1, TimeoutsAction::RoundTrip { length: num_hops })]);
        debug!(
            "hs conn to {}: RPT {} IPT {}: making introduction - sending INTRODUCE1",
            &self.hsid,
            rend_pt.as_inner(),
            intro_index,
        );
        // TODO(conflux) This error handling is horrible. Problem is that this Mock system requires
        // to send back a tor_circmgr::Error while our reply handler requires a tor_proto::Error.
        // And unifying both is hard here considering it needs to be converted to yet another Error
        // type "FAE" so we have to do these hoops and jumps.
        intro_circ
            .m_start_conversation_last_hop(Some(intro1_real.into()), handler)
            .await
            .map_err(|e| {
                let proto_error = match e {
                    tor_circmgr::Error::Protocol { error, .. } => error,
                    _ => tor_proto::Error::CircuitClosed,
                };
                FAE::IntroductionExchange {
                    error: proto_error,
                    intro_index,
                }
            })?;
        // Status is checked by `.success()`, and we don't look at the extensions;
        // just discard the known-successful `IntroduceAck`
        let _: IntroduceAck = self
            .runtime
            .timeout(timeout_roundtrip, intro_ack_rx.recv(failed_map_err))
            .await
            .map_err(|_timeout: TimeoutError| FAE::IntroductionTimeout { intro_index })??
            .success()
            .map_err(|status| FAE::IntroductionFailed {
                status,
                intro_index,
            })?;
        debug!(
            "hs conn to {}: RPT {} IPT {}: making introduction - success",
            &self.hsid,
            rend_pt.as_inner(),
            intro_index,
        );
        // Having received INTRODUCE_ACK. we can forget about this circuit
        // (and potentially tear it down).
        drop(intro_circ);
        Ok((
            rendezvous,
            Introduced {
                handshake_state,
                marker: PhantomData,
            },
        ))
    }
    /// Attempt (once) to connect a rendezvous circuit using the given intro pt.
    ///
    /// That is to say, we simply wait for a RENDEZVOUS2 message,
    /// and if we get one, we add a virtual hop.
    ///
    /// Timeouts here might be due to the IPT, RPT, service,
    /// or any of the intermediate relays.
    ///
    /// If, rather than a timeout, we actually encounter some kind of error,
    /// we'll return the appropriate `FailedAttemptError`.
    /// (Who is responsible may vary, so the `FailedAttemptError` variant will reflect that.)
    async fn complete_rendezvous(
        &'c self,
        ipt: &UsableIntroPt<'_>,
        rendezvous: Rendezvous<'c, R, M>,
        introduced: Introduced<R, M>,
        is_single_onion_service: bool,
    ) -> Result<DataTunnel!(R, M), FAE> {
        /// Largest number of hops that the onion service must build for _its_
        /// circuits to our rendezvous points.
        ///
        /// This is 4 hops (assuming that it has full vanguards enabled) plus one for the
        /// rendezvous point itself.
        const MAX_PEER_REND_HOPS: usize = 5;
        /// Largest number of retries that we think the peer might make if its
        /// circuits are failing.
        const MAX_PEER_CIRC_RETRIES: u32 = 3;
        let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
        let intro_index = ipt.intro_index;
        // Error mapper for failures on the rendezvous circuit itself.
        let failed_map_err = |error| FAE::RendezvousCompletionCircuitError {
            error,
            intro_index,
            rend_pt: rend_pt.clone(),
        };
        debug!(
            "hs conn to {}: RPT {} IPT {}: awaiting rendezvous completion",
            &self.hsid,
            rend_pt.as_inner(),
            intro_index,
        );
        let num_hops = rendezvous
            .rend_tunnel
            .m_num_own_hops()
            // This is not necessarily the best error, but it isn't totally wrong.
            // We can't wrap the tor_circuit error in anything else that makes sense.
            // See #2513.
            .map_err(|error| FAE::RendezvousCircuitObtain { error })?;
        // Maximum length of the circuit that the peer will build to the rendezvous point.
        let peer_rend_circ_len = if is_single_onion_service {
            1
        } else {
            MAX_PEER_REND_HOPS
        };
        // The total number of hops from the peer to us.
        //
        // We subtract 1 because both circuits terminate at the rendezvous point.
        let total_circ_len = peer_rend_circ_len + num_hops - 1;
        // Limit on the duration of each attempt for activities involving both
        // RPT and IPT.
        let rpt_ipt_timeout = self.estimate_timeout(&[
            // The API requires us to specify a number of circuit builds and round trips.
            // So what we tell the estimator is a rather imprecise description.
            //
            // What we are timing here is:
            //
            //    INTRODUCE2 goes from IPT to HS.
            //    This happens in parallel with our waiting for the INTRODUCE_ACK,
            //    and we know that our own introduction circuit is always at least
            //    as long as the peer's (even if they are using full vanguards),
            //    so we don't need any additional delay here.
            //
            //    HS builds to our RPT
            (
                MAX_PEER_CIRC_RETRIES,
                TimeoutsAction::BuildCircuit {
                    length: peer_rend_circ_len,
                },
            ),
            //
            //    RENDEZVOUS1 goes from HS to RPT.  `peer_circ_len`, one-way.
            //    RENDEZVOUS2 goes from RPT to us.  `num_hops`, one-way.
            (
                1,
                TimeoutsAction::OneWay {
                    length: total_circ_len,
                },
            ),
        ]);
        let rend2_msg: Rendezvous2 = self
            .runtime
            .timeout(rpt_ipt_timeout, rendezvous.rend2_rx.recv(failed_map_err))
            .await
            .map_err(|_: TimeoutError| FAE::RendezvousCompletionTimeout {
                intro_index,
                rend_pt: rend_pt.clone(),
            })??;
        debug!(
            "hs conn to {}: RPT {} IPT {}: received RENDEZVOUS2",
            &self.hsid,
            rend_pt.as_inner(),
            intro_index,
        );
        // In theory would be great if we could have multiple introduction attempts in parallel
        // with similar x,X values but different IPTs.  However, our HS experts don't
        // think increasing parallelism here is important:
        //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914438
        let handshake_state = introduced.handshake_state;
        // Try to complete the cryptographic handshake.
        let keygen =
            self.mocks
                .rendezvous_handshake(handshake_state, rend2_msg, intro_index, &rend_pt)?;
        let params = onion_circparams_from_netparams(self.netdir.params())
            .map_err(into_internal!("Failed to build CircParameters"))?;
        // TODO: We may be able to infer more about the supported protocols of the other side from our
        // handshake, and from its descriptors.
        //
        // TODO CC: This is relevant for congestion control!
        let protocols = self.netdir.client_protocol_status().required_protocols();
        // Add the virtual hop corresponding to the service itself,
        // turning the rendezvous tunnel into a usable data tunnel.
        rendezvous
            .rend_tunnel
            .m_extend_virtual(
                handshake::RelayProtocol::HsV3,
                handshake::HandshakeRole::Initiator,
                keygen,
                params,
                protocols,
            )
            .await
            .map_err(into_internal!(
                "actually this is probably a 'circuit closed' error" // TODO HS
            ))?;
        debug!(
            "hs conn to {}: RPT {} IPT {}: HS circuit established",
            &self.hsid,
            rend_pt.as_inner(),
            intro_index,
        );
        Ok(rendezvous.rend_tunnel)
    }
    /// Helper to estimate a timeout for a complicated operation
    ///
    /// `actions` is a list of `(count, action)`, where each entry
    /// represents doing `action`, `count` times sequentially.
    ///
    /// Combines the timeout estimates and returns an overall timeout.
358
    fn estimate_timeout(&self, actions: &[(u32, TimeoutsAction)]) -> Duration {
        // This algorithm is, perhaps, wrong.  For uncorrelated variables, a particular
        // percentile estimate for a sum of random variables, is not calculated by adding the
        // percentile estimates of the individual variables.
        //
        // But the actual lengths of times of the operations aren't uncorrelated.
        // If they were *perfectly* correlated, then this addition would be correct.
        // It will do for now; it just might be rather longer than it ought to be.
358
        actions
358
            .iter()
362
            .map(|(count, action)| {
362
                self.circpool
362
                    .m_estimate_timeout(action)
362
                    .saturating_mul(*count)
362
            })
358
            .fold(Duration::ZERO, Duration::saturating_add)
358
    }
}
/// Mocks used for testing `connect.rs`
///
/// This is different to `MockableConnectorData`,
/// which is used to *replace* this file, when testing `state.rs`.
///
/// `MocksForConnect` provides mock facilities for *testing* this file.
//
// TODO this should probably live somewhere else, maybe tor-circmgr even?
// TODO this really ought to be made by macros or something
trait MocksForConnect<R>: Clone {
    /// HS circuit pool
    type HsCircPool: MockableCircPool<R>;
    /// A random number generator
    type Rng: rand::Rng + rand::CryptoRng;
    /// Key generator used for generating the keys for the virtual hop.
    type KeyGenerator: tor_proto::client::circuit::handshake::KeyGenerator + Send;
    /// Tell tests we got this descriptor text
    ///
    /// (Default is a no-op; the production impl for `()` does not override it.)
    fn test_got_desc(&self, _: &HsDesc) {}
    /// Tell tests we got this data tunnel.
    ///
    /// (Default is a no-op.)
    fn test_got_tunnel(&self, _: &DataTunnel!(R, Self)) {}
    /// Tell tests we have obtained and sorted the intros like this
    ///
    /// (Default is a no-op.)
    fn test_got_ipts(&self, _: &[UsableIntroPt]) {}
    /// Return a random number generator
    fn thread_rng(&self) -> Self::Rng;
    /// Complete the rendezvous handshake, returning the resulting keygen
    fn rendezvous_handshake(
        &self,
        handshake_state: hs_ntor::HsNtorClientState,
        rend2_msg: Rendezvous2,
        intro_index: IntroPtIndex,
        rend_pt: &RendPtIdentityForError,
    ) -> Result<Self::KeyGenerator, FAE>;
}
/// Mock for `HsCircPool`
///
/// Methods start with `m_` to avoid the following problem:
/// `ClientCirc::start_conversation` (say) means
/// to use the inherent method if one exists,
/// but will use a trait method if there isn't an inherent method.
///
/// So if the inherent method is renamed, the call in the impl here
/// turns into an always-recursive call.
/// This is not detected by the compiler due to the situation being
/// complicated by futures, `#[async_trait]` etc.
/// <https://github.com/rust-lang/rust/issues/111177>
#[async_trait]
trait MockableCircPool<R> {
    /// Directory tunnel.
    type DirTunnel: MockableClientDir;
    /// Data tunnel.
    type DataTunnel: MockableClientData;
    /// Intro tunnel.
    type IntroTunnel: MockableClientIntro;
    /// Obtain (or launch) a directory tunnel ending at `target`.
    async fn m_get_or_launch_dir(
        &self,
        netdir: &NetDir,
        target: impl CircTarget + Send + Sync + 'async_trait,
    ) -> tor_circmgr::Result<Self::DirTunnel>;
    /// Obtain (or launch) an introduction tunnel ending at `target`.
    async fn m_get_or_launch_intro(
        &self,
        netdir: &NetDir,
        target: impl CircTarget + Send + Sync + 'async_trait,
    ) -> tor_circmgr::Result<Self::IntroTunnel>;
    /// Client circuit
    ///
    /// Also returns the relay chosen as the rendezvous point.
    async fn m_get_or_launch_client_rend<'a>(
        &self,
        netdir: &'a NetDir,
    ) -> tor_circmgr::Result<(Self::DataTunnel, Relay<'a>)>;
    /// Estimate timeout
    fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration;
}
/// Mock for onion service client directory tunnel.
#[async_trait]
trait MockableClientDir: Debug {
    /// Stream type returned by `m_begin_dir_stream`.
    type DirStream: AsyncRead + AsyncWrite + Send + Unpin;
    /// Open a directory stream on this tunnel.
    async fn m_begin_dir_stream(&self) -> tor_circmgr::Result<Self::DirStream>;
    /// Get a tor_dirclient::SourceInfo for this circuit, if possible.
    fn m_source_info(&self) -> tor_proto::Result<Option<SourceInfo>>;
    /// Return the length of this circuit.
    fn m_num_hops(&self) -> tor_circmgr::Result<usize>;
}
/// Mock for onion service client data tunnel.
#[async_trait]
trait MockableClientData: Debug {
    /// Conversation handle (borrows from the tunnel, per the `Self: 'r` bound).
    type Conversation<'r>
    where
        Self: 'r;
    /// Converse
    ///
    /// Start a conversation with the last hop, optionally sending `msg` first;
    /// replies are delivered to `reply_handler`.
    async fn m_start_conversation_last_hop(
        &self,
        msg: Option<AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
    ) -> tor_circmgr::Result<Self::Conversation<'_>>;
    /// Add a virtual hop to the circuit.
    async fn m_extend_virtual(
        &self,
        protocol: handshake::RelayProtocol,
        role: handshake::HandshakeRole,
        handshake: impl handshake::KeyGenerator + Send,
        params: CircParameters,
        capabilities: &tor_protover::Protocols,
    ) -> tor_circmgr::Result<()>;
    /// Return the number of our own hops in this circuit.
    ///
    /// This does not count any hops for the service's rendezvous circuit.
    /// It does count our virtual hop, if we have one.
    /// (That isn't a problem, since we only use this method to calculate
    /// timeouts, and we only calculate timeouts _before_ we establish
    /// the virtual hop.)
    fn m_num_own_hops(&self) -> tor_circmgr::Result<usize>;
}
/// Mock for onion service client introduction tunnel.
#[async_trait]
trait MockableClientIntro: Debug {
    /// Conversation handle (borrows from the tunnel, per the `Self: 'r` bound).
    type Conversation<'r>
    where
        Self: 'r;
    /// Converse
    ///
    /// Start a conversation with the last hop, optionally sending `msg` first;
    /// replies are delivered to `reply_handler`.
    async fn m_start_conversation_last_hop(
        &self,
        msg: Option<AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
    ) -> tor_circmgr::Result<Self::Conversation<'_>>;
    /// Return the number of hops in this circuit.
    fn m_num_hops(&self) -> tor_circmgr::Result<usize>;
}
// Production (non-test) configuration: no mocking.
// Uses the real circuit pool, real thread RNG, and the real hs_ntor handshake.
impl<R: Runtime> MocksForConnect<R> for () {
    type HsCircPool = HsCircPool<R>;
    type Rng = rand::rngs::ThreadRng;
    type KeyGenerator = HsNtorHkdfKeyGenerator;
    fn thread_rng(&self) -> Self::Rng {
        rand::rng()
    }
    fn rendezvous_handshake(
        &self,
        handshake_state: hs_ntor::HsNtorClientState,
        rend2_msg: Rendezvous2,
        intro_index: IntroPtIndex,
        rend_pt: &RendPtIdentityForError,
    ) -> Result<Self::KeyGenerator, FAE> {
        // Try to complete the cryptographic handshake.
        handshake_state
            .client_receive_rend(rend2_msg.handshake_info())
            // If this goes wrong, either the onion service has mangled the crypto,
            // or the rendezvous point has misbehaved (that that is possible is a protocol bug),
            // or we have used the wrong handshake_state (let's assume that's not true).
            //
            // If this happens we'll go and try another RPT.
            .map_err(|error| FAE::RendezvousCompletionHandshake {
                error,
                intro_index,
                rend_pt: rend_pt.clone(),
            })
    }
}
// Production implementation: delegate to the real `HsCircPool`.
//
// Calls use fully-qualified `HsCircPool::...` syntax — cf. the
// inherent-vs-trait-method hazard described on `MockableCircPool`.
#[async_trait]
impl<R: Runtime> MockableCircPool<R> for HsCircPool<R> {
    type DirTunnel = ClientOnionServiceDirTunnel;
    type DataTunnel = ClientOnionServiceDataTunnel;
    type IntroTunnel = ClientOnionServiceIntroTunnel;
    #[instrument(level = "trace", skip_all)]
    async fn m_get_or_launch_dir(
        &self,
        netdir: &NetDir,
        target: impl CircTarget + Send + Sync + 'async_trait,
    ) -> tor_circmgr::Result<Self::DirTunnel> {
        Ok(HsCircPool::get_or_launch_client_dir(self, netdir, target).await?)
    }
    #[instrument(level = "trace", skip_all)]
    async fn m_get_or_launch_intro(
        &self,
        netdir: &NetDir,
        target: impl CircTarget + Send + Sync + 'async_trait,
    ) -> tor_circmgr::Result<Self::IntroTunnel> {
        Ok(HsCircPool::get_or_launch_client_intro(self, netdir, target).await?)
    }
    #[instrument(level = "trace", skip_all)]
    async fn m_get_or_launch_client_rend<'a>(
        &self,
        netdir: &'a NetDir,
    ) -> tor_circmgr::Result<(Self::DataTunnel, Relay<'a>)> {
        HsCircPool::get_or_launch_client_rend(self, netdir).await
    }
    fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration {
        HsCircPool::estimate_timeout(self, action)
    }
}
// Production implementation: delegate to the real directory tunnel.
#[async_trait]
impl MockableClientDir for ClientOnionServiceDirTunnel {
    /// The real directory stream type.
    type DirStream = tor_proto::client::stream::DataStream;
    async fn m_begin_dir_stream(&self) -> tor_circmgr::Result<Self::DirStream> {
        Self::begin_dir_stream(self).await
    }
    /// Get a tor_dirclient::SourceInfo for this circuit, if possible.
    fn m_source_info(&self) -> tor_proto::Result<Option<SourceInfo>> {
        SourceInfo::from_tunnel(self)
    }
    fn m_num_hops(&self) -> tor_circmgr::Result<usize> {
        self.n_hops()
    }
}
// Production implementation: delegate to the real data tunnel.
#[async_trait]
impl MockableClientData for ClientOnionServiceDataTunnel {
    type Conversation<'r> = tor_proto::Conversation<'r>;
    async fn m_start_conversation_last_hop(
        &self,
        msg: Option<AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
    ) -> tor_circmgr::Result<Self::Conversation<'_>> {
        // Conversations are always addressed to the final hop.
        Self::start_conversation(self, msg, reply_handler, TargetHop::LastHop).await
    }
    async fn m_extend_virtual(
        &self,
        protocol: handshake::RelayProtocol,
        role: handshake::HandshakeRole,
        handshake: impl handshake::KeyGenerator + Send,
        params: CircParameters,
        capabilities: &tor_protover::Protocols,
    ) -> tor_circmgr::Result<()> {
        Self::extend_virtual(self, protocol, role, handshake, params, capabilities).await
    }
    fn m_num_own_hops(&self) -> tor_circmgr::Result<usize> {
        self.n_hops()
    }
}
// Production implementation: delegate to the real introduction tunnel.
#[async_trait]
impl MockableClientIntro for ClientOnionServiceIntroTunnel {
    type Conversation<'r> = tor_proto::Conversation<'r>;
    async fn m_start_conversation_last_hop(
        &self,
        msg: Option<AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
    ) -> tor_circmgr::Result<Self::Conversation<'_>> {
        // Conversations are always addressed to the final hop.
        Self::start_conversation(self, msg, reply_handler, TargetHop::LastHop).await
    }
    fn m_num_hops(&self) -> tor_circmgr::Result<usize> {
        self.n_hops()
    }
}
// Hook this file's `connect` entry point into the state-management layer.
#[async_trait]
impl MockableConnectorData for Data {
    type DataTunnel = ClientOnionServiceDataTunnel;
    type MockGlobalState = ();
    /// Make one connection attempt, using (and updating) the cached `data`.
    async fn connect<R: Runtime>(
        connector: &HsClientConnector<R>,
        netdir: Arc<NetDir>,
        config: Arc<Config>,
        hsid: HsId,
        data: &mut Self,
        secret_keys: HsClientSecretKeys,
    ) -> Result<Self::DataTunnel, ConnError> {
        connect(connector, netdir, config, hsid, data, secret_keys).await
    }
    /// A cached tunnel may be reused only if it hasn't been closed.
    fn tunnel_is_ok(tunnel: &Self::DataTunnel) -> bool {
        !tunnel.is_closed()
    }
}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    #![allow(dead_code, unused_variables)] // TODO HS TESTS delete, after tests are completed
    use super::*;
    use crate::*;
    use itertools::chain;
    use std::iter;
    use tokio_crate as tokio;
    use tor_async_utils::JoinReadWrite;
    use tor_basic_utils::test_rng::{TestingRng, testing_rng};
    use tor_hscrypto::pk::{HsClientDescEncKey, HsClientDescEncKeypair};
    use tor_llcrypto::pk::curve25519;
    use tor_netdoc::doc::{hsdesc::test_data, netstatus::Lifetime};
    use tor_rtcompat::RuntimeSubstExt as _;
    use tor_rtcompat::tokio::TokioNativeTlsRuntime;
    use tor_rtmock::simple_time::SimpleMockTimeProvider;
    use tracing_test::traced_test;
    /// State shared by all the mocks: records what the client did,
    /// and scripts the replies the mocks will give.
    #[derive(derive_more::Debug, Default)]
    struct MocksGlobal {
        /// Every HsDir the client asked us to build a circuit to.
        hsdirs_asked: Vec<OwnedCircTarget>,
        /// The last descriptor the client reported via `test_got_desc`.
        got_desc: Option<HsDesc>,
        /// Reply handler saved when handling ESTABLISH_RENDEZVOUS,
        /// used later to deliver the mocked RENDEZVOUS2.
        #[debug(skip)]
        rendezvous: Option<Box<dyn MsgHandler + Send + 'static>>,
        /// Scripted INTRODUCE_ACK replies (consumed front-first), each paired
        /// with the `MetaCellDisposition` we expect the handler to return.
        intro_acks: Vec<(IntroduceAck, MetaCellDisposition)>,
    }
    /// Cloneable handle to the mock facilities.
    ///
    /// All clones share the same `MocksGlobal` via `mglobal`.
    #[derive(Clone, Debug)]
    struct Mocks<I> {
        /// Shared state for scripting replies and recording observations.
        mglobal: Arc<Mutex<MocksGlobal>>,
        /// Identifies this particular handle (only `()` is used in these tests).
        id: I,
    }
    /// Stub key generator returned by the mocked `rendezvous_handshake`.
    ///
    /// `expand` is never exercised in these tests: the mocked
    /// `m_extend_virtual` ignores its `handshake` argument.
    struct MockKeyGenerator;
    impl handshake::KeyGenerator for MockKeyGenerator {
        fn expand(self, _keylen: usize) -> tor_proto::Result<tor_bytes::SecretBuf> {
            todo!()
        }
    }
    impl<R: Runtime> MocksForConnect<R> for Mocks<()> {
        type HsCircPool = Mocks<()>;
        type Rng = TestingRng;
        type KeyGenerator = MockKeyGenerator;
        fn test_got_desc(&self, desc: &HsDesc) {
            // Record the descriptor so the test can compare it with expectations.
            self.mglobal.lock().unwrap().got_desc = Some(desc.clone());
        }
        fn test_got_ipts(&self, desc: &[UsableIntroPt]) {}
        fn thread_rng(&self) -> Self::Rng {
            // Use the test RNG (see `tor_basic_utils::test_rng`) for reproducibility.
            testing_rng()
        }
        fn rendezvous_handshake(
            &self,
            _handshake_state: hs_ntor::HsNtorClientState,
            _rend2_msg: Rendezvous2,
            _intro_index: IntroPtIndex,
            _rend_pt: &RendPtIdentityForError,
        ) -> Result<Self::KeyGenerator, FAE> {
            // Skip the real cryptography: the mocked RENDEZVOUS2 payload is bogus.
            Ok(MockKeyGenerator)
        }
    }
    // Circuit pool mock: every kind of tunnel is just another `Mocks` handle.
    #[async_trait]
    impl<R: Runtime> MockableCircPool<R> for Mocks<()> {
        type DataTunnel = Mocks<()>;
        type DirTunnel = Mocks<()>;
        type IntroTunnel = Mocks<()>;
        async fn m_get_or_launch_dir(
            &self,
            _netdir: &NetDir,
            target: impl CircTarget + Send + Sync + 'async_trait,
        ) -> tor_circmgr::Result<Self::DirTunnel> {
            // Record which HsDir the client asked for, for the test's assertions.
            let target = OwnedCircTarget::from_circ_target(&target);
            self.mglobal.lock().unwrap().hsdirs_asked.push(target);
            Ok(self.clone())
        }
        async fn m_get_or_launch_intro(
            &self,
            _netdir: &NetDir,
            target: impl CircTarget + Send + Sync + 'async_trait,
        ) -> tor_circmgr::Result<Self::IntroTunnel> {
            Ok(self.clone())
        }
        /// Client circuit
        async fn m_get_or_launch_client_rend<'a>(
            &self,
            netdir: &'a NetDir,
        ) -> tor_circmgr::Result<(Self::DataTunnel, Relay<'a>)> {
            // Pick one of the relays we know to be in the test net as our RPT
            let rpt = netdir.by_id(&Ed25519Identity::from([12; 32])).unwrap();
            Ok((self.clone(), rpt))
        }
        fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration {
            // Fixed estimate; the tests run against a mocked clock.
            Duration::from_secs(10)
        }
    }
    // Directory tunnel mock: serves the canned `TEST_DATA_2` descriptor
    // as a fake HTTP response.
    #[async_trait]
    impl MockableClientDir for Mocks<()> {
        type DirStream = JoinReadWrite<futures::io::Cursor<Box<[u8]>>, futures::io::Sink>;
        async fn m_begin_dir_stream(&self) -> tor_circmgr::Result<Self::DirStream> {
            let response = format!(
                r#"HTTP/1.1 200 OK
{}"#,
                test_data::TEST_DATA_2
            )
            .into_bytes()
            .into_boxed_slice();
            // Reads come from the canned response; writes are discarded.
            Ok(JoinReadWrite::new(
                futures::io::Cursor::new(response),
                futures::io::sink(),
            ))
        }
        fn m_source_info(&self) -> tor_proto::Result<Option<SourceInfo>> {
            Ok(None)
        }
        fn m_num_hops(&self) -> tor_circmgr::Result<usize> {
            Ok(4)
        }
    }
    // Data (rendezvous) tunnel mock: scripts the rendezvous-point side.
    #[async_trait]
    impl MockableClientData for Mocks<()> {
        type Conversation<'r> = &'r ();
        async fn m_start_conversation_last_hop(
            &self,
            msg: Option<AnyRelayMsg>,
            mut reply_handler: impl MsgHandler + Send + 'static,
        ) -> tor_circmgr::Result<Self::Conversation<'_>> {
            match msg {
                Some(AnyRelayMsg::EstablishRendezvous(_)) => {
                    // Immediately pretend the rendezvous point accepted.
                    let reply = RendezvousEstablished::default();
                    let disp = reply_handler.handle_msg(reply.into()).unwrap();
                    assert_eq!(disp, MetaCellDisposition::Consumed);
                    // Save this, because we'll need to use it later,
                    // when handling the INTRODUCE1
                    let mut global = self.mglobal.lock().unwrap();
                    global.rendezvous = Some(Box::new(reply_handler));
                }
                _ => panic!("unexpected msg {msg:?}"),
            }
            Ok(&())
        }
        async fn m_extend_virtual(
            &self,
            protocol: handshake::RelayProtocol,
            role: handshake::HandshakeRole,
            handshake: impl handshake::KeyGenerator + Send,
            params: CircParameters,
            capabilities: &tor_protover::Protocols,
        ) -> tor_circmgr::Result<()> {
            // Pretend the virtual hop was added successfully.
            Ok(())
        }
        fn m_num_own_hops(&self) -> tor_circmgr::Result<usize> {
            Ok(4)
        }
    }
    // Introduction tunnel mock: answers INTRODUCE1 with the next scripted ack,
    // and drives the saved rendezvous handler with a fake RENDEZVOUS2.
    #[async_trait]
    impl MockableClientIntro for Mocks<()> {
        type Conversation<'r> = &'r ();
        async fn m_start_conversation_last_hop(
            &self,
            msg: Option<AnyRelayMsg>,
            mut reply_handler: impl MsgHandler + Send + 'static,
        ) -> tor_circmgr::Result<Self::Conversation<'_>> {
            match msg {
                Some(AnyRelayMsg::Introduce1(introduce1)) => {
                    let mut global = self.mglobal.lock().unwrap();
                    // Take the next scripted (ack, expected disposition) pair.
                    let (reply, expected_disp) = global.intro_acks.remove(0);
                    let disp = reply_handler.handle_msg(reply.into()).unwrap();
                    assert_eq!(disp, expected_disp);
                    // Mock the service's response
                    let rendezvous = global
                        .rendezvous
                        .as_mut()
                        .expect("got INTRODUCE1 before ESTABLISH_RENDEZVOUS?!");
                    let reply = Rendezvous2::new(b"dummy handshake info, ignored");
                    let disp = rendezvous.handle_msg(reply.into()).unwrap();
                    assert_eq!(disp, MetaCellDisposition::ConversationFinished);
                }
                _ => panic!("unexpected msg {msg:?}"),
            }
            Ok(&())
        }
        fn m_num_hops(&self) -> tor_circmgr::Result<usize> {
            Ok(4)
        }
    }
    /// Construct the client's descriptor-decryption keypair,
    /// the one used to decrypt the `TEST_DATA_2` descriptor.
    fn ks_hsc_desc_enc() -> HsClientDescEncKeypair {
        let public: HsClientDescEncKey =
            curve25519::PublicKey::from(test_data::TEST_PUBKEY_2).into();
        let secret = curve25519::StaticSecret::from(test_data::TEST_SECKEY_2);
        HsClientDescEncKeypair::new(public, secret.into())
    }
    /// Parse, decrypt, and validate the canned test descriptor (`TEST_DATA_2`)
    /// the same way the client would, returning the expected `HsDesc`.
    fn expected_hsdesc(hsid: HsId, netdir: &NetDir, now: SystemTime) -> HsDesc {
        let time_period = netdir.hs_time_period();
        // Blind the identity key for the current time period.
        let (hs_blind_id_key, subcredential) = HsIdKey::try_from(hsid)
            .unwrap()
            .compute_blinded_key(time_period)
            .unwrap();
        let hs_blind_id = hs_blind_id_key.id();
        HsDesc::parse_decrypt_validate(
            test_data::TEST_DATA_2,
            &hs_blind_id,
            now,
            &subcredential,
            Some(&ks_hsc_desc_enc()),
        )
        .unwrap()
        // Discard the timeliness bound; `now` is under the test's control anyway.
        .dangerously_assume_timely()
    }
    /// Build the network directory used throughout these tests.
    ///
    /// The consensus lifetime begins at 2023-02-09T12:00:00Z (the tests'
    /// mocked wallclock), is fresh for 1 hour, and valid for 24 hours.
    fn build_test_netdir() -> Arc<NetDir> {
        let valid_after = humantime::parse_rfc3339("2023-02-09T12:00:00Z").unwrap();
        let fresh_until = valid_after + humantime::parse_duration("1 hours").unwrap();
        let valid_until = valid_after + humantime::parse_duration("24 hours").unwrap();
        let lifetime = Lifetime::new(valid_after, fresh_until, valid_until).unwrap();
        tor_netdir::testnet::construct_custom_netdir_with_params(
            tor_netdir::testnet::simple_net_func,
            iter::empty::<(&str, _)>(),
            Some(lifetime),
        )
        .expect("failed to build default testing netdir")
        .unwrap_if_sufficient()
        .map(Arc::new)
        .unwrap()
    }
    #[traced_test]
    #[tokio::test]
    async fn test_connect() {
        use MetaCellDisposition::*;
        let netdir = build_test_netdir();
        let runtime = TokioNativeTlsRuntime::current().unwrap();
        let now = humantime::parse_rfc3339("2023-02-09T12:00:00Z").unwrap();
        let mock_sp = SimpleMockTimeProvider::from_wallclock(now);
        let runtime = runtime
            .with_sleep_provider(mock_sp.clone())
            .with_coarse_time_provider(mock_sp.clone());
        let success = (
            IntroduceAck::new(IntroduceAckStatus::SUCCESS),
            ConversationFinished,
        );
        let nack = (
            IntroduceAck::new(IntroduceAckStatus::NOT_RECOGNIZED),
            ConversationFinished,
        );
        // The number of times to make Context:connect() fail due to intro NACK
        //
        // Set to 5 in order to trigger a rate-limit for all 6 HsDirs:
        //
        // there are 6 HsDirs in total, one of which is "used up" by the
        // first (successful) connect() attempt below.
        const INTRO_FAIL_COUNT: usize = 5;
        /// The number of times we expect the client to retry the
        /// introduction per connect() call
        /// (it will essentially try two rounds of `intro_rend_connect()`,
        /// once with the cached descriptor, and once with the potentially
        /// new descriptor).
        const IPT_RETRY_COUNT: usize = 12;
        // The first introduction will succeed
        let intro_acks = chain!(
            [&success],
            // But the next INTRO_FAIL_COUNT connect() will fail
            // (+1 because we want to fail *again*, in order to find
            // that there's now a limit on all our HsDirs)
            [&nack; IPT_RETRY_COUNT * (INTRO_FAIL_COUNT + 1)],
            // One more round of failures, to trigger a refecth after the rate-limit is lifted
            [&nack; IPT_RETRY_COUNT - 1],
            // After refetching the descriptor, the client will retry the introduction,
            // and succeed.
            [&success],
        )
        .cloned()
        .collect();
        let mglobal = Arc::new(Mutex::new(MocksGlobal {
            intro_acks,
            ..Default::default()
        }));
        let mocks = Mocks { mglobal, id: () };
        // From C Tor src/test/test_hs_common.c test_build_address
        let hsid = test_data::TEST_HSID_2.into();
        let mut data = Data::default();
        let mut expected_hsdirs_asked = 1;
        let mut secret_keys_builder = HsClientSecretKeysBuilder::default();
        secret_keys_builder.ks_hsc_desc_enc(ks_hsc_desc_enc());
        let secret_keys = secret_keys_builder.build().unwrap();
        let ctx = Context::new(
            &runtime,
            &mocks,
            Arc::clone(&netdir),
            Default::default(),
            hsid,
            secret_keys,
            mocks.clone(),
        )
        .unwrap();
        let _got = ctx.connect(&mut data).await.unwrap();
        // Our mock IPT hasn't sent any NACKs yet
        assert!(!logs_contain("NACKed, refetching descriptor and retrying"));
        let hsdesc = expected_hsdesc(hsid, &netdir, now);
        {
            let mglobal = mocks.mglobal.lock().unwrap();
            assert_eq!(mglobal.hsdirs_asked.len(), expected_hsdirs_asked);
            // TODO hs: here and in other places, consider implementing PartialEq instead, or creating
            // an assert_dbg_eq macro (which would be part of a test_helpers crate or something)
            assert_eq!(
                format!("{:?}", mglobal.got_desc),
                format!("{:?}", Some(hsdesc.clone()))
            );
        }
        // Check how long the descriptor is valid for
        let (start_time, end_time) = data.desc.as_ref().unwrap().desc.bounds();
        assert_eq!(start_time, None);
        let desc_valid_until = humantime::parse_rfc3339("2023-02-11T20:00:00Z").unwrap();
        assert_eq!(end_time, Some(desc_valid_until));
        // These attempts will all fail due to intro NACK,
        // and trigger a rate-limit for all 6 HsDirs
        for i in 1..=INTRO_FAIL_COUNT + 1 {
            let err = ctx.connect(&mut data).await.unwrap_err();
            let is_intro_nack = |e| matches!(e, FAE::IntroductionFailed { status, .. });
            // All attempts failed because of our repeated intro NACKs
            assert!(matches!(err, CE::Failed(e) if e.clone().into_iter().all(is_intro_nack)));
            {
                assert!(logs_contain("NACKed, refetching descriptor and retrying"));
                let mglobal = mocks.mglobal.lock().unwrap();
                // Because all intro attempts failed with NACK (NOT_RECOGNIZED),
                // the client must've tried to refetch the descriptor
                if i <= INTRO_FAIL_COUNT {
                    // No rate limiting yet, so the client must've tried to fetch a new
                    // descriptor, before failing again.
                    expected_hsdirs_asked += 1;
                    assert!(!logs_contain("but all hsdirs are rate-limited"));
                    assert_eq!(mglobal.hsdirs_asked.len(), expected_hsdirs_asked);
                } else {
                    // The final failure won't lead to an HsDir fetch
                    // because all HsDirs will be rate-limited at that point
                    assert!(logs_contain("but all hsdirs are rate-limited"));
                    assert_eq!(mglobal.hsdirs_asked.len(), expected_hsdirs_asked);
                }
                // Same descriptor each time
                // TODO hs: here and in other places, consider implementing PartialEq instead, or creating
                // an assert_dbg_eq macro (which would be part of a test_helpers crate or something)
                assert_eq!(
                    format!("{:?}", mglobal.got_desc),
                    format!("{:?}", Some(hsdesc.clone()))
                );
            }
            let (start_time, end_time) = data.desc.as_ref().unwrap().desc.bounds();
            assert_eq!(start_time, None);
            let desc_valid_until = humantime::parse_rfc3339("2023-02-11T20:00:00Z").unwrap();
            assert_eq!(end_time, Some(desc_valid_until));
        }
        // By default, the HsDir fetches are rate-limited for 15min
        mock_sp.advance(Duration::from_secs(15 * 60));
        // Finally, we succeed.
        let _got = ctx.connect(&mut data).await.unwrap();
        // And it turns out we did, in fact, refetch the descriptor.
        // (On the previous attempt we found that all HsDirs were rate-limited,
        // so we advanced the mock clock above to lift the rate limit, and this
        // time the connection succeeded.)
        // TODO HS TESTS: we could extend our mock infrastructure
        // to support returning a different hsdesc this time,
        // with various revision counters, to check that the client is indeed
        // keeping the newest one.
        {
            assert!(logs_contain("NACKed, refetching descriptor and retrying"));
            let mglobal = mocks.mglobal.lock().unwrap();
            // Because all intro attempts failed with NACK (NOT_RECOGNIZED),
            // the client must've tried to refetch the descriptor
            expected_hsdirs_asked += 1;
            assert_eq!(mglobal.hsdirs_asked.len(), expected_hsdirs_asked);
        }
        // TODO HS TESTS: check the circuit in got is the one we gave out
        // TODO HS TESTS: continue with this
    }
    // TODO HS TESTS: Test IPT state management and expiry:
    //   - obtain a test descriptor with only a broken ipt
    //     (broken in the sense that intro can be attempted, but will fail somehow)
    //   - try to make a connection and expect it to fail
    //   - assert that the ipt data isn't empty
    //   - cause the descriptor to expire (advance clock)
    //   - start using a mocked RNG if we weren't already and pin its seed here
    //   - make a new descriptor with two IPTs: the broken one from earlier, and a new one
    //   - make a new connection
    //   - use test_got_ipts to check that the random numbers
    //     would sort the bad intro first, *and* that the good one appears first
    //   - assert that connection succeeded
    //   - cause the circuit and descriptor to expire (advance clock)
    //   - go back to the previous descriptor contents, but with a new validity period
    //   - try to make a connection
    //   - use test_got_ipts to check that only the broken ipt is present
    // TODO HS TESTS: test retries (of every retry loop we have here)
    // TODO HS TESTS: test error paths
}