1
//! [`SometimesUnboundedSink`]
2

            
3
use std::collections::VecDeque;
4
use std::pin::Pin;
5
use std::task::{Context, Poll, Poll::*, ready};
6

            
7
use futures::{Sink, future};
8

            
9
use pin_project::pin_project;
10

            
11
/// Wraps a [`Sink`], providing an only-sometimes-used unbounded buffer
12
///
13
/// For example, consider `SometimesUnboundedSink<T, mpsc::Receiver>`.
14
/// The `Receiver` is not always ready for writing:
15
/// if the capacity is exceeded, `send` will block.
16
///
17
/// `SometimesUnboundedSink`'s `Sink` implementation works the same way.
18
/// But there are also two methods
19
/// [`pollish_send_unbounded`](SometimesUnboundedSink::pollish_send_unbounded)
20
/// and
21
/// [`send_unbounded`](SometimesUnboundedSink::send_unbounded)
22
/// which will always succeed immediately.
23
/// Items which the underlying sink `S` is not ready to accept are queued,
24
/// and will be delivered to `S` when possible.
25
///
26
/// ### You must poll this type
27
///
28
/// For queued items to be delivered,
29
/// `SometimesUnboundedSink` must be polled,
30
/// even if you don't have an item to send.
31
///
32
/// You can use [`Sink::poll_ready`] for this.
33
/// Any [`Context`]-taking methods is suitable.
34
///
35
/// (This is a difference between [`SometimesUnboundedSink`]
36
/// and [`mpsc::UnboundedSender`](futures::channel::mpsc::UnboundedSender):
37
/// [`UnboundedSender::unbounded_send`](futures::channel::mpsc::UnboundedSender::unbounded_send)
38
/// does not require a flush operation.
39
/// In this way, [`SometimesUnboundedSink::send_unbounded`] behaves more like
40
/// [`Sink::start_send`], which _does_ require a subsequent flush.)
41
//
42
/// ### Error handling
43
///
44
/// Errors from the underlying sink may not be reported immediately,
45
/// due to the buffering in `SometimesUnboundedSink`.
46
///
47
/// However, if the sink reports errors from `poll_ready`
48
/// these will surface in a timely fashion.
49
///
50
/// After an error has been reported, there may still be buffered data,
51
/// which will only be delivered if `SometimesUnboundedSink` is polled again
52
/// (and the error in the underlying sink was transient).
53
#[pin_project]
54
pub(crate) struct SometimesUnboundedSink<T, S> {
55
    /// Things we couldn't send_unbounded right away
56
    ///
57
    /// Invariants:
58
    ///
59
    ///  * Everything here must be fed to `inner` before any further user data
60
    ///    (unbounded user data may be appended).
61
    ///
62
    ///  * If this is nonempty, the executor knows to wake this task.
63
    ///    This is achieved as follows:
64
    ///    If this is nonempty, `inner.poll_ready()` has been called.
65
    buf: VecDeque<T>,
66

            
67
    /// The actual sink
68
    ///
69
    /// This also has the relevant `Waker`.
70
    ///
71
    /// # Waker invariant
72
    ///
73
    /// Whenever either
74
    ///
75
    ///  * The last call to any of our public methods returned `Pending`, or
76
    ///  * `buf` is nonempty,
77
    ///
78
    /// the last method call `inner` *also* returned `Pending`.
79
    /// (Or, we have reported an error.)
80
    ///
81
    /// So, in those situations, this task has been recorded for wakeup
82
    /// by `inner` (specifically, its other end, if it's a channel)
83
    /// when `inner` becomes readable.
84
    ///
85
    /// Therefore this task will be woken up, and, if the caller actually
86
    /// polls us again (as is usual and is required by our docs),
87
    /// we'll drain any queued data.
88
    #[pin]
89
    inner: S,
90
}
91

            
92
impl<T, S: Sink<T>> SometimesUnboundedSink<T, S> {
93
    /// Wrap an inner `Sink` with a `SometimesUnboundedSink`
94
    //
95
    // There is no method for unwrapping.  If we make this type more public,
96
    // there should be, but that method will need `where S: Unpin`.
97
380
    pub(crate) fn new(inner: S) -> Self {
98
380
        SometimesUnboundedSink {
99
380
            buf: VecDeque::new(),
100
380
            inner,
101
380
        }
102
380
    }
103

            
104
    /// Return the number of T queued in this sink.
105
44
    pub(crate) fn n_queued(&self) -> usize {
106
44
        self.buf.len()
107
44
    }
108

            
109
    /// Return an iterator over the items queued in this sink.
110
    ///
111
    /// (Used by circuit padding to see whether we have a cell queued for a given hop.)
112
    #[cfg(feature = "circ-padding")]
113
    pub(crate) fn iter_queue(&self) -> impl Iterator<Item = &T> + '_ {
114
        self.buf.iter()
115
    }
