1
//! The stream reactor.
2

            
3
use crate::circuit::circhop::CircHopOutbound;
4
use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
5
use crate::circuit::{CircHopSyncView, UniqId};
6
use crate::congestion::{CongestionControl, sendme};
7
use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
8
use crate::stream::CloseStreamBehavior;
9
use crate::stream::cmdcheck::StreamStatus;
10
use crate::stream::flow_ctrl::state::StreamRateLimit;
11
use crate::stream::queue::stream_queue;
12
use crate::streammap;
13
use crate::util::err::ReactorError;
14
use crate::util::notify::NotifySender;
15
use crate::util::timeout::TimeoutEstimator;
16
use crate::{Error, HopNum};
17

            
18
#[cfg(any(feature = "hs-service", feature = "relay"))]
19
use crate::stream::incoming::{
20
    InboundDataCmdChecker, IncomingStreamRequest, IncomingStreamRequestContext,
21
    IncomingStreamRequestDisposition, IncomingStreamRequestHandler, StreamReqInfo,
22
};
23

            
24
use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
25
use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, End, EndReason};
26
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
27
use tor_error::into_internal;
28
use tor_log_ratelim::log_ratelim;
29
use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
30
use tor_rtcompat::{DynTimeProvider, Runtime, SleepProvider as _};
31

            
32
use derive_deftly::Deftly;
33
use futures::SinkExt;
34
use futures::channel::mpsc;
35
use futures::{FutureExt as _, StreamExt as _, future, select_biased};
36
use postage::watch;
37
use tracing::debug;
38

            
39
use std::pin::Pin;
40
use std::result::Result as StdResult;
41
use std::sync::{Arc, Mutex};
42
use std::task::Poll;
43
use std::time::Duration;
44

            
45
/// Size of the buffer for communication between a StreamTarget and the reactor.
///
// TODO(tuning): figure out if this is a good size for this buffer
const CIRCUIT_BUFFER_SIZE: usize = 128;
49

            
50
/// The stream reactor for a given hop.
51
///
52
/// Drives the application streams.
53
///
54
/// This reactor accepts [`StreamMsg`]s from the forward reactor over its [`Self::cell_rx`]
55
/// MPSC channel, and delivers them to the corresponding stream entries in the stream map.
56
///
57
/// The local streams are polled from the main loop, and any ready messages are sent
58
/// to the backward reactor over the `bwd_tx` MPSC channel for packaging and delivery.
59
///
60
/// Shuts downs down if an error occurs, or if the sending end
61
/// of the `cell_rx` MPSC channel, i.e. the forward reactor, closes.
62
#[derive(Deftly)]
63
#[derive_deftly(CircuitReactor)]
64
#[deftly(reactor_name = "stream reactor")]
65
#[deftly(run_inner_fn = "Self::run_once")]
66
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
67
pub(crate) struct StreamReactor {
68
    /// The hop this stream reactor is for.
69
    ///
70
    /// This is `None` for relays.
71
    hopnum: Option<HopNum>,
72
    /// The state of this circuit hop.
73
    hop: CircHopOutbound,
74
    /// The time provider.
75
    time_provider: DynTimeProvider,
76
    /// An identifier for logging about this reactor's circuit.
77
    unique_id: UniqId,
78
    /// Receiver for Tor stream data that need to be delivered to a Tor stream.
79
    ///
80
    /// The sender is in [`ForwardReactor`](super::ForwardReactor), which will forward all cells
81
    /// carrying Tor stream data to us.
82
    ///
83
    /// This serves a dual purpose:
84
    ///
85
    ///   * it enables the `ForwardReactor` to deliver Tor stream data received from the client
86
    ///   * it lets the `StreamReactor` know if the `ForwardReactor` has shut down:
87
    ///     we select! on this MPSC channel in the main loop, so if the `ForwardReactor`
88
    ///     shuts down, we will get EOS upon calling `.next()`)
89
    cell_rx: mpsc::Receiver<StreamMsg>,
90
    /// Sender for sending Tor stream data to [`BackwardReactor`](super::BackwardReactor).
91
    bwd_tx: mpsc::Sender<ReadyStreamMsg>,
92
    /// A handler for incoming streams.
93
    ///
94
    /// Set to `None` if incoming streams are not allowed on this circuit.
95
    ///
96
    /// This handler is shared with the [`HopMgr`](super::hop_mgr::HopMgr) of this reactor,
97
    /// which can install a new handler at runtime (for example, in response to a CtrlMsg).
98
    /// The ability to update the handler after the reactor is launched is needed
99
    /// for onion services, where the incoming stream request handler only gets installed
100
    /// after the virtual hop is created.
101
    #[cfg(any(feature = "hs-service", feature = "relay"))]
102
    incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
103
    /// The circuit timeout estimator.
104
    ///
105
    /// Used for computing half-stream expiration.
106
    timeouts: Arc<dyn TimeoutEstimator>,
107
    /// Memory quota account
108
    memquota: CircuitAccount,
109
}
110

            
111
#[allow(unused)] // TODO(relay)
112
impl StreamReactor {
113
    /// Create a new [`StreamReactor`].
114
    #[allow(clippy::too_many_arguments)] // TODO
115
    pub(crate) fn new<R: Runtime>(
116
        runtime: R,
117
        hopnum: Option<HopNum>,
118
        hop: CircHopOutbound,
119
        unique_id: UniqId,
120
        cell_rx: mpsc::Receiver<StreamMsg>,
121
        bwd_tx: mpsc::Sender<ReadyStreamMsg>,
122
        timeouts: Arc<dyn TimeoutEstimator>,
123
        #[cfg(any(feature = "hs-service", feature = "relay"))] //
124
        incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
125
        memquota: CircuitAccount,
126
    ) -> Self {
127
        Self {
128
            hopnum,
129
            hop,
130
            time_provider: DynTimeProvider::new(runtime),
131
            unique_id,
132
            #[cfg(any(feature = "hs-service", feature = "relay"))]
133
            incoming,
134
            cell_rx,
135
            bwd_tx,
136
            timeouts,
137
            memquota,
138
        }
139
    }
140

            
141
    /// Helper for [`run`](Self::run).
142
    ///
143
    /// Polls the stream map for messages
144
    /// that need to be delivered to the other endpoint,
145
    /// and the `cells_rx` MPSC stream for stream messages received
146
    /// from the `ForwardReactor` that need to be delivered to the application streams.
147
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
148
        use postage::prelude::{Sink as _, Stream as _};
149

            
150
        // Garbage-collect all halfstreams that have expired.
151
        //
152
        // Note: this will iterate over the closed streams of this hop.
153
        // If we think this will cause perf issues, one idea would be to make
154
        // StreamMap::closed_streams into a min-heap, and add a branch to the
155
        // select_biased! below to sleep until the first expiry is due
156
        // (but my gut feeling is that iterating is cheaper)
157
        self.hop
158
            .stream_map()
159
            .lock()
160
            .expect("poisoned lock")
161
            .remove_expired_halfstreams(self.time_provider.now());
162

            
163
        let mut streams = Arc::clone(self.hop.stream_map());
164
        let can_send = self
165
            .hop
166
            .ccontrol()
167
            .lock()
168
            .expect("poisoned lock")
169
            .can_send();
170
        let mut ready_streams_fut = future::poll_fn(move |cx| {
171
            if !can_send {
172
                // We can't send anything on this hop that counts towards SENDME windows.
173
                //
174
                // Note: this does not block outgoing flow-control messages:
175
                //
176
                //   * circuit SENDMEs are initiated by the forward reactor,
177
                //     by sending a BackwardReactorCmd::SendRelayMsg to BWD,
178
                //   * stream SENDMEs will be initiated by StreamTarget::send_sendme(),
179
                //     by sending a a control message to the reactor
180
                //     (TODO(relay): not yet implemented)
181
                //   * XOFFs are sent in response to messages on streams
182
                //     (i.e. RELAY messages with non-zero stream IDs).
183
                //     These messages are delivered to us by the forward reactor
184
                //     inside BackwardReactorCmd::HandleMsg
185
                //   * XON will be initiated by StreamTarget::drain_rate_update(),
186
                //     by sending a control message to the reactor
187
                //     (TODO(relay): not yet implemented)\
188
                return Poll::Pending;
189
            }
190

            
191
            let mut streams = streams.lock().expect("lock poisoned");
192
            let Some((sid, msg)) = streams.poll_ready_streams_iter(cx).next() else {
193
                // No ready streams
194
                //
195
                // TODO(flushing): if there are no ready Tor streams, we might want to defer
196
                // flushing until stream data becomes available (or until a timeout elapses).
197
                // The deferred flushing approach should enable us to send
198
                // more than one message at a time to the channel reactor.
199
                return Poll::Pending;
200
            };
201

            
202
            if msg.is_none() {
203
                // This means the local sender has been dropped,
204
                // which presumably can only happen if an error occurs,
205
                // or if the Tor stream ends. In both cases, we're going to
206
                // want to send an END to the client to let them know,
207
                // and to remove the stream from the stream map.
208
                //
209
                // TODO(relay): the local sender part is not implemented yet
210
                return Poll::Ready(StreamEvent::Closed {
211
                    sid,
212
                    behav: CloseStreamBehavior::default(),
213
                    reason: streammap::TerminateReason::StreamTargetClosed,
214
                });
215
            };
216

            
217
            let msg = streams.take_ready_msg(sid).expect("msg disappeared");
218

            
219
            Poll::Ready(StreamEvent::ReadyMsg { sid, msg })
220
        });
221

            
222
        select_biased! {
223
            res = self.cell_rx.next().fuse() => {
224
                let Some(cmd) = res else {
225
                    // The forward reactor has shut down
226
                    return Err(ReactorError::Shutdown);
227
                };
228

            
229
                self.handle_reactor_cmd(cmd).await?;
230
            }
231
            event = ready_streams_fut.fuse() => {
232
                self.handle_stream_event(event).await?;
233
            }
234
        }
235

            
236
        Ok(())
237
    }
