1
//! Implement an HTTP1 CONNECT proxy using `hyper`.
2
//!
3
//! Note that Tor defines several extensions to HTTP CONNECT;
4
//! See [the spec](https://spec.torproject.org/http-connect.html)
5
//! for more information.
6

            
7
use super::{ListenerIsolation, ProxyContext};
8
use anyhow::{Context as _, anyhow};
9
use arti_client::{StreamPrefs, TorAddr};
10
use futures::{AsyncRead, AsyncWrite, io::BufReader};
11
use http::{Method, StatusCode, response::Builder as ResponseBuilder};
12
use hyper::{Response, server::conn::http1::Builder as ServerBuilder, service::service_fn};
13
use safelog::{Sensitive as Sv, sensitive as sv};
14
use tor_error::{ErrorKind, ErrorReport as _, HasKind, into_internal, warn_report};
15
use tor_rtcompat::Runtime;
16
use tor_rtcompat::SpawnExt as _;
17
use tracing::{instrument, warn};
18

            
19
use hyper_futures_io::FuturesIoCompat;
20

            
21
#[cfg(feature = "rpc")]
22
use {crate::rpc::conntarget::ConnTarget, tor_rpcbase as rpc};
23

            
24
cfg_if::cfg_if! {
25
    if #[cfg(feature="rpc")] {
26
        /// Error type returned from a failed connect_with_prefs.
27
        type ClientError = Box<dyn arti_client::rpc::ClientConnectionError>;
28
    } else {
29
        /// Error type returned from a failed connect_with_prefs.
30
        type ClientError = arti_client::Error;
31
    }
32
}
33

            
34
/// Request type that we receive from Hyper.
35
type Request = hyper::Request<hyper::body::Incoming>;
36

            
37
/// We use "String" as our body type, since we only return a body on error,
38
/// in which case it already starts life as a formatted string.
39
///
40
/// (We could use () or `Empty` for our (200 OK) replies,
41
/// but empty strings are cheap enough that it isn't worth it.)
42
type Body = String;
43

            
44
/// A value used to isolate streams received via HTTP CONNECT.
45
#[derive(Clone, Debug, Eq, PartialEq)]
46
pub(super) struct Isolation {
47
    /// The value of the Proxy-Authorization header.
48
    proxy_auth: Option<ProxyAuthorization>,
49
    /// The legacy X-Tor-Isolation token.
50
    x_tor_isolation: Option<String>,
51
    /// The up-to-date Tor-Isolation token.
52
    tor_isolation: Option<String>,
53
}
54

            
55
impl Isolation {
56
    /// Return true if no isolation field in this object is set.
57
    pub(super) fn is_empty(&self) -> bool {
58
        let Isolation {
59
            proxy_auth,
60
            x_tor_isolation,
61
            tor_isolation,
62
        } = self;
63
        proxy_auth.as_ref().is_none_or(ProxyAuthorization::is_empty)
64
            && x_tor_isolation.as_ref().is_none_or(String::is_empty)
65
            && tor_isolation.as_ref().is_none_or(String::is_empty)
66
    }
67
}
68

            
69
/// Constants and code for the HTTP headers we use.
70
mod hdr {
71
    pub(super) use http::header::{CONTENT_TYPE, HOST, PROXY_AUTHORIZATION, SERVER, VIA};
72

            
73
    /// Client-to-proxy: Which IP family should we use?
74
    pub(super) const TOR_FAMILY_PREFERENCE: &str = "Tor-Family-Preference";
75

            
76
    /// Client-To-Proxy: The ID of an RPC object to receive our request.
77
    pub(super) const TOR_RPC_TARGET: &str = "Tor-RPC-Target";
78

            
79
    /// Client-To-Proxy: An isolation token to use with our stream.
80
    /// (Legacy name.)
81
    pub(super) const X_TOR_STREAM_ISOLATION: &str = "X-Tor-Stream-Isolation";
82

            
83
    /// Client-To-Proxy: An isolation token to use with our stream.
84
    pub(super) const TOR_STREAM_ISOLATION: &str = "Tor-Stream-Isolation";
85

            
86
    /// Proxy-to-client: A list of the capabilities that this proxy provides.
87
    pub(super) const TOR_CAPABILITIES: &str = "Tor-Capabilities";
88

            
89
    /// Proxy-to-client: A machine-readable list of failure reasons.
90
    pub(super) const TOR_REQUEST_FAILED: &str = "Tor-Request-Failed";
91

            
92
    /// A list of all the headers that we support from client-to-proxy.
93
    ///
94
    /// Does not include headers that we check for HTTP conformance,
95
    /// but not for any other purpose.
96
    pub(super) const ALL_REQUEST_HEADERS: &[&str] = &[
97
        TOR_FAMILY_PREFERENCE,
98
        TOR_RPC_TARGET,
99
        X_TOR_STREAM_ISOLATION,
100
        TOR_STREAM_ISOLATION,
101
        // Can't use 'PROXY_AUTHORIZATION', since it isn't a str, and its as_str() isn't const.
102
        "Proxy-Authorization",
103
    ];
104

            
105
    /// Return the unique string-valued value of the header `name`;
106
    /// or None if the header doesn't exist,
107
    /// or an error if the header is duplicated or not UTF-8.
108
8
    pub(super) fn uniq_utf8(
109
8
        map: &http::HeaderMap,
110
8
        name: impl http::header::AsHeaderName,
111
8
    ) -> Result<Option<&str>, super::HttpConnectError> {
112
8
        let mut iter = map.get_all(name).iter();
113
8
        let val = match iter.next() {
114
8
            Some(v) => v,
115
            None => return Ok(None),
116
        };
117
8
        match iter.next() {
118
            Some(_) => Err(super::HttpConnectError::DuplicateHeader),
119
8
            None => val
120
8
                .to_str()
121
8
                .map(Some)
122
8
                .map_err(|_| super::HttpConnectError::HeaderNotUtf8),
123
        }
124
8
    }
125
}
126

            
127
/// Given a just-received TCP connection `S` on a HTTP proxy port, handle the
128
/// HTTP handshake and relay the connection over the Tor network.
129
///
130
/// Uses `isolation_info` to decide which circuits this connection
131
/// may use.  Requires that `isolation_info` is a pair listing the listener
132
/// id and the source address for the HTTP request.
133
#[instrument(skip_all, level = "trace")]
134
8
pub(super) async fn handle_http_conn<R, S>(
135
8
    context: super::ProxyContext<R>,
136
8
    stream: BufReader<S>,
137
8
    isolation_info: ListenerIsolation,
138
8
) -> crate::Result<()>
139
8
where
140
8
    R: Runtime,
