1
//! Padding backend based on [`maybenot`].
2
//!
3
//! # Operation
4
//!
5
//! For each each circuit hop, we have an optional [`maybenot::Framework`].
6
//! This framework wraps multiple "padding machines",
7
//! each of which is a randomized state machine.
8
//! (Arti is built with a list of pre-configured of padding machines.
9
//! The set of padding machines to use with any given circuit hop
10
//! are negotiated via `PADDING_NEGOTIATE` messages.)
11
//! We interact with the framework via
12
//! [`Framework::trigger_events`](maybenot::Framework::trigger_events),
13
//! which consumes [`TriggerEvent`]s and gives us [`TriggerAction`]s.
14
//! Those `TriggerAction`s tell us to schedule or reschedule different timers,
15
//! to block traffic, or to send padding.
16
//!
17
//! We wrap the `Framework` in [`MaybenotPadder`],
18
//! which keeps track of the expiration time for each timer.
19
//! From `MaybenotPadder`, we expose a single timer
20
//! describing when the next action from the padding machine may be necessary.
21
//! This timer is likely to update frequently.
22

            
23
use std::{sync::Arc, task::Waker};
24

            
25
use maybenot::{MachineId, TriggerEvent};
26
use smallvec::SmallVec;
27

            
28
use super::{Bypass, Duration, Instant, PerHopPaddingEvent, PerHopPaddingEventVec, Replace};
29

            
30
/// The Rng that we construct for our padding machines.
31
///
32
/// We use a separate type alias here in case we want to move to a per-Framework
33
/// ChaCha8Rng or something.
34
type Rng = ThisThreadRng;
35

            
36
/// The particular instantiated padding framework type that we use.
37
type Framework = maybenot::Framework<Arc<[maybenot::Machine]>, Rng, Instant>;
38

            
39
/// A [`maybenot::TriggerAction`] as we construct it for use with our [`Framework`]s.
40
type TriggerAction = maybenot::TriggerAction<Instant>;
41

            
42
/// A type we use to report events that we must trigger on the basis of triggering other events.
43
///
44
/// We've optimized here for the assumption that we _usually_ won't need to trigger more than one
45
/// event.
46
type TriggerEventsOutVec = SmallVec<[TriggerEvent; 1]>;
47

            
48
/// An action that we should take on a machine's behalf,
49
/// after a certain interval has elapsed.
50
#[derive(Clone, Debug)]
51
enum ScheduledAction {
52
    /// We should send padding if and when the machine's action timer expires.
53
    SendPadding {
54
        /// Send padding even if bypassable blocking is in place.
55
        /// (Blocking can be bypassable or non-bypassable.)
56
        bypass: bool,
57
        /// If an existing non-padding cell is queued,
58
        /// it can replace this padding.
59
        //
60
        /// If `bypass` is true, such a cell can also bypass bypassable blocking.
61
        replace: bool,
62
    },
63
    /// We should block outbound traffic if and when the machine's action timer expires.
64
    Block {
65
        /// If true, then the blocking is bypassable.
66
        bypass: bool,
67
        /// If true, then we should change the duration of the current blocking unconditionally.
68
        /// If false, we should use whichever duration is longer.
69
        replace: bool,
70
        /// The interval of the blocking that we should apply.
71
        duration: Duration,
72
    },
73
}
74

            
75
/// The state for a _single_ padding Machine within a Framework.
76
#[derive(Default, Clone, Debug)]
77
struct MachineState {
78
    /// The current state for the machine's "internal timer".
79
    ///
80
    /// Each machine has a single internal timer,
81
    /// and manages the timer itself via the `UpdateTimer` and `Cancel`
82
    /// [`TriggerAction`] variants.
83
    internal_timer_expires: Option<Instant>,
84

            
85
    /// The current state for the machine's "action timer".
86
    ///
87
    /// Each machine has a single action timer, after which some [`ScheduledAction`]
88
    /// should be taken.
89
    ///
90
    /// (Note that only one action can be scheduled per machine,
91
    /// so if we're told to schedule blocking, we should cancel padding;
92
    /// and if we're told to schedule padding, we should cancel blocking.)
93
    action_timer_expires: Option<(Instant, ScheduledAction)>,
94
}
95

            
96
impl MachineState {
97
    /// Return the earliest time that either of this machine's timers will expire.
98
    fn next_expiration(&self) -> Option<Instant> {
99
        match (&self.internal_timer_expires, &self.action_timer_expires) {
100
            (None, None) => None,
101
            (None, Some((t, _))) => Some(*t),
102
            (Some(t), None) => Some(*t),
103
            (Some(t1), Some((t2, _))) => Some(*t1.min(t2)),
104
        }
105
    }
106
}
107

            
108
/// Represents the state for all padding machines within a framework.
109
///
110
/// N should be around the number of padding machines that the framework should support.
111
struct PadderState<const N: usize> {
112
    /// A list of all the padding machine states for a single framework.
113
    ///
114
    /// This list is indexed by `MachineId`.
115
    //
116
    // TODO: Optimize this size even more if appropriate
117
    state: SmallVec<[MachineState; N]>,
118
}
119

            
120
impl<const N: usize> PadderState<N> {
121
    /// Return a mutable reference to the state corresponding to a given [`MachineId`]
122
    ///
123
    /// # Panics
124
    ///
125
    /// Panics if `id` is out of range, which can only happen if a MachineId from
126
    /// one Framework is given to another Framework.
127
    fn state_mut(&mut self, id: MachineId) -> &mut MachineState {
128
        &mut self.state[id.into_raw()]
129
    }
130

            
131
    /// Execute a single [`TriggerAction`] on this state.
132
    ///
133
    /// `TriggerActions` are created by `maybenot::Framework` instances
134
    /// in response to [`TriggerEvent`]s.
135
    ///
136
    /// Executing a `TriggerAction` can adjust timers,
137
    /// and can schedule a new [`ScheduledAction`] to be taken in the future;
138
    /// it does not, however, send any padding or adjust any blocking on its own.
139
    ///
140
    /// The current time should be provided in `now`.
141
    ///
142
    /// Executing a `TriggerAction` can cause more events to occur.
143
    /// If this happens, they are added to `events_out`.
144
    ///
145
    /// If this method returns false, no timer has changed.
146
    /// If this method returns true, then a timer may have changed.
147
    /// (False positives are possible, but not false negatives.)
148
    fn trigger_action(
149
        &mut self,
150
        action: &TriggerAction,
151
        now: Instant,
152
        events_out: &mut TriggerEventsOutVec,
153
    ) -> bool {
154
        use maybenot::Timer as T;
155
        use maybenot::TriggerAction as A;
156

            
157
        let mut timer_changed = false;
158

            
159
        match action {
160
            A::Cancel { machine, timer } => {
161
                // "Cancel" means to stop one or both of the timers from this machine.
162
                let st = self.state_mut(*machine);
163
                match timer {
164
                    T::Action => st.action_timer_expires = None,
165
                    T::Internal => st.internal_timer_expires = None,
166
                    T::All => {
167
                        st.action_timer_expires = None;
168
                        st.internal_timer_expires = None;
169
                    }
170
                };
171
                timer_changed = true;
172
            }
173
            A::SendPadding {
174
                timeout,
175
                bypass,
176
                replace,
177
                machine,
178
            } => {
179
                // "SendPadding" means to schedule padding to be sent after a given timeout,
180
                // and to replace any previous timed action.
181
                let st = self.state_mut(*machine);
182
                st.action_timer_expires = Some((
183
                    now + *timeout,
184
                    ScheduledAction::SendPadding {
185
                        bypass: *bypass,
186
                        replace: *replace,
187
                    },
188
                ));
189
                timer_changed = true;
190
            }
191
            A::BlockOutgoing {
192
                timeout,
193
                duration,
194
                bypass,
195
                replace,
196
                machine,
197
            } => {
198
                // "BlockOutgoing" means to begin blocking traffic for a given duration,
199
                // after a given timeout,
200
                // and to replace any previous timed action.
201
                let st = self.state_mut(*machine);
202
                st.action_timer_expires = Some((
203
                    now + *timeout,
204
                    ScheduledAction::Block {
205
                        bypass: *bypass,
206
                        replace: *replace,
207
                        duration: *duration,
208
                    },
209
                ));
210
                timer_changed = true;
211
            }
212
            A::UpdateTimer {
213
                duration,
214
                replace,
215
                machine,
216
            } => {
217
                // "UpdateTimer" means to set or re-set the internal timer for this machine.
218
                let st = self.state_mut(*machine);
219

            
220
                let new_expiry = now + *duration;
221
                // The "replace" flag means "update the internal timer unconditionally".
222
                // If it is false, and the timer is already set, then we should only update
223
                // the internal timer to be _longer_.
224
                let update_timer = match (replace, st.internal_timer_expires) {
225
                    (_, None) => true,
226
                    (true, Some(_)) => true,
227
                    (false, Some(cur)) if new_expiry > cur => true,
228
                    (false, Some(_)) => false,
229
                };
230
                if update_timer {
231
                    st.internal_timer_expires = Some(new_expiry);
232
                    timer_changed = true;
233
                }
234
                // Note: We are supposed to trigger TimerBegin unconditionally
235
                // if the timer changes at all.
236
                events_out.push(TriggerEvent::TimerBegin { machine: *machine });
237
            }
238
        }
239

            
240
        timer_changed
241
    }
242

            
243
    /// Return the next instant (if any) at which any of the padding machines' timers will expire.
244
    fn next_expiration(&self) -> Option<Instant> {
245
        self.state
246
            .iter()
247
            .filter_map(MachineState::next_expiration)
248
            .min()
249
    }
250
}
251

            
252
/// Possible state of a Framework's aggregate timer.
253
///
254
/// (Since there are two possible timers for each Machine,
255
/// we just keep track of the one that will expire next.)
256
#[derive(Clone, Debug)]
257
struct Timer {
258
    /// The next time at which any of this padding machines' timer will expire.
259
    ///
260
    /// (None means "no timers are set.")
261
    next_expiration: Option<Instant>,
262

            
263
    /// A [`Waker`] that we must wake whenever `self.next_expiration` becomes sooner than
264
    /// our next scheduled wakeup (as passed as an argument to `set_expiration`).
265
    waker: Waker,
266
}
267

            
268
impl Timer {
269
    /// Construct a new Timer.
270
    fn new() -> Self {
271
        Self {
272
            next_expiration: None,
273
            waker: Waker::noop().clone(),
274
        }
275
    }
276

            
277
    /// Return the next expiration time, and schedule `waker` to be alerted whenever
278
    /// the expiration time becomes earlier than the time at which we've actually decided to sleep
279
    /// (passed as an argument to `set_expiration()`).
280
    ///
281
    /// (There are two separate expiration times at work here because, in higher-level code,
282
    /// we combine _all_ the timer expirations for all padding machines on a circuit
283
    /// into a single expiration, and track only that expiration.)
284
    fn get_expiration(&mut self, waker: &Waker) -> Option<Instant> {
285
        // TODO: Perhaps this should instead return and/or manipulate a sleep future.
286
        // TODO: Perhaps there should be a shared AtomicWaker?
287
        self.waker = waker.clone();
288
        self.next_expiration
289
    }
290

            
291
    /// Change the expiration time to `new_expiration`, alerting the [`Waker`] if that time
292
    /// is earlier than `next_scheduled_wakeup`.
293
    fn set_expiration(
294
        &mut self,
295
        new_expiration: Option<Instant>,
296
        next_scheduled_wakeup: Option<Instant>,
297
    ) {
298
        // we need to invoke the waker if the new expiration is earlier than the one the waker has.
299
        let wake = match (next_scheduled_wakeup, new_expiration) {
300
            (_, None) => false,
301
            (None, Some(_)) => true,
302
            (Some(w_exp), Some(new_exp)) => new_exp < w_exp,
303
        };
304
        self.next_expiration = new_expiration;
305
        if wake {
306
            self.waker.wake_by_ref();
307
        }
308
    }
309
}
310

            
311
/// State of a MaybenotPadder that is blocking.
312
///
313
/// Here we only need to remember when the blocking expires;
314
/// we record the bypassable status of the padding in [`super::PaddingShared`].
315
#[derive(Debug)]
316
struct BlockingState {
317
    /// The time at which this blocking expires.
318
    expiration: Instant,
319
}
320

            
321
/// An implementation of circuit padding using [`maybenot`].
322
///
323
/// Supports up to `N` padding machines without spilling over onto the heap.
324
pub(super) struct MaybenotPadder<const N: usize> {
325
    /// Our underlying [`maybenot::Framework`].
326
    framework: Framework,
327
    /// The state of our padding machines.
328
    state: PadderState<N>,
329
    /// Our current timer information.
330
    timer: Timer,
331
    /// If we are blocking, information about the blocking.
332
    blocking: Option<BlockingState>,
333
}
334

            
335
impl<const N: usize> MaybenotPadder<N> {
336
    /// Construct a new MaybyenotPadder from a provided `FrameworkRules`.
337
    pub(super) fn from_framework_rules(
338
        rules: &super::PaddingRules,
339
        now: Instant,
340
    ) -> Result<Self, maybenot::Error> {
341
        let framework = maybenot::Framework::new(
342
            rules.machines.clone(),
343
            rules.max_outbound_padding_frac,
344
            rules.max_outbound_blocking_frac,
345
            now,
346
            ThisThreadRng,
347
        )?;
348
        Ok(Self::from_framework(framework))
349
    }
350

            
351
    /// Construct a new MaybenotPadder from a given Framework.
352
    pub(super) fn from_framework(framework: Framework) -> Self {
353
        let n = framework.num_machines();
354
        let state = PadderState {
355
            state: smallvec::smallvec![MachineState::default(); n],
356
        };
357
        Self {
358
            framework,
359
            state,
360
            timer: Timer::new(),
361
            blocking: None,
362
        }
363
    }
364

            
365
    /// Return the next expiration time, and schedule `waker` to be alerted whenever
366
    /// the expiration time becomes earlier than that.
367
    pub(super) fn get_expiration(&mut self, waker: &Waker) -> Option<Instant> {
368
        self.timer.get_expiration(waker)
369
    }
370

            
371
    /// Tell the padding machines about all of the given `events`,
372
    /// report them happening at `now`, and adjust internal state.
373
    ///
374
    /// If doing this would cause any timer to become earlier than `next_scheduled_wakeup`,
375
    /// wake up the registered [`Waker`].
376
    pub(super) fn trigger_events_at(
377
        &mut self,
378
        events: &[TriggerEvent],
379
        now: Instant,
380
        next_scheduled_wakeup: Option<Instant>,
381
    ) {
382
        let mut timer_changed = false;
383

            
384
        // A pair of buffers that we'll use to handle events that arise while triggering other
385
        // events.  (The BeginTimer event can be triggered by the UpdateTimer action.)
386
        let (mut e1, mut e2) = (TriggerEventsOutVec::new(), TriggerEventsOutVec::new());
387
        let (mut processing, mut pending) = (&mut e1, &mut e2);
388

            
389
        let mut events = events;
390

            
391
        /// If we go through our loop more than this many times, we stop:
392
        /// An infinite loop is in theory possible, but we don't want to allow one.
393
        const MAX_LOOPS: usize = 4;
394

            
395
        let finished_normally = 'finished: {
396
            for _ in 0..MAX_LOOPS {
397
                pending.clear();
398
                for action in self.framework.trigger_events(events, now) {
399
                    timer_changed |= self.state.trigger_action(action, now, pending);
400
                }
401

            
402
                if pending.is_empty() {
403
                    // We don't have any additional events to trigger.
404
                    break 'finished true;
405
                } else {
406
                    std::mem::swap(&mut processing, &mut pending);
407
                    events = &processing[..];
408
                }
409
            }
410
            // We got to the last iteration of the loop and still had events to trigger.
411
            break 'finished false;
412
        };
413

            
414
        if !finished_normally {
415
            // TODO: Log in this case, but not too many times.
416
        }
417

            
418
        if timer_changed {
419
            self.timer
420
                .set_expiration(self.state.next_expiration(), next_scheduled_wakeup);
421
        }
422
    }