238

            
239
    /// Handle a stream message sent to us by the forward reactor.
240
    ///
241
    /// Delivers the message to its corresponding application stream.
242
    async fn handle_reactor_cmd(&mut self, msg: StreamMsg) -> StdResult<(), ReactorError> {
243
        let StreamMsg {
244
            sid,
245
            msg,
246
            cell_counts_toward_windows,
247
        } = msg;
248

            
249
        // We need to apply stream-level flow control *before* encoding the message.
250
        let msg = self.handle_msg(sid, msg, cell_counts_toward_windows)?;
251

            
252
        // TODO(DEDUP): this contains parts of Circuit::send_relay_cell_inner()
253
        if let Some(msg) = msg {
254
            // We might be out of capacity entirely; see if we are about to hit a limit.
255
            //
256
            // TODO: If we ever add a notion of _recoverable_ errors below, we'll
257
            // need a way to restore this limit, and similarly for about_to_send().
258
            self.hop.decrement_cell_limit()?;
259

            
260
            let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
261

            
262
            // We need to apply stream-level flow control *before* encoding the message
263
            // (the BWD handles the encoding)
264
            if c_t_w {
265
                if let Some(stream_id) = msg.stream_id() {
266
                    self.hop
267
                        .about_to_send(self.unique_id, stream_id, msg.msg())?;
268
                }
269
            }
270

            
271
            // NOTE: on the client side, we call note_data_sent()
272
            // just before writing the cell to the channel.
273
            // We can't do that here, because we're not the ones
274
            // encoding the cell, so we don't have the SENDME tag
275
            // which is needed for note_data_sent().
276
            //
277
            // Instead, we notify the CC algorithm in the BWD,
278
            // right after we've finished sending the cell.
279

            
280
            self.send_msg_to_bwd(msg).await?;
281
        }
282

            
283
        Ok(())
284
    }
