1
//! Different implementations of a common async API for use in arti
2
//!
3
//! Currently only async_std, tokio and smol are provided.
4

            
5
#[cfg(feature = "async-std")]
6
pub(crate) mod async_std;
7

            
8
#[cfg(feature = "tokio")]
9
pub(crate) mod tokio;
10

            
11
#[cfg(feature = "smol")]
12
pub(crate) mod smol;
13

            
14
#[cfg(feature = "rustls")]
15
pub(crate) mod rustls;
16

            
17
#[cfg(feature = "native-tls")]
18
pub(crate) mod native_tls;
19

            
20
pub(crate) mod streamops;
21
pub(crate) mod unimpl_tls;
22

            
23
use crate::network::{
24
    CommonConnectOptions, CommonListenOptions, TcpConnectOptions, TcpListenOptions,
25
};
26
use socket2::Socket;
27

            
28
#[cfg(unix)]
29
use tor_error::warn_report;
30

            
31
/// Connection backlog size to use for `listen()` calls on IP sockets.
32
//
33
// How this was chosen:
34
//
35
// 1. The rust standard library uses a backlog of 128 for TCP sockets. This matches `SOMAXCONN` on
36
//    most systems.
37
//
38
// 2. Mio (backend for tokio) previously used 1024. But they recently (confusingly) copied the logic
39
//    from the standard library's unix socket implementation, which uses different values on
40
//    different platforms. These values were tuned for unix sockets, so I think we should ignore
41
//    them and mio's implementation here.
42
//    https://github.com/tokio-rs/mio/pull/1896
43
//
44
// 3. Tor first tries using `INT_MAX`, and if that fails falls back to `SOMAXCONN` (using a global
45
//    to remember if it did the fallback for future listen() calls; see `tor_listen`).
46
//
47
// 4. On supported platforms, if you use a backlog that is too large, the system will supposedly
48
//    silently cap the value instead of failing.
49
//
50
//     Linux:
51
//     listen(2)
52
//     > If the backlog argument is greater than the value in /proc/sys/net/core/somaxconn, then it
53
//     > is silently capped to that value.
54
//
55
//     FreeBSD:
56
//     listen(2)
57
//     > The sysctl(3) MIB variable kern.ipc.soacceptqueue specifies a hard limit on backlog; if a
58
//     > value greater than kern.ipc.soacceptqueue or less than zero is specified, backlog is
59
//     > silently forced to kern.ipc.soacceptqueue.
60
//
61
//     OpenBSD:
62
//     listen(2)
63
//     > [BUGS] The backlog is currently limited (silently) to the value of the kern.somaxconn
64
//     > sysctl, which defaults to 128.
65
//
66
//     Windows:
67
//     https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-listen
68
//     > The backlog parameter is limited (silently) to a reasonable value as determined by the
69
//     > underlying service provider. Illegal values are replaced by the nearest legal value.
70
//
71
//     Mac OS:
72
//     Archived listen(2) docs
73
//     https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man2/listen.2.html
74
//     > [BUGS] The backlog is currently limited (silently) to 128.
75
//
76
// 5. While the rust APIs take a `u32`, the libc API uses `int`. So we shouldn't use a value larger
77
//    than `c_int::MAX`.
78
//
79
// 6. We should be careful not to set this too large, as supposedly some systems will truncate this to
80
//    16 bits. So for example a value of `65536` would cause a backlog of 1. But maybe they are just
81
//    referring to systems where `int` is 16 bits?
82
//    https://bugs.python.org/issue38699#msg357957
83
//
84
// Here we use `u16::MAX`. We assume that this will succeed on all supported platforms. Unlike tor,
85
// we do not try again with a smaller value since this doesn't seem to be needed on modern systems.
86
// We can add it if we find that it's needed.
87
//
88
// A value of `u16::MAX` is arguably too high, since a smaller value like 4096 would be large enough
89
// for legitimate traffic, and illegitimate traffic would be better handled by the kernel with
90
// something like SYN cookies. But it's easier for users to reduce the max using
91
// `/proc/sys/net/core/somaxconn` than to increase this max by recompiling arti.
92
const LISTEN_BACKLOG: i32 = u16::MAX as i32;
93

            
94
/// Open a listening TCP socket.
95
///
96
/// The socket will be non-blocking, and the socket handle will be close-on-exec/non-inheritable.
97
/// Other socket options may also be set depending on the socket type and platform.
98
///
99
/// Historically we relied on the runtime to create a listening socket, but we need some specific
100
/// socket options set, and not all runtimes will behave the same. It's better for us to create the
101
/// socket with the options we need and with consistent behaviour across all runtimes. For example
102
/// if each runtime were using a different `listen()` backlog size, it might be difficult to debug
103
/// related issues.
104
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
105
22
pub(crate) fn tcp_listen(
106
22
    addr: &std::net::SocketAddr,
107
22
    options: &TcpListenOptions,
108
22
) -> std::io::Result<std::net::TcpListener> {
109
    use socket2::{Domain, Type};
110

            
111
    // Destructure the options so that we don't forget to use any.
112
    let TcpListenOptions {
113
        common:
114
            CommonListenOptions {
115
22
                send_buffer_size,
116
22
                recv_buffer_size,
117
            },
118
22
    } = options;
119

            
120
    // `socket2::Socket::new()`:
121
    // > This function corresponds to `socket(2)` on Unix and `WSASocketW` on Windows.
122
    // >
123
    // > On Unix-like systems, the close-on-exec flag is set on the new socket. Additionally, on
124
    // > Apple platforms `SOCK_NOSIGPIPE` is set. On Windows, the socket is made non-inheritable.
125
22
    let socket = match addr {
126
22
        std::net::SocketAddr::V4(_) => Socket::new(Domain::IPV4, Type::STREAM, None)?,
127
        std::net::SocketAddr::V6(_) => {
128
            let socket = Socket::new(Domain::IPV6, Type::STREAM, None)?;
129

            
130
            // On `cfg(unix)` systems, set `IPV6_V6ONLY` so that we can bind AF_INET and
131
            // AF_INET6 sockets to the same port.
132
            // This is `cfg(unix)` as I'm not sure what the socket option does (if anything) on
133
            // non-unix platforms.
134
            #[cfg(unix)]
135
            if let Err(e) = socket.set_only_v6(true) {
136
                // If we see this, we should exclude more platforms.
137
                warn_report!(
138
                    e,
139
                    "Failed to set `IPV6_V6ONLY` on `AF_INET6` socket. \
140
                    Please report this bug at https://gitlab.torproject.org/tpo/core/arti/-/issues",
141
                );
142
            }
143

            
144
            socket
145
        }
146
    };
147

            
148
    // Below we try to match what a `tokio::net::TcpListener::bind()` would do. This is a bit tricky
149
    // since tokio documents "Calling TcpListener::bind("127.0.0.1:8080") is equivalent to:" with
150
    // some provided example code, but this logic actually appears to happen in the mio crate, and
151
    // doesn't match exactly with tokio's documentation. So here we acknowledge that we likely do
152
    // deviate from `tokio::net::TcpListener::bind()` a bit.
153

            
154
22
    socket.set_nonblocking(true)?;
155

            
156
    // The docs for `tokio::net::TcpSocket` say:
157
    //
158
    // > // On platforms with Berkeley-derived sockets, this allows to quickly
159
    // > // rebind a socket, without needing to wait for the OS to clean up the
160
    // > // previous one.
161
    // >
162
    // > // On Windows, this allows rebinding sockets which are actively in use,
163
    // > // which allows "socket hijacking", so we explicitly don't set it here.
164
    // > // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
165
    //
166
    // This appears to be a comment that tokio copied from mio.
167
    //
168
    // So here we only set SO_REUSEADDR for `cfg(unix)` to match tokio.
169
    #[cfg(unix)]
170
22
    socket.set_reuse_address(true)?;
171

            
172
    // tcp(7):
173
    //
174
    // > On individual connections, the socket buffer size must be set prior to the listen(2) or
175
    // > connect(2) calls in order to have it take effect.
176
22
    if let Some(send_buffer_size) = send_buffer_size {
177
        socket.set_send_buffer_size(*send_buffer_size)?;
178
22
    }
179
22
    if let Some(recv_buffer_size) = recv_buffer_size {
180
        socket.set_recv_buffer_size(*recv_buffer_size)?;
181
22
    }
182

            
183
22
    socket.bind(&(*addr).into())?;
184

            
185
22
    socket.listen(LISTEN_BACKLOG)?;
186

            
187
22
    Ok(socket.into())
188
22
}
189

            
190
/// Stub replacement for tcp_listen on wasm32-unknown
191
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
192
pub(crate) fn tcp_listen(
193
    _addr: &std::net::SocketAddr,
194
    _options: &TcpListenOptions,
195
) -> std::io::Result<std::net::TcpListener> {
196
    Err(std::io::Error::from(std::io::ErrorKind::Unsupported))
197
}
198

            
199
/// Initialize a TCP socket in preparation for a connect().
200
///
201
/// The socket will be non-blocking, and the socket handle will be close-on-exec/non-inheritable.
202
/// Other socket options may also be set depending on the socket type and platform.
203
///
204
/// This returns a socket without any `connect()` call. The caller MUST:
205
///
206
/// 1. connect() the socket.
207
/// 2. Wait for the socket to become writable using whatever mechanism
208
///    is available with the current runtime.
209
/// 3. Check `SO_ERROR` for errors.
210
///
211
/// Historically we relied on the runtime to create and connect the socket, but we need some
212
/// specific socket options set, and not all runtimes will behave the same. It's better for us to
213
/// create the socket with the options we need and with consistent behaviour across all runtimes.
214
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
215
76
pub(crate) fn tcp_pre_connect(
216
76
    addr: &std::net::SocketAddr,
217
76
    options: &TcpConnectOptions,
218
76
) -> std::io::Result<socket2::Socket> {
219
    use socket2::{Domain, Type};
220

            
221
    // Destructure the options so that we don't forget to use any.
222
    let TcpConnectOptions {
223
        common:
224
            CommonConnectOptions {
225
76
                send_buffer_size,
226
76
                recv_buffer_size,
227
            },
228
76
    } = options;
229

            
230
76
    let domain = match addr {
231
76
        std::net::SocketAddr::V4(_) => Domain::IPV4,
232
        std::net::SocketAddr::V6(_) => Domain::IPV6,
233
    };
234

            
235
    // `socket2::Socket::new()`:
236
    // > This function corresponds to `socket(2)` on Unix and `WSASocketW` on Windows.
237
    // >
238
    // > On Unix-like systems, the close-on-exec flag is set on the new socket. Additionally, on
239
    // > Apple platforms `SOCK_NOSIGPIPE` is set. On Windows, the socket is made non-inheritable.
240
76
    let socket = Socket::new(domain, Type::STREAM, None)?;
241

            
242
76
    socket.set_nonblocking(true)?;
243

            
244
    // tcp(7):
245
    //
246
    // > On individual connections, the socket buffer size must be set prior to the listen(2) or
247
    // > connect(2) calls in order to have it take effect.
248
76
    if let Some(send_buffer_size) = send_buffer_size {
249
        socket.set_send_buffer_size(*send_buffer_size)?;
250
76
    }
251
76
    if let Some(recv_buffer_size) = recv_buffer_size {
252
        socket.set_recv_buffer_size(*recv_buffer_size)?;
253
76
    }
254

            
255
    // TODO: In the future, we'll likely want to support optionally binding to an address or to a
256
    // network interface (`SO_BINDTODEVICE`). See c-tor's `OutboundBindAddresses`.
257
    // If we do, we will also want to set `IP_BIND_ADDRESS_NO_PORT`.
258
    // We may also want to consider setting `IPV6_V6ONLY` (do we want to support connecting to
259
    // IPv4-mapped IPv6 addresses while we already do happy eyeballs?).
260

            
261
    // We do not connect() here so that we can use whatever connection mechanism is best for the
262
    // runtime being used.
263

            
264
76
    Ok(socket)
265
76
}
266

            
267
/// Stub replacement for tcp_pre_connect on wasm32-unknown
268
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
269
pub(crate) fn tcp_pre_connect(
270
    _addr: &std::net::SocketAddr,
271
    _options: &TcpConnectOptions,
272
) -> std::io::Result<socket2::Socket> {
273
    Err(std::io::Error::from(std::io::ErrorKind::Unsupported))
274
}
275

            
276
/// Connect a TCP socket using the async-io crate.
277
///
278
/// This in theory should be runtime-independent as async-io spawns its own thread to poll the
279
/// socket. But this is inefficient on some runtimes like tokio.
280
///
281
/// Runtimes that want to connect manually should use [`tcp_pre_connect()`] to set up the socket,
282
/// and then connect it manually.
283
#[cfg(any(feature = "async-std", feature = "smol"))]
284
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
285
40
pub(crate) async fn tcp_async_io_connect(
286
40
    addr: &std::net::SocketAddr,
287
40
    options: &TcpConnectOptions,
288
60
) -> std::io::Result<std::net::TcpStream> {
289
    use async_io::Async;
290

            
291
    // The socket before connect() has been called.
292
40
    let socket = tcp_pre_connect(addr, options)?;
293

            
294
    // Different platforms return different results from non-blocking `connect()`s.
295
    // Here we've checked that we match mio (tokio's low-level I/O code) for unix and windows
296
    // to ensure that we're handling the right error kind/errno.
297
40
    match socket.connect(&(*addr).into()) {
298
        Ok(()) => {}
299
        // On unix, mio checks for `EINPROGRESS`:
300
        // https://github.com/tokio-rs/mio/blob/0db25a7eae653f02e964a28d9aaf65b74c941208/src/sys/unix/tcp.rs#L35
301
        #[cfg(unix)]
302
40
        Err(e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
303
        // On windows, mio checks for `WouldBlock`:
304
        // https://github.com/tokio-rs/mio/blob/0db25a7eae653f02e964a28d9aaf65b74c941208/src/sys/windows/tcp.rs#L44
305
        #[cfg(windows)]
306
        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
307
        Err(e) => return Err(e),
308
    }
309

            
310
    // The socket is already non-blocking,
311
    // so `Async` doesn't need to set as non-blocking again.
312
40
    let socket = Async::new_nonblocking(socket)?;
313

            
314
    // Wait for the socket to become writable, indicating that it's connected.
315
40
    socket.writable().await?;
316

            
317
    // Check `SO_ERROR`.
318
40
    if let Some(e) = socket.get_ref().take_error()? {
319
        return Err(e);
320
40
    }
321

            
322
40
    Ok(socket.into_inner()?.into())
323
40
}
324

            
325
/// Stub replacement for tcp_async_io_connect on wasm32-unknown
326
#[cfg(any(feature = "async-std", feature = "smol"))]
327
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
328
pub(crate) async fn tcp_async_io_connect(
329
    _addr: &std::net::SocketAddr,
330
    _options: &TcpConnectOptions,
331
) -> std::io::Result<std::net::TcpStream> {
332
    Err(std::io::Error::from(std::io::ErrorKind::Unsupported))
333
}
334

            
335
/// Helper: Implement an unreachable NetProvider<unix::SocketAddr> for a given runtime.
336
#[cfg(not(unix))]
337
macro_rules! impl_unix_non_provider {
338
    { $for_type:ty } => {
339

            
340
        #[async_trait]
341
        impl crate::traits::NetStreamProvider<tor_general_addr::unix::SocketAddr> for $for_type {
342
            type Stream = crate::unimpl::FakeStream;
343
            type Listener = crate::unimpl::FakeListener<tor_general_addr::unix::SocketAddr>;
344
            type ConnectOptions = crate::network::UnixConnectOptions;
345
            type ListenOptions = crate::network::UnixListenOptions;
346
            async fn connect(
347
                &self,
348
                _a: &tor_general_addr::unix::SocketAddr,
349
                _options: &Self::ConnectOptions,
350
            ) -> IoResult<Self::Stream> {
351
                Err(tor_general_addr::unix::NoAfUnixSocketSupport::default().into())
352

            
353
            }
354
            async fn listen(
355
                &self,
356
                _a: &tor_general_addr::unix::SocketAddr,
357
                _options: &Self::ListenOptions,
358
            ) -> IoResult<Self::Listener> {
359
                Err(tor_general_addr::unix::NoAfUnixSocketSupport::default().into())
360
            }
361
        }
362
    }
363
}
364
#[cfg(not(unix))]
365
pub(crate) use impl_unix_non_provider;