1
//! The stream reactor.
2

            
3
use crate::circuit::circhop::CircHopOutbound;
4
use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
5
use crate::circuit::{CircHopSyncView, UniqId};
6
use crate::congestion::{CongestionControl, sendme};
7
use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
8
use crate::stream::CloseStreamBehavior;
9
use crate::stream::cmdcheck::StreamStatus;
10
use crate::streammap;
11
use crate::util::err::ReactorError;
12
use crate::{Error, HopNum};
13

            
14
#[cfg(any(feature = "hs-service", feature = "relay"))]
15
use crate::stream::incoming::{
16
    InboundDataCmdChecker, IncomingStreamRequest, IncomingStreamRequestContext,
17
    IncomingStreamRequestDisposition, IncomingStreamRequestHandler, StreamReqInfo,
18
};
19

            
20
use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
21
use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, End, EndReason};
22
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
23
use tor_error::into_internal;
24
use tor_log_ratelim::log_ratelim;
25
use tor_rtcompat::{DynTimeProvider, Runtime, SleepProvider as _};
26

            
27
use derive_deftly::Deftly;
28
use futures::SinkExt;
29
use futures::channel::mpsc;
30
use futures::{FutureExt as _, StreamExt as _, future, select_biased};
31
use tracing::debug;
32

            
33
use std::pin::Pin;
34
use std::result::Result as StdResult;
35
use std::sync::{Arc, Mutex};
36
use std::task::Poll;
37
use std::time::Duration;
38

            
39
/// Trait for customizing the behavior of the stream reactor.
40
///
41
/// Used for plugging in the implementation-dependent (client vs relay)
42
/// parts of the implementation into the generic one.
43
pub(crate) trait StreamHandler: Send + Sync + 'static {
44
    /// Return the amount of time a newly closed stream
45
    /// should be kept in the stream map for.
46
    ///
47
    /// This is the amount of time we are willing to wait for
48
    /// an END ack before removing the half-stream from the map.
49
    fn halfstream_expiry(&self, hop: &CircHopOutbound) -> Duration;
