1
//! Tests for bridge descriptor downloading
2

            
3
// @@ begin test lint list maintained by maint/add_warning @@
4
#![allow(clippy::bool_assert_comparison)]
5
#![allow(clippy::clone_on_copy)]
6
#![allow(clippy::dbg_macro)]
7
#![allow(clippy::mixed_attributes_style)]
8
#![allow(clippy::print_stderr)]
9
#![allow(clippy::print_stdout)]
10
#![allow(clippy::single_char_pattern)]
11
#![allow(clippy::unwrap_used)]
12
#![allow(clippy::unchecked_time_subtraction)]
13
#![allow(clippy::useless_vec)]
14
#![allow(clippy::needless_pass_by_value)]
15
#![allow(clippy::string_slice)] // See arti#2571
16
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
17

            
18
use std::future::Future;
19
use std::iter;
20
use std::time::UNIX_EPOCH;
21

            
22
use futures::Stream;
23
use futures::select_biased;
24
use futures::stream::FusedStream;
25
use itertools::{Itertools, chain};
26
use tempfile::TempDir;
27
use time::OffsetDateTime;
28
use tracing_test::traced_test;
29

            
30
use tor_linkspec::HasAddrs;
31
use tor_rtcompat::SleepProvider;
32
use tor_rtmock::MockRuntime;
33
use tor_rtmock::simple_time::SimpleMockTimeProvider;
34

            
35
use super::*;
36

            
37
const EXAMPLE_DESCRIPTOR: &str = include_str!("../../testdata/routerdesc1.txt");
38
const EXAMPLE_PORT: u16 = 9001;
39

            
40
24
fn example_validity() -> (SystemTime, SystemTime) {
41
24
    let (_, (t, u)) = RouterDesc::parse(EXAMPLE_DESCRIPTOR)
42
24
        .unwrap()
43
24
        .dangerously_assume_wellsigned()
44
24
        .dangerously_into_parts();
45
60
    let ret = |tb| match tb {
46
48
        Some(t) => t,
47
        None => panic!("Time range does not have a starting bound"),
48
48
    };
49
24
    (ret(t), ret(u))
50
24
}
51
16
fn example_wallclock() -> SystemTime {
52
16
    example_validity().0 + Duration::from_secs(10)
53
16
}
54

            
55
type R = MockRuntime;
56
type M = Mock;
57
type Bdm = BridgeDescMgr<R, M>;
58
type RT = RetryTime;
59
use Error::TestError as TE;
60

            
61
#[derive(Debug, Clone)]
62
struct Mock {
63
    sleep: SimpleMockTimeProvider,
64

            
65
    // Using an async mutex lets us block a call to `download`
66
    // so we can see what the state is mid-download.
67
    mstate: Arc<futures::lock::Mutex<MockState>>,
68
}
69

            
70
/// Prefix that marks a mock document as a "not modified" reply.
///
/// The mock `download` strips this prefix and treats the remainder as the
/// `Debug` rendering of the `if_modified_since` time it expects to be given.
const MOCK_NOT_MODIFIED: &str = "IF-MODIFIED-SINCE ";
71

            
72
struct MockState {
73
    /// Maps the port number for a download, to what we should return
74
    ///
75
    /// If the Ok string starts with `MOCK_NOT_MODIFIED` then the rest is the Debug
76
    /// output from a SystemTime.   In this case the manager is supposed to pass
77
    /// `if_modified_since` as `Some(that SystemTime)`, and we will actually return `None`.
78
    ///
79
    /// Otherwise the `if_modified_since` from the manager will be ignored
80
    /// and we always give it Some.
81
    docs: HashMap<u16, Result<String, Error>>,
82

            
83
    download_calls: usize,
84
}
85

            
86
// Nothing to provide here: the mock's behaviour is in its `MockableAPI` impl.
impl Mockable<R> for Mock {}
87

            
88
#[async_trait]
89
impl mockable::MockableAPI<R> for Mock {
90
    type CircMgr = ();
91

            
92
    async fn download(
93
        self,
94
        _runtime: &R,
95
        _circmgr: &Self::CircMgr,
96
        bridge: &BridgeConfig,
97
        if_modified_since: Option<SystemTime>,
98
88
    ) -> Result<Option<String>, Error> {
99
        eprint!("download ...");
100
        let mut mstate = self.mstate.lock().await;
101
        mstate.download_calls += 1;
102
        eprintln!("#{} {:?}", mstate.download_calls, bridge);
103
        let addr = bridge
104
            .addrs()
105
            .next()
106
            .ok_or(TE("bridge has no error", RT::Never))?;
107
        let doc = mstate
108
            .docs
109
            .get(&addr.port())
110
            .ok_or(TE("no document", RT::AfterWaiting))?;
111
24
        doc.clone().map(|text| {
112
24
            if let Some(expect_ims) = text.strip_prefix(MOCK_NOT_MODIFIED) {
113
4
                eprintln!("#{} {:?}", mstate.download_calls, text);
114
4
                assert_eq!(format!("{:?}", if_modified_since.unwrap()), expect_ims,);
115
4
                None
116
            } else {
117
20
                Some(text)
118
            }
119
24
        })
120
88
    }
121
}
122

            
123
impl Mock {
124
60
    async fn expect_download_calls(&self, expected: usize) {
125
40
        let mut mstate = self.mstate.lock().await;
126
40
        assert_eq!(mstate.download_calls, expected);
127
40
        mstate.download_calls = 0;
128
40
    }
129
}
130

            
131
16
fn setup(runtime: MockRuntime) -> (TempDir, Bdm, R, M, BridgeKey, rusqlite::Connection) {
132
16
    let sleep = runtime.mock_sleep().clone();
133
16
    sleep.jump_wallclock(example_wallclock());
134

            
135
16
    let mut docs = HashMap::new();
136
16
    docs.insert(EXAMPLE_PORT, Ok(EXAMPLE_DESCRIPTOR.into()));
137

            
138
16
    let mstate = Arc::new(futures::lock::Mutex::new(MockState {
139
16
        docs,
140
16
        download_calls: 0,
141
16
    }));
142

            
143
16
    let mock = Mock { sleep, mstate };
144

            
145
16
    let (db_tmp_dir, store) = crate::storage::sqlite::test::new_empty().unwrap();
146
16
    let store = Arc::new(Mutex::new(Box::new(store) as _));
147

            
148
16
    let sql_path = db_tmp_dir.path().join("db.sql");
149
16
    let conn = rusqlite::Connection::open(sql_path).unwrap();
150

            
151
16
    let bdm = BridgeDescMgr::<R, M>::new_internal(
152
16
        runtime.clone(),
153
16
        (),
154
16
        store,
155
16
        &Default::default(),
156
16
        Dormancy::Active,
157
16
        mock.clone(),
158
    )
159
16
    .unwrap();
160

            
161
16
    let bridge = "51.68.172.83:9001 EB6EFB27F29AC9511A4246D7ABE1AFABFB416FF1"
162
16
        .parse()
163
16
        .unwrap();
164

            
165
16
    (db_tmp_dir, bdm, runtime, mock, bridge, conn)
166
16
}
167

            
168
20
/// Consumes whatever `s` can yield right now, without waiting for more.
///
/// Each round polls `s.next()` with priority over an always-ready future;
/// when the stream is not ready, draining stops.  After every consumed item
/// we yield to the executor, so that other tasks get a chance to run.
///
/// Returns the number of items consumed.
async fn stream_drain_ready<S: Stream + Unpin + FusedStream>(s: &mut S) -> usize {
    let mut drained = 0;
    loop {
        let got_item = select_biased! {
            _ = s.next() => true,
            () = future::ready(()) => false,
        };
        if !got_item {
            return drained;
        }
        tor_rtcompat::task::yield_now().await;
        drained += 1;
    }
}
179

            
180
48
async fn stream_drain_until<S, F, FF, Y>(attempts: usize, s: &mut S, mut f: F) -> Y
181
48
where
182
48
    S: Stream + Unpin + FusedStream,
