1
//! Tor stream handling.
//!
//! A stream is an anonymized conversation; multiple streams can be
//! multiplexed over a single circuit.
5

            
6
pub(crate) mod cmdcheck;
7
pub(crate) mod flow_ctrl;
8
pub(crate) mod raw;
9

            
10
#[cfg(any(feature = "hs-service", feature = "relay"))]
11
pub(crate) mod incoming;
12

            
13
pub(crate) mod queue;
14

            
15
use futures::SinkExt as _;
16
use oneshot_fused_workaround as oneshot;
17
use postage::watch;
18
use safelog::sensitive;
19

            
20
use tor_async_utils::SinkCloseChannel as _;
21
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
22
use tor_cell::relaycell::msg::{AnyRelayMsg, End};
23
use tor_cell::relaycell::{RelayCellFormat, StreamId, UnparsedRelayMsg};
24
use tor_memquota::mq_queue::{self, MpscSpec};
25

            
26
use flow_ctrl::state::StreamRateLimit;
27

            
28
use crate::memquota::StreamAccount;
29
use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
30
use crate::{ClientTunnel, Error, HopLocation, Result};
31

            
32
#[cfg(any(feature = "hs-service", feature = "relay"))]
33
pub use incoming::{
34
    IncomingStream, IncomingStreamRequest, IncomingStreamRequestContext,
35
    IncomingStreamRequestDisposition, IncomingStreamRequestFilter,
36
};
37

            
38
pub use raw::StreamReceiver;
39

            
40
#[cfg_attr(docsrs, doc(cfg(any(feature = "hs-service", feature = "relay"))))]
41
#[cfg(any(feature = "hs-service", feature = "relay"))]
42
pub(crate) use incoming::{InboundDataCmdChecker, IncomingCmdChecker, StreamReqInfo};
43

            
44
use std::pin::Pin;
45
use std::sync::Arc;
46

            
47
/// Initial value for outbound flow-control window on streams.
pub(crate) const SEND_WINDOW_INIT: u16 = 500;
/// Initial value for inbound flow-control window on streams.
pub(crate) const RECV_WINDOW_INIT: u16 = 500;

