//! Implement a simple proxy that relays connections over Tor.
//!
//! A proxy is launched with [`bind_proxy()`], which opens listener ports.
//! `StreamProxy::run_proxy` then listens for new
//! connections, handles an appropriate handshake,
//! and then relays traffic as appropriate.
7

            
8
semipublic_mod! {
9
    #[cfg(feature="http-connect")]
10
    mod http_connect;
11
    mod socks;
12
    pub(crate) mod port_info;
13
}
14

            
15
use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, Error as IoError};
16
use futures::stream::StreamExt;
17
use std::net::IpAddr;
18
use std::sync::Arc;
19
use tor_basic_utils::error_sources::ErrorSources;
20
use tor_rtcompat::{NetStreamProvider, SpawnExt};
21
use tracing::{debug, error, info, instrument, warn};
22

            
23
#[allow(unused)]
24
use arti_client::HasKind;
25
use arti_client::TorClient;
26
#[cfg(feature = "rpc")]
27
use arti_rpcserver::RpcMgr;
28
use tor_config::Listen;
29
use tor_error::warn_report;
30
use tor_rtcompat::{NetStreamListener, Runtime};
31
use tor_socksproto::SocksAuth;
32

            
33
use anyhow::{Context, Result, anyhow};
34

            
35
/// Uninhabited stand-in for the RPC manager type, used when the `rpc`
/// feature is disabled at compile time.
///
/// This enum has no variants, so no value of it can ever be constructed:
/// an `Option<Arc<RpcMgr>>` can only be `None` in such builds.
#[cfg(not(feature = "rpc"))]
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(crate) enum RpcMgr {}
39

            
40
/// A Key used to isolate connections.
41
///
42
/// Composed of an usize (representing which listener socket accepted
43
/// the connection, the source IpAddr of the client, and the
44
/// authentication string provided by the client).
45
#[derive(Debug, Clone, PartialEq, Eq)]
46
struct StreamIsolationKey(ListenerIsolation, ProvidedIsolation);
47

            
48
/// Isolation information provided through the proxy connection
49
#[derive(Debug, Clone, PartialEq, Eq)]
50
enum ProvidedIsolation {
51
    /// The socks isolation itself.
52
    LegacySocks(SocksAuth),
53
    /// A bytestring provided as isolation with the extended Socks5 username/password protocol.
54
    ExtendedSocks {
55
        /// Which format was negotiated?
56
        ///
57
        /// (At present, different format codes can't share a circuit.)
58
        format_code: u8,
59
        /// What's the isolation string?
60
        isolation: Box<[u8]>,
61
    },
62
    #[cfg(feature = "http-connect")]
63
    /// An HTTP token, taken from headers.
64
    Http(http_connect::Isolation),
65
}
66

            
67
impl arti_client::isolation::IsolationHelper for StreamIsolationKey {
68
    fn compatible_same_type(&self, other: &Self) -> bool {
69
        self == other
70
    }
71

            
72
    fn join_same_type(&self, other: &Self) -> Option<Self> {
73
        if self == other {
74
            Some(self.clone())
75
        } else {
76
            None
77
        }
78
    }
79

            
80
    fn enables_long_lived_circuits(&self) -> bool {
81
        use ProvidedIsolation as PI;
82
        use SocksAuth as SA;
83
        match &self.1 {
84
            PI::LegacySocks(SA::Socks4(auth)) => !auth.is_empty(),
85
            PI::LegacySocks(SA::Username(uname, pass)) => !(uname.is_empty() && pass.is_empty()),
86
            PI::LegacySocks(_) => false,
87
            PI::ExtendedSocks { isolation, .. } => !isolation.is_empty(),
88
            #[cfg(feature = "http-connect")]
89
            PI::Http(isolation) => !isolation.is_empty(),
90
        }
91
    }
92
}
93

            
94
/// Size of read buffer to apply to application data streams
95
/// and Tor data streams when copying.
96
//
97
// This particular value is chosen more or less arbitrarily.
98
// Larger values let us do fewer reads from the application,
99
// but consume more memory.
100
//
101
// (The default value for BufReader is 8k as of this writing.)
102
const APP_STREAM_BUF_LEN: usize = 4096;
103

            
104
const _: () = {
105
    assert!(APP_STREAM_BUF_LEN >= tor_socksproto::SOCKS_BUF_LEN);
106
};
107

            
108
/// NOTE: The following documentation belongs in a spec.
109
/// But for now, it's our best attempt to document the design and protocol
110
/// implemented here
111
/// for integrating proxies with our RPC system. --nickm
112
///
113
/// Roughly speaking:
114
///
115
/// ## Key concepts
116
///
117
/// A data stream is "RPC-visible" if, when it is created via a proxy connection,
118
/// the RPC system is told about it.
119
///
120
/// Every RPC-visible stream is associated with a given RPC object when it is created.
121
/// (Since the RPC object is being specified in the proxy protocol,
122
/// it must be one with an externally visible Object ID.
123
/// Such Object IDs are cryptographically unguessable and unforgeable,
124
/// and are qualified with a unique identifier for their associated RPC session.)
125
/// Call this RPC Object the "target" object for now.
126
/// This target RPC object must implement
127
/// the [`ConnectWithPrefs`](arti_client::rpc::ConnectWithPrefs) special method.
128
///
129
/// Right now, there are two general kinds of objects that implement this method:
130
/// client-like objects, and one-shot clients.
131
///
132
/// A client-like object is either a `TorClient` or an RPC `Session`.
133
/// It knows about and it is capable of opening multiple data streams.
134
/// Using it as the target object for a proxy connection tells Arti
135
/// that the resulting data stream (if any)
136
/// should be built by it, and associated with its RPC session.
137
///
138
/// An application gets a TorClient by asking the session for one,
139
/// or by asking a TorClient to give it a new variant clone of itself.
140
///
141
/// A one-shot client is an `arti_rpcserver::stream::OneshotClient`.
142
/// It is created from a client-like object, but can only be used for a single data stream.
143
/// When created, it is not yet connected or trying to connect anywhere:
144
/// the act of using it as the target Object for a proxy connection causes
145
/// it to begin connecting.
146
///
147
/// An application gets a `OneshotClient` by calling `arti:new_oneshot_client`
148
/// on any client-like object.
149
///
150
/// ## The Proxy protocol
151
///
152
/// See the specification for
153
/// [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth)
154
/// for full details on integrating RPC with SOCKS.
155
/// For HTTP integration, see
156
/// [the relevant section of prop365](https://spec.torproject.org/proposals/365-http-connect-ext.html#x-tor-rpc-target-arti-rpc-support).
157
///
158
/// ### Further restrictions on Object IDs and isolation
159
///
160
/// In some cases,
161
/// the RPC Object ID may denote an object
162
/// that already includes information about its intended stream isolation.
163
/// In such cases, the stream isolation MUST be blank.
164
/// Implementations MUST reject non-blank stream isolation in such cases.
165
///
166
/// In some cases, the RPC object ID may denote an object
167
/// that already includes information
168
/// about its intended destination address and port.
169
/// In such cases, the destination address MUST be `0.0.0.0` or `::`
170
/// (encoded either as an IPv4 address, an IPv6 address, or a hostname)
171
/// and the destination port MUST be 0.
172
/// Implementations MUST reject other addresses in such cases.
173
///
174
/// ### Another proposed change
175
///
176
/// We could add a new method to clients, with a name like
177
/// "open_stream" or "connect_stream".
178
/// This method would include all target and isolation information in its parameters.
179
/// It would actually create a DataStream immediately, tell it to begin connecting,
180
/// and return an externally visible object ID.
181
/// The RPC protocol could be used to watch the DataStream object,
182
/// to see when it was connected.
183
///
184
/// The resulting DataStream object could also be used as the target of a proxy connection.
185
/// We would require in such a case that no isolation be provided in the proxy handshake,
186
/// and that the target address was (e.g.) INADDR_ANY.
187
///
188
/// ## Intended use cases (examples)
189
///
190
/// (These examples assume that the application
191
/// already knows the proxy port it should use.
192
/// I'm leaving out the isolation strings as orthogonal.)
193
///
194
/// These are **NOT** the only possible use cases;
195
/// they're just the two that help understand this system best (I hope).
196
///
197
/// ### Case 1: Using a client-like object directly.
198
///
199
/// Here the application has authenticated to RPC
200
/// and gotten the session ID `SESSION-1`.
201
/// (In reality, this would be a longer ID, and full of crypto).
202
///
203
/// The application wants to open a new stream to www.example.com.
204
/// They don't particularly care about isolation,
205
/// but they do want their stream to use their RPC session.
206
/// They don't want an Object ID for the stream.
207
///
208
/// To do this, they make a SOCKS connection to arti,
209
/// with target address www.example.com.
210
/// They set the username to `<torS0X>0SESSION-1`,
211
/// and the password to the empty string.
212
///
213
/// (Alternatively, it could use HTTP CONNECT, setting
214
/// Tor-Rpc-Target to SESSION-1.)
215
///
216
/// Arti looks up the Session object via the `SESSION-1` object ID
217
/// and tells it (via the ConnectWithPrefs special method)
218
/// to connect to www.example.com.
219
/// The session creates a new DataStream using its internal TorClient,
220
/// but does not register the stream with an RPC Object ID.
221
/// Arti proxies the application's connection through this DataStream.
222
///
223
///
224
/// ### Case 2: Creating an identifiable stream.
225
///
226
/// Here the application wants to be able to refer to its DataStream
227
/// after the stream is created.
228
/// As before, we assume that it's on an RPC session
229
/// where the Session ID is `SESSION-1`.
230
///
231
/// The application sends an RPC request of the form:
232
/// `{"id": 123, "obj": "SESSION-1", "method": "arti:new_oneshot_client", "params": {}}`
233
///
234
/// It receives a reply like:
235
/// `{"id": 123, "result": {"id": "STREAM-1"} }`
236
///
237
/// (In reality, `STREAM-1` would also be longer and full of crypto.)
238
///
239
/// Now the application has an object called `STREAM-1` that is not yet a connected
240
/// stream, but which may become one.
241
///
242
/// This time, it wants to set its isolation string to "xyzzy".
243
///
244
/// The application opens a socks connection as before.
245
/// For the username it sends `<torS0X>0STREAM-1`,
246
/// and for the password it sends `xyzzy`.
247
///
248
/// (Alternatively, it could use HTTP CONNECT, setting Tor-Isolation to xyzzy,
249
/// and Tor-Rpc-Target to STREAM-1.)
250
///
251
/// Now Arti looks up the `RpcDataStream` object via `STREAM-1`,
252
/// and tells it (via the ConnectWithPrefs special method)
253
/// to connect to www.example.com.
254
/// This causes the `RpcDataStream` internally to create a new `DataStream`,
255
/// and to store that `DataStream` in itself.
256
/// The `RpcDataStream` with Object ID `STREAM-1`
257
/// is now an alias for the newly created `DataStream`.
258
/// Arti proxies the application's connection through that `DataStream`.
259
///
260
#[cfg(feature = "rpc")]
261
#[allow(dead_code)]
262
mod socks_and_rpc {}
263

            
264
/// Information used to implement a proxy listener.
265
struct ProxyContext<R: Runtime> {
266
    /// A TorClient to use (by default) to anonymize requests.
267
    tor_client: TorClient<R>,
268
    /// If present, an RpcMgr to use when for attaching requests to RPC
269
    /// sessions.
270
    #[cfg(feature = "rpc")]
271
    rpc_mgr: Option<Arc<arti_rpcserver::RpcMgr>>,
272
}
273

            
274
/// The isolation information that we know about a proxy connection
/// _before_ any negotiation occurs.
///
/// The first element is an index for which listener accepted the
/// connection; the second is the address of the client that connected
/// to the proxy port.
type ListenerIsolation = (usize, IpAddr);
280

            
281
/// write_all the data to the writer & flush the writer if write_all is successful.
282
async fn write_all_and_flush<W>(writer: &mut W, buf: &[u8]) -> Result<()>
283
where
284
    W: AsyncWrite + Unpin,
