1
//! The stream reactor.
2

            
3
use crate::circuit::circhop::CircHopOutbound;
4
use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
5
use crate::circuit::{CircHopSyncView, UniqId};
6
use crate::congestion::{CongestionControl, sendme};
7
use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
8
use crate::stream::CloseStreamBehavior;
9
use crate::stream::cmdcheck::StreamStatus;
10
use crate::streammap;
11
use crate::util::err::ReactorError;
12
use crate::{Error, HopNum};
13

            
14
#[cfg(any(feature = "hs-service", feature = "relay"))]
15
use crate::stream::incoming::{
16
    InboundDataCmdChecker, IncomingStreamRequest, IncomingStreamRequestContext,
17
    IncomingStreamRequestDisposition, IncomingStreamRequestHandler, StreamReqInfo,
18
};
19

            
20
use tor_async_utils::{SinkTrySend as _, SinkTrySendError as _};
21
use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, End, EndReason};
22
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
23
use tor_error::into_internal;
24
use tor_log_ratelim::log_ratelim;
25
use tor_rtcompat::{DynTimeProvider, Runtime, SleepProvider as _};
26

            
27
use derive_deftly::Deftly;
28
use futures::SinkExt;
29
use futures::channel::mpsc;
30
use futures::{FutureExt as _, StreamExt as _, future, select_biased};
31
use tracing::debug;
32

            
33
use std::pin::Pin;
34
use std::result::Result as StdResult;
35
use std::sync::{Arc, Mutex};
36
use std::task::Poll;
37
use std::time::Duration;
38

            
39
/// Trait for customizing the behavior of the stream reactor.
40
///
41
/// Used for plugging in the implementation-dependent (client vs relay)
42
/// parts of the implementation into the generic one.
43
pub(crate) trait StreamHandler: Send + Sync + 'static {
44
    /// Return the amount of time a newly closed stream
45
    /// should be kept in the stream map for.
46
    ///
47
    /// This is the amount of time we are willing to wait for
48
    /// an END ack before removing the half-stream from the map.
49
    fn halfstream_expiry(&self, hop: &CircHopOutbound) -> Duration;
