1
//! Module exposing structures relating to a reactor's view of a circuit hop.
2

            
3
// TODO(relay): don't import from the client module
4
use crate::client::circuit::handshake::RelayCryptLayerProtocol;
5

            
6
use crate::ccparams::CongestionControlParams;
7
use crate::circuit::CircParameters;
8
use crate::congestion::{CongestionControl, sendme};
9
use crate::memquota::{SpecificAccount, StreamAccount};
10
use crate::stream::CloseStreamBehavior;
11
use crate::stream::SEND_WINDOW_INIT;
12
use crate::stream::StreamMpscSender;
13
use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
14
use crate::stream::flow_ctrl::params::FlowCtrlParameters;
15
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamFlowCtrl, StreamRateLimit};
16
use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
17
use crate::stream::queue::{StreamQueueReceiver, stream_queue};
18
use crate::streammap::{
19
    self, EndSentStreamEnt, OpenStreamEnt, ShouldSendEnd, StreamEntMut, StreamMap,
20
};
21
use crate::util::notify::{NotifyReceiver, NotifySender};
22
use crate::{Error, HopNum, Result};
23

            
24
use derive_deftly::Deftly;
25
use postage::watch;
26
use safelog::sensitive as sv;
27
use tracing::{debug, trace};
28

            
29
use tor_cell::chancell::BoxedCellBody;
30
use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
31
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
32
use tor_cell::relaycell::msg::AnyRelayMsg;
33
use tor_cell::relaycell::{
34
    AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, RelayCmd,
35
    StreamId, UnparsedRelayMsg,
36
};
37
use tor_error::{Bug, internal};
38
use tor_memquota::derive_deftly_template_HasMemoryCost;
39
use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
40
use tor_protover::named;
41
use tor_rtcompat::DynTimeProvider;
42

            
43
use std::num::NonZeroU32;
44
use std::pin::Pin;
45
use std::result::Result as StdResult;
46
use std::sync::{Arc, Mutex};
47
use web_time_compat::Instant;
48

            
49
#[cfg(test)]
50
use tor_cell::relaycell::msg::SendmeTag;
51

            
52
use cfg_if::cfg_if;
53

            
54
/// The size of the stream's outbound RELAY message queue.
55
// TODO(tuning): figure out if this is a good size for this buffer
56
const CIRCUIT_BUFFER_SIZE: usize = 128;
57

            
58
/// Type of negotiation that we'll be performing as we establish a hop.
59
///
60
/// Determines what flavor of extensions we can send and receive, which in turn
61
/// limits the hop settings we can negotiate.
62
///
63
// TODO-CGO: This is likely to be refactored when we finally add support for
64
// HsV3+CGO, which will require refactoring
65
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
66
pub(crate) enum HopNegotiationType {
67
    /// We're using a handshake in which extension-based negotiation cannot occur.
68
    None,
69
    /// We're using the HsV3-ntor handshake, in which the client can send extensions,
70
    /// but the server cannot.
71
    ///
72
    /// As a special case, the default relay encryption protocol is the hsv3
73
    /// variant of Tor1.
74
    //
75
    // We would call this "HalfDuplex" or something, but we do not expect to add
76
    // any more handshakes of this type.
77
    HsV3,
78
    /// We're using a handshake in which both client and relay can send extensions.
79
    Full,
80
}
81

            
82
/// The settings we use for single hop of a circuit.
83
///
84
/// Unlike [`CircParameters`], this type is crate-internal.
85
/// We construct it based on our settings from the circuit,
86
/// and from the hop's actual capabilities.
87
/// Then, we negotiate with the hop as part of circuit
88
/// creation/extension to determine the actual settings that will be in use.
89
/// Finally, we use those settings to construct the negotiated circuit hop.
90
//
91
// TODO: Relays should probably derive an instance of this type too, as
92
// part of the circuit creation handshake.
93
#[derive(Clone, Debug)]
94
pub(crate) struct HopSettings {
95
    /// The negotiated congestion control settings for this hop .
96
    pub(crate) ccontrol: CongestionControlParams,
97

            
98
    /// Flow control parameters that will be used for streams on this hop.
99
    pub(crate) flow_ctrl_params: FlowCtrlParameters,
100

            
101
    /// Maximum number of permitted incoming relay cells for this hop.
102
    pub(crate) n_incoming_cells_permitted: Option<u32>,
103

            
104
    /// Maximum number of permitted outgoing relay cells for this hop.
105
    pub(crate) n_outgoing_cells_permitted: Option<u32>,
106

            
107
    /// The relay cell encryption algorithm and cell format for this hop.
108
    relay_crypt_protocol: RelayCryptLayerProtocol,
109
}
110

            
111
impl HopSettings {
112
    /// Construct a new `HopSettings` based on `params` (a set of circuit parameters)
113
    /// and `caps` (a set of protocol capabilities for a circuit target).
114
    ///
115
    /// The resulting settings will represent what the client would prefer to negotiate
116
    /// (determined by `params`),
117
    /// as modified by what the target relay is believed to support (represented by `caps`).
118
    ///
119
    /// This represents the `HopSettings` in a pre-negotiation state:
120
    /// the circuit negotiation process will modify it.
121
    #[allow(clippy::unnecessary_wraps)] // likely to become fallible in the future.
122
1104
    pub(crate) fn from_params_and_caps(
123
1104
        hoptype: HopNegotiationType,
124
1104
        params: &CircParameters,
125
1104
        caps: &tor_protover::Protocols,
126
1104
    ) -> Result<Self> {
127
1104
        let mut ccontrol = params.ccontrol.clone();
128
1104
        match ccontrol.alg() {
129
720
            crate::ccparams::Algorithm::FixedWindow(_) => {}
130
            crate::ccparams::Algorithm::Vegas(_) => {
131
                // If the target doesn't support FLOWCTRL_CC, we can't use Vegas.
132
384
                if !caps.supports_named_subver(named::FLOWCTRL_CC) {
133
                    ccontrol.use_fallback_alg();
134
384
                }
135
            }
136
        };
137
1104
        if hoptype == HopNegotiationType::None {
138
84
            ccontrol.use_fallback_alg();
139
1020
        } else if hoptype == HopNegotiationType::HsV3 {
140
            // TODO #2037, TODO-CGO: We need a way to send congestion control extensions
141
            // in this case too.  But since we aren't sending them, we
142
            // should use the fallback algorithm.
143
            ccontrol.use_fallback_alg();
144
1020
        }
145
1104
        let ccontrol = ccontrol; // drop mut
146

            
147
        // Negotiate CGO if it is supported, if CC is also supported,
148
        // and if CGO is available on this relay.
149
1104
        let relay_crypt_protocol = match hoptype {
150
84
            HopNegotiationType::None => RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0),
151
            HopNegotiationType::HsV3 => {
152
                // TODO-CGO: Support CGO when available.
153
                cfg_if! {
154
                    if #[cfg(feature = "hs-common")] {
155
                        RelayCryptLayerProtocol::HsV3(RelayCellFormat::V0)
156
                    } else {
157
                        return Err(
158
                            tor_error::internal!("Unexpectedly tried to negotiate HsV3 without support!").into(),
159
                        );
160
                    }
161
                }
162
            }
163
            HopNegotiationType::Full => {
164
                cfg_if! {
165
                    if #[cfg(all(feature = "flowctl-cc", feature = "counter-galois-onion"))] {
166
                        #[allow(clippy::overly_complex_bool_expr)]
167
1020
                        if  ccontrol.alg().compatible_with_cgo()
168
384
                            && caps.supports_named_subver(named::RELAY_NEGOTIATE_SUBPROTO)
169
                            && caps.supports_named_subver(named::RELAY_CRYPT_CGO)
170
                        {
171
                            RelayCryptLayerProtocol::Cgo
172
                        } else {
173
1020
                            RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
174
                        }
175
                    } else {
176
                        RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
177
                    }
178
                }
179
            }
