1
#![cfg_attr(docsrs, feature(doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_time_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45
#![allow(clippy::collapsible_if)] // See arti#2342
46
#![deny(clippy::unused_async)]
47
#![deny(clippy::string_slice)] // See arti#2571
48
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
49

            
50
// TODO #1645 (either remove this, or decide to have it everywhere)
51
#![cfg_attr(not(all(feature = "full")), allow(unused))]
52

            
53
#[cfg(all(
54
    any(feature = "native-tls", feature = "rustls"),
55
    any(feature = "async-std", feature = "tokio", feature = "smol")
56
))]
57
pub(crate) mod impls;
58
pub mod task;
59

            
60
mod coarse_time;
61
mod compound;
62
mod dyn_time;
63
pub mod general;
64
mod network;
65
mod opaque;
66
pub mod scheduler;
67
mod timer;
68
mod traits;
69
pub mod unimpl;
70
pub mod unix;
71

            
72
#[cfg(any(feature = "async-std", feature = "tokio", feature = "smol"))]
73
use std::io;
74
pub use traits::{
75
    Blocking, CertifiedConn, CoarseTimeProvider, NetStreamListener, NetStreamProvider,
76
    NoOpStreamOpsHandle, Runtime, SleepProvider, SpawnExt, StreamOps, TlsProvider, ToplevelBlockOn,
77
    ToplevelRuntime, UdpProvider, UdpSocket, UnsupportedStreamOp,
78
};
79

            
80
pub use coarse_time::{CoarseDuration, CoarseInstant, RealCoarseTimeProvider};
81
pub use dyn_time::DynTimeProvider;
82
pub use network::{
83
    CommonConnectOptions, CommonListenOptions, TcpConnectOptions, TcpListenOptions,
84
    UnixConnectOptions, UnixListenOptions,
85
};
86
pub use timer::{SleepProviderExt, Timeout, TimeoutError};
87

            
88
/// Traits used to describe TLS connections and objects that can
89
/// create them.
90
pub mod tls {
91
    #[cfg(all(
92
        any(feature = "native-tls", feature = "rustls"),
93
        any(feature = "async-std", feature = "tokio", feature = "smol")
94
    ))]
95
    pub use crate::impls::unimpl_tls::UnimplementedTls;
96
    pub use crate::traits::{
97
        CertifiedConn, TlsAcceptorSettings, TlsConnector, TlsServerUnsupported,
98
    };
99

            
100
    #[cfg(all(
101
        feature = "native-tls",
102
        any(feature = "tokio", feature = "async-std", feature = "smol")
103
    ))]
104
    pub use crate::impls::native_tls::NativeTlsProvider;
105
    #[cfg(all(
106
        feature = "rustls",
107
        any(feature = "tokio", feature = "async-std", feature = "smol")
108
    ))]
109
    pub use crate::impls::rustls::RustlsProvider;
110
    #[cfg(all(
111
        feature = "rustls",
112
        feature = "tls-server",
113
        any(feature = "tokio", feature = "async-std", feature = "smol")
114
    ))]
115
    pub use crate::impls::rustls::rustls_server::{RustlsAcceptor, RustlsServerStream};
116
}
117

            
118
#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
119
pub mod tokio;
120

            
121
#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "async-std"))]
122
pub mod async_std;
123

            
124
#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "smol"))]
125
pub mod smol;
126

            
127
pub use compound::{CompoundRuntime, RuntimeSubstExt};
128

            
129
#[cfg(all(
130
    any(feature = "native-tls", feature = "rustls"),
131
    feature = "async-std",
132
    not(feature = "tokio")
