1
//! Backend logic to upload a document to multiple targets
2
use std::{
3
    collections::HashMap,
4
    fmt::Debug,
5
    hash::Hash,
6
    num::NonZeroUsize,
7
    sync::{
8
        Arc,
9
        atomic::{self, AtomicUsize},
10
    },
11
    time::Duration,
12
};
13

            
14
use futures::{
15
    FutureExt as _, StreamExt as _, future::BoxFuture, select_biased, stream::Fuse,
16
    stream::FuturesUnordered,
17
};
18
use postage::watch;
19
use tor_basic_utils::retry::RetryDelay;
20
use tor_error::warn_report;
21
use tor_rtcompat::SleepProvider;
22
use tracing::{Level, debug, span, trace, warn};
23
use web_time_compat::Instant;
24

            
25
use crate::{
26
    DocVersion, Document, PublishDirective, PublishStatus, Rejection, UploadError, Uploader,
27
};
28

            
29
/// Identifier for a single action that we have queued for a target.
///
/// (Actions can currently be "try to upload" or "wait till later.")
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ActionNum(NonZeroUsize);

impl ActionNum {
    /// Allocate the next identifier from a process-wide counter.
    ///
    /// Identifiers are not guaranteed to be unique forever, since the
    /// counter wraps around on overflow.  It is merely very unlikely that
    /// two actions sharing an `ActionNum` are active at the same time.
    fn next() -> Self {
        // Shared by every reactor in the process.
        static COUNTER: AtomicUsize = AtomicUsize::new(0);

        loop {
            let candidate = COUNTER.fetch_add(1, atomic::Ordering::Relaxed);
            // Zero cannot be represented; skip it and take the next value.
            match NonZeroUsize::new(candidate) {
                Some(id) => return Self(id),
                None => continue,
            }
        }
    }
}
52

            
53
/// Mutable status for a single target.
54
#[derive(Debug)]
55
struct TargetStatus {
56
    /// The current state of this target.
57
    state: TargetState,
58

            
59
    /// How many times has this target failed since we last received an answer from it?
60
    /// ("Published" and "Rejected" both count as answers; "Try again later" does not.)
61
    n_failures: usize,
62

            
63
    /// State of our retry-timing algorithm.
64
    retry: RetryDelay,
65

            
66
    /// Identifier for the most recent action that we launched for this target.
67
    ///
68
    /// (Multiple actions can be pending at once.
69
    /// We call an action "the latest action for a target"
70
    /// if its ActionNum matches this value.)
71
    latest_action: Option<ActionNum>,
72
}
73

            
74
/// The current state for a single target.
75
#[derive(Debug)]
76
#[expect(unused)] // TODO: we don't use all these fields yet; we should remove or expose them.
77
enum TargetState {
78
    /// There is no document to upload, so we don't have anything to do.
79
    NoDocument,
80

            
81
    /// We are ready to try uploading the most recent document to this target.
82
    Ready,
83

            
84
    /// We are trying to upload the most recent document to this target.
85
    ///
86
    /// (Invariant: If a target is in this state, there is an Upload future
87
    /// among our pending actions for that target.)
88
    Inflight {
89
        /// The time when we began trying to upload.
90
        since: Instant,
91
    },
92

            
93
    /// An upload attempt has failed; we are waiting for a while before we try again.
94
    ///
95
    /// (Invariant: If a target is in this state, there is a Sleep future
96
    /// among our pending actions for that target.)
97
    Waiting {
98
        /// The time until which we are waiting.
99
        until: Instant,
100
    },
101

            
102
    /// We have published the most recent document to this target.
103
    Published,
104

            
105
    /// We have failed permanently for some reason.
106
    /// We won't retry until the document changes.
107
    PermanentlyFailed(UploadError),
108

            
109
    /// The target told us that it will not accept the most recent document,
110
    /// and so we should not try that document again.
111
    Rejected(Rejection),
112
}
113

            
114
/// Whether we currently hold a document to upload.
//
// (A dedicated enum rather than a bool, because bools are error-prone.)
#[derive(Clone, Copy, Debug)]
enum DocumentPresent {
    /// There is a document to upload.
    Present,
    /// There is currently no document to upload.
    Absent,
}
124

            
125
impl DocumentPresent {
126
    /// Return the initial state that new targets should enter.
127
28
    fn initial_target_state(self) -> TargetState {
128
28
        match self {
129
16
            DocumentPresent::Present => TargetState::Ready,
130
12
            DocumentPresent::Absent => TargetState::NoDocument,
131
        }
132
28
    }
133
}
134

            
135
impl<D: ?Sized> super::Document<D> {
136
    /// Return a DocumentPresent for this document.
137
24
    fn present(&self) -> DocumentPresent {
138
24
        if self.contents.is_some() {
139
20
            DocumentPresent::Present
140
        } else {
141
4
            DocumentPresent::Absent
142
        }
143
24
    }
144
}
145

            
146
impl TargetStatus {
147
    /// Construct a new TargetStatus in the Ready state.
148
28
    fn new(initial_delay: Duration, document_present: DocumentPresent) -> Self {
149
28
        Self {
150
28
            n_failures: 0,
151
28
            retry: RetryDelay::from_duration(initial_delay),
152
28
            latest_action: None,
153
28
            state: document_present.initial_target_state(),
154
28
        }
155
28
    }
156

            
157
    /// Called after the latest action for a target has failed:
158
    /// increments the failure count, and returns the interval for which we should wait.
159
    ///
160
    /// If `suggested_delay` is provided, it is an amount that the target
161
    /// told us to wait before retrying.   We will treat this amount
162
    /// as a _minimum_.  A `suggested_delay` will not prevent us
163
    /// from retrying immediately if our failure status is reset.
164
24
    fn set_waiting(
165
24
        &mut self,
166
24
        now: Instant,
167
24
        suggested_delay: Option<Duration>,
168
24
        action: ActionNum,
169
24
    ) -> Duration {
170
24
        self.n_failures += 1;
171
24
        self.latest_action = Some(action);
172

            
173
24
        let d = self.retry.next_delay(&mut rand::rng());
174
24
        let suggested = suggested_delay.unwrap_or_default();
175
24
        let d = std::cmp::max(d, suggested);
176

            
177
24
        self.state = TargetState::Waiting { until: now + d };
178
24
        d
179
24
    }
180

            
181
    /// Called after we have decided to launch an upload for a target.
182
64
    fn set_inflight(&mut self, now: Instant, action: ActionNum) {
183
64
        self.state = TargetState::Inflight { since: now };
184
64
        self.latest_action = Some(action);
185
64
    }
186

            
187
    /// Change this action's state to Ready.
188
24
    fn set_ready(&mut self) {
189
24
        self.state = TargetState::Ready;
190
24
    }
191

            
192
    /// Reset the failure count and timeout for this target.
193
40
    fn reset_failures(&mut self) {
194
40
        self.n_failures = 0;
195
40
        self.retry.reset();
196
40
    }
197

            
198
    /// Called after the latest action for a target has succeeded
199
    /// in uploading the most recent document.
200
40
    fn set_published(&mut self) {
201
40
        self.reset_failures();
202
40
        self.state = TargetState::Published;
203
40
    }
204

            
205
    /// Called after the latest action for a target has been rejected
206
    /// in uploading the most recent document.
207
    fn set_rejected(&mut self, rejection: Rejection) {
208
        self.reset_failures();
209
        self.state = TargetState::Rejected(rejection);
210
    }
211

            
212
    /// Mark this target as permanently unable to receive the current
213
    /// document because of some error `e`.
214
    fn set_permanently_failed(&mut self, e: UploadError) {
215
        self.state = TargetState::PermanentlyFailed(e);
216
    }
217
}
218

            
219
/// The result of a single action.
220
#[derive(Debug)]
221
enum ActionOutcome {
222
    /// A sleep action has expired.
223
    DoneSleeping,
224

            
225
    /// An upload action has succeeded.
226
    Published,
227

            
228
    /// An upload action has been rejected.
229
    Rejected(Rejection),
230

            
231
    /// An upload action has failed with some error.
232
    Err(UploadError),
233
}
234

            
235
impl ActionOutcome {
236
    /// Construct an [`ActionOutcome`] from the result of [`Uploader::upload()`].
237
64
    fn from_upload_result(r: Result<(), UploadError>) -> Self {
238
24
        match r {
239
40
            Ok(()) => Self::Published,
240
            Err(UploadError::Rejected(rejection)) => Self::Rejected(rejection),
241
24
            Err(e) => Self::Err(e),
242
        }
243
64
    }
244
}
245

            
246
/// The type returned by one of the action futures in `PublishReactor.inflight`.
247
struct TaskResult<T: ?Sized> {
248
    /// Which target were we taking this action for?
249
    target: Arc<T>,
250
    /// An `ActionNum` to identify whether the action was the latest one for the target.
251
    action: ActionNum,
252
    /// Which document was the most recent when the action was launched?
253
    doc_version: DocVersion,
254
    /// The result of the action.
255
    outcome: ActionOutcome,
256
}
257

            
258
/// Backend data that we use to publish a document (or series of documents).
259
pub(crate) struct PublishReactor<R: SleepProvider, D: ?Sized, T, UP: ?Sized>
260
where
261
    T: Hash + Eq + ?Sized,
