1
//! A `maybenot`-specific backend for padding.
2

            
3
// Some of the circuit padding implementation isn't reachable unless
4
// the extra-experimental circ-padding-manual feature is also present.
5
//
6
// TODO circpad: Remove this once we have circ-padding negotiation implemented.
7
#![cfg_attr(
8
    all(feature = "circ-padding", not(feature = "circ-padding-manual")),
9
    expect(dead_code)
10
)]
11

            
12
mod backend;
13

            
14
use std::collections::VecDeque;
15
use std::num::NonZeroU16;
16
use std::pin::Pin;
17
use std::sync::{Arc, Mutex};
18
use std::task::{Context, Poll, Waker};
19

            
20
use bitvec::BitArr;
21
use maybenot::MachineId;
22
use smallvec::SmallVec;
23
use tor_memquota::memory_cost_structural_copy;
24
use tor_rtcompat::{DynTimeProvider, SleepProvider};
25

            
26
use crate::HopNum;
27
use crate::circuit::HOPS;
28
use crate::util::err::ExcessPadding;
29
use backend::PaddingBackend;
30

            
31
/// The type of Instant that we'll use for our padding machines.
32
///
33
/// We use a separate type alias here in case we want to move to coarsetime.
34
type Instant = std::time::Instant;
35

            
36
/// The type of Duration that we'll use for our padding machines.
37
///
38
/// We use a separate type alias here in case we want to move to coarsetime.
39
type Duration = std::time::Duration;
40

            
41
/// A type we use to generate a set of [`PaddingEvent`].
42
///
43
/// This is a separate type so we can tune it and make it into a smallvec if needed.
44
type PaddingEventQueue = VecDeque<PaddingEvent>;
45

            
46
/// A type we use to generate a set of [`PaddingEvent`].
47
///
48
/// This is a separate type so we can tune it and make it into a smallvec if needed.
49
type PerHopPaddingEventVec = Vec<PerHopPaddingEvent>;
50

            
51
/// Specifications for a set of maybenot padding machines as used in Arti: used to construct a `maybenot::Framework`.
52
#[derive(Clone, Debug, derive_builder::Builder)]
53
#[builder(build_fn(
54
    validate = "Self::validate",
55
    private,
56
    error = "CircuitPadderConfigError"
