1
//! The `proxy` subcommand.
2

            
3
use std::sync::Arc;
4

            
5
use anyhow::{Context, Result};
6
use cfg_if::cfg_if;
7
use clap::ArgMatches;
8
#[allow(unused)]
9
use tor_config_path::CfgPathResolver;
10
use tracing::{info, instrument, warn};
11

            
12
use arti_client::TorClientConfig;
13
use tor_config::{ConfigurationSources, Listen};
14
use tor_rtcompat::ToplevelRuntime;
15

            
16
#[cfg(feature = "dns-proxy")]
17
use crate::dns;
18
use crate::{
19
    ArtiConfig, TorClient, exit, process,
20
    proxy::{self, ListenProtocols, port_info},
21
    reload_cfg,
22
};
23

            
24
#[cfg(feature = "rpc")]
25
use crate::rpc;
26

            
27
#[cfg(feature = "onion-service-service")]
28
use crate::onion_proxy;
29

            
30
/// A heap-allocated, pinned, type-erased future that resolves to a `T`.
///
/// Used to hold the differently-typed listener futures in a single `Vec`.
type PinnedFuture<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T>>>;
32

            
33
/// Run the `proxy` subcommand.
34
#[instrument(skip_all, level = "trace")]
35
#[allow(clippy::cognitive_complexity)]
36
pub(crate) fn run<R: ToplevelRuntime>(
37
    runtime: R,
38
    proxy_matches: &ArgMatches,
39
    cfg_sources: ConfigurationSources,
40
    config: ArtiConfig,
41
    client_config: TorClientConfig,
42
) -> Result<()> {
43
    // Override configured listen addresses from the command line.
44
    // This implies listening on localhost ports.
45

            
46
    // TODO: Parse a string rather than calling new_localhost.
47
    let socks_listen = match proxy_matches.get_one::<u16>("socks-port") {
48
        Some(p) => Listen::new_localhost(*p),
49
        None => config.proxy().socks_listen.clone(),
50
    };
51

            
52
    // TODO: Parse a string rather than calling new_localhost.
53
    let dns_listen = match proxy_matches.get_one::<u16>("dns-port") {
54
        Some(p) => Listen::new_localhost(*p),
55
        None => config.proxy().dns_listen.clone(),
56
    };
57

            
58
    if !socks_listen.is_empty() {
59
        info!(
60
            "Starting Arti {} in proxy mode on {} ...",
61
            env!("CARGO_PKG_VERSION"),
62
            socks_listen
63
        );
64
    }
65

            
66
    if let Some(listen) = {
67
        // https://github.com/metrics-rs/metrics/issues/567
68
        config
69
            .metrics
70
            .prometheus
71
            .listen
72
            .single_address_legacy()
73
            .context("can only listen on a single address for Prometheus metrics")?
74
    } {
75
        cfg_if! {
76
            if #[cfg(feature = "metrics")] {
77
                metrics_exporter_prometheus::PrometheusBuilder::new()
78
                    .with_http_listener(listen)
79
                    .install()
80
                    .with_context(|| format!(
81
                        "set up Prometheus metrics exporter on {listen}"
82
                    ))?;
83
                info!("Arti Prometheus metrics export scraper endpoint http://{listen}");
84
            } else {
85
                return Err(anyhow::anyhow!(
86
        "`metrics.prometheus.listen` config set but `metrics` cargo feature compiled out in `arti` crate"
87
                ));
88
            }
89
        }
90
    }
91

            
92
    #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
93
    process::use_max_file_limit(&config);
94

            
95
    let rt_copy = runtime.clone();
96
    rt_copy.block_on(run_proxy(
97
        runtime,
98
        socks_listen,
99
        dns_listen,
100
        config.proxy().protocols(),
101
        cfg_sources,
102
        config,
103
        client_config,
104
    ))?;
105

            
106
    Ok(())