50
}
51

            
52
/// The stream reactor for a given hop.
53
///
54
/// Drives the application streams.
55
///
56
/// This reactor accepts [`CtrlMsg`]s from the forward reactor over its [`Self::cell_rx`]
57
/// MPSC channel, and delivers them to the corresponding stream entries in the stream map.
58
///
59
/// The local streams are polled from the main loop, and any ready messages are sent
60
/// to the backward reactor over the `bwd_tx` MPSC channel for packaging and delivery.
61
///
62
/// Shuts downs down if an error occurs, or if the sending end
63
/// of the `cell_rx` MPSC channel, i.e. the forward reactor, closes.
64
#[derive(Deftly)]
65
#[derive_deftly(CircuitReactor)]
66
#[deftly(reactor_name = "stream reactor")]
67
#[deftly(run_inner_fn = "Self::run_once")]
68
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
69
pub(crate) struct StreamReactor {
70
    /// The hop this stream reactor is for.
71
    ///
72
    /// This is `None` for relays.
73
    hopnum: Option<HopNum>,
74
    /// The state of this circuit hop.
75
    hop: CircHopOutbound,
76
    /// The time provider.
77
    time_provider: DynTimeProvider,
78
    /// An identifier for logging about this reactor's circuit.
79
    unique_id: UniqId,
80
    /// Receiver for Tor stream data that need to be delivered to a Tor stream.
81
    ///
82
    /// The sender is in the [`HopMgr`](super::hop_mgr::HopMgr) of the
83
    /// [`ForwardReactor`](super::ForwardReactor), which will forward all cells
84
    /// carrying Tor stream data to us.
85
    ///
86
    /// This serves a dual purpose:
87
    ///
88
    ///   * it enables the `ForwardReactor` to deliver Tor stream data received from the client
89
    ///   * it lets the `StreamReactor` know if the `ForwardReactor` has shut down:
90
    ///     we select! on this MPSC channel in the main loop, so if the `ForwardReactor`
91
    ///     shuts down, we will get EOS upon calling `.next()`)
92
    cell_rx: mpsc::Receiver<CtrlMsg>,
93
    /// Sender for sending Tor stream data to [`BackwardReactor`](super::BackwardReactor).
94
    bwd_tx: mpsc::Sender<ReadyStreamMsg>,
95
    /// A handler for incoming streams.
96
    ///
97
    /// Set to `None` if incoming streams are not allowed on this circuit.
98
    ///
99
    /// This handler is shared with the [`HopMgr`](super::hop_mgr::HopMgr) of this reactor,
100
    /// which can install a new handler at runtime (for example, in response to a CtrlMsg).
101
    /// The ability to update the handler after the reactor is launched is needed
102
    /// for onion services, where the incoming stream request handler only gets installed
103
    /// after the virtual hop is created.
104
    #[cfg(any(feature = "hs-service", feature = "relay"))]
105
    incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
106
    /// A handler for customizing the stream reactor behavior.
107
    inner: Arc<dyn StreamHandler>,
108
    /// Memory quota account
109
    memquota: CircuitAccount,
110
}
111

            
112
#[allow(unused)] // TODO(relay)
113
impl StreamReactor {
114
    /// Create a new [`StreamReactor`].
115
    #[allow(clippy::too_many_arguments)] // TODO
116
8
    pub(crate) fn new<R: Runtime>(
117
8
        runtime: R,
118
8
        hopnum: Option<HopNum>,
119
8
        hop: CircHopOutbound,
120
8
        unique_id: UniqId,
121
8
        cell_rx: mpsc::Receiver<CtrlMsg>,
122
8
        bwd_tx: mpsc::Sender<ReadyStreamMsg>,
123
8
        inner: Arc<dyn StreamHandler>,
124
8
        #[cfg(any(feature = "hs-service", feature = "relay"))] //
125
8
        incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
126
8
        memquota: CircuitAccount,
127
8
    ) -> Self {
128
8
        Self {
129
8
            hopnum,
130
8
            hop,
131
8
            time_provider: DynTimeProvider::new(runtime),
132
8
            unique_id,
133
8
            #[cfg(any(feature = "hs-service", feature = "relay"))]
134
8
            incoming,
135
8
            cell_rx,
136
8
            bwd_tx,
137
8
            inner,
138
8
            memquota,
139
8
        }
140
8
    }
141

            
142
    /// Helper for [`run`](Self::run).
143
    ///
144
    /// Polls the stream map for messages
145
    /// that need to be delivered to the other endpoint,
146
    /// and the `cells_rx` MPSC stream for stream messages received
147
    /// from the `ForwardReactor` that need to be delivered to the application streams.
148
48
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
149
        use postage::prelude::{Sink as _, Stream as _};
150

            
151
        // Garbage-collect all halfstreams that have expired.
152
        //
153
        // Note: this will iterate over the closed streams of this hop.
154
        // If we think this will cause perf issues, one idea would be to make
155
        // StreamMap::closed_streams into a min-heap, and add a branch to the
156
        // select_biased! below to sleep until the first expiry is due
157
        // (but my gut feeling is that iterating is cheaper)
158
32
        self.hop
159
32
            .stream_map()
160
32
            .lock()
161
32
            .expect("poisoned lock")
162
32
            .remove_expired_halfstreams(self.time_provider.now());
163

            
164
32
        let mut streams = Arc::clone(self.hop.stream_map());
165
32
        let can_send = self
166
32
            .hop
167
32
            .ccontrol()
168
32
            .lock()
169
32
            .expect("poisoned lock")
170
32
            .can_send();
171
32
        let mut ready_streams_fut = future::poll_fn(move |cx| {
172
24
            if !can_send {
173
                // We can't send anything on this hop that counts towards SENDME windows.
174
                //
175
                // Note: this does not block outgoing flow-control messages:
176
                //
177
                //   * circuit SENDMEs are initiated by the forward reactor,
178
                //     by sending a BackwardReactorCmd::SendRelayMsg to BWD,
179
                //   * stream SENDMEs will be initiated by StreamTarget::send_sendme(),
180
                //     by sending a control message to the reactor
181
                //     (TODO(relay): not yet implemented)
182
                //   * XOFFs are sent in response to messages on streams
183
                //     (i.e. RELAY messages with non-zero stream IDs).
184
                //     These messages are delivered to us by the forward reactor
185
                //     inside BackwardReactorCmd::HandleMsg
186
                //   * XON will be initiated by StreamTarget::drain_rate_update(),
187
                //     by sending a control message to the reactor
188
                //     (TODO(relay): not yet implemented)\
189
                return Poll::Pending;
190
24
            }
191

            
192
24
            let mut streams = streams.lock().expect("lock poisoned");
193
24
            let Some((sid, msg)) = streams.poll_ready_streams_iter(cx).next() else {
194
                // No ready streams
195
                //
196
                // TODO(flushing): if there are no ready Tor streams, we might want to defer
197
                // flushing until stream data becomes available (or until a timeout elapses).
198
                // The deferred flushing approach should enable us to send
199
                // more than one message at a time to the channel reactor.
200
16
                return Poll::Pending;
201
            };
202

            
203
8
            if msg.is_none() {
204
                // This means the local sender has been dropped,
205
                // which presumably can only happen if an error occurs,
206
                // or if the Tor stream ends. In both cases, we're going to
207
                // want to send an END to the client to let them know,
208
                // and to remove the stream from the stream map.
209
                //
210
                // TODO(relay): the local sender part is not implemented yet
211
4
                return Poll::Ready(StreamEvent::ApplicationStreamClosed(sid));
212
4
            };
213

            
214
4
            let msg = streams.take_ready_msg(sid).expect("msg disappeared");
215

            
216
4
            Poll::Ready(StreamEvent::ReadyMsg { sid, msg })
217
24
        });
218

            
219
32
        select_biased! {
220
32
            res = self.cell_rx.next().fuse() => {
221
24
                let Some(cmd) = res else {
222
                    // The forward reactor has shut down
223
4
                    return Err(ReactorError::Shutdown);
224
                };
225

            
226
20
                self.handle_reactor_cmd(cmd).await?;
227
            }
228
32
            event = ready_streams_fut.fuse() => {
229
8
                self.handle_stream_event(event).await?;
230
            }
231
        }
232

            
233
24
        Ok(())
234
32
    }