50
}
51

            
52
/// The stream reactor for a given hop.
53
///
54
/// Drives the application streams.
55
///
56
/// This reactor accepts [`StreamMsg`]s from the forward reactor over its [`Self::cell_rx`]
57
/// MPSC channel, and delivers them to the corresponding stream entries in the stream map.
58
///
59
/// The local streams are polled from the main loop, and any ready messages are sent
60
/// to the backward reactor over the `bwd_tx` MPSC channel for packaging and delivery.
61
///
62
/// Shuts downs down if an error occurs, or if the sending end
63
/// of the `cell_rx` MPSC channel, i.e. the forward reactor, closes.
64
#[derive(Deftly)]
65
#[derive_deftly(CircuitReactor)]
66
#[deftly(reactor_name = "stream reactor")]
67
#[deftly(run_inner_fn = "Self::run_once")]
68
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
69
pub(crate) struct StreamReactor {
70
    /// The hop this stream reactor is for.
71
    ///
72
    /// This is `None` for relays.
73
    hopnum: Option<HopNum>,
74
    /// The state of this circuit hop.
75
    hop: CircHopOutbound,
76
    /// The time provider.
77
    time_provider: DynTimeProvider,
78
    /// An identifier for logging about this reactor's circuit.
79
    unique_id: UniqId,
80
    /// Receiver for Tor stream data that need to be delivered to a Tor stream.
81
    ///
82
    /// The sender is in [`ForwardReactor`](super::ForwardReactor), which will forward all cells
83
    /// carrying Tor stream data to us.
84
    ///
85
    /// This serves a dual purpose:
86
    ///
87
    ///   * it enables the `ForwardReactor` to deliver Tor stream data received from the client
88
    ///   * it lets the `StreamReactor` know if the `ForwardReactor` has shut down:
89
    ///     we select! on this MPSC channel in the main loop, so if the `ForwardReactor`
90
    ///     shuts down, we will get EOS upon calling `.next()`)
91
    cell_rx: mpsc::Receiver<StreamMsg>,
92
    /// Sender for sending Tor stream data to [`BackwardReactor`](super::BackwardReactor).
93
    bwd_tx: mpsc::Sender<ReadyStreamMsg>,
94
    /// A handler for incoming streams.
95
    ///
96
    /// Set to `None` if incoming streams are not allowed on this circuit.
97
    ///
98
    /// This handler is shared with the [`HopMgr`](super::hop_mgr::HopMgr) of this reactor,
99
    /// which can install a new handler at runtime (for example, in response to a CtrlMsg).
100
    /// The ability to update the handler after the reactor is launched is needed
101
    /// for onion services, where the incoming stream request handler only gets installed
102
    /// after the virtual hop is created.
103
    #[cfg(any(feature = "hs-service", feature = "relay"))]
104
    incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
105
    /// A handler for customizing the stream reactor behavior.
106
    inner: Arc<dyn StreamHandler>,
107
    /// Memory quota account
108
    memquota: CircuitAccount,
109
}
110

            
111
#[allow(unused)] // TODO(relay)
112
impl StreamReactor {
113
    /// Create a new [`StreamReactor`].
114
    #[allow(clippy::too_many_arguments)] // TODO
115
4
    pub(crate) fn new<R: Runtime>(
116
4
        runtime: R,
117
4
        hopnum: Option<HopNum>,
118
4
        hop: CircHopOutbound,
119
4
        unique_id: UniqId,
120
4
        cell_rx: mpsc::Receiver<StreamMsg>,
121
4
        bwd_tx: mpsc::Sender<ReadyStreamMsg>,
122
4
        inner: Arc<dyn StreamHandler>,
123
4
        #[cfg(any(feature = "hs-service", feature = "relay"))] //
124
4
        incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
125
4
        memquota: CircuitAccount,
126
4
    ) -> Self {
127
4
        Self {
128
4
            hopnum,
129
4
            hop,
130
4
            time_provider: DynTimeProvider::new(runtime),
131
4
            unique_id,
132
4
            #[cfg(any(feature = "hs-service", feature = "relay"))]
133
4
            incoming,
134
4
            cell_rx,
135
4
            bwd_tx,
136
4
            inner,
137
4
            memquota,
138
4
        }
139
4
    }
140

            
141
    /// Helper for [`run`](Self::run).
142
    ///
143
    /// Polls the stream map for messages
144
    /// that need to be delivered to the other endpoint,
145
    /// and the `cells_rx` MPSC stream for stream messages received
146
    /// from the `ForwardReactor` that need to be delivered to the application streams.
147
30
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
148
        use postage::prelude::{Sink as _, Stream as _};
149

            
150
        // Garbage-collect all halfstreams that have expired.
151
        //
152
        // Note: this will iterate over the closed streams of this hop.
153
        // If we think this will cause perf issues, one idea would be to make
154
        // StreamMap::closed_streams into a min-heap, and add a branch to the
155
        // select_biased! below to sleep until the first expiry is due
156
        // (but my gut feeling is that iterating is cheaper)
157
20
        self.hop
158
20
            .stream_map()
159
20
            .lock()
160
20
            .expect("poisoned lock")
161
20
            .remove_expired_halfstreams(self.time_provider.now());
162

            
163
20
        let mut streams = Arc::clone(self.hop.stream_map());
164
20
        let can_send = self
165
20
            .hop
166
20
            .ccontrol()
167
20
            .lock()
168
20
            .expect("poisoned lock")
169
20
            .can_send();
170
20
        let mut ready_streams_fut = future::poll_fn(move |cx| {
171
14
            if !can_send {
172
                // We can't send anything on this hop that counts towards SENDME windows.
173
                //
174
                // Note: this does not block outgoing flow-control messages:
175
                //
176
                //   * circuit SENDMEs are initiated by the forward reactor,
177
                //     by sending a BackwardReactorCmd::SendRelayMsg to BWD,
178
                //   * stream SENDMEs will be initiated by StreamTarget::send_sendme(),
179
                //     by sending a control message to the reactor
180
                //     (TODO(relay): not yet implemented)
181
                //   * XOFFs are sent in response to messages on streams
182
                //     (i.e. RELAY messages with non-zero stream IDs).
183
                //     These messages are delivered to us by the forward reactor
184
                //     inside BackwardReactorCmd::HandleMsg
185
                //   * XON will be initiated by StreamTarget::drain_rate_update(),
186
                //     by sending a control message to the reactor
187
                //     (TODO(relay): not yet implemented)\
188
                return Poll::Pending;
189
14
            }
190

            
191
14
            let mut streams = streams.lock().expect("lock poisoned");
192
14
            let Some((sid, msg)) = streams.poll_ready_streams_iter(cx).next() else {
193
                // No ready streams
194
                //
195
                // TODO(flushing): if there are no ready Tor streams, we might want to defer
196
                // flushing until stream data becomes available (or until a timeout elapses).
197
                // The deferred flushing approach should enable us to send
198
                // more than one message at a time to the channel reactor.
199
6
                return Poll::Pending;
200
            };
201

            
202
8
            if msg.is_none() {
203
                // This means the local sender has been dropped,
204
                // which presumably can only happen if an error occurs,
205
                // or if the Tor stream ends. In both cases, we're going to
206
                // want to send an END to the client to let them know,
207
                // and to remove the stream from the stream map.
208
                //
209
                // TODO(relay): the local sender part is not implemented yet
210
4
                return Poll::Ready(StreamEvent::Closed {
211
4
                    sid,
212
4
                    behav: CloseStreamBehavior::default(),
213
4
                    reason: streammap::TerminateReason::StreamTargetClosed,
214
4
                });
215
4
            };
216

            
217
4
            let msg = streams.take_ready_msg(sid).expect("msg disappeared");
218

            
219
4
            Poll::Ready(StreamEvent::ReadyMsg { sid, msg })
220
14
        });
221

            
222
20
        select_biased! {
223
20
            res = self.cell_rx.next().fuse() => {
224
12
                let Some(cmd) = res else {
225
                    // The forward reactor has shut down
226
4
                    return Err(ReactorError::Shutdown);
227
                };
228

            
229
8
                self.handle_reactor_cmd(cmd).await?;
230
            }
231
20
            event = ready_streams_fut.fuse() => {
232
8
                self.handle_stream_event(event).await?;
233
            }
234
        }
235

            
236
16
        Ok(())
237
20
    }
