1
//! Low-level connection implementations.
2
//!
3
//! This module defines two main types: [`NonblockingConnection`].
4
//! (a low-level type for use with external tools
5
//! that want to implement their own nonblocking IO),
6
//! and [`BlockingConnection`] (a slightly higher-level type
7
//! that we use internally when we are asked to provide
8
//! our own nonblocking IO loop(s)).
9
//!
10
//! This module also defines several traits for use by these types.
11
//!
12
//! Treats messages as unrelated strings, and validates outgoing messages for correctness.
13

            
14
mod blocking;
15
mod nonblocking;
16

            
17
use std::io;
18

            
19
#[cfg(unix)]
20
use std::os::fd::{AsFd as _, BorrowedFd as BorrowedOsHandle};
21
#[cfg(windows)]
22
use std::os::windows::io::{AsSocket as _, BorrowedSocket as BorrowedOsHandle};
23

            
24
pub(crate) use blocking::BlockingConnection;
25
pub(crate) use nonblocking::{NonblockingConnection, PollStatus, WriteHandle};
26

            
27
pub use nonblocking::{EventLoop, SendRequestError};
28

            
29
/// Retry `f` until it returns Ok() or an error whose kind is not `Interrupted`
30
2364
fn retry_eintr<F, T>(mut f: F) -> io::Result<T>
31
2364
where
32
2364
    F: FnMut() -> io::Result<T>,
33
{
34
    loop {
35
2364
        let r = f();
36
150
        match r {
37
150
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
38
2364
            _ => return r,
39
        }
40
    }
41
2364
}
42

            
43
/// Any type we can use as a target for [`NonblockingConnection`].
44
pub(crate) trait Stream: io::Read + io::Write + Send {
45
    /// If this Stream object is a [`MioStream`], return it as a `mio::event::Source`.
46
    ///
47
    /// Otherwise return None.
48
    fn as_mio_source(&mut self) -> Option<&mut dyn mio::event::Source>;
49

            
50
    /// Discard any mio-specific wrappers on this stream.
51
    fn remove_mio(self: Box<Self>) -> Box<dyn Stream>;
52

            
53
    /// Return an os-specific handle for using this stream type within a nonblocking event loop.
54
    ///
55
    /// (This will be an fd on unix and a SOCKET on windows.)
56
    fn try_as_handle(&self) -> io::Result<BorrowedOsHandle<'_>>;
57
}
58

            
59
/// A [`Stream`] that we can use inside a [`BlockingConnection`].
60
pub(crate) trait MioStream: Stream + mio::event::Source {}
61

            
62
/// Implement Stream and MioStream for a related pair of types.
63
macro_rules! impl_traits {
64
    { $stream:ty => $mio_stream:ty } => {
65
        impl Stream for $stream {
66
            fn as_mio_source(&mut self) -> Option<&mut dyn mio::event::Source> {
67
                None
68
            }
69
            fn remove_mio(self: Box<Self>) -> Box<dyn Stream> {
70
                self
71
            }
72
            fn try_as_handle(&self) -> io::Result<BorrowedOsHandle<'_>> {
73
                cfg_if::cfg_if!{
74
                    if #[cfg(unix)] {
75
                        Ok(self.as_fd())
76
                    } else if #[cfg(windows)] {
77
                        Ok(self.as_socket())
78
                    }
79
                }
80
            }
81
        }
82
        impl Stream for $mio_stream {
83
154
            fn as_mio_source(&mut self) -> Option<&mut dyn mio::event::Source> {
84
154
                Some(self as _)
85
154
            }
86
            fn remove_mio(self: Box<Self>) -> Box<dyn Stream> {
87
                Box::new(<$stream>::from(*self))
88
            }
89
            fn try_as_handle(&self) -> io::Result<BorrowedOsHandle<'_>> {
90
                cfg_if::cfg_if!{
91
                    if #[cfg(unix)] {
92
                        Ok(self.as_fd())
93
                    } else if #[cfg(windows)] {
94
                        Ok(self.as_socket())
95
                    }
96
                }
97
            }
98
        }
99
        impl MioStream for $mio_stream {
100
        }
101
    }
102
}
103

            
104
impl_traits! { std::net::TcpStream => mio::net::TcpStream }
105
#[cfg(unix)]
106
impl_traits! { std::os::unix::net::UnixStream => mio::net::UnixStream }
107

            
108
// We implement "Stream" for Empty so that we can use it to temporarily swap it in
109
// as a placeholder for a Box<dyn Stream>.
110
impl Stream for std::io::Empty {
111
    fn as_mio_source(&mut self) -> Option<&mut dyn mio::event::Source> {
112
        None
113
    }
114

            
115
    fn remove_mio(self: Box<Self>) -> Box<dyn Stream> {
116
        self
117
    }
118

            
119
    fn try_as_handle(&self) -> io::Result<BorrowedOsHandle<'_>> {
120
        Err(io::ErrorKind::Unsupported.into())
121
    }
122
}