1
//! Implement an HTTP1 CONNECT proxy using `hyper`.
2
//!
3
//! Note that Tor defines several extensions to HTTP CONNECT;
4
//! see [the spec](https://spec.torproject.org/http-connect.html)
5
//! for more information.
6

            
7
use super::{ListenerIsolation, ProxyContext};
8
use anyhow::{Context as _, anyhow};
9
use arti_client::{StreamPrefs, TorAddr};
10
use futures::{AsyncRead, AsyncWrite, io::BufReader};
11
use http::{Method, StatusCode, response::Builder as ResponseBuilder};
12
use hyper::{Response, server::conn::http1::Builder as ServerBuilder, service::service_fn};
13
use safelog::{Sensitive as Sv, sensitive as sv};
14
use std::sync::Arc;
15
use tor_error::{ErrorKind, ErrorReport as _, HasKind, into_internal, warn_report};
16
use tor_rtcompat::Runtime;
17
use tor_rtcompat::SpawnExt as _;
18
use tracing::{instrument, warn};
19

            
20
use hyper_futures_io::FuturesIoCompat;
21

            
22
#[cfg(feature = "rpc")]
23
use {crate::rpc::conntarget::ConnTarget, tor_rpcbase as rpc};
24

            
25
cfg_if::cfg_if! {
26
    if #[cfg(feature="rpc")] {
27
        /// Error type returned from a failed connect_with_prefs.
28
        type ClientError = Box<dyn arti_client::rpc::ClientConnectionError>;
29
    } else {
30
        /// Error type returned from a failed connect_with_prefs.
31
        type ClientError = arti_client::Error;
32
    }
33
}
34

            
35
/// Request type that we receive from Hyper.
36
type Request = hyper::Request<hyper::body::Incoming>;
37

            
38
/// We use "String" as our body type, since we only return a body on error,
39
/// in which case it already starts life as a formatted string.
40
///
41
/// (We could use () or `Empty` for our (200 OK) replies,
42
/// but empty strings are cheap enough that it isn't worth it.)
43
type Body = String;
44

            
45
/// A value used to isolate streams received via HTTP CONNECT.
46
#[derive(Clone, Debug, Eq, PartialEq)]
47
pub(super) struct Isolation {
48
    /// The value of the Proxy-Authorization header.
49
    proxy_auth: Option<ProxyAuthorization>,
50
    /// The legacy X-Tor-Isolation token.
51
    x_tor_isolation: Option<String>,
52
    /// The up-to-date Tor-Isolation token.
53
    tor_isolation: Option<String>,
54
}
55

            
56
impl Isolation {
57
    /// Return true if no isolation field in this object is set.
58
    pub(super) fn is_empty(&self) -> bool {
59
        let Isolation {
60
            proxy_auth,
61
            x_tor_isolation,
62
            tor_isolation,
63
        } = self;
64
        proxy_auth.as_ref().is_none_or(ProxyAuthorization::is_empty)
65
            && x_tor_isolation.as_ref().is_none_or(String::is_empty)
66
            && tor_isolation.as_ref().is_none_or(String::is_empty)
67
    }
68
}
69

            
70
/// Constants and code for the HTTP headers we use.
71
mod hdr {
72
    pub(super) use http::header::{CONTENT_TYPE, HOST, PROXY_AUTHORIZATION, SERVER, VIA};
73

            
74
    /// Client-to-proxy: Which IP family should we use?
75
    pub(super) const TOR_FAMILY_PREFERENCE: &str = "Tor-Family-Preference";
76

            
77
    /// Client-To-Proxy: The ID of an RPC object to receive our request.
78
    pub(super) const TOR_RPC_TARGET: &str = "Tor-RPC-Target";
79

            
80
    /// Client-To-Proxy: An isolation token to use with our stream.
81
    /// (Legacy name.)
82
    pub(super) const X_TOR_STREAM_ISOLATION: &str = "X-Tor-Stream-Isolation";
83

            
84
    /// Client-To-Proxy: An isolation token to use with our stream.
85
    pub(super) const TOR_STREAM_ISOLATION: &str = "Tor-Stream-Isolation";
86

            
87
    /// Proxy-to-client: A list of the capabilities that this proxy provides.
88
    pub(super) const TOR_CAPABILITIES: &str = "Tor-Capabilities";
89

            
90
    /// Proxy-to-client: A machine-readable list of failure reasons.
91
    pub(super) const TOR_REQUEST_FAILED: &str = "Tor-Request-Failed";
92

            
93
    /// A list of all the headers that we support from client-to-proxy.
94
    ///
95
    /// Does not include headers that we check for HTTP conformance,
96
    /// but not for any other purpose.
97
    pub(super) const ALL_REQUEST_HEADERS: &[&str] = &[
98
        TOR_FAMILY_PREFERENCE,
99
        TOR_RPC_TARGET,
100
        X_TOR_STREAM_ISOLATION,
101
        TOR_STREAM_ISOLATION,
102
        // Can't use 'PROXY_AUTHORIZATION', since it isn't a str, and its as_str() isn't const.
103
        "Proxy-Authorization",
104
    ];
105

            
106
    /// Return the unique string-valued value of the header `name`;
107
    /// or None if the header doesn't exist,
108
    /// or an error if the header is duplicated or not UTF-8.
109
8
    pub(super) fn uniq_utf8(
110
8
        map: &http::HeaderMap,
111
8
        name: impl http::header::AsHeaderName,
112
8
    ) -> Result<Option<&str>, super::HttpConnectError> {
113
8
        let mut iter = map.get_all(name).iter();
114
8
        let val = match iter.next() {
115
8
            Some(v) => v,
116
            None => return Ok(None),
117
        };
118
8
        match iter.next() {
119
            Some(_) => Err(super::HttpConnectError::DuplicateHeader),
120
8
            None => val
121
8
                .to_str()
122
8
                .map(Some)
123
8
                .map_err(|_| super::HttpConnectError::HeaderNotUtf8),
124
        }
125
8
    }
126
}
127

            
128
/// Given a just-received TCP connection `S` on a HTTP proxy port, handle the
129
/// HTTP handshake and relay the connection over the Tor network.
130
///
131
/// Uses `isolation_info` to decide which circuits this connection
132
/// may use.  Requires that `isolation_info` is a pair listing the listener
133
/// id and the source address for the HTTP request.
134
#[instrument(skip_all, level = "trace")]
135
8
pub(super) async fn handle_http_conn<R, S>(
136
8
    context: super::ProxyContext<R>,
137
8
    stream: BufReader<S>,
138
8
    isolation_info: ListenerIsolation,
139
8
) -> crate::Result<()>
140
8
where
141
8
    R: Runtime,
