1
//! Module exposing structures relating to the reactor's view of a circuit's hops.
2

            
3
use super::{CircuitCmd, CloseStreamBehavior};
4
use crate::circuit::circhop::{CircHopInbound, CircHopOutbound, HopSettings, SendRelayCell};
5
use crate::client::reactor::circuit::path::PathEntry;
6
use crate::congestion::CongestionControl;
7
use crate::crypto::cell::HopNum;
8
use crate::stream::StreamMpscReceiver;
9
use crate::stream::cmdcheck::AnyCmdChecker;
10
use crate::stream::flow_ctrl::state::StreamRateLimit;
11
use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
12
use crate::stream::queue::StreamQueueSender;
13
use crate::streammap::{self, StreamEntMut, StreamMap};
14
use crate::tunnel::TunnelScopedCircId;
15
use crate::util::notify::NotifySender;
16
use crate::util::tunnel_activity::TunnelActivity;
17
use crate::{Error, Result};
18

            
19
use futures::Stream;
20
use futures::stream::FuturesUnordered;
21
use postage::watch;
22
use smallvec::SmallVec;
23
use tor_cell::chancell::BoxedCellBody;
24
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
25
use tor_cell::relaycell::msg::AnyRelayMsg;
26
use tor_cell::relaycell::{
27
    AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, StreamId,
28
    UnparsedRelayMsg,
29
};
30

            
31
use safelog::sensitive as sv;
32
use tor_error::Bug;
33
use tracing::instrument;
34

            
35
use std::result::Result as StdResult;
36
use std::sync::{Arc, Mutex, MutexGuard};
37
use std::task::Poll;
38
use std::time::Instant;
39

            
40
#[cfg(test)]
41
use tor_cell::relaycell::msg::SendmeTag;
42

            
43
/// The "usual" number of hops in a [`CircHopList`].
///
/// A list holding at most this many hops is stored inline,
/// which saves us a heap allocation.
const NUM_HOPS: usize = 3;
47

            
48
/// Represents the reactor's view of a circuit's hop.
49
#[derive(Default)]
50
pub(crate) struct CircHopList {
51
    /// The list of hops.
52
    hops: SmallVec<[CircHop; NUM_HOPS]>,
53
}
54

            
55
impl CircHopList {
56
    /// Return a reference to the hop corresponding to `hopnum`, if there is one.
57
4112
    pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
58
4112
        self.hops.get(Into::<usize>::into(hopnum))
59
4112
    }
60

            
61
    /// Return a mutable reference to the hop corresponding to `hopnum`, if there is one.
62
6460
    pub(super) fn get_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
63
6460
        self.hops.get_mut(Into::<usize>::into(hopnum))
64
6460
    }
65

            
66
    /// Append the specified hop.
67
1020
    pub(crate) fn push(&mut self, hop: CircHop) {
68
1020
        self.hops.push(hop);
69
1020
    }
70

            
71
    /// Returns `true` if the list contains no [`CircHop`]s.
72
4886
    pub(crate) fn is_empty(&self) -> bool {
73
4886
        self.hops.is_empty()
74
4886
    }
75

            
76
    /// Returns the number of hops in the list.
77
2292
    pub(crate) fn len(&self) -> usize {
78
2292
        self.hops.len()
79
2292
    }
80

            
81
    /// Returns a [`Stream`] of [`CircuitCmd`] to poll from the main loop.
82
    ///
83
    /// The iterator contains at most one [`CircuitCmd`] for each hop,
84
    /// representing the instructions for handling the ready-item, if any,
85
    /// of its highest priority stream.
86
    ///
87
    /// IMPORTANT: this stream locks the stream map mutexes of each `CircHop`!
88
    /// To avoid contention, never create more than one
89
    /// [`ready_streams_iterator`](Self::ready_streams_iterator)
90
    /// stream at a time!
91
    ///
92
    /// This is cancellation-safe.
