1
//! The stream reactor.
2

            
3
use crate::circuit::circhop::CircHopOutbound;
4
use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
5
use crate::circuit::{CircHopSyncView, UniqId};
6
use crate::congestion::{CongestionControl, sendme};
7
use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
8
use crate::stream::CloseStreamBehavior;
9
use crate::stream::cmdcheck::StreamStatus;
10
use crate::stream::flow_ctrl::state::StreamRateLimit;
11
use crate::stream::queue::stream_queue;
12
use crate::streammap;
13
use crate::util::err::ReactorError;
14
use crate::util::notify::NotifySender;
15
use crate::{Error, HopNum};
16

            
17
#[cfg(any(feature = "hs-service", feature = "relay"))]
18
use crate::stream::incoming::{
19
    InboundDataCmdChecker, IncomingStreamRequest, IncomingStreamRequestContext,
20
    IncomingStreamRequestDisposition, IncomingStreamRequestHandler, StreamReqInfo,
21
};
22

            
23
use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
24
use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, End, EndReason};
25
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
26
use tor_error::into_internal;
27
use tor_log_ratelim::log_ratelim;
28
use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
29
use tor_rtcompat::{DynTimeProvider, Runtime, SleepProvider as _};
30

            
31
use derive_deftly::Deftly;
32
use futures::SinkExt;
33
use futures::channel::mpsc;
34
use futures::{FutureExt as _, StreamExt as _, future, select_biased};
35
use postage::watch;
36
use tracing::debug;
37

            
38
use std::pin::Pin;
39
use std::result::Result as StdResult;
40
use std::sync::{Arc, Mutex};
41
use std::task::Poll;
42
use std::time::Duration;
43

            
44
/// Size of the buffer for communication between a StreamTarget and the reactor.
///
/// Used as the capacity of the per-stream MPSC message queue
/// created for each accepted incoming stream request.
// TODO(tuning): figure out if this is a good size for this buffer
const CIRCUIT_BUFFER_SIZE: usize = 128;
48

            
49
/// Trait for customizing the behavior of the stream reactor.
///
/// Used for plugging in the implementation-dependent (client vs relay)
/// parts of the implementation into the generic one.
pub(crate) trait StreamHandler: Send + Sync + 'static {
    /// Return the amount of time a newly closed stream
    /// should be kept in the stream map for.
    ///
    /// This is the amount of time we are willing to wait for
    /// an END ack before removing the half-stream from the map.
    ///
    /// `hop` gives the implementation access to the state of the
    /// circuit hop the stream lives on.
    fn halfstream_expiry(&self, hop: &CircHopOutbound) -> Duration;
}
61

            
62
/// The stream reactor for a given hop.
///
/// Drives the application streams.
///
/// This reactor accepts [`StreamMsg`]s from the forward reactor over its [`Self::cell_rx`]
/// MPSC channel, and delivers them to the corresponding stream entries in the stream map.
///
/// The local streams are polled from the main loop, and any ready messages are sent
/// to the backward reactor over the `bwd_tx` MPSC channel for packaging and delivery.
///
/// Shuts down if an error occurs, or if the sending end
/// of the `cell_rx` MPSC channel, i.e. the forward reactor, closes.
#[derive(Deftly)]
#[derive_deftly(CircuitReactor)]
#[deftly(reactor_name = "stream reactor")]
#[deftly(run_inner_fn = "Self::run_once")]
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub(crate) struct StreamReactor {
    /// The hop this stream reactor is for.
    ///
    /// This is `None` for relays.
    hopnum: Option<HopNum>,
    /// The state of this circuit hop.
    hop: CircHopOutbound,
    /// The time provider.
    time_provider: DynTimeProvider,
    /// An identifier for logging about this reactor's circuit.
    unique_id: UniqId,
    /// Receiver for Tor stream data that needs to be delivered to a Tor stream.
    ///
    /// The sender is in [`ForwardReactor`](super::ForwardReactor), which will forward all cells
    /// carrying Tor stream data to us.
    ///
    /// This serves a dual purpose:
    ///
    ///   * it enables the `ForwardReactor` to deliver Tor stream data received from the client
    ///   * it lets the `StreamReactor` know if the `ForwardReactor` has shut down:
    ///     we select! on this MPSC channel in the main loop, so if the `ForwardReactor`
    ///     shuts down, we will get EOS upon calling `.next()`
    cell_rx: mpsc::Receiver<StreamMsg>,
    /// Sender for sending Tor stream data to [`BackwardReactor`](super::BackwardReactor).
    bwd_tx: mpsc::Sender<ReadyStreamMsg>,
    /// A handler for incoming streams.
    ///
    /// Set to `None` if incoming streams are not allowed on this circuit.
    ///
    /// This handler is shared with the [`HopMgr`](super::hop_mgr::HopMgr) of this reactor,
    /// which can install a new handler at runtime (for example, in response to a CtrlMsg).
    /// The ability to update the handler after the reactor is launched is needed
    /// for onion services, where the incoming stream request handler only gets installed
    /// after the virtual hop is created.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
    /// A handler for customizing the stream reactor behavior.
    inner: Arc<dyn StreamHandler>,
    /// Memory quota account for this circuit,
    /// used to create the per-stream [`StreamAccount`]s.
    memquota: CircuitAccount,
}
120

            
121
#[allow(unused)] // TODO(relay)
impl StreamReactor {
    /// Create a new [`StreamReactor`].
    ///
    /// The reactor reads incoming stream messages from `cell_rx`,
    /// and forwards outgoing stream messages to the backward reactor over `bwd_tx`.
    /// The `inner` handler customizes the client-vs-relay-specific behavior.
    #[allow(clippy::too_many_arguments)] // TODO
    pub(crate) fn new<R: Runtime>(
        runtime: R,
        hopnum: Option<HopNum>,
        hop: CircHopOutbound,
        unique_id: UniqId,
        cell_rx: mpsc::Receiver<StreamMsg>,
        bwd_tx: mpsc::Sender<ReadyStreamMsg>,
        inner: Arc<dyn StreamHandler>,
        #[cfg(any(feature = "hs-service", feature = "relay"))] //
        incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
        memquota: CircuitAccount,
    ) -> Self {
        Self {
            hopnum,
            hop,
            time_provider: DynTimeProvider::new(runtime),
            unique_id,
            #[cfg(any(feature = "hs-service", feature = "relay"))]
            incoming,
            cell_rx,
            bwd_tx,
            inner,
            memquota,
        }
    }