141
8
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
142
8
{
143
    // NOTES:
144
    // * We _could_ use a timeout, but we trust that the client is not trying to DOS us.
145
    ServerBuilder::new()
146
        .half_close(false)
147
        .keep_alive(true)
148
        .max_headers(256)
149
        .max_buf_size(16 * 1024)
150
        .title_case_headers(true)
151
        .auto_date_header(false) // We omit the date header out of general principle.
152
        .serve_connection(
153
            FuturesIoCompat(stream),
154
8
            service_fn(|request| handle_http_request::<R, S>(request, &context, isolation_info)),
155
        )
156
        .with_upgrades()
157
        .await?;
158

            
159
    Ok(())
160
8
}
161

            
162
/// Handle a single HTTP request.
163
///
164
/// This function is invoked by hyper.
165
8
async fn handle_http_request<R, S>(
166
8
    request: Request,
167
8
    context: &ProxyContext<R>,
168
8
    listener_isolation: ListenerIsolation,
169
8
) -> Result<Response<Body>, anyhow::Error>
170
8
where
171
8
    R: Runtime,
172
8
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
173
8
{
174
    // Avoid cross-site attacks based on DNS forgery by validating that the Host
175
    // header is in fact localhost.  In these cases, we don't want to reply at all,
176
    // _even with an error message_, since our headers could be used to tell a hostile
177
    // webpage information about the local arti process.
178
    //
179
    // We don't do this for CONNECT requests, since those are forbidden by
180
    // XHR and JS fetch(), and since Host _will_ be non-localhost for those.
181
8
    if request.method() != Method::CONNECT {
182
8
        match hdr::uniq_utf8(request.headers(), hdr::HOST) {
183
            Err(e) => return Err(e).context("Host header invalid. Rejecting request."),
184
8
            Ok(Some(host)) if !host_is_localhost(host) => {
185
4
                return Err(anyhow!(
186
4
                    "Host header {host:?} was not localhost. Rejecting request."
187
4
                ));
188
            }
189
4
            Ok(_) => {}
190
        }
191
    }
192

            
193
4
    match *request.method() {
194
4
        Method::OPTIONS => handle_options_request(&request),
195
        Method::CONNECT => {
196
            handle_connect_request::<R, S>(request, context, listener_isolation).await
197
        }
198
        _ => Ok(ResponseBuilder::new()
199
            .status(StatusCode::NOT_IMPLEMENTED)
200
            .err(
201
                request.method(),
202
                format!("{} is not supported", request.method()),
203
            )?),
204
    }
205
8
}
206

            
207
/// Return an appropriate reply to the given OPTIONS request.
208
4
fn handle_options_request(request: &Request) -> Result<Response<Body>, anyhow::Error> {
209
    use hyper::body::Body as _;
210

            
211
4
    let target = request.uri().to_string();
212
4
    match target.as_str() {
213
4
        "*" => {}
214
        s if TorAddr::from(s).is_ok() => {}
215
        _ => {
216
            return Ok(ResponseBuilder::new()
217
                .status(StatusCode::BAD_REQUEST)
218
                .err(&Method::OPTIONS, "Target was not a valid address")?);
219
        }
220
    }
221
4
    if request.headers().contains_key(hdr::CONTENT_TYPE) {
222
        // RFC 9110 says that if a client wants to include a body with its OPTIONS request (!),
223
        // it must include a Content-Type header.  Therefore, we reject such requests.
224
        return Ok(ResponseBuilder::new()
225
            .status(StatusCode::BAD_REQUEST)
226
            .err(&Method::OPTIONS, "Unexpected Content-Type on OPTIONS")?);
227

            
228
        // TODO: It would be cool to detect nonempty bodies in other ways, though in practice
229
        // it should never come up.
230
4
    }
231
4
    if !request.body().is_end_stream() {
232
        return Ok(ResponseBuilder::new()
233
            .status(StatusCode::BAD_REQUEST)
234
            .err(&Method::OPTIONS, "Unexpected body on OPTIONS request")?);
235
4
    }
236

            
237
4
    Ok(ResponseBuilder::new()
238
4
        .header("Allow", "OPTIONS, CONNECT")
239
4
        .status(StatusCode::OK)
240
4
        .ok(&Method::OPTIONS)?)
241
4
}
242

            
243
/// Return an appropriate reply to the given CONNECT request.
244
async fn handle_connect_request<R, S>(
245
    request: Request,
246
    context: &ProxyContext<R>,
247
    listener_isolation: ListenerIsolation,
248
) -> anyhow::Result<Response<Body>>
249
where
250
    R: Runtime,
