//! Simple provider of simulated time
//!
//! See [`SimpleMockTimeProvider`]
4

            
5
use std::cmp::Reverse;
6
use std::future::Future;
7
use std::pin::Pin;
8
use std::sync::{Arc, Mutex, MutexGuard};
9
use std::task::{Context, Poll, Waker};
10
use web_time_compat::{Duration, Instant, InstantExt, SystemTime, SystemTimeExt};
11

            
12
use derive_more::AsMut;
13
use priority_queue::priority_queue::PriorityQueue;
14
use slotmap_careful::DenseSlotMap;
15

            
16
use tor_rtcompat::CoarseInstant;
17
use tor_rtcompat::CoarseTimeProvider;
18
use tor_rtcompat::SleepProvider;
19

            
20
use crate::time_core::MockTimeCore;
21

            
22
/// Simple provider of simulated time
23
///
24
/// Maintains a mocked view of the current [`Instant`] and [`SystemTime`].
25
///
26
/// The simulated time advances only when explicitly instructed,
27
/// by calling [`.advance()`](Provider::advance).
28
///
29
/// The wallclock time can be warped with
30
/// [`.jump_wallclock()`](Provider::jump_wallclock),
31
/// allowing simulation of wall clock non-monotonicity.
32
///
33
/// # Panics and aborts
34
///
35
/// Panics on time under/overflow.
36
///
37
/// May cause an abort if the [`SimpleMockTimeProvider`] implementation contains bugs.
38
#[derive(Clone, Debug)]
39
pub struct SimpleMockTimeProvider {
40
    /// The actual state
41
    state: Arc<Mutex<State>>,
42
}
43

            
44
/// Convenience abbreviation
45
pub(crate) use SimpleMockTimeProvider as Provider;
46

            
47
/// Identifier of a [`SleepFuture`]
48
type Id = slotmap_careful::DefaultKey;
49

            
50
/// Future for `sleep`
51
///
52
/// Iff this struct exists, there is an entry for `id` in `prov.futures`.
53
/// (It might contain `None`.)
54
pub struct SleepFuture {
55
    /// Reference to our state
56
    prov: Provider,
57

            
58
    /// Which `SleepFuture` are we
59
    id: Id,
60
}
61

            
62
/// Mutable state for a [`Provider`]
63
///
64
/// Each sleep ([`Id`], [`SleepFuture`]) is in one of the following states:
65
///
66
/// | state       | [`SleepFuture`]  | `futures`        | `unready`          |
67
/// |-------------|------------------|------------------|--------------------|
68
/// | UNPOLLED    | exists           | present, `None`  | present, `> now`   |
69
/// | WAITING     | exists           | present, `Some`  | present, `> now`   |
70
/// | READY       | exists           | present, `None`  | absent             |
71
/// | DROPPED     | dropped          | absent           | absent             |
72
#[derive(Debug, AsMut)]
73
struct State {
74
    /// Current time (coarse)
75
    core: MockTimeCore,
76

            
77
    /// Futures; record of every existing [`SleepFuture`], including any `Waker`
78
    ///
79
    /// Entry exists iff `SleepFuture` exists.
80
    ///
81
    /// Contains `None` if we haven't polled the future;
82
    /// `Some` if we have.
83
    ///
84
    /// We could use a `Vec` or `TiVec`
85
    /// but using a slotmap is more robust against bugs here.
86
    futures: DenseSlotMap<Id, Option<Waker>>,
87

            
88
    /// Priority queue
89
    ///
90
    /// Subset of `futures`.
91
    ///
92
    /// An entry is present iff the `Instant` is *strictly* after `State.now`,
93
    /// in which case that's when the future should be woken.
94
    ///
95
    /// `PriorityQueue` is a max-heap but we want earliest times, hence `Reverse`
96
    unready: PriorityQueue<Id, Reverse<Instant>>,
97
}
98

            
99
/// `Default` makes a `Provider` which starts at whatever the current real time is
100
impl Default for Provider {
101
85648
    fn default() -> Self {
102
85648
        Self::from_real()
103
85648
    }
104
}
105

            
106
impl Provider {
107
    /// Return a new mock time provider starting at a specified point in time
108
86604
    pub fn new(now: Instant, wallclock: SystemTime) -> Self {
109
86604
        let state = State {
110
86604
            core: MockTimeCore::new(now, wallclock),
111
86604
            futures: Default::default(),
112
86604
            unready: Default::default(),
113
86604
        };
114
86604
        Provider {
115
86604
            state: Arc::new(Mutex::new(state)),
116
86604
        }
117
86604
    }
118

            
119
    /// Return a new mock time provider starting at the current actual (non-mock) time
120
    ///
121
    /// Like any [`SimpleMockTimeProvider`], the time is frozen and only changes
122
    /// due to calls to `advance`.
123
85648
    pub fn from_real() -> Self {
124
85648
        Provider::from_wallclock(SystemTime::get())
125
85648
    }
126
    /// Return a new mock time provider starting at a specified wallclock time
127
    ///
128
    /// The monotonic time ([`Instant`]) starts at the current actual (non-mock) time.
129
    /// (Absolute values of the real monotonic time are not readily
130
    /// observable or distinguishable from Rust,
131
    /// nor can a fixed `Instant` be constructed,
132
    /// so this is usually sufficient for a reproducible test.)
133
86432
    pub fn from_wallclock(wallclock: SystemTime) -> Self {
134
86432
        Provider::new(Instant::get(), wallclock)
135
86432
    }
136

            
137
    /// Advance the simulated time by `d`
138
    ///
139
    /// This advances both the `Instant` (monotonic time)
140
    /// and `SystemTime` (wallclock time)
141
    /// by the same amount.
142
    ///
143
    /// Will wake sleeping [`SleepFuture`]s, as appropriate.
144
    ///
145
    /// Note that the tasks which were waiting on those now-expired `SleepFuture`s
146
    /// will only actually execute when they are next polled.
147
    /// `advance` does not yield to the executor or poll any futures.
148
    /// The executor will (presumably) poll those woken tasks, when it regains control.
149
    /// But the order in which the tasks run will depend on its scheduling policy,
150
    /// and might be different to the order implied by the futures' timeout values.
151
    ///
152
    /// To simulate normal time advancement, wakeups, and task activations,
153
    /// use [`MockExecutor::advance_*()`](crate::MockRuntime).
154
1001452
    pub fn advance(&self, d: Duration) {
155
1001452
        let mut state = self.lock();
156
1001452
        state.core.advance(d);
157
1001452
        state.wake_any();
158
1001452
    }
159

            
160
    /// Warp the wallclock time
161
    ///
162
    /// This has no effect on any sleeping futures.
163
    /// It only affects the return value from [`.wallclock()`](Provider::wallclock).
164
1402
    pub fn jump_wallclock(&self, new_wallclock: SystemTime) {
165
1402
        self.lock().core.jump_wallclock(new_wallclock);
166
        // Really we ought to wake people up, here.
167
        // But absolutely every Rust API is wrong: none offer a way to sleep until a SystemTime.
168
        // (There might be some less-portable non-Rust APIs for that.)
169
1402
    }
170

            
171
    /// When will the next timeout occur?
172
    ///
173
    /// Returns the duration until the next [`SleepFuture`] should wake up.
174
    ///
175
    /// Advancing time by at least this amount will wake up that future,
176
    /// and any others with the same wakeup time.
177
    ///
178
    /// Will never return `Some(ZERO)`:
179
    /// any future that is supposed to wake up now (or earlier) has indeed already been woken,
180
    /// so it is no longer sleeping and isn't included in the calculation.
181
3115266
    pub fn time_until_next_timeout(&self) -> Option<Duration> {
182
3115266
        let state = self.lock();
183
3115266
        let Reverse(until) = state.unready.peek()?.1;
184
        // The invariant (see `State`) guarantees that entries in `unready` are always `> now`,
185
        // so we don't whether duration_since would panic or saturate.
186
1393404
        let d = until.duration_since(state.core.instant());
187
1393404
        Some(d)
188
3115266
    }
189

            
190
    /// Convenience function to lock the state
191
8766946
    fn lock(&self) -> MutexGuard<'_, State> {
192
8766946
        self.state.lock().expect("simple time state poisoned")
193
8766946
    }