235

            
236
    /// Handle a stream message sent to us by the forward reactor.
237
    ///
238
    /// Delivers the message to its corresponding application stream.
239
30
    async fn handle_reactor_cmd(&mut self, msg: CtrlMsg) -> StdResult<(), ReactorError> {
240
20
        match msg {
241
            CtrlMsg::DeliverStreamMsg {
242
16
                sid,
243
16
                msg,
244
16
                cell_counts_toward_windows,
245
            } => {
246
16
                self.deliver_message_to_stream(sid, msg, cell_counts_toward_windows)
247
16
                    .await
248
            }
249
            #[cfg(any(feature = "hs-service", feature = "relay"))]
250
4
            CtrlMsg::ClosePendingStream { stream_id, behav } => {
251
4
                self.close_stream(stream_id, behav, streammap::TerminateReason::ExplicitEnd)
252
4
                    .await
253
            }
254
        }
255
20
    }
256

            
257
    /// Deliver `msg` to the specified stream
258
16
    async fn deliver_message_to_stream(
259
16
        &mut self,
260
16
        sid: StreamId,
261
16
        msg: UnparsedRelayMsg,
262
16
        cell_counts_toward_windows: bool,
263
24
    ) -> StdResult<(), ReactorError> {
264
        // We need to apply stream-level flow control *before* encoding the message.
265
        // May optionally return a message that needs to be sent back to the client.
266
16
        let bwd_msg = self.handle_msg(sid, msg, cell_counts_toward_windows)?;
267

            
268
        // TODO(DEDUP): this contains parts of Circuit::send_relay_cell_inner()
269
12
        if let Some(bwd_msg) = bwd_msg {
270
            // We might be out of capacity entirely; see if we are about to hit a limit.
271
            //
272
            // TODO: If we ever add a notion of _recoverable_ errors below, we'll
273
            // need a way to restore this limit, and similarly for about_to_send().
274
            self.hop.decrement_cell_limit()?;
275

            
276
            let c_t_w = sendme::cmd_counts_towards_windows(bwd_msg.cmd());
277

            
278
            // We need to apply stream-level flow control *before* encoding the message
279
            // (the BWD handles the encoding)
280
            if c_t_w {
281
                if let Some(stream_id) = bwd_msg.stream_id() {
282
                    self.hop
283
                        .about_to_send(self.unique_id, stream_id, bwd_msg.msg())?;
284
                }
285
            }
286

            
287
            // NOTE: on the client side, we call note_data_sent()
288
            // just before writing the cell to the channel.
289
            // We can't do that here, because we're not the ones
290
            // encoding the cell, so we don't have the SENDME tag
291
            // which is needed for note_data_sent().
292
            //
293
            // Instead, we notify the CC algorithm in the BWD,
294
            // right after we've finished sending the cell.
295

            
296
            self.send_msg_to_bwd(bwd_msg).await?;
297
12
        }
298

            
299
12
        Ok(())
300
16
    }