238

            
239
    /// Handle a stream message sent to us by the forward reactor.
240
    ///
241
    /// Delivers the message to its corresponding application stream.
242
12
    async fn handle_reactor_cmd(&mut self, msg: StreamMsg) -> StdResult<(), ReactorError> {
243
        let StreamMsg {
244
8
            sid,
245
8
            msg,
246
8
            cell_counts_toward_windows,
247
8
        } = msg;
248

            
249
        // We need to apply stream-level flow control *before* encoding the message.
250
        // May optionally return a message that needs to be sent back to the client.
251
8
        let bwd_msg = self.handle_msg(sid, msg, cell_counts_toward_windows)?;
252

            
253
        // TODO(DEDUP): this contains parts of Circuit::send_relay_cell_inner()
254
8
        if let Some(bwd_msg) = bwd_msg {
255
            // We might be out of capacity entirely; see if we are about to hit a limit.
256
            //
257
            // TODO: If we ever add a notion of _recoverable_ errors below, we'll
258
            // need a way to restore this limit, and similarly for about_to_send().
259
            self.hop.decrement_cell_limit()?;
260

            
261
            let c_t_w = sendme::cmd_counts_towards_windows(bwd_msg.cmd());
262

            
263
            // We need to apply stream-level flow control *before* encoding the message
264
            // (the BWD handles the encoding)
265
            if c_t_w {
266
                if let Some(stream_id) = bwd_msg.stream_id() {
267
                    self.hop
268
                        .about_to_send(self.unique_id, stream_id, bwd_msg.msg())?;
269
                }
270
            }
271

            
272
            // NOTE: on the client side, we call note_data_sent()
273
            // just before writing the cell to the channel.
274
            // We can't do that here, because we're not the ones
275
            // encoding the cell, so we don't have the SENDME tag
276
            // which is needed for note_data_sent().
277
            //
278
            // Instead, we notify the CC algorithm in the BWD,
279
            // right after we've finished sending the cell.
280

            
281
            self.send_msg_to_bwd(bwd_msg).await?;
282
8
        }
283

            
284
8
        Ok(())
285
8
    }
