1
//! Module exposing structures relating to the reactor's view of a circuit's hops.
2

            
3
use super::{CircuitCmd, CloseStreamBehavior};
4
use crate::circuit::circhop::{
5
    CircHopInbound, CircHopOutbound, HopSettings, ReactorStreamComponents, SendRelayCell,
6
};
7
use crate::client::reactor::circuit::path::PathEntry;
8
use crate::congestion::CongestionControl;
9
use crate::crypto::cell::HopNum;
10
use crate::memquota::StreamAccount;
11
use crate::stream::cmdcheck::AnyCmdChecker;
12
use crate::streammap::{self, StreamEntMut, StreamMap};
13
use crate::tunnel::TunnelScopedCircId;
14
use crate::util::tunnel_activity::TunnelActivity;
15
use crate::{Error, Result};
16

            
17
use futures::Stream;
18
use futures::stream::FuturesUnordered;
19
use smallvec::SmallVec;
20
use tor_cell::chancell::BoxedCellBody;
21
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
22
use tor_cell::relaycell::msg::AnyRelayMsg;
23
use tor_cell::relaycell::{
24
    AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, StreamId,
25
    UnparsedRelayMsg,
26
};
27
use tor_rtcompat::DynTimeProvider;
28
use web_time_compat::Instant;
29

            
30
use safelog::sensitive as sv;
31
use tor_error::Bug;
32
use tracing::instrument;
33

            
34
use std::result::Result as StdResult;
35
use std::sync::{Arc, Mutex, MutexGuard};
36
use std::task::Poll;
37

            
38
#[cfg(test)]
39
use tor_cell::relaycell::msg::SendmeTag;
40

            
41
/// The "usual" number of hops in a [`CircHopList`].
42
///
43
/// This saves us a heap allocation when the number of hops is less than or equal to this value.
44
const NUM_HOPS: usize = 3;
45

            
46
/// Represents the reactor's view of a circuit's hop.
47
#[derive(Default)]
48
pub(crate) struct CircHopList {
49
    /// The list of hops.
50
    hops: SmallVec<[CircHop; NUM_HOPS]>,
51
}
52

            
53
impl CircHopList {
54
    /// Return a reference to the hop corresponding to `hopnum`, if there is one.
55
4106
    pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
56
4106
        self.hops.get(Into::<usize>::into(hopnum))
57
4106
    }
58

            
59
    /// Return a mutable reference to the hop corresponding to `hopnum`, if there is one.
60
6448
    pub(super) fn get_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
61
6448
        self.hops.get_mut(Into::<usize>::into(hopnum))
62
6448
    }
63

            
64
    /// Append the specified hop.
65
1020
    pub(crate) fn push(&mut self, hop: CircHop) {
66
1020
        self.hops.push(hop);
67
1020
    }
68

            
69
    /// Returns `true` if the list contains no [`CircHop`]s.
70
4876
    pub(crate) fn is_empty(&self) -> bool {
71
4876
        self.hops.is_empty()
72
4876
    }
73

            
74
    /// Returns the number of hops in the list.
75
2292
    pub(crate) fn len(&self) -> usize {
76
2292
        self.hops.len()
77
2292
    }
78

            
79
    /// Returns a [`Stream`] of [`CircuitCmd`] to poll from the main loop.
80
    ///
81
    /// The iterator contains at most one [`CircuitCmd`] for each hop,
82
    /// representing the instructions for handling the ready-item, if any,
83
    /// of its highest priority stream.
84
    ///
85
    /// IMPORTANT: this stream locks the stream map mutexes of each `CircHop`!
86
    /// To avoid contention, never create more than one
87
    /// [`ready_streams_iterator`](Self::ready_streams_iterator)
88
    /// stream at a time!
89
    ///
90
    /// This is cancellation-safe.