142
8
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
143
8
{
144
    // NOTES:
145
    // * We _could_ use a timeout, but we trust that the client is not trying to DOS us.
146
    ServerBuilder::new()
147
        .half_close(false)
148
        .keep_alive(true)
149
        .max_headers(256)
150
        .max_buf_size(16 * 1024)
151
        .title_case_headers(true)
152
        .auto_date_header(false) // We omit the date header out of general principle.
153
        .serve_connection(
154
            FuturesIoCompat(stream),
155
8
            service_fn(|request| handle_http_request::<R, S>(request, &context, isolation_info)),
156
        )
157
        .with_upgrades()
158
        .await?;
159

            
160
    Ok(())
161
8
}
162

            
163
/// Handle a single HTTP request.
164
///
165
/// This function is invoked by hyper.
166
8
async fn handle_http_request<R, S>(
167
8
    request: Request,
168
8
    context: &ProxyContext<R>,
169
8
    listener_isolation: ListenerIsolation,
170
8
) -> Result<Response<Body>, anyhow::Error>
171
8
where
172
8
    R: Runtime,
173
8
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
174
8
{
175
    // Avoid cross-site attacks based on DNS forgery by validating that the Host
176
    // header is in fact localhost.  In these cases, we don't want to reply at all,
177
    // _even with an error message_, since our headers could be used to tell a hostile
178
    // webpage information about the local arti process.
179
    //
180
    // We don't do this for CONNECT requests, since those are forbidden by
181
    // XHR and JS fetch(), and since Host _will_ be non-localhost for those.
182
8
    if request.method() != Method::CONNECT {
183
8
        match hdr::uniq_utf8(request.headers(), hdr::HOST) {
184
            Err(e) => return Err(e).context("Host header invalid. Rejecting request."),
185
8
            Ok(Some(host)) if !host_is_localhost(host) => {
186
4
                return Err(anyhow!(
187
4
                    "Host header {host:?} was not localhost. Rejecting request."
188
4
                ));
189
            }
190
4
            Ok(_) => {}
191
        }
192
    }
193

            
194
4
    match *request.method() {
195
4
        Method::OPTIONS => handle_options_request(&request),
196
        Method::CONNECT => {
197
            handle_connect_request::<R, S>(request, context, listener_isolation).await
198
        }
199
        _ => Ok(ResponseBuilder::new()
200
            .status(StatusCode::NOT_IMPLEMENTED)
201
            .err(
202
                request.method(),
203
                format!("{} is not supported", request.method()),
204
            )?),
205
    }
206
8
}
207

            
208
/// Return an appropriate reply to the given OPTIONS request.
209
4
fn handle_options_request(request: &Request) -> Result<Response<Body>, anyhow::Error> {
210
    use hyper::body::Body as _;
211

            
212
4
    let target = request.uri().to_string();
213
4
    match target.as_str() {
214
4
        "*" => {}
215
        s if TorAddr::from(s).is_ok() => {}
216
        _ => {
217
            return Ok(ResponseBuilder::new()
218
                .status(StatusCode::BAD_REQUEST)
219
                .err(&Method::OPTIONS, "Target was not a valid address")?);
220
        }
221
    }
222
4
    if request.headers().contains_key(hdr::CONTENT_TYPE) {
223
        // RFC 9110 says that if a client wants to include a body with its OPTIONS request (!),
224
        // it must include a Content-Type header.  Therefore, we reject such requests.
225
        return Ok(ResponseBuilder::new()
226
            .status(StatusCode::BAD_REQUEST)
227
            .err(&Method::OPTIONS, "Unexpected Content-Type on OPTIONS")?);
228

            
229
        // TODO: It would be cool to detect nonempty bodies in other ways, though in practice
230
        // it should never come up.
231
4
    }
232
4
    if !request.body().is_end_stream() {
233
        return Ok(ResponseBuilder::new()
234
            .status(StatusCode::BAD_REQUEST)
235
            .err(&Method::OPTIONS, "Unexpected body on OPTIONS request")?);
236
4
    }
237

            
238
4
    Ok(ResponseBuilder::new()
239
4
        .header("Allow", "OPTIONS, CONNECT")
240
4
        .status(StatusCode::OK)
241
4
        .ok(&Method::OPTIONS)?)
242
4
}
243

            
244
/// Return an appropriate reply to the given CONNECT request.
245
async fn handle_connect_request<R, S>(
246
    request: Request,
247
    context: &ProxyContext<R>,
248
    listener_isolation: ListenerIsolation,
249
) -> anyhow::Result<Response<Body>>
250
where
251
    R: Runtime,
