//! Define a wrapper around [`NonblockingConnection`] providing blocking io,
//! based on the [`mio`] library.
//!
//! We use this wrapper when the user is not providing their own event loop.

            
6
use mio::Interest;
7

            
8
use crate::msgs::response::UnparsedResponse;
9
use std::io;
10

            
11
use super::nonblocking::{EventLoop, NonblockingConnection, PollStatus, WriteHandle};
12
use super::{MioStream, retry_eintr};
13

            
14
/// An IO connection to Arti, along with any supporting logic necessary to check it for readiness.
15
///
16
/// Internally, this uses `mio` along with a [`NonblockingConnection`] to check for events.
17
///
18
/// To use this type, mark the stream as nonblocking
19
/// with e.g. [TcpStream::set_nonblocking](std::net::TcpStream::set_nonblocking),
20
/// convert it into a [`mio::event::Source`],
21
/// and pass it to [`BlockingConnection::new()`]
22
///
23
/// At this point, you can read and write messages via nonblocking IO.
24
///
25
/// The [`BlockingConnection::writer()`] method will return a handle that you can use from any thread
26
/// that you can use to queue an outbound message.
27
///
28
/// No messages are actually sent or received unless
29
/// some thread is calling [`BlockingConnection::interact()`].
30
///
31
/// ## Concurrency and interior mutability
32
///
33
/// A `BlockingConnection` has (limited) interior mutability.
34
///
35
/// Only a single call to `interact` can be made at the same time.
36
/// So only one thread can be waiting for responses, and
37
/// the caller of `interact` must demultiplex responses as necessary.
38
///
39
/// But, one or more [`WriteHandle`]s can be created,
40
/// and these are `'static + Send + Sync`.
41
/// Using `WriteHandle`, multiple threads can enqueue requests,
42
/// with [`send_valid`](WriteHandle::send_valid), concurrently.
43
///
44
/// (All these restrictions imposed on the caller are enforced by the Rust type system.)
45
#[derive(Debug)]
46
pub(crate) struct BlockingConnection {
47
    /// The poll object.
48
    ///
49
    /// (This typically corresponds to a kqueue or epoll handle.)
50
    ///
51
    /// ## IO Safety
52
    ///
53
    /// This object (semantically) contains references to the `fd`s or `SOCKETS`
54
    /// of any inserted [`mio::event::Source`].  Therefore it must not outlive those sources.
55
    /// Further, according to `mio`'s documentation, every Source must be deregistered
56
    /// before it can be dropped.
57
    ///
58
    /// We ensure these properties are obeyed as follows:
59
    ///  - We hold the stream via `conn`, the NonblockingConnection member of this struct.
60
    ///    We do not let anybody outside this module have the stream or the `Poll`.
61
    ///  - We declare a Drop implementation that deregisters the stream.
62
    ///    This method ensures that the stream is dropped before it is closed.
63
    poll: mio::Poll,
64

            
65
    /// A small buffer to receive IO readiness events.
66
    events: mio::Events,
67

            
68
    /// The underlying NonblockingConnection.
69
    ///
70
    /// Invariant: `nbconn.stream` is a [`MioStream`],
71
    /// so [`NonblockingConnection::as_mio_source`] will return
72
    /// Some when we call it.
73
    ///
74
    /// This is None only if we have called `into_nonblocking()` or `drop()`.
75
    /// We store this in an Option so that we can move it out of this object.
76
    nbconn: Option<NonblockingConnection>,
77
}
78

            
79
/// A `mio` token corresponding to the Waker we use to tell the interactor about new writes.
80
const WAKE_TOKEN: mio::Token = mio::Token(0);
81

            
82
/// A `mio` token corresponding to the Stream connecting to the RPC
83
const STREAM_TOKEN: mio::Token = mio::Token(1);
84

            
85
/// Wrapper around [`mio::Waker`] on which we implement [`EventLoop`].
86
///
87
/// We don't do so on `mio::Waker` directly
88
/// since other implementations of `EventLoop` on `mio::Waker`
89
/// are possible.
90
struct MioWaker(mio::Waker);
91

            
92
impl BlockingConnection {
93
    /// Create a new BlockingConnection.
94
    ///
95
    /// The `stream` will be set to use nonblocking IO;
96
    /// on Unix this will affect the behaviour of other `dup`s of the same fd!
97
10
    pub(crate) fn new(stream: Box<dyn MioStream>) -> io::Result<Self> {
98
10
        let poll = mio::Poll::new()?;
99
10
        let waker = mio::Waker::new(poll.registry(), WAKE_TOKEN)?;
100

            
101
10
        let nbconn = NonblockingConnection::new(Box::new(MioWaker(waker)), stream);
102

            
103
10
        let mut cio = Self {
104
10
            poll,
105
10
            events: mio::Events::with_capacity(4),
106
10
            nbconn: Some(nbconn),
107
10
        };
108

            
109
        // We register the stream here, since we want to use it exclusively with `reregister`
110
        // later on.  We do not deregister the stream until `Drop::drop` is called.
111
10
        cio.poll.registry().register(
112
10
            cio.nbconn
113
10
                .as_mut()
114
10
                .expect("Logic error: stream not present")
115
10
                .as_mio_source()
116
10
                .expect("logic error: not a mio stream."),
117
            STREAM_TOKEN,
118
            Interest::READABLE,
119
        )?;
120

            
121
10
        Ok(cio)
122
10
    }
123

            
124
    /// Return a new [`WriteHandle`] that can be used to queue messages to be sent via this stream.
125
10
    pub(crate) fn writer(&self) -> WriteHandle {
126
10
        self.nbconn
127
10
            .as_ref()
128
10
            .expect("logic error: stream not present")
129
10
            .writer()
130
10
    }
131

            
132
    /// Interact with the peer until some response is received.
133
    ///
134
    /// Sends all requests given to [`WriteHandle::send_valid`]
135
    /// (including calls to `send_valid` made while `interact` is running)
136
    /// while looking for a response from the server.
137
    /// Returns when the first response is received.
138
    ///
139
    ///
140
    /// Returns an error if an IO condition has failed.
141
    /// Returns None if the other side has closed the stream.
142
    /// Otherwise, returns an unparsed message from the RPC server.
143
    ///
144
    /// Unless some thread is calling this method, nobody will actually be reading or writing from
145
    /// the [`BlockingConnection`], and so nobody's requests will be sent or answered.
146
8098
    pub(crate) fn interact(&mut self) -> io::Result<Option<UnparsedResponse>> {
147
        // Should we try to read and write? Start out by assuming "yes".
148

            
149
        loop {
150
8282
            let nbconn = self
151
8282
                .nbconn
152
8282
                .as_mut()
153
8282
                .expect("logic error: connection not present!");
154

            
155
            // Try interacting with the underlying stream.
156
8282
            match nbconn.interact_once()? {
157
                PollStatus::Closed => return Ok(None),
158
8096
                PollStatus::Msg(msg) => return Ok(Some(msg)),
159
184
                PollStatus::WouldBlock => {}
160
            };
161

            
162
            // We're blocking on reading and possibly writing.  Register our interest,
163
            // so that we get woken as appropriate.
164
            //
165
            // TOCTOU note: If `want_write` is true, it will not become
166
            // false until the next time we call stream.interact_once().
167
            //
168
            // If `wantio.want_write()` is false, Whenever it becomes true,
169
            // `MioWaker` will be invoked.  That will cause the
170
            // self.poll.poll() to return, and the loop to repeat.
171
184
            let want_write = nbconn.wants_to_write();
172
184
            let interests = if want_write {
173
26
                Interest::READABLE | Interest::WRITABLE
174
            } else {
175
158
                Interest::READABLE
176
            };
177
184
            self.poll.registry().reregister(
178
184
                nbconn
179
184
                    .as_mio_source()
180
184
                    .expect("logic error: not a mio stream!"),
181
                STREAM_TOKEN,
182
184
                interests,
183
            )?;
184

            
185
            // Poll until the socket is ready to read or write,
186
            // _or_ until somebody invokes the EventLoop because they have queued more to write.
187
276
            let () = retry_eintr(|| self.poll.poll(&mut self.events, None))?;
188

            
189
            // Now that we've been woken, see which events we've been woken with,
190
            // and adjust our plans accordingly on the next time through the loop.
191
184
            self.events.clear();
192
        }
193
8098
    }
194

            
195
    /// Downgrade this stream into a [`NonblockingConnection`]
196
    /// for use within an [`RpcPoll`](crate::RpcPoll).
197
    pub(crate) fn into_nonblocking(mut self) -> NonblockingConnection {
198
        let mut nb_conn = self
199
            .deregister_and_take_nb_conn()
200
            .expect("logic error: stream not present!");
201
        nb_conn.downgrade_source();
202
        nb_conn
203
    }
204

            
205
    /// Implementation helper for Drop and into_nonblocking:
206
    ///
207
    /// Deregisters the NonblockingConnection with the mio Registry, removes it from this object,
208
    /// and returns it.
209
    ///
210
    /// After this method is called, this object may no longer be used.
211
10
    fn deregister_and_take_nb_conn(&mut self) -> Option<NonblockingConnection> {
212
        // IO SAFETY: See "IO Safety" note in documentation for BlockingConnection.
213
10
        let mut nbconn = self.nbconn.take()?;
214
10
        let s: &mut _ = nbconn
215
10
            .as_mio_source()
216
10
            .expect("Logic error: Stream was not a MIO stream.");
217
10
        self.poll
218
10
            .registry()
219
10
            .deregister(s)
220
10
            .expect("Deregister operation failed");
221
10
        Some(nbconn)
222
10
    }
223
}
224

            
225
impl Drop for BlockingConnection {
226
10
    fn drop(&mut self) {
227
        // IO SAFETY: See "IO Safety" note in documentation for BlockingConnection.
228
10
        let _ = self.deregister_and_take_nb_conn();
229
10
    }
230
}
231

            
232
impl EventLoop for MioWaker {
233
800
    fn start_writing(&mut self) -> io::Result<()> {
234
800
        mio::Waker::wake(&self.0)
235
800
    }
236
796
    fn stop_writing(&mut self) -> io::Result<()> {
237
796
        Ok(())
238
796
    }
239
}
240

            
241
// TODO: It would be good to have additional tests for this code.
// It's exercised by all tests for `conn` that don't provide their own event loop,
// but there could definitely be more things to look at.