93
7046
    pub(in crate::client::reactor) fn ready_streams_iterator(
94
7046
        &self,
95
7046
        exclude: Option<HopNum>,
96
7046
    ) -> impl Stream<Item = CircuitCmd> + use<> {
97
7046
        self.hops
98
7046
            .iter()
99
7046
            .enumerate()
100
23929
            .filter_map(|(i, hop)| {
101
20406
                let hop_num = HopNum::from(i as u8);
102

            
103
20406
                if exclude == Some(hop_num) {
104
                    // We must skip polling this hop
105
1478
                    return None;
106
18928
                }
107

            
108
18928
                if !hop.ccontrol().can_send() {
109
                    // We can't send anything on this hop that counts towards SENDME windows.
110
                    //
111
                    // In theory we could send messages that don't count towards
112
                    // windows (like `RESOLVE`), and process end-of-stream
113
                    // events (to send an `END`), but it's probably not worth
114
                    // doing an O(N) iteration over flow-control-ready streams
115
                    // to see if that's the case.
116
                    //
117
                    // This *doesn't* block outgoing flow-control messages (e.g.
118
                    // SENDME), which are initiated via the control-message
119
                    // channel, handled above.
120
                    //
121
                    // TODO: Consider revisiting. OTOH some extra throttling when circuit-level
122
                    // congestion control has "bottomed out" might not be so bad, and the
123
                    // alternatives have complexity and/or performance costs.
124
                    return None;
125
18928
                }
126

            
127
18928
                let hop_map = Arc::clone(self.hops[i].stream_map());
128
19004
                Some(futures::future::poll_fn(move |cx| {
129
                    // Process an outbound message from the first ready stream on
130
                    // this hop. The stream map implements round robin scheduling to
131
                    // ensure fairness across streams.
132
                    // TODO: Consider looping here to process multiple ready
133
                    // streams. Need to be careful though to balance that with
134
                    // continuing to service incoming and control messages.
135
19004
                    let mut hop_map = hop_map.lock().expect("lock poisoned");
136
19004
                    let Some((sid, msg)) = hop_map.poll_ready_streams_iter(cx).next() else {
137
                        // No ready streams for this hop.
138
14766
                        return Poll::Pending;
139
                    };
140

            
141
4238
                    if msg.is_none() {
142
58
                        return Poll::Ready(CircuitCmd::CloseStream {
143
58
                            hop: hop_num,
144
58
                            sid,
145
58
                            behav: CloseStreamBehavior::default(),
146
58
                            reason: streammap::TerminateReason::StreamTargetClosed,
147
58
                        });
148
4180
                    };
149
4180
                    let msg = hop_map.take_ready_msg(sid).expect("msg disappeared");
150

            
151
                    #[allow(unused)] // unused in non-debug builds
152
4180
                    let Some(StreamEntMut::Open(s)) = hop_map.get_mut(sid) else {
153
                        panic!("Stream {sid} disappeared");
154
                    };
155

            
156
4180
                    debug_assert!(
157
4180
                        s.can_send(&msg),
158
                        "Stream {sid} produced a message it can't send: {msg:?}"
159
                    );
160

            
161
4180
                    let cell = SendRelayCell {
162
4180
                        hop: Some(hop_num),
163
4180
                        early: false,
164
4180
                        cell: AnyRelayMsgOuter::new(Some(sid), msg),
165
4180
                    };
166
4180
                    Poll::Ready(CircuitCmd::Send(cell))
167
19004
                }))
168
20406
            })
169
7046
            .collect::<FuturesUnordered<_>>()
170
7046
    }
171

            
172
    /// Remove all halfstreams that are expired at `now`.
173
7046
    pub(super) fn remove_expired_halfstreams(&mut self, now: Instant) {
174
20406
        for hop in self.hops.iter_mut() {
175
20406
            hop.stream_map()
176
20406
                .lock()
177
20406
                .expect("lock poisoned")
178
20406
                .remove_expired_halfstreams(now);
179
20406
        }
180
7046
    }
181

            
182
    /// Returns true if there are any streams on this circuit
183
    ///
184
    /// Important: this function locks the stream map of its each of the [`CircHop`]s
185
    /// in this circuit, so it must **not** be called from any function where the
186
    /// stream map lock is held (such as [`ready_streams_iterator`](Self::ready_streams_iterator).
187
52
    pub(super) fn has_streams(&self) -> bool {
188
182
        self.hops.iter().any(|hop| {
189
156
            hop.stream_map()
190
156
                .lock()
191
156
                .expect("lock poisoned")
192
156
                .n_open_streams()
193
156
                > 0
194
156
        })
195
52
    }
196

            
197
    /// Return the most active [`TunnelActivity`] for any hop on this `CircHopList`.
198
    pub(crate) fn tunnel_activity(&self) -> TunnelActivity {
199
        self.hops
200
            .iter()
201
            .map(|hop| {
202
                hop.stream_map()
203
                    .lock()
204
                    .expect("Poisoned lock")
205
                    .tunnel_activity()
206
            })
207
            .max()
208
            .unwrap_or_else(TunnelActivity::never_used)
209
    }