252
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
253
{
254
    match handle_connect_request_impl::<R, S>(request, context, listener_isolation).await {
255
        Ok(response) => Ok(response),
256
        Err(e) => Ok(e.try_into_response()?),
257
    }
258
}
259

            
260
/// Helper for handle_connect_request:
261
/// return an error type that can be converted into an HTTP message.
262
///
263
/// (This is a separate function to make error handling simpler.)
264
async fn handle_connect_request_impl<R, S>(
265
    request: Request,
266
    context: &ProxyContext<R>,
267
    listener_isolation: ListenerIsolation,
268
) -> Result<Response<Body>, HttpConnectError>
269
where
270
    R: Runtime,
271
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
272
{
273
    let target = request.uri().to_string();
274
    let tor_addr =
275
        TorAddr::from(&target).map_err(|e| HttpConnectError::InvalidStreamTarget(sv(target), e))?;
276

            
277
    let mut stream_prefs = StreamPrefs::default();
278
    set_family_preference(&mut stream_prefs, &tor_addr, request.headers())?;
279

            
280
    set_isolation(&mut stream_prefs, request.headers(), listener_isolation)?;
281

            
282
    let client = find_conn_target(
283
        context,
284
        hdr::uniq_utf8(request.headers(), hdr::TOR_RPC_TARGET)?,
285
    )?;
286

            
287
    // If we reach this point, the request looks okay, so we'll try to connect.
288
    let tor_stream = client
289
        .connect_with_prefs(&tor_addr, &stream_prefs)
290
        .await
291
        .map_err(|e| HttpConnectError::ConnectFailed(sv(tor_addr), e))?;
292

            
293
    // We have connected.  We need to launch a separate task to actually be the proxy, though,
294
    // since IIUC hyper::upgrade::on won't return an answer
295
    // until after the response is given to the client.
296
    context
297
        .tor_client
298
        .runtime()
299
        .spawn(async move {
300
            match transfer::<S>(request, tor_stream).await {
301
                Ok(()) => {}
302
                Err(e) => {
303
                    warn_report!(e, "Error while launching transfer");
304
                }
305
            }
306
        })
307
        .map_err(into_internal!("Unable to spawn transfer task"))?;
308

            
309
    ResponseBuilder::new()
310
        .status(StatusCode::OK)
311
        .ok(&Method::CONNECT)
312
}
313

            
314
/// Set the IP family preference in `prefs`.
315
fn set_family_preference(
316
    prefs: &mut StreamPrefs,
317
    addr: &TorAddr,
318
    headers: &http::HeaderMap,
319
) -> Result<(), HttpConnectError> {
320
    if let Some(val) = hdr::uniq_utf8(headers, hdr::TOR_FAMILY_PREFERENCE)? {
321
        match val.trim() {
322
            "ipv4-preferred" => prefs.ipv4_preferred(),
323
            "ipv6-preferred" => prefs.ipv6_preferred(),
324
            "ipv4-only" => prefs.ipv4_only(),
325
            "ipv6-only" => prefs.ipv6_only(),
326
            _ => return Err(HttpConnectError::InvalidFamilyPreference),
327
        };
328
    } else if let Some(ip) = addr.as_ip_address() {
329
        // TODO: Perhaps we should check unconditionally whether the IP address is consistent with header,
330
        // if one was given?  On the other hand, if the application tells us to make an IPV6-only
331
        // connection to an IPv4 address, it probably deserves what it gets.
332
        if ip.is_ipv4() {
333
            prefs.ipv4_only();
334
        } else {
335
            prefs.ipv6_only();
336
        }
337
    }
338

            
339
    Ok(())
340
}
341

            
342
/// Configure the stream isolation from the provided headers.
343
fn set_isolation(
344
    prefs: &mut StreamPrefs,
345
    headers: &http::HeaderMap,
346
    listener_isolation: ListenerIsolation,
347
) -> Result<(), HttpConnectError> {
348
    let proxy_auth =
349
        hdr::uniq_utf8(headers, hdr::PROXY_AUTHORIZATION)?.map(ProxyAuthorization::from_header);
350
    let x_tor_isolation = hdr::uniq_utf8(headers, hdr::X_TOR_STREAM_ISOLATION)?.map(str::to_owned);
351
    let tor_isolation = hdr::uniq_utf8(headers, hdr::TOR_STREAM_ISOLATION)?.map(str::to_owned);
352

            
353
    let isolation = super::ProvidedIsolation::Http(Isolation {
354
        proxy_auth,
355
        x_tor_isolation,
356
        tor_isolation,
357
    });
358

            
359
    let isolation = super::StreamIsolationKey(listener_isolation, isolation);
360
    prefs.set_isolation(isolation);
361

            
362
    Ok(())
363
}
364

            
365
/// An isolation value based on the Proxy-Authorization header.
366
#[derive(Debug, Clone, Eq, PartialEq)]
367
pub(super) enum ProxyAuthorization {
368
    /// The entire contents of the Proxy-Authorization header.
369
    Legacy(String),
370
    /// The decoded value of the basic authorization, with the user set to "tor-iso".
371
    Modern(Vec<u8>),
372
}
373

            
374
impl ProxyAuthorization {
375
    /// Return a ProxyAuthorization based on the value of the Proxy-Authorization header.
376
    ///
377
    /// Give a warning if the header is in the legacy (obsolete) format.
378
    fn from_header(value: &str) -> Self {
379
        if let Some(result) = Self::modern_from_header(value) {
380
            result
381
        } else {
382
            warn!(
383
                "{} header in obsolete format. If you want isolation, use {}, \
384
                 or {} with Basic authentication and username 'tor-iso'",
385
                hdr::PROXY_AUTHORIZATION,
386
                hdr::X_TOR_STREAM_ISOLATION,
387
                hdr::PROXY_AUTHORIZATION
388
            );
389
            Self::Legacy(value.to_owned())
390
        }
391
    }
392

            
393
    /// Helper: Try to return a Modern authorization value, if this is one.
394
    fn modern_from_header(value: &str) -> Option<Self> {
395
        use base64ct::Encoding as _;
396
        let value = value.trim_ascii();
397
        let (kind, value) = value.split_once(' ')?;
398
        if kind != "Basic" {
399
            return None;
400
        }
401
        let value = value.trim_ascii();
402
        // TODO: Is this the right format, or should we allow missing padding?
403
        let decoded = base64ct::Base64::decode_vec(value).ok()?;
404
        if decoded.starts_with(b"tor-iso:") {
405
            Some(ProxyAuthorization::Modern(decoded))
406
        } else {
407
            None
408
        }
409
    }
410

            
411
    /// Return true if this ProxyAuthorization has no authorization information.
412
    fn is_empty(&self) -> bool {
413
        match self {
414
            ProxyAuthorization::Legacy(s) => s.is_empty(),
415
            ProxyAuthorization::Modern(v) => v.is_empty(),
416
        }
417
    }
418
}
419

            
420
/// Look up the connection target given the value of a Tor-RPC-Target header.
///
/// With no header, the target is our own Tor client.  With a header, the
/// target is the RPC object it names; it is an error if we have no RPC
/// manager, or if the lookup fails.
#[cfg(feature = "rpc")]
fn find_conn_target<R: Runtime>(
    context: &ProxyContext<R>,
    rpc_target: Option<&str>,
) -> Result<ConnTarget<R>, HttpConnectError> {
    let target_id = match rpc_target {
        Some(id) => id,
        None => return Ok(ConnTarget::Client(Arc::clone(&context.tor_client))),
    };

    let rpc_mgr = context
        .rpc_mgr
        .as_ref()
        .ok_or(HttpConnectError::NoRpcSupport)?;

    let (rpc_context, object) = rpc_mgr
        .lookup_object(&rpc::ObjectId::from(target_id))
        .map_err(|_| HttpConnectError::RpcObjectNotFound)?;

    Ok(ConnTarget::Rpc {
        object,
        context: rpc_context,
    })
}
440

            
441
/// Look up the connection target given the value of an Tor-RPC-Target header
442
//
443
// (This is the implementation when we have no RPC support.)
444
#[cfg(not(feature = "rpc"))]
445
fn find_conn_target<R: Runtime>(
446
    context: &ProxyContext<R>,
447
    rpc_target: Option<&str>,
448
) -> Result<arti_client::TorClient<R>, HttpConnectError> {
449
    if rpc_target.is_some() {
450
        Err(HttpConnectError::NoRpcSupport)
451
    } else {
452
        Ok(context.tor_client.clone())
453
    }
454
}
455

            
456
/// Extension trait on ResponseBuilder
457
trait RespBldExt {
458
    /// Return a response for a successful builder.
459
    fn ok(self, method: &Method) -> anyhow::Result<Response<Body>, HttpConnectError>;
460

            
461
    /// Return a response for an error message.
462
    fn err(
463
        self,
464
        method: &Method,
465
        message: impl Into<String>,
466
    ) -> Result<Response<Body>, HttpConnectError>;
467
}
468

            
469
impl RespBldExt for ResponseBuilder {
470
4
    fn ok(self, method: &Method) -> Result<Response<Body>, HttpConnectError> {
471
4
        let bld = add_common_headers(self, method);
472
4
        Ok(bld
473
4
            .body("".into())
474
4
            .map_err(into_internal!("Formatting HTTP response"))?)
475
4
    }
476

            
477
    fn err(
478
        self,
479
        method: &Method,
480
        message: impl Into<String>,
481
    ) -> Result<Response<Body>, HttpConnectError> {
482
        let bld = add_common_headers(self, method).header(hdr::CONTENT_TYPE, "text/plain");
483
        Ok(bld
484
            .body(message.into())
485
            .map_err(into_internal!("Formatting HTTP response"))?)
486
    }
487
}
488

            
489
/// Return a string representing our capabilities.
490
4
fn capabilities() -> &'static str {
491
    use std::sync::LazyLock;
