1
#![cfg_attr(docsrs, feature(doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_time_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45
#![allow(clippy::collapsible_if)] // See arti#2342
46
#![deny(clippy::unused_async)]
47
#![deny(clippy::string_slice)] // See arti#2571
48
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
49

            
50
use async_trait::async_trait;
51
use futures::{
52
    StreamExt as _,
53
    task::{Spawn, SpawnError},
54
};
55
use postage::watch;
56
use std::{collections::HashSet, fmt::Debug, hash::Hash, sync::Arc, sync::Mutex, time::Duration};
57
use tor_rtcompat::SpawnExt as _;
58

            
59
mod err;
60
mod reactor;
61

            
62
pub mod http;
63

            
64
pub use err::{Rejection, UploadError};
65

            
66
/// An object that can upload documents of a given type to targets of a given type.
67
///
68
/// See type and method documentation for details on how to implement this type correctly.
69
#[async_trait]
70
pub trait Uploader: Send + Sync + 'static {
71
    /// The type of document we are uploading.
72
    ///
73
    /// Typically, this will be `str` or `[u8]`,
74
    /// but other types are possible.
75
    ///
76
    /// We pass this around in an [`Arc`],
77
    /// so it is allowed to be quite large.
78
    type Doc: ?Sized;
79

            
80
    /// A single target to which we're uploading a document.
81
    ///
82
    /// For a simple HTTP(S) upload this could be a `Vec` of addresses.
83
    ///
84
    /// In a Tor context, it could be a ChanTarget or a CircTarget.
85
    ///
86
    /// We pass this around in an [`Arc`],
87
    /// so the size doesn't much matter.
88
    ///
89
    /// We require that this type implements Eq and Hash,
90
    /// so that we can tell when a target has changed.
91
    type Target: ?Sized;
92

            
93
    /// Try to upload `document` to `target`.
94
    ///
95
    /// Return Ok(()) on success; return an error on failure.
96
    ///
97
    /// If it is possible for the target to reject a document,
98
    /// this method must return [`UploadError::Rejected`]
99
    /// in that case.
100
    ///
101
    /// If it is possible for the target to say
102
    /// "I am overloaded, come back later",
103
    /// the implementor must return [`UploadError::Deferred`]
104
    /// in that case.
105
    ///
106
    /// It is the implementor's responsibility to provide:
107
    /// - Timeout behavior, if desired.
108
    /// - [Happy-eyeballs] address selection, if desired.
109
    ///
110
    /// [Happy-eyeballs]: https://en.wikipedia.org/wiki/Happy_Eyeballs
111
    async fn upload(
112
        self: Arc<Self>,
113
        target: Arc<Self::Target>,
114
        document: Arc<Self::Doc>,
115
    ) -> Result<(), UploadError>;
116
}
117

            
118
/// A handle to a publisher object that manages uploading a document to a set of targets.
119
///
120
/// See the [crate documentation](crate) for more information on this type and how to use it.
121
pub struct Publisher<D, T>
122
where
123
    T: Hash + Eq + Send + Sync + Debug + 'static + ?Sized,
124
    D: Send + Sync + 'static + ?Sized,
125
{
126
    /// A sender that we use to tell the reactor what actions to take.
127
    directive: Mutex<watch::Sender<PublishDirective<D, T>>>,
128

            
129
    /// A receiver to tell us about publication progress.
130
    status: watch::Receiver<PublishStatus>,
131
}
132

            
133
impl<D, T> Publisher<D, T>
134
where
135
    T: Hash + Eq + Send + Sync + Debug + 'static + ?Sized,
136
    D: Send + Sync + 'static + ?Sized,