180
        };
181

            
182
1104
        Ok(Self {
183
1104
            ccontrol,
184
1104
            flow_ctrl_params: params.flow_ctrl.clone(),
185
1104
            relay_crypt_protocol,
186
1104
            n_incoming_cells_permitted: params.n_incoming_cells_permitted,
187
1104
            n_outgoing_cells_permitted: params.n_outgoing_cells_permitted,
188
1104
        })
189
1104
    }
190

            
191
    /// Return the negotiated relay crypto protocol.
192
1208
    pub(crate) fn relay_crypt_protocol(&self) -> RelayCryptLayerProtocol {
193
1208
        self.relay_crypt_protocol
194
1208
    }
195

            
196
    /// Return the client circuit-creation extensions that we should use in order to negotiate
197
    /// these circuit hop parameters.
198
    #[allow(clippy::unnecessary_wraps)]
199
36
    pub(crate) fn circuit_request_extensions(&self) -> Result<Vec<CircRequestExt>> {
200
        // allow 'unused_mut' because of the combinations of `cfg` conditions below
201
        #[allow(unused_mut)]
202
36
        let mut client_extensions = Vec::new();
203

            
204
        #[allow(unused, unused_mut)]
205
36
        let mut cc_extension_set = false;
206

            
207
36
        if self.ccontrol.is_enabled() {
208
12
            cfg_if::cfg_if! {
209
12
                if #[cfg(feature = "flowctl-cc")] {
210
12
                    client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
211
12
                    cc_extension_set = true;
212
12
                } else {
213
12
                    return Err(
214
12
                        tor_error::internal!(
215
12
                            "Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
216
12
                        )
217
12
                        .into()
218
12
                    );
219
12
                }
220
12
            }
221
24
        }
222

            
223
        // See whether we need to send a list of required protocol capabilities.
224
        // These aren't "negotiated" per se; they're simply demanded.
225
        // The relay will refuse the circuit if it doesn't support all of them,
226
        // and if any of them isn't supported in the SubprotocolRequest extension.
