1
//! A `maybenot`-specific backend for padding.
2

            
3
// Some of the circuit padding implementation isn't reachable unless
4
// the extra-experimental circ-padding-manual feature is also present.
5
//
6
// TODO circpad: Remove this once we have circ-padding negotiation implemented.
7
#![cfg_attr(
8
    all(feature = "circ-padding", not(feature = "circ-padding-manual")),
9
    expect(dead_code)
10
)]
11

            
12
mod backend;
13

            
14
use std::collections::VecDeque;
15
use std::num::NonZeroU16;
16
use std::pin::Pin;
17
use std::sync::{Arc, Mutex};
18
use std::task::{Context, Poll, Waker};
19

            
20
use bitvec::BitArr;
21
use maybenot::MachineId;
22
use smallvec::SmallVec;
23
use tor_memquota::memory_cost_structural_copy;
24
use tor_rtcompat::{DynTimeProvider, SleepProvider};
25

            
26
use crate::HopNum;
27
use crate::circuit::HOPS;
28
use crate::util::err::ExcessPadding;
29
use backend::PaddingBackend;
30

            
31
/// The type of Instant that we'll use for our padding machines.
32
///
33
/// We use a separate type alias here in case we want to move to coarsetime.
34
type Instant = web_time_compat::Instant;
35

            
36
/// The type of Duration that we'll use for our padding machines.
37
///
38
/// We use a separate type alias here in case we want to move to coarsetime.
39
type Duration = std::time::Duration;
40

            
41
/// A type we use to generate a set of [`PaddingEvent`].
42
///
43
/// This is a separate type so we can tune it and make it into a smallvec if needed.
44
type PaddingEventQueue = VecDeque<PaddingEvent>;
45

            
46
/// A type we use to generate a set of [`PaddingEvent`].
47
///
48
/// This is a separate type so we can tune it and make it into a smallvec if needed.
49
type PerHopPaddingEventVec = Vec<PerHopPaddingEvent>;
50

            
51
/// Specifications for a set of maybenot padding machines as used in Arti: used to construct a `maybenot::Framework`.
52
#[derive(Clone, Debug, derive_builder::Builder)]
53
#[builder(build_fn(
54
    validate = "Self::validate",
55
    private,
56
    error = "CircuitPadderConfigError"