137
{
138
    /// Create and launch a new [`Publisher`] to deliver `initial_document` to `initial_targets`.
139
    ///
140
    /// `description` should be a string describing what we're publishing, for the benefit of logs.
141
    ///
142
    /// (This method launches a background task.)
143
8
    pub fn launch<R, UP>(
144
8
        runtime: &R,
145
8
        description: String,
146
8
        initial_document: Option<Arc<D>>,
147
8
        initial_targets: HashSet<Arc<T>>,
148
8
        initial_retry_delay: Duration,
149
8
        uploader: Arc<UP>,
150
8
    ) -> Result<Arc<Self>, SpawnError>
151
8
    where
152
8
        UP: Uploader<Doc = D, Target = T>,
153
8
        R: tor_rtcompat::SleepProvider + Spawn,
154
    {
155
8
        let n_targets = initial_targets.len();
156
8
        let action = PublishDirective::new(initial_document, initial_targets);
157
8
        let status = PublishStatus::new(action.document.version, n_targets);
158

            
159
8
        let (action, action_rcv) = watch::channel_with(action);
160
8
        let (status_snd, status) = watch::channel_with(status);
161
8
        let action = Mutex::new(action);
162

            
163
8
        let reactor = reactor::PublishReactor::new(
164
8
            runtime.clone(),
165
8
            description,
166
8
            action_rcv,
167
8
            status_snd,
168
8
            initial_retry_delay,
169
8
            uploader,
170
        );
171

            
172
8
        runtime.spawn(reactor.run())?;
173

            
174
8
        Ok(Arc::new(Self {
175
8
            directive: action,
176
8
            status,
177
8
        }))
178
8
    }
179

            
180
    /// Change the current document and publish something else instead.
181
    ///
182
    /// - Any currently in-flight attempts to publish the old document will be allowed to finish,
183
    ///   but we will not wait for them before launching attempts to publish the new one.
184
    /// - If any target has rejected or accepted the old document,
185
    ///   we will try sending it the new one.
186
    ///
187
    /// If `reset_failing_targets` is true, then any targets that are currently waiting before they retry
188
    /// will be told to retry immediately.
189
8
    pub fn set_document(&self, new_document: Option<Arc<D>>, reset_failing_targets: bool) {
190
8
        let mut action_guard = self.directive.lock().expect("poisoned lock");
191
8
        let mut action = action_guard.borrow_mut();
192
8
        let version = action.document.version.next();
193
8
        action.document = Document {
194
8
            contents: new_document,
195
8
            version,
196
8
        };
197
8
        if reset_failing_targets {
198
            action.reset_failures_count += 1;
199
8
        }
200
8
    }
201

            
202
    /// Reset the failure counters and timeouts for all targets that are currently failing.
203
    ///
204
    /// Ordinarily, once a target has failed, we wait a while before we try it again.
205
    /// Calling this function makes the next attempt happen right away.
206
    pub fn reset_failing_targets(&self) {
207
        let mut action_guard = self.directive.lock().expect("poisoned lock");
208
        let mut action = action_guard.borrow_mut();
209
        action.reset_failures_count += 1;
210
    }
211

            
212
    /// Change the current set of targets by calling `modify` on it.
213
    ///
214
    /// If targets are added, upload attempts will be launched for them.
215
    ///
216
    /// If targets are removed, then any in-flight attempts to upload to them will be allowed to finish,
217
    /// but no further attempts will be launched.
218
    ///
219
    /// (As a consequence, if the set of targets is cleared completely,
220
    /// then all in-flight attempts will be allowed to finish, and no further attempts will be made.)
221
8
    pub fn adjust_targets<F>(&self, modify: F)
222
8
    where
223
8
        F: FnOnce(&mut HashSet<Arc<T>>),
224
    {
225
8
        let mut action_guard = self.directive.lock().expect("poisoned lock");
226
8
        let mut action = action_guard.borrow_mut();
227
8
        modify(&mut action.targets);
228
8
    }
229

            
230
    /// Tell the underlying reactor to stop.
231
    ///
232
    /// All inflight attempts to upload will be halted immediately.  This [`Publisher`] object will no
233
    /// longer be usable.
234
    ///
235
    /// This method will return right away.
236
    pub fn stop(&self) {
237
        let mut action_guard = self.directive.lock().expect("poisoned lock");
238
        let mut action = action_guard.borrow_mut();
239
        action.shutdown = true;
240
    }
241

            
242
    /// Tell the underlying reactor to stop, and wait for it to shut down.
243
    ///
244
    /// All inflight attempts to upload will be halted immediately.
245
    /// This [`Publisher`] object will no longer be usable.
246
    ///
247
    /// This method will wait for the underlying reactor task to report that it has exited.
248
    pub async fn shutdown(&self) {
249
        self.stop();
250
        let mut status = self.status.clone();
251
        while status.next().await.is_some() {}
252
    }
253

            
254
    /// Return the current document that we are trying to publish.
255
    pub fn document(&self) -> Option<Arc<D>> {
256
        self.directive
257
            .lock()
258
            .expect("poisoned lock")
259
            .borrow()
260
            .document
261
            .contents
262
            .clone()
263
    }
264

            
265
    /// Return the current targets to which we are trying to publish.
266
    pub fn targets(&self) -> HashSet<Arc<T>> {
267
        self.directive
268
            .lock()
269
            .expect("poisoned lock")
270
            .borrow()
271
            .targets
272
            .clone()
273
    }
274

            
275
    /// Return a [`Stream`](futures::Stream) of [`PublishStatus`] objects
276
    /// representing changes to this publisher's status.
277
    ///
278
    /// Intermediate states may be omitted if the state changes more frequently
279
    /// than this stream is polled.
280
    pub fn watch_status(&self) -> impl futures::Stream<Item = PublishStatus> {
281
        self.status.clone()
282
    }
283

            
284
    /// Return a [`Stream`](futures::Stream) of [`PublishStatus`] objects
285
    /// representing changes to this publisher's status with respect to the current
286
    /// document.
287
    ///
288
    /// Intermediate states may be omitted if the state changes more frequently
289
    /// than this stream is polled.
290
8
    pub fn watch_current_document_status(&self) -> impl futures::Stream<Item = PublishStatus> {
291
        use futures::future::ready;
292
8
        let cur_doc_version = self
293
8
            .directive
294
8
            .lock()
295
8
            .expect("Lock poisoned")
296
8
            .borrow()
297
8
            .document
298
8
            .version;
299

            
300
8
        self.status
301
8
            .clone()
302
            // This combination of take_while and filter is a little subtle!
303
            // The "take_while" causes the stream to be done (and return None) whenever the
304
            // status publisher is talking about a _later_ version of the document.
305
            // The "filter" discards all the values from the stream for which cur_doc_version
306
            // is _less_ than the current version.
307
            //
308
            // It might be nice to have a single tor-async-utils implementation for this
309
            // kind of thing, if we find that we're using it regularly.
310
20
            .take_while(move |s| ready(s.document_version <= cur_doc_version))
311
20
            .filter(move |s| ready(s.document_version == cur_doc_version))
312
8
    }
313

            
314
    /// Return this publisher's current [`PublishStatus`].
315
26
    pub fn status(&self) -> PublishStatus {
316
26
        self.status.borrow().clone()
317
26
    }
318
}
319

            
320
/// A description of the current operation that the [`Publisher`] is telling
321
/// the [`PublishReactor`](reactor::PublishReactor) to perform.
322
///
323
/// We use [`postage::watch`] to share changes in this object.
324
#[derive(educe::Educe, Debug)]
325
#[educe(Clone)]
326
struct PublishDirective<D: ?Sized, T: Hash + Eq + ?Sized> {
327
    /// If true, the reactor should shut down right away.
328
    shutdown: bool,
329

            
330
    /// The current document we're trying to publish, and its associated version number.
331
    document: Document<D>,
332

            
333
    /// A set of targets to which we want to publish.
334
    targets: HashSet<Arc<T>>,
335

            
336
    /// A counter that we increment whenever we want to reset
337
    /// the failure status for every target.
338
    ///
339
    /// Whenever the reactor sees that this value has changed,
340
    /// it marks every target as ready to try uploading again.
341
    reset_failures_count: usize,
342
}
343

            
344
impl<D: ?Sized, T: Hash + Eq + ?Sized> PublishDirective<D, T> {
345
    /// Construct a new [`PublishDirective`].
346
8
    fn new(document: Option<Arc<D>>, targets: HashSet<Arc<T>>) -> Self {
347
8
        Self {
348
8
            shutdown: false,
349
8
            document: Document {
350
8
                version: DocVersion(0.into()),
351
8
                contents: document,
352
8
            },
353
8
            targets,
354
8
            reset_failures_count: 0,
355
8
        }
356
8
    }
357
}
358

            
359
/// The version of a document.
360
///
361
/// (We use versions rather than Eq on documents, since they are allowed to be quite large.)
362
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
363
struct DocVersion(
364
    // This is sensitive because we may want to use it for hsdesc uploads.
365
    safelog::Sensitive<u64>,
366
);
367

            
368
impl DocVersion {
369
    /// Return the next document version in sequence.
370
8
    fn next(&self) -> Self {
371
8
        let n = (*self.0) + 1;
372
8
        Self(n.into())
373
8
    }
374
}
375

            
376
/// A document we're trying to publish.
377
#[derive(educe::Educe, Debug)]
378
#[educe(Clone)]
379
struct Document<D: ?Sized> {
380
    /// The version of this document.
381
    ///
382
    /// Versions are scoped to a single [`Publisher`].
383
    version: DocVersion,
384

            
385
    /// The document itself.
386
    ///
387
    /// This may be None to indicate that we have nothing to publish at present.
388
    contents: Option<Arc<D>>,
389
}
390

            
391
/// The current status of a [`Publisher`]'s attempt to publish the current document.
392
//
393
// This information is as reported by the [`PublishReactor`](reactor::PublishReactor)
394
// to the [`Publisher`].
395
//
396
// We use [`postage::watch`] to share changes in this object.
397
#[derive(Clone, Debug, Eq, PartialEq)]
398
pub struct PublishStatus {
399
    /// The version of the document that we're trying to upload.
400
    ///
401
    /// All the counters in this struct are with respect to _this_ version of the document.
402
    document_version: DocVersion,
403

            
404
    /// The number of targets we are configured to publish to.
405
    n_targets: usize,
406

            
407
    /// The number of targets that have acknowledged that there is no document to publish.
408
    n_inert: usize,
409

            
410
    /// The number of targets we have successfully published to.
411
    n_published: usize,
412

            
413
    /// The number of targets that rejected this document.
414
    n_rejected: usize,
415

            
416
    /// The number of targets that have failed in some non-retriable way.
417
    n_failed_permanently: usize,
418

            
419
    /// The number of targets for which we have encountered at least one retriable failure,
420
    /// and are still trying to upload to.
421
    n_failing: usize,
422

            
423
    /// The number of targets that we are trying to upload the document to for the first time.
424
    n_pending: usize,
425

            
426
    /// True if the reactor has begun running.
427
    initialized: bool,
428

            
429
    /// True if the reactor has shut down.
430
    shutdown: bool,
431
}
432

            
433
// TODO: Right now the accessors for this struct are fairly coarse.
434
// We may want to provide better ones.
435
impl PublishStatus {
436
    /// Construct a new PublishStatus.
437
8
    fn new(document_version: DocVersion, n_targets: usize) -> Self {
438
8
        Self {
439
8
            document_version,
440
8
            n_targets,
441
8
            n_inert: 0,
442
8
            n_published: 0,
443
8
            n_rejected: 0,
444
8
            n_failed_permanently: 0,
445
8
            n_failing: 0,
446
8
            n_pending: 0,
447
8
            initialized: false,
448
8
            shutdown: false,
449
8
        }
450
8
    }
451

            
452
    /// Return true if there is any activity in progress, according to this status.
453
    ///
454
    /// This function returns true if we are uploading to any target,
455
    /// or waiting to upload to any target.
456
38
    pub fn is_active(&self) -> bool {
457
38
        if self.shutdown {
458
            return false;
459
38
        }
460
38
        if !self.initialized {
461
4
            return true;
462
34
        }
463

            
464
34
        self.n_failing > 0 || self.n_pending > 0
465
38
    }
466
}
467

            
468
impl std::fmt::Display for PublishStatus {
469
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
470
        let Self {
471
            document_version,
472
            n_targets,
473
            n_inert,
474
            n_published,
475
            n_rejected,
476
            n_failed_permanently,
477
            n_failing,
478
            n_pending,
479
            initialized,
480
            shutdown,
481
        } = self;
482
        let n = n_targets;
483
        let status = if !*initialized {
484
            "not initialized"
485
        } else if *shutdown {
486
            "shut down"
487
        } else if self.is_active() {
488
            "in progress"
489
        } else if self.n_inert == self.n_targets {
490
            "paused"
491
        } else if self.n_published == self.n_targets {
492
            "successful"
493
        } else if self.n_published == 0 {
494
            "failed"
495
        } else {
496
            "partially successful"
497
        };
498
        let version = document_version.0;
499

            
500
        write!(
501
            f,
502
            "Document {version} upload {status}. Of {n} upload targets",
503
        )?;
504

            
505
        let mut w = |n, s| {
506
            if n != 0 {
507
                write!(f, ", {n} {s}")
508
            } else {
509
                Ok(())
510
            }
511
        };
512

            
513
        w(*n_inert, "are paused")?;
514
        w(*n_published, "have succeeded")?;
515
        w(*n_rejected, "have rejected the document")?;
516
        w(*n_failed_permanently, "have failed non-retriably")?;
517
        w(*n_failing, "are failing")?;
518
        w(*n_pending, "are pending")?;
519
        Ok(())
520
    }