210
}
211

            
212
/// Represents the reactor's view of a single hop.
213
pub(crate) struct CircHop {
214
    /// The unique ID of the circuit. Used for logging.
215
    unique_id: TunnelScopedCircId,
216
    /// Hop number in the path.
217
    hop_num: HopNum,
218
    /// The inbound state of the hop.
219
    ///
220
    /// Used for processing cells received from this hop.
221
    inbound: CircHopInbound,
222
    /// The outbound state of the hop.
223
    ///
224
    /// Used for preparing cells to send to this hop.
225
    outbound: CircHopOutbound,
226
}
227

            
228
impl CircHop {
229
    /// Create a new hop.
230
1020
    pub(crate) fn new(
231
1020
        unique_id: TunnelScopedCircId,
232
1020
        hop_num: HopNum,
233
1020
        settings: &HopSettings,
234
1020
    ) -> Self {
235
1020
        let relay_format = settings.relay_crypt_protocol().relay_cell_format();
236

            
237
1020
        let ccontrol = Arc::new(Mutex::new(CongestionControl::new(&settings.ccontrol)));
238
1020
        let inbound = CircHopInbound::new(RelayCellDecoder::new(relay_format), settings);
239

            
240
1020
        let outbound = CircHopOutbound::new(
241
1020
            ccontrol,
242
1020
            relay_format,
243
1020
            Arc::new(settings.flow_ctrl_params.clone()),
244
1020
            settings,
245
        );
246

            
247
1020
        CircHop {
248
1020
            unique_id,
249
1020
            hop_num,
250
1020
            inbound,
251
1020
            outbound,
252
1020
        }
253
1020
    }
254

            
255
    /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
256
    /// `message` to the provided hop.
257
96
    pub(crate) fn begin_stream(
258
96
        &mut self,
259
96
        message: AnyRelayMsg,
260
96
        sender: StreamQueueSender,
261
96
        rx: StreamMpscReceiver<AnyRelayMsg>,
262
96
        rate_limit_updater: watch::Sender<StreamRateLimit>,
263
96
        drain_rate_requester: NotifySender<DrainRateRequest>,
264
96
        cmd_checker: AnyCmdChecker,
265
96
    ) -> Result<(SendRelayCell, StreamId)> {
266
96
        self.outbound.begin_stream(
267
96
            Some(self.hop_num),
268
96
            message,
269
96
            sender,
270
96
            rx,
271
96
            rate_limit_updater,
272
96
            drain_rate_requester,
273
96
            cmd_checker,
274
        )
275
96
    }
276

            
277
    /// Close the stream associated with `id` because the stream was
278
    /// dropped.
279
    ///
280
    /// See [`CircHopOutbound::close_stream`].
281
70
    pub(crate) fn close_stream(
282
70
        &mut self,
283
70
        id: StreamId,
284
70
        message: CloseStreamBehavior,
285
70
        why: streammap::TerminateReason,
286
70
        expiry: Instant,
287
70
    ) -> Result<Option<SendRelayCell>> {
288
70
        self.outbound
289
70
            .close_stream(self.unique_id, id, Some(self.hop_num), message, why, expiry)
290
70
    }
291

            
292
    /// Check if we should send an XON message.
293
    ///
294
    /// If we should, then returns the XON message that should be sent.
295
    #[instrument(level = "trace", skip_all)]
296
    pub(crate) fn maybe_send_xon(
297
        &mut self,
298
        rate: XonKbpsEwma,
299
        id: StreamId,
300
    ) -> Result<Option<Xon>> {
301
        self.outbound.maybe_send_xon(rate, id)
302
    }
303

            
304
    /// Check if we should send an XOFF message.
305
    ///
306
    /// If we should, then returns the XOFF message that should be sent.
307
216
    pub(crate) fn maybe_send_xoff(&mut self, id: StreamId) -> Result<Option<Xoff>> {
308
216
        self.outbound.maybe_send_xoff(id)
309
216
    }
310

            
311
    /// Return the format that is used for relay cells sent to this hop.
312
    ///
313
    /// For the most part, this format isn't necessary to interact with a CircHop;
314
    /// it becomes relevant when we are deciding _what_ we can encode for the hop.
315
4726
    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
316
4726
        self.outbound.relay_cell_format()
317
4726
    }
318

            
319
    /// Delegate to CongestionControl, for testing purposes
320
    #[cfg(test)]
321
20
    pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
322
20
        self.outbound.send_window_and_expected_tags()
323
20
    }
324

            
325
    /// Return a mutable reference to our CongestionControl object.
326
27276
    pub(crate) fn ccontrol(&self) -> MutexGuard<'_, CongestionControl> {
327
27276
        self.outbound.ccontrol().lock().expect("poisoned lock")
328
27276
    }
329

            
330
    /// Return a reference to our CircHopOutbound object.
331
36
    pub(crate) fn outbound(&self) -> &CircHopOutbound {
332
36
        &self.outbound
333
36
    }
