1
#![cfg_attr(docsrs, feature(doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_time_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45
#![allow(clippy::collapsible_if)] // See arti#2342
46
#![deny(clippy::unused_async)]
47
#![deny(clippy::string_slice)] // See arti#2571
48
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
49

            
50
pub mod config;
51
pub mod err;
52

            
53
#[cfg(feature = "managed-pts")]
54
pub mod ipc;
55

            
56
#[cfg(feature = "managed-pts")]
57
mod managed;
58

            
59
use crate::config::{TransportConfig, TransportOptions};
60
use crate::err::PtError;
61
use oneshot_fused_workaround as oneshot;
62
use std::collections::HashMap;
63
use std::net::SocketAddr;
64
use std::path::PathBuf;
65
use std::sync::{Arc, RwLock};
66
use tor_chanmgr::ProxyProtocol;
67
use tor_config_path::CfgPathResolver;
68
use tor_linkspec::PtTransportName;
69
use tor_rtcompat::Runtime;
70
use tor_socksproto::SocksVersion;
71
#[cfg(any(feature = "tor-channel-factory", feature = "managed-pts"))]
72
use tracing::info;
73
use tracing::warn;
74
#[cfg(feature = "managed-pts")]
75
use {
76
    crate::managed::{PtReactor, PtReactorMessage},
77
    futures::channel::mpsc::{self, UnboundedSender},
78
    tor_error::error_report,
79
    tor_rtcompat::SpawnExt,
80
};
81
#[cfg(feature = "tor-channel-factory")]
82
use {
83
    async_trait::async_trait,
84
    tor_chanmgr::{
85
        builder::ChanBuilder,
86
        factory::{AbstractPtError, ChannelFactory},
87
        transport::ExternalProxyPlugin,
88
    },
89
    tracing::trace,
90
};
91

            
92
/// Shared mutable state between the `PtReactor` and `PtMgr`.
93
#[derive(Default, Debug)]
94
struct PtSharedState {
95
    /// Connection information for pluggable transports from currently running binaries.
96
    ///
97
    /// Unmanaged pluggable transports are not included in this map.
98
    #[allow(dead_code)]
99
    managed_cmethods: HashMap<PtTransportName, PtClientMethod>,
100
    /// Current configured set of pluggable transports.
101
    configured: HashMap<PtTransportName, TransportOptions>,
102
    /// The global Tor outbound proxy, if any.
103
    outbound_proxy: Option<ProxyProtocol>,
104
}
105

            
106
/// A pluggable transport manager knows how to make different
107
/// kinds of connections to the Tor network, for censorship avoidance.
108
pub struct PtMgr<R> {
109
    /// An underlying `Runtime`, used to spawn background tasks.
110
    #[allow(dead_code)]
111
    runtime: R,
112
    /// State for this `PtMgr` that's shared with the `PtReactor`.
113
    state: Arc<RwLock<PtSharedState>>,
114
    /// PtReactor channel when the `managed-pts` feature is enabled.
115
    #[cfg(feature = "managed-pts")]
116
    tx: UnboundedSender<PtReactorMessage>,
117
}
118

            
119
impl<R: Runtime> PtMgr<R> {
120
    /// Transform the config into a more useful representation indexed by transport name.
121
    fn transform_config(
122
        binaries: Vec<TransportConfig>,
123
    ) -> Result<HashMap<PtTransportName, TransportOptions>, tor_error::Bug> {
124
        let mut ret = HashMap::new();
125
        // FIXME(eta): You can currently specify overlapping protocols, and it'll
126
        //             just use the last transport specified.
127
        //             I attempted to fix this, but decided I didn't want to stare into the list
128
        //             builder macro void after trying it for 15 minutes.
129
        for thing in binaries {
130
            for tn in thing.protocols.iter() {
131
                ret.insert(tn.clone(), thing.clone().try_into()?);
132
            }
133
        }
134
        for opt in ret.values() {
135
            if let TransportOptions::Unmanaged(u) = opt {
136
                if !u.is_localhost() {
137
                    warn!(
138
                        "Configured to connect to a PT on a non-local addresses. This is usually insecure! We recommend running PTs on localhost only."
139
                    );
140
                }
141
            }
142
        }
143
        Ok(ret)
144
    }
145

            
146
    /// Create a new PtMgr.
147
    // TODO: maybe don't have the Vec directly exposed?
148
    pub fn new(
149
        transports: Vec<TransportConfig>,
150
        #[allow(unused)] state_dir: PathBuf,
151
        path_resolver: Arc<CfgPathResolver>,
152
        outbound_proxy: Option<ProxyProtocol>,
153
        rt: R,
154
    ) -> Result<Self, PtError> {
155
        let state = PtSharedState {
156
            managed_cmethods: Default::default(),
157
            configured: Self::transform_config(transports)?,
158
            outbound_proxy,
159
        };
160
        let state = Arc::new(RwLock::new(state));
161

            
162
        // reactor is only needed if we support managed pts
163
        #[cfg(feature = "managed-pts")]
164
        let tx = {
165
            let (tx, rx) = mpsc::unbounded();
166

            
167
            let mut reactor =
168
                PtReactor::new(rt.clone(), state.clone(), rx, state_dir, path_resolver);
169
            rt.spawn(async move {
170
                loop {
171
                    match reactor.run_one_step().await {
172
                        Ok(true) => return,
173
                        Ok(false) => {}
174
                        Err(e) => {
175
                            error_report!(e, "PtReactor failed");
176
                            return;
177
                        }
178
                    }
179
                }
180
            })
181
            .map_err(|e| PtError::Spawn { cause: Arc::new(e) })?;
182

            
183
            tx
184
        };
185

            
186
        Ok(Self {
187
            runtime: rt,
188
            state,
189
            #[cfg(feature = "managed-pts")]
190
            tx,
191
        })
192
    }
193

            
194
    /// Reload the configuration
195
    pub fn reconfigure(
196
        &self,
197
        how: tor_config::Reconfigure,
198
        transports: Vec<TransportConfig>,
199
        outbound_proxy: Option<ProxyProtocol>,
200
    ) -> Result<(), tor_config::ReconfigureError> {
201
        let configured = Self::transform_config(transports)?;
202
        if how == tor_config::Reconfigure::CheckAllOrNothing {
203
            return Ok(());
204
        }
205
        {
206
            let mut inner = self.state.write().expect("ptmgr poisoned");
207
            inner.configured = configured;
208
            inner.outbound_proxy = outbound_proxy;
209
        }
210
        // We don't have any way of propagating this sanely; the caller will find out the reactor
211
        // has died later on anyway.
212
        #[cfg(feature = "managed-pts")]
213
        let _ = self.tx.unbounded_send(PtReactorMessage::Reconfigured);
214
        Ok(())
215
    }
216

            
217
    /// Look up how to contact the transport called `transport`.
    ///
    /// An unmanaged transport is answered directly from its configuration.
    /// A managed transport is answered from the set of running transports,
    /// launching it first if it is not running yet.
    ///
    /// Returns Ok(None) if no such transport is configured.
    #[cfg(feature = "tor-channel-factory")]
    async fn get_cmethod_for_transport(
        &self,
        transport: &PtTransportName,
    ) -> Result<Option<PtClientMethod>, PtError> {
        #[allow(unused)]
        let (cfg, managed_cmethod) = {
            // NOTE(eta): This takes a RwLock inside async code, though never across an
            //            await point.  Arguably that is fine: it is only a small read, and
            //            nothing should ever hold this lock for very long.
            let shared = self.state.read().expect("ptmgr poisoned");
            (
                shared.configured.get(transport).cloned(),
                shared.managed_cmethods.get(transport).cloned(),
            )
        };

        match (cfg, managed_cmethod) {
            // An unmanaged transport: its configuration says where it listens.
            (Some(TransportOptions::Unmanaged(cfg)), _) => {
                let cmethod = cfg.cmethod();
                trace!(
                    "Found configured unmanaged transport {transport} accessible via {cmethod:?}"
                );
                Ok(Some(cmethod))
            }
            // A configured-and-running cmethod.
            #[cfg(feature = "managed-pts")]
            (Some(TransportOptions::Managed(_cfg)), Some(cmethod)) => {
                trace!(
                    "Found configured managed transport {transport} accessible via {cmethod:?}"
                );
                Ok(Some(cmethod))
            }
            // A configured-but-not-running cmethod.
            #[cfg(feature = "managed-pts")]
            (Some(TransportOptions::Managed(_cfg)), None) => {
                // A lot happens "under the hood" from here on.
                //
                // To hand out a ChannelFactory for a given connection, we need to:
                //    - launch the binary for that transport, unless it is already running*;
                //    - if we launched it, talk to it and learn which ports it listens on;
                //    - return a ChannelFactory that connects via one of those ports, using
                //      the appropriate version of SOCKS, with the K=V parameters encoded
                //      properly.
                //
                // * As in other managers, two concurrent requests must not launch the
                //   same transport twice.
                //
                // If the binary crashes later, we should detect that and relaunch it
                // on demand.
                //
                // On reconfigure, we should shut down any no-longer-used transports.
                //
                // Maybe we should also shut down transports that have not been used
                // for a long time.
                Ok(Some(self.spawn_transport(transport).await?))
            }
            // No configuration for this transport.
            (None, _) => {
                trace!("Got a request for transport {transport}, which is not configured.");
                Ok(None)
            }
        }
    }
290

            
291
    /// Ask the PT reactor to launch the managed transport `transport`, and wait for the outcome.
    ///
    /// # Errors
    ///
    /// Returns an internal error if the reactor can no longer be reached, or
    /// the reactor's own error if the launch fails.
    #[cfg(all(feature = "tor-channel-factory", feature = "managed-pts"))]
    async fn spawn_transport(
        &self,
        transport: &PtTransportName,
    ) -> Result<PtClientMethod, PtError> {
        // Hand the launch request to the reactor and wait for its reply.
        // (Coalescing of multiple requests is the reactor's job.)
        info!(
            "Got a request for transport {transport}, which is not currently running. Launching it."
        );

        let (reply_tx, reply_rx) = oneshot::channel();
        let request = PtReactorMessage::Spawn {
            pt: transport.clone(),
            result: reply_tx,
        };
        if self.tx.unbounded_send(request).is_err() {
            return Err(PtError::Internal(tor_error::internal!(
                "PT reactor closed unexpectedly"
            )));
        }

        match reply_rx.await {
            Ok(Ok(method)) => {
                info!("Successfully launched PT for {transport} at {method:?}.");
                Ok(method)
            }
            Ok(Err(e)) => {
                warn!("PT for {transport} failed to launch: {e}");
                Err(e)
            }
            // The reply channel was dropped without an answer.
            Err(_) => Err(PtError::Internal(tor_error::internal!(
                "PT reactor closed unexpectedly"
            ))),
        }
    }
329
}
330

            
331
/// A SOCKS endpoint to connect through a pluggable transport.
332
#[derive(Debug, Clone, PartialEq, Eq)]
333
pub struct PtClientMethod {
334
    /// The SOCKS protocol version to use.
335
    pub(crate) kind: SocksVersion,
336
    /// The socket address to connect to.
337
    pub(crate) endpoint: SocketAddr,
338
}
339

            
340
impl PtClientMethod {
341
    /// Get the SOCKS protocol version to use.
342
    pub fn kind(&self) -> SocksVersion {
343
        self.kind
344
    }
345

            
346
    /// Get the socket address to connect to.
347
    pub fn endpoint(&self) -> SocketAddr {
348
        self.endpoint
349
    }
350
}
351

            
352
#[cfg(feature = "tor-channel-factory")]
#[async_trait]
impl<R: Runtime> tor_chanmgr::factory::AbstractPtMgr for PtMgr<R> {
    // Build a channel factory that connects through the SOCKS endpoint of
    // `transport`, or report `Ok(None)` if that transport is not configured.
    async fn factory_for_transport(
        &self,
        transport: &PtTransportName,
    ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
        match self.get_cmethod_for_transport(transport).await {
            Ok(Some(cmethod)) => {
                let proxy = ExternalProxyPlugin::new(
                    self.runtime.clone(),
                    cmethod.endpoint(),
                    cmethod.kind(),
                );
                let factory = ChanBuilder::new_client(self.runtime.clone(), proxy);
                // FIXME(eta): Should we cache constructed factories? If no: should this still be an Arc?
                // FIXME(eta): Should we track what transports are live somehow, so we can shut them down?
                Ok(Some(Arc::new(factory)))
            }
            Ok(None) => Ok(None),
            Err(e) => Err(Arc::new(e)),
        }
    }
}
}