1
//! Types and code for mapping StreamIDs to streams on a circuit.
2

            
3
mod halfstream;
4

            
5
use crate::congestion::sendme;
6
use crate::stream::RECV_WINDOW_INIT;
7
use crate::stream::StreamMpscReceiver;
8
use crate::stream::cmdcheck::AnyCmdChecker;
9
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamFlowCtrl};
10
use crate::stream::queue::StreamQueueSender;
11
use crate::util::stream_poll_set::{KeyAlreadyInsertedError, StreamPollSet};
12
use crate::{Error, Result};
13
use pin_project::pin_project;
14
use tor_async_utils::peekable_stream::{PeekableStream, UnobtrusivePeekableStream};
15
use tor_async_utils::stream_peek::StreamUnobtrusivePeeker;
16
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
17
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
18
use tor_cell::relaycell::{StreamId, msg::AnyRelayMsg};
19

            
20
use std::collections::HashMap;
21
use std::collections::hash_map;
22
use std::num::NonZeroU16;
23
use std::pin::Pin;
24
use std::task::{Poll, Waker};
25
use std::time::Instant;
26
use tor_error::{bad_api_usage, internal};
27

            
28
use rand::Rng;
29

            
30
use tracing::debug;
31

            
32
use halfstream::HalfStream;
33

            
34
/// Entry for an open stream
35
///
36
/// (For the purposes of this module, an open stream is one where we have not
37
/// sent or received any message indicating that the stream is ended.)
38
#[derive(Debug)]
39
#[pin_project]
40
pub(super) struct OpenStreamEnt {
41
    /// Sink to send relay cells tagged for this stream into.
42
    pub(super) sink: StreamQueueSender,
43
    /// Number of cells dropped due to the stream disappearing before we can
44
    /// transform this into an `EndSent`.
45
    pub(super) dropped: u16,
46
    /// A `CmdChecker` used to tell whether cells on this stream are valid.
47
    pub(super) cmd_checker: AnyCmdChecker,
48
    /// Flow control for this stream.
49
    // Non-pub because we need to proxy `put_for_incoming_sendme` to ensure
50
    // `flow_ctrl_waker` is woken.
51
    flow_ctrl: StreamFlowCtrl,
52
    /// Stream for cells that should be sent down this stream.
53
    // Not directly exposed. This should only be polled via
54
    // `OpenStreamEntStream`s implementation of `Stream`, which in turn should
55
    // only be used through `StreamPollSet`.
56
    #[pin]
57
    rx: StreamUnobtrusivePeeker<StreamMpscReceiver<AnyRelayMsg>>,
58
    /// Waker to be woken when more sending capacity becomes available (e.g.
59
    /// receiving a SENDME).
60
    flow_ctrl_waker: Option<Waker>,
61
}
62

            
63
impl OpenStreamEnt {
64
    /// Whether this stream is ready to send `msg`.
65
4180
    pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
66
4180
        self.flow_ctrl.can_send(msg)
67
4180
    }
68

            
69
    /// Handle an incoming sendme.
70
    ///
71
    /// On failure, return an error: the caller should close the stream or
72
    /// circuit with a protocol error.
73
4
    pub(crate) fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
74
4
        self.flow_ctrl.put_for_incoming_sendme(msg)?;
75
        // Wake the stream if it was blocked on flow control.
76
4
        if let Some(waker) = self.flow_ctrl_waker.take() {
77
            waker.wake();
78
4
        }
79
4
        Ok(())
80
4
    }
81

            
82
    /// The approximate number of stream inbound data bytes buffered.
83
64
    fn approx_stream_bytes_buffered(&self) -> usize {
84
        // NOTE: Here we want to know the total number of buffered incoming stream data bytes. We
85
        // have access to the `StreamQueueSender` and can get how many bytes are buffered in that
86
        // queue.
87
        // But this isn't always the total number of buffered bytes since some bytes might be
88
        // buffered outside of this queue.
89
        // For example `DataReaderImpl` stores some stream bytes in its `pending` buffer, and we
90
        // have no way to access that from here in the reactor. So it's impossible to know the total
91
        // number of incoming stream data bytes that are buffered.
92
        //
93
        // This isn't really an issue in practice since *most* of the bytes will be queued in the
94
        // `StreamQueueSender`, the XOFF threshold is very large, and we don't need to be exact.
95
64
        self.sink.approx_stream_bytes()
96
64
    }