492
2
    static CAPS: LazyLock<String> = LazyLock::new(|| {
493
2
        let mut caps = hdr::ALL_REQUEST_HEADERS.to_vec();
494
2
        caps.sort();
495
2
        caps.join(" ")
496
2
    });
497

            
498
4
    CAPS.as_str()
499
4
}
500

            
501
/// Add all common headers to the builder `bld`, and return a new builder.
502
4
fn add_common_headers(mut bld: ResponseBuilder, method: &Method) -> ResponseBuilder {
503
4
    bld = bld.header(hdr::TOR_CAPABILITIES, capabilities());
504
4
    if let (Some(software), Some(version)) = (
505
4
        option_env!("CARGO_PKG_NAME"),
506
4
        option_env!("CARGO_PKG_VERSION"),
507
    ) {
508
4
        if method == Method::CONNECT {
509
            bld = bld.header(
510
                hdr::VIA,
511
                format!("tor/1.0 tor-network ({software} {version})"),
512
            );
513
4
        } else {
514
4
            bld = bld.header(hdr::SERVER, format!("tor/1.0 ({software} {version})"));
515
4
        }
516
    }
517
4
    bld
518
4
}
519

            
520
/// An error that occurs during an HTTP CONNECT attempt, which can (usually)
521
/// be reported to the client.
522
#[derive(Clone, Debug, thiserror::Error)]
523
enum HttpConnectError {
524
    /// Tried to connect to an invalid stream target.
525
    #[error("Invalid target address {0:?}")]
526
    InvalidStreamTarget(Sv<String>, #[source] arti_client::TorAddrError),
527

            
528
    /// We found a duplicate HTTP header that we do not allow.
529
    ///
530
    /// (We only enforce this for the headers that we look at ourselves.)
531
    #[error("Duplicate HTTP header found.")]
532
    DuplicateHeader,
533

            
534
    /// We tried to found an HTTP header whose value wasn't encode as UTF-8.
535
    ///
536
    /// (We only enforce this for the headers that we look at ourselves.)
537
    #[error("HTTP header value was not in UTF-8")]
538
    HeaderNotUtf8,
539

            
540
    /// The Tor-Family-Preference header wasn't as expected.
541
    #[error("Unrecognized value for {}", hdr::TOR_FAMILY_PREFERENCE)]
542
    InvalidFamilyPreference,
543

            
544
    /// The user asked to use an RPC object, but we don't support RPC.
545
    #[error(
546
        "Found {} header, but we are running without RPC support",
547
        hdr::TOR_RPC_TARGET
548
    )]
