1
#![cfg_attr(docsrs, feature(doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_time_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45
#![allow(clippy::collapsible_if)] // See arti#2342
46
#![deny(clippy::unused_async)]
47
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
48

            
49
use async_trait::async_trait;
50
use futures::{
51
    StreamExt as _,
52
    task::{Spawn, SpawnError},
53
};
54
use postage::watch;
55
use std::{collections::HashSet, fmt::Debug, hash::Hash, sync::Arc, sync::Mutex, time::Duration};
56
use tor_rtcompat::SpawnExt as _;
57

            
58
mod err;
59
mod reactor;
60

            
61
pub mod http;
62

            
63
pub use err::{Rejection, UploadError};
64

            
65
/// An object that can upload documents of a given type to targets of a given type.
66
///
67
/// See type and method documentation for details on how to implement this type correctly.
68
#[async_trait]
69
pub trait Uploader: Send + Sync + 'static {
70
    /// The type of document we are uploading.
71
    ///
72
    /// Typically, this will be `str` or `[u8]`,
73
    /// but other types are possible.
74
    ///
75
    /// We pass this around in an [`Arc`],
76
    /// so it is allowed to be quite large.
77
    type Doc: ?Sized;
78

            
79
    /// A single target to which we're uploading a document.
80
    ///
81
    /// For a simple HTTP(S) upload this could be a `Vec` of addresses.
82
    ///
83
    /// In a Tor context, it could be a ChanTarget or a CircTarget.
84
    ///
85
    /// We pass this around in an [`Arc`],
86
    /// so the size doesn't much matter.
87
    ///
88
    /// We require that this type implements Eq and Hash,
89
    /// so that we can tell when a target has changed.
90
    type Target: ?Sized;
91

            
92
    /// Try to upload `document` to `target`.
93
    ///
94
    /// Return Ok(()) on success; return an error on failure.
95
    ///
96
    /// If it is possible for the target to reject a document,
97
    /// this method must return [`UploadError::Rejected`]
98
    /// in that case.
99
    ///
100
    /// If it is possible for the target to say
101
    /// "I am overloaded, come back later",
102
    /// the implementor must return [`UploadError::Deferred`]
103
    /// in that case.
104
    ///
105
    /// It is the implementor's responsibility to provide:
106
    /// - Timeout behavior, if desired.
107
    /// - [Happy-eyeballs] address selection, if desired.
108
    ///
109
    /// [Happy-eyeballs]: https://en.wikipedia.org/wiki/Happy_Eyeballs
110
    async fn upload(
111
        self: Arc<Self>,
112
        target: Arc<Self::Target>,
113
        document: Arc<Self::Doc>,
114
    ) -> Result<(), UploadError>;
115
}
116

            
117
/// A handle to a publisher object that manages uploading a document to a set of targets.
118
///
119
/// See the [crate documentation](crate) for more information on this type and how to use it.
120
pub struct Publisher<D, T>
121
where
122
    T: Hash + Eq + Send + Sync + Debug + 'static + ?Sized,
123
    D: Send + Sync + 'static + ?Sized,
124
{
125
    /// A sender that we use to tell the reactor what actions to take.
126
    directive: Mutex<watch::Sender<PublishDirective<D, T>>>,
127

            
128
    /// A receiver to tell us about publication progress.
129
    status: watch::Receiver<PublishStatus>,
130
}
131

            
132
impl<D, T> Publisher<D, T>
133
where
134
    T: Hash + Eq + Send + Sync + Debug + 'static + ?Sized,
135
    D: Send + Sync + 'static + ?Sized,
