1
//! Types and code for mapping StreamIDs to streams on a circuit.
2

            
3
mod halfstream;
4

            
5
use crate::stream::StreamMpscReceiver;
6
use crate::stream::cmdcheck::AnyCmdChecker;
7
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, HalfStreamFlowCtrlHooks, StreamFlowCtrl};
8
use crate::stream::queue::StreamQueueSender;
9
use crate::util::stream_poll_set::{KeyAlreadyInsertedError, StreamPollSet};
10
use crate::{Error, Result};
11
use pin_project::pin_project;
12
use tor_async_utils::peekable_stream::{PeekableStream, UnobtrusivePeekableStream};
13
use tor_async_utils::stream_peek::StreamUnobtrusivePeeker;
14
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
15
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
16
use tor_cell::relaycell::{StreamId, msg::AnyRelayMsg};
17

            
18
use std::collections::HashMap;
19
use std::collections::hash_map;
20
use std::num::NonZeroU16;
21
use std::pin::Pin;
22
use std::task::{Poll, Waker};
23
use tor_error::{bad_api_usage, internal};
24
use web_time_compat::Instant;
25

            
26
use rand::RngExt;
27

            
28
use tracing::debug;
29

            
30
use halfstream::HalfStream;
31

            
32
/// Entry for an open stream
33
///
34
/// (For the purposes of this module, an open stream is one where we have not
35
/// sent or received any message indicating that the stream is ended.)
36
#[derive(Debug)]
37
#[pin_project]
38
pub(super) struct OpenStreamEnt {
39
    /// Sink to send relay cells tagged for this stream into.
40
    pub(super) sink: StreamQueueSender,
41
    /// Number of cells dropped due to the stream disappearing before we can
42
    /// transform this into an `EndSent`.
43
    pub(super) dropped: u16,
44
    /// A `CmdChecker` used to tell whether cells on this stream are valid.
45
    pub(super) cmd_checker: AnyCmdChecker,
46
    /// Flow control for this stream.
47
    // Non-pub because we need to proxy `put_for_incoming_sendme` to ensure
48
    // `flow_ctrl_waker` is woken.
49
    flow_ctrl: StreamFlowCtrl,
50
    /// Stream for cells that should be sent down this stream.
51
    // Not directly exposed. This should only be polled via
52
    // `OpenStreamEntStream`s implementation of `Stream`, which in turn should
53
    // only be used through `StreamPollSet`.
54
    #[pin]
55
    rx: StreamUnobtrusivePeeker<StreamMpscReceiver<AnyRelayMsg>>,
56
    /// Waker to be woken when more sending capacity becomes available (e.g.
57
    /// receiving a SENDME).
58
    flow_ctrl_waker: Option<Waker>,
59
}
60

            
61
impl OpenStreamEnt {
62
    /// Whether this stream is ready to send `msg`.
63
4180
    pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
64
4180
        self.flow_ctrl.can_send(msg)
65
4180
    }
66

            
67
    /// Handle an incoming sendme.
68
    ///
69
    /// On failure, return an error: the caller should close the stream or
70
    /// circuit with a protocol error.
71
4
    pub(crate) fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
72
4
        self.flow_ctrl.put_for_incoming_sendme(msg)?;
73
        // Wake the stream if it was blocked on flow control.
74
4
        if let Some(waker) = self.flow_ctrl_waker.take() {
75
            waker.wake();
76
4
        }
77
4
        Ok(())
78
4
    }
79

            
80
    /// The approximate number of stream inbound data bytes buffered.
81
64
    fn approx_stream_bytes_buffered(&self) -> usize {
82
        // NOTE: Here we want to know the total number of buffered incoming stream data bytes. We
83
        // have access to the `StreamQueueSender` and can get how many bytes are buffered in that
84
        // queue.
85
        // But this isn't always the total number of buffered bytes since some bytes might be
86
        // buffered outside of this queue.
87
        // For example `DataReaderImpl` stores some stream bytes in its `pending` buffer, and we
88
        // have no way to access that from here in the reactor. So it's impossible to know the total
89
        // number of incoming stream data bytes that are buffered.
90
        //
91
        // This isn't really an issue in practice since *most* of the bytes will be queued in the
92
        // `StreamQueueSender`, the XOFF threshold is very large, and we don't need to be exact.
93
64
        self.sink.approx_stream_bytes()
94
64
    }