/// Size of the buffer used between the reactor and a `StreamReader`.
///
/// FIXME(eta): We pick 2× the receive window, which is very conservative (we arguably shouldn't
///             get sent more than the receive window anyway!). We might receive more than that,
///             though, due to things that don't count towards the window.
//
// The window is widened to `usize` *before* doubling, so the arithmetic can
// never overflow `u16` even if `RECV_WINDOW_INIT` is raised in the future.
pub(crate) const STREAM_READER_BUFFER: usize = 2 * RECV_WINDOW_INIT as usize;
58

            
59
/// MPSC queue relating to a stream (either inbound or outbound), sender
60
pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
61
/// MPSC queue relating to a stream (either inbound or outbound), receiver
62
pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
63

            
64
/// A behavior to perform when closing a stream.
65
///
66
/// We don't use `Option<End>` here, since the behavior of `SendNothing` is so surprising
67
/// that we shouldn't let it pass unremarked.
68
#[derive(Clone, Debug)]
69
pub(crate) enum CloseStreamBehavior {
70
    /// Send nothing at all, so that the other side will not realize we have
71
    /// closed the stream.
72
    ///
73
    /// We should only do this for incoming onion service streams when we
74
    /// want to black-hole the client's requests.
75
    SendNothing,
76
    /// Send an End cell, if we haven't already sent one.
77
    SendEnd(End),
78
}
79

            
80
impl Default for CloseStreamBehavior {
81
58
    fn default() -> Self {
82
58
        Self::SendEnd(End::new_misc())
83
58
    }
84
}
85

            
86
/// A collection of components that can be combined to implement a Tor stream,
87
/// or anything that requires a stream ID.
88
///
89
/// Not all components may be needed, depending on the purpose of the "stream".
90
/// For example we build `RELAY_RESOLVE` requests like we do data streams,
91
/// but they won't use the `StreamTarget` as they don't need to send additional
92
/// messages.
93
#[derive(Debug)]
94
pub(crate) struct StreamComponents {
95
    /// A [`Stream`](futures::Stream) of incoming relay messages for this Tor stream.
96
    pub(crate) stream_receiver: StreamReceiver,
97
    /// A handle that can communicate messages to the circuit reactor for this stream.
98
    pub(crate) target: StreamTarget,
99
    /// The memquota [account](tor_memquota::Account) to use for data on this stream.
100
    pub(crate) memquota: StreamAccount,
101
    /// The control information needed to add XON/XOFF flow control to the stream.
102
    pub(crate) xon_xoff_reader_ctrl: XonXoffReaderCtrl,
103
}
104

            
105
/// Internal handle, used to implement a stream on a particular tunnel.
106
///
107
/// The reader and the writer for a stream should hold a `StreamTarget` for the stream;
108
/// the reader should additionally hold an `mpsc::Receiver` to get
109
/// relay messages for the stream.
110
///
111
/// When all the `StreamTarget`s for a stream are dropped, the Reactor will
112
/// close the stream by sending an END message to the other side.
113
/// You can close a stream earlier by using [`StreamTarget::close`]
114
/// or [`StreamTarget::close_pending`].
115
#[derive(Clone, Debug)]
116
pub(crate) struct StreamTarget {
117
    /// Which hop of the circuit this stream is with.
118
    pub(crate) hop: Option<HopLocation>,
119
    /// Reactor ID for this stream.
120
    pub(crate) stream_id: StreamId,
121
    /// Encoding to use for relay cells sent on this stream.
122
    ///
123
    /// This is mostly irrelevant, except when deciding
124
    /// how many bytes we can pack in a DATA message.
125
    pub(crate) relay_cell_format: RelayCellFormat,
126
    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
127
    // TODO(arti#2068): we should consider making this an `Option`
128
    pub(crate) rate_limit_stream: watch::Receiver<StreamRateLimit>,
129
    /// Channel to send cells down.
130
    pub(crate) tx: StreamMpscSender<AnyRelayMsg>,
131
    /// Reference to the tunnel that this stream is on.
132
    pub(crate) tunnel: Tunnel,
133
}
134

            
135
/// A client or relay tunnel.
136
#[derive(Debug, Clone, derive_more::From)]
137
pub(crate) enum Tunnel {
138
    /// A client tunnel.
139
    Client(Arc<ClientTunnel>),
140
    /// A relay tunnel.
141
    #[cfg(feature = "relay")]
142
    Relay(Arc<crate::relay::RelayCirc>),
143
}
144

            
145
impl StreamTarget {
146
    /// Deliver a relay message for the stream that owns this StreamTarget.
147
    ///
148
    /// The StreamTarget will set the correct stream ID and pick the
149
    /// right hop, but will not validate that the message is well-formed
150
    /// or meaningful in context.
151
6498
    pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
152
4332
        self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
153
4332
        Ok(())
154
4332
    }
155

            
156
    /// Close the pending stream that owns this StreamTarget, delivering the specified
157
    /// END message (if any)
158
    ///
159
    /// The stream is closed by sending a control message (`CtrlMsg::ClosePendingStream`)
160
    /// to the reactor.
161
    ///
162
    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
163
    ///
164
    /// The StreamTarget will set the correct stream ID and pick the
165
    /// right hop, but will not validate that the message is well-formed
166
    /// or meaningful in context.
167
    ///
168
    /// Note that in many cases, the actual contents of an END message can leak unwanted
169
    /// information. Please consider carefully before sending anything but an
170
    /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientTunnel`.
171
    /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
172
    ///
173
    /// In addition to sending the END message, this function also ensures
174
    /// the state of the stream map entry of this stream is updated
175
    /// accordingly.
176
    ///
177
    /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
178
    /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
179
    /// function is for closing pending incoming streams (a stream is said to be pending if we have
180
    /// received the message initiating the stream but have not responded to it yet).
181
    ///
182
    /// **NOTE**: This function should be called at most once per request.
183
    /// Calling it twice is an error.
184
    #[cfg(any(feature = "hs-service", feature = "relay"))]
185
16
    pub(crate) fn close_pending(
186
16
        &self,
187
16
        message: crate::stream::CloseStreamBehavior,
188
16
    ) -> Result<oneshot::Receiver<Result<()>>> {
189
16
        match &self.tunnel {
190
12
            Tunnel::Client(t) => {
191
                cfg_if::cfg_if! {
192
                    if #[cfg(feature = "hs-service")] {
193
12
                        t.close_pending(self.stream_id, self.hop, message)
194
                    } else {
195
                        Err(tor_error::internal!("close_pending() called on client stream?!").into())
196
                    }
197
                }
198
            }
199
            #[cfg(feature = "relay")]
200
4
            Tunnel::Relay(t) => t.close_pending(self.stream_id, message),
201
        }
202
16
    }
203

            
204
    /// Queue a "close" for the stream corresponding to this StreamTarget.
205
    ///
206
    /// Unlike `close_pending`, this method does not allow the caller to provide an `END` message.
207
    ///
208
    /// Once this method has been called, no more messages may be sent with [`StreamTarget::send`],
209
    /// on this `StreamTarget`` or any clone of it.