423

            
424
    /// Take any actions that need to occur at time `now`.
425
    ///
426
    /// We should call this function as soon as possible after our timer has expired.
427
    ///
428
    /// Returns zero or more [`PerHopPaddingEvent`]s reflecting the padding that we should send,
429
    /// and what we should do with blocking.
430
    fn take_actions_at(
431
        &mut self,
432
        now: Instant,
433
        next_scheduled_wakeup: Option<Instant>,
434
    ) -> PerHopPaddingEventVec {
435
        // Events that we need to trigger based on expired timers.
436
        // TODO: We might want a smaller N here.
437
        let mut e: SmallVec<[TriggerEvent; N]> = SmallVec::default();
438

            
439
        // A list of events that we can't handle internally, and which we need to report
440
        // to a circuit/tunnel reactor.
441
        let mut return_events = PerHopPaddingEventVec::default();
442

            
443
        let mut timer_changed = false;
444

            
445
        if let Some(blocking) = &self.blocking {
446
            if blocking.expiration <= now {
447
                timer_changed = true;
448
                self.blocking = None;
449
                e.push(TriggerEvent::BlockingEnd);
450
                return_events.push(PerHopPaddingEvent::StopBlocking);
451
            }
452
        }
453

            
454
        for (idx, st) in self.state.state.iter_mut().enumerate() {
455
            match st.internal_timer_expires {
456
                Some(t) if t <= now => {
457
                    // This machine's internal timer has expired; we tell it so.
458
                    st.internal_timer_expires = None;
459
                    timer_changed = true;
460
                    e.push(TriggerEvent::TimerEnd {
461
                        machine: MachineId::from_raw(idx),
462
                    });
463
                }
464
                None | Some(_) => {}
465
            }
466
            match &st.action_timer_expires {
467
                Some((t, _)) if *t <= now => {
468
                    // This machine's action timer has expired; we now take that action.
469
                    use ScheduledAction as SA;
470
                    let action = st
471
                        .action_timer_expires
472
                        .take()
473
                        .expect("It was Some a minute ago!")
474
                        .1;
475
                    timer_changed = true;
476
                    match action {
477
                        SA::SendPadding { bypass, replace } => {
478
                            return_events.push(PerHopPaddingEvent::SendPadding {
479
                                machine: MachineId::from_raw(idx),
480
                                replace: Replace::from_bool(replace),
481
                                bypass: Bypass::from_bool(bypass),
482
                            });
483
                        }
484
                        SA::Block {
485
                            bypass,
486
                            replace,
487
                            duration,
488
                        } => {
489
                            let new_expiry = now + duration;
490
                            if self.blocking.is_none() {
491
                                return_events.push(PerHopPaddingEvent::StartBlocking {
492
                                    is_bypassable: bypass,
493
                                });
494
                            }
495
                            let replace = match &self.blocking {
496
                                None => true,
497
                                Some(b) if replace || b.expiration < new_expiry => true,
498
                                Some(_) => false,
499
                            };
500
                            if replace {
501
                                self.blocking = Some(BlockingState {
502
                                    expiration: new_expiry,
503
                                });
504
                            }
505

            
506
                            // We trigger this event unconditionally, even if we were already
507
                            // blocking.
508
                            e.push(TriggerEvent::BlockingBegin {
509
                                machine: MachineId::from_raw(idx),
510
                            });
511
                        }
512
                    }
513
                }
514
                None | Some(_) => {}
515
            }
516
        }
517

            
518
        if timer_changed {
519
            self.timer
520
                .set_expiration(self.state.next_expiration(), next_scheduled_wakeup);
521
        }
522

            
523
        // Inform the framework of any expired timeouts.
524
        self.trigger_events_at(&e[..], now, next_scheduled_wakeup);
525

            
526
        return_events
527
    }