285
{
286
    writer
287
        .write_all(buf)
288
        .await
289
        .context("Error while writing proxy reply")?;
290
    writer
291
        .flush()
292
        .await
293
        .context("Error while flushing proxy stream")
294
}
295

            
296
/// write_all the data to the writer & close the writer if write_all is successful.
297
async fn write_all_and_close<W>(writer: &mut W, buf: &[u8]) -> Result<()>
298
where
299
    W: AsyncWrite + Unpin,
300
{
301
    writer
302
        .write_all(buf)
303
        .await
304
        .context("Error while writing proxy reply")?;
305
    writer
306
        .close()
307
        .await
308
        .context("Error while closing proxy stream")
309
}
310

            
311
/// Return true if a given IoError, when received from accept, is a fatal
312
/// error.
313
fn accept_err_is_fatal(err: &IoError) -> bool {
314
    #![allow(clippy::match_like_matches_macro)]
315

            
316
    /// Re-declaration of WSAEMFILE with the right type to match
317
    /// `raw_os_error()`.
318
    #[cfg(windows)]
319
    const WSAEMFILE: i32 = winapi::shared::winerror::WSAEMFILE as i32;
320

            
321
    // Currently, EMFILE and ENFILE aren't distinguished by ErrorKind;
322
    // we need to use OS-specific errors. :P
323
    match err.raw_os_error() {
324
        #[cfg(unix)]
325
        Some(libc::EMFILE) | Some(libc::ENFILE) => false,
326
        #[cfg(windows)]
327
        Some(WSAEMFILE) => false,
328
        _ => true,
329
    }
330
}
331

            
332
/// A stream proxy listening on one or more local ports, ready to relay traffic.
333
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
334
#[must_use]
335
pub(crate) struct StreamProxy<R: Runtime> {
336
    /// A tor client to use when relaying traffic.
337
    tor_client: TorClient<R>,
338
    /// The listeners that we've actually bound to.
339
    listeners: Vec<<R as NetStreamProvider>::Listener>,
340
    /// An RPC manager to use when incoming requests are tied to streams.
341
    rpc_mgr: Option<Arc<RpcMgr>>,
342
}
343

            
344
/// Launch a proxy to listen on a given set of ports.
345
///
346
/// Requires a `runtime` to use for launching tasks and handling
347
/// timeouts, and a `tor_client` to use in connecting over the Tor
348
/// network.
349
///
350
/// Returns the proxy, and a list of the ports that we have
351
/// bound to.
352
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
353
#[instrument(skip_all, level = "trace")]
354
pub(crate) async fn bind_proxy<R: Runtime>(
355
    runtime: R,
356
    tor_client: TorClient<R>,
357
    listen: Listen,
358
    rpc_mgr: Option<Arc<RpcMgr>>,
359
) -> Result<StreamProxy<R>> {
360
    if !listen.is_loopback_only() {
361
        warn!(
362
            "Configured to listen for proxy connections on non-local addresses. \
363
            This is usually insecure! We recommend listening on localhost only."
364
        );
365
    }
366

            
367
    let mut listeners = Vec::new();
368

            
369
    // Try to bind to the listener ports.
370
    match listen.ip_addrs() {
371
        Ok(addrgroups) => {
372
            for addrgroup in addrgroups {
373
                for addr in addrgroup {
374
                    match runtime.listen(&addr).await {
375
                        Ok(listener) => {
376
                            let bound_addr = listener.local_addr()?;
377
                            info!("Listening on {:?}", bound_addr);
378
                            listeners.push(listener);
379
                        }
380
                        #[cfg(unix)]
381
                        Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
382
                            warn_report!(e, "Address family not supported {}", addr);
383
                        }
384
                        Err(ref e) => {
385
                            return Err(anyhow!("Can't listen on {}: {e}", addr));
386
                        }
387
                    }
388
                }
389
                // TODO: We are supposed to fail if every address in the group failed!
390
            }
391
        }
392
        Err(e) => warn_report!(e, "Invalid listen spec"),
393
    }
