1
//! Lower level connection type based on nonblocking IO.
2
//!
3
//! This module defines [`NonblockingConnection`], which provides a nonblocking
4
//! wrapper around an underlying nonblocking stream,
5
//! [`WriteHandle`], which queues messages for a `NonblockingConnection`,
6
//! and [`EventLoop`], a trait wrapping access to an event loop
7
//! based on poll, select, kqueue, epoll, etc.
8
//!
9
//! `NonblockingConnection` is used directly in `RpcPoll` if the user wants to provide their own
10
//! event loop, or wrapped in a [`BlockingConnection`](super::BlockingConnection)
11
//! if this RPC library is providing its own event loop.
12

            
13
use crate::{
14
    msgs::{request::ValidatedRequest, response::UnparsedResponse},
15
    util::define_from_for_arc,
16
};
17
use std::{
18
    io::{self, Read as _, Write as _},
19
    mem::{self},
20
    sync::{Arc, Mutex},
21
};
22

            
23
#[cfg(unix)]
24
use std::os::fd::BorrowedFd as BorrowedOsHandle;
25
#[cfg(windows)]
26
use std::os::windows::io::BorrowedSocket as BorrowedOsHandle;
27

            
28
use super::{Stream, retry_eintr};
29

            
30
/// A lower-level implementation of nonblocking IO for an open stream to the RPC server.
31
///
32
/// Unlike [`BlockingConnection`], this type _does not_ handle the IO event polling loops:
33
/// the caller is required to provide their own.
34
///
35
/// [`BlockingConnection`]: super::BlockingConnection
36
#[derive(derive_more::Debug)]
37
pub(crate) struct NonblockingConnection {
38
    /// A write handle used to write onto this stream.
39
    #[debug(ignore)]
40
    write_handle: WriteHandle,
41

            
42
    /// A buffer of incoming messages (possibly partial) from the RPC server.
43
    //
44
    // TODO: Consider using a VecDeque or BytesMut or such.
45
    read_buf: Vec<u8>,
46

            
47
    /// The underlying nonblocking stream.
48
    #[debug(ignore)]
49
    stream: Box<dyn Stream>,
50
}
51

            
52
/// A return value from [`NonblockingConnection::interact_once`].
53
#[derive(Debug, Clone)]
54
pub(crate) enum PollStatus {
55
    /// The stream is closed.
56
    Closed,
57

            
58
    /// No progress can be made until the stream is available for further IO.
59
    WouldBlock,
60

            
61
    /// We have received a message.
62
    Msg(UnparsedResponse),
63
}
64

            
65
/// A handle that can be used to queue outgoing messages for a nonblocking stream.
66
///
67
/// Note that queueing a message has no effect unless some party is polling the stream,
68
/// either with [`BlockingConnection::interact()`], or [`NonblockingConnection::interact_once()`].
69
///
70
/// [`BlockingConnection::interact()`]: super::BlockingConnection::interact
71
#[derive(Clone, Debug)]
72
pub(crate) struct WriteHandle {
73
    /// The actual implementation type for this writer.
74
    inner: Arc<Mutex<WriteHandleImpl>>,
75
}
76

            
77
impl WriteHandle {
78
    /// Queue an outgoing message for a nonblocking stream.
79
4190
    pub(crate) fn send_valid(&self, msg: &ValidatedRequest) -> io::Result<()> {
80
4190
        let mut w = self.inner.lock().expect("Poisoned lock");
81
4190
        let was_empty = w.write_buf.is_empty();
82
4190
        w.write_buf.extend_from_slice(msg.as_ref().as_bytes());
83

            
84
        // See TOCTOU note on `WriteHandleImpl`:
85
        // we need to change our interest while we are holding the
86
        // above mutex.
87
4190
        if was_empty {
88
804
            w.event_loop.start_writing()?;
89
3386
        }
90
4190
        Ok(())
91
4190
    }
92
}
93

            
94
/// An error that has occurred while sending a request.
95
#[derive(Clone, Debug, thiserror::Error)]
96
#[non_exhaustive]
97
pub enum SendRequestError {
98
    /// An IO error occurred while sending a request.
99
    #[error("Unable to wake poling loop")]
100
    Io(#[source] Arc<io::Error>),
101
    /// We found a problem in the JSON while sending a request.
102
    #[error("Invalid Json request")]
103
    InvalidRequest(#[from] crate::InvalidRequestError),
104
    /// Internal error while re-encoding request.  Should be impossible.
105
    #[error("Unable to re-encode request after parsing it‽")]
106
    ReEncode(#[source] Arc<serde_json::Error>),
107
}
108
define_from_for_arc!( io::Error => SendRequestError [Io] );
109

            
110
/// The inner implementation for [`WriteHandle`].
111
///
112
/// NOTE: We need to be careful to avoid TOCTOU problems with this type:
113
/// It would be bad if a writing thread said "now I care about write events",
114
/// and then the interactor checked the
115
/// buffer and found it empty, and only then did the writing thread add to the buffer.
116
///
117
/// To solve this, we put the `write_buf` and the `event_loop` behind the same lock:
118
/// While the interactor is checking the buffer, nobody is able to add to the buffer _or_ wake the
119
/// interactor.
120
#[derive(derive_more::Debug)]
121
struct WriteHandleImpl {
122
    /// An underlying buffer holding messages to be sent to the RPC server.
123
    //
124
    // TODO: Consider using a VecDeque or BytesMut or such.
125
    write_buf: Vec<u8>,
126

            
127
    /// The handle to use to wake the polling loop.
128
    #[debug(ignore)]
129
    event_loop: Box<dyn EventLoop>,
130
}
131

            
132
impl NonblockingConnection {
133
    /// Create a new `NonblockingConnection` from a provided [`EventLoop`] and [`Stream`].
134
14
    pub(crate) fn new(event_loop: Box<dyn EventLoop>, stream: Box<dyn Stream>) -> Self {
135
14
        Self {
136
14
            write_handle: WriteHandle {
137
14
                inner: Arc::new(Mutex::new(WriteHandleImpl {
138
14
                    write_buf: Default::default(),
139
14
                    event_loop,
140
14
                })),
141
14
            },
142
14
            read_buf: Default::default(),
143
14
            stream,
144
14
        }
145
14
    }
146

            
147
    /// Return a reference to this connection as a mio source.
148
    ///
149
    /// Returns None if this is was not constructed with a mio stream,
150
    /// or if `downgrade_source` has been called.
151
204
    pub(super) fn as_mio_source(&mut self) -> Option<&mut dyn mio::event::Source> {
152
204
        self.stream.as_mut().as_mio_source()
153
204
    }
154

            
155
    /// Remove any mio wrappers from this connection.
156
    pub(super) fn downgrade_source(&mut self) {
157
        // We need this rigamarole because `self.stream = self.stream.remove_mio()`
158
        // gives a "can't move out of self.stream, which is behind a mutable reference"
159
        // error.
160
        let mut s: Box<dyn Stream> = Box::new(std::io::empty());
161
        mem::swap(&mut s, &mut self.stream);
162
        self.stream = s.remove_mio();
163
    }
164

            
165
    /// Return a new [`WriteHandle`] that can be used to queue messages to be sent via this connection.
166
12
    pub(crate) fn writer(&self) -> WriteHandle {
167
12
        self.write_handle.clone()
168
12
    }
169

            
170
    /// Try to return an OS-level handle for use with this connection.
171
    ///
172
    /// This is an fd on unix and a SOCKET on windows.
173
    pub(crate) fn try_as_handle(&self) -> io::Result<BorrowedOsHandle<'_>> {
174
        self.stream.try_as_handle()
175
    }
176

            
177
    /// Replace the current `EventLoop` this [`NonblockingConnection`].
178
    ///
179
    /// This should only be done while nothing else is interacting with the stream or the waker.
180
    pub(crate) fn replace_event_loop_handle(&mut self, new_event_loop_handle: Box<dyn EventLoop>) {
181
        let mut h = self.write_handle.inner.lock().expect("Poisoned lock");
182
        h.event_loop = new_event_loop_handle;
183
    }
184

            
185
    /// Return true iff this [`NonblockingConnection`] currently wants to write
186
    ///
187
    /// See [`RpcPoll::wants_to_write`] and [`EventLoop`]
188
    /// for the semantics.
189
    ///
190
    /// [`RpcPoll::wants_to_write`]: crate::RpcPoll::wants_to_write
191
    /// [`EventLoop`]: crate::EventLoop
192
196
    pub(crate) fn wants_to_write(&self) -> bool {
193
196
        self.has_data_to_write()
194
196
    }
195

            
196
    /// Try to exchange messages with the RPC server.
197
    ///
198
    /// If the stream proves to be closed, returns [`PollStatus::Closed`].
199
    ///
200
    /// If a message is available, returns [`PollStatus::Msg`].
201
    /// (Note that a message may be available in the internal buffer here
202
    /// even if try_reading is false.)
203
    ///
204
    /// If no message is available, return [`PollStatus::WouldBlock`].
205
8300
    pub(crate) fn interact_once(&mut self) -> io::Result<PollStatus> {
206
        use io::ErrorKind::WouldBlock;
207

            
208
8300
        if let Some(msg) = self.extract_msg()? {
209
7412
            return Ok(PollStatus::Msg(msg));
210
888
        }
211

            
212
888
        match self.flush_queue() {
213
886
            Ok(()) => {}
214
2
            Err(e) if e.kind() == WouldBlock => {}
215
            Err(e) => return Err(e),
216
        }
217

            
218
888
        match self.read_msg() {
219
690
            Ok(Some(msg)) => return Ok(PollStatus::Msg(msg)),
220
            Ok(None) => return Ok(PollStatus::Closed),
221
198
            Err(e) if e.kind() == WouldBlock => {}
222
2
            Err(e) => return Err(e),
223
        }
224

            
225
196
        Ok(PollStatus::WouldBlock)
226
8300
    }
227

            
228
    /// Internal helper: Try to get a buffered message out of our `read_buf`.
229
    ///
230
    /// Returns Ok(None) if there are no complete lines in the buffer.
231
    ///
232
    /// If there is a line, but it is not valid UTF-8, returns an error and discards the line.
233
9880
    fn extract_msg(&mut self) -> io::Result<Option<UnparsedResponse>> {
234
        // Look for an eol within the buffer.
235
9880
        let Some(eol_pos) = memchr::memchr(b'\n', &self.read_buf[..]) else {
236
1778
            return Ok(None);
237
        };
238
        // Split off the part of the buffer ending with the EOF from the remainder.
239
8102
        let mut line = self.read_buf.split_off(eol_pos + 1);
240
        // Put the message in "line" and the remainder of the buffer in read_buf.
241
8102
        mem::swap(&mut line, &mut self.read_buf);
242
        // Try to convert the line to an UnparsedResponse.
243
8102
        match String::from_utf8(line) {
244
8102
            Ok(s) => Ok(Some(UnparsedResponse::new(s))),
245
            Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
246
        }
247
9880
    }
248

            
249
    /// Internal helper: Return true if there is any outgoing data queued to be written.
250
196
    fn has_data_to_write(&self) -> bool {
251
196
        let w = self.write_handle.inner.lock().expect("Lock poisoned");
252
        // See TOCTOU note on WriteHandleImpl: Our rule is to check whether we have data to write
253
        // within the same lock used to hold the waker, so that we can't lose any data.
254
196
        !w.write_buf.is_empty()
255
196
    }
256

            
257
    /// Helper: Try to get a message, reading into our read_buf as needed.
258
    ///
259
    /// (We don't use a BufReader here because
260
    /// its behavior with nonblocking IO is kind of underspecified.)
261
888
    fn read_msg(&mut self) -> io::Result<Option<UnparsedResponse>> {
262
        const READLEN: usize = 4096;
263
        loop {
264
1580
            if let Some(msg) = self.extract_msg()? {
265
690
                return Ok(Some(msg));
266
890
            }
267

            
268
890
            let len_orig = self.read_buf.len();
269
            // TODO: Impose a maximum length?
270
890
            self.read_buf.resize(len_orig + READLEN, 0);
271
1335
            let result = retry_eintr(|| self.stream.read(&mut self.read_buf[len_orig..]));
272
890
            match result {
273
                Ok(0) => return Ok(None),
274
692
                Ok(n) => {
275
692
                    self.read_buf.truncate(len_orig + n);
276
692
                }
277
198
                Err(e) => {
278
198
                    self.read_buf.truncate(len_orig);
279
198
                    return Err(e);
280
                }
281
            }
282
        }
283
888
    }
284

            
285
    /// Try to flush data from the underlying write buffer.
286
    ///
287
    /// Returns Ok() only if all of the data is flushed, and the write buffer has become empty.
288
888
    fn flush_queue(&mut self) -> io::Result<()> {
289
888
        let mut w = self.write_handle.inner.lock().expect("Poisoned lock.");
290

            
291
        loop {
292
1690
            if w.write_buf.is_empty() {
293
886
                return Ok(());
294
804
            }
295

            
296
1206
            let n = retry_eintr(|| self.stream.write(&w.write_buf[..]))?;
297
802
            vec_pop_from_front(&mut w.write_buf, n);
298

            
299
802
            if w.write_buf.is_empty() {
300
800
                w.event_loop.stop_writing()?;
301
2
            }
302

            
303
            // This is a no-op for the streams we support so far, but it could be necessary if
304
            // we support more kinds in the future.
305
1203
            let () = retry_eintr(|| self.stream.flush())?;
306
        }
307
888
    }
308
}
309

            
310
/// Representation of an event loop that can watch a handle and arrange to call `poll`
311
///
312
/// Provided to the event-driven nonblocking RPC connection API,
313
/// by the user, via [`connect_polling`].
314
///
315
/// This is only used along with [`RpcPoll`]; if you aren't using that type,
316
/// you don't need to worry about this trait.
317
///
318
/// # Operating principles
319
///
320
/// The user code must implement an event loop,
321
/// which can monitor for handle readability/writeability.
322
///
323
/// The RPC library provides the user code with
324
/// the OS handle for the transport connection.
325
///
326
/// The RPC library always wants to read from the handle.
327
/// It informs the user code whether the RPC library wants to write to the handle.
328
///
329
/// When the handle is readable, or (if applicable) writeable,
330
/// the user code must call into the RPC library via [`RpcPoll::poll`]
331
/// so that the library can perform IO.
332
///
333
/// # Implementation strategies
334
///
335
/// The RPC library's API is designed to be easy to interface
336
/// to existing event loops.
337
///
338
/// ## Plumbing to using an existing event loop
339
///
340
/// With an existing event loop which is suitably reentrant across multiple threads,
341
/// or in single-threaded programs with an existing event loop:
342
///
343
///  * Call `connect_polling`; use `try_as_fd` or `try_as_socket`
344
///    on the returned `RpcPoll` to obtain the OS handle.
345
///  * Register the OS handle with the event loop,
346
///    and ask to be notified when the handle is readable.
347
///  * Implement `EventLoop::start_writing` and `EventLoop::stop_writing`:
348
///    `start_writing` should reregister the handle with the event loop
349
///    to request writeability notifications too;
350
///    `stop_writing` should stop writeability notifications.
351
///  * When notified that the handle is readable or writeable,
352
///    call [`RpcPoll::poll`].
353
///
354
/// Depending on the event loop's API, the type implementing `EventLoop`
355
/// might be a unit struct (if the event loop is global);
356
/// or it might be a handle onto the event loop,
357
/// or some kind of "event source" object if the event loop has those.
358
///
359
/// ## "Main thread only" event loops in multithreaded programs
360
///
361
/// Many real event loops have a "main thread",
362
/// and require all changes to OS handle interests to happen on that thread.
363
///
364
/// If you can't guarantee that all calls to `submit` will be made on the main thread,
365
/// you need to arrange that `start_writing` can add the writeability interest
366
/// even if another thread is currently blocked in the event loop waiting for IO events.
367
///
368
/// To achieve this:
369
///
370
///  * Use an inter-thread communication facility, such as an event loop "waker",
371
///    a self-pipe, or similar technique.  (We'll call this the "waker".)
372
///  * Entrol the receiving end of the waker in the event loop during setup.
373
///  * `EventLoop` contains just the sending handle of the waker,
374
///    not a reference to the real event loop.
375
///    `start_writing` notifies the waker.  `stop_writing` is a no-op.
376
///
377
/// When the event loop notifies your glue code (necessarily, on the main thread)
378
/// that the waker, or the RPC connection OS handle, is ready for IO:
379
///
380
///  * Repeatedly call `RpcPoll::poll` and dispatch any returned `Response`,
381
///    until it returns `WouldBlock`.
382
///  * Call `RpcPoll::wants_to_write` and adjust the RPC connection OS handle interest,
383
///    in the event loop.
384
///
385
/// (You can respond to all such wakeups with this identical, idempotent, response.)
386
///
387
/// # Single-threaded open-coded event loops
388
///
389
/// In a single-threaded program, with an open-coded event loop,
390
/// it is permissible to simply call `wants_to_write`
391
/// to determine the correct value for `pollfd.events`
392
/// (`POLLIN`, plus `POLLOUT` iff `wants_to_write`),
393
/// then `poll(2)`,
394
/// and `RpcPoll::poll` and/or `submit` after `poll(2)`.
395
///
396
/// You can pass an `EventLoop` which implements
397
/// `start_writing` and `stop_writing` as no-ops.
398
///
399
/// (This is because only `submit` can cause `wants_to_write` to change to `true`,
400
/// and if there is only one thread, you can know that you're not calling `submit`
401
/// between `wants_to_write` and `poll(2)`.)
402
///
403
/// # Detailed requirements and guarantees
404
///
405
/// Progress will only be made during calls to `poll`.
406
/// In particular, `submit` does not actually send the data.
407
///
408
/// The program should not sleep, without arranging that readability
409
/// and (as applicable) writeability of the RPC connection handle
410
/// will result in a wakeup.
411
///
412
/// `start_writing` must be effective right away,
413
/// without waiting for any other events:
414
/// if `submit` can be called while another thread
415
/// is in the program's event loop waiting for OS events,
416
/// user code implementing `start_writing` must
417
/// arrange to wake up the event loop if necessary,
418
/// so that writeability will result in a call to `poll`.
419
///
420
/// `start_writing` is only ever called from `submit`,
421
/// on the same thread.
422
///
423
/// `stop_writing` is only ever called from `RpcPoll::poll`,
424
/// on the same thread.
425
///
426
/// All changes to the value which would be returned from `wants_to_write`
427
/// are reflected in `start_writing` and `stop_writing`,
428
/// and vice versa.
429
///
430
/// It is OK to call `RpcPoll::poll` when the handle is not known to be ready;
431
/// `RpcPoll::poll` never blocks, and instead immediately returns `WouldBlock`.
432
/// (A loop which calls `RpcPoll::poll` should involve waiting for
433
/// appropriate readiness on the underling OS handle,
434
/// as the program would otherwise spin rather than wait.)
435
///
436
/// Immediately after creation, a connection made with `connect_polling`
437
/// is not interested in writing.
438
///
439
/// # Relationship to the synchronous API
440
///
441
/// It is also permissible to call the [`.execute()`](crate::RpcConn::execute)
442
/// family of methods on the `RpcConn` returned from `connect_polling`.
443
///
444
/// In this case, `start_writing` might be called
445
/// from `execute`, not just from `submit`.
446
///
447
/// [`RpcPoll`]: crate::RpcPoll
448
/// [`RpcPoll::poll`]: crate::RpcPoll::poll
449
/// [`connect_polling`]: crate::conn::RpcConnBuilder::connect_polling
450
//
451
// When the underlying IO loop is `mio`, this is a [`MioWaker`];
452
// otherwise, it is some user-provided type.
453
pub trait EventLoop: Send + Sync {
454
    /// Alert the polling thread that we are no longer interested in write events.
455
    ///
456
    /// In a user-provided `EventLoop`,
457
    /// this method will only be invoked from within [`RpcPoll::poll`](crate::RpcPoll::poll).
458
    fn stop_writing(&mut self) -> io::Result<()>;
459

            
460
    /// Alert the polling thread that we have become interested in write events.
461
    ///
462
    /// In a user-provided `EventLoop`,
463
    /// this method will only be invoked from within one of the `submit` or `execute` methods
464
    /// on [`RpcConn`](crate::RpcConn).
465
    fn start_writing(&mut self) -> io::Result<()>;
466
}
467

            
468
/// Remove n elements from the front of v.
469
///
470
/// # Panics
471
///
472
/// Panics if `n > v.len()`.
473
812
fn vec_pop_from_front(v: &mut Vec<u8>, n: usize) {
474
    // This returns an iterator, but we don't need to actually iterate over the elements.
475
    // The compiler appears to be smart enough to optimize it away.
476
    // (Cargo asm indicates that this optimizes down to a memmove.)
477
812
    v.drain(0..n);
478
812
}
479

            
480
#[cfg(test)]
481
mod test {
482
    // @@ begin test lint list maintained by maint/add_warning @@
483
    #![allow(clippy::bool_assert_comparison)]
484
    #![allow(clippy::clone_on_copy)]
485
    #![allow(clippy::dbg_macro)]
486
    #![allow(clippy::mixed_attributes_style)]
487
    #![allow(clippy::print_stderr)]
488
    #![allow(clippy::print_stdout)]
489
    #![allow(clippy::single_char_pattern)]
490
    #![allow(clippy::unwrap_used)]
491
    #![allow(clippy::unchecked_time_subtraction)]
492
    #![allow(clippy::useless_vec)]
493
    #![allow(clippy::needless_pass_by_value)]
494
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
495

            
496
    use assert_matches::assert_matches;
497
    use std::cmp::min;
498

            
499
    use super::*;
500

            
501
    impl super::PollStatus {
502
        fn unwrap_msg(self) -> UnparsedResponse {
503
            match self {
504
                PollStatus::Msg(msg) => msg,
505
                other => panic!("Wanted Msg; found {other:?}"),
506
            }
507
        }
508
    }
509

            
510
    #[derive(Default, Debug)]
511
    struct TestWaker {
512
        n_wakes: usize,
513
    }
514
    impl EventLoop for TestWaker {
515
        fn start_writing(&mut self) -> io::Result<()> {
516
            self.n_wakes += 1;
517
            Ok(())
518
        }
519
        fn stop_writing(&mut self) -> io::Result<()> {
520
            Ok(())
521
        }
522
    }
523

            
524
    // Helper: Simulates nonblocking IO.
525
    //
526
    // Has interior mutability so we can inspect it.
527
    #[derive(Default, Debug, Clone)]
528
    struct TestStream {
529
        inner: Arc<Mutex<TestStreamInner>>,
530
    }
531
    #[derive(Default, Debug, Clone)]
532
    struct TestStreamInner {
533
        // Bytes that we have _received_ from the client.
534
        received: Vec<u8>,
535
        // Bytes that we are _sending_ to the client.
536
        sending: Vec<u8>,
537
        receive_capacity: Option<usize>,
538
    }
539
    impl TestStream {
540
        fn push(&self, b: &[u8]) {
541
            self.inner.lock().unwrap().sending.extend_from_slice(b);
542
        }
543
        fn drain(&self, n: usize) -> Vec<u8> {
544
            let mut s = self.inner.lock().unwrap();
545
            let n = min(n, s.received.len());
546
            let mut v = vec![0_u8; n];
547
            v[..].copy_from_slice(&s.received[..n]);
548
            vec_pop_from_front(&mut s.received, n);
549
            v
550
        }
551
    }
552

            
553
    impl io::Read for TestStream {
554
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
555
            let mut s = self.inner.lock().unwrap();
556
            if s.sending.is_empty() {
557
                return Err(io::ErrorKind::WouldBlock.into());
558
            }
559

            
560
            let n_to_copy = min(s.sending.len(), buf.len());
561
            buf[..n_to_copy].copy_from_slice(&s.sending[..n_to_copy]);
562
            vec_pop_from_front(&mut s.sending, n_to_copy);
563
            Ok(n_to_copy)
564
        }
565
    }
566

            
567
    impl io::Write for TestStream {
568
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
569
            if buf.is_empty() {
570
                return Ok(0);
571
            }
572

            
573
            let mut s = self.inner.lock().unwrap();
574

            
575
            let n_to_copy = match s.receive_capacity {
576
                Some(0) => return Err(io::ErrorKind::WouldBlock.into()),
577
                Some(n) => min(n, buf.len()),
578
                None => buf.len(),
579
            };
580

            
581
            s.received.extend_from_slice(&buf[..n_to_copy]);
582
            if let Some(ref mut n) = s.receive_capacity {
583
                *n -= n_to_copy;
584
            }
585

            
586
            Ok(n_to_copy)
587
        }
588

            
589
        fn flush(&mut self) -> io::Result<()> {
590
            Ok(())
591
        }
592
    }
593
    impl Stream for TestStream {
594
        fn as_mio_source(&mut self) -> Option<&mut dyn mio::event::Source> {
595
            None
596
        }
597
        fn remove_mio(self: Box<Self>) -> Box<dyn Stream> {
598
            self
599
        }
600
        fn try_as_handle(&self) -> io::Result<BorrowedOsHandle<'_>> {
601
            Err(io::Error::from(io::ErrorKind::Other))
602
        }
603
    }
