1
//! Declarations for traits that we need our runtimes to implement.
2
use async_trait::async_trait;
3
use asynchronous_codec::Framed;
4
use futures::future::{FutureExt, RemoteHandle};
5
use futures::stream;
6
use futures::task::{Spawn, SpawnError};
7
use futures::{AsyncRead, AsyncWrite, Future};
8
use std::borrow::Cow;
9
use std::fmt::Debug;
10
use std::io::{self, Result as IoResult};
11
use std::net;
12
use std::time::{Duration, Instant, SystemTime};
13
use tor_general_addr::unix;
14

            
15
#[cfg(feature = "tls-server")]
16
use tor_cert_x509::TlsKeyAndCert;
17

            
18
/// A runtime for use by Tor client library code.
19
///
20
/// This trait comprises several other traits that we require all of our
21
/// runtimes to provide:
22
///
23
/// * [`futures::task::Spawn`] or [`SpawnExt`] to launch new background tasks.
24
/// * [`SleepProvider`] to pause a task for a given amount of time.
25
/// * [`CoarseTimeProvider`] for a cheaper but less accurate notion of time.
26
/// * [`NetStreamProvider`] to launch and accept network connections.
27
/// * [`TlsProvider`] to launch TLS connections.
28
/// * [`Blocking`] to be able to run synchronous (cpubound or IO) code,
29
///   and *re*-enter the async context from synchronous thread
30
///   (This may become optional in the future, if/when we add WASM
31
///   support).
32
///
33
/// A value which is only `Runtime` cannot be used as an *entry point* to the runtime.
34
/// For that, it must also implement [`ToplevelBlockOn`],
35
/// making it a [`ToplevelRuntime`].
36
/// Since you can only [enter a runtime](ToplevelBlockOn::block_on) once,
37
/// typically you use a `ToplevelRuntime` to enter the runtime,
38
/// and use it as only a `Runtime` afterwards.
39
/// This means that library code should typically
40
/// deal with `Runtime` rather than `ToplevelRuntime`.
41
///
42
/// We require that every `Runtime` has an efficient [`Clone`] implementation
43
/// that gives a new opaque reference to the same underlying runtime.
44
///
45
/// Additionally, every `Runtime` is [`Send`] and [`Sync`], though these
46
/// requirements may be somewhat relaxed in the future.
47
///
48
/// At some future point,
49
/// Arti may require that the runtime `impl<S> TlsProvider<S>` (for suitable`S`),
50
/// rather than just for their own `TcpStream`s.
51
/// I.e., Arti may start to require that the runtime's TLS provider can wrap any streams,
52
/// not only the runtime's own TCP streams.
53
/// This might be expressed as an additional supertrait bound on `Runtime`,
54
/// eg when Rust supports GATs,
55
/// or as an additional bound on the Arti APIs that currently use `Runtime`.
56
/// For API future compatibility, if you `impl Runtime for MyRuntime`,
57
/// you should also ensure that you
58
/// ```ignore
59
/// impl<S> TlsProvider<S> for MyRuntime
60
/// where S: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static
61
/// ```
62
//
63
/// Perhaps we will need this if we make our own TLS connections *through* Tor,
64
/// rather than just channels to guards.
65
pub trait Runtime:
66
    Sync
67
    + Send
68
    + Spawn
69
    + Blocking
70
    + Clone
71
    + SleepProvider
72
    + CoarseTimeProvider
73
    + NetStreamProvider<net::SocketAddr>
74
    + NetStreamProvider<unix::SocketAddr>
75
    + TlsProvider<<Self as NetStreamProvider<net::SocketAddr>>::Stream>
76
    + UdpProvider
77
    + Debug
78
    + 'static
79
{
80
}
81

            
82
impl<T> Runtime for T where
83
    T: Sync
84
        + Send
85
        + Spawn
86
        + Blocking
87
        + Clone
88
        + SleepProvider
89
        + CoarseTimeProvider
90
        + NetStreamProvider<net::SocketAddr>
91
        + NetStreamProvider<unix::SocketAddr>
92
        + TlsProvider<<Self as NetStreamProvider<net::SocketAddr>>::Stream>
93
        + UdpProvider
94
        + Debug
95
        + 'static