91
7040
    pub(in crate::client::reactor) fn ready_streams_iterator(
92
7040
        &self,
93
7040
        exclude: Option<HopNum>,
94
7040
    ) -> impl Stream<Item = CircuitCmd> + use<> {
95
7040
        self.hops
96
7040
            .iter()
97
7040
            .enumerate()
98
23892
            .filter_map(|(i, hop)| {
99
20372
                let hop_num = HopNum::from(i as u8);
100

            
101
20372
                if exclude == Some(hop_num) {
102
                    // We must skip polling this hop
103
1478
                    return None;
104
18894
                }
105

            
106
18894
                if !hop.ccontrol().can_send() {
107
                    // We can't send anything on this hop that counts towards SENDME windows.
108
                    //
109
                    // In theory we could send messages that don't count towards
110
                    // windows (like `RESOLVE`), and process end-of-stream
111
                    // events (to send an `END`), but it's probably not worth
112
                    // doing an O(N) iteration over flow-control-ready streams
113
                    // to see if that's the case.
114
                    //
115
                    // This *doesn't* block outgoing flow-control messages (e.g.
116
                    // SENDME), which are initiated via the control-message
117
                    // channel, handled above.
118
                    //
119
                    // TODO: Consider revisiting. OTOH some extra throttling when circuit-level
120
                    // congestion control has "bottomed out" might not be so bad, and the
121
                    // alternatives have complexity and/or performance costs.
122
                    return None;
123
18894
                }
124

            
125
18894
                let hop_map = Arc::clone(self.hops[i].stream_map());
126
18954
                Some(futures::future::poll_fn(move |cx| {
127
                    // Process an outbound message from the first ready stream on
128
                    // this hop. The stream map implements round robin scheduling to
129
                    // ensure fairness across streams.
130
                    // TODO: Consider looping here to process multiple ready
131
                    // streams. Need to be careful though to balance that with
132
                    // continuing to service incoming and control messages.
133
18954
                    let mut hop_map = hop_map.lock().expect("lock poisoned");
134
18954
                    let Some((sid, msg)) = hop_map.poll_ready_streams_iter(cx).next() else {
135
                        // No ready streams for this hop.
136
14722
                        return Poll::Pending;
137
                    };
138

            
139
4232
                    if msg.is_none() {
140
52
                        return Poll::Ready(CircuitCmd::CloseStream {
141
52
                            hop: hop_num,
142
52
                            sid,
143
52
                            behav: CloseStreamBehavior::default(),
144
52
                            reason: streammap::TerminateReason::StreamTargetClosed,
145
52
                        });
146
4180
                    };
147
4180
                    let msg = hop_map.take_ready_msg(sid).expect("msg disappeared");
148

            
149
                    #[allow(unused)] // unused in non-debug builds
150
4180
                    let Some(StreamEntMut::Open(s)) = hop_map.get_mut(sid) else {
151
                        panic!("Stream {sid} disappeared");
152
                    };
153

            
154
4180
                    debug_assert!(
155
4180
                        s.can_send(&msg),
156
                        "Stream {sid} produced a message it can't send: {msg:?}"
157
                    );
158

            
159
4180
                    let cell = SendRelayCell {
160
4180
                        hop: Some(hop_num),
161
4180
                        early: false,
162
4180
                        cell: AnyRelayMsgOuter::new(Some(sid), msg),
163
4180
                    };
164
4180
                    Poll::Ready(CircuitCmd::Send(cell))
165
18954
                }))
166
20372
            })
167
7040
            .collect::<FuturesUnordered<_>>()
168
7040
    }
169

            
170
    /// Remove all halfstreams that are expired at `now`.
171
7040
    pub(super) fn remove_expired_halfstreams(&mut self, now: Instant) {
172
20372
        for hop in self.hops.iter_mut() {
173
20372
            hop.stream_map()
174
20372
                .lock()
175
20372
                .expect("lock poisoned")
176
20372
                .remove_expired_halfstreams(now);
177
20372
        }
178
7040
    }
179

            
180
    /// Returns true if there are any streams on this circuit
181
    ///
182
    /// Important: this function locks the stream map of its each of the [`CircHop`]s
183
    /// in this circuit, so it must **not** be called from any function where the
184
    /// stream map lock is held (such as [`ready_streams_iterator`](Self::ready_streams_iterator).
185
52
    pub(super) fn has_streams(&self) -> bool {
186
182
        self.hops.iter().any(|hop| {
187
156
            hop.stream_map()
188
156
                .lock()
189
156
                .expect("lock poisoned")
190
156
                .n_open_streams()
191
156
                > 0
192
156
        })
193
52
    }
194

            
195
    /// Return the most active [`TunnelActivity`] for any hop on this `CircHopList`.
196
    pub(crate) fn tunnel_activity(&self) -> TunnelActivity {
197
        self.hops
198
            .iter()
199
            .map(|hop| {
200
                hop.stream_map()
201
                    .lock()
202
                    .expect("Poisoned lock")
203
                    .tunnel_activity()
204
            })
205
            .max()
206
            .unwrap_or_else(TunnelActivity::never_used)
207
    }
