1
//! Module exposing structures relating to a reactor's view of a circuit hop.
2

            
3
// TODO(relay): don't import from the client module
4
use crate::client::circuit::handshake::RelayCryptLayerProtocol;
5

            
6
use crate::ccparams::CongestionControlParams;
7
use crate::circuit::CircParameters;
8
use crate::congestion::{CongestionControl, sendme};
9
use crate::stream::CloseStreamBehavior;
10
use crate::stream::SEND_WINDOW_INIT;
11
use crate::stream::StreamMpscReceiver;
12
use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
13
use crate::stream::flow_ctrl::params::FlowCtrlParameters;
14
use crate::stream::flow_ctrl::state::{StreamFlowCtrl, StreamRateLimit};
15
use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
16
use crate::stream::queue::StreamQueueSender;
17
use crate::streammap::{
18
    self, EndSentStreamEnt, OpenStreamEnt, ShouldSendEnd, StreamEntMut, StreamMap,
19
};
20
use crate::util::notify::NotifySender;
21
use crate::{Error, HopNum, Result};
22

            
23
use postage::watch;
24
use safelog::sensitive as sv;
25
use tracing::{trace, warn};
26

            
27
use tor_cell::chancell::BoxedCellBody;
28
use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
29
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
30
use tor_cell::relaycell::msg::AnyRelayMsg;
31
use tor_cell::relaycell::{
32
    AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, RelayCmd,
33
    StreamId, UnparsedRelayMsg,
34
};
35
use tor_error::{Bug, internal};
36
use tor_protover::named;
37

            
38
use std::num::NonZeroU32;
39
use std::pin::Pin;
40
use std::result::Result as StdResult;
41
use std::sync::{Arc, Mutex};
42
use std::time::Instant;
43

            
44
#[cfg(test)]
45
use tor_cell::relaycell::msg::SendmeTag;
46

            
47
use cfg_if::cfg_if;
48

            
49
/// Type of negotiation that we'll be performing as we establish a hop.
///
/// Determines what flavor of extensions we can send and receive, which in turn
/// limits the hop settings we can negotiate.
//
// TODO-CGO: This is likely to be refactored when we finally add support for
// HsV3+CGO.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum HopNegotiationType {
    /// We're using a handshake in which extension-based negotiation cannot occur.
    None,
    /// We're using the HsV3-ntor handshake, in which the client can send extensions,
    /// but the server cannot.
    ///
    /// As a special case, the default relay encryption protocol is the hsv3
    /// variant of Tor1.
    //
    // We would call this "HalfDuplex" or something, but we do not expect to add
    // any more handshakes of this type.
    HsV3,
    /// We're using a handshake in which both client and relay can send extensions.
    Full,
}
72

            
73
/// The settings we use for single hop of a circuit.
74
///
75
/// Unlike [`CircParameters`], this type is crate-internal.
76
/// We construct it based on our settings from the circuit,
77
/// and from the hop's actual capabilities.
78
/// Then, we negotiate with the hop as part of circuit
79
/// creation/extension to determine the actual settings that will be in use.
80
/// Finally, we use those settings to construct the negotiated circuit hop.
81
//
82
// TODO: Relays should probably derive an instance of this type too, as
83
// part of the circuit creation handshake.
84
#[derive(Clone, Debug)]
85
pub(crate) struct HopSettings {
86
    /// The negotiated congestion control settings for this hop .
87
    pub(crate) ccontrol: CongestionControlParams,
88

            
89
    /// Flow control parameters that will be used for streams on this hop.
90
    pub(crate) flow_ctrl_params: FlowCtrlParameters,
91

            
92
    /// Maximum number of permitted incoming relay cells for this hop.
93
    pub(crate) n_incoming_cells_permitted: Option<u32>,
94

            
95
    /// Maximum number of permitted outgoing relay cells for this hop.
96
    pub(crate) n_outgoing_cells_permitted: Option<u32>,
97

            
98
    /// The relay cell encryption algorithm and cell format for this hop.
99
    relay_crypt_protocol: RelayCryptLayerProtocol,
100
}
101

            
102
impl HopSettings {
103
    /// Construct a new `HopSettings` based on `params` (a set of circuit parameters)
104
    /// and `caps` (a set of protocol capabilities for a circuit target).
105
    ///
106
    /// The resulting settings will represent what the client would prefer to negotiate
107
    /// (determined by `params`),
108
    /// as modified by what the target relay is believed to support (represented by `caps`).
109
    ///
110
    /// This represents the `HopSettings` in a pre-negotiation state:
111
    /// the circuit negotiation process will modify it.
112
    #[allow(clippy::unnecessary_wraps)] // likely to become fallible in the future.
113
1068
    pub(crate) fn from_params_and_caps(
114
1068
        hoptype: HopNegotiationType,
115
1068
        params: &CircParameters,
116
1068
        caps: &tor_protover::Protocols,
117
1068
    ) -> Result<Self> {
118
1068
        let mut ccontrol = params.ccontrol.clone();
119
1068
        match ccontrol.alg() {
120
720
            crate::ccparams::Algorithm::FixedWindow(_) => {}
121
            crate::ccparams::Algorithm::Vegas(_) => {
122
                // If the target doesn't support FLOWCTRL_CC, we can't use Vegas.
123
348
                if !caps.supports_named_subver(named::FLOWCTRL_CC) {
124
                    ccontrol.use_fallback_alg();
125
348
                }
126
            }
127
        };
128
1068
        if hoptype == HopNegotiationType::None {
129
84
            ccontrol.use_fallback_alg();
130
984
        } else if hoptype == HopNegotiationType::HsV3 {
131
            // TODO #2037, TODO-CGO: We need a way to send congestion control extensions
132
            // in this case too.  But since we aren't sending them, we
133
            // should use the fallback algorithm.
134
            ccontrol.use_fallback_alg();
135
984
        }
136
1068
        let ccontrol = ccontrol; // drop mut
137

            
138
        // Negotiate CGO if it is supported, if CC is also supported,
139
        // and if CGO is available on this relay.
140
1068
        let relay_crypt_protocol = match hoptype {
141
84
            HopNegotiationType::None => RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0),
142
            HopNegotiationType::HsV3 => {
143
                // TODO-CGO: Support CGO when available.
144
                cfg_if! {
145
                    if #[cfg(feature = "hs-common")] {
146
                        RelayCryptLayerProtocol::HsV3(RelayCellFormat::V0)
147
                    } else {
148
                        return Err(
149
                            tor_error::internal!("Unexpectedly tried to negotiate HsV3 without support!").into(),
150
                        );
151
                    }
152
                }
153
            }
154
            HopNegotiationType::Full => {
155
                cfg_if! {
156
                    if #[cfg(all(feature = "flowctl-cc", feature = "counter-galois-onion"))] {
157
                        #[allow(clippy::overly_complex_bool_expr)]
158
984
                        if  ccontrol.alg().compatible_with_cgo()
159
348
                            && caps.supports_named_subver(named::RELAY_NEGOTIATE_SUBPROTO)
160
                            && caps.supports_named_subver(named::RELAY_CRYPT_CGO)
161
                        {
162
                            RelayCryptLayerProtocol::Cgo
163
                        } else {
164
984
                            RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
165
                        }
166
                    } else {
167
                        RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
168
                    }
169
                }
170
            }
