1
//! Implements a sender and a [`Stream`] type for sending messages
//! from a [channel](crate::channel) to a circuit,
//! prioritizing the delivery of `DESTROY` messages.
//!
//! [`CircuitRxSender`] and [`CircuitRxReceiver`] take any channel message,
//! because the receiving end can be either a client or a relay circuit reactor.
//! The reactor itself will convert into its restricted message set.
8

            
9
use std::pin::Pin;
10
use std::task::{self, Context, Poll};
11

            
12
use futures::{FutureExt as _, SinkExt as _, Stream, StreamExt as _};
13
use oneshot_fused_workaround as oneshot;
14
use tor_basic_utils::assert_val_impl_trait;
15
use tor_cell::chancell::msg::{AnyChanMsg, Destroy};
16
use tor_memquota::mq_queue::{self, ChannelSpec, MpscSpec};
17

            
18
/// The sending end of the SPSC queue for inbound data on its way from channel to circuit
19
///
20
/// A [`CircuitRxSender`] sender is closed for sending as soon as the first
21
/// `DESTROY` message is sent, and will discard any unflushed cells
22
/// from its underlying [`mq_queue`], by dropping it.
23
///
24
/// ## No [`Sink`](futures::Sink) implementation
25
///
26
/// This type intentionally does not implement [`Sink`](futures::Sink).
27
/// Instead it provides a [`send()`](CircuitRxSender::send) function
28
/// similar to [`SinkExt::send`](futures::SinkExt::send).
29
///
30
/// The reason for doing it this way is because we cannot provide
31
/// a correct `Sink::poll:ready()` implementation
32
/// that wouldn't block DESTROY cells from being sent
33
/// when our underlying MPSC sender is full:
34
/// `SinkExt::send()` calls `poll_ready()` followed by `start_send()`,
35
/// so in order for our `poll_ready()` implementation to not block DESTROY
36
/// on the MPSC queue's readiness, it would need to know whether
37
/// the cell that will be sent via `start_send()` is a DESTROY or not,
38
/// but that's not possible because of the way the `Sink`/`SinkExt` traits
39
/// are designed.
40
#[derive(Debug)]
41
pub(crate) struct CircuitRxSender(Option<CircuitRxSenderInner>);
42

            
43
/// The inner state of a [`CircuitRxSender`].
44
#[derive(Debug)]
45
struct CircuitRxSenderInner {
46
    /// Sender for sending `DESTROY` to [`CircuitRxReceiver`]
47
    destroy_tx: oneshot::Sender<Destroy>,
48
    /// Sender for sending all other [`AnyChanMsg`]s to [`CircuitRxReceiver`]
49
    cell_tx: mq_queue::Sender<AnyChanMsg, MpscSpec>,
50
}
51

            
52
/// The receiving end of the SPSC queue for inbound data on its way from channel to circuit
53
///
54
/// A [`CircuitRxReceiver`] stream ends as soon as the first `DESTROY` message
55
/// is received, causing the stream to discard any unflushed cells
56
/// from its underlying [`mq_queue`], by dropping it.
57
#[derive(Debug)]
58
pub(crate) struct CircuitRxReceiver(Option<CircuitRxReceiverInner>);
59

            
60
/// The inner state of a [`CircuitRxReceiver`].
61
#[derive(Debug)]
62
struct CircuitRxReceiverInner {
63
    /// Receiver for receiving `DESTROY` from [`CircuitRxSender`]
64
    destroy_rx: oneshot::Receiver<Destroy>,
65
    /// Receiver for receiving all other [`AnyChanMsg`]s from [`CircuitRxReceiver`]
66
    cell_rx: mq_queue::Receiver<AnyChanMsg, MpscSpec>,
67
}
68

            
69
/// Wrap the sender and receiver of an [`mq_queue`] channel
70
/// into [`CircuitRxSender`] and [`CircuitRxReceiver`].
71
///
72
/// The returned channel will ensure any DESTROY messages sent
73
/// over the [`CircuitRxSender`] will be delivered
74
/// by the [`CircuitRxReceiver`] immediately,
75
/// ahead of any other messages that might already be queued,
76
/// which will be discarded.
77
///
78
/// We are fine with the resulting data loss, because inbound DESTROY
79
/// can be indicative of malicious activity on the circuit.
80
/// We choose to err on the safe side, and free up the resources associated
81
/// with such circuits as soon as possible.
82
/// DESTROY messages are also sent by relays when they're about to hibernate,
83
/// and by clients once they've decided to stop using a circuit.
84
/// In the latter case, the lack of an `RELAY_COMMAND_END_ACK`
85
/// does mean that this prioritization can cause data loss
86
/// (if the client closes the circuit immediately after END-ing a stream).
87
/// However, this is a deficiency in the protocol,
88
/// and not something we want to fix by implementing custom flushing logic
89
/// in the reactor. See torspec#196 and the discussion in #2490.
90
///
91
/// Note: the underlying buffer of the [`mq_queue`] will only be freed
92
/// once both the [`CircuitRxSender`] and [`CircuitRxReceiver`] are dropped;
93
/// in other words, after a `DESTROY` cell has been obtained from the [`CircuitRxReceiver`],
94
/// via its [`Stream`] implementation
95
996
pub(crate) fn channel(
96
996
    cell_tx: mq_queue::Sender<AnyChanMsg, MpscSpec>,
97
996
    cell_rx: mq_queue::Receiver<AnyChanMsg, MpscSpec>,
98
996
) -> (CircuitRxSender, CircuitRxReceiver) {
99
996
    let (destroy_tx, destroy_rx) = oneshot::channel();
100
996
    let sender = CircuitRxSender(Some(CircuitRxSenderInner {
101
996
        destroy_tx,
102
996
        cell_tx,
103
996
    }));
104

            
105
996
    let receiver = CircuitRxReceiver(Some(CircuitRxReceiverInner {
106
996
        destroy_rx,
107
996
        cell_rx,
108
996
    }));
109

            
110
996
    (sender, receiver)
111
996
}
112

            
113
impl Stream for CircuitRxReceiver {
114
    type Item = AnyChanMsg;
115

            
116
9160
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
117
9160
        let Some(inner) = self.0.as_mut() else {
118
16
            return Poll::Ready(None);
119
        };
120

            
121
        // It's important that destroy_rx is fused,
122
        // because we call poll_unpin() unconditionally below.
123
9144
        assert_val_impl_trait!(inner.destroy_rx, futures_util::future::FusedFuture);
124

            
125
        // First, check if we have a DESTROY message ready
126
9144
        let destroy_cell = match inner.destroy_rx.poll_unpin(cx) {
127
58
            Poll::Ready(destroy) => {
128
                // If destroy.is_err(), it means the CircuitRxSender was dropped,
129
                // but there may be more data buffered in the underlying mpsc,
130
                // so we need to continue polling cell_rx.
131
                //
132
                // This is important, because we want to preserve the behavior
133
                // of the mq_queue, whose Receiver will continue yielding queued
134
                // messages even after the Sender is dropped.
135
58
                destroy.ok()
136
            }
137
            Poll::Pending => {
138
                // No DESTROY message yet, so it's time to poll the non-priority
139
                // message queue
140
9086
                None
141
            }
142
        };
143

            
144
9144
        if let Some(destroy) = destroy_cell {
145
            // Drop the inner state, closing this stream
146
48
            self.0 = None;
147
48
            return Poll::Ready(Some(AnyChanMsg::Destroy(destroy)));
148
9096
        }
149

            
150
9096
        let res = task::ready!(inner.cell_rx.poll_next_unpin(cx));
151

            
152
        // Our CircuitRxSender impl will never send DESTROY messages
153
        // on the cell_rx queue (they're always sent via the oneshot channel)
154
578
        debug_assert!(!matches!(res, Some(AnyChanMsg::Destroy(_))));
155

            
156
578
        Poll::Ready(res)
157
9160
    }
