1
//! Provides [`StreamPollSet`]
2

            
3
// So that we can declare these things as if they were in their own crate.
4
#![allow(unreachable_pub)]
5

            
6
use std::{
7
    collections::{BTreeMap, HashMap, hash_map},
8
    future::Future,
9
    hash::Hash,
10
    pin::Pin,
11
    task::{Context, Poll, Waker},
12
};
13

            
14
use futures::{FutureExt, StreamExt as _};
15
use tor_async_utils::peekable_stream::PeekableStream;
16

            
17
use crate::util::{
18
    keyed_futures_unordered::KeyedFuturesUnordered,
19
    tunnel_activity::{InTunnelActivity, TunnelActivity},
20
};
21

            
22
/// A future that wraps a [`PeekableStream`], and yields the stream
/// when an item becomes available.
///
/// The stream is stored in an `Option` so that it can be moved out exactly
/// once: either when the future completes, or via [`PeekableReady::into_inner`].
struct PeekableReady<S> {
    /// The stream to be peeked.
    ///
    /// `None` once the future has completed and handed the stream out.
    stream: Option<S>,
}

impl<S> PeekableReady<S> {
    /// Create a new [`PeekableReady`].
    fn new(st: S) -> Self {
        Self { stream: Some(st) }
    }

    /// Get a reference to the inner `S`.
    ///
    /// None if the future has already completed.
    fn get_ref(&self) -> Option<&S> {
        self.stream.as_ref()
    }

    /// Get a mut reference to the inner `S`.
    ///
    /// None if the future has already completed.
    fn get_mut(&mut self) -> Option<&mut S> {
        self.stream.as_mut()
    }

    /// Unwrap inner `S`.
    ///
    /// None if the future has already completed.
    fn into_inner(self) -> Option<S> {
        self.stream
    }
}
56

            
57
impl<S> Future for PeekableReady<S>
58
where
59
    S: PeekableStream + Unpin,
60
{
61
    type Output = S;
62

            
63
4566
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
64
4566
        let Some(stream) = &mut self.stream else {
65
            panic!("Polled completed future");
66
        };
67
4566
        match Pin::new(stream).poll_peek(cx) {
68
4362
            Poll::Ready(_) => Poll::Ready(self.stream.take().expect("Stream disappeared")),
69
204
            Poll::Pending => Poll::Pending,
70
        }
71
4566
    }
72
}
73

            
74
/// Manages a dynamic set of [`futures::Stream`] with associated keys and
75
/// priorities.
76
///
77
/// Notable features:
78
///
79
/// * Prioritization: streams have an associated priority, and ready-streams are
80
///   iterated over in ascending priority order.
81
/// * Efficient polling: an unready stream won't be polled again until it's
82
///   ready or exhausted (e.g. a corresponding [`futures::Sink`] is written-to or
83
///   dropped). A ready stream won't be polled again until the ready item has been
84
///   removed.
85
pub struct StreamPollSet<K, P, S>
86
where
87
    S: PeekableStream + Unpin,
88
{
89
    /// Priority for each stream in the set, and associated InTunnelActivity token.
90
    // We keep the priority for each stream here instead of bundling it together
91
    // with the stream, so that the priority can easily be changed even while a
92
    // future waiting on the stream is still pending (e.g. to support rescaling
93
    // priorities for EWMA).
94
    // Invariants:
95
    // * Every key is also present in exactly one of `ready_values` or `pending_streams`.
96
    priorities: HashMap<K, (P, InTunnelActivity)>,
97
    /// Streams that have a result ready, in ascending order by priority.
98
    // Invariants:
99
    // * Keys are a (non-strict) subset of those in `priorities`.
100
    ready_streams: BTreeMap<(P, K), S>,
101
    /// Streams for which we're still waiting for the next result.
102
    // Invariants:
103
    // * Keys are a (non-strict) subset of those in `priorities`.
104
    pending_streams: KeyedFuturesUnordered<K, PeekableReady<S>>,
105

            
106
    /// Information about how active this particular hop has been,
107
    /// with respect to tracking overall tunnel activity.
108
    tunnel_activity: TunnelActivity,
109
}
110

            
111
impl<K, P, S> StreamPollSet<K, P, S>
112
where
113
    K: Ord + Hash + Clone + Send + Sync + 'static,
114
    S: PeekableStream + Unpin,
115
    P: Ord + Clone,
