//! Code for exporting events from the channel manager.
#![allow(dead_code, unreachable_pub)]

use educe::Educe;
use futures::{Stream, StreamExt};
use postage::watch;
use std::fmt;
use tor_basic_utils::skip_fmt;
use web_time_compat::{Duration, Instant, InstantExt};

/// The status of our connection to the internet.
///
/// Every field is a tri-state: `Some(true)` after a success, `Some(false)`
/// once we have definitely been failing, and `None` while we haven't
/// succeeded yet but it's too early to say if that's a problem.
#[derive(Default, Debug, Clone)]
pub struct ConnStatus {
    /// Have we been able to make TCP connections?
    ///
    /// True if we've been able to make outgoing connections recently.
    /// False if we've definitely been failing.
    /// None if we haven't succeeded yet, but it's too early to say if
    /// that's a problem.
    online: Option<bool>,

    /// Have we ever been able to make TLS handshakes and negotiate
    /// certificates, _not including timeliness checking_?
    ///
    /// True if we've been able to make TLS handshakes and talk to Tor relays we
    /// like recently. False if we've definitely been failing. None if we
    /// haven't succeeded yet, but it's too early to say if that's a problem.
    auth_works: Option<bool>,

    /// Have we been able to successfully negotiate full Tor handshakes?
    ///
    /// True if we've been able to make Tor handshakes recently.
    /// False if we've definitely been failing.
    /// None if we haven't succeeded yet, but it's too early to say if
    /// that's a problem.
    handshake_works: Option<bool>,
}
/// A problem detected while connecting to the Tor network.
40
#[derive(Debug, Clone, Eq, PartialEq, derive_more::Display)]
41
#[non_exhaustive]
42
pub enum ConnBlockage {
43
    #[display("unable to connect to the internet")]
44
    /// We haven't been able to make successful TCP connections.
45
    NoTcp,
46
    /// We've made TCP connections, but our TLS connections either failed, or
47
    /// got hit by an attempted man-in-the-middle attack.
48
    #[display("our internet connection seems to be filtered")]
49
    NoHandshake,
50
    /// We've made TCP connections, and our TLS connections mostly succeeded,
51
    /// but we encountered failures that are well explained by clock skew,
52
    /// or expired certificates.
53
    #[display("relays all seem to be using expired certificates")]
54
    CertsExpired,
55
}
impl ConnStatus {
58
    /// Return true if this status is equal to `other`.
59
    ///
60
    /// Note:(This would just be a PartialEq implementation, but I'm not sure I
61
    /// want to expose that PartialEq for this struct.)
62
40
    fn eq(&self, other: &ConnStatus) -> bool {
63
40
        self.online == other.online && self.handshake_works == other.handshake_works
64
40
    }
65

            
66
    /// Return true if this status indicates that we can successfully open Tor channels.
67
10
    pub fn usable(&self) -> bool {
68
10
        self.online == Some(true) && self.handshake_works == Some(true)
69
10
    }
70

            
71
    /// Return a float representing "how bootstrapped" we are with respect to
72
    /// connecting to the Tor network, where 0 is "not at all" and 1 is
73
    /// "successful".
74
    ///
75
    /// Callers _should not_ depend on the specific meaning of any particular
76
    /// fraction; we may change these fractions in the future.
77
842
    pub fn frac(&self) -> f32 {
78
6
        match self {
79
            Self {
80
                online: Some(true),
81
                auth_works: Some(true),
82
                handshake_works: Some(true),
83
4
            } => 1.0,
84
            Self {
85
                online: Some(true), ..
86
4
            } => 0.5,
87
834
            _ => 0.0,
88
        }
89
842
    }
90

            
91
    /// Return the cause of why we aren't able to connect to the Tor network,
92
    /// if we think we're stuck.
93
842
    pub fn blockage(&self) -> Option<ConnBlockage> {
94
834
        match self {
95
            Self {
96
                online: Some(false),
97
                ..
98
4
            } => Some(ConnBlockage::NoTcp),
99
            Self {
100
                auth_works: Some(false),
101
                ..
102
4
            } => Some(ConnBlockage::NoHandshake),
103
            Self {
104
                handshake_works: Some(false),
105
                ..
106
            } => Some(ConnBlockage::CertsExpired),
107
834
            _ => None,
108
        }
109
842
    }
110
}
impl fmt::Display for ConnStatus {
113
838
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114
2
        match self {
115
830
            ConnStatus { online: None, .. } => write!(f, "connecting to the internet"),
116
            ConnStatus {
117
                online: Some(false),
118
                ..
119
2
            } => write!(f, "unable to connect to the internet"),
120
            ConnStatus {
121
                handshake_works: None,
122
                ..
123
2
            } => write!(f, "handshaking with Tor relays"),
124
            ConnStatus {
125
                auth_works: Some(true),
126
                handshake_works: Some(false),
127
                ..
128
            } => write!(
129
                f,
130
                "unable to handshake with Tor relays, possibly due to clock skew"
131
            ),
132
            ConnStatus {
133
                handshake_works: Some(false),
134
                ..
135
2
            } => write!(f, "unable to handshake with Tor relays"),
136
            ConnStatus {
137
                online: Some(true),
138
                handshake_works: Some(true),
139
                ..
140
2
            } => write!(f, "connecting successfully"),
141
        }
142
838
    }
