1
//! Implements an outbound Sink type for cells being sent from a circuit onto a
2
//! [channel](crate::channel).
3

            
4
use std::{
5
    pin::{Pin, pin},
6
    task::{Context, Poll},
7
};
8

            
9
use cfg_if::cfg_if;
10
use futures::Sink;
11
use pin_project::pin_project;
12
use tor_rtcompat::DynTimeProvider;
13
use tracing::instrument;
14

            
15
use crate::{
16
    HopNum,
17
    channel::{ChanCellQueueEntry, ChannelSender},
18
    congestion::CongestionSignals,
19
    util::{SinkExt, sometimes_unbounded_sink::SometimesUnboundedSink},
20
};
21

            
22
cfg_if! {
    if #[cfg(feature="circ-padding")] {
        use crate::util::sink_blocker::{BooleanPolicy, SinkBlocker};

        /// The type of our `SometimesUnboundedSink`, as instantiated.
        ///
        /// This is where control cells get queued.
        type SometimesUnbounded = SometimesUnboundedSink<
            ChanCellQueueEntry,
            // If we ever want control messages to be blocked by padding frameworks again,
            // this is the parameter we would reinstate here:
            //      SinkBlocker<ChannelSender, CountingPolicy>
            ChannelSender
        >;

        /// Inner type used to implement a [`CircuitCellSender`].
        ///
        /// With the `circ-padding` feature enabled, a ChanSender is wrapped in
        /// several layers.  From the outside in:
        /// - A [`SinkBlocker`], which makes this sink behave as if it were full
        ///   whenever our [circuit padding](crate::client::circuit::padding) code
        ///   tells us to block outbound traffic.
        /// - A [`SometimesUnboundedSink`], which queues control messages
        ///   when the target `ChanSender` is full,
        ///   or when traffic is blocked.
        /// - The [`ChannelSender`] itself.
        ///
        /// NOTE: There used to be a second `SinkBlocker`, which kept messages in the
        /// SometimesUnboundedSink from reaching the ChanSender
        /// while we were blocked on padding.
        /// It was removed because our [padding design] says that
        /// non-data messages never wait for a padding-based block.
        /// It can be reinstated if we change our mind.
        ///
        /// TODO: Ideally, this type would participate in the memory quota system.
        ///
        /// TODO: At some point in the future, we might want to add
        /// an additional _bounded_ [`futures::sink::Buffer`]
        /// to queue cells before they are put onto the channel,
        /// or to queue data from loud streams.
        ///
        /// [padding design]: https://gitlab.torproject.org/tpo/core/arti/-/blob/main/doc/dev/notes/circuit-padding.md
        type InnerSink = SinkBlocker<SometimesUnbounded, BooleanPolicy>;
    } else {
        /// The type of our `SometimesUnboundedSink`, as instantiated.
        ///
        /// This is where control cells get queued.
        type SometimesUnbounded = SometimesUnboundedSink<ChanCellQueueEntry, ChannelSender>;

        /// Inner type used to implement a [`CircuitCellSender`].
        ///
        /// With the `circ-padding` feature disabled,
        /// the only layer added is a [`SometimesUnboundedSink`].
        ///
        /// TODO: Ideally, this type would participate in the memory quota system.
        ///
        /// TODO: At some point, we might want to add
        /// an additional _bounded_ [`futures::sink::Buffer`]
        /// to queue cells before they are put onto the channel.
        type InnerSink = SometimesUnbounded;
    }
}
83

            
84
/// A sink that a circuit uses to send cells onto a Channel.
85
///
86
/// (This is a separate type so we can more easily control access to its internals.)
87
///
88
/// ### You must poll this type
89
///
90
/// This type is based on [`SometimesUnboundedSink`].
91
/// For queued items to be delivered,
92
/// [`SometimesUnboundedSink`] must be polled,
93
/// even if you don't have an item to send.
94
/// The same rule applies here.
95
///
96
/// Currently [`Sink::poll_flush`], [`Sink::poll_close`], and [`Sink::poll_ready`]
97
/// will all work for this purpose.
98
#[pin_project]
99
pub(crate) struct CircuitCellSender {
100
    /// The actual inner sink on which we'll be sending cells.
101
    ///
102
    /// See type alias documentation for full details.
103
    #[pin]
104
    sink: InnerSink,
105
}
106

            
107
impl CircuitCellSender {
108
    /// Construct a new `CircuitCellSender` to deliver cells onto `inner`.
109
376
    pub(crate) fn from_channel_sender(inner: ChannelSender) -> Self {
110
        cfg_if! {
111
            if #[cfg(feature="circ-padding")] {
112
376
                let sink = SinkBlocker::new(
113
376
                    SometimesUnboundedSink::new(
114
376
                        inner
115
                    ),
116
376
                    BooleanPolicy::Unblocked
117
                );
118
            } else {
119
                let sink = SometimesUnboundedSink::new(inner);
120
            }
121
        }
122

            
123
376
        Self { sink }
124
376
    }
