1
#![cfg_attr(docsrs, feature(doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_time_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45
#![allow(clippy::collapsible_if)] // See arti#2342
46
#![deny(clippy::unused_async)]
47
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
48

            
49
pub mod builder;
50
mod config;
51
mod err;
52
mod event;
53
pub mod factory;
54
mod mgr;
55
#[cfg(test)]
56
mod testing;
57
pub mod transport;
58
pub(crate) mod util;
59

            
60
use futures::StreamExt;
61
use futures::select_biased;
62
use std::result::Result as StdResult;
63
use std::sync::{Arc, Weak};
64
use std::time::Duration;
65
use tor_config::ReconfigureError;
66
use tor_error::error_report;
67
use tor_linkspec::{ChanTarget, OwnedChanTarget};
68
use tor_netdir::{NetDirProvider, params::NetParameters};
69
use tor_proto::channel::Channel;
70
#[cfg(feature = "experimental-api")]
71
use tor_proto::memquota::ChannelAccount;
72
use tor_proto::memquota::ToplevelAccount;
73
use tor_rtcompat::SpawnExt;
74
use tracing::debug;
75
use tracing::instrument;
76
use void::{ResultVoidErrExt, Void};
77

            
78
#[cfg(feature = "relay")]
79
use {
80
    async_trait::async_trait, safelog::Sensitive,
81
    tor_proto::relay::channel_provider::ChannelProvider,
82
};
83

            
84
pub use err::Error;
85

            
86
pub use config::{ChannelConfig, ChannelConfigBuilder};
87
pub use mgr::ChanMgrConfig;
88

            
89
use tor_rtcompat::Runtime;
90

            
91
/// A Result as returned by this crate.
92
pub type Result<T> = std::result::Result<T, Error>;
93

            
94
use crate::factory::BootstrapReporter;
95
pub use event::{ConnBlockage, ConnStatus, ConnStatusEvents};
96
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
97

            
98
/// An object that remembers a set of live channels, and launches new ones on
99
/// request.
100
///
101
/// Use the [`ChanMgr::get_or_launch`] function to create a new [`Channel`], or
102
/// get one if it exists.  (For a slightly lower-level API that does no caching,
103
/// see [`ChannelFactory`](factory::ChannelFactory) and its implementors.
104
///
105
/// Each channel is kept open as long as there is a reference to it, or
106
/// something else (such as the relay or a network error) kills the channel.
107
///
108
/// After a `ChanMgr` launches a channel, it keeps a reference to it until that
109
/// channel has been unused (that is, had no circuits attached to it) for a
110
/// certain amount of time. (Currently this interval is chosen randomly from
111
/// between 180-270 seconds, but this is an implementation detail that may change
112
/// in the future.)
113
pub struct ChanMgr<R: Runtime> {
114
    /// Internal channel manager object that does the actual work.
115
    ///
116
    /// ## How this is built
117
    ///
118
    /// This internal manager is parameterized over an
119
    /// [`mgr::AbstractChannelFactory`], which here is instantiated with a [`factory::CompoundFactory`].
120
    /// The `CompoundFactory` itself holds:
121
    ///   * A `dyn` [`factory::AbstractPtMgr`] that can provide a `dyn`
122
    ///     [`factory::ChannelFactory`] for each supported pluggable transport.
123
    ///     This starts out as `None`, but can be replaced with [`ChanMgr::set_pt_mgr`].
124
    ///     The `TorClient` code currently sets this using `tor_ptmgr::PtMgr`.
125
    ///     `PtMgr` currently returns `ChannelFactory` implementations that are
126
    ///     built using [`transport::proxied::ExternalProxyPlugin`], which implements
127
    ///     [`transport::TransportImplHelper`], which in turn is wrapped into a
128
    ///     `ChanBuilder` to implement `ChannelFactory`.
129
    ///   * A generic [`factory::ChannelFactory`] that it uses for everything else
130
    ///     We instantiate this with a
131
    ///     [`builder::ChanBuilder`] using a [`transport::default::DefaultTransport`].
132
    // This type is a bit long, but I think it's better to just state it here explicitly rather than
133
    // hiding parts of it behind a type alias to make it look nicer.
134
    mgr: mgr::AbstractChanMgr<
135
        factory::CompoundFactory<builder::ChanBuilder<R, transport::DefaultTransport<R>>>,
136
    >,
137

            
138
    /// Stream of [`ConnStatus`] events.
139
    bootstrap_status: event::ConnStatusEvents,
140

            
141
    /// The runtime. Needed to possibly spawn tasks.
142
    #[allow(unused)] // Relay use this, not client yet. Keep it here instead of gating.
143
    runtime: R,
144
}
145

            
146
/// Explains how a channel handed back by the channel manager came to exist.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ChanProvenance {
    /// We launched this channel for the request, or a launch that was already
    /// underway finished while the request was waiting on it.
    NewlyCreated,
    /// The channel was already open at the moment we asked for it.
    Preexisting,
}
156

            
157
/// Whether the channel manager should consider itself dormant.
///
/// Higher layers normally derive this value from `arti_client::DormantMode`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum Dormancy {
    /// Not dormant (the default).
    ///
    /// Channels operate as usual.
    #[default]
    Active,
    /// Completely dormant.
    ///
    /// Channels refrain from all spontaneous activity (for example, netflow padding).
    Dormant,
}
173

            
174
/// The usage that we have in mind when requesting a channel.
///
/// A channel may be used in multiple ways.  Each time a channel is requested
/// from `ChanMgr` a separate `ChannelUsage` is passed in to tell the `ChanMgr`
/// how the channel will be used this time.
///
/// To be clear, the `ChannelUsage` is an aspect of a _request_ for a channel, and
/// is not an immutable property of the channel itself.
///
/// This type is obtained from a `tor_circmgr::usage::SupportedCircUsage`,
/// and it has roughly the same set of variants.
#[derive(Clone, Debug, Copy, Eq, PartialEq)]
#[non_exhaustive]
pub enum ChannelUsage {
    /// Requesting a channel to use for BEGINDIR-based non-anonymous directory
    /// connections.
    Dir,