227
        //
228
        // (In other words, don't add capabilities here just because you want the
229
        // relay to have them! They must be explicitly listed as supported for use
230
        // with this extension. For the current list, see
231
        // https://spec.torproject.org/tor-spec/create-created-cells.html#subproto-request)
232
        //
233
        #[allow(unused_mut)]
234
36
        let mut required_protocol_capabilities: Vec<tor_protover::NamedSubver> = Vec::new();
235

            
236
        #[cfg(feature = "counter-galois-onion")]
237
36
        if matches!(self.relay_crypt_protocol(), RelayCryptLayerProtocol::Cgo) {
238
            if !cc_extension_set {
239
                return Err(tor_error::internal!("Tried to negotiate CGO without CC.").into());
240
            }
241
            required_protocol_capabilities.push(tor_protover::named::RELAY_CRYPT_CGO);
242
36
        }
243

            
244
36
        if !required_protocol_capabilities.is_empty() {
245
            client_extensions.push(CircRequestExt::SubprotocolRequest(
246
                required_protocol_capabilities.into_iter().collect(),
247
            ));
248
36
        }
249

            
250
36
        Ok(client_extensions)
251
36
    }
252
}
253

            
254
#[cfg(test)]
255
impl std::default::Default for CircParameters {
256
322
    fn default() -> Self {
257
322
        Self {
258
322
            extend_by_ed25519_id: true,
259
322
            ccontrol: crate::congestion::test_utils::params::build_cc_fixed_params(),
260
322
            flow_ctrl: FlowCtrlParameters::defaults_for_tests(),
261
322
            n_incoming_cells_permitted: None,
262
322
            n_outgoing_cells_permitted: None,
263
322
        }
264
322
    }
265
}
266

            
267
impl CircParameters {
268
    /// Constructor
269
767
    pub fn new(
270
767
        extend_by_ed25519_id: bool,
271
767
        ccontrol: CongestionControlParams,
272
767
        flow_ctrl: FlowCtrlParameters,
273
767
    ) -> Self {
274
767
        Self {
275
767
            extend_by_ed25519_id,
276
767
            ccontrol,
277
767
            flow_ctrl,
278
767
            n_incoming_cells_permitted: None,
279
767
            n_outgoing_cells_permitted: None,
280
767
        }
281
767
    }
282
}
283

            
284
/// Instructions for sending a RELAY cell.
285
///
286
/// This instructs a circuit reactor to send a RELAY cell to a given target
287
/// (a hop, if we are a client, or the client, if we are a relay).
288
#[derive(educe::Educe)]
289
#[educe(Debug)]
290
pub(crate) struct SendRelayCell {
291
    /// The hop number, or `None` if we are a relay.
292
    pub(crate) hop: Option<HopNum>,
293
    /// Whether to use a RELAY_EARLY cell.
294
    pub(crate) early: bool,
295
    /// The cell to send.
296
    pub(crate) cell: AnyRelayMsgOuter,
297
}
298

            
299
/// The inbound state of a hop.
300
pub(crate) struct CircHopInbound {
301
    /// Decodes relay cells received from this hop.
302
    decoder: RelayCellDecoder,
303
    /// Remaining permitted incoming relay cells from this hop, plus 1.
304
    ///
305
    /// (In other words, `None` represents no limit,
306
    /// `Some(1)` represents an exhausted limit,
307
    /// and `Some(n)` means that n-1 more cells may be received.)
308
    ///
309
    /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
310
    n_incoming_cells_permitted: Option<NonZeroU32>,
311
}
312

            
313
/// The outbound state of a hop.
314
pub(crate) struct CircHopOutbound {
315
    /// Congestion control object.
316
    ///
317
    /// This object is also in charge of handling circuit level SENDME logic for this hop.
318
    ccontrol: Arc<Mutex<CongestionControl>>,
319
    /// Map from stream IDs to streams.
320
    ///
321
    /// We store this with the reactor instead of the circuit, since the
322
    /// reactor needs it for every incoming cell on a stream, whereas
323
    /// the circuit only needs it when allocating new streams.
324
    ///
325
    /// NOTE: this is behind a mutex because the client reactor polls the `StreamMap`s
326
    /// of all hops concurrently, in a `FuturesUnordered`. Without the mutex,
327
    /// this wouldn't be possible, because it would mean holding multiple
328
    /// mutable references to `self` (the reactor). Note, however,
329
    /// that there should never be any contention on this mutex:
330
    /// we never create more than one
331
    /// `CircHopList::ready_streams_iterator()` stream
332
    /// at a time, and we never clone/lock the hop's `StreamMap` outside of it.
333
    ///
334
    /// Additionally, the stream map of the last hop (join point) of a conflux tunnel
335
    /// is shared with all the circuits in the tunnel.
336
    map: Arc<Mutex<StreamMap>>,
337
    /// Format to use for relay cells.
338
    //
339
    // When we have packed/fragmented cells, this may be replaced by a RelayCellEncoder.
340
    relay_format: RelayCellFormat,
341
    /// Flow control parameters for new streams.
342
    flow_ctrl_params: Arc<FlowCtrlParameters>,
343
    /// Remaining permitted outgoing relay cells from this hop, plus 1.
344
    ///
345
    /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
346
    n_outgoing_cells_permitted: Option<NonZeroU32>,
347
}
348

            
349
impl CircHopInbound {
350
    /// Create a new [`CircHopInbound`].
351
1056
    pub(crate) fn new(decoder: RelayCellDecoder, settings: &HopSettings) -> Self {
352
1056
        Self {
353
1056
            decoder,
354
1056
            n_incoming_cells_permitted: settings.n_incoming_cells_permitted.map(cvt),
355
1056
        }
356
1056
    }
357

            
358
    /// Parse a RELAY or RELAY_EARLY cell body.
359
    ///
360
    /// Requires that the cryptographic checks on the message have already been
361
    /// performed
362
532
    pub(crate) fn decode(&mut self, cell: BoxedCellBody) -> Result<RelayCellDecoderResult> {
363
532
        self.decoder
364
532
            .decode(cell)
365
532
            .map_err(|e| Error::from_bytes_err(e, "relay cell"))
366
532
    }
367

            
368
    /// Decrement the limit of inbound cells that may be received from this hop; give
369
    /// an error if it would reach zero.
370
532
    pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
371
532
        try_decrement_cell_limit(&mut self.n_incoming_cells_permitted)
372
532
            .map_err(|_| Error::ExcessInboundCells)
373
532
    }