262
{
263
    /// A sleep provider used to launch wait actions.
264
    runtime: R,
265

            
266
    /// A description of what we're uploading, for log messages.
267
    description: String,
268

            
269
    /// The current actions that the [`Publisher`](crate::Publisher) has told us to take.
270
    ///
271
    /// We watch for changes in this directive and adjust our behavior accordingly.
272
    directive: Fuse<watch::Receiver<PublishDirective<D, T>>>,
273

            
274
    /// A channel we use to report our current status to the [`Publisher`](crate::Publisher).
275
    status: watch::Sender<PublishStatus>,
276

            
277
    /// The document which we are currently trying to upload.
278
    latest_document: Document<D>,
279

            
280
    /// The initial retry delay for a failed target.
281
    ///
282
    /// (Used to seed our retry delay algorithm.)
283
    initial_retry_delay: Duration,
284

            
285
    /// The most recent value we've seen for the `reset_count` field of our `PublishDirective`.
286
    latest_reset_count: usize,
287

            
288
    /// The [`Uploader`] object we use to upload documents.
289
    uploader: Arc<UP>,
290

            
291
    /// A set of all our pending actions.
292
    ///
293
    /// Actions are either upload attempts, or sleep actions.
294
    ///
295
    /// Additionally, this `FuturesUnordered` contains a single future that is always
296
    /// pending, to guarantee that `inflight.next()` never returns None.
297
    ///
298
    /// Multiple actions may be inflight for a given target at a time.
299
    /// This is deliberate:
300
    /// If we change our document while the upload of an older document is inflight,
301
    /// we do not want to stop the inflight upload in the middle.
302
    ///
303
    /// TODO: We _could_ cancel any Sleep action that is superseded.
304
    /// That's a fair amount of effort, though, since FuturesUnordered doesn't have
305
    /// very nice accessors nor does Sleep have a good way to make it cancellable.
306
    inflight: FuturesUnordered<BoxFuture<'static, TaskResult<T>>>,
307

            
308
    /// The current status for each of our live targets.
309
    target_status: HashMap<Arc<T>, TargetStatus>,
310
}
311

            
312
/// Marker returned as an error to tell the reactor's main loop to exit.
#[derive(Debug)]
struct ExitLoop;
315

            
316
impl<R: SleepProvider, D, T, UP> PublishReactor<R, D, T, UP>
317
where
318
    D: ?Sized,