301

            
302
    /// Handle a RELAY message that has a non-zero stream ID.
303
    ///
304
    /// A returned message is one that we need to send back to the client.
305
    //
306
    // TODO(relay): this is very similar to the client impl from
307
    // Circuit::handle_in_order_relay_msg()
308
16
    fn handle_msg(
309
16
        &mut self,
310
16
        streamid: StreamId,
311
16
        msg: UnparsedRelayMsg,
312
16
        cell_counts_toward_windows: bool,
313
16
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
314
16
        let cmd = msg.cmd();
315
18
        let possible_proto_violation_err = move |streamid: StreamId| {
316
4
            Error::StreamProto(format!(
317
4
                "Unexpected {cmd:?} message on unknown stream {streamid}"
318
4
            ))
319
4
        };
320
16
        let now = self.time_provider.now();
321

            
322
        // Check if any of our already-open streams want this message
323
16
        let res = self.hop.handle_msg(
324
16
            possible_proto_violation_err,
325
16
            cell_counts_toward_windows,
326
16
            streamid,
327
16
            msg,
328
16
            now,
329
4
        )?;
330

            
331
        // If it was an incoming stream request, we don't need to worry about
332
        // sending an XOFF as there's no stream data within this message.
333
12
        if let Some(msg) = res {
334
            cfg_if::cfg_if! {
335
                if #[cfg(any(feature = "hs-service", feature = "relay"))] {
336
8
                    return self.handle_incoming_stream_request(streamid, msg);
337
                } else {
338
                    return Err(
339
                        tor_error::internal!(
340
                            "incoming stream not rejected, but relay and hs-service features are disabled?!"
341
                            ).into()
342
                    );
343
                }
344
            }
345
4
        }
346

            
347
        // We may want to send an XOFF if the incoming buffer is too large.
348
4
        if let Some(cell) = self.hop.maybe_send_xoff(streamid)? {
349
            let cell = AnyRelayMsgOuter::new(Some(streamid), cell.into());
350
            return Ok(Some(cell));
351
4
        }
