1
//! Provides [`StreamPollSet`]
2

            
3
// So that we can declare these things as if they were in their own crate.
4
#![allow(unreachable_pub)]
5

            
6
use std::{
7
    collections::{BTreeMap, HashMap, hash_map},
8
    future::Future,
9
    hash::Hash,
10
    pin::Pin,
11
    task::{Context, Poll, Waker},
12
};
13

            
14
use futures::{FutureExt, StreamExt as _};
15
use tor_async_utils::peekable_stream::PeekableStream;
16

            
17
use crate::util::{
18
    keyed_futures_unordered::KeyedFuturesUnordered,
19
    tunnel_activity::{InTunnelActivity, TunnelActivity},
20
};
21

            
22
/// A future that wraps a [`PeekableStream`], and yields the stream
/// when an item becomes available.
struct PeekableReady<S> {
    /// The stream to be peeked.
    ///
    /// `None` once the stream has been handed back to the caller.
    stream: Option<S>,
}

impl<S> PeekableReady<S> {
    /// Create a new [`PeekableReady`] wrapping `st`.
    fn new(st: S) -> Self {
        Self { stream: Some(st) }
    }

    /// Get a reference to the inner `S`.
    ///
    /// None if the future has already completed.
    fn get_ref(&self) -> Option<&S> {
        self.stream.as_ref()
    }

    /// Get a mut reference to the inner `S`.
    ///
    /// None if the future has already completed.
    fn get_mut(&mut self) -> Option<&mut S> {
        self.stream.as_mut()
    }

    /// Unwrap inner `S`, consuming the future.
    ///
    /// None if the future has already completed.
    fn into_inner(self) -> Option<S> {
        self.stream
    }
}
56

            
57
impl<S> Future for PeekableReady<S>
58
where
59
    S: PeekableStream + Unpin,
60
{
61
    type Output = S;
62

            
63
4562
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
64
4562
        let Some(stream) = &mut self.stream else {
65
            panic!("Polled completed future");
66
        };
67
4562
        match Pin::new(stream).poll_peek(cx) {
68
4366
            Poll::Ready(_) => Poll::Ready(self.stream.take().expect("Stream disappeared")),
69
196
            Poll::Pending => Poll::Pending,
70
        }
71
4562
    }
72
}
73

            
74
/// Manages a dynamic set of [`futures::Stream`] with associated keys and
75
/// priorities.
76
///
77
/// Notable features:
78
///
79
/// * Prioritization: streams have an associated priority, and ready-streams are
80
///   iterated over in ascending priority order.
81
/// * Efficient polling: an unready stream won't be polled again until it's
82
///   ready or exhausted (e.g. a corresponding [`futures::Sink`] is written-to or
83
///   dropped). A ready stream won't be polled again until the ready item has been
84
///   removed.
85
pub struct StreamPollSet<K, P, S>
86
where
87
    S: PeekableStream + Unpin,
88
{
89
    /// Priority for each stream in the set, and associated InTunnelActivity token.
90
    // We keep the priority for each stream here instead of bundling it together
91
    // with the stream, so that the priority can easily be changed even while a
92
    // future waiting on the stream is still pending (e.g. to support rescaling
93
    // priorities for EWMA).
94
    // Invariants:
95
    // * Every key is also present in exactly one of `ready_values` or `pending_streams`.
96
    priorities: HashMap<K, (P, InTunnelActivity)>,
97
    /// Streams that have a result ready, in ascending order by priority.
98
    // Invariants:
99
    // * Keys are a (non-strict) subset of those in `priorities`.
100
    ready_streams: BTreeMap<(P, K), S>,
101
    /// Streams for which we're still waiting for the next result.
102
    // Invariants:
103
    // * Keys are a (non-strict) subset of those in `priorities`.
104
    pending_streams: KeyedFuturesUnordered<K, PeekableReady<S>>,
105

            
106
    /// Information about how active this particular hop has been,
107
    /// with respect to tracking overall tunnel activity.
108
    tunnel_activity: TunnelActivity,
109
}
110

            
111
impl<K, P, S> StreamPollSet<K, P, S>
112
where
113
    K: Ord + Hash + Clone + Send + Sync + 'static,
114
    S: PeekableStream + Unpin,
115
    P: Ord + Clone,
