1
//! Padding backend based on [`maybenot`].
2
//!
3
//! # Operation
4
//!
5
//! For each circuit hop, we have an optional [`maybenot::Framework`].
6
//! This framework wraps multiple "padding machines",
7
//! each of which is a randomized state machine.
8
//! (Arti is built with a list of pre-configured padding machines.
9
//! The set of padding machines to use with any given circuit hop
10
//! is negotiated via `PADDING_NEGOTIATE` messages.)
11
//! We interact with the framework via
12
//! [`Framework::trigger_events`](maybenot::Framework::trigger_events),
13
//! which consumes [`TriggerEvent`]s and gives us [`TriggerAction`]s.
14
//! Those `TriggerAction`s tell us to schedule or reschedule different timers,
15
//! to block traffic, or to send padding.
16
//!
17
//! We wrap the `Framework` in [`MaybenotPadder`],
18
//! which keeps track of the expiration time for each timer.
19
//! From `MaybenotPadder`, we expose a single timer
20
//! describing when the next action from the padding machine may be necessary.
21
//! This timer is likely to update frequently.
22

            
23
use std::{sync::Arc, task::Waker};
24

            
25
use maybenot::{MachineId, TriggerEvent};
26
use smallvec::SmallVec;
27
use web_time_compat::InstantExt;
28

            
29
use super::{Bypass, Duration, Instant, PerHopPaddingEvent, PerHopPaddingEventVec, Replace};
30

            
31
/// The Rng that we construct for our padding machines.
///
/// We use a separate type alias here in case we want to move to a per-Framework
/// ChaCha8Rng or something.
///
/// At present this is [`ThisThreadRng`], which forwards every request to `rand::rng()`.
type Rng = ThisThreadRng;

/// The particular instantiated padding framework type that we use.
///
/// The machines are held in a shared `Arc<[maybenot::Machine]>`,
/// randomness is drawn from [`Rng`], and [`Instant`] is used as the time type.
type Framework = maybenot::Framework<Arc<[maybenot::Machine]>, Rng, Instant>;

/// A [`maybenot::TriggerAction`] as we construct it for use with our [`Framework`]s.
///
/// (That is, a `TriggerAction` specialized to our [`Instant`] type.)
type TriggerAction = maybenot::TriggerAction<Instant>;

/// A type we use to report events that we must trigger on the basis of triggering other events.
///
/// We've optimized here for the assumption that we _usually_ won't need to trigger more than one
/// event: a single event is stored inline, and any more spill onto the heap.
type TriggerEventsOutVec = SmallVec<[TriggerEvent; 1]>;
48

            
49
/// An action that we should take on a machine's behalf,
/// after a certain interval has elapsed.
///
/// The time at which the action becomes due is not stored in this type;
/// it is kept next to the action in `MachineState::action_timer_expires`.
#[derive(Clone, Debug)]
enum ScheduledAction {
    /// We should send padding if and when the machine's action timer expires.
    SendPadding {
        /// Send padding even if bypassable blocking is in place.
        /// (Blocking can be bypassable or non-bypassable.)
        bypass: bool,
        /// If an existing non-padding cell is queued,
        /// it can replace this padding.
        ///
        /// If `bypass` is true, such a cell can also bypass bypassable blocking.
        replace: bool,
    },
    /// We should block outbound traffic if and when the machine's action timer expires.
    Block {
        /// If true, then the blocking is bypassable.
        bypass: bool,
        /// If true, then we should change the duration of the current blocking unconditionally.
        /// If false, we should use whichever duration is longer.
        replace: bool,
        /// The interval of the blocking that we should apply.
        ///
        /// This is measured from the moment the action timer expires,
        /// not from the moment the action was scheduled.
        duration: Duration,
    },
}
75

            
76
/// The state for a _single_ padding Machine within a Framework.
///
/// The `Default` value has neither timer set.
#[derive(Default, Clone, Debug)]
struct MachineState {
    /// The current state for the machine's "internal timer".
    ///
    /// Each machine has a single internal timer,
    /// and manages the timer itself via the `UpdateTimer` and `Cancel`
    /// [`TriggerAction`] variants.
    ///
    /// `None` means that the internal timer is not currently set.
    internal_timer_expires: Option<Instant>,

