1
//! The stream reactor.
2

            
3
use crate::circuit::circhop::CircHopOutbound;
4
use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
5
use crate::circuit::{CircHopSyncView, UniqId};
6
use crate::congestion::{CongestionControl, sendme};
7
use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
8
use crate::stream::CloseStreamBehavior;
9
use crate::stream::cmdcheck::StreamStatus;
10
use crate::stream::flow_ctrl::state::StreamRateLimit;
11
use crate::stream::queue::stream_queue;
12
use crate::streammap;
13
use crate::util::err::ReactorError;
14
use crate::util::notify::NotifySender;
15
use crate::{Error, HopNum};
16

            
17
#[cfg(any(feature = "hs-service", feature = "relay"))]
18
use crate::stream::incoming::{
19
    InboundDataCmdChecker, IncomingStreamRequest, IncomingStreamRequestContext,
20
    IncomingStreamRequestDisposition, IncomingStreamRequestHandler, StreamReqInfo,
21
};
22

            
23
use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
24
use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, End, EndReason};
25
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
26
use tor_error::into_internal;
27
use tor_log_ratelim::log_ratelim;
28
use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
29
use tor_rtcompat::{DynTimeProvider, Runtime, SleepProvider as _};
30

            
31
use derive_deftly::Deftly;
32
use futures::SinkExt;
33
use futures::channel::mpsc;
34
use futures::{FutureExt as _, StreamExt as _, future, select_biased};
35
use postage::watch;
36
use tracing::debug;
37

            
38
use std::pin::Pin;
39
use std::result::Result as StdResult;
40
use std::sync::{Arc, Mutex};
41
use std::task::Poll;
42
use std::time::Duration;
43

            
44
/// Size of the buffer for communication between a StreamTarget and the reactor.
///
/// Also used as the depth of the per-stream message queue created for each
/// accepted incoming stream request.
//
// TODO(tuning): figure out if this is a good size for this buffer
const CIRCUIT_BUFFER_SIZE: usize = 128;
48

            
49
/// Trait for customizing the behavior of the stream reactor.
///
/// Used for plugging in the implementation-dependent (client vs relay)
/// parts of the implementation into the generic one.
pub(crate) trait StreamHandler: Send + Sync + 'static {
    /// Return the amount of time a newly closed stream
    /// should be kept in the stream map for.
    ///
    /// This is the amount of time we are willing to wait for
    /// an END ack before removing the half-stream from the map.
    ///
    /// Called by the reactor when a stream is closed, to compute the
    /// half-stream's expiry deadline.
    fn halfstream_expiry(&self, hop: &CircHopOutbound) -> Duration;
}
61

            
62
/// The stream reactor for a given hop.
///
/// Drives the application streams.
///
/// This reactor accepts [`StreamMsg`]s from the forward reactor over its [`Self::cell_rx`]
/// MPSC channel, and delivers them to the corresponding stream entries in the stream map.
///
/// The local streams are polled from the main loop, and any ready messages are sent
/// to the backward reactor over the `bwd_tx` MPSC channel for packaging and delivery.
///
/// Shuts down if an error occurs, or if the sending end
/// of the `cell_rx` MPSC channel, i.e. the forward reactor, closes.
#[derive(Deftly)]
#[derive_deftly(CircuitReactor)]
#[deftly(reactor_name = "stream reactor")]
#[deftly(run_inner_fn = "Self::run_once")]
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub(crate) struct StreamReactor {
    /// The hop this stream reactor is for.
    ///
    /// This is `None` for relays.
    hopnum: Option<HopNum>,
    /// The state of this circuit hop.
    hop: CircHopOutbound,
    /// The time provider.
    time_provider: DynTimeProvider,
    /// An identifier for logging about this reactor's circuit.
    unique_id: UniqId,
    /// Receiver for Tor stream data that need to be delivered to a Tor stream.
    ///
    /// The sender is in [`ForwardReactor`](super::ForwardReactor), which will forward all cells
    /// carrying Tor stream data to us.
    ///
    /// This serves a dual purpose:
    ///
    ///   * it enables the `ForwardReactor` to deliver Tor stream data received from the client
    ///   * it lets the `StreamReactor` know if the `ForwardReactor` has shut down:
    ///     we select! on this MPSC channel in the main loop, so if the `ForwardReactor`
    ///     shuts down, we will get EOS upon calling `.next()`
    cell_rx: mpsc::Receiver<StreamMsg>,
    /// Sender for sending Tor stream data to [`BackwardReactor`](super::BackwardReactor).
    bwd_tx: mpsc::Sender<ReadyStreamMsg>,
    /// A handler for incoming streams.
    ///
    /// Set to `None` if incoming streams are not allowed on this circuit.
    ///
    /// This handler is shared with the [`HopMgr`](super::hop_mgr::HopMgr) of this reactor,
    /// which can install a new handler at runtime (for example, in response to a CtrlMsg).
    /// The ability to update the handler after the reactor is launched is needed
    /// for onion services, where the incoming stream request handler only gets installed
    /// after the virtual hop is created.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
    /// A handler for customizing the stream reactor behavior.
    inner: Arc<dyn StreamHandler>,
    /// Memory quota account
    memquota: CircuitAccount,
}
120

            
121
#[allow(unused)] // TODO(relay)
122
impl StreamReactor {
123
    /// Create a new [`StreamReactor`].
124
    #[allow(clippy::too_many_arguments)] // TODO
125
    pub(crate) fn new<R: Runtime>(
126
        runtime: R,
127
        hopnum: Option<HopNum>,
128
        hop: CircHopOutbound,
129
        unique_id: UniqId,
130
        cell_rx: mpsc::Receiver<StreamMsg>,
131
        bwd_tx: mpsc::Sender<ReadyStreamMsg>,
132
        inner: Arc<dyn StreamHandler>,
133
        #[cfg(any(feature = "hs-service", feature = "relay"))] //
134
        incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
135
        memquota: CircuitAccount,
136
    ) -> Self {
137
        Self {
138
            hopnum,
139
            hop,
140
            time_provider: DynTimeProvider::new(runtime),
141
            unique_id,
142
            #[cfg(any(feature = "hs-service", feature = "relay"))]
143
            incoming,
144
            cell_rx,
145
            bwd_tx,
146
            inner,
147
            memquota,
148
        }
149
    }
150

            
151
    /// Helper for [`run`](Self::run).
    ///
    /// Polls the stream map for messages
    /// that need to be delivered to the other endpoint,
    /// and the `cells_rx` MPSC stream for stream messages received
    /// from the `ForwardReactor` that need to be delivered to the application streams.
    ///
    /// Returns `Err(ReactorError::Shutdown)` if the forward reactor's side of
    /// `cell_rx` has closed; any other error also shuts the reactor down.
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
        use postage::prelude::{Sink as _, Stream as _};