116
{
117
    /// Create a new, empty, `StreamPollSet`.
118
1048
    pub fn new() -> Self {
119
1048
        Self {
120
1048
            priorities: Default::default(),
121
1048
            ready_streams: Default::default(),
122
1048
            pending_streams: KeyedFuturesUnordered::new(),
123
1048
            tunnel_activity: TunnelActivity::never_used(),
124
1048
        }
125
1048
    }
126

            
127
    /// Insert a `stream`, with an associated `key` and `priority`.
128
    ///
129
    /// If the `key` is already in use, the parameters are returned without altering `self`.
130
    // To *replace* an existing key, we'd need to cancel any pending future and
131
    // ensure that the cancellation is processed before inserting the new key, to
132
    // ensure we don't assign a value from the previous key to the new key's
133
    // stream.
134
420
    pub fn try_insert(
135
420
        &mut self,
136
420
        key: K,
137
420
        priority: P,
138
420
        stream: S,
139
420
    ) -> Result<(), KeyAlreadyInsertedError<K, P, S>> {
140
420
        let hash_map::Entry::Vacant(v) = self.priorities.entry(key.clone()) else {
141
            // We already have an entry for this key.
142
            return Err(KeyAlreadyInsertedError {
143
                key,
144
                priority,
145
                stream,
146
            });
147
        };
148
420
        self.pending_streams
149
420
            .try_insert(key, PeekableReady::new(stream))
150
            // By `pending_streams` invariant that keys are a subset of those in
151
            // `priorities`.
152
420
            .unwrap_or_else(|_| panic!("Unexpected duplicate key"));
153
420
        let token = self.tunnel_activity.inc_streams();
154
420
        v.insert((priority, token));
155
420
        Ok(())
156
420
    }
157

            
158
    /// Remove the entry for `key`, if any. This is the key, priority, buffered
159
    /// poll_next result, and stream.
160
130
    pub fn remove(&mut self, key: &K) -> Option<(K, P, S)> {
161
130
        let (priority, token) = self.priorities.remove(key)?;
162
108
        self.tunnel_activity.dec_streams(token);
163
108
        if let Some((key, fut)) = self.pending_streams.remove(key) {
164
            // Validate `priorities` invariant that keys are also present in exactly one of
165
            // `pending_streams` and `ready_values`.
166
40
            debug_assert!(
167
40
                !self
168
40
                    .ready_streams
169
40
                    .contains_key(&(priority.clone(), key.clone()))
170
            );
171
40
            let stream = fut
172
40
                .into_inner()
173
                // We know the future hasn't completed, so the stream should be present.
174
40
                .expect("Missing stream");
175
40
            Some((key, priority, stream))
176
        } else {
177
68
            let ((_priority, key), stream) = self
178
68
                .ready_streams
179
68
                .remove_entry(&(priority.clone(), key.clone()))
180
68
                // By
181
68
                // * `pending_streams` invariant that keys are also present in
182
68
                // exactly one of `pending_streams` and `ready_values`.
183
68
                // * validated above that the key was in `pending_streams`, and
184
68
                // not in `ready_values`.
185
68
                .expect("Unexpectedly no value for key");
186
68
            Some((key, priority, stream))
187
        }
188
130
    }
189

            
190
    /// Polls streams that are ready to be polled, and returns an iterator over all streams
191
    /// for which we have a buffered `Poll::Ready` result, in ascending priority order.
192
    ///
193
    /// Registers the provided [`Context`] to be woken when
194
    /// any of the internal streams that weren't ready in the previous call to
195
    /// this method (and therefore wouldn't have appeared in the iterator
196
    /// results) become potentially ready (based on when the inner stream wakes
197
    /// the `Context` provided to its own `poll_next`).
198
    ///
199
    /// The same restrictions apply as for [`Self::stream_mut`].  e.g. do not
200
    /// directly call [`PeekableStream::poll_peek`] to see what item is
201
    /// available on the stream; instead use [`Self::peek_mut`]. (Or
202
    /// [`tor_async_utils::peekable_stream::UnobtrusivePeekableStream`] if
203
    /// implemented for the stream).
204
    ///
205
    /// This method does *not* drain ready items. `Some` values can be removed
206
    /// with [`Self::take_ready_value_and_reprioritize`]. `None` values can only
207
    /// be removed by removing the whole stream with [`Self::remove`].
208
    ///
209
    /// This API is meant to allow callers to find the first stream (in priority
210
    /// order) that is ready, and that the caller is able to process now. i.e.
211
    /// it's specifically to support the use-case where external factors may
212
    /// prevent the processing of some streams but not others.
213
    ///
214
    /// Example:
215
    ///
216
    /// ```nocompile
217
    /// # // We need the `nocompile` since `StreamPollSet` is non-pub.
218
    /// # // TODO: take away the nocompile if we make this pub or implement some
219
    /// # // workaround to expose it to doc-tests.
220
    /// # type Key=u64;
221
    /// # type Value=u64;
222
    /// # type Priority=u64;
223
    /// # type MyStream=Box<dyn futures::Stream<Item=Value> + Unpin>;
224
    /// # fn can_process(key: &Key, val: &Value) -> bool { true }
225
    /// # fn process(val: Value) { }
226
    /// # fn new_priority(priority: &Priority) -> Priority { *priority }
227
    /// fn process_a_ready_stream(sps: &mut StreamPollSet<Key, Value, Priority, MyStream>, cx: &mut std::task::Context) -> std::task::Poll<()> {
228
    ///   let mut iter = sps.poll_ready_iter(cx);
229
    ///   while let Some((key, priority, stream)) = iter.next() {
230
    ///     let Some(value) = stream.unobtrusive_peek(Pin::new(stream)) else {
231
    ///        // Stream exhausted. Remove the stream. We have to drop the iterator
232
    ///        // first, though, so that we can mutate.
233
    ///        let key = *key;
234
    ///        drop(iter);
235
    ///        sps.remove(&key).unwrap();
236
    ///        return std::task::Poll::Ready(());
237
    ///     };
238
    ///     if can_process(key, value) {
239
    ///        let key = *key;
240
    ///        let priority = new_priority(priority);
241
    ///        drop(iter);
242
    ///        let (_old_priority, value) = sps.take_ready_value_and_reprioritize(&key, priority).unwrap();
243
    ///        process(value);
244
    ///        return std::task::Poll::Ready(());
245
    ///     }
246
    ///   }
247
    ///   return std::task::Poll::Pending;
248
    /// }
249
    /// ```
250
    // In the current implementation we *could* actually permit the caller to
251
    // `poll_peek` a stream that we know is ready. But this may change as the
252
    // impl evolves further, and it's probably better to blanket disallow it
253
    // than to have complex rules for the caller about when it's ok.
254
    //
255
    // TODO: It would be nice if the returned iterator supported additional
256
    // actions, e.g. allowing the user to consume the iterator and take and
257
    // reprioritize the inner value, but this is tricky.
258
    //
259
    // I've sketched out a working "cursor" that holds the current position (K, P)
260
    // and a &mut StreamPollSet. This can't implement the Iterator interface though
261
    // since it needs to borrow from self. I was able to implement an Iterator-*like* interface
262
    // that does borrow from self, but this doesn't compose well. e.g. in StreamMap
263
    // we can't use the same technique again since the object would need a mut reference to the
264
    // StreamMap *and* to this inner cursor object, which is illegal.
265
19172
    pub fn poll_ready_iter_mut<'a>(
266
19172
        &'a mut self,
267
19172
        cx: &mut Context,
268
19172
    ) -> impl Iterator<Item = (&'a K, &'a P, &'a mut S)> + 'a + use<'a, K, P, S> {
269
        // First poll for ready streams
270
23538
        while let Poll::Ready(Some((key, stream))) = self.pending_streams.poll_next_unpin(cx) {
271
4366
            let (priority, _) = self
272
4366
                .priorities
273
4366
                .get(&key)
274
4366
                // By `pending_streams` invariant that all keys are also in `priorities`.
275
4366
                .expect("Missing priority");
276
4366
            let prev = self.ready_streams.insert((priority.clone(), key), stream);
277
4366
            assert!(prev.is_none());
278
        }
279
19172
        self.ready_streams.iter_mut().map(|((p, k), s)| (k, p, s))
280
19172
    }