96
{
97
}
98

            
99
/// A runtime that we can use to run Tor as a client.
100
/// * [`ToplevelBlockOn`] to block on a top-level future and run it to completion
101
///   (This may become optional in the future, if/when we add WASM
102
///   support).
103
///
104
pub trait ToplevelRuntime: Runtime + ToplevelBlockOn {}
105
impl<T: Runtime + ToplevelBlockOn> ToplevelRuntime for T {}
106

            
107
/// Trait for a runtime that can wait until a timer has expired.
108
///
109
/// Every `SleepProvider` also implements
110
/// [`SleepProviderExt`](crate::SleepProviderExt); see that trait
111
/// for other useful functions.
112
pub trait SleepProvider: Clone + Send + Sync + 'static {
113
    /// A future returned by [`SleepProvider::sleep()`]
114
    type SleepFuture: Future<Output = ()> + Send + 'static;
115
    /// Return a future that will be ready after `duration` has
116
    /// elapsed.
117
    #[must_use = "sleep() returns a future, which does nothing unless used"]
118
    fn sleep(&self, duration: Duration) -> Self::SleepFuture;
119

            
120
    /// Return the SleepProvider's view of the current instant.
121
    ///
122
    /// (This is the same as `Instant::now`, if not running in test mode.)
123
54868
    fn now(&self) -> Instant {
124
54868
        Instant::now()
125
54868
    }
126

            
127
    /// Return the SleepProvider's view of the current wall-clock time.
128
    ///
129
    /// (This is the same as `SystemTime::now`, if not running in test mode.)
130
29498
    fn wallclock(&self) -> SystemTime {
131
29498
        SystemTime::now()
132
29498
    }
133

            
134
    /// Signify that a test running under mock time shouldn't advance time yet, with a given
135
    /// unique reason string. This is useful for making sure (mock) time doesn't advance while
136
    /// things that might require some (real-world) time to complete do so, such as spawning a task
137
    /// on another thread.
138
    ///
139
    /// Call `release_advance` with the same reason string in order to unblock.
140
    ///
141
    /// This method is only for testing: it should never have any
142
    /// effect when invoked on non-testing runtimes.
143
16
    fn block_advance<T: Into<String>>(&self, _reason: T) {}
144

            
145
    /// Signify that the reason to withhold time advancing provided in a call to `block_advance` no
146
    /// longer exists, and it's fine to move time forward if nothing else is blocking advances.
147
    ///
148
    /// This method is only for testing: it should never have any
149
    /// effect when invoked on non-testing runtimes.
150
    fn release_advance<T: Into<String>>(&self, _reason: T) {}
151

            
152
    /// Allow a test running under mock time to advance time by the provided duration, even if the
153
    /// above `block_advance` API has been used.
154
    ///
155
    /// This method is only for testing: it should never have any
156
    /// effect when invoked on non-testing runtimes.
157
1272
    fn allow_one_advance(&self, _dur: Duration) {}
158
}
159

            
160
/// A provider of reduced-precision timestamps
161
///
162
/// This doesn't provide any facility for sleeping.
163
/// If you want to sleep based on reduced-precision timestamps,
164
/// convert the desired sleep duration to `std::time::Duration`
165
/// and use [`SleepProvider`].
166
pub trait CoarseTimeProvider: Clone + Send + Sync + 'static {
167
    /// Return the `CoarseTimeProvider`'s view of the current instant.
168
    ///
169
    /// This is supposed to be cheaper than `std::time::Instant::now`.
170
    fn now_coarse(&self) -> crate::coarse_time::CoarseInstant;
171
}
172

            
173
/// Trait for a runtime that can be entered to block on a toplevel future.
174
///
175
/// This trait is *not* implied by `Runtime`, only by `ToplevelRuntime`.
176
/// `ToplevelRuntime` is available at the toplevel of each program,
177
/// typically, where a concrete async executor is selected.
178
pub trait ToplevelBlockOn: Clone + Send + Sync + 'static {
179
    /// Run `future` until it is ready, and return its output.
180
    ///
181
    /// # Not reentrant!
182
    ///
183
    /// There should be one call to `block_on` (for each fresh `Runtime`),
184
    /// at the toplevel of the program (or test case).
185
    /// (Sequential calls to `block_on` from the same thread are allowed.)
186
    ///
187
    /// `block_on` may not function correctly if is called
188
    /// from multiple threads simultaneously,
189
    /// or if calls involving different `Runtime`s are interleaved on the same thread.
190
    /// (Specific runtimes may offer better guarantees.)
191
    ///
192
    /// (`tor_rtmock::MockExecutor`'s implementation will often detect violations.)
193
    fn block_on<F: Future>(&self, future: F) -> F::Output;