285

            
286
    /// Handle a RELAY message that has a non-zero stream ID.
287
    ///
288
    // TODO(relay): this is very similar to the client impl from
289
    // Circuit::handle_in_order_relay_msg()
290
    fn handle_msg(
291
        &mut self,
292
        streamid: StreamId,
293
        msg: UnparsedRelayMsg,
294
        cell_counts_toward_windows: bool,
295
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
296
        let cmd = msg.cmd();
297
        let possible_proto_violation_err = move |streamid: StreamId| {
298
            Error::StreamProto(format!(
299
                "Unexpected {cmd:?} message on unknown stream {streamid}"
300
            ))
301
        };
302
        let now = self.time_provider.now();
303

            
304
        // Check if any of our already-open streams want this message
305
        let res = self.hop.handle_msg(
306
            possible_proto_violation_err,
307
            cell_counts_toward_windows,
308
            streamid,
309
            msg,
310
            now,
311
        )?;
312

            
313
        // If it was an incoming stream request, we don't need to worry about
314
        // sending an XOFF as there's no stream data within this message.
315
        if let Some(msg) = res {
316
            cfg_if::cfg_if! {
317
                if #[cfg(any(feature = "hs-service", feature = "relay"))] {
318
                    return self.handle_incoming_stream_request(streamid, msg);
319
                } else {
320
                    return Err(
321
                        tor_error::internal!(
322
                            "incoming stream not rejected, but relay and hs-service features are disabled?!"
323
                            ).into()
324
                    );
325
                }
326
            }
327
        }