    /// Requesting a channel to transmit user traffic (including exit traffic)
    /// over the network.
    ///
    /// This includes the case where we are constructing a circuit preemptively,
    /// and _planning_ to use it for user traffic later on.
    UserTraffic,

    /// Requesting a channel that the caller does not plan to use at all, or
    /// which it plans to use only for testing circuits.
    UselessCircuit,
}
203

            
204
impl<R: Runtime> ChanMgr<R> {
205
    /// Construct a new channel manager.
206
    ///
207
    /// A new `ChannelAccount` will be made from `memquota`, for each Channel.
208
    ///
209
    /// The `ChannelAccount` is used for data associated with this channel.
210
    ///
211
    /// This does *not* (currently) include downstream outbound data
212
    /// (ie, data processed by the channel implementation here,
213
    /// awaiting TLS processing and actual transmission).
214
    /// In any case we try to keep those buffers small.
215
    ///
216
    /// The ChannelAccount *does* track upstream outbound data
217
    /// (ie, data processed by a circuit, but not yet by the channel),
218
    /// even though that data relates to a specific circuit.
219
    /// TODO #1652 use `CircuitAccount` for circuit->channel queue.
220
    ///
221
    /// # Usage note
222
    ///
223
    /// For the manager to work properly, you will need to call `ChanMgr::launch_background_tasks`.
224
    ///
225
    /// The `keymgr` is only needed for a relay which is used for authenticating its channel to
226
    /// other relays. Pass `None` for a client.
227
58
    pub fn new(
228
58
        runtime: R,
229
58
        config: ChanMgrConfig,
230
58
        dormancy: Dormancy,
231
58
        netparams: &NetParameters,
232
58
        memquota: ToplevelAccount,
233
58
    ) -> Result<Self>
234
58
    where
235
58
        R: 'static,
236
    {
237
58
        let (sender, receiver) = event::channel();
238
58
        let sender = Arc::new(std::sync::Mutex::new(sender));
239
58
        let reporter = BootstrapReporter(sender);
240
58
        let transport =
241
58
            transport::DefaultTransport::new(runtime.clone(), config.cfg.outbound_proxy.clone());
242
        cfg_if::cfg_if! {
243
            if #[cfg(feature = "relay")] {
244
58
                let builder = if let Some(auth_material) = &config.auth_material {
245
                    builder::ChanBuilder::new_relay(runtime.clone(), transport, auth_material.clone(), config.my_addrs)?
246
                } else {
247
                    // Yes, clients can have the "relay" feature enabled (unit tests).
248
58
                    builder::ChanBuilder::new_client(runtime.clone(), transport)
249
                };
250
            } else {
251
                let builder =  builder::ChanBuilder::new_client(runtime.clone(), transport);
252
            }
253
        };
254

            
255
58
        let factory = factory::CompoundFactory::new(
256
58
            Arc::new(builder),
257
            #[cfg(feature = "pt-client")]
258
58
            None,
259
        );
260

            
261
        // Warn if outbound_proxy is configured to a non-loopback address
262
58
        if let Some(ref proxy) = config.cfg.outbound_proxy {
263
            if !proxy.is_loopback() {
264
                tracing::warn!(
265
                    proxy_addr = %proxy,
266
                    "outbound_proxy is configured to a non-loopback address; \
267
                     this may expose Tor traffic to an untrusted intermediate"
268
                );
269
            }
270
58
        }
271

            
272
58
        let mgr =
273
58
            mgr::AbstractChanMgr::new(factory, config.cfg, dormancy, netparams, reporter, memquota);
274

            
275
58
        Ok(ChanMgr {
276
58
            mgr,
277
58
            bootstrap_status: receiver,
278
58
            runtime,
279
58
        })
280
58
    }