194
}
195

            
196
/// Support for interacting with blocking (non-async) code
197
///
198
/// This supports two use cases: blocking IO and CPU-intensive activities.
199
/// (In both of these cases, simply calling the functions within an `async` task
200
/// is a bad idea, because that can block the whole async runtime.)
201
///
202
/// ### Blocking IO
203
///
204
/// `Blocking` can be used to interact with libraries or OS primitives
205
/// that only offer a synchronous, blocking, interface.
206
///
207
/// Use [`spawn_blocking`](Blocking::spawn_blocking)
208
/// when it is convenient to have a long-running thread,
209
/// for these operations.
210
///
211
/// Use [`blocking_io`](Blocking::blocking_io)
212
/// when the blocking code is usually expected to complete quickly,
213
/// and/or you will be switching back and forth a lot
214
/// between sync and async contexts.
215
/// Note that you cannot call back to async code from within `blocking_io`.
216
///
217
/// ### CPU-intensive activities
218
///
219
/// Perform CPU-intensive work, that ought not to block the program's main loop,
220
/// via [`Blocking::spawn_blocking`].
221
///
222
/// `spawn_blocking` does not apply any limiting or prioritisation;
223
/// its threads simply compete for CPU with other threads in the program.
224
/// That must be done by the caller; therefore:
225
///
226
/// **Limit the number of cpu threads** spawned
227
/// in order to limit the total amount of CPU time consumed by any part of the program.
228
/// For example, consider using one CPU thread per Tor Hidden Service.
229
///
230
/// It is most performant to spawn a long-running thread,
231
/// rather than to repeatedly spawn short-lived threads for individual work items.
232
/// This also makes it easier to limit the number of concurrente cpu threads.
233
/// For the same reason, [`Blocking::blocking_io`] should be avoided
234
/// for the CPU-intensive use case.
235
///
236
/// ### Mapping to concrete functions from underlying libraries
237
///
238
/// The semantics of `Blocking` are heavily influenced by Tokio
239
/// and by the desire to be able to use tor-rtmock's `MockExecutor` to test Arti code.
240
///
241
/// | `tor-rtcompat`               | Tokio                 | `MockExecutor`                 |
242
/// |------------------------------|-----------------------|--------------------------------|
243
/// | `ToplevelBlockOn::block_on`  | `Runtime::block_on`   | `ToplevelBlockOn::block_on`    |
244
/// | `Blocking::spawn_blocking`   | `task::spawn_blocking`  | `subthread_spawn`            |
245
/// | `Blocking::reenter_block_on` | `Handle::block_on`    | `subthread_block_on_future`    |
246
/// | `Blocking::blocking_io`      | `block_in_place`      | `subthread_spawn`              |
247
/// | (not available)              | (not implemented)     | `progress_until_stalled` etc.  |
248
///
249
/// Re `block_on`, see also the docs for the underlying implementations in
250
/// [tokio][tokio-threadpool] and
251
/// [async-std][async-std-threadpool].
252
///
253
/// [tokio-threadpool]: https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html
254
/// [async-std-threadpool]: https://docs.rs/async-std/latest/async_std/task/fn.spawn_blocking.html
255
pub trait Blocking: Clone + Send + Sync + 'static {
256
    /// Spawn a thread for blocking IO or CPU-bound work.
257
    ///
258
    /// This is used in two situations:
259
    ///
260
    ///  * To perform blocking IO
261
    ///  * For cpu-intensive work
262
    ///
263
    /// See [`Blocking`]'s trait level docs for advice on choosing
264
    /// between `spawn_blocking` and [`Blocking::blocking_io`].
265
    ///
266
    /// `Blocking::spawn_blocking` is similar to `std::thread::spawn`
267
    /// but also makes any necessary arrangements so that `reenter_block_on`,
268
    /// can be called on the spawned thread.
269
    ///
270
    /// However, `Blocking::spawn_blocking` *does not guarantee*
271
    /// to use a completely fresh thread.
272
    /// The implementation may have a thread pool, allowing it reuse an existing thread.
273
    /// Correspondingly, if a very large number of `Blocking::spawn_blocking` calls,
274
    /// are in progress at once, some of them may block.
275
    /// (For example, the implementation for Tokio uses `tokio::task::spawn_blocking`,
276
    /// which has both of these properties.)
277
    ///
278
    /// ### Typical use of `spawn_blocking`
279
    ///
280
    ///  * Spawn the thread with `SpawnThread::spawn_blocking`.
281
    ///  * On that thread, receive work items from from the async environment
282
    ///    using async inter-task facilities (eg `futures::channel::mpsc::channel`),
283
    ///    called via [`reenter_block_on`](Blocking::reenter_block_on).