116
{
117
    /// Create a new, empty, `StreamPollSet`.
118
1044
    pub fn new() -> Self {
119
1044
        Self {
120
1044
            priorities: Default::default(),
121
1044
            ready_streams: Default::default(),
122
1044
            pending_streams: KeyedFuturesUnordered::new(),
123
1044
            tunnel_activity: TunnelActivity::never_used(),
124
1044
        }
125
1044
    }
126

            
127
    /// Insert a `stream`, with an associated `key` and `priority`.
128
    ///
129
    /// If the `key` is already in use, the parameters are returned without altering `self`.
130
    // To *replace* an existing key, we'd need to cancel any pending future and
131
    // ensure that the cancellation is processed before inserting the new key, to
132
    // ensure we don't assign a value from the previous key to the new key's
133
    // stream.
134
416
    pub fn try_insert(
135
416
        &mut self,
136
416
        key: K,
137
416
        priority: P,
138
416
        stream: S,
139
416
    ) -> Result<(), KeyAlreadyInsertedError<K, P, S>> {
140
416
        let hash_map::Entry::Vacant(v) = self.priorities.entry(key.clone()) else {
141
            // We already have an entry for this key.
142
            return Err(KeyAlreadyInsertedError {
143
                key,
144
                priority,
145
                stream,
146
            });
147
        };
148
416
        self.pending_streams
149
416
            .try_insert(key, PeekableReady::new(stream))
150
            // By `pending_streams` invariant that keys are a subset of those in
151
            // `priorities`.
152
416
            .unwrap_or_else(|_| panic!("Unexpected duplicate key"));
153
416
        let token = self.tunnel_activity.inc_streams();
154
416
        v.insert((priority, token));
155
416
        Ok(())
156
416
    }
157

            
158
    /// Remove the entry for `key`, if any. This is the key, priority, buffered
159
    /// poll_next result, and stream.
160
132
    pub fn remove(&mut self, key: &K) -> Option<(K, P, S)> {
161
132
        let (priority, token) = self.priorities.remove(key)?;
162
110
        self.tunnel_activity.dec_streams(token);
163
110
        if let Some((key, fut)) = self.pending_streams.remove(key) {
164
            // Validate `priorities` invariant that keys are also present in exactly one of
165
            // `pending_streams` and `ready_values`.
166
40
            debug_assert!(
167
40
                !self
168
40
                    .ready_streams
169
40
                    .contains_key(&(priority.clone(), key.clone()))
170
            );
171
40
            let stream = fut
172
40
                .into_inner()
173
                // We know the future hasn't completed, so the stream should be present.
174
40
                .expect("Missing stream");
175
40
            Some((key, priority, stream))
176
        } else {
177
70
            let ((_priority, key), stream) = self
178
70
                .ready_streams
179
70
                .remove_entry(&(priority.clone(), key.clone()))
180
70
                // By
181
70
                // * `pending_streams` invariant that keys are also present in
182
70
                // exactly one of `pending_streams` and `ready_values`.
183
70
                // * validated above that the key was in `pending_streams`, and
184
70
                // not in `ready_values`.
185
70
                .expect("Unexpectedly no value for key");
186
70
            Some((key, priority, stream))
187
        }
188
132
    }
189

            
190
    /// Polls streams that are ready to be polled, and returns an iterator over all streams
191
    /// for which we have a buffered `Poll::Ready` result, in ascending priority order.
192
    ///
193
    /// Registers the provided [`Context`] to be woken when
194
    /// any of the internal streams that weren't ready in the previous call to
195
    /// this method (and therefore wouldn't have appeared in the iterator
196
    /// results) become potentially ready (based on when the inner stream wakes
197
    /// the `Context` provided to its own `poll_next`).
198
    ///
199
    /// The same restrictions apply as for [`Self::stream_mut`].  e.g. do not
200
    /// directly call [`PeekableStream::poll_peek`] to see what item is
201
    /// available on the stream; instead use [`Self::peek_mut`]. (Or
202
    /// [`tor_async_utils::peekable_stream::UnobtrusivePeekableStream`] if
203
    /// implemented for the stream).
204
    ///
205
    /// This method does *not* drain ready items. `Some` values can be removed
206
    /// with [`Self::take_ready_value_and_reprioritize`]. `None` values can only
207
    /// be removed by removing the whole stream with [`Self::remove`].
208
    ///
209
    /// This API is meant to allow callers to find the first stream (in priority
210
    /// order) that is ready, and that the caller is able to process now. i.e.
211
    /// it's specifically to support the use-case where external factors may
212
    /// prevent the processing of some streams but not others.
213
    ///
214
    /// Example:
215
    ///
216
    /// ```nocompile
217
    /// # // We need the `nocompile` since `StreamPollSet` is non-pub.
218
    /// # // TODO: take away the nocompile if we make this pub or implement some
219
    /// # // workaround to expose it to doc-tests.
220
    /// # type Key=u64;
221
    /// # type Value=u64;
222
    /// # type Priority=u64;
223
    /// # type MyStream=Box<dyn futures::Stream<Item=Value> + Unpin>;
224
    /// # fn can_process(key: &Key, val: &Value) -> bool { true }
225
    /// # fn process(val: Value) { }
226
    /// # fn new_priority(priority: &Priority) -> Priority { *priority }
227
    /// fn process_a_ready_stream(sps: &mut StreamPollSet<Key, Value, Priority, MyStream>, cx: &mut std::task::Context) -> std::task::Poll<()> {
228
    ///   let mut iter = sps.poll_ready_iter(cx);
229
    ///   while let Some((key, priority, stream)) = iter.next() {
230
    ///     let Some(value) = stream.unobtrusive_peek(Pin::new(stream)) else {
231
    ///        // Stream exhausted. Remove the stream. We have to drop the iterator
232
    ///        // first, though, so that we can mutate.
233
    ///        let key = *key;
234
    ///        drop(iter);
235
    ///        sps.remove(&key).unwrap();
236
    ///        return std::task::Poll::Ready(());
237
    ///     };
238
    ///     if can_process(key, value) {
239
    ///        let key = *key;
240
    ///        let priority = new_priority(priority);
241
    ///        drop(iter);
242
    ///        let (_old_priority, value) = sps.take_ready_value_and_reprioritize(&key, priority).unwrap();
243
    ///        process(value);
244
    ///        return std::task::Poll::Ready(());
245
    ///     }
246
    ///   }
247
    ///   return std::task::Poll::Pending;
248
    /// }
249
    /// ```
250
    // In the current implementation we *could* actually permit the caller to
251
    // `poll_peek` a stream that we know is ready. But this may change as the
252
    // impl evolves further, and it's probably better to blanket disallow it
253
    // than to have complex rules for the caller about when it's ok.
254
    //
255
    // TODO: It would be nice if the returned iterator supported additional
256
    // actions, e.g. allowing the user to consume the iterator and take and
257
    // reprioritize the inner value, but this is tricky.
258
    //
259
    // I've sketched out a working "cursor" that holds the current position (K, P)
260
    // and a &mut StreamPollSet. This can't implement the Iterator interface though
261
    // since it needs to borrow from self. I was able to implement an Iterator-*like* interface
262
    // that does borrow from self, but this doesn't compose well. e.g. in StreamMap
263
    // we can't use the same technique again since the object would need a mut reference to the
264
    // StreamMap *and* to this inner cursor object, which is illegal.
265
19164
    pub fn poll_ready_iter_mut<'a>(
266
19164
        &'a mut self,
267
19164
        cx: &mut Context,
268
19164
    ) -> impl Iterator<Item = (&'a K, &'a P, &'a mut S)> + 'a + use<'a, K, P, S> {
269
        // First poll for ready streams
270
23526
        while let Poll::Ready(Some((key, stream))) = self.pending_streams.poll_next_unpin(cx) {
271
4362
            let (priority, _) = self
272
4362
                .priorities
273
4362
                .get(&key)
274
4362
                // By `pending_streams` invariant that all keys are also in `priorities`.
275
4362
                .expect("Missing priority");
276
4362
            let prev = self.ready_streams.insert((priority.clone(), key), stream);
277
4362
            assert!(prev.is_none());
278
        }
279
19164
        self.ready_streams.iter_mut().map(|((p, k), s)| (k, p, s))
280
19164
    }