171
        };
172

            
173
1068
        Ok(Self {
174
1068
            ccontrol,
175
1068
            flow_ctrl_params: params.flow_ctrl.clone(),
176
1068
            relay_crypt_protocol,
177
1068
            n_incoming_cells_permitted: params.n_incoming_cells_permitted,
178
1068
            n_outgoing_cells_permitted: params.n_outgoing_cells_permitted,
179
1068
        })
180
1068
    }
181

            
182
    /// Return the negotiated relay crypto protocol.
183
1128
    pub(crate) fn relay_crypt_protocol(&self) -> RelayCryptLayerProtocol {
184
1128
        self.relay_crypt_protocol
185
1128
    }
186

            
187
    /// Return the client circuit-creation extensions that we should use in order to negotiate
188
    /// these circuit hop parameters.
189
    #[allow(clippy::unnecessary_wraps)]
190
36
    pub(crate) fn circuit_request_extensions(&self) -> Result<Vec<CircRequestExt>> {
191
        // allow 'unused_mut' because of the combinations of `cfg` conditions below
192
        #[allow(unused_mut)]
193
36
        let mut client_extensions = Vec::new();
194

            
195
        #[allow(unused, unused_mut)]
196
36
        let mut cc_extension_set = false;
197

            
198
36
        if self.ccontrol.is_enabled() {
199
12
            cfg_if::cfg_if! {
200
12
                if #[cfg(feature = "flowctl-cc")] {
201
12
                    client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
202
12
                    cc_extension_set = true;
203
12
                } else {
204
12
                    return Err(
205
12
                        tor_error::internal!(
206
12
                            "Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
207
12
                        )
208
12
                        .into()
209
12
                    );
210
12
                }
211
12
            }
212
24
        }