210
    /// The reactor *will* try to flush any already-send messages before it closes the stream.
211
    ///
212
    /// You don't need to call this method if the stream is closing because all of its StreamTargets
213
    /// have been dropped.
214
16
    pub(crate) fn close(&mut self) {
215
16
        Pin::new(&mut self.tx).close_channel();
216
16
    }
217

            
218
    /// Called when a circuit-level protocol error has occurred and the
219
    /// tunnel needs to shut down.
220
    pub(crate) fn protocol_error(&mut self) {
221
        match &self.tunnel {
222
            Tunnel::Client(t) => t.terminate(),
223
            #[cfg(feature = "relay")]
224
            Tunnel::Relay(t) => t.terminate(),
225
        }
226
    }
227

            
228
    /// Request to send a SENDME cell for this stream.
229
    ///
230
    /// This sends a request to the circuit reactor to send a stream-level SENDME, but it does not
231
    /// block or wait for a response from the circuit reactor.
232
    /// An error is only returned if we are unable to send the request.
233
    /// This means that if the circuit reactor is unable to send the SENDME, we are not notified of
234
    /// this here and an error will not be returned.
235
    pub(crate) fn send_sendme(&mut self) -> Result<()> {
236
        match &self.tunnel {
237
            Tunnel::Client(t) => t.send_sendme(self.stream_id, self.hop),
238
            #[cfg(feature = "relay")]
239
            Tunnel::Relay(t) => t.send_sendme(self.stream_id),
240
        }
241
    }
242

            
243
    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
244
    ///
245
    /// Typically the circuit reactor would send this new rate in an XON message to the other end of
246
    /// the stream.
247
    /// But it may decide not to, and may discard this update.
248
    /// For example the stream may have a large amount of buffered data, and the reactor may not
249
    /// want to send an XON while the buffer is large.
250
    ///
251
    /// This sends a message to inform the circuit reactor of the new drain rate,
252
    /// but it does not block or wait for a response from the reactor.
253
    /// An error is only returned if we are unable to send the update.
254
    pub(crate) fn drain_rate_update(&mut self, rate: XonKbpsEwma) -> Result<()> {
255
        match &mut self.tunnel {
256
            Tunnel::Client(t) => t.drain_rate_update(self.stream_id, self.hop, rate),
257
            #[cfg(feature = "relay")]
258
            Tunnel::Relay(t) => t.drain_rate_update(self.stream_id, rate),
259
        }
260
    }
261

            
262
    /// Return a reference to the tunnel that this `StreamTarget` is using.
263
    #[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
264
124
    pub(crate) fn tunnel(&self) -> &Tunnel {
265
124
        &self.tunnel
266
124
    }
267

            
268
    /// Return the kind of relay cell in use on this `StreamTarget`.
269
124
    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
270
124
        self.relay_cell_format
271
124
    }
272

            
273
    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
274
124
    pub(crate) fn rate_limit_stream(&self) -> &watch::Receiver<StreamRateLimit> {
275
124
        &self.rate_limit_stream
276
124
    }
277
}
278

            
279
/// Return the stream ID of `msg`, if it has one.
280
///
281
/// Returns `Ok(None)` if `msg` is a meta cell.
282
540
pub(crate) fn msg_streamid(msg: &UnparsedRelayMsg) -> Result<Option<StreamId>> {
283
540
    let cmd = msg.cmd();
284
540
    let streamid = msg.stream_id();
285
540
    if !cmd.accepts_streamid_val(streamid) {
286
4
        return Err(Error::CircProto(format!(
287
4
            "Invalid stream ID {} for relay command {}",
288
4
            sensitive(StreamId::get_or_zero(streamid)),
289
4
            msg.cmd()
290
4
        )));
291
536
    }
292

            
293
536
    Ok(streamid)
294
540
}