352

            
353
4
        Ok(None)
354
16
    }
355

            
356
    /// A helper for handling incoming stream requests.
357
    ///
358
    /// Accepts the specified incoming stream request,
359
    /// by adding a new entry to our stream map.
360
    ///
361
    /// Returns the cell we need to send back to the client,
362
    /// if an error occurred and the stream cannot be opened.
363
    ///
364
    /// Returns None if everything went well
365
    /// (the CONNECTED response only comes if the external
366
    /// consumer of our [Stream](futures::Stream) of incoming Tor streams
367
    /// is able to actually establish the connection to the address
368
    /// specified in the BEGIN).
369
    ///
370
    /// Any error returned from this function will shut down the reactor.
371
    #[cfg(any(feature = "hs-service", feature = "relay"))]
372
8
    fn handle_incoming_stream_request(
373
8
        &mut self,
374
8
        sid: StreamId,
375
8
        msg: UnparsedRelayMsg,
376
8
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
377
8
        let mut lock = self.incoming.lock().expect("poisoned lock");
378
8
        let Some(handler) = lock.as_mut() else {
379
            return Err(
380
                Error::CircProto("Cannot handle BEGIN cells on this circuit".into()).into(),
381
            );
382
        };
383

            
384
8
        if self.hopnum != handler.hop_num {
385
            let expected_hopnum = match handler.hop_num {
386
                Some(hopnum) => hopnum.display().to_string(),
387
                None => "client".to_string(),
388
            };
389

            
390
            let actual_hopnum = match self.hopnum {
391
                Some(hopnum) => hopnum.display().to_string(),
392
                None => "None".to_string(),
393
            };
394

            
395
            return Err(Error::CircProto(format!(
396
                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
397
                expected_hopnum,
398
                msg.cmd(),
399
                actual_hopnum,
400
            ))
401
            .into());
402
8
        }
403

            
404
8
        let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
405

            
406
8
        if message_closes_stream {
407
            self.hop
408
                .stream_map()
409
                .lock()
410
                .expect("poisoned lock")
411
                .ending_msg_received(sid)?;
412

            
413
            return Ok(None);
414
8
        }
415

            
416
8
        let req = parse_incoming_stream_req(msg)?;
417
8
        let view = CircHopSyncView::new(&self.hop);
418

            
419
8
        if let Some(reject) = Self::should_reject_incoming(handler, sid, &req, &view)? {
420
            // We can't honor this request, so we bail by sending an END.
421
            return Ok(Some(reject));
422
8
        };
423

            
424
8
        let memquota =
425
8
            StreamAccount::new(&self.memquota).map_err(|e| ReactorError::Err(e.into()))?;
426

            
427
8
        let cmd_checker = InboundDataCmdChecker::new_connected();
428
8
        let stream_components =
429
8
            self.hop
430
8
                .add_ent_with_id(&self.time_provider, sid, cmd_checker, &memquota)?;
431

            
432
8
        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
433
8
            req,
434
8
            stream_id: sid,
435
8
            hop: None,
436
8
            stream_components,
437
8
            memquota,
438
8
            relay_cell_format: self.hop.relay_cell_format(),
439
8
        });
440

            
441
8
        log_ratelim!("Delivering message to incoming stream handler"; outcome);