143
}
/// A stream of [`ConnStatus`] events describing changes in our connected-ness.
146
///
147
/// This stream is lossy; a reader might not see some events on the stream, if
148
/// they are produced faster than the reader can consume.  In that case, the
149
/// reader will see more recent updates, and miss older ones.
150
///
151
/// Note that the bootstrap status is not monotonic: we might become less
152
/// bootstrapped than we were before.  (For example, the internet could go
153
/// down.)
154
#[derive(Clone, Educe)]
155
#[educe(Debug)]
156
pub struct ConnStatusEvents {
157
    /// The receiver that implements this stream.
158
    ///
159
    /// (We wrap it in a new type here so that we can replace the implementation
160
    /// later on if we need to.)
161
    #[educe(Debug(method = "skip_fmt"))]
162
    inner: watch::Receiver<ConnStatus>,
163
}
impl Stream for ConnStatusEvents {
166
    type Item = ConnStatus;
167
1012
    fn poll_next(
168
1012
        mut self: std::pin::Pin<&mut Self>,
169
1012
        cx: &mut std::task::Context<'_>,
170
1012
    ) -> std::task::Poll<Option<Self::Item>> {
171
1012
        self.inner.poll_next_unpin(cx)
172
1012
    }
173
}
/// Crate-internal view of "how connected are we to the internet?"
///
/// This is a more complex and costly structure than ConnStatus, so we track
/// this here, and only expose the minimum via ConnStatus over a
/// `postage::watch`.  Later, we might want to expose more of this information.
//
// TODO: Eventually we should add some ability to reset our bootstrap status, if
// our connections start failing.
#[derive(Debug, Clone)]
struct ChanMgrStatus {
    /// When did we first get initialized?
    startup: Instant,

    /// Since we started, how many channels have we tried to build?
    n_attempts: usize,

    /// When (if ever) have we made a TCP connection to (what we hoped was) a
    /// Tor relay?
    ///
    /// If we don't reach this point, we're probably not on the internet.
    ///
    /// If we get no further than this, we're probably having our TCP
    /// connections captured or replaced.
    last_tcp_success: Option<Instant>,

    /// When (if ever) have we successfully finished a TLS handshake to (what we
    /// hoped was) a Tor relay?
    ///
    /// If we get no further than this, we might be facing a TLS MITM attack.
    //
    // TODO: We don't actually use this information yet: our output doesn't
    // distinguish filtering where TLS succeeds but gets MITM'd from filtering
    // where TLS fails.
    last_tls_success: Option<Instant>,

    /// When (if ever) have we finished the inner Tor handshake with a relay,
    /// up to the point where we check for certificate timeliness?
    last_chan_auth_success: Option<Instant>,

