Lines
71.11 %
Functions
6.59 %
Branches
100 %
//! Main implementation of the connection functionality
use std::collections::HashMap;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use async_trait::async_trait;
use educe::Educe;
use futures::{AsyncRead, AsyncWrite};
use itertools::Itertools;
use rand::RngExt;
use tor_bytes::Writeable;
use tor_cell::relaycell::hs::IntroduceAckStatus;
use tor_cell::relaycell::hs::intro_payload::{self, IntroduceHandshakePayload};
use tor_cell::relaycell::hs::pow::ProofOfWork;
use tor_cell::relaycell::msg::{AnyRelayMsg, Introduce1, Rendezvous2};
use tor_circmgr::build::onion_circparams_from_netparams;
use tor_circmgr::{
ClientOnionServiceDataTunnel, ClientOnionServiceDirTunnel, ClientOnionServiceIntroTunnel,
};
use tor_dirclient::SourceInfo;
use tor_error::{Bug, debug_report, warn_report};
use tor_hscrypto::Subcredential;
use tor_hscrypto::time::TimePeriod;
use tor_proto::TargetHop;
use tor_proto::client::circuit::handshake::hs_ntor::{self, HsNtorHkdfKeyGenerator};
use tracing::{debug, instrument, trace, warn};
use web_time_compat::{Duration, Instant, SystemTime};
use retry_error::RetryError;
use safelog::{DispRedacted, Sensitive};
use tor_cell::relaycell::RelayMsg;
use tor_cell::relaycell::hs::{
    AuthKeyType, EstablishRendezvous, IntroduceAck, RendezvousEstablished,
};
use tor_checkable::{Timebound, timed::TimerangeBound};
use tor_circmgr::hspool::HsCircPool;
use tor_circmgr::timeouts::Action as TimeoutsAction;
use tor_dirclient::request::Requestable as _;
use tor_error::{HasRetryTime as _, RetryTime};
use tor_error::{internal, into_internal};
use tor_hscrypto::RendCookie;
use tor_hscrypto::pk::{HsBlindId, HsId, HsIdKey};
use tor_linkspec::{CircTarget, HasRelayIds, OwnedCircTarget, RelayId};
use tor_llcrypto::pk::ed25519::Ed25519Identity;
use tor_netdir::{NetDir, Relay};
use tor_netdoc::doc::hsdesc::{HsDesc, IntroPointDesc};
use tor_proto::client::circuit::{CircParameters, handshake};
use tor_proto::{MetaCellDisposition, MsgHandler};
use tor_rtcompat::{Runtime, SleepProviderExt as _, TimeoutError};
use crate::Config;
use crate::err::RendPtIdentityForError;
use crate::pow::HsPowClient;
use crate::proto_oneshot;
use crate::relay_info::ipt_to_circtarget;
use crate::state::MockableConnectorData;
use crate::{ConnError, DescriptorError, DescriptorErrorDetail};
use crate::{FailedAttemptError, IntroPtIndex, rend_pt_identity_for_error};
use crate::{HsClientConnector, HsClientSecretKeys};
use ConnError as CE;
use FailedAttemptError as FAE;
/// Given `R, M` where `M: MocksForConnect<M>`, expand to the mockable `ClientCirc`
//
// This is quite annoying.  But the alternative is to write out
// `<... as ...>` each time, since otherwise the compiler complains
// about ambiguous associated types.
macro_rules! DataTunnel {
    { $R:ty, $M:ty } => {
        <<$M as MocksForConnect<$R>>::HsCircPool as MockableCircPool<$R>>::DataTunnel
    };
}
/// Information about a hidden service, including our connection history
///
/// (Expiry of stale entries is handled elsewhere, by `state.rs` —
/// see the comments on `IptExperience`.)
#[derive(Default, Educe)]
#[educe(Debug)]
// This type is actually crate-private, since it isn't re-exported, but it must
// be `pub` because it appears as a default for a type parameter in HsClientConnector.
pub struct Data {
    /// The latest known onion service descriptor for this service.
    desc: DataHsDesc,
    /// Information about the latest status of trying to connect to this service
    /// through each of its introduction points.
    ipts: DataIpts,
    /// Information about the requery period of each HsDir we have recently queried.
    ///
    /// Each entry represents an HsDir that we cannot requery until
    /// its specified timestamp elapses.
    /// Any HsDir that does not have an entry in this map can be requeried.
    hsdirs: DataHsDirs,
}
/// An onion service descriptor and its associated HsBlindId.
#[derive(Debug)]
struct HsDescForTp {
/// The TP this descriptor is for.
/// Used for determining whether a newly fetched descriptor
/// is for the same time period as this one.
time_period: TimePeriod,
/// The descriptor
desc: TimerangeBound<HsDesc>,
/// Part of `Data` that relates to our information about the HsDir requery periods
type DataHsDirs = HashMap<RelayIdForRequeryPeriod, SystemTime>;
/// Marker type, to make typed HsDir [`RelayIdFor`] keys
#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Debug)]
struct RequeryPeriodMap;
/// Lookup key for looking up and recording when we may next requery an HsDir
///
/// (Used as the key of `DataHsDirs`; the corresponding key type for
/// IPT use experiences is `RelayIdForExperience`, below.)
type RelayIdForRequeryPeriod = RelayIdFor<RequeryPeriodMap>;
/// Part of `Data` that relates to the HS descriptor
///
/// `None` means we have no cached descriptor.
type DataHsDesc = Option<HsDescForTp>;
/// Part of `Data` that relates to our information about introduction points
///
/// Keyed by one of each IPT's relay identities; see [`RelayIdFor`].
type DataIpts = HashMap<RelayIdForExperience, IptExperience>;
/// How things went last time we tried to use this introduction point
/// Neither this data structure, nor [`Data`], is responsible for arranging that we expire this
/// information eventually. If we keep reconnecting to the service, we'll retain information
/// about each IPT indefinitely, at least so long as they remain listed in the descriptors we
/// receive.
/// Expiry of unused data is handled by `state.rs`, according to `last_used` in `ServiceState`.
/// Choosing which IPT to prefer is done by obtaining an `IptSortKey`
/// (from this and other information).
//
// Don't impl Ord for IptExperience. We obtain `Option<&IptExperience>` from our
// data structure, and if IptExperience were Ord then Option<&IptExperience> would be Ord
// but it would be the wrong sort order: it would always prefer None, ie untried IPTs.
struct IptExperience {
/// How long it took us to get whatever outcome occurred
/// We prefer fast successes to slow ones.
/// Then, we prefer failures with earlier `RetryTime`,
/// and, lastly, faster failures to slower ones.
duration: Duration,
/// What happened and when we might try again
/// Note that we don't actually *enforce* the `RetryTime` here, just sort by it
/// using `RetryTime::loose_cmp`.
/// We *do* return an error that is itself `HasRetryTime` and expect our callers
/// to honour that.
outcome: Result<(), RetryTime>,
/// Actually make a HS connection, updating our recorded state as necessary
/// `connector` is provided only for obtaining the runtime and netdir (and `mock_for_state`).
/// Obviously, `connect` is not supposed to go looking in `services`.
/// This function handles all necessary retrying of fallible operations,
/// (and, therefore, must also limit the total work done for a particular call).
/// This function has a minimum of functionality, since it is the boundary
/// between "mock connection, used for testing `state.rs`" and
/// "mock circuit and netdir, used for testing `connect.rs`",
/// so it is not, itself, unit-testable.
#[instrument(level = "trace", skip_all)]
pub(crate) async fn connect<R: Runtime>(
connector: &HsClientConnector<R>,
netdir: Arc<NetDir>,
config: Arc<Config>,
hsid: HsId,
data: &mut Data,
secret_keys: HsClientSecretKeys,
) -> Result<ClientOnionServiceDataTunnel, ConnError> {
Context::new(
&connector.runtime,
&*connector.circpool,
netdir,
config,
hsid,
secret_keys,
(),
)?
.connect(data)
.await
/// Common context for a single request to connect to a hidden service
///
/// This saves on passing this same set of (immutable) values (or subsets thereof)
/// to each method in the principal functional code, everywhere.
/// It also provides a convenient type to be `Self`.
///
/// Its lifetime is one request to make a new client circuit to a hidden service,
/// including all the retries and timeouts.
struct Context<'c, R: Runtime, M: MocksForConnect<R>> {
    /// Runtime
    runtime: &'c R,
    /// Circpool
    circpool: &'c M::HsCircPool,
    /// Netdir
    // TODO holding onto the netdir for the duration of our attempts is not ideal
    // but doing better is fairly complicated. See discussions here:
    // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1228#note_2910545
    // https://gitlab.torproject.org/tpo/core/arti/-/issues/884
    // NOTE(review): the field declarations for `netdir`, `config`, and `secret_keys`
    // appear to be missing here (only their doc comments remain) — other code in this
    // file references `self.netdir`, `self.config`, and `self.secret_keys`.
    // Confirm against the upstream source before building.
    /// Configuration
    /// Secret keys to use
    /// HS ID
    hsid: DispRedacted<HsId>,
    /// Blinded HS ID
    hs_blind_id: HsBlindId,
    /// The subcredential to use during this time period
    subcredential: Subcredential,
    /// Mock data
    mocks: M,
/// Details of an established rendezvous point
/// Intermediate value for progress during a connection attempt.
struct Rendezvous<'r, R: Runtime, M: MocksForConnect<R>> {
/// RPT as a `Relay`
rend_relay: Relay<'r>,
/// Rendezvous circuit
rend_tunnel: DataTunnel!(R, M),
/// Rendezvous cookie
rend_cookie: RendCookie,
/// Receiver that will give us the RENDEZVOUS2 message.
/// The sending ended is owned by the handler
/// which receives control messages on the rendezvous circuit,
/// and which was installed when we sent `ESTABLISH_RENDEZVOUS`.
/// (`RENDEZVOUS2` is the message containing the onion service's side of the handshake.)
rend2_rx: proto_oneshot::Receiver<Rendezvous2>,
/// Dummy, to placate compiler
/// Covariant without dropck or interfering with Send/Sync will do fine.
marker: PhantomData<fn() -> (R, M)>,
/// Random value used as part of IPT selection
///
/// Acts as a tiebreaker in `IptSortKey`, so that clients without any
/// experience data don't all pile onto the same, first, IPT.
type IptSortRand = u32;
/// Details of an apparently-useable introduction point
struct UsableIntroPt<'i> {
/// Index in HS descriptor
intro_index: IntroPtIndex,
/// IPT descriptor
intro_desc: &'i IntroPointDesc,
/// IPT `CircTarget`
intro_target: OwnedCircTarget,
sort_rand: IptSortRand,
/// Lookup key for looking up and recording information about a relay
/// Used to identify a relay when looking to see what happened last time we used it,
/// and storing that information after we tried it.
/// We store the experience information under an arbitrary one of the relay's identities,
/// as returned by the `HasRelayIds::identities().next()`.
/// When we do lookups, we check all the relay's identities to see if we find
/// anything relevant.
/// If relay identities permute in strange ways, whether we find our previous
/// knowledge about them is not particularly well defined, but that's fine.
/// While this is, structurally, a relay identity, it is not suitable for other purposes.
#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug)]
struct RelayIdFor<K> {
/// The relay id
inner: RelayId,
/// Phantom data to allow parameterizing over `K`
/// `K` is a marker type that represents the kind of map
/// this key will be used in.
marker: PhantomData<K>,
/// Marker type, to make typed Ipt exprience [`RelayIdFor`] keys
struct IptExperienceMap;
type RelayIdForExperience = RelayIdFor<IptExperienceMap>;
/// Details of an apparently-successful INTRODUCE exchange
// NOTE(review): this struct appears truncated — a `marker` field
// (presumably `PhantomData<fn() -> (R, M)>`, as in `Rendezvous`) and the
// closing brace seem to be missing.  Confirm against the upstream source.
struct Introduced<R: Runtime, M: MocksForConnect<R>> {
    /// End-to-end crypto NTORv3 handshake with the service
    ///
    /// Created as part of generating our `INTRODUCE1`,
    /// and then used when processing `RENDEZVOUS2`.
    handshake_state: hs_ntor::HsNtorClientState,
    /// `R` and `M` only used for getting to mocks.
impl<K> RelayIdFor<K> {
/// Create a new key for use with `T`
fn new(inner: RelayId) -> Self {
Self {
inner,
marker: Default::default(),
/// Identities to use to try to find previous experience information about this IPT
fn for_lookup<T: HasRelayIds>(ids: &T) -> impl Iterator<Item = Self> + '_ {
ids.identities().map(|id| RelayIdFor::new(id.to_owned()))
/// Identity to use to store previous experience information about this IPT
fn for_store<T: HasRelayIds>(ids: &T) -> Result<Self, Bug> {
let id = ids
.identities()
.next()
.ok_or_else(|| internal!("introduction point relay with no identities"))?
.to_owned();
Ok(RelayIdFor::new(id))
/// Sort key for an introduction point, for selecting the best IPTs to try first
/// Ordering is most preferable first.
/// We use this to sort our `UsableIpt`s using `.sort_by_key`.
/// (This implementation approach ensures that we obey all the usual ordering invariants.)
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)]
struct IptSortKey {
/// Sort by how preferable the experience was
outcome: IptSortKeyOutcome,
/// Failing that, choose randomly
/// Component of the [`IptSortKey`] representing outcome of our last attempt, if any
/// This is the main thing we use to decide which IPTs to try first.
/// It is calculated for each IPT
/// (via `.sort_by_key`, so repeatedly - it should therefore be cheap to make.)
enum IptSortKeyOutcome {
/// Prefer successes
Success {
/// Prefer quick ones
},
/// Failing that, try one we don't know to have failed
Untried,
/// Failing that, it'll have to be ones that didn't work last time
Failed {
/// Prefer failures with an earlier retry time
retry_time: tor_error::LooseCmpRetryTime,
/// Failing that, prefer quick failures (rather than slow ones eg timeouts)
impl From<Option<&IptExperience>> for IptSortKeyOutcome {
fn from(experience: Option<&IptExperience>) -> IptSortKeyOutcome {
use IptSortKeyOutcome as O;
match experience {
None => O::Untried,
Some(IptExperience { duration, outcome }) => match outcome {
Ok(()) => O::Success {
duration: *duration,
Err(retry_time) => O::Failed {
retry_time: (*retry_time).into(),
/// Token indicating that a descriptor fetch is wanted
///
/// Passed (as `Option<RefetchDescriptor>`) to `descriptor_ensure`
/// to request an unconditional refetch, e.g. after an introduction NACK.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
struct RefetchDescriptor;
impl<'c, R: Runtime, M: MocksForConnect<R>> Context<'c, R, M> {
/// Make a new `Context` from the input data
fn new(
) -> Result<Self, ConnError> {
let time_period = netdir.hs_time_period();
let (hs_blind_id_key, subcredential) = HsIdKey::try_from(hsid)
.map_err(|_| CE::InvalidHsId)?
.compute_blinded_key(time_period)
.map_err(
// TODO HS what on earth do these errors mean, in practical terms ?
// In particular, we'll want to convert them to a ConnError variant,
// but what ErrorKind should they have ?
into_internal!("key blinding error, don't know how to handle"),
)?;
let hs_blind_id = hs_blind_id_key.id();
Ok(Context {
hsid: DispRedacted(hsid),
hs_blind_id,
subcredential,
circpool,
runtime,
mocks,
})
/// Called by the `connect` function in this module.
async fn connect(&self, data: &mut Data) -> Result<DataTunnel!(R, M), ConnError> {
// This function must do the following, retrying as appropriate.
// - Look up the onion descriptor in the state.
// - Download the onion descriptor if one isn't there.
// - In parallel:
// - Pick a rendezvous point from the netdirprovider and launch a
// rendezvous circuit to it. Then send ESTABLISH_INTRO.
// - Pick a number of introduction points (1 or more) and try to
// launch circuits to them.
// - On a circuit to an introduction point, send an INTRODUCE1 cell.
// - Wait for a RENDEZVOUS2 cell on the rendezvous circuit
// - Add a virtual hop to the rendezvous circuit.
// - Return the rendezvous circuit.
let mocks = self.mocks.clone();
let desc = self
.descriptor_ensure(&mut data.desc, &mut data.hsdirs, None)
.await?;
mocks.test_got_desc(desc);
let tunnel = match self.intro_rend_connect(desc, &mut data.ipts).await {
Ok(tunnel) => tunnel,
Err(e) => {
let is_intro_nack = |e| {
if let FAE::IntroductionFailed { status, .. } = e {
status == IntroduceAckStatus::NOT_RECOGNIZED
} else {
false
let retry = if let CE::Failed(ref errors) = e {
// If any of the errors are an INTRODUCE_NACK,
// then it's worth retrying one more time
// with a fresh descriptor.
errors
.clone()
.into_iter()
.any(is_intro_nack)
.then_some(RefetchDescriptor)
None
if let Some(RefetchDescriptor) = retry {
debug!(
"Introduction to {} NACKed, refetching descriptor and retrying",
&self.hsid,
);
// Refetch the descriptor and try one more time
.descriptor_ensure(&mut data.desc, &mut data.hsdirs, retry)
self.intro_rend_connect(desc, &mut data.ipts).await?
return Err(e);
mocks.test_got_tunnel(&tunnel);
Ok(tunnel)
/// Ensure that `Data.desc` contains the HS descriptor
/// If we have a previously-downloaded descriptor, which is still valid,
/// just returns a reference to it.
/// Otherwise, tries to obtain the descriptor by downloading it from hsdir(s).
/// If `refetch` is `true`, a new descriptor will be refetched
/// from the hsdir(s) unconditionally.
/// Does all necessary retries and timeouts.
/// Returns an error if no valid descriptor could be found.
#[allow(clippy::cognitive_complexity)] // TODO: Refactor
async fn descriptor_ensure<'d>(
&self,
data: &'d mut DataHsDesc,
recent_hsdirs: &'d mut DataHsDirs,
refetch: Option<RefetchDescriptor>,
) -> Result<&'d HsDesc, CE> {
// Maximum number of hsdir connection and retrieval attempts we'll make
let max_total_attempts = self
.config
.retry
.hs_desc_fetch_attempts()
.try_into()
// User specified a very large u32. We must be downcasting it to 16bit!
// let's give them as many retries as we can manage.
.unwrap_or(usize::MAX);
let now = self.runtime.wallclock();
let unwrap_valid_desc = |data: &'d mut DataHsDesc| -> &'d HsDesc {
data.as_ref()
.expect("Some but now None")
.desc
.as_ref()
.check_valid_at(&now)
.expect("Ok but now Err")
// We retain a previously obtained descriptor precisely until its lifetime expires,
// or until we refetch a more recent one
// as a result of an `intro_rend_connect()` failure caused by introduce NACK.
// When it expires, we discard it completely and try to obtain a new one.
// We only replace our cached descriptor if the new one has a higher revision counter.
// TODO SPEC: Discuss HS descriptor lifetime and expiry client behaviour
let stored_revision = data.as_ref().and_then(|previously| {
if let Ok(desc) = previously.desc.as_ref().check_valid_at(&now) {
// Ideally we would just return desc but that confuses borrowck,
// so we have to use unwrap_valid_desc() each time
// we need the known-to-be-Some descriptor instead.
// https://github.com/rust-lang/rust/issues/51545
Some((desc.revision(), previously.time_period))
// Seems to be not valid now. Try to fetch a fresh one.
});
match (stored_revision, refetch) {
(Some(_), None) => {
// Our cached descriptor is still timely,
// and we don't need to fetch a new one.
return Ok(unwrap_valid_desc(data));
(None, _) => {
// We don't have a timely descriptor,
// so ignore the requery_interval,
// and reach out to all HsDirs
recent_hsdirs.clear();
(_, Some(RefetchDescriptor)) => {
// We have been asked to try to fetch a new descriptor.
// We will only reach out to the HsDirs that are
// not within the `hs_dir_requery_interval`
// First, filter out any HsDirs that we *can* requery
recent_hsdirs.retain(|_hsdir, requery| *requery > now);
let working_tp = self.netdir.hs_time_period();
let hs_dirs = self.netdir.hs_dirs_download(
self.hs_blind_id,
working_tp,
&mut self.mocks.thread_rng(),
trace!(
"HS desc fetch for {}, for period {}, using {} hsdirs",
hs_dirs.len()
let hs_dirs = hs_dirs
.filter(|hsdir| {
// Skip over any HsDirs that we are not allowed to requery right now
let should_skip = recent_hsdirs.keys().any(|recent| {
RelayIdForRequeryPeriod::for_lookup(hsdir).any(|id| id == *recent)
!should_skip
.collect::<Vec<_>>();
if hs_dirs.is_empty() {
warn!(
"Tried to fetch HS desc for {}, for period {}, but all hsdirs are rate-limited",
&self.hsid, working_tp,
if stored_revision.is_none() {
// We can't fetch a new descriptor, and we don't have a cached one.
return Err(CE::NoUsableHsDirs);
// Return our cached descriptor
// We might consider launching requests to multiple HsDirs in parallel.
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1118#note_2894463
// But C Tor doesn't and our HS experts don't consider that important:
// https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914436
// (Additionally, making multiple HSDir requests at once may make us
// more vulnerable to traffic analysis.)
let mut attempts = hs_dirs.iter().cycle().take(max_total_attempts);
let mut errors = RetryError::in_attempt_to("retrieve hidden service descriptor");
let desc = loop {
let relay = match attempts.next() {
Some(relay) => relay,
None => {
return Err(if errors.is_empty() {
CE::NoHsDirs
CE::DescriptorDownload(errors)
let hsdir_for_error: Sensitive<Ed25519Identity> = (*relay.id()).into();
let hsdir = RelayIdForRequeryPeriod::for_store(relay)?;
// Ensure we wait at least hs_dir_requery_interval() until we try to
// fecth from this HsDir again
recent_hsdirs.insert(hsdir, now + self.config.retry.hs_dir_requery_interval());
match self.descriptor_fetch_attempt(relay).await {
Ok(desc) => break desc,
Err(error) => {
if error.should_report_as_suspicious() {
// Note that not every protocol violation is suspicious:
// we only warn on the protocol violations that look like attempts
// to do a traffic tagging attack via hsdir inflation.
// (See proposal 360.)
warn_report!(
&error,
"Suspicious failure while downloading hsdesc for {} from relay {}",
relay.display_relay_ids(),
debug_report!(
"failed hsdir desc fetch for {} from {}/{}",
&relay.id(),
&relay.rsa_id()
errors.push_timed(
tor_error::Report(DescriptorError {
hsdir: hsdir_for_error,
error,
}),
self.runtime.now(),
Some(self.runtime.wallclock()),
// If our existing descriptor is newer than the one we have just fetched,
// we should retain it.
if let Some(stored_revision) = stored_revision {
// It is safe to dangerously_assume_timely,
// as descriptor_fetch_attempt has already checked the timeliness of the descriptor.
let new_desc = desc.as_ref().dangerously_assume_timely();
// Revision counters are monotonically increasing within a given time period.
// If our newly fetched descriptor has the same HsBlindId as our cached one,
// it means they are both used for the same time period,
// and so we should only update our cache if the new descriptor is more recent
// (i.e. it has a higher revision counter).
if stored_revision >= (new_desc.revision(), working_tp) {
// Our cached descriptor is still timely, and has a higher revision counter
// than the one we've just fetched, so we retain it.
// Store the bounded value in the cache for reuse,
// but return a reference to the unwrapped `HsDesc`.
// The `HsDesc` must be owned by `data.desc`,
// so first add it to `data.desc`,
// and then dangerously_assume_timely to get a reference out again.
let desc = HsDescForTp {
time_period: working_tp,
desc,
let ret = data.insert(desc);
Ok(ret.desc.as_ref().dangerously_assume_timely())
/// Make one attempt to fetch the descriptor from a specific hsdir
/// No timeout
/// On success, returns the descriptor.
/// While the returned descriptor is `TimerangeBound`, its validity at the current time *has*
/// been checked.
async fn descriptor_fetch_attempt(
hsdir: &Relay<'_>,
) -> Result<TimerangeBound<HsDesc>, DescriptorErrorDetail> {
let max_len: usize = self
.netdir
.params()
.hsdir_max_desc_size
.get()
.map_err(into_internal!("BoundedInt was not truly bounded!"))?;
let request = {
let mut r = tor_dirclient::request::HsDescDownloadRequest::new(self.hs_blind_id);
r.set_max_len(max_len);
r
"hsdir for {}, trying {}/{}, request {:?} (http request {:?})",
&hsdir.id(),
&hsdir.rsa_id(),
&request,
request.debug_request()
let circuit = self
.circpool
.m_get_or_launch_dir(&self.netdir, OwnedCircTarget::from_circ_target(hsdir))
let n_hops = circuit.m_num_hops()?;
let timeout_roundtrip =
self.estimate_timeout(&[(1, TimeoutsAction::RoundTrip { length: n_hops })]);
let source: Option<SourceInfo> = circuit
.m_source_info()
.map_err(into_internal!("Couldn't get SourceInfo for circuit"))?;
let mut stream = self
.runtime
// NOTE: In fact this timeout is overkill: this operation should succeed immediately,
// since we always send BEGINDIR messages optimistically (without waiting for a reply).
// But since our code is complex, and since it could become possible for this to block
// if the circuit is saturated or we implement proposal 367 or something,
// we may as well have _some_ timeout here.
.timeout(timeout_roundtrip, circuit.m_begin_dir_stream())
.await?
.map_err(DescriptorErrorDetail::Circuit)?;
let request_future =
tor_dirclient::send_request(self.runtime, &request, &mut stream, source);
let response = self
.timeout(timeout_roundtrip, request_future)
.map_err(|dir_error| match dir_error {
tor_dirclient::Error::RequestFailed(rfe) => DescriptorErrorDetail::from(rfe.error),
tor_dirclient::Error::CircMgr(ce) => into_internal!(
"tor-dirclient complains about circmgr going wrong but we gave it a stream"
)(ce)
.into(),
other => into_internal!(
"tor-dirclient gave unexpected error, tor-hsclient code needs updating"
)(other)
})?;
let desc_text = response.into_output_string().map_err(|rfe| rfe.error)?;
let hsc_desc_enc = self.secret_keys.keys.ks_hsc_desc_enc.as_ref();
HsDesc::parse_decrypt_validate(
&desc_text,
&self.hs_blind_id,
now,
&self.subcredential,
hsc_desc_enc,
)
.map_err(DescriptorErrorDetail::from)
/// Given the descriptor, try to connect to service
/// Does all necessary retries, timeouts, etc.
async fn intro_rend_connect(
desc: &HsDesc,
data: &mut DataIpts,
) -> Result<DataTunnel!(R, M), CE> {
// Maximum number of rendezvous/introduction attempts we'll make
.hs_intro_rend_attempts()
// We can't reliably distinguish IPT failure from RPT failure, so we iterate over IPTs
// (best first) and each time use a random RPT.
// We limit the number of rendezvous establishment attempts, separately, since we don't
// try to talk to the intro pt until we've established the rendezvous circuit.
let mut rend_attempts = 0..max_total_attempts;
// But, we put all the errors into the same bucket, since we might have a mixture.
let mut errors = RetryError::in_attempt_to("make circuit to hidden service");
// Note that IntroPtIndex is *not* the index into this Vec.
// It is the index into the original list of introduction points in the descriptor.
let mut usable_intros: Vec<UsableIntroPt> = desc
.intro_points()
.iter()
.enumerate()
.map(|(intro_index, intro_desc)| {
let intro_index = intro_index.into();
let intro_target = ipt_to_circtarget(intro_desc, &self.netdir)
.map_err(|error| FAE::UnusableIntro { error, intro_index })?;
// Lack of TAIT means this clone
let intro_target = OwnedCircTarget::from_circ_target(&intro_target);
Ok::<_, FailedAttemptError>(UsableIntroPt {
intro_index,
intro_desc,
intro_target,
sort_rand: self.mocks.thread_rng().random(),
.filter_map(|entry| match entry {
Ok(y) => Some(y),
errors.push_timed(e, self.runtime.now(), Some(self.runtime.wallclock()));
.collect_vec();
// Delete experience information for now-unlisted intro points
// Otherwise, as the IPTs change `Data` might grow without bound,
// if we keep reconnecting to the same HS.
data.retain(|k, _v| {
usable_intros
.any(|ipt| RelayIdForExperience::for_lookup(&ipt.intro_target).any(|id| &id == k))
// Join with existing state recording our experiences,
// sort by descending goodness, and then randomly
// (so clients without any experience don't all pile onto the same, first, IPT)
usable_intros.sort_by_key(|ipt: &UsableIntroPt| {
let experience =
RelayIdForExperience::for_lookup(&ipt.intro_target).find_map(|id| data.get(&id));
IptSortKey {
outcome: experience.into(),
sort_rand: ipt.sort_rand,
self.mocks.test_got_ipts(&usable_intros);
let mut intro_attempts = usable_intros.iter().cycle().take(max_total_attempts);
// We retain a rendezvous we managed to set up in here. That way if we created it, and
// then failed before we actually needed it, we can reuse it.
// If we exit with an error, we will waste it - but because we isolate things we do
// for different services, it wouldn't be reusable anyway.
let mut saved_rendezvous = None;
// If we are using proof-of-work DoS mitigation, this chooses an
// algorithm and initial effort, and adjusts that effort when we retry.
let mut pow_client = HsPowClient::new(&self.hs_blind_id, desc);
// We might consider making multiple INTRODUCE attempts to different
// IPTs in parallel, and somehow aggregating the errors and
// experiences.
// However our HS experts don't consider that important:
// https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914438
// Parallelizing our HsCircPool circuit building would likely have
// greater impact. (See #1149.)
loop {
// When did we start doing things that depended on the IPT?
// Used for recording our experience with the selected IPT
let mut ipt_use_started = None::<Instant>;
// Error handling inner async block (analogous to an IEFE):
// * Ok(Some()) means this attempt succeeded
// * Ok(None) means all attempts exhausted
// * Err(error) means this attempt failed
let outcome = async {
// We establish a rendezvous point first. Although it appears from reading
// this code that this means we serialise establishment of the rendezvous and
// introduction circuits, this isn't actually the case. The circmgr maintains
// a pool of circuits. What actually happens in the "standing start" case is
// that we obtain a circuit for rendezvous from the circmgr's pool, expecting
// one to be available immediately; the circmgr will then start to build a new
// one to replenish its pool, and that happens in parallel with the work we do
// here - but in arrears. If the circmgr pool is empty, then we must wait.
// Perhaps this should be parallelised here. But that's really what the pool
// is for, since we expect building the rendezvous circuit and building the
// introduction circuit to take about the same length of time.
// We *do* serialise the ESTABLISH_RENDEZVOUS exchange, with the
// building of the introduction circuit. That could be improved, at the cost
// of some additional complexity here.
// Our HS experts don't consider it important to increase the parallelism:
// https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914444
// https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914445
if saved_rendezvous.is_none() {
debug!("hs conn to {}: setting up rendezvous point", &self.hsid);
// Establish a rendezvous circuit.
let Some(_): Option<usize> = rend_attempts.next() else {
return Ok(None);
saved_rendezvous = Some(self.establish_rendezvous().await?);
let Some(ipt) = intro_attempts.next() else {
let intro_index = ipt.intro_index;
let is_single_onion_service = desc.is_single_onion_service();
let proof_of_work = match pow_client.solve().await {
Ok(solution) => solution,
"failing to compute proof-of-work, trying without. ({:?})",
e
// We record how long things take, starting from here, as
// as a statistic we'll use for the IPT in future.
// This is stored in a variable outside this async block,
// so that the outcome handling can use it.
ipt_use_started = Some(self.runtime.now());
// No `Option::get_or_try_insert_with`, or we'd avoid this expect()
let rend_pt_for_error = rend_pt_identity_for_error(
&saved_rendezvous
.expect("just made Some")
.rend_relay,
"hs conn to {}: RPT {}",
rend_pt_for_error.as_inner()
let (rendezvous, introduced) =
self.exchange_introduce(ipt, &mut saved_rendezvous, proof_of_work)
// TODO: Maybe try, once, to extend-and-reuse the intro circuit.
// If the introduction fails, the introduction circuit is in principle
// still usable. We believe that in this case, C Tor extends the intro
// circuit by one hop to the next IPT to try. That saves on building a
// whole new 3-hop intro circuit. However, our HS experts tell us that
// if introduction fails at one IPT it is likely to fail at the others too,
// so that optimisation might reduce our network impact and time to failure,
// but isn't likely to improve our chances of success.
// However, it's not clear whether this approach risks contaminating
// the 2nd attempt with some fault relating to the introduction point.
// The 1st ipt might also gain more knowledge about which HS we're talking to.
// TODO SPEC: Discuss extend-and-reuse HS intro circuit after nack
?;
#[allow(unused_variables)] // it's *supposed* to be unused
let saved_rendezvous = (); // don't use `saved_rendezvous` any more, use rendezvous
let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
let circ = self.complete_rendezvous(ipt, rendezvous, introduced, is_single_onion_service)
"hs conn to {}: RPT {} IPT {}: success",
rend_pt.as_inner(),
Ok::<_, FAE>(Some((intro_index, circ)))
.await;
// Store the experience `outcome` we had with IPT `intro_index`, in `data`
#[allow(clippy::unused_unit)] // -> () is here for error handling clarity
let mut store_experience = |intro_index, outcome| -> () {
(|| {
let ipt = usable_intros
.find(|ipt| ipt.intro_index == intro_index)
.ok_or_else(|| internal!("IPT not found by index"))?;
let id = RelayIdForExperience::for_store(&ipt.intro_target)?;
let started = ipt_use_started.ok_or_else(|| {
internal!("trying to record IPT use but no IPT start time noted")
let duration = self
.now()
.checked_duration_since(started)
.ok_or_else(|| internal!("clock overflow calculating IPT use duration"))?;
data.insert(id, IptExperience { duration, outcome });
Ok::<_, Bug>(())
})()
.unwrap_or_else(|e| warn_report!(e, "error recording HS IPT use experience"));
match outcome {
Ok(Some((intro_index, y))) => {
// Record successful outcome in Data
store_experience(intro_index, Ok(()));
return Ok(y);
Ok(None) => return Err(CE::Failed(errors)),
debug_report!(&error, "hs conn to {}: attempt failed", &self.hsid);
// Record error outcome in Data, if in fact we involved the IPT
// at all. The IPT information is retrieved from `error`,
// since only some of the errors implicate the introduction point.
if let Some(intro_index) = error.intro_index() {
store_experience(intro_index, Err(error.retry_time()));
errors.push_timed(error, self.runtime.now(), Some(self.runtime.wallclock()));
// If we are using proof-of-work DoS mitigation, try harder next time
pow_client.increase_effort();
/// Make one attempt to establish a rendezvous circuit
/// This doesn't really depend on anything,
/// other than (obviously) the isolation implied by our circuit pool.
/// In particular it doesn't depend on the introduction point.
/// Applies timeouts as appropriate.
async fn establish_rendezvous(&'c self) -> Result<Rendezvous<'c, R, M>, FAE> {
// Obtain (or launch) a rendezvous tunnel via the (mockable) circuit pool.
let (rend_tunnel, rend_relay) = self
.m_get_or_launch_client_rend(&self.netdir)
.map_err(|error| FAE::RendezvousCircuitObtain { error })?;
let rend_pt = rend_pt_identity_for_error(&rend_relay);
// Fresh random cookie identifying this rendezvous attempt to the service.
let rend_cookie: RendCookie = self.mocks.thread_rng().random();
let message = EstablishRendezvous::new(rend_cookie);
// Oneshot channels through which `Handler` (below) delivers the two
// messages we expect back from the rendezvous point.
let (rend_established_tx, rend_established_rx) = proto_oneshot::channel();
let (rend2_tx, rend2_rx) = proto_oneshot::channel();
/// Handler which expects `RENDEZVOUS_ESTABLISHED` and then
/// `RENDEZVOUS2`. Returns each message via the corresponding `oneshot`.
struct Handler {
/// Sender for a RENDEZVOUS_ESTABLISHED message.
rend_established_tx: proto_oneshot::Sender<RendezvousEstablished>,
/// Sender for a RENDEZVOUS2 message.
rend2_tx: proto_oneshot::Sender<Rendezvous2>,
impl MsgHandler for Handler {
fn handle_msg(
&mut self,
msg: AnyRelayMsg,
) -> Result<MetaCellDisposition, tor_proto::Error> {
// The first message we expect is a RENDEZVOUS_ESTABLISHED.
if self.rend_established_tx.still_expected() {
self.rend_established_tx
.deliver_expected_message(msg, MetaCellDisposition::Consumed)
// Otherwise it must be the RENDEZVOUS2, which ends the conversation.
self.rend2_tx
.deliver_expected_message(msg, MetaCellDisposition::ConversationFinished)
"hs conn to {}: RPT {}: sending ESTABLISH_RENDEZVOUS",
let failed_map_err = |error| FAE::RendezvousEstablish {
rend_pt: rend_pt.clone(),
let handler = Handler {
rend_established_tx,
rend2_tx,
// Timeout estimate: one round trip over our own side of the circuit.
let num_hops = rend_tunnel
.m_num_own_hops()
self.estimate_timeout(&[(1, TimeoutsAction::RoundTrip { length: num_hops })]);
// TODO(conflux) This error handling is horrible. Problem is that this Mock system requires
// to send back a tor_circmgr::Error while our reply handler requires a tor_proto::Error.
// And unifying both is hard here considering it needs to be converted to yet another Error
// type "FAE" so we have to do these hoops and jumps.
rend_tunnel
.m_start_conversation_last_hop(Some(message.into()), handler)
.map_err(|e| {
let proto_error = match e {
tor_circmgr::Error::Protocol { error, .. } => error,
// Any other circmgr error: report the circuit as closed.
_ => tor_proto::Error::CircuitClosed,
FAE::RendezvousEstablish {
error: proto_error,
// `start_conversation` returns as soon as the control message has been sent.
// We need to obtain the RENDEZVOUS_ESTABLISHED message, which is "returned" via the oneshot.
let _: RendezvousEstablished = self
.timeout(timeout_roundtrip, rend_established_rx.recv(failed_map_err))
|_timeout: tor_rtcompat::TimeoutError| FAE::RendezvousEstablishTimeout {
)??;
"hs conn to {}: RPT {}: got RENDEZVOUS_ESTABLISHED",
// Hand back everything the later stages need; `rend2_rx` will yield
// the RENDEZVOUS2 message later, in `complete_rendezvous`.
Ok(Rendezvous {
rend_tunnel,
rend_cookie,
rend_relay,
rend2_rx,
marker: PhantomData,
/// Attempt (once) to send an INTRODUCE1 and wait for the INTRODUCE_ACK
/// `take`s the input `rendezvous` (but only takes it if it gets that far)
/// and, if successful, returns it.
/// (This arranges that the rendezvous is "used up" precisely if
/// we sent its secret somewhere.)
/// Although this function handles the `Rendezvous`,
/// nothing in it actually involves the rendezvous point.
/// So if there's a failure, it's purely to do with the introduction point.
#[allow(clippy::cognitive_complexity, clippy::type_complexity)] // TODO: Refactor
async fn exchange_introduce(
&'c self,
ipt: &UsableIntroPt<'_>,
rendezvous: &mut Option<Rendezvous<'c, R, M>>,
proof_of_work: Option<ProofOfWork>,
) -> Result<(Rendezvous<'c, R, M>, Introduced<R, M>), FAE> {
"hs conn to {}: IPT {}: obtaining intro circuit",
&self.hsid, intro_index,
// Obtain (or launch) an introduction circuit to this IPT.
let intro_circ = self
.m_get_or_launch_intro(
&self.netdir,
ipt.intro_target.clone(), // &OwnedCircTarget isn't CircTarget apparently
.map_err(|error| FAE::IntroductionCircuitObtain { error, intro_index })?;
// Consume the rendezvous now: from here on its secret may be sent out.
let rendezvous = rendezvous.take().ok_or_else(|| internal!("no rend"))?;
"hs conn to {}: RPT {} IPT {}: making introduction",
// Now we construct an introduce1 message and perform the first part of the
// rendezvous handshake.
// This process is tricky because the header of the INTRODUCE1 message
// -- which depends on the IntroPt configuration -- is authenticated as
// part of the HsDesc handshake.
// Construct the header, since we need it as input to our encryption.
let intro_header = {
let ipt_sid_key = ipt.intro_desc.ipt_sid_key();
// A "dummy" INTRODUCE1 with an empty body, used only to obtain the
// encoded header bytes which feed into the hs-ntor encryption below.
let intro1 = Introduce1::new(
AuthKeyType::ED25519_SHA3_256,
ipt_sid_key.as_bytes().to_vec(),
vec![],
let mut header = vec![];
intro1
.encode_onto(&mut header)
.map_err(into_internal!("couldn't encode intro1 header"))?;
header
// Construct the introduce payload, which tells the onion service how to find
// our rendezvous point. (We could do this earlier if we wanted.)
let intro_payload = {
let onion_key =
intro_payload::OnionKey::NtorOnionKey(*rendezvous.rend_relay.ntor_onion_key());
let linkspecs = rendezvous
.rend_relay
.linkspecs()
.map_err(into_internal!("Couldn't encode link specifiers"))?;
let payload = IntroduceHandshakePayload::new(
rendezvous.rend_cookie,
onion_key,
linkspecs,
proof_of_work,
let mut encoded = vec![];
payload
.write_onto(&mut encoded)
.map_err(into_internal!("Couldn't encode introduce1 payload"))?;
encoded
// Perform the cryptographic handshake with the onion service.
let service_info = hs_ntor::HsNtorServiceInfo::new(
ipt.intro_desc.svc_ntor_key().clone(),
ipt.intro_desc.ipt_sid_key().clone(),
self.subcredential,
let handshake_state =
hs_ntor::HsNtorClientState::new(&mut self.mocks.thread_rng(), service_info);
// Encrypt the payload; the header is covered by the handshake's authentication.
let encrypted_body = handshake_state
.client_send_intro(&intro_header, &intro_payload)
.map_err(into_internal!("can't begin hs-ntor handshake"))?;
// Build our actual INTRODUCE1 message.
let intro1_real = Introduce1::new(
ipt.intro_desc.ipt_sid_key().as_bytes().to_vec(),
encrypted_body,
/// Handler which expects just `INTRODUCE_ACK`
/// Sender for `INTRODUCE_ACK`
intro_ack_tx: proto_oneshot::Sender<IntroduceAck>,
self.intro_ack_tx
let failed_map_err = |error| FAE::IntroductionExchange { error, intro_index };
let (intro_ack_tx, intro_ack_rx) = proto_oneshot::channel();
let handler = Handler { intro_ack_tx };
// Timeout estimate based on the length of the introduction circuit.
let num_hops = intro_circ
.m_num_hops()
// NOTE: Should we allow this to be longer in case the introduction point is grievously
// overloaded?
"hs conn to {}: RPT {} IPT {}: making introduction - sending INTRODUCE1",
intro_circ
.m_start_conversation_last_hop(Some(intro1_real.into()), handler)
FAE::IntroductionExchange {
// Status is checked by `.success()`, and we don't look at the extensions;
// just discard the known-successful `IntroduceAck`
let _: IntroduceAck = self
.timeout(timeout_roundtrip, intro_ack_rx.recv(failed_map_err))
.map_err(|_timeout: TimeoutError| FAE::IntroductionTimeout { intro_index })??
.success()
.map_err(|status| FAE::IntroductionFailed {
status,
"hs conn to {}: RPT {} IPT {}: making introduction - success",
// Having received INTRODUCE_ACK, we can forget about this circuit
// (and potentially tear it down).
drop(intro_circ);
Ok((
rendezvous,
Introduced {
handshake_state,
))
/// Attempt (once) to connect a rendezvous circuit using the given intro pt.
/// That is to say, we simply wait for a RENDEZVOUS2 message,
/// and if we get one, we add a virtual hop.
/// Timeouts here might be due to the IPT, RPT, service,
/// or any of the intermediate relays.
/// If, rather than a timeout, we actually encounter some kind of error,
/// we'll return the appropriate `FailedAttemptError`.
/// (Who is responsible may vary, so the `FailedAttemptError` variant will reflect that.)
async fn complete_rendezvous(
rendezvous: Rendezvous<'c, R, M>,
introduced: Introduced<R, M>,
is_single_onion_service: bool,
) -> Result<DataTunnel!(R, M), FAE> {
/// Largest number of hops that the onion service must build for _its_
/// circuits to our rendezvous points.
/// This is 4 hops (assuming that it has full vanguards enabled) plus one for the
/// rendezvous point itself.
const MAX_PEER_REND_HOPS: usize = 5;
/// Largest number of retries that we think the peer might make if its
/// circuits are failing.
const MAX_PEER_CIRC_RETRIES: u32 = 3;
let failed_map_err = |error| FAE::RendezvousCompletionCircuitError {
"hs conn to {}: RPT {} IPT {}: awaiting rendezvous completion",
let num_hops = rendezvous
.rend_tunnel
// This is not necessarily the best error, but it isn't totally wrong.
// We can't wrap the tor_circuit error in anything else that makes sense.
// See #2513.
// Maximum length of the circuit that the peer will build to the rendezvous point.
let peer_rend_circ_len = if is_single_onion_service {
// A single onion service connects directly to the rendezvous point.
1
MAX_PEER_REND_HOPS
// The total number of hops from the peer to us.
// We subtract 1 because both circuits terminate at the rendezvous point.
let total_circ_len = peer_rend_circ_len + num_hops - 1;
// Limit on the duration of each attempt for activities involving both
// RPT and IPT.
let rpt_ipt_timeout = self.estimate_timeout(&[
// The API requires us to specify a number of circuit builds and round trips.
// So what we tell the estimator is a rather imprecise description.
// What we are timing here is:
// INTRODUCE2 goes from IPT to HS.
// This happens in parallel with our waiting for the INTRODUCE_ACK,
// and we know that our own introduction circuit is always at least
// as long as the peer's (even if they are using full vanguards),
// so we don't need any additional delay here.
// HS builds to our RPT
(
MAX_PEER_CIRC_RETRIES,
TimeoutsAction::BuildCircuit {
length: peer_rend_circ_len,
),
// RENDEZVOUS1 goes from HS to RPT. `peer_circ_len`, one-way.
// RENDEZVOUS2 goes from RPT to us. `num_hops`, one-way.
1,
TimeoutsAction::OneWay {
length: total_circ_len,
]);
// Wait (with timeout) for the RENDEZVOUS2 message, delivered via the
// oneshot that was set up in `establish_rendezvous`.
let rend2_msg: Rendezvous2 = self
.timeout(rpt_ipt_timeout, rendezvous.rend2_rx.recv(failed_map_err))
.map_err(|_: TimeoutError| FAE::RendezvousCompletionTimeout {
})??;
"hs conn to {}: RPT {} IPT {}: received RENDEZVOUS2",
// In theory would be great if we could have multiple introduction attempts in parallel
// with similar x,X values but different IPTs. However, our HS experts don't
// think increasing parallelism here is important:
let handshake_state = introduced.handshake_state;
// Try to complete the cryptographic handshake.
let keygen =
self.mocks
.rendezvous_handshake(handshake_state, rend2_msg, intro_index, &rend_pt)?;
let params = onion_circparams_from_netparams(self.netdir.params())
.map_err(into_internal!("Failed to build CircParameters"))?;
// TODO: We may be able to infer more about the supported protocols of the other side from our
// handshake, and from its descriptors.
// TODO CC: This is relevant for congestion control!
let protocols = self.netdir.client_protocol_status().required_protocols();
// Add the virtual (cryptographic-only) hop representing the onion service.
rendezvous
.m_extend_virtual(
handshake::RelayProtocol::HsV3,
handshake::HandshakeRole::Initiator,
keygen,
params,
protocols,
.map_err(into_internal!(
"actually this is probably a 'circuit closed' error" // TODO HS
))?;
"hs conn to {}: RPT {} IPT {}: HS circuit established",
Ok(rendezvous.rend_tunnel)
/// Helper to estimate a timeout for a complicated operation
/// `actions` is a list of `(count, action)`, where each entry
/// represents doing `action`, `count` times sequentially.
/// Combines the timeout estimates and returns an overall timeout.
fn estimate_timeout(&self, actions: &[(u32, TimeoutsAction)]) -> Duration {
// This algorithm is, perhaps, wrong. For uncorrelated variables, a particular
// percentile estimate for a sum of random variables, is not calculated by adding the
// percentile estimates of the individual variables.
// But the actual lengths of times of the operations aren't uncorrelated.
// If they were *perfectly* correlated, then this addition would be correct.
// It will do for now; it just might be rather longer than it ought to be.
actions
.map(|(count, action)| {
// Per-entry estimate: the pool's estimate for one `action`,
// multiplied (saturating) by how many times we do it.
self.circpool
.m_estimate_timeout(action)
.saturating_mul(*count)
// Sum all the per-entry estimates, saturating rather than overflowing.
.fold(Duration::ZERO, Duration::saturating_add)
/// Mocks used for testing `connect.rs`
/// This is different to `MockableConnectorData`,
/// which is used to *replace* this file, when testing `state.rs`.
/// `MocksForConnect` provides mock facilities for *testing* this file.
// TODO this should probably live somewhere else, maybe tor-circmgr even?
// TODO this really ought to be made by macros or something
trait MocksForConnect<R>: Clone {
/// HS circuit pool
type HsCircPool: MockableCircPool<R>;
/// A random number generator
type Rng: rand::Rng + rand::CryptoRng;
/// Key generator used for generating the keys for the virtual hop.
type KeyGenerator: tor_proto::client::circuit::handshake::KeyGenerator + Send;
/// Tell tests we got this descriptor text
// The `test_got_*` hooks default to no-ops, so the production
// implementation (`impl MocksForConnect for ()`) need not override them.
fn test_got_desc(&self, _: &HsDesc) {}
/// Tell tests we got this data tunnel.
fn test_got_tunnel(&self, _: &DataTunnel!(R, Self)) {}
/// Tell tests we have obtained and sorted the intros like this
fn test_got_ipts(&self, _: &[UsableIntroPt]) {}
/// Return a random number generator
fn thread_rng(&self) -> Self::Rng;
/// Complete the rendezvous handshake, returning the resulting keygen
fn rendezvous_handshake(
rend2_msg: Rendezvous2,
rend_pt: &RendPtIdentityForError,
) -> Result<Self::KeyGenerator, FAE>;
/// Mock for `HsCircPool`
/// Methods start with `m_` to avoid the following problem:
/// `ClientCirc::start_conversation` (say) means
/// to use the inherent method if one exists,
/// but will use a trait method if there isn't an inherent method.
/// So if the inherent method is renamed, the call in the impl here
/// turns into an always-recursive call.
/// This is not detected by the compiler due to the situation being
/// complicated by futures, `#[async_trait]` etc.
/// <https://github.com/rust-lang/rust/issues/111177>
#[async_trait]
trait MockableCircPool<R> {
/// Directory tunnel.
type DirTunnel: MockableClientDir;
/// Data tunnel.
type DataTunnel: MockableClientData;
/// Intro tunnel.
type IntroTunnel: MockableClientIntro;
/// Obtain (or launch) a directory tunnel to `target`.
async fn m_get_or_launch_dir(
netdir: &NetDir,
target: impl CircTarget + Send + Sync + 'async_trait,
) -> tor_circmgr::Result<Self::DirTunnel>;
/// Obtain (or launch) an introduction tunnel.
async fn m_get_or_launch_intro(
) -> tor_circmgr::Result<Self::IntroTunnel>;
/// Client circuit
// Also returns the `Relay` chosen to act as the rendezvous point.
async fn m_get_or_launch_client_rend<'a>(
netdir: &'a NetDir,
) -> tor_circmgr::Result<(Self::DataTunnel, Relay<'a>)>;
/// Estimate timeout
fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration;
/// Mock for onion service client directory tunnel.
trait MockableClientDir: Debug {
/// Stream type returned by `m_begin_dir_stream`.
type DirStream: AsyncRead + AsyncWrite + Send + Unpin;
/// Open a directory stream on this tunnel.
async fn m_begin_dir_stream(&self) -> tor_circmgr::Result<Self::DirStream>;
/// Get a tor_dirclient::SourceInfo for this circuit, if possible.
fn m_source_info(&self) -> tor_proto::Result<Option<SourceInfo>>;
/// Return the length of this circuit.
fn m_num_hops(&self) -> tor_circmgr::Result<usize>;
/// Mock for onion service client data tunnel.
trait MockableClientData: Debug {
/// Conversation
// Handle type for an in-progress conversation; generic over a lifetime
// so the real `tor_proto::Conversation<'r>` can be used directly.
type Conversation<'r>
where
Self: 'r;
/// Converse
// Sends the optional `msg` and registers `reply_handler` to receive
// the reply messages.
async fn m_start_conversation_last_hop(
msg: Option<AnyRelayMsg>,
reply_handler: impl MsgHandler + Send + 'static,
) -> tor_circmgr::Result<Self::Conversation<'_>>;
/// Add a virtual hop to the circuit.
async fn m_extend_virtual(
protocol: handshake::RelayProtocol,
role: handshake::HandshakeRole,
handshake: impl handshake::KeyGenerator + Send,
params: CircParameters,
capabilities: &tor_protover::Protocols,
) -> tor_circmgr::Result<()>;
/// Return the number of our own hops in this circuit.
/// This does not count any hops for the service's rendezvous circuit.
/// It does count our virtual hop, if we have one.
/// (That isn't a problem, since we only use this method to calculate
/// timeouts, and we only calculate timeouts _before_ we establish
/// the virtual hop.)
fn m_num_own_hops(&self) -> tor_circmgr::Result<usize>;
/// Mock for onion service client introduction tunnel.
///
/// Implemented by the real `ClientOnionServiceIntroTunnel` and by test mocks.
trait MockableClientIntro: Debug {
/// Return the number of hops in this circuit.
// Production implementation of `MocksForConnect`: `()` means "no mocks",
// using the real circuit pool, thread RNG, and hs-ntor key generator.
impl<R: Runtime> MocksForConnect<R> for () {
type HsCircPool = HsCircPool<R>;
type Rng = rand::rngs::ThreadRng;
type KeyGenerator = HsNtorHkdfKeyGenerator;
fn thread_rng(&self) -> Self::Rng {
rand::rng()
) -> Result<Self::KeyGenerator, FAE> {
// Complete the client side of the hs-ntor handshake using the
// service-supplied data from the RENDEZVOUS2 message.
handshake_state
.client_receive_rend(rend2_msg.handshake_info())
// If this goes wrong, either the onion service has mangled the crypto,
// or the rendezvous point has misbehaved (that that is possible is a protocol bug),
// or we have used the wrong handshake_state (let's assume that's not true).
// If this happens we'll go and try another RPT.
.map_err(|error| FAE::RendezvousCompletionHandshake {
// Real implementation: delegate each `m_` method to the corresponding
// inherent `HsCircPool` method.
impl<R: Runtime> MockableCircPool<R> for HsCircPool<R> {
type DirTunnel = ClientOnionServiceDirTunnel;
type DataTunnel = ClientOnionServiceDataTunnel;
type IntroTunnel = ClientOnionServiceIntroTunnel;
) -> tor_circmgr::Result<Self::DirTunnel> {
Ok(HsCircPool::get_or_launch_client_dir(self, netdir, target).await?)
) -> tor_circmgr::Result<Self::IntroTunnel> {
Ok(HsCircPool::get_or_launch_client_intro(self, netdir, target).await?)
) -> tor_circmgr::Result<(Self::DataTunnel, Relay<'a>)> {
HsCircPool::get_or_launch_client_rend(self, netdir).await
fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration {
HsCircPool::estimate_timeout(self, action)
// Real implementation: delegate to the directory tunnel's inherent methods.
impl MockableClientDir for ClientOnionServiceDirTunnel {
type DirStream = tor_proto::client::stream::DataStream;
async fn m_begin_dir_stream(&self) -> tor_circmgr::Result<Self::DirStream> {
Self::begin_dir_stream(self).await
fn m_source_info(&self) -> tor_proto::Result<Option<SourceInfo>> {
SourceInfo::from_tunnel(self)
fn m_num_hops(&self) -> tor_circmgr::Result<usize> {
self.n_hops()
// Real implementation: delegate to the data tunnel's inherent methods.
impl MockableClientData for ClientOnionServiceDataTunnel {
type Conversation<'r> = tor_proto::Conversation<'r>;
) -> tor_circmgr::Result<Self::Conversation<'_>> {
// The `_last_hop` in the mock method name is realized here:
// we always converse with the tunnel's last hop.
Self::start_conversation(self, msg, reply_handler, TargetHop::LastHop).await
) -> tor_circmgr::Result<()> {
Self::extend_virtual(self, protocol, role, handshake, params, capabilities).await
fn m_num_own_hops(&self) -> tor_circmgr::Result<usize> {
// Real (non-mock) implementation of `MockableClientIntro`.
impl MockableClientIntro for ClientOnionServiceIntroTunnel {
// Glue allowing `state.rs` (via `MockableConnectorData`) to drive the
// connection machinery in this file.
impl MockableConnectorData for Data {
type MockGlobalState = ();
async fn connect<R: Runtime>(
data: &mut Self,
) -> Result<Self::DataTunnel, ConnError> {
// Delegate to the free `connect` function in this module.
connect(connector, netdir, config, hsid, data, secret_keys).await
// A cached tunnel is reusable only if it hasn't been closed.
fn tunnel_is_ok(tunnel: &Self::DataTunnel) -> bool {
!tunnel.is_closed()
#[cfg(test)]
mod test {
// @@ begin test lint list maintained by maint/add_warning @@
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_time_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
#![allow(dead_code, unused_variables)] // TODO HS TESTS delete, after tests are completed
use super::*;
use crate::*;
use itertools::chain;
use std::iter;
use tokio_crate as tokio;
use tor_async_utils::JoinReadWrite;
use tor_basic_utils::test_rng::{TestingRng, testing_rng};
use tor_hscrypto::pk::{HsClientDescEncKey, HsClientDescEncKeypair};
use tor_llcrypto::pk::curve25519;
use tor_netdoc::doc::{hsdesc::test_data, netstatus::Lifetime};
use tor_rtcompat::RuntimeSubstExt as _;
use tor_rtcompat::tokio::TokioNativeTlsRuntime;
use tor_rtmock::simple_time::SimpleMockTimeProvider;
use tracing_test::traced_test;
#[derive(derive_more::Debug, Default)]
struct MocksGlobal {
hsdirs_asked: Vec<OwnedCircTarget>,
got_desc: Option<HsDesc>,
#[debug(skip)]
rendezvous: Option<Box<dyn MsgHandler + Send + 'static>>,
intro_acks: Vec<(IntroduceAck, MetaCellDisposition)>,
#[derive(Clone, Debug)]
struct Mocks<I> {
mglobal: Arc<Mutex<MocksGlobal>>,
id: I,
struct MockKeyGenerator;
impl handshake::KeyGenerator for MockKeyGenerator {
fn expand(self, _keylen: usize) -> tor_proto::Result<tor_bytes::SecretBuf> {
todo!()
impl<R: Runtime> MocksForConnect<R> for Mocks<()> {
type HsCircPool = Mocks<()>;
type Rng = TestingRng;
type KeyGenerator = MockKeyGenerator;
fn test_got_desc(&self, desc: &HsDesc) {
self.mglobal.lock().unwrap().got_desc = Some(desc.clone());
fn test_got_ipts(&self, desc: &[UsableIntroPt]) {}
testing_rng()
_handshake_state: hs_ntor::HsNtorClientState,
_rend2_msg: Rendezvous2,
_intro_index: IntroPtIndex,
_rend_pt: &RendPtIdentityForError,
Ok(MockKeyGenerator)
impl<R: Runtime> MockableCircPool<R> for Mocks<()> {
type DataTunnel = Mocks<()>;
type DirTunnel = Mocks<()>;
type IntroTunnel = Mocks<()>;
_netdir: &NetDir,
let target = OwnedCircTarget::from_circ_target(&target);
self.mglobal.lock().unwrap().hsdirs_asked.push(target);
Ok(self.clone())
// Pick one of the relays we know to be in the test net as our RPT
let rpt = netdir.by_id(&Ed25519Identity::from([12; 32])).unwrap();
Ok((self.clone(), rpt))
Duration::from_secs(10)
impl MockableClientDir for Mocks<()> {
type DirStream = JoinReadWrite<futures::io::Cursor<Box<[u8]>>, futures::io::Sink>;
let response = format!(
r#"HTTP/1.1 200 OK
{}"#,
test_data::TEST_DATA_2
.into_bytes()
.into_boxed_slice();
Ok(JoinReadWrite::new(
futures::io::Cursor::new(response),
futures::io::sink(),
Ok(None)
Ok(4)
impl MockableClientData for Mocks<()> {
type Conversation<'r> = &'r ();
mut reply_handler: impl MsgHandler + Send + 'static,
match msg {
Some(AnyRelayMsg::EstablishRendezvous(_)) => {
let reply = RendezvousEstablished::default();
let disp = reply_handler.handle_msg(reply.into()).unwrap();
assert_eq!(disp, MetaCellDisposition::Consumed);
// Save this, because we'll need to use it later,
// when handling the INTRODUCE1
let mut global = self.mglobal.lock().unwrap();
global.rendezvous = Some(Box::new(reply_handler));
_ => panic!("unexpected msg {msg:?}"),
Ok(&())
Ok(())
impl MockableClientIntro for Mocks<()> {
Some(AnyRelayMsg::Introduce1(introduce1)) => {
let (reply, expected_disp) = global.intro_acks.remove(0);
assert_eq!(disp, expected_disp);
// Mock the service's response
let rendezvous = global
.rendezvous
.as_mut()
.expect("got INTRODUCE1 before ESTABLISH_RENDEZVOUS?!");
let reply = Rendezvous2::new(b"dummy handshake info, ignored");
let disp = rendezvous.handle_msg(reply.into()).unwrap();
assert_eq!(disp, MetaCellDisposition::ConversationFinished);
fn ks_hsc_desc_enc() -> HsClientDescEncKeypair {
let pk: HsClientDescEncKey = curve25519::PublicKey::from(test_data::TEST_PUBKEY_2).into();
let sk = curve25519::StaticSecret::from(test_data::TEST_SECKEY_2).into();
HsClientDescEncKeypair::new(pk, sk)
fn expected_hsdesc(hsid: HsId, netdir: &NetDir, now: SystemTime) -> HsDesc {
.unwrap()
.unwrap();
test_data::TEST_DATA_2,
&hs_blind_id,
&subcredential,
Some(&ks_hsc_desc_enc()),
.dangerously_assume_timely()
fn build_test_netdir() -> Arc<NetDir> {
let valid_after = humantime::parse_rfc3339("2023-02-09T12:00:00Z").unwrap();
let fresh_until = valid_after + humantime::parse_duration("1 hours").unwrap();
let valid_until = valid_after + humantime::parse_duration("24 hours").unwrap();
let lifetime = Lifetime::new(valid_after, fresh_until, valid_until).unwrap();
let netdir = tor_netdir::testnet::construct_custom_netdir_with_params(
tor_netdir::testnet::simple_net_func,
iter::empty::<(&str, _)>(),
Some(lifetime),
.expect("failed to build default testing netdir");
Arc::new(netdir.unwrap_if_sufficient().unwrap())
#[traced_test]
#[tokio::test]
async fn test_connect() {
use MetaCellDisposition::*;
let netdir = build_test_netdir();
let runtime = TokioNativeTlsRuntime::current().unwrap();
let now = humantime::parse_rfc3339("2023-02-09T12:00:00Z").unwrap();
let mock_sp = SimpleMockTimeProvider::from_wallclock(now);
let runtime = runtime
.with_sleep_provider(mock_sp.clone())
.with_coarse_time_provider(mock_sp.clone());
let success = (
IntroduceAck::new(IntroduceAckStatus::SUCCESS),
ConversationFinished,
let nack = (
IntroduceAck::new(IntroduceAckStatus::NOT_RECOGNIZED),
// The number of times to make Context:connect() fail due to intro NACK
// Set to 5 in order to trigger a rate-limit for all 6 HsDirs:
// there are 6 HsDirs in total, one of which is "used up" by the
// first (successful) connect() attempt below.
const INTRO_FAIL_COUNT: usize = 5;
/// The number of times we expect the client to retry the
/// introduction per connect() call
/// (it will essentially try two rounds of `intro_rend_connect()`,
/// once with the cached descriptor, and once with the potentially
/// new descriptor).
const IPT_RETRY_COUNT: usize = 12;
// The first introduction will succeed
let intro_acks = chain!(
[&success],
// But the next INTRO_FAIL_COUNT connect() will fail
// (+1 because we want to fail *again*, in order to find
// that there's now a limit on all our HsDirs)
[&nack; IPT_RETRY_COUNT * (INTRO_FAIL_COUNT + 1)],
// One more round of failures, to trigger a refetch after the rate-limit is lifted
[&nack; IPT_RETRY_COUNT - 1],
// After refetching the descriptor, the client will retry the introduction,
// and succeed.
.cloned()
.collect();
let mglobal = Arc::new(Mutex::new(MocksGlobal {
intro_acks,
..Default::default()
}));
let mocks = Mocks { mglobal, id: () };
// From C Tor src/test/test_hs_common.c test_build_address
let hsid = test_data::TEST_HSID_2.into();
let mut data = Data::default();
let mut expected_hsdirs_asked = 1;
let mut secret_keys_builder = HsClientSecretKeysBuilder::default();
secret_keys_builder.ks_hsc_desc_enc(ks_hsc_desc_enc());
let secret_keys = secret_keys_builder.build().unwrap();
let ctx = Context::new(
&runtime,
&mocks,
Arc::clone(&netdir),
Default::default(),
mocks.clone(),
let _got = ctx.connect(&mut data).await.unwrap();
// Our mock IPT hasn't sent any NACKs yet
assert!(!logs_contain("NACKed, refetching descriptor and retrying"));
let hsdesc = expected_hsdesc(hsid, &netdir, now);
{
let mglobal = mocks.mglobal.lock().unwrap();
assert_eq!(mglobal.hsdirs_asked.len(), expected_hsdirs_asked);
// TODO hs: here and in other places, consider implementing PartialEq instead, or creating
// an assert_dbg_eq macro (which would be part of a test_helpers crate or something)
assert_eq!(
format!("{:?}", mglobal.got_desc),
format!("{:?}", Some(hsdesc.clone()))
// Check how long the descriptor is valid for
let (start_time, end_time) = data.desc.as_ref().unwrap().desc.bounds();
assert_eq!(start_time, None);
let desc_valid_until = humantime::parse_rfc3339("2023-02-11T20:00:00Z").unwrap();
assert_eq!(end_time, Some(desc_valid_until));
// These attempts will all fail due to intro NACK,
// and trigger a rate-limit for all 6 HsDirs
for i in 1..=INTRO_FAIL_COUNT + 1 {
let err = ctx.connect(&mut data).await.unwrap_err();
let is_intro_nack = |e| matches!(e, FAE::IntroductionFailed { status, .. });
// All attempts failed because of our repeated intro NACKs
assert!(matches!(err, CE::Failed(e) if e.clone().into_iter().all(is_intro_nack)));
assert!(logs_contain("NACKed, refetching descriptor and retrying"));
// Because all intro attempts failed with NACK (NOT_RECOGNIZED),
// the client must've tried to refetch the descriptor
if i <= INTRO_FAIL_COUNT {
// No rate limiting yet, so the client must've tried to fetch a new
// descriptor, before failing again.
expected_hsdirs_asked += 1;
assert!(!logs_contain("but all hsdirs are rate-limited"));
// The final failure won't lead to an HsDir fetch
// because all HsDirs will be rate-limited at that point
assert!(logs_contain("but all hsdirs are rate-limited"));
// Same descriptor each time
// By default, the HsDir fetches are rate-limited for 15min
mock_sp.advance(Duration::from_secs(15 * 60));
// Finally, we succeed.
// And it turns out we did, in fact refetch the descriptor
// Finally, we try again, but find that all HsDirs are now rate-limited!
// So now we advance the time to lift the rate limit, and hope that
// TODO HS TESTS: we could extend our mock infrastructure
// to support returning a different hsdesc this time,
// with various revision counters, to check that the client is indeed
// keeping the newest one.
// TODO HS TESTS: check the circuit in got is the one we gave out
// TODO HS TESTS: continue with this
// TODO HS TESTS: Test IPT state management and expiry:
// - obtain a test descriptor with only a broken ipt
// (broken in the sense that intro can be attempted, but will fail somehow)
// - try to make a connection and expect it to fail
// - assert that the ipt data isn't empty
// - cause the descriptor to expire (advance clock)
// - start using a mocked RNG if we weren't already and pin its seed here
// - make a new descriptor with two IPTs: the broken one from earlier, and a new one
// - make a new connection
// - use test_got_ipts to check that the random numbers
// would sort the bad intro first, *and* that the good one appears first
// - assert that connection succeeded
// - cause the circuit and descriptor to expire (advance clock)
// - go back to the previous descriptor contents, but with a new validity period
// - try to make a connection
// - use test_got_ipts to check that only the broken ipt is present
// TODO HS TESTS: test retries (of every retry loop we have here)
// TODO HS TESTS: test error paths