549
    NoRpcSupport,
550

            
551
    /// The user asked to use an RPC object, but we didn't find the one they wanted.
552
    #[error("RPC target object not found")]
553
    RpcObjectNotFound,
554

            
555
    /// arti_client was unable to connect to a stream target.
556
    #[error("Unable to connect to {0}")]
557
    ConnectFailed(Sv<TorAddr>, #[source] ClientError),
558

            
559
    /// We encountered an internal error.
560
    #[error("Internal error while handling request")]
561
    Internal(#[from] tor_error::Bug),
562
}
563

            
564
impl HasKind for HttpConnectError {
565
    fn kind(&self) -> ErrorKind {
566
        use ErrorKind as EK;
567
        use HttpConnectError as HCE;
568
        match self {
569
            HCE::InvalidStreamTarget(_, _)
570
            | HCE::DuplicateHeader
571
            | HCE::HeaderNotUtf8
572
            | HCE::InvalidFamilyPreference
573
            | HCE::RpcObjectNotFound => EK::LocalProtocolViolation,
574
            HCE::NoRpcSupport => EK::FeatureDisabled,
575
            HCE::ConnectFailed(_, e) => e.kind(),
576
            HCE::Internal(e) => e.kind(),
577
        }
578
    }
579
}
580

            
581
impl HttpConnectError {
582
    /// Return an appropriate HTTP status code for this error.
583
    fn status_code(&self) -> StatusCode {
584
        use HttpConnectError as HCE; // Not a Joyce reference
585
        use StatusCode as SC;
586
        if let Some(end_reason) = self.remote_end_reason() {
587
            return end_reason_to_http_status(end_reason);
588
        }
589
        match self {
590
            HCE::InvalidStreamTarget(_, _)
591
            | HCE::DuplicateHeader
592
            | HCE::HeaderNotUtf8
593
            | HCE::InvalidFamilyPreference
594
            | HCE::RpcObjectNotFound
595
            | HCE::NoRpcSupport => SC::BAD_REQUEST,
596
            HCE::ConnectFailed(_, e) => e.kind().http_status_code(),
597
            HCE::Internal(e) => e.kind().http_status_code(),
598
        }
599
    }
600

            
601
    /// If possible, return a response that we should give to this error.
602
    fn try_into_response(self) -> Result<Response<Body>, HttpConnectError> {
603
        let error_kind = self.kind();
604
        let end_reason = self.remote_end_reason();
605
        let status_code = self.status_code();
606
        let mut request_failed = format!("arti/{error_kind:?}");
607
        if let Some(end_reason) = end_reason {
608
            request_failed.push_str(&format!(" end/{end_reason}"));
609
        }
610

            
611
        ResponseBuilder::new()
612
            .status(status_code)
613
            .header(hdr::TOR_REQUEST_FAILED, request_failed)
614
            .err(&Method::CONNECT, self.report().to_string())
615
    }
616

            
617
    /// Return the end reason for this error, if this error does in fact represent an END message
618
    /// from the remote side of a stream.
619
    //
620
    // TODO: This function is a bit fragile; it forces us to use APIs from tor-proto and
621
    // tor-cell that are not re-exported from arti-client.  It also relies on the fact that
622
    // there is a single error type way down in `tor-proto` representing a received END message.
623
    fn remote_end_reason(&self) -> Option<tor_cell::relaycell::msg::EndReason> {
624
        use tor_proto::Error::EndReceived;
625
        if let Some(EndReceived(reason)) = super::extract_proto_err(self) {
626
            Some(*reason)
627
        } else {
628
            None
629
        }
630
    }
631
}
632

            
633
/// Return the appropriate HTTP status code for a remote END reason.
634
///
635
/// Return `None` if the END reason is unrecognized and we should use the `ErrorKind`
636
///
637
/// (We  _could_ use the ErrorKind unconditionally,
638
/// but the mapping from END reason to ErrorKind is [given in the spec][spec],
639
/// so we try to obey it.)
640
///
641
/// [spec]: https://spec.torproject.org/http-connect.html#error-codes
642
fn end_reason_to_http_status(end_reason: tor_cell::relaycell::msg::EndReason) -> StatusCode {
643
    use StatusCode as S;
644
    use tor_cell::relaycell::msg::EndReason as R;
645
    match end_reason {
646
        //
647
        R::CONNECTREFUSED => S::FORBIDDEN, // 403
648
        // 500: Internal server error.
649
        R::MISC | R::NOTDIRECTORY => S::INTERNAL_SERVER_ERROR,
650

            
651
        // 502: Bad Gateway.
652
        R::DESTROY | R::DONE | R::HIBERNATING | R::INTERNAL | R::RESOURCELIMIT | R::TORPROTOCOL => {
653
            S::BAD_GATEWAY
654
        }
655
        // 503: Service unavailable
656
        R::CONNRESET | R::EXITPOLICY | R::NOROUTE | R::RESOLVEFAILED => S::SERVICE_UNAVAILABLE,
657

            
658
        // 504: Gateway timeout.
659
        R::TIMEOUT => S::GATEWAY_TIMEOUT,
660

            
661
        // This is possible if the other side sent an unrecognized error code.
662
        _ => S::INTERNAL_SERVER_ERROR, // 500
663
    }
664
}
665

            
666
/// Recover the original stream from a [`hyper::upgrade::Upgraded`].
667
fn deconstruct_upgrade<S>(upgraded: hyper::upgrade::Upgraded) -> Result<BufReader<S>, anyhow::Error>
668
where
669
    S: AsyncRead + AsyncWrite + Unpin + 'static,
670
{
671
    let parts: hyper::upgrade::Parts<FuturesIoCompat<BufReader<S>>> = upgraded
672
        .downcast()
673
        .map_err(|_| anyhow!("downcast failed!"))?;
674
    let hyper::upgrade::Parts { io, read_buf, .. } = parts;
675
    if !read_buf.is_empty() {
676
        // TODO Figure out whether this can happen, due to possible race conditions if the client
677
        // gets the OK before we check this?.
678
        return Err(anyhow!(
679
            "Extraneous data on hyper buffer after upgrade to proxy mode"
680
        ));
681
    }
682
    let io: BufReader<S> = io.0;
683
    Ok(io)
684
}
685

            
686
/// Recover the application stream from `request`, and launch tasks to transfer data between the application and
687
/// the `tor_stream`.
688
async fn transfer<S>(request: Request, tor_stream: arti_client::DataStream) -> anyhow::Result<()>
689
where
690
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
691
{
692
    let upgraded = hyper::upgrade::on(request)
693
        .await
694
        .context("Unable to upgrade connection")?;
695
    let app_stream: BufReader<S> = deconstruct_upgrade(upgraded)?;
696
    let tor_stream = BufReader::with_capacity(super::APP_STREAM_BUF_LEN, tor_stream);
697

            
698
    // Finally. relay traffic between
699
    // the application stream and the tor stream, forever.
700
    let _ = futures_copy::copy_buf_bidirectional(
701
        app_stream,
702
        tor_stream,
703
        futures_copy::eof::Close,
704
        futures_copy::eof::Close,
705
    )
706
    .await?;
707

            
708
    Ok(())
709
}
710

            
711
/// Return true if `host` is a possible value for a Host header addressing localhost.
///
/// We accept:
///  * a loopback IP address, with or without a port
///    (IPv6 addresses may be given in brackets, as in `[::1]` or `[::1]:9150`);
///  * the name `localhost` (in any case), optionally followed by `:` and a
///    nonzero port.
///
/// Everything else is rejected.
fn host_is_localhost(host: &str) -> bool {
    use std::net::{IpAddr, Ipv6Addr, SocketAddr};
    use std::num::NonZeroU16;

    // "127.0.0.1:80" or "[::1]:80"
    if let Ok(addr) = host.parse::<SocketAddr>() {
        return addr.ip().is_loopback();
    }
    // "127.0.0.1" or "::1"
    if let Ok(ip) = host.parse::<IpAddr>() {
        return ip.is_loopback();
    }
    // "[::1]": a bracketed IPv6 literal with no port.  This is how an IPv6
    // host is written in a URI, but neither parser above accepts it.
    if let Some(inner) = host.strip_prefix('[').and_then(|h| h.strip_suffix(']')) {
        return inner.parse::<Ipv6Addr>().is_ok_and(|ip| ip.is_loopback());
    }

    match host.split_once(':') {
        // "localhost:80".  The port must be a valid nonzero u16.
        Some((name, port)) => {
            port.parse::<NonZeroU16>().is_ok() && name.eq_ignore_ascii_case("localhost")
        }
        // "localhost"
        None => host.eq_ignore_ascii_case("localhost"),
    }
}
723

            
724
/// Helper module: Make `futures` types usable by `hyper`.
725
//
726
// TODO: We may want to expose this as a separate crate, or move it into tor-async-utils,
727
// if we turn out to need it elsewhere.
728
mod hyper_futures_io {
729
    use pin_project::pin_project;
730
    use std::{
731
        io,
732
        pin::Pin,
733
        task::{Context, Poll, ready},
734
    };
735

            
736
    use hyper::rt::ReadBufCursor;
737

            
738
    /// A wrapper around an AsyncBufRead + AsyncWrite to implement traits required by hyper.
739
    #[derive(Debug)]
740
    #[pin_project]
741
    pub(super) struct FuturesIoCompat<T>(#[pin] pub(super) T);
742

            
743
    impl<T> hyper::rt::Read for FuturesIoCompat<T>
744
    where
745
        // We require AsyncBufRead here it is a good match for ReadBufCursor::put_slice.
746
        T: futures::io::AsyncBufRead,
747
    {
748
16
        fn poll_read(
749
16
            self: Pin<&mut Self>,
750
16
            cx: &mut Context<'_>,
751
16
            mut buf: ReadBufCursor<'_>,
752
16
        ) -> Poll<Result<(), io::Error>> {
753
16
            let mut this = self.project();
754

            
755
16
            let available: &[u8] = ready!(this.0.as_mut().poll_fill_buf(cx))?;
756
8
            let n_available = available.len();
757

            
758
8
            if !available.is_empty() {
759
8
                buf.put_slice(available);
760
8
                this.0.consume(n_available);
761
8
            }
762

            
763
            // This means either "data arrived" or "EOF" depending on whether we added new bytes.
764
8
            Poll::Ready(Ok(()))
765
16
        }
766
    }
767

            
768
    impl<T> hyper::rt::Write for FuturesIoCompat<T>
769
    where
770
        T: futures::io::AsyncWrite,
771
    {
772
4
        fn poll_write(
773
4
            self: Pin<&mut Self>,
774
4
            cx: &mut Context<'_>,
775
4
            buf: &[u8],
776
4
        ) -> Poll<Result<usize, std::io::Error>> {
777
4
            self.project().0.poll_write(cx, buf)
778
4
        }
779

            
780
4
        fn poll_flush(
781
4
            self: Pin<&mut Self>,
782
4
            cx: &mut Context<'_>,
783
4
        ) -> Poll<Result<(), std::io::Error>> {
784
4
            self.project().0.poll_flush(cx)
785
4
        }
786

            
787
4
        fn poll_shutdown(
788
4
            self: Pin<&mut Self>,
789
4
            cx: &mut Context<'_>,
790
4
        ) -> Poll<Result<(), std::io::Error>> {
791
4
            self.project().0.poll_close(cx)
792
4
        }
793
    }
794
}
795

            
796
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use arti_client::{BootstrapBehavior, TorClient, config::TorClientConfigBuilder};
    use futures::{AsyncReadExt as _, AsyncWriteExt as _};
    use tor_rtmock::{MockRuntime, io::stream_pair};

    use super::*;

    // Header lookups must ignore case, as the HeaderMap documentation implies.
    #[test]
    fn headermap_casei() {
        use http::header::{HeaderMap, HeaderValue};

        let mut headers = HeaderMap::new();
        headers.append("my-head-is-a-house-for", HeaderValue::from_str("a-secret").unwrap());

        for spelling in ["My-Head-Is-A-House-For", "MY-HEAD-IS-A-HOUSE-FOR"] {
            assert_eq!(headers.get(spelling).unwrap().as_bytes(), b"a-secret");
        }
    }

    #[test]
    fn host_header_localhost() {
        // Each entry is (Host header value, whether it should count as localhost).
        let cases = [
            ("localhost", true),
            ("localhost:9999", true),
            ("localHOSt:9999", true),
            ("127.0.0.1:9999", true),
            ("[::1]:9999", true),
            ("127.1.2.3:1234", true),
            ("127.0.0.1", true),
            ("::1", true),
            ("[::1]", false), // not in the right format!
            ("www.torproject.org", false),
            ("www.torproject.org:1234", false),
            ("localhost:0", false),
            ("localhost:999999", false),
            ("plocalhost:1234", false),
            ("[::0]:1234", false),
            ("192.0.2.55:1234", false),
            ("3fff::1", false),
            ("[3fff::1]:1234", false),
        ];

        for (host, expected) in cases {
            assert_eq!(host_is_localhost(host), expected, "host = {host:?}");
        }
    }

    /// Start `handle_http_conn` on one end of an in-memory stream pair.
    ///
    /// Returns the other end of the pair, a future that resolves to the handler's
    /// result, and the temporary directory backing the client's state and cache
    /// (handed back so the caller can keep it alive for the duration of the test).
    fn interactive_test_setup(
        rt: &MockRuntime,
    ) -> anyhow::Result<(
        tor_rtmock::io::LocalStream,
        impl Future<Output = anyhow::Result<()>>,
        tempfile::TempDir,
    )> {
        let (proxy_end, client_end) = stream_pair();
        let proxy_end: BufReader<_> = BufReader::new(proxy_end);

        let isolation: ListenerIsolation = (7, "127.0.0.1".parse().unwrap());

        let dir = tempfile::TempDir::new().unwrap();
        let state_dir = dir.as_ref().join("state");
        let cache_dir = dir.as_ref().join("cache");
        let config = TorClientConfigBuilder::from_directories(state_dir, cache_dir)
            .build()
            .unwrap();

        // The client is created but never bootstrapped.
        let tor_client = TorClient::with_runtime(rt.clone())
            .config(config)
            .bootstrap_behavior(BootstrapBehavior::Manual)
            .create_unbootstrapped()?;

        let context: ProxyContext<_> = ProxyContext {
            tor_client,
            #[cfg(feature = "rpc")]
            rpc_mgr: None,
            protocols: crate::proxy::ListenProtocols::SocksAndHttpConnect,
        };

        let handler = rt.spawn_join(
            "HTTP Handler",
            handle_http_conn(context, proxy_end, isolation),
        );
        Ok((client_end, handler, dir))
    }

    #[test]
    fn successful_options_test() -> anyhow::Result<()> {
        // Send an OPTIONS request and check that the answer looks plausible.
        //
        // (Mostly a control for invalid_host_test(): it shows that the request used
        // there is only rejected because of its Host header.)
        MockRuntime::try_test_with_various(async |rt| -> anyhow::Result<()> {
            let (mut client, handler, _dir) = interactive_test_setup(&rt)?;

            client
                .write_all(b"OPTIONS * HTTP/1.0\r\nHost: localhost\r\n\r\n")
                .await?;
            let mut response = Vec::new();
            let _n_read = client.read_to_end(&mut response).await?;
            let () = handler.await?;

            let reply = std::str::from_utf8(&response)?;
            assert!(dbg!(reply).starts_with("HTTP/1.0 200 OK\r\n"));

            Ok(())
        })
    }

    #[test]
    fn invalid_host_test() -> anyhow::Result<()> {
        // Send a Host header that looks like a CSRF attempt, and check that the
        // connection is dropped without a single byte of reply.
        MockRuntime::try_test_with_various(async |rt| -> anyhow::Result<()> {
            let (mut client, handler, _dir) = interactive_test_setup(&rt)?;

            client
                .write_all(b"OPTIONS * HTTP/1.0\r\nHost: csrf.example.com\r\n\r\n")
                .await?;
            let mut response = Vec::new();
            let n_read = client.read_to_end(&mut response).await?;
            let outcome = handler.await;

            assert_eq!(n_read, 0);
            assert!(response.is_empty());
            assert!(outcome.is_err());

            // The interesting message is the source of the handler's error.
            let error_msg = outcome.unwrap_err().source().unwrap().to_string();
            assert_eq!(
                error_msg,
                r#"Host header "csrf.example.com" was not localhost. Rejecting request."#
            );

            Ok(())
        })
    }
}