        // Garbage-collect all halfstreams that have expired.
        //
        // Note: this will iterate over the closed streams of this hop.
        // If we think this will cause perf issues, one idea would be to make
        // StreamMap::closed_streams into a min-heap, and add a branch to the
        // select_biased! below to sleep until the first expiry is due
        // (but my gut feeling is that iterating is cheaper)
        self.hop
            .stream_map()
            .lock()
            .expect("poisoned lock")
            .remove_expired_halfstreams(self.time_provider.now());

        // Clone the shared stream map handle so it can be moved into the poll_fn below.
        let mut streams = Arc::clone(self.hop.stream_map());
        // Snapshot the congestion-control send permission before polling the streams.
        let can_send = self
            .hop
            .ccontrol()
            .lock()
            .expect("poisoned lock")
            .can_send();
        let mut ready_streams_fut = future::poll_fn(move |cx| {
            if !can_send {
                // We can't send anything on this hop that counts towards SENDME windows.
                //
                // Note: this does not block outgoing flow-control messages:
                //
                //   * circuit SENDMEs are initiated by the forward reactor,
                //     by sending a BackwardReactorCmd::SendRelayMsg to BWD,
                //   * stream SENDMEs will be initiated by StreamTarget::send_sendme(),
                //     by sending a control message to the reactor
                //     (TODO(relay): not yet implemented)
                //   * XOFFs are sent in response to messages on streams
                //     (i.e. RELAY messages with non-zero stream IDs).
                //     These messages are delivered to us by the forward reactor
                //     inside BackwardReactorCmd::HandleMsg
                //   * XON will be initiated by StreamTarget::drain_rate_update(),
                //     by sending a control message to the reactor
                //     (TODO(relay): not yet implemented)
                return Poll::Pending;
            }

            let mut streams = streams.lock().expect("lock poisoned");
            let Some((sid, msg)) = streams.poll_ready_streams_iter(cx).next() else {
                // No ready streams
                //
                // TODO(flushing): if there are no ready Tor streams, we might want to defer
                // flushing until stream data becomes available (or until a timeout elapses).
                // The deferred flushing approach should enable us to send
                // more than one message at a time to the channel reactor.
                return Poll::Pending;
            };

            if msg.is_none() {
                // This means the local sender has been dropped,
                // which presumably can only happen if an error occurs,
                // or if the Tor stream ends. In both cases, we're going to
                // want to send an END to the client to let them know,
                // and to remove the stream from the stream map.
                //
                // TODO(relay): the local sender part is not implemented yet
                return Poll::Ready(StreamEvent::Closed {
                    sid,
                    behav: CloseStreamBehavior::default(),
                    reason: streammap::TerminateReason::StreamTargetClosed,
                });
            };

            let msg = streams.take_ready_msg(sid).expect("msg disappeared");

            Poll::Ready(StreamEvent::ReadyMsg { sid, msg })
        });