281

            
282
    /// If the stream for `key` has `Some(value)` ready, take that value and set the
283
    /// priority for it to `new_priority`.
284
    ///
285
    /// This method doesn't register a waker with the polled stream. Use
286
    /// `poll_ready_iter` to ensure streams make progress.
287
    ///
288
    /// If the key doesn't exist, the stream isn't ready, or the stream's value
289
    /// is `None` (indicating the end of the stream), this function returns
290
    /// `None` without mutating anything.
291
    ///
292
    /// Ended streams should be removed using [`Self::remove`].
293
4278
    pub fn take_ready_value_and_reprioritize(
294
4278
        &mut self,
295
4278
        key: &K,
296
4278
        new_priority: P,
297
4278
    ) -> Option<(P, S::Item)> {
298
        // Get the priority entry, but don't replace until the lookup in ready_streams is confirmed.
299
4278
        let hash_map::Entry::Occupied(mut priority_entry) = self.priorities.entry(key.clone())
300
        else {
301
            // Key isn't present at all.
302
            return None;
303
        };
304
4278
        let (priority_mut, _) = priority_entry.get_mut();
305
4278
        let Some(((_p, key), mut stream)) = self
306
4278
            .ready_streams
307
4278
            .remove_entry(&(priority_mut.clone(), key.clone()))
308
        else {
309
            // This stream isn't in the ready list.
310
            return None;
311
        };
312
4278
        match Pin::new(&mut stream).poll_peek(&mut Context::from_waker(Waker::noop())) {
313
4278
            Poll::Ready(Some(_val)) => (), // Stream is ready, and has an item. Proceed.
314
            Poll::Ready(None) => {
315
                // Stream is ready, but is terminated.
316
                // Leave in place and return `None`.
317
                return None;
318
            }
319
            Poll::Pending => {
320
                // Stream wasn't actually ready, despite being on the ready
321
                // list. This should be impossible by the stability guarantees
322
                // of `PeekableStream` and our own internal logic, but we can
323
                // recover.
324
                tracing::error!("Bug: Stream unexpectedly unready");
325
                self.pending_streams
326
                    .try_insert(key.clone(), PeekableReady::new(stream))
327
                    // By invariant on `priorities` that keys are in exactly one of the ready or pending lists.
328
                    .unwrap_or_else(|_| {
329
                        unreachable!("Key unexpectedly in both ready and unready list")
330
                    });
331
                return None;
332
            }
333
        }
334
4278
        let Some(Some(val)) = stream.next().now_or_never() else {
335
            panic!("Polling stream returned a different result than peeking");
336
        };
337
4278
        let prev_priority = std::mem::replace(priority_mut, new_priority);
338
4278
        self.pending_streams
339
4278
            .try_insert(key, PeekableReady::new(stream))
340
            // We verified above that the key wasn't present in `priorities`,
341
            // and `pending_streams` has the invariant that its keys are a
342
            // subset of those in `priorities`.
343
4278
            .unwrap_or_else(|_| panic!("Unexpected pending stream entry"));
344
4278
        Some((prev_priority, val))
345
4278
    }