133
))]
134
use async_std as preferred_backend_mod;
135
#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
136
use tokio as preferred_backend_mod;
137

            
138
/// The runtime that we prefer to use, out of all the runtimes compiled into the
/// tor-rtcompat crate.
///
/// If `tokio` and `async-std` are both available, we prefer `tokio` for its
/// performance.
/// If `native_tls` and `rustls` are both available, we prefer `native_tls` since
/// it has been used in Arti for longer.
///
/// This type only exists when `tokio` or `async-std` is enabled
/// (together with at least one TLS backend);
/// the `smol` backend is never selected as the preferred runtime.
///
/// The process [**may not fork**](crate#do-not-fork)
/// (except, very carefully, before exec)
/// after creating this or any other `Runtime`.
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "async-std", feature = "tokio")
))]
#[derive(Clone)]
pub struct PreferredRuntime {
    /// The underlying runtime object.
    inner: preferred_backend_mod::PreferredRuntime,
}
158

            
159
// NOTE(review): the macro is defined in `opaque.rs`, which is not visible
// here; presumably it implements the runtime traits for `PreferredRuntime`
// by delegating to the `inner` field -- confirm there.
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "async-std", feature = "tokio")
))]
crate::opaque::implement_opaque_runtime! {
    PreferredRuntime { inner : preferred_backend_mod::PreferredRuntime }
}
166

            
167
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "async-std", feature = "tokio")
))]
impl PreferredRuntime {
    /// Obtain a [`PreferredRuntime`] from the currently running asynchronous runtime.
    /// Generally, this is what you want.
    ///
    /// This tries to get a handle to a currently running asynchronous runtime, and
    /// wraps it; the returned [`PreferredRuntime`] isn't the same thing as the
    /// asynchronous runtime object itself (e.g. `tokio::runtime::Runtime`).
    ///
    /// # Errors
    ///
    /// Returns any error reported by the selected backend's
    /// `PreferredRuntime::current()`.
    ///
    /// # Panics
    ///
    /// When `tor-rtcompat` is compiled with the `tokio` feature enabled
    /// (regardless of whether the `async-std` feature is also enabled),
    /// panics if called outside of Tokio runtime context.
    /// See `tokio::runtime::Handle::current`.
    ///
    /// # Usage notes
    ///
    /// Once you have a runtime returned by this function, you should
    /// just create more handles to it via [`Clone`].
    ///
    /// # Limitations
    ///
    /// If the `tor-rtcompat` crate was compiled with `tokio` support,
    /// this function will never return a runtime based on `async_std`.
    ///
    /// The process [**may not fork**](crate#do-not-fork)
    /// (except, very carefully, before exec)
    /// after creating this or any other `Runtime`.
    //
    // ## Note to Arti developers
    //
    // We should never call this from inside other Arti crates, or from
    // library crates that want to support multiple runtimes!  This
    // function is for Arti _users_ who want to wrap some existing Tokio
    // or Async_std runtime as a [`Runtime`].  It is not for library
    // crates that want to work with multiple runtimes.
    pub fn current() -> io::Result<Self> {
        let rt = preferred_backend_mod::PreferredRuntime::current()?;

        Ok(Self { inner: rt })
    }

    /// Create and return a new instance of the default [`Runtime`].
    ///
    /// Generally you should call this function at most once, and then use
    /// [`Clone::clone()`] to create additional references to that runtime.
    ///
    /// Tokio users may want to avoid this function and instead obtain a runtime using
    /// [`PreferredRuntime::current`]: this function always _builds_ a runtime,
    /// and if you already have a runtime, that isn't what you want with Tokio.
    ///
    /// If you need more fine-grained control over a runtime, you can create it
    /// using an appropriate builder type or function.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the selected backend's
    /// `PreferredRuntime::create()`.
    ///
    /// The process [**may not fork**](crate#do-not-fork)
    /// (except, very carefully, before exec)
    /// after creating this or any other `Runtime`.
    //
    // ## Note to Arti developers
    //
    // We should never call this from inside other Arti crates, or from
    // library crates that want to support multiple runtimes!  This
    // function is for Arti _users_ who want a freshly built default
    // [`Runtime`].  It is not for library crates that want to work with
    // multiple runtimes.
    pub fn create() -> io::Result<Self> {
        let rt = preferred_backend_mod::PreferredRuntime::create()?;

        Ok(Self { inner: rt })
    }