        // Biased select: incoming cells from the forward reactor take priority
        // over flushing locally ready stream messages.
        select_biased! {
            res = self.cell_rx.next().fuse() => {
                let Some(cmd) = res else {
                    // The forward reactor has shut down
                    return Err(ReactorError::Shutdown);
                };

                self.handle_reactor_cmd(cmd).await?;
            }
            event = ready_streams_fut.fuse() => {
                self.handle_stream_event(event).await?;
            }
        }

        Ok(())
    }
248

            
249
    /// Handle a stream message sent to us by the forward reactor.
250
    ///
251
    /// Delivers the message to its corresponding application stream.
252
    async fn handle_reactor_cmd(&mut self, msg: StreamMsg) -> StdResult<(), ReactorError> {
253
        let StreamMsg {
254
            sid,
255
            msg,
256
            cell_counts_toward_windows,
257
        } = msg;
258

            
259
        // We need to apply stream-level flow control *before* encoding the message.
260
        // May optionally return a message that needs to be sent back to the client.
261
        let bwd_msg = self.handle_msg(sid, msg, cell_counts_toward_windows)?;
262

            
263
        // TODO(DEDUP): this contains parts of Circuit::send_relay_cell_inner()
264
        if let Some(bwd_msg) = bwd_msg {
265
            // We might be out of capacity entirely; see if we are about to hit a limit.
266
            //
267
            // TODO: If we ever add a notion of _recoverable_ errors below, we'll
268
            // need a way to restore this limit, and similarly for about_to_send().
269
            self.hop.decrement_cell_limit()?;
270

            
271
            let c_t_w = sendme::cmd_counts_towards_windows(bwd_msg.cmd());
272

            
273
            // We need to apply stream-level flow control *before* encoding the message
274
            // (the BWD handles the encoding)
275
            if c_t_w {
276
                if let Some(stream_id) = bwd_msg.stream_id() {
277
                    self.hop
278
                        .about_to_send(self.unique_id, stream_id, bwd_msg.msg())?;
279
                }
280
            }
281

            
282
            // NOTE: on the client side, we call note_data_sent()
283
            // just before writing the cell to the channel.
284
            // We can't do that here, because we're not the ones
285
            // encoding the cell, so we don't have the SENDME tag
286
            // which is needed for note_data_sent().
287
            //
288
            // Instead, we notify the CC algorithm in the BWD,
289
            // right after we've finished sending the cell.
290

            
291
            self.send_msg_to_bwd(bwd_msg).await?;
292
        }
293

            
294
        Ok(())
295
    }