    /// Helper for [`run`](Self::run).
    ///
    /// Polls the stream map for messages
    /// that need to be delivered to the other endpoint,
    /// and the `cells_rx` MPSC stream for stream messages received
    /// from the `ForwardReactor` that need to be delivered to the application streams.
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
        use postage::prelude::{Sink as _, Stream as _};

        // Garbage-collect all halfstreams that have expired.
        //
        // Note: this will iterate over the closed streams of this hop.
        // If we think this will cause perf issues, one idea would be to make
        // StreamMap::closed_streams into a min-heap, and add a branch to the
        // select_biased! below to sleep until the first expiry is due
        // (but my gut feeling is that iterating is cheaper)
        self.hop
            .stream_map()
            .lock()
            .expect("poisoned lock")
            .remove_expired_halfstreams(self.time_provider.now());

        // Note: `can_send` is sampled once, outside the closure, so the poll_fn
        // below does not need to take the congestion-control lock on every poll.
        let mut streams = Arc::clone(self.hop.stream_map());
        let can_send = self
            .hop
            .ccontrol()
            .lock()
            .expect("poisoned lock")
            .can_send();
        let mut ready_streams_fut = future::poll_fn(move |cx| {
            if !can_send {
                // We can't send anything on this hop that counts towards SENDME windows.
                //
                // Note: this does not block outgoing flow-control messages:
                //
                //   * circuit SENDMEs are initiated by the forward reactor,
                //     by sending a BackwardReactorCmd::SendRelayMsg to BWD,
                //   * stream SENDMEs will be initiated by StreamTarget::send_sendme(),
                //     by sending a control message to the reactor
                //     (TODO(relay): not yet implemented)
                //   * XOFFs are sent in response to messages on streams
                //     (i.e. RELAY messages with non-zero stream IDs).
                //     These messages are delivered to us by the forward reactor
                //     inside BackwardReactorCmd::HandleMsg
                //   * XON will be initiated by StreamTarget::drain_rate_update(),
                //     by sending a control message to the reactor
                //     (TODO(relay): not yet implemented)
                return Poll::Pending;
            }

            let mut streams = streams.lock().expect("lock poisoned");
            let Some((sid, msg)) = streams.poll_ready_streams_iter(cx).next() else {
                // No ready streams
                //
                // TODO(flushing): if there are no ready Tor streams, we might want to defer
                // flushing until stream data becomes available (or until a timeout elapses).
                // The deferred flushing approach should enable us to send
                // more than one message at a time to the channel reactor.
                return Poll::Pending;
            };

            if msg.is_none() {
                // This means the local sender has been dropped,
                // which presumably can only happen if an error occurs,
                // or if the Tor stream ends. In both cases, we're going to
                // want to send an END to the client to let them know,
                // and to remove the stream from the stream map.
                //
                // TODO(relay): the local sender part is not implemented yet
                return Poll::Ready(StreamEvent::Closed {
                    sid,
                    behav: CloseStreamBehavior::default(),
                    reason: streammap::TerminateReason::StreamTargetClosed,
                });
            };

            let msg = streams.take_ready_msg(sid).expect("msg disappeared");

            Poll::Ready(StreamEvent::ReadyMsg { sid, msg })
        });