281

            
282
    /// Launch the periodic daemon tasks required by the manager to function properly.
283
    ///
284
    /// Returns a [`TaskHandle`] that can be used to manage
285
    /// those daemon tasks that poll periodically.
286
    #[instrument(level = "trace", skip_all)]
287
22
    pub fn launch_background_tasks(
288
22
        self: &Arc<Self>,
289
22
        runtime: &R,
290
22
        netdir: Arc<dyn NetDirProvider>,
291
22
    ) -> Result<Vec<TaskHandle>> {
292
22
        runtime
293
22
            .spawn(Self::continually_update_channels_config(
294
22
                Arc::downgrade(self),
295
22
                netdir,
296
            ))
297
22
            .map_err(|e| Error::from_spawn("channels config task", e))?;
298

            
299
22
        let (sched, handle) = TaskSchedule::new(runtime.clone());
300
22
        runtime
301
22
            .spawn(Self::continually_expire_channels(
302
22
                sched,
303
22
                Arc::downgrade(self),
304
            ))
305
22
            .map_err(|e| Error::from_spawn("channel expiration task", e))?;
306
22
        Ok(vec![handle])
307
22
    }
308

            
309
    /// Build a channel for an incoming stream.
310
    ///
311
    /// The `my_addrs` are the IP address(es) that are advertised by the relay in the consensus. We
312
    /// need to pass them so they can be sent in the NETINFO cell.
313
    ///
314
    /// The channel may or may not be authenticated. This method will wait until the channel is
315
    /// usable, and may return an error if we already have an existing channel to this peer, or if
316
    /// there are already too many open connections with this peer or subnet (as a dos defence).
317
    #[cfg(feature = "relay")]
318
    pub async fn handle_incoming(
319
        &self,
320
        src: Sensitive<std::net::SocketAddr>,
321
        stream: <R as tor_rtcompat::NetStreamProvider>::Stream,
322
    ) -> Result<Arc<Channel>> {
323
        self.mgr.handle_incoming(src, stream).await
324
    }
325

            
326
    /// Try to get a suitable channel to the provided `target`,
327
    /// launching one if one does not exist.
328
    ///
329
    /// This function does not guarantee that the returned channel
330
    /// satisfies all of the properties of `target`. For example if an
331
    /// existing channel is returned, it might not be connected to any
332
    /// of the addresses specified in `target`.
333
    // ^ see https://gitlab.torproject.org/tpo/core/arti/-/issues/2344
334
    ///
335
    /// If there is already a channel launch attempt in progress, this
336
    /// function will wait until that launch is complete, and succeed
337
    /// or fail depending on its outcome.
338
    #[instrument(level = "trace", skip_all)]
339
    pub async fn get_or_launch<T: ChanTarget + ?Sized>(
340
        &self,
341
        target: &T,
342
        usage: ChannelUsage,
343
    ) -> Result<(Arc<Channel>, ChanProvenance)> {
344
        let targetinfo = OwnedChanTarget::from_chan_target(target);
345

            
346
        let (chan, provenance) = self.mgr.get_or_launch(targetinfo, usage).await?;
347
        // Double-check the match to make sure that the RSA identity is
348
        // what we wanted too.
349
        chan.check_match(target)
350
            .map_err(|e| Error::from_proto_no_skew(e, target))?;
351
        Ok((chan, provenance))
352
    }
353

            
354
    /// Return a stream of [`ConnStatus`] events to tell us about changes
355
    /// in our ability to connect to the internet.
356
    ///
357
    /// Note that this stream can be lossy: the caller will not necessarily
358
    /// observe every event on the stream
359
22
    pub fn bootstrap_events(&self) -> ConnStatusEvents {
360
22
        self.bootstrap_status.clone()
361
22
    }
