1
//! Tor stream handling.
2
//!
3
//! A stream is an anonymized conversation; multiple streams can be
4
//! multiplexed over a single circuit.
5

            
6
pub(crate) mod cmdcheck;
7
pub(crate) mod flow_ctrl;
8
pub(crate) mod raw;
9

            
10
#[cfg(any(feature = "hs-service", feature = "relay"))]
11
pub(crate) mod incoming;
12

            
13
pub(crate) mod queue;
14

            
15
use futures::SinkExt as _;
16
use oneshot_fused_workaround as oneshot;
17
use postage::watch;
18
use safelog::sensitive;
19

            
20
use tor_async_utils::SinkCloseChannel as _;
21
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
22
use tor_cell::relaycell::msg::{AnyRelayMsg, End};
23
use tor_cell::relaycell::{RelayCellFormat, StreamId, UnparsedRelayMsg};
24
use tor_memquota::mq_queue::{self, MpscSpec};
25

            
26
use flow_ctrl::state::StreamRateLimit;
27

            
28
use crate::memquota::StreamAccount;
29
use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
30
use crate::stream::raw::StreamReceiver;
31
use crate::{ClientTunnel, Error, HopLocation, Result};
32

            
33
use std::pin::Pin;
34
use std::sync::Arc;
35

            
36
/// Initial value for outbound flow-control window on streams.
37
pub(crate) const SEND_WINDOW_INIT: u16 = 500;
38
/// Initial value for inbound flow-control window on streams.
39
pub(crate) const RECV_WINDOW_INIT: u16 = 500;
40

            
41
/// Size of the buffer used between the reactor and a `StreamReader`.
42
///
43
/// FIXME(eta): We pick 2× the receive window, which is very conservative (we arguably shouldn't
44
///             get sent more than the receive window anyway!). We might do due to things that
45
///             don't count towards the window though.
46
pub(crate) const STREAM_READER_BUFFER: usize = (2 * RECV_WINDOW_INIT) as usize;
47

            
48
/// MPSC queue relating to a stream (either inbound or outbound), sender
49
pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
50
/// MPSC queue relating to a stream (either inbound or outbound), receiver
51
pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
52

            
53
/// A behavior to perform when closing a stream.
54
///
55
/// We don't use `Option<End>` here, since the behavior of `SendNothing` is so surprising
56
/// that we shouldn't let it pass unremarked.
57
#[derive(Clone, Debug)]
58
pub(crate) enum CloseStreamBehavior {
59
    /// Send nothing at all, so that the other side will not realize we have
60
    /// closed the stream.
61
    ///
62
    /// We should only do this for incoming onion service streams when we
63
    /// want to black-hole the client's requests.
64
    SendNothing,
65
    /// Send an End cell, if we haven't already sent one.
66
    SendEnd(End),
67
}
68

            
69
impl Default for CloseStreamBehavior {
70
54
    fn default() -> Self {
71
54
        Self::SendEnd(End::new_misc())
72
54
    }
73
}
74

            
75
/// A collection of components that can be combined to implement a Tor stream,
76
/// or anything that requires a stream ID.
77
///
78
/// Not all components may be needed, depending on the purpose of the "stream".
79
/// For example we build `RELAY_RESOLVE` requests like we do data streams,
80
/// but they won't use the `StreamTarget` as they don't need to send additional
81
/// messages.
82
#[derive(Debug)]
83
pub(crate) struct StreamComponents {
84
    /// A [`Stream`](futures::Stream) of incoming relay messages for this Tor stream.
85
    pub(crate) stream_receiver: StreamReceiver,
86
    /// A handle that can communicate messages to the circuit reactor for this stream.
87
    pub(crate) target: StreamTarget,
88
    /// The memquota [account](tor_memquota::Account) to use for data on this stream.
89
    pub(crate) memquota: StreamAccount,
90
    /// The control information needed to add XON/XOFF flow control to the stream.
91
    pub(crate) xon_xoff_reader_ctrl: XonXoffReaderCtrl,
92
}
93

            
94
/// Internal handle, used to implement a stream on a particular tunnel.
95
///
96
/// The reader and the writer for a stream should hold a `StreamTarget` for the stream;
97
/// the reader should additionally hold an `mpsc::Receiver` to get
98
/// relay messages for the stream.
99
///
100
/// When all the `StreamTarget`s for a stream are dropped, the Reactor will
101
/// close the stream by sending an END message to the other side.
102
/// You can close a stream earlier by using [`StreamTarget::close`]
103
/// or [`StreamTarget::close_pending`].
104
#[derive(Clone, Debug)]
105
pub(crate) struct StreamTarget {
106
    /// Which hop of the circuit this stream is with.
107
    pub(crate) hop: Option<HopLocation>,
108
    /// Reactor ID for this stream.
109
    pub(crate) stream_id: StreamId,
110
    /// Encoding to use for relay cells sent on this stream.
111
    ///
112
    /// This is mostly irrelevant, except when deciding
113
    /// how many bytes we can pack in a DATA message.
114
    pub(crate) relay_cell_format: RelayCellFormat,
115
    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
116
    // TODO(arti#2068): we should consider making this an `Option`
117
    pub(crate) rate_limit_stream: watch::Receiver<StreamRateLimit>,
118
    /// Channel to send cells down.
119
    pub(crate) tx: StreamMpscSender<AnyRelayMsg>,
120
    /// Reference to the tunnel that this stream is on.
121
    pub(crate) tunnel: Tunnel,
122
}
123

            
124
/// A client or relay tunnel.
125
#[derive(Debug, Clone, derive_more::From)]
126
pub(crate) enum Tunnel {
127
    /// A client tunnel.
128
    Client(Arc<ClientTunnel>),
129
    /// A relay tunnel.
130
    #[cfg(feature = "relay")]
131
    Relay(Arc<crate::relay::RelayCirc>),
132
}
133

            
134
impl StreamTarget {
135
    /// Deliver a relay message for the stream that owns this StreamTarget.
136
    ///
137
    /// The StreamTarget will set the correct stream ID and pick the
138
    /// right hop, but will not validate that the message is well-formed
139
    /// or meaningful in context.
140
6492
    pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
141
4328
        self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
142
4328
        Ok(())
143
4328
    }
