1
//! Padding backend based on [`maybenot`].
2
//!
3
//! # Operation
4
//!
5
//! For each circuit hop, we have an optional [`maybenot::Framework`].
6
//! This framework wraps multiple "padding machines",
7
//! each of which is a randomized state machine.
8
//! (Arti is built with a list of pre-configured padding machines.
9
//! The set of padding machines to use with any given circuit hop
10
//! is negotiated via `PADDING_NEGOTIATE` messages.)
11
//! We interact with the framework via
12
//! [`Framework::trigger_events`](maybenot::Framework::trigger_events),
13
//! which consumes [`TriggerEvent`]s and gives us [`TriggerAction`]s.
14
//! Those `TriggerAction`s tell us to schedule or reschedule different timers,
15
//! to block traffic, or to send padding.
16
//!
17
//! We wrap the `Framework` in [`MaybenotPadder`],
18
//! which keeps track of the expiration time for each timer.
19
//! From `MaybenotPadder`, we expose a single timer
20
//! describing when the next action from the padding machine may be necessary.
21
//! This timer is likely to update frequently.
22

            
23
use std::{sync::Arc, task::Waker};
24

            
25
use maybenot::{MachineId, TriggerEvent};
26
use smallvec::SmallVec;
27

            
28
use super::{Bypass, Duration, Instant, PerHopPaddingEvent, PerHopPaddingEventVec, Replace};
29

            
30
/// The Rng that we construct for our padding machines.
///
/// We use a separate type alias here in case we want to move to a per-Framework
/// ChaCha8Rng or something.
///
/// For now this is [`ThisThreadRng`], which forwards every request to the
/// generator returned by `rand::rng()`.
type Rng = ThisThreadRng;
35

            
36
/// The particular instantiated padding framework type that we use.
///
/// The type parameters select, in order: a shared `Arc<[maybenot::Machine]>`
/// holding the machines, our [`Rng`] as the source of randomness,
/// and `Instant` as the time type.
type Framework = maybenot::Framework<Arc<[maybenot::Machine]>, Rng, Instant>;
38

            
39
/// A [`maybenot::TriggerAction`] as we construct it for use with our [`Framework`]s.
///
/// (It is parameterized over the same `Instant` type that [`Framework`] uses.)
type TriggerAction = maybenot::TriggerAction<Instant>;
41

            
42
/// A type we use to report events that we must trigger on the basis of triggering other events.
///
/// We've optimized here for the assumption that we _usually_ won't need to trigger more than one
/// event: the vector has an inline capacity of one.
type TriggerEventsOutVec = SmallVec<[TriggerEvent; 1]>;
47

            
48
/// An action that we should take on a machine's behalf,
/// after a certain interval has elapsed.
///
/// A `ScheduledAction` is stored alongside the expiration time of a machine's
/// "action timer"; it is only carried out once that timer expires.
#[derive(Clone, Debug)]
enum ScheduledAction {
    /// We should send padding if and when the machine's action timer expires.
    SendPadding {
        /// Send padding even if bypassable blocking is in place.
        /// (Blocking can be bypassable or non-bypassable.)
        bypass: bool,
        /// If an existing non-padding cell is queued,
        /// it can replace this padding.
        ///
        /// If `bypass` is true, such a cell can also bypass bypassable blocking.
        replace: bool,
    },
    /// We should block outbound traffic if and when the machine's action timer expires.
    Block {
        /// If true, then the blocking is bypassable.
        bypass: bool,
        /// If true, then we should change the duration of the current blocking unconditionally.
        /// If false, we should use whichever duration is longer.
        replace: bool,
        /// The interval of the blocking that we should apply.
        ///
        /// This interval is measured from the moment the blocking begins,
        /// not from the moment it was scheduled.
        duration: Duration,
    },
}
74

            
75
/// The state for a _single_ padding Machine within a Framework.
///
/// The default state has neither timer set.
#[derive(Default, Clone, Debug)]
struct MachineState {
    /// The current state for the machine's "internal timer".
    ///
    /// Each machine has a single internal timer,
    /// and manages the timer itself via the `UpdateTimer` and `Cancel`
    /// [`TriggerAction`] variants.
    ///
    /// (`None` means that the internal timer is not set.)
    internal_timer_expires: Option<Instant>,