374
}
375

            
376
impl CircHopOutbound {
377
    /// Create a new [`CircHopOutbound`].
378
1024
    pub(crate) fn new(
379
1024
        ccontrol: Arc<Mutex<CongestionControl>>,
380
1024
        relay_format: RelayCellFormat,
381
1024
        flow_ctrl_params: Arc<FlowCtrlParameters>,
382
1024
        settings: &HopSettings,
383
1024
    ) -> Self {
384
1024
        Self {
385
1024
            ccontrol,
386
1024
            map: Arc::new(Mutex::new(StreamMap::new())),
387
1024
            relay_format,
388
1024
            flow_ctrl_params,
389
1024
            n_outgoing_cells_permitted: settings.n_outgoing_cells_permitted.map(cvt),
390
1024
        }
391
1024
    }
392

            
393
    /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
394
    /// `message` to the provided hop.
395
96
    pub(crate) fn begin_stream(
396
96
        &mut self,
397
96
        hop: Option<HopNum>,
398
96
        message: AnyRelayMsg,
399
96
        time_prov: &DynTimeProvider,
400
96
        cmd_checker: AnyCmdChecker,
401
96
        memquota: &StreamAccount,
402
96
    ) -> Result<(SendRelayCell, StreamId, ReactorStreamComponents)> {
403
        // TODO: This has a lot of duplicated code with `Self::add_ent_with_id()`.
404

            
405
        // A channel for the reactor to inform the writer of a new rate limit.
406
96
        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
407

            
408
        // A channel for the reactor to request a new drain rate from the reader.
409
        // Typically this notification will be sent after an XOFF is sent so that the reader can
410
        // send us a new drain rate when the stream data queue becomes empty.
411
96
        let mut drain_rate_request_tx = NotifySender::new_typed();
412
96
        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
413

            
414
96
        let flow_ctrl = self.build_flow_ctrl(rate_limit_tx, drain_rate_request_tx)?;
415

            
416
96
        let stream_queue_max_len = flow_ctrl.inbound_queue_max_len();
417

            
418
        // A queue for inbound RELAY messages.
419
96
        let (sender, receiver) = stream_queue(stream_queue_max_len, memquota, time_prov)?;
420

            
421
        // A queue for outbound RELAY messages.
422
96
        let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE)
423
96
            .new_mq(time_prov.clone(), memquota.as_raw_account())?;
424

            
425
96
        let r = self.map.lock().expect("lock poisoned").add_ent(
426
96
            sender,
427
96
            msg_rx,
428
96
            flow_ctrl,
429
96
            cmd_checker,
430
96
        )?;
431
96
        let cell = AnyRelayMsgOuter::new(Some(r), message);
432

            
433
96
        let stream_components = ReactorStreamComponents {
434
96
            stream_inbound_rx: receiver,
435
96
            stream_outbound_tx: msg_tx,
436
96
            rate_limit_rx,
437
96
            drain_rate_request_rx,
438
96
        };
439

            
440
96
        Ok((
441
96
            SendRelayCell {
442
96
                hop,
443
96
                early: false,
444
96
                cell,
445
96
            },
446
96
            r,
447
96
            stream_components,
448
96
        ))
449
96
    }
450

            
451
    /// Close the stream associated with `id` because the stream was dropped.
452
    ///
453
    /// If we have not already received an END cell on this stream, send one.
454
    /// If no END cell is specified, an END cell with the reason byte set to
455
    /// REASON_MISC will be sent.