144

            
145
    /// Close the pending stream that owns this StreamTarget, delivering the specified
146
    /// END message (if any)
147
    ///
148
    /// The stream is closed by sending a control message (`CtrlMsg::ClosePendingStream`)
149
    /// to the reactor.
150
    ///
151
    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
152
    ///
153
    /// The StreamTarget will set the correct stream ID and pick the
154
    /// right hop, but will not validate that the message is well-formed
155
    /// or meaningful in context.
156
    ///
157
    /// Note that in many cases, the actual contents of an END message can leak unwanted
158
    /// information. Please consider carefully before sending anything but an
159
    /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientTunnel`.
160
    /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
161
    ///
162
    /// In addition to sending the END message, this function also ensures
163
    /// the state of the stream map entry of this stream is updated
164
    /// accordingly.
165
    ///
166
    /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
167
    /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
168
    /// function is for closing pending incoming streams (a stream is said to be pending if we have
169
    /// received the message initiating the stream but have not responded to it yet).
170
    ///
171
    /// **NOTE**: This function should be called at most once per request.
172
    /// Calling it twice is an error.
173
    #[cfg(any(feature = "hs-service", feature = "relay"))]
174
12
    pub(crate) fn close_pending(
175
12
        &self,
176
12
        message: crate::stream::CloseStreamBehavior,
177
12
    ) -> Result<oneshot::Receiver<Result<()>>> {
178
12
        match &self.tunnel {
179
12
            Tunnel::Client(t) => {
180
                cfg_if::cfg_if! {
181
                    if #[cfg(feature = "hs-service")] {
182
12
                        t.close_pending(self.stream_id, self.hop, message)
183
                    } else {
184
                        Err(tor_error::internal!("close_pending() called on client stream?!").into())
185
                    }
186
                }
187
            }
188
            #[cfg(feature = "relay")]
189
            Tunnel::Relay(t) => t.close_pending(self.stream_id, message),
190
        }
191
12
    }
192

            
193
    /// Queue a "close" for the stream corresponding to this StreamTarget.
194
    ///
195
    /// Unlike `close_pending`, this method does not allow the caller to provide an `END` message.
196
    ///
197
    /// Once this method has been called, no more messages may be sent with [`StreamTarget::send`],
198
    /// on this `StreamTarget`` or any clone of it.