    /// The current state for the machine's "action timer".
    ///
    /// Each machine has a single action timer, after which some [`ScheduledAction`]
    /// should be taken.
    ///
    /// (Note that only one action can be scheduled per machine,
    /// so if we're told to schedule blocking, we should cancel padding;
    /// and if we're told to schedule padding, we should cancel blocking.)
    ///
    /// `None` means that no action is currently scheduled.
    action_timer_expires: Option<(Instant, ScheduledAction)>,
}
96

            
97
impl MachineState {
98
    /// Return the earliest time that either of this machine's timers will expire.
99
    fn next_expiration(&self) -> Option<Instant> {
100
        match (&self.internal_timer_expires, &self.action_timer_expires) {
101
            (None, None) => None,
102
            (None, Some((t, _))) => Some(*t),
103
            (Some(t), None) => Some(*t),
104
            (Some(t1), Some((t2, _))) => Some(*t1.min(t2)),
105
        }
106
    }
107
}
108

            
109
/// Represents the state for all padding machines within a framework.
///
/// N should be around the number of padding machines that the framework should support.
/// (It is the number of [`MachineState`]s that can be stored inline before the list
/// spills onto the heap.)
struct PadderState<const N: usize> {
    /// A list of all the padding machine states for a single framework.
    ///
    /// This list is indexed by `MachineId`.
    //
    // TODO: Optimize this size even more if appropriate
    state: SmallVec<[MachineState; N]>,
}
120

            
121
impl<const N: usize> PadderState<N> {
122
    /// Return a mutable reference to the state corresponding to a given [`MachineId`]
123
    ///
124
    /// # Panics
125
    ///
126
    /// Panics if `id` is out of range, which can only happen if a MachineId from
127
    /// one Framework is given to another Framework.
128
    fn state_mut(&mut self, id: MachineId) -> &mut MachineState {
129
        &mut self.state[id.into_raw()]
130
    }
131

            
132
    /// Execute a single [`TriggerAction`] on this state.
133
    ///
134
    /// `TriggerActions` are created by `maybenot::Framework` instances
135
    /// in response to [`TriggerEvent`]s.
136
    ///
137
    /// Executing a `TriggerAction` can adjust timers,
138
    /// and can schedule a new [`ScheduledAction`] to be taken in the future;
139
    /// it does not, however, send any padding or adjust any blocking on its own.
140
    ///
141
    /// The current time should be provided in `now`.
142
    ///
143
    /// Executing a `TriggerAction` can cause more events to occur.
144
    /// If this happens, they are added to `events_out`.
145
    ///
146
    /// If this method returns false, no timer has changed.
147
    /// If this method returns true, then a timer may have changed.
148
    /// (False positives are possible, but not false negatives.)
149
    fn trigger_action(
150
        &mut self,
151
        action: &TriggerAction,
152
        now: Instant,
153
        events_out: &mut TriggerEventsOutVec,
154
    ) -> bool {
155
        use maybenot::Timer as T;
156
        use maybenot::TriggerAction as A;
157

            
158
        let mut timer_changed = false;
159

            
160
        match action {
161
            A::Cancel { machine, timer } => {
162
                // "Cancel" means to stop one or both of the timers from this machine.
163
                let st = self.state_mut(*machine);
164
                match timer {
165
                    T::Action => st.action_timer_expires = None,
166
                    T::Internal => st.internal_timer_expires = None,
167
                    T::All => {
168
                        st.action_timer_expires = None;
169
                        st.internal_timer_expires = None;
170
                    }
171
                };
172
                timer_changed = true;
173
            }
174
            A::SendPadding {
175
                timeout,
176
                bypass,
177
                replace,
178
                machine,
179
            } => {
180
                // "SendPadding" means to schedule padding to be sent after a given timeout,
181
                // and to replace any previous timed action.
182
                let st = self.state_mut(*machine);
183
                st.action_timer_expires = Some((
184
                    now + *timeout,
185
                    ScheduledAction::SendPadding {
186
                        bypass: *bypass,
187
                        replace: *replace,
188
                    },
189
                ));
190
                timer_changed = true;
191
            }
192
            A::BlockOutgoing {
193
                timeout,
194
                duration,
195
                bypass,
196
                replace,
197
                machine,
198
            } => {
199
                // "BlockOutgoing" means to begin blocking traffic for a given duration,
200
                // after a given timeout,
201
                // and to replace any previous timed action.
202
                let st = self.state_mut(*machine);
203
                st.action_timer_expires = Some((
204
                    now + *timeout,
205
                    ScheduledAction::Block {
206
                        bypass: *bypass,
207
                        replace: *replace,
208
                        duration: *duration,
209
                    },
210
                ));
211
                timer_changed = true;
212
            }
213
            A::UpdateTimer {
214
                duration,
215
                replace,
216
                machine,
217
            } => {
218
                // "UpdateTimer" means to set or re-set the internal timer for this machine.
219
                let st = self.state_mut(*machine);
220

            
221
                let new_expiry = now + *duration;
222
                // The "replace" flag means "update the internal timer unconditionally".
223
                // If it is false, and the timer is already set, then we should only update
224
                // the internal timer to be _longer_.
225
                let update_timer = match (replace, st.internal_timer_expires) {
226
                    (_, None) => true,
227
                    (true, Some(_)) => true,
228
                    (false, Some(cur)) if new_expiry > cur => true,
229
                    (false, Some(_)) => false,
230
                };
231
                if update_timer {
232
                    st.internal_timer_expires = Some(new_expiry);
233
                    timer_changed = true;
234
                }
235
                // Note: We are supposed to trigger TimerBegin unconditionally
236
                // if the timer changes at all.
237
                events_out.push(TriggerEvent::TimerBegin { machine: *machine });
238
            }
239
        }
240

            
241
        timer_changed
242
    }
243

            
244
    /// Return the next instant (if any) at which any of the padding machines' timers will expire.
245
    fn next_expiration(&self) -> Option<Instant> {
246
        self.state
247
            .iter()
248
            .filter_map(MachineState::next_expiration)
249
            .min()
250
    }
251
}
252

            
253
/// Possible state of a Framework's aggregate timer.
///
/// (Since there are two possible timers for each Machine,
/// we just keep track of the one that will expire next.)
#[derive(Clone, Debug)]
struct Timer {
    /// The next time at which any of this framework's padding machines' timers will expire.
    ///
    /// (None means "no timers are set.")
    next_expiration: Option<Instant>,