95

            
96
    /// Check if we should send an XON message.
97
    ///
98
    /// If we should, then returns the XON message that should be sent.
99
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
100
    pub(crate) fn maybe_send_xon(&mut self, rate: XonKbpsEwma) -> Result<Option<Xon>> {
101
        self.flow_ctrl
102
            .maybe_send_xon(rate, self.approx_stream_bytes_buffered())
103
    }
104

            
105
    /// Check if we should send an XOFF message.
106
    ///
107
    /// If we should, then returns the XOFF message that should be sent.
108
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
109
64
    pub(super) fn maybe_send_xoff(&mut self) -> Result<Option<Xoff>> {
110
64
        self.flow_ctrl
111
64
            .maybe_send_xoff(self.approx_stream_bytes_buffered())
112
64
    }
113

            
114
    /// Handle an incoming XON message.
115
    ///
116
    /// On failure, return an error: the caller should close the stream or
117
    /// circuit with a protocol error.
118
    pub(crate) fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
119
        self.flow_ctrl.handle_incoming_xon(msg)
120
    }
121

            
122
    /// Handle an incoming XOFF message.
123
    ///
124
    /// On failure, return an error: the caller should close the stream or
125
    /// circuit with a protocol error.
126
    pub(crate) fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
127
        self.flow_ctrl.handle_incoming_xoff(msg)
128
    }
129

            
130
    /// Inform the flow control code that we're about to send `msg`.
131
    /// Should be called at the point we've fully committed to sending the message.
132
    /// Returns an error if we can't send `msg` and should close the circuit.
133
    //
134
    // TODO: Consider not exposing this, and instead calling in
135
    // `StreamMap::take_ready_msg`.
136
4156
    pub(crate) fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
137
4156
        self.flow_ctrl.about_to_send(msg)
138
4156
    }