319
    T: Hash + Eq + Send + Sync + Debug + ?Sized + 'static,
320
    UP: Uploader<Doc = D, Target = T> + ?Sized,
321
{
322
    /// Construct a new reactor.
323
    ///
324
    /// (Does not launch any background task or do any work).
325
8
    pub(crate) fn new(
326
8
        runtime: R,
327
8
        description: String,
328
8
        action: watch::Receiver<PublishDirective<D, T>>,
329
8
        status: watch::Sender<PublishStatus>,
330
8
        initial_retry_delay: Duration,
331
8
        publisher: Arc<UP>,
332
8
    ) -> Self {
333
8
        let (latest_document, latest_reset_count, targets) = {
334
8
            let cur_action = action.borrow();
335
8
            (
336
8
                cur_action.document.clone(),
337
8
                cur_action.reset_failures_count,
338
8
                cur_action.targets.clone(),
339
8
            )
340
8
        };
341

            
342
8
        let inflight = FuturesUnordered::new();
343
        // Add a never-finished future to keep FuturesUnordered from saying it's done.
344
8
        inflight.push(Box::pin(std::future::pending()) as _);
345
8
        let document_present = latest_document.present();
346

            
347
8
        let target_status = targets
348
8
            .into_iter()
349
24
            .map(|t| (t, TargetStatus::new(initial_retry_delay, document_present)))
350
8
            .collect();
351

            
352
8
        Self {
353
8
            runtime,
354
8
            description,
355
8
            directive: action.fuse(),
356
8
            status,
357
8
            latest_document,
358
8
            initial_retry_delay,
359
8
            latest_reset_count,
360
8
            uploader: publisher,
361
8
            inflight,
362
8
            target_status,
363
8
        }
364
8
    }
365

            
366
    /// Run forever, handling changes in the [`PublishDirective`], uploading documents, and reporting status.
367
8
    pub(crate) async fn run(mut self) {
368
8
        let _span = span!(Level::TRACE, "Publishing {}", self.description);
369

            
370
        // The first time we start, we begin uploading.
371
8
        self.launch_ready_requests(self.runtime.now());
372
8
        self.recalculate_status();
373

            
374
        'mainloop: loop {
375
112
            select_biased! {
376
                // We've been told to do something new, _or_ the last handle to the Publisher has
377
                // been dropped.
378
112
                directive_changed = self.directive.next() => {
379
24
                    let Some(directive) = directive_changed else {
380
                        // The watch::Receiver stream returned None,
381
                        // so we know that the last handle has been dropped.
382
8
                        trace!("directive stream dropped; exiting");
383
8
                        break 'mainloop;
384
                    };
385

            
386
                    // Process any change in the action.
387
16
                    if let Err(ExitLoop) = self.directive_changed(&directive) {
388
                        trace!("directive is shutdown: exiting");
389
                        break 'mainloop;
390
16
                    }
391

            
392
                    // Update our `PublishStatus`.
393
16
                    self.recalculate_status();
394
                }
395

            
396
                // Some action has finished; update accordingly.
397
112
                publication_result = self.inflight.next() => {
398
88
                    let task_result = publication_result.expect("Stream ended unexpectedly.");
399
88
                    self.handle_task_result(task_result);
400
88
                }
401
            }
402
        }
403

            
404
8
        self.status.borrow_mut().shutdown = true;
405
8
    }