97

            
98
    /// Check if we should send an XON message.
99
    ///
100
    /// If we should, then returns the XON message that should be sent.
101
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
102
    pub(crate) fn maybe_send_xon(&mut self, rate: XonKbpsEwma) -> Result<Option<Xon>> {
103
        self.flow_ctrl
104
            .maybe_send_xon(rate, self.approx_stream_bytes_buffered())
105
    }
106

            
107
    /// Check if we should send an XOFF message.
108
    ///
109
    /// If we should, then returns the XOFF message that should be sent.
110
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
111
64
    pub(super) fn maybe_send_xoff(&mut self) -> Result<Option<Xoff>> {
112
64
        self.flow_ctrl
113
64
            .maybe_send_xoff(self.approx_stream_bytes_buffered())
114
64
    }
115

            
116
    /// Handle an incoming XON message.
117
    ///
118
    /// On failure, return an error: the caller should close the stream or
119
    /// circuit with a protocol error.
120
    pub(crate) fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
121
        self.flow_ctrl.handle_incoming_xon(msg)
122
    }
123

            
124
    /// Handle an incoming XOFF message.
125
    ///
126
    /// On failure, return an error: the caller should close the stream or
127
    /// circuit with a protocol error.
128
    pub(crate) fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
129
        self.flow_ctrl.handle_incoming_xoff(msg)
130
    }
131

            
132
    /// Inform the flow control code that we're about to send `msg`.
133
    /// Should be called at the point we've fully committed to sending the message.
134
    /// Returns an error if we can't send `msg` and should close the circuit.
135
    //
136
    // TODO: Consider not exposing this, and instead calling in
137
    // `StreamMap::take_ready_msg`.
138
4156
    pub(crate) fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
139
4156
        self.flow_ctrl.about_to_send(msg)
140
4156
    }