286

            
287
    /// Handle a RELAY message that has a non-zero stream ID.
288
    ///
289
    /// A returned message is one that we need to send back to the client.
290
    //
291
    // TODO(relay): this is very similar to the client impl from
292
    // Circuit::handle_in_order_relay_msg()
293
8
    fn handle_msg(
294
8
        &mut self,
295
8
        streamid: StreamId,
296
8
        msg: UnparsedRelayMsg,
297
8
        cell_counts_toward_windows: bool,
298
8
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
299
8
        let cmd = msg.cmd();
300
8
        let possible_proto_violation_err = move |streamid: StreamId| {
301
            Error::StreamProto(format!(
302
                "Unexpected {cmd:?} message on unknown stream {streamid}"
303
            ))
304
        };
305
8
        let now = self.time_provider.now();
306

            
307
        // Check if any of our already-open streams want this message
308
8
        let res = self.hop.handle_msg(
309
8
            possible_proto_violation_err,
310
8
            cell_counts_toward_windows,
311
8
            streamid,
312
8
            msg,
313
8
            now,
314
        )?;
315

            
316
        // If it was an incoming stream request, we don't need to worry about
317
        // sending an XOFF as there's no stream data within this message.
318
8
        if let Some(msg) = res {
319
            cfg_if::cfg_if! {
320
                if #[cfg(any(feature = "hs-service", feature = "relay"))] {
321
4
                    return self.handle_incoming_stream_request(streamid, msg);
322
                } else {
323
                    return Err(
324
                        tor_error::internal!(
325
                            "incoming stream not rejected, but relay and hs-service features are disabled?!"
326
                            ).into()
327
                    );
328
                }
329
            }
330
4
        }
331

            
332
        // We may want to send an XOFF if the incoming buffer is too large.
333
4
        if let Some(cell) = self.hop.maybe_send_xoff(streamid)? {
334
            let cell = AnyRelayMsgOuter::new(Some(streamid), cell.into());
335
            return Ok(Some(cell));
336
4
        }
337

            
338
4
        Ok(None)
339
8
    }
340

            
341
    /// A helper for handling incoming stream requests.
342
    ///
343
    /// Accepts the specified incoming stream request,
344
    /// by adding a new entry to our stream map.
345
    ///
346
    /// Returns the cell we need to send back to the client,
347
    /// if an error occurred and the stream cannot be opened.
348
    ///
349
    /// Returns None if everything went well
350
    /// (the CONNECTED response only comes if the external
351
    /// consumer of our [Stream](futures::Stream) of incoming Tor streams
352
    /// is able to actually establish the connection to the address
353
    /// specified in the BEGIN).
354
    ///
355
    /// Any error returned from this function will shut down the reactor.
356
    #[cfg(any(feature = "hs-service", feature = "relay"))]
357
4
    fn handle_incoming_stream_request(
358
4
        &mut self,
359
4
        sid: StreamId,
360
4
        msg: UnparsedRelayMsg,
361
4
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
362
4
        let mut lock = self.incoming.lock().expect("poisoned lock");
363
4
        let Some(handler) = lock.as_mut() else {
364
            return Err(
365
                Error::CircProto("Cannot handle BEGIN cells on this circuit".into()).into(),
366
            );
367
        };
368

            
369
4
        if self.hopnum != handler.hop_num {
370
            let expected_hopnum = match handler.hop_num {
371
                Some(hopnum) => hopnum.display().to_string(),
372
                None => "client".to_string(),
373
            };
374

            
375
            let actual_hopnum = match self.hopnum {
376
                Some(hopnum) => hopnum.display().to_string(),
377
                None => "None".to_string(),
378
            };
379

            
380
            return Err(Error::CircProto(format!(
381
                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
382
                expected_hopnum,
383
                msg.cmd(),
384
                actual_hopnum,
385
            ))
386
            .into());
387
4
        }
388

            
389
4
        let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
390

            
391
4
        if message_closes_stream {
392
            self.hop
393
                .stream_map()
394
                .lock()
395
                .expect("poisoned lock")
396
                .ending_msg_received(sid)?;
397

            
398
            return Ok(None);
399
4
        }
400

            
401
4
        let req = parse_incoming_stream_req(msg)?;
402
4
        let view = CircHopSyncView::new(&self.hop);
403

            
404
4
        if let Some(reject) = Self::should_reject_incoming(handler, sid, &req, &view)? {
405
            // We can't honor this request, so we bail by sending an END.
406
            return Ok(Some(reject));
407
4
        };
408

            
409
4
        let memquota =
410
4
            StreamAccount::new(&self.memquota).map_err(|e| ReactorError::Err(e.into()))?;
411

            
412
4
        let cmd_checker = InboundDataCmdChecker::new_connected();
413
4
        let stream_components =
414
4
            self.hop
415
4
                .add_ent_with_id(&self.time_provider, sid, cmd_checker, &memquota)?;
416

            
417
4
        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
418
4
            req,
419
4
            stream_id: sid,
420
4
            hop: None,
421
4
            stream_components,
422
4
            memquota,
423
4
            relay_cell_format: self.hop.relay_cell_format(),
424
4
        });