158
}
159

            
160
/// Error returned when trying to write to a [`CircuitRxSender`]
161
#[derive(thiserror::Error, Clone, Debug)]
162
pub(crate) enum SendError {
163
    /// The underlying MPSC channel rejected the message
164
    #[error("{0}")]
165
    Channel(#[from] mq_queue::SendError<<MpscSpec as ChannelSpec>::SendError>),
166

            
167
    /// The receiver has dropped
168
    ///
169
    // Note: technically, there are two "Disconnected" variants:
170
    // this one, for the oneshot channel, and a second, hidden variant
171
    // inside mq_queue:SendError, for the mq_queue one.
172
    //
173
    // It would be nice if we only had one variant covering both cases,
174
    // but this will have to do for now.
175
    #[error("the receiver has dropped")]
176
    Disconnected,
177

            
178
    /// The sender is closed
179
    ///
180
    /// Returned if the [`CircuitRxSender`] is used after a DESTROY cell has been written to it.
181
    #[error("sender has closed")]
182
    Closed,
183
}
184

            
185
impl CircuitRxSender {
186
    /// Send a cell down this channel
187
    ///
188
    /// If the sender is already closed (i.e., if we have already sent DESTROY),
189
    /// this will return an error.
190
    ///
191
    // In practice, we never write more than 1 DESTROY cell to this sender,
192
    // because the channel reactor removes the circuit (and corresponding CircuitRxSender)
193
    // from its circ map after the first DESTROY.
194
1062
    pub(crate) async fn send(&mut self, msg: AnyChanMsg) -> Result<(), SendError> {
195
708
        if let AnyChanMsg::Destroy(d) = msg {
196
48
            let inner = self.take_inner()?;
197

            
198
48
            if inner.destroy_tx.send(d).is_err() {
199
                return Err(SendError::Disconnected);
200
48
            }
201

            
202
48
            Ok(())
203
        } else {
204
660
            self.borrow_for_sending()?.cell_tx.send(msg).await?;
205
640
            Ok(())
206
        }
207
704
    }
208

            
209
    /// Borrow the [`CircuitRxSenderInner`] state for sending.
210
    ///
211
    /// Returns an error if the sender is closed.
212
660
    fn borrow_for_sending(&mut self) -> Result<&mut CircuitRxSenderInner, SendError> {
213
660
        self.0.as_mut().ok_or_else(|| SendError::Closed)
214
660
    }
215

            
216
    /// Take the inner [`CircuitRxSenderInner`], closing the sender.
217
    ///
218
    /// Returns an error if the sender is already closed.
219
48
    fn take_inner(&mut self) -> Result<CircuitRxSenderInner, SendError> {
220
48
        self.0.take().ok_or_else(|| SendError::Closed)
221
48
    }
222
}
223

            
224
#[cfg(test)]
pub(crate) mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;