281

            
282
    /// If the stream for `key` has `Some(value)` ready, take that value and set the
283
    /// priority for it to `new_priority`.
284
    ///
285
    /// This method doesn't register a waker with the polled stream. Use
286
    /// `poll_ready_iter` to ensure streams make progress.
287
    ///
288
    /// If the key doesn't exist, the stream isn't ready, or the stream's value
289
    /// is `None` (indicating the end of the stream), this function returns
290
    /// `None` without mutating anything.
291
    ///
292
    /// Ended streams should be removed using [`Self::remove`].
293
4274
    pub fn take_ready_value_and_reprioritize(
294
4274
        &mut self,
295
4274
        key: &K,
296
4274
        new_priority: P,
297
4274
    ) -> Option<(P, S::Item)> {
298
        // Get the priority entry, but don't replace until the lookup in ready_streams is confirmed.
299
4274
        let hash_map::Entry::Occupied(mut priority_entry) = self.priorities.entry(key.clone())
300
        else {
301
            // Key isn't present at all.
302
            return None;
303
        };
304
4274
        let (priority_mut, _) = priority_entry.get_mut();
305
4274
        let Some(((_p, key), mut stream)) = self
306
4274
            .ready_streams
307
4274
            .remove_entry(&(priority_mut.clone(), key.clone()))
308
        else {
309
            // This stream isn't in the ready list.
310
            return None;
311
        };
312
4274
        match Pin::new(&mut stream).poll_peek(&mut Context::from_waker(Waker::noop())) {
313
4274
            Poll::Ready(Some(_val)) => (), // Stream is ready, and has an item. Proceed.
314
            Poll::Ready(None) => {
315
                // Stream is ready, but is terminated.
316
                // Leave in place and return `None`.
317
                return None;
318
            }
319
            Poll::Pending => {
320
                // Stream wasn't actually ready, despite being on the ready
321
                // list. This should be impossible by the stability guarantees
322
                // of `PeekableStream` and our own internal logic, but we can
323
                // recover.
324
                tracing::error!("Bug: Stream unexpectedly unready");
325
                self.pending_streams
326
                    .try_insert(key.clone(), PeekableReady::new(stream))
327
                    // By invariant on `priorities` that keys are in exactly one of the ready or pending lists.
328
                    .unwrap_or_else(|_| {
329
                        unreachable!("Key unexpectedly in both ready and unready list")
330
                    });
331
                return None;
332
            }
333
        }
334
4274
        let Some(Some(val)) = stream.next().now_or_never() else {
335
            panic!("Polling stream returned a different result than peeking");
336
        };
337
4274
        let prev_priority = std::mem::replace(priority_mut, new_priority);
338
4274
        self.pending_streams
339
4274
            .try_insert(key, PeekableReady::new(stream))
340
            // We verified above that the key wasn't present in `priorities`,
341
            // and `pending_streams` has the invariant that its keys are a
342
            // subset of those in `priorities`.
343
4274
            .unwrap_or_else(|_| panic!("Unexpected pending stream entry"));
344
4274
        Some((prev_priority, val))
345
4274
    }
