1
//! Helper implementation for a type wrapping an `AsyncBufRead`
2
//! and making its poll_fill_buf() method fused with respect to errors and EOF.
3

            
4
use std::{
5
    io,
6
    pin::Pin,
7
    sync::Arc,
8
    task::{Context, Poll, ready},
9
};
10

            
11
use futures::{AsyncBufRead, AsyncWrite};
12
use pin_project::pin_project;
13

            
14
use crate::arc_io_result::ArcIoResult;
15

            
16
/// A wrapper around a [`AsyncBufRead`] that provides fuse-like behavior
17
/// for terminal states (EOF and Err).
18
///
19
/// Only uses the [`consume`] and [`poll_fill_buf`] methods from the inner
20
/// `AsyncBufRead`.  Once `poll_fill_buf` has returned Ok(&[]) or Err(),
21
/// it won't be called again, and only the last returned value will be
22
/// returned.
23
///
24
/// Because we need the ability to return multiple copies of the same error,
25
/// and `io::Error` doesn't support Clone, we have to return a new `io::Error`
26
/// wrapping the original `io::Error` in an `Arc`.
27
///
28
/// [`consume`]: AsyncBufRead::consume
29
/// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
30
#[derive(Debug)]
31
#[pin_project]
32
pub(crate) struct FuseBufReader<R> {
33
    /// The inner reader that we're wraping.
34
    #[pin]
35
    inner: R,
36

            
37
    /// The fused outcome of this reader, if any.
38
    outcome: Option<ArcIoResult<()>>,
39
}
40

            
41
impl<R> FuseBufReader<R> {
42
    /// Construct a new FuseBufReader.
43
176
    pub(crate) fn new(reader: R) -> Self {
44
176
        Self {
45
176
            inner: reader,
46
176
            outcome: None,
47
176
        }
48
176
    }
49

            
50
    /// Return a pinned pointer to the inner object.
51
104
    pub(crate) fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
52
104
        self.project().inner
53
104
    }
54

            
55
    /// Consume this `FuseBufReader` and return its underlying reader.
56
    pub(crate) fn into_inner(self) -> R {
57
        self.inner
58
    }
59
}
60

            
61
impl<R: AsyncBufRead> FuseBufReader<R> {
62
    /// As [`AsyncBufRead::consume`].
63
239292
    pub(crate) fn consume(self: Pin<&mut Self>, n_written: usize) {
64
239292
        self.project().inner.consume(n_written);
65
239292
    }
66

            
67
    /// As [`AsyncBufRead::poll_fill_buf`], except as noted in the type documentation.
68
287660
    pub(crate) fn poll_fill_buf(
69
287660
        self: Pin<&mut Self>,
70
287660
        cx: &mut Context<'_>,
71
287660
    ) -> Poll<ArcIoResult<&[u8]>> {
72
287660
        let this = self.project();
73
287660
        if let Some(outcome) = this.outcome {
74
            return Poll::Ready(outcome.clone().map(|()| &[][..]));
75
287660
        }
76

            
77
287660
        match ready!(this.inner.poll_fill_buf(cx)) {
78
265980
            Ok(empty @ &[]) => {
79
152
                *this.outcome = Some(Ok(()));
80
152
                Poll::Ready(Ok(empty))
81
            }
82
265828
            Ok(buf) => Poll::Ready(Ok(buf)),
83

            
84
12
            Err(e) => {
85
12
                let outcome = Err(Arc::new(e));
86
12
                let result = outcome.clone().map(|()| &[][..]);
87
12
                *this.outcome = Some(outcome);
88
12
                Poll::Ready(result)
89
            }
90
        }
91
287660
    }
92
}
93

            
94
impl<W> AsyncWrite for FuseBufReader<W>
95
where
96
    W: AsyncWrite,
97
{
98
105092
    fn poll_write(
99
105092
        self: Pin<&mut Self>,
100
105092
        cx: &mut Context<'_>,
101
105092
        buf: &[u8],
102
105092
    ) -> Poll<io::Result<usize>> {
103
105092
        self.project().inner.poll_write(cx, buf)
104
105092
    }
105

            
106
10328
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
107
10328
        self.project().inner.poll_flush(cx)
108
10328
    }
109

            
110
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
111
        self.project().inner.poll_close(cx)
112
    }
113
}