57
))]
58
#[builder(name = "CircuitPadderConfig")]
59
#[cfg_attr(not(feature = "circ-padding-manual"), builder(private))]
60
#[cfg_attr(feature = "circ-padding-manual", builder(public))]
61
pub(crate) struct PaddingRules {
62
    /// List of padding machines to use for shaping traffic.
63
    ///
64
    /// Note that this list may be empty, if we only want to receive padding,
65
    /// and never send it.
66
    machines: Arc<[maybenot::Machine]>,
67
    /// Maximum allowable outbound padding fraction.
68
    ///
69
    /// Passed directly to maybenot; not enforced in Arti.
70
    /// See [`maybenot::Framework::new`] for details.
71
    ///
72
    /// Must be between 0.0 and 1.0
73
    #[builder(default = "1.0")]
74
    max_outbound_blocking_frac: f64,
75
    /// Maximum allowable outbound blocking fraction.
76
    ///
77
    /// Passed directly to maybenot; not enforced in Arti.
78
    /// See [`maybenot::Framework::new`] for details.
79
    ///
80
    /// Must be between 0.0 and 1.0.
81
    #[builder(default = "1.0")]
82
    max_outbound_padding_frac: f64,
83
    /// Maximum allowable fraction of inbound padding
84
    #[builder(default = "1.0")]
85
    max_inbound_padding_frac: f64,
86
    /// Number of cells before which we should not enforce max_inbound_padding_frac.
87
    #[builder(default = "1")]
88
    enforce_inbound_padding_after_cells: u16,
89
}
90

            
91
/// An error returned from validating a [`CircuitPadderConfig`].
92
#[derive(Clone, Debug, thiserror::Error)]
93
#[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
94
#[non_exhaustive]
95
pub(crate) enum CircuitPadderConfigError {
96
    /// A field needed to be given, but wasn't.
97
    #[error("No value was given for {0}")]
98
    UninitializedField(&'static str),
99
    /// A field needed to be a proper fraction, but wasn't.
100
    #[error("Value was out of range for {0}. (Must be between 0 and 1)")]
101
    FractionOutOfRange(&'static str),
102
    /// Maybenot gave us an error when initializing the framework.
103
    #[error("Maybenot could not initialize framework for rules")]
104
    MaybenotError(#[from] maybenot::Error),
105
}
106

            
107
impl From<derive_builder::UninitializedFieldError> for CircuitPadderConfigError {
108
    fn from(value: derive_builder::UninitializedFieldError) -> Self {
109
        Self::UninitializedField(value.field_name())
110
    }
111
}
112

            
113
impl CircuitPadderConfig {
114
    /// Helper: Return an error if this is not a valid Builder.
115
    fn validate(&self) -> Result<(), CircuitPadderConfigError> {
116
        macro_rules! enforce_frac {
117
            { $field:ident } =>
118
            {
119
                if self.$field.is_some_and(|v| ! (0.0 .. 1.0).contains(&v)) {
120
                    return Err(CircuitPadderConfigError::FractionOutOfRange(stringify!($field)));
121
                }
122
            }
123
        }
124
        enforce_frac!(max_outbound_blocking_frac);
125
        enforce_frac!(max_outbound_padding_frac);
126
        enforce_frac!(max_inbound_padding_frac);
127

            
128
        Ok(())
129
    }
130

            
131
    /// Construct a [`CircuitPadder`] based on this [`CircuitPadderConfig`].
132
    ///
133
    /// A [`CircuitPadderConfig`] is created its accessors, and used with this method to build a [`CircuitPadder`].
134
    ///
135
    /// That [`CircuitPadder`] can then be installed on a circuit using [`ClientCirc::start_padding_at_hop`](crate::client::circuit::ClientCirc::start_padding_at_hop).
136
    #[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
137
    pub(crate) fn create_padder(&self) -> Result<CircuitPadder, CircuitPadderConfigError> {
138
        let rules = self.build()?;
139
        let backend = rules.create_padding_backend()?;
140
        let initial_stats = rules.initialize_stats();
141
        Ok(CircuitPadder {
142
            initial_stats,
143
            backend,
144
        })
145
    }
146
}
147

            
148
impl PaddingRules {
149
    /// Create a [`PaddingBackend`] for this [`PaddingRules`], so we can install it in a
150
    /// [`PaddingShared`].
151
    fn create_padding_backend(&self) -> Result<Box<dyn PaddingBackend>, maybenot::Error> {
152
        // TODO circpad: specialize this for particular values of n_machines,
153
        // when we finally go to implement padding.
154
        const OPTIMIZE_FOR_N_MACHINES: usize = 4;
155

            
156
        let backend =
157
            backend::MaybenotPadder::<OPTIMIZE_FOR_N_MACHINES>::from_framework_rules(self)?;
158
        Ok(Box::new(backend))
159
    }
160

            
161
    /// Create a new `PaddingStats` to reflect the rules for inbound padding of this  PaddingRules
162
    fn initialize_stats(&self) -> PaddingStats {
163
        PaddingStats {
164
            n_padding: 0,
165
            n_normal: 0,
166
            max_padding_frac: self.max_inbound_padding_frac as f32,
167
            // We just convert 0 to 1, since that's necessarily what was meant.
168
            enforce_max_after: self
169
                .enforce_inbound_padding_after_cells
170
                .try_into()
171
                .unwrap_or(1.try_into().expect("1 was not nonzero!?")),
172
        }
173
    }
174
}
175

            
176
/// A opaque handle to a padding implementation for a single hop.
177
///
178
/// This type is constructed with [`CircuitPadderConfig::create_padder`].
179
#[derive(derive_more::Debug)]
180
#[cfg_attr(feature = "circ-padding-manual", visibility::make(pub))]
181
pub(crate) struct CircuitPadder {
182
    /// The initial padding stats and restrictions for inbound padding.
183
    initial_stats: PaddingStats,
184
    /// The underlying backend to use.
185
    #[debug(skip)]
186
    backend: Box<dyn PaddingBackend>,
187
}
188

            
189
/// An instruction from the padding machine to the circuit.
190
///
191
/// These are returned from the [`PaddingEventStream`].
192
///
193
/// When the `circ-padding` feature is disabled, these won't actually be constructed.
194
#[derive(Clone, Copy, Debug)]
195
pub(crate) enum PaddingEvent {
196
    /// An instruction to send padding.
197
    SendPadding(SendPadding),
198
    /// An instruction to start blocking outbound traffic,
199
    /// or change the hop at which traffic is blocked.
200
    StartBlocking(StartBlocking),
201
    /// An instruction to stop all blocking.
202
    StopBlocking,
203
}
204

            
205
/// An instruction from a single padding hop.
206
///
207
/// This will be turned into a [`PaddingEvent`] before it's given
208
/// to the circuit reactor.
209
#[derive(Clone, Copy, Debug)]
210
enum PerHopPaddingEvent {
211
    /// An instruction to send padding.
212
    SendPadding {
213
        /// The machine that told us to send the padding.
214
        ///
215
        /// (We need to use this when we report that we sent the padding.)
216
        machine: MachineId,
217
        /// Whether the padding can be replaced with regular data.
218
        replace: Replace,
219
        /// Whether the padding can bypass a bypassable block.
220
        bypass: Bypass,
221
    },
222
    /// An instruction to start blocking traffic..
223
    StartBlocking {
224
        /// Whether this blocking instance may be bypassed by padding with
225
        /// [`Bypass::BypassBlocking`].
226
        ///
227
        /// (Note that this is _not_ a `Bypass`, since that enum notes whether
228
        /// or not a _padding_ cell can bypass blocking)
229
        is_bypassable: bool,
230
    },
231
    /// An instruction to stop blocking.
232
    StopBlocking,
233
}
234

            
235
/// Whether a given piece of padding can be replaced with queued data.
236
///
237
/// This is an enum to avoid confusing it with `Bypass`.
238
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
239
pub(crate) enum Replace {
240
    /// The padding can be replaced
241
    /// either by packaging data in a regular data cell,
242
    /// or with data currently queued but not yet sent.
243
    Replaceable,
244
    /// The padding must be queued; it can't be replaced with data.
245
    NotReplaceable,
246
}
247

            
248
impl Replace {
249
    /// Construct a [`Replace`] from a bool.
250
    fn from_bool(replace: bool) -> Self {
251
        match replace {
252
            true => Replace::Replaceable,
253
            false => Replace::NotReplaceable,
254
        }
255
    }
256
}
257

            
258
/// Whether a piece of padding can bypass a bypassable case of blocking.
259
///
260
/// This is an enum to avoid confusing it with `Release`.
261
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
262
pub(crate) enum Bypass {
263
    /// This padding may bypass the block, if the block is bypassable.
264
    ///
265
    /// Note that this case has complicated interactions with `Replace`; see the
266
    /// `maybenot` documentation.
267
    BypassBlocking,
268
    /// The padding may not bypass the block.
269
    DoNotBypass,
270
}
271

            
272
/// Information about a queued cell that we need to feed back into the padding
273
/// subsystem.
274
#[derive(Clone, Copy, Debug)]
275
pub(crate) struct QueuedCellPaddingInfo {
276
    /// The hop that will receive this cell.
277
    pub(crate) target_hop: HopNum,
278
}
279
memory_cost_structural_copy!(QueuedCellPaddingInfo);
280

            
281
impl Bypass {
282
    /// Construct a [`Bypass`] from a bool.
283
    fn from_bool(replace: bool) -> Self {
284
        match replace {
285
            true => Bypass::BypassBlocking,
286
            false => Bypass::DoNotBypass,
287
        }
288
    }
289
}
290

            
291
/// An indication that we should send a padding cell.
292
///
293
/// Don't drop this: instead, once the cell is queued,
294
/// pass this `SendPadding` object to the relevant [`PaddingController`]
295
/// to report that the particular piece of padding has been queued.
296
#[derive(Clone, Debug, Copy)]
297
pub(crate) struct SendPadding {
298
    /// The machine within a framework that told us to send the padding.
299
    ///
300
    /// We store this so we can tell the framework which machine's padding we sent.
301
    machine: maybenot::MachineId,
302

            
303
    /// The hop to which we need to send the padding.
304
    pub(crate) hop: HopNum,
305

            
306
    /// Whether this padding can be replaced by regular data.
307
    pub(crate) replace: Replace,
308

            
309
    /// Whether this padding cell should bypass any current blocking.
310
    pub(crate) bypass: Bypass,
311
}
312

            
313
impl SendPadding {
314
    /// Convert this SendPadding into a TriggerEvent for Maybenot,
315
    /// to indicate that the padding was sent.
316
    fn into_sent_event(self) -> maybenot::TriggerEvent {
317
        maybenot::TriggerEvent::PaddingSent {
318
            machine: self.machine,
319
        }
320
    }
321

            
322
    /// If true, we are allowed to replace this padding cell
323
    /// with a normal non-padding cell.
324
    ///
325
    /// (If we do, we should call [`PaddingController::queued_data_as_padding`])
326
    pub(crate) fn may_replace_with_data(&self) -> Replace {
327
        self.replace
328
    }
329

            
330
    /// Return whether this padding cell is allowed to bypass any current blocking.
331
    pub(crate) fn may_bypass_block(&self) -> Bypass {
332
        self.bypass
333
    }
334
}
335

            
336
/// An instruction to start blocking traffic
337
/// or to change the rules for blocking traffic.
338
#[derive(Clone, Copy, Debug)]
339
pub(crate) struct StartBlocking {
340
    /// If true, then padding traffic _to the blocking hop_
341
    /// can bypass this block, if it has [`Bypass::BypassBlocking`].
342
    ///
343
    /// (All traffic can be sent to earlier hops as normal.
344
    /// No traffic may be sent to later hops.)
345
    pub(crate) is_bypassable: bool,
346
}
347

            
348
/// Absolute upper bound for number of hops.
349
const MAX_HOPS: usize = 64;
350

            
351
/// A handle to the padding state of a single circuit.
352
///
353
/// Used to tell the padders about events that they need to react to.
354
#[derive(Clone, derive_more::Debug)]
355
pub(crate) struct PaddingController<S = DynTimeProvider>
356
where
357
    S: SleepProvider,
358
{
359
    /// The underlying shared state.
360
    #[debug(skip)]
361
    shared: Arc<Mutex<PaddingShared<S>>>,
362
}
363

            
364
/// The shared state for a single circuit's padding.
365
///
366
/// Used by both PaddingController and PaddingEventStream.
367
struct PaddingShared<S: SleepProvider> {
368
    /// A sleep provider for telling the time and creating sleep futures.
369
    runtime: S,
370
    /// Per-hop state for each hop that we have enabled padding with.
371
    ///
372
    /// INVARIANT: the length of this vector is no greater than `MAX_HOPS`.
373
    hops: SmallVec<[Option<Box<dyn PaddingBackend>>; HOPS]>,
374
    /// Records about how much padding and normal traffic we have received from each hop,
375
    /// and how much padding is allowed.
376
    stats: SmallVec<[Option<PaddingStats>; HOPS]>,
377
    /// Which hops are currently blocking, and whether that blocking is bypassable.
378
    blocking: BlockingState,
379
    /// When will the currently pending sleep future next expire?
380
    ///
381
    /// We keep track of this so that we know when we need to reset the sleep future.
382
    /// It gets updated by `PaddingStream::schedule_next_wakeup`,
383
    /// which we call in `<PaddingStream as Stream>::poll_next` immediately
384
    /// before we create a timer.
385
    next_scheduled_wakeup: Option<Instant>,
386

            
387
    /// A deque of `PaddingEvent` that we want to yield from our [`PaddingEventStream`].
388
    ///
389
    /// NOTE: If you put new items in this list from anywhere other than inside
390
    /// `PaddingEventStream::poll_next`, you need to alert the `waker`.
391
    pending_events: PaddingEventQueue,
392

            
393
    /// A waker to alert if we've added any events to padding_events,
394
    /// or if we need the stream to re-poll.
395
    //
396
    // TODO circpad: This waker is redundant with the one stored in every backend's `Timer`.
397
    // When we revisit this code we may want to consider combining them somehow.
398
    waker: Waker,
399
}
400

            
401
/// The number of padding and non-padding cells we have received from each hop,
402
/// and the rules for how many are allowed.
403
#[derive(Clone, Debug)]
404
struct PaddingStats {
405
    /// The number of padding cells we've received from this hop.
406
    n_padding: u64,
407
    /// The number of non-padding cells we've received from this hop.
408
    n_normal: u64,
409
    /// The maximum allowable fraction of padding cells.
410
    max_padding_frac: f32,
411
    /// A lower limit, below which we will not enforce `max_padding_frac`.
412
    //
413
    // This is a NonZero for several reasons:
414
    // - It doesn't make sense to enforce a ratio when no cells have been received.
415
    // - If we only check when the total is at above zero, we can avoid a division-by-zero check.
416
    // - Having an impossible value here ensures that the niche optimization
417
    //   will work on PaddingStats.
418
    enforce_max_after: NonZeroU16,
419
}
420

            
421
impl PaddingStats {
422
    /// Return an error if this PaddingStats has exceeded its maximum.
423
    fn validate(&self) -> Result<(), ExcessPadding> {
424
        // Total number of cells.
425
        // (It is impossible to get so many cells that this addition will overflow a u64.)
426
        let total = self.n_padding + self.n_normal;
427

            
428
        if total >= u16::from(self.enforce_max_after).into() {
429
            // TODO: is there a way we can avoid a floating-point op here?
430
            // Or can we limit the number of times that we need to check?
431
            // (Tobias suggests randomization; I'm worried.)
432
            //
433
            // On the one hand, this may never appear on our profiles.
434
            // But on the other hand, if it _does_ matter for performance,
435
            // it is likely to be on some marginal platform with bad FP performance,
436
            // where we are unlikely to be doing much testing.
437
            //
438
            // One promising possibility is to calculate a minimum amount of padding
439
            // that we _know_ will be valid, given the current total,
440
            // and then not check again until we at all until we reach that amount.
441
            if self.n_padding as f32 > (total as f32 * self.max_padding_frac) {
442
                return Err(ExcessPadding::PaddingExceedsLimit);
443
            }
444
        }
445
        Ok(())
446
    }
447
}
448

            
449
/// Current padding-related blocking status for a circuit.
450
///
451
/// We have to keep track of whether each hop is blocked or not,
452
/// and whether its blocking is bypassable.
453
/// But all we actually need to tell the reactor code
454
/// is whether to block the _entire_ circuit or not.
455
//
456
// TODO circpad: It might beneficial
457
// to block only the first blocking hop and its successors,
458
// but that creates tricky starvation problems
459
// in the case where we have queued traffic for a later, blocking, hop
460
// that prevents us from flushing any messages to earlier hops.
461
// We could solve this with tricky out-of-order designs,
462
// but for now we're just treating "blocked" as a boolean.
463
#[derive(Default)]
464
struct BlockingState {
465
    /// Whether each hop is currently blocked.
466
    hop_blocked: BitArr![for MAX_HOPS],
467
    /// Whether each hop's blocking is currently **not** bypassable.
468
    blocking_non_bypassable: BitArr![for MAX_HOPS],
469
}
470

            
471
impl BlockingState {
472
    /// Set the hop at position `idx` to blocked.
473
    fn set_blocked(&mut self, idx: usize, is_bypassable: bool) {
474
        self.hop_blocked.set(idx, true);
475
        self.blocking_non_bypassable.set(idx, !is_bypassable);
476
    }
477
    /// Set the hop at position `idx` to unblocked.
478
    fn set_unblocked(&mut self, idx: usize) {
479
        self.hop_blocked.set(idx, false);
480
        self.blocking_non_bypassable.set(idx, false);
481
    }
482
    /// Return a [`PaddingEvent`]
483
    fn blocking_update_paddingevent(&self) -> PaddingEvent {
484
        if self.blocking_non_bypassable.any() {
485
            // At least one hop has non-bypassable blocking, so our blocking is non-bypassable.
486
            PaddingEvent::StartBlocking(StartBlocking {
487
                is_bypassable: false,
488
            })
489
        } else if self.hop_blocked.any() {
490
            // At least one hop is blocking, but no hop has non-bypassable padding, so this padding
491
            // is bypassable.
492
            PaddingEvent::StartBlocking(StartBlocking {
493
                is_bypassable: true,
494
            })
495
        } else {
496
            // Nobody is blocking right now; it's time to unblock.
497
            PaddingEvent::StopBlocking
498
        }
499
    }
500
}
501

            
502
#[allow(clippy::unnecessary_wraps)]
503
impl<S: SleepProvider> PaddingController<S> {
504
    /// Report that we've enqueued a non-padding cell for a given hop.
505
    ///
506
    /// Return a QueuedCellPaddingInfo if we need to alert the padding subsystem
507
    /// when this cell is flushed.
508
9232
    pub(crate) fn queued_data(&self, hop: HopNum) -> Option<QueuedCellPaddingInfo> {
509
9232
        let mut shared = self.shared.lock().expect("Lock poisoned");
510
        // Every hop up to and including the target hop will see this as normal data.
511
9232
        shared.trigger_events(hop, &[maybenot::TriggerEvent::NormalSent]);
512
9232
        shared.info_for_hop(hop)
513
9232
    }
514

            
515
    /// Install the given [`CircuitPadder`] to start padding traffic to the listed `hop`.
516
    ///
517
    /// Stops padding if the provided padder is `None`.
518
    ///
519
    /// Replaces any previous [`CircuitPadder`].
520
    pub(crate) fn install_padder_padding_at_hop(&self, hop: HopNum, padder: Option<CircuitPadder>) {
521
        self.shared
522
            .lock()
523
            .expect("lock poisoned")
524
            .set_hop_backend(hop, padder);
525
    }
526

            
527
    /// Report that we have enqueued a non-padding cell
528
    /// in place of a replaceable padding cell
529
    /// for a given hop.
530
    ///
531
    /// Return a QueuedCellPaddingInfo if we need to alert the padding subsystem
532
    /// when this cell is flushed.
533
    pub(crate) fn queued_data_as_padding(
534
        &self,
535
        hop: HopNum,
536
        sendpadding: SendPadding,
537
    ) -> Option<QueuedCellPaddingInfo> {
538
        assert_eq!(hop, sendpadding.hop);
539
        assert_eq!(Replace::Replaceable, sendpadding.replace);
540
        let mut shared = self.shared.lock().expect("Lock poisoned");
541
        shared.trigger_events_mixed(
542
            hop,
543
            // Each intermediate hop sees this as normal data.
544
            &[maybenot::TriggerEvent::NormalSent],
545
            // For the target hop, we treat this both as normal, _and_ as padding.
546
            &[
547
                maybenot::TriggerEvent::NormalSent,
548
                sendpadding.into_sent_event(),
549
            ],
550
        );
551
        shared.info_for_hop(hop)
552
    }
553

            
554
    /// Report that we have enqueued a padding cell to a given hop.
555
    ///
556
    /// Return a QueuedCellPaddingInfo if we need to alert the padding subsystem
557
    /// when this cell is flushed.
558
    pub(crate) fn queued_padding(
559
        &self,
560
        hop: HopNum,
561
        sendpadding: SendPadding,
562
    ) -> Option<QueuedCellPaddingInfo> {
563
        assert_eq!(hop, sendpadding.hop);
564
        let mut shared = self.shared.lock().expect("Lock poisoned");
565
        shared.trigger_events_mixed(
566
            hop,
567
            // Each intermediate hop sees this as normal data.
568
            &[maybenot::TriggerEvent::NormalSent],
569
            // The target hop sees this as padding.
570
            &[sendpadding.into_sent_event()],
571
        );
572
        shared.info_for_hop(hop)
573
    }
574

            
575
    /// Report that we are using an already-queued cell
576
    /// as a substitute for sending padding to a given hop.
577
    pub(crate) fn replaceable_padding_already_queued(&self, hop: HopNum, sendpadding: SendPadding) {
578
        assert_eq!(hop, sendpadding.hop);
579
        let mut shared = self.shared.lock().expect("Lock poisoned");
580
        shared.trigger_events_mixed(
581
            hop,
582
            // No additional data will be seen for any intermediate hops.
583
            &[],
584
            // The target hop's machine sees this as padding.
585
            &[sendpadding.into_sent_event()],
586
        );
587
    }
588

            
589
    /// Report that we've flushed a cell from the queue for the given hop.
590
    pub(crate) fn flushed_relay_cell(&self, info: QueuedCellPaddingInfo) {
591
        // Every hop up to the last
592
        let mut shared = self.shared.lock().expect("Lock poisoned");
593
        shared.trigger_events(info.target_hop, &[maybenot::TriggerEvent::TunnelSent]);
594
    }
595

            
596
    /// Report that we've flushed a cell from the per-channel queue.
597
4570
    pub(crate) fn flushed_channel_cell(&self) {
598
4570
        let mut shared = self.shared.lock().expect("Lock poisoned");
599
4570
        shared.trigger_events(HopNum::from(0), &[maybenot::TriggerEvent::TunnelSent]);
600
4570
    }
601

            
602
    /// Report that we have decrypted a non-padding cell from our queue
603
    /// from a given hop.
604
    ///
605
    // Note that in theory, it would be better to trigger TunnelRecv as soon as
606
    // possible after we receive and enqueue the data cell, and NormalRecv only
607
    // once we've decrypted it and found it to be data.  But we can't do that,
608
    // since we won't know which hop actually originated the cell until we
609
    // decrypt it.
610
932
    pub(crate) fn decrypted_data(&self, hop: HopNum) {
611
932
        let mut shared = self.shared.lock().expect("Lock poisoned");
612
932
        shared.inc_normal_received(hop);
613
932
        shared.trigger_events(
614
932
            hop,
615
932
            // We treat this as normal data from every hop.
616
932
            &[
617
932
                maybenot::TriggerEvent::TunnelRecv,
618
932
                maybenot::TriggerEvent::NormalRecv,
619
932
            ],
620
932
        );
621
932
    }
622
    /// Report that we have decrypted a padding cell from our queue.
623
    ///
624
    /// Return an error if this padding cell is not acceptable
625
    /// (because we have received too much padding from this hop,
626
    /// or because we have not enabled padding with this hop.)
627
    //
628
    // See note above.
629
    pub(crate) fn decrypted_padding(&self, hop: HopNum) -> Result<(), crate::Error> {
630
        let mut shared = self.shared.lock().expect("Lock poisoned");
631
        shared
632
            .inc_padding_received(hop)
633
            .map_err(|e| crate::Error::ExcessPadding(e, hop))?;
634
        shared.trigger_events_mixed(
635
            hop,
636
            // We treat this as normal data from the intermediate hops.
637
            &[
638
                maybenot::TriggerEvent::TunnelRecv,
639
                maybenot::TriggerEvent::NormalRecv,
640
            ],
641
            // But from the target hop, it counts as padding.
642
            &[
643
                maybenot::TriggerEvent::TunnelRecv,
644
                maybenot::TriggerEvent::PaddingRecv,
645
            ],
646
        );
647
        Ok(())
648
    }
649
}
650

            
651
impl<S: SleepProvider> PaddingShared<S> {
652
    /// Trigger a list of maybenot events on every hop up to and including `hop`.
653
14780
    fn trigger_events(&mut self, hop: HopNum, events: &[maybenot::TriggerEvent]) {
654
14780
        let final_idx = usize::from(hop);
655
14780
        let now = self.runtime.now();
656
14780
        let next_scheduled_wakeup = self.next_scheduled_wakeup;
657
14780
        for hop_controller in self.hops.iter_mut().take(final_idx + 1) {
658
            let Some(hop_controller) = hop_controller else {
659
                continue;
660
            };
661
            hop_controller.report_events_at(events, now, next_scheduled_wakeup);
662
        }
663
14780
    }
664

            
665
    /// Trigger `intermediate_hop_events` on every hop up to but _not_ including `hop`.
666
    ///
667
    /// Trigger `final_hop_events` on `hop`.
668
    ///
669
    /// (Don't trigger anything on any hops _after_ `hop`.)
670
    fn trigger_events_mixed(
671
        &mut self,
672
        hop: HopNum,
673
        intermediate_hop_events: &[maybenot::TriggerEvent],
674
        final_hop_events: &[maybenot::TriggerEvent],
675
    ) {
676
        use itertools::Itertools as _;
677
        use itertools::Position as P;
678
        let final_idx = usize::from(hop);
679
        let now = self.runtime.now();
680
        let next_scheduled_wakeup = self.next_scheduled_wakeup;
681
        for (position, hop_controller) in self.hops.iter_mut().take(final_idx + 1).with_position() {
682
            let Some(hop_controller) = hop_controller else {
683
                continue;
684
            };
685
            let events = match position {
686
                P::First | P::Middle => intermediate_hop_events,
687
                P::Last | P::Only => final_hop_events,
688
            };
689
            hop_controller.report_events_at(events, now, next_scheduled_wakeup);
690
        }
691
    }
692

            
693
    /// Increment the normal cell count from every hop up to and including `hop`.
694
932
    fn inc_normal_received(&mut self, hop: HopNum) {
695
932
        let final_idx = usize::from(hop);
696
932
        for stats in self.stats.iter_mut().take(final_idx + 1).flatten() {
697
            stats.n_normal += 1;
698
        }
699
932
    }
700

            
701
    /// Increment the padding count from `hop`, and the normal cell count from all earlier hops.
702
    ///
703
    /// Return an error if a padding cell from `hop` would not be acceptable.
704
    fn inc_padding_received(&mut self, hop: HopNum) -> Result<(), ExcessPadding> {
705
        use itertools::Itertools as _;
706
        use itertools::Position as P;
707
        let final_idx = usize::from(hop);
708
        for (position, stats) in self.stats.iter_mut().take(final_idx + 1).with_position() {
709
            match (position, stats) {
710
                (P::First | P::Middle, Some(stats)) => stats.n_normal += 1,
711
                (P::First | P::Middle, None) => {}
712
                (P::Last | P::Only, Some(stats)) => {
713
                    stats.n_padding += 1;
714
                    stats.validate()?;
715
                }
716
                (P::Last | P::Only, None) => {
717
                    return Err(ExcessPadding::NoPaddingNegotiated);
718
                }
719
            }
720
        }
721
        Ok(())
722
    }
723

            
724
    /// Return the `QueuedCellPaddingInfo` to use when sending messages to `target_hop`
725
    #[allow(clippy::unnecessary_wraps)]
726
9232
    fn info_for_hop(&self, target_hop: HopNum) -> Option<QueuedCellPaddingInfo> {
727
        // TODO circpad optimization: This is always Some for now, but we
728
        // could someday avoid creating this object
729
        // when padding is not enabled on the circuit,
730
        // or if padding is not enabled on any hop of the circuit <= target_hop.
731
9232
        Some(QueuedCellPaddingInfo { target_hop })
732
9232
    }
733
}
734

            
735
impl<S: SleepProvider> PaddingShared<S> {
736
    /// Install or remove a [`CircuitPadder`] for a single hop.
737
    fn set_hop_backend(&mut self, hop: HopNum, backend: Option<CircuitPadder>) {
738
        let hop_idx: usize = hop.into();
739
        assert!(hop_idx < MAX_HOPS);
740
        let n_needed = hop_idx + 1;
741
        // Make sure there are enough spaces in self.hops.
742
        // We can't use "resize" or "extend", since Box<dyn<PaddingBackend>>
743
        // doesn't implement Clone, which SmallVec requires.
744
        while self.hops.len() < n_needed {
745
            self.hops.push(None);
746
        }
747
        while self.stats.len() < n_needed {
748
            self.stats.push(None);
749
        }
750
        // project through option...
751
        let (hop_backend, stats) = if let Some(padder) = backend {
752
            (Some(padder.backend), Some(padder.initial_stats))
753
        } else {
754
            (None, None)
755
        };
756
        self.hops[hop_idx] = hop_backend;
757
        self.stats[hop_idx] = stats;
758

            
759
        let was_blocked = self.blocking.hop_blocked[hop_idx];
760
        self.blocking.set_unblocked(hop_idx);
761
        if was_blocked {
762
            self.pending_events
763
                .push_back(self.blocking.blocking_update_paddingevent());
764
        }
765

            
766
        // We need to alert the stream, in case we added an event above, and so that it will poll
767
        // the new padder at least once.
768
        self.waker.wake_by_ref();
769
    }
770

            
771
    /// Transform a [`PerHopPaddingEvent`] for a single hop with index `idx` into a [`PaddingEvent`],
772
    /// updating our state as appropriate.
773
    fn process_per_hop_event(
774
        blocking: &mut BlockingState,
775
        hop_idx: usize,
776
        event: PerHopPaddingEvent,
777
    ) -> PaddingEvent {
778
        use PaddingEvent as PE;
779
        use PerHopPaddingEvent as PHPE;
780

            
781
        match event {
782
            PHPE::SendPadding {
783
                machine,
784
                replace,
785
                bypass,
786
            } => PE::SendPadding(SendPadding {
787
                machine,
788
                hop: hopnum_from_hop_idx(hop_idx),
789
                replace,
790
                bypass,
791
            }),
792
            PHPE::StartBlocking { is_bypassable } => {
793
                // NOTE that we remember is_bypassable for every hop, but the blocking is only
794
                // bypassable if _every_ hop is unblocked, or has bypassable blocking.
795
                blocking.set_blocked(hop_idx, is_bypassable);
796
                blocking.blocking_update_paddingevent()
797
            }
798
            PHPE::StopBlocking => {
799
                blocking.set_unblocked(hop_idx);
800
                blocking.blocking_update_paddingevent()
801
            }
802
        }
803
    }
804

            
805
    /// Extract every PaddingEvent that is ready to be reported to the circuit at time `now`.
806
    ///
807
    /// May trigger other events, or wake up the stream, in the course of running.
808
12434
    fn take_padding_events_at(&mut self, now: Instant) -> PaddingEventQueue {
809
12434
        let mut output = PaddingEventQueue::default();
810
12434
        for (hop_idx, backend) in self.hops.iter_mut().enumerate() {
811
            let Some(backend) = backend else {
812
                continue;
813
            };
814

            
815
            let hop_events = backend.take_padding_events_at(now, self.next_scheduled_wakeup);
816

            
817
            output.extend(
818
                hop_events
819
                    .into_iter()
820
                    .map(|ev| Self::process_per_hop_event(&mut self.blocking, hop_idx, ev)),
821
            );
822
        }
823
12434
        output
824
12434
    }
825

            
826
    /// Find the next time at which we should wake up the stream, and register it as our
827
    /// "next scheduled wakeup".
828
12434
    fn schedule_next_wakeup(&mut self, waker: &Waker) -> Option<Instant> {
829
        // Find the earliest time at which any hop has a scheduled event.
830
12434
        let next_expiration = self
831
12434
            .hops
832
12434
            .iter_mut()
833
12434
            .flatten()
834
12434
            .filter_map(|hop| hop.next_wakeup(waker))
835
12434
            .min();
836
12434
        self.next_scheduled_wakeup = next_expiration;
837
12434
        self.waker = waker.clone();
838
12434
        next_expiration
839
12434
    }
840
}
841

            
842
/// A stream of [`PaddingEvent`] to tell a circuit when (if at all) it should send
843
/// padding and block traffic.
844
//
845
// TODO circpad: Optimize this even more for the no-padding case?
846
// We could make it smaller or faster.
847
pub(crate) struct PaddingEventStream<S = DynTimeProvider>
848
where
849
    S: SleepProvider,
850
{
851
    /// An underlying list of PaddingBackend.
852
    shared: Arc<Mutex<PaddingShared<S>>>,
853

            
854
    /// A future defining a time at which we must next call `padder.padding_events_at`.
855
    ///
856
    /// (We also arrange for the backend to wake us up if we need to change this time,
857
    /// or call `padder.padding_events_at`.)
858
    ///
859
    /// Note that this timer is allowed to be _earlier_ than our true wakeup time,
860
    /// but not later.
861
    sleep_future: S::SleepFuture,
862
}
863

            
864
impl futures::Stream for PaddingEventStream {
865
    type Item = PaddingEvent;
866

            
867
12434
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
868
        loop {
869
12434
            let (now, next_wakeup, runtime) = {
870
                // We destructure like this to avoid simultaneous mutable/immutable borrows.
871
12434
                let Self { shared, .. } = &mut *self;
872

            
873
12434
                let mut shared = shared.lock().expect("Poisoned lock");
874

            
875
                // Do we have any events that are waiting to be yielded?
876
12434
                if let Some(val) = shared.pending_events.pop_front() {
877
                    return Poll::Ready(Some(val));
878
12434
                }
879

            
880
                // Does the padder have any events that have become ready to be yielded?
881
12434
                let now = shared.runtime.now();
882
12434
                shared.pending_events = shared.take_padding_events_at(now);
883

            
884
12434
                if let Some(val) = shared.pending_events.pop_front() {
885
                    return Poll::Ready(Some(val));
886
12434
                }
887

            
888
                // If we reach this point, there are no events to trigger right now.
889
                //
890
                // We'll ask all the padders for the time at which they next might need to take
891
                // action, and register our Waker with them, to be alerted if we need to take any action
892
                // before that.
893
12434
                (
894
12434
                    now,
895
12434
                    shared.schedule_next_wakeup(cx.waker()),
896
12434
                    shared.runtime.clone(),
897
12434
                )
898
                // Here we drop the lock on the shared state.
899
            };
900

            
901
12434
            match next_wakeup {
902
                None => {
903
12434
                    return Poll::Pending;
904
                }
905
                Some(t) => {
906
                    // TODO circpad: Avoid rebuilding sleep future needlessly.  May require new APIs in
907
                    // tor-rtcompat.
908
                    self.sleep_future = runtime.sleep(t.saturating_duration_since(now));
909
                    match self.sleep_future.as_mut().poll(cx) {
910
                        Poll::Ready(()) => {
911
                            // Okay, The timer expired already. Continue through the loop.
912
                            continue;
913
                        }
914
                        Poll::Pending => return Poll::Pending,
915
                    }
916
                }
917
            }
918
        }
919
12434
    }
920
}
921

            
922
impl futures::stream::FusedStream for PaddingEventStream {
923
12434
    fn is_terminated(&self) -> bool {
924
        // This stream is _never_ terminated: even if it has no padding machines now,
925
        // we might add some in the future.
926
12434
        false
927
12434
    }
928
}
929

            
930
/// Construct a HopNum from an index into the `hops` field of a [`PaddingShared`].
931
///
932
/// # Panics
933
///
934
/// Panics if `hop_idx` is greater than u8::MAX, which should be impossible.
935
fn hopnum_from_hop_idx(hop_idx: usize) -> HopNum {
936
    // (Static assertion: makes sure we can represent every index of hops as a HopNum.)
937
    const _: () = assert!(MAX_HOPS < u8::MAX as usize);
938
    HopNum::from(u8::try_from(hop_idx).expect("hop_idx out of range!"))
939
}
940

            
941
/// Create a new, empty padding instance for a new circuit.
942
946
pub(crate) fn new_padding<S>(runtime: S) -> (PaddingController<S>, PaddingEventStream<S>)
943
946
where
944
946
    S: SleepProvider,
945
{
946
    // Start with an arbitrary sleep future.  We won't actually use this until
947
    // the first time that we have an event to schedule, so the timeout doesn't matter.
948
946
    let sleep_future = runtime.sleep(Duration::new(86400, 0));
949

            
950
946
    let shared = PaddingShared {
951
946
        runtime,
952
946
        hops: Default::default(),
953
946
        stats: Default::default(),
954
946
        blocking: Default::default(),
955
946
        next_scheduled_wakeup: None,
956
946
        pending_events: PaddingEventQueue::default(),
957
946
        waker: Waker::noop().clone(),
958
946
    };
959
946
    let shared = Arc::new(Mutex::new(shared));
960
946
    let controller = PaddingController {
961
946
        shared: shared.clone(),
962
946
    };
963
946
    let stream = PaddingEventStream {
964
946
        shared,
965
946
        sleep_future,
966
946
    };
967

            
968
946
    (controller, stream)
969
946
}