346

            
347
    /// Get a mut reference to a ready value for key `key`, if one exists.
348
    ///
349
    /// This method doesn't poll the internal streams. Use `poll_ready_iter` to
350
    /// ensure streams make progress.
351
    // This will be used for packing and fragmentation, to take part of a DATA message.
352
    #[allow(unused)]
353
    pub fn peek_mut<'a>(&'a mut self, key: &K) -> Option<Poll<Option<&'a mut S::Item>>> {
354
        let (priority, _) = self.priorities.get(key)?;
355
        let Some(peekable) = self.ready_streams.get_mut(&(priority.clone(), key.clone())) else {
356
            return Some(Poll::Pending);
357
        };
358
        // We don't have a waker registered here, so we can just use the noop waker.
359
        // TODO: Create a mut future for `PeekableStream`.
360
        Some(Pin::new(peekable).poll_peek_mut(&mut Context::from_waker(Waker::noop())))
361
    }
362

            
363
    /// Get a reference to the stream for `key`.
364
    ///
365
    /// The same restrictions apply as for [`Self::stream_mut`] (e.g. using
366
    /// interior mutability).
367
    #[allow(dead_code)]
368
4
    pub fn stream(&self, key: &K) -> Option<&S> {
369
4
        if let Some(s) = self.pending_streams.get(key) {
370
4
            let s = s.get_ref();
371
            // Stream must be present since it's still pending.
372
4
            debug_assert!(s.is_some(), "Unexpected missing pending stream");
373
4
            return s;
374
        }
375
        let (priority, _) = self.priorities.get(key)?;
376
        self.ready_streams.get(&(priority.clone(), key.clone()))
377
4
    }
378

            
379
    /// Get a mut reference to the stream for `key`.
380
    ///
381
    /// Polling the stream through this reference, or otherwise causing its
382
    /// registered `Waker` to be removed without waking it, will result in
383
    /// unspecified (but not unsound) behavior.
384
    ///
385
    /// This is mostly intended for accessing non-`Stream` functionality of the stream
386
    /// object, though it *is* permitted to mutate it in a way that the stream becomes
387
    /// ready (potentially removing and waking its registered Waker(s)).
388
    //
389
    // In particular:
390
    // * Polling a stream in the pending list and getting a Pending result
391
    //   will overwrite our Waker, resulting in us not polling it again.
392
    // * Doing so with a stream on the pending list and getting a Ready result
393
    //   might be ok if it had already woken our waker. Otoh it could potentially
394
    //   result in our waker never getting woken, and hence us not polling it again.
395
    // * Doing so with a stream on the ready list should actually be ok, since
396
    //   we don't have a registered waker, and don't do our own buffering.
397
8696
    pub fn stream_mut(&mut self, key: &K) -> Option<&mut S> {
398
8696
        if let Some(s) = self.pending_streams.get_mut(key) {
399
8616
            let s = s.get_mut();
400
            // Stream must be present since it's still pending.
401
8616
            debug_assert!(s.is_some(), "Unexpected missing pending stream");
402
8616
            return s;
403
80
        }
404
80
        let (priority, _) = self.priorities.get(key)?;
405
2
        self.ready_streams.get_mut(&(priority.clone(), key.clone()))
406
8696
    }