141
}
142

            
143
/// Private wrapper over `OpenStreamEnt`. We implement `futures::Stream` for
144
/// this wrapper, and not directly for `OpenStreamEnt`, so that client code
145
/// can't directly access the stream.
146
#[derive(Debug)]
147
#[pin_project]
148
struct OpenStreamEntStream {
149
    /// Inner value.
150
    #[pin]
151
    inner: OpenStreamEnt,
152
}
153

            
154
impl futures::Stream for OpenStreamEntStream {
155
    type Item = AnyRelayMsg;
156

            
157
4180
    fn poll_next(
158
4180
        mut self: std::pin::Pin<&mut Self>,
159
4180
        cx: &mut std::task::Context<'_>,
160
4180
    ) -> Poll<Option<Self::Item>> {
161
4180
        if !self.as_mut().poll_peek_mut(cx).is_ready() {
162
            return Poll::Pending;
163
4180
        };
164
4180
        let res = self.project().inner.project().rx.poll_next(cx);
165
4180
        debug_assert!(res.is_ready());
166
        // TODO: consider calling `inner.flow_ctrl.about_to_send` here;
167
        // particularly if we change it to return a wrapper type that proves
168
        // we've taken the capacity. Otherwise it'd make it tricky in the reactor
169
        // to be sure we've correctly taken the capacity, since messages can originate
170
        // in other parts of the code (currently none of those should be of types that
171
        // count towards flow control, but that may change).
172
4180
        res
173
4180
    }
174
}
175

            
176
impl PeekableStream for OpenStreamEntStream {
177
12778
    fn poll_peek_mut(
178
12778
        self: Pin<&mut Self>,
179
12778
        cx: &mut std::task::Context<'_>,
180
12778
    ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
181
12778
        let s = self.project();
182
12778
        let inner = s.inner.project();
183
12778
        let m = match inner.rx.poll_peek_mut(cx) {
184
12548
            Poll::Ready(Some(m)) => m,
185
58
            Poll::Ready(None) => return Poll::Ready(None),
186
172
            Poll::Pending => return Poll::Pending,
187
        };
188
12548
        if !inner.flow_ctrl.can_send(m) {
189
            inner.flow_ctrl_waker.replace(cx.waker().clone());
190
            return Poll::Pending;
191
12548
        }
192
12548
        Poll::Ready(Some(m))
193
12778
    }
194
}
195

            
196
impl UnobtrusivePeekableStream for OpenStreamEntStream {
197
4238
    fn unobtrusive_peek_mut(
198
4238
        self: std::pin::Pin<&mut Self>,
199
4238
    ) -> Option<&mut <Self as futures::Stream>::Item> {
200
4238
        let s = self.project();
201
4238
        let inner = s.inner.project();
202
4238
        let m = inner.rx.unobtrusive_peek_mut()?;
203
4180
        if inner.flow_ctrl.can_send(m) {
204
4180
            Some(m)
205
        } else {
206
            None
207
        }
208
4238
    }
209
}
210

            
211
/// Entry for a stream where we have sent an END, or other message
212
/// indicating that the stream is terminated.
213
#[derive(Debug)]
214
pub(super) struct EndSentStreamEnt {
215
    /// A "half-stream" that we use to check the validity of incoming
216
    /// messages on this stream.
217
    pub(super) half_stream: HalfStream,
218
    /// True if the sender on this stream has been explicitly dropped;
219
    /// false if we got an explicit close from `close_pending`
220
    explicitly_dropped: bool,
221
    /// When this entry should be removed from the stream map.
222
    ///
223
    /// This is the amount of time we are willing to wait for
224
    /// an END ack before removing the half-stream from the map.
225
    pub(super) expiry: Instant,
226
}
227

            
228
/// The entry for a stream.
229
#[derive(Debug)]
230
enum ClosedStreamEnt {
231
    /// A stream for which we have received an END cell, but not yet
232
    /// had the stream object get dropped.
233
    EndReceived,
234
    /// A stream for which we have sent an END cell but not yet received an END
235
    /// cell.
236
    ///
237
    /// TODO(arti#264) Can we ever throw this out? Do we really get END cells for
238
    /// these?
239
    EndSent(EndSentStreamEnt),
240
}
241

            
242
/// Mutable reference to a stream entry.
243
pub(super) enum StreamEntMut<'a> {
244
    /// An open stream.