284
    ///  * Return answers with async inter-task facilities, calling either
285
    ///    a non-blocking immediate send (eg `[try_send`])
286
    ///    or an async send call via `reneter_block_on`.
287
    ///
288
    /// ### CPU-intensive work
289
    ///
290
    /// Limit the number of CPU-intensive concurrent threads spawned with `spawn_blocking`.
291
    /// See the [trait-level docs](Blocking) for more details.
292
    ///
293
    /// ### Panics
294
    ///
295
    /// `Blocking::spawn_blocking` may only be called from within either:
296
    ///
297
    ///  * A task or future being polled by this `Runtime`; or
298
    ///  * A thread itself spawned with `Blocking::spawn_blocking` on the this runtime.
299
    ///
300
    /// Otherwise it may malfunction or panic.
301
    /// (`tor_rtmock::MockExecutor`'s implementation will usually detect violations.)
302
    ///
303
    /// If `f` panics, `ThreadHandle` will also panic when polled
304
    /// (perhaps using `resume_unwind`).
305
    fn spawn_blocking<F, T>(&self, f: F) -> Self::ThreadHandle<T>
306
    where
307
        F: FnOnce() -> T + Send + 'static,
308
        T: Send + 'static;
309

            
310
    /// Future from [`spawn_blocking`](Self::spawn_blocking)
311
    ///
312
    /// The function callback (`f: F` in [`spawn_blocking`](Self::spawn_blocking)
313
    /// will start to run regardless of whether this future is awaited.
314
    ///
315
    /// Dropping this future doesn't stop the callback; it detaches it:
316
    /// the function will continue to run, but its output can no longer be collected.
317
    type ThreadHandle<T: Send + 'static>: Future<Output = T>;
318

            
319
    /// Block on a future, from within `Blocking::spawn_blocking`
320
    ///
321
    /// Reenters the executor, blocking this thread until `future` is `Ready`.
322
    ///
323
    /// See [`spawn_blocking`](Blocking::spawn_blocking) and
324
    /// [`Blocking`]'s trait-level docs for more details.
325
    ///
326
    /// It is not guaranteed what thread the future will be polled on.
327
    /// In production `Runtime`s, it will usually be the thread calling `reenter_block_on`.
328
    // All existing runtimes other than MockExecutor accept a non-Send future, but
329
    // MockExecutor::subthread_block_on_future does not.
330
    // If this restriction turns out to be awkward, MockExecutor could be changed, with some work.
331
    ///
332
    /// ### Panics
333
    ///
334
    /// Must only be called on a thread made with `Blocking::spawn_blocking`.
335
    /// **Not** allowed within [`blocking_io`](Blocking::blocking_io).
336
    ///
337
    /// Otherwise it may malfunction or panic.
338
    /// (`tor_rtmock::MockExecutor`'s implementation will usually detect violations.)
339
    fn reenter_block_on<F>(&self, future: F) -> F::Output
340
    where
341
        F: Future,
342
        F::Output: Send + 'static;
343

            
344
    /// Perform some blocking IO from an async future
345
    ///
346
    /// Call the blocking function `f`, informing the async executor
347
    /// that we are going to perform blocking IO.
348
    ///
349
    /// This is a usually-faster, but simpler, alternative to [`Blocking::spawn_blocking`].
350
    ///
351
    /// Its API can be more convenient than `spawn_blocking`.
352
    /// `blocking_io` is intended to be more performant than `spawn_blocking`
353
    /// when called repeatedly (ie, when switching quickly between sync and async).
354
    ///
355
    /// See [`Blocking`]'s trait-level docs for more information about
356
    /// the performance properties, and on choosing between `blocking_io`
357
    /// and `spawn_blocking`.
358
    /// (Avoid using `blocking_io` for CPU-intensive work.)
359
    ///
360
    /// ### Limitations
361
    ///
362
    ///  * `f` may **not** call [`Blocking::reenter_block_on`], so:
363
    ///  * `f` cannot execute any futures.
364
    ///    If this is needed, break up `f` into smaller pieces so that the
365
    ///    futures can be awaited outside the call to `blocking_io`,
366
    ///    or use `spawn_blocking` for the whole activity.
367
    ///  * `f` *may* be called on the calling thread when `blocking_io` is called,
368
    ///    on an executor thread when the returned future is polled,
369
    ///    or a different thread.
370
    ///  * Not suitable for CPU-intensive work
371
    ///    (mostly because there is no practical way to ration or limit
372
    ///    the amount of cpu time used).
373
    ///    Use `spawn_blocking` for that.