116

            
117
    /// Hand `item` to the inner Sink if possible, or queue it otherwise
118
    ///
119
    /// Like a `poll_...` method in that it takes a `Context`.
120
    /// That's needed to make sure we get polled again
121
    /// when the underlying sink can accept items.
122
    ///
123
    /// But unlike a `poll_...` method in that it doesn't return `Poll`,
124
    /// since completion is always immediate.
125
4662
    pub(crate) fn pollish_send_unbounded(
126
4662
        mut self: Pin<&mut Self>,
127
4662
        cx: &mut Context<'_>,
128
4662
        item: T,
129
4662
    ) -> Result<(), S::Error> {
130
4662
        match self.as_mut().poll_ready(cx) {
131
            // Waker invariant: poll_ready only returns Ready(Ok(())) if `buf` is empty
132
4654
            Ready(Ok(())) => self.as_mut().start_send(item),
133
            // Waker invariant: if we report an error, we're then allowed to expect polling again
134
4
            Ready(Err(e)) => Err(e),
135
            Pending => {
136
                // Waker invariant: poll_ready() returned Pending,
137
                // so the task has indeed already been recorded.
138
4
                self.as_mut().project().buf.push_back(item);
139
4
                Ok(())
140
            }
141
        }
142
4662
    }
143

            
144
    /// Hand `item` to the inner Sink if possible, or queue it otherwise (async fn)
145
    ///
146
    /// You must `.await` this, but it will never block.
147
    /// (Its future is always `Ready`.)
148
4662
    pub(crate) async fn send_unbounded(mut self: Pin<&mut Self>, item: T) -> Result<(), S::Error> {
149
        // Waker invariant: this is just a wrapper around `pollish_send_unbounded`
150
4662
        let mut item = Some(item);
151
4662
        future::poll_fn(move |cx| {
152
4662
            let item = item.take().expect("polled after Ready");
153
4662
            Ready(self.as_mut().pollish_send_unbounded(cx, item))
154
4662
        })
155
4662
        .await
156
4662
    }
157

            
158
    /// Flush the buffer.  On a `Ready(())` return, it's empty.
159
    ///
160
    /// This satisfies the Waker invariant as if it were a public method.
161
18922
    fn flush_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
162
18922
        let mut self_ = self.as_mut().project();
163
18926
        while !self_.buf.is_empty() {
164
            // Waker invariant:
165
            // if inner gave Pending, we give Pending too: ok
166
            // if inner gave Err, we're allowed to want polling again
167
8
            ready!(self_.inner.as_mut().poll_ready(cx))?;
168
4
            let item = self_.buf.pop_front().expect("suddenly empty!");
169
            // Waker invariant: returning Err
170
4
            self_.inner.as_mut().start_send(item)?;
171
        }
172
        // Waker invariant: buffer is empty, and we're not about to return Pending
173
18918
        Ready(Ok(()))
174
18922
    }
175

            
176
    /// Obtain a reference to the inner `Sink`, `S`
177
    ///