442

            
443
8
        if let Err(e) = outcome {
444
            if e.is_full() {
445
                // The IncomingStreamRequestHandler's stream is full; it isn't
446
                // handling requests fast enough. So instead, we reply with an
447
                // END cell.
448
                let end_msg = AnyRelayMsgOuter::new(
449
                    Some(sid),
450
                    End::new_with_reason(EndReason::RESOURCELIMIT).into(),
451
                );
452

            
453
                return Ok(Some(end_msg));
454
            } else if e.is_disconnected() {
455
                // The IncomingStreamRequestHandler's stream has been dropped.
456
                // In the Tor protocol as it stands, this always means that the
457
                // circuit itself is out-of-use and should be closed.
458
                //
459
                // Note that we will _not_ reach this point immediately after
460
                // the IncomingStreamRequestHandler is dropped; we won't hit it
461
                // until we next get an incoming request.  Thus, if we later
462
                // want to add early detection for a dropped
463
                // IncomingStreamRequestHandler, we need to do it elsewhere, in
464
                // a different way.
465
                debug!(
466
                    circ_id = %self.unique_id,
467
                    "Incoming stream request receiver dropped",
468
                );
469
                // This will _cause_ the circuit to get closed.
470
                return Err(ReactorError::Err(Error::CircuitClosed));
471
            } else {
472
                // There are no errors like this with the current design of
473
                // futures::mpsc, but we shouldn't just ignore the possibility
474
                // that they'll be added later.
475
                return Err(
476
                    Error::from((into_internal!("try_send failed unexpectedly"))(e)).into(),
477
                );
478
            }
479
8
        }
480

            
481
8
        Ok(None)
482
8
    }
483

            
484
    /// Check if we should reject this incoming stream request or not.
485
    ///
486
    /// Returns a cell we need to send back to the client if we must reject the request,
487
    /// or `None` if we are allowed to accept it.
488
    ///`
489
    /// Any error returned from this function will shut down the reactor.
490
    #[cfg(any(feature = "hs-service", feature = "relay"))]
491
8
    fn should_reject_incoming<'a>(
492
8
        handler: &mut IncomingStreamRequestHandler,
493
8
        sid: StreamId,
494
8
        request: &IncomingStreamRequest,
495
8
        view: &CircHopSyncView<'a>,
496
8
    ) -> StdResult<Option<AnyRelayMsgOuter>, ReactorError> {
497
        use IncomingStreamRequestDisposition::*;
498

            
499
8
        let ctx = IncomingStreamRequestContext { request };
500

            
501
        // Run the externally provided filter to check if we should
502
        // open the stream or not.
503
8
        match handler.filter.as_mut().disposition(&ctx, view)? {
504
            Accept => {
505
                // All is well, we can accept the stream request
506
8
                Ok(None)
507
            }
508
            CloseCircuit => Err(ReactorError::Shutdown),
509
            RejectRequest(end) => {
510
                let end_msg = AnyRelayMsgOuter::new(Some(sid), end.into());
511

            
512
                Ok(Some(end_msg))
513
            }
514
        }
515
8
    }
516

            
517
    /// Handle a [`StreamEvent`].
518
12
    async fn handle_stream_event(&mut self, event: StreamEvent) -> StdResult<(), ReactorError> {
519
8
        match event {
520
4
            StreamEvent::ApplicationStreamClosed(sid) => {
521
4
                self.close_stream(
522
4
                    sid,
523
4
                    CloseStreamBehavior::default(),
524
4
                    streammap::TerminateReason::StreamTargetClosed,
525
4
                )
526
4
                .await
527
            }
528
4
            StreamEvent::ReadyMsg { sid, msg } => {
529
4
                self.send_msg_to_bwd(AnyRelayMsgOuter::new(Some(sid), msg))
530
4
                    .await
531
            }
532
        }
533
8
    }
534

            
535
    /// Close the stream that has the specified `sid`.
536
    ///
537
    /// The `behav` controls whether an `END` will be sent or not.
538
    ///
539
    /// This calls [`CircHopOutbound::close_stream`] under the hood,
540
    /// which removes the stream from the stream map,
541
    /// and returns an optional `END` cell to send back to the other party.