425

            
426
4
        log_ratelim!("Delivering message to incoming stream handler"; outcome);
427

            
428
4
        if let Err(e) = outcome {
429
            if e.is_full() {
430
                // The IncomingStreamRequestHandler's stream is full; it isn't
431
                // handling requests fast enough. So instead, we reply with an
432
                // END cell.
433
                let end_msg = AnyRelayMsgOuter::new(
434
                    Some(sid),
435
                    End::new_with_reason(EndReason::RESOURCELIMIT).into(),
436
                );
437

            
438
                return Ok(Some(end_msg));
439
            } else if e.is_disconnected() {
440
                // The IncomingStreamRequestHandler's stream has been dropped.
441
                // In the Tor protocol as it stands, this always means that the
442
                // circuit itself is out-of-use and should be closed.
443
                //
444
                // Note that we will _not_ reach this point immediately after
445
                // the IncomingStreamRequestHandler is dropped; we won't hit it
446
                // until we next get an incoming request.  Thus, if we later
447
                // want to add early detection for a dropped
448
                // IncomingStreamRequestHandler, we need to do it elsewhere, in
449
                // a different way.
450
                debug!(
451
                    circ_id = %self.unique_id,
452
                    "Incoming stream request receiver dropped",
453
                );
454
                // This will _cause_ the circuit to get closed.
455
                return Err(ReactorError::Err(Error::CircuitClosed));
456
            } else {
457
                // There are no errors like this with the current design of
458
                // futures::mpsc, but we shouldn't just ignore the possibility
459
                // that they'll be added later.
460
                return Err(
461
                    Error::from((into_internal!("try_send failed unexpectedly"))(e)).into(),
462
                );
463
            }
464
4
        }
465

            
466
4
        Ok(None)
467
4
    }
468

            
469
    /// Check if we should reject this incoming stream request or not.
470
    ///
471
    /// Returns a cell we need to send back to the client if we must reject the request,
472
    /// or `None` if we are allowed to accept it.
473
    ///`
474
    /// Any error returned from this function will shut down the reactor.
475
    #[cfg(any(feature = "hs-service", feature = "relay"))]
476
4
    fn should_reject_incoming<'a>(
477
4
        handler: &mut IncomingStreamRequestHandler,
478
4
        sid: StreamId,
479
4
        request: &IncomingStreamRequest,
480
4
        view: &CircHopSyncView<'a>,
481
4
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
482
        use IncomingStreamRequestDisposition::*;
483

            
484
4
        let ctx = IncomingStreamRequestContext { request };
485

            
486
        // Run the externally provided filter to check if we should
487
        // open the stream or not.
488
4
        match handler.filter.as_mut().disposition(&ctx, view)? {
489
            Accept => {
490
                // All is well, we can accept the stream request
491
4
                Ok(None)
492
            }
493
            CloseCircuit => Err(ReactorError::Shutdown),
494
            RejectRequest(end) => {
495
                let end_msg = AnyRelayMsgOuter::new(Some(sid), end.into());
496

            
497
                Ok(Some(end_msg))
498
            }
499
        }
500
4
    }