178
    /// This method should be used with a little care, since it bypasses the wrapper.
179
    /// For example, if `S` has interior mutability, and this method is used to
180
    /// modify it, the `SometimesUnboundedSink` may malfunction.
181
4710
    pub(crate) fn as_inner(&self) -> &S {
182
4710
        &self.inner
183
4710
    }
184

            
185
    /// Obtain a mutable reference to the inner `Sink`, `S`
186
    ///
187
    /// This method should be used with extra care, since it bypasses the wrapper.
188
    /// Before you call this method,
189
    /// make sure you understand the internal invariants for `SometimesUnboundedSink`,
190
    /// and make sure that you are not violating them.
191
    /// In particular, do not queue anything onto the resulting `Sink` directly.
192
44
    pub(crate) fn as_inner_mut(&mut self) -> &mut S {
193
44
        &mut self.inner
194
44
    }
195
}
196

            
197
// Waker invariant for all these impls:
198
// returning Err or Pending from flush_buf: OK, flush_buf ensures the condition holds
199
// returning from the inner method: trivially OK
200
impl<T, S: Sink<T>> Sink<T> for SometimesUnboundedSink<T, S> {
201
    type Error = S::Error;
202

            
203
    // Only returns `Ready(Ok(()))` if `buf` is empty
204
18838
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
205
18838
        ready!(self.as_mut().flush_buf(cx))?;
206
18834
        self.project().inner.poll_ready(cx)
207
18838
    }
208

            
209
4670
    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), S::Error> {
210
4670
        assert!(self.buf.is_empty(), "start_send without poll_ready");
211
4670
        self.project().inner.start_send(item)
212
4670
    }
213

            
214
80
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
215
80
        ready!(self.as_mut().flush_buf(cx))?;
216
80
        self.project().inner.poll_flush(cx)
217
80
    }
218

            
219
4
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
220
4
        ready!(self.as_mut().flush_buf(cx))?;
221
4
        self.project().inner.poll_close(cx)
222
4
    }
223
}
224

            
225
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use futures::channel::mpsc;
    use futures::{SinkExt as _, StreamExt as _};
    use std::pin::pin;
    use tor_rtmock::MockRuntime;

    /// Push nine consecutive integers through a capacity-1 channel,
    /// mixing bounded and unbounded sends, and check they arrive in order.
    #[test]
    fn cases() {
        // `test_with_various` tries both LIFO and FIFO scheduling,
        // so the sender and receiver tasks get interleaved differently,
        // which should reach the corner cases we care about.
        MockRuntime::test_with_various(|runtime| async move {
            let (tx, rx) = mpsc::channel(1);
            let tx = SometimesUnboundedSink::new(tx);

            runtime.spawn_identified("sender", async move {
                let mut tx = pin!(tx);
                let mut counter = 0..;
                let mut fresh = move || counter.next().unwrap();

                // an unbounded send that can go straight through
                tx.as_mut().send_unbounded(fresh()).await.unwrap();
                for _ in 0..3 {
                    tx.as_mut().send(fresh()).await.unwrap();
                }
                // unbounded sends that may find the sink busy, and so may queue
                for _ in 0..3 {
                    tx.as_mut().send_unbounded(fresh()).await.unwrap();
                }
                // a bounded send followed by an unbounded one
                tx.as_mut().send(fresh()).await.unwrap();
                tx.as_mut().send_unbounded(fresh()).await.unwrap();
                // flush, then close
                tx.as_mut().flush().await.unwrap();
                tx.as_mut().close().await.unwrap();
            });

            runtime.spawn_identified("receiver", async move {
                let mut rx = pin!(rx);
                let mut expected = 0..;

                while let Some(got) = rx.next().await {
                    assert_eq!(got, expected.next().unwrap());
                }
                // All nine items (0 through 8) were received, so 9 comes next.
                assert_eq!(expected.next().unwrap(), 9);
            });

            runtime.progress_until_stalled().await;
        });
    }
}