374
    ///  * Performance better than using `spawn_blocking` each time is not guaranteed.
375
    ///
376
    /// Otherwise the semantics are the same as
377
    /// [`spawn_blocking`](Self::spawn_blocking).
378
    ///
379
    /// ### Panics
380
    ///
381
    /// `Blocking::block_in_place` may only be called from within
382
    /// a task or future being polled by this `Runtime`.
383
    ///
384
    /// Otherwise it may malfunction or panic.
385
    /// (`tor_rtmock::MockExecutor`'s implementation will usually detect violations.)
386
    ///
387
    /// ### Fallback (provided) implementation
388
    ///
389
    /// The fallback implementation is currently used with `async_std`.
390
    /// It spawns a thread with `spawn_blocking`, once for each `blocking_io` call.
391
    fn blocking_io<F, T>(&self, f: F) -> impl Future<Output = T>
392
    where
393
        F: FnOnce() -> T + Send + 'static,
394
        T: Send + 'static,
395
    {
396
        self.spawn_blocking(f)
397
    }
398
}
399

            
400
/// Extension trait for [`Spawn`].
401
///
402
/// This is very similar to, and preferred over, [`futures::task::SpawnExt`].
403
/// Unlike `futures::task::SpawnExt`, it is compatible with tokio-console,
404
/// and preserves span information for `tracing`.
405
// If https://github.com/rust-lang/futures-rs/issues/2977 is ever addressed,
406
// we can consider transitioning back to `futures::task::SpawnExt`.
407
pub trait SpawnExt: Spawn {
408
    /// Spawns a task that polls the given future with output `()` to completion.
409
    ///
410
    /// See [`futures::task::SpawnExt::spawn`].
411
    #[track_caller]
412
4312
    fn spawn<Fut>(&self, future: Fut) -> Result<(), SpawnError>
413
4312
    where
414
4312
        Fut: Future<Output = ()> + Send + 'static,
415
    {
416
        use tracing::Instrument as _;
417
4312
        self.spawn_obj(Box::new(future.in_current_span()).into())
418
4312
    }
419

            
420
    /// Spawns a task that polls the given future to completion and returns a future that resolves
421
    /// to the spawned future’s output.
422
    ///
423
    /// See [`futures::task::SpawnExt::spawn_with_handle`].
424
    #[track_caller]
425
580
    fn spawn_with_handle<Fut>(
426
580
        &self,
427
580
        future: Fut,
428
580
    ) -> Result<RemoteHandle<<Fut as Future>::Output>, SpawnError>
429
580
    where
430
580
        Fut: Future + Send + 'static,
431
580
        <Fut as Future>::Output: Send,
432
    {
433
580
        let (future, handle) = future.remote_handle();
434
580
        self.spawn(future)?;
435
580
        Ok(handle)
436
580
    }
437
}
438

            
439
impl<T: Spawn> SpawnExt for T {}
440

            
441
/// Trait providing additional operations on network sockets.
442
pub trait StreamOps {
443
    /// Set the [`TCP_NOTSENT_LOWAT`] socket option, if this `Stream` is a TCP stream.
444
    ///
445
    /// Implementations should return an [`UnsupportedStreamOp`] IO error
446
    /// if the stream is not a TCP stream,
447
    /// and on platforms where the operation is not supported.
448
    ///
449
    /// [`TCP_NOTSENT_LOWAT`]: https://lwn.net/Articles/560082/
450
    fn set_tcp_notsent_lowat(&self, _notsent_lowat: u32) -> IoResult<()> {
451
        Err(UnsupportedStreamOp {
452
            op: "set_tcp_notsent_lowat",
453
            reason: "unsupported object type",
454
        }
455
        .into())
456
    }
457

            
458
    /// Return a new handle that implements [`StreamOps`],
459
    /// and that can be used independently of `self`.
460
2
    fn new_handle(&self) -> Box<dyn StreamOps + Send + Unpin> {
461
2
        Box::new(NoOpStreamOpsHandle)
462
2
    }
463
}
464

            
465
/// A [`StreamOps`] handle that always returns an error.
466
///
467
/// Returned from [`StreamOps::new_handle`] for types and platforms
468
/// that do not support `StreamOps`.
469
#[derive(Copy, Clone, Debug, Default)]
470
#[non_exhaustive]
471
pub struct NoOpStreamOpsHandle;
472

            
473
impl StreamOps for NoOpStreamOpsHandle {
474
    fn new_handle(&self) -> Box<dyn StreamOps + Send + Unpin> {
475
        Box::new(*self)
476
    }
477
}
478

            
479
impl<T: StreamOps, C> StreamOps for Framed<T, C> {
480
    fn set_tcp_notsent_lowat(&self, notsent_lowat: u32) -> IoResult<()> {
481
        let inner: &T = self;
482
        inner.set_tcp_notsent_lowat(notsent_lowat)
483
    }
484

            
485
4
    fn new_handle(&self) -> Box<dyn StreamOps + Send + Unpin> {
486
4
        let inner: &T = self;
487
4
        inner.new_handle()
488
4
    }
489
}
490

            
491
/// Error: Tried to perform a [`StreamOps`] operation on an unsupported stream type
492
/// or on an unsupported platform.
493
///
494
/// (For example, you can't call [`StreamOps::set_tcp_notsent_lowat`] on Windows
495
/// or on a stream type that is not backed by a TCP socket.)
496
#[derive(Clone, Debug, thiserror::Error)]
497
#[error("Operation {op} not supported: {reason}")]
498
pub struct UnsupportedStreamOp {
499
    /// The unsupported operation.
500
    op: &'static str,
501
    /// The reason the operation is unsupported.
502
    reason: &'static str,
503
}
504

            
505
impl UnsupportedStreamOp {
506
    /// Construct a new `UnsupportedStreamOp` error with the provided operation and reason.
507
    pub fn new(op: &'static str, reason: &'static str) -> Self {
508
        Self { op, reason }
509
    }
510
}
511

            
512
impl From<UnsupportedStreamOp> for io::Error {
513
    fn from(value: UnsupportedStreamOp) -> Self {
514
        io::Error::new(io::ErrorKind::Unsupported, value)
515
    }
516
}
517

            
518
/// Trait for a runtime that can create and accept connections
519
/// over network sockets.
520
///
521
/// (In Arti we use the [`AsyncRead`] and [`AsyncWrite`] traits from
522
/// [`futures::io`] as more standard, even though the ones from Tokio
523
/// can be a bit more efficient.  Let's hope that they converge in the
524
/// future.)
525
// TODO: Use of async_trait is not ideal, since we have to box with every
526
// call.  Still, async_io basically makes that necessary :/
527
#[async_trait]
528
pub trait NetStreamProvider<ADDR = net::SocketAddr>: Clone + Send + Sync + 'static {
529
    /// The type for the connections returned by [`Self::connect()`].