346

            
347
    /// Get a mut reference to a ready value for key `key`, if one exists.
348
    ///
349
    /// This method doesn't poll the internal streams. Use `poll_ready_iter` to
350
    /// ensure streams make progress.
351
    // This will be used for packing and fragmentation, to take part of a DATA message.
352
    #[allow(unused)]
353
    pub fn peek_mut<'a>(&'a mut self, key: &K) -> Option<Poll<Option<&'a mut S::Item>>> {
354
        let (priority, _) = self.priorities.get(key)?;
355
        let Some(peekable) = self.ready_streams.get_mut(&(priority.clone(), key.clone())) else {
356
            return Some(Poll::Pending);
357
        };
358
        // We don't have a waker registered here, so we can just use the noop waker.
359
        // TODO: Create a mut future for `PeekableStream`.
360
        Some(Pin::new(peekable).poll_peek_mut(&mut Context::from_waker(Waker::noop())))
361
    }
362

            
363
    /// Get a reference to the stream for `key`.
364
    ///
365
    /// The same restrictions apply as for [`Self::stream_mut`] (e.g. using
366
    /// interior mutability).
367
    #[allow(dead_code)]
368
4
    pub fn stream(&self, key: &K) -> Option<&S> {
369
4
        if let Some(s) = self.pending_streams.get(key) {
370
4
            let s = s.get_ref();
371
            // Stream must be present since it's still pending.
372
4
            debug_assert!(s.is_some(), "Unexpected missing pending stream");
373
4
            return s;
374
        }
375
        let (priority, _) = self.priorities.get(key)?;
376
        self.ready_streams.get(&(priority.clone(), key.clone()))
377
4
    }
378

            
379
    /// Get a mut reference to the stream for `key`.
380
    ///
381
    /// Polling the stream through this reference, or otherwise causing its
382
    /// registered `Waker` to be removed without waking it, will result in
383
    /// unspecified (but not unsound) behavior.
384
    ///
385
    /// This is mostly intended for accessing non-`Stream` functionality of the stream
386
    /// object, though it *is* permitted to mutate it in a way that the stream becomes
387
    /// ready (potentially removing and waking its registered Waker(s)).
388
    //
389
    // In particular:
390
    // * Polling a stream in the pending list and getting a Pending result
391
    //   will overwrite our Waker, resulting in us not polling it again.
392
    // * Doing so with a stream on the pending list and getting a Ready result
393
    //   might be ok if it had already woken our waker. Otoh it could potentially
394
    //   result in our waker never getting woken, and hence us not polling it again.
395
    // * Doing so with a stream on the ready list should actually be ok, since
396
    //   we don't have a registered waker, and don't do our own buffering.
397
8708
    pub fn stream_mut(&mut self, key: &K) -> Option<&mut S> {
398
8708
        if let Some(s) = self.pending_streams.get_mut(key) {
399
8624
            let s = s.get_mut();
400
            // Stream must be present since it's still pending.
401
8624
            debug_assert!(s.is_some(), "Unexpected missing pending stream");
402
8624
            return s;
403
84
        }
404
84
        let (priority, _) = self.priorities.get(key)?;
405
2
        self.ready_streams.get_mut(&(priority.clone(), key.clone()))
406
8708
    }
407

            
408
    /// Number of streams managed by this object.
409
530
    pub fn len(&self) -> usize {
410
530
        self.priorities.len()
411
530
    }
412

            
413
    /// Return a `TunnelActivity` for this hop.
414
    pub fn tunnel_activity(&self) -> TunnelActivity {
415
        assert_eq!(self.len(), self.tunnel_activity.n_open_streams());
416
        self.tunnel_activity
417
    }
418
}
419

            
420
impl<K, P, S> Drop for StreamPollSet<K, P, S>
421
where
422
    S: PeekableStream + Unpin,