208
}
209

            
210
/// Represents the reactor's view of a single hop.
211
pub(crate) struct CircHop {
212
    /// The unique ID of the circuit. Used for logging.
213
    unique_id: TunnelScopedCircId,
214
    /// Hop number in the path.
215
    hop_num: HopNum,
216
    /// The inbound state of the hop.
217
    ///
218
    /// Used for processing cells received from this hop.
219
    inbound: CircHopInbound,
220
    /// The outbound state of the hop.
221
    ///
222
    /// Used for preparing cells to send to this hop.
223
    outbound: CircHopOutbound,
224
}
225

            
226
impl CircHop {
227
    /// Create a new hop.
228
1020
    pub(crate) fn new(
229
1020
        unique_id: TunnelScopedCircId,
230
1020
        hop_num: HopNum,
231
1020
        settings: &HopSettings,
232
1020
    ) -> Self {
233
1020
        let relay_format = settings.relay_crypt_protocol().relay_cell_format();
234

            
235
1020
        let ccontrol = Arc::new(Mutex::new(CongestionControl::new(&settings.ccontrol)));
236
1020
        let inbound = CircHopInbound::new(RelayCellDecoder::new(relay_format), settings);
237

            
238
1020
        let outbound = CircHopOutbound::new(
239
1020
            ccontrol,
240
1020
            relay_format,
241
1020
            Arc::new(settings.flow_ctrl_params.clone()),
242
1020
            settings,
243
        );
244

            
245
1020
        CircHop {
246
1020
            unique_id,
247
1020
            hop_num,
248
1020
            inbound,
249
1020
            outbound,
250
1020
        }
251
1020
    }
252

            
253
    /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
254
    /// `message` to the provided hop.
255
96
    pub(crate) fn begin_stream(
256
96
        &mut self,
257
96
        message: AnyRelayMsg,
258
96
        time_prov: &DynTimeProvider,
259
96
        cmd_checker: AnyCmdChecker,
260
96
        memquota: &StreamAccount,
261
96
    ) -> Result<(SendRelayCell, StreamId, ReactorStreamComponents)> {
262
96
        self.outbound.begin_stream(
263
96
            Some(self.hop_num),
264
96
            message,
265
96
            time_prov,
266
96
            cmd_checker,
267
96
            memquota,
268
        )
269
96
    }
270

            
271
    /// Close the stream associated with `id` because the stream was
272
    /// dropped.
273
    ///
274
    /// See [`CircHopOutbound::close_stream`].
275
64
    pub(crate) fn close_stream(
276
64
        &mut self,
277
64
        id: StreamId,
278
64
        message: CloseStreamBehavior,
279
64
        why: streammap::TerminateReason,
280
64
        expiry: Instant,
281
64
    ) -> Result<Option<SendRelayCell>> {
282
64
        self.outbound
283
64
            .close_stream(self.unique_id, id, Some(self.hop_num), message, why, expiry)
284
64
    }
285

            
286
    /// Check if we should send an XON message.
287
    ///
288
    /// If we should, then returns the XON message that should be sent.
289
    #[instrument(level = "trace", skip_all)]
290
    pub(crate) fn maybe_send_xon(
291
        &mut self,
292
        rate: XonKbpsEwma,
293
        id: StreamId,
294
    ) -> Result<Option<Xon>> {
295
        self.outbound.maybe_send_xon(rate, id)
296
    }
297

            
298
    /// Check if we should send an XOFF message.
299
    ///
300
    /// If we should, then returns the XOFF message that should be sent.
301
216
    pub(crate) fn maybe_send_xoff(&mut self, id: StreamId) -> Result<Option<Xoff>> {
302
216
        self.outbound.maybe_send_xoff(id)
303
216
    }
304

            
305
    /// Return the format that is used for relay cells sent to this hop.
306
    ///
307
    /// For the most part, this format isn't necessary to interact with a CircHop;
308
    /// it becomes relevant when we are deciding _what_ we can encode for the hop.
309
4720
    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
310
4720
        self.outbound.relay_cell_format()
311
4720
    }
312

            
313
    /// Delegate to CongestionControl, for testing purposes
314
    #[cfg(test)]
315
20
    pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
316
20
        self.outbound.send_window_and_expected_tags()
317
20
    }
318

            
319
    /// Return a mutable reference to our CongestionControl object.
320
27236
    pub(crate) fn ccontrol(&self) -> MutexGuard<'_, CongestionControl> {
321
27236
        self.outbound.ccontrol().lock().expect("poisoned lock")
322
27236
    }
323

            
324
    /// Return a reference to our CircHopOutbound object.