199
    /// The reactor *will* try to flush any already-send messages before it closes the stream.
200
    ///
201
    /// You don't need to call this method if the stream is closing because all of its StreamTargets
202
    /// have been dropped.
203
16
    pub(crate) fn close(&mut self) {
204
16
        Pin::new(&mut self.tx).close_channel();
205
16
    }
206

            
207
    /// Called when a circuit-level protocol error has occurred and the
208
    /// tunnel needs to shut down.
209
    pub(crate) fn protocol_error(&mut self) {
210
        match &self.tunnel {
211
            Tunnel::Client(t) => t.terminate(),
212
            #[cfg(feature = "relay")]
213
            Tunnel::Relay(t) => t.terminate(),
214
        }
215
    }
216

            
217
    /// Request to send a SENDME cell for this stream.
218
    ///
219
    /// This sends a request to the circuit reactor to send a stream-level SENDME, but it does not
220
    /// block or wait for a response from the circuit reactor.
221
    /// An error is only returned if we are unable to send the request.
222
    /// This means that if the circuit reactor is unable to send the SENDME, we are not notified of
223
    /// this here and an error will not be returned.
224
    pub(crate) fn send_sendme(&mut self) -> Result<()> {
225
        match &self.tunnel {
226
            Tunnel::Client(t) => t.send_sendme(self.stream_id, self.hop),
227
            #[cfg(feature = "relay")]
228
            Tunnel::Relay(t) => t.send_sendme(self.stream_id),
229
        }
230
    }
231

            
232
    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
233
    ///
234
    /// Typically the circuit reactor would send this new rate in an XON message to the other end of
235
    /// the stream.
236
    /// But it may decide not to, and may discard this update.
237
    /// For example the stream may have a large amount of buffered data, and the reactor may not
238
    /// want to send an XON while the buffer is large.
239
    ///
240
    /// This sends a message to inform the circuit reactor of the new drain rate,
241
    /// but it does not block or wait for a response from the reactor.
242
    /// An error is only returned if we are unable to send the update.
243
    pub(crate) fn drain_rate_update(&mut self, rate: XonKbpsEwma) -> Result<()> {
244
        match &mut self.tunnel {
245
            Tunnel::Client(t) => t.drain_rate_update(self.stream_id, self.hop, rate),
246
            #[cfg(feature = "relay")]
247
            Tunnel::Relay(t) => t.drain_rate_update(self.stream_id, rate),
248
        }
249
    }
250

            
251
    /// Return a reference to the tunnel that this `StreamTarget` is using.
252
    #[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
253
120
    pub(crate) fn tunnel(&self) -> &Tunnel {
254
120
        &self.tunnel
255
120
    }
256

            
257
    /// Return the kind of relay cell in use on this `StreamTarget`.
258
120
    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
259
120
        self.relay_cell_format
260
120
    }
261

            
262
    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
263
120
    pub(crate) fn rate_limit_stream(&self) -> &watch::Receiver<StreamRateLimit> {
264
120
        &self.rate_limit_stream
265
120
    }
266
}
267

            
268
/// Return the stream ID of `msg`, if it has one.
269
///
270
/// Returns `Ok(None)` if `msg` is a meta cell.
271
500
pub(crate) fn msg_streamid(msg: &UnparsedRelayMsg) -> Result<Option<StreamId>> {
272
500
    let cmd = msg.cmd();
273
500
    let streamid = msg.stream_id();
274
500
    if !cmd.accepts_streamid_val(streamid) {
275
        return Err(Error::CircProto(format!(
276
            "Invalid stream ID {} for relay command {}",
277
            sensitive(StreamId::get_or_zero(streamid)),
278
            msg.cmd()
279
        )));
280
500
    }
281

            
282
500
    Ok(streamid)
283
500
}