    /// Helper to run a single test function in a freshly created runtime.
    ///
    /// `func` receives the new runtime and returns a future; that future is
    /// driven to completion with `block_on` on a clone of the same runtime,
    /// and its output is returned.
    ///
    /// # Panics
    ///
    /// Panics if we can't create this runtime.
    ///
    /// # Warning
    ///
    /// This API is **NOT** for consumption outside Arti. Semver guarantees are not provided.
    #[doc(hidden)]
    pub fn run_test<P, F, O>(func: P) -> O
    where
        P: FnOnce(Self) -> F,
        F: futures::Future<Output = O>,
    {
        let runtime = Self::create().expect("Failed to create runtime");
        // `func` consumes the runtime, so block on a clone of it.
        runtime.clone().block_on(func(runtime))
    }
}
261

            
262
/// Helpers for test_with_all_runtimes
///
/// # Warning
///
/// This API is **NOT** for consumption outside Arti. Semver guarantees are not provided.
#[doc(hidden)]
pub mod testing__ {
    /// A trait for an object that might represent a test failure, or which
    /// might just be `()`.
    pub trait TestOutcome {
        /// Abort if the test has failed.
        fn check_ok(&self);
    }
    /// `()` carries no failure information, so it always counts as success.
    impl TestOutcome for () {
        fn check_ok(&self) {}
    }
    /// A `Result` counts as a failure (and panics, showing the error's
    /// `Debug` output) when it is `Err`.
    impl<E: std::fmt::Debug> TestOutcome for Result<(), E> {
        fn check_ok(&self) {
            self.as_ref().expect("Test failure");
        }
    }
}
284

            
285
/// Helper: define a macro that expands a token tree iff a pair of features are
/// both present.
///
/// Two variants of the generated macro are emitted under complementary
/// `#[cfg]`s: one that expands to its argument (both features enabled), and
/// one that expands to nothing (otherwise).  Any attributes given before
/// `macro` are applied to both variants.
macro_rules! declare_conditional_macro {
    ( $(#[$meta:meta])* macro $name:ident = ($f1:expr, $f2:expr) ) => {
        // Variant used when both features are enabled: pass the tokens through.
        $( #[$meta] )*
        #[cfg(all(feature=$f1, feature=$f2))]
        #[macro_export]
        macro_rules! $name {
            ($tt:tt) => {
                $tt
            };
        }

        // Variant used otherwise: swallow the tokens.
        $( #[$meta] )*
        #[cfg(not(all(feature=$f1, feature=$f2)))]
        #[macro_export]
        macro_rules! $name {
            ($tt:tt) => {};
        }

        // Needed so that we can access this macro at this path, both within the
        // crate and without.
        pub use $name;
    };
}
310

            
311
/// Defines macros that will expand when certain runtimes are available.
312
#[doc(hidden)]
313
pub mod cond {
314
    declare_conditional_macro! {
315
        /// Expand a token tree if the TokioNativeTlsRuntime is available.
316
        #[doc(hidden)]
317
        macro if_tokio_native_tls_present = ("tokio", "native-tls")
318
    }
319
    declare_conditional_macro! {
320
        /// Expand a token tree if the TokioRustlsRuntime is available.
321
        #[doc(hidden)]
322
        macro if_tokio_rustls_present = ("tokio", "rustls")
323
    }
324
    declare_conditional_macro! {
325
        /// Expand a token tree if the TokioNativeTlsRuntime is available.
326
        #[doc(hidden)]
327
        macro if_async_std_native_tls_present = ("async-std", "native-tls")
328
    }
329
    declare_conditional_macro! {
330
        /// Expand a token tree if the TokioNativeTlsRuntime is available.
331
        #[doc(hidden)]
332
        macro if_async_std_rustls_present = ("async-std", "rustls")
333
    }
334
    declare_conditional_macro! {
335
        /// Expand a token tree if the SmolNativeTlsRuntime is available.
336
        #[doc(hidden)]
337
        macro if_smol_native_tls_present = ("smol", "native-tls")
338
    }
339
    declare_conditional_macro! {
340
        /// Expand a token tree if the SmolRustlsRuntime is available.
341
        #[doc(hidden)]
342
        macro if_smol_rustls_present = ("smol", "rustls")
343
    }
344
}
345

            
346
/// Run a test closure, passing as argument every supported runtime.
///
/// Usually, prefer `tor_rtmock::MockRuntime::test_with_various` to this.
/// Use this macro only when you need to interact with things
/// that `MockRuntime` can't handle.
///
/// If everything in your test case is supported by `MockRuntime`,
/// you should use that instead:
/// that will give superior test coverage *and* a (more) deterministic test.
///
/// (This is a macro so that it can repeat the closure as multiple separate
/// expressions, so it can take on two different types, if needed.)
//
// NOTE(eta): changing this #[cfg] can affect tests inside this crate that use
//            this macro, like in scheduler.rs
#[macro_export]
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std", feature = "smol"),
))]
macro_rules! test_with_all_runtimes {
    ( $fn:expr ) => {{
        use $crate::cond::*;
        use $crate::testing__::TestOutcome;
        // We have to do this outcome-checking business rather than just using
        // the ? operator or calling expect() because some of the closures that
        // we use this macro with return (), and some return Result.

        if_tokio_native_tls_present! {{
           $crate::tokio::TokioNativeTlsRuntime::run_test($fn).check_ok();
        }}
        if_tokio_rustls_present! {{
            $crate::tokio::TokioRustlsRuntime::run_test($fn).check_ok();
        }}
        if_async_std_native_tls_present! {{
            $crate::async_std::AsyncStdNativeTlsRuntime::run_test($fn).check_ok();
        }}
        if_async_std_rustls_present! {{
            $crate::async_std::AsyncStdRustlsRuntime::run_test($fn).check_ok();
        }}
        if_smol_native_tls_present! {{
            $crate::smol::SmolNativeTlsRuntime::run_test($fn).check_ok();
        }}
        if_smol_rustls_present! {{
            $crate::smol::SmolRustlsRuntime::run_test($fn).check_ok();
        }}
    }};
}
394

            
395
/// Run a test closure, passing as argument one supported runtime.
///
/// Usually, prefer `tor_rtmock::MockRuntime::test_with_various` to this.
/// Use this macro only when you need to interact with things
/// that `MockRuntime` can't handle.
///
/// If everything in your test case is supported by `MockRuntime`,
/// you should use that instead:
/// that will give superior test coverage *and* a (more) deterministic test.
///
/// The closure is run via `PreferredRuntime::run_test`, and this macro
/// evaluates to whatever that call returns.
///
/// (Always prefers tokio if present; otherwise uses async-std.)
#[macro_export]
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
))]
macro_rules! test_with_one_runtime {
    ( $fn:expr ) => {{ $crate::PreferredRuntime::run_test($fn) }};
}
414

            
415
#[cfg(all(
    test,
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "async-std", feature = "tokio", feature = "smol"),
    not(miri), // Many of these tests use real sockets or SystemTime.
))]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    #![allow(clippy::unnecessary_wraps)]
    use crate::SleepProviderExt;
    use crate::ToplevelRuntime;

    use crate::traits::*;

    use futures::io::{AsyncReadExt, AsyncWriteExt};
    use futures::stream::StreamExt;
    use native_tls_crate as native_tls;
    use std::io::Result as IoResult;
    use std::net::SocketAddr;
    use std::net::{Ipv4Addr, SocketAddrV4};
    use web_time_compat::{Duration, Instant, InstantExt, SystemTimeExt};

    // Test "sleep" with a tiny delay, and make sure that at least that
    // much delay happens.
    fn small_delay<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let rt = runtime.clone();
        runtime.block_on(async {
            let i1 = Instant::get();
            let one_msec = Duration::from_millis(1);
            rt.sleep(one_msec).await;
            let i2 = Instant::get();
            assert!(i2 >= i1 + one_msec);
        });
        Ok(())
    }

    // Try a timeout operation that will succeed.
    fn small_timeout_ok<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let rt = runtime.clone();
        runtime.block_on(async {
            let one_day = Duration::from_secs(86400);
            let outcome = rt.timeout(one_day, async { 413_u32 }).await;
            assert_eq!(outcome, Ok(413));
        });
        Ok(())
    }

    // Try a timeout operation that will time out.
    fn small_timeout_expire<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        use futures::future::pending;

        let rt = runtime.clone();
        runtime.block_on(async {
            let one_micros = Duration::from_micros(1);
            // `pending()` never completes, so only the timeout can end this.
            let outcome = rt.timeout(one_micros, pending::<()>()).await;
            assert_eq!(outcome, Err(crate::TimeoutError));
            assert_eq!(
                outcome.err().unwrap().to_string(),
                "Timeout expired".to_string()
            );
        });
        Ok(())
    }

    // Try a little wallclock delay.
    //
    // NOTE: This test will fail if the clock jumps a lot while it's
    // running.  We should use simulated time instead.
    fn tiny_wallclock<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let rt = runtime.clone();
        runtime.block_on(async {
            let i1 = Instant::get();
            let now = runtime.wallclock();
            let one_millis = Duration::from_millis(1);
            let one_millis_later = now + one_millis;

            rt.sleep_until_wallclock(one_millis_later).await;

            let i2 = Instant::get();
            let newtime = runtime.wallclock();
            assert!(newtime >= one_millis_later);
            assert!(i2 - i1 >= one_millis);
        });
        Ok(())
    }

    // Try connecting to ourself and sending a little data.
    //
    // NOTE: requires Ipv4 localhost.
    fn self_connect_tcp<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        // Port 0: let the OS pick a free port; we read it back below.
        let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
        let rt1 = runtime.clone();

        let listen_options = Default::default();
        let listener =
            runtime.block_on(rt1.listen(&(SocketAddr::from(localhost)), &listen_options))?;
        let addr = listener.local_addr()?;

        runtime.block_on(async {
            // Receiving side: accept one connection and read 11 bytes.
            let task1 = async {
                let mut buf = vec![0_u8; 11];
                let (mut con, _addr) = listener.incoming().next().await.expect("closed?")?;
                con.read_exact(&mut buf[..]).await?;
                IoResult::Ok(buf)
            };
            // Sending side: connect and write the 11-byte message.
            let task2 = async {
                let connect_options = Default::default();
                let mut con = rt1.connect(&addr, &connect_options).await?;
                con.write_all(b"Hello world").await?;
                con.flush().await?;
                IoResult::Ok(())
            };

            let (data, send_r) = futures::join!(task1, task2);
            send_r?;

            assert_eq!(&data?[..], b"Hello world");

            Ok(())
        })
    }

    // Try connecting to ourself and sending a little data.
    //
    // NOTE: requires Ipv4 localhost.
    fn self_connect_udp<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
        let rt1 = runtime.clone();

        let socket1 = runtime.block_on(rt1.bind(&(localhost.into())))?;
        let addr1 = socket1.local_addr()?;

        let socket2 = runtime.block_on(rt1.bind(&(localhost.into())))?;
        let addr2 = socket2.local_addr()?;

        runtime.block_on(async {
            // socket1 receives one datagram and reports who sent it.
            let task1 = async {
                let mut buf = [0_u8; 16];
                let (len, addr) = socket1.recv(&mut buf[..]).await?;
                IoResult::Ok((buf[..len].to_vec(), addr))
            };
            // socket2 sends one datagram to socket1.
            let task2 = async {
                socket2.send(b"Hello world", &addr1).await?;
                IoResult::Ok(())
            };

            let (recv_r, send_r) = futures::join!(task1, task2);
            send_r?;
            let (buff, addr) = recv_r?;
            assert_eq!(addr2, addr);
            assert_eq!(&buff, b"Hello world");

            Ok(())
        })
    }

    // Try out our incoming connection stream code.
    //
    // We launch a few connections and make sure that we can read data on
    // them.
    fn listener_stream<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
        let rt1 = runtime.clone();

        let listen_options = Default::default();
        let listener = runtime
            .block_on(rt1.listen(&SocketAddr::from(localhost), &listen_options))
            .unwrap();
        let addr = listener.local_addr().unwrap();
        let mut stream = listener.incoming();

        runtime.block_on(async {
            // Accept connections until one of them sends the final message,
            // counting how many we saw.
            let task1 = async {
                let mut n = 0_u32;
                loop {
                    let (mut con, _addr) = stream.next().await.unwrap()?;
                    let mut buf = [0_u8; 11];
                    con.read_exact(&mut buf[..]).await?;
                    n += 1;
                    if &buf[..] == b"world done!" {
                        break IoResult::Ok(n);
                    }
                }
            };
            // Five ordinary connections, then a sixth carrying the final
            // message.  (Both messages are exactly 11 bytes long.)
            let task2 = async {
                let connect_options = Default::default();
                for _ in 0_u8..5 {
                    let mut con = rt1.connect(&addr, &connect_options).await?;
                    con.write_all(b"Hello world").await?;
                    con.flush().await?;
                }
                let mut con = rt1.connect(&addr, &connect_options).await?;
                con.write_all(b"world done!").await?;
                con.flush().await?;
                con.close().await?;
                IoResult::Ok(())
            };

            let (n, send_r) = futures::join!(task1, task2);
            send_r?;

            assert_eq!(n?, 6);

            Ok(())
        })
    }

    // Try listening on an address and connecting there, except using TLS.
    //
    // Note that since we didn't have TLS server support when this test was first written,
    // we're going to use a thread.
    fn simple_tls<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        /*
         A simple expired self-signed rsa-2048 certificate.

         Generated by running the make-cert.c program in tor-rtcompat/test-data-helper,
         and then making a PFX file using

         openssl pkcs12 -export -certpbe PBE-SHA1-3DES -out test.pfx -inkey test.key -in test.crt

         The password is "abc".
        */
        static PFX_ID: &[u8] = include_bytes!("test.pfx");
        // Note that we need to set a password on the pkcs12 file, since apparently
        // OSX doesn't support pkcs12 with empty passwords. (That was arti#111).
        static PFX_PASSWORD: &str = "abc";

        let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
        let listener = std::net::TcpListener::bind(localhost)?;
        let addr = listener.local_addr()?;

        let identity = native_tls::Identity::from_pkcs12(PFX_ID, PFX_PASSWORD).unwrap();

        // See note on function for why we're using a thread here.
        let th = std::thread::spawn(move || {
            // Accept a single TLS connection and run an echo server
            use std::io::{Read, Write};
            let acceptor = native_tls::TlsAcceptor::new(identity).unwrap();
            let (con, _addr) = listener.accept()?;
            let mut con = acceptor.accept(con).unwrap();
            let mut buf = [0_u8; 16];
            loop {
                let n = con.read(&mut buf)?;
                if n == 0 {
                    break;
                }
                con.write_all(&buf[..n])?;
            }
            IoResult::Ok(())
        });

        let connector = runtime.tls_connector();

        runtime.block_on(async {
            let text = b"I Suddenly Dont Understand Anything";
            let mut buf = vec![0_u8; text.len()];
            let connect_options = Default::default();
            let conn = runtime.connect(&addr, &connect_options).await?;
            let mut conn = connector.negotiate_unvalidated(conn, "Kan.Aya").await?;
            assert!(conn.peer_certificate()?.is_some());
            conn.write_all(text).await?;
            conn.flush().await?;
            conn.read_exact(&mut buf[..]).await?;
            assert_eq!(&buf[..], text);
            conn.close().await?;
            IoResult::Ok(())
        })?;

        th.join().unwrap()?;
        IoResult::Ok(())
    }

    // Try a TLS connection where both the acceptor and the connector come
    // from the runtime under test.
    //
    // The test is skipped (returning Ok) if `tls_acceptor` returns an error
    // for this runtime.
    fn simple_tls_server<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let mut rng = tor_basic_utils::test_rng::testing_rng();
        let tls_cert = tor_cert_x509::TlsKeyAndCert::create(
            &mut rng,
            std::time::SystemTime::get(),
            "prospit.example.org",
            "derse.example.org",
        )
        .unwrap();
        let cert = tls_cert.certificates_der()[0].to_vec();
        let settings = TlsAcceptorSettings::new(tls_cert).unwrap();

        let Ok(tls_acceptor) = runtime.tls_acceptor(settings) else {
            println!("Skipping tls-server test for runtime {:?}", &runtime);
            return IoResult::Ok(());
        };
        println!("Running tls-server test for runtime {:?}", &runtime);

        let tls_connector = runtime.tls_connector();

        let localhost: SocketAddr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0).into();
        let rt1 = runtime.clone();

        let msg = b"Derse Reviles Him And Outlaws Frogs Wherever They Can";
        runtime.block_on(async move {
            let listen_options = Default::default();
            let listener = runtime.listen(&localhost, &listen_options).await.unwrap();
            let address = listener.local_addr().unwrap();

            // Server task: accept one connection, read until EOF, and report
            // what was received plus the certificate the server presented.
            let h1 = runtime
                .spawn_with_handle(async move {
                    let conn = listener.incoming().next().await.unwrap().unwrap().0;
                    let mut conn = tls_acceptor.negotiate_unvalidated(conn, "").await.unwrap();

                    let mut buf = vec![];
                    conn.read_to_end(&mut buf).await.unwrap();
                    (buf, conn.own_certificate().unwrap().unwrap().into_owned())
                })
                .unwrap();

            // Client task: connect, send the message, close, and report the
            // certificate the client saw from its peer.
            let h2 = runtime
                .spawn_with_handle(async move {
                    let connect_options = Default::default();
                    let conn = rt1.connect(&address, &connect_options).await.unwrap();
                    let mut conn = tls_connector
                        .negotiate_unvalidated(conn, "prospit.example.org")
                        .await
                        .unwrap();
                    conn.write_all(msg).await.unwrap();
                    conn.close().await.unwrap();
                    conn.peer_certificate().unwrap().unwrap().into_owned()
                })
                .unwrap();

            let (received, server_own_cert) = h1.await;
            let client_peer_cert = h2.await;
            assert_eq!(received, msg);
            assert_eq!(&server_own_cert, &cert);
            assert_eq!(&client_peer_cert, &cert);
        });
        IoResult::Ok(())
    }

    // Define one `#[test]` function per listed name; each calls the
    // same-named helper in the parent module with `$runtime`.
    //
    // `$runtime` may contain `?`, since the generated functions return
    // `std::io::Result<()>`.
    macro_rules! tests_with_runtime {
        { $runtime:expr  => $($id:ident),* $(,)? } => {
            $(
                #[test]
                fn $id() -> std::io::Result<()> {
                    super::$id($runtime)
                }
            )*
        }
    }

    // Instantiate the listed tests once for each backend's preferred
    // runtime, and once for the crate-wide `PreferredRuntime`.
    macro_rules! runtime_tests {
        { $($id:ident),* $(,)? } =>
        {
           #[cfg(feature="tokio")]
            mod tokio_runtime_tests {
                tests_with_runtime! { &crate::tokio::PreferredRuntime::create()? => $($id),* }
            }
            #[cfg(feature="async-std")]
            mod async_std_runtime_tests {
                tests_with_runtime! { &crate::async_std::PreferredRuntime::create()? => $($id),* }
            }
            #[cfg(feature="smol")]
            mod smol_runtime_tests {
                tests_with_runtime! { &crate::smol::PreferredRuntime::create()? => $($id),* }
            }
            // `crate::PreferredRuntime` only exists with tokio or async-std.
            #[cfg(any(feature="tokio", feature="async-std"))]
            mod default_runtime_tests {
                tests_with_runtime! { &crate::PreferredRuntime::create()? => $($id),* }
            }
        }
    }

    // Instantiate the listed tests once for each (backend, TLS library)
    // combination that is compiled in, and once for the crate-wide
    // `PreferredRuntime`.
    macro_rules! tls_runtime_tests {
        { $($id:ident),* $(,)? } =>
        {
            #[cfg(all(feature="tokio", feature = "native-tls"))]
            mod tokio_native_tls_tests {
                tests_with_runtime! { &crate::tokio::TokioNativeTlsRuntime::create()? => $($id),* }
            }
            #[cfg(all(feature="async-std", feature = "native-tls"))]
            mod async_std_native_tls_tests {
                tests_with_runtime! { &crate::async_std::AsyncStdNativeTlsRuntime::create()? => $($id),* }
            }
            #[cfg(all(feature="smol", feature = "native-tls"))]
            mod smol_native_tls_tests {
                tests_with_runtime! { &crate::smol::SmolNativeTlsRuntime::create()? => $($id),* }
            }
            #[cfg(all(feature="tokio", feature="rustls"))]
            mod tokio_rustls_tests {
                tests_with_runtime! {  &crate::tokio::TokioRustlsRuntime::create()? => $($id),* }
            }
            #[cfg(all(feature="async-std", feature="rustls"))]
            mod async_std_rustls_tests {
                tests_with_runtime! {  &crate::async_std::AsyncStdRustlsRuntime::create()? => $($id),* }
            }
            #[cfg(all(feature="smol", feature="rustls"))]
            mod smol_rustls_tests {
                tests_with_runtime! {  &crate::smol::SmolRustlsRuntime::create()? => $($id),* }
            }
            // `crate::PreferredRuntime` only exists with tokio or async-std.
            #[cfg(any(feature="tokio", feature="async-std"))]
            mod default_runtime_tls_tests {
                tests_with_runtime! { &crate::PreferredRuntime::create()? => $($id),* }
            }
        }
    }

    runtime_tests! {
        small_delay,
        small_timeout_ok,
        small_timeout_expire,
        tiny_wallclock,
        self_connect_tcp,
        self_connect_udp,
        listener_stream,
    }

    tls_runtime_tests! {
        simple_tls,
        simple_tls_server,
    }
}