    /// A [`Waker`] that we must wake whenever `self.next_expiration` becomes sooner than
    /// our next scheduled wakeup (as passed as an argument to `set_expiration`).
    ///
    /// Until `get_expiration` has been called, this is a no-op waker.
    waker: Waker,
}
268

            
269
impl Timer {
270
    /// Construct a new Timer.
271
    fn new() -> Self {
272
        Self {
273
            next_expiration: None,
274
            waker: Waker::noop().clone(),
275
        }
276
    }
277

            
278
    /// Return the next expiration time, and schedule `waker` to be alerted whenever
279
    /// the expiration time becomes earlier than the time at which we've actually decided to sleep
280
    /// (passed as an argument to `set_expiration()`).
281
    ///
282
    /// (There are two separate expiration times at work here because, in higher-level code,
283
    /// we combine _all_ the timer expirations for all padding machines on a circuit
284
    /// into a single expiration, and track only that expiration.)
285
    fn get_expiration(&mut self, waker: &Waker) -> Option<Instant> {
286
        // TODO: Perhaps this should instead return and/or manipulate a sleep future.
287
        // TODO: Perhaps there should be a shared AtomicWaker?
288
        self.waker = waker.clone();
289
        self.next_expiration
290
    }
291

            
292
    /// Change the expiration time to `new_expiration`, alerting the [`Waker`] if that time
293
    /// is earlier than `next_scheduled_wakeup`.
294
    fn set_expiration(
295
        &mut self,
296
        new_expiration: Option<Instant>,
297
        next_scheduled_wakeup: Option<Instant>,
298
    ) {
299
        // we need to invoke the waker if the new expiration is earlier than the one the waker has.
300
        let wake = match (next_scheduled_wakeup, new_expiration) {
301
            (_, None) => false,
302
            (None, Some(_)) => true,
303
            (Some(w_exp), Some(new_exp)) => new_exp < w_exp,
304
        };
305
        self.next_expiration = new_expiration;
306
        if wake {
307
            self.waker.wake_by_ref();
308
        }
309
    }
310
}
311

            
312
/// State of a MaybenotPadder that is blocking.
///
/// Here we only need to remember when the blocking expires;
/// we record the bypassable status of the blocking in [`super::PaddingShared`].
#[derive(Debug)]
struct BlockingState {
    /// The time at which this blocking expires.
    expiration: Instant,
}
321

            
322
/// An implementation of circuit padding using [`maybenot`].
///
/// Supports up to `N` padding machines without spilling over onto the heap.
pub(super) struct MaybenotPadder<const N: usize> {
    /// Our underlying [`maybenot::Framework`].
    framework: Framework,
    /// The state of our padding machines.
    ///
    /// This holds one entry per machine in `framework`.
    state: PadderState<N>,
    /// Our current timer information.
    timer: Timer,
    /// If we are blocking, information about the blocking.
    ///
    /// (`None` means that we are not currently blocking.)
    blocking: Option<BlockingState>,
}
335

            
336
impl<const N: usize> MaybenotPadder<N> {
337
    /// Construct a new MaybyenotPadder from a provided `FrameworkRules`.
338
    pub(super) fn from_framework_rules(
339
        rules: &super::PaddingRules,
340
    ) -> Result<Self, maybenot::Error> {
341
        let framework = maybenot::Framework::new(
342
            rules.machines.clone(),
343
            rules.max_outbound_padding_frac,
344
            rules.max_outbound_blocking_frac,
345
            // TODO #2428 PADDING: We should be taking this from a SleepProvider!
346
            Instant::get(),
347
            ThisThreadRng,
348
        )?;
349
        Ok(Self::from_framework(framework))
350
    }
351

            
352
    /// Construct a new MaybenotPadder from a given Framework.
353
    pub(super) fn from_framework(framework: Framework) -> Self {
354
        let n = framework.num_machines();
355
        let state = PadderState {
356
            state: smallvec::smallvec![MachineState::default(); n],
357
        };
358
        Self {
359
            framework,
360
            state,
361
            timer: Timer::new(),
362
            blocking: None,
363
        }
364
    }
365

            
366
    /// Return the next expiration time, and schedule `waker` to be alerted whenever
367
    /// the expiration time becomes earlier than that.
368
    pub(super) fn get_expiration(&mut self, waker: &Waker) -> Option<Instant> {
369
        self.timer.get_expiration(waker)
370
    }
371

            
372
    /// Tell the padding machines about all of the given `events`,
373
    /// report them happening at `now`, and adjust internal state.
374
    ///
375
    /// If doing this would cause any timer to become earlier than `next_scheduled_wakeup`,
376
    /// wake up the registered [`Waker`].
377
    pub(super) fn trigger_events_at(
378
        &mut self,
379
        events: &[TriggerEvent],
380
        now: Instant,
381
        next_scheduled_wakeup: Option<Instant>,
382
    ) {
383
        let mut timer_changed = false;
384

            
385
        // A pair of buffers that we'll use to handle events that arise while triggering other
386
        // events.  (The BeginTimer event can be triggered by the UpdateTimer action.)
387
        let (mut e1, mut e2) = (TriggerEventsOutVec::new(), TriggerEventsOutVec::new());
388
        let (mut processing, mut pending) = (&mut e1, &mut e2);
389

            
390
        let mut events = events;
391

            
392
        /// If we go through our loop more than this many times, we stop:
393
        /// An infinite loop is in theory possible, but we don't want to allow one.
394
        const MAX_LOOPS: usize = 4;
395

            
396
        let finished_normally = 'finished: {
397
            for _ in 0..MAX_LOOPS {
398
                pending.clear();
399
                for action in self.framework.trigger_events(events, now) {
400
                    timer_changed |= self.state.trigger_action(action, now, pending);
401
                }
402

            
403
                if pending.is_empty() {
404
                    // We don't have any additional events to trigger.
405
                    break 'finished true;
406
                } else {
407
                    std::mem::swap(&mut processing, &mut pending);
408
                    events = &processing[..];
409
                }
410
            }
411
            // We got to the last iteration of the loop and still had events to trigger.
412
            break 'finished false;
413
        };
414

            
415
        if !finished_normally {
416
            // TODO: Log in this case, but not too many times.
417
        }
418

            
419
        if timer_changed {
420
            self.timer
421
                .set_expiration(self.state.next_expiration(), next_scheduled_wakeup);
422
        }
423
    }