213

            
214
        // See whether we need to send a list of required protocol capabilities.
215
        // These aren't "negotiated" per se; they're simply demanded.
216
        // The relay will refuse the circuit if it doesn't support all of them,
217
        // and if any of them isn't supported in the SubprotocolRequest extension.
218
        //
219
        // (In other words, don't add capabilities here just because you want the
220
        // relay to have them! They must be explicitly listed as supported for use
221
        // with this extension. For the current list, see
222
        // https://spec.torproject.org/tor-spec/create-created-cells.html#subproto-request)
223
        //
224
        #[allow(unused_mut)]
225
36
        let mut required_protocol_capabilities: Vec<tor_protover::NamedSubver> = Vec::new();
226

            
227
        #[cfg(feature = "counter-galois-onion")]
228
36
        if matches!(self.relay_crypt_protocol(), RelayCryptLayerProtocol::Cgo) {
229
            if !cc_extension_set {
230
                return Err(tor_error::internal!("Tried to negotiate CGO without CC.").into());
231
            }
232
            required_protocol_capabilities.push(tor_protover::named::RELAY_CRYPT_CGO);
233
36
        }
234

            
235
36
        if !required_protocol_capabilities.is_empty() {
236
            client_extensions.push(CircRequestExt::SubprotocolRequest(
237
                required_protocol_capabilities.into_iter().collect(),
238
            ));
239
36
        }
240

            
241
36
        Ok(client_extensions)
242
36
    }
243
}
244

            
245
/// Test-only default: fixed-window congestion control, test flow-control
/// defaults, extension by Ed25519 identity, and no cell-count limits.
#[cfg(test)]
impl std::default::Default for CircParameters {
    fn default() -> Self {
        Self {
            extend_by_ed25519_id: true,
            ccontrol: crate::congestion::test_utils::params::build_cc_fixed_params(),
            flow_ctrl: FlowCtrlParameters::defaults_for_tests(),
            n_incoming_cells_permitted: None,
            n_outgoing_cells_permitted: None,
        }
    }
}
257

            
258
impl CircParameters {
259
    /// Constructor
260
596
    pub fn new(
261
596
        extend_by_ed25519_id: bool,
262
596
        ccontrol: CongestionControlParams,
263
596
        flow_ctrl: FlowCtrlParameters,
264
596
    ) -> Self {
265
596
        Self {
266
596
            extend_by_ed25519_id,
267
596
            ccontrol,
268
596
            flow_ctrl,
269
596
            n_incoming_cells_permitted: None,
270
596
            n_outgoing_cells_permitted: None,
271
596
        }
272
596
    }
273
}
274

            
275
/// Instructions for sending a RELAY cell.
276
///
277
/// This instructs a circuit reactor to send a RELAY cell to a given target
278
/// (a hop, if we are a client, or the client, if we are a relay).
279
#[derive(educe::Educe)]
280
#[educe(Debug)]
281
pub(crate) struct SendRelayCell {
282
    /// The hop number, or `None` if we are a relay.
283
    pub(crate) hop: Option<HopNum>,
284
    /// Whether to use a RELAY_EARLY cell.
285
    pub(crate) early: bool,
286
    /// The cell to send.
287
    pub(crate) cell: AnyRelayMsgOuter,
288
}
289

            
290
/// The inbound state of a hop.
291
pub(crate) struct CircHopInbound {
292
    /// Decodes relay cells received from this hop.
293
    decoder: RelayCellDecoder,
294
    /// Remaining permitted incoming relay cells from this hop, plus 1.
295
    ///
296
    /// (In other words, `None` represents no limit,
297
    /// `Some(1)` represents an exhausted limit,
298
    /// and `Some(n)` means that n-1 more cells may be received.)
299
    ///
300
    /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
301
    n_incoming_cells_permitted: Option<NonZeroU32>,
302
}
303

            
304
/// The outbound state of a hop.
305
pub(crate) struct CircHopOutbound {
306
    /// Congestion control object.
307
    ///
308
    /// This object is also in charge of handling circuit level SENDME logic for this hop.
309
    ccontrol: Arc<Mutex<CongestionControl>>,
310
    /// Map from stream IDs to streams.
311
    ///
312
    /// We store this with the reactor instead of the circuit, since the
313
    /// reactor needs it for every incoming cell on a stream, whereas
314
    /// the circuit only needs it when allocating new streams.
315
    ///
316
    /// NOTE: this is behind a mutex because the client reactor polls the `StreamMap`s
317
    /// of all hops concurrently, in a `FuturesUnordered`. Without the mutex,
318
    /// this wouldn't be possible, because it would mean holding multiple
319
    /// mutable references to `self` (the reactor). Note, however,
320
    /// that there should never be any contention on this mutex:
321
    /// we never create more than one
322
    /// `CircHopList::ready_streams_iterator()` stream
323
    /// at a time, and we never clone/lock the hop's `StreamMap` outside of it.
324
    ///
325
    /// Additionally, the stream map of the last hop (join point) of a conflux tunnel
326
    /// is shared with all the circuits in the tunnel.
327
    map: Arc<Mutex<StreamMap>>,
328
    /// Format to use for relay cells.
329
    //
330
    // When we have packed/fragmented cells, this may be replaced by a RelayCellEncoder.
331
    relay_format: RelayCellFormat,
332
    /// Flow control parameters for new streams.
333
    flow_ctrl_params: Arc<FlowCtrlParameters>,
334
    /// Remaining permitted outgoing relay cells from this hop, plus 1.
335
    ///
336
    /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
337
    n_outgoing_cells_permitted: Option<NonZeroU32>,
338
}
339

            
340
impl CircHopInbound {
341
    /// Create a new [`CircHopInbound`].
342
1020
    pub(crate) fn new(decoder: RelayCellDecoder, settings: &HopSettings) -> Self {
343
1020
        Self {
344
1020
            decoder,
345
1020
            n_incoming_cells_permitted: settings.n_incoming_cells_permitted.map(cvt),
346
1020
        }
347
1020
    }
348

            
349
    /// Parse a RELAY or RELAY_EARLY cell body.
350
    ///
351
    /// Requires that the cryptographic checks on the message have already been
352
    /// performed
353
500
    pub(crate) fn decode(&mut self, cell: BoxedCellBody) -> Result<RelayCellDecoderResult> {
354
500
        self.decoder
355
500
            .decode(cell)
356
500
            .map_err(|e| Error::from_bytes_err(e, "relay cell"))
357
500
    }
358

            
359
    /// Decrement the limit of inbound cells that may be received from this hop; give
360
    /// an error if it would reach zero.
361
500
    pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
362
500
        try_decrement_cell_limit(&mut self.n_incoming_cells_permitted)
363
500
            .map_err(|_| Error::ExcessInboundCells)
364
500
    }