    /// The current state for the machine's "action timer".
    ///
    /// Each machine has a single action timer, after which some [`ScheduledAction`]
    /// should be taken.
    ///
    /// (Note that only one action can be scheduled per machine,
    /// so if we're told to schedule blocking, we should cancel padding;
    /// and if we're told to schedule padding, we should cancel blocking.)
    ///
    /// (`None` means that no action is scheduled.)
    action_timer_expires: Option<(Instant, ScheduledAction)>,
}
95

            
96
impl MachineState {
97
    /// Return the earliest time that either of this machine's timers will expire.
98
    fn next_expiration(&self) -> Option<Instant> {
99
        match (&self.internal_timer_expires, &self.action_timer_expires) {
100
            (None, None) => None,
101
            (None, Some((t, _))) => Some(*t),
102
            (Some(t), None) => Some(*t),
103
            (Some(t1), Some((t2, _))) => Some(*t1.min(t2)),
104
        }
105
    }
106
}
107

            
108
/// Represents the state for all padding machines within a framework.
///
/// N should be around the number of padding machines that the framework should support.
/// (It is the inline capacity of the underlying `SmallVec`.)
struct PadderState<const N: usize> {
    /// A list of all the padding machine states for a single framework.
    ///
    /// This list is indexed by `MachineId`:
    /// the entry at position `i` belongs to the machine whose raw id is `i`.
    //
    // TODO: Optimize this size even more if appropriate
    state: SmallVec<[MachineState; N]>,
}
119

            
120
impl<const N: usize> PadderState<N> {
121
    /// Return a mutable reference to the state corresponding to a given [`MachineId`]
122
    ///
123
    /// # Panics
124
    ///
125
    /// Panics if `id` is out of range, which can only happen if a MachineId from
126
    /// one Framework is given to another Framework.
127
    fn state_mut(&mut self, id: MachineId) -> &mut MachineState {
128
        &mut self.state[id.into_raw()]
129
    }
130

            
131
    /// Execute a single [`TriggerAction`] on this state.
132
    ///
133
    /// `TriggerActions` are created by `maybenot::Framework` instances
134
    /// in response to [`TriggerEvent`]s.
135
    ///
136
    /// Executing a `TriggerAction` can adjust timers,
137
    /// and can schedule a new [`ScheduledAction`] to be taken in the future;
138
    /// it does not, however, send any padding or adjust any blocking on its own.
139
    ///
140
    /// The current time should be provided in `now`.
141
    ///
142
    /// Executing a `TriggerAction` can cause more events to occur.
143
    /// If this happens, they are added to `events_out`.
144
    ///
145
    /// If this method returns false, no timer has changed.
146
    /// If this method returns true, then a timer may have changed.
147
    /// (False positives are possible, but not false negatives.)
148
    fn trigger_action(
149
        &mut self,
150
        action: &TriggerAction,
151
        now: Instant,
152
        events_out: &mut TriggerEventsOutVec,
153
    ) -> bool {
154
        use maybenot::Timer as T;
155
        use maybenot::TriggerAction as A;
156

            
157
        let mut timer_changed = false;
158

            
159
        match action {
160
            A::Cancel { machine, timer } => {
161
                // "Cancel" means to stop one or both of the timers from this machine.
162
                let st = self.state_mut(*machine);
163
                match timer {
164
                    T::Action => st.action_timer_expires = None,
165
                    T::Internal => st.internal_timer_expires = None,
166
                    T::All => {
167
                        st.action_timer_expires = None;
168
                        st.internal_timer_expires = None;
169
                    }
170
                };
171
                timer_changed = true;
172
            }
173
            A::SendPadding {
174
                timeout,
175
                bypass,
176
                replace,
177
                machine,
178
            } => {
179
                // "SendPadding" means to schedule padding to be sent after a given timeout,
180
                // and to replace any previous timed action.
181
                let st = self.state_mut(*machine);
182
                st.action_timer_expires = Some((
183
                    now + *timeout,
184
                    ScheduledAction::SendPadding {
185
                        bypass: *bypass,
186
                        replace: *replace,
187
                    },
188
                ));
189
                timer_changed = true;
190
            }
191
            A::BlockOutgoing {
192
                timeout,
193
                duration,
194
                bypass,
195
                replace,
196
                machine,
197
            } => {
198
                // "BlockOutgoing" means to begin blocking traffic for a given duration,
199
                // after a given timeout,
200
                // and to replace any previous timed action.
201
                let st = self.state_mut(*machine);
202
                st.action_timer_expires = Some((
203
                    now + *timeout,
204
                    ScheduledAction::Block {
205
                        bypass: *bypass,
206
                        replace: *replace,
207
                        duration: *duration,
208
                    },
209
                ));
210
                timer_changed = true;
211
            }
212
            A::UpdateTimer {
213
                duration,
214
                replace,
215
                machine,
216
            } => {
217
                // "UpdateTimer" means to set or re-set the internal timer for this machine.
218
                let st = self.state_mut(*machine);
219

            
220
                let new_expiry = now + *duration;
221
                // The "replace" flag means "update the internal timer unconditionally".
222
                // If it is false, and the timer is already set, then we should only update
223
                // the internal timer to be _longer_.
224
                let update_timer = match (replace, st.internal_timer_expires) {
225
                    (_, None) => true,
226
                    (true, Some(_)) => true,
227
                    (false, Some(cur)) if new_expiry > cur => true,
228
                    (false, Some(_)) => false,
229
                };
230
                if update_timer {
231
                    st.internal_timer_expires = Some(new_expiry);
232
                    timer_changed = true;
233
                }
234
                // Note: We are supposed to trigger TimerBegin unconditionally
235
                // if the timer changes at all.
236
                events_out.push(TriggerEvent::TimerBegin { machine: *machine });
237
            }
238
        }
239

            
240
        timer_changed
241
    }
242

            
243
    /// Return the next instant (if any) at which any of the padding machines' timers will expire.
244
    fn next_expiration(&self) -> Option<Instant> {
245
        self.state
246
            .iter()
247
            .filter_map(MachineState::next_expiration)
248
            .min()
249
    }
250
}
251

            
252
/// Possible state of a Framework's aggregate timer.
///
/// (Since there are two possible timers for each Machine,
/// we just keep track of the one that will expire next.)
#[derive(Clone, Debug)]
struct Timer {
    /// The next time at which any of this framework's padding machine timers will expire.
    ///
    /// (None means "no timers are set.")
    next_expiration: Option<Instant>,