394

            
395
    // We weren't able to bind any ports: There's nothing to do.
396
    if listeners.is_empty() {
397
        error!("Couldn't open any listeners.");
398
        return Err(anyhow!("Couldn't open listeners"));
399
    }
400

            
401
    Ok(StreamProxy {
402
        tor_client,
403
        listeners,
404
        rpc_mgr,
405
    })
406
}
407

            
408
impl<R: Runtime> StreamProxy<R> {
409
    /// Run indefinitely, processing incoming connections and relaying traffic.
410
    pub(crate) async fn run_proxy(self) -> Result<()> {
411
        let StreamProxy {
412
            tor_client,
413
            listeners,
414
            rpc_mgr,
415
        } = self;
416
        run_proxy_with_listeners(tor_client, listeners, rpc_mgr).await
417
    }
418

            
419
    /// Return a list of the ports that we've bound to.
420
    pub(crate) fn port_info(&self) -> Result<Vec<port_info::Port>> {
421
        let mut ports = Vec::new();
422
        for listener in &self.listeners {
423
            let address = listener.local_addr()?;
424
            ports.extend([
425
                port_info::Port {
426
                    protocol: port_info::SupportedProtocol::Socks,
427
                    address: address.into(),
428
                },
429
                // If http-connect is enabled, every socks proxy is also http.
430
                #[cfg(feature = "http-connect")]
431
                port_info::Port {
432
                    protocol: port_info::SupportedProtocol::Http,
433
                    address: address.into(),
434
                },
435
            ]);
436
        }
437

            
438
        Ok(ports)
439
    }
440
}
441

            
442
/// Launch a proxy from a given set of already bound listeners.
443
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
444
#[instrument(skip_all, level = "trace")]
445
pub(crate) async fn run_proxy_with_listeners<R: Runtime>(
446
    tor_client: TorClient<R>,
447
    listeners: Vec<<R as tor_rtcompat::NetStreamProvider>::Listener>,
448
    rpc_mgr: Option<Arc<RpcMgr>>,
449
) -> Result<()> {
450
    // Create a stream of (incoming socket, listener_id) pairs, selected
451
    // across all the listeners.
452
    let mut incoming = futures::stream::select_all(
453
        listeners
454
            .into_iter()
455
            .map(NetStreamListener::incoming)
456
            .enumerate()
457
            .map(|(listener_id, incoming_conns)| {
458
                incoming_conns.map(move |socket| (socket, listener_id))
459
            }),
460
    );
461

            
462
    // Loop over all incoming connections.  For each one, call
463
    // handle_proxy_conn() in a new task.
464
    while let Some((stream, sock_id)) = incoming.next().await {
465
        let (stream, addr) = match stream {
466
            Ok((s, a)) => (s, a),
467
            Err(err) => {
468
                if accept_err_is_fatal(&err) {
469
                    return Err(err).context("Failed to receive incoming stream on proxy port");
470
                } else {
471
                    warn_report!(err, "Incoming stream failed");
472
                    continue;
473
                }
474
            }
475
        };
476
        let proxy_context = ProxyContext {
477
            tor_client: tor_client.clone(),
478
            #[cfg(feature = "rpc")]
479
            rpc_mgr: rpc_mgr.clone(),
480
        };
481
        tor_client.runtime().spawn(async move {
482
            let res = handle_proxy_conn(proxy_context, stream, (sock_id, addr.ip())).await;
483
            if let Err(e) = res {
484
                report_proxy_error(e);
485
            }
486
        })?;
487
    }
488

            
489
    Ok(())
490
}
491

            
492
/// A (possibly) supported proxy protocol.
enum ProxyProtocols {
    /// Some HTTP/1 command or other.
    ///
    /// (We only support CONNECT and OPTIONS, but we reject other commands in [`http_connect`].)
    Http1,
    /// SOCKS4 or SOCKS5.
    Socks,
}