365
}
366

            
367
impl CircHopOutbound {
368
    /// Create a new [`CircHopOutbound`].
369
1020
    pub(crate) fn new(
370
1020
        ccontrol: Arc<Mutex<CongestionControl>>,
371
1020
        relay_format: RelayCellFormat,
372
1020
        flow_ctrl_params: Arc<FlowCtrlParameters>,
373
1020
        settings: &HopSettings,
374
1020
    ) -> Self {
375
1020
        Self {
376
1020
            ccontrol,
377
1020
            map: Arc::new(Mutex::new(StreamMap::new())),
378
1020
            relay_format,
379
1020
            flow_ctrl_params,
380
1020
            n_outgoing_cells_permitted: settings.n_outgoing_cells_permitted.map(cvt),
381
1020
        }
382
1020
    }
383

            
384
    /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
385
    /// `message` to the provided hop.
386
    #[allow(clippy::too_many_arguments)]
387
96
    pub(crate) fn begin_stream(
388
96
        &mut self,
389
96
        hop: Option<HopNum>,
390
96
        message: AnyRelayMsg,
391
96
        sender: StreamQueueSender,
392
96
        rx: StreamMpscReceiver<AnyRelayMsg>,
393
96
        rate_limit_updater: watch::Sender<StreamRateLimit>,
394
96
        drain_rate_requester: NotifySender<DrainRateRequest>,
395
96
        cmd_checker: AnyCmdChecker,
396
96
    ) -> Result<(SendRelayCell, StreamId)> {
397
96
        let flow_ctrl = self.build_flow_ctrl(
398
96
            Arc::clone(&self.flow_ctrl_params),
399
96
            rate_limit_updater,
400
96
            drain_rate_requester,
401
        )?;
402
96
        let r =
403
96
            self.map
404
96
                .lock()
405
96
                .expect("lock poisoned")
406
96
                .add_ent(sender, rx, flow_ctrl, cmd_checker)?;
407
96
        let cell = AnyRelayMsgOuter::new(Some(r), message);
408
96
        Ok((
409
96
            SendRelayCell {
410
96
                hop,
411
96
                early: false,
412
96
                cell,
413
96
            },
414
96
            r,
415
96
        ))
416
96
    }
417

            
418
    /// Close the stream associated with `id` because the stream was dropped.
419
    ///
420
    /// If we have not already received an END cell on this stream, send one.
421
    /// If no END cell is specified, an END cell with the reason byte set to
422
    /// REASON_MISC will be sent.
423
    ///
424
    // Note(relay): `circ_id` is an opaque displayable type
425
    // because relays use a different circuit ID type
426
    // than clients. Eventually, we should probably make
427
    // them both use the same ID type, or have a nicer approach here
428
70
    pub(crate) fn close_stream(
429
70
        &mut self,
430
70
        circ_id: impl std::fmt::Display,
431
70
        id: StreamId,
432
70
        hop: Option<HopNum>,
433
70
        message: CloseStreamBehavior,
434
70
        why: streammap::TerminateReason,
435
70
        expiry: Instant,
436
70
    ) -> Result<Option<SendRelayCell>> {
437
70
        let should_send_end = self
438
70
            .map
439
70
            .lock()
440
70
            .expect("lock poisoned")
441
70
            .terminate(id, why, expiry)?;
442
70
        trace!(
443
            circ_id = %circ_id,
444
            stream_id = %id,
445
            should_send_end = ?should_send_end,
446
            "Ending stream",
447
        );
448
        // TODO: I am about 80% sure that we only send an END cell if
449
        // we didn't already get an END cell.  But I should double-check!
450
70
        if let (ShouldSendEnd::Send, CloseStreamBehavior::SendEnd(end_message)) =
451
70
            (should_send_end, message)
452
        {
453
70
            let end_cell = AnyRelayMsgOuter::new(Some(id), end_message.into());
454
70
            let cell = SendRelayCell {
455
70
                hop,
456
70
                early: false,
457
70
                cell: end_cell,
458
70
            };
459

            
460
70
            return Ok(Some(cell));
461
        }
462
        Ok(None)
463
70
    }
464

            
465
    /// Check if we should send an XON message.
466
    ///
467
    /// If we should, then returns the XON message that should be sent.
468
    pub(crate) fn maybe_send_xon(
469
        &mut self,
470
        rate: XonKbpsEwma,
471
        id: StreamId,
472
    ) -> Result<Option<Xon>> {
473
        // the call below will return an error if XON/XOFF aren't supported,
474
        // so we check for support here
475
        if !self
476
            .ccontrol()
477
            .lock()
478
            .expect("poisoned lock")
479
            .uses_xon_xoff()
480
        {
481
            return Ok(None);
482
        }
483

            
484
        let mut map = self.map.lock().expect("lock poisoned");
485
        let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
486
            // stream went away
487
            return Ok(None);
488
        };
489

            
490
        ent.maybe_send_xon(rate)
491
    }