423
{
424
916
    fn drop(&mut self) {
425
916
        self.priorities
426
916
            .drain()
427
1068
            .for_each(|(_key, (_prio, token))| token.consume_and_forget());
428
916
    }
429
}
430

            
431
/// Error returned by [`StreamPollSet::try_insert`].
432
#[derive(Debug, thiserror::Error)]
433
#[allow(clippy::exhaustive_structs)]
434
pub struct KeyAlreadyInsertedError<K, P, S> {
435
    /// Key that caller tried to insert.
436
    #[allow(dead_code)]
437
    pub key: K,
438
    /// Priority that caller tried to insert.
439
    #[allow(dead_code)]
440
    pub priority: P,
441
    /// Stream that caller tried to insert.
442
    #[allow(dead_code)]
443
    pub stream: S,
444
}
445

            
446
#[cfg(test)]
447
mod test {
448
    // @@ begin test lint list maintained by maint/add_warning @@
449
    #![allow(clippy::bool_assert_comparison)]
450
    #![allow(clippy::clone_on_copy)]
451
    #![allow(clippy::dbg_macro)]
452
    #![allow(clippy::mixed_attributes_style)]
453
    #![allow(clippy::print_stderr)]
454
    #![allow(clippy::print_stdout)]
455
    #![allow(clippy::single_char_pattern)]
456
    #![allow(clippy::unwrap_used)]
457
    #![allow(clippy::unchecked_time_subtraction)]
458
    #![allow(clippy::useless_vec)]
459
    #![allow(clippy::needless_pass_by_value)]
460
    #![allow(clippy::string_slice)] // See arti#2571
461
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
462

            
463
    use std::{
464
        collections::VecDeque,
465
        sync::{Arc, Mutex},
466
        task::Poll,
467
    };
468

            
469
    use futures::{SinkExt as _, stream::Peekable};
470
    use pin_project::pin_project;
471
    use tor_rtmock::MockRuntime;
472

            
473
    use super::*;
474

            
475
    /// Stream key used by the tests.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
    struct Key(u64);
477

            
478
    /// Stream priority used by the tests; lower values sort first.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
    struct Priority(u64);
480

            
481
    /// Item type yielded by the test streams.
    #[derive(Copy, Clone, Debug, Eq, PartialEq)]
    struct Value(u64);
483

            
484
    /// Test stream that we can directly manipulate and examine.
485
    #[derive(Debug)]
486
    #[pin_project]
487
    struct VecDequeStream<T> {
488
        // Ready items.
489
        vec: VecDeque<T>,
490
        // Whether any more items will be written.
491
        closed: bool,
492
        // Registered waker.
493
        waker: Option<std::task::Waker>,
494
    }
495
    impl<T> VecDequeStream<T> {
496
        fn new_open<I: IntoIterator<Item = T>>(values: I) -> Self {
497
            Self {
498
                vec: VecDeque::from_iter(values),
499
                waker: None,
500
                closed: false,
501
            }
502
        }
503
        fn new_closed<I: IntoIterator<Item = T>>(values: I) -> Self {
504
            Self {
505
                vec: VecDeque::from_iter(values),
506
                waker: None,
507
                closed: true,
508
            }
509
        }
510
        fn push(&mut self, value: T) {
511
            assert!(!self.closed);
512
            self.vec.push_back(value);
513
            if let Some(waker) = self.waker.take() {
514
                waker.wake();
515
            }
516
        }
517
    }
518
    impl<T> futures::Stream for VecDequeStream<T> {
519
        type Item = T;
520

            
521
        fn poll_next(
522
            mut self: std::pin::Pin<&mut Self>,
523
            cx: &mut std::task::Context<'_>,
524
        ) -> Poll<Option<Self::Item>> {
525
            if let Some(val) = self.as_mut().vec.pop_front() {
526
                Poll::Ready(Some(val))
527
            } else if self.as_mut().closed {
528
                // No more items coming.
529
                Poll::Ready(None)
530
            } else {
531
                self.as_mut().waker.replace(cx.waker().clone());
532
                Poll::Pending
533
            }
534
        }
535
    }
536
    impl<T> PeekableStream for VecDequeStream<T> {
537
        fn poll_peek_mut(
538
            self: Pin<&mut Self>,
539
            cx: &mut Context<'_>,
540
        ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
541
            let s = self.project();
542
            if let Some(val) = s.vec.front_mut() {
543
                Poll::Ready(Some(val))
544
            } else if *s.closed {
545
                // No more items coming.
546
                Poll::Ready(None)
547
            } else {
548
                s.waker.replace(cx.waker().clone());
549
                Poll::Pending
550
            }
551
        }
552
    }
553
    impl<T> std::cmp::PartialEq for VecDequeStream<T>
554
    where
555
        T: std::cmp::PartialEq,
556
    {
557
        fn eq(&self, other: &Self) -> bool {
558
            // Ignore waker, which isn't comparable
559
            self.vec == other.vec && self.closed == other.closed
560
        }
561
    }
562
    // Marker impl: `eq` compares only `vec` and `closed`, so it is a full
    // equivalence relation whenever `T: Eq`.
    impl<T> std::cmp::Eq for VecDequeStream<T> where T: std::cmp::Eq {}
563

            
564
    /// The concrete stream type used throughout these tests.
    type TestStream = VecDequeStream<Value>;
565

            
566
    /// An empty set yields no ready streams.
    #[test]
    fn test_empty() {
        futures::executor::block_on(futures::future::poll_fn(|ctx| {
            let mut pollset = StreamPollSet::<Key, Priority, TestStream>::new();
            assert_eq!(pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(), vec![]);
            Poll::Ready(())
        }));
    }
574

            
575
    /// A single open stream with no items never shows up as ready.
    #[test]
    fn test_one_pending() {
        futures::executor::block_on(futures::future::poll_fn(|ctx| {
            let mut pollset = StreamPollSet::<Key, Priority, TestStream>::new();
            pollset
                .try_insert(Key(0), Priority(0), TestStream::new_open([]))
                .unwrap();
            assert_eq!(pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(), vec![]);
            Poll::Ready(())
        }));
    }
586

            
587
    /// Walk a single closed stream through peek, take, reprioritize,
    /// end-of-stream, and removal.
    #[test]
    fn test_one_ready() {
        futures::executor::block_on(futures::future::poll_fn(|ctx| {
            let mut pollset = StreamPollSet::<Key, Priority, TestStream>::new();
            pollset
                .try_insert(
                    Key(0),
                    Priority(0),
                    TestStream::new_closed([Value(1), Value(2)]),
                )
                .unwrap();

            // We only see the first value of the one ready stream.
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![(
                    &Key(0),
                    &Priority(0),
                    &mut TestStream::new_closed([Value(1), Value(2)])
                )],
            );

            // Same result, the same value is still at the head of the stream.
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![(
                    &Key(0),
                    &Priority(0),
                    &mut TestStream::new_closed([Value(1), Value(2)])
                )]
            );

            // Take the head of the stream.
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(1)),
                Some((Priority(0), Value(1)))
            );