139
}
140

            
141
/// Private wrapper over `OpenStreamEnt`. We implement `futures::Stream` for
142
/// this wrapper, and not directly for `OpenStreamEnt`, so that client code
143
/// can't directly access the stream.
144
#[derive(Debug)]
145
#[pin_project]
146
struct OpenStreamEntStream {
147
    /// Inner value.
148
    #[pin]
149
    inner: OpenStreamEnt,
150
}
151

            
152
impl futures::Stream for OpenStreamEntStream {
153
    type Item = AnyRelayMsg;
154

            
155
4180
    fn poll_next(
156
4180
        mut self: std::pin::Pin<&mut Self>,
157
4180
        cx: &mut std::task::Context<'_>,
158
4180
    ) -> Poll<Option<Self::Item>> {
159
4180
        if !self.as_mut().poll_peek_mut(cx).is_ready() {
160
            return Poll::Pending;
161
4180
        };
162
4180
        let res = self.project().inner.project().rx.poll_next(cx);
163
4180
        debug_assert!(res.is_ready());
164
        // TODO: consider calling `inner.flow_ctrl.about_to_send` here;
165
        // particularly if we change it to return a wrapper type that proves
166
        // we've taken the capacity. Otherwise it'd make it tricky in the reactor
167
        // to be sure we've correctly taken the capacity, since messages can originate
168
        // in other parts of the code (currently none of those should be of types that
169
        // count towards flow control, but that may change).
170
4180
        res
171
4180
    }
172
}
173

            
174
impl PeekableStream for OpenStreamEntStream {
175
12768
    fn poll_peek_mut(
176
12768
        self: Pin<&mut Self>,
177
12768
        cx: &mut std::task::Context<'_>,
178
12768
    ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
179
12768
        let s = self.project();
180
12768
        let inner = s.inner.project();
181
12768
        let m = match inner.rx.poll_peek_mut(cx) {
182
12548
            Poll::Ready(Some(m)) => m,
183
56
            Poll::Ready(None) => return Poll::Ready(None),
184
164
            Poll::Pending => return Poll::Pending,
185
        };
186
12548
        if !inner.flow_ctrl.can_send(m) {
187
            inner.flow_ctrl_waker.replace(cx.waker().clone());
188
            return Poll::Pending;
189
12548
        }
190
12548
        Poll::Ready(Some(m))
191
12768
    }
192
}
193

            
194
impl UnobtrusivePeekableStream for OpenStreamEntStream {
195
4236
    fn unobtrusive_peek_mut(
196
4236
        self: std::pin::Pin<&mut Self>,
197
4236
    ) -> Option<&mut <Self as futures::Stream>::Item> {
198
4236
        let s = self.project();
199
4236
        let inner = s.inner.project();
200
4236
        let m = inner.rx.unobtrusive_peek_mut()?;
201
4180
        if inner.flow_ctrl.can_send(m) {
202
4180
            Some(m)
203
        } else {
204
            None
205
        }
206
4236
    }
207
}
208

            
209
/// Entry for a stream where we have sent an END, or other message
210
/// indicating that the stream is terminated.
211
#[derive(Debug)]
212
pub(super) struct EndSentStreamEnt {
213
    /// A "half-stream" that we use to check the validity of incoming
214
    /// messages on this stream.
215
    pub(super) half_stream: HalfStream,
216
    /// True if the sender on this stream has been explicitly dropped;
217
    /// false if we got an explicit close from `close_pending`
218
    explicitly_dropped: bool,
219
    /// When this entry should be removed from the stream map.
220
    ///
221
    /// This is the amount of time we are willing to wait for
222
    /// an END ack before removing the half-stream from the map.
223
    pub(super) expiry: Instant,
224
}
225

            
226
/// The entry for a stream.
227
#[derive(Debug)]
228
enum ClosedStreamEnt {
229
    /// A stream for which we have received an END cell, but not yet
230
    /// had the stream object get dropped.
231
    EndReceived,
232
    /// A stream for which we have sent an END cell but not yet received an END
233
    /// cell.
234
    ///
235
    /// TODO(arti#264) Can we ever throw this out? Do we really get END cells for
236
    /// these?
237
    EndSent(EndSentStreamEnt),
238
}
239

            
240
/// Mutable reference to a stream entry.
241
pub(super) enum StreamEntMut<'a> {
242
    /// An open stream.
243
    Open(&'a mut OpenStreamEnt),
244
    /// A stream for which we have received an END cell, but not yet
245
    /// had the stream object get dropped.
246
    EndReceived,
247
    /// A stream for which we have sent an END cell but not yet received an END
248
    /// cell.
249
    EndSent(&'a mut EndSentStreamEnt),
250
}
251

            
252
impl<'a> From<&'a mut ClosedStreamEnt> for StreamEntMut<'a> {
253
36
    fn from(value: &'a mut ClosedStreamEnt) -> Self {
254
36
        match value {
255
14
            ClosedStreamEnt::EndReceived => Self::EndReceived,
256
22
            ClosedStreamEnt::EndSent(e) => Self::EndSent(e),
257
        }
258
36
    }
259
}
260

            
261
impl<'a> From<&'a mut OpenStreamEntStream> for StreamEntMut<'a> {
262
8614
    fn from(value: &'a mut OpenStreamEntStream) -> Self {
263
8614
        Self::Open(&mut value.inner)
264
8614
    }