194
}
195

            
196
impl SleepProvider for Provider {
197
    type SleepFuture = SleepFuture;
198

            
199
79342
    fn sleep(&self, d: Duration) -> SleepFuture {
200
79342
        let mut state = self.lock();
201
79342
        let until = state.core.instant() + d;
202

            
203
79342
        let id = state.futures.insert(None);
204
79342
        state.unready.push(id, Reverse(until));
205

            
206
79342
        let fut = SleepFuture {
207
79342
            id,
208
79342
            prov: self.clone(),
209
79342
        };
210

            
211
        // This sleep is now UNPOLLED, except that its time might be `<= now`:
212

            
213
        // Possibly, `until` isn't *strictly* after than `state.now`, since d might be 0.
214
        // If so, .wake_any() will restore the invariant by immediately waking.
215
79342
        state.wake_any();
216

            
217
        // This sleep is now UNPOLLED or READY, according to whether duration was 0.
218

            
219
79342
        fut
220
79342
    }
221

            
222
3991086
    fn now(&self) -> Instant {
223
3991086
        self.lock().core.instant()
224
3991086
    }
225
131270
    fn wallclock(&self) -> SystemTime {
226
131270
        self.lock().core.wallclock()
227
131270
    }
228
}
229

            
230
impl CoarseTimeProvider for Provider {
231
289408
    fn now_coarse(&self) -> CoarseInstant {
232
289408
        self.lock().core.coarse().now_coarse()
233
289408
    }
234
}
235

            
236
impl Future for SleepFuture {
237
    type Output = ();
238

            
239
83642
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
240
83642
        let mut state = self.prov.lock();
241
83642
        if let Some((_, Reverse(scheduled))) = state.unready.get(&self.id) {
242
            // Presence of this entry implies scheduled > now: we are UNPOLLED or WAITING
243
63158
            assert!(*scheduled > state.core.instant());
244
63158
            let waker = Some(cx.waker().clone());
245
            // Make this be WAITING.  (If we're re-polled, we simply drop any previous waker.)
246
63158
            *state
247
63158
                .futures
248
63158
                .get_mut(self.id)
249
63158
                .expect("polling futures entry") = waker;
250
63158
            Poll::Pending
251
        } else {
252
            // Absence implies scheduled (no longer stored) <= now: we are READY
253
20484
            Poll::Ready(())
254
        }
255
83642
    }
