1
#![cfg_attr(docsrs, feature(doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_time_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45
#![allow(clippy::collapsible_if)] // See arti#2342
46
#![deny(clippy::unused_async)]
47
#![deny(clippy::string_slice)] // See arti#2571
48
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
49

            
50
mod arc_io_result;
51
mod copy_buf;
52
mod copy_buf_bidi;
53
pub mod eof;
54
mod fuse_buf_reader;
55
use std::{
56
    future::Future,
57
    pin::Pin,
58
    task::{Context, Poll},
59
};
60

            
61
pub use copy_buf::{CopyBuf, copy_buf};
62
pub use copy_buf_bidi::{CopyBufBidirectional, copy_buf_bidirectional};
63
pub use eof::EofStrategy;
64

            
65
use futures::{AsyncRead, AsyncWrite, io::BufReader};
66
use pin_project::pin_project;
67

            
68
/// Return a future to copy bytes from `reader` to `writer`.
69
///
70
/// See [`copy_buf()`] for full details.
71
///
72
/// Unlike `copy_buf`, this function does not require that `reader` implements AsyncBufRead:
73
/// it wraps `reader` internally in a new `BufReader` with default capacity.
74
///
75
/// ## Limitations
76
///
77
/// If an error occurs during transmission, buffered data that was read from `reader`
78
/// but not written to `writer` will be lost.
79
/// To avoid this, use [`copy_buf()`].
80
///
81
/// Similarly, if you drop this future while it is still pending,
82
/// any buffered data will be lost.
83
///
84
/// See the crate-level documentation for further
85
/// [discussion of this function's limitations](crate#Limitations).
86
pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
87
where
88
    R: AsyncRead,
89
    W: AsyncWrite,
90
{
91
    let reader = BufReader::new(reader);
92
    Copy(copy_buf(reader, writer))
93
}
94

            
95
/// Return a future to copies bytes from `stream_a` to `stream_b`,
96
/// and from `stream_b` to `stream_a`.
97
///
98
/// See [`copy_buf_bidirectional()`] for full details.
99
///
100
/// Unlike `copy_buf_bidirectional`, this function does not require that either stream implements AsyncBufRead:
101
/// it wraps them internally in a new `BufReader` with default capacity.
102
///
103
/// ## Limitations
104
///
105
/// If an error occurs during transmission, data that was read from one stream,
106
/// but not written to the other, will be lost.
107
/// To avoid this, use [`copy_buf_bidirectional()`].
108
///
109
/// Similarly, if you drop this future while it is still pending,
110
/// any buffered data will be lost.
111
///
112
/// See the crate-level documentation for further
113
/// [discussion of this function's limitations](crate#Limitations).
114
pub fn copy_bidirectional<A, B, AE, BE>(
115
    stream_a: A,
116
    stream_b: B,
117
    on_a_eof: AE,
118
    on_b_eof: BE,
119
) -> CopyBidirectional<A, B, AE, BE>
120
where
121
    A: AsyncRead + AsyncWrite,
122
    B: AsyncRead + AsyncWrite,
123
    AE: EofStrategy<B>,
124
    BE: EofStrategy<A>,
125
{
126
    let stream_a = BufReader::new(stream_a);
127
    let stream_b = BufReader::new(stream_b);
128
    CopyBidirectional(copy_buf_bidirectional(
129
        stream_a,
130
        stream_b,
131
        eof::BufReaderEofWrapper(on_a_eof),
132
        eof::BufReaderEofWrapper(on_b_eof),
133
    ))
134
}
135

            
136
/// A future returned by [`copy`].
137
#[derive(Debug)]
138
#[pin_project]
139
#[must_use = "futures do nothing unless you `.await` or poll them"]
140
pub struct Copy<R, W>(#[pin] CopyBuf<BufReader<R>, W>);
141

            
142
/// A future returned by [`copy_bidirectional`].
143
#[derive(Debug)]
144
#[pin_project]
145
#[must_use = "futures do nothing unless you `.await` or poll them"]
146
pub struct CopyBidirectional<A, B, AE, BE>(
147
    #[pin]
148
    CopyBufBidirectional<
149
        BufReader<A>,
150
        BufReader<B>,
151
        eof::BufReaderEofWrapper<AE>,
152
        eof::BufReaderEofWrapper<BE>,
153
    >,
154
);
155

            
156
// Note: There is intentionally no `into_inner` implementation for these types,
157
// since returning the original streams would discard any buffered data.
158

            
159
impl<R, W> Future for Copy<R, W>
160
where
161
    R: AsyncRead,
162
    W: AsyncWrite,
163
{
164
    type Output = std::io::Result<u64>;
165

            
166
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
167
        self.project().0.poll(cx)
168
    }
169
}
170

            
171
impl<A, B, AE, BE> Future for CopyBidirectional<A, B, AE, BE>
172
where
173
    A: AsyncRead + AsyncWrite,