530
    type Stream: AsyncRead + AsyncWrite + StreamOps + Send + Sync + Unpin + 'static;
531
    /// The type for the listeners returned by [`Self::listen()`].
532
    type Listener: NetStreamListener<ADDR, Stream = Self::Stream> + Send + Sync + Unpin + 'static;
533

            
534
    /// Launch a connection connection to a given socket address.
535
    ///
536
    /// Note that unlike `std::net:TcpStream::connect`, we do not accept
537
    /// any types other than a single `ADDR`.  We do this because
538
    /// we must be absolutely sure not to perform
539
    /// unnecessary DNS lookups.
540
    async fn connect(&self, addr: &ADDR) -> IoResult<Self::Stream>;
541

            
542
    /// Open a listener on a given socket address.
543
    async fn listen(&self, addr: &ADDR) -> IoResult<Self::Listener>;
544
}
545

            
546
/// Trait for a local socket that accepts incoming streams.
547
///
548
/// These objects are returned by instances of [`NetStreamProvider`].  To use
549
/// one,
550
/// use `incoming` to convert this object into a [`stream::Stream`].
551
pub trait NetStreamListener<ADDR = net::SocketAddr> {
552
    /// The type of connections returned by [`Self::incoming()`].
553
    type Stream: AsyncRead + AsyncWrite + StreamOps + Send + Sync + Unpin + 'static;
554

            
555
    /// The type of [`stream::Stream`] returned by [`Self::incoming()`].
556
    type Incoming: stream::Stream<Item = IoResult<(Self::Stream, ADDR)>>
557
        + Send
558
        + Sync
559
        + Unpin
560
        + 'static;
561

            
562
    /// Wrap this listener into a new [`stream::Stream`] that yields
563
    /// streams and addresses.
564
    fn incoming(self) -> Self::Incoming;
565

            
566
    /// Return the local address that this listener is bound to.
567
    fn local_addr(&self) -> IoResult<ADDR>;
568
}
569

            
570
/// Trait for a runtime that can send and receive UDP datagrams.
571
#[async_trait]
572
pub trait UdpProvider: Clone + Send + Sync + 'static {
573
    /// The type of Udp Socket returned by [`Self::bind()`]