328

            
329
        // We may want to send an XOFF if the incoming buffer is too large.
330
        if let Some(cell) = self.hop.maybe_send_xoff(streamid)? {
331
            let cell = AnyRelayMsgOuter::new(Some(streamid), cell.into());
332
            return Ok(Some(cell));
333
        }
334

            
335
        Ok(None)
336
    }
337

            
338
    /// A helper for handling incoming stream requests.
339
    ///
340
    /// Accepts the specified incoming stream request,
341
    /// by adding a new entry to our stream map.
342
    ///
343
    /// Returns the cell we need to send back to the client,
344
    /// if an error occurred and the stream cannot be opened.
345
    ///
346
    /// Returns None if everything went well
347
    /// (the CONNECTED response only comes if the external
348
    /// consumer of our [Stream](futures::Stream) of incoming Tor streams
349
    /// is able to actually establish the connection to the address
350
    /// specified in the BEGIN).
351
    ///
352
    /// Any error returned from this function will shut down the reactor.
353
    #[cfg(any(feature = "hs-service", feature = "relay"))]
354
    fn handle_incoming_stream_request(
355
        &mut self,
356
        sid: StreamId,
357
        msg: UnparsedRelayMsg,
358
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
359
        let mut lock = self.incoming.lock().expect("poisoned lock");
360
        let Some(handler) = lock.as_mut() else {
361
            return Err(
362
                Error::CircProto("Cannot handle BEGIN cells on this circuit".into()).into(),
363
            );
364
        };
365

            
366
        if self.hopnum != handler.hop_num {
367
            let expected_hopnum = match handler.hop_num {
368
                Some(hopnum) => hopnum.display().to_string(),
369
                None => "client".to_string(),
370
            };
371

            
372
            let actual_hopnum = match self.hopnum {
373
                Some(hopnum) => hopnum.display().to_string(),
374
                None => "None".to_string(),
375
            };
376

            
377
            return Err(Error::CircProto(format!(
378
                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
379
                expected_hopnum,
380
                msg.cmd(),
381
                actual_hopnum,
382
            ))
383
            .into());
384
        }
385

            
386
        let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
387

            
388
        if message_closes_stream {
389
            self.hop
390
                .stream_map()
391
                .lock()
392
                .expect("poisoned lock")
393
                .ending_msg_received(sid)?;
394

            
395
            return Ok(None);
396
        }
397

            
398
        let req = parse_incoming_stream_req(msg)?;
399
        let view = CircHopSyncView::new(&self.hop);
400

            
401
        if let Some(reject) = Self::should_reject_incoming(handler, sid, &req, &view)? {
402
            // We can't honor this request, so we bail by sending an END.
403
            return Ok(Some(reject));
404
        };
405

            
406
        let memquota =
407
            StreamAccount::new(&self.memquota).map_err(|e| ReactorError::Err(e.into()))?;
408

            
409
        let (sender, receiver) = stream_queue(
410
            #[cfg(not(feature = "flowctl-cc"))]
411
            crate::stream::STREAM_READER_BUFFER,
412
            &memquota,
413
            &self.time_provider,
414
        )
415
        .map_err(|e| ReactorError::Err(e.into()))?;
416

            
417
        let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE)
418
            .new_mq(self.time_provider.clone(), memquota.as_raw_account())
419
            .map_err(|e| ReactorError::Err(e.into()))?;
420

            
421
        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
422

            
423
        // A channel for the reactor to request a new drain rate from the reader.
424
        // Typically this notification will be sent after an XOFF is sent so that the reader can
425
        // send us a new drain rate when the stream data queue becomes empty.
426
        let mut drain_rate_request_tx = NotifySender::new_typed();
427
        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
428

            
429
        let cmd_checker = InboundDataCmdChecker::new_connected();
430
        self.hop.add_ent_with_id(
431
            sender,
432
            msg_rx,
433
            rate_limit_tx,
434
            drain_rate_request_tx,
435
            sid,
436
            cmd_checker,
437
        )?;
438

            
439
        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