125

            
126
    /// Return the number of cells queued in this Sender
127
    /// that have not yet been flushed onto the channel.
128
44
    pub(crate) fn n_queued(&self) -> usize {
129
44
        self.sometimes_unbounded().n_queued()
130
44
    }
131

            
132
    /// Return true if we have a queued cell for the specified hop or later.
133
    #[cfg(feature = "circ-padding")]
134
    pub(crate) fn have_queued_cell_for_hop_or_later(&self, hop: HopNum) -> bool {
135
        if hop.is_first_hop() && self.chan_sender().approx_count() > 0 {
136
            // There's a cell on the outbound channel queue:
137
            // That will function perfectly well as padding to the first hop of this circuit,
138
            // whether it is actually for this circuit or not.
139
            return true;
140
        }
141

            
142
        // Now look at our own sometimes_unbounded queue.
143
        //
144
        // TODO circpad: in theory we could also look at the members of the per-channel queue to find this out!
145
        // But that's nontrivial, since the per-channel queue is implemented with an futures mpsc
146
        // channel, which doesn't have any functionality to let you inspect its queue.
147
        self.sometimes_unbounded()
148
            .iter_queue()
149
            .any(|(_, info)| info.is_some_and(|inf| inf.target_hop >= hop))
150
    }
151

            
152
    /// Send a cell on this sender,
153
    /// even if the  underlying channel queues are all full.
154
    ///
155
    /// You must `.await` this, but it will never block.
156
    /// (Its future is always `Ready`.)
157
    ///
158
    /// See note on [`CircuitCellSender`] type about polling:
159
    /// If you don't poll this sink, then queued items might never flush.
160
    #[instrument(level = "trace", skip_all)]
161
6957
    pub(crate) async fn send_unbounded(&mut self, entry: ChanCellQueueEntry) -> crate::Result<()> {
162
        Pin::new(self.sometimes_unbounded_mut())
163
            .send_unbounded(entry)
164
            .await?;
165
        self.chan_sender().note_cell_queued();
166
        Ok(())
167
4638
    }
168

            
169
    /// Return the time provider used by the underlying channel sender
170
    /// for memory quota purposes.
171
72
    pub(crate) fn time_provider(&self) -> &DynTimeProvider {
172
72
        self.chan_sender().time_provider()
173
72
    }
174

            
175
    /// Circpadding only: Put this sink into a blocked state.
176
    ///
177
    /// When we are blocked, attempts to `send()` to this sink will fail.
178
    /// You can still queue items with `send_unbounded()`,
179
    /// and they will be sent immediately.
180
    //
181
    // (Previously we would block those items too,
182
    // and only allow them to be flushed one by one,
183
    // but we changed that behavior so that non-DATA cells can _always_ be sent.)
184
    #[cfg(feature = "circ-padding")]
185
    pub(crate) fn start_blocking(&mut self) {
186
        self.pre_queue_blocker_mut().set_blocked();
187
    }
188

            
189
    /// Circpadding only: Put this sink into an unblocked state.
190
    #[cfg(feature = "circ-padding")]
191
    pub(crate) fn stop_blocking(&mut self) {
192
        self.pre_queue_blocker_mut().set_unblocked();
193
    }
194

            
195
    /// Note: This is only async because we need a Context to check the underlying sink for readiness.
196
    /// This will register a new waker (or overwrite any existing waker).
197
    #[instrument(level = "trace", skip_all)]
