1
#![cfg_attr(docsrs, feature(doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_time_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45
#![allow(clippy::collapsible_if)] // See arti#2342
46
#![deny(clippy::unused_async)]
47
#![deny(clippy::string_slice)] // See arti#2571
48
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
49

            
50
pub mod events;
51

            
52
use crate::events::{TorEvent, TorEventKind};
53
use async_broadcast::{InactiveReceiver, Receiver, Sender, TrySendError};
54
use futures::StreamExt;
55
use futures::channel::mpsc;
56
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
57
use futures::future::Either;
58
use std::pin::Pin;
59
use std::sync::OnceLock;
60
use std::sync::atomic::{AtomicUsize, Ordering};
61
use std::task::{Context, Poll};
62
use thiserror::Error;
63
use tracing::{error, warn};
64

            
65
/// Pointer to an `UnboundedSender`, used to send events into the `EventReactor`.
66
static EVENT_SENDER: OnceLock<UnboundedSender<TorEvent>> = OnceLock::new();
67
/// An inactive receiver for the currently active broadcast channel, if there is one.
68
static CURRENT_RECEIVER: OnceLock<InactiveReceiver<TorEvent>> = OnceLock::new();
69
/// The number of `TorEventKind`s there are.
///
/// This is also the length of every per-kind table in this file (`EVENT_SUBSCRIBERS` and
/// `TorEventReceiver::subscribed`), which are indexed by a `TorEventKind` cast to `usize`.
// NOTE(review): presumably this must equal the number of `TorEventKind` variants declared in
// the `events` module — confirm whenever a variant is added.
const EVENT_KIND_COUNT: usize = 1;
/// An array containing one `AtomicUsize` for each `TorEventKind`, used to track subscriptions.
///
/// When a `TorEventReceiver` subscribes to a `TorEventKind`, it uses its `usize` value to index
/// into this array and increment the associated `AtomicUsize` (and decrements it to unsubscribe,
/// or when the receiver is dropped). This lets event emitters check whether there are any
/// subscribers, and avoid emitting events if there aren't.
///
/// The initializer repeats an inline `const` block rather than a plain value: `AtomicUsize` is
/// not `Copy`, so `[AtomicUsize::new(0); N]` stops compiling as soon as `N` is greater than 1.
static EVENT_SUBSCRIBERS: [AtomicUsize; EVENT_KIND_COUNT] =
    [const { AtomicUsize::new(0) }; EVENT_KIND_COUNT];