    /// A [`Waker`] that we must wake whenever `self.next_expiration` becomes sooner than
    /// our next scheduled wakeup (as passed as an argument to `set_expiration`).
    waker: Waker,
}

impl Timer {
    /// Construct a new Timer.
    ///
    /// The new timer has no expiration, and holds a no-op waker until
    /// [`get_expiration`](Self::get_expiration) registers a real one.
    fn new() -> Self {
        Self {
            next_expiration: None,
            waker: Waker::noop().clone(),
        }
    }

    /// Return the next expiration time, and schedule `waker` to be alerted whenever
    /// the expiration time becomes earlier than the time at which we've actually decided to sleep
    /// (passed as an argument to `set_expiration()`).
    ///
    /// (There are two separate expiration times at work here because, in higher-level code,
    /// we combine _all_ the timer expirations for all padding machines on a circuit
    /// into a single expiration, and track only that expiration.)
    fn get_expiration(&mut self, waker: &Waker) -> Option<Instant> {
        // TODO: Perhaps this should instead return and/or manipulate a sleep future.
        // TODO: Perhaps there should be a shared AtomicWaker?

        // This is called often, usually with the same waker each time.
        // `clone_from` skips the clone when the waker we already hold would
        // wake the same task anyway.
        self.waker.clone_from(waker);
        self.next_expiration
    }