362

            
363
    /// Expire all channels that have been unused for too long.
364
    ///
365
    /// Return the duration from now until next channel expires.
366
36
    pub fn expire_channels(&self) -> Duration {
367
36
        self.mgr.expire_channels()
368
36
    }
369

            
370
    /// Notifies the chanmgr to be dormant like dormancy
371
18
    pub fn set_dormancy(
372
18
        &self,
373
18
        dormancy: Dormancy,
374
18
        netparams: Arc<dyn AsRef<NetParameters>>,
375
18
    ) -> StdResult<(), tor_error::Bug> {
376
18
        self.mgr.set_dormancy(dormancy, netparams)
377
18
    }
378

            
379
    /// Reconfigure all channels
380
8
    pub fn reconfigure(
381
8
        &self,
382
8
        config: &ChannelConfig,
383
8
        how: tor_config::Reconfigure,
384
8
        netparams: Arc<dyn AsRef<NetParameters>>,
385
8
    ) -> StdResult<(), ReconfigureError> {
386
8
        if how == tor_config::Reconfigure::CheckAllOrNothing {
387
            // Since `self.mgr.reconfigure` returns an error type of `Bug` and not
388
            // `ReconfigureError` (see check below), the reconfigure should only fail due to bugs.
389
            // This means we can return `Ok` here since there should never be an error with the
390
            // provided `config` values.
391
4
            return Ok(());
392
4
        }
393

            
394
4
        let r = self.mgr.reconfigure(config, netparams);
395

            
396
        // Check that `self.mgr.reconfigure` returns an error type of `Bug` (see comment above).
397
4
        let _: Option<&tor_error::Bug> = r.as_ref().err();
398

            
399
4
        Ok(r?)
400
8
    }
401

            
402
    /// Replace the transport registry with one that may know about
403
    /// more transports.
404
    ///
405
    /// Note that the [`ChannelFactory`](factory::ChannelFactory) instances returned by `ptmgr` are
406
    /// required to time-out channels that take too long to build.  You'll get
407
    /// this behavior by default if the factories implement [`ChannelFactory`](factory::ChannelFactory) using
408
    /// [`transport::proxied::ExternalProxyPlugin`], which `tor-ptmgr` does.
409
    #[cfg(feature = "pt-client")]
410
22
    pub fn set_pt_mgr(&self, ptmgr: Arc<dyn factory::AbstractPtMgr + 'static>) {
411
22
        self.mgr.with_mut_builder(|f| f.replace_ptmgr(ptmgr));
412
22
    }
413

            
414
    /// Replace the relay auth material used for building new channels.
415
    ///
416
    /// This rebuilds the internal channel builder with the provided `auth_material`, which includes a
417
    /// new TLS cert and key. Existing channels are not affected, only newly created channels will
418
    /// use the new keys.
419
    #[cfg(feature = "relay")]
420
    pub fn set_relay_auth_material(
421
        &self,
422
        auth_material: Arc<tor_proto::RelayChannelAuthMaterial>,
423
    ) -> Result<()> {
424
        let mut result = Ok(());
425
        self.mgr.with_mut_builder(|f| {
426
            match f
427
                .default_factory()
428
                .rebuild_with_auth_material(auth_material)
429
            {
430
                Ok(b) => f.replace_default_factory(Arc::new(b)),
431
                Err(e) => result = Err(e),
432
            }
433
        });
434
        result
435
    }
436

            
437
    /// Try to create a new, unmanaged channel to `target`.
438
    ///
439
    /// Unlike [`get_or_launch`](ChanMgr::get_or_launch), this function always
440
    /// creates a new channel, never retries transient failure, and does not
441
    /// register this channel with the `ChanMgr`.
442
    ///
443
    /// Generally you should not use this function; `get_or_launch` is usually a
444
    /// better choice.  This function is the right choice if, for whatever
445
    /// reason, you need to manage the lifetime of the channel you create, and
446
    /// make sure that no other code with access to this `ChanMgr` will be able
447
    /// to use the channel.
448
    #[cfg(feature = "experimental-api")]
449
    #[instrument(level = "trace", skip_all)]
450
    pub async fn build_unmanaged_channel(
451
        &self,
452
        target: impl tor_linkspec::IntoOwnedChanTarget,
453
        memquota: ChannelAccount,
454
    ) -> Result<Arc<Channel>> {
455
        use factory::ChannelFactory as _;
456
        let target = target.to_owned();
457

            
458
        self.mgr
459
            .channels
460
            .builder()
461
            .connect_via_transport(&target, self.mgr.reporter.clone(), memquota)
462
            .await
463
    }