174
    B: AsyncRead + AsyncWrite,
175
    AE: EofStrategy<B>,
176
    BE: EofStrategy<A>,
177
{
178
    type Output = std::io::Result<(u64, u64)>;
179

            
180
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
181
        self.project().0.poll(cx)
182
    }
183
}
184

            
185
#[cfg(test)]
186
mod test {
187
    // @@ begin test lint list maintained by maint/add_warning @@
188
    #![allow(clippy::bool_assert_comparison)]
189
    #![allow(clippy::clone_on_copy)]
190
    #![allow(clippy::dbg_macro)]
191
    #![allow(clippy::mixed_attributes_style)]
192
    #![allow(clippy::print_stderr)]
193
    #![allow(clippy::print_stdout)]
194
    #![allow(clippy::single_char_pattern)]
195
    #![allow(clippy::unwrap_used)]
196
    #![allow(clippy::unchecked_time_subtraction)]
197
    #![allow(clippy::useless_vec)]
198
    #![allow(clippy::needless_pass_by_value)]
199
    #![allow(clippy::string_slice)] // See arti#2571
200
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
201

            
202
    use super::*;
203
    use std::io;
204

            
205
    /// A struct that implements AsyncRead and AsyncWrite, but always returns an error.
206
    #[derive(Debug, Clone)]
207
    pub(crate) struct ErrorRW(pub(crate) io::ErrorKind);
208

            
209
    impl AsyncRead for ErrorRW {
210
        fn poll_read(
211
            self: Pin<&mut Self>,
212
            _cx: &mut Context<'_>,
213
            _buf: &mut [u8],
214
        ) -> Poll<io::Result<usize>> {
215
            Poll::Ready(Err(io::Error::from(self.0)))
216
        }
217
    }
218

            
219
    impl AsyncWrite for ErrorRW {
220
        fn poll_write(
221
            self: Pin<&mut Self>,
222
            _cx: &mut Context<'_>,
223
            _buf: &[u8],
224
        ) -> Poll<io::Result<usize>> {
225
            Poll::Ready(Err(io::Error::from(self.0)))
226
        }
227

            
228
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
229
            Poll::Ready(Err(io::Error::from(self.0)))
230
        }
231

            
232
        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
233
            Poll::Ready(Err(io::Error::from(self.0)))
234
        }
235
    }
236

            
237
    /// A struct that implements AsyncRead, but never returns any data.
238
    ///
239
    /// (This reader is always _pending_.)
240
    pub(crate) struct PausedRead;
241

            
242
    impl AsyncRead for PausedRead {
243
        fn poll_read(
244
            self: Pin<&mut Self>,
245
            _cx: &mut Context<'_>,
246
            _buf: &mut [u8],
247
        ) -> Poll<io::Result<usize>> {
248
            Poll::Pending
249
        }
250
    }
251

            
252
    /// A read-write pair, stapled into a Read+Write stream.
253
    #[pin_project]
254
    pub(crate) struct RWPair<R, W>(#[pin] pub(crate) R, #[pin] pub(crate) W);
255

            
256
    impl<R: AsyncRead, W> AsyncRead for RWPair<R, W> {
257
        fn poll_read(
258
            self: Pin<&mut Self>,
259
            cx: &mut Context<'_>,
260
            buf: &mut [u8],
261
        ) -> Poll<io::Result<usize>> {
262
            self.project().0.poll_read(cx, buf)
263
        }
264
    }
265

            
266
    impl<R, W: AsyncWrite> AsyncWrite for RWPair<R, W> {
267
        fn poll_write(
268
            self: Pin<&mut Self>,
269
            cx: &mut Context<'_>,
270
            buf: &[u8],
271
        ) -> Poll<io::Result<usize>> {
272
            self.project().1.poll_write(cx, buf)
273
        }
274

            
275
        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
276
            self.project().1.poll_flush(cx)
277
        }
278

            
279
        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
280
            self.project().1.poll_close(cx)
281
        }
282
    }
283
}