492

            
493
    /// Check if we should send an XOFF message.
494
    ///
495
    /// If we should, then returns the XOFF message that should be sent.
496
216
    pub(crate) fn maybe_send_xoff(&mut self, id: StreamId) -> Result<Option<Xoff>> {
497
        // the call below will return an error if XON/XOFF aren't supported,
498
        // so we check for support here
499
216
        if !self
500
216
            .ccontrol()
501
216
            .lock()
502
216
            .expect("poisoned lock")
503
216
            .uses_xon_xoff()
504
        {
505
140
            return Ok(None);
506
76
        }
507

            
508
76
        let mut map = self.map.lock().expect("lock poisoned");
509
76
        let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
510
            // stream went away
511
12
            return Ok(None);
512
        };
513

            
514
64
        ent.maybe_send_xoff()
515
216
    }
516

            
517
    /// Return the format that is used for relay cells sent to this hop.
518
    ///
519
    /// For the most part, this format isn't necessary to interact with a CircHop;
520
    /// it becomes relevant when we are deciding _what_ we can encode for the hop.
521
4726
    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
522
4726
        self.relay_format
523
4726
    }
524

            
525
    /// Delegate to CongestionControl, for testing purposes
    #[cfg(test)]
    pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
        self.ccontrol()
            .lock()
            .expect("poisoned lock")
            .send_window_and_expected_tags()
    }
533

            
534
    /// Return the number of open streams on this hop.
535
    ///
536
    /// WARNING: because this locks the stream map mutex,
537
    /// it should never be called from a context where that mutex is already locked.
538
104
    pub(crate) fn n_open_streams(&self) -> usize {
539
104
        self.map.lock().expect("lock poisoned").n_open_streams()
540
104
    }