136
{
137
    /// Create and launch a new [`Publisher`] to deliver `initial_document` to `initial_targets`.
138
    ///
139
    /// `description` should be a string describing what we're publishing, for the benefit of logs.
140
    ///
141
    /// (This method launches a background task.)
142
8
    pub fn launch<R, UP>(
143
8
        runtime: &R,
144
8
        description: String,
145
8
        initial_document: Option<Arc<D>>,
146
8
        initial_targets: HashSet<Arc<T>>,
147
8
        initial_retry_delay: Duration,
148
8
        uploader: Arc<UP>,
149
8
    ) -> Result<Arc<Self>, SpawnError>
150
8
    where
151
8
        UP: Uploader<Doc = D, Target = T>,
152
8
        R: tor_rtcompat::SleepProvider + Spawn,
153
    {
154
8
        let n_targets = initial_targets.len();
155
8
        let action = PublishDirective::new(initial_document, initial_targets);
156
8
        let status = PublishStatus::new(action.document.version, n_targets);
157

            
158
8
        let (action, action_rcv) = watch::channel_with(action);
159
8
        let (status_snd, status) = watch::channel_with(status);
160
8
        let action = Mutex::new(action);
161

            
162
8
        let reactor = reactor::PublishReactor::new(
163
8
            runtime.clone(),
164
8
            description,
165
8
            action_rcv,
166
8
            status_snd,
167
8
            initial_retry_delay,
168
8
            uploader,
169
        );
170

            
171
8
        runtime.spawn(reactor.run())?;
172

            
173
8
        Ok(Arc::new(Self {
174
8
            directive: action,
175
8
            status,
176
8
        }))
177
8
    }
178

            
179
    /// Change the current document and publish something else instead.
180
    ///
181
    /// - Any currently in-flight attempts to publish the old document will be allowed to finish,
182
    ///   but we will not wait for them before launching attempts to publish the new one.
183
    /// - If any target has rejected or accepted the old document,
184
    ///   we will try sending it the new one.
185
    ///
186
    /// If `reset_failing_targets` is true, then any targets that are currently waiting before they retry
187
    /// will be told to retry immediately.
188
8
    pub fn set_document(&self, new_document: Option<Arc<D>>, reset_failing_targets: bool) {
189
8
        let mut action_guard = self.directive.lock().expect("poisoned lock");
190
8
        let mut action = action_guard.borrow_mut();
191
8
        let version = action.document.version.next();
192
8
        action.document = Document {
193
8
            contents: new_document,
194
8
            version,
195
8
        };
196
8
        if reset_failing_targets {
197
            action.reset_failures_count += 1;
198
8
        }
199
8
    }
200

            
201
    /// Reset the failure counters and timeouts for all targets that are currently failing.
202
    ///
203
    /// Ordinarily, once a target has failed, we wait a while before we try it again.
204
    /// Calling this function makes the next attempt happen right away.
205
    pub fn reset_failing_targets(&self) {
206
        let mut action_guard = self.directive.lock().expect("poisoned lock");
207
        let mut action = action_guard.borrow_mut();
208
        action.reset_failures_count += 1;
209
    }
210

            
211
    /// Change the current set of targets by calling `modify` on it.
212
    ///
213
    /// If targets are added, upload attempts will be launched for them.
214
    ///
215
    /// If targets are removed, then any in-flight attempts to upload to them will be allowed to finish,
216
    /// but no further attempts will be launched.
217
    ///
218
    /// (As a consequence, if the set of targets is cleared completely,
219
    /// then all in-flight attempts will be allowed to finish, and no further attempts will be made.)
220
8
    pub fn adjust_targets<F>(&self, modify: F)
221
8
    where
222
8
        F: FnOnce(&mut HashSet<Arc<T>>),
223
    {
224
8
        let mut action_guard = self.directive.lock().expect("poisoned lock");
225
8
        let mut action = action_guard.borrow_mut();
226
8
        modify(&mut action.targets);
227
8
    }
228

            
229
    /// Tell the underlying reactor to stop.
230
    ///
231
    /// All inflight attempts to upload will be halted immediately.  This [`Publisher`] object will no
232
    /// longer be usable.
233
    ///
234
    /// This method will return right away.
235
    pub fn stop(&self) {
236
        let mut action_guard = self.directive.lock().expect("poisoned lock");
237
        let mut action = action_guard.borrow_mut();
238
        action.shutdown = true;
239
    }
240

            
241
    /// Tell the underlying reactor to stop, and wait for it to shut down.
242
    ///
243
    /// All inflight attempts to upload will be halted immediately.
244
    /// This [`Publisher`] object will no longer be usable.
245
    ///
246
    /// This method will wait for the underlying reactor task to report that it has exited.
247
    pub async fn shutdown(&self) {
248
        self.stop();
249
        let mut status = self.status.clone();
250
        while status.next().await.is_some() {}
251
    }
252

            
253
    /// Return the current document that we are trying to publish.
254
    pub fn document(&self) -> Option<Arc<D>> {
255
        self.directive
256
            .lock()
257
            .expect("poisoned lock")
258
            .borrow()
259
            .document
260
            .contents
261
            .clone()
262
    }
263

            
264
    /// Return the current targets to which we are trying to publish.
265
    pub fn targets(&self) -> HashSet<Arc<T>> {
266
        self.directive
267
            .lock()
268
            .expect("poisoned lock")
269
            .borrow()
270
            .targets
271
            .clone()
272
    }
273

            
274
    /// Return a [`Stream`](futures::Stream) of [`PublishStatus`] objects
275
    /// representing changes to this publisher's status.
276
    ///
277
    /// Intermediate states may be omitted if the state changes more frequently
278
    /// than this stream is polled.
279
    pub fn watch_status(&self) -> impl futures::Stream<Item = PublishStatus> {
280
        self.status.clone()
281
    }
282

            
283
    /// Return a [`Stream`](futures::Stream) of [`PublishStatus`] objects
284
    /// representing changes to this publisher's status with respect to the current
285
    /// document.
286
    ///
287
    /// Intermediate states may be omitted if the state changes more frequently
288
    /// than this stream is polled.
289
8
    pub fn watch_current_document_status(&self) -> impl futures::Stream<Item = PublishStatus> {
290
        use futures::future::ready;
291
8
        let cur_doc_version = self
292
8
            .directive
293
8
            .lock()
294
8
            .expect("Lock poisoned")
295
8
            .borrow()
296
8
            .document
297
8
            .version;
298

            
299
8
        self.status
300
8
            .clone()
301
            // This combination of take_while and filter is a little subtle!
302
            // The "take_while" causes the stream to be done (and return None) whenever the
303
            // status publisher is talking about a _later_ version of the document.
304
            // The "filter" discards all the values from the stream for which cur_doc_version
305
            // is _less_ than the current version.
306
            //
307
            // It might be nice to have a single tor-async-utils implementation for this
308
            // kind of thing, if we find that we're using it regularly.
309
20
            .take_while(move |s| ready(s.document_version <= cur_doc_version))
310
20
            .filter(move |s| ready(s.document_version == cur_doc_version))
311
8
    }
312

            
313
    /// Return this publisher's current [`PublishStatus`].
314
28
    pub fn status(&self) -> PublishStatus {
315
28
        self.status.borrow().clone()
316
28
    }
317
}
318

            
319
/// A description of the current operation that the [`Publisher`] is telling
320
/// the [`PublishReactor`](reactor::PublishReactor) to perform.
321
///
322
/// We use [`postage::watch`] to share changes in this object.
323
#[derive(educe::Educe, Debug)]
324
#[educe(Clone)]
325
struct PublishDirective<D: ?Sized, T: Hash + Eq + ?Sized> {
326
    /// If true, the reactor should shut down right away.
327
    shutdown: bool,
328

            
329
    /// The current document we're trying to publish, and its associated version number.
330
    document: Document<D>,
331

            
332
    /// A set of targets to which we want to publish.
333
    targets: HashSet<Arc<T>>,
334

            
335
    /// A counter that we increment whenever we want to reset
336
    /// the failure status for every target.
337
    ///
338
    /// Whenever the reactor sees that this value has changed,
339
    /// it marks every target as ready to try uploading again.
340
    reset_failures_count: usize,
341
}
342

            
343
impl<D: ?Sized, T: Hash + Eq + ?Sized> PublishDirective<D, T> {
344
    /// Construct a new [`PublishDirective`].
345
8
    fn new(document: Option<Arc<D>>, targets: HashSet<Arc<T>>) -> Self {
346
8
        Self {
347
8
            shutdown: false,
348
8
            document: Document {
349
8
                version: DocVersion(0.into()),
350
8
                contents: document,
351
8
            },
352
8
            targets,
353
8
            reset_failures_count: 0,
354
8
        }
355
8
    }
356
}
357

            
358
/// The version of a document.
359
///
360
/// (We use versions rather than Eq on documents, since they are allowed to be quite large.)
361
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
362
struct DocVersion(
363
    // This is sensitive because we may want to use it for hsdesc uploads.
364
    safelog::Sensitive<u64>,
365
);
366

            
367
impl DocVersion {
368
    /// Return the next document version in sequence.
369
8
    fn next(&self) -> Self {
370
8
        let n = (*self.0) + 1;
371
8
        Self(n.into())
372
8
    }
373
}
374

            
375
/// A document we're trying to publish.
376
#[derive(educe::Educe, Debug)]
377
#[educe(Clone)]
378
struct Document<D: ?Sized> {
379
    /// The version of this document.
380
    ///
381
    /// Versions are scoped to a single [`Publisher`].
382
    version: DocVersion,
383

            
384
    /// The document itself.
385
    ///
386
    /// This may be None to indicate that we have nothing to publish at present.
387
    contents: Option<Arc<D>>,
388
}
389

            
390
/// The current status of a [`Publisher`]'s attempt to publish the current document.
391
//
392
// This information is as reported by the [`PublishReactor`](reactor::PublishReactor)
393
// to the [`Publisher`].
394
//
395
// We use [`postage::watch`] to share changes in this object.
396
#[derive(Clone, Debug, Eq, PartialEq)]
397
pub struct PublishStatus {
398
    /// The version of the document that we're trying to upload.
399
    ///
400
    /// All the counters in this struct are with respect to _this_ version of the document.
401
    document_version: DocVersion,
402

            
403
    /// The number of targets we are configured to publish to.
404
    n_targets: usize,
405

            
406
    /// The number of targets that have acknowledged that there is no document to publish.
407
    n_inert: usize,
408

            
409
    /// The number of targets we have successfully published to.
410
    n_published: usize,
411

            
412
    /// The number of targets that rejected this document.
413
    n_rejected: usize,
414

            
415
    /// The number of targets that have failed in some non-retriable way.
416
    n_failed_permanently: usize,
417

            
418
    /// The number of targets for which we have encountered at least one retriable failure,
419
    /// and are still trying to upload to.
420
    n_failing: usize,
421

            
422
    /// The number of targets that we are trying to upload the document to for the first time.
423
    n_pending: usize,
424

            
425
    /// True if the reactor has begun running.
426
    initialized: bool,
427

            
428
    /// True if the reactor has shut down.
429
    shutdown: bool,
430
}
431

            
432
// TODO: Right now the accessors for this struct are fairly coarse.
433
// We may want to provide better ones.
434
impl PublishStatus {
435
    /// Construct a new PublishStatus.
436
8
    fn new(document_version: DocVersion, n_targets: usize) -> Self {
437
8
        Self {
438
8
            document_version,
439
8
            n_targets,
440
8
            n_inert: 0,
441
8
            n_published: 0,
442
8
            n_rejected: 0,
443
8
            n_failed_permanently: 0,
444
8
            n_failing: 0,
445
8
            n_pending: 0,
446
8
            initialized: false,
447
8
            shutdown: false,
448
8
        }
449
8
    }
450

            
451
    /// Return true if there is any activity in progress, according to this status.
452
    ///
453
    /// This function returns true if we are uploading to any target,
454
    /// or waiting to upload to any target.
455
40
    pub fn is_active(&self) -> bool {
456
40
        if self.shutdown {
457
            return false;
458
40
        }
459
40
        if !self.initialized {
460
4
            return true;
461
36
        }
462

            
463
36
        self.n_failing > 0 || self.n_pending > 0
464
40
    }
465
}
466

            
467
impl std::fmt::Display for PublishStatus {
468
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469
        let Self {
470
            document_version,
471
            n_targets,
472
            n_inert,
473
            n_published,
474
            n_rejected,
475
            n_failed_permanently,
476
            n_failing,
477
            n_pending,
478
            initialized,
479
            shutdown,
480
        } = self;
481
        let n = n_targets;
482
        let status = if !*initialized {
483
            "not initialized"
484
        } else if *shutdown {
485
            "shut down"
486
        } else if self.is_active() {
487
            "in progress"
488
        } else if self.n_inert == self.n_targets {
489
            "paused"
490
        } else if self.n_published == self.n_targets {
491
            "successful"
492
        } else if self.n_published == 0 {
493
            "failed"
494
        } else {
495
            "partially successful"
496
        };
497
        let version = document_version.0;
498

            
499
        write!(
500
            f,
501
            "Document {version} upload {status}. Of {n} upload targets",
502
        )?;
503

            
504
        let mut w = |n, s| {
505
            if n != 0 {
506
                write!(f, ", {n} {s}")
507
            } else {
508
                Ok(())
509
            }
510
        };
511

            
512
        w(*n_inert, "are paused")?;
513
        w(*n_published, "have succeeded")?;
514
        w(*n_rejected, "have rejected the document")?;
515
        w(*n_failed_permanently, "have failed non-retriably")?;
516
        w(*n_failing, "are failing")?;
517
        w(*n_pending, "are pending")?;
518
        Ok(())
519
    }