574
    type UdpSocket: UdpSocket + Send + Sync + Unpin + 'static;
575

            
576
    /// Bind a local port to send and receive packets from
577
    async fn bind(&self, addr: &net::SocketAddr) -> IoResult<Self::UdpSocket>;
578
}
579

            
580
/// Trait for a locally bound Udp socket that can send and receive datagrams.
581
///
582
/// These objects are returned by instances of [`UdpProvider`].
583
//
584
// NOTE that UdpSocket objects are _necessarily_ un-connected.  If you need to
585
// implement a connected Udp socket in the future, please make a new trait (and
586
// a new type.)
587
#[async_trait]
588
pub trait UdpSocket {
589
    /// Wait for an incoming datagram; return it along its address.
590
    async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, net::SocketAddr)>;
591
    /// Send a datagram to the provided address.
592
    async fn send(&self, buf: &[u8], target: &net::SocketAddr) -> IoResult<usize>;
593
    /// Return the local address that this socket is bound to.
594
    fn local_addr(&self) -> IoResult<net::SocketAddr>;
595
}
596

            
597
/// An object with a peer certificate: typically a TLS connection.
598
pub trait CertifiedConn {
599
    /// Return the keying material (RFC 5705) given a label and an optional context.
600
    fn export_keying_material(
601
        &self,
602
        len: usize,
603
        label: &[u8],
604
        context: Option<&[u8]>,
605
    ) -> IoResult<Vec<u8>>;
606
    /// Try to return the (DER-encoded) peer certificate for this
607
    /// connection, if any.
608
    fn peer_certificate(&self) -> IoResult<Option<Cow<'_, [u8]>>>;
609

            
610
    /// Try to return the (DER-encoded) link certificate (if any) containing
611
    /// the key we used to authenticate this connection.
612
    ///
613
    /// Ordinarily, this will return a certificate for server connections,
614
    /// and None for client connections.
615
    //
616
    // NOTE: (The correct return value in the _absence_ of a certificate is None.
617
    // Later, if we support optional certificates for clients,
618
    // the place to return an Unsupported error would be
619
    // from whatever function tries to set such a certificate.)
620
    fn own_certificate(&self) -> IoResult<Option<Cow<'_, [u8]>>>;
621
}
622

            
623
/// An object that knows how to wrap a TCP connection (where the type of said TCP
624
/// connection is `S`) with TLS.
625
///
626
/// # Usage notes
627
///
628
/// Note that because of Tor's peculiarities, this is not a
629
/// general-purpose TLS type.  Unlike typical users, Tor does not want
630
/// its TLS library to check whether the certificates used in TLS are signed
631
/// within the web PKI hierarchy, or what their hostnames are, or even whether
632
/// they are valid.  It *does*, however, check that the subject public key in the
633
/// certificate is indeed correctly used to authenticate the TLS handshake.
634
///
635
/// If you are implementing something other than Tor, this is **not** the
636
/// functionality you want.
637
///
638
/// How can this behavior be remotely safe, even in Tor?  It only works for Tor
639
/// because the certificate that a Tor relay uses in TLS is not actually being
640
/// used to certify that relay's public key.  Instead, the certificate only used
641
/// as a container for the relay's public key.  The real certification happens
642
/// later, inside the TLS session, when the relay presents a CERTS cell.
643
///
644
/// Such sneakiness was especially necessary before TLS 1.3, which encrypts more
645
/// of the handshake, and before pluggable transports, which make
646
/// "innocuous-looking TLS handshakes" less important than they once were.  Once
647
/// TLS 1.3 is completely ubiquitous, we might be able to specify a simpler link
648
/// handshake than Tor uses now.
649
#[async_trait]
650
pub trait TlsConnector<S> {
651
    /// The type of connection returned by this connector
652
    type Conn: AsyncRead + AsyncWrite + CertifiedConn + Unpin + Send + 'static;
653

            
654
    /// Start a TLS session over the provided TCP stream `stream`.
655
    ///
656
    /// For a client connection,
657
    /// declare `sni_hostname` as the desired hostname, but don't actually check
658
    /// whether the hostname in the certificate matches it.  The connector may
659
    /// send `sni_hostname` as part of its handshake, if it supports
660
    /// [SNI](https://en.wikipedia.org/wiki/Server_Name_Indication) or one of
661
    /// the TLS 1.3 equivalents.
662
    ///
663
    /// (For a server connection, `sni_hostname` is ignored.)
664
    async fn negotiate_unvalidated(&self, stream: S, sni_hostname: &str) -> IoResult<Self::Conn>;
665
}
666

            
667
/// Trait for a runtime that knows how to create TLS connections over
668
/// TCP streams of type `S`.
669
///
670
/// This is separate from [`TlsConnector`] because eventually we may
671
/// eventually want to support multiple `TlsConnector` implementations
672
/// that use a single [`Runtime`].
673
///
674
/// See the [`TlsConnector`] documentation for a discussion of the Tor-specific
675
/// limitations of this trait: If you are implementing something other than Tor,
676
/// this is **not** the functionality you want.
677
pub trait TlsProvider<S: StreamOps>: Clone + Send + Sync + 'static {
678
    /// The Connector object that this provider can return.