/// Look at the first byte of a proxy connection, and guess what protocol
/// it is trying to speak.
///
/// An ASCII letter is taken to be the start of an HTTP/1 command;
/// a byte with value 4 or 5 is taken to be SOCKS.
/// Returns `None` for any other byte.
fn classify_protocol_from_first_byte(byte: u8) -> Option<ProxyProtocols> {
    match byte {
        4 | 5 => Some(ProxyProtocols::Socks),
        letter if letter.is_ascii_alphabetic() => Some(ProxyProtocols::Http1),
        _ => None,
    }
}
511

            
512
/// Handle a single connection `stream` from an application.
513
///
514
/// Depending on what protocol the application is speaking
515
/// (and what protocols we support!), negotiate an appropriate set of options,
516
/// and relay traffic to and from the application.
517
async fn handle_proxy_conn<R, S>(
518
    context: ProxyContext<R>,
519
    stream: S,
520
    isolation_info: ListenerIsolation,
521
) -> Result<()>
522
where
523
    R: Runtime,
524
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
525
{
526
    let mut stream = BufReader::with_capacity(APP_STREAM_BUF_LEN, stream);
527
    use futures::AsyncBufReadExt as _;
528

            
529
    let buf: &[u8] = stream.fill_buf().await?;
530
    if buf.is_empty() {
531
        // connection closed
532
        return Ok(());
533
    }
534
    match classify_protocol_from_first_byte(buf[0]) {
535
        Some(ProxyProtocols::Http1) => {
536
            cfg_if::cfg_if! {
537
                if #[cfg(feature="http-connect")] {
538
                    http_connect::handle_http_conn(context, stream, isolation_info).await
539
                } else {
540
                    write_all_and_close(&mut stream, socks::WRONG_PROTOCOL_PAYLOAD).await?;
541
                    Ok(())
542
                }
543
            }
544
        }
545
        Some(ProxyProtocols::Socks) => {
546
            socks::handle_socks_conn(context, stream, isolation_info).await
547
        }
548
        None => {
549
            // We have no idea what protocol the client expects,
550
            // so we have no idea how to tell it so.
551
            warn!(
552
                "Unrecognized protocol on proxy listener (first byte {:x})",
553
                buf[0]
554
            );
555
            Ok(())
556
        }
557
    }