406

            
407
    /// Called when a task in `self.inflight` produces a result.
408
    ///
409
    /// Update our status and launch new tasks as appropriate.
410
    #[allow(clippy::cognitive_complexity)]
411
88
    fn handle_task_result(&mut self, task_result: TaskResult<T>) {
412
        let TaskResult {
413
88
            target,
414
88
            action,
415
88
            doc_version,
416
88
            outcome,
417
88
        } = task_result;
418

            
419
88
        let Some(status) = self.target_status.get_mut(&target) else {
420
            // The target isn't here, so we don't care about what happened with it.
421
            trace!(?target, ?outcome, "Ignoring result for removed target.");
422
            return;
423
        };
424
88
        if Some(action) != status.latest_action {
425
            // There is a more recent inflight action for this target;
426
            // ignore the results of this one.
427
            //
428
            // (See note on `Publish.inflight` about why we can have multiple inflight
429
            // actions.)
430
            //
431
            // We use a != comparison here rather than < since we allow the action
432
            // identifier space to wrap around.
433
            trace!(?target, ?outcome, "Ignoring result for superseded action.");
434
            return;
435
88
        }
436

            
437
24
        match outcome {
438
            ActionOutcome::Published => {
439
40
                if doc_version != self.latest_document.version {
440
                    // We aren't tracking this particular document any more;
441
                    // this was a stale upload.
442
                    return;
443
40
                }
444

            
445
40
                trace!(?target, "Document published");
446
40
                status.set_published();
447
            }
448
            ActionOutcome::Rejected(rejection) => {
449
                if doc_version != self.latest_document.version {
450
                    // We aren't tracking this particular document any more;
451
                    // this was a stale upload.
452
                    return;
453
                }
454

            
455
                warn!(
456
                    "{} upload rejected. The target ({:?}) said {}",
457
                    &self.description, &target, &rejection
458
                );
459

            
460
                status.set_rejected(rejection);
461
            }
462
24
            ActionOutcome::DoneSleeping => {
463
24
                // It's time to try a new upload to this target.
464
24
                self.launch_one(&target, self.runtime.now());
465
24
            }
466
24
            ActionOutcome::Err(e) if !e.is_retriable() => {
467
                warn_report!(
468
                    &e,
469
                    "Attempt to publish {} to {:?} failed. Not retriable.",
470
                    &self.description,
471
                    &target
472
                );
473
                status.set_permanently_failed(e);
474
            }
475
24
            ActionOutcome::Err(e) => {
476
                // We failed to upload: we log the error and wait until it's time to
477
                // retry.
478

            
479
                // TODO: This might need to be downgraded, but for now we'll leave it as-is.
480
24
                warn_report!(
481
                    &e,
482
                    "Attempt to publish {} to {:?} failed. We'll retry later.",
483
                    &self.description,
484
                    &target
485
                );
486
24
                self.begin_sleeping(target, e.suggested_delay(), self.runtime.now());
487
            }
488
        }
489

            
490
88
        self.recalculate_status();
491
88
    }