296

            
297
    /// Handle a RELAY message that has a non-zero stream ID.
    ///
    /// `cell_counts_toward_windows` says whether the cell this message arrived in
    /// counts towards our flow-control windows.
    ///
    /// A returned message is one that we need to send back to the client.
    //
    // TODO(relay): this is very similar to the client impl from
    // Circuit::handle_in_order_relay_msg()
    fn handle_msg(
        &mut self,
        streamid: StreamId,
        msg: UnparsedRelayMsg,
        cell_counts_toward_windows: bool,
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
        let cmd = msg.cmd();
        // Error to report if the message turns out to target a stream we don't know.
        let possible_proto_violation_err = move |streamid: StreamId| {
            Error::StreamProto(format!(
                "Unexpected {cmd:?} message on unknown stream {streamid}"
            ))
        };
        let now = self.time_provider.now();

        // Check if any of our already-open streams want this message
        let res = self.hop.handle_msg(
            possible_proto_violation_err,
            cell_counts_toward_windows,
            streamid,
            msg,
            now,
        )?;

        // If it was an incoming stream request, we don't need to worry about
        // sending an XOFF as there's no stream data within this message.
        if let Some(msg) = res {
            cfg_if::cfg_if! {
                if #[cfg(any(feature = "hs-service", feature = "relay"))] {
                    return self.handle_incoming_stream_request(streamid, msg);
                } else {
                    return Err(
                        tor_error::internal!(
                            "incoming stream not rejected, but relay and hs-service features are disabled?!"
                            ).into()
                    );
                }
            }
        }

        // We may want to send an XOFF if the incoming buffer is too large.
        if let Some(cell) = self.hop.maybe_send_xoff(streamid)? {
            let cell = AnyRelayMsgOuter::new(Some(streamid), cell.into());
            return Ok(Some(cell));
        }

        Ok(None)
    }