            // Should see the next value, with the new priority.
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![(
                    &Key(0),
                    &Priority(1),
                    &mut TestStream::new_closed([Value(2)])
                )]
            );

            // Take again.
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(2)),
                Some((Priority(1), Value(2)))
            );

            // Should see end-of-stream.
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![(&Key(0), &Priority(2), &mut TestStream::new_closed([]))]
            );

            // Remove the now-ended stream.
            assert_eq!(
                pollset.remove(&Key(0)),
                Some((Key(0), Priority(2), TestStream::new_closed([])))
            );

            // Should now be empty.
            assert_eq!(pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(), vec![]);

            Poll::Ready(())
        }));
    }
659

            
660
    /// Alternate between two ready streams by bumping the priority of
    /// whichever one was just serviced.
    #[test]
    fn test_round_robin() {
        futures::executor::block_on(futures::future::poll_fn(|ctx| {
            let mut pollset = StreamPollSet::<Key, Priority, TestStream>::new();
            pollset
                .try_insert(
                    Key(0),
                    Priority(0),
                    TestStream::new_closed([Value(1), Value(2)]),
                )
                .unwrap();
            pollset
                .try_insert(
                    Key(1),
                    Priority(1),
                    TestStream::new_closed([Value(3), Value(4)]),
                )
                .unwrap();

            // Should see both ready streams, in priority order.
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![
                    (
                        &Key(0),
                        &Priority(0),
                        &mut TestStream::new_closed([Value(1), Value(2)])
                    ),
                    (
                        &Key(1),
                        &Priority(1),
                        &mut TestStream::new_closed([Value(3), Value(4)])
                    ),
                ]
            );

            // Take from the first stream and send it to the back via priority assignment.
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(2)),
                Some((Priority(0), Value(1)))
            );

            // Should see both ready streams, in the new priority order.
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![
                    (
                        &Key(1),
                        &Priority(1),
                        &mut TestStream::new_closed([Value(3), Value(4)])
                    ),
                    (
                        &Key(0),
                        &Priority(2),
                        &mut TestStream::new_closed([Value(2)])
                    ),
                ]
            );

            // Keep going ...
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(1), Priority(3)),
                Some((Priority(1), Value(3)))
            );
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![
                    (
                        &Key(0),
                        &Priority(2),
                        &mut TestStream::new_closed([Value(2)])
                    ),
                    (
                        &Key(1),
                        &Priority(3),
                        &mut TestStream::new_closed([Value(4)])
                    ),
                ]
            );
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(4)),
                Some((Priority(2), Value(2)))
            );
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![
                    (
                        &Key(1),
                        &Priority(3),
                        &mut TestStream::new_closed([Value(4)])
                    ),
                    (&Key(0), &Priority(4), &mut TestStream::new_closed([])),
                ]
            );
            assert_eq!(
                pollset.take_ready_value_and_reprioritize(&Key(1), Priority(5)),
                Some((Priority(3), Value(4)))
            );
            // Both streams are now exhausted and report end-of-stream.
            assert_eq!(
                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
                vec![
                    (&Key(0), &Priority(4), &mut TestStream::new_closed([])),
                    (&Key(1), &Priority(5), &mut TestStream::new_closed([])),
                ]
            );

            Poll::Ready(())
        }));
    }