107
}
108

            
109
/// Run the main loop of the proxy.
110
///
111
/// # Panics
112
///
113
/// Currently, might panic if things go badly enough wrong
114
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
115
#[cfg_attr(docsrs, doc(cfg(feature = "experimental-api")))]
116
#[instrument(skip_all, level = "trace")]
117
async fn run_proxy<R: ToplevelRuntime>(
118
    runtime: R,
119
    socks_listen: Listen,
120
    dns_listen: Listen,
121
    protocols: ListenProtocols,
122
    config_sources: ConfigurationSources,
123
    arti_config: ArtiConfig,
124
    client_config: TorClientConfig,
125
) -> Result<()> {
126
    // Using OnDemand arranges that, while we are bootstrapping, incoming connections wait
127
    // for bootstrap to complete, rather than getting errors.
128
    use arti_client::BootstrapBehavior;
129
    use futures::FutureExt;
130

            
131
    // TODO: We may instead want to provide a way to get these items out of TorClient.
132
    let fs_mistrust = client_config.fs_mistrust().clone();
133
    let path_resolver: CfgPathResolver = AsRef::<CfgPathResolver>::as_ref(&client_config).clone();
134

            
135
    let defer_bootstrap = arti_config.application().defer_bootstrap;
136

            
137
    let bootstrap_behavior = match defer_bootstrap {
138
        true => BootstrapBehavior::Manual,
139
        false => BootstrapBehavior::OnDemand,
140
    };
141

            
142
    let client_builder = TorClient::with_runtime(runtime.clone())
143
        .config(client_config)
144
        .bootstrap_behavior(bootstrap_behavior);
145
    let client = client_builder.create_unbootstrapped_async().await?;
146

            
147
    let launchable_client = Arc::new(reload_cfg::LaunchableTorClient::new(
148
        Arc::clone(&client),
149
        arti_config.application(),
150
    ));
151

            
152
    #[allow(unused_mut)]
153
    let mut reconfigurable_modules: Vec<Arc<dyn reload_cfg::ReconfigurableModule>> = vec![
154
        Arc::clone(&launchable_client) as _,
155
        Arc::new(reload_cfg::Application::new(arti_config.clone())),
156
    ];
157

            
158
    cfg_if::cfg_if! {
159
        if #[cfg(feature = "onion-service-service")] {
160
            let have_onion_svc = if defer_bootstrap {
161
                let onion_services = onion_proxy::ProxySet::new_deferred(Arc::clone(&client));
162
                reconfigurable_modules.push(Arc::new(onion_services));
163
                arti_config.onion_services.values().any(|c| *c.svc_cfg.enabled())
164
            } else {
165
                let onion_services =
166
                    onion_proxy::ProxySet::launch_new(Arc::clone(&client), arti_config.onion_services.clone())?;
167
                let have_onion_svc = !onion_services.is_empty();
168
                reconfigurable_modules.push(Arc::new(onion_services));
169
                have_onion_svc
170
            };
171
        } else {
172
            let have_onion_svc = false;
173
        }
174
    };
175

            
176
    // We weak references here to prevent the thread spawned by watch_for_config_changes from
177
    // keeping these modules alive after this function exits.
178
    //
179
    // NOTE: reconfigurable_modules stores the only strong references to these modules,
180
    // so we must keep the variable alive until the end of the function
181
    let weak_modules = reconfigurable_modules.iter().map(Arc::downgrade).collect();
182
    reload_cfg::watch_for_config_changes(
183
        client.runtime(),
184
        config_sources,
185
        &arti_config,
186
        weak_modules,
187
    )?;
188

            
189
    cfg_if::cfg_if! {
190
        if #[cfg(feature = "rpc")] {
191
            let rpc_data = rpc::launch_rpc_mgr(
192
                &runtime,
193
                &arti_config.rpc,
194
                &path_resolver,
195
                &fs_mistrust,
196
                client.clone(),
197
                launchable_client.clone(),
198
            )
199
            .await?;
200
            let (rpc_mgr, mut rpc_state_sender) = rpc_data
201
                .map(|d| (d.rpc_mgr, d.rpc_state_sender))
202
                .unzip();
203
        } else {
204
            let rpc_mgr = None;
205
        }
206
    }
207

            
208
    // The options that we'll use for our listening proxy sockets.
209
    let mut listen_options = tor_rtcompat::TcpListenOptions::builder();
210
    listen_options
211
        .common()
212
        .send_buffer_size(Some(arti_config.proxy().socket_send_buf_size.as_usize()))
213
        .recv_buffer_size(Some(arti_config.proxy().socket_recv_buf_size.as_usize()));
214
    let listen_options = listen_options.build()?;