334

            
335
    /// We're about to send `msg`.
336
    ///
337
    /// See [`OpenStreamEnt::about_to_send`](crate::streammap::OpenStreamEnt::about_to_send).
338
    //
339
    // TODO prop340: This should take a cell or similar, not a message.
340
4156
    pub(crate) fn about_to_send(&mut self, stream_id: StreamId, msg: &AnyRelayMsg) -> Result<()> {
341
4156
        self.outbound.about_to_send(self.unique_id, stream_id, msg)
342
4156
    }
343

            
344
    /// Add an entry to this map using the specified StreamId.
345
    #[cfg(feature = "hs-service")]
346
36
    pub(crate) fn add_ent_with_id(
347
36
        &self,
348
36
        sink: StreamQueueSender,
349
36
        rx: StreamMpscReceiver<AnyRelayMsg>,
350
36
        rate_limit_updater: watch::Sender<StreamRateLimit>,
351
36
        drain_rate_requester: NotifySender<DrainRateRequest>,
352
36
        stream_id: StreamId,
353
36
        cmd_checker: AnyCmdChecker,
354
36
    ) -> Result<()> {
355
36
        self.outbound.add_ent_with_id(
356
36
            sink,
357
36
            rx,
358
36
            rate_limit_updater,
359
36
            drain_rate_requester,
360
36
            stream_id,
361
36
            cmd_checker,
362
        )
363
36
    }
364

            
365
    /// Note that we received an END message (or other message indicating the end of
366
    /// the stream) on the stream with `id`.
367
    ///
368
    /// See [`StreamMap::ending_msg_received`](crate::streammap::StreamMap::ending_msg_received).
369
    #[cfg(feature = "hs-service")]
370
    pub(crate) fn ending_msg_received(&self, stream_id: StreamId) -> Result<()> {
371
        self.outbound.ending_msg_received(stream_id)
372
    }
373

            
374
    /// Parse a RELAY or RELAY_EARLY cell body.
375
    ///
376
    /// Requires that the cryptographic checks on the message have already been
377
    /// performed
378
500
    pub(crate) fn decode(&mut self, cell: BoxedCellBody) -> Result<RelayCellDecoderResult> {
379
500
        self.inbound.decode(cell)
380
500
    }
381

            
382
    /// Handle `msg`, delivering it to the stream with the specified `streamid` if appropriate.
383
    ///
384
    /// Returns back the provided `msg`, if the message is an incoming stream request
385
    /// that needs to be handled by the calling code.
386
    ///
387
    // TODO: the above is a bit of a code smell -- we should try to avoid passing the msg
388
    // back and forth like this.
389
268
    pub(super) fn handle_msg(
390
268
        &self,
391
268
        hop_detail: &PathEntry,
392
268
        cell_counts_toward_windows: bool,
393
268
        streamid: StreamId,
394
268
        msg: UnparsedRelayMsg,
395
268
        now: Instant,
396
268
    ) -> Result<Option<UnparsedRelayMsg>> {
397
268
        let possible_proto_violation_err = |streamid: StreamId| Error::UnknownStream {
398
4
            src: sv(hop_detail.clone()),
399
4
            streamid,
400
4
        };
401

            
402
268
        self.outbound.handle_msg(
403
268
            possible_proto_violation_err,
404
268
            cell_counts_toward_windows,
405
268
            streamid,
406
268
            msg,
407
268
            now,
408
        )
409
268
    }
410

            
411
    /// Get the stream map of this hop.
412
39550
    pub(crate) fn stream_map(&self) -> &Arc<Mutex<StreamMap>> {
413
39550
        self.outbound.stream_map()
414
39550
    }
415

            
416
    /// Set the stream map of this hop to `map`.
417
    ///
418
    /// Returns an error if the existing stream map of the hop has any open stream.
419
104
    pub(crate) fn set_stream_map(&mut self, map: Arc<Mutex<StreamMap>>) -> StdResult<(), Bug> {
420
104
        self.outbound.set_stream_map(map)
421
104
    }
422

            
423
    /// Decrement the limit of outbound cells that may be sent to this hop; give
424
    /// an error if it would reach zero.
425
4594
    pub(crate) fn decrement_outbound_cell_limit(&mut self) -> Result<()> {
426
4594
        self.outbound.decrement_cell_limit()
427
4594
    }
428

            
429
    /// Decrement the limit of inbound cells that may be received from this hop; give
430
    /// an error if it would reach zero.
431
500
    pub(crate) fn decrement_inbound_cell_limit(&mut self) -> Result<()> {
432
500
        self.inbound.decrement_cell_limit()
433
500
    }
434
}