    /// Change the expiration time to `new_expiration`, alerting the [`Waker`] if that time
    /// is earlier than `next_scheduled_wakeup`.
    ///
    /// A `next_scheduled_wakeup` of `None` means that no wakeup is scheduled at all,
    /// so any `Some` expiration counts as "earlier".
    /// Clearing the expiration (`new_expiration == None`) never wakes the waker.
    fn set_expiration(
        &mut self,
        new_expiration: Option<Instant>,
        next_scheduled_wakeup: Option<Instant>,
    ) {
        // We need to invoke the waker if the new expiration is earlier than
        // the one that the waker's owner is currently sleeping for.
        let must_wake = match (new_expiration, next_scheduled_wakeup) {
            (None, _) => false,
            (Some(_), None) => true,
            (Some(new_exp), Some(scheduled)) => new_exp < scheduled,
        };

        // Store the new value first, so that it is in place before anyone is woken.
        self.next_expiration = new_expiration;
        if must_wake {
            self.waker.wake_by_ref();
        }
    }
}
310

            
311
/// State of a MaybenotPadder that is blocking.
///
/// Here we only need to remember when the blocking expires;
/// we record the bypassable status of the blocking in [`super::PaddingShared`].
#[derive(Debug)]
struct BlockingState {
    /// The time at which this blocking expires.
    ///
    /// Once this time has been reached, the blocking should be lifted.
    expiration: Instant,
}
320

            
321
/// An implementation of circuit padding using [`maybenot`].
///
/// Supports up to `N` padding machines without spilling over onto the heap.
pub(super) struct MaybenotPadder<const N: usize> {
    /// Our underlying [`maybenot::Framework`].
    ///
    /// We feed it `TriggerEvent`s, and it answers with `TriggerAction`s.
    framework: Framework,
    /// The state of our padding machines.
    ///
    /// (This holds the timers for each machine in `framework`.)
    state: PadderState<N>,
    /// Our current timer information.
    timer: Timer,
    /// If we are blocking, information about the blocking.
    ///
    /// (None means "we are not currently blocking".)
    blocking: Option<BlockingState>,
}
334

            
335
impl<const N: usize> MaybenotPadder<N> {
336
    /// Construct a new MaybyenotPadder from a provided `FrameworkRules`.
337
    pub(super) fn from_framework_rules(
338
        rules: &super::PaddingRules,
339
    ) -> Result<Self, maybenot::Error> {
340
        let framework = maybenot::Framework::new(
341
            rules.machines.clone(),
342
            rules.max_outbound_padding_frac,
343
            rules.max_outbound_blocking_frac,
344
            Instant::now(),
345
            ThisThreadRng,
346
        )?;
347
        Ok(Self::from_framework(framework))
348
    }
349

            
350
    /// Construct a new MaybenotPadder from a given Framework.
351
    pub(super) fn from_framework(framework: Framework) -> Self {
352
        let n = framework.num_machines();
353
        let state = PadderState {
354
            state: smallvec::smallvec![MachineState::default(); n],
355
        };
356
        Self {
357
            framework,
358
            state,
359
            timer: Timer::new(),
360
            blocking: None,
361
        }
362
    }
363

            
364
    /// Return the next expiration time, and schedule `waker` to be alerted whenever
365
    /// the expiration time becomes earlier than that.
366
    pub(super) fn get_expiration(&mut self, waker: &Waker) -> Option<Instant> {
367
        self.timer.get_expiration(waker)
368
    }
369

            
370
    /// Tell the padding machines about all of the given `events`,
371
    /// report them happening at `now`, and adjust internal state.
372
    ///
373
    /// If doing this would cause any timer to become earlier than `next_scheduled_wakeup`,
374
    /// wake up the registered [`Waker`].
375
    pub(super) fn trigger_events_at(
376
        &mut self,
377
        events: &[TriggerEvent],
378
        now: Instant,
379
        next_scheduled_wakeup: Option<Instant>,
380
    ) {
381
        let mut timer_changed = false;
382

            
383
        // A pair of buffers that we'll use to handle events that arise while triggering other
384
        // events.  (The BeginTimer event can be triggered by the UpdateTimer action.)
385
        let (mut e1, mut e2) = (TriggerEventsOutVec::new(), TriggerEventsOutVec::new());
386
        let (mut processing, mut pending) = (&mut e1, &mut e2);
387

            
388
        let mut events = events;
389

            
390
        /// If we go through our loop more than this many times, we stop:
391
        /// An infinite loop is in theory possible, but we don't want to allow one.
392
        const MAX_LOOPS: usize = 4;
393

            
394
        let finished_normally = 'finished: {
395
            for _ in 0..MAX_LOOPS {
396
                pending.clear();
397
                for action in self.framework.trigger_events(events, now) {
398
                    timer_changed |= self.state.trigger_action(action, now, pending);
399
                }
400

            
401
                if pending.is_empty() {
402
                    // We don't have any additional events to trigger.
403
                    break 'finished true;
404
                } else {
405
                    std::mem::swap(&mut processing, &mut pending);
406
                    events = &processing[..];
407
                }
408
            }
409
            // We got to the last iteration of the loop and still had events to trigger.
410
            break 'finished false;
411
        };
412

            
413
        if !finished_normally {
414
            // TODO: Log in this case, but not too many times.
415
        }
416

            
417
        if timer_changed {
418
            self.timer
419
                .set_expiration(self.state.next_expiration(), next_scheduled_wakeup);
420
        }
421
    }