350

            
351
    /// A helper for handling incoming stream requests.
    ///
    /// Accepts the specified incoming stream request,
    /// by adding a new entry to our stream map.
    ///
    /// Returns the cell we need to send back to the client,
    /// if an error occurred and the stream cannot be opened.
    ///
    /// Returns None if everything went well
    /// (the CONNECTED response only comes if the external
    /// consumer of our [Stream](futures::Stream) of incoming Tor streams
    /// is able to actually establish the connection to the address
    /// specified in the BEGIN).
    ///
    /// Any error returned from this function will shut down the reactor.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    fn handle_incoming_stream_request(
        &mut self,
        sid: StreamId,
        msg: UnparsedRelayMsg,
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
        // An installed handler is a precondition for accepting incoming streams.
        let mut lock = self.incoming.lock().expect("poisoned lock");
        let Some(handler) = lock.as_mut() else {
            return Err(
                Error::CircProto("Cannot handle BEGIN cells on this circuit".into()).into(),
            );
        };

        // The request must come from the hop the handler was installed for.
        if self.hopnum != handler.hop_num {
            let expected_hopnum = match handler.hop_num {
                Some(hopnum) => hopnum.display().to_string(),
                None => "client".to_string(),
            };

            let actual_hopnum = match self.hopnum {
                Some(hopnum) => hopnum.display().to_string(),
                None => "None".to_string(),
            };

            return Err(Error::CircProto(format!(
                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
                expected_hopnum,
                msg.cmd(),
                actual_hopnum,
            ))
            .into());
        }

        let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;

        if message_closes_stream {
            // The message ends the stream before it was even accepted;
            // just record the ending message and don't reply.
            self.hop
                .stream_map()
                .lock()
                .expect("poisoned lock")
                .ending_msg_received(sid)?;

            return Ok(None);
        }

        let req = parse_incoming_stream_req(msg)?;
        let view = CircHopSyncView::new(&self.hop);

        if let Some(reject) = Self::should_reject_incoming(handler, sid, &req, &view)? {
            // We can't honor this request, so we bail by sending an END.
            return Ok(Some(reject));
        };

        // Per-stream memory accounting, charged to this circuit's account.
        let memquota =
            StreamAccount::new(&self.memquota).map_err(|e| ReactorError::Err(e.into()))?;

        // Queue for delivering this stream's data to the application side.
        let (sender, receiver) = stream_queue(
            #[cfg(not(feature = "flowctl-cc"))]
            crate::stream::STREAM_READER_BUFFER,
            &memquota,
            &self.time_provider,
        )
        .map_err(|e| ReactorError::Err(e.into()))?;

        // Queue for messages the application wants to send on this stream.
        let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE)
            .new_mq(self.time_provider.clone(), memquota.as_raw_account())
            .map_err(|e| ReactorError::Err(e.into()))?;

        // Rate limit starts unbounded; flow control may lower it later.
        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);

        // A channel for the reactor to request a new drain rate from the reader.
        // Typically this notification will be sent after an XOFF is sent so that the reader can
        // send us a new drain rate when the stream data queue becomes empty.
        let mut drain_rate_request_tx = NotifySender::new_typed();
        let drain_rate_request_rx = drain_rate_request_tx.subscribe();

        let cmd_checker = InboundDataCmdChecker::new_connected();
        self.hop.add_ent_with_id(
            sender,
            msg_rx,
            rate_limit_tx,
            drain_rate_request_tx,
            sid,
            cmd_checker,
        )?;

        // Hand the request off to the externally provided handler;
        // try_send so a slow consumer cannot block the reactor.
        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
            req,
            stream_id: sid,
            hop: None,
            msg_tx,
            receiver,
            rate_limit_stream: rate_limit_rx,
            drain_rate_request_stream: drain_rate_request_rx,
            memquota,
            relay_cell_format: self.hop.relay_cell_format(),
        });

        log_ratelim!("Delivering message to incoming stream handler"; outcome);

        if let Err(e) = outcome {
            if e.is_full() {
                // The IncomingStreamRequestHandler's stream is full; it isn't
                // handling requests fast enough. So instead, we reply with an
                // END cell.
                let end_msg = AnyRelayMsgOuter::new(
                    Some(sid),
                    End::new_with_reason(EndReason::RESOURCELIMIT).into(),
                );

                return Ok(Some(end_msg));
            } else if e.is_disconnected() {
                // The IncomingStreamRequestHandler's stream has been dropped.
                // In the Tor protocol as it stands, this always means that the
                // circuit itself is out-of-use and should be closed.
                //
                // Note that we will _not_ reach this point immediately after
                // the IncomingStreamRequestHandler is dropped; we won't hit it
                // until we next get an incoming request.  Thus, if we later
                // want to add early detection for a dropped
                // IncomingStreamRequestHandler, we need to do it elsewhere, in
                // a different way.
                debug!(
                    circ_id = %self.unique_id,
                    "Incoming stream request receiver dropped",
                );
                // This will _cause_ the circuit to get closed.
                return Err(ReactorError::Err(Error::CircuitClosed));
            } else {
                // There are no errors like this with the current design of
                // futures::mpsc, but we shouldn't just ignore the possibility
                // that they'll be added later.
                return Err(
                    Error::from((into_internal!("try_send failed unexpectedly"))(e)).into(),
                );
            }
        }

        Ok(None)
    }
506

            
507
    /// Check if we should reject this incoming stream request or not.
    ///
    /// Returns a cell we need to send back to the client if we must reject the request,
    /// or `None` if we are allowed to accept it.
    ///
    /// Any error returned from this function will shut down the reactor.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    fn should_reject_incoming<'a>(
        handler: &mut IncomingStreamRequestHandler,
        sid: StreamId,
        request: &IncomingStreamRequest,
        view: &CircHopSyncView<'a>,
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
        let ctx = IncomingStreamRequestContext { request };

        // Run the externally provided filter to check if we should
        // open the stream or not.
        let disposition = handler.filter.as_mut().disposition(&ctx, view)?;

        match disposition {
            // All is well; the caller may accept the stream request.
            IncomingStreamRequestDisposition::Accept => Ok(None),
            // The filter wants the whole circuit torn down.
            IncomingStreamRequestDisposition::CloseCircuit => Err(ReactorError::Shutdown),
            // Reject just this stream, by sending the filter-provided END
            // back to the client.
            IncomingStreamRequestDisposition::RejectRequest(end) => {
                Ok(Some(AnyRelayMsgOuter::new(Some(sid), end.into())))
            }
        }
    }
