1
#![cfg_attr(docsrs, feature(doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_time_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45
#![allow(clippy::collapsible_if)] // See arti#2342
46
#![deny(clippy::unused_async)]
47
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
48

            
49
mod arc_io_result;
50
mod copy_buf;
51
mod copy_buf_bidi;
52
pub mod eof;
53
mod fuse_buf_reader;
54
use std::{
55
    future::Future,
56
    pin::Pin,
57
    task::{Context, Poll},
58
};
59

            
60
pub use copy_buf::{CopyBuf, copy_buf};
61
pub use copy_buf_bidi::{CopyBufBidirectional, copy_buf_bidirectional};
62
pub use eof::EofStrategy;
63

            
64
use futures::{AsyncRead, AsyncWrite, io::BufReader};
65
use pin_project::pin_project;
66

            
67
/// Return a future to copy bytes from `reader` to `writer`.
68
///
69
/// See [`copy_buf()`] for full details.
70
///
71
/// Unlike `copy_buf`, this function does not require that `reader` implements AsyncBufRead:
72
/// it wraps `reader` internally in a new `BufReader` with default capacity.
73
///
74
/// ## Limitations
75
///
76
/// If an error occurs during transmission, buffered data that was read from `reader`
77
/// but not written to `writer` will be lost.
78
/// To avoid this, use [`copy_buf()`].
79
///
80
/// Similarly, if you drop this future while it is still pending,
81
/// any buffered data will be lost.
82
///
83
/// See the crate-level documentation for further
84
/// [discussion of this function's limitations](crate#Limitations).
85
pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
86
where
87
    R: AsyncRead,
88
    W: AsyncWrite,
89
{
90
    let reader = BufReader::new(reader);
91
    Copy(copy_buf(reader, writer))
92
}
93

            
94
/// Return a future to copies bytes from `stream_a` to `stream_b`,
95
/// and from `stream_b` to `stream_a`.
96
///
97
/// See [`copy_buf_bidirectional()`] for full details.
98
///
99
/// Unlike `copy_buf_bidirectional`, this function does not require that either stream implements AsyncBufRead:
100
/// it wraps them internally in a new `BufReader` with default capacity.
101
///
102
/// ## Limitations
103
///
104
/// If an error occurs during transmission, data that was read from one stream,
105
/// but not written to the other, will be lost.
106
/// To avoid this, use [`copy_buf_bidirectional()`].
107
///
108
/// Similarly, if you drop this future while it is still pending,
109
/// any buffered data will be lost.
110
///
111
/// See the crate-level documentation for further
112
/// [discussion of this function's limitations](crate#Limitations).
113
pub fn copy_bidirectional<A, B, AE, BE>(
114
    stream_a: A,
115
    stream_b: B,
116
    on_a_eof: AE,
117
    on_b_eof: BE,
118
) -> CopyBidirectional<A, B, AE, BE>
119
where
120
    A: AsyncRead + AsyncWrite,
121
    B: AsyncRead + AsyncWrite,
122
    AE: EofStrategy<B>,
123
    BE: EofStrategy<A>,
124
{
125
    let stream_a = BufReader::new(stream_a);
126
    let stream_b = BufReader::new(stream_b);
127
    CopyBidirectional(copy_buf_bidirectional(
128
        stream_a,
129
        stream_b,
130
        eof::BufReaderEofWrapper(on_a_eof),
131
        eof::BufReaderEofWrapper(on_b_eof),
132
    ))
133
}
134

            
135
/// A future returned by [`copy`].
136
#[derive(Debug)]
137
#[pin_project]
138
#[must_use = "futures do nothing unless you `.await` or poll them"]
139
pub struct Copy<R, W>(#[pin] CopyBuf<BufReader<R>, W>);
140

            
141
/// A future returned by [`copy_bidirectional`].
142
#[derive(Debug)]
143
#[pin_project]
144
#[must_use = "futures do nothing unless you `.await` or poll them"]
145
pub struct CopyBidirectional<A, B, AE, BE>(
146
    #[pin]
147
    CopyBufBidirectional<
148
        BufReader<A>,
149
        BufReader<B>,
150
        eof::BufReaderEofWrapper<AE>,
151
        eof::BufReaderEofWrapper<BE>,
152
    >,
153
);
154

            
155
// Note: There is intentionally no `into_inner` implementation for these types,
156
// since returning the original streams would discard any buffered data.
157

            
158
impl<R, W> Future for Copy<R, W>
159
where
160
    R: AsyncRead,
161
    W: AsyncWrite,
162
{
163
    type Output = std::io::Result<u64>;
164

            
165
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
166
        self.project().0.poll(cx)
167
    }
168
}
169

            
170
impl<A, B, AE, BE> Future for CopyBidirectional<A, B, AE, BE>
171
where
172
    A: AsyncRead + AsyncWrite,
173
    B: AsyncRead + AsyncWrite,
174
    AE: EofStrategy<B>,
175
    BE: EofStrategy<A>,
176
{
177
    type Output = std::io::Result<(u64, u64)>;
178

            
179
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
180
        self.project().0.poll(cx)
181
    }
182
}
183

            
184
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;
    use std::io;

    /// A stream for which every read and write operation fails immediately.
    ///
    /// Each AsyncRead and AsyncWrite method returns a ready error
    /// built from the wrapped `io::ErrorKind`.
    #[derive(Debug, Clone)]
    pub(crate) struct ErrorRW(pub(crate) io::ErrorKind);

    impl AsyncRead for ErrorRW {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            // `io::ErrorKind` converts directly into an `io::Error`.
            Poll::Ready(Err(self.0.into()))
        }
    }

    impl AsyncWrite for ErrorRW {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            Poll::Ready(Err(self.0.into()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Err(self.0.into()))
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Err(self.0.into()))
        }
    }

    /// A reader that never produces data, EOF, or an error.
    ///
    /// (Every call to `poll_read` reports _pending_.)
    pub(crate) struct PausedRead;

    impl AsyncRead for PausedRead {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            // The context is deliberately ignored: no waker is ever registered here.
            Poll::Pending
        }
    }

    /// A reader (field 0) and a writer (field 1), combined into a single Read+Write stream.
    #[pin_project]
    pub(crate) struct RWPair<R, W>(#[pin] pub(crate) R, #[pin] pub(crate) W);

    impl<R, W> AsyncRead for RWPair<R, W>
    where
        R: AsyncRead,
    {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            // Reads go to the first half of the pair.
            let reader = self.project().0;
            reader.poll_read(cx, buf)
        }
    }

    impl<R, W> AsyncWrite for RWPair<R, W>
    where
        W: AsyncWrite,
    {
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            // Writes, flushes and closes all go to the second half of the pair.
            let writer = self.project().1;
            writer.poll_write(cx, buf)
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let writer = self.project().1;
            writer.poll_flush(cx)
        }

        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let writer = self.project().1;
            writer.poll_close(cx)
        }
    }
}