541

            
542
    /// Return a reference to our CongestionControl object.
543
27644
    pub(crate) fn ccontrol(&self) -> &Arc<Mutex<CongestionControl>> {
544
27644
        &self.ccontrol
545
27644
    }
546

            
547
    /// We're about to send `msg`.
548
    ///
549
    /// See [`OpenStreamEnt::about_to_send`](crate::streammap::OpenStreamEnt::about_to_send).
550
    //
551
    // TODO prop340: This should take a cell or similar, not a message.
552
    //
553
    // Note(relay): `circ_id` is an opaque displayable type
554
    // because relays use a different circuit ID type
555
    // than clients. Eventually, we should probably make
556
    // them both use the same ID type, or have a nicer approach here
557
4156
    pub(crate) fn about_to_send(
558
4156
        &mut self,
559
4156
        circ_id: impl std::fmt::Display,
560
4156
        stream_id: StreamId,
561
4156
        msg: &AnyRelayMsg,
562
4156
    ) -> Result<()> {
563
4156
        let mut hop_map = self.map.lock().expect("lock poisoned");
564
4156
        let Some(StreamEntMut::Open(ent)) = hop_map.get_mut(stream_id) else {
565
            warn!(
566
                circ_id = %circ_id,
567
                stream_id = %stream_id,
568
                "sending a relay cell for non-existent or non-open stream!",
569
            );
570
            return Err(Error::CircProto(format!(
571
                "tried to send a relay cell on non-open stream {}",
572
                sv(stream_id),
573
            )));
574
        };
575

            
576
4156
        ent.about_to_send(msg)
577
4156
    }
578

            
579
    /// Add an entry to this map using the specified StreamId.
    ///
    /// # Errors
    ///
    /// Fails if the flow control state cannot be built, or if the stream map
    /// refuses to add an entry with `stream_id`.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    pub(crate) fn add_ent_with_id(
        &self,
        sink: StreamQueueSender,
        rx: StreamMpscReceiver<AnyRelayMsg>,
        rate_limit_updater: watch::Sender<StreamRateLimit>,
        drain_rate_requester: NotifySender<DrainRateRequest>,
        stream_id: StreamId,
        cmd_checker: AnyCmdChecker,
    ) -> Result<()> {
        // Build the flow control state *before* locking the stream map:
        // `build_flow_ctrl` takes the congestion control lock, and we don't
        // want to hold both locks at once.
        let flow_ctrl = self.build_flow_ctrl(
            Arc::clone(&self.flow_ctrl_params),
            rate_limit_updater,
            drain_rate_requester,
        )?;

        self.map
            .lock()
            .expect("lock poisoned")
            .add_ent_with_id(sink, rx, flow_ctrl, stream_id, cmd_checker)?;

        Ok(())
    }
605

            
606
    /// Builds the reactor's flow control handler for a new stream.
607
    // TODO: remove the `Result` once we remove the "flowctl-cc" feature
608
    #[cfg_attr(feature = "flowctl-cc", expect(clippy::unnecessary_wraps))]
609
132
    fn build_flow_ctrl(
610
132
        &self,
611
132
        params: Arc<FlowCtrlParameters>,
612
132
        rate_limit_updater: watch::Sender<StreamRateLimit>,
613
132
        drain_rate_requester: NotifySender<DrainRateRequest>,
614
132
    ) -> Result<StreamFlowCtrl> {
615
132
        if self
616
132
            .ccontrol()
617
132
            .lock()
618
132
            .expect("poisoned lock")
619
132
            .uses_stream_sendme()
620
        {
621
120
            let window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
622
120
            Ok(StreamFlowCtrl::new_window(window))
623
        } else {
624
            cfg_if::cfg_if! {
625
                if #[cfg(feature = "flowctl-cc")] {
626
                    // TODO: Currently arti only supports clients, and we don't support connecting
627
                    // to onion services while using congestion control, so we hardcode this. In the
628
                    // future we will need to somehow tell the `CircHop` this so that we can set it
629
                    // correctly, since we don't want to enable this at exits.
630
12
                    let use_sidechannel_mitigations = true;
631

            
632
12
                    Ok(StreamFlowCtrl::new_xon_xoff(
633
12
                        params,
634
12
                        use_sidechannel_mitigations,
635
12
                        rate_limit_updater,
636
12
                        drain_rate_requester,
637
12
                    ))
638
                } else {
639
                    drop(params);
640
                    drop(rate_limit_updater);
641
                    drop(drain_rate_requester);
642
                    Err(internal!(
643
                        "`CongestionControl` doesn't use sendmes, but 'flowctl-cc' feature not enabled",
644
                    ).into())
645
                }
646
            }
647
        }