78

            
79
/// Capacity, in events, of the internal broadcast channel that backs event subscription.
///
/// `EventReactor::new` passes this value to `async_broadcast::broadcast`.
pub static BROADCAST_CAPACITY: usize = 512;
81

            
82
/// A reactor used to forward events to make the event reporting system work.
83
///
84
/// # Note
85
///
86
/// Currently, this type is a singleton; there is one event reporting system used for the entire
87
/// program. This is not stable, and may change in future.
88
pub struct EventReactor {
89
    /// A receiver that the reactor uses to learn about incoming events.
90
    ///
91
    /// This is unbounded so that event publication doesn't have to be async.
92
    receiver: UnboundedReceiver<TorEvent>,
93
    /// A sender that the reactor uses to publish events.
94
    ///
95
    /// Events are only sent here if at least one subscriber currently wants them.
96
    broadcast: Sender<TorEvent>,
97
}
98

            
99
impl EventReactor {
100
    /// Initialize the event reporting system, returning a reactor that must be run for it to work,
101
    /// and a `TorEventReceiver` that can be used to extract events from the system. If the system
102
    /// has already been initialized, returns `None` instead of a reactor.
103
    ///
104
    /// # Warnings
105
    ///
106
    /// The returned reactor *must* be run with `EventReactor::run`, in a background async task.
107
    /// If it is not, the event system might consume unbounded amounts of memory.
108
8
    pub fn new() -> Option<Self> {
109
8
        let (tx, rx) = mpsc::unbounded();
110
8
        if EVENT_SENDER.set(tx).is_ok() {
111
2
            let (btx, brx) = async_broadcast::broadcast(BROADCAST_CAPACITY);
112
2
            CURRENT_RECEIVER
113
2
                .set(brx.deactivate())
114
2
                .expect("CURRENT_RECEIVER can't be set if EVENT_SENDER is unset!");
115
2
            Some(Self {
116
2
                receiver: rx,
117
2
                broadcast: btx,
118
2
            })
119
        } else {
120
6
            None
121
        }
122
8
    }
123
    /// Get a `TorEventReceiver` to receive events from, assuming an `EventReactor` is already
124
    /// running somewhere. (If it isn't, returns `None`.)
125
    ///
126
    /// As noted in the type-level documentation, this function might not always work this way.
127
8
    pub fn receiver() -> Option<TorEventReceiver> {
128
8
        CURRENT_RECEIVER
129
8
            .get()
130
12
            .map(|rx| TorEventReceiver::wrap(rx.clone()))
131
8
    }
132
    /// Run the event forwarding reactor.
133
    ///
134
    /// You *must* call this function once a reactor is created.
135
3
    pub async fn run(mut self) {
136
4
        while let Some(event) = self.receiver.next().await {
137
2
            match self.broadcast.try_broadcast(event) {
138
2
                Ok(_) => {}
139
                Err(TrySendError::Closed(_)) => break,
140
                Err(TrySendError::Full(event)) => {
141
                    // If the channel is full, do a blocking broadcast to wait for it to be
142
                    // not full, and log a warning about receivers lagging behind.
143
                    warn!("TorEventReceivers aren't receiving events fast enough!");
144
                    if self.broadcast.broadcast(event).await.is_err() {
145
                        break;
146
                    }
147
                }
148
                Err(TrySendError::Inactive(_)) => {
149
                    // no active receivers, so just drop the event on the floor.
150
                }
151
            }
152
        }
153
        // It shouldn't be possible to get here, since we have globals keeping the channels
154
        // open. Still, if we somehow do, log an error about it.
155
        error!("event reactor shutting down; this shouldn't ever happen");
156
    }
157
}
158

            
159
/// An error encountered when trying to receive a `TorEvent`.
160
#[derive(Clone, Debug, Error)]
161
#[non_exhaustive]
162
pub enum ReceiverError {
163
    /// The receiver isn't subscribed to anything, so wouldn't ever return any events.
164
    #[error("No event subscriptions")]
165
    NoSubscriptions,
166
    /// The internal broadcast channel was closed, which shouldn't ever happen.
167
    #[error("Internal event broadcast channel closed")]
168
    ChannelClosed,
169
}
170

            
171
/// A receiver for `TorEvent`s emitted by other users of this crate.
172
///
173
/// To use this type, first subscribe to some kinds of event by calling
174
/// `TorEventReceiver::subscribe`. Then, consume events using the implementation of
175
/// `futures::stream::Stream`.
176
///
177
/// # Warning
178
///
179
/// Once interest in events has been signalled with `subscribe`, events must be continuously
180
/// read from the receiver in order to avoid excessive memory consumption.
181
#[derive(Clone, Debug)]
182
pub struct TorEventReceiver {
183
    /// If no events have been subscribed to yet, this is an `InactiveReceiver`; otherwise,
184
    /// it's a `Receiver`.
185
    inner: Either<Receiver<TorEvent>, InactiveReceiver<TorEvent>>,
186
    /// Whether we're subscribed to each event kind (if `subscribed[kind]` is true, we're
187
    /// subscribed to `kind`).
188
    subscribed: [bool; EVENT_KIND_COUNT],
189
}
190

            
191
impl futures::stream::Stream for TorEventReceiver {
192
    type Item = TorEvent;
193

            
194
10
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195
10
        let this = self.get_mut();
196
10
        match this.inner {
197
8
            Either::Left(ref mut active) => loop {
198
8
                match Pin::new(&mut *active).poll_next(cx) {
199
2
                    Poll::Ready(Some(e)) => {
200
2
                        if this.subscribed[e.kind() as usize] {
201
2
                            return Poll::Ready(Some(e));
202
                        }
203
                        // loop, since we weren't subscribed to that event
204
                    }
205
6
                    x => return x,
206
                }
207
            },
208
            Either::Right(_) => {
209
2
                warn!("TorEventReceiver::poll_next() called without subscriptions!");
210
2
                Poll::Ready(None)
211
            }
212
        }
213
10
    }