57
))]
58
#[builder(name = "CircuitPadderConfig")]
59
#[cfg_attr(not(feature = "circ-padding-manual"), builder(private))]
60
#[cfg_attr(feature = "circ-padding-manual", builder(public))]
61
pub(crate) struct PaddingRules {
62
    /// List of padding machines to use for shaping traffic.
63
    ///
64
    /// Note that this list may be empty, if we only want to receive padding,
65
    /// and never send it.
66
    machines: Arc<[maybenot::Machine]>,
67
    /// Maximum allowable outbound padding fraction.
68
    ///
69
    /// Passed directly to maybenot; not enforced in Arti.
70
    /// See [`maybenot::Framework::new`] for details.
71
    ///
72
    /// Must be between 0.0 and 1.0
73
    #[builder(default = "1.0")]
74
    max_outbound_blocking_frac: f64,
75
    /// Maximum allowable outbound blocking fraction.
76
    ///
77
    /// Passed directly to maybenot; not enforced in Arti.
78
    /// See [`maybenot::Framework::new`] for details.
79
    ///
80
    /// Must be between 0.0 and 1.0.
81
    #[builder(default = "1.0")]
82
    max_outbound_padding_frac: f64,
83
    /// Maximum allowable fraction of inbound padding
84
    #[builder(default = "1.0")]
85
    max_inbound_padding_frac: f64,
86
    /// Number of cells before which we should not enforce max_inbound_padding_frac.
87
    #[builder(default = "1")]
88
    enforce_inbound_padding_after_cells: u16,
89
}
90

            
91
/// An error returned from validating a [`CircuitPadderConfig`].
92
#[derive(Clone, Debug, thiserror::Error)]
93
#[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
94
#[non_exhaustive]
95
pub(crate) enum CircuitPadderConfigError {
96
    /// A field needed to be given, but wasn't.
97
    #[error("No value was given for {0}")]
98
    UninitializedField(&'static str),
99
    /// A field needed to be a proper fraction, but wasn't.
100
    #[error("Value was out of range for {0}. (Must be between 0 and 1)")]
101
    FractionOutOfRange(&'static str),
102
    /// Maybenot gave us an error when initializing the framework.
103
    #[error("Maybenot could not initialize framework for rules")]
104
    MaybenotError(#[from] maybenot::Error),
105
}
106

            
107
impl From<derive_builder::UninitializedFieldError> for CircuitPadderConfigError {
108
    fn from(value: derive_builder::UninitializedFieldError) -> Self {
109
        Self::UninitializedField(value.field_name())
110
    }
111
}
112

            
113
impl CircuitPadderConfig {
114
    /// Helper: Return an error if this is not a valid Builder.
115
    fn validate(&self) -> Result<(), CircuitPadderConfigError> {
116
        macro_rules! enforce_frac {
117
            { $field:ident } =>
118
            {
119
                if self.$field.is_some_and(|v| ! (0.0 .. 1.0).contains(&v)) {
120
                    return Err(CircuitPadderConfigError::FractionOutOfRange(stringify!($field)));
121
                }
122
            }
123
        }
124
        enforce_frac!(max_outbound_blocking_frac);
125
        enforce_frac!(max_outbound_padding_frac);
126
        enforce_frac!(max_inbound_padding_frac);
127

            
128
        Ok(())
129
    }
130

            
131
    /// Construct a [`CircuitPadder`] based on this [`CircuitPadderConfig`].
132
    ///
133
    /// A [`CircuitPadderConfig`] is created its accessors, and used with this method to build a [`CircuitPadder`].
134
    ///
135
    /// That [`CircuitPadder`] can then be installed on a circuit using [`ClientCirc::start_padding_at_hop`](crate::client::circuit::ClientCirc::start_padding_at_hop).
136
    #[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
137
    pub(crate) fn create_padder(
138
        &self,
139
        now: Instant,
140
    ) -> Result<CircuitPadder, CircuitPadderConfigError> {
141
        let rules = self.build()?;
142
        let backend = rules.create_padding_backend(now)?;
143
        let initial_stats = rules.initialize_stats();
144
        Ok(CircuitPadder {
145
            initial_stats,
146
            backend,
147
        })
148
    }
149
}
150

            
151
impl PaddingRules {
152
    /// Create a [`PaddingBackend`] for this [`PaddingRules`], so we can install it in a
153
    /// [`PaddingShared`].
154
    fn create_padding_backend(
155
        &self,
156
        now: Instant,
157
    ) -> Result<Box<dyn PaddingBackend>, maybenot::Error> {
158
        // TODO circpad: specialize this for particular values of n_machines,
159
        // when we finally go to implement padding.
160
        const OPTIMIZE_FOR_N_MACHINES: usize = 4;
161

            
162
        let backend =
163
            backend::MaybenotPadder::<OPTIMIZE_FOR_N_MACHINES>::from_framework_rules(self, now)?;
164
        Ok(Box::new(backend))
165
    }
166

            
167
    /// Create a new `PaddingStats` to reflect the rules for inbound padding of this  PaddingRules
168
    fn initialize_stats(&self) -> PaddingStats {
169
        PaddingStats {
170
            n_padding: 0,
171
            n_normal: 0,
172
            max_padding_frac: self.max_inbound_padding_frac as f32,
173
            // We just convert 0 to 1, since that's necessarily what was meant.
174
            enforce_max_after: self
175
                .enforce_inbound_padding_after_cells
176
                .try_into()
177
                .unwrap_or(1.try_into().expect("1 was not nonzero!?")),
178
        }
179
    }
180
}
181

            
182
/// A opaque handle to a padding implementation for a single hop.
183
///
184
/// This type is constructed with [`CircuitPadderConfig::create_padder`].
185
#[derive(derive_more::Debug)]
186
#[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
187
pub(crate) struct CircuitPadder {
188
    /// The initial padding stats and restrictions for inbound padding.
189
    initial_stats: PaddingStats,
190
    /// The underlying backend to use.
191
    #[debug(skip)]
192
    backend: Box<dyn PaddingBackend>,
193
}
194

            
195
/// An instruction from the padding machine to the circuit.
196
///
197
/// These are returned from the [`PaddingEventStream`].
198
///
199
/// When the `circ-padding` feature is disabled, these won't actually be constructed.
200
#[derive(Clone, Copy, Debug)]
201
pub(crate) enum PaddingEvent {
202
    /// An instruction to send padding.
203
    SendPadding(SendPadding),
204
    /// An instruction to start blocking outbound traffic,
205
    /// or change the hop at which traffic is blocked.
206
    StartBlocking(StartBlocking),
207
    /// An instruction to stop all blocking.
208
    StopBlocking,
209
}
210

            
211
/// An instruction from a single padding hop.
212
///
213
/// This will be turned into a [`PaddingEvent`] before it's given
214
/// to the circuit reactor.
215
#[derive(Clone, Copy, Debug)]
216
enum PerHopPaddingEvent {
217
    /// An instruction to send padding.
218
    SendPadding {
219
        /// The machine that told us to send the padding.
220
        ///
221
        /// (We need to use this when we report that we sent the padding.)
222
        machine: MachineId,
223
        /// Whether the padding can be replaced with regular data.
224
        replace: Replace,
225
        /// Whether the padding can bypass a bypassable block.
226
        bypass: Bypass,
227
    },
228
    /// An instruction to start blocking traffic..
229
    StartBlocking {
230
        /// Whether this blocking instance may be bypassed by padding with
231
        /// [`Bypass::BypassBlocking`].
232
        ///
233
        /// (Note that this is _not_ a `Bypass`, since that enum notes whether
234
        /// or not a _padding_ cell can bypass blocking)
235
        is_bypassable: bool,
236
    },
237
    /// An instruction to stop blocking.
238
    StopBlocking,
239
}
240

            
241
/// Whether a given piece of padding can be replaced with queued data.
242
///
243
/// This is an enum to avoid confusing it with `Bypass`.
244
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
245
pub(crate) enum Replace {
246
    /// The padding can be replaced
247
    /// either by packaging data in a regular data cell,
248
    /// or with data currently queued but not yet sent.
249
    Replaceable,
250
    /// The padding must be queued; it can't be replaced with data.
251
    NotReplaceable,
252
}
253

            
254
impl Replace {
255
    /// Construct a [`Replace`] from a bool.
256
    fn from_bool(replace: bool) -> Self {
257
        match replace {
258
            true => Replace::Replaceable,
259
            false => Replace::NotReplaceable,
260
        }
261
    }
262
}
263

            
264
/// Whether a piece of padding can bypass a bypassable case of blocking.
265
///
266
/// This is an enum to avoid confusing it with `Release`.
267
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
268
pub(crate) enum Bypass {
269
    /// This padding may bypass the block, if the block is bypassable.
270
    ///
271
    /// Note that this case has complicated interactions with `Replace`; see the
272
    /// `maybenot` documentation.
273
    BypassBlocking,
274
    /// The padding may not bypass the block.
275
    DoNotBypass,
276
}
277

            
278
/// Information about a queued cell that we need to feed back into the padding
279
/// subsystem.
280
#[derive(Clone, Copy, Debug)]
281
pub(crate) struct QueuedCellPaddingInfo {
282
    /// The hop that will receive this cell.
283
    pub(crate) target_hop: HopNum,
284
}
285
memory_cost_structural_copy!(QueuedCellPaddingInfo);
286

            
287
impl Bypass {
288
    /// Construct a [`Bypass`] from a bool.
289
    fn from_bool(replace: bool) -> Self {
290
        match replace {
291
            true => Bypass::BypassBlocking,
292
            false => Bypass::DoNotBypass,
293
        }
294
    }
295
}
296

            
297
/// An indication that we should send a padding cell.
298
///
299
/// Don't drop this: instead, once the cell is queued,
300
/// pass this `SendPadding` object to the relevant [`PaddingController`]
301
/// to report that the particular piece of padding has been queued.
302
#[derive(Clone, Debug, Copy)]
303
pub(crate) struct SendPadding {
304
    /// The machine within a framework that told us to send the padding.
305
    ///
306
    /// We store this so we can tell the framework which machine's padding we sent.
307
    machine: maybenot::MachineId,
308

            
309
    /// The hop to which we need to send the padding.
310
    pub(crate) hop: HopNum,
311

            
312
    /// Whether this padding can be replaced by regular data.
313
    pub(crate) replace: Replace,
314

            
315
    /// Whether this padding cell should bypass any current blocking.
316
    pub(crate) bypass: Bypass,
317
}
318

            
319
impl SendPadding {
320
    /// Convert this SendPadding into a TriggerEvent for Maybenot,
321
    /// to indicate that the padding was sent.
322
    fn into_sent_event(self) -> maybenot::TriggerEvent {
323
        maybenot::TriggerEvent::PaddingSent {
324
            machine: self.machine,
325
        }
326
    }
327

            
328
    /// If true, we are allowed to replace this padding cell
329
    /// with a normal non-padding cell.
330
    ///
331
    /// (If we do, we should call [`PaddingController::queued_data_as_padding`])
332
    pub(crate) fn may_replace_with_data(&self) -> Replace {
333
        self.replace
334
    }
335

            
336
    /// Return whether this padding cell is allowed to bypass any current blocking.
337
    pub(crate) fn may_bypass_block(&self) -> Bypass {
338
        self.bypass
339
    }
340
}
341

            
342
/// An instruction to start blocking traffic
343
/// or to change the rules for blocking traffic.
344
#[derive(Clone, Copy, Debug)]
345
pub(crate) struct StartBlocking {
346
    /// If true, then padding traffic _to the blocking hop_
347
    /// can bypass this block, if it has [`Bypass::BypassBlocking`].
348
    ///
349
    /// (All traffic can be sent to earlier hops as normal.
350
    /// No traffic may be sent to later hops.)
351
    pub(crate) is_bypassable: bool,
352
}
353

            
354
/// Absolute upper bound for number of hops.
355
const MAX_HOPS: usize = 64;
356

            
357
/// A handle to the padding state of a single circuit.
358
///
359
/// Used to tell the padders about events that they need to react to.
360
#[derive(Clone, derive_more::Debug)]
361
pub(crate) struct PaddingController<S = DynTimeProvider>
362
where
363
    S: SleepProvider,
364
{
365
    /// The underlying shared state.
366
    #[debug(skip)]
367
    shared: Arc<Mutex<PaddingShared<S>>>,
368
}
369

            
370
/// The shared state for a single circuit's padding.
371
///
372
/// Used by both PaddingController and PaddingEventStream.
373
struct PaddingShared<S: SleepProvider> {
374
    /// A sleep provider for telling the time and creating sleep futures.
375
    runtime: S,
376
    /// Per-hop state for each hop that we have enabled padding with.
377
    ///
378
    /// INVARIANT: the length of this vector is no greater than `MAX_HOPS`.
379
    hops: SmallVec<[Option<Box<dyn PaddingBackend>>; HOPS]>,
380
    /// Records about how much padding and normal traffic we have received from each hop,
381
    /// and how much padding is allowed.
382
    stats: SmallVec<[Option<PaddingStats>; HOPS]>,
383
    /// Which hops are currently blocking, and whether that blocking is bypassable.
384
    blocking: BlockingState,
385
    /// When will the currently pending sleep future next expire?
386
    ///
387
    /// We keep track of this so that we know when we need to reset the sleep future.
388
    /// It gets updated by `PaddingStream::schedule_next_wakeup`,
389
    /// which we call in `<PaddingStream as Stream>::poll_next` immediately
390
    /// before we create a timer.
391
    next_scheduled_wakeup: Option<Instant>,
392

            
393
    /// A deque of `PaddingEvent` that we want to yield from our [`PaddingEventStream`].
394
    ///
395
    /// NOTE: If you put new items in this list from anywhere other than inside
396
    /// `PaddingEventStream::poll_next`, you need to alert the `waker`.
397
    pending_events: PaddingEventQueue,
398

            
399
    /// A waker to alert if we've added any events to padding_events,
400
    /// or if we need the stream to re-poll.
401
    //
402
    // TODO circpad: This waker is redundant with the one stored in every backend's `Timer`.
403
    // When we revisit this code we may want to consider combining them somehow.
404
    waker: Waker,
405
}
406

            
407
/// The number of padding and non-padding cells we have received from each hop,
408
/// and the rules for how many are allowed.
409
#[derive(Clone, Debug)]
410
struct PaddingStats {
411
    /// The number of padding cells we've received from this hop.
412
    n_padding: u64,
413
    /// The number of non-padding cells we've received from this hop.
414
    n_normal: u64,
415
    /// The maximum allowable fraction of padding cells.
416
    max_padding_frac: f32,
417
    /// A lower limit, below which we will not enforce `max_padding_frac`.
418
    //
419
    // This is a NonZero for several reasons:
420
    // - It doesn't make sense to enforce a ratio when no cells have been received.
421
    // - If we only check when the total is at above zero, we can avoid a division-by-zero check.
422
    // - Having an impossible value here ensures that the niche optimization
423
    //   will work on PaddingStats.
424
    enforce_max_after: NonZeroU16,
425
}
426

            
427
impl PaddingStats {
428
    /// Return an error if this PaddingStats has exceeded its maximum.
429
    fn validate(&self) -> Result<(), ExcessPadding> {
430
        // Total number of cells.
431
        // (It is impossible to get so many cells that this addition will overflow a u64.)
432
        let total = self.n_padding + self.n_normal;
433

            
434
        if total >= u16::from(self.enforce_max_after).into() {
435
            // TODO: is there a way we can avoid a floating-point op here?
436
            // Or can we limit the number of times that we need to check?
437
            // (Tobias suggests randomization; I'm worried.)
438
            //
439
            // On the one hand, this may never appear on our profiles.
440
            // But on the other hand, if it _does_ matter for performance,
441
            // it is likely to be on some marginal platform with bad FP performance,
442
            // where we are unlikely to be doing much testing.
443
            //
444
            // One promising possibility is to calculate a minimum amount of padding
445
            // that we _know_ will be valid, given the current total,
446
            // and then not check again until we at all until we reach that amount.
447
            if self.n_padding as f32 > (total as f32 * self.max_padding_frac) {
448
                return Err(ExcessPadding::PaddingExceedsLimit);
449
            }
450
        }
451
        Ok(())
452
    }
453
}
454

            
455
/// Current padding-related blocking status for a circuit.
456
///
457
/// We have to keep track of whether each hop is blocked or not,
458
/// and whether its blocking is bypassable.
459
/// But all we actually need to tell the reactor code
460
/// is whether to block the _entire_ circuit or not.
461
//
462
// TODO circpad: It might beneficial
463
// to block only the first blocking hop and its successors,
464
// but that creates tricky starvation problems
465
// in the case where we have queued traffic for a later, blocking, hop
466
// that prevents us from flushing any messages to earlier hops.
467
// We could solve this with tricky out-of-order designs,
468
// but for now we're just treating "blocked" as a boolean.
469
#[derive(Default)]
470
struct BlockingState {
471
    /// Whether each hop is currently blocked.
472
    hop_blocked: BitArr![for MAX_HOPS],
473
    /// Whether each hop's blocking is currently **not** bypassable.
474
    blocking_non_bypassable: BitArr![for MAX_HOPS],
475
}
476

            
477
impl BlockingState {
478
    /// Set the hop at position `idx` to blocked.
479
    fn set_blocked(&mut self, idx: usize, is_bypassable: bool) {
480
        self.hop_blocked.set(idx, true);
481
        self.blocking_non_bypassable.set(idx, !is_bypassable);
482
    }
483
    /// Set the hop at position `idx` to unblocked.
484
    fn set_unblocked(&mut self, idx: usize) {
485
        self.hop_blocked.set(idx, false);
486
        self.blocking_non_bypassable.set(idx, false);
487
    }
488
    /// Return a [`PaddingEvent`]
489
    fn blocking_update_paddingevent(&self) -> PaddingEvent {
490
        if self.blocking_non_bypassable.any() {
491
            // At least one hop has non-bypassable blocking, so our blocking is non-bypassable.
492
            PaddingEvent::StartBlocking(StartBlocking {
493
                is_bypassable: false,
494
            })
495
        } else if self.hop_blocked.any() {
496
            // At least one hop is blocking, but no hop has non-bypassable padding, so this padding
497
            // is bypassable.
498
            PaddingEvent::StartBlocking(StartBlocking {
499
                is_bypassable: true,
500
            })
501
        } else {
502
            // Nobody is blocking right now; it's time to unblock.
503
            PaddingEvent::StopBlocking
504
        }
505
    }
506
}
507

            
508
#[allow(clippy::unnecessary_wraps)]
509
impl<S: SleepProvider> PaddingController<S> {
510
    /// Report that we've enqueued a non-padding cell for a given hop.
511
    ///
512
    /// Return a QueuedCellPaddingInfo if we need to alert the padding subsystem
513
    /// when this cell is flushed.
514
9230
    pub(crate) fn queued_data(&self, hop: HopNum) -> Option<QueuedCellPaddingInfo> {
515
9230
        let mut shared = self.shared.lock().expect("Lock poisoned");
516
        // Every hop up to and including the target hop will see this as normal data.
517
9230
        shared.trigger_events(hop, &[maybenot::TriggerEvent::NormalSent]);
518
9230
        shared.info_for_hop(hop)
519
9230
    }
520

            
521
    /// Install the given [`CircuitPadder`] to start padding traffic to the listed `hop`.
522
    ///
523
    /// Stops padding if the provided padder is `None`.
524
    ///
525
    /// Replaces any previous [`CircuitPadder`].
526
    pub(crate) fn install_padder_padding_at_hop(&self, hop: HopNum, padder: Option<CircuitPadder>) {
527
        self.shared
528
            .lock()
529
            .expect("lock poisoned")
530
            .set_hop_backend(hop, padder);
531
    }
532

            
533
    /// Report that we have enqueued a non-padding cell
534
    /// in place of a replaceable padding cell
535
    /// for a given hop.
536
    ///
537
    /// Return a QueuedCellPaddingInfo if we need to alert the padding subsystem
538
    /// when this cell is flushed.
539
    pub(crate) fn queued_data_as_padding(
540
        &self,
541
        hop: HopNum,
542
        sendpadding: SendPadding,
543
    ) -> Option<QueuedCellPaddingInfo> {
544
        assert_eq!(hop, sendpadding.hop);
545
        assert_eq!(Replace::Replaceable, sendpadding.replace);
546
        let mut shared = self.shared.lock().expect("Lock poisoned");
547
        shared.trigger_events_mixed(
548
            hop,
549
            // Each intermediate hop sees this as normal data.
550
            &[maybenot::TriggerEvent::NormalSent],
551
            // For the target hop, we treat this both as normal, _and_ as padding.
552
            &[
553
                maybenot::TriggerEvent::NormalSent,
554
                sendpadding.into_sent_event(),
555
            ],
556
        );
557
        shared.info_for_hop(hop)
558
    }
559

            
560
    /// Report that we have enqueued a padding cell to a given hop.
561
    ///
562
    /// Return a QueuedCellPaddingInfo if we need to alert the padding subsystem
563
    /// when this cell is flushed.
564
    pub(crate) fn queued_padding(
565
        &self,
566
        hop: HopNum,
567
        sendpadding: SendPadding,
568
    ) -> Option<QueuedCellPaddingInfo> {
569
        assert_eq!(hop, sendpadding.hop);
570
        let mut shared = self.shared.lock().expect("Lock poisoned");
571
        shared.trigger_events_mixed(
572
            hop,
573
            // Each intermediate hop sees this as normal data.
574
            &[maybenot::TriggerEvent::NormalSent],
575
            // The target hop sees this as padding.
576
            &[sendpadding.into_sent_event()],
577
        );
578
        shared.info_for_hop(hop)
579
    }
580

            
581
    /// Report that we are using an already-queued cell
582
    /// as a substitute for sending padding to a given hop.
583
    pub(crate) fn replaceable_padding_already_queued(&self, hop: HopNum, sendpadding: SendPadding) {
584
        assert_eq!(hop, sendpadding.hop);
585
        let mut shared = self.shared.lock().expect("Lock poisoned");
586
        shared.trigger_events_mixed(
587
            hop,
588
            // No additional data will be seen for any intermediate hops.
589
            &[],
590
            // The target hop's machine sees this as padding.
591
            &[sendpadding.into_sent_event()],
592
        );
593
    }
594

            
595
    /// Report that we've flushed a cell from the queue for the given hop.
596
    pub(crate) fn flushed_relay_cell(&self, info: QueuedCellPaddingInfo) {
597
        // Every hop up to the last
598
        let mut shared = self.shared.lock().expect("Lock poisoned");
599
        shared.trigger_events(info.target_hop, &[maybenot::TriggerEvent::TunnelSent]);
600
    }
601

            
602
    /// Report that we've flushed a cell from the per-channel queue.
603
4540
    pub(crate) fn flushed_channel_cell(&self) {
604
4540
        let mut shared = self.shared.lock().expect("Lock poisoned");
605
4540
        shared.trigger_events(HopNum::from(0), &[maybenot::TriggerEvent::TunnelSent]);
606
4540
    }
607

            
608
    /// Report that we have decrypted a non-padding cell from our queue
609
    /// from a given hop.
610
    ///
611
    // Note that in theory, it would be better to trigger TunnelRecv as soon as
612
    // possible after we receive and enqueue the data cell, and NormalRecv only
613
    // once we've decrypted it and found it to be data.  But we can't do that,
614
    // since we won't know which hop actually originated the cell until we
615
    // decrypt it.
616
948
    pub(crate) fn decrypted_data(&self, hop: HopNum) {
617
948
        let mut shared = self.shared.lock().expect("Lock poisoned");
618
948
        shared.inc_normal_received(hop);
619
948
        shared.trigger_events(
620
948
            hop,
621
948
            // We treat this as normal data from every hop.
622
948
            &[
623
948
                maybenot::TriggerEvent::TunnelRecv,
624
948
                maybenot::TriggerEvent::NormalRecv,
625
948
            ],
626
948
        );
627
948
    }
628
    /// Report that we have decrypted a padding cell from our queue.
629
    ///
630
    /// Return an error if this padding cell is not acceptable
631
    /// (because we have received too much padding from this hop,
632
    /// or because we have not enabled padding with this hop.)
633
    //
634
    // See note above.
635
    pub(crate) fn decrypted_padding(&self, hop: HopNum) -> Result<(), crate::Error> {
636
        let mut shared = self.shared.lock().expect("Lock poisoned");
637
        shared
638
            .inc_padding_received(hop)
639
            .map_err(|e| crate::Error::ExcessPadding(e, hop))?;
640
        shared.trigger_events_mixed(
641
            hop,
642
            // We treat this as normal data from the intermediate hops.
643
            &[
644
                maybenot::TriggerEvent::TunnelRecv,
645
                maybenot::TriggerEvent::NormalRecv,
646
            ],
647
            // But from the target hop, it counts as padding.
648
            &[
649
                maybenot::TriggerEvent::TunnelRecv,
650
                maybenot::TriggerEvent::PaddingRecv,
651
            ],
652
        );
653
        Ok(())
654
    }
655
}
656

            
657
impl<S: SleepProvider> PaddingShared<S> {
658
    /// Trigger a list of maybenot events on every hop up to and including `hop`.
659
14764
    fn trigger_events(&mut self, hop: HopNum, events: &[maybenot::TriggerEvent]) {
660
14764
        let final_idx = usize::from(hop);
661
14764
        let now = self.runtime.now();
662
14764
        let next_scheduled_wakeup = self.next_scheduled_wakeup;
663
14764
        for hop_controller in self.hops.iter_mut().take(final_idx + 1) {
664
            let Some(hop_controller) = hop_controller else {
665
                continue;
666
            };
667
            hop_controller.report_events_at(events, now, next_scheduled_wakeup);
668
        }
669
14764
    }
670

            
671
    /// Trigger `intermediate_hop_events` on every hop up to but _not_ including `hop`.
672
    ///
673
    /// Trigger `final_hop_events` on `hop`.
674
    ///
675
    /// (Don't trigger anything on any hops _after_ `hop`.)
676
    fn trigger_events_mixed(
677
        &mut self,
678
        hop: HopNum,
679
        intermediate_hop_events: &[maybenot::TriggerEvent],
680
        final_hop_events: &[maybenot::TriggerEvent],
681
    ) {
682
        use itertools::Itertools as _;
683
        use itertools::Position as P;
684
        let final_idx = usize::from(hop);
685
        let now = self.runtime.now();
686
        let next_scheduled_wakeup = self.next_scheduled_wakeup;
687
        for (position, hop_controller) in self.hops.iter_mut().take(final_idx + 1).with_position() {
688
            let Some(hop_controller) = hop_controller else {
689
                continue;
690
            };
691
            let events = match position {
692
                P::First | P::Middle => intermediate_hop_events,
693
                P::Last | P::Only => final_hop_events,
694
            };
695
            hop_controller.report_events_at(events, now, next_scheduled_wakeup);
696
        }
697
    }
698

            
699
    /// Increment the normal cell count from every hop up to and including `hop`.
700
948
    fn inc_normal_received(&mut self, hop: HopNum) {
701
948
        let final_idx = usize::from(hop);
702
948
        for stats in self.stats.iter_mut().take(final_idx + 1).flatten() {
703
            stats.n_normal += 1;
704
        }
705
948
    }
706

            
707
    /// Increment the padding count from `hop`, and the normal cell count from all earlier hops.
708
    ///
709
    /// Return an error if a padding cell from `hop` would not be acceptable.
710
    fn inc_padding_received(&mut self, hop: HopNum) -> Result<(), ExcessPadding> {
711
        use itertools::Itertools as _;
712
        use itertools::Position as P;
713
        let final_idx = usize::from(hop);
714
        for (position, stats) in self.stats.iter_mut().take(final_idx + 1).with_position() {
715
            match (position, stats) {
716
                (P::First | P::Middle, Some(stats)) => stats.n_normal += 1,
717
                (P::First | P::Middle, None) => {}
718
                (P::Last | P::Only, Some(stats)) => {
719
                    stats.n_padding += 1;
720
                    stats.validate()?;
721
                }
722
                (P::Last | P::Only, None) => {
723
                    return Err(ExcessPadding::NoPaddingNegotiated);
724
                }
725
            }
726
        }
727
        Ok(())
728
    }
729

            
730
    /// Return the `QueuedCellPaddingInfo` to use when sending messages to `target_hop`
731
    #[allow(clippy::unnecessary_wraps)]
732
9230
    fn info_for_hop(&self, target_hop: HopNum) -> Option<QueuedCellPaddingInfo> {
733
        // TODO circpad optimization: This is always Some for now, but we
734
        // could someday avoid creating this object
735
        // when padding is not enabled on the circuit,
736
        // or if padding is not enabled on any hop of the circuit <= target_hop.
737
9230
        Some(QueuedCellPaddingInfo { target_hop })
738
9230
    }
739
}
740

            
741
impl<S: SleepProvider> PaddingShared<S> {
742
    /// Install or remove a [`CircuitPadder`] for a single hop.
743
    fn set_hop_backend(&mut self, hop: HopNum, backend: Option<CircuitPadder>) {
744
        let hop_idx: usize = hop.into();
745
        assert!(hop_idx < MAX_HOPS);
746
        let n_needed = hop_idx + 1;
747
        // Make sure there are enough spaces in self.hops.
748
        // We can't use "resize" or "extend", since Box<dyn<PaddingBackend>>
749
        // doesn't implement Clone, which SmallVec requires.
750
        while self.hops.len() < n_needed {
751
            self.hops.push(None);
752
        }
753
        while self.stats.len() < n_needed {
754
            self.stats.push(None);
755
        }
756
        // project through option...
757
        let (hop_backend, stats) = if let Some(padder) = backend {
758
            (Some(padder.backend), Some(padder.initial_stats))
759
        } else {
760
            (None, None)
761
        };
762
        self.hops[hop_idx] = hop_backend;
763
        self.stats[hop_idx] = stats;
764

            
765
        let was_blocked = self.blocking.hop_blocked[hop_idx];
766
        self.blocking.set_unblocked(hop_idx);
767
        if was_blocked {
768
            self.pending_events
769
                .push_back(self.blocking.blocking_update_paddingevent());
770
        }
771

            
772
        // We need to alert the stream, in case we added an event above, and so that it will poll
773
        // the new padder at least once.
774
        self.waker.wake_by_ref();
775
    }
776

            
777
    /// Transform a [`PerHopPaddingEvent`] for a single hop with index `idx` into a [`PaddingEvent`],
778
    /// updating our state as appropriate.
779
    fn process_per_hop_event(
780
        blocking: &mut BlockingState,
781
        hop_idx: usize,
782
        event: PerHopPaddingEvent,
783
    ) -> PaddingEvent {
784
        use PaddingEvent as PE;
785
        use PerHopPaddingEvent as PHPE;
786

            
787
        match event {
788
            PHPE::SendPadding {
789
                machine,
790
                replace,
791
                bypass,
792
            } => PE::SendPadding(SendPadding {
793
                machine,
794
                hop: hopnum_from_hop_idx(hop_idx),
795
                replace,
796
                bypass,
797
            }),
798
            PHPE::StartBlocking { is_bypassable } => {
799
                // NOTE that we remember is_bypassable for every hop, but the blocking is only
800
                // bypassable if _every_ hop is unblocked, or has bypassable blocking.
801
                blocking.set_blocked(hop_idx, is_bypassable);
802
                blocking.blocking_update_paddingevent()
803
            }
804
            PHPE::StopBlocking => {
805
                blocking.set_unblocked(hop_idx);
806
                blocking.blocking_update_paddingevent()
807
            }
808
        }
809
    }
810

            
811
    /// Extract every PaddingEvent that is ready to be reported to the circuit at time `now`.
812
    ///
813
    /// May trigger other events, or wake up the stream, in the course of running.
814
12636
    fn take_padding_events_at(&mut self, now: Instant) -> PaddingEventQueue {
815
12636
        let mut output = PaddingEventQueue::default();
816
12636
        for (hop_idx, backend) in self.hops.iter_mut().enumerate() {
817
            let Some(backend) = backend else {
818
                continue;
819
            };
820

            
821
            let hop_events = backend.take_padding_events_at(now, self.next_scheduled_wakeup);
822

            
823
            output.extend(
824
                hop_events
825
                    .into_iter()
826
                    .map(|ev| Self::process_per_hop_event(&mut self.blocking, hop_idx, ev)),
827
            );
828
        }
829
12636
        output
830
12636
    }
831

            
832
    /// Find the next time at which we should wake up the stream, and register it as our
833
    /// "next scheduled wakeup".
834
12636
    fn schedule_next_wakeup(&mut self, waker: &Waker) -> Option<Instant> {
835
        // Find the earliest time at which any hop has a scheduled event.
836
12636
        let next_expiration = self
837
12636
            .hops
838
12636
            .iter_mut()
839
12636
            .flatten()
840
12636
            .filter_map(|hop| hop.next_wakeup(waker))
841
12636
            .min();
842
12636
        self.next_scheduled_wakeup = next_expiration;
843
12636
        self.waker = waker.clone();
844
12636
        next_expiration
845
12636
    }
846
}
847

            
848
/// A stream of [`PaddingEvent`] to tell a circuit when (if at all) it should send
849
/// padding and block traffic.
850
//
851
// TODO circpad: Optimize this even more for the no-padding case?
852
// We could make it smaller or faster.
853
pub(crate) struct PaddingEventStream<S = DynTimeProvider>
854
where
855
    S: SleepProvider,
856
{
857
    /// An underlying list of PaddingBackend.
858
    shared: Arc<Mutex<PaddingShared<S>>>,
859

            
860
    /// A future defining a time at which we must next call `padder.padding_events_at`.
861
    ///
862
    /// (We also arrange for the backend to wake us up if we need to change this time,
863
    /// or call `padder.padding_events_at`.)
864
    ///
865
    /// Note that this timer is allowed to be _earlier_ than our true wakeup time,
866
    /// but not later.
867
    sleep_future: S::SleepFuture,
868
}
869

            
870
impl futures::Stream for PaddingEventStream {
871
    type Item = PaddingEvent;
872

            
873
12636
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
874
        loop {
875
12636
            let (now, next_wakeup, runtime) = {
876
                // We destructure like this to avoid simultaneous mutable/immutable borrows.
877
12636
                let Self { shared, .. } = &mut *self;
878

            
879
12636
                let mut shared = shared.lock().expect("Poisoned lock");
880

            
881
                // Do we have any events that are waiting to be yielded?
882
12636
                if let Some(val) = shared.pending_events.pop_front() {
883
                    return Poll::Ready(Some(val));
884
12636
                }
885

            
886
                // Does the padder have any events that have become ready to be yielded?
887
12636
                let now = shared.runtime.now();
888
12636
                shared.pending_events = shared.take_padding_events_at(now);
889

            
890
12636
                if let Some(val) = shared.pending_events.pop_front() {
891
                    return Poll::Ready(Some(val));
892
12636
                }
893

            
894
                // If we reach this point, there are no events to trigger right now.
895
                //
896
                // We'll ask all the padders for the time at which they next might need to take
897
                // action, and register our Waker with them, to be alerted if we need to take any action
898
                // before that.
899
12636
                (
900
12636
                    now,
901
12636
                    shared.schedule_next_wakeup(cx.waker()),
902
12636
                    shared.runtime.clone(),
903
12636
                )
904
                // Here we drop the lock on the shared state.
905
            };
906

            
907
12636
            match next_wakeup {
908
                None => {
909
12636
                    return Poll::Pending;
910
                }
911
                Some(t) => {
912
                    // TODO circpad: Avoid rebuilding sleep future needlessly.  May require new APIs in
913
                    // tor-rtcompat.
914
                    self.sleep_future = runtime.sleep(t.saturating_duration_since(now));
915
                    match self.sleep_future.as_mut().poll(cx) {
916
                        Poll::Ready(()) => {
917
                            // Okay, The timer expired already. Continue through the loop.
918
                            continue;
919
                        }
920
                        Poll::Pending => return Poll::Pending,
921
                    }
922
                }
923
            }
924
        }
925
12636
    }
926
}
927

            
928
impl futures::stream::FusedStream for PaddingEventStream {
929
12590
    fn is_terminated(&self) -> bool {
930
        // This stream is _never_ terminated: even if it has no padding machines now,
931
        // we might add some in the future.
932
12590
        false
933
12590
    }
934
}
935

            
936
/// Construct a HopNum from an index into the `hops` field of a [`PaddingShared`].
937
///
938
/// # Panics
939
///
940
/// Panics if `hop_idx` is greater than u8::MAX, which should be impossible.
941
fn hopnum_from_hop_idx(hop_idx: usize) -> HopNum {
942
    // (Static assertion: makes sure we can represent every index of hops as a HopNum.)
943
    const _: () = assert!(MAX_HOPS < u8::MAX as usize);
944
    HopNum::from(u8::try_from(hop_idx).expect("hop_idx out of range!"))
945
}
946

            
947
/// Create a new, empty padding instance for a new circuit.
948
986
pub(crate) fn new_padding<S>(runtime: S) -> (PaddingController<S>, PaddingEventStream<S>)
949
986
where
950
986
    S: SleepProvider,
951
{
952
    // Start with an arbitrary sleep future.  We won't actually use this until
953
    // the first time that we have an event to schedule, so the timeout doesn't matter.
954
986
    let sleep_future = runtime.sleep(Duration::new(86400, 0));
955

            
956
986
    let shared = PaddingShared {
957
986
        runtime,
958
986
        hops: Default::default(),
959
986
        stats: Default::default(),
960
986
        blocking: Default::default(),
961
986
        next_scheduled_wakeup: None,
962
986
        pending_events: PaddingEventQueue::default(),
963
986
        waker: Waker::noop().clone(),
964
986
    };
965
986
    let shared = Arc::new(Mutex::new(shared));
966
986
    let controller = PaddingController {
967
986
        shared: shared.clone(),
968
986
    };
969
986
    let stream = PaddingEventStream {
970
986
        shared,
971
986
        sleep_future,
972
986
    };
973

            
974
986
    (controller, stream)
975
986
}