407

            
408
    /// Number of streams managed by this object.
409
530
    pub fn len(&self) -> usize {
410
530
        self.priorities.len()
411
530
    }
412

            
413
    /// Return a `TunnelActivity` for this hop.
414
    pub fn tunnel_activity(&self) -> TunnelActivity {
415
        assert_eq!(self.len(), self.tunnel_activity.n_open_streams());
416
        self.tunnel_activity
417
    }
418
}
419

            
420
impl<K, P, S> Drop for StreamPollSet<K, P, S>
421
where
422
    S: PeekableStream + Unpin,
423
{
424
912
    fn drop(&mut self) {
425
912
        self.priorities
426
912
            .drain()
427
1061
            .for_each(|(_key, (_prio, token))| token.consume_and_forget());
428
912
    }
429
}
430

            
431
/// Error returned by [`StreamPollSet::try_insert`].
432
#[derive(Debug, thiserror::Error)]
433
#[allow(clippy::exhaustive_structs)]
434
pub struct KeyAlreadyInsertedError<K, P, S> {
435
    /// Key that caller tried to insert.
436
    #[allow(dead_code)]
437
    pub key: K,
438
    /// Priority that caller tried to insert.
439
    #[allow(dead_code)]
440
    pub priority: P,
441
    /// Stream that caller tried to insert.
442
    #[allow(dead_code)]
443
    pub stream: S,
444
}
445

            
446
#[cfg(test)]
447
mod test {
448
    // @@ begin test lint list maintained by maint/add_warning @@
449
    #![allow(clippy::bool_assert_comparison)]
450
    #![allow(clippy::clone_on_copy)]
451
    #![allow(clippy::dbg_macro)]
452
    #![allow(clippy::mixed_attributes_style)]
453
    #![allow(clippy::print_stderr)]
454
    #![allow(clippy::print_stdout)]
455
    #![allow(clippy::single_char_pattern)]
456
    #![allow(clippy::unwrap_used)]
457
    #![allow(clippy::unchecked_time_subtraction)]
458
    #![allow(clippy::useless_vec)]
459
    #![allow(clippy::needless_pass_by_value)]
460
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
461

            
462
    use std::{
463
        collections::VecDeque,
464
        sync::{Arc, Mutex},
465
        task::Poll,
466
    };
467

            
468
    use futures::{SinkExt as _, stream::Peekable};
469
    use pin_project::pin_project;
470
    use tor_rtmock::MockRuntime;
471

            
472
    use super::*;
473

            
474
    /// Stream key used by the tests.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
    struct Key(u64);

    /// Stream priority used by the tests; lower values sort first.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
    struct Priority(u64);