245
    Open(&'a mut OpenStreamEnt),
246
    /// A stream for which we have received an END cell, but not yet
247
    /// had the stream object get dropped.
248
    EndReceived,
249
    /// A stream for which we have sent an END cell but not yet received an END
250
    /// cell.
251
    EndSent(&'a mut EndSentStreamEnt),
252
}
253

            
254
impl<'a> From<&'a mut ClosedStreamEnt> for StreamEntMut<'a> {
255
36
    fn from(value: &'a mut ClosedStreamEnt) -> Self {
256
36
        match value {
257
14
            ClosedStreamEnt::EndReceived => Self::EndReceived,
258
22
            ClosedStreamEnt::EndSent(e) => Self::EndSent(e),
259
        }
260
36
    }
261
}
262

            
263
impl<'a> From<&'a mut OpenStreamEntStream> for StreamEntMut<'a> {
    /// Expose the wrapped `OpenStreamEnt` as `StreamEntMut::Open`.
    fn from(value: &'a mut OpenStreamEntStream) -> Self {
        Self::Open(&mut value.inner)
    }
}
268

            
269
/// Return value to indicate whether or not we send an END cell upon
270
/// terminating a given stream.
271
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
272
pub(super) enum ShouldSendEnd {
273
    /// An END cell should be sent.
274
    Send,
275
    /// An END cell should not be sent.
276
    DontSend,
277
}
278

            
279
/// A priority for use with [`StreamPollSet`].
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
struct Priority(u64);
282

            
283
/// A map from stream IDs to stream entries. Each circuit has one for each
284
/// hop.
285
pub(crate) struct StreamMap {
286
    /// Open streams.
287
    // Invariants:
288
    // * Keys are disjoint with `closed_streams`.
289
    open_streams: StreamPollSet<StreamId, Priority, OpenStreamEntStream>,
290
    /// Closed streams.
291
    // Invariants:
292
    // * Keys are disjoint with `open_streams`.
293
    closed_streams: HashMap<StreamId, ClosedStreamEnt>,
294
    /// The next StreamId that we should use for a newly allocated
295
    /// circuit.
296
    next_stream_id: StreamId,
297
    /// Next priority to use in `open_streams`. We implement round-robin scheduling of
298
    /// handling outgoing messages from streams by assigning a stream the next
299
    /// priority whenever an outgoing message is processed from that stream,
300
    /// putting it last in line.
301
    next_priority: Priority,
302
}
303

            
304
impl StreamMap {
305
    /// Make a new empty StreamMap.
306
1022
    pub(crate) fn new() -> Self {
307
1022
        let mut rng = rand::rng();
308
1022
        let next_stream_id: NonZeroU16 = rng.random();
309
1022
        StreamMap {
310
1022
            open_streams: StreamPollSet::new(),
311
1022
            closed_streams: HashMap::new(),
312
1022
            next_stream_id: next_stream_id.into(),
313
1022
            next_priority: Priority(0),
314
1022
        }
315
1022
    }
316

            
317
    /// Return the number of open streams in this map.
318
530
    pub(super) fn n_open_streams(&self) -> usize {
319
530
        self.open_streams.len()
320
530
    }
321

            
322
    /// Return a [`TunnelActivity`](crate::util::tunnel_activity::TunnelActivity) for this hop.
323
    pub(super) fn tunnel_activity(&self) -> crate::util::tunnel_activity::TunnelActivity {
324
        self.open_streams.tunnel_activity()
325
    }
326

            
327
    /// Return the next available priority.
328
4568
    fn take_next_priority(&mut self) -> Priority {
329
4568
        let rv = self.next_priority;
330
4568
        self.next_priority = Priority(rv.0 + 1);
331
4568
        rv
332
4568
    }
333

            
334
    /// Add an entry to this map; return the newly allocated StreamId.
335
352
    pub(super) fn add_ent(
336
352
        &mut self,
337
352
        sink: StreamQueueSender,
338
352
        rx: StreamMpscReceiver<AnyRelayMsg>,
339
352
        flow_ctrl: StreamFlowCtrl,
340
352
        cmd_checker: AnyCmdChecker,
341
352
    ) -> Result<StreamId> {
342
352
        let mut stream_ent = OpenStreamEntStream {
343
352
            inner: OpenStreamEnt {
344
352
                sink,
345
352
                flow_ctrl,
346
352
                dropped: 0,
347
352
                cmd_checker,
348
352
                rx: StreamUnobtrusivePeeker::new(rx),
349
352
                flow_ctrl_waker: None,
350
352
            },
351
352
        };
352
352
        let priority = self.take_next_priority();
353
        // This "65536" seems too aggressive, but it's what tor does.
354
        //
355
        // Also, going around in a loop here is (sadly) needed in order
356
        // to look like Tor clients.
357
352
        for _ in 1..=65536 {
358
352
            let id: StreamId = self.next_stream_id;
359
352
            self.next_stream_id = wrapping_next_stream_id(self.next_stream_id);
360
352
            stream_ent = match self.open_streams.try_insert(id, priority, stream_ent) {
361
352
                Ok(_) => return Ok(id),
362
                Err(KeyAlreadyInsertedError {
363
                    key: _,
364
                    priority: _,
365
                    stream,
366
                }) => stream,
367
            };
368
        }
369

            
370
        Err(Error::IdRangeFull)
371
352
    }
372

            
373
    /// Add an entry to this map using the specified StreamId.
374
    #[cfg(any(feature = "hs-service", feature = "relay"))]
375
36
    pub(super) fn add_ent_with_id(
376
36
        &mut self,
377
36
        sink: StreamQueueSender,
378
36
        rx: StreamMpscReceiver<AnyRelayMsg>,
379
36
        flow_ctrl: StreamFlowCtrl,
380
36
        id: StreamId,
381
36
        cmd_checker: AnyCmdChecker,
382
36
    ) -> Result<()> {
383
36
        let stream_ent = OpenStreamEntStream {
384
36
            inner: OpenStreamEnt {
385
36
                sink,
386
36
                flow_ctrl,
387
36
                dropped: 0,
388
36
                cmd_checker,
389
36
                rx: StreamUnobtrusivePeeker::new(rx),
390
36
                flow_ctrl_waker: None,
391
36
            },
392
36
        };
393
36
        let priority = self.take_next_priority();
394
36
        self.open_streams
395
36
            .try_insert(id, priority, stream_ent)
396
36
            .map_err(|_| Error::IdUnavailable(id))
397
36
    }
398

            
399
    /// Return the entry for `id` in this map, if any.
400
8692
    pub(super) fn get_mut(&mut self, id: StreamId) -> Option<StreamEntMut<'_>> {
401
8692
        if let Some(e) = self.open_streams.stream_mut(&id) {
402
8614
            return Some(e.into());
403
78
        }
404
78
        if let Some(e) = self.closed_streams.get_mut(&id) {
405
36
            return Some(e.into());
406
42
        }
407
42
        None
408
8692
    }