679
    type Connector: TlsConnector<S, Conn = Self::TlsStream> + Send + Sync + Unpin;
680

            
681
    /// The type of the stream returned by that connector.
682
    type TlsStream: AsyncRead + AsyncWrite + StreamOps + CertifiedConn + Unpin + Send + 'static;
683

            
684
    /// The Acceptor object that this provider can return, for handling incoming connections.
685
    type Acceptor: TlsConnector<S, Conn = Self::TlsServerStream> + Send + Sync + Unpin;
686

            
687
    /// The type of stream returned by that Acceptor.
688
    type TlsServerStream: AsyncRead
689
        + AsyncWrite
690
        + StreamOps
691
        + CertifiedConn
692
        + Unpin
693
        + Send
694
        + 'static;
695

            
696
    /// Return a TLS connector for use with this runtime.
697
    fn tls_connector(&self) -> Self::Connector;
698

            
699
    /// Return a TLS acceptor for use with this runtime.
700
    ///
701
    /// Not every [`TlsProvider`] supports this method.
702
    /// For those that do, this method is only supported
703
    /// when `tor-rtcompat` is built with the `tls-server` feature.
704
    /// When this method is unsupported, it returns an error.
705
    fn tls_acceptor(&self, settings: TlsAcceptorSettings) -> IoResult<Self::Acceptor>;
706

            
707
    /// Return true iff the keying material exporters (RFC 5705) is supported.
708
    fn supports_keying_material_export(&self) -> bool;
709
}
710

            
711
/// Settings used for constructing a TlsAcceptor.
712
#[derive(Debug, Clone)]
713
#[non_exhaustive]
714
pub struct TlsAcceptorSettings {
715
    /// The certificates and keys for this acceptor.
716
    #[cfg(feature = "tls-server")]
717
    pub(crate) identity: TlsKeyAndCert,
718

            
719
    /// Indicates that this type can not be constructed.
720
    #[cfg(not(feature = "tls-server"))]
721
    unconstructable: void::Void,
722
    //
723
    // TODO: Add support for additional certificates in a chain.
724
    // TODO: Possibly, add support for PEM.
725
}
726

            
727
impl TlsAcceptorSettings {
728
    /// Create a new TlsAcceptorSettings from a certificate and its associated private key,
729
    /// both in DER format.
730
    ///
731
    /// Does not perform full (or even, necessarily, any) validation.
732
    //
733
    // TODO: It would be great to take a tor_cert::x509::TlsKeyAndCert instead,
734
    // but that would (apparently) introduce a dependency cycle.  It would be cool to figure out how
735
    // to invert that.
736
    #[allow(clippy::unnecessary_wraps)]
737
    #[cfg(feature = "tls-server")]
738
14
    pub fn new(identity: TlsKeyAndCert) -> std::io::Result<Self> {
739
14
        Ok(Self { identity })
740
14
    }
741

            
742
    /// Return the primary certificate for this [`TlsAcceptorSettings`], in DER format.
743
6
    pub fn cert_der(&self) -> &[u8] {
744
        #[cfg(not(feature = "tls-server"))]
745
        {
746
            void::unreachable(self.unconstructable);
747
        }
748
        #[cfg(feature = "tls-server")]
749
6
        self.identity.certificates_der()[0]
750
6
    }
751
}
752

            
753
/// An error returned by TlsProvider::tls_acceptor when the TlsProvider does not have TLS server support.
754
#[derive(Clone, Debug, thiserror::Error)]
755
#[non_exhaustive]
756
#[error("This TlsProvider does not support running as a server")]
757
pub struct TlsServerUnsupported {}
758

            
759
impl From<TlsServerUnsupported> for io::Error {
760
8
    fn from(value: TlsServerUnsupported) -> Self {
761
8
        io::Error::new(io::ErrorKind::Unsupported, value)
762
8
    }
763
}