542
8
    async fn close_stream(
543
8
        &mut self,
544
8
        sid: StreamId,
545
8
        behav: CloseStreamBehavior,
546
8
        reason: streammap::TerminateReason,
547
12
    ) -> StdResult<(), ReactorError> {
548
8
        let timeout = self.inner.halfstream_expiry(&self.hop);
549
8
        let expire_at = self.time_provider.now() + timeout;
550
8
        let res = self
551
8
            .hop
552
8
            .close_stream(self.unique_id, sid, None, behav, reason, expire_at)?;
553
8
        let Some(msg) = res else {
554
            // We may not need to send anything at all...
555
            return Ok(());
556
        };
557

            
558
8
        self.send_msg_to_bwd(msg.cell).await
559
8
    }
560

            
561
    /// Wrap `msg` in [`ReadyStreamMsg`], and send it to the backward reactor.
562
18
    async fn send_msg_to_bwd(&mut self, msg: AnyRelayMsgOuter) -> StdResult<(), ReactorError> {
563
12
        let msg = ReadyStreamMsg {
564
12
            hop: self.hopnum,
565
12
            relay_cell_format: self.hop.relay_cell_format(),
566
12
            ccontrol: Arc::clone(self.hop.ccontrol()),
567
12
            msg,
568
12
        };
569

            
570
12
        self.bwd_tx
571
12
            .send(msg)
572
12
            .await
573
12
            .map_err(|_| ReactorError::Shutdown)?;
574

            
575
12
        Ok(())
576
12
    }
577
}
578

            
579
/// A Tor stream-related event.
580
enum StreamEvent {
581
    /// An application stream was closed.
582
    ///
583
    /// The corresponding entry needs to be removed from the reactor's stream map.
584
    ApplicationStreamClosed(StreamId),
585
    /// A stream has a ready message.
586
    ReadyMsg {
587
        /// The ID of the stream to close.
588
        sid: StreamId,
589
        /// The message.
590
        msg: AnyRelayMsg,
591
    },
592
}
593

            
594
/// Convert an incoming stream request message (BEGIN, BEGIN_DIR, RESOLVE, etc.)
595
/// to an [`IncomingStreamRequest`]
596
#[cfg(any(feature = "hs-service", feature = "relay"))]
597
8
fn parse_incoming_stream_req(msg: UnparsedRelayMsg) -> crate::Result<IncomingStreamRequest> {
598
    // TODO(relay): support other stream-initiating messages, not just BEGIN
599
8
    let begin = msg
600
8
        .decode::<Begin>()
601
8
        .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
602
8
        .into_msg();
603

            
604
8
    Ok(IncomingStreamRequest::Begin(begin))
605
8
}
606

            
607
/// A stream message to be sent to the backward reactor for delivery.
608
pub(crate) struct ReadyStreamMsg {
609
    /// The hop number, or `None` if we are a relay.
610
    pub(crate) hop: Option<HopNum>,
611
    /// The message to send.
612
    pub(crate) msg: AnyRelayMsgOuter,
613
    /// The cell format used with the hop the message should be sent to.
614
    pub(crate) relay_cell_format: RelayCellFormat,
615
    /// The CC object to use.
616
    pub(crate) ccontrol: Arc<Mutex<CongestionControl>>,
617
}
618

            
619
/// A control message
620
/// that needs to be handled by [`StreamReactor`].
621
pub(crate) enum CtrlMsg {
622
    /// Stream data received from the other endpoint
623
    /// that needs to be delivered to a Tor stream
624
    DeliverStreamMsg {
625
        /// The ID of the stream this message is for.
626
        sid: StreamId,
627
        /// The message.
628
        msg: UnparsedRelayMsg,
629
        /// Whether the cell this message came from counts towards flow-control windows.
630
        cell_counts_toward_windows: bool,
631
    },
632

            
633
    /// Close the specified pending incoming stream, sending the provided END message.
634
    #[cfg(any(feature = "hs-service", feature = "relay"))]
635
    ClosePendingStream {
636
        /// The stream ID to send the END for.
637
        stream_id: StreamId,
638
        /// The END message to send, if any.
639
        behav: CloseStreamBehavior,
640
    },
641
}