409

            
410
    /// Note that we received an END message (or other message indicating the end of
411
    /// the stream) on the stream with `id`.
412
    ///
413
    /// Returns true if there was really a stream there.
414
44
    pub(super) fn ending_msg_received(&mut self, id: StreamId) -> Result<()> {
415
44
        if self.open_streams.remove(&id).is_some() {
416
26
            let prev = self.closed_streams.insert(id, ClosedStreamEnt::EndReceived);
417
26
            debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
418
26
            return Ok(());
419
18
        }
420
18
        let hash_map::Entry::Occupied(closed_entry) = self.closed_streams.entry(id) else {
421
2
            return Err(Error::CircProto(
422
2
                "Received END cell on nonexistent stream".into(),
423
2
            ));
424
        };
425
        // Progress the stream's state machine accordingly
426
16
        match closed_entry.get() {
427
2
            ClosedStreamEnt::EndReceived => Err(Error::CircProto(
428
2
                "Received two END cells on same stream".into(),
429
2
            )),
430
            ClosedStreamEnt::EndSent { .. } => {
431
14
                debug!("Actually got an end cell on a half-closed stream!");
432
                // We got an END, and we already sent an END. Great!
433
                // we can forget about this stream.
434
14
                closed_entry.remove_entry();
435
14
                Ok(())
436
            }
437
        }
438
44
    }
439

            
440
    /// Handle a termination of the stream with `id` from this side of
441
    /// the circuit. Return true if the stream was open and an END
442
    /// ought to be sent.
443
76
    pub(super) fn terminate(
444
76
        &mut self,
445
76
        id: StreamId,
446
76
        why: TerminateReason,
447
76
        expiry: Instant,
448
76
    ) -> Result<ShouldSendEnd> {
449
        use TerminateReason as TR;
450

            
451
76
        if let Some((_id, _priority, ent)) = self.open_streams.remove(&id) {
452
            let OpenStreamEntStream {
453
                inner:
454
                    OpenStreamEnt {
455
72
                        flow_ctrl,
456
72
                        dropped,
457
72
                        cmd_checker,
458
                        // notably absent: the channels for sink and stream, which will get dropped and
459
                        // closed (meaning reads/writes from/to this stream will now fail)
460
                        ..
461
                    },
462
72
            } = ent;
463
            // FIXME(eta): we don't copy the receive window, instead just creating a new one,
464
            //             so a malicious peer can send us slightly more data than they should
465
            //             be able to; see arti#230.
466
72
            let mut recv_window = sendme::StreamRecvWindow::new(RECV_WINDOW_INIT);
467
72
            recv_window.decrement_n(dropped)?;
468
            // TODO: would be nice to avoid new_ref.
469
72
            let half_stream = HalfStream::new(flow_ctrl, recv_window, cmd_checker);
470
72
            let explicitly_dropped = why == TR::StreamTargetClosed;
471

            
472
72
            let prev = self.closed_streams.insert(
473
72
                id,
474
72
                ClosedStreamEnt::EndSent(EndSentStreamEnt {
475
72
                    half_stream,
476
72
                    explicitly_dropped,
477
72
                    expiry,
478
72
                }),
479
            );
480
72
            debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
481
72
            return Ok(ShouldSendEnd::Send);
482
4
        }
483

            
484
        // Progress the stream's state machine accordingly
485
4
        match self
486
4
            .closed_streams
487
4
            .remove(&id)
488
5
            .ok_or_else(|| Error::from(internal!("Somehow we terminated a nonexistent stream?")))?
489
        {
490
2
            ClosedStreamEnt::EndReceived => Ok(ShouldSendEnd::DontSend),
491
            ClosedStreamEnt::EndSent(EndSentStreamEnt {
492
                ref mut explicitly_dropped,
493
                ..
494
            }) => match (*explicitly_dropped, why) {
495
                (false, TR::StreamTargetClosed) => {
496
                    *explicitly_dropped = true;
497
                    Ok(ShouldSendEnd::DontSend)
498
                }
499
                (true, TR::StreamTargetClosed) => {
500
                    Err(bad_api_usage!("Tried to close an already closed stream.").into())
501
                }
502
                (_, TR::ExplicitEnd) => Err(bad_api_usage!(
503
                    "Tried to end an already closed stream. (explicitly_dropped={:?})",
504
                    *explicitly_dropped
505
                )
506
                .into()),
507
            },
508
        }
509
76
    }