265
}
266

            
267
/// Return value to indicate whether or not we send an END cell upon
268
/// terminating a given stream.
269
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
270
pub(super) enum ShouldSendEnd {
271
    /// An END cell should be sent.
272
    Send,
273
    /// An END cell should not be sent.
274
    DontSend,
275
}
276

            
277
/// A priority for use with [`StreamPollSet`].
///
/// Priorities are handed out in increasing order by
/// `StreamMap::take_next_priority`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
struct Priority(u64);
280

            
281
/// A map from stream IDs to stream entries. Each circuit has one for each
282
/// hop.
283
pub(crate) struct StreamMap {
284
    /// Open streams.
285
    // Invariants:
286
    // * Keys are disjoint with `closed_streams`.
287
    open_streams: StreamPollSet<StreamId, Priority, OpenStreamEntStream>,
288
    /// Closed streams.
289
    // Invariants:
290
    // * Keys are disjoint with `open_streams`.
291
    closed_streams: HashMap<StreamId, ClosedStreamEnt>,
292
    /// The next StreamId that we should use for a newly allocated
293
    /// circuit.
294
    next_stream_id: StreamId,
295
    /// Next priority to use in `open_streams`. We implement round-robin scheduling of
296
    /// handling outgoing messages from streams by assigning a stream the next
297
    /// priority whenever an outgoing message is processed from that stream,
298
    /// putting it last in line.
299
    next_priority: Priority,
300
}
301

            
302
impl StreamMap {
303
    /// Make a new empty StreamMap.
304
1022
    pub(crate) fn new() -> Self {
305
1022
        let mut rng = rand::rng();
306
1022
        let next_stream_id: NonZeroU16 = rng.random();
307
1022
        StreamMap {
308
1022
            open_streams: StreamPollSet::new(),
309
1022
            closed_streams: HashMap::new(),
310
1022
            next_stream_id: next_stream_id.into(),
311
1022
            next_priority: Priority(0),
312
1022
        }
313
1022
    }
314

            
315
    /// Return the number of open streams in this map.
316
530
    pub(super) fn n_open_streams(&self) -> usize {
317
530
        self.open_streams.len()
318
530
    }
319

            
320
    /// Return a [`TunnelActivity`](crate::util::tunnel_activity::TunnelActivity) for this hop.
321
    pub(super) fn tunnel_activity(&self) -> crate::util::tunnel_activity::TunnelActivity {
322
        self.open_streams.tunnel_activity()
323
    }
324

            
325
    /// Return the next available priority.
326
4568
    fn take_next_priority(&mut self) -> Priority {
327
4568
        let rv = self.next_priority;
328
4568
        self.next_priority = Priority(rv.0 + 1);
329
4568
        rv
330
4568
    }
331

            
332
    /// Add an entry to this map; return the newly allocated StreamId.
333
352
    pub(super) fn add_ent(
334
352
        &mut self,
335
352
        sink: StreamQueueSender,
336
352
        rx: StreamMpscReceiver<AnyRelayMsg>,
337
352
        flow_ctrl: StreamFlowCtrl,
338
352
        cmd_checker: AnyCmdChecker,
339
352
    ) -> Result<StreamId> {
340
352
        let mut stream_ent = OpenStreamEntStream {
341
352
            inner: OpenStreamEnt {
342
352
                sink,
343
352
                flow_ctrl,
344
352
                dropped: 0,
345
352
                cmd_checker,
346
352
                rx: StreamUnobtrusivePeeker::new(rx),
347
352
                flow_ctrl_waker: None,
348
352
            },
349
352
        };
350
352
        let priority = self.take_next_priority();
351
        // This "65536" seems too aggressive, but it's what tor does.
352
        //
353
        // Also, going around in a loop here is (sadly) needed in order
354
        // to look like Tor clients.
355
352
        for _ in 1..=65536 {
356
352
            let id: StreamId = self.next_stream_id;
357
352
            self.next_stream_id = wrapping_next_stream_id(self.next_stream_id);
358
352
            stream_ent = match self.open_streams.try_insert(id, priority, stream_ent) {
359
352
                Ok(_) => return Ok(id),
360
                Err(KeyAlreadyInsertedError {
361
                    key: _,
362
                    priority: _,
363
                    stream,
364
                }) => stream,
365
            };
366
        }
367

            
368
        Err(Error::IdRangeFull)
369
352
    }
370

            
371
    /// Add an entry to this map using the specified StreamId.
    ///
    /// Returns [`Error::IdUnavailable`] if `id` is already used by an open
    /// stream, or if it still has an entry in `closed_streams`.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    pub(super) fn add_ent_with_id(
        &mut self,
        sink: StreamQueueSender,
        rx: StreamMpscReceiver<AnyRelayMsg>,
        flow_ctrl: StreamFlowCtrl,
        id: StreamId,
        cmd_checker: AnyCmdChecker,
    ) -> Result<()> {
        // `open_streams` and `closed_streams` must have disjoint keys, so an
        // ID that is still closed or half-closed cannot be reused yet.
        if self.closed_streams.contains_key(&id) {
            return Err(Error::IdUnavailable(id));
        }

        let stream_ent = OpenStreamEntStream {
            inner: OpenStreamEnt {
                sink,
                flow_ctrl,
                dropped: 0,
                cmd_checker,
                rx: StreamUnobtrusivePeeker::new(rx),
                flow_ctrl_waker: None,
            },
        };
        let priority = self.take_next_priority();
        self.open_streams
            .try_insert(id, priority, stream_ent)
            .map_err(|_| Error::IdUnavailable(id))
    }