424

            
425
    /// Take any actions that need to occur at time `now`.
426
    ///
427
    /// We should call this function as soon as possible after our timer has expired.
428
    ///
429
    /// Returns zero or more [`PerHopPaddingEvent`]s reflecting the padding that we should send,
430
    /// and what we should do with blocking.
431
    fn take_actions_at(
432
        &mut self,
433
        now: Instant,
434
        next_scheduled_wakeup: Option<Instant>,
435
    ) -> PerHopPaddingEventVec {
436
        // Events that we need to trigger based on expired timers.
437
        // TODO: We might want a smaller N here.
438
        let mut e: SmallVec<[TriggerEvent; N]> = SmallVec::default();
439

            
440
        // A list of events that we can't handle internally, and which we need to report
441
        // to a circuit/tunnel reactor.
442
        let mut return_events = PerHopPaddingEventVec::default();
443

            
444
        let mut timer_changed = false;
445

            
446
        if let Some(blocking) = &self.blocking {
447
            if blocking.expiration <= now {
448
                timer_changed = true;
449
                self.blocking = None;
450
                e.push(TriggerEvent::BlockingEnd);
451
                return_events.push(PerHopPaddingEvent::StopBlocking);
452
            }
453
        }
454

            
455
        for (idx, st) in self.state.state.iter_mut().enumerate() {
456
            match st.internal_timer_expires {
457
                Some(t) if t <= now => {
458
                    // This machine's internal timer has expired; we tell it so.
459
                    st.internal_timer_expires = None;
460
                    timer_changed = true;
461
                    e.push(TriggerEvent::TimerEnd {
462
                        machine: MachineId::from_raw(idx),
463
                    });
464
                }
465
                None | Some(_) => {}
466
            }
467
            match &st.action_timer_expires {
468
                Some((t, _)) if *t <= now => {
469
                    // This machine's action timer has expired; we now take that action.
470
                    use ScheduledAction as SA;
471
                    let action = st
472
                        .action_timer_expires
473
                        .take()
474
                        .expect("It was Some a minute ago!")
475
                        .1;
476
                    timer_changed = true;
477
                    match action {
478
                        SA::SendPadding { bypass, replace } => {
479
                            return_events.push(PerHopPaddingEvent::SendPadding {
480
                                machine: MachineId::from_raw(idx),
481
                                replace: Replace::from_bool(replace),
482
                                bypass: Bypass::from_bool(bypass),
483
                            });
484
                        }
485
                        SA::Block {
486
                            bypass,
487
                            replace,
488
                            duration,
489
                        } => {
490
                            let new_expiry = now + duration;
491
                            if self.blocking.is_none() {
492
                                return_events.push(PerHopPaddingEvent::StartBlocking {
493
                                    is_bypassable: bypass,
494
                                });
495
                            }
496
                            let replace = match &self.blocking {
497
                                None => true,
498
                                Some(b) if replace || b.expiration < new_expiry => true,
499
                                Some(_) => false,
500
                            };
501
                            if replace {
502
                                self.blocking = Some(BlockingState {
503
                                    expiration: new_expiry,
504
                                });
505
                            }
506

            
507
                            // We trigger this event unconditionally, even if we were already
508
                            // blocking.
509
                            e.push(TriggerEvent::BlockingBegin {
510
                                machine: MachineId::from_raw(idx),
511
                            });
512
                        }
513
                    }
514
                }
515
                None | Some(_) => {}
516
            }
517
        }
518

            
519
        if timer_changed {
520
            self.timer
521
                .set_expiration(self.state.next_expiration(), next_scheduled_wakeup);
522
        }
523

            
524
        // Inform the framework of any expired timeouts.
525
        self.trigger_events_at(&e[..], now, next_scheduled_wakeup);
526

            
527
        return_events
528
    }