198
66
    pub(crate) async fn congestion_signals(&mut self) -> CongestionSignals {
199
44
        futures::future::poll_fn(|cx| -> Poll<CongestionSignals> {
200
            // We're looking at the ChanSender's in order to deliberately ignore the blocked/unblocked
201
            // status of this sink.
202
            //
203
            // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3225#note_3252061
204
            // for a deeper discussion.
205
44
            let channel_ready = self
206
44
                .chan_sender_mut()
207
44
                .poll_ready_unpin_bool(cx)
208
44
                .unwrap_or(false);
209
44
            Poll::Ready(CongestionSignals::new(
210
44
                /* channel_blocked= */ !channel_ready,
211
44
                self.n_queued(),
212
44
            ))
213
44
        })
214
        .await
215
44
    }
216

            
217
    /// Helper: return a reference to the internal [`SometimesUnboundedSink`]
218
    /// that this `CircuitCellSender` is based on.
219
44
    fn sometimes_unbounded(&self) -> &SometimesUnbounded {
220
        cfg_if! {
221
            if #[cfg(feature="circ-padding")] {
222
44
                self.sink.as_inner()
223
            } else {
224
                &self.sink
225
            }
226
        }
227
44
    }
228

            
229
    /// Helper: return a mutable reference to the internal [`SometimesUnboundedSink`]
230
    /// that this `CircuitCellSender` is based on.
231
11748
    fn sometimes_unbounded_mut(&mut self) -> &mut SometimesUnbounded {
232
        cfg_if! {
233
            if #[cfg(feature="circ-padding")] {
234
11748
                self.sink.as_inner_mut()
235
            } else {
236
                &mut self.sink
237
            }
238
        }
239
11748
    }
240

            
241
    /// Helper: Return a reference to the internal [`ChannelSender`]
242
    /// that this `CircuitCellSender` is based on.
243
4706
    fn chan_sender(&self) -> &ChannelSender {
244
        cfg_if! {
245
            if #[cfg(feature="circ-padding")] {
246
4706
                self.sink.as_inner().as_inner()
247
            } else {
248
                self.sink.as_inner()
249
            }
250
        }
251
4706
    }
252

            
253
    /// Helper: Return a mutable reference to the internal [`ChannelSender`]
254
    /// that this `CircuitCellSender` is based on.
255
44
    fn chan_sender_mut(&mut self) -> &mut ChannelSender {
256
        cfg_if! {
257
            if #[cfg(feature="circ-padding")] {
258
44
                self.sink.as_inner_mut().as_inner_mut()
259
            } else {
260
                self.sink.as_inner_mut()
261
            }
262
        }
263
44
    }
264

            
265
    /// Helper: Return a mutable reference to our outer [`SinkBlocker`]
266
    #[cfg(feature = "circ-padding")]
267
    fn pre_queue_blocker_mut(&mut self) -> &mut InnerSink {
268
        &mut self.sink
269
    }
270
}
271

            
272
impl Sink<ChanCellQueueEntry> for CircuitCellSender {
273
    type Error = <ChannelSender as Sink<ChanCellQueueEntry>>::Error;
274

            
275
7110
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
276
        cfg_if! {
277
            if #[cfg(feature = "circ-padding")] {
278
                // In this case, our sink is _not_ the same as our SometimesUnboundedSink.
279
                // But we need to ensure that SometimesUnboundedMut gets polled
280
                // unconditionally, so that it can actually flush its members.
281
                //
282
                // We don't actually _care_ if it's ready;
283
                // we just need to make sure that it gets polled.
284
                // See the "You must poll this type" comment on SometimesUnboundedSink.
285
7110
                let _ignore = pin!(self.sometimes_unbounded_mut()).poll_ready(cx);
286
            }
287
        }
288
7110
        self.project().sink.poll_ready(cx)
289
7110
    }
290

            
291
    fn start_send(mut self: Pin<&mut Self>, item: ChanCellQueueEntry) -> Result<(), Self::Error> {
292
        self.as_mut().project().sink.start_send(item)?;
293
        self.chan_sender().note_cell_queued();
294
        Ok(())
295
    }
296

            
297
48
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
298
48
        self.project().sink.poll_flush(cx)
299
48
    }
300

            
301
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
302
        self.project().sink.poll_close(cx)
303
    }
304
}