769

            
770
    /// A key that has been removed from the set can be inserted again, and
    /// afterwards only the newly inserted stream is reported for that key.
    #[test]
    fn test_remove_and_reuse_key() {
        let scenario = futures::future::poll_fn(|ctx| {
            let mut pollset: StreamPollSet<Key, Priority, TestStream> = StreamPollSet::new();

            // Insert a first stream under `Key(0)`; it should be reported by the ready iterator.
            let first = TestStream::new_closed([Value(1), Value(2)]);
            pollset.try_insert(Key(0), Priority(0), first).unwrap();
            let ready: Vec<_> = pollset.poll_ready_iter_mut(ctx).collect();
            assert_eq!(
                ready,
                vec![(
                    &Key(0),
                    &Priority(0),
                    &mut TestStream::new_closed([Value(1), Value(2)])
                )]
            );

            // Removing the key hands back the key, its priority, and the untouched stream.
            let removed = pollset.remove(&Key(0));
            assert_eq!(
                removed,
                Some((
                    Key(0),
                    Priority(0),
                    TestStream::new_closed([Value(1), Value(2)])
                ))
            );

            // Reuse the same key for a different stream with a different priority.
            let second = TestStream::new_closed([Value(3), Value(4)]);
            pollset.try_insert(Key(0), Priority(1), second).unwrap();

            // Only the new stream's contents must be visible at that key;
            // nothing from the previously removed stream may show up.
            let ready: Vec<_> = pollset.poll_ready_iter_mut(ctx).collect();
            assert_eq!(
                ready,
                vec![(
                    &Key(0),
                    &Priority(1),
                    &mut TestStream::new_closed([Value(3), Value(4)])
                )]
            );

            Poll::Ready(())
        });
        futures::executor::block_on(scenario);
    }
816

            
817
    /// `stream` gives shared access to an inserted stream that already holds a value.
    #[test]
    fn get_ready_stream() {
        let scenario = futures::future::poll_fn(|_ctx| {
            let mut pollset: StreamPollSet<Key, Priority, VecDequeStream<Value>> =
                StreamPollSet::new();

            // An open stream that already has one queued value.
            let queued = VecDequeStream::new_open([Value(1)]);
            pollset.try_insert(Key(0), Priority(0), queued).unwrap();

            // Looking the stream up by key exposes that queued value.
            let found = pollset.stream(&Key(0)).unwrap();
            assert_eq!(found.vec[0], Value(1));

            Poll::Ready(())
        });
        futures::executor::block_on(scenario);
    }
828

            
829
    /// `stream` gives shared access to an inserted stream that holds no values yet.
    #[test]
    fn get_pending_stream() {
        let scenario = futures::future::poll_fn(|_ctx| {
            let mut pollset: StreamPollSet<Key, Priority, VecDequeStream<Value>> =
                StreamPollSet::new();

            // An open stream with nothing queued.
            let empty = VecDequeStream::new_open([]);
            pollset.try_insert(Key(0), Priority(0), empty).unwrap();

            // The stream is still reachable by key, and its queue is empty.
            let found = pollset.stream(&Key(0)).unwrap();
            assert!(found.vec.is_empty());

            Poll::Ready(())
        });
        futures::executor::block_on(scenario);
    }
840

            
841
    /// Pushing a value through `stream_mut` into a stream that had nothing
    /// queued makes it show up in the ready iterator.
    #[test]
    fn mutate_pending_stream() {
        let scenario = futures::future::poll_fn(|ctx| {
            let mut pollset: StreamPollSet<Key, Priority, VecDequeStream<Value>> =
                StreamPollSet::new();

            // Start with an open stream that has no values: nothing is ready.
            let empty = VecDequeStream::new_open([]);
            pollset.try_insert(Key(0), Priority(0), empty).unwrap();
            let ready: Vec<_> = pollset.poll_ready_iter_mut(ctx).collect();
            assert_eq!(ready, vec![]);

            // Mutating the stream in place should cause it to become ready.
            let stream = pollset.stream_mut(&Key(0)).unwrap();
            stream.push(Value(0));

            let ready: Vec<_> = pollset.poll_ready_iter_mut(ctx).collect();
            assert_eq!(
                ready,
                vec![(
                    &Key(0),
                    &Priority(0),
                    &mut VecDequeStream::new_open([Value(0)])
                )]
            );

            Poll::Ready(())
        });
        futures::executor::block_on(scenario);
    }