604

            
605
    fn assert_wants_rw(nb: &NonblockingConnection, r: &io::Result<PollStatus>) {
606
        assert_matches!(r, Ok(PollStatus::WouldBlock));
607
        assert_eq!(nb.wants_to_write(), true);
608
    }
609

            
610
    fn assert_wants_r_only(nb: &NonblockingConnection, r: &io::Result<PollStatus>) {
611
        assert_matches!(r, Ok(PollStatus::WouldBlock));
612
        assert_eq!(nb.wants_to_write(), false);
613
    }
614

            
615
    #[test]
616
    fn read_msg() {
617
        let test_stream = TestStream::default();
618
        let mut nbconn = NonblockingConnection::new(
619
            Box::new(TestWaker::default()),
620
            Box::new(test_stream.clone()),
621
        );
622

            
623
        // Try interacting with nothing to do.
624
        let r = nbconn.interact_once();
625
        assert_wants_r_only(&nbconn, &r);
626

            
627
        // Give it a partial message.
628
        test_stream.push(b"Hello world");
629
        let r = nbconn.interact_once();
630
        assert_wants_r_only(&nbconn, &r);
631

            
632
        // Finish the message.
633
        test_stream.push(b"\nAnd many happy");
634
        let r = nbconn.interact_once();
635
        assert_eq!(r.unwrap().unwrap_msg().as_str(), "Hello world\n");
636

            
637
        // Then it should block...
638
        let r = nbconn.interact_once();
639
        assert_wants_r_only(&nbconn, &r);
640

            
641
        // Finish two more messages, and leave a partial message.
642
        test_stream.push(b" returns\nof the day\nto you!");
643
        let r = nbconn.interact_once();
644
        assert_eq!(r.unwrap().unwrap_msg().as_str(), "And many happy returns\n");
645
        let r = nbconn.interact_once();
646
        assert_eq!(r.unwrap().unwrap_msg().as_str(), "of the day\n");
647
    }