456
    ///
457
    // Note(relay): `circ_id` is an opaque displayable type
458
    // because relays use a different circuit ID type
459
    // than clients. Eventually, we should probably make
460
    // them both use the same ID type, or have a nicer approach here
461
68
    pub(crate) fn close_stream(
462
68
        &mut self,
463
68
        circ_id: impl std::fmt::Display,
464
68
        id: StreamId,
465
68
        hop: Option<HopNum>,
466
68
        message: CloseStreamBehavior,
467
68
        why: streammap::TerminateReason,
468
68
        expiry: Instant,
469
68
    ) -> Result<Option<SendRelayCell>> {
470
68
        let should_send_end = self
471
68
            .map
472
68
            .lock()
473
68
            .expect("lock poisoned")
474
68
            .terminate(id, why, expiry)?;
475
68
        trace!(
476
            circ_id = %circ_id,
477
            stream_id = %id,
478
            should_send_end = ?should_send_end,
479
            "Ending stream",
480
        );
481
        // TODO: I am about 80% sure that we only send an END cell if
482
        // we didn't already get an END cell.  But I should double-check!
483
68
        if let (ShouldSendEnd::Send, CloseStreamBehavior::SendEnd(end_message)) =
484
68
            (should_send_end, message)
485
        {
486
68
            let end_cell = AnyRelayMsgOuter::new(Some(id), end_message.into());
487
68
            let cell = SendRelayCell {
488
68
                hop,
489
68
                early: false,
490
68
                cell: end_cell,
491
68
            };
492

            
493
68
            return Ok(Some(cell));
494
        }
495
        Ok(None)
496
68
    }
497

            
498
    /// Check if we should send an XON message.
499
    ///
500
    /// If we should, then returns the XON message that should be sent.
501
    pub(crate) fn maybe_send_xon(
502
        &mut self,
503
        rate: XonKbpsEwma,
504
        id: StreamId,
505
    ) -> Result<Option<Xon>> {
506
        // the call below will return an error if XON/XOFF aren't supported,
507
        // so we check for support here
508
        if !self
509
            .ccontrol()
510
            .lock()
511
            .expect("poisoned lock")
512
            .uses_xon_xoff()
513
        {
514
            return Ok(None);
515
        }
516

            
517
        let mut map = self.map.lock().expect("lock poisoned");
518
        let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
519
            // stream went away
520
            return Ok(None);
521
        };
522

            
523
        ent.maybe_send_xon(rate)
524
    }
525

            
526
    /// Check if we should send an XOFF message.
527
    ///
528
    /// If we should, then returns the XOFF message that should be sent.
529
220
    pub(crate) fn maybe_send_xoff(&mut self, id: StreamId) -> Result<Option<Xoff>> {
530
        // the call below will return an error if XON/XOFF aren't supported,
531
        // so we check for support here
532
220
        if !self
533
220
            .ccontrol()
534
220
            .lock()
535
220
            .expect("poisoned lock")
536
220
            .uses_xon_xoff()
537
        {
538
140
            return Ok(None);
539
80
        }
540

            
541
80
        let mut map = self.map.lock().expect("lock poisoned");
542
80
        let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
543
            // stream went away
544
12
            return Ok(None);
545
        };
546

            
547
68
        ent.maybe_send_xoff()
548
220
    }
549

            
550
    /// Return the format that is used for relay cells sent to this hop.
551
    ///
552
    /// For the most part, this format isn't necessary to interact with a CircHop;
553
    /// it becomes relevant when we are deciding _what_ we can encode for the hop.
554
4732
    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
555
4732
        self.relay_format
556
4732
    }
557

            
558
    /// Delegate to CongestionControl, for testing purposes
559
    #[cfg(test)]
560
20
    pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
561
20
        self.ccontrol()
562
20
            .lock()
563
20
            .expect("poisoned lock")
564
20
            .send_window_and_expected_tags()
565
20
    }
566

            
567
    /// Return the number of open streams on this hop.
568
    ///
569
    /// WARNING: because this locks the stream map mutex,
570
    /// it should never be called from a context where that mutex is already locked.
571
104
    pub(crate) fn n_open_streams(&self) -> usize {
572
104
        self.map.lock().expect("lock poisoned").n_open_streams()
573
104
    }
574

            
575
    /// Return a reference to our CongestionControl object.
576
27644
    pub(crate) fn ccontrol(&self) -> &Arc<Mutex<CongestionControl>> {
577
27644
        &self.ccontrol
578
27644
    }
579

            
580
    /// We're about to send `msg`.
581
    ///
582
    /// See [`OpenStreamEnt::about_to_send`](crate::streammap::OpenStreamEnt::about_to_send).
583
    //
584
    // TODO prop340: This should take a cell or similar, not a message.
585
    //
586
    // Note(relay): `circ_id` is an opaque displayable type
587
    // because relays use a different circuit ID type
588
    // than clients. Eventually, we should probably make
589
    // them both use the same ID type, or have a nicer approach here