422

            
423
    /// Take any actions that need to occur at time `now`.
424
    ///
425
    /// We should call this function as soon as possible after our timer has expired.
426
    ///
427
    /// Returns zero or more [`PerHopPaddingEvent`]s reflecting the padding that we should send,
428
    /// and what we should do with blocking.
429
    fn take_actions_at(
430
        &mut self,
431
        now: Instant,
432
        next_scheduled_wakeup: Option<Instant>,
433
    ) -> PerHopPaddingEventVec {
434
        // Events that we need to trigger based on expired timers.
435
        // TODO: We might want a smaller N here.
436
        let mut e: SmallVec<[TriggerEvent; N]> = SmallVec::default();
437

            
438
        // A list of events that we can't handle internally, and which we need to report
439
        // to a circuit/tunnel reactor.
440
        let mut return_events = PerHopPaddingEventVec::default();
441

            
442
        let mut timer_changed = false;
443

            
444
        if let Some(blocking) = &self.blocking {
445
            if blocking.expiration <= now {
446
                timer_changed = true;
447
                self.blocking = None;
448
                e.push(TriggerEvent::BlockingEnd);
449
                return_events.push(PerHopPaddingEvent::StopBlocking);
450
            }
451
        }
452

            
453
        for (idx, st) in self.state.state.iter_mut().enumerate() {
454
            match st.internal_timer_expires {
455
                Some(t) if t <= now => {
456
                    // This machine's internal timer has expired; we tell it so.
457
                    st.internal_timer_expires = None;
458
                    timer_changed = true;
459
                    e.push(TriggerEvent::TimerEnd {
460
                        machine: MachineId::from_raw(idx),
461
                    });
462
                }
463
                None | Some(_) => {}
464
            }
465
            match &st.action_timer_expires {
466
                Some((t, _)) if *t <= now => {
467
                    // This machine's action timer has expired; we now take that action.
468
                    use ScheduledAction as SA;
469
                    let action = st
470
                        .action_timer_expires
471
                        .take()
472
                        .expect("It was Some a minute ago!")
473
                        .1;
474
                    timer_changed = true;
475
                    match action {
476
                        SA::SendPadding { bypass, replace } => {
477
                            return_events.push(PerHopPaddingEvent::SendPadding {
478
                                machine: MachineId::from_raw(idx),
479
                                replace: Replace::from_bool(replace),
480
                                bypass: Bypass::from_bool(bypass),
481
                            });
482
                        }
483
                        SA::Block {
484
                            bypass,
485
                            replace,
486
                            duration,
487
                        } => {
488
                            let new_expiry = now + duration;
489
                            if self.blocking.is_none() {
490
                                return_events.push(PerHopPaddingEvent::StartBlocking {
491
                                    is_bypassable: bypass,
492
                                });
493
                            }
494
                            let replace = match &self.blocking {
495
                                None => true,
496
                                Some(b) if replace || b.expiration < new_expiry => true,
497
                                Some(_) => false,
498
                            };
499
                            if replace {
500
                                self.blocking = Some(BlockingState {
501
                                    expiration: new_expiry,
502
                                });
503
                            }
504

            
505
                            // We trigger this event unconditionally, even if we were already
506
                            // blocking.
507
                            e.push(TriggerEvent::BlockingBegin {
508
                                machine: MachineId::from_raw(idx),
509
                            });
510
                        }
511
                    }
512
                }
513
                None | Some(_) => {}
514
            }
515
        }
516

            
517
        if timer_changed {
518
            self.timer
519
                .set_expiration(self.state.next_expiration(), next_scheduled_wakeup);
520
        }
521

            
522
        // Inform the framework of any expired timeouts.
523
        self.trigger_events_at(&e[..], now, next_scheduled_wakeup);
524

            
525
        return_events
526
    }