521
}
522

            
523
#[cfg(test)]
524
mod test {
525
    // @@ begin test lint list maintained by maint/add_warning @@
526
    #![allow(clippy::bool_assert_comparison)]
527
    #![allow(clippy::clone_on_copy)]
528
    #![allow(clippy::dbg_macro)]
529
    #![allow(clippy::mixed_attributes_style)]
530
    #![allow(clippy::print_stderr)]
531
    #![allow(clippy::print_stdout)]
532
    #![allow(clippy::single_char_pattern)]
533
    #![allow(clippy::unwrap_used)]
534
    #![allow(clippy::unchecked_time_subtraction)]
535
    #![allow(clippy::useless_vec)]
536
    #![allow(clippy::needless_pass_by_value)]
537
    #![allow(clippy::string_slice)] // See arti#2571
538
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
539
    use super::*;
540
    use std::collections::HashMap;
541
    use tor_rtmock::MockRuntime;
542

            
543
    /// State for a single target in our tests.
544
    #[derive(Clone, Debug, Default)]
545
    struct TState {
546
        should_reject: bool,
547
        should_fail: u8,
548
        #[allow(clippy::rc_buffer)]
549
        document: Option<Arc<String>>,
550
    }
551

            
552
    struct TestUploader {
553
        state: Arc<Mutex<HashMap<u32, TState>>>,
554
    }
555

            
556
    #[async_trait]
557
    impl Uploader for TestUploader {
558
        type Doc = String;
559
        type Target = u32;
560
        async fn upload(
561
            self: Arc<Self>,
562
            target: Arc<u32>,
563
            document: Arc<String>,
564
        ) -> Result<(), UploadError> {
565
            let mut map = self.state.lock().unwrap();
566
            let entry: &mut TState = map.entry(*target).or_default();
567
            if entry.should_reject {
568
                Err(UploadError::Rejected(Rejection::from_message(
569
                    "document refused".into(),
570
                )))
571
            } else if entry.should_fail > 0 {
572
                entry.should_fail -= 1;
573
                Err(UploadError::Timeout) // This is a pretend error, but it'll work fine.
574
            } else {
575
                entry.document = Some(document);
576
                Ok(())
577
            }
578
        }
579
    }
580

            
581
    #[test]
582
    fn successful_upload() {
583
        MockRuntime::test_with_various(|rt| async move {
584
            let state = Arc::new(Mutex::new(HashMap::new()));
585
            let uploader = TestUploader {
586
                state: Arc::clone(&state),
587
            };
588

            
589
            let targets = [1, 2, 3].into_iter().map(Arc::new).collect();
590

            
591
            let publisher = Publisher::launch(
592
                &rt,
593
                "Testing".into(),
594
                None,
595
                targets,
596
                Duration::new(1, 0),
597
                Arc::new(uploader),
598
            )
599
            .unwrap();
600

            
601
            // Kick off an initial upload.
602
            publisher.set_document(Some(Arc::new("hello world".into())), false);
603
            let mut status = publisher.watch_current_document_status();
604
            while let Some(s) = status.next().await {
605
                if !s.is_active() {
606
                    break;
607
                }
608
            }
609

            
610
            assert_eq!(state.lock().unwrap().len(), 3);
611
            for n in 1..=3 {
612
                let map = state.lock().unwrap();
613
                assert_eq!(
614
                    map.get(&n).unwrap().document,
615
                    Some(Arc::new("hello world".into()))
616
                );
617
            }
618

            
619
            // Add a target 4.
620
            publisher.adjust_targets(|targets| {
621
                targets.insert(Arc::new(4));
622
            });
623
            while let Some(s) = status.next().await {
624
                if !s.is_active() {
625
                    break;
626
                }
627
            }
628
            assert_eq!(
629
                state.lock().unwrap().get(&4).unwrap().document,
630
                Some(Arc::new("hello world".into()))
631
            );
632

            
633
            // Drop target 1, then replace the document.
634
            publisher.adjust_targets(|targets| {
635
                targets.remove(&1);
636
            });
637
            publisher.set_document(Some(Arc::new("HELLO WORLD".into())), false);
638

            
639
            let mut status = publisher.watch_current_document_status();
640
            while let Some(s) = status.next().await {
641
                if !s.is_active() {
642
                    break;
643
                }
644
            }
645

            
646
            for n in 1..=4 {
647
                let map = state.lock().unwrap();
648
                let s = if n == 1 { "hello world" } else { "HELLO WORLD" };
649
                assert_eq!(map.get(&n).unwrap().document, Some(Arc::new(s.into())));
650
            }
651
        });
652
    }
653

            
654
    #[test]
655
    fn test_with_retries() {
656
        MockRuntime::test_with_various(|rt| async move {
657
            let state = Arc::new(Mutex::new(HashMap::new()));
658
            let uploader = TestUploader {
659
                state: Arc::clone(&state),
660
            };
661

            
662
            let targets = [1, 2, 3].into_iter().map(Arc::new).collect();
663
            for t in 1..=3 {
664
                state.lock().unwrap().insert(
665
                    t,
666
                    TState {
667
                        should_reject: false,
668
                        should_fail: t as u8,
669
                        document: None,
670
                    },
671
                );
672
            }
673

            
674
            let publisher = Publisher::launch(
675
                &rt,
676
                "Testing".into(),
677
                Some(Arc::new("hello world".into())),
678
                targets,
679
                Duration::new(1, 0),
680
                Arc::new(uploader),
681
            )
682
            .unwrap();
683

            
684
            while publisher.status().is_active() {
685
                rt.advance_by(Duration::new(1, 0)).await;
686
            }
687

            
688
            for n in 1..=3 {
689
                let map = state.lock().unwrap();
690
                let e = map.get(&n).unwrap();
691
                assert_eq!(e.document, Some(Arc::new("hello world".into())));
692
                assert_eq!(e.should_fail, 0);
693
            }
694
        });
695
    }
696
}