    /// Item type yielded by the test streams.
    #[derive(Copy, Clone, Debug, Eq, PartialEq)]
    struct Value(u64);
482

            
483
    /// Test stream that we can directly manipulate and examine.
484
    #[derive(Debug)]
485
    #[pin_project]
486
    struct VecDequeStream<T> {
487
        // Ready items.
488
        vec: VecDeque<T>,
489
        // Whether any more items will be written.
490
        closed: bool,
491
        // Registered waker.
492
        waker: Option<std::task::Waker>,
493
    }
494
    impl<T> VecDequeStream<T> {
495
        fn new_open<I: IntoIterator<Item = T>>(values: I) -> Self {
496
            Self {
497
                vec: VecDeque::from_iter(values),
498
                waker: None,
499
                closed: false,
500
            }
501
        }
502
        fn new_closed<I: IntoIterator<Item = T>>(values: I) -> Self {
503
            Self {
504
                vec: VecDeque::from_iter(values),
505
                waker: None,
506
                closed: true,
507
            }
508
        }
509
        fn push(&mut self, value: T) {
510
            assert!(!self.closed);
511
            self.vec.push_back(value);
512
            if let Some(waker) = self.waker.take() {
513
                waker.wake();
514
            }
515
        }
516
    }
517
    impl<T> futures::Stream for VecDequeStream<T> {
518
        type Item = T;
519

            
520
        fn poll_next(
521
            mut self: std::pin::Pin<&mut Self>,
522
            cx: &mut std::task::Context<'_>,
523
        ) -> Poll<Option<Self::Item>> {
524
            if let Some(val) = self.as_mut().vec.pop_front() {
525
                Poll::Ready(Some(val))
526
            } else if self.as_mut().closed {
527
                // No more items coming.
528
                Poll::Ready(None)
529
            } else {
530
                self.as_mut().waker.replace(cx.waker().clone());
531
                Poll::Pending
532
            }
533
        }
534
    }
535
    impl<T> PeekableStream for VecDequeStream<T> {
536
        fn poll_peek_mut(
537
            self: Pin<&mut Self>,
538
            cx: &mut Context<'_>,
539
        ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
540
            let s = self.project();
541
            if let Some(val) = s.vec.front_mut() {
542
                Poll::Ready(Some(val))
543
            } else if *s.closed {
544
                // No more items coming.
545
                Poll::Ready(None)
546
            } else {
547
                s.waker.replace(cx.waker().clone());
548
                Poll::Pending
549
            }
550
        }
551
    }
552
    impl<T> std::cmp::PartialEq for VecDequeStream<T>
553
    where
554
        T: std::cmp::PartialEq,
555
    {
556
        fn eq(&self, other: &Self) -> bool {
557
            // Ignore waker, which isn't comparable
558
            self.vec == other.vec && self.closed == other.closed
559
        }
560
    }
561
    impl<T> std::cmp::Eq for VecDequeStream<T> where T: std::cmp::Eq {}
562

            
563
    type TestStream = VecDequeStream<Value>;
564

            
565
    /// An empty set yields no ready streams.
    #[test]
    fn test_empty() {
        futures::executor::block_on(futures::future::poll_fn(|ctx| {
            let mut pollset = StreamPollSet::<Key, Priority, TestStream>::new();
            assert_eq!(pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(), vec![]);
            Poll::Ready(())
        }));
    }
573

            
574
    /// A single open stream with no items never shows up as ready.
    #[test]
    fn test_one_pending() {
        futures::executor::block_on(futures::future::poll_fn(|ctx| {
            let mut pollset = StreamPollSet::<Key, Priority, TestStream>::new();
            pollset
                .try_insert(Key(0), Priority(0), TestStream::new_open([]))
                .unwrap();
            assert_eq!(pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(), vec![]);
            Poll::Ready(())
        }));
    }
585

            
586
    /// Walk a single closed stream through its items, end-of-stream, and removal.
    #[test]
    fn test_one_ready() {
        futures::executor::block_on(futures::future::poll_fn(|ctx| {
            let mut pollset = StreamPollSet::<Key, Priority, TestStream>::new();
            pollset
                .try_insert(
                    Key(0),
                    Priority(0),
                    TestStream::new_closed([Value(1), Value(2)]),
                )
                .unwrap();

            // We see the one ready stream, with nothing consumed from it yet.
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![(
                    &Key(0),
                    &Priority(0),
                    &mut TestStream::new_closed([Value(1), Value(2)])
                )],
            );

            // Same result, the same value is still at the head of the stream.
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![(
                    &Key(0),
                    &Priority(0),
                    &mut TestStream::new_closed([Value(1), Value(2)])
                )]
            );

            // Take the head of the stream.
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(1)),
                Some((Priority(0), Value(1)))
            );

            // Should see the next value, with the new priority.
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![(
                    &Key(0),
                    &Priority(1),
                    &mut TestStream::new_closed([Value(2)])
                )]
            );

            // Take again.
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(2)),
                Some((Priority(1), Value(2)))
            );

            // Should see end-of-stream.
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![(&Key(0), &Priority(2), &mut TestStream::new_closed([]))]
            );

            // Remove the now-ended stream.
            assert_eq!(
                pollset.remove(&Key(0)),
                Some((Key(0), Priority(2), TestStream::new_closed([])))
            );

            // Should now be empty.
            assert_eq!(pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(), vec![]);

            Poll::Ready(())
        }));
    }
658

            
659
    /// Alternate between two ready streams by bumping the priority of
    /// whichever one was just serviced.
    #[test]
    fn test_round_robin() {
        futures::executor::block_on(futures::future::poll_fn(|ctx| {
            let mut pollset = StreamPollSet::<Key, Priority, TestStream>::new();
            pollset
                .try_insert(
                    Key(0),
                    Priority(0),
                    TestStream::new_closed([Value(1), Value(2)]),
                )
                .unwrap();
            pollset
                .try_insert(
                    Key(1),
                    Priority(1),
                    TestStream::new_closed([Value(3), Value(4)]),
                )
                .unwrap();

            // Should see both ready streams, in priority order.
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![
                    (
                        &Key(0),
                        &Priority(0),
                        &mut TestStream::new_closed([Value(1), Value(2)])
                    ),
                    (
                        &Key(1),
                        &Priority(1),
                        &mut TestStream::new_closed([Value(3), Value(4)])
                    ),
                ]
            );

            // Take from the first stream and send it to the back via priority assignment.
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(2)),
                Some((Priority(0), Value(1)))
            );

            // Should see both ready streams, in the new priority order.
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![
                    (
                        &Key(1),
                        &Priority(1),
                        &mut TestStream::new_closed([Value(3), Value(4)])
                    ),
                    (
                        &Key(0),
                        &Priority(2),
                        &mut TestStream::new_closed([Value(2)])
                    ),
                ]
            );

            // Keep going ...
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(1), Priority(3)),
                Some((Priority(1), Value(3)))
            );
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![
                    (
                        &Key(0),
                        &Priority(2),
                        &mut TestStream::new_closed([Value(2)])
                    ),
                    (
                        &Key(1),
                        &Priority(3),
                        &mut TestStream::new_closed([Value(4)])
                    ),
                ]
            );
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(4)),
                Some((Priority(2), Value(2)))
            );
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![
                    (
                        &Key(1),
                        &Priority(3),
                        &mut TestStream::new_closed([Value(4)])
                    ),
                    (&Key(0), &Priority(4), &mut TestStream::new_closed([])),
                ]
            );
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(1), Priority(5)),
                Some((Priority(3), Value(4)))
            );
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![
                    (&Key(0), &Priority(4), &mut TestStream::new_closed([])),
                    (&Key(1), &Priority(5), &mut TestStream::new_closed([])),
                ]
            );

            Poll::Ready(())
        }));
    }