529
}
530

            
531
/// Helper: An Rng object that calls `rand::rng()` to get the thread Rng.
///
/// (We use this since we want our maybenot Framework to use the thread Rng,
/// but we can't have it _own_ the thread Rng. )
///
/// This type holds no state of its own: every request is forwarded to a fresh
/// `rand::rng()` handle.
#[derive(Clone, Debug)]
pub(super) struct ThisThreadRng;
537
impl rand::RngCore for ThisThreadRng {
    /// Return the next `u32` from `rand::rng()`.
    fn next_u32(&mut self) -> u32 {
        rand::RngCore::next_u32(&mut rand::rng())
    }

    /// Return the next `u64` from `rand::rng()`.
    fn next_u64(&mut self) -> u64 {
        rand::RngCore::next_u64(&mut rand::rng())
    }

    /// Fill `dst` with bytes from `rand::rng()`.
    fn fill_bytes(&mut self, dst: &mut [u8]) {
        rand::RngCore::fill_bytes(&mut rand::rng(), dst);
    }
}
550

            
551
/// Helper trait: Used to wrap a single [`MaybenotPadder`].
///
/// (We don't use `MaybenotPadder` directly because we want to keep the freedom
/// to parameterize it differently, or maybe even to replace it with something else.)
//
// TODO circpad: Decide whether this optimization/flexibility makes any sense.
pub(super) trait PaddingBackend: Send + Sync {
    /// Report one or more TriggerEvents to the padder, as happening at `now`.
    ///
    /// Alert any registered `Waker` if these events cause us to need to take action
    /// earlier than `next_scheduled_wakeup`.
    fn report_events_at(
        &mut self,
        events: &[maybenot::TriggerEvent],
        now: Instant,
        next_scheduled_wakeup: Option<Instant>,
    );