440
            req,
441
            stream_id: sid,
442
            hop: None,
443
            msg_tx,
444
            receiver,
445
            rate_limit_stream: rate_limit_rx,
446
            drain_rate_request_stream: drain_rate_request_rx,
447
            memquota,
448
            relay_cell_format: self.hop.relay_cell_format(),
449
        });
450

            
451
        log_ratelim!("Delivering message to incoming stream handler"; outcome);
452

            
453
        if let Err(e) = outcome {
454
            if e.is_full() {
455
                // The IncomingStreamRequestHandler's stream is full; it isn't
456
                // handling requests fast enough. So instead, we reply with an
457
                // END cell.
458
                let end_msg = AnyRelayMsgOuter::new(
459
                    Some(sid),
460
                    End::new_with_reason(EndReason::RESOURCELIMIT).into(),
461
                );
462

            
463
                return Ok(Some(end_msg));
464
            } else if e.is_disconnected() {
465
                // The IncomingStreamRequestHandler's stream has been dropped.
466
                // In the Tor protocol as it stands, this always means that the
467
                // circuit itself is out-of-use and should be closed.
468
                //
469
                // Note that we will _not_ reach this point immediately after
470
                // the IncomingStreamRequestHandler is dropped; we won't hit it
471
                // until we next get an incoming request.  Thus, if we later
472
                // want to add early detection for a dropped
473
                // IncomingStreamRequestHandler, we need to do it elsewhere, in
474
                // a different way.
475
                debug!(
476
                    circ_id = %self.unique_id,
477
                    "Incoming stream request receiver dropped",
478
                );
479
                // This will _cause_ the circuit to get closed.
480
                return Err(ReactorError::Err(Error::CircuitClosed));
481
            } else {
482
                // There are no errors like this with the current design of
483
                // futures::mpsc, but we shouldn't just ignore the possibility
484
                // that they'll be added later.
485
                return Err(
486
                    Error::from((into_internal!("try_send failed unexpectedly"))(e)).into(),
487
                );
488
            }
489
        }
490

            
491
        Ok(None)
492
    }
493

            
494
    /// Check if we should reject this incoming stream request or not.
495
    ///
496
    /// Returns a cell we need to send back to the client if we must reject the request,
497
    /// or `None` if we are allowed to accept it.
498
    ///`
499
    /// Any error returned from this function will shut down the reactor.
500
    #[cfg(any(feature = "hs-service", feature = "relay"))]
501
    fn should_reject_incoming<'a>(
502
        handler: &mut IncomingStreamRequestHandler,
503
        sid: StreamId,
504
        request: &IncomingStreamRequest,
505
        view: &CircHopSyncView<'a>,
506
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
507
        use IncomingStreamRequestDisposition::*;
508

            
509
        let ctx = IncomingStreamRequestContext { request };
510

            
511
        // Run the externally provided filter to check if we should
512
        // open the stream or not.
513
        match handler.filter.as_mut().disposition(&ctx, view)? {
514
            Accept => {
515
                // All is well, we can accept the stream request
516
                Ok(None)
517
            }
518
            CloseCircuit => Err(ReactorError::Shutdown),
519
            RejectRequest(end) => {
520
                let end_msg = AnyRelayMsgOuter::new(Some(sid), end.into());
521

            
522
                Ok(Some(end_msg))
523
            }
524
        }
525
    }
526

            
527
    /// Handle a [`StreamEvent`].