396

            
397
    /// Return the entry for `id` in this map, if any.
398
8692
    pub(super) fn get_mut(&mut self, id: StreamId) -> Option<StreamEntMut<'_>> {
399
8692
        if let Some(e) = self.open_streams.stream_mut(&id) {
400
8614
            return Some(e.into());
401
78
        }
402
78
        if let Some(e) = self.closed_streams.get_mut(&id) {
403
36
            return Some(e.into());
404
42
        }
405
42
        None
406
8692
    }
407

            
408
    /// Note that we received an END message (or other message indicating the end of
409
    /// the stream) on the stream with `id`.
410
    ///
411
    /// Returns true if there was really a stream there.
412
44
    pub(super) fn ending_msg_received(&mut self, id: StreamId) -> Result<()> {
413
44
        if self.open_streams.remove(&id).is_some() {
414
26
            let prev = self.closed_streams.insert(id, ClosedStreamEnt::EndReceived);
415
26
            debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
416
26
            return Ok(());
417
18
        }
418
18
        let hash_map::Entry::Occupied(closed_entry) = self.closed_streams.entry(id) else {
419
2
            return Err(Error::CircProto(
420
2
                "Received END cell on nonexistent stream".into(),
421
2
            ));
422
        };
423
        // Progress the stream's state machine accordingly
424
16
        match closed_entry.get() {
425
2
            ClosedStreamEnt::EndReceived => Err(Error::CircProto(
426
2
                "Received two END cells on same stream".into(),
427
2
            )),
428
            ClosedStreamEnt::EndSent { .. } => {
429
14
                debug!("Actually got an end cell on a half-closed stream!");
430
                // We got an END, and we already sent an END. Great!
431
                // we can forget about this stream.
432
14
                closed_entry.remove_entry();
433
14
                Ok(())
434
            }
435
        }
436
44
    }
437

            
438
    /// Handle a termination of the stream with `id` from this side of
439
    /// the circuit. Return true if the stream was open and an END
440
    /// ought to be sent.