501

            
502
    /// Handle a [`StreamEvent`].
503
12
    async fn handle_stream_event(&mut self, event: StreamEvent) -> StdResult<(), ReactorError> {
504
8
        match event {
505
4
            StreamEvent::Closed { sid, behav, reason } => {
506
4
                let timeout = self.inner.halfstream_expiry(&self.hop);
507
4
                let expire_at = self.time_provider.now() + timeout;
508
4
                let res =
509
4
                    self.hop
510
4
                        .close_stream(self.unique_id, sid, None, behav, reason, expire_at)?;
511
4
                let Some(msg) = res else {
512
                    // We may not need to send anything at all...
513
                    return Ok(());
514
                };
515

            
516
4
                self.send_msg_to_bwd(msg.cell).await
517
            }
518
4
            StreamEvent::ReadyMsg { sid, msg } => {
519
4
                self.send_msg_to_bwd(AnyRelayMsgOuter::new(Some(sid), msg))
520
4
                    .await
521
            }
522
        }
523
8
    }
524

            
525
    /// Wrap `msg` in [`ReadyStreamMsg`], and send it to the backward reactor.
526
12
    async fn send_msg_to_bwd(&mut self, msg: AnyRelayMsgOuter) -> StdResult<(), ReactorError> {
527
8
        let msg = ReadyStreamMsg {
528
8
            hop: self.hopnum,
529
8
            relay_cell_format: self.hop.relay_cell_format(),
530
8
            ccontrol: Arc::clone(self.hop.ccontrol()),
531
8
            msg,
532
8
        };
533

            
534
8
        self.bwd_tx
535
8
            .send(msg)
536
8
            .await
537
8
            .map_err(|_| ReactorError::Shutdown)?;
538

            
539
8
        Ok(())
540
8
    }
541
}
542

            
543
/// A Tor stream-related event.
544
enum StreamEvent {
545
    /// A stream was closed.
546
    ///
547
    /// It needs to be removed from the reactor's stream map.
548
    Closed {
549
        /// The ID of the stream to close.
550
        sid: StreamId,
551
        /// The stream-closing behavior.
552
        behav: CloseStreamBehavior,
553
        /// The reason for closing the stream.
554
        reason: streammap::TerminateReason,
555
    },
556
    /// A stream has a ready message.
557
    ReadyMsg {
558
        /// The ID of the stream to close.
559
        sid: StreamId,
560
        /// The message.
561
        msg: AnyRelayMsg,
562
    },
563
}
564

            
565
/// Convert an incoming stream request message (BEGIN, BEGIN_DIR, RESOLVE, etc.)
566
/// to an [`IncomingStreamRequest`]
567
#[cfg(any(feature = "hs-service", feature = "relay"))]
568
4
fn parse_incoming_stream_req(msg: UnparsedRelayMsg) -> crate::Result<IncomingStreamRequest> {
569
    // TODO(relay): support other stream-initiating messages, not just BEGIN
570
4
    let begin = msg
571
4
        .decode::<Begin>()
572
4
        .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
573
4
        .into_msg();
574

            
575
4
    Ok(IncomingStreamRequest::Begin(begin))
576
4
}
577

            
578
/// A stream message to be sent to the backward reactor for delivery.
579
pub(crate) struct ReadyStreamMsg {
580
    /// The hop number, or `None` if we are a relay.
581
    pub(crate) hop: Option<HopNum>,
582
    /// The message to send.
583
    pub(crate) msg: AnyRelayMsgOuter,
584
    /// The cell format used with the hop the message should be sent to.
585
    pub(crate) relay_cell_format: RelayCellFormat,
586
    /// The CC object to use.
587
    pub(crate) ccontrol: Arc<Mutex<CongestionControl>>,
588
}
589

            
590
/// Stream data received from the other endpoint
591
/// that needs to be handled by [`StreamReactor`].
592
pub(crate) struct StreamMsg {
593
    /// The ID of the stream this message is for.
594
    pub(crate) sid: StreamId,
595
    /// The message.
596
    pub(crate) msg: UnparsedRelayMsg,
597
    /// Whether the cell this message came from counts towards flow-control windows.
598
    pub(crate) cell_counts_toward_windows: bool,
599
}