1
//! Low-level nonblocking stream implementation.
2
//!
3
//! This module defines two main types: [`NonblockingStream`].
4
//! (a low-level type for use by external tools
5
//! that want to implement their own nonblocking IO),
6
//! and [`PollingStream`] (a slightly higher-level type
7
//! that we use internally when we are asked to provide
8
//! our own nonblocking IO loop(s)).
9
//!
10
//! This module also defines several traits for use by these types.
11
//!
12
//! Treats messages as unrelated strings, and validates outgoing messages for correctness.
13
//!
14
//! TODO nb: For now, nothing in this module is actually public; we'll want to expose some of these types.
15

            
16
use mio::Interest;
17

            
18
use crate::{
19
    msgs::{request::ValidatedRequest, response::UnparsedResponse},
20
    util::define_from_for_arc,
21
};
22
use std::{
23
    io::{self, Read as _, Write as _},
24
    mem,
25
    sync::{Arc, Mutex},
26
};
27

            
28
/// An IO stream to Arti, along with any supporting logic necessary to check it for readiness.
29
///
30
/// Internally, this uses `mio` along with a [`NonblockingStream`] to check for events.
31
///
32
/// To use this type, mark the stream as nonblocking
33
/// with e.g. [TcpStream::set_nonblocking](std::net::TcpStream::set_nonblocking),
34
/// convert it into a [`mio::event::Source`],
35
/// and pass it to [`PollingStream::new()`]
36
///
37
/// At this point, you can read and write messages via nonblocking IO.
38
///
39
/// The [`PollingStream::writer()`] method will return a handle that you can use from any thread
40
/// that you can use to queue an outbound message.
41
///
42
/// No messages are actually sent or received unless some thread is calling [`PollingStream::interact()`].
43
///
44
/// ## Concurrency and interior mutability
45
///
46
/// A `PollingStream` has (limited) interior mutability.
47
///
48
/// Only a single call to `interact` can be made at the same time.
49
/// So only one thread can be waiting for responses, and
50
/// the caller of `interact` must demultiplex responses as necessary.
51
///
52
/// But, one or more [`WriteHandle`]s can be created,
53
/// and these are `'static + Send + Sync`.
54
/// Using `WriteHandle`, multiple threads can enqueue requests,
55
/// with [`send_valid`](WriteHandle::send_valid), concurrently.
56
///
57
/// (All these restrictions imposed on the caller are enforced by the Rust type system.)
58
#[derive(Debug)]
59
pub(crate) struct PollingStream {
60
    /// The poll object.
61
    ///
62
    /// (This typically corresponds to a kqueue or epoll handle.)
63
    ///
64
    /// ## IO Safety
65
    ///
66
    /// This object (semantically) contains references to the `fd`s or `SOCKETS`
67
    /// of any inserted [`mio::event::Source`].  Therefore it must not outlive those sources.
68
    /// Further, according to `mio`'s documentation, every Source must be deregistered
69
    /// before it can be dropped.
70
    ///
71
    /// We ensure these properties are obeyed as follows:
72
    ///  - We hold the stream via `stream`, the NonblockingStream member of this struct.
73
    ///    We do not let anybody outside this module have the stream or the `Poll`.
74
    ///  - We declare a Drop implementation that deregisters the stream.
75
    ///    This method ensures that the stream is dropped before it is closed.
76
    poll: mio::Poll,
77

            
78
    /// A small buffer to receive IO readiness events.
79
    events: mio::Events,
80

            
81
    /// The underlying stream.
82
    ///
83
    /// Invariant: `stream.stream` is a [`MioStream`], so [`Stream::as_mio_stream`] will return
84
    /// Some when we call it.
85
    stream: NonblockingStream,
86
}
87

            
88
/// A `mio` token corresponding to the Waker we use to tell the interactor about new writes.
89
const WAKE_TOKEN: mio::Token = mio::Token(0);
90

            
91
/// A `mio` token corresponding to the Stream connecting to the RPC
92
const STREAM_TOKEN: mio::Token = mio::Token(1);
93

            
94
impl PollingStream {
95
    /// Create a new PollingStream.
96
    ///
97
    /// The `stream` will be set to use nonblocking IO;
98
    /// on Unix this will affect the behaviour of other `dup`s of the same fd!
99
10
    pub(crate) fn new(stream: Box<dyn MioStream>) -> io::Result<Self> {
100
10
        let poll = mio::Poll::new()?;
101
10
        let waker = mio::Waker::new(poll.registry(), WAKE_TOKEN)?;
102

            
103
10
        let stream = NonblockingStream::new(Box::new(waker), stream);
104

            
105
10
        let mut cio = Self {
106
10
            poll,
107
10
            events: mio::Events::with_capacity(4),
108
10
            stream,
109
10
        };
110

            
111
        // We register the stream here, since we want to use it exclusively with `reregister`
112
        // later on.  We do not deregister the stream until `Drop::drop` is called.
113
10
        cio.poll.registry().register(
114
10
            cio.stream
115
10
                .stream
116
10
                .as_mio_stream()
117
10
                .expect("logic error: not a mio stream."),
118
            STREAM_TOKEN,
119
            Interest::READABLE,
120
        )?;
121

            
122
10
        Ok(cio)
123
10
    }
124

            
125
    /// Return a new [`WriteHandle`] that can be used to queue messages to be sent via this stream.
126
10
    pub(crate) fn writer(&self) -> WriteHandle {
127
10
        self.stream.writer()
128
10
    }
129

            
130
    /// Interact with the peer until some response is received.
131
    ///
132
    /// Sends all requests given to [`WriteHandle::send_valid`]
133
    /// (including calls to `send_valid` made while `interact` is running)
134
    /// while looking for a response from the server.
135
    /// Returns when the first response is received.
136
    ///
137
    ///
138
    /// Returns an error if an IO condition has failed.
139
    /// Returns None if the other side has closed the stream.
140
    /// Otherwise, returns an unparsed message from the RPC server.
141
    ///
142
    /// Unless some thread is calling this method, nobody will actually be reading or writing from
143
    /// the [`PollingStream`], and so nobody's requests will be sent or answered.
144
8062
    pub(crate) fn interact(&mut self) -> io::Result<Option<UnparsedResponse>> {
145
        // Should we try to read and write? Start out by assuming "yes".
146
8062
        let mut try_writing = true;
147
8062
        let mut try_reading = true;
148

            
149
        loop {
150
            // Try interacting with the underlying stream.
151
8266
            let want_io = match self.stream.interact_once(try_writing, try_reading)? {
152
                PollStatus::Closed => return Ok(None),
153
8060
                PollStatus::Msg(msg) => return Ok(Some(msg)),
154
204
                PollStatus::WouldBlock(w) => w,
155
            };
156

            
157
            // We're blocking on reading and possibly writing.  Register our interest,
158
            // so that we get woken as appropriate.
159
204
            self.poll.registry().reregister(
160
204
                self.stream
161
204
                    .stream
162
204
                    .as_mio_stream()
163
204
                    .expect("logic error: not a mio stream!"),
164
                STREAM_TOKEN,
165
204
                want_io.into(),
166
            )?;
167

            
168
            // Poll until the socket is ready to read or write,
169
            // _or_ until somebody invokes the Waker because they have queued more to write.
170
306
            let () = retry_eintr(|| self.poll.poll(&mut self.events, None))?;
171

            
172
            // Now that we've been woken, see which events we've been woken with,
173
            // and adjust our plans accordingly on the next time through the loop.
174
204
            try_reading = false;
175
204
            try_writing = false;
176
220
            for event in self.events.iter() {
177
220
                if event.token() == STREAM_TOKEN {
178
70
                    if event.is_readable() {
179
62
                        try_reading = true;
180
62
                    }
181
70
                    if event.is_writable() {
182
12
                        try_writing = true;
183
58
                    }
184
150
                } else if event.token() == WAKE_TOKEN {
185
150
                    try_writing = true;
186
150
                }
187
            }
188
        }
189
8062
    }
190
}
191

            
192
impl Drop for PollingStream {
193
10
    fn drop(&mut self) {
194
        // IO SAFETY: See "IO Safety" note in documentation for PollingStream.
195
10
        let s = self
196
10
            .stream
197
10
            .stream
198
10
            .as_mio_stream()
199
10
            .expect("Logic error: Stream was not a MIO stream.");
200
10
        self.poll
201
10
            .registry()
202
10
            .deregister(s)
203
10
            .expect("Deregister operation failed");
204
10
    }
205
}
206

            
207
/// A handle that can be used to queue outgoing messages for a nonblocking stream.
208
///
209
/// Note that queueing a message has no effect unless some party is polling the stream,
210
/// either with [`PollingStream::interact()`], or [`NonblockingStream::interact_once()`].
211
#[derive(Clone, Debug)]
212
pub(crate) struct WriteHandle {
213
    /// The actual implementation type for this writer.
214
    inner: Arc<Mutex<WriteHandleImpl>>,
215
}
216

            
217
impl WriteHandle {
218
    /// Queue an outgoing message for a nonblocking stream.
219
4168
    pub(crate) fn send_valid(&self, msg: &ValidatedRequest) -> io::Result<()> {
220
4168
        let mut w = self.inner.lock().expect("Poisoned lock");
221
4168
        w.write_buf.extend_from_slice(msg.as_ref().as_bytes());
222

            
223
        // See TOCTOU note on `WriteHandleImpl`: we need to wake() while we are holding the
224
        // above mutex.
225
4168
        w.waker.wake()
226
4168
    }
227
}
228

            
229
/// An error that has occurred while sending a request.
230
#[derive(Clone, Debug, thiserror::Error)]
231
#[non_exhaustive]
232
pub enum SendRequestError {
233
    /// An IO error occurred while sending a request.
234
    #[error("Unable to wake poling loop")]
235
    Io(#[source] Arc<io::Error>),
236
    /// We found a problem in the JSON while sending a request.
237
    #[error("Invalid Json request")]
238
    InvalidRequest(#[from] crate::InvalidRequestError),
239
    /// Internal error while re-encoding request.  Should be impossible.
240
    #[error("Unable to re-encode request after parsing it‽")]
241
    ReEncode(#[source] Arc<serde_json::Error>),
242
}
243
define_from_for_arc!( io::Error => SendRequestError [Io] );
244

            
245
/// The inner implementation for [`WriteHandle`].
246
///
247
/// NOTE: We need to be careful to avoid TOCTOU problems with this type:
248
/// It would be bad if a writing thread called `waker.wake()`, and then the interactor checked the
249
/// buffer and found it empty, and only then did the writing thread add to the buffer.
250
///
251
/// To solve this, we put the write_buf and the waker behind the same lock:
252
/// While the interactor is checking the buffer, nobody is able to add to the buffer _or_ wake the
253
/// interactor.
254
#[derive(derive_more::Debug)]
255
struct WriteHandleImpl {
256
    /// An underlying buffer holding messages to be sent to the RPC server.
257
    //
258
    // TODO: Consider using a VecDeque or BytesMut or such.
259
    write_buf: Vec<u8>,
260

            
261
    /// The waker to use to wake the polling loop.
262
    #[debug(ignore)]
263
    waker: Box<dyn Waker>,
264
}
265

            
266
/// A lower-level implementation of nonblocking IO for an open stream to the RPC server.
267
///
268
/// Unlike [`PollingStream`], this type _does not_ handle the IO event polling loops:
269
/// the caller is required to provide their own.
270
#[derive(derive_more::Debug)]
271
pub(crate) struct NonblockingStream {
272
    /// A write handle used to write onto this stream.
273
    #[debug(ignore)]
274
    write_handle: WriteHandle,
275

            
276
    /// A buffer of incoming messages (possibly partial) from the RPC server.
277
    //
278
    // TODO: Consider using a VecDeque or BytesMut or such.
279
    read_buf: Vec<u8>,
280

            
281
    /// The underlying nonblocking stream.
282
    #[debug(ignore)]
283
    stream: Box<dyn Stream>,
284
}
285

            
286
/// Helper to return which events a [`NonblockingStream`] is interested in.
287
#[derive(Clone, Debug, Default, Copy)]
288
pub(crate) struct WantIo {
289
    /// True if the stream is interested in writing.
290
    ///
291
    /// (It is always interested in reading.)
292
    write: bool,
293
}
294

            
295
#[allow(dead_code)] // TODO nb: remove or expose.
296
impl WantIo {
297
    /// Return true if the stream is interested in reading.
298
    fn want_read(&self) -> bool {
299
        true
300
    }
301

            
302
    /// Return true if the stream is interested in writing.
303
12
    fn want_write(&self) -> bool {
304
12
        self.write
305
12
    }
306
}
307

            
308
impl From<WantIo> for mio::Interest {
309
204
    fn from(value: WantIo) -> Self {
310
204
        if value.write {
311
12
            mio::Interest::WRITABLE | mio::Interest::READABLE
312
        } else {
313
192
            mio::Interest::READABLE
314
        }
315
204
    }
316
}
317

            
318
/// A return value from [`NonblockingStream::interact_once`].
319
#[derive(Debug, Clone)]
320
pub(crate) enum PollStatus {
321
    /// The stream is closed.
322
    Closed,
323

            
324
    /// No progress can be made until the stream is available for further IO.
325
    WouldBlock(WantIo),
326

            
327
    /// We have received a message.
328
    Msg(UnparsedResponse),
329
}
330

            
331
impl NonblockingStream {
332
    /// Create a new `NonblockingStream` from a provided [`Waker`] and [`Stream`].
333
14
    pub(crate) fn new(waker: Box<dyn Waker>, stream: Box<dyn Stream>) -> Self {
334
14
        Self {
335
14
            write_handle: WriteHandle {
336
14
                inner: Arc::new(Mutex::new(WriteHandleImpl {
337
14
                    write_buf: Default::default(),
338
14
                    waker,
339
14
                })),
340
14
            },
341
14
            read_buf: Default::default(),
342
14
            stream,
343
14
        }
344
14
    }
345

            
346
    /// Return a new [`WriteHandle`] that can be used to queue messages to be sent via this stream.
347
12
    pub(crate) fn writer(&self) -> WriteHandle {
348
12
        self.write_handle.clone()
349
12
    }
350

            
351
    /// Try to exchange messages with the RPC server.
352
    ///
353
    /// If `try_reading` is true, then we should try reading from the RPC server.
354
    /// If `try_writing` is true, then we should try flushing messages to the RPC server
355
    /// (if we have any).
356
    ///
357
    /// If the stream proves to be closed, returns [`PollStatus::Closed`].
358
    ///
359
    /// If a message is available, returns [`PollStatus::Msg`].
360
    /// (Note that a message may be available in the internal buffer here even if try_reading is false.)
361
    ///
362
    /// If no message is available, return [`PollStatus::WouldBlock`] with a [`WantIo`]
363
    /// describing which IO operations we would like to perform.
364
8284
    pub(crate) fn interact_once(
365
8284
        &mut self,
366
8284
        try_writing: bool,
367
8284
        try_reading: bool,
368
8284
    ) -> io::Result<PollStatus> {
369
        use io::ErrorKind::WouldBlock;
370

            
371
8284
        if let Some(msg) = self.extract_msg()? {
372
7376
            return Ok(PollStatus::Msg(msg));
373
908
        }
374

            
375
908
        let mut want_io = WantIo::default();
376

            
377
908
        if try_writing {
378
854
            match self.flush_queue() {
379
852
                Ok(()) => {}
380
2
                Err(e) if e.kind() == WouldBlock => want_io.write = true,
381
                Err(e) => return Err(e),
382
            }
383
54
        }
384
908
        if try_reading {
385
766
            match self.read_msg() {
386
690
                Ok(Some(msg)) => return Ok(PollStatus::Msg(msg)),
387
                Ok(None) => return Ok(PollStatus::Closed),
388
76
                Err(e) if e.kind() == WouldBlock => {}
389
2
                Err(e) => return Err(e),
390
            }
391
142
        }
392

            
393
216
        if !want_io.write && self.has_data_to_write() {
394
12
            want_io.write = true;
395
204
        }
396

            
397
216
        Ok(PollStatus::WouldBlock(want_io))
398
8284
    }
399

            
400
    /// Internal helper: Try to get a buffered message out of our `read_buf`.
401
    ///
402
    /// Returns Ok(None) if there are no complete lines in the buffer.
403
    ///
404
    /// If there is a line, but it is not valid UTF-8, returns an error and discards the line.
405
9742
    fn extract_msg(&mut self) -> io::Result<Option<UnparsedResponse>> {
406
        // Look for an eol within the buffer.
407
9742
        let Some(eol_pos) = memchr::memchr(b'\n', &self.read_buf[..]) else {
408
1676
            return Ok(None);
409
        };
410
        // Split off the part of the buffer ending with the EOF from the remainder.
411
8066
        let mut line = self.read_buf.split_off(eol_pos + 1);
412
        // Put the message in "line" and the remainder of the buffer in read_buf.
413
8066
        mem::swap(&mut line, &mut self.read_buf);
414
        // Try to convert the line to an UnparsedResponse.
415
8066
        match String::from_utf8(line) {
416
8066
            Ok(s) => Ok(Some(UnparsedResponse::new(s))),
417
            Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
418
        }
419
9742
    }
420

            
421
    /// Internal helper: Return true if there is any outgoing data queued to be written.
422
214
    fn has_data_to_write(&self) -> bool {
423
214
        let w = self.write_handle.inner.lock().expect("Lock poisoned");
424
        // See TOCTOU note on WriteHandleImpl: Our rule is to check whether we have data to write
425
        // within the same lock used to hold the waker, so that we can't lose any data.
426
214
        !w.write_buf.is_empty()
427
214
    }
428

            
429
    /// Helper: Try to get a message, reading into our read_buf as needed.
430
    ///
431
    /// (We don't use a BufReader here because its behavior with nonblocking IO is kind of underspecified.)
432
766
    fn read_msg(&mut self) -> io::Result<Option<UnparsedResponse>> {
433
        const READLEN: usize = 4096;
434
        loop {
435
1458
            if let Some(msg) = self.extract_msg()? {
436
690
                return Ok(Some(msg));
437
768
            }
438

            
439
768
            let len_orig = self.read_buf.len();
440
            // TODO: Impose a maximum length?
441
768
            self.read_buf.resize(len_orig + READLEN, 0);
442
1152
            let result = retry_eintr(|| self.stream.read(&mut self.read_buf[len_orig..]));
443
768
            match result {
444
                Ok(0) => return Ok(None),
445
692
                Ok(n) => {
446
692
                    self.read_buf.truncate(len_orig + n);
447
692
                }
448
76
                Err(e) => {
449
76
                    self.read_buf.truncate(len_orig);
450
76
                    return Err(e);
451
                }
452
            }
453
        }
454
766
    }
455

            
456
    /// Try to flush data from the underlying write buffer.
457
    ///
458
    /// Returns Ok() only if all of the data is flushed, and the write buffer has become empty.
459
854
    fn flush_queue(&mut self) -> io::Result<()> {
460
854
        let mut w = self.write_handle.inner.lock().expect("Poisoned lock.");
461
        loop {
462
1642
            if w.write_buf.is_empty() {
463
852
                return Ok(());
464
790
            }
465

            
466
1185
            let n = retry_eintr(|| self.stream.write(&w.write_buf[..]))?;
467
788
            vec_pop_from_front(&mut w.write_buf, n);
468

            
469
            // This is a no-op for the streams we support so far, but it could be necessary if
470
            // we support more kinds in the future.
471
1182
            let () = retry_eintr(|| self.stream.flush())?;
472
        }
473
854
    }
474
}
475

            
476
/// Any type we can use as a target for [`NonblockingStream`].
477
pub(crate) trait Stream: io::Read + io::Write + Send {
478
    /// If this Stream object is a [`MioStream`], upcast it to one.
479
    ///
480
    /// Otherwise return None.
481
    fn as_mio_stream(&mut self) -> Option<&mut dyn MioStream>;
482
}
483

            
484
/// A [`Stream`] that we can use inside a [`PollingStream`].
485
pub(crate) trait MioStream: Stream + mio::event::Source {}
486

            
487
/// An object that can wake a pending IO poller.
488
///
489
/// When the underlying IO loop is `mio`, this is a [`mio::Waker`];
490
/// otherwise, it is some user-provided type.
491
pub(crate) trait Waker: Send + Sync {
492
    /// Alert the polling thread.
493
    fn wake(&mut self) -> io::Result<()>;
494
}
495

            
496
impl Waker for mio::Waker {
497
4164
    fn wake(&mut self) -> io::Result<()> {
498
4164
        mio::Waker::wake(self)
499
4164
    }
500
}
501

            
502
/// Implement Stream and MioStream for a related pair of types.
503
macro_rules! impl_traits {
504
    { $stream:ty => $mio_stream:ty } => {
505
        impl Stream for $stream {
506
            fn as_mio_stream(&mut self) -> Option<&mut dyn MioStream> {
507
                None
508
            }
509
        }
510
        impl Stream for $mio_stream {
511
224
            fn as_mio_stream(&mut self) -> Option<&mut dyn MioStream> {
512
224
                Some(self as _)
513
224
            }
514
        }
515
        impl MioStream for $mio_stream {}
516
    }
517
}
518

            
519
impl_traits! { std::net::TcpStream => mio::net::TcpStream }
520
#[cfg(unix)]
521
impl_traits! { std::os::unix::net::UnixStream => mio::net::UnixStream }
522

            
523
/// Remove n elements from the front of v.
524
///
525
/// # Panics
526
///
527
/// Panics if `n > v.len()`.
528
798
fn vec_pop_from_front(v: &mut Vec<u8>, n: usize) {
529
    // This returns an iterator, but we don't need to actually iterate over the elements.
530
    // The compiler appears to be smart enough to optimize it away.
531
    // (Cargo asm indicates that this optimizes down to a memmove.)
532
798
    v.drain(0..n);
533
798
}
534

            
535
/// Retry `f` until it returns Ok() or an error whose kind is not `Interrupted`.
536
2550
fn retry_eintr<F, T>(mut f: F) -> io::Result<T>
537
2550
where
538
2550
    F: FnMut() -> io::Result<T>,
539
{
540
    loop {
541
2550
        let r = f();
542
78
        match r {
543
78
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
544
2550
            _ => return r,
545
        }
546
    }
547
2550
}
548

            
549
#[cfg(test)]
550
mod test {
551
    // @@ begin test lint list maintained by maint/add_warning @@
552
    #![allow(clippy::bool_assert_comparison)]
553
    #![allow(clippy::clone_on_copy)]
554
    #![allow(clippy::dbg_macro)]
555
    #![allow(clippy::mixed_attributes_style)]
556
    #![allow(clippy::print_stderr)]
557
    #![allow(clippy::print_stdout)]
558
    #![allow(clippy::single_char_pattern)]
559
    #![allow(clippy::unwrap_used)]
560
    #![allow(clippy::unchecked_time_subtraction)]
561
    #![allow(clippy::useless_vec)]
562
    #![allow(clippy::needless_pass_by_value)]
563
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
564

            
565
    use std::cmp::min;
566

            
567
    use super::*;
568

            
569
    impl super::PollStatus {
570
        fn unwrap_wantio(self) -> WantIo {
571
            match self {
572
                PollStatus::WouldBlock(want_io) => want_io,
573
                other => panic!("Wanted WantIo; found {other:?}"),
574
            }
575
        }
576

            
577
        fn unwrap_msg(self) -> UnparsedResponse {
578
            match self {
579
                PollStatus::Msg(msg) => msg,
580
                other => panic!("Wanted Msg; found {other:?}"),
581
            }
582
        }
583
    }
584

            
585
    #[derive(Default, Debug)]
586
    struct TestWaker {
587
        n_wakes: usize,
588
    }
589
    impl Waker for TestWaker {
590
        fn wake(&mut self) -> io::Result<()> {
591
            self.n_wakes += 1;
592
            Ok(())
593
        }
594
    }
595

            
596
    // Helper: Simulates nonblocking IO.
597
    //
598
    // Has interior mutability so we can inspect it.
599
    #[derive(Default, Debug, Clone)]
600
    struct TestStream {
601
        inner: Arc<Mutex<TestStreamInner>>,
602
    }
603
    #[derive(Default, Debug, Clone)]
604
    struct TestStreamInner {
605
        // Bytes that we have _received_ from the client.
606
        received: Vec<u8>,
607
        // Bytes that we are _sending_ to the client.
608
        sending: Vec<u8>,
609
        receive_capacity: Option<usize>,
610
    }
611
    impl TestStream {
612
        fn push(&self, b: &[u8]) {
613
            self.inner.lock().unwrap().sending.extend_from_slice(b);
614
        }
615
        fn drain(&self, n: usize) -> Vec<u8> {
616
            let mut s = self.inner.lock().unwrap();
617
            let n = min(n, s.received.len());
618
            let mut v = vec![0_u8; n];
619
            v[..].copy_from_slice(&s.received[..n]);
620
            vec_pop_from_front(&mut s.received, n);
621
            v
622
        }
623
    }
624

            
625
    impl io::Read for TestStream {
626
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
627
            let mut s = self.inner.lock().unwrap();
628
            if s.sending.is_empty() {
629
                return Err(io::ErrorKind::WouldBlock.into());
630
            }
631

            
632
            let n_to_copy = min(s.sending.len(), buf.len());
633
            buf[..n_to_copy].copy_from_slice(&s.sending[..n_to_copy]);
634
            vec_pop_from_front(&mut s.sending, n_to_copy);
635
            Ok(n_to_copy)
636
        }
637
    }
638

            
639
    impl io::Write for TestStream {
640
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
641
            if buf.is_empty() {
642
                return Ok(0);
643
            }
644

            
645
            let mut s = self.inner.lock().unwrap();
646

            
647
            let n_to_copy = match s.receive_capacity {
648
                Some(0) => return Err(io::ErrorKind::WouldBlock.into()),
649
                Some(n) => min(n, buf.len()),
650
                None => buf.len(),
651
            };
652

            
653
            s.received.extend_from_slice(&buf[..n_to_copy]);
654
            if let Some(ref mut n) = s.receive_capacity {
655
                *n -= n_to_copy;
656
            }
657

            
658
            Ok(n_to_copy)
659
        }
660

            
661
        fn flush(&mut self) -> io::Result<()> {
662
            Ok(())
663
        }
664
    }
665
    impl Stream for TestStream {
666
        fn as_mio_stream(&mut self) -> Option<&mut dyn MioStream> {
667
            None
668
        }
669
    }
670

            
671
    #[test]
672
    fn read_msg() {
673
        let test_stream = TestStream::default();
674
        let mut stream = NonblockingStream::new(
675
            Box::new(TestWaker::default()),
676
            Box::new(test_stream.clone()),
677
        );
678

            
679
        // Try interacting with nothing to do.
680
        let r = stream.interact_once(true, true);
681
        assert_eq!(r.unwrap().unwrap_wantio().want_write(), false);
682

            
683
        // Give it a partial message.
684
        test_stream.push(b"Hello world");
685
        let r = stream.interact_once(true, true);
686
        assert_eq!(r.unwrap().unwrap_wantio().want_write(), false);
687

            
688
        // Finish the message.
689
        test_stream.push(b"\nAnd many happy");
690
        let r = stream.interact_once(true, true);
691
        assert_eq!(r.unwrap().unwrap_msg().as_str(), "Hello world\n");
692

            
693
        // Then it should block...
694
        let r = stream.interact_once(true, true);
695
        assert_eq!(r.unwrap().unwrap_wantio().want_write(), false);
696

            
697
        // Finish two more messages, and leave a partial message.
698
        test_stream.push(b" returns\nof the day\nto you!");
699
        let r = stream.interact_once(true, true);
700
        assert_eq!(r.unwrap().unwrap_msg().as_str(), "And many happy returns\n");
701
        let r = stream.interact_once(true, true);
702
        assert_eq!(r.unwrap().unwrap_msg().as_str(), "of the day\n");
703
    }
704

            
705
    #[test]
706
    fn write_msg() {
707
        let test_stream = TestStream::default();
708
        let mut stream = NonblockingStream::new(
709
            Box::new(TestWaker::default()),
710
            Box::new(test_stream.clone()),
711
        );
712
        let writer = stream.writer();
713

            
714
        // Make sure we can write in a nonblocking way...
715
        let req1 = r#"{"id":7,
716
                 "obj":"foo",
717
                 "method":"arti:x-frob", "params":{},
718
                 "extra": "preserved"
719
            }"#;
720
        let v = ValidatedRequest::from_string_strict(req1).unwrap();
721
        writer.send_valid(&v).unwrap();
722

            
723
        // At this point the above request is queued, but won't be sent until we interact.
724
        {
725
            assert!(test_stream.inner.lock().unwrap().received.is_empty());
726
        }
727

            
728
        // Now interact. This will cause the whole request to get flushed.
729
        let r = stream.interact_once(true, true);
730
        assert_eq!(r.unwrap().unwrap_wantio().want_write(), false);
731

            
732
        let m = test_stream.drain(v.as_ref().len());
733
        assert_eq!(m, v.as_ref().as_bytes());
734

            
735
        // Now try again, but with a blocked stream.
736
        {
737
            test_stream.inner.lock().unwrap().receive_capacity = Some(32);
738
        }
739
        writer.send_valid(&v).unwrap();
740

            
741
        let r: Result<PollStatus, io::Error> = stream.interact_once(true, true);
742
        assert_eq!(r.unwrap().unwrap_wantio().want_write(), true);
743
        {
744
            assert_eq!(test_stream.inner.lock().unwrap().received.len(), 32);
745
            // Make the capacity unlimited.
746
            test_stream.inner.lock().unwrap().receive_capacity = None;
747
        }
748
        let r: Result<PollStatus, io::Error> = stream.interact_once(true, true);
749
        assert_eq!(r.unwrap().unwrap_wantio().want_write(), false);
750
        let m = test_stream.drain(v.as_ref().len());
751
        assert_eq!(m, v.as_ref().as_bytes());
752
    }
753

            
754
    // TODO nb: It would be good to have additional tests for the MIO code as well.
755
}