1
//! Declare the lowest level of stream: a stream that operates on raw
2
//! cells.
3

            
4
use std::pin::Pin;
5
use std::task::{Context, Poll};
6

            
7
use futures::Stream;
8
use pin_project::pin_project;
9
use tor_async_utils::peekable_stream::UnobtrusivePeekableStream;
10
use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
11
use tracing::debug;
12

            
13
use crate::congestion::sendme;
14
use crate::stream::StreamTarget;
15
use crate::stream::queue::StreamQueueReceiver;
16
use crate::{Error, Result};
17

            
18
/// The read part of a stream on a particular circuit.
19
///
20
/// This [`Stream`](Stream) will return incoming messages for this Tor stream, excluding flow control
21
/// related messages like SENDME, XON, and XOFF.
22
///
23
/// To avoid ambiguity, the following uses "stream" to refer to the `futures::Stream`, not the Tor
24
/// stream.
25
///
26
/// If the stream ends unexpectedly (before an END message), the stream will return an error.
27
/// After the stream returns an END message or an error, this stream will be "terminated" and future
28
/// [`poll_next`](Stream::poll_next) calls will return `None`.
29
// I think it would be better to *not* return an error if the stream ends before an END message is
30
// received, and just return `None`. The caller will know if it received an END message or not, so
31
// returning an error isn't very useful and is maybe unexpected.
32
#[derive(Debug)]
33
#[pin_project]
34
pub struct StreamReceiver {
35
    /// The underlying `StreamTarget` for this stream.
36
    ///
37
    /// A reader has this target in order to:
38
    ///   * Make the reactor send SENDME messages.
39
    ///   * Tell the reactor when there is a protocol error.
40
    ///   * Keep the stream alive at least until the StreamReceiver
41
    ///     is dropped.
42
    pub(crate) target: StreamTarget,
43
    /// Channel to receive stream messages from the reactor.
44
    #[pin]
45
    pub(crate) receiver: StreamQueueReceiver,
46
    /// Congestion control receive window for this stream.
47
    ///
48
    /// Having this here means we're only going to update it when the end consumer of this stream
49
    /// actually reads things, meaning we don't ask for more data until it's actually needed (as
50
    /// opposed to having the reactor assume we're always reading, and potentially overwhelm itself
51
    /// with having to buffer data).
52
    pub(crate) recv_window: sendme::StreamRecvWindow,
53
    /// Whether or not this stream has ended.
54
    pub(crate) ended: bool,
55
}
56

            
57
impl StreamReceiver {
58
    /// Try to read the next relay message from this stream.
59
428
    fn poll_next_inner(
60
428
        mut self: Pin<&mut Self>,
61
428
        cx: &mut Context<'_>,
62
428
    ) -> Result<Poll<UnparsedRelayMsg>> {
63
428
        let msg = match self.as_mut().project().receiver.poll_next(cx) {
64
208
            Poll::Ready(Some(msg)) => msg,
65
            Poll::Ready(None) => {
66
                // The channel is indicating that it has terminated, likely from a dropped sender.
67
                // But if we're here, it means we never received an END cell.
68
                //
69
                // This generally (exclusively?) means that the circuit was destroyed before the
70
                // peer sent an END message.
71
                return Err(Error::CircuitClosed);
72
            }
73
220
            Poll::Pending => return Ok(Poll::Pending),
74
        };
75

            
76
208
        if sendme::cell_counts_towards_windows(&msg) && self.recv_window.take()? {
77
            if let Err(e) = self.target.send_sendme() {
78
                if matches!(e, Error::CircuitClosed) {
79
                    // If the tunnel has closed, sending a message to the tunnel reactor may fail.
80
                    // But this is okay. We still want the user to be able to continue reading the
81
                    // remaining queued data for this stream, and if the tunnel has closed it
82
                    // wouldn't make sense to send a SENDME message anyways.
83
                    debug!("Failed to send stream-level SENDME. Ignoring: {e}");
84
                } else {
85
                    // This error is unexpected. Let's return it to the user.
86
                    return Err(e);
87
                }
88
            }
89
            self.recv_window.put();
90
208
        }
91

            
92
208
        Ok(Poll::Ready(msg))
93
428
    }
94

            
95
    /// Shut down this stream.
96
    pub fn protocol_error(&mut self) {
97
        self.target.protocol_error();
98
    }
99

            
100
    /// Is the stream currently empty?
101
    ///
102
    /// This method is inherently subject to race conditions. More data may arrive even before this
103
    /// method returns, so a result of `true` might have been outdated before the method even
104
    /// returned.
105
    ///
106
    /// This takes a `&mut` so that we can peek the stream.
107
    ///
108
    /// We provide an `is_empty` method rather than implementing [`UnobtrusivePeekableStream`]
109
    /// directly since `UnobtrusivePeekableStream` allows you to mutate the peeked item, which could
110
    /// break any accounting we do here in `StreamReceiver` (like stream sendme accounting). Also
111
    /// the stream types are incompatible (the inner receiver returns items of `UnparsedRelayMsg`,
112
    /// but this [`StreamReceiver`] returns items of `Result<UnparsedRelayMsg>`).
113
    pub(crate) fn is_empty(&mut self) -> bool {
114
        // The `StreamQueueReceiver` gives us two ways of checking if the queue is empty:
115
        // `unobtrusive_peek().is_none()` and `approx_stream_bytes() == 0`. The peek seems like a
116
        // better approach, so we do that here.
117
        // TODO(arti#534): Should reconsider using `unobtrusive_peek()`. What we really want to know
118
        // is if there is more stream data in the queue. But peeking only tells us if there are more
119
        // messages. There could be more messages, but none of them data messages.
120
        let peek_is_none = Pin::new(&mut self.receiver).unobtrusive_peek().is_none();
121

            
122
        // if the peek says that the stream is empty, assert that `approx_stream_bytes()` shows 0
123
        // bytes
124
        #[cfg(debug_assertions)]
125
        if peek_is_none {
126
            assert_eq!(self.receiver.approx_stream_bytes(), 0);
127
        } else {
128
            // if the peek is not empty it doesn't mean that approx_stream_bytes() != 0,
129
            // since there may be messages that contain no stream data
130
        }
131

            
132
        peek_is_none
133
    }
134
}
135

            
136
impl Stream for StreamReceiver {
137
    type Item = Result<UnparsedRelayMsg>;
138

            
139
428
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
140
428
        if self.ended {
141
            // Prevent reading more messages from streams after they've ended. `None` indicates that
142
            // the stream is complete/terminated.
143
            return Poll::Ready(None);
144
428
        }
145

            
146
428
        match self.as_mut().poll_next_inner(cx) {
147
220
            Ok(Poll::Pending) => Poll::Pending,
148
208
            Ok(Poll::Ready(msg)) => {
149
208
                if msg.cmd() == RelayCmd::END {
150
24
                    // We return the END cell, and future polls will return `None`.
151
24
                    self.ended = true;
152
184
                }
153
208
                Poll::Ready(Some(Ok(msg)))
154
            }
155
            Err(e) => {
156
                // We return the error, and future polls will return `None`.
157
                self.ended = true;
158
                Poll::Ready(Some(Err(e)))
159
            }
160
        }
161
428
    }
162
}