    use tor_cell::chancell::msg::{self, DestroyReason};
    use tor_rtmock::MockRuntime;

    use std::task::Waker;

    /// Make an MPSC queue, of the type we use to send cells
    /// from the channel reactor to the circuit reactor,
    /// but a fake one for testing
    pub(crate) fn fake_mpsc(buffer: usize) -> (CircuitRxSender, CircuitRxReceiver) {
        let (tx, rx) = crate::fake_mpsc(buffer);

        crate::circuit::circ_sender::channel(tx, rx)
    }

    /// A DESTROY message
    fn destroy_msg(reason: DestroyReason) -> AnyChanMsg {
        AnyChanMsg::Destroy(msg::Destroy::new(reason))
    }

    /// A RELAY message
    fn relay_msg() -> AnyChanMsg {
        AnyChanMsg::Relay(msg::Relay::new(b"hello"))
    }

    /// Assert that the receiver has reached end-of-stream
    /// and that the sender refuses further cells with `SendError::Closed`.
    macro_rules! assert_eos {
        ($tx:expr, $rx:expr) => {{
            assert!($rx.next().await.is_none());
            // Cannot send any more cells once the sender is closed
            let err = $tx.send(relay_msg()).await.unwrap_err();
            assert!(matches!(err, SendError::Closed));
        }};
    }

    /// The buffer size to use for the fake MPSC queues
    const BUFFER_SIZE: usize = 16;

    #[test]
    fn destroy_skips_queue() {
        MockRuntime::test_with_various(|_rt| async move {
            let (mut tx, mut rx) = fake_mpsc(BUFFER_SIZE);

            tx.send(relay_msg()).await.unwrap();
            tx.send(destroy_msg(DestroyReason::HIBERNATING))
                .await
                .unwrap();

            // Destroy skips the queue
            let destroy = rx.next().await.unwrap();

            assert!(matches!(destroy, AnyChanMsg::Destroy(_)));
            // And we've reached EOS
            assert_eos!(tx, rx);
        });
    }

    #[test]
    fn destroy_on_empty_queue() {
        MockRuntime::test_with_various(|_rt| async move {
            let (mut tx, mut rx) = fake_mpsc(BUFFER_SIZE);

            tx.send(destroy_msg(DestroyReason::HIBERNATING))
                .await
                .unwrap();
            let destroy = rx.next().await.unwrap();

            assert!(matches!(destroy, AnyChanMsg::Destroy(_)));
            // And we've reached EOS
            assert_eos!(tx, rx);
        });
    }

    #[test]
    fn destroy_after_data() {
        MockRuntime::test_with_various(|_rt| async move {
            let (mut tx, mut rx) = fake_mpsc(BUFFER_SIZE);

            for _ in 0..3 {
                tx.send(relay_msg()).await.unwrap();
            }

            for _ in 0..3 {
                let data = rx.next().await.unwrap();
                assert!(matches!(data, AnyChanMsg::Relay(_)));
            }

            let mut noop_cx = Context::from_waker(Waker::noop());
            // The queue is now empty
            assert!(rx.poll_next_unpin(&mut noop_cx).is_pending());

            tx.send(destroy_msg(DestroyReason::PROTOCOL)).await.unwrap();

            let destroy = rx.next().await.unwrap();
            assert!(matches!(destroy, AnyChanMsg::Destroy(_)));
            // And we've reached EOS
            assert_eos!(tx, rx);
        });
    }

    #[test]
    fn destroy_full_queue() {
        MockRuntime::test_with_various(|_rt| async move {
            let (mut tx, mut rx) = fake_mpsc(BUFFER_SIZE);

            // Fill the queue with data...
            loop {
                let fut = Box::pin(tx.send(relay_msg()));
                match futures::poll!(fut) {
                    Poll::Pending => {
                        // Full, time to break
                        break;
                    }
                    Poll::Ready(res) => {
                        let () = res.unwrap();
                    }
                }
            }
            // ...followed by a destroy
            tx.send(destroy_msg(DestroyReason::INTERNAL)).await.unwrap();

            // The destroy cell goes through even though the queue is full,
            // ahead of all the queued data
            let destroy = rx.next().await.unwrap();

            assert!(matches!(destroy, AnyChanMsg::Destroy(_)));
            // And we've reached EOS
            assert_eos!(tx, rx);
        });
    }
}