464

            
465
    /// Watch for things that ought to change the configuration of all channels in the client
466
    ///
467
    /// Currently this handles enabling and disabling channel padding.
468
    ///
469
    /// This is a daemon task that runs indefinitely in the background,
470
    /// and exits when we find that `chanmgr` is dropped.
471
    #[instrument(level = "trace", skip_all)]
472
22
    async fn continually_update_channels_config(
473
22
        self_: Weak<Self>,
474
22
        netdir: Arc<dyn NetDirProvider>,
475
22
    ) {
476
        use tor_netdir::DirEvent as DE;
477
        let mut netdir_stream = netdir.events().fuse();
478
        let netdir = {
479
            let weak = Arc::downgrade(&netdir);
480
            drop(netdir);
481
            weak
482
        };
483
22
        let termination_reason: std::result::Result<Void, &str> = async move {
484
            loop {
485
22
                select_biased! {
486
22
                    direvent = netdir_stream.next() => {
487
                        let direvent = direvent.ok_or("EOF on netdir provider event stream")?;
488
                        if ! matches!(direvent, DE::NewConsensus) { continue };
489
                        let self_ = self_.upgrade().ok_or("channel manager gone away")?;
490
                        let netdir = netdir.upgrade().ok_or("netdir gone away")?;
491
                        let netparams = netdir.params();
492
                        self_.mgr.update_netparams(netparams).map_err(|e| {
493
                            error_report!(e, "continually_update_channels_config: failed to process!");
494
                            "error processing netdir"
495
                        })?;
496
                    }
497
                }
498
            }
499
        }
500
        .await;
501
        debug!(
502
            "continually_update_channels_config: shutting down: {}",
503
            termination_reason.void_unwrap_err()
504
        );
505
    }
506

            
507
    /// Periodically expire any channels that have been unused beyond
508
    /// the maximum duration allowed.
509
    ///
510
    /// Exist when we find that `chanmgr` is dropped
511
    ///
512
    /// This is a daemon task that runs indefinitely in the background
513
    #[instrument(level = "trace", skip_all)]
514
22
    async fn continually_expire_channels(mut sched: TaskSchedule<R>, chanmgr: Weak<Self>) {
515
        while sched.next().await.is_some() {
516
            let Some(cm) = Weak::upgrade(&chanmgr) else {
517
                // channel manager is closed.
518
                return;
519
            };
520
            let delay = cm.expire_channels();
521
            // This will sometimes be an underestimate, but it's no big deal; we just sleep some more.
522
            sched.fire_in(delay);
523
        }
524
22
    }
525
}
526

            
527
#[cfg(feature = "relay")]
#[async_trait]
impl<R: Runtime> ChannelProvider for ChanMgr<R> {
    type BuildSpec = OwnedChanTarget;

    /// Get or launch a channel to `target` on behalf of circuit reactor `reactor_id`.
    ///
    /// The work happens on a task spawned on this manager's runtime, and its outcome is
    /// delivered through `tx`. This method itself only fails if that task cannot be spawned.
    ///
    /// The channel is obtained via [`ChanMgr::get_or_launch`], so the returned channel is
    /// double-checked against every identity in `target` (including the RSA identity),
    /// exactly as for any other caller.
    fn get_or_launch(
        self: Arc<Self>,
        reactor_id: tor_proto::circuit::UniqId,
        target: Self::BuildSpec,
        tx: tor_proto::relay::channel_provider::OutboundChanSender,
    ) -> tor_proto::Result<()> {
        use tor_error::into_internal;

        debug!("Get or launch channel to {target} for circuit reactor {reactor_id}");

        let chanmgr = self.clone();
        self.runtime
            .spawn(async move {
                // Go through the inherent `ChanMgr::get_or_launch` rather than `self.mgr`
                // directly: the inherent method verifies that the channel really matches
                // all of `target`'s identities. Binding a `&ChanMgr<R>` makes method
                // resolution pick the inherent method, not this trait method.
                let this: &ChanMgr<R> = &chanmgr;
                let r = this
                    .get_or_launch(&target, ChannelUsage::UserTraffic)
                    .await
                    .map_err(|e| tor_proto::Error::ChanProto(e.to_string())); // Is it a ChanProto?
                // Send back the channel.
                tx.send(r.map(|(chan, _)| chan));
            })
            .map_err(into_internal!("Failed to launch channel provider task"))?;

        Ok(())
    }
}