590
4156
    pub(crate) fn about_to_send(
591
4156
        &mut self,
592
4156
        circ_id: impl std::fmt::Display,
593
4156
        stream_id: StreamId,
594
4156
        msg: &AnyRelayMsg,
595
4156
    ) -> Result<()> {
596
4156
        let mut hop_map = self.map.lock().expect("lock poisoned");
597
4156
        let Some(StreamEntMut::Open(ent)) = hop_map.get_mut(stream_id) else {
598
            // This can happen when we have outgoing data queued when we received an END.
599
            // We shouldn't return an error here since it would close the circuit along with all
600
            // other streams, and instead we just let the caller send this message anyways.
601
            // Also the caller only calls `about_to_send()` for DATA cells,
602
            // which means that other non-DATA cells don't hit this code path and are always sent,
603
            // and so we should handle all cell types consistently.
604
            // TODO: We should drop the message and not send it,
605
            // but the caller of `about_to_send()` isn't designed to handle fallible sends
606
            // so it would need some refactoring to handle this.
607
            debug!(
608
                circ_id = %circ_id,
609
                stream_id = %stream_id,
610
                "sending a relay cell for non-existent or non-open stream!",
611
            );
612
            return Ok(());
613
        };
614

            
615
4156
        ent.about_to_send(msg)
616
4156
    }
617

            
618
    /// Add an entry to this map using the specified StreamId.
619
    #[cfg(any(feature = "hs-service", feature = "relay"))]
620
40
    pub(crate) fn add_ent_with_id(
621
40
        &self,
622
40
        time_prov: &DynTimeProvider,
623
40
        stream_id: StreamId,
624
40
        cmd_checker: AnyCmdChecker,
625
40
        memquota: &StreamAccount,
626
40
    ) -> Result<ReactorStreamComponents> {
627
        // TODO: This has a lot of duplicated code with `Self::begin_stream()`.
628

            
629
        // A channel for the reactor to inform the writer of a new rate limit.
630
40
        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
631

            
632
        // A channel for the reactor to request a new drain rate from the reader.
633
        // Typically this notification will be sent after an XOFF is sent so that the reader can
634
        // send us a new drain rate when the stream data queue becomes empty.
635
40
        let mut drain_rate_request_tx = NotifySender::new_typed();
636
40
        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
637

            
638
40
        let flow_ctrl = self.build_flow_ctrl(rate_limit_tx, drain_rate_request_tx)?;
639

            
640
40
        let stream_queue_max_len = flow_ctrl.inbound_queue_max_len();
641

            
642
        // A queue for inbound RELAY messages.
643
40
        let (sender, receiver) = stream_queue(stream_queue_max_len, memquota, time_prov)?;
644

            
645
        // A queue for outbound RELAY messages.
646
40
        let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE)
647
40
            .new_mq(time_prov.clone(), memquota.as_raw_account())?;
648

            
649
40
        let mut hop_map = self.map.lock().expect("lock poisoned");
650
40
        hop_map.add_ent_with_id(sender, msg_rx, flow_ctrl, stream_id, cmd_checker)?;
651

            
652
40
        Ok(ReactorStreamComponents {
653
40
            stream_inbound_rx: receiver,
654
40
            stream_outbound_tx: msg_tx,
655
40
            rate_limit_rx,
656
40
            drain_rate_request_rx,
657
40
        })
658
40
    }
659

            
660
    /// Builds the reactor's flow control handler for a new stream.
661
    // TODO: remove the `Result` once we remove the "flowctl-cc" feature
662
    #[cfg_attr(feature = "flowctl-cc", expect(clippy::unnecessary_wraps))]
663
136
    fn build_flow_ctrl(
664
136
        &self,
665
136
        rate_limit_updater: watch::Sender<StreamRateLimit>,
666
136
        drain_rate_requester: NotifySender<DrainRateRequest>,
667
136
    ) -> Result<StreamFlowCtrl> {
668
136
        let params = Arc::clone(&self.flow_ctrl_params);
669

            
670
136
        if self
671
136
            .ccontrol()
672
136
            .lock()
673
136
            .expect("poisoned lock")
674
136
            .uses_stream_sendme()
675
        {
676
120
            let window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
677
120
            Ok(StreamFlowCtrl::new_window(window))
678
        } else {
679
            cfg_if::cfg_if! {
680
                if #[cfg(feature = "flowctl-cc")] {
681
                    // TODO: Currently arti only supports clients, and we don't support connecting
682
                    // to onion services while using congestion control, so we hardcode this. In the
683
                    // future we will need to somehow tell the `CircHop` this so that we can set it
684
                    // correctly, since we don't want to enable this at exits.
685
16
                    let use_sidechannel_mitigations = true;
686

            
687
16
                    Ok(StreamFlowCtrl::new_xon_xoff(
688
16
                        params,
689
16
                        use_sidechannel_mitigations,
690
16
                        rate_limit_updater,
691
16
                        drain_rate_requester,
692
16
                    ))
693
                } else {
694
                    drop(params);
695
                    drop(rate_limit_updater);
696
                    drop(drain_rate_requester);
697
                    Err(internal!(
698
                        "`CongestionControl` doesn't use sendmes, but 'flowctl-cc' feature not enabled",
699
                    ).into())
700
                }
701
            }
702
        }
