1
//! Infrastructure required to support managed PTs.
2

            
3
use crate::config::{ManagedTransportOptions, TransportOptions};
4
use crate::err;
5
use crate::err::PtError;
6
use crate::ipc::{
7
    PluggableClientTransport, PluggableTransport, PtClientParameters, PtCommonParameters,
8
    sealed::PluggableTransportPrivate,
9
};
10
use crate::{PtClientMethod, PtSharedState};
11
use futures::channel::mpsc::UnboundedReceiver;
12
use futures::stream::FuturesUnordered;
13
use futures::{FutureExt, StreamExt, select};
14
use oneshot_fused_workaround as oneshot;
15
use std::collections::{HashMap, HashSet};
16
use std::future::Future;
17
use std::path::{Path, PathBuf};
18
use std::pin::Pin;
19
use std::sync::{Arc, RwLock};
20
use tor_chanmgr::ProxyProtocol;
21
use tor_config_path::CfgPathResolver;
22
use tor_error::internal;
23
use tor_linkspec::PtTransportName;
24
use tor_rtcompat::Runtime;
25
use tracing::{debug, warn};
26

            
27
/// A message to the `PtReactor`.
28
pub(crate) enum PtReactorMessage {
29
    /// Notify the reactor that the currently configured set of PTs has changed.
30
    Reconfigured,
31
    /// Ask the reactor to spawn a pluggable transport binary.
32
    Spawn {
33
        /// Spawn a binary to provide this PT.
34
        pt: PtTransportName,
35
        /// Notify the result via this channel.
36
        result: oneshot::Sender<err::Result<PtClientMethod>>,
37
    },
38
}
39

            
40
/// The result of a spawn attempt: the list of transports the spawned binary covers, and the result.
41
type SpawnResult = (Vec<PtTransportName>, err::Result<PluggableClientTransport>);
42

            
43
/// Background reactor to handle managing pluggable transport binaries.
44
pub(crate) struct PtReactor<R> {
45
    /// Runtime.
46
    rt: R,
47
    /// Currently running pluggable transport binaries.
48
    running: Vec<PluggableClientTransport>,
49
    /// A map of asked-for transports.
50
    ///
51
    /// If a transport name has an entry, we will append any additional requests for that entry.
52
    /// If no entry is present, we will start a request.
53
    requests: HashMap<PtTransportName, Vec<oneshot::Sender<err::Result<PtClientMethod>>>>,
54
    /// FuturesUnordered that spawned tasks get pushed on to.
55
    ///
56
    /// WARNING: This MUST always contain one "will never resolve" future!
57
    spawning: FuturesUnordered<Pin<Box<dyn Future<Output = SpawnResult> + Send>>>,
58
    /// State for the corresponding PtMgr.
59
    state: Arc<RwLock<PtSharedState>>,
60
    /// PtMgr channel.
61
    /// (Unbounded so that we can reconfigure without blocking: we're unlikely to have the reactor
62
    /// get behind.)
63
    rx: UnboundedReceiver<PtReactorMessage>,
64
    /// State directory.
65
    state_dir: PathBuf,
66
    /// Path resolver for configuration files.
67
    path_resolver: Arc<CfgPathResolver>,
68
}
69

            
70
impl<R: Runtime> PtReactor<R> {
71
    /// Make a new reactor.
72
    pub(crate) fn new(
73
        rt: R,
74
        state: Arc<RwLock<PtSharedState>>,
75
        rx: UnboundedReceiver<PtReactorMessage>,
76
        state_dir: PathBuf,
77
        path_resolver: Arc<CfgPathResolver>,
78
    ) -> Self {
79
        let spawning = FuturesUnordered::new();
80
        spawning.push(Box::pin(futures::future::pending::<SpawnResult>())
81
            as Pin<Box<dyn Future<Output = _> + Send>>);
82
        Self {
83
            rt,
84
            running: vec![],
85
            requests: Default::default(),
86
            spawning,
87
            state,
88
            rx,
89
            state_dir,
90
            path_resolver,
91
        }
92
    }
93

            
94
    /// Called when a spawn request completes.
95
    #[allow(clippy::needless_pass_by_value)]
96
    fn handle_spawned(
97
        &mut self,
98
        covers: Vec<PtTransportName>,
99
        result: err::Result<PluggableClientTransport>,
100
    ) {
101
        match result {
102
            Err(e) => {
103
                warn!("Spawning PT for {:?} failed: {}", covers, e);
104
                // Go and tell all the transports about the bad news.
105
                let senders = covers
106
                    .iter()
107
                    .flat_map(|x| self.requests.remove(x))
108
                    .flatten();
109
                for sender in senders {
110
                    // We don't really care if the sender went away.
111
                    let _ = sender.send(Err(e.clone()));
112
                }
113
            }
114
            Ok(pt) => {
115
                let mut state = self.state.write().expect("ptmgr state poisoned");
116
                for (transport, method) in pt.transport_methods() {
117
                    state
118
                        .managed_cmethods
119
                        .insert(transport.clone(), method.clone());
120
                    for sender in self.requests.remove(transport).into_iter().flatten() {
121
                        let _ = sender.send(Ok(method.clone()));
122
                    }
123
                }
124

            
125
                let requested: HashSet<_> = covers.iter().collect();
126
                let found: HashSet<_> = pt.transport_methods().keys().collect();
127
                if requested != found {
128
                    warn!(
129
                        "Bug: PT {} succeeded, but did not give the same transports we asked for. ({:?} vs {:?})",
130
                        pt.identifier(),
131
                        found,
132
                        requested
133
                    );
134
                }
135
                self.running.push(pt);
136
            }
137
        }
138
    }
139

            
140
    /// Called to remove a pluggable transport from the shared state.
141
    fn remove_pt(&self, pt: PluggableClientTransport) {
142
        let mut state = self.state.write().expect("ptmgr state poisoned");
143
        for transport in pt.transport_methods().keys() {
144
            state.managed_cmethods.remove(transport);
145
        }
146
        // to satisfy clippy, and make it clear that this is a desired side-effect: doing this
147
        // shuts down the PT (asynchronously).
148
        drop(pt);
149
    }
150

            
151
    /// Run one step of the reactor. Returns true if the reactor should terminate.
152
    pub(crate) async fn run_one_step(&mut self) -> err::Result<bool> {
153
        use futures::future::Either;
154

            
155
        // FIXME(eta): This allocates a lot, which is technically unnecessary but requires careful
156
        //             engineering to get right. It's not really in the hot path, at least.
157
        let mut all_next_messages = self
158
            .running
159
            .iter_mut()
160
            // We could avoid the Box, but that'd require using unsafe to replicate what tokio::pin!
161
            // does under the hood.
162
            .map(|pt| Box::pin(pt.next_message()))
163
            .collect::<Vec<_>>();
164

            
165
        // We can't construct a select_all if all_next_messages is empty.
166
        let mut next_message = if all_next_messages.is_empty() {
167
            Either::Left(futures::future::pending())
168
        } else {
169
            Either::Right(futures::future::select_all(all_next_messages.iter_mut()).fuse())
170
        };
171

            
172
        select! {
173
            (result, idx, _) = next_message => {
174
                drop(all_next_messages); // no idea why NLL doesn't just infer this but sure
175

            
176
                match result {
177
                    Ok(m) => {
178
                        // FIXME(eta): We should forward the Status messages onto API consumers.
179
                        debug!("PT {} message: {:?}", self.running[idx].identifier(), m);
180
                    },
181
                    Err(e) => {
182
                        warn!("PT {} quit: {:?}", self.running[idx].identifier(), e);
183
                        let pt = self.running.remove(idx);
184
                        self.remove_pt(pt);
185
                    }
186
                }
187
            },
188
            spawn_result = self.spawning.next() => {
189
                drop(all_next_messages);
190
                // See the Warning in this field's documentation.
191
                let (covers, result) = spawn_result.expect("self.spawning should never dry up");
192
                self.handle_spawned(covers, result);
193
            }
194
            internal = self.rx.next() => {
195
                drop(all_next_messages);
196

            
197
                match internal {
198
                    Some(PtReactorMessage::Reconfigured) => {},
199
                    Some(PtReactorMessage::Spawn { pt, result }) => {
200
                        // Make sure we don't already have a running request.
201
                        if let Some(requests) = self.requests.get_mut(&pt) {
202
                            requests.push(result);
203
                            return Ok(false);
204
                        }
205
                        // Make sure we don't already have a binary for this PT.
206
                        for rpt in self.running.iter() {
207
                            if let Some(cmethod) = rpt.transport_methods().get(&pt) {
208
                                let _ = result.send(Ok(cmethod.clone()));
209
                                return Ok(false);
210
                            }
211
                        }
212
                        // We don't, so time to spawn one.
213
                        let (config, outbound_proxy) = {
214
                            let state = self.state.read().expect("ptmgr state poisoned");
215
                            (state.configured.get(&pt).cloned(), state.outbound_proxy.clone())
216
                        };
217

            
218
                        let Some(config) = config else {
219
                            let _ = result.send(Err(PtError::UnconfiguredTransportDueToConcurrentReconfiguration));
220
                            return Ok(false);
221
                        };
222

            
223
                        let TransportOptions::Managed(config) = config else {
224
                            let _ = result.send(Err(internal!("Tried to spawn an unmanaged transport").into()));
225
                            return Ok(false);
226
                        };
227

            
228
                        // Keep track of the request, and also fill holes in other protocols so
229
                        // we don't try and run another spawn request for those.
230
                        self.requests.entry(pt).or_default().push(result);
231
                        for proto in config.protocols.iter() {
232
                            self.requests.entry(proto.clone()).or_default();
233
                        }
234

            
235
                        // Add the spawn future to our pile of them.
236
                        let spawn_fut = Box::pin(
237
                            spawn_from_config(
238
                                self.rt.clone(),
239
                                self.state_dir.clone(),
240
                                config.clone(),
241
                                Arc::clone(&self.path_resolver),
242
                                outbound_proxy,
243
                            )
244
                            .map(|result| (config.protocols, result))
245
                        );
246
                        self.spawning.push(spawn_fut);
247
                    },
248
                    None => return Ok(true)
249
                }
250
            }
251
        }
252
        Ok(false)
253
    }
254
}
255

            
256
/// Spawn a managed `PluggableTransport` using a `ManagedTransportOptions`.
257
async fn spawn_from_config<R: Runtime>(
258
    rt: R,
259
    state_dir: PathBuf,
260
    cfg: ManagedTransportOptions,
261
    path_resolver: Arc<CfgPathResolver>,
262
    outbound_proxy: Option<ProxyProtocol>,
263
) -> Result<PluggableClientTransport, PtError> {
264
    // FIXME(eta): I really think this expansion should happen at builder validation time...
265

            
266
    let cfg_path = cfg.path;
267

            
268
    let binary_path = cfg_path
269
        .path(&path_resolver)
270
        .map_err(|e| PtError::PathExpansionFailed {
271
            path: cfg_path.clone(),
272
            error: e,
273
        })?;
274

            
275
    let filename = pt_identifier_as_path(&binary_path)?;
276

            
277
    // HACK(eta): Currently the state directory is named after the PT binary name. Maybe we should
278
    //            invent a better way of doing this?
279
    let new_state_dir = state_dir.join(filename);
280
    std::fs::create_dir_all(&new_state_dir).map_err(|e| PtError::StatedirCreateFailed {
281
        path: new_state_dir.clone(),
282
        error: Arc::new(e),
283
    })?;
284

            
285
    // FIXME(eta): make the rest of these parameters configurable
286
    let pt_common_params = PtCommonParameters::builder()
287
        .state_location(new_state_dir)
288
        .build()
289
        .expect("PtCommonParameters constructed incorrectly");
290

            
291
    // The PT spec defines `TOR_PT_PROXY` as a URI, so we only render the
292
    // structured `ProxyProtocol` to a string at this boundary.
293
    let pt_client_params = PtClientParameters::builder()
294
        .transports(cfg.protocols)
295
        .proxy_uri(outbound_proxy.as_ref().map(ToString::to_string))
296
        .build()
297
        .expect("PtClientParameters constructed incorrectly");
298

            
299
    let mut pt = PluggableClientTransport::new(
300
        binary_path,
301
        cfg.arguments,
302
        pt_common_params,
303
        pt_client_params,
304
    );
305
    pt.launch(rt).await?;
306
    Ok(pt)
307
}
308

            
309
/// Given a path to a binary for a pluggable transport, return an identifier for
310
/// that binary in a format that can be used as a path component.
311
fn pt_identifier_as_path(binary_path: impl AsRef<Path>) -> Result<PathBuf, PtError> {
312
    // Extract the final component.
313
    let mut filename =
314
        PathBuf::from(
315
            binary_path
316
                .as_ref()
317
                .file_name()
318
                .ok_or_else(|| PtError::NotAFile {
319
                    path: binary_path.as_ref().to_path_buf(),
320
                })?,
321
        );
322

            
323
    // Strip an "exe" off the end, if appropriate.
324
    if let Some(ext) = filename.extension() {
325
        if ext.eq_ignore_ascii_case(std::env::consts::EXE_EXTENSION) {
326
            filename.set_extension("");
327
        }
328
    }
329

            
330
    Ok(filename)
331
}
332

            
333
/// Given a path to a binary for a pluggable transport, return an identifier for
334
/// that binary in human-readable form.
335
pub(crate) fn pt_identifier(binary_path: impl AsRef<Path>) -> Result<String, PtError> {
336
    Ok(pt_identifier_as_path(binary_path)?
337
        .to_string_lossy()
338
        .to_string())
339
}