        // Note: select_biased! polls its branches in order, so messages
        // arriving from the forward reactor are handled in preference to
        // ready local streams.
        select_biased! {
            res = self.cell_rx.next().fuse() => {
                let Some(cmd) = res else {
                    // The forward reactor has shut down
                    return Err(ReactorError::Shutdown);
                };

                self.handle_reactor_cmd(cmd).await?;
            }
            event = ready_streams_fut.fuse() => {
                self.handle_stream_event(event).await?;
            }
        }

        Ok(())
    }

    /// Handle a stream message sent to us by the forward reactor.
    ///
    /// Delivers the message to its corresponding application stream.
    ///
    /// If handling the message produces a reply (such as an XOFF or an END),
    /// the reply is forwarded to the backward reactor for delivery.
    async fn handle_reactor_cmd(&mut self, msg: StreamMsg) -> StdResult<(), ReactorError> {
        let StreamMsg {
            sid,
            msg,
            cell_counts_toward_windows,
        } = msg;

        // We need to apply stream-level flow control *before* encoding the message.
        let msg = self.handle_msg(sid, msg, cell_counts_toward_windows)?;

        // TODO(DEDUP): this contains parts of Circuit::send_relay_cell_inner()
        if let Some(msg) = msg {
            // We might be out of capacity entirely; see if we are about to hit a limit.
            //
            // TODO: If we ever add a notion of _recoverable_ errors below, we'll
            // need a way to restore this limit, and similarly for about_to_send().
            self.hop.decrement_cell_limit()?;

            let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());

            // We need to apply stream-level flow control *before* encoding the message
            // (the BWD handles the encoding)
            if c_t_w {
                if let Some(stream_id) = msg.stream_id() {
                    self.hop
                        .about_to_send(self.unique_id, stream_id, msg.msg())?;
                }
            }

            // NOTE: on the client side, we call note_data_sent()
            // just before writing the cell to the channel.
            // We can't do that here, because we're not the ones
            // encoding the cell, so we don't have the SENDME tag
            // which is needed for note_data_sent().
            //
            // Instead, we notify the CC algorithm in the BWD,
            // right after we've finished sending the cell.

            self.send_msg_to_bwd(msg).await?;
        }

        Ok(())
    }

    /// Handle a RELAY message that has a non-zero stream ID.
    ///
    /// Returns a reply message that needs to be sent back to the other endpoint,
    /// if any (for example an XOFF, or an END rejecting an incoming stream request).
    ///
    // TODO(relay): this is very similar to the client impl from
    // Circuit::handle_in_order_relay_msg()
    fn handle_msg(
        &mut self,
        streamid: StreamId,
        msg: UnparsedRelayMsg,
        cell_counts_toward_windows: bool,
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
        let cmd = msg.cmd();
        let possible_proto_violation_err = move |streamid: StreamId| {
            Error::StreamProto(format!(
                "Unexpected {cmd:?} message on unknown stream {streamid}"
            ))
        };
        let now = self.time_provider.now();

        // Check if any of our already-open streams want this message
        let res = self.hop.handle_msg(
            possible_proto_violation_err,
            cell_counts_toward_windows,
            streamid,
            msg,
            now,
        )?;

        // If it was an incoming stream request, we don't need to worry about
        // sending an XOFF as there's no stream data within this message.
        if let Some(msg) = res {
            cfg_if::cfg_if! {
                if #[cfg(any(feature = "hs-service", feature = "relay"))] {
                    return self.handle_incoming_stream_request(streamid, msg);
                } else {
                    return Err(
                        tor_error::internal!(
                            "incoming stream not rejected, but relay and hs-service features are disabled?!"
                            ).into()
                    );
                }
            }
        }

        // We may want to send an XOFF if the incoming buffer is too large.
        if let Some(cell) = self.hop.maybe_send_xoff(streamid)? {
            let cell = AnyRelayMsgOuter::new(Some(streamid), cell.into());
            return Ok(Some(cell));
        }

        Ok(None)
    }

    /// A helper for handling incoming stream requests.
    ///
    /// Accepts the specified incoming stream request,
    /// by adding a new entry to our stream map.
    ///
    /// Returns the cell we need to send back to the client,
    /// if an error occurred and the stream cannot be opened.
    ///
    /// Returns None if everything went well
    /// (the CONNECTED response only comes if the external
    /// consumer of our [Stream](futures::Stream) of incoming Tor streams
    /// is able to actually establish the connection to the address
    /// specified in the BEGIN).
    ///
    /// Any error returned from this function will shut down the reactor.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    fn handle_incoming_stream_request(
        &mut self,
        sid: StreamId,
        msg: UnparsedRelayMsg,
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
        let mut lock = self.incoming.lock().expect("poisoned lock");
        let Some(handler) = lock.as_mut() else {
            return Err(
                Error::CircProto("Cannot handle BEGIN cells on this circuit".into()).into(),
            );
        };

        // Incoming stream requests are only valid from the hop the handler
        // was installed for; anything else is a protocol violation.
        if self.hopnum != handler.hop_num {
            let expected_hopnum = match handler.hop_num {
                Some(hopnum) => hopnum.display().to_string(),
                None => "client".to_string(),
            };

            let actual_hopnum = match self.hopnum {
                Some(hopnum) => hopnum.display().to_string(),
                None => "None".to_string(),
            };

            return Err(Error::CircProto(format!(
                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
                expected_hopnum,
                msg.cmd(),
                actual_hopnum,
            ))
            .into());
        }

        let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;

        if message_closes_stream {
            self.hop
                .stream_map()
                .lock()
                .expect("poisoned lock")
                .ending_msg_received(sid)?;

            return Ok(None);
        }

        let req = parse_incoming_stream_req(msg)?;
        let view = CircHopSyncView::new(&self.hop);

        if let Some(reject) = Self::should_reject_incoming(handler, sid, &req, &view)? {
            // We can't honor this request, so we bail by sending an END.
            return Ok(Some(reject));
        };

        // Set up the per-stream memory accounting and the queues/channels
        // that connect the new stream to its consumer.
        let memquota =
            StreamAccount::new(&self.memquota).map_err(|e| ReactorError::Err(e.into()))?;

        let (sender, receiver) = stream_queue(
            #[cfg(not(feature = "flowctl-cc"))]
            crate::stream::STREAM_READER_BUFFER,
            &memquota,
            &self.time_provider,
        )
        .map_err(|e| ReactorError::Err(e.into()))?;

        let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE)
            .new_mq(self.time_provider.clone(), memquota.as_raw_account())
            .map_err(|e| ReactorError::Err(e.into()))?;

        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);

        // A channel for the reactor to request a new drain rate from the reader.
        // Typically this notification will be sent after an XOFF is sent so that the reader can
        // send us a new drain rate when the stream data queue becomes empty.
        let mut drain_rate_request_tx = NotifySender::new_typed();
        let drain_rate_request_rx = drain_rate_request_tx.subscribe();

        let cmd_checker = InboundDataCmdChecker::new_connected();
        self.hop.add_ent_with_id(
            sender,
            msg_rx,
            rate_limit_tx,
            drain_rate_request_tx,
            sid,
            cmd_checker,
        )?;

        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
            req,
            stream_id: sid,
            hop: None,
            msg_tx,
            receiver,
            rate_limit_stream: rate_limit_rx,
            drain_rate_request_stream: drain_rate_request_rx,
            memquota,
            relay_cell_format: self.hop.relay_cell_format(),
        });

        log_ratelim!("Delivering message to incoming stream handler"; outcome);

        if let Err(e) = outcome {
            if e.is_full() {
                // The IncomingStreamRequestHandler's stream is full; it isn't
                // handling requests fast enough. So instead, we reply with an
                // END cell.
                let end_msg = AnyRelayMsgOuter::new(
                    Some(sid),
                    End::new_with_reason(EndReason::RESOURCELIMIT).into(),
                );

                return Ok(Some(end_msg));
            } else if e.is_disconnected() {
                // The IncomingStreamRequestHandler's stream has been dropped.
                // In the Tor protocol as it stands, this always means that the
                // circuit itself is out-of-use and should be closed.
                //
                // Note that we will _not_ reach this point immediately after
                // the IncomingStreamRequestHandler is dropped; we won't hit it
                // until we next get an incoming request.  Thus, if we later
                // want to add early detection for a dropped
                // IncomingStreamRequestHandler, we need to do it elsewhere, in
                // a different way.
                debug!(
                    circ_id = %self.unique_id,
                    "Incoming stream request receiver dropped",
                );
                // This will _cause_ the circuit to get closed.
                return Err(ReactorError::Err(Error::CircuitClosed));
            } else {
                // There are no errors like this with the current design of
                // futures::mpsc, but we shouldn't just ignore the possibility
                // that they'll be added later.
                return Err(
                    Error::from((into_internal!("try_send failed unexpectedly"))(e)).into(),
                );
            }
        }

        Ok(None)
    }

    /// Check if we should reject this incoming stream request or not.
    ///
    /// Returns a cell we need to send back to the client if we must reject the request,
    /// or `None` if we are allowed to accept it.
    ///
    /// Any error returned from this function will shut down the reactor.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    fn should_reject_incoming<'a>(
        handler: &mut IncomingStreamRequestHandler,
        sid: StreamId,
        request: &IncomingStreamRequest,
        view: &CircHopSyncView<'a>,
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
        use IncomingStreamRequestDisposition::*;

        let ctx = IncomingStreamRequestContext { request };

        // Run the externally provided filter to check if we should
        // open the stream or not.
        match handler.filter.as_mut().disposition(&ctx, view)? {
            Accept => {
                // All is well, we can accept the stream request
                Ok(None)
            }
            CloseCircuit => Err(ReactorError::Shutdown),
            RejectRequest(end) => {
                let end_msg = AnyRelayMsgOuter::new(Some(sid), end.into());

                Ok(Some(end_msg))
            }
        }
    }

    /// Handle a [`StreamEvent`].
    ///
    /// For `Closed` events, removes the stream from the stream map
    /// (keeping a half-stream around until `expire_at`);
    /// for `ReadyMsg` events, forwards the message to the backward reactor.
    async fn handle_stream_event(&mut self, event: StreamEvent) -> StdResult<(), ReactorError> {
        match event {
            StreamEvent::Closed { sid, behav, reason } => {
                let timeout = self.inner.halfstream_expiry(&self.hop);
                let expire_at = self.time_provider.now() + timeout;
                let res =
                    self.hop
                        .close_stream(self.unique_id, sid, None, behav, reason, expire_at)?;
                let Some(msg) = res else {
                    // We may not need to send anything at all...
                    return Ok(());
                };

                self.send_msg_to_bwd(msg.cell).await
            }
            StreamEvent::ReadyMsg { sid, msg } => {
                self.send_msg_to_bwd(AnyRelayMsgOuter::new(Some(sid), msg))
                    .await
            }
        }
    }

    /// Wrap `msg` in [`ReadyStreamMsg`], and send it to the backward reactor.
    ///
    /// Returns [`ReactorError::Shutdown`] if the backward reactor
    /// has closed its receiving end of the channel.
    async fn send_msg_to_bwd(&mut self, msg: AnyRelayMsgOuter) -> StdResult<(), ReactorError> {
        let msg = ReadyStreamMsg {
            hop: self.hopnum,
            relay_cell_format: self.hop.relay_cell_format(),
            ccontrol: Arc::clone(self.hop.ccontrol()),
            msg,
        };

        self.bwd_tx
            .send(msg)
            .await
            .map_err(|_| ReactorError::Shutdown)?;

        Ok(())
    }
}
577

            
578
/// A Tor stream-related event.
enum StreamEvent {
    /// A stream was closed.
    ///
    /// It needs to be removed from the reactor's stream map.
    Closed {
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
    },
    /// A stream has a ready message.
    ReadyMsg {
        /// The ID of the stream that produced the message.
        sid: StreamId,
        /// The message, ready to be sent to the other endpoint.
        msg: AnyRelayMsg,
    },
}
599

            
600
/// Convert an incoming stream request message (BEGIN, BEGIN_DIR, RESOLVE, etc.)
/// to an [`IncomingStreamRequest`].
#[cfg(any(feature = "hs-service", feature = "relay"))]
fn parse_incoming_stream_req(msg: UnparsedRelayMsg) -> crate::Result<IncomingStreamRequest> {
    // TODO(relay): support other stream-initiating messages, not just BEGIN
    let decoded = match msg.decode::<Begin>() {
        Ok(decoded) => decoded,
        Err(e) => return Err(Error::from_bytes_err(e, "Invalid Begin message")),
    };

    Ok(IncomingStreamRequest::Begin(decoded.into_msg()))
}
612

            
613
/// A stream message to be sent to the backward reactor for delivery.
pub(crate) struct ReadyStreamMsg {
    /// The hop number the message should be sent to, or `None` if we are a relay.
    pub(crate) hop: Option<HopNum>,
    /// The message to send.
    pub(crate) msg: AnyRelayMsgOuter,
    /// The cell format used with the hop the message should be sent to.
    pub(crate) relay_cell_format: RelayCellFormat,
    /// The congestion-control state of the hop the message is sent to.
    pub(crate) ccontrol: Arc<Mutex<CongestionControl>>,
}
624

            
625
/// Stream data received from the other endpoint
/// that needs to be handled by [`StreamReactor`].
pub(crate) struct StreamMsg {
    /// The ID of the stream this message is for.
    pub(crate) sid: StreamId,
    /// The (not-yet-parsed) message.
    pub(crate) msg: UnparsedRelayMsg,
    /// Whether the cell this message came from counts towards flow-control windows.
    pub(crate) cell_counts_toward_windows: bool,
}