539

            
540
    /// Handle a [`StreamEvent`].
541
    async fn handle_stream_event(&mut self, event: StreamEvent) -> StdResult<(), ReactorError> {
542
        match event {
543
            StreamEvent::Closed { sid, behav, reason } => {
544
                let timeout = self.inner.halfstream_expiry(&self.hop);
545
                let expire_at = self.time_provider.now() + timeout;
546
                let res =
547
                    self.hop
548
                        .close_stream(self.unique_id, sid, None, behav, reason, expire_at)?;
549
                let Some(msg) = res else {
550
                    // We may not need to send anything at all...
551
                    return Ok(());
552
                };
553

            
554
                self.send_msg_to_bwd(msg.cell).await
555
            }
556
            StreamEvent::ReadyMsg { sid, msg } => {
557
                self.send_msg_to_bwd(AnyRelayMsgOuter::new(Some(sid), msg))
558
                    .await
559
            }
560
        }
561
    }
562

            
563
    /// Wrap `msg` in [`ReadyStreamMsg`], and send it to the backward reactor.
564
    async fn send_msg_to_bwd(&mut self, msg: AnyRelayMsgOuter) -> StdResult<(), ReactorError> {
565
        let msg = ReadyStreamMsg {
566
            hop: self.hopnum,
567
            relay_cell_format: self.hop.relay_cell_format(),
568
            ccontrol: Arc::clone(self.hop.ccontrol()),
569
            msg,
570
        };
571

            
572
        self.bwd_tx
573
            .send(msg)
574
            .await
575
            .map_err(|_| ReactorError::Shutdown)?;
576

            
577
        Ok(())
578
    }
579
}
580

            
581
/// A Tor stream-related event.
enum StreamEvent {
    /// A stream was closed.
    ///
    /// It needs to be removed from the reactor's stream map.
    Closed {
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
    },
    /// A stream has a ready message.
    ReadyMsg {
        /// The ID of the stream the message was read from.
        sid: StreamId,
        /// The message.
        msg: AnyRelayMsg,
    },
}
602

            
603
/// Convert an incoming stream request message (BEGIN, BEGIN_DIR, RESOLVE, etc.)
/// to an [`IncomingStreamRequest`]
#[cfg(any(feature = "hs-service", feature = "relay"))]
fn parse_incoming_stream_req(msg: UnparsedRelayMsg) -> crate::Result<IncomingStreamRequest> {
    // TODO(relay): support other stream-initiating messages, not just BEGIN
    match msg.decode::<Begin>() {
        Ok(decoded) => Ok(IncomingStreamRequest::Begin(decoded.into_msg())),
        Err(e) => Err(Error::from_bytes_err(e, "Invalid Begin message")),
    }
}
615

            
616
/// A stream message to be sent to the backward reactor for delivery.
pub(crate) struct ReadyStreamMsg {
    /// The hop number, or `None` if we are a relay.
    pub(crate) hop: Option<HopNum>,
    /// The message to send.
    pub(crate) msg: AnyRelayMsgOuter,
    /// The cell format used with the hop the message should be sent to.
    pub(crate) relay_cell_format: RelayCellFormat,
    /// The CC object to use.
    ///
    /// Shared with the hop of the [`StreamReactor`] that produced this message.
    pub(crate) ccontrol: Arc<Mutex<CongestionControl>>,
}
627

            
628
/// Stream data received from the other endpoint
/// that needs to be handled by [`StreamReactor`].
///
/// Sent by the forward reactor over the reactor's `cell_rx` channel.
pub(crate) struct StreamMsg {
    /// The ID of the stream this message is for.
    pub(crate) sid: StreamId,
    /// The message.
    pub(crate) msg: UnparsedRelayMsg,
    /// Whether the cell this message came from counts towards flow-control windows.
    pub(crate) cell_counts_toward_windows: bool,
}
}