768

            
769
    /// After a stream is removed, its key can be reused, and only the newly
    /// inserted stream is visible under that key.
    #[test]
    fn test_remove_and_reuse_key() {
        let scenario = futures::future::poll_fn(|cx| {
            // Builders for the two successive streams stored under `Key(0)`.
            let first_gen = || TestStream::new_closed([Value(1), Value(2)]);
            let second_gen = || TestStream::new_closed([Value(3), Value(4)]);

            let mut set = StreamPollSet::<Key, Priority, TestStream>::new();

            // First generation: insert it, see it reported ready, then remove it intact.
            set.try_insert(Key(0), Priority(0), first_gen()).unwrap();
            let ready: Vec<_> = set.poll_ready_iter_mut(cx).collect();
            assert_eq!(ready, vec![(&Key(0), &Priority(0), &mut first_gen()),]);
            let removed = set.remove(&Key(0));
            assert_eq!(removed, Some((Key(0), Priority(0), first_gen())));

            // Second generation: same key, different priority and contents.
            set.try_insert(Key(0), Priority(1), second_gen()).unwrap();

            // Ensure we see the ready value in the new stream, and *not*
            // anything from the previous stream at that key.
            let ready: Vec<_> = set.poll_ready_iter_mut(cx).collect();
            assert_eq!(ready, vec![(&Key(0), &Priority(1), &mut second_gen()),]);

            Poll::Ready(())
        });
        futures::executor::block_on(scenario);
    }
815

            
816
    /// A stream inserted with an item already queued can be inspected via
    /// `stream()`; this test never polls the set.
    #[test]
    fn get_ready_stream() {
        let scenario = futures::future::poll_fn(|_cx| {
            let mut set = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
            let queued = VecDequeStream::new_open([Value(1)]);
            set.try_insert(Key(0), Priority(0), queued).unwrap();

            // The stored stream should still hold the item it was inserted with.
            let stored = set.stream(&Key(0)).unwrap();
            assert_eq!(stored.vec[0], Value(1));

            Poll::Ready(())
        });
        futures::executor::block_on(scenario);
    }
827

            
828
    /// A stream inserted with nothing queued can still be looked up via
    /// `stream()`, and is observed to be empty.
    #[test]
    fn get_pending_stream() {
        let scenario = futures::future::poll_fn(|_cx| {
            let mut set = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
            set.try_insert(Key(0), Priority(0), VecDequeStream::new_open([]))
                .unwrap();

            // Lookup must succeed even though the stream has no items.
            let stored = set.stream(&Key(0)).unwrap();
            assert!(stored.vec.is_empty());

            Poll::Ready(())
        });
        futures::executor::block_on(scenario);
    }
839

            
840
    /// Pushing an item into an empty stream through `stream_mut()` makes that
    /// stream show up in the next `poll_ready_iter_mut()` pass.
    #[test]
    fn mutate_pending_stream() {
        let scenario = futures::future::poll_fn(|cx| {
            let mut set = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
            set.try_insert(Key(0), Priority(0), VecDequeStream::new_open([]))
                .unwrap();

            // Nothing is queued yet, so no stream is reported as ready.
            let ready: Vec<_> = set.poll_ready_iter_mut(cx).collect();
            assert_eq!(ready, vec![]);

            // This should cause the stream to become ready.
            let stream = set.stream_mut(&Key(0)).unwrap();
            stream.push(Value(0));

            // The stream is now reported, holding exactly the pushed item.
            let ready: Vec<_> = set.poll_ready_iter_mut(cx).collect();
            assert_eq!(
                ready,
                vec![(
                    &Key(0),
                    &Priority(0),
                    &mut VecDequeStream::new_open([Value(0)])
                ),]
            );

            Poll::Ready(())
        });
        futures::executor::block_on(scenario);
    }