528
    async fn handle_stream_event(&mut self, event: StreamEvent) -> StdResult<(), ReactorError> {
529
        match event {
530
            StreamEvent::Closed { sid, behav, reason } => {
531
                let max_rtt = {
532
                    let mut ccontrol = self.hop.ccontrol().lock().expect("poisoned lock");
533

            
534
                    // Note: if we have no measurements for the RTT, this will be set to 0,
535
                    // and the timeout will be 2 * CBT.
536
                    ccontrol
537
                        .rtt()
538
                        .max_rtt_usec()
539
                        .map(|rtt| Duration::from_millis(u64::from(rtt)))
540
                        .unwrap_or_default()
541
                };
542

            
543
                // The length of the circuit up until the hop that has the half-streeam.
544
                //
545
                // +1, because HopNums are zero-based.
546
                //
547
                /// If we're an exit, the estimated circ_len is hard-coded to 3.
548
                /// TODO: But maybe we need a better way of estimating the circuit length here?...
549
                const FALLBACK_CIRC_HOP: usize = 2;
550
                let circ_len = self.hopnum.map(usize::from).unwrap_or(FALLBACK_CIRC_HOP) + 1;
551

            
552
                // We double the CBT to account for rend circuits,
553
                // which are twice as long (otherwise we risk expiring
554
                // the rend half-streams too soon).
555
                let timeout = std::cmp::max(max_rtt, 2 * self.estimate_cbt(circ_len));
556
                let expire_at = self.time_provider.now() + timeout;
557
                let res =
558
                    self.hop
559
                        .close_stream(self.unique_id, sid, None, behav, reason, expire_at)?;
560
                let Some(msg) = res else {
561
                    // We may not need to send anything at all...
562
                    return Ok(());
563
                };
564

            
565
                self.send_msg_to_bwd(msg.cell).await
566
            }
567
            StreamEvent::ReadyMsg { sid, msg } => {
568
                self.send_msg_to_bwd(AnyRelayMsgOuter::new(Some(sid), msg))
569
                    .await
570
            }
571
        }
572
    }
573

            
574
    /// Wrap `msg` in [`ReadyStreamMsg`], and send it to the backward reactor.
575
    async fn send_msg_to_bwd(&mut self, msg: AnyRelayMsgOuter) -> StdResult<(), ReactorError> {
576
        let msg = ReadyStreamMsg {
577
            hop: self.hopnum,
578
            relay_cell_format: self.hop.relay_cell_format(),
579
            ccontrol: Arc::clone(self.hop.ccontrol()),
580
            msg,
581
        };
582

            
583
        self.bwd_tx
584
            .send(msg)
585
            .await
586
            .map_err(|_| ReactorError::Shutdown)?;
587

            
588
        Ok(())
589
    }
590

            
591
    /// The estimated circuit build timeout for a circuit of the specified length.
592
    ///
593
    /// Note: this duplicates the client implementation
594
    fn estimate_cbt(&self, length: usize) -> Duration {
595
        self.timeouts.circuit_build_timeout(length)
596
    }
597
}
598

            
599
/// A Tor stream-related event.
600
enum StreamEvent {
601
    /// A stream was closed.
602
    ///
603
    /// It needs to be removed from the reactor's stream map.
604
    Closed {
605
        /// The ID of the stream to close.
606
        sid: StreamId,
607
        /// The stream-closing behavior.
608
        behav: CloseStreamBehavior,
609
        /// The reason for closing the stream.
610
        reason: streammap::TerminateReason,
611
    },
612
    /// A stream has a ready message.
613
    ReadyMsg {
614
        /// The ID of the stream to close.
615
        sid: StreamId,
616
        /// The message.
617
        msg: AnyRelayMsg,
618
    },
619
}
620

            
621
/// Convert an incoming stream request message (BEGIN, BEGIN_DIR, RESOLVE, etc.)
/// to an [`IncomingStreamRequest`]
#[cfg(any(feature = "hs-service", feature = "relay"))]
fn parse_incoming_stream_req(msg: UnparsedRelayMsg) -> crate::Result<IncomingStreamRequest> {
    // TODO(relay): support other stream-initiating messages, not just BEGIN
    let begin = msg
        .decode::<Begin>()
        .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
        .into_msg();

    Ok(IncomingStreamRequest::Begin(begin))
}
633

            
634
/// A stream message to be sent to the backward reactor for delivery.
635
pub(crate) struct ReadyStreamMsg {
636
    /// The hop number, or `None` if we are a relay.
637
    pub(crate) hop: Option<HopNum>,
638
    /// The message to send.
639
    pub(crate) msg: AnyRelayMsgOuter,
640
    /// The cell format used with the hop the message should be sent to.
641
    pub(crate) relay_cell_format: RelayCellFormat,
642
    /// The CC object to use.
643
    pub(crate) ccontrol: Arc<Mutex<CongestionControl>>,
644
}
645

            
646
/// Stream data received from the other endpoint
647
/// that needs to be handled by [`StreamReactor`].
648
pub(crate) struct StreamMsg {
649
    /// The ID of the stream this message is for.
650
    pub(crate) sid: StreamId,
651
    /// The message.
652
    pub(crate) msg: UnparsedRelayMsg,
653
    /// Whether the cell this message came from counts towards flow-control windows.
654
    pub(crate) cell_counts_toward_windows: bool,
655
}