703
136
    }
704

            
705
    /// Deliver `msg` to the specified open stream entry `ent`.
706
216
    fn deliver_msg_to_stream(
707
216
        streamid: StreamId,
708
216
        ent: &mut OpenStreamEnt,
709
216
        cell_counts_toward_windows: bool,
710
216
        msg: UnparsedRelayMsg,
711
216
    ) -> Result<bool> {
712
        use tor_async_utils::SinkTrySend as _;
713
        use tor_async_utils::SinkTrySendError as _;
714

            
715
        // The stream for this message exists, and is open.
716

            
717
        // We need to handle SENDME/XON/XOFF messages here, not in the stream's recv() method, or
718
        // else we'd never notice them if the stream isn't reading.
719
216
        match msg.cmd() {
720
            RelayCmd::SENDME => {
721
4
                ent.put_for_incoming_sendme(msg)?;
722
4
                return Ok(false);
723
            }
724
            RelayCmd::XON => {
725
                ent.handle_incoming_xon(msg)?;
726
                return Ok(false);
727
            }
728
            RelayCmd::XOFF => {
729
                ent.handle_incoming_xoff(msg)?;
730
                return Ok(false);
731
            }
732
212
            _ => {}
733
        }
734

            
735
212
        let message_closes_stream = ent.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
736

            
737
212
        if let Err(e) = Pin::new(&mut ent.sink).try_send(msg) {
738
            if e.is_full() {
739
                cfg_if::cfg_if! {
740
                    if #[cfg(not(feature = "flowctl-cc"))] {
741
                        // If we get here, we either have a logic bug (!), or an attacker
742
                        // is sending us more cells than we asked for via congestion control.
743
                        return Err(Error::CircProto(format!(
744
                            "Stream sink would block; received too many cells on stream ID {}",
745
                            sv(streamid),
746
                        )));
747
                    } else {
748
                        return Err(internal!(
749
                            "Stream (ID {}) uses an unbounded queue, but apparently it's full?",
750
                            sv(streamid),
751
                        )
752
                        .into());
753
                    }
754
                }
755
            }
756
            if e.is_disconnected() && cell_counts_toward_windows {
757
                // the other side of the stream has gone away; remember
758
                // that we received a cell that we couldn't queue for it.
759
                //
760
                // Later this value will be recorded in a half-stream.
761
                ent.dropped += 1;
762
            }
763
212
        }
764

            
765
212
        Ok(message_closes_stream)
766
216
    }
767

            
768
    /// Note that we received an END message (or other message indicating the end of
769
    /// the stream) on the stream with `id`.
770
    ///
771
    /// See [`StreamMap::ending_msg_received`](crate::streammap::StreamMap::ending_msg_received).
772
    #[cfg(feature = "hs-service")]
773
    pub(crate) fn ending_msg_received(&self, stream_id: StreamId) -> Result<()> {
774
        let mut hop_map = self.map.lock().expect("lock poisoned");
775

            
776
        hop_map.ending_msg_received(stream_id)?;
777

            
778
        Ok(())
779
    }
780

            
781
    /// Handle `msg`, delivering it to the stream with the specified `streamid` if appropriate.
782
    ///
783
    /// Returns back the provided `msg`, if the message is an incoming stream request
784
    /// that needs to be handled by the calling code.
785
    ///
786
    // TODO: the above is a bit of a code smell -- we should try to avoid passing the msg
787
    // back and forth like this.
788
276
    pub(crate) fn handle_msg<F>(
789
276
        &self,
790
276
        possible_proto_violation_err: F,
791
276
        cell_counts_toward_windows: bool,
792
276
        streamid: StreamId,
793
276
        msg: UnparsedRelayMsg,
794
276
        now: Instant,
795
276
    ) -> Result<Option<UnparsedRelayMsg>>
796
276
    where
797
276
        F: FnOnce(StreamId) -> Error,