325
36
    pub(crate) fn outbound(&self) -> &CircHopOutbound {
326
36
        &self.outbound
327
36
    }
328

            
329
    /// We're about to send `msg`.
330
    ///
331
    /// See [`OpenStreamEnt::about_to_send`](crate::streammap::OpenStreamEnt::about_to_send).
332
    //
333
    // TODO prop340: This should take a cell or similar, not a message.
334
4156
    pub(crate) fn about_to_send(&mut self, stream_id: StreamId, msg: &AnyRelayMsg) -> Result<()> {
335
4156
        self.outbound.about_to_send(self.unique_id, stream_id, msg)
336
4156
    }
337

            
338
    /// Add an entry to this map using the specified StreamId.
339
    #[cfg(feature = "hs-service")]
340
36
    pub(crate) fn add_ent_with_id(
341
36
        &self,
342
36
        time_prov: &DynTimeProvider,
343
36
        stream_id: StreamId,
344
36
        cmd_checker: AnyCmdChecker,
345
36
        memquota: &StreamAccount,
346
36
    ) -> Result<ReactorStreamComponents> {
347
36
        self.outbound
348
36
            .add_ent_with_id(time_prov, stream_id, cmd_checker, memquota)
349
36
    }
350

            
351
    /// Note that we received an END message (or other message indicating the end of
352
    /// the stream) on the stream with `id`.
353
    ///
354
    /// See [`StreamMap::ending_msg_received`](crate::streammap::StreamMap::ending_msg_received).
355
    #[cfg(feature = "hs-service")]
356
    pub(crate) fn ending_msg_received(&self, stream_id: StreamId) -> Result<()> {
357
        self.outbound.ending_msg_received(stream_id)
358
    }
359

            
360
    /// Parse a RELAY or RELAY_EARLY cell body.
361
    ///
362
    /// Requires that the cryptographic checks on the message have already been
363
    /// performed
364
500
    pub(crate) fn decode(&mut self, cell: BoxedCellBody) -> Result<RelayCellDecoderResult> {
365
500
        self.inbound.decode(cell)
366
500
    }
367

            
368
    /// Handle `msg`, delivering it to the stream with the specified `streamid` if appropriate.
369
    ///
370
    /// Returns back the provided `msg`, if the message is an incoming stream request
371
    /// that needs to be handled by the calling code.
372
    ///
373
    // TODO: the above is a bit of a code smell -- we should try to avoid passing the msg
374
    // back and forth like this.
375
268
    pub(super) fn handle_msg(
376
268
        &self,
377
268
        hop_detail: &PathEntry,
378
268
        cell_counts_toward_windows: bool,
379
268
        streamid: StreamId,
380
268
        msg: UnparsedRelayMsg,
381
268
        now: Instant,
382
268
    ) -> Result<Option<UnparsedRelayMsg>> {
383
268
        let possible_proto_violation_err = |streamid: StreamId| Error::UnknownStream {
384
4
            src: sv(hop_detail.clone()),
385
4
            streamid,
386
4
        };
387

            
388
268
        self.outbound.handle_msg(
389
268
            possible_proto_violation_err,
390
268
            cell_counts_toward_windows,
391
268
            streamid,
392
268
            msg,
393
268
            now,
394
        )
395
268
    }
396

            
397
    /// Get the stream map of this hop.
398
39482
    pub(crate) fn stream_map(&self) -> &Arc<Mutex<StreamMap>> {
399
39482
        self.outbound.stream_map()
400
39482
    }
401

            
402
    /// Set the stream map of this hop to `map`.
403
    ///
404
    /// Returns an error if the existing stream map of the hop has any open stream.
405
104
    pub(crate) fn set_stream_map(&mut self, map: Arc<Mutex<StreamMap>>) -> StdResult<(), Bug> {
406
104
        self.outbound.set_stream_map(map)
407
104
    }
408

            
409
    /// Decrement the limit of outbound cells that may be sent to this hop; give
410
    /// an error if it would reach zero.
411
4588
    pub(crate) fn decrement_outbound_cell_limit(&mut self) -> Result<()> {
412
4588
        self.outbound.decrement_cell_limit()
413
4588
    }
414

            
415
    /// Decrement the limit of inbound cells that may be received from this hop; give
416
    /// an error if it would reach zero.
417
500
    pub(crate) fn decrement_inbound_cell_limit(&mut self) -> Result<()> {
418
500
        self.inbound.decrement_cell_limit()
419
500
    }
420
}