510

            
511
    /// Get an up-to-date iterator of streams with ready items. `Option<AnyRelayMsg>::None`
512
    /// indicates that the local sender has been dropped.
513
    ///
514
    /// Conceptually all streams are in a queue; new streams are added to the
515
    /// back of the queue, and a stream is sent to the back of the queue
516
    /// whenever a ready message is taken from it (via
517
    /// [`Self::take_ready_msg`]). The returned iterator is an ordered view of
518
    /// this queue, showing the subset of streams that have a message ready to
519
    /// send, or whose sender has been dropped.
520
19004
    pub(super) fn poll_ready_streams_iter<'a>(
521
19004
        &'a mut self,
522
19004
        cx: &mut std::task::Context,
523
19004
    ) -> impl Iterator<Item = (StreamId, Option<&'a AnyRelayMsg>)> + 'a + use<'a> {
524
19004
        self.open_streams
525
19004
            .poll_ready_iter_mut(cx)
526
21123
            .map(|(sid, _priority, ent)| {
527
4238
                let ent = Pin::new(ent);
528
4238
                let msg = ent.unobtrusive_peek();
529
4238
                (*sid, msg)
530
4238
            })
531
19004
    }
532

            
533
    /// If the stream `sid` has a message ready, take it, and reprioritize `sid`
534
    /// to the "back of the line" with respect to
535
    /// [`Self::poll_ready_streams_iter`].
536
4180
    pub(super) fn take_ready_msg(&mut self, sid: StreamId) -> Option<AnyRelayMsg> {
537
4180
        let new_priority = self.take_next_priority();
538
4180
        let (_prev_priority, val) = self
539
4180
            .open_streams
540
4180
            .take_ready_value_and_reprioritize(&sid, new_priority)?;
541
4180
        Some(val)
542
4180
    }
543

            
544
    /// Remove all halfstreams that are expired at `now`.
545
20406
    pub(super) fn remove_expired_halfstreams(&mut self, now: Instant) {
546
20464
        self.closed_streams.retain(|_sid, entry| match entry {
547
32
            ClosedStreamEnt::EndReceived => true,
548
84
            ClosedStreamEnt::EndSent(ent) => ent.expiry > now,
549
116
        });
550
20406
    }