214
}
215

            
216
impl TorEventReceiver {
217
    /// Create a `TorEventReceiver` from an `InactiveReceiver` handle.
218
8
    pub(crate) fn wrap(rx: InactiveReceiver<TorEvent>) -> Self {
219
8
        Self {
220
8
            inner: Either::Right(rx),
221
8
            subscribed: [false; EVENT_KIND_COUNT],
222
8
        }
223
8
    }
224
    /// Subscribe to a given kind of `TorEvent`.
225
    ///
226
    /// After calling this function, `TorEventReceiver::recv` will emit events of that kind.
227
    /// This function is idempotent (subscribing twice has the same effect as doing so once).
228
14
    pub fn subscribe(&mut self, kind: TorEventKind) {
229
14
        if !self.subscribed[kind as usize] {
230
10
            EVENT_SUBSCRIBERS[kind as usize].fetch_add(1, Ordering::SeqCst);
231
10
            self.subscribed[kind as usize] = true;
232
10
        }
233
        // FIXME(eta): cloning is ungood, but hard to avoid
234
14
        if let Either::Right(inactive) = self.inner.clone() {
235
10
            self.inner = Either::Left(inactive.activate());
236
10
        }
237
14
    }
238
    /// Unsubscribe from a given kind of `TorEvent`.
239
    ///
240
    /// After calling this function, `TorEventReceiver::recv` will no longer emit events of that
241
    /// kind.
242
    /// This function is idempotent (unsubscribing twice has the same effect as doing so once).
243
4
    pub fn unsubscribe(&mut self, kind: TorEventKind) {
244
4
        if self.subscribed[kind as usize] {
245
4
            EVENT_SUBSCRIBERS[kind as usize].fetch_sub(1, Ordering::SeqCst);
246
4
            self.subscribed[kind as usize] = false;
247
4
        }
248
        // If we're now not subscribed to anything, deactivate our channel.
249
6
        if self.subscribed.iter().all(|x| !*x) {
250
            // FIXME(eta): cloning is ungood, but hard to avoid
251
4
            if let Either::Left(active) = self.inner.clone() {
252
4
                self.inner = Either::Right(active.deactivate());
253
4
            }
254
        }
255
4
    }
256
}
257

            
258
impl Drop for TorEventReceiver {
259
8
    fn drop(&mut self) {
260
8
        for (i, subscribed) in self.subscribed.iter().enumerate() {
261
            // FIXME(eta): duplicates logic from Self::unsubscribe, because it's not possible
262
            //             to go from a `usize` to a `TorEventKind`
263
8
            if *subscribed {
264
6
                EVENT_SUBSCRIBERS[i].fetch_sub(1, Ordering::SeqCst);
265
6
            }
266
        }
267
8
    }
268
}
269

            
270
/// Returns a boolean indicating whether the event `kind` has any subscribers (as in,
271
/// whether `TorEventReceiver::subscribe` has been called with that event kind).
272
///
273
/// This is useful to avoid doing work to generate events that might be computationally expensive
274
/// to generate.
275
20
pub fn event_has_subscribers(kind: TorEventKind) -> bool {
276
20
    EVENT_SUBSCRIBERS[kind as usize].load(Ordering::SeqCst) > 0
277
20
}
278

            
279
/// Broadcast the given `TorEvent` to any interested subscribers.
280
///
281
/// As an optimization, does nothing if the event has no subscribers (`event_has_subscribers`
282
/// returns false). (also does nothing if the event subsystem hasn't been initialized yet)
283
///
284
/// This function isn't intended for use outside Arti crates (as in, library consumers of Arti
285
/// shouldn't broadcast events!).
286
4
pub fn broadcast(event: TorEvent) {
287
4
    if !event_has_subscribers(event.kind()) {
288
2
        return;
289
2
    }
290
2
    if let Some(sender) = EVENT_SENDER.get() {
291
2
        // If this fails, there isn't much we can really do about it!
292
2
        let _ = sender.unbounded_send(event);
293
2
    }
294
4
}
295

            
296
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use crate::{
        EventReactor, StreamExt, TorEvent, TorEventKind, broadcast, event_has_subscribers,
    };
    use std::sync::{Mutex, MutexGuard, OnceLock};
    use std::time::Duration;
    use tokio::runtime::Runtime;

    // HACK(eta): these tests mutate global state, so they effectively have to run one at a
    //            time. They must *also* share a single tokio runtime, because the singleton
    //            EventReactor lives in a background task; #[tokio::test] would build a new
    //            runtime for every test.
    //
    //            The workaround: one global runtime, guarded by a mutex.
    static TEST_MUTEX: OnceLock<Mutex<Runtime>> = OnceLock::new();

    /// Take the global test lock, starting the event reactor first if nobody has yet.
    fn test_setup() -> MutexGuard<'static, Runtime> {
        let guard = TEST_MUTEX
            .get_or_init(|| Mutex::new(Runtime::new().unwrap()))
            .lock()
            .expect("mutex poisoned, probably by other failing tests");
        if let Some(reactor) = EventReactor::new() {
            guard.handle().spawn(reactor.run());
        }
        guard
    }

    #[test]
    fn subscriptions() {
        let runtime = test_setup();

        runtime.block_on(async move {
            let has_subscribers = || event_has_subscribers(TorEventKind::Empty);

            // nothing is subscribed before we start
            assert!(!has_subscribers());

            let mut rx = EventReactor::receiver().unwrap();
            // merely creating a receiver doesn't subscribe to anything
            assert!(!has_subscribers());

            rx.subscribe(TorEventKind::Empty);
            // subscribing is visible globally
            assert!(has_subscribers());

            rx.unsubscribe(TorEventKind::Empty);
            // ...and so is unsubscribing
            assert!(!has_subscribers());

            // subscribing repeatedly counts only once
            for _ in 0..3 {
                rx.subscribe(TorEventKind::Empty);
            }
            assert!(has_subscribers());

            rx.unsubscribe(TorEventKind::Empty);
            assert!(!has_subscribers());

            rx.subscribe(TorEventKind::Empty);
            assert!(has_subscribers());

            drop(rx);
            // dropping the receiver unsubscribes automatically
            assert!(!has_subscribers());
        });
    }

    #[test]
    fn empty_recv() {
        let runtime = test_setup();

        runtime.block_on(async move {
            let mut rx = EventReactor::receiver().unwrap();
            // reading from a receiver that has no subscriptions yields None
            assert!(rx.next().await.is_none());
        });
    }

    #[test]
    fn receives_events() {
        let runtime = test_setup();

        runtime.block_on(async move {
            let mut rx = EventReactor::receiver().unwrap();
            rx.subscribe(TorEventKind::Empty);
            // HACK(eta): give the event reactor time to run
            tokio::time::sleep(Duration::from_millis(100)).await;
            broadcast(TorEvent::Empty);

            assert_eq!(rx.next().await, Some(TorEvent::Empty));
        });
    }

    #[test]
    fn does_not_send_to_no_subscribers() {
        let runtime = test_setup();

        runtime.block_on(async move {
            // with no subscribers around, this event is dropped on the floor
            broadcast(TorEvent::Empty);

            let mut rx = EventReactor::receiver().unwrap();
            rx.subscribe(TorEventKind::Empty);

            // so there is nothing for the new subscriber to receive
            let outcome = tokio::time::timeout(Duration::from_millis(100), rx.next()).await;
            assert!(outcome.is_err());
        });
    }
}