215

            
216
    let mut proxy: Vec<PinnedFuture<Result<()>>> = Vec::new();
217
    let mut ports = Vec::new();
218
    if !socks_listen.is_empty() {
219
        let runtime = runtime.clone();
220
        let client = client.isolated_client();
221
        let socks_listen = socks_listen.clone();
222
        let listener_type = protocols.to_string();
223

            
224
        let stream_proxy = proxy::bind_proxy(
225
            runtime,
226
            client,
227
            socks_listen,
228
            listen_options,
229
            protocols,
230
            rpc_mgr,
231
        )
232
        .await
233
        .with_context(|| format!("Unable to launch {listener_type} proxy"))?;
234
        let port_info = stream_proxy.port_info()?;
235

            
236
        ports.extend(port_info);
237

            
238
        let failure_message = format!("{listener_type} proxy died unexpectedly");
239
        let proxy_future = stream_proxy
240
            .run_proxy()
241
            .map(|future_result| future_result.context(failure_message));
242
        proxy.push(Box::pin(proxy_future));
243
    }
244

            
245
    #[cfg(feature = "dns-proxy")]
246
    if !dns_listen.is_empty() {
247
        let runtime = runtime.clone();
248
        let client = client.isolated_client();
249
        let dns_proxy = dns::bind_dns_resolver(runtime, client, dns_listen)
250
            .await
251
            .context("Unable to launch DNS proxy")?;
252
        ports.extend(dns_proxy.port_info().context("Unable to find DNS ports")?);
253
        let proxy_future = dns_proxy
254
            .run_dns_proxy()
255
            .map(|future_result| future_result.context("DNS proxy died unexpectedly"));
256
        proxy.push(Box::pin(proxy_future));
257
    }
258

            
259
    #[cfg(not(feature = "dns-proxy"))]
260
    if !dns_listen.is_empty() {
261
        warn!(
262
            "Tried to specify a DNS proxy address, but Arti was built without dns-proxy support."
263
        );
264
        return Ok(());
265
    }
266

            
267
    if proxy.is_empty() {
268
        if !have_onion_svc {
269
            // TODO: rename "socks_listen" to "proxy_listen", preserving compat, once http-connect is stable.
270
            warn!(
271
                "No proxy address set; \
272
                specify -p PORT (to override `socks_listen`) \
273
                or -d PORT (to override `dns_listen`). \
274
                Alternatively, use the `socks_listen` or `dns_listen` configuration options."
275
            );
276
            return Ok(());
277
        } else {
278
            // Push a dummy future to appease future::select_all,
279
            // which expects a non-empty list
280
            proxy.push(Box::pin(futures::future::pending()));
281
        }
282
    }
283

            
284
    cfg_if::cfg_if! {
285
        if #[cfg(feature="rpc")] {
286
            if let Some(rpc_state_sender) = &mut rpc_state_sender {
287
                rpc_state_sender.set_stream_listeners(&ports[..]);
288
            }
289
        }
290
    }
291

            
292
    {
293
        let port_info = port_info::PortInfo { ports };
294
        let port_info_file = arti_config
295
            .storage()
296
            .port_info_file
297
            .path(&path_resolver)
298
            .context("Can't find path for port_info_file")?;
299
        if port_info_file.to_str() != Some("") {
300
            port_info.write_to_file(&fs_mistrust, &port_info_file)?;
301
        }
302
    }
303

            
304
    let proxy = futures::future::select_all(proxy).map(|(finished, _index, _others)| finished);
305
    futures::select!(
306
        r = exit::wait_for_ctrl_c().fuse()
307
            => r.context("waiting for termination signal"),
308
        r = proxy.fuse()
309
            => r,
310
        r = async {
311
            if defer_bootstrap {
312
                info!("Bootstrapping deferred.");
313
            } else {
314
                client.bootstrap().await?;
315
                if !socks_listen.is_empty() {
316
                    info!("Sufficiently bootstrapped; proxy now functional.");
317
                } else {
318
                    info!("Sufficiently bootstrapped.");
319
                }
320
            }
321
            futures::future::pending::<Result<()>>().await
322
        }.fuse()
323
            => r.context("bootstrap"),
324
    )?;
325

            
326
    // The modules can be dropped now, because we are exiting.
327
    drop(reconfigurable_modules);
328

            
329
    Ok(())
330
}