864

            
865
    /// Pushing onto a stream that is already ready keeps it ready, and the
    /// pushed item is queued behind the item that was already there.
    #[test]
    fn mutate_ready_stream() {
        let scenario = futures::future::poll_fn(|cx| {
            let mut set = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
            set.try_insert(Key(0), Priority(0), VecDequeStream::new_open([Value(0)]))
                .unwrap();

            // The stream starts out ready with its single initial item.
            let ready: Vec<_> = set.poll_ready_iter_mut(cx).collect();
            assert_eq!(
                ready,
                vec![(
                    &Key(0),
                    &Priority(0),
                    &mut VecDequeStream::new_open([Value(0)])
                ),]
            );

            // Append a second item directly to the stored stream.
            let stream = set.stream_mut(&Key(0)).unwrap();
            stream.push(Value(1));

            // Both items are now visible, in insertion order.
            let ready: Vec<_> = set.poll_ready_iter_mut(cx).collect();
            assert_eq!(
                ready,
                vec![(
                    &Key(0),
                    &Priority(0),
                    &mut VecDequeStream::new_open([Value(0), Value(1)])
                ),]
            );

            // Consume the value that was there.
            let taken = set.take_ready_value_and_reprioritize(&Key(0), Priority(0));
            assert_eq!(taken, Some((Priority(0), Value(0))));

            // We should now see the value we added.
            let ready: Vec<_> = set.poll_ready_iter_mut(cx).collect();
            assert_eq!(
                ready,
                vec![(
                    &Key(0),
                    &Priority(0),
                    &mut VecDequeStream::new_open([Value(1)])
                ),]
            );

            Poll::Ready(())
        });
        futures::executor::block_on(scenario);
    }
911

            
912
    /// End-to-end check under the mock runtime: two producer tasks feed
    /// bounded channels, a "mux" task drains them through the poll set, and
    /// every item from each producer must come out in its original order.
    #[test]
    fn test_async() {
        MockRuntime::test_with_various(|rt| async move {
            /// The stream type stored in the poll set for this test.
            type Rx = Peekable<futures::channel::mpsc::Receiver<Value>>;

            let mut set = StreamPollSet::<Key, Priority, Rx>::new();

            // Create 2 mpsc channels, bounded so that we can exercise back-pressure.
            // These are analogous to Tor streams.
            for streami in 1..=2 {
                let (mut send, recv) = futures::channel::mpsc::channel::<Value>(2);
                set.try_insert(Key(streami), Priority(streami), recv.peekable())
                    .unwrap();
                // Each producer emits ten values, scaled by its stream index.
                let producer = async move {
                    for val in 0..10 {
                        send.send(Value(val * streami)).await.unwrap();
                    }
                };
                rt.spawn_identified(format!("stream{streami}"), producer);
            }

            // Shared log of `(key, value)` pairs in the order the mux takes them.
            let output = Arc::new(Mutex::new(Vec::new()));
            let mux_output = output.clone();

            rt.spawn_identified("mux", async move {
                loop {
                    let (key, priority, value) = futures::future::poll_fn(|cx| {
                        // No streams ready, but there could be more items coming.
                        // The current `cx` should be registered to wake us
                        // if and when there are.
                        let Some((key, priority, stream)) = set.poll_ready_iter_mut(cx).next()
                        else {
                            return Poll::Pending;
                        };
                        let Poll::Ready(value) = Pin::new(stream).poll_peek(cx) else {
                            panic!("poll_ready_iter_mut returned non-ready stream")
                        };
                        Poll::Ready((*key, *priority, value.copied()))
                    })
                    .await;

                    match value {
                        Some(value) => {
                            // Take the value, and haphazardly set priority to
                            // push this stream "back".
                            set.take_ready_value_and_reprioritize(
                                &key,
                                Priority(priority.0 + 10),
                            )
                            .unwrap();
                            mux_output.lock().unwrap().push((key, value));
                        }
                        None => {
                            // Stream ended. Remove it.
                            let _ = set.remove(&key).unwrap();
                        }
                    }
                }
            });

            rt.advance_until_stalled().await;

            let collected = output.lock().unwrap();

            // We can't predict exactly how the stream values will be
            // interleaved, but we should get all items from each stream, with
            // correct order within each stream.
            for streami in 1..=2 {
                let expected: Vec<_> = (0..10).map(|val| Value(val * streami)).collect();
                let actual: Vec<_> = collected
                    .iter()
                    .filter(|(k, _)| *k == Key(streami))
                    .map(|(_, v)| *v)
                    .collect();
                assert_eq!(actual, expected);
            }
        });
    }
987
}