183
48
    S::Item: Debug,
184
48
    F: FnMut() -> FF,
185
48
    FF: Future<Output = Option<Y>>,
186
48
{
187
48
    for _ in 0..attempts {
188
72
        let event = s.next().await;
189
72
        eprintln!("stream_drain_until, got {:?}", event);
190

            
191
72
        if let Some(y) = f().await {
192
48
            return y;
193
24
        }
194
    }
195
    panic!("untilness didn't occur");
196
48
}
197

            
198
12
fn queues_are_empty(bdm: &Bdm) -> Option<()> {
199
12
    let state = bdm.mgr.lock_only();
200
12
    (state.running.is_empty() && state.queued.is_empty()).then_some(())
201
12
}
202

            
203
24
fn in_results(bdm: &Bdm, bridge: &BridgeKey, wanted: Option<Result<(), ()>>) -> Option<()> {
204
24
    let bridges = bdm.bridges();
205
24
    let got = bridges.get(bridge);
206
32
    let got = got.map(|got| got.as_ref().map(|_| ()).map_err(|_| ()));
207
24
    (got == wanted).then_some(())
208
24
}
209

            
210
8
async fn clear_and_re_request<S>(bdm: &Bdm, events: &mut S, bridge: &BridgeKey)
211
8
where
212
8
    S: Stream + Unpin + FusedStream,