865

            
866
    /// Pushing a value through `stream_mut` into a stream that is already
    /// ready keeps it ready, and the pushed value is seen after the earlier
    /// one has been taken.
    #[test]
    fn mutate_ready_stream() {
        let scenario = futures::future::poll_fn(|ctx| {
            let mut pollset: StreamPollSet<Key, Priority, VecDequeStream<Value>> =
                StreamPollSet::new();

            // Start with an open stream that already has one value queued.
            let initial = VecDequeStream::new_open([Value(0)]);
            pollset.try_insert(Key(0), Priority(0), initial).unwrap();
            let ready: Vec<_> = pollset.poll_ready_iter_mut(ctx).collect();
            assert_eq!(
                ready,
                vec![(
                    &Key(0),
                    &Priority(0),
                    &mut VecDequeStream::new_open([Value(0)])
                )]
            );

            // Append a second value while the stream is ready.
            let stream = pollset.stream_mut(&Key(0)).unwrap();
            stream.push(Value(1));

            // Both values are now visible, in insertion order.
            let ready: Vec<_> = pollset.poll_ready_iter_mut(ctx).collect();
            assert_eq!(
                ready,
                vec![(
                    &Key(0),
                    &Priority(0),
                    &mut VecDequeStream::new_open([Value(0), Value(1)])
                )]
            );

            // Consume the value that was there from the start.
            let taken = pollset.take_ready_value_and_reprioritize(&Key(0), Priority(0));
            assert_eq!(taken, Some((Priority(0), Value(0))));

            // What remains is the value we added afterwards.
            let ready: Vec<_> = pollset.poll_ready_iter_mut(ctx).collect();
            assert_eq!(
                ready,
                vec![(
                    &Key(0),
                    &Priority(0),
                    &mut VecDequeStream::new_open([Value(1)])
                )]
            );

            Poll::Ready(())
        });
        futures::executor::block_on(scenario);
    }
912

            
913
    /// Drives a `StreamPollSet` from a mock runtime: two producer tasks feed
    /// bounded channels, a "mux" task drains whichever stream is reported
    /// ready, and the test checks that every stream's items arrive complete
    /// and in order.
    #[test]
    fn test_async() {
        MockRuntime::test_with_various(|rt| async move {
            let mut pollset: StreamPollSet<
                Key,
                Priority,
                Peekable<futures::channel::mpsc::Receiver<Value>>,
            > = StreamPollSet::new();

            // Create 2 mpsc channels, bounded so that we can exercise back-pressure.
            // These are analogous to Tor streams.
            for streami in 1..=2 {
                let (mut send, recv) = futures::channel::mpsc::channel::<Value>(2);
                let peekable = recv.peekable();
                pollset
                    .try_insert(Key(streami), Priority(streami), peekable)
                    .unwrap();

                // Producer task: sends ten values derived from the stream number.
                let producer = async move {
                    for val in 0..10 {
                        send.send(Value(val * streami)).await.unwrap();
                    }
                };
                rt.spawn_identified(format!("stream{streami}"), producer);
            }

            // Shared record of everything the mux task pulls out of the set.
            let output = Arc::new(Mutex::new(Vec::new()));
            let mux_output = Arc::clone(&output);

            rt.spawn_identified("mux", async move {
                loop {
                    let (key, priority, value) = futures::future::poll_fn(|ctx| {
                        // If no streams are ready there could still be more items coming.
                        // The current `ctx` should be registered to wake us
                        // if and when there are.
                        let Some((key, priority, stream)) =
                            pollset.poll_ready_iter_mut(ctx).next()
                        else {
                            return Poll::Pending;
                        };

                        // A stream handed out by the ready iterator must be peekable right now.
                        let value = match Pin::new(stream).poll_peek(ctx) {
                            Poll::Ready(value) => value,
                            Poll::Pending => {
                                panic!("poll_ready_iter_mut returned non-ready stream")
                            }
                        };
                        Poll::Ready((*key, *priority, value.copied()))
                    })
                    .await;

                    match value {
                        Some(value) => {
                            // Take the value, and haphazardly set priority to push this
                            // stream "back".
                            pollset
                                .take_ready_value_and_reprioritize(&key, Priority(priority.0 + 10))
                                .unwrap();
                            mux_output.lock().unwrap().push((key, value));
                        }
                        None => {
                            // Stream ended. Remove it.
                            let _ = pollset.remove(&key).unwrap();
                        }
                    }
                }
            });

            // Run the spawned tasks until none of them can make progress.
            rt.advance_until_stalled().await;

            let recorded = output.lock().unwrap();

            // We can't predict exactly how the stream values will be
            // interleaved, but we should get all items from each stream, with
            // correct order within each stream.
            for streami in 1..=2 {
                let expected: Vec<_> = (0..10).map(|val| Value(val * streami)).collect();
                let actual: Vec<_> = recorded
                    .iter()
                    .filter(|(k, _)| *k == Key(streami))
                    .map(|(_, v)| *v)
                    .collect();
                assert_eq!(actual, expected);
            }
        });
    }
988
}