    /// Trigger any padding actions that should be taken `now`.
    ///
    /// If _we_ should perform any actions (blocking, unblocking, or sending padding),
    /// return them in a [`PerHopPaddingEventVec`].
    ///
    /// Alert any registered `Waker` if these events cause us to need to take action
    /// earlier than `next_scheduled_wakeup`.
    fn take_padding_events_at(
        &mut self,
        now: Instant,
        next_scheduled_wakeup: Option<Instant>,
    ) -> PerHopPaddingEventVec;

    /// This method should be called when we have no actions to perform,
    /// with a [`Waker`] that will activate the corresponding [`PaddingEventStream`](super::PaddingEventStream).
    ///
    /// It will return a time at which `take_padding_events_at()` should next be called,
    /// and will wake up the Waker if it turns out that we need to call
    /// `take_padding_events_at()` any earlier than that.
    ///
    /// (A return value of `None` means that no such time is currently known.)
    fn next_wakeup(&mut self, waker: &Waker) -> Option<Instant>;
}
590

            
591
// Each trait method simply delegates to the inherent `MaybenotPadder` method
// that implements it.
impl<const N: usize> PaddingBackend for MaybenotPadder<N> {
    // Delegates to `MaybenotPadder::trigger_events_at`.
    fn report_events_at(
        &mut self,
        events: &[maybenot::TriggerEvent],
        now: Instant,
        next_scheduled_wakeup: Option<Instant>,
    ) {
        self.trigger_events_at(events, now, next_scheduled_wakeup);
    }

    // Delegates to `MaybenotPadder::take_actions_at`.
    fn take_padding_events_at(
        &mut self,
        now: Instant,
        next_scheduled_wakeup: Option<Instant>,
    ) -> PerHopPaddingEventVec {
        self.take_actions_at(now, next_scheduled_wakeup)
    }

    // Delegates to `MaybenotPadder::get_expiration`.
    fn next_wakeup(&mut self, waker: &Waker) -> Option<Instant> {
        self.get_expiration(waker)
    }
}