528
}
529

            
530
/// Helper: An Rng object that calls `rand::rng()` to get the thread Rng.
531
///
532
/// (We use this since we want our maybenot Framework to use the thread Rng,
533
/// but we can't have it _own_ the thread Rng. )
534
#[derive(Clone, Debug)]
535
pub(super) struct ThisThreadRng;
536
impl rand::RngCore for ThisThreadRng {
537
    fn next_u32(&mut self) -> u32 {
538
        rand::rng().next_u32()
539
    }
540

            
541
    fn next_u64(&mut self) -> u64 {
542
        rand::rng().next_u64()
543
    }
544

            
545
    fn fill_bytes(&mut self, dst: &mut [u8]) {
546
        rand::rng().fill_bytes(dst);
547
    }
548
}
549

            
550
/// Helper trait: Used to wrap a single [`MaybenotPadder`].
551
///
552
/// (We don't use `MaybenotPadder` directly because we want to keep the freedom
553
/// to parameterize it differently, or maybe even to replace it with something else.)
554
//
555
// TODO circpad: Decide whether this optimization/flexibility makes any sense.
556
pub(super) trait PaddingBackend: Send + Sync {
557
    /// Report one or more TriggerEvents to the padder.
558
    ///
559
    /// Alert any registered `Waker` if these events cause us to need to take action
560
    /// earlier than `next_scheduled_wakeup`.
561
    fn report_events_at(
562
        &mut self,
563
        events: &[maybenot::TriggerEvent],
564
        now: Instant,
565
        next_scheduled_wakeup: Option<Instant>,
566
    );
567

            
568
    /// Trigger any padding actions that should be taken `now`.
569
    ///
570
    /// If _we_ should perform any actions (blocking, unblocking, or sending padding),
571
    /// return them in a [`PerHopPaddingEventVec`].
572
    ///
573
    /// Alert any registered `Waker` if these events cause us to need to take action
574
    /// earlier than `next_scheduled_wakeup`.
575
    fn take_padding_events_at(
576
        &mut self,
577
        now: Instant,
578
        next_scheduled_wakeup: Option<Instant>,
579
    ) -> PerHopPaddingEventVec;
580

            
581
    /// This method should be called when we have no actions to perform,
582
    /// with a [`Waker`] that will activate the corresponding [`PaddingEventStream`](super::PaddingEventStream).
583
    ///
584
    /// It will return a time at which pending_events_at() should next be called,
585
    /// and will wake up the Waker if it turns out that we need to call `pending_events_at()`
586
    /// any earlier than that.
587
    fn next_wakeup(&mut self, waker: &Waker) -> Option<Instant>;
588
}
589

            
590
impl<const N: usize> PaddingBackend for MaybenotPadder<N> {
591
    fn report_events_at(
592
        &mut self,
593
        events: &[maybenot::TriggerEvent],
594
        now: Instant,
595
        next_scheduled_wakeup: Option<Instant>,
596
    ) {
597
        self.trigger_events_at(events, now, next_scheduled_wakeup);
598
    }
599

            
600
    fn take_padding_events_at(
601
        &mut self,
602
        now: Instant,
603
        next_scheduled_wakeup: Option<Instant>,
604
    ) -> PerHopPaddingEventVec {
605
        self.take_actions_at(now, next_scheduled_wakeup)
606
    }
607

            
608
    fn next_wakeup(&mut self, waker: &Waker) -> Option<Instant> {
609
        self.get_expiration(waker)
610
    }
611
}