492

            
493
    /// Called when we have received a new [`PublishDirective`] from the publisher.
494
    ///
495
    /// Update the status of all of our targets, and launch new uploads as appropriate.
496
16
    fn directive_changed(&mut self, directive: &PublishDirective<D, T>) -> Result<(), ExitLoop> {
497
        use TargetState::*;
498

            
499
16
        if directive.shutdown {
500
            // We're supposed to shut down.  Just go ahead and do that.
501
            return Err(ExitLoop);
502
16
        }
503

            
504
        // Check to see if any targets have been added or removed;
505
        // update target_status accordingly.
506
16
        let document_present = directive.document.present();
507
52
        for new_target in directive.targets.iter() {
508
52
            self.target_status
509
52
                .entry(new_target.clone())
510
52
                .or_insert_with(|| TargetStatus::new(self.initial_retry_delay, document_present));
511
        }
512
16
        self.target_status
513
56
            .retain(|t, _| directive.targets.contains(t));
514

            
515
        // Have gotten a new document?  Have we been told to reset failing targets?
516
        //
517
        // (We use != rather than > here since we want to allow these counters to wrap around.)
518
16
        let document_changed = directive.document.version != self.latest_document.version;
519
16
        let reset_failing_targets = directive.reset_failures_count != self.latest_reset_count;
520
        // Update our own versions of the counters from the PublishDirective.
521
16
        if document_changed {
522
8
            self.latest_document = directive.document.clone();
523
8
        }
524
16
        self.latest_reset_count = directive.reset_failures_count;
525

            
526
16
        let no_document = directive.document.contents.is_none();
527
16
        if document_changed {
528
8
            let v = self.latest_document.version;
529
8
            if no_document {
530
                trace!("Publisher paused (version {v:?})");
531
            } else {
532
8
                trace!("New document (version {v:?})");
533
            }
534
8
        }
535

            
536
        // Reset failure timings if appropriate,
537
        // and mark targets ready if we want to launch a new upload to them.
538
52
        for status in self.target_status.values_mut() {
539
52
            if reset_failing_targets {
540
                status.reset_failures();
541
52
            }
542

            
543
52
            if no_document {
544
                status.state = NoDocument;
545
                continue;
546
52
            }
547

            
548
52
            let should_reset = match &status.state {
549
                // If this target is waiting, then we should let it continue
550
                // waiting unless we've been told to reset failing targets.
551
                Waiting { .. } => reset_failing_targets,
552

            
553
                // If the target is ready, there's no point in making it ready.
554
4
                Ready => false,
555

            
556
                // If we're currently uploading to a target,
557
                // we only want to launch a new upload if the document changed.
558
12
                Inflight { .. } => document_changed,
559

            
560
                // If we've published successfully,
561
                // or if we have been rejected,
562
                // or if we had nothing to do,
563
                // we only want to launch a new upload if the document changed.
564
24
                Published | Rejected(_) | PermanentlyFailed(_) => document_changed,
565

            
566
                // If we had no document, we want to launch now that we have one.
567
12
                NoDocument => true,
568
            };
569

            
570
52
            if should_reset {
571
24
                status.set_ready();
572
28
            }
573
        }
574

            
575
16
        self.launch_ready_requests(self.runtime.now());
576

            
577
16
        Ok(())
578
16
    }
579

            
580
    /// Launch a new upload for every Ready target,
581
    /// making its status Inflight.