648
132
    }
649

            
650
    /// Deliver `msg` to the specified open stream entry `ent`.
651
212
    fn deliver_msg_to_stream(
652
212
        streamid: StreamId,
653
212
        ent: &mut OpenStreamEnt,
654
212
        cell_counts_toward_windows: bool,
655
212
        msg: UnparsedRelayMsg,
656
212
    ) -> Result<bool> {
657
        use tor_async_utils::SinkTrySend as _;
658
        use tor_async_utils::SinkTrySendError as _;
659

            
660
        // The stream for this message exists, and is open.
661

            
662
        // We need to handle SENDME/XON/XOFF messages here, not in the stream's recv() method, or
663
        // else we'd never notice them if the stream isn't reading.
664
        //
665
        // TODO: this logic is the same as `HalfStream::handle_msg`; we should refactor this if
666
        // possible
667
212
        match msg.cmd() {
668
            RelayCmd::SENDME => {
669
4
                ent.put_for_incoming_sendme(msg)?;
670
4
                return Ok(false);
671
            }
672
            RelayCmd::XON => {
673
                ent.handle_incoming_xon(msg)?;
674
                return Ok(false);
675
            }
676
            RelayCmd::XOFF => {
677
                ent.handle_incoming_xoff(msg)?;
678
                return Ok(false);
679
            }
680
208
            _ => {}
681
        }
682

            
683
208
        let message_closes_stream = ent.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
684

            
685
208
        if let Err(e) = Pin::new(&mut ent.sink).try_send(msg) {
686
            if e.is_full() {
687
                cfg_if::cfg_if! {
688
                    if #[cfg(not(feature = "flowctl-cc"))] {
689
                        // If we get here, we either have a logic bug (!), or an attacker
690
                        // is sending us more cells than we asked for via congestion control.
691
                        return Err(Error::CircProto(format!(
692
                            "Stream sink would block; received too many cells on stream ID {}",
693
                            sv(streamid),
694
                        )));
695
                    } else {
696
                        return Err(internal!(
697
                            "Stream (ID {}) uses an unbounded queue, but apparently it's full?",
698
                            sv(streamid),
699
                        )
700
                        .into());
701
                    }
702
                }
703
            }
704
            if e.is_disconnected() && cell_counts_toward_windows {
705
                // the other side of the stream has gone away; remember
706
                // that we received a cell that we couldn't queue for it.
707
                //
708
                // Later this value will be recorded in a half-stream.
709
                ent.dropped += 1;
710
            }
711
208
        }
712

            
713
208
        Ok(message_closes_stream)
714
212
    }
715

            
716
    /// Note that we received an END message (or other message indicating the end of
    /// the stream) on the stream with `id`.
    ///
    /// See [`StreamMap::ending_msg_received`](crate::streammap::StreamMap::ending_msg_received).
    #[cfg(feature = "hs-service")]
    pub(crate) fn ending_msg_received(&self, stream_id: StreamId) -> Result<()> {
        let mut hop_map = self.map.lock().expect("lock poisoned");

        hop_map.ending_msg_received(stream_id)?;

        Ok(())
    }
728

            
729
    /// Handle `msg`, delivering it to the stream with the specified `streamid` if appropriate.
730
    ///
731
    /// Returns back the provided `msg`, if the message is an incoming stream request
732
    /// that needs to be handled by the calling code.
733
    ///
734
    // TODO: the above is a bit of a code smell -- we should try to avoid passing the msg
735
    // back and forth like this.
736
268
    pub(crate) fn handle_msg<F>(
737
268
        &self,
738
268
        possible_proto_violation_err: F,
739
268
        cell_counts_toward_windows: bool,
740
268
        streamid: StreamId,
741
268
        msg: UnparsedRelayMsg,
742
268
        now: Instant,
743
268
    ) -> Result<Option<UnparsedRelayMsg>>
744
268
    where
745
268
        F: FnOnce(StreamId) -> Error,
