//! Implement background tasks used by guard managers.
//!
//! These background tasks keep a weak reference to the [`GuardMgrInner`]
//! and use that to notice when they should shut down.

            
6
use crate::GuardMgrInner;
7
use crate::pending::{GuardStatus, RequestId};
8

            
9
use futures::{channel::mpsc, stream::StreamExt};
10
#[cfg(test)]
11
use oneshot_fused_workaround as oneshot;
12
use tor_proto::ClockSkew;
13
use tracing::instrument;
14

            
15
use std::sync::{Mutex, Weak};
16

            
17
/// A message sent to the [`report_status_events()`] task.
#[derive(Debug)]
pub(crate) enum Msg {
    /// A message sent by a [`GuardMonitor`](crate::GuardMonitor) to
    /// report the status of an attempt to use a guard.
    Status(RequestId, GuardStatus, Option<ClockSkew>),
    /// Tells the task to reply on the provided oneshot::Sender once
    /// it has seen this message.  Used to indicate that the message
    /// queue is flushed.
    #[cfg(test)]
    Ping(oneshot::Sender<()>),
}
29

            
30
/// Background task: wait for messages about guard statuses, and
31
/// tell a guard manager about them.  Runs indefinitely.
32
///
33
/// Takes the [`GuardMgrInner`] by weak reference; if the guard
34
/// manager goes away, then this task exits.
35
///
36
/// Requires a `mpsc::Receiver` that is used to tell the task about
37
/// new status events to wait for.
38
#[instrument(skip_all, level = "trace")]
39
702
pub(crate) async fn report_status_events(
40
702
    runtime: impl tor_rtcompat::SleepProvider,
41
702
    inner: Weak<Mutex<GuardMgrInner>>,
42
702
    mut events: mpsc::UnboundedReceiver<Msg>,
43
702
) {
44
    loop {
45
        match events.next().await {
46
            Some(Msg::Status(id, status, skew)) => {
47
                // We've got a report about a guard status.
48
                if let Some(inner) = inner.upgrade() {
49
                    let mut inner = inner.lock().expect("Poisoned lock");
50
                    inner.handle_msg(id, status, skew, &runtime);
51
                } else {
52
                    // The guard manager has gone away.
53
                    return;
54
                }
55
            }
56
            #[cfg(test)]
57
            Some(Msg::Ping(sender)) => {
58
                let _ignore = sender.send(());
59
            }
60
            // The streams have all closed.  (I think this is impossible?)
61
            None => return,
62
        }
63
        // TODO: Is this task guaranteed to exit?
64
    }
65
192
}
66

            
67
/// Background task to run periodic events on the guard manager.
68
///
69
/// The only role of this task is to invoke
70
/// [`GuardMgrInner::run_periodic_events`] from time to time, so that
71
/// it can perform housekeeping tasks.
72
///
73
/// Takes the [`GuardMgrInner`] by weak reference; if the guard
74
/// manager goes away, then this task exits.
75
#[instrument(skip_all, level = "trace")]
76
702
pub(crate) async fn run_periodic<R: tor_rtcompat::SleepProvider>(
77
702
    runtime: R,
78
702
    inner: Weak<Mutex<GuardMgrInner>>,
79
702
) {
80
    loop {
81
        let delay = if let Some(inner) = inner.upgrade() {
82
            let mut inner = inner.lock().expect("Poisoned lock");
83
            let wallclock = runtime.wallclock();
84
            let now = runtime.now();
85
            inner.run_periodic_events(wallclock, now)
86
        } else {
87
            // The guard manager has gone away.
88
            return;
89
        };
90
        runtime.sleep(delay).await;
91
    }
92
114
}
93

            
94
/// Background task to keep a guard manager up-to-date with a given network
95
/// directory provider.
96
#[instrument(skip_all, level = "trace")]
97
630
pub(crate) async fn keep_netdir_updated<RT: tor_rtcompat::Runtime>(
98
630
    runtime: RT,
99
630
    inner: Weak<Mutex<GuardMgrInner>>,
100
630
    netdir_provider: Weak<dyn tor_netdir::NetDirProvider>,
101
630
) {
102
    use tor_netdir::DirEvent;
103

            
104
118
    let mut event_stream = match netdir_provider.upgrade().map(|p| p.events()) {
105
        Some(s) => s,
106
        None => return,
107
    };
108

            
109
    while let Some(event) = event_stream.next().await {
110
        match event {
111
            DirEvent::NewConsensus | DirEvent::NewDescriptors => {
112
                if let Some(inner) = inner.upgrade() {
113
                    let mut inner = inner.lock().expect("Poisoned lock");
114
                    inner.update(runtime.wallclock(), runtime.now());
115
                } else {
116
                    return;
117
                }
118
            }
119
            _ => {}
120
        }
121
    }
122
164
}
123

            
124
/// Background task to keep a guard manager up-to-date with a given bridge
/// descriptor provider.
#[cfg(feature = "bridge-client")]
#[instrument(level = "trace", skip_all)]
pub(crate) async fn keep_bridge_descs_updated<RT: tor_rtcompat::Runtime>(
    runtime: RT,
    inner: Weak<Mutex<GuardMgrInner>>,
    bridge_desc_provider: Weak<dyn crate::bridge::BridgeDescProvider>,
) {
    use crate::bridge::BridgeDescEvent as E;

    // If the provider has already been dropped, there is nothing to do.
    let Some(mut event_stream) = bridge_desc_provider.upgrade().map(|p| p.events()) else {
        return;
    };

    while let Some(event) = event_stream.next().await {
        match event {
            E::SomethingChanged => {
                let Some(mgr) = inner.upgrade() else {
                    // The guard manager has gone away.
                    return;
                };
                let mut mgr = mgr.lock().expect("Poisoned lock");
                mgr.update(runtime.wallclock(), runtime.now());
            }
        }
    }
}