582
24
    fn launch_ready_requests(&mut self, now: Instant) {
583
        // Build a list of the ready targets.
584
        //
585
        // This is a separate step to avoid a concurrent mutable/immutable borrow.
586
24
        let to_launch: Vec<Arc<T>> = self
587
24
            .target_status
588
24
            .iter()
589
76
            .filter(|(_target, status)| matches!(&status.state, TargetState::Ready))
590
40
            .map(|(target, _status)| Arc::clone(target))
591
24
            .collect();
592

            
593
        // Launch an upload for each of them.
594
40
        for target in to_launch {
595
40
            self.launch_one(&target, now);
596
40
        }
597
24
    }
598

            
599
    /// Compute a new [`PublishStatus`] reflecting our progress uploading the current document,
600
    /// and deliver it to the Publisher.
601
112
    fn recalculate_status(&mut self) {
602
        use TargetState::*;
603

            
604
112
        let n_targets = self.target_status.len();
605
112
        let mut n_inert = 0;
606
112
        let mut n_pending = 0;
607
112
        let mut n_failing = 0;
608
112
        let mut n_failed = 0;
609
112
        let mut n_published = 0;
610
112
        let mut n_rejected = 0;
611

            
612
344
        for status in self.target_status.values() {
613
344
            match &status.state {
614
12
                NoDocument => n_inert += 1,
615
126
                Published => n_published += 1,
616
                Rejected(_) => n_rejected += 1,
617
                Ready => {}
618
                PermanentlyFailed(_) => n_failed += 1,
619
                Inflight { .. } | Waiting { .. } => {
620
206
                    if status.n_failures > 0 {
621
118
                        n_failing += 1;
622
118
                    } else {
623
88
                        n_pending += 1;
624
88
                    }
625
                }
626
            }
627
        }
628

            
629
112
        let new_status = PublishStatus {
630
112
            document_version: self.latest_document.version,
631
112
            n_targets,
632
112
            n_inert,
633
112
            n_published,
634
112
            n_rejected,
635
112
            n_failed_permanently: n_failed,
636
112
            n_failing,
637
112
            n_pending,
638
112
            initialized: true,
639
112
            shutdown: false,
640
112
        };
641
112
        debug!("Publishing {}: {}", &self.description, &new_status);
642

            
643
112
        {
644
112
            *self.status.borrow_mut() = new_status;
645
112
        }
646
112
    }
647

            
648
    /// Launch an upload action for a given `target`, changing its status to Inflight.
649
64
    fn launch_one(&mut self, target: &Arc<T>, now: Instant) {
650
        // Launch the publish request, and add it to inflight.
651
64
        let Some(status) = self.target_status.get_mut(target) else {
652
            return;
653
        };
654
64
        let Some(document) = self.latest_document.contents.clone() else {
655
            // There's no document, so we can't upload it.
656
            return;
657
        };
658
64
        let target = target.clone();
659

            
660
64
        trace!(?target, "Launching {} upload request", &self.description);
661

            
662
64
        let doc_version = self.latest_document.version;
663
64
        let action = ActionNum::next();
664
64
        let future = Arc::clone(&self.uploader)
665
64
            .upload(target.clone(), document)
666
64
            .map(move |res| TaskResult {
667
64
                target,
668
64
                action,
669
64
                doc_version,
670
64
                outcome: ActionOutcome::from_upload_result(res),
671
64
            });
672
64
        self.inflight.push(Box::pin(future));
673

            
674
64
        status.set_inflight(now, action);
675
64
    }
676

            
677
    /// Launch a sleep action for a given `target`, changing its status to `Waiting`.
678
24
    fn begin_sleeping(&mut self, target: Arc<T>, suggested_delay: Option<Duration>, now: Instant) {
679
        // Launch the publish request, and add it to inflight.
680
24
        let Some(status) = self.target_status.get_mut(&target) else {
681
            return;
682
        };
683

            
684
24
        let action = ActionNum::next();
685
24
        let delay = status.set_waiting(now, suggested_delay, action);
686

            
687
24
        trace!(
688
            ?target,
689
            ?delay,
690
            "Waiting for next {} upload attempt.",
691
            &self.description
692
        );
693

            
694
24
        let doc_version = self.latest_document.version;
695
24
        let future = self.runtime.sleep(delay).map(move |()| TaskResult {
696
24
            target,
697
24
            action,
698
24
            doc_version,
699
24
            outcome: ActionOutcome::DoneSleeping,
700
24
        });
701
24
        self.inflight.push(Box::pin(future));
702
24
    }
703
}