746
    {
747
268
        let mut hop_map = self.map.lock().expect("lock poisoned");
748

            
749
268
        match hop_map.get_mut(streamid) {
750
212
            Some(StreamEntMut::Open(ent)) => {
751
                // Can't have a stream level SENDME when congestion control is enabled.
752
212
                let message_closes_stream =
753
212
                    Self::deliver_msg_to_stream(streamid, ent, cell_counts_toward_windows, msg)?;
754

            
755
212
                if message_closes_stream {
756
24
                    hop_map.ending_msg_received(streamid)?;
757
188
                }
758
            }
759
20
            Some(StreamEntMut::EndSent(EndSentStreamEnt { expiry, .. })) if now >= *expiry => {
760
4
                return Err(possible_proto_violation_err(streamid));
761
            }
762
            #[cfg(feature = "hs-service")]
763
            Some(StreamEntMut::EndSent(_))
764
4
                if matches!(
765
16
                    msg.cmd(),
766
                    RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
767
                ) =>
768
            {
769
                // If the other side is sending us a BEGIN but hasn't yet acknowledged our END
770
                // message, just remove the old stream from the map and stop waiting for a
771
                // response
772
12
                hop_map.ending_msg_received(streamid)?;
773
12
                return Ok(Some(msg));
774
            }
775
4
            Some(StreamEntMut::EndSent(EndSentStreamEnt { half_stream, .. })) => {
776
                // We sent an end but maybe the other side hasn't heard.
777

            
778
4
                match half_stream.handle_msg(msg)? {
779
4
                    StreamStatus::Open => {}
780
                    StreamStatus::Closed => {
781
                        hop_map.ending_msg_received(streamid)?;
782
                    }
783
                }
784
            }
785
            #[cfg(feature = "hs-service")]
786
            None if matches!(
787
36
                msg.cmd(),
788
                RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
789
            ) =>
790
            {
791
36
                return Ok(Some(msg));
792
            }
793
            _ => {
794
                // No stream wants this message, or ever did.
795
                return Err(possible_proto_violation_err(streamid));
796
            }
797
        }
798

            
799
216
        Ok(None)
800
268
    }
801

            
802
    /// Get the stream map of this hop.
803
39550
    pub(crate) fn stream_map(&self) -> &Arc<Mutex<StreamMap>> {
804
39550
        &self.map
805
39550
    }
806

            
807
    /// Set the stream map of this hop to `map`.
808
    ///
809
    /// Returns an error if the existing stream map of the hop has any open stream.
810
104
    pub(crate) fn set_stream_map(&mut self, map: Arc<Mutex<StreamMap>>) -> StdResult<(), Bug> {
811
104
        if self.n_open_streams() != 0 {
812
            return Err(internal!("Tried to discard existing open streams?!"));
813
104
        }
814

            
815
104
        self.map = map;
816

            
817
104
        Ok(())
818
104
    }
819

            
820
    /// Decrement the limit of outbound cells that may be sent to this hop; give
821
    /// an error if it would reach zero.
822
4594
    pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
823
4594
        try_decrement_cell_limit(&mut self.n_outgoing_cells_permitted)
824
4594
            .map_err(|_| Error::ExcessOutboundCells)
825
4594
    }
826
}
827

            
828
/// Try to take one unit away from the optional counter `val`.
///
/// * If `val` is `None`, nothing is changed and `Ok(())` is returned.
/// * If `val` is `Some(n)` with `n > 1`, it becomes `Some(n - 1)` and `Ok(())`
///   is returned.
/// * If `val` is `Some(1)`, it is left untouched and `Err(())` is returned.
#[inline]
fn try_decrement_cell_limit(val: &mut Option<NonZeroU32>) -> StdResult<(), ()> {
    match val {
        None => Ok(()),
        Some(remaining) => {
            // A `NonZeroU32` is at least 1, so this subtraction cannot underflow.
            // `NonZeroU32::new` gives `None` exactly when the result is zero,
            // which is the "cannot decrement any further" case; no fallible
            // conversion (and so no panic path) is needed.
            let reduced = NonZeroU32::new(remaining.get() - 1).ok_or(())?;
            *remaining = reduced;
            Ok(())
        }
    }
}
846

            
847
/// Convert a limit from the form used in a HopSettings to that used here.
/// (The format we use here is more compact.)
///
/// The result is `limit + 1`, saturating at `u32::MAX`.
/// `try_decrement_cell_limit` reports an error once the stored value is 1,
/// so a value produced here from `limit` permits `limit` successful decrements
/// (except when the addition saturates).
fn cvt(limit: u32) -> NonZeroU32 {
    // See "known limitations" comment on n_incoming_cells_permitted.
    //
    // Starting from the non-zero constant 1 and adding `limit` with saturation
    // yields the same value as `limit.saturating_add(1)`, but the result is
    // non-zero by construction, so no fallible conversion is required.
    NonZeroU32::MIN.saturating_add(limit)
}