441
70
    pub(super) fn terminate(
442
70
        &mut self,
443
70
        id: StreamId,
444
70
        why: TerminateReason,
445
70
        expiry: Instant,
446
70
    ) -> Result<ShouldSendEnd> {
447
        use TerminateReason as TR;
448

            
449
70
        if let Some((_id, _priority, ent)) = self.open_streams.remove(&id) {
450
            let OpenStreamEntStream {
451
                inner:
452
                    OpenStreamEnt {
453
66
                        flow_ctrl,
454
66
                        dropped,
455
66
                        cmd_checker,
456
                        // notably absent: the channels for sink and stream, which will get dropped and
457
                        // closed (meaning reads/writes from/to this stream will now fail)
458
                        ..
459
                    },
460
66
            } = ent;
461

            
462
66
            let mut flow_ctrl = flow_ctrl.half_stream();
463
66
            flow_ctrl.handle_incoming_dropped(dropped)?;
464

            
465
            // TODO: would be nice to avoid new_ref.
466
66
            let half_stream = HalfStream::new(flow_ctrl, cmd_checker);
467
66
            let explicitly_dropped = why == TR::StreamTargetClosed;
468

            
469
66
            let prev = self.closed_streams.insert(
470
66
                id,
471
66
                ClosedStreamEnt::EndSent(EndSentStreamEnt {
472
66
                    half_stream,
473
66
                    explicitly_dropped,
474
66
                    expiry,
475
66
                }),
476
            );
477
66
            debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
478
66
            return Ok(ShouldSendEnd::Send);
479
4
        }
480

            
481
        // Progress the stream's state machine accordingly
482
4
        match self
483
4
            .closed_streams
484
4
            .remove(&id)
485
5
            .ok_or_else(|| Error::from(internal!("Somehow we terminated a nonexistent stream?")))?
486
        {
487
2
            ClosedStreamEnt::EndReceived => Ok(ShouldSendEnd::DontSend),
488
            ClosedStreamEnt::EndSent(EndSentStreamEnt {
489
                ref mut explicitly_dropped,
490
                ..
491
            }) => match (*explicitly_dropped, why) {
492
                (false, TR::StreamTargetClosed) => {
493
                    *explicitly_dropped = true;
494
                    Ok(ShouldSendEnd::DontSend)
495
                }
496
                (true, TR::StreamTargetClosed) => {
497
                    Err(bad_api_usage!("Tried to close an already closed stream.").into())
498
                }
499
                (_, TR::ExplicitEnd) => Err(bad_api_usage!(
500
                    "Tried to end an already closed stream. (explicitly_dropped={:?})",
501
                    *explicitly_dropped
502
                )
503
                .into()),
504
            },
505
        }
506
70
    }
507

            
508
    /// Get an up-to-date iterator of streams with ready items. `Option<AnyRelayMsg>::None`
509
    /// indicates that the local sender has been dropped.
510
    ///
511
    /// Conceptually all streams are in a queue; new streams are added to the
512
    /// back of the queue, and a stream is sent to the back of the queue
513
    /// whenever a ready message is taken from it (via
514
    /// [`Self::take_ready_msg`]). The returned iterator is an ordered view of
515
    /// this queue, showing the subset of streams that have a message ready to
516
    /// send, or whose sender has been dropped.
517
19034
    pub(super) fn poll_ready_streams_iter<'a>(
518
19034
        &'a mut self,
519
19034
        cx: &mut std::task::Context,
520
19034
    ) -> impl Iterator<Item = (StreamId, Option<&'a AnyRelayMsg>)> + 'a + use<'a> {
521
19034
        self.open_streams
522
19034
            .poll_ready_iter_mut(cx)
523
21152
            .map(|(sid, _priority, ent)| {
524
4236
                let ent = Pin::new(ent);
525
4236
                let msg = ent.unobtrusive_peek();
526
4236
                (*sid, msg)
527
4236
            })
528
19034
    }
529

            
530
    /// If the stream `sid` has a message ready, take it, and reprioritize `sid`
531
    /// to the "back of the line" with respect to
532
    /// [`Self::poll_ready_streams_iter`].
533
4180
    pub(super) fn take_ready_msg(&mut self, sid: StreamId) -> Option<AnyRelayMsg> {
534
4180
        let new_priority = self.take_next_priority();
535
4180
        let (_prev_priority, val) = self
536
4180
            .open_streams
537
4180
            .take_ready_value_and_reprioritize(&sid, new_priority)?;
538
4180
        Some(val)
539
4180
    }
540

            
541
    /// Remove all halfstreams that are expired at `now`.
542
20452
    pub(super) fn remove_expired_halfstreams(&mut self, now: Instant) {
543
20510
        self.closed_streams.retain(|_sid, entry| match entry {
544
30
            ClosedStreamEnt::EndReceived => true,
545
86
            ClosedStreamEnt::EndSent(ent) => ent.expiry > now,
546
116
        });
547
20452
    }