527
}
528

            
529
/// Helper: An Rng object that calls `rand::rng()` to get the thread Rng.
///
/// (We use this since we want our maybenot Framework to use the thread Rng,
/// but we can't have it _own_ the thread Rng.)
///
/// This type has no fields: every call looks up the thread Rng afresh.
#[derive(Clone, Debug)]
pub(super) struct ThisThreadRng;
impl rand::RngCore for ThisThreadRng {
    // Each method below forwards to the same-named method
    // on the generator returned by `rand::rng()`.

    fn next_u32(&mut self) -> u32 {
        rand::rng().next_u32()
    }

    fn next_u64(&mut self) -> u64 {
        rand::rng().next_u64()
    }

    fn fill_bytes(&mut self, dst: &mut [u8]) {
        rand::rng().fill_bytes(dst);
    }
}
548

            
549
/// Helper trait: Used to wrap a single [`MaybenotPadder`].
///
/// (We don't use `MaybenotPadder` directly because we want to keep the freedom
/// to parameterize it differently, or maybe even to replace it with something else.)
//
// TODO circpad: Decide whether this optimization/flexibility makes any sense.
pub(super) trait PaddingBackend: Send + Sync {
    /// Report one or more TriggerEvents to the padder.
    ///
    /// The events are reported as having happened at `now`.
    ///
    /// Alert any registered `Waker` if these events cause us to need to take action
    /// earlier than `next_scheduled_wakeup`.
    fn report_events_at(
        &mut self,
        events: &[maybenot::TriggerEvent],
        now: Instant,
        next_scheduled_wakeup: Option<Instant>,
    );

    /// Trigger any padding actions that should be taken `now`.
    ///
    /// If _we_ should perform any actions (blocking, unblocking, or sending padding),
    /// return them in a [`PerHopPaddingEventVec`].
    ///
    /// Alert any registered `Waker` if these events cause us to need to take action
    /// earlier than `next_scheduled_wakeup`.
    fn take_padding_events_at(
        &mut self,
        now: Instant,
        next_scheduled_wakeup: Option<Instant>,
    ) -> PerHopPaddingEventVec;

    /// This method should be called when we have no actions to perform,
    /// with a [`Waker`] that will activate the corresponding [`PaddingEventStream`](super::PaddingEventStream).
    ///
    /// It will return a time at which `take_padding_events_at()` should next be called,
    /// and will wake up the Waker if it turns out that we need to call `take_padding_events_at()`
    /// any earlier than that.
    ///
    /// (A return value of `None` means that no call is currently scheduled.)
    fn next_wakeup(&mut self, waker: &Waker) -> Option<Instant>;
}
588

            
589
// Each trait method delegates directly to the inherent `MaybenotPadder` method
// that implements it.
impl<const N: usize> PaddingBackend for MaybenotPadder<N> {
    // Delegates to `MaybenotPadder::trigger_events_at`.
    fn report_events_at(
        &mut self,
        events: &[maybenot::TriggerEvent],
        now: Instant,
        next_scheduled_wakeup: Option<Instant>,
    ) {
        self.trigger_events_at(events, now, next_scheduled_wakeup);
    }

    // Delegates to `MaybenotPadder::take_actions_at`.
    fn take_padding_events_at(
        &mut self,
        now: Instant,
        next_scheduled_wakeup: Option<Instant>,
    ) -> PerHopPaddingEventVec {
        self.take_actions_at(now, next_scheduled_wakeup)
    }

    // Delegates to `MaybenotPadder::get_expiration`.
    fn next_wakeup(&mut self, waker: &Waker) -> Option<Instant> {
        self.get_expiration(waker)
    }
}