    /// When (if ever) have we successfully finished the inner Tor handshake
    /// with a relay?
    ///
    /// If we get to this point, we can successfully talk to something that
    /// holds the private key that it's supposed to.
    last_chan_success: Option<Instant>,
}
impl ChanMgrStatus {
223
    /// Construct a new ChanMgr status.
224
    ///
225
    /// It will be built as having been initialized at the time `now`.
226
1398
    fn new_at(now: Instant) -> ChanMgrStatus {
227
1398
        ChanMgrStatus {
228
1398
            startup: now,
229
1398
            n_attempts: 0,
230
1398
            last_tcp_success: None,
231
1398
            last_tls_success: None,
232
1398
            last_chan_auth_success: None,
233
1398
            last_chan_success: None,
234
1398
        }
235
1398
    }
236

            
237
    /// Return a [`ConnStatus`] for the current state, at time `now`.
238
    ///
239
    /// (The time is necessary because a lack of success doesn't indicate a
240
    /// problem until enough time has passed.)
241
30
    fn conn_status_at(&self, now: Instant) -> ConnStatus {
242
        /// How long do we need to be online before we'll acknowledge failure?
243
        const MIN_DURATION: Duration = Duration::from_secs(60);
244
        /// How many attempts do we need to launch before we'll acknowledge failure?
245
        const MIN_ATTEMPTS: usize = 6;
246

            
247
        // If set, it's too early to determine failure.
248
30
        let early = now < self.startup + MIN_DURATION || self.n_attempts < MIN_ATTEMPTS;
249

            
250
30
        let online = match (self.last_tcp_success.is_some(), early) {
251
18
            (true, _) => Some(true),
252
10
            (_, true) => None,
253
2
            (false, false) => Some(false),
254
        };
255

            
256
30
        let auth_works = match (self.last_chan_auth_success.is_some(), early) {
257
6
            (true, _) => Some(true),
258
20
            (_, true) => None,
259
4
            (false, false) => Some(false),
260
        };
261

            
262
30
        let handshake_works = match (self.last_chan_success.is_some(), early) {
263
6
            (true, _) => Some(true),
264
20
            (_, true) => None,
265
4
            (false, false) => Some(false),
266
        };
267

            
268
30
        ConnStatus {
269
30
            online,
270
30
            auth_works,
271
30
            handshake_works,
272
30
        }
273
30
    }
274

            
275
    /// Note that an attempt to connect has been started.
276
24
    fn record_attempt(&mut self) {
277
24
        self.n_attempts += 1;
278
24
    }
279

            
280
    /// Note that we've successfully done a TCP handshake with an alleged relay.
281
6
    fn record_tcp_success(&mut self, now: Instant) {
282
6
        self.last_tcp_success = Some(now);
283
6
    }
284

            
285
    /// Note that we've completed a TLS handshake with an alleged relay.
286
    ///
287
    /// (Its identity won't be verified till the next step.)
288
4
    fn record_tls_finished(&mut self, now: Instant) {
289
4
        self.last_tls_success = Some(now);
290
4
    }
291

            
292
    /// Note that we've completed a Tor handshake with a relay, _but failed to
293
    /// verify the certificates in a way that could indicate clock skew_.
294
    fn record_handshake_done_with_skewed_clock(&mut self, now: Instant) {
295
        self.last_chan_auth_success = Some(now);
296
    }
297

            
298
    /// Note that we've completed a Tor handshake with a relay.
299
    ///
300
    /// (This includes performing the TLS handshake, and verifying that the
301
    /// relay was indeed the one that we wanted to reach.)
302
6
    fn record_handshake_done(&mut self, now: Instant) {
303
6
        self.last_chan_auth_success = Some(now);
304
6
        self.last_chan_success = Some(now);
305
6
    }
306
}
/// Object that manages information about a `ChanMgr`'s status, and sends
309
/// information about connectivity changes over an asynchronous channel
310
pub(crate) struct ChanMgrEventSender {
311
    /// The last ConnStatus that we sent over the channel.
312
    last_conn_status: ConnStatus,
313
    /// The unsummarized status information from the ChanMgr.
314
    mgr_status: ChanMgrStatus,
315
    /// The channel that we use for sending ConnStatus information.
316
    sender: watch::Sender<ConnStatus>,
317
}
impl ChanMgrEventSender {
320
    /// If the status has changed as of `now`, tell any listeners.
321
    ///
322
    /// (This takes a time because we need to know how much time has elapsed
323
    /// without successful attempts.)
324
    ///
325
    /// # Limitations
326
    ///
327
    /// We are dependent on calls to `record_attempt()` and similar methods to
328
    /// actually invoke this function; if they were never called, we'd never
329
    /// notice that we had gone too long without building connections.  That's
330
    /// okay for now, though, since any Tor client will immediately start
331
    /// building circuits, which will launch connection attempts until one
332
    /// succeeds or the client gives up entirely.  
333
16
    fn push_at(&mut self, now: Instant) {
334
16
        let status = self.mgr_status.conn_status_at(now);
335
16
        if !status.eq(&self.last_conn_status) {
336
8
            self.last_conn_status = status.clone();
337
8
            let mut b = self.sender.borrow_mut();
338
8
            *b = status;
339
8
        }
340
16
    }
341

            
342
    /// Note that an attempt to connect has been started.
343
4
    pub(crate) fn record_attempt(&mut self) {
344
4
        self.mgr_status.record_attempt();
345
4
        self.push_at(Instant::get());
346
4
    }
347

            
348
    /// Note that we've successfully done a TCP handshake with an alleged relay.
349
4
    pub(crate) fn record_tcp_success(&mut self) {
350
4
        let now = Instant::get();
351
4
        self.mgr_status.record_tcp_success(now);
352
4
        self.push_at(now);
353
4
    }
354

            
355
    /// Note that we've completed a TLS handshake with an alleged relay.
356
    ///
357
    /// (Its identity won't be verified till the next step.)
358
4
    pub(crate) fn record_tls_finished(&mut self) {
359
4
        let now = Instant::get();
360
4
        self.mgr_status.record_tls_finished(now);
361
4
        self.push_at(now);
362
4
    }
363

            
364
    /// Record that a handshake has succeeded _except for the certificate
365
    /// timeliness check, which may indicate a skewed clock.
366
    pub(crate) fn record_handshake_done_with_skewed_clock(&mut self) {
367
        let now = Instant::get();
368
        self.mgr_status.record_handshake_done_with_skewed_clock(now);
369
        self.push_at(now);
370
    }
371

            
372
    /// Note that we've completed a Tor handshake with a relay.
373
    ///
374
    /// (This includes performing the TLS handshake, and verifying that the
375
    /// relay was indeed the one that we wanted to reach.)
376
4
    pub(crate) fn record_handshake_done(&mut self) {
377
4
        let now = Instant::get();
378
4
        self.mgr_status.record_handshake_done(now);
379
4
        self.push_at(now);
380
4
    }
381
}
/// Create a new channel for sending connectivity status events to other crates.
384
1396
pub(crate) fn channel() -> (ChanMgrEventSender, ConnStatusEvents) {
385
1396
    let (sender, receiver) = watch::channel();
386
1396
    let receiver = ConnStatusEvents { inner: receiver };
387
1396
    let sender = ChanMgrEventSender {
388
1396
        last_conn_status: ConnStatus::default(),
389
1396
        mgr_status: ChanMgrStatus::new_at(Instant::get()),
390
1396
        sender,
391
1396
    };
392
1396
    (sender, receiver)
393
1396
}
#[cfg(test)]
#[allow(clippy::cognitive_complexity)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use float_eq::assert_float_eq;

    /// Tolerance for float comparison.
    const TOL: f32 = 0.00001;

    /// Check display text, fraction, equality, blockage and usability for a
    /// representative set of statuses.
    #[test]
    fn status_basics() {
        let s1 = ConnStatus::default();
        assert_eq!(s1.to_string(), "connecting to the internet");
        assert_float_eq!(s1.frac(), 0.0, abs <= TOL);
        assert!(s1.eq(&s1));
        assert!(s1.blockage().is_none());
        assert!(!s1.usable());

        let s2 = ConnStatus {
            online: Some(false),
            auth_works: None,
            handshake_works: None,
        };
        assert_eq!(s2.to_string(), "unable to connect to the internet");
        assert_float_eq!(s2.frac(), 0.0, abs <= TOL);
        assert!(s2.eq(&s2));
        assert!(!s2.eq(&s1));
        assert_eq!(s2.blockage(), Some(ConnBlockage::NoTcp));
        assert_eq!(
            s2.blockage().unwrap().to_string(),
            "unable to connect to the internet"
        );
        assert!(!s2.usable());

        let s3 = ConnStatus {
            online: Some(true),
            auth_works: None,
            handshake_works: None,
        };
        assert_eq!(s3.to_string(), "handshaking with Tor relays");
        assert_float_eq!(s3.frac(), 0.5, abs <= TOL);
        assert_eq!(s3.blockage(), None);
        assert!(!s3.eq(&s1));
        assert!(!s3.usable());

        let s4 = ConnStatus {
            online: Some(true),
            auth_works: Some(false),
            handshake_works: Some(false),
        };
        assert_eq!(s4.to_string(), "unable to handshake with Tor relays");
        assert_float_eq!(s4.frac(), 0.5, abs <= TOL);
        assert_eq!(s4.blockage(), Some(ConnBlockage::NoHandshake));
        assert_eq!(
            s4.blockage().unwrap().to_string(),
            "our internet connection seems to be filtered"
        );
        assert!(!s4.eq(&s1));
        assert!(!s4.eq(&s2));
        assert!(!s4.eq(&s3));
        assert!(s4.eq(&s4));
        assert!(!s4.usable());

        let s5 = ConnStatus {
            online: Some(true),
            auth_works: Some(true),
            handshake_works: Some(true),
        };
        assert_eq!(s5.to_string(), "connecting successfully");
        assert_float_eq!(s5.frac(), 1.0, abs <= TOL);
        assert!(s5.blockage().is_none());
        assert!(s5.eq(&s5));
        assert!(!s5.eq(&s4));
        assert!(s5.usable());
    }

    /// Check how `ChanMgrStatus` summarizes itself as time passes and
    /// attempts and successes are recorded.
    #[test]
    fn derive_status() {
        let start = Instant::get();
        let sec = Duration::from_secs(1);
        let hour = Duration::from_secs(3600);

        let mut ms = ChanMgrStatus::new_at(start);

        // when we start, we're unable to reach any conclusions.
        let s0 = ms.conn_status_at(start);
        assert!(s0.online.is_none());
        assert!(s0.handshake_works.is_none());

        // Time won't let us make conclusions either, unless there have been
        // attempts.
        let s = ms.conn_status_at(start + hour);
        assert!(s.eq(&s0));

        // But if there have been attempts, _and_ time has passed, we notice
        // failure.
        for _ in 0..10 {
            ms.record_attempt();
        }
        // (Not immediately...)
        let s = ms.conn_status_at(start);
        assert!(s.eq(&s0));
        // (... but after a while.)
        let s = ms.conn_status_at(start + hour);
        assert_eq!(s.online, Some(false));
        assert_eq!(s.handshake_works, Some(false));

        // If TCP has succeeded, we should notice that.
        ms.record_tcp_success(start + sec);
        let s = ms.conn_status_at(start + sec * 2);
        assert_eq!(s.online, Some(true));
        assert!(s.handshake_works.is_none());
        let s = ms.conn_status_at(start + hour);
        assert_eq!(s.online, Some(true));
        assert_eq!(s.handshake_works, Some(false));

        // If the handshake succeeded, we can notice that too.
        ms.record_handshake_done(start + sec * 2);
        let s = ms.conn_status_at(start + sec * 3);
        assert_eq!(s.online, Some(true));
        assert_eq!(s.handshake_works, Some(true));
    }

    /// Check that recording events on the sender updates the value seen by
    /// the receiver.
    #[test]
    fn sender() {
        let (mut snd, rcv) = channel();

        {
            let s = rcv.inner.borrow().clone();
            assert_float_eq!(s.frac(), 0.0, abs <= TOL);
        }

        snd.record_attempt();
        snd.record_tcp_success();
        snd.record_tls_finished();
        snd.record_handshake_done();

        {
            let s = rcv.inner.borrow().clone();
            assert_float_eq!(s.frac(), 1.0, abs <= TOL);
        }
    }
}