548
}
549

            
550
/// A reason for terminating a stream.
551
///
552
/// We use this type in order to ensure that we obey the API restrictions of [`StreamMap::terminate`]
553
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
554
pub(super) enum TerminateReason {
555
    /// Closing a stream because the receiver got `Ok(None)`, indicating that the
556
    /// corresponding senders were all dropped.
557
    StreamTargetClosed,
558
    /// Closing a stream because we were explicitly told to end it via
559
    /// [`StreamTarget::close_pending`](crate::stream::StreamTarget::close_pending).
560
    ExplicitEnd,
561
}
562

            
563
/// Convenience function for doing a wrapping increment of a `StreamId`.
564
612
fn wrapping_next_stream_id(id: StreamId) -> StreamId {
565
612
    let next_val = NonZeroU16::from(id)
566
612
        .checked_add(1)
567
613
        .unwrap_or_else(|| NonZeroU16::new(1).expect("Impossibly got 0 value"));
568
612
    next_val.into()
569
612
}
570

            
571
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::client::circuit::test::fake_mpsc;
    use crate::stream::queue::fake_stream_queue;
    use crate::{client::stream::OutboundDataCmdChecker, congestion::sendme::StreamSendWindow};
    use web_time_compat::InstantExt;

    /// Incrementing a stream ID steps by one and wraps from 0xffff to 1.
    #[test]
    fn test_wrapping_next_stream_id() {
        let one = StreamId::new(1).unwrap();
        let two = StreamId::new(2).unwrap();
        let max = StreamId::new(0xffff).unwrap();
        assert_eq!(wrapping_next_stream_id(one), two);
        assert_eq!(wrapping_next_stream_id(max), one);
    }

    /// Walk a `StreamMap` through adding streams, looking them up, receiving
    /// END messages, and terminating streams.
    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn streammap_basics() -> Result<()> {
        let mut map = StreamMap::new();
        let mut next_id = map.next_stream_id;
        let mut ids = Vec::new();

        assert_eq!(map.n_open_streams(), 0);

        // Try add_ent
        for n in 1..=128 {
            let (sink, _) = fake_stream_queue(
                #[cfg(not(feature = "flowctl-cc"))]
                128,
            );
            let (_, rx) = fake_mpsc(2);
            let id = map.add_ent(
                sink,
                rx,
                StreamFlowCtrl::new_window(StreamSendWindow::new(500)),
                OutboundDataCmdChecker::new_any(),
            )?;
            let expect_id: StreamId = next_id;
            assert_eq!(expect_id, id);
            next_id = wrapping_next_stream_id(next_id);
            ids.push(id);
            assert_eq!(map.n_open_streams(), n);
        }

        // Test get_mut.
        let nonesuch_id = next_id;
        assert!(matches!(
            map.get_mut(ids[0]),
            Some(StreamEntMut::Open { .. })
        ));
        assert!(map.get_mut(nonesuch_id).is_none());

        // Test ending_msg_received
        assert!(map.ending_msg_received(nonesuch_id).is_err());
        assert_eq!(map.n_open_streams(), 128);
        assert!(map.ending_msg_received(ids[1]).is_ok());
        assert_eq!(map.n_open_streams(), 127);
        assert!(matches!(
            map.get_mut(ids[1]),
            Some(StreamEntMut::EndReceived)
        ));
        assert!(map.ending_msg_received(ids[1]).is_err());

        // Test terminate
        use TerminateReason as TR;
        let expiry = Instant::get(); // dummy value, unused outside of the reactor
        assert!(map.terminate(nonesuch_id, TR::ExplicitEnd, expiry).is_err());
        assert_eq!(map.n_open_streams(), 127);
        assert_eq!(
            map.terminate(ids[2], TR::ExplicitEnd, expiry).unwrap(),
            ShouldSendEnd::Send
        );
        assert_eq!(map.n_open_streams(), 126);
        assert!(matches!(
            map.get_mut(ids[2]),
            Some(StreamEntMut::EndSent { .. })
        ));
        assert_eq!(
            map.terminate(ids[1], TR::ExplicitEnd, expiry).unwrap(),
            ShouldSendEnd::DontSend
        );
        // This stream was already closed when we called `ending_msg_received`
        // above.
        assert_eq!(map.n_open_streams(), 126);
        assert!(map.get_mut(ids[1]).is_none());

        // Try receiving an end after a terminate.
        assert!(map.ending_msg_received(ids[2]).is_ok());
        assert!(map.get_mut(ids[2]).is_none());
        assert_eq!(map.n_open_streams(), 126);

        Ok(())
    }
}