558
}
559

            
560
/// If any source of the provided `error` is a [`tor_proto::Error`], return a reference to that
561
/// [`tor_proto::Error`].
562
fn extract_proto_err<'a>(
563
    error: &'a (dyn std::error::Error + 'static),
564
) -> Option<&'a tor_proto::Error> {
565
    for error in ErrorSources::new(error) {
566
        if let Some(downcast) = error.downcast_ref::<tor_proto::Error>() {
567
            return Some(downcast);
568
        }
569
    }
570

            
571
    None
572
}
573

            
574
/// Report an error that occurred within a single proxy task.
575
fn report_proxy_error(e: anyhow::Error) {
576
    use tor_proto::Error as PE;
577
    // TODO: In the long run it might be a good idea to use an ErrorKind here if we can get one.
578
    // This is a bit of a kludge based on the fact that we're using anyhow.
579
    //
580
    // TODO: It might be handy to have a way to collapse CircuitClosed into EOF earlier.
581
    // But that loses information, so it should be optional.
582
    //
583
    // TODO: Maybe we should look at io::ErrorKind as well, if it's there.  That's another reason
584
    // to discard or restrict our anyhow usage.
585
    match extract_proto_err(e.as_ref()) {
586
        Some(PE::CircuitClosed) => debug!("Connection exited with circuit close"),
587
        // TODO: warn_report doesn't work on anyhow::Error.
588
        _ => warn!("connection exited with error: {}", tor_error::Report(e)),
589
    }
590
}