1
//! A simple reverse-proxy implementation for onion services.
2

            
3
use std::sync::{Arc, Mutex};
4

            
5
use futures::io::BufReader;
6
use futures::{
7
    AsyncRead, AsyncWrite, Future, FutureExt as _, Stream, StreamExt as _, select_biased,
8
};
9
use itertools::iproduct;
10
use oneshot_fused_workaround as oneshot;
11
use safelog::sensitive as sv;
12
use std::collections::HashMap;
13
use std::io::Error as IoError;
14
use strum::IntoEnumIterator;
15
use tor_cell::relaycell::msg as relaymsg;
16
use tor_error::{ErrorKind, HasKind, debug_report};
17
use tor_hsservice::{HsNickname, RendRequest, StreamRequest};
18
use tor_log_ratelim::log_ratelim;
19
use tor_proto::client::stream::DataStream;
20
use tor_proto::stream::IncomingStreamRequest;
21
use tor_rtcompat::{Runtime, SpawnExt as _};
22

            
23
use crate::config::{
24
    Encapsulation, ProxyAction, ProxyActionDiscriminants, ProxyConfig, TargetAddr,
25
};
26

            
27
/// A reverse proxy that handles connections from an `OnionService` by routing
28
/// them to local addresses.
29
#[derive(Debug)]
30
pub struct OnionServiceReverseProxy {
31
    /// Mutable state held by this reverse proxy.
32
    state: Mutex<State>,
33
}
34

            
35
/// Mutable part of an RProxy
36
#[derive(Debug)]
37
struct State {
38
    /// The current configuration for this reverse proxy.
39
    config: ProxyConfig,
40
    /// A sender that we'll drop when it's time to shut down this proxy.
41
    shutdown_tx: Option<oneshot::Sender<void::Void>>,
42
    /// A receiver that we'll use to monitor for shutdown signals.
43
    shutdown_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
44
}
45

            
46
/// An error that prevents further progress while processing requests.
47
#[derive(Clone, Debug, thiserror::Error)]
48
#[non_exhaustive]
49
pub enum HandleRequestsError {
50
    /// The runtime says it was unable to spawn a task.
51
    #[error("Unable to spawn a task")]
52
    Spawn(#[source] Arc<futures::task::SpawnError>),
53
}
54

            
55
impl HasKind for HandleRequestsError {
56
    fn kind(&self) -> ErrorKind {
57
        match self {
58
            HandleRequestsError::Spawn(e) => e.kind(),
59
        }
60
    }
61
}
62

            
63
impl OnionServiceReverseProxy {
64
    /// Create a new proxy with a given configuration.
65
    pub fn new(config: ProxyConfig) -> Arc<Self> {
66
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
67
        Arc::new(Self {
68
            state: Mutex::new(State {
69
                config,
70
                shutdown_tx: Some(shutdown_tx),
71
                shutdown_rx: shutdown_rx.shared(),
72
            }),
73
        })
74
    }
75

            
76
    /// Try to change the configuration of this proxy.
77
    ///
78
    /// This change applies only to new connections through the proxy; existing
79
    /// connections are not affected.
80
    pub fn reconfigure(
81
        &self,
82
        config: ProxyConfig,
83
        how: tor_config::Reconfigure,
84
    ) -> Result<(), tor_config::ReconfigureError> {
85
        if how == tor_config::Reconfigure::CheckAllOrNothing {
86
            // Every possible reconfiguration is allowed.
87
            return Ok(());
88
        }
89
        let mut state = self.state.lock().expect("poisoned lock");
90
        state.config = config;
91
        // Note: we don't need to use a postage::watch here, since we just want
92
        // to lock this configuration whenever we get a request.  We could use a
93
        // Mutex<Arc<>> instead, but the performance shouldn't matter.
94
        //
95
        Ok(())
96
    }
97

            
98
    /// Shut down all request-handlers running using with this proxy.
99
    pub fn shutdown(&self) {
100
        let mut state = self.state.lock().expect("poisoned lock");
101
        let _ = state.shutdown_tx.take();
102
    }
103

            
104
    /// Use this proxy to handle a stream of [`RendRequest`]s.
105
    ///
106
    /// The future returned by this function blocks indefinitely, so you may
107
    /// want to spawn a separate task for it.
108
    ///
109
    /// The provided nickname is used for logging.
110
    pub async fn handle_requests<R, S>(
111
        &self,
112
        runtime: R,
113
        nickname: HsNickname,
114
        requests: S,
115
    ) -> Result<(), HandleRequestsError>
116
    where
117
        R: Runtime,
118
        S: Stream<Item = RendRequest> + Unpin,
119
    {
120
        let mut stream_requests = tor_hsservice::handle_rend_requests(requests).fuse();
121
        let mut shutdown_rx = self
122
            .state
123
            .lock()
124
            .expect("poisoned lock")
125
            .shutdown_rx
126
            .clone()
127
            .fuse();
128
        let nickname = Arc::new(nickname);
129

            
130
        /// Which of the three counters for each action
131
        #[cfg(feature = "metrics")]
132
        #[derive(Clone, Copy, Eq, PartialEq, Hash)]
133
        enum CounterSelector {
134
            /// Two counters, one for successes, one for failures
135
            Ret(Result<(), ()>),
136
            /// One counter for the total
137
            Total,
138
        }
139

            
140
        #[cfg(feature = "metrics")]
141
        let metrics_counters = {
142
            use CounterSelector as CS;
143

            
144
            let counters = iproduct!(
145
                ProxyActionDiscriminants::iter(),
146
                [
147
                    (CS::Total, "arti_hss_proxy_connections_total"),
148
                    (CS::Ret(Ok(())), "arti_hss_proxy_connections_ok_total"),
149
                    (CS::Ret(Err(())), "arti_hss_proxy_connections_failed_total"),
150
                ],
151
            )
152
            .map(|(action, (outcome, name))| {
153
                let k = (action, outcome);
154
                let nickname = nickname.to_string();
155
                let action: &str = action.into();
156
                let v = metrics::counter!(name, "nickname" => nickname, "action" => action);
157
                (k, v)
158
            })
159
            .collect::<HashMap<(ProxyActionDiscriminants, CounterSelector), _>>();
160

            
161
            Arc::new(counters)
162
        };
163

            
164
        loop {
165
            let stream_request = select_biased! {
166
                _ = shutdown_rx => return Ok(()),
167
                stream_request = stream_requests.next() => match stream_request {
168
                    None => return Ok(()),
169
                    Some(s) => s,
170
                }
171
            };
172

            
173
            runtime.spawn({
174
                let action = self.choose_action(stream_request.request());
175
                let runtime = runtime.clone();
176
                let nickname = nickname.clone();
177
                let req = stream_request.request().clone();
178

            
179
                #[cfg(feature = "metrics")]
180
                let metrics_counters = metrics_counters.clone();
181

            
182
                async move {
183
                    let outcome =
184
                        run_action(runtime, nickname.as_ref(), action.clone(), stream_request).await;
185

            
186
                    #[cfg(feature = "metrics")]
187
                    {
188
                        use CounterSelector as CS;
189

            
190
                        let action = ProxyActionDiscriminants::from(&action);
191
                        let outcome = outcome.as_ref().map(|_|()).map_err(|_|());
192
                        for outcome in [CS::Total, CS::Ret(outcome)] {
193
                            if let Some(counter) = metrics_counters.get(&(action, outcome)) {
194
                                counter.increment(1);
195
                            } else {
196
                                // statically be impossible, but let's not panic
197
                            }
198
                        }
199
                    }
200

            
201
                    log_ratelim!(
202
                        "Performing action on {}", nickname;
203
                        outcome;
204
                        Err(_) => WARN, "Unable to take action {:?} for request {:?}", sv(action), sv(req)
205
                    );
206
                }
207
            })
208
                .map_err(|e| HandleRequestsError::Spawn(Arc::new(e)))?;
209
        }
210
    }
211

            
212
    /// Choose the configured action that we should take in response to a
213
    /// [`StreamRequest`], based on our current configuration.
214
    fn choose_action(&self, stream_request: &IncomingStreamRequest) -> ProxyAction {
215
        let port: u16 = match stream_request {
216
            IncomingStreamRequest::Begin(begin) => {
217
                // The C tor implementation deliberately ignores the address and
218
                // flags on the BEGIN message, so we do too.
219
                begin.port()
220
            }
221
            other => {
222
                tracing::warn!(
223
                    "Rejecting onion service request for invalid command {:?}. Internal error.",
224
                    other
225
                );
226
                return ProxyAction::DestroyCircuit;
227
            }
228
        };
229

            
230
        self.state
231
            .lock()
232
            .expect("poisoned lock")
233
            .config
234
            .resolve_port_for_begin(port)
235
            .cloned()
236
            // The default action is "destroy the circuit."
237
            .unwrap_or(ProxyAction::DestroyCircuit)
238
    }
239
}
240

            
241
/// Take the configured action from `action` on the incoming request `request`.
242
async fn run_action<R: Runtime>(
243
    runtime: R,
244
    nickname: &HsNickname,
245
    action: ProxyAction,
246
    request: StreamRequest,
247
) -> Result<(), RequestFailed> {
248
    match action {
249
        ProxyAction::DestroyCircuit => {
250
            request
251
                .shutdown_circuit()
252
                .map_err(RequestFailed::CantDestroy)?;
253
        }
254
        ProxyAction::Forward(encap, target) => match (encap, target) {
255
            (Encapsulation::Simple, ref addr @ TargetAddr::Inet(a)) => {
256
                let rt_clone = runtime.clone();
257

            
258
                // We don't use any custom options on the socket.
259
                let connect_options = Default::default();
260
                let stream = runtime.connect(&a, &connect_options);
261

            
262
                forward_connection(rt_clone, request, stream, nickname, addr).await?;
263
            } /* TODO (#1246)
264
                (Encapsulation::Simple, TargetAddr::Unix(_)) => {
265
                    // TODO: We need to implement unix connections.
266
                }
267
              */
268
        },
269
        ProxyAction::RejectStream => {
270
            // C tor sends DONE in this case, so we do too.
271
            let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE);
272

            
273
            request
274
                .reject(end)
275
                .await
276
                .map_err(RequestFailed::CantReject)?;
277
        }
278
        ProxyAction::IgnoreStream => drop(request),
279
    };
280
    Ok(())
281
}
282

            
283
/// An error from a single attempt to handle an onion service request.
284
#[derive(thiserror::Error, Debug, Clone)]
285
enum RequestFailed {
286
    /// Encountered an error trying to destroy a circuit.
287
    #[error("Unable to destroy onion service circuit")]
288
    CantDestroy(#[source] tor_error::Bug),
289

            
290
    /// Encountered an error trying to reject a single stream request.
291
    #[error("Unable to reject onion service request")]
292
    CantReject(#[source] tor_hsservice::ClientError),
293

            
294
    /// Encountered an error trying to tell the remote onion service client that
295
    /// we have accepted their connection.
296
    #[error("Unable to accept onion service connection")]
297
    AcceptRemote(#[source] tor_hsservice::ClientError),
298

            
299
    /// The runtime refused to spawn a task for us.
300
    #[error("Unable to spawn task")]
301
    Spawn(#[source] Arc<futures::task::SpawnError>),
302
}
303

            
304
impl HasKind for RequestFailed {
305
    fn kind(&self) -> ErrorKind {
306
        match self {
307
            RequestFailed::CantDestroy(e) => e.kind(),
308
            RequestFailed::CantReject(e) => e.kind(),
309
            RequestFailed::AcceptRemote(e) => e.kind(),
310
            RequestFailed::Spawn(e) => e.kind(),
311
        }
312
    }
313
}
314

            
315
/// Size of buffer to use for communication between Arti and the
316
/// target service.
317
//
318
// This particular value is chosen more or less arbitrarily.
319
// Larger values let us do fewer reads from the application,
320
// but consume more memory.
321
//
322
// (The default value for BufReader is 8k as of this writing.)
323
const STREAM_BUF_LEN: usize = 4096;
324

            
325
/// Try to open a connection to an appropriate local target using
326
/// `target_stream_future`.  If successful, try to report success on `request`
327
/// and transmit data between the two stream indefinitely.  On failure, close
328
/// `request`.
329
///
330
/// Only return an error if we were unable to behave as intended due to a
331
/// problem we did not already report.
332
async fn forward_connection<R, FUT, TS>(
333
    runtime: R,
334
    request: StreamRequest,
335
    target_stream_future: FUT,
336
    nickname: &HsNickname,
337
    addr: &TargetAddr,
338
) -> Result<(), RequestFailed>
339
where
340
    R: Runtime,
341
    FUT: Future<Output = Result<TS, IoError>>,
342
    TS: AsyncRead + AsyncWrite + Send + 'static,
343
{
344
    let local_stream = target_stream_future.await.map_err(Arc::new);
345

            
346
    // TODO: change this to "log_ratelim!(nickname=%nickname, ..." when log_ratelim can do that
347
    // (we should search for HSS log messages and make them all be in the same form)
348
    log_ratelim!(
349
        "Connecting to {} for onion service {}", sv(addr), nickname;
350
        local_stream
351
    );
352

            
353
    let local_stream = match local_stream {
354
        Ok(s) => s,
355
        Err(_) => {
356
            let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE);
357
            if let Err(e_rejecting) = request.reject(end).await {
358
                debug_report!(
359
                    &e_rejecting,
360
                    "Unable to reject onion service request from client"
361
                );
362
                return Err(RequestFailed::CantReject(e_rejecting));
363
            }
364
            // We reported the (rate-limited) error from local_stream in
365
            // DEBUG_REPORT above.
366
            return Ok(());
367
        }
368
    };
369

            
370
    let onion_service_stream: DataStream = {
371
        let connected = relaymsg::Connected::new_empty();
372
        request
373
            .accept(connected)
374
            .await
375
            .map_err(RequestFailed::AcceptRemote)?
376
    };
377

            
378
    let onion_service_stream = BufReader::with_capacity(STREAM_BUF_LEN, onion_service_stream);
379
    let local_stream = BufReader::with_capacity(STREAM_BUF_LEN, local_stream);
380

            
381
    runtime
382
        .spawn(
383
            futures_copy::copy_buf_bidirectional(
384
                onion_service_stream,
385
                local_stream,
386
                futures_copy::eof::Close,
387
                futures_copy::eof::Close,
388
            )
389
            .map(|_| ()),
390
        )
391
        .map_err(|e| RequestFailed::Spawn(Arc::new(e)))?;
392

            
393
    Ok(())
394
}