798
    {
799
276
        let mut hop_map = self.map.lock().expect("lock poisoned");
800

            
801
276
        match hop_map.get_mut(streamid) {
802
216
            Some(StreamEntMut::Open(ent)) => {
803
                // Can't have a stream level SENDME when congestion control is enabled.
804
216
                let message_closes_stream =
805
216
                    Self::deliver_msg_to_stream(streamid, ent, cell_counts_toward_windows, msg)?;
806

            
807
216
                if message_closes_stream {
808
24
                    hop_map.ending_msg_received(streamid)?;
809
192
                }
810
            }
811
20
            Some(StreamEntMut::EndSent(EndSentStreamEnt { expiry, .. })) if now >= *expiry => {
812
4
                return Err(possible_proto_violation_err(streamid));
813
            }
814
            #[cfg(feature = "hs-service")]
815
            Some(StreamEntMut::EndSent(_))
816
4
                if matches!(
817
16
                    msg.cmd(),
818
                    RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
819
                ) =>
820
            {
821
                // If the other side is sending us a BEGIN but hasn't yet acknowledged our END
822
                // message, just remove the old stream from the map and stop waiting for a
823
                // response
824
12
                hop_map.ending_msg_received(streamid)?;
825
12
                return Ok(Some(msg));
826
            }
827
4
            Some(StreamEntMut::EndSent(EndSentStreamEnt { half_stream, .. })) => {
828
                // We sent an end but maybe the other side hasn't heard.
829

            
830
4
                match half_stream.handle_msg(msg)? {
831
4
                    StreamStatus::Open => {}
832
                    StreamStatus::Closed => {
833
                        hop_map.ending_msg_received(streamid)?;
834
                    }
835
                }
836
            }
837
            #[cfg(feature = "hs-service")]
838
            None if matches!(
839
40
                msg.cmd(),
840
                RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
841
            ) =>
842
            {
843
40
                return Ok(Some(msg));
844
            }
845
            _ => {
846
                // No stream wants this message, or ever did.
847
                return Err(possible_proto_violation_err(streamid));
848
            }
849
        }
850

            
851
220
        Ok(None)
852
276
    }
853

            
854
    /// Get the stream map of this hop.
855
39522
    pub(crate) fn stream_map(&self) -> &Arc<Mutex<StreamMap>> {
856
39522
        &self.map
857
39522
    }
858

            
859
    /// Set the stream map of this hop to `map`.
860
    ///
861
    /// Returns an error if the existing stream map of the hop has any open stream.
862
104
    pub(crate) fn set_stream_map(&mut self, map: Arc<Mutex<StreamMap>>) -> StdResult<(), Bug> {
863
104
        if self.n_open_streams() != 0 {
864
            return Err(internal!("Tried to discard existing open streams?!"));
865
104
        }
866

            
867
104
        self.map = map;
868

            
869
104
        Ok(())
870
104
    }
871

            
872
    /// Decrement the limit of outbound cells that may be sent to this hop; give
873
    /// an error if it would reach zero.
874
4588
    pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
875
4588
        try_decrement_cell_limit(&mut self.n_outgoing_cells_permitted)
876
4588
            .map_err(|_| Error::ExcessOutboundCells)
877
4588
    }
878
}
879

            
880
/// If `val` is `Some(1)`, return Err(());
881
/// otherwise decrement it (if it is Some) and return Ok(()).
882
#[inline]
883
5120
fn try_decrement_cell_limit(val: &mut Option<NonZeroU32>) -> StdResult<(), ()> {
884
    // This is a bit verbose, but I've confirmed that it optimizes nicely.
885
5120
    match val {
886
        Some(x) => {
887
            let z = u32::from(*x);
888
            if z == 1 {
889
                Err(())
890
            } else {
891
                *x = (z - 1).try_into().expect("NonZeroU32 was zero?!");
892
                Ok(())
893
            }
894
        }
895
5120
        None => Ok(()),
896
    }
897
5120
}
898

            
899
/// Convert a limit from the form used in a HopSettings to that used here.
900
/// (The format we use here is more compact.)
901
fn cvt(limit: u32) -> NonZeroU32 {
902
    // See "known limitations" comment on n_incoming_cells_permitted.
903
    limit
904
        .saturating_add(1)
905
        .try_into()
906
        .expect("Adding one left it as zero?")
907
}
908

            
909
/// A collection of components that can be used to interact with the reactor's view of a Tor stream.
910
//
911
// TODO: We also have a `StreamComponents` type that is used and built outside of the reactor.
912
// It's maybe confusing to have these similar type names, so a better name would be nice.
913
//
914
// TODO(arti#2068): The components we return should maybe depend on what type of flow control is
915
// used, so in the future we might want to make some of these fields optional.
916
#[derive(Debug, Deftly)]
917
#[derive_deftly(HasMemoryCost)]
918
pub(crate) struct ReactorStreamComponents {
919
    /// An MPSC receiver for inbound messages that arrive on the stream.
920
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
921
    pub(crate) stream_inbound_rx: StreamQueueReceiver,
922

            
923
    /// An MPSC sender for outbound messages to be sent on the stream.
924
    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
925
    pub(crate) stream_outbound_tx: StreamMpscSender<AnyRelayMsg>,
926

            
927
    /// A mechanism to allow the stream's writer to receive rate limit updates from the reactor.
928
    // The `watch::Sender` owns the indirect data.
929
    #[deftly(has_memory_cost(indirect_size = "0"))]
930
    pub(crate) rate_limit_rx: watch::Receiver<StreamRateLimit>,
931

            
932
    /// A mechanism to allow the stream's reader to receive drain rate update requests from the
933
    /// reactor.
934
    #[deftly(has_memory_cost(indirect_size = "0"))]
935
    pub(crate) drain_rate_request_rx: NotifyReceiver<DrainRateRequest>,
936
}