251
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
252
{
253
    match handle_connect_request_impl::<R, S>(request, context, listener_isolation).await {
254
        Ok(response) => Ok(response),
255
        Err(e) => Ok(e.try_into_response()?),
256
    }
257
}
258

            
259
/// Helper for handle_connect_request:
260
/// return an error type that can be converted into an HTTP message.
261
///
262
/// (This is a separate function to make error handling simpler.)
263
async fn handle_connect_request_impl<R, S>(
264
    request: Request,
265
    context: &ProxyContext<R>,
266
    listener_isolation: ListenerIsolation,
267
) -> Result<Response<Body>, HttpConnectError>
268
where
269
    R: Runtime,
270
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
271
{
272
    let target = request.uri().to_string();
273
    let tor_addr =
274
        TorAddr::from(&target).map_err(|e| HttpConnectError::InvalidStreamTarget(sv(target), e))?;
275

            
276
    let mut stream_prefs = StreamPrefs::default();
277
    set_family_preference(&mut stream_prefs, &tor_addr, request.headers())?;
278

            
279
    set_isolation(&mut stream_prefs, request.headers(), listener_isolation)?;
280

            
281
    let client = find_conn_target(
282
        context,
283
        hdr::uniq_utf8(request.headers(), hdr::TOR_RPC_TARGET)?,
284
    )?;
285

            
286
    // If we reach this point, the request looks okay, so we'll try to connect.
287
    let tor_stream = client
288
        .connect_with_prefs(&tor_addr, &stream_prefs)
289
        .await
290
        .map_err(|e| HttpConnectError::ConnectFailed(sv(tor_addr), e))?;
291

            
292
    // We have connected.  We need to launch a separate task to actually be the proxy, though,
293
    // since IIUC hyper::upgrade::on won't return an answer
294
    // until after the response is given to the client.
295
    context
296
        .tor_client
297
        .runtime()
298
        .spawn(async move {
299
            match transfer::<S>(request, tor_stream).await {
300
                Ok(()) => {}
301
                Err(e) => {
302
                    warn_report!(e, "Error while launching transfer");
303
                }
304
            }
305
        })
306
        .map_err(into_internal!("Unable to spawn transfer task"))?;
307

            
308
    ResponseBuilder::new()
309
        .status(StatusCode::OK)
310
        .ok(&Method::CONNECT)
311
}
312

            
313
/// Set the IP family preference in `prefs`.
314
fn set_family_preference(
315
    prefs: &mut StreamPrefs,
316
    addr: &TorAddr,
317
    headers: &http::HeaderMap,
318
) -> Result<(), HttpConnectError> {
319
    if let Some(val) = hdr::uniq_utf8(headers, hdr::TOR_FAMILY_PREFERENCE)? {
320
        match val.trim() {
321
            "ipv4-preferred" => prefs.ipv4_preferred(),
322
            "ipv6-preferred" => prefs.ipv6_preferred(),
323
            "ipv4-only" => prefs.ipv4_only(),
324
            "ipv6-only" => prefs.ipv6_only(),
325
            _ => return Err(HttpConnectError::InvalidFamilyPreference),
326
        };
327
    } else if let Some(ip) = addr.as_ip_address() {
328
        // TODO: Perhaps we should check unconditionally whether the IP address is consistent with header,
329
        // if one was given?  On the other hand, if the application tells us to make an IPV6-only
330
        // connection to an IPv4 address, it probably deserves what it gets.
331
        if ip.is_ipv4() {
332
            prefs.ipv4_only();
333
        } else {
334
            prefs.ipv6_only();
335
        }
336
    }
337

            
338
    Ok(())
339
}
340

            
341
/// Configure the stream isolation from the provided headers.
342
fn set_isolation(
343
    prefs: &mut StreamPrefs,
344
    headers: &http::HeaderMap,
345
    listener_isolation: ListenerIsolation,
346
) -> Result<(), HttpConnectError> {
347
    let proxy_auth =
348
        hdr::uniq_utf8(headers, hdr::PROXY_AUTHORIZATION)?.map(ProxyAuthorization::from_header);
349
    let x_tor_isolation = hdr::uniq_utf8(headers, hdr::X_TOR_STREAM_ISOLATION)?.map(str::to_owned);
350
    let tor_isolation = hdr::uniq_utf8(headers, hdr::TOR_STREAM_ISOLATION)?.map(str::to_owned);
351

            
352
    let isolation = super::ProvidedIsolation::Http(Isolation {
353
        proxy_auth,
354
        x_tor_isolation,
355
        tor_isolation,
356
    });
357

            
358
    let isolation = super::StreamIsolationKey(listener_isolation, isolation);
359
    prefs.set_isolation(isolation);
360

            
361
    Ok(())
362
}
363

            
364
/// An isolation value based on the Proxy-Authorization header.
365
#[derive(Debug, Clone, Eq, PartialEq)]
366
pub(super) enum ProxyAuthorization {
367
    /// The entire contents of the Proxy-Authorization header.
368
    Legacy(String),
369
    /// The decoded value of the basic authorization, with the user set to "tor-iso".
370
    Modern(Vec<u8>),
371
}
372

            
373
impl ProxyAuthorization {
374
    /// Return a ProxyAuthorization based on the value of the Proxy-Authorization header.
375
    ///
376
    /// Give a warning if the header is in the legacy (obsolete) format.
377
    fn from_header(value: &str) -> Self {
378
        if let Some(result) = Self::modern_from_header(value) {
379
            result
380
        } else {
381
            warn!(
382
                "{} header in obsolete format. If you want isolation, use {}, \
383
                 or {} with Basic authentication and username 'tor-iso'",
384
                hdr::PROXY_AUTHORIZATION,
385
                hdr::X_TOR_STREAM_ISOLATION,
386
                hdr::PROXY_AUTHORIZATION
387
            );
388
            Self::Legacy(value.to_owned())
389
        }
390
    }
391

            
392
    /// Helper: Try to return a Modern authorization value, if this is one.
393
    fn modern_from_header(value: &str) -> Option<Self> {
394
        use base64ct::Encoding as _;
395
        let value = value.trim_ascii();
396
        let (kind, value) = value.split_once(' ')?;
397
        if kind != "Basic" {
398
            return None;
399
        }
400
        let value = value.trim_ascii();
401
        // TODO: Is this the right format, or should we allow missing padding?
402
        let decoded = base64ct::Base64::decode_vec(value).ok()?;
403
        if decoded.starts_with(b"tor-iso:") {
404
            Some(ProxyAuthorization::Modern(decoded))
405
        } else {
406
            None
407
        }
408
    }
409

            
410
    /// Return true if this ProxyAuthorization has no authorization information.
411
    fn is_empty(&self) -> bool {
412
        match self {
413
            ProxyAuthorization::Legacy(s) => s.is_empty(),
414
            ProxyAuthorization::Modern(v) => v.is_empty(),
415
        }
416
    }
417
}
418

            
419
/// Look up the connection target given the value of a Tor-RPC-Target header.
///
/// With no header, the target is a clone of the proxy's own client.
/// With a header, it is the RPC object with that ID; this fails with
/// `NoRpcSupport` if there is no RPC manager, or `RpcObjectNotFound`
/// if the lookup fails.
#[cfg(feature = "rpc")]
fn find_conn_target<R: Runtime>(
    context: &ProxyContext<R>,
    rpc_target: Option<&str>,
) -> Result<ConnTarget<R>, HttpConnectError> {
    let target_id = match rpc_target {
        Some(id) => id,
        None => return Ok(ConnTarget::Client(Box::new(context.tor_client.clone()))),
    };

    let rpc_mgr = context
        .rpc_mgr
        .as_ref()
        .ok_or(HttpConnectError::NoRpcSupport)?;

    match rpc_mgr.lookup_object(&rpc::ObjectId::from(target_id)) {
        Ok((context, object)) => Ok(ConnTarget::Rpc { object, context }),
        Err(_) => Err(HttpConnectError::RpcObjectNotFound),
    }
}
439

            
440
/// Look up the connection target given the value of an Tor-RPC-Target header
441
//
442
// (This is the implementation when we have no RPC support.)
443
#[cfg(not(feature = "rpc"))]
444
fn find_conn_target<R: Runtime>(
445
    context: &ProxyContext<R>,
446
    rpc_target: Option<&str>,
447
) -> Result<arti_client::TorClient<R>, HttpConnectError> {
448
    if rpc_target.is_some() {
449
        Err(HttpConnectError::NoRpcSupport)
450
    } else {
451
        Ok(context.tor_client.clone())
452
    }
453
}
454

            
455
/// Extension trait on ResponseBuilder
456
trait RespBldExt {
457
    /// Return a response for a successful builder.
458
    fn ok(self, method: &Method) -> anyhow::Result<Response<Body>, HttpConnectError>;
459

            
460
    /// Return a response for an error message.
461
    fn err(
462
        self,
463
        method: &Method,
464
        message: impl Into<String>,
465
    ) -> Result<Response<Body>, HttpConnectError>;
466
}
467

            
468
impl RespBldExt for ResponseBuilder {
469
4
    fn ok(self, method: &Method) -> Result<Response<Body>, HttpConnectError> {
470
4
        let bld = add_common_headers(self, method);
471
4
        Ok(bld
472
4
            .body("".into())
473
4
            .map_err(into_internal!("Formatting HTTP response"))?)
474
4
    }
475

            
476
    fn err(
477
        self,
478
        method: &Method,
479
        message: impl Into<String>,
480
    ) -> Result<Response<Body>, HttpConnectError> {
481
        let bld = add_common_headers(self, method).header(hdr::CONTENT_TYPE, "text/plain");
482
        Ok(bld
483
            .body(message.into())
484
            .map_err(into_internal!("Formatting HTTP response"))?)
485
    }
486
}
487

            
488
/// Return a string representing our capabilities.
489
4
fn capabilities() -> &'static str {
490
    use std::sync::LazyLock;