256
}
257

            
258
impl State {
259
    /// Restore the invariant for `unready` after `now` has been increased
260
    ///
261
    /// Ie, ensures that any sleeps which are
262
    /// WAITING/UNPOLLED except that they are `<= now`,
263
    /// are moved to state READY.
264
1080794
    fn wake_any(&mut self) {
265
        loop {
266
1117182
            match self.unready.peek() {
267
                // Keep picking off entries with scheduled <= now
268
815650
                Some((_, Reverse(scheduled))) if *scheduled <= self.core.instant() => {
269
36388
                    let (id, _) = self.unready.pop().expect("vanished");
270
                    // We can .take() the waker since this can only ever run once
271
                    // per sleep future (since it happens when we pop it from unready).
272
36388
                    let futures_entry = self.futures.get_mut(id).expect("stale unready entry");
273
36388
                    if let Some(waker) = futures_entry.take() {
274
27202
                        waker.wake();
275
27202
                    }
276
                }
277
1080794
                _ => break,
278
            }
279
        }
280
1080794
    }
281
}
282

            
283
impl Drop for SleepFuture {
284
74078
    fn drop(&mut self) {
285
74078
        let mut state = self.prov.lock();
286
74078
        let _: Option<Waker> = state.futures.remove(self.id).expect("entry vanished");
287
74078
        let _: Option<(Id, Reverse<Instant>)> = state.unready.remove(&self.id);
288
        // Now it is DROPPED.
289
74078
    }
290
}
291

            
292
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::task::MockExecutor;
    use Poll::*;
    use futures::poll;
    use humantime::parse_rfc3339;
    use tor_rtcompat::ToplevelBlockOn as _;

    /// Shorthand for a `Duration` of `ms` milliseconds
    fn ms(ms: u64) -> Duration {
        Duration::from_millis(ms)
    }

    /// Run an async test body with a fresh `Provider` and `MockExecutor`
    ///
    /// The provider's wallclock starts at 2000-01-01T00:00:00Z;
    /// its monotonic clock starts at the real current `Instant`.
    fn run_test<FUT>(f: impl FnOnce(Provider, MockExecutor) -> FUT)
    where
        FUT: Future<Output = ()>,
    {
        // It would have been nice to make the starting Instant fixed for the test.
        let start_instant = Instant::get();
        let start_wallclock = parse_rfc3339("2000-01-01T00:00:00Z").unwrap();
        let sp = Provider::new(start_instant, start_wallclock);
        let exec = MockExecutor::new();
        exec.block_on(f(sp, exec.clone()));
    }

    /// Polling, advancing, wallclock jumps, dropping, and zero-length sleeps
    #[test]
    fn simple() {
        run_test(|sp, _exec| async move {
            let start_now = sp.now();
            let start_wall = sp.wallclock();
            let mut short_sleep = sp.sleep(ms(500));
            let mut long_sleep = sp.sleep(ms(1500));
            assert_eq!(poll!(&mut short_sleep), Pending);

            // 200ms in: both clocks moved, nothing has expired yet.
            sp.advance(ms(200));
            assert_eq!(start_now + ms(200), sp.now());
            assert_eq!(start_wall + ms(200), sp.wallclock());
            assert_eq!(poll!(&mut short_sleep), Pending);
            assert_eq!(poll!(&mut long_sleep), Pending);
            drop(long_sleep);

            // Jumping the wallclock must not disturb the monotonic clock or the sleeps.
            sp.jump_wallclock(start_wall + ms(10_000));
            sp.advance(ms(300));
            assert_eq!(start_now + ms(500), sp.now());
            assert_eq!(start_wall + ms(10_300), sp.wallclock());
            assert_eq!(poll!(&mut short_sleep), Ready(()));

            // A zero-length sleep is ready straight away.
            let mut zero_sleep = sp.sleep(ms(0));
            assert_eq!(poll!(&mut zero_sleep), Ready(()));
        });
    }

    /// A spawned task sleeping twice, observed via `time_until_next_timeout`
    #[test]
    fn task() {
        run_test(|sp, exec| async move {
            let stage = Arc::new(Mutex::new(0_i8));

            let task_stage = stage.clone();
            let task_sp = sp.clone();
            exec.spawn_identified("test task", async move {
                *task_stage.lock().unwrap() = 1;
                task_sp.sleep(ms(500)).await;
                *task_stage.lock().unwrap() = 2;
                task_sp.sleep(ms(300)).await;
                *task_stage.lock().unwrap() = 3;
            });

            let stage = move || *stage.lock().unwrap();

            // The task does not run until the executor gets control.
            assert_eq!(stage(), 0);
            exec.progress_until_stalled().await;
            assert_eq!(stage(), 1);
            assert_eq!(sp.time_until_next_timeout(), Some(ms(500)));

            sp.advance(ms(500));

            // The first sleep has expired, but the task has not been polled yet.
            assert_eq!(stage(), 1);
            assert_eq!(sp.time_until_next_timeout(), None);
            exec.progress_until_stalled().await;
            assert_eq!(stage(), 2);
            assert_eq!(sp.time_until_next_timeout(), Some(ms(300)));

            // Overshooting the second sleep's deadline is fine too.
            sp.advance(ms(500));
            assert_eq!(stage(), 2);
            assert_eq!(sp.time_until_next_timeout(), None);
            exec.progress_until_stalled().await;
            assert_eq!(sp.time_until_next_timeout(), None);
            assert_eq!(stage(), 3);
        });
    }
}