213
8
    S::Item: Debug,
214
8
{
215
8
    bdm.set_bridges(&[]);
216
8
    stream_drain_until(3, events, || async {
217
8
        in_results(bdm, bridge, None)
218
8
            .and_then(|()| bdm.mgr.lock_only().running.is_empty().then_some(()))
219
16
    })
220
8
    .await;
221
8
    bdm.set_bridges(std::slice::from_ref(bridge));
222
8
}
223

            
224
24
/// Makes a bridge at `192.126.0.1` on port `i`.
///
/// "Bad" because the fixture's mock has no document for such ports
/// (as long as `i` is not `EXAMPLE_PORT`), so downloads from it fail.
fn bad_bridge(i: usize) -> BridgeKey {
    format!("192.126.0.1:{} EB6EFB27F29AC9511A4246D7ABE1AFABFB416FF1", i)
        .parse::<BridgeConfig>()
        .unwrap()
}
229

            
230
#[traced_test]
231
#[test]
232
2
fn success() -> Result<(), anyhow::Error> {
233
5
    MockRuntime::try_test_with_various(|runtime| async {
234
4
        let (_db_tmp_dir, bdm, runtime, mock, bridge, ..) = setup(runtime);
235

            
236
4
        bdm.check_consistency(Some([]));
237

            
238
4
        let mut events = bdm.events().fuse();
239

            
240
4
        eprintln!("----- test downloading one descriptor -----");
241

            
242
4
        stream_drain_ready(&mut events).await;
243

            
244
4
        let hold = mock.mstate.lock().await;
245

            
246
4
        bdm.set_bridges(std::slice::from_ref(&bridge));
247
4
        bdm.check_consistency(Some([&bridge]));
248

            
249
4
        drop(hold);
250

            
251
4
        let got = stream_drain_until(3, &mut events, || async {
252
4
            bdm.bridges().get(&bridge).cloned()
253
8
        })
254
4
        .await;
255

            
256
4
        dbg!(runtime.wallclock(), example_validity(),);
257

            
258
4
        eprintln!("got: {:?}", got.unwrap());
259

            
260
4
        bdm.check_consistency(Some([&bridge]));
261
4
        mock.expect_download_calls(1).await;
262

            
263
4
        eprintln!("----- add a number of failing descriptors -----");
264

            
265
        const NFAIL: usize = 6;
266

            
267
4
        let bad = (1..=NFAIL).map(bad_bridge).collect_vec();
268

            
269
4
        let mut bridges = chain!(iter::once(bridge.clone()), bad.iter().cloned(),).collect_vec();
270

            
271
4
        let hold = mock.mstate.lock().await;
272

            
273
4
        bdm.set_bridges(&bridges);
274
4
        bdm.check_consistency(Some(&bridges));
275

            
276
4
        drop(hold);
277

            
278
12
        let () = stream_drain_until(13, &mut events, || async {
279
12
            bdm.check_consistency(Some(&bridges));
280
12
            bridges
281
12
                .iter()
282
50
                .all(|b| bdm.bridges().contains_key(b))
283
12
                .then_some(())
284
24
        })
285
4
        .await;
286

            
287
24
        for b in &bad {
288
24
            bdm.bridges().get(b).unwrap().as_ref().unwrap_err();
289
24
        }
290

            
291
4
        bdm.check_consistency(Some(&bridges));
292
4
        mock.expect_download_calls(NFAIL).await;
293

            
294
4
        eprintln!("----- move the clock forward to do some retries ----------");
295

            
296
4
        mock.sleep.advance(Duration::from_secs(5000));
297

            
298
4
        bdm.check_consistency(Some(&bridges));
299

            
300
12
        let () = stream_drain_until(13, &mut events, || async {
301
12
            bdm.check_consistency(Some(&bridges));
302
12
            (mock.mstate.lock().await.download_calls == NFAIL).then_some(())
303
24
        })
304
4
        .await;
305

            
306
4
        stream_drain_ready(&mut events).await;
307

            
308
4
        bdm.check_consistency(Some(&bridges));
309
4
        mock.expect_download_calls(NFAIL).await;
310

            
311
4
        eprintln!("----- set the bridges to the ones we have already ----------");
312

            
313
4
        let hold = mock.mstate.lock().await;
314

            
315
4
        bdm.set_bridges(&bridges);
316
4
        bdm.check_consistency(Some(&bridges));
317

            
318
4
        drop(hold);
319

            
320
4
        let events_counted = stream_drain_ready(&mut events).await;
321
4
        assert_eq!(events_counted, 0);
322
4
        bdm.check_consistency(Some(&bridges));
323
4
        mock.expect_download_calls(0).await;
324

            
325
4
        eprintln!("----- set the bridges to one fewer than we have already ----------");
326

            
327
4
        let _ = bridges.pop().unwrap();
328

            
329
4
        let hold = mock.mstate.lock().await;
330

            
331
4
        bdm.set_bridges(&bridges);
332
4
        bdm.check_consistency(Some(&bridges));
333

            
334
4
        drop(hold);
335

            
336
4
        let events_counted = stream_drain_ready(&mut events).await;
337
4
        assert_eq!(events_counted, 1);
338
4
        bdm.check_consistency(Some(&bridges));
339
4
        mock.expect_download_calls(0).await;
340

            
341
4
        eprintln!("----- remove a bridge while we have some requeued ----------");
342

            
343
4
        let hold = mock.mstate.lock().await;
344

            
345
4
        mock.sleep.advance(Duration::from_secs(8000));
346
4
        bdm.check_consistency(Some(&bridges));
347

            
348
        // should yield, but not produce any events yet
349
4
        let count = stream_drain_ready(&mut events).await;
350
4
        assert_eq!(count, 0);
351
4
        bdm.check_consistency(Some(&bridges));
352

            
353
4
        let removed = bridges.pop().unwrap();
354
4
        bdm.set_bridges(&bridges);
355

            
356
        // should produce a removed bridge event
357
4
        let () = stream_drain_until(1, &mut events, || async {
358
4
            bdm.check_consistency(Some(&bridges));
359
4
            (!bdm.bridges().contains_key(&removed)).then_some(())
360
8
        })
361
4
        .await;
362

            
363
4
        drop(hold);
364

            
365
        // Check that queues become empty.
366
        // Depending on scheduling, there may be tasks still live from the work above.
367
        // For example, one of the requeues might be still running after we did the remove.
368
        // So we may get a number of change events.  Certainly not more than 10.
369
12
        let () = stream_drain_until(10, &mut events, || async {
370
12
            bdm.check_consistency(Some(&bridges));
371
12
            queues_are_empty(&bdm)
372
24
        })
373
4
        .await;
374

            
375
        {
376
            // When we cancel the download, we race with the manager.
377
            // Maybe the download for the one we removed was started, or maybe not.
378
4
            let mut mstate = mock.mstate.lock().await;
379
4
            assert!(
380
4
                ((NFAIL - 1)..=NFAIL).contains(&mstate.download_calls),
381
                "{:?}",
382
                mstate.download_calls
383
            );
384
4
            mstate.download_calls = 0;
385
        }
386

            
387
4
        Ok(())
388
8
    })
389
2
}
390

            
391
#[traced_test]
392
#[test]
393
2
fn cache() -> Result<(), anyhow::Error> {
394
5
    MockRuntime::try_test_with_various(|runtime| async {
395
4
        let (_db_tmp_path, bdm, runtime, mock, bridge, sql_conn, ..) = setup(runtime);
396
4
        let mut events = bdm.events().fuse();
397

            
398
12
        let in_results = |wanted| in_results(&bdm, &bridge, wanted);
399

            
400
4
        eprintln!("----- test that a downloaded descriptor goes into the cache -----");
401

            
402
4
        bdm.set_bridges(std::slice::from_ref(&bridge));
403
8
        stream_drain_until(3, &mut events, || async { in_results(Some(Ok(()))) }).await;
404

            
405
4
        mock.expect_download_calls(1).await;
406

            
407
4
        sql_conn
408
4
            .query_row("SELECT * FROM BridgeDescs", [], |row| {
409
4
                let get_time =
410
8
                    |f| -> SystemTime { row.get_unwrap::<&str, OffsetDateTime>(f).into() };
411
4
                let bline: String = row.get_unwrap("bridge_line");
412
4
                let fetched: SystemTime = get_time("fetched");
413
4
                let until: SystemTime = get_time("until");
414
4
                let contents: String = row.get_unwrap("contents");
415
4
                let now = runtime.wallclock();
416
4
                assert_eq!(bline, bridge.to_string());
417
4
                assert!(fetched <= now);
418
4
                assert!(now < until);
419
4
                assert_eq!(contents, EXAMPLE_DESCRIPTOR);
420
4
                Ok(())
421
4
            })
422
4
            .unwrap();
423

            
424
4
        eprintln!("----- forget the descriptor and try to reload it from the cache -----");
425

            
426
4
        clear_and_re_request(&bdm, &mut events, &bridge).await;
427
8
        stream_drain_until(3, &mut events, || async { in_results(Some(Ok(()))) }).await;
428

            
429
        // Should not have been re-downloaded, since the fetch time is great.
430
4
        mock.expect_download_calls(0).await;
431

            
432
4
        eprintln!("----- corrupt the cache and check we re-download -----");
433

            
434
4
        sql_conn
435
4
            .execute_batch("UPDATE BridgeDescs SET contents = 'garbage'")
436
4
            .unwrap();
437

            
438
4
        clear_and_re_request(&bdm, &mut events, &bridge).await;
439
8
        stream_drain_until(3, &mut events, || async { in_results(Some(Ok(()))) }).await;
440

            
441
4
        mock.expect_download_calls(1).await;
442

            
443
4
        eprintln!("----- advance the lock and check that we do an if-modified-since -----");
444

            
445
4
        let published = bdm
446
4
            .bridges()
447
4
            .get(&bridge)
448
4
            .unwrap()
449
4
            .as_ref()
450
4
            .unwrap()
451
4
            .as_ref()
452
4
            .published();
453

            
454
4
        mock.mstate.lock().await.docs.insert(
455
            EXAMPLE_PORT,
456
4
            Ok(format!("{}{:?}", MOCK_NOT_MODIFIED, published)),
457
        );
458

            
459
        // Exceeds default max_refetch
460
4
        mock.sleep.advance(Duration::from_secs(20000));
461

            
462
4
        stream_drain_until(3, &mut events, || async {
463
4
            (mock.mstate.lock().await.download_calls > 0).then_some(())
464
8
        })
465
4
        .await;
466

            
467
4
        mock.expect_download_calls(1).await;
468

            
469
4
        Ok(())
470
8
    })
471
2
}
472

            
473
#[traced_test]
474
#[test]
475
2
fn dormant() -> Result<(), anyhow::Error> {
476
5
    MockRuntime::try_test_with_various(|runtime| async {
477
        #[allow(unused_variables)] // avoids churn and makes all of these identical
478
4
        let (db_tmp_path, bdm, runtime, mock, bridge, sql_conn, ..) = setup(runtime);
479
4
        let mut events = bdm.events().fuse();
480

            
481
        use Dormancy::*;
482

            
483
4
        eprintln!("----- become dormant, but request a bridge -----");
484
4
        bdm.set_dormancy(Dormant);
485
4
        bdm.set_bridges(std::slice::from_ref(&bridge));
486

            
487
        // Drive all tasks until we are idle
488
4
        runtime.progress_until_stalled().await;
489

            
490
4
        eprintln!("----- become active -----");
491
4
        bdm.set_dormancy(Active);
492
        // This should immediately trigger the download:
493

            
494
4
        stream_drain_until(3, &mut events, || async {
495
4
            in_results(&bdm, &bridge, Some(Ok(())))
496
8
        })
497
4
        .await;
498
4
        mock.expect_download_calls(1).await;
499

            
500
4
        Ok(())
501
8
    })
502
2
}
503

            
504
#[traced_test]
505
#[test]
506
2
fn process_doc() -> Result<(), anyhow::Error> {
507
5
    MockRuntime::try_test_with_various(|runtime| async {
508
        #[allow(unused_variables)] // avoids churn and makes all of these identical
509
4
        let (db_tmp_path, bdm, runtime, mock, bridge, sql_conn, ..) = setup(runtime);
510

            
511
4
        let text = EXAMPLE_DESCRIPTOR;
512
4
        let config = BridgeDescDownloadConfig::default();
513
4
        let valid = example_validity();
514

            
515
88
        let pr_t = |s: &str, t: SystemTime| {
516
88
            let now = runtime.wallclock();
517
88
            eprintln!(
518
                "                  {:10} {:?} {:10}",
519
                s,
520
                t,
521
88
                t.duration_since(UNIX_EPOCH).unwrap().as_secs_f64()
522
88
                    - now.duration_since(UNIX_EPOCH).unwrap().as_secs_f64(),
523
            );
524
88
        };
525

            
526
28
        let expecting_of = |text: &str, exp: Result<SystemTime, &str>| {
527
28
            let got = process_document(&runtime, &config, text);
528
28
            match exp {
529
16
                Ok(exp_refetch) => {
530
16
                    let refetch = got.unwrap().refetch;
531
16
                    pr_t("refetch", refetch);
532
16
                    assert_eq!(refetch, exp_refetch);
533
                }
534
12
                Err(exp_msg) => {
535
12
                    let msg = got.as_ref().expect_err(exp_msg).to_string();
536
12
                    assert!(
537
12
                        msg.contains(exp_msg),
538
                        "{:?} {:?} exp={:?}",
539
                        msg,
540
                        got,
541
                        exp_msg
542
                    );
543
                }
544
            }
545
28
        };
546

            
547
20
        let expecting_at = |now: SystemTime, exp| {
548
20
            mock.sleep.jump_wallclock(now);
549
20
            pr_t("now", now);
550
20
            pr_t("valid.0", valid.0);
551
20
            pr_t("valid.1", valid.1);
552
20
            if let Ok(exp) = exp {
553
12
                pr_t("expect", exp);
554
12
            }
555
20
            expecting_of(text, exp);
556
20
        };
557

            
558
4
        let secs = Duration::from_secs;
559

            
560
4
        eprintln!("----- good -----");
561
4
        expecting_of(text, Ok(runtime.wallclock() + config.max_refetch));
562

            
563
4
        eprintln!("----- modified under signature -----");
564
4
        expecting_of(
565
4
            &text.replace("\nbandwidth 10485760", "\nbandwidth 10485761"),
566
4
            Err("Signature check failed"),
567
4
        );
568

            
569
4
        eprintln!("----- doc not yet valid -----");
570
4
        expecting_at(
571
4
            valid.0 - secs(10),
572
4
            Err("Descriptor is outside its validity time"),
573
4
        );
574

            
575
4
        eprintln!("----- need to refetch due to doc validity expiring soon -----");
576
4
        expecting_at(valid.1 - secs(5000), Ok(valid.1 - secs(1000)));
577

            
578
4
        eprintln!("----- will refetch later than usual, due to min refetch interval -----");
579
4
        {
580
4
            let now = valid.1 - secs(4000); // would want to refetch at valid.1-1000 ie 30000
581
4
            expecting_at(now, Ok(now + config.min_refetch));
582
4
        }
583

            
584
4
        eprintln!("----- will refetch after doc validity ends, due to min refetch interval -----");
585
        {
586
4
            let now = valid.1 - secs(10);
587
4
            let exp = now + config.min_refetch;
588
4
            assert!(exp > valid.1);
589
4
            expecting_at(now, Ok(exp));
590
        }
591

            
592
4
        eprintln!("----- expired -----");
593
4
        expecting_at(
594
4
            valid.1 + secs(10),
595
4
            Err("Descriptor is outside its validity time"),
596
4
        );
597

            
598
        // TODO ideally we would test the unbounded case in process_download's
599
        // expiry time handling, but that would require making a document with unbounded
600
        // validity time.  Even if that is possible, I don't think we have code in-tree to
601
        // make signed test documents.
602

            
603
4
        Ok(())
604
8
    })
605
2
}