491
2
    static CAPS: LazyLock<String> = LazyLock::new(|| {
492
2
        let mut caps = hdr::ALL_REQUEST_HEADERS.to_vec();
493
2
        caps.sort();
494
2
        caps.join(" ")
495
2
    });
496

            
497
4
    CAPS.as_str()
498
4
}
499

            
500
/// Add all common headers to the builder `bld`, and return a new builder.
501
4
fn add_common_headers(mut bld: ResponseBuilder, method: &Method) -> ResponseBuilder {
502
4
    bld = bld.header(hdr::TOR_CAPABILITIES, capabilities());
503
4
    if let (Some(software), Some(version)) = (
504
4
        option_env!("CARGO_PKG_NAME"),
505
4
        option_env!("CARGO_PKG_VERSION"),
506
    ) {
507
4
        if method == Method::CONNECT {
508
            bld = bld.header(
509
                hdr::VIA,
510
                format!("tor/1.0 tor-network ({software} {version})"),
511
            );
512
4
        } else {
513
4
            bld = bld.header(hdr::SERVER, format!("tor/1.0 ({software} {version})"));
514
4
        }
515
    }
516
4
    bld
517
4
}
518

            
519
/// An error that occurs during an HTTP CONNECT attempt, which can (usually)
520
/// be reported to the client.
521
#[derive(Clone, Debug, thiserror::Error)]
522
enum HttpConnectError {
523
    /// Tried to connect to an invalid stream target.
524
    #[error("Invalid target address {0:?}")]
525
    InvalidStreamTarget(Sv<String>, #[source] arti_client::TorAddrError),
526

            
527
    /// We found a duplicate HTTP header that we do not allow.
528
    ///
529
    /// (We only enforce this for the headers that we look at ourselves.)
530
    #[error("Duplicate HTTP header found.")]
531
    DuplicateHeader,
532

            
533
    /// We found an HTTP header whose value wasn't encoded as UTF-8.
534
    ///
535
    /// (We only enforce this for the headers that we look at ourselves.)
536
    #[error("HTTP header value was not in UTF-8")]
537
    HeaderNotUtf8,
538

            
539
    /// The Tor-Family-Preference header wasn't as expected.
540
    #[error("Unrecognized value for {}", hdr::TOR_FAMILY_PREFERENCE)]
541
    InvalidFamilyPreference,
542

            
543
    /// The user asked to use an RPC object, but we don't support RPC.
544
    #[error(
545
        "Found {} header, but we are running without RPC support",
546
        hdr::TOR_RPC_TARGET
547
    )]