551
}
552

            
553
/// A reason for terminating a stream.
554
///
555
/// We use this type in order to ensure that we obey the API restrictions of [`StreamMap::terminate`]
556
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
557
pub(super) enum TerminateReason {
558
    /// Closing a stream because the receiver got `Ok(None)`, indicating that the
559
    /// corresponding senders were all dropped.
560
    StreamTargetClosed,
561
    /// Closing a stream because we were explicitly told to end it via
562
    /// [`StreamTarget::close_pending`](crate::stream::StreamTarget::close_pending).
563
    ExplicitEnd,
564
}
565

            
566
/// Convenience function for doing a wrapping increment of a `StreamId`.
567
612
fn wrapping_next_stream_id(id: StreamId) -> StreamId {
568
612
    let next_val = NonZeroU16::from(id)
569
612
        .checked_add(1)
570
613
        .unwrap_or_else(|| NonZeroU16::new(1).expect("Impossibly got 0 value"));
571
612
    next_val.into()
572
612
}
573

            
574
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::client::circuit::test::fake_mpsc;
    use crate::stream::queue::fake_stream_queue;
    use crate::{client::stream::OutboundDataCmdChecker, congestion::sendme::StreamSendWindow};

    /// Incrementing a stream ID advances by one, and wraps from the maximum
    /// value back to 1 (never 0).
    #[test]
    fn test_wrapping_next_stream_id() {
        let one = StreamId::new(1).unwrap();
        let two = StreamId::new(2).unwrap();
        let max = StreamId::new(0xffff).unwrap();
        assert_eq!(wrapping_next_stream_id(one), two);
        assert_eq!(wrapping_next_stream_id(max), one);
    }

    /// Walk a `StreamMap` through adding, looking up, ending and terminating
    /// streams.
    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn streammap_basics() -> Result<()> {
        let mut map = StreamMap::new();
        let mut next_id = map.next_stream_id;
        let mut ids = Vec::new();

        assert_eq!(map.n_open_streams(), 0);

        // Try add_ent
        for n in 1..=128 {
            let (sink, _) = fake_stream_queue(
                #[cfg(not(feature = "flowctl-cc"))]
                128,
            );
            let (_, rx) = fake_mpsc(2);
            let id = map.add_ent(
                sink,
                rx,
                StreamFlowCtrl::new_window(StreamSendWindow::new(500)),
                OutboundDataCmdChecker::new_any(),
            )?;
            let expect_id: StreamId = next_id;
            assert_eq!(expect_id, id);
            next_id = wrapping_next_stream_id(next_id);
            ids.push(id);
            assert_eq!(map.n_open_streams(), n);
        }

        // Test get_mut.
        let nonesuch_id = next_id;
        assert!(matches!(
            map.get_mut(ids[0]),
            Some(StreamEntMut::Open { .. })
        ));
        assert!(map.get_mut(nonesuch_id).is_none());

        // Test ending_msg_received
        assert!(map.ending_msg_received(nonesuch_id).is_err());
        assert_eq!(map.n_open_streams(), 128);
        assert!(map.ending_msg_received(ids[1]).is_ok());
        assert_eq!(map.n_open_streams(), 127);
        assert!(matches!(
            map.get_mut(ids[1]),
            Some(StreamEntMut::EndReceived)
        ));
        assert!(map.ending_msg_received(ids[1]).is_err());

        // Test terminate
        use TerminateReason as TR;
        let expiry = Instant::now(); // dummy value, unused outside of the reactor
        assert!(map.terminate(nonesuch_id, TR::ExplicitEnd, expiry).is_err());
        assert_eq!(map.n_open_streams(), 127);
        assert_eq!(
            map.terminate(ids[2], TR::ExplicitEnd, expiry).unwrap(),
            ShouldSendEnd::Send
        );
        assert_eq!(map.n_open_streams(), 126);
        assert!(matches!(
            map.get_mut(ids[2]),
            Some(StreamEntMut::EndSent { .. })
        ));
        assert_eq!(
            map.terminate(ids[1], TR::ExplicitEnd, expiry).unwrap(),
            ShouldSendEnd::DontSend
        );
        // This stream was already closed when we called `ending_msg_received`
        // above.
        assert_eq!(map.n_open_streams(), 126);
        assert!(map.get_mut(ids[1]).is_none());

        // Try receiving an end after a terminate.
        assert!(map.ending_msg_received(ids[2]).is_ok());
        assert!(map.get_mut(ids[2]).is_none());
        assert_eq!(map.n_open_streams(), 126);

        Ok(())
    }
}