648

            
649
    #[test]
650
    fn write_msg() {
651
        let test_stream = TestStream::default();
652
        let mut nbconn = NonblockingConnection::new(
653
            Box::new(TestWaker::default()),
654
            Box::new(test_stream.clone()),
655
        );
656
        let writer = nbconn.writer();
657

            
658
        // Make sure we can write in a nonblocking way...
659
        let req1 = r#"{"id":7,
660
                 "obj":"foo",
661
                 "method":"arti:x-frob", "params":{},
662
                 "extra": "preserved"
663
            }"#;
664
        let v = ValidatedRequest::from_string_strict(req1).unwrap();
665
        writer.send_valid(&v).unwrap();
666

            
667
        // At this point the above request is queued, but won't be sent until we interact.
668
        {
669
            assert!(test_stream.inner.lock().unwrap().received.is_empty());
670
        }
671

            
672
        // Now interact. This will cause the whole request to get flushed.
673
        let r = nbconn.interact_once();
674
        assert_wants_r_only(&nbconn, &r);
675

            
676
        let m = test_stream.drain(v.as_ref().len());
677
        assert_eq!(m, v.as_ref().as_bytes());
678

            
679
        // Now try again, but with a blocked stream.
680
        {
681
            test_stream.inner.lock().unwrap().receive_capacity = Some(32);
682
        }
683
        writer.send_valid(&v).unwrap();
684

            
685
        let r: Result<PollStatus, io::Error> = nbconn.interact_once();
686
        assert_wants_rw(&nbconn, &r);
687
        {
688
            assert_eq!(test_stream.inner.lock().unwrap().received.len(), 32);
689
            // Make the capacity unlimited.
690
            test_stream.inner.lock().unwrap().receive_capacity = None;
691
        }
692
        let r: Result<PollStatus, io::Error> = nbconn.interact_once();
693
        assert_wants_r_only(&nbconn, &r);
694
        let m = test_stream.drain(v.as_ref().len());
695
        assert_eq!(m, v.as_ref().as_bytes());
696
    }
697
}