520
}
521

            
522
#[cfg(test)]
523
mod test {
524
    // @@ begin test lint list maintained by maint/add_warning @@
525
    #![allow(clippy::bool_assert_comparison)]
526
    #![allow(clippy::clone_on_copy)]
527
    #![allow(clippy::dbg_macro)]
528
    #![allow(clippy::mixed_attributes_style)]
529
    #![allow(clippy::print_stderr)]
530
    #![allow(clippy::print_stdout)]
531
    #![allow(clippy::single_char_pattern)]
532
    #![allow(clippy::unwrap_used)]
533
    #![allow(clippy::unchecked_time_subtraction)]
534
    #![allow(clippy::useless_vec)]
535
    #![allow(clippy::needless_pass_by_value)]
536
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
537
    use super::*;
538
    use std::collections::HashMap;
539
    use tor_rtmock::MockRuntime;
540

            
541
    /// State for a single target in our tests.
542
    #[derive(Clone, Debug, Default)]
543
    struct TState {
544
        should_reject: bool,
545
        should_fail: u8,
546
        #[allow(clippy::rc_buffer)]
547
        document: Option<Arc<String>>,
548
    }
549

            
550
    struct TestUploader {
551
        state: Arc<Mutex<HashMap<u32, TState>>>,
552
    }
553

            
554
    #[async_trait]
555
    impl Uploader for TestUploader {
556
        type Doc = String;
557
        type Target = u32;
558
        async fn upload(
559
            self: Arc<Self>,
560
            target: Arc<u32>,
561
            document: Arc<String>,
562
        ) -> Result<(), UploadError> {
563
            let mut map = self.state.lock().unwrap();
564
            let entry: &mut TState = map.entry(*target).or_default();
565
            if entry.should_reject {
566
                Err(UploadError::Rejected(Rejection::from_message(
567
                    "document refused".into(),
568
                )))
569
            } else if entry.should_fail > 0 {
570
                entry.should_fail -= 1;
571
                Err(UploadError::Timeout) // This is a pretend error, but it'll work fine.
572
            } else {
573
                entry.document = Some(document);
574
                Ok(())
575
            }
576
        }
577
    }
578

            
579
    #[test]
580
    fn successful_upload() {
581
        MockRuntime::test_with_various(|rt| async move {
582
            let state = Arc::new(Mutex::new(HashMap::new()));
583
            let uploader = TestUploader {
584
                state: Arc::clone(&state),
585
            };
586

            
587
            let targets = [1, 2, 3].into_iter().map(Arc::new).collect();
588

            
589
            let publisher = Publisher::launch(
590
                &rt,
591
                "Testing".into(),
592
                None,
593
                targets,
594
                Duration::new(1, 0),
595
                Arc::new(uploader),
596
            )
597
            .unwrap();
598

            
599
            // Kick off an initial upload.
600
            publisher.set_document(Some(Arc::new("hello world".into())), false);
601
            let mut status = publisher.watch_current_document_status();
602
            while let Some(s) = status.next().await {
603
                if !s.is_active() {
604
                    break;
605
                }
606
            }
607

            
608
            assert_eq!(state.lock().unwrap().len(), 3);
609
            for n in 1..=3 {
610
                let map = state.lock().unwrap();
611
                assert_eq!(
612
                    map.get(&n).unwrap().document,
613
                    Some(Arc::new("hello world".into()))
614
                );
615
            }
616

            
617
            // Add a target 4.
618
            publisher.adjust_targets(|targets| {
619
                targets.insert(Arc::new(4));
620
            });
621
            while let Some(s) = status.next().await {
622
                if !s.is_active() {
623
                    break;
624
                }
625
            }
626
            assert_eq!(
627
                state.lock().unwrap().get(&4).unwrap().document,
628
                Some(Arc::new("hello world".into()))
629
            );
630

            
631
            // Drop target 1, then replace the document.
632
            publisher.adjust_targets(|targets| {
633
                targets.remove(&1);
634
            });
635
            publisher.set_document(Some(Arc::new("HELLO WORLD".into())), false);
636

            
637
            let mut status = publisher.watch_current_document_status();
638
            while let Some(s) = status.next().await {
639
                if !s.is_active() {
640
                    break;
641
                }
642
            }
643

            
644
            for n in 1..=4 {
645
                let map = state.lock().unwrap();
646
                let s = if n == 1 { "hello world" } else { "HELLO WORLD" };
647
                assert_eq!(map.get(&n).unwrap().document, Some(Arc::new(s.into())));
648
            }
649
        });
650
    }
651

            
652
    #[test]
653
    fn test_with_retries() {
654
        MockRuntime::test_with_various(|rt| async move {
655
            let state = Arc::new(Mutex::new(HashMap::new()));
656
            let uploader = TestUploader {
657
                state: Arc::clone(&state),
658
            };
659

            
660
            let targets = [1, 2, 3].into_iter().map(Arc::new).collect();
661
            for t in 1..=3 {
662
                state.lock().unwrap().insert(
663
                    t,
664
                    TState {
665
                        should_reject: false,
666
                        should_fail: t as u8,
667
                        document: None,
668
                    },
669
                );
670
            }
671

            
672
            let publisher = Publisher::launch(
673
                &rt,
674
                "Testing".into(),
675
                Some(Arc::new("hello world".into())),
676
                targets,
677
                Duration::new(1, 0),
678
                Arc::new(uploader),
679
            )
680
            .unwrap();
681

            
682
            while publisher.status().is_active() {
683
                rt.advance_by(Duration::new(1, 0)).await;
684
            }
685

            
686
            for n in 1..=3 {
687
                let map = state.lock().unwrap();
688
                let e = map.get(&n).unwrap();
689
                assert_eq!(e.document, Some(Arc::new("hello world".into())));
690
                assert_eq!(e.should_fail, 0);
691
            }
692
        });
693
    }
694
}