548
    NoRpcSupport,
549

            
550
    /// The user asked to use an RPC object, but we didn't find the one they wanted.
551
    #[error("RPC target object not found")]
552
    RpcObjectNotFound,
553

            
554
    /// arti_client was unable to connect to a stream target.
555
    #[error("Unable to connect to {0}")]
556
    ConnectFailed(Sv<TorAddr>, #[source] ClientError),
557

            
558
    /// We encountered an internal error.
559
    #[error("Internal error while handling request")]
560
    Internal(#[from] tor_error::Bug),
561
}
562

            
563
impl HasKind for HttpConnectError {
564
    fn kind(&self) -> ErrorKind {
565
        use ErrorKind as EK;
566
        use HttpConnectError as HCE;
567
        match self {
568
            HCE::InvalidStreamTarget(_, _)
569
            | HCE::DuplicateHeader
570
            | HCE::HeaderNotUtf8
571
            | HCE::InvalidFamilyPreference
572
            | HCE::RpcObjectNotFound => EK::LocalProtocolViolation,
573
            HCE::NoRpcSupport => EK::FeatureDisabled,
574
            HCE::ConnectFailed(_, e) => e.kind(),
575
            HCE::Internal(e) => e.kind(),
576
        }
577
    }
578
}
579

            
580
impl HttpConnectError {
581
    /// Return an appropriate HTTP status code for this error.
582
    fn status_code(&self) -> StatusCode {
583
        use HttpConnectError as HCE; // Not a Joyce reference
584
        use StatusCode as SC;
585
        if let Some(end_reason) = self.remote_end_reason() {
586
            return end_reason_to_http_status(end_reason);
587
        }
588
        match self {
589
            HCE::InvalidStreamTarget(_, _)
590
            | HCE::DuplicateHeader
591
            | HCE::HeaderNotUtf8
592
            | HCE::InvalidFamilyPreference
593
            | HCE::RpcObjectNotFound
594
            | HCE::NoRpcSupport => SC::BAD_REQUEST,
595
            HCE::ConnectFailed(_, e) => e.kind().http_status_code(),
596
            HCE::Internal(e) => e.kind().http_status_code(),
597
        }
598
    }
599

            
600
    /// If possible, return a response that we should give to this error.
601
    fn try_into_response(self) -> Result<Response<Body>, HttpConnectError> {
602
        let error_kind = self.kind();
603
        let end_reason = self.remote_end_reason();
604
        let status_code = self.status_code();
605
        let mut request_failed = format!("arti/{error_kind:?}");
606
        if let Some(end_reason) = end_reason {
607
            request_failed.push_str(&format!(" end/{end_reason}"));
608
        }
609

            
610
        ResponseBuilder::new()
611
            .status(status_code)
612
            .header(hdr::TOR_REQUEST_FAILED, request_failed)
613
            .err(&Method::CONNECT, self.report().to_string())
614
    }
615

            
616
    /// Return the end reason for this error, if this error does in fact represent an END message
617
    /// from the remote side of a stream.
618
    //
619
    // TODO: This function is a bit fragile; it forces us to use APIs from tor-proto and
620
    // tor-cell that are not re-exported from arti-client.  It also relies on the fact that
621
    // there is a single error type way down in `tor-proto` representing a received END message.
622
    fn remote_end_reason(&self) -> Option<tor_cell::relaycell::msg::EndReason> {
623
        use tor_proto::Error::EndReceived;
624
        if let Some(EndReceived(reason)) = super::extract_proto_err(self) {
625
            Some(*reason)
626
        } else {
627
            None
628
        }
629
    }
630
}
631

            
632
/// Return the appropriate HTTP status code for a remote END reason.
633
///
634
/// Return `None` if the END reason is unrecognized and we should use the `ErrorKind`
635
///
636
/// (We  _could_ use the ErrorKind unconditionally,
637
/// but the mapping from END reason to ErrorKind is [given in the spec][spec],
638
/// so we try to obey it.)
639
///
640
/// [spec]: https://spec.torproject.org/http-connect.html#error-codes
641
fn end_reason_to_http_status(end_reason: tor_cell::relaycell::msg::EndReason) -> StatusCode {
642
    use StatusCode as S;
643
    use tor_cell::relaycell::msg::EndReason as R;
644
    match end_reason {
645
        //
646
        R::CONNECTREFUSED => S::FORBIDDEN, // 403
647
        // 500: Internal server error.
648
        R::MISC | R::NOTDIRECTORY => S::INTERNAL_SERVER_ERROR,
649

            
650
        // 502: Bad Gateway.
651
        R::DESTROY | R::DONE | R::HIBERNATING | R::INTERNAL | R::RESOURCELIMIT | R::TORPROTOCOL => {
652
            S::BAD_GATEWAY
653
        }
654
        // 503: Service unavailable
655
        R::CONNRESET | R::EXITPOLICY | R::NOROUTE | R::RESOLVEFAILED => S::SERVICE_UNAVAILABLE,
656

            
657
        // 504: Gateway timeout.
658
        R::TIMEOUT => S::GATEWAY_TIMEOUT,
659

            
660
        // This is possible if the other side sent an unrecognized error code.
661
        _ => S::INTERNAL_SERVER_ERROR, // 500
662
    }
663
}
664

            
665
/// Recover the original stream from a [`hyper::upgrade::Upgraded`].
666
fn deconstruct_upgrade<S>(upgraded: hyper::upgrade::Upgraded) -> Result<BufReader<S>, anyhow::Error>
667
where
668
    S: AsyncRead + AsyncWrite + Unpin + 'static,
669
{
670
    let parts: hyper::upgrade::Parts<FuturesIoCompat<BufReader<S>>> = upgraded
671
        .downcast()
672
        .map_err(|_| anyhow!("downcast failed!"))?;
673
    let hyper::upgrade::Parts { io, read_buf, .. } = parts;
674
    if !read_buf.is_empty() {
675
        // TODO Figure out whether this can happen, due to possible race conditions if the client
676
        // gets the OK before we check this?.
677
        return Err(anyhow!(
678
            "Extraneous data on hyper buffer after upgrade to proxy mode"
679
        ));
680
    }
681
    let io: BufReader<S> = io.0;
682
    Ok(io)
683
}
684

            
685
/// Recover the application stream from `request`, and launch tasks to transfer data between the application and
686
/// the `tor_stream`.
687
async fn transfer<S>(request: Request, tor_stream: arti_client::DataStream) -> anyhow::Result<()>
688
where
689
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
690
{
691
    let upgraded = hyper::upgrade::on(request)
692
        .await
693
        .context("Unable to upgrade connection")?;
694
    let app_stream: BufReader<S> = deconstruct_upgrade(upgraded)?;
695
    let tor_stream = BufReader::with_capacity(super::APP_STREAM_BUF_LEN, tor_stream);
696

            
697
    // Finally. relay traffic between
698
    // the application stream and the tor stream, forever.
699
    let _ = futures_copy::copy_buf_bidirectional(
700
        app_stream,
701
        tor_stream,
702
        futures_copy::eof::Close,
703
        futures_copy::eof::Close,
704
    )
705
    .await?;
706

            
707
    Ok(())
708
}
709

            
710
/// Return true if `host` is a possible value for a Host header addressing localhost.
///
/// Accepted forms are a loopback IP address, with or without a port, and the
/// name `localhost` (in any case), alone or followed by `:` and a nonzero port.
fn host_is_localhost(host: &str) -> bool {
    use std::net::{IpAddr, SocketAddr};
    use std::num::NonZeroU16;

    // "address:port", including the bracketed IPv6 form.
    if let Ok(sockaddr) = host.parse::<SocketAddr>() {
        return sockaddr.ip().is_loopback();
    }
    // A bare IP address.
    if let Ok(ip) = host.parse::<IpAddr>() {
        return ip.is_loopback();
    }
    // Otherwise this must be a hostname, possibly followed by a port.
    match host.split_once(':') {
        Some((name, port)) => {
            name.eq_ignore_ascii_case("localhost") && port.parse::<NonZeroU16>().is_ok()
        }
        None => host.eq_ignore_ascii_case("localhost"),
    }
}
722

            
723
/// Helper module: Make `futures` types usable by `hyper`.
724
//
725
// TODO: We may want to expose this as a separate crate, or move it into tor-async-utils,
726
// if we turn out to need it elsewhere.
727
mod hyper_futures_io {
728
    use pin_project::pin_project;
729
    use std::{
730
        io,
731
        pin::Pin,
732
        task::{Context, Poll, ready},
733
    };
734

            
735
    use hyper::rt::ReadBufCursor;
736

            
737
    /// A wrapper around an AsyncBufRead + AsyncWrite to implement traits required by hyper.
738
    #[derive(Debug)]
739
    #[pin_project]
740
    pub(super) struct FuturesIoCompat<T>(#[pin] pub(super) T);
741

            
742
    impl<T> hyper::rt::Read for FuturesIoCompat<T>
743
    where
744
        // We require AsyncBufRead here it is a good match for ReadBufCursor::put_slice.
745
        T: futures::io::AsyncBufRead,
746
    {
747
16
        fn poll_read(
748
16
            self: Pin<&mut Self>,
749
16
            cx: &mut Context<'_>,
750
16
            mut buf: ReadBufCursor<'_>,
751
16
        ) -> Poll<Result<(), io::Error>> {
752
16
            let mut this = self.project();
753

            
754
16
            let available: &[u8] = ready!(this.0.as_mut().poll_fill_buf(cx))?;
755
8
            let n_available = available.len();
756

            
757
8
            if !available.is_empty() {
758
8
                buf.put_slice(available);
759
8
                this.0.consume(n_available);
760
8
            }
761

            
762
            // This means either "data arrived" or "EOF" depending on whether we added new bytes.
763
8
            Poll::Ready(Ok(()))
764
16
        }
765
    }
766

            
767
    impl<T> hyper::rt::Write for FuturesIoCompat<T>
768
    where
769
        T: futures::io::AsyncWrite,
770
    {
771
4
        fn poll_write(
772
4
            self: Pin<&mut Self>,
773
4
            cx: &mut Context<'_>,
774
4
            buf: &[u8],
775
4
        ) -> Poll<Result<usize, std::io::Error>> {
776
4
            self.project().0.poll_write(cx, buf)
777
4
        }
778

            
779
4
        fn poll_flush(
780
4
            self: Pin<&mut Self>,
781
4
            cx: &mut Context<'_>,
782
4
        ) -> Poll<Result<(), std::io::Error>> {
783
4
            self.project().0.poll_flush(cx)
784
4
        }
785

            
786
4
        fn poll_shutdown(
787
4
            self: Pin<&mut Self>,
788
4
            cx: &mut Context<'_>,
789
4
        ) -> Poll<Result<(), std::io::Error>> {
790
4
            self.project().0.poll_close(cx)
791
4
        }
792
    }
793
}
794

            
795
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use arti_client::{BootstrapBehavior, TorClient, config::TorClientConfigBuilder};
    use futures::{AsyncReadExt as _, AsyncWriteExt as _};
    use tor_rtmock::{MockRuntime, io::stream_pair};

    use super::*;

    // Make sure that HeaderMap is case-insensitive as the documentation implies.
    #[test]
    fn headermap_casei() {
        use http::header::{HeaderMap, HeaderValue};

        let mut headers = HeaderMap::new();
        headers.append(
            "my-head-is-a-house-for",
            HeaderValue::from_str("a-secret").unwrap(),
        );

        // Look the header up under two spellings that differ from the stored one only in case.
        for spelling in ["My-Head-Is-A-House-For", "MY-HEAD-IS-A-HOUSE-FOR"] {
            assert_eq!(headers.get(spelling).unwrap().as_bytes(), b"a-secret");
        }
    }

    #[test]
    fn host_header_localhost() {
        // Every one of these should be treated as addressing localhost.
        let accepted = [
            "localhost",
            "localhost:9999",
            "localHOSt:9999",
            "127.0.0.1:9999",
            "[::1]:9999",
            "127.1.2.3:1234",
            "127.0.0.1",
            "::1",
        ];
        for host in accepted {
            assert!(host_is_localhost(host), "{host:?} should be accepted");
        }

        // None of these should be.
        let rejected = [
            "[::1]", // not in the right format!
            "www.torproject.org",
            "www.torproject.org:1234",
            "localhost:0",
            "localhost:999999",
            "plocalhost:1234",
            "[::0]:1234",
            "192.0.2.55:1234",
            "3fff::1",
            "[3fff::1]:1234",
        ];
        for host in rejected {
            assert!(!host_is_localhost(host), "{host:?} should be rejected");
        }
    }

    /// Start an HTTP handler on one end of an in-memory stream pair.
    ///
    /// Returns the other end of the pair (for the test to talk to), a future yielding
    /// the handler's outcome, and the temporary directory backing the client's
    /// state and cache (which the caller must keep alive for the duration of the test).
    fn interactive_test_setup(
        rt: &MockRuntime,
    ) -> anyhow::Result<(
        tor_rtmock::io::LocalStream,
        impl Future<Output = anyhow::Result<()>>,
        tempfile::TempDir,
    )> {
        let (handler_end, test_end) = stream_pair();
        let handler_end: BufReader<_> = BufReader::new(handler_end);

        let isolation: ListenerIsolation = (7, "127.0.0.1".parse().unwrap());

        let tmp = tempfile::TempDir::new().unwrap();
        let state_dir = tmp.as_ref().join("state");
        let cache_dir = tmp.as_ref().join("cache");
        let config = TorClientConfigBuilder::from_directories(state_dir, cache_dir)
            .build()
            .unwrap();

        // The client is never bootstrapped: these tests do not go through the network.
        let tor_client = TorClient::with_runtime(rt.clone())
            .config(config)
            .bootstrap_behavior(BootstrapBehavior::Manual)
            .create_unbootstrapped()?;
        let context: ProxyContext<_> = ProxyContext {
            tor_client,
            #[cfg(feature = "rpc")]
            rpc_mgr: None,
        };

        let handler = rt.spawn_join(
            "HTTP Handler",
            handle_http_conn(context, handler_end, isolation),
        );
        Ok((test_end, handler, tmp))
    }

    #[test]
    fn successful_options_test() -> anyhow::Result<()> {
        // Try an OPTIONS request and make sure we get a plausible-looking answer.
        //
        // (This test is mostly here to make sure that invalid_host_test() isn't failing because
        // of anything besides the Host header.)
        MockRuntime::try_test_with_various(async |rt| -> anyhow::Result<()> {
            let (mut stream, handler, _dir) = interactive_test_setup(&rt)?;

            stream
                .write_all(b"OPTIONS * HTTP/1.0\r\nHost: localhost\r\n\r\n")
                .await?;

            let mut response = Vec::new();
            let _n_read = stream.read_to_end(&mut response).await?;
            // The handler itself must have finished without an error.
            let () = handler.await?;

            let reply = std::str::from_utf8(&response)?;
            assert!(dbg!(reply).starts_with("HTTP/1.0 200 OK\r\n"));

            Ok(())
        })
    }

    #[test]
    fn invalid_host_test() -> anyhow::Result<()> {
        // Try a hostname that looks like a CSRF attempt and make sure that we discard it without
        // any reply.
        MockRuntime::try_test_with_various(async |rt| -> anyhow::Result<()> {
            let (mut stream, handler, _dir) = interactive_test_setup(&rt)?;

            stream
                .write_all(b"OPTIONS * HTTP/1.0\r\nHost: csrf.example.com\r\n\r\n")
                .await?;

            let mut response = Vec::new();
            let n_read = stream.read_to_end(&mut response).await?;
            let http_outcome = handler.await;

            // Nothing at all may have been sent back...
            assert_eq!(n_read, 0);
            assert!(response.is_empty());
            // ...and the handler must have reported a failure.
            assert!(http_outcome.is_err());

            let failure = http_outcome.unwrap_err();
            let error_msg = failure.source().unwrap().to_string();
            assert_eq!(
                error_msg,
                r#"Host header "csrf.example.com" was not localhost. Rejecting request."#
            );

            Ok(())
        })
    }
}