1
//! The onion service publisher reactor.
2
//!
3
//! Generates and publishes hidden service descriptors in response to various events.
4
//!
5
//! [`Reactor::run`] is the entry-point of the reactor. It starts the reactor,
6
//! and runs until [`Reactor::run_once`] returns [`ShutdownStatus::Terminate`]
7
//! or a fatal error occurs. `ShutdownStatus::Terminate` is returned if
8
//! any of the channels the reactor is receiving events from is closed
9
//! (i.e. when the senders are dropped).
10
//!
11
//! ## Publisher status
12
//!
13
//! The publisher has an internal [`PublishStatus`], distinct from its [`State`],
14
//! which is used for onion service status reporting.
15
//!
16
//! The main loop of the reactor reads the current `PublishStatus` from `publish_status_rx`,
17
//! and responds by generating and publishing a new descriptor if needed.
18
//!
19
//! See [`PublishStatus`] and [`Reactor::publish_status_rx`] for more details.
20
//!
21
//! ## When do we publish?
22
//!
23
//! We generate and publish a new descriptor if
24
//!   * the introduction points have changed
25
//!   * the onion service configuration has changed in a meaningful way (for example,
26
//!     if the `restricted_discovery` configuration or its [`Anonymity`](crate::Anonymity)
27
//!     has changed. See [`OnionServiceConfigPublisherView`]).
28
//!   * there is a new consensus
29
//!   * it is time to republish the descriptor (after we upload a descriptor,
30
//!     we schedule it for republishing at a random time between 60 minutes and 120 minutes
31
//!     in the future)
32
//!
33
//! ## Onion service status
34
//!
35
//! With respect to [`OnionServiceStatus`] reporting,
36
//! the following state transitions are possible:
37
//!
38
//!
39
//! ```ignore
40
//!
41
//!                 update_publish_status(UploadScheduled|AwaitingIpts|RateLimited)
42
//!                +---------------------------------------+
43
//!                |                                       |
44
//!                |                                       v
45
//!                |                               +---------------+
46
//!                |                               | Bootstrapping |
47
//!                |                               +---------------+
48
//!                |                                       |
49
//!                |                                       |           uploaded to at least
50
//!                |  not enough HsDir uploads succeeded   |        some HsDirs from each ring
51
//!                |         +-----------------------------+-----------------------+
52
//!                |         |                             |                       |
53
//!                |         |              all HsDir uploads succeeded            |
54
//!                |         |                             |                       |
55
//!                |         v                             v                       v
56
//!                |  +---------------------+         +---------+        +---------------------+
57
//!                |  | DegradedUnreachable |         | Running |        |  DegradedReachable  |
58
//! +----------+   |  +---------------------+         +---------+        +---------------------+
59
//! | Shutdown |-- |         |                           |                        |
60
//! +----------+   |         |                           |                        |
61
//!                |         |                           |                        |
62
//!                |         |                           |                        |
63
//!                |         +---------------------------+------------------------+
64
//!                |                                     |   invalid authorized_clients
65
//!                |                                     |      after handling config change
66
//!                |                                     |
67
//!                |                                     v
68
//!                |     run_once() returns an error +--------+
69
//!                +-------------------------------->| Broken |
70
//!                                                  +--------+
71
//! ```
72
//!
73
//! We can also transition from `Broken`, `DegradedReachable`, or `DegradedUnreachable`
74
//! back to `Bootstrapping` (those transitions were omitted for brevity).
75

            
76
use tor_circmgr::ServiceOnionServiceDirTunnel;
77
use tor_config::file_watcher::{
78
    self, Event as FileEvent, FileEventReceiver, FileEventSender, FileWatcher, FileWatcherBuilder,
79
};
80
use tor_config_path::{CfgPath, CfgPathResolver};
81
use tor_dirclient::SourceInfo;
82
use tor_netdir::{DirEvent, NetDir};
83
use tracing::instrument;
84

            
85
use crate::config::OnionServiceConfigPublisherView;
86
use crate::config::restricted_discovery::{
87
    DirectoryKeyProviderList, RestrictedDiscoveryConfig, RestrictedDiscoveryKeys,
88
};
89
use crate::status::{DescUploadRetryError, Problem};
90

            
91
use super::*;
92
use derive_more::From;
93

            
94
// TODO-CLIENT-AUTH: perhaps we should add a separate CONFIG_CHANGE_REPUBLISH_DEBOUNCE_INTERVAL
95
// for rate-limiting the publish jobs triggered by a change in the config?
96
//
97
// Currently the descriptor publish tasks triggered by changes in the config
98
// are rate-limited via the usual rate limiting mechanism
99
// (which rate-limits the uploads for 1m).
100
//
101
// I think this is OK for now, but we might need to rethink this if it becomes problematic
102
// (for example, we might want an even longer rate-limit, or to reset any existing rate-limits
103
// each time the config is modified).
104

            
105
/// The upload rate-limiting threshold.
///
/// Before initiating an upload, the reactor checks if the last upload was at least
/// `UPLOAD_RATE_LIM_THRESHOLD` seconds ago. If so, it uploads the descriptor to all HsDirs that
/// need it. If not, it schedules the upload to happen `UPLOAD_RATE_LIM_THRESHOLD` seconds from the
/// current time.
//
// TODO: We may someday need to tune this value; it was chosen more or less arbitrarily.
const UPLOAD_RATE_LIM_THRESHOLD: Duration = Duration::from_secs(60);

/// The maximum number of concurrent upload tasks per time period.
//
// TODO: this value was arbitrarily chosen and may not be optimal.  For now, it
// will have no effect, since the current number of replicas is far less than
// this value.
//
// The uploads for all TPs happen in parallel.  As a result, the actual limit for the maximum
// number of concurrent upload tasks is multiplied by a number which depends on the TP parameters
// (currently 2, which means the concurrency limit will, in fact, be 32).
//
// We should try to decouple this value from the TP parameters.
const MAX_CONCURRENT_UPLOADS: usize = 16;

/// The maximum time allowed for uploading a descriptor to a single HSDir,
/// across all attempts.
pub(crate) const OVERALL_UPLOAD_TIMEOUT: Duration = Duration::from_secs(5 * 60);
131

            
132
/// A reactor for the HsDir [`Publisher`]
///
/// The entrypoint is [`Reactor::run`].
#[must_use = "If you don't call run() on the reactor, it won't publish any descriptors."]
pub(super) struct Reactor<R: Runtime, M: Mockable> {
    /// The immutable, shared inner state.
    imm: Arc<Immutable<R, M>>,
    /// A source for new network directories that we use to determine
    /// our HsDirs.
    dir_provider: Arc<dyn NetDirProvider>,
    /// The mutable inner state.
    inner: Arc<Mutex<Inner>>,
    /// A channel for receiving IPT change notifications.
    ipt_watcher: IptsPublisherView,
    /// A channel for receiving onion service config change notifications.
    config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
    /// A channel for receiving restricted discovery key_dirs change notifications.
    key_dirs_rx: FileEventReceiver,
    /// A channel for sending restricted discovery key_dirs change notifications.
    ///
    /// A copy of this sender is handed out to every `FileWatcher` created.
    key_dirs_tx: FileEventSender,
    /// A channel for receiving updates regarding our [`PublishStatus`].
    ///
    /// The main loop of the reactor watches for updates on this channel.
    ///
    /// When the [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
    /// we can start publishing descriptors.
    ///
    /// If the [`PublishStatus`] is [`AwaitingIpts`](PublishStatus::AwaitingIpts), publishing is
    /// paused until we receive a notification on `ipt_watcher` telling us the IPT manager has
    /// established some introduction points.
    publish_status_rx: watch::Receiver<PublishStatus>,
    /// A sender for updating our [`PublishStatus`].
    ///
    /// When our [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
    /// we can start publishing descriptors.
    publish_status_tx: watch::Sender<PublishStatus>,
    /// A channel for receiving upload completion notifications.
    ///
    /// This channel is polled in the main loop of the reactor.
    upload_task_complete_rx: mpsc::Receiver<TimePeriodUploadResult>,
    /// A channel for sending upload completion notifications.
    ///
    /// A copy of this sender is handed to each upload task.
    upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
    /// A sender for notifying any pending upload tasks that the reactor is shutting down.
    ///
    /// Receivers can use this channel to find out when reactor is dropped.
    ///
    /// This is currently only used in [`upload_for_time_period`](Reactor::upload_for_time_period).
    /// Any future background tasks can also use this channel to detect if the reactor is dropped.
    ///
    /// Closing this channel will cause any pending upload tasks to be dropped.
    shutdown_tx: broadcast::Sender<Void>,
    /// Path resolver for configuration files.
    path_resolver: Arc<CfgPathResolver>,
    /// Queue on which we receive messages from the [`PowManager`] telling us that a seed has
    /// rotated and thus we need to republish the descriptor for a particular time period.
    update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
}
193

            
194
/// The immutable, shared state of the descriptor publisher reactor.
#[derive(Clone)]
struct Immutable<R: Runtime, M: Mockable> {
    /// The runtime.
    runtime: R,
    /// Mockable state.
    ///
    /// This is used for launching circuits and for obtaining random number generators.
    mockable: M,
    /// The service for which we're publishing descriptors.
    nickname: HsNickname,
    /// The key manager.
    keymgr: Arc<KeyMgr>,
    /// A sender for updating the status of the onion service.
    status_tx: PublisherStatusSender,
    /// Proof-of-work state.
    pow_manager: Arc<PowManager<R>>,
}
212

            
213
impl<R: Runtime, M: Mockable> Immutable<R, M> {
    /// Create an [`AesOpeKey`] for generating revision counters for the descriptors associated
    /// with the specified [`TimePeriod`].
    ///
    /// If the onion service is not running in offline mode, the key of the returned `AesOpeKey` is
    /// the private part of the blinded identity key. Otherwise, the key is the private part of the
    /// descriptor signing key.
    ///
    /// Returns an error if the service is running in offline mode and the descriptor signing
    /// keypair of the specified `period` is not available.
    //
    // TODO (#1194): we don't support "offline" mode (yet), so this always returns an AesOpeKey
    // built from the blinded id key
    fn create_ope_key(&self, period: TimePeriod) -> Result<AesOpeKey, FatalError> {
        let ope_key = match read_blind_id_keypair(&self.keymgr, &self.nickname, period)? {
            Some(key) => {
                let key: ed25519::ExpandedKeypair = key.into();
                // Use the first 32 bytes of the expanded secret key as the OPE key.
                key.to_secret_key_bytes()[0..32]
                    .try_into()
                    .expect("Wrong length on slice")
            }
            None => {
                // TODO (#1194): we don't support externally provisioned keys (yet), so this branch
                // is unreachable (for now).
                let desc_sign_key_spec =
                    DescSigningKeypairSpecifier::new(self.nickname.clone(), period);
                let key: ed25519::Keypair = self
                    .keymgr
                    .get::<HsDescSigningKeypair>(&desc_sign_key_spec)?
                    // TODO (#1194): internal! is not the right type for this error (we need an
                    // error type for the case where a hidden service running in offline mode has
                    // run out of its pre-provisioned keys).
                    //
                    // This will be addressed when we add support for offline hs_id mode
                    .ok_or_else(|| {
                        internal!(
                            "identity keys are offline, but descriptor signing key is unavailable?!"
                        )
                    })?
                    .into();
                key.to_bytes()
            }
        };

        Ok(AesOpeKey::from_secret(&ope_key))
    }

    /// Generate a revision counter for a descriptor associated with the specified
    /// [`TimePeriod`].
    ///
    /// Returns a revision counter generated according to the [encrypted time in period] scheme.
    ///
    /// [encrypted time in period]: https://spec.torproject.org/rend-spec/revision-counter-mgt.html#encrypted-time
    fn generate_revision_counter(
        &self,
        params: &HsDirParams,
        now: SystemTime,
    ) -> Result<RevisionCounter, FatalError> {
        // TODO: in the future, we might want to compute ope_key once per time period (as opposed
        // to each time we generate a new descriptor), for performance reasons.
        let ope_key = self.create_ope_key(params.time_period())?;

        // TODO: perhaps this should be moved to a new HsDirParams::offset_within_sr() function
        let srv_start = params.start_of_shard_rand_period();
        let offset = params.offset_within_srv_period(now).ok_or_else(|| {
            internal!(
                "current wallclock time not within SRV range?! (now={:?}, SRV_start={:?})",
                now,
                srv_start
            )
        })?;
        // Encrypting the offset with the OPE key yields a monotonically increasing
        // revision counter within the SRV period.
        let rev = ope_key.encrypt(offset);

        Ok(RevisionCounter::from(rev))
    }
}
289

            
290
/// Mockable state for the descriptor publisher reactor.
///
/// This enables us to mock parts of the [`Reactor`] for testing purposes.
#[async_trait]
pub(crate) trait Mockable: Clone + Send + Sync + Sized + 'static {
    /// The type of random number generator.
    type Rng: rand::Rng + rand::CryptoRng;

    /// The type of client circuit.
    type Tunnel: MockableDirTunnel;

    /// Return a random number generator.
    fn thread_rng(&self) -> Self::Rng;

    /// Create a circuit of the specified `kind` to `target`.
    async fn get_or_launch_hs_dir<T>(
        &self,
        netdir: &NetDir,
        target: T,
    ) -> Result<Self::Tunnel, tor_circmgr::Error>
    where
        T: CircTarget + Send + Sync;

    /// Return an estimate-based value for how long we should allow a single
    /// directory upload operation to complete.
    ///
    /// Includes circuit construction, stream opening, upload, and waiting for a
    /// response.
    fn estimate_upload_timeout(&self) -> Duration;
}
320

            
321
/// Mockable client circuit.
#[async_trait]
pub(crate) trait MockableDirTunnel: Send + Sync {
    /// The data stream type.
    type DataStream: AsyncRead + AsyncWrite + Send + Unpin;

    /// Start a new stream to the last relay in the circuit, using
    /// a BEGIN_DIR cell.
    async fn begin_dir_stream(&self) -> Result<Self::DataStream, tor_circmgr::Error>;

    /// Try to get a SourceInfo for this circuit, for using it in a directory request.
    fn source_info(&self) -> tor_proto::Result<Option<SourceInfo>>;
}
334

            
335
#[async_trait]
impl MockableDirTunnel for ServiceOnionServiceDirTunnel {
    type DataStream = tor_proto::client::stream::DataStream;

    async fn begin_dir_stream(&self) -> Result<Self::DataStream, tor_circmgr::Error> {
        // Delegate to the inherent `begin_dir_stream` on `ServiceOnionServiceDirTunnel`.
        // (Writing `self.begin_dir_stream()` here would recursively call this trait method.)
        Self::begin_dir_stream(self).await
    }

    fn source_info(&self) -> tor_proto::Result<Option<SourceInfo>> {
        SourceInfo::from_tunnel(self)
    }
}
347

            
348
/// The real version of the mockable state of the reactor.
///
/// Wraps the [`HsCircPool`] used to build circuits to the HsDirs.
#[derive(Clone, From, Into)]
pub(crate) struct Real<R: Runtime>(Arc<HsCircPool<R>>);
351

            
352
#[async_trait]
impl<R: Runtime> Mockable for Real<R> {
    type Rng = rand::rngs::ThreadRng;
    type Tunnel = ServiceOnionServiceDirTunnel;

    fn thread_rng(&self) -> Self::Rng {
        rand::rng()
    }

    #[instrument(level = "trace", skip_all)]
    async fn get_or_launch_hs_dir<T>(
        &self,
        netdir: &NetDir,
        target: T,
    ) -> Result<Self::Tunnel, tor_circmgr::Error>
    where
        T: CircTarget + Send + Sync,
    {
        // Delegate to the circuit pool to reuse or build a service directory circuit.
        self.0.get_or_launch_svc_dir(netdir, target).await
    }

    fn estimate_upload_timeout(&self) -> Duration {
        use tor_circmgr::timeouts::Action;
        let est_build = self.0.estimate_timeout(&Action::BuildCircuit { length: 4 });
        let est_roundtrip = self.0.estimate_timeout(&Action::RoundTrip { length: 4 });
        // We assume that in the worst case we'll have to wait for an entire
        // circuit construction and two round-trips to the hsdir.
        let est_total = est_build + est_roundtrip * 2;
        // We always allow _at least_ this much time, in case our estimate is
        // ridiculously low.
        let min_timeout = Duration::from_secs(30);
        max(est_total, min_timeout)
    }
}
386

            
387
/// The mutable state of a [`Reactor`].
struct Inner {
    /// The onion service config.
    config: Arc<OnionServiceConfigPublisherView>,
    /// Watcher for key_dirs.
    ///
    /// Set to `None` if the reactor is not running, or if `watch_configuration` is false.
    ///
    /// The watcher is recreated whenever the `restricted_discovery.key_dirs` change.
    file_watcher: Option<FileWatcher>,
    /// The relevant time periods.
    ///
    /// This includes the current time period, as well as any other time periods we need to be
    /// publishing descriptors for.
    ///
    /// This is empty until we fetch our first netdir in [`Reactor::run`].
    time_periods: Vec<TimePeriodContext>,
    /// Our most up to date netdir.
    ///
    /// This is initialized in [`Reactor::run`].
    netdir: Option<Arc<NetDir>>,
    /// The timestamp of our last upload.
    ///
    /// This is the time when the last update was _initiated_ (rather than completed), to prevent
    /// the publisher from spawning multiple upload tasks at once in response to multiple external
    /// events happening in quick succession, such as the IPT manager sending multiple IPT change
    /// notifications in a short time frame (#1142), or an IPT change notification that's
    /// immediately followed by a consensus change. Starting two upload tasks at once is not only
    /// inefficient, but it also causes the publisher to generate two different descriptors with
    /// the same revision counter (the revision counter is derived from the current timestamp),
    /// which ultimately causes the slower upload task to fail (see #1142).
    ///
    /// Note: This is only used for deciding when to reschedule a rate-limited upload. It is _not_
    /// used for retrying failed uploads (these are handled internally by
    /// [`Reactor::upload_descriptor_with_retries`]).
    last_uploaded: Option<Instant>,
    /// A max-heap containing the time periods for which we need to reupload the descriptor.
    // TODO: we are currently reuploading more than necessary.
    // Ideally, this shouldn't contain duplicate TimePeriods,
    // because we only need to retain the latest reupload time for each time period.
    //
    // Currently, if, for some reason, we upload the descriptor multiple times for the same TP,
    // we will end up with multiple ReuploadTimer entries for that TP,
    // each of which will (eventually) result in a reupload.
    //
    // TODO: maybe this should just be a HashMap<TimePeriod, Instant>
    //
    // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1971#note_2994950
    reupload_timers: BinaryHeap<ReuploadTimer>,
    /// The restricted discovery authorized clients.
    ///
    /// `None`, unless the service is running in restricted discovery mode.
    authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
}
441

            
442
/// The part of the reactor state that changes with every time period.
struct TimePeriodContext {
    /// The HsDir params.
    params: HsDirParams,
    /// The HsDirs to use in this time period.
    ///
    // We keep a list of `RelayIds` because we can't store a `Relay<'_>` inside the reactor
    // (the lifetime of a relay is tied to the lifetime of its corresponding `NetDir`. To
    // store `Relay<'_>`s in the reactor, we'd need a way of atomically swapping out both the
    // `NetDir` and the cached relays, and to convince Rust what we're doing is sound)
    hs_dirs: Vec<(RelayIds, DescriptorStatus)>,
    /// The revision counter of the last successful upload, if any.
    last_successful: Option<RevisionCounter>,
    /// The outcome of the last upload, if any.
    upload_results: Vec<HsDirUploadStatus>,
}
458

            
459
impl TimePeriodContext {
    /// Create a new `TimePeriodContext`.
    ///
    /// Any of the specified `old_hsdirs` also present in the new list of HsDirs
    /// (returned by `NetDir::hs_dirs_upload`) will have their `DescriptorStatus` preserved.
    fn new<'r>(
        params: HsDirParams,
        blind_id: HsBlindId,
        netdir: &Arc<NetDir>,
        old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
        old_upload_results: Vec<HsDirUploadStatus>,
    ) -> Result<Self, FatalError> {
        let period = params.time_period();
        let hs_dirs = Self::compute_hsdirs(period, blind_id, netdir, old_hsdirs)?;
        // Only retain old upload results whose HsDir is still in the new list.
        let upload_results = old_upload_results
            .into_iter()
            .filter(|res|
                // Check if the HsDir of this result still exists
                hs_dirs
                    .iter()
                    .any(|(relay_ids, _status)| relay_ids == &res.relay_ids))
            .collect();

        Ok(Self {
            params,
            hs_dirs,
            last_successful: None,
            upload_results,
        })
    }

    /// Recompute the HsDirs for this time period.
    fn compute_hsdirs<'r>(
        period: TimePeriod,
        blind_id: HsBlindId,
        netdir: &Arc<NetDir>,
        mut old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
    ) -> Result<Vec<(RelayIds, DescriptorStatus)>, FatalError> {
        let hs_dirs = netdir.hs_dirs_upload(blind_id, period)?;

        Ok(hs_dirs
            .map(|hs_dir| {
                // Collect whichever identities (ed25519/RSA) this relay advertises.
                let mut builder = RelayIds::builder();
                if let Some(ed_id) = hs_dir.ed_identity() {
                    builder.ed_identity(*ed_id);
                }

                if let Some(rsa_id) = hs_dir.rsa_identity() {
                    builder.rsa_identity(*rsa_id);
                }

                let relay_id = builder.build().unwrap_or_else(|_| RelayIds::empty());

                // Have we uploaded the descriptor to this relay before? If so, we don't need to
                // reupload it unless it was already dirty and due for a reupload.
                //
                // NOTE(review): `find` advances the shared `old_hsdirs` iterator across closure
                // invocations, so matching relies on the old and new HsDir lists having a
                // consistent relative ordering — TODO confirm `NetDir::hs_dirs_upload`
                // guarantees this.
                let status = match old_hsdirs.find(|(id, _)| *id == relay_id) {
                    Some((_, status)) => *status,
                    None => DescriptorStatus::Dirty,
                };

                (relay_id, status)
            })
            .collect::<Vec<_>>())
    }

    /// Mark the descriptor dirty for all HSDirs of this time period.
    fn mark_all_dirty(&mut self) {
        self.hs_dirs
            .iter_mut()
            .for_each(|(_relay_id, status)| *status = DescriptorStatus::Dirty);
    }

    /// Update the upload result for this time period.
    fn set_upload_results(&mut self, upload_results: Vec<HsDirUploadStatus>) {
        self.upload_results = upload_results;
    }
}
536

            
537
/// An error that occurs while trying to upload a descriptor.
538
#[derive(Clone, Debug, thiserror::Error)]
539
#[non_exhaustive]
540
pub enum UploadError {
541
    /// An error that has occurred after we have contacted a directory cache and made a circuit to it.
542
    #[error("descriptor upload request failed: {}", _0.error)]
543
    Request(#[from] RequestFailedError),
544

            
545
    /// Failed to establish circuit to hidden service directory
546
    #[error("could not build circuit to HsDir")]
547
    Circuit(#[from] tor_circmgr::Error),
548

            
549
    /// Failed to establish stream to hidden service directory
550
    #[error("failed to establish directory stream to HsDir")]
551
    Stream(#[source] tor_circmgr::Error),
552

            
553
    /// An internal error.
554
    #[error("Internal error")]
555
    Bug(#[from] tor_error::Bug),
556
}
557
define_asref_dyn_std_error!(UploadError);
558

            
559
impl UploadError {
560
    /// Return true if this error is one that we should report as a suspicious event,
561
    /// along with the dirserver, and description of the relevant document.
562
64
    pub(crate) fn should_report_as_suspicious(&self) -> bool {
563
64
        match self {
564
64
            UploadError::Request(e) => e.error.should_report_as_suspicious_if_anon(),
565
            UploadError::Circuit(_) => false, // TODO prop360
566
            UploadError::Stream(_) => false,  // TODO prop360
567
            UploadError::Bug(_) => false,
568
        }
569
64
    }
570
}
571

            
572
impl<R: Runtime, M: Mockable> Reactor<R, M> {
573
    /// Create a new `Reactor`.
    ///
    /// This only assembles the reactor's state; nothing happens until
    /// [`Reactor::run`] is called.
    #[allow(clippy::too_many_arguments)]
    pub(super) fn new(
        runtime: R,
        nickname: HsNickname,
        dir_provider: Arc<dyn NetDirProvider>,
        mockable: M,
        config: &OnionServiceConfig,
        ipt_watcher: IptsPublisherView,
        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
        status_tx: PublisherStatusSender,
        keymgr: Arc<KeyMgr>,
        path_resolver: Arc<CfgPathResolver>,
        pow_manager: Arc<PowManager<R>>,
        update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
    ) -> Self {
        /// The maximum size of the upload completion notifier channel.
        ///
        /// The channel we use this for is a futures::mpsc channel, which has a capacity of
        /// `UPLOAD_CHAN_BUF_SIZE + num-senders`. We don't need the buffer size to be non-zero, as
        /// each sender will send exactly one message.
        const UPLOAD_CHAN_BUF_SIZE: usize = 0;

        // Internally-generated instructions, no need for mq.
        let (upload_task_complete_tx, upload_task_complete_rx) =
            mpsc_channel_no_memquota(UPLOAD_CHAN_BUF_SIZE);

        let (publish_status_tx, publish_status_rx) = watch::channel();
        // Setting the buffer size to zero here is OK,
        // since we never actually send anything on this channel.
        let (shutdown_tx, _shutdown_rx) = broadcast::channel(0);

        // Load the restricted discovery authorized clients from the configured key dirs.
        let authorized_clients =
            Self::read_authorized_clients(&config.restricted_discovery, &path_resolver);

        // Create a channel for watching for changes in the configured
        // restricted_discovery.key_dirs.
        let (key_dirs_tx, key_dirs_rx) = file_watcher::channel();

        let imm = Immutable {
            runtime,
            mockable,
            nickname,
            keymgr,
            status_tx,
            pow_manager,
        };

        let inner = Inner {
            time_periods: vec![],
            config: Arc::new(config.into()),
            file_watcher: None,
            netdir: None,
            last_uploaded: None,
            reupload_timers: Default::default(),
            authorized_clients,
        };

        Self {
            imm: Arc::new(imm),
            inner: Arc::new(Mutex::new(inner)),
            dir_provider,
            ipt_watcher,
            config_rx,
            key_dirs_rx,
            key_dirs_tx,
            publish_status_rx,
            publish_status_tx,
            upload_task_complete_rx,
            upload_task_complete_tx,
            shutdown_tx,
            path_resolver,
            update_from_pow_manager_rx,
        }
    }
648

            
649
    /// Start the reactor.
    ///
    /// Under normal circumstances, this function runs indefinitely.
    ///
    /// Note: this also spawns the "reminder task" that we use to reschedule uploads whenever an
    /// upload fails or is rate-limited.
    pub(super) async fn run(mut self) -> Result<(), FatalError> {
        debug!(nickname=%self.imm.nickname, "starting descriptor publisher reactor");

        {
            // Before entering the main loop, wait for a timely netdir and use it
            // to initialize the relevant time periods.
            let netdir = self
                .dir_provider
                .wait_for_netdir(Timeliness::Timely)
                .await?;
            let time_periods = self.compute_time_periods(&netdir, &[])?;

            let mut inner = self.inner.lock().expect("poisoned lock");

            inner.netdir = Some(netdir);
            inner.time_periods = time_periods;
        }

        // Create the initial key_dirs watcher.
        self.update_file_watcher();

        loop {
            match self.run_once().await {
                Ok(ShutdownStatus::Continue) => continue,
                Ok(ShutdownStatus::Terminate) => {
                    // Clean shutdown: all event senders were dropped.
                    debug!(nickname=%self.imm.nickname, "descriptor publisher is shutting down!");

                    self.imm.status_tx.send_shutdown();
                    return Ok(());
                }
                Err(e) => {
                    // Fatal error: report it, mark the service as broken, and bail.
                    error_report!(
                        e,
                        "HS service {}: descriptor publisher crashed!",
                        self.imm.nickname
                    );

                    self.imm.status_tx.send_broken(e.clone());

                    return Err(e);
                }
            }
        }
    }
697

            
698
    /// Run one iteration of the reactor loop.
    ///
    /// Returns `ShutdownStatus::Terminate` when any of our input channels is closed.
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
    async fn run_once(&mut self) -> Result<ShutdownStatus, FatalError> {
        let mut netdir_events = self.dir_provider.events();

        // Note: TrackingNow tracks the values it is compared with.
        // This is equivalent to sleeping for (until - now) units of time,
        let upload_rate_lim: TrackingNow = TrackingNow::now(&self.imm.runtime);
        if let PublishStatus::RateLimited(until) = self.status() {
            if upload_rate_lim > until {
                // We are no longer rate-limited
                self.expire_rate_limit().await?;
            }
        }

        // Drain every reupload timer that has already elapsed, collecting the
        // affected time periods so we can mark them dirty below.
        let reupload_tracking = TrackingNow::now(&self.imm.runtime);
        let mut reupload_periods = vec![];
        {
            let mut inner = self.inner.lock().expect("poisoned lock");
            let inner = &mut *inner;
            while let Some(reupload) = inner.reupload_timers.peek().copied() {
                // First, extract all the timeouts that already elapsed.
                if reupload.when <= reupload_tracking {
                    inner.reupload_timers.pop();
                    reupload_periods.push(reupload.period);
                } else {
                    // We are not ready to schedule any more reuploads.
                    //
                    // How much we need to sleep is implicitly
                    // tracked in reupload_tracking (through
                    // the TrackingNow implementation)
                    break;
                }
            }
        }

        // Check if it's time to schedule any reuploads.
        for period in reupload_periods {
            if self.mark_dirty(&period) {
                debug!(
                    time_period=?period,
                    "descriptor reupload timer elapsed; scheduling reupload",
                );
                self.update_publish_status_unless_rate_lim(PublishStatus::UploadScheduled)
                    .await?;
            }
        }

        // Note: select_biased! polls its branches in declaration order, so
        // completed uploads and timers take precedence over new events.
        select_biased! {
            res = self.upload_task_complete_rx.next().fuse() => {
                let Some(upload_res) = res else {
                    return Ok(ShutdownStatus::Terminate);
                };

                self.handle_upload_results(upload_res);
                self.upload_result_to_svc_status()?;
            },
            () = upload_rate_lim.wait_for_earliest(&self.imm.runtime).fuse() => {
                self.expire_rate_limit().await?;
            },
            () = reupload_tracking.wait_for_earliest(&self.imm.runtime).fuse() => {
                // Run another iteration, executing run_once again. This time, we will remove the
                // expired reupload from self.reupload_timers, mark the descriptor dirty for all
                // relevant HsDirs, and schedule the upload by setting our status to
                // UploadScheduled.
                return Ok(ShutdownStatus::Continue);
            },
            netdir_event = netdir_events.next().fuse() => {
                let Some(netdir_event) = netdir_event else {
                    debug!("netdir event stream ended");
                    return Ok(ShutdownStatus::Terminate);
                };

                if !matches!(netdir_event, DirEvent::NewConsensus) {
                    return Ok(ShutdownStatus::Continue);
                };

                // The consensus changed. Grab a new NetDir.
                let netdir = match self.dir_provider.netdir(Timeliness::Timely) {
                    Ok(y) => y,
                    Err(e) => {
                        error_report!(e, "HS service {}: netdir unavailable. Retrying...", self.imm.nickname);
                        // Hopefully a netdir will appear in the future.
                        // in the meantime, suspend operations.
                        //
                        // TODO (#1218): there is a bug here: we stop reading on our inputs
                        // including eg publish_status_rx, but it is our job to log some of
                        // these things.  While we are waiting for a netdir, all those messages
                        // are "stuck"; they'll appear later, with misleading timestamps.
                        //
                        // Probably this should be fixed by moving the logging
                        // out of the reactor, where it won't be blocked.
                        self.dir_provider.wait_for_netdir(Timeliness::Timely)
                            .await?
                    }
                };
                let relevant_periods = netdir.hs_all_time_periods();
                self.handle_consensus_change(netdir).await?;
                // Best-effort cleanup: failure to expire old keys is logged, not fatal.
                expire_publisher_keys(
                    &self.imm.keymgr,
                    &self.imm.nickname,
                    &relevant_periods,
                ).unwrap_or_else(|e| {
                    error_report!(e, "failed to remove expired keys");
                });
            }
            update = self.ipt_watcher.await_update().fuse() => {
                if self.handle_ipt_change(update).await? == ShutdownStatus::Terminate {
                    return Ok(ShutdownStatus::Terminate);
                }
            },
            config = self.config_rx.next().fuse() => {
                let Some(config) = config else {
                    return Ok(ShutdownStatus::Terminate);
                };

                self.handle_svc_config_change(&config).await?;
            },
            res = self.key_dirs_rx.next().fuse() => {
                let Some(event) = res else {
                    return Ok(ShutdownStatus::Terminate);
                };

                while let Some(_ignore) = self.key_dirs_rx.try_recv() {
                    // Discard other events, so that we only reload once.
                }

                self.handle_key_dirs_change(event).await?;
            }
            should_upload = self.publish_status_rx.next().fuse() => {
                let Some(should_upload) = should_upload else {
                    return Ok(ShutdownStatus::Terminate);
                };

                // Our PublishStatus changed -- are we ready to publish?
                if should_upload == PublishStatus::UploadScheduled {
                    self.update_publish_status_unless_waiting(PublishStatus::Idle).await?;
                    self.upload_all().await?;
                }
            }
            update_tp_pow_seed = self.update_from_pow_manager_rx.next().fuse() => {
                debug!("Update PoW seed for TP!");
                let Some(time_period) = update_tp_pow_seed else {
                    return Ok(ShutdownStatus::Terminate);
                };
                self.mark_dirty(&time_period);
                self.upload_all().await?;
            }
        }

        Ok(ShutdownStatus::Continue)
    }
850

            
851
    /// Returns the current status of the publisher
852
172
    fn status(&self) -> PublishStatus {
853
172
        *self.publish_status_rx.borrow()
854
172
    }
855

            
856
    /// Handle a batch of upload outcomes,
    /// possibly updating the status of the descriptor for the corresponding HSDirs.
    ///
    /// Also schedules a reupload timer for the time period these results belong to.
    fn handle_upload_results(&self, results: TimePeriodUploadResult) {
        let mut inner = self.inner.lock().expect("poisoned lock");
        let inner = &mut *inner;

        // Check which time period these uploads pertain to.
        let period = inner
            .time_periods
            .iter_mut()
            .find(|ctx| ctx.params.time_period() == results.time_period);

        let Some(period) = period else {
            // The uploads were for a time period that is no longer relevant, so we
            // can ignore the result.
            return;
        };

        // We will need to reupload this descriptor at some point, so we pick
        // a random time between 60 minutes and 120 minutes in the future.
        //
        // See https://spec.torproject.org/rend-spec/deriving-keys.html#WHEN-HSDESC
        let mut rng = self.imm.mockable.thread_rng();
        // TODO SPEC: Control republish period using a consensus parameter?
        let minutes = rng.gen_range_checked(60..=120).expect("low > high?!");
        let duration = Duration::from_secs(minutes * 60);
        let reupload_when = self.imm.runtime.now() + duration;
        let time_period = period.params.time_period();

        info!(
            time_period=?time_period,
            "reuploading descriptor in {}",
            humantime::format_duration(duration),
        );

        inner.reupload_timers.push(ReuploadTimer {
            period: time_period,
            when: reupload_when,
        });

        let mut upload_results = vec![];
        for upload_res in results.hsdir_result {
            // Locate the HsDir this result refers to.
            let relay = period
                .hs_dirs
                .iter_mut()
                .find(|(relay_ids, _status)| relay_ids == &upload_res.relay_ids);

            let Some((_relay, status)): Option<&mut (RelayIds, _)> = relay else {
                // This HSDir went away, so the result doesn't matter.
                // Continue processing the rest of the results
                continue;
            };

            if upload_res.upload_res.is_ok() {
                // Only advance last_successful monotonically by revision counter,
                // so an out-of-order stale result can't overwrite a newer one.
                let update_last_successful = match period.last_successful {
                    None => true,
                    Some(counter) => counter <= upload_res.revision_counter,
                };

                if update_last_successful {
                    period.last_successful = Some(upload_res.revision_counter);
                    // TODO (#1098): Is it possible that this won't update the statuses promptly
                    // enough. For example, it's possible for the reactor to see a Dirty descriptor
                    // and start an upload task for a descriptor has already been uploaded (or is
                    // being uploaded) in another task, but whose upload results have not yet been
                    // processed.
                    //
                    // This is probably made worse by the fact that the statuses are updated in
                    // batches (grouped by time period), rather than one by one as the upload tasks
                    // complete (updating the status involves locking the inner mutex, and I wanted
                    // to minimize the locking/unlocking overheads). I'm not sure handling the
                    // updates in batches was the correct decision here.
                    *status = DescriptorStatus::Clean;
                }
            }

            // Record the result (successful or not) for status reporting.
            upload_results.push(upload_res);
        }

        period.set_upload_results(upload_results);
    }
937

            
938
    /// Maybe update our list of HsDirs.
    ///
    /// Installs the new `NetDir`, recomputes the HsDir rings, and schedules an upload.
    async fn handle_consensus_change(&mut self, netdir: Arc<NetDir>) -> Result<(), FatalError> {
        trace!("the consensus has changed; recomputing HSDirs");

        // Install the new NetDir; the previous one is dropped here.
        let _old: Option<Arc<NetDir>> = self.replace_netdir(netdir);

        self.recompute_hs_dirs()?;
        self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
            .await?;

        // If the time period has changed, some of our upload results may now be irrelevant,
        // so we might need to update our status (for example, if our uploads are
        // for a no-longer-relevant time period, it means we might be able to update
        // our status from "degraded" to "running")
        self.upload_result_to_svc_status()?;

        Ok(())
    }
956

            
957
    /// Recompute the HsDirs for all relevant time periods.
958
    fn recompute_hs_dirs(&self) -> Result<(), FatalError> {
959
        let mut inner = self.inner.lock().expect("poisoned lock");
960
        let inner = &mut *inner;
961

            
962
        let netdir = Arc::clone(
963
            inner
964
                .netdir
965
                .as_ref()
966
                .ok_or_else(|| internal!("started upload task without a netdir"))?,
967
        );
968

            
969
        // Update our list of relevant time periods.
970
        let new_time_periods = self.compute_time_periods(&netdir, &inner.time_periods)?;
971
        inner.time_periods = new_time_periods;
972

            
973
        Ok(())
974
    }
975

            
976
    /// Compute the [`TimePeriodContext`]s for the time periods from the specified [`NetDir`].
    ///
    /// The specified `time_periods` are used to preserve the `DescriptorStatus` of the
    /// HsDirs where possible.
    ///
    /// Returns an error if the blinded identity keypair for any period cannot be read.
    fn compute_time_periods(
        &self,
        netdir: &Arc<NetDir>,
        time_periods: &[TimePeriodContext],
    ) -> Result<Vec<TimePeriodContext>, FatalError> {
        netdir
            .hs_all_time_periods()
            .iter()
            .map(|params| {
                let period = params.time_period();
                // Each time period has its own blinded identity key, read from the key manager.
                let blind_id_kp =
                    read_blind_id_keypair(&self.imm.keymgr, &self.imm.nickname, period)?
                        // Note: for now, read_blind_id_keypair cannot return Ok(None).
                        // It's supposed to return Ok(None) if we're in offline hsid mode,
                        // but that might change when we do #1194
                        .ok_or_else(|| internal!("offline hsid mode not supported"))?;

                let blind_id: HsBlindIdKey = (&blind_id_kp).into();

                // If our previous `TimePeriodContext`s also had an entry for `period`, we need to
                // preserve the `DescriptorStatus` of its HsDirs. This helps prevent unnecessarily
                // publishing the descriptor to the HsDirs that already have it (the ones that are
                // marked with DescriptorStatus::Clean).
                //
                // In other words, we only want to publish to those HsDirs that
                //   * are part of a new time period (which we have never published the descriptor
                //   for), or
                //   * have just been added to the ring of a time period we already knew about
                if let Some(ctx) = time_periods
                    .iter()
                    .find(|ctx| ctx.params.time_period() == period)
                {
                    TimePeriodContext::new(
                        params.clone(),
                        blind_id.into(),
                        netdir,
                        ctx.hs_dirs.iter(),
                        ctx.upload_results.clone(),
                    )
                } else {
                    // Passing an empty iterator here means all HsDirs in this TimePeriodContext
                    // will be marked as dirty, meaning we will need to upload our descriptor to them.
                    TimePeriodContext::new(
                        params.clone(),
                        blind_id.into(),
                        netdir,
                        iter::empty(),
                        vec![],
                    )
                }
            })
            .collect::<Result<Vec<TimePeriodContext>, FatalError>>()
    }
    /// Replace the old netdir with the new, returning the old.
    fn replace_netdir(&self, new_netdir: Arc<NetDir>) -> Option<Arc<NetDir>> {
        let mut inner = self.inner.lock().expect("poisoned lock");
        inner.netdir.replace(new_netdir)
    }
    /// Replace our view of the service config with `new_config` if `new_config` contains changes
    /// that would cause us to generate a new descriptor.
8
    fn replace_config_if_changed(&self, new_config: Arc<OnionServiceConfigPublisherView>) -> bool {
8
        let mut inner = self.inner.lock().expect("poisoned lock");
8
        let old_config = &mut inner.config;
        // The fields we're interested in haven't changed, so there's no need to update
        // `inner.config`.
8
        if *old_config == new_config {
8
            return false;
        }
        let log_change = match (
            old_config.restricted_discovery.enabled,
            new_config.restricted_discovery.enabled,
        ) {
            (true, false) => Some("Disabling restricted discovery mode"),
            (false, true) => Some("Enabling restricted discovery mode"),
            _ => None,
        };
        if let Some(msg) = log_change {
            info!(nickname=%self.imm.nickname, "{}", msg);
        }
        let _old: Arc<OnionServiceConfigPublisherView> = std::mem::replace(old_config, new_config);
        true
8
    }
    /// Recreate the FileWatcher for watching the restricted discovery key_dirs.
16
    fn update_file_watcher(&self) {
16
        let mut inner = self.inner.lock().expect("poisoned lock");
16
        if inner.config.restricted_discovery.watch_configuration() {
            debug!("The restricted_discovery.key_dirs have changed, updating file watcher");
            let mut watcher = FileWatcher::builder(self.imm.runtime.clone());
            let dirs = inner.config.restricted_discovery.key_dirs().clone();
            watch_dirs(&mut watcher, &dirs, &self.path_resolver);
            let watcher = watcher
                .start_watching(self.key_dirs_tx.clone())
                .map_err(|e| {
                    // TODO: update the publish status (see also the module-level TODO about this).
                    error_report!(e, "Cannot set file watcher");
                })
                .ok();
            inner.file_watcher = watcher;
        } else {
16
            if inner.file_watcher.is_some() {
                debug!("removing key_dirs watcher");
16
            }
16
            inner.file_watcher = None;
        }
16
    }
    /// Read the intro points from `ipt_watcher`, and decide whether we're ready to start
    /// uploading.
8
    fn note_ipt_change(&self) -> PublishStatus {
8
        let mut ipts = self.ipt_watcher.borrow_for_publish();
8
        match ipts.ipts.as_mut() {
8
            Some(_ipts) => PublishStatus::UploadScheduled,
            None => PublishStatus::AwaitingIpts,
        }
8
    }
    /// Update our list of introduction points.
8
    async fn handle_ipt_change(
8
        &mut self,
8
        update: Option<Result<(), crate::FatalError>>,
8
    ) -> Result<ShutdownStatus, FatalError> {
8
        trace!(nickname=%self.imm.nickname, "received IPT change notification from IPT manager");
8
        match update {
            Some(Ok(())) => {
8
                let should_upload = self.note_ipt_change();
8
                debug!(nickname=%self.imm.nickname, "the introduction points have changed");
8
                self.mark_all_dirty();
8
                self.update_publish_status_unless_rate_lim(should_upload)
8
                    .await?;
8
                Ok(ShutdownStatus::Continue)
            }
            Some(Err(e)) => Err(e),
            None => {
                debug!(nickname=%self.imm.nickname, "received shut down signal from IPT manager");
                Ok(ShutdownStatus::Terminate)
            }
        }
8
    }
    /// Update the `PublishStatus` of the reactor with `new_state`,
    /// unless the current state is `AwaitingIpts`.
18
    async fn update_publish_status_unless_waiting(
18
        &mut self,
18
        new_state: PublishStatus,
18
    ) -> Result<(), FatalError> {
        // Only update the state if we're not waiting for intro points.
18
        if self.status() != PublishStatus::AwaitingIpts {
18
            self.update_publish_status(new_state).await?;
        }
18
        Ok(())
18
    }
    /// Update the `PublishStatus` of the reactor with `new_state`,
    /// unless the current state is `RateLimited`.
18
    async fn update_publish_status_unless_rate_lim(
18
        &mut self,
18
        new_state: PublishStatus,
18
    ) -> Result<(), FatalError> {
        // We can't exit this state until the rate-limit expires.
18
        if !matches!(self.status(), PublishStatus::RateLimited(_)) {
18
            self.update_publish_status(new_state).await?;
        }
18
        Ok(())
18
    }
    /// Unconditionally update the `PublishStatus` of the reactor with `new_state`.
    ///
    /// Also reports the corresponding externally-visible [`State`] via `status_tx`,
    /// and broadcasts `new_state` on the publish-status watch channel.
    async fn update_publish_status(&mut self, new_state: PublishStatus) -> Result<(), Bug> {
        // Map the internal publish status onto the externally-visible onion service state.
        let onion_status = match new_state {
            PublishStatus::Idle => None,
            PublishStatus::UploadScheduled
            | PublishStatus::AwaitingIpts
            | PublishStatus::RateLimited(_) => Some(State::Bootstrapping),
        };

        if let Some(onion_status) = onion_status {
            self.imm.status_tx.send(onion_status, None);
        }

        // Note: self.status() still yields the *old* state here, since we
        // haven't sent new_state on the watch channel yet.
        trace!(
            "publisher reactor status change: {:?} -> {:?}",
            self.status(),
            new_state
        );

        self.publish_status_tx.send(new_state).await.map_err(
            |_: postage::sink::SendError<_>| internal!("failed to send upload notification?!"),
        )?;

        Ok(())
    }
    /// Update the onion svc status based on the results of the last descriptor uploads.
14
    fn upload_result_to_svc_status(&self) -> Result<(), FatalError> {
14
        let inner = self.inner.lock().expect("poisoned lock");
14
        let netdir = inner
14
            .netdir
14
            .as_ref()
14
            .ok_or_else(|| internal!("handling upload results without netdir?!"))?;
14
        let (state, err) = upload_result_state(netdir, &inner.time_periods);
14
        self.imm.status_tx.send(state, err);
14
        Ok(())
14
    }
    /// Update the descriptors based on the config change.
8
    async fn handle_svc_config_change(
8
        &mut self,
8
        config: &OnionServiceConfig,
8
    ) -> Result<(), FatalError> {
8
        let new_config = Arc::new(config.into());
8
        if self.replace_config_if_changed(Arc::clone(&new_config)) {
            self.update_file_watcher();
            self.update_authorized_clients_if_changed();
            info!(nickname=%self.imm.nickname, "Config has changed, generating a new descriptor");
            self.mark_all_dirty();
            // Schedule an upload, unless we're still waiting for IPTs.
            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
                .await?;
8
        }
8
        Ok(())
8
    }
    /// Update the descriptors based on a restricted discovery key_dirs change.
    ///
    /// If the authorized clients from the [`RestrictedDiscoveryConfig`] have changed,
    /// this marks the descriptor as dirty for all time periods,
    /// and schedules a reupload.
8
    async fn handle_key_dirs_change(&mut self, event: FileEvent) -> Result<(), FatalError> {
8
        debug!("The configured key_dirs have changed");
8
        match event {
8
            FileEvent::Rescan | FileEvent::FileChanged => {
8
                // These events are handled in the same way, by re-reading the keys from disk
8
                // and republishing the descriptor if necessary
8
            }
            _ => return Err(internal!("file watcher event {event:?}").into()),
        };
        // Update the file watcher, in case the change was triggered by a key_dir move.
8
        self.update_file_watcher();
8
        if self.update_authorized_clients_if_changed() {
            self.mark_all_dirty();
            // Schedule an upload, unless we're still waiting for IPTs.
            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
                .await?;
8
        }
8
        Ok(())
8
    }
    /// Recreate the authorized_clients based on the current config.
    ///
    /// Returns `true` if the authorized clients have changed.
8
    fn update_authorized_clients_if_changed(&mut self) -> bool {
8
        let mut inner = self.inner.lock().expect("poisoned lock");
8
        let authorized_clients =
8
            Self::read_authorized_clients(&inner.config.restricted_discovery, &self.path_resolver);
8
        let clients = &mut inner.authorized_clients;
8
        let changed = clients.as_ref() != authorized_clients.as_ref();
8
        if changed {
            info!("The restricted discovery mode authorized clients have changed");
            *clients = authorized_clients;
8
        }
8
        changed
8
    }
    /// Read the authorized `RestrictedDiscoveryKeys` from `config`.
16
    fn read_authorized_clients(
16
        config: &RestrictedDiscoveryConfig,
16
        path_resolver: &CfgPathResolver,
16
    ) -> Option<Arc<RestrictedDiscoveryKeys>> {
16
        let authorized_clients = config.read_keys(path_resolver);
16
        if matches!(authorized_clients.as_ref(), Some(c) if c.is_empty()) {
            warn!(
                "Running in restricted discovery mode, but we have no authorized clients. Service will be unreachable"
            );
16
        }
16
        authorized_clients.map(Arc::new)
16
    }
    /// Mark the descriptor dirty for all time periods.
8
    fn mark_all_dirty(&self) {
8
        trace!("marking the descriptor dirty for all time periods");
8
        self.inner
8
            .lock()
8
            .expect("poisoned lock")
8
            .time_periods
8
            .iter_mut()
8
            .for_each(|tp| tp.mark_all_dirty());
8
    }
    /// Mark the descriptor dirty for the specified time period.
    ///
    /// Returns `true` if the specified period is still relevant, and `false` otherwise.
10
    fn mark_dirty(&self, period: &TimePeriod) -> bool {
10
        let mut inner = self.inner.lock().expect("poisoned lock");
10
        let period_ctx = inner
10
            .time_periods
10
            .iter_mut()
10
            .find(|tp| tp.params.time_period() == *period);
10
        match period_ctx {
10
            Some(ctx) => {
10
                trace!(time_period=?period, "marking the descriptor dirty");
10
                ctx.mark_all_dirty();
10
                true
            }
            None => false,
        }
10
    }
    /// Try to upload our descriptor to the HsDirs that need it.
    ///
    /// If we've recently uploaded some descriptors, we return immediately and schedule the upload
    /// to happen after [`UPLOAD_RATE_LIM_THRESHOLD`].
    ///
    /// Failed uploads are retried
    /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
    ///
    /// If restricted discovery mode is enabled and there are no authorized clients,
    /// we abort the upload and set our status to [`State::Broken`].
    //
    // Note: a broken restricted discovery config won't prevent future uploads from being scheduled
    // (for example if the IPTs change),
    // which can can cause the publisher's status to oscillate between `Bootstrapping` and `Broken`.
    // TODO: we might wish to refactor the publisher to be more sophisticated about this.
    //
    /// For each current time period, we spawn a task that uploads the descriptor to
    /// all the HsDirs on the HsDir ring of that time period.
    /// Each task shuts down on completion, or when the reactor is dropped.
    ///
    /// Each task reports its upload results (`TimePeriodUploadResult`)
    /// via the `upload_task_complete_tx` channel.
    /// The results are received and processed in the main loop of the reactor.
    ///
    /// Returns an error if it fails to spawn a task, or if an internal error occurs.
    #[allow(clippy::cognitive_complexity)] // TODO #2010: Refactor
    async fn upload_all(&mut self) -> Result<(), FatalError> {
        trace!("starting descriptor upload task...");
        // Abort the upload entirely if we have an empty list of authorized clients
        let authorized_clients = match self.authorized_clients() {
            Ok(authorized_clients) => authorized_clients,
            Err(e) => {
                error_report!(e, "aborting upload");
                self.imm.status_tx.send_broken(e.clone());
                // Returning an error would shut down the reactor, so we have to return Ok here.
                return Ok(());
            }
        };
        // Read (and immediately release) the last-upload timestamp; we must not hold
        // the lock across the `.await` in the rate-limit branch below.
        let last_uploaded = self.inner.lock().expect("poisoned lock").last_uploaded;
        let now = self.imm.runtime.now();
        // Check if we should rate-limit this upload.
        if let Some(ts) = last_uploaded {
            let duration_since_upload = now.duration_since(ts);
            if duration_since_upload < UPLOAD_RATE_LIM_THRESHOLD {
                return Ok(self.start_rate_limit(UPLOAD_RATE_LIM_THRESHOLD).await?);
            }
        }
        // Re-acquire the lock; it is held for the rest of the function, which is safe
        // because nothing below this point `.await`s a spawned task (spawn itself is
        // non-blocking).
        let mut inner = self.inner.lock().expect("poisoned lock");
        let inner = &mut *inner;
        // Record the upload time up front, before any of the upload tasks run.
        let _ = inner.last_uploaded.insert(now);
        for period_ctx in inner.time_periods.iter_mut() {
            let upload_task_complete_tx = self.upload_task_complete_tx.clone();
            // Figure out which HsDirs we need to upload the descriptor to (some of them might already
            // have our latest descriptor, so we filter them out).
            let hs_dirs = period_ctx
                .hs_dirs
                .iter()
                .filter_map(|(relay_id, status)| {
                    if *status == DescriptorStatus::Dirty {
                        Some(relay_id.clone())
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>();
            if hs_dirs.is_empty() {
                trace!("the descriptor is clean for all HSDirs. Nothing to do");
                // NOTE(review): this returns from the whole function, skipping any
                // *remaining* time periods too — presumably descriptors are marked
                // dirty for all periods together, so a clean period implies all are
                // clean. Confirm; otherwise this should be `continue`.
                return Ok(());
            }
            let time_period = period_ctx.params.time_period();
            // This scope exists because rng is not Send, so it needs to fall out of scope before we
            // await anything.
            let netdir = Arc::clone(
                inner
                    .netdir
                    .as_ref()
                    .ok_or_else(|| internal!("started upload task without a netdir"))?,
            );
            let imm = Arc::clone(&self.imm);
            let ipt_upload_view = self.ipt_watcher.upload_view();
            let config = Arc::clone(&inner.config);
            let authorized_clients = authorized_clients.clone();
            trace!(nickname=%self.imm.nickname, time_period=?time_period,
                "spawning upload task"
            );
            let params = period_ctx.params.clone();
            let shutdown_rx = self.shutdown_tx.subscribe();
            // Spawn a task to upload the descriptor to all HsDirs of this time period.
            //
            // This task will shut down when the reactor is dropped (i.e. when shutdown_rx is
            // dropped).
            let _handle: () = self
                .imm
                .runtime
                .spawn(async move {
                    if let Err(e) = Self::upload_for_time_period(
                        hs_dirs,
                        &netdir,
                        config,
                        params,
                        Arc::clone(&imm),
                        ipt_upload_view.clone(),
                        authorized_clients.clone(),
                        upload_task_complete_tx,
                        shutdown_rx,
                    )
                    .await
                    {
                        error_report!(
                            e,
                            "descriptor upload failed for HS service {} and time period {:?}",
                            imm.nickname,
                            time_period
                        );
                    }
                })
                .map_err(|e| FatalError::from_spawn("upload_for_time_period task", e))?;
        }
        Ok(())
    }
    /// Upload the descriptor for the time period specified in `params`.
    ///
    /// Failed uploads are retried
    /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
    #[allow(clippy::too_many_arguments)] // TODO: refactor
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
18
    async fn upload_for_time_period(
18
        hs_dirs: Vec<RelayIds>,
18
        netdir: &Arc<NetDir>,
18
        config: Arc<OnionServiceConfigPublisherView>,
18
        params: HsDirParams,
18
        imm: Arc<Immutable<R, M>>,
18
        ipt_upload_view: IptsPublisherUploadView,
18
        authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
18
        mut upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
18
        shutdown_rx: broadcast::Receiver<Void>,
18
    ) -> Result<(), FatalError> {
18
        let time_period = params.time_period();
18
        trace!(time_period=?time_period, "uploading descriptor to all HSDirs for this time period");
18
        let hsdir_count = hs_dirs.len();
        /// An error returned from an upload future.
        //
        // Exhaustive, because this is a private type.
        #[derive(Clone, Debug, thiserror::Error)]
        enum PublishError {
            /// The upload was aborted because there are no IPTs.
            ///
            /// This happens because of an inevitable TOCTOU race, where after being notified by
            /// the IPT manager that the IPTs have changed (via `self.ipt_watcher.await_update`),
            /// we find out there actually are no IPTs, so we can't build the descriptor.
            ///
            /// This is a special kind of error that interrupts the current upload task, and is
            /// logged at `debug!` level rather than `warn!` or `error!`.
            ///
            /// Ideally, this shouldn't happen very often (if at all).
            #[error("No IPTs")]
            NoIpts,
            /// The reactor has shut down
            #[error("The reactor has shut down")]
            Shutdown,
            /// An fatal error.
            #[error("{0}")]
            Fatal(#[from] FatalError),
        }
18
        let max_hsdesc_len: usize = netdir
18
            .params()
18
            .hsdir_max_desc_size
18
            .try_into()
18
            .expect("Unable to convert positive int32 to usize!?");
18
        let upload_results = futures::stream::iter(hs_dirs)
144
            .map(|relay_ids| {
144
                let netdir = netdir.clone();
144
                let config = Arc::clone(&config);
144
                let imm = Arc::clone(&imm);
144
                let ipt_upload_view = ipt_upload_view.clone();
144
                let authorized_clients = authorized_clients.clone();
144
                let params = params.clone();
144
                let mut shutdown_rx = shutdown_rx.clone();
144
                let ed_id = relay_ids
144
                    .rsa_identity()
144
                    .map(|id| id.to_string())
144
                    .unwrap_or_else(|| "unknown".into());
144
                let rsa_id = relay_ids
144
                    .rsa_identity()
144
                    .map(|id| id.to_string())
144
                    .unwrap_or_else(|| "unknown".into());
144
                async move {
144
                    let run_upload = |desc| async {
144
                        let Some(hsdir) = netdir.by_ids(&relay_ids) else {
                            // This should never happen (all of our relay_ids are from the stored
                            // netdir).
                            let err =
                                "tried to upload descriptor to relay not found in consensus?!";
                            warn!(
                                nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
                                "{err}"
                            );
                            return Err(internal!("{err}").into());
                        };
144
                        Self::upload_descriptor_with_retries(
144
                            desc,
144
                            &netdir,
144
                            &hsdir,
144
                            &ed_id,
144
                            &rsa_id,
144
                            Arc::clone(&imm),
144
                        )
144
                        .await
256
                    };
                    // How long until we're supposed to time out?
144
                    let worst_case_end = imm.runtime.now() + OVERALL_UPLOAD_TIMEOUT;
                    // We generate a new descriptor before _each_ HsDir upload. This means each
                    // HsDir could, in theory, receive a different descriptor (not just in terms of
                    // revision-counters, but also with a different set of IPTs). It may seem like
                    // this could lead to some HsDirs being left with an outdated descriptor, but
                    // that's not the case: after the upload completes, the publisher will be
                    // notified by the ipt_watcher of the IPT change event (if there was one to
                    // begin with), which will trigger another upload job.
144
                    let hsdesc = {
                        // This scope is needed because the ipt_set MutexGuard is not Send, so it
                        // needs to fall out of scope before the await point below
144
                        let mut ipt_set = ipt_upload_view.borrow_for_publish();
                        // If there are no IPTs, we abort the upload. At this point, we might have
                        // uploaded the descriptor to some, but not all, HSDirs from the specified
                        // time period.
                        //
                        // Returning an error here means the upload completion task is never
                        // notified of the outcome of any of these uploads (which means the
                        // descriptor is not marked clean). This is OK, because if we suddenly find
                        // out we have no IPTs, it means our built `hsdesc` has an outdated set of
                        // IPTs, so we need to go back to the main loop to wait for IPT changes,
                        // and generate a fresh descriptor anyway.
                        //
                        // Ideally, this shouldn't happen very often (if at all).
144
                        let Some(ipts) = ipt_set.ipts.as_mut() else {
                            return Err(PublishError::NoIpts);
                        };
144
                        let hsdesc = {
144
                            trace!(
                                nickname=%imm.nickname, time_period=?time_period,
                                "building descriptor"
                            );
144
                            let mut rng = imm.mockable.thread_rng();
144
                            let mut key_rng = tor_llcrypto::rng::CautiousRng;
                            // We're about to generate a new version of the descriptor,
                            // so let's generate a new revision counter.
144
                            let now = imm.runtime.wallclock();
144
                            let revision_counter = imm.generate_revision_counter(&params, now)?;
144
                            build_sign(
144
                                &imm.keymgr,
144
                                &imm.pow_manager,
144
                                &config,
144
                                authorized_clients.as_deref(),
144
                                ipts,
144
                                time_period,
144
                                revision_counter,
144
                                &mut rng,
144
                                &mut key_rng,
144
                                imm.runtime.wallclock(),
144
                                max_hsdesc_len,
                            )?
                        };
                        if let Err(e) =
144
                            ipt_set.note_publication_attempt(&imm.runtime, worst_case_end)
                        {
                            let wait = e.log_retry_max(&imm.nickname)?;
                            // TODO (#1226): retry instead of this
                            return Err(FatalError::Bug(internal!(
                                "ought to retry after {wait:?}, crashing instead"
                            ))
                            .into());
144
                        }
144
                        hsdesc
                    };
                    let VersionedDescriptor {
144
                        desc,
144
                        revision_counter,
144
                    } = hsdesc;
144
                    trace!(
                        nickname=%imm.nickname, time_period=?time_period,
                        revision_counter=?revision_counter,
                        "generated new descriptor for time period",
                    );
                    // (Actually launch the upload attempt. No timeout is needed
                    // here, since the backoff::Runner code will handle that for us.)
144
                    let upload_res: UploadResult = select_biased! {
144
                        shutdown = shutdown_rx.next().fuse() => {
                            // This will always be None, since Void is uninhabited.
                            let _: Option<Void> = shutdown;
                            // It looks like the reactor has shut down,
                            // so there is no point in uploading the descriptor anymore.
                            //
                            // Let's shut down the upload task too.
                            trace!(
                                nickname=%imm.nickname, time_period=?time_period,
                                "upload task received shutdown signal"
                            );
                            return Err(PublishError::Shutdown);
                        },
144
                        res = run_upload(desc.clone()).fuse() => res,
                    };
                    // Note: UploadResult::Failure is only returned when
                    // upload_descriptor_with_retries fails, i.e. if all our retry
                    // attempts have failed
112
                    Ok(HsDirUploadStatus {
112
                        relay_ids,
112
                        upload_res,
112
                        revision_counter,
112
                    })
112
                }
144
            })
            // This fails to compile unless the stream is boxed. See https://github.com/rust-lang/rust/issues/104382
18
            .boxed()
18
            .buffer_unordered(MAX_CONCURRENT_UPLOADS)
18
            .try_collect::<Vec<_>>()
18
            .await;
14
        let upload_results = match upload_results {
14
            Ok(v) => v,
            Err(PublishError::Fatal(e)) => return Err(e),
            Err(PublishError::NoIpts) => {
                debug!(
                    nickname=%imm.nickname, time_period=?time_period,
                     "no introduction points; skipping upload"
                );
                return Ok(());
            }
            Err(PublishError::Shutdown) => {
                debug!(
                    nickname=%imm.nickname, time_period=?time_period,
                     "the reactor has shut down; aborting upload"
                );
                return Ok(());
            }
        };
14
        let (succeeded, _failed): (Vec<_>, Vec<_>) = upload_results
14
            .iter()
112
            .partition(|res| res.upload_res.is_ok());
14
        debug!(
            nickname=%imm.nickname, time_period=?time_period,
            "descriptor uploaded successfully to {}/{} HSDirs",
            succeeded.len(), hsdir_count
        );
14
        if upload_task_complete_tx
14
            .send(TimePeriodUploadResult {
14
                time_period,
14
                hsdir_result: upload_results,
14
            })
14
            .await
14
            .is_err()
        {
            return Err(internal!(
                "failed to notify reactor of upload completion (reactor shut down)"
            )
            .into());
14
        }
14
        Ok(())
14
    }
    /// Upload a descriptor to the specified HSDir.
    ///
    /// If an upload fails, this returns an `Err`. This function does not handle retries. It is up
    /// to the caller to retry on failure.
    ///
    /// This function does not handle timeouts.
    async fn upload_descriptor(
        hsdesc: String,
        netdir: &Arc<NetDir>,
        hsdir: &Relay<'_>,
        imm: Arc<Immutable<R, M>>,
    ) -> Result<(), UploadError> {
        let request = HsDescUploadRequest::new(hsdesc);
        trace!(nickname=%imm.nickname, hsdir_id=%hsdir.id(), hsdir_rsa_id=%hsdir.rsa_id(),
            "starting descriptor upload",
        );
        // Obtain (or reuse) a circuit to the HsDir.
        let tunnel = imm
            .mockable
            .get_or_launch_hs_dir(netdir, OwnedCircTarget::from_circ_target(hsdir))
            .await?;
        let source: Option<SourceInfo> = tunnel
            .source_info()
            .map_err(into_internal!("Couldn't get SourceInfo for circuit"))?;
        // Open a directory stream over the circuit for the HTTP-style upload.
        let mut stream = tunnel
            .begin_dir_stream()
            .await
            .map_err(UploadError::Stream)?;
        let _response: String = send_request(&imm.runtime, &request, &mut stream, source)
            .await
            // Map tor-dirclient errors onto our own error type. A CircMgr error here
            // would be a bug, since we handed tor-dirclient a ready-made stream.
            .map_err(|dir_error| -> UploadError {
                match dir_error {
                    DirClientError::RequestFailed(e) => e.into(),
                    DirClientError::CircMgr(e) => into_internal!(
                        "tor-dirclient complains about circmgr going wrong but we gave it a stream"
                    )(e)
                    .into(),
                    e => into_internal!("unexpected error")(e).into(),
                }
            })?
            .into_output_string()?; // This returns an error if we received an error response
        Ok(())
    }
    /// Upload a descriptor to the specified HSDir, retrying if appropriate.
    ///
    /// Any failed uploads are retried according to a [`PublisherBackoffSchedule`].
    /// Each failed upload is retried until it succeeds, or until the overall timeout specified
    /// by [`BackoffSchedule::overall_timeout`] elapses. Individual attempts are timed out
    /// according to the [`BackoffSchedule::single_attempt_timeout`].
    /// This function gives up after the overall timeout elapses,
    /// declaring the upload a failure, and never retrying it again.
    ///
    /// See also [`BackoffSchedule`].
144
    async fn upload_descriptor_with_retries(
144
        hsdesc: String,
144
        netdir: &Arc<NetDir>,
144
        hsdir: &Relay<'_>,
144
        ed_id: &str,
144
        rsa_id: &str,
144
        imm: Arc<Immutable<R, M>>,
144
    ) -> UploadResult {
        /// The base delay to use for the backoff schedule.
        const BASE_DELAY_MSEC: u32 = 1000;
144
        let schedule = PublisherBackoffSchedule {
144
            retry_delay: RetryDelay::from_msec(BASE_DELAY_MSEC),
144
            mockable: imm.mockable.clone(),
144
        };
144
        let runner = Runner::new(
144
            "upload a hidden service descriptor".into(),
144
            schedule.clone(),
144
            imm.runtime.clone(),
        );
176
        let fallible_op = || async {
176
            let r = Self::upload_descriptor(hsdesc.clone(), netdir, hsdir, Arc::clone(&imm)).await;
176
            if let Err(e) = &r {
64
                if e.should_report_as_suspicious() {
                    // Note that not every protocol violation is suspicious:
                    // we only warn on the protocol violations that look like attempts
                    // to do a traffic tagging attack via hsdir inflation.
                    // (See proposal 360.)
                    warn_report!(
                        e,
                        "Suspicious error while uploading descriptor to {}/{}",
                        ed_id,
                        rsa_id
                    );
64
                }
112
            }
176
            r
352
        };
144
        let outcome: Result<(), BackoffError<UploadError>> = runner.run(fallible_op).await;
112
        match outcome {
            Ok(()) => {
112
                debug!(
                    nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
                    "successfully uploaded descriptor to HSDir",
                );
112
                Ok(())
            }
            Err(e) => {
                warn_report!(
                    e,
                    "failed to upload descriptor for service {} (hsdir_id={}, hsdir_rsa_id={})",
                    imm.nickname,
                    ed_id,
                    rsa_id
                );
                Err(e.into())
            }
        }
112
    }
    /// Stop publishing descriptors until the specified delay elapses.
    async fn start_rate_limit(&mut self, delay: Duration) -> Result<(), Bug> {
        // If we're already rate-limited, there is nothing to do.
        if matches!(self.status(), PublishStatus::RateLimited(_)) {
            return Ok(());
        }
        debug!(
            "We are rate-limited for {}; pausing descriptor publication",
            humantime::format_duration(delay)
        );
        // Record when the rate-limit expires, so the main loop knows when to resume.
        let until = self.imm.runtime.now() + delay;
        self.update_publish_status(PublishStatus::RateLimited(until))
            .await?;
        Ok(())
    }
    /// Handle the upload rate-limit being lifted.
    async fn expire_rate_limit(&mut self) -> Result<(), Bug> {
        debug!("We are no longer rate-limited; resuming descriptor publication");
        // Schedule a fresh upload now that publication is allowed again.
        self.update_publish_status(PublishStatus::UploadScheduled)
            .await
    }
    /// Return the authorized clients, if restricted mode is enabled.
    ///
    /// Returns `Ok(None)` if restricted discovery mode is disabled.
    ///
    /// Returns an error if restricted discovery mode is enabled, but the client list is empty.
    #[cfg_attr(
        not(feature = "restricted-discovery"),
        allow(clippy::unnecessary_wraps)
    )]
    fn authorized_clients(&self) -> Result<Option<Arc<RestrictedDiscoveryKeys>>, FatalError> {
        cfg_if::cfg_if! {
            if #[cfg(feature = "restricted-discovery")] {
                // Clone the Arc out of the shared state so we don't hold the lock.
                let authorized_clients = self
                    .inner
                    .lock()
                    .expect("poisoned lock")
                    .authorized_clients
                    .clone();
                // Restricted discovery with an *empty* client list would make the service
                // unreachable by everyone, so treat it as a fatal misconfiguration.
                if authorized_clients.as_ref().as_ref().map(|v| v.is_empty()).unwrap_or_default() {
                    return Err(FatalError::RestrictedDiscoveryNoClients);
                }
                Ok(authorized_clients)
            } else {
                // Feature compiled out: restricted discovery is always disabled.
                Ok(None)
            }
        }
    }
}
/// Try to expand a path, logging a warning on failure.
///
/// Returns `None` (after logging) if the path cannot be expanded.
fn maybe_expand_path(p: &CfgPath, r: &CfgPathResolver) -> Option<PathBuf> {
    match p.path(r) {
        Ok(expanded) => Some(expanded),
        Err(e) => {
            tor_error::warn_report!(e, "invalid path");
            None
        }
    }
}
/// Add `path` to the specified `watcher`.
///
/// Any error encountered while installing the watch is logged and otherwise ignored.
macro_rules! watch_path {
    ($watcher:expr, $path:expr, $watch_fn:ident, $($watch_fn_args:expr,)*) => {{
        // Fix: the extra arguments must be expanded with their separating commas
        // (`$($watch_fn_args,)*`); the previous expansion (`$($watch_fn_args)*`)
        // pasted the expressions together without separators, which fails to parse
        // for any invocation with two or more extra arguments. Existing call sites
        // (zero or one extra argument) expand identically.
        if let Err(e) = $watcher.$watch_fn(&$path, $($watch_fn_args,)*) {
            warn_report!(e, "failed to watch path {:?}", $path);
        } else {
            debug!("watching path {:?}", $path);
        }
    }}
}
/// Add the specified directories to the watcher.
#[allow(clippy::cognitive_complexity)]
fn watch_dirs<R: Runtime>(
    watcher: &mut FileWatcherBuilder<R>,
    dirs: &DirectoryKeyProviderList,
    path_resolver: &CfgPathResolver,
) {
    for provider in dirs {
        let configured_path = provider.path();
        // Expand the configured path, skipping (with a warning) anything that fails to expand.
        let Some(path) = maybe_expand_path(configured_path, path_resolver) else {
            warn!("failed to expand key_dir path {:?}", configured_path);
            continue;
        };
        // The notify watcher returns an error if asked to watch a nonexistent path,
        // so we only install watches for paths that exist at this time.
        // (This obviously suffers from a TOCTOU race, but most of the time,
        // it is good enough at preventing the watcher from failing to watch.
        // If the race *does* happen it is not disastrous, i.e. the reactor won't crash,
        // but it will fail to set the watcher).
        if matches!(path.try_exists(), Ok(true)) {
            watch_path!(watcher, &path, watch_dir, "auth",);
        }
        // FileWatcher::watch_path causes the parent dir of the path to be watched.
        if matches!(path.parent().map(|p| p.try_exists()), Some(Ok(true))) {
            watch_path!(watcher, &path, watch_path,);
        }
    }
}
/// Try to read the blinded identity key for a given `TimePeriod`.
///
/// If the blinded keypair is not already present in the keystore, it is derived
/// from the service's long-term identity (HsId) keypair, stored, and returned.
///
/// Returns `None` if the service is running in "offline" mode.
///
// TODO (#1194): we don't currently have support for "offline" mode so this can never return
// `Ok(None)`.
pub(super) fn read_blind_id_keypair(
    keymgr: &Arc<KeyMgr>,
    nickname: &HsNickname,
    period: TimePeriod,
) -> Result<Option<HsBlindIdKeypair>, FatalError> {
    // The HsId keypair must already exist; it is what the blinded key is derived from.
    let svc_key_spec = HsIdKeypairSpecifier::new(nickname.clone());
    let hsid_kp = keymgr
        .get::<HsIdKeypair>(&svc_key_spec)?
        .ok_or_else(|| FatalError::MissingHsIdKeypair(nickname.clone()))?;
    let blind_id_key_spec = BlindIdKeypairSpecifier::new(nickname.clone(), period);
    // TODO: make the keystore selector configurable
    let keystore_selector = Default::default();
    match keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)? {
        // Fast path: the blinded keypair for this time period is already in the keystore.
        Some(kp) => Ok(Some(kp)),
        None => {
            // Derive the blinded keypair for this period from the HsId keypair.
            let (_hs_blind_id_key, hs_blind_id_kp, _subcredential) = hsid_kp
                .compute_blinded_key(period)
                .map_err(|_| internal!("failed to compute blinded key"))?;
            // Note: we can't use KeyMgr::generate because this key is derived from the HsId
            // (KeyMgr::generate uses the tor_keymgr::Keygen trait under the hood,
            // which assumes keys are randomly generated, rather than derived from existing keys).
            keymgr.insert(hs_blind_id_kp, &blind_id_key_spec, keystore_selector, true)?;
            // Helper for rendering a key specifier path in the error below.
            let arti_path = |spec: &dyn KeySpecifier| {
                spec.arti_path()
                    .map_err(into_internal!("invalid key specifier?!"))
            };
            // Re-read the key we just inserted; if it has vanished, something else
            // must have removed it concurrently (a keystore race).
            Ok(Some(
                keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)?.ok_or(
                    FatalError::KeystoreRace {
                        action: "read",
                        path: arti_path(&blind_id_key_spec)?,
                    },
                )?,
            ))
        }
    }
}
/// Determine the [`State`] of the publisher based on the upload results
/// from the current `time_periods`.
///
/// Returns the computed state, plus the collected upload errors (if any).
fn upload_result_state(
    netdir: &NetDir,
    time_periods: &[TimePeriodContext],
) -> (State, Option<Problem>) {
    let current_period = netdir.hs_time_period();
    // The context (if any) for the time period that is current per the consensus.
    let current_period_res = time_periods
        .iter()
        .find(|ctx| ctx.params.time_period() == current_period);
    // Successful uploads for the current (primary) time period.
    let succeeded_current_tp = current_period_res
        .iter()
        .flat_map(|res| &res.upload_results)
        .filter(|res| res.upload_res.is_ok())
        .collect_vec();
    // The contexts for all the other (secondary) time periods.
    let secondary_tp_res = time_periods
        .iter()
        .filter(|ctx| ctx.params.time_period() != current_period)
        .collect_vec();
    // Successful uploads for the secondary time periods.
    let succeeded_secondary_tp = secondary_tp_res
        .iter()
        .flat_map(|res| &res.upload_results)
        .filter(|res| res.upload_res.is_ok())
        .collect_vec();
    // All of the failed uploads (for all TPs)
    let failed = time_periods
        .iter()
        .flat_map(|res| &res.upload_results)
        .filter(|res| res.upload_res.is_err())
        .collect_vec();
    // Collect the individual upload errors for status reporting.
    let problems: Vec<DescUploadRetryError> = failed
        .iter()
        .flat_map(|e| e.upload_res.as_ref().map_err(|e| e.clone()).err())
        .collect();
    let err = match problems.as_slice() {
        [_, ..] => Some(problems.into()),
        [] => None,
    };
    if time_periods.len() < 2 {
        // We need at least two TP contexts (one for the primary TP,
        // and another for the secondary one).
        //
        // If either is missing, we are unreachable for some or all clients.
        return (State::DegradedUnreachable, err);
    }
    let state = match (
        succeeded_current_tp.as_slice(),
        succeeded_secondary_tp.as_slice(),
    ) {
        (&[], &[..]) | (&[..], &[]) if failed.is_empty() => {
            // We don't have any upload results for one or both TPs.
            // We are still bootstrapping.
            State::Bootstrapping
        }
        (&[_, ..], &[_, ..]) if failed.is_empty() => {
            // We have uploaded the descriptor to one or more HsDirs from both
            // HsDir rings (primary and secondary), and none of the uploads failed.
            // We are fully reachable.
            State::Running
        }
        (&[_, ..], &[_, ..]) => {
            // We have uploaded the descriptor to one or more HsDirs from both
            // HsDir rings (primary and secondary), but some of the uploads failed.
            // We are reachable, but we failed to upload the descriptor to all the HsDirs
            // that were supposed to have it.
            State::DegradedReachable
        }
        (&[..], &[]) | (&[], &[..]) => {
            // We have either
            //   * uploaded the descriptor to some of the HsDirs from one of the rings,
            //   but haven't managed to upload it to any of the HsDirs on the other ring, or
            //   * all of the uploads failed
            //
            // Either way, we are definitely not reachable by all clients.
            State::DegradedUnreachable
        }
    };
    (state, err)
}
/// Whether the reactor should initiate an upload.
#[derive(Copy, Clone, Debug, Default, PartialEq)]
enum PublishStatus {
    /// We need to call upload_all.
    UploadScheduled,
    /// We are rate-limited until the specified [`Instant`].
    ///
    /// We have tried to schedule multiple uploads in a short time span,
    /// and we are rate-limited. We are waiting for a signal from the schedule_upload_tx
    /// channel to unblock us.
    RateLimited(Instant),
    /// We are idle and waiting for external events.
    ///
    /// We have enough information to build the descriptor, but since we have already called
    /// upload_all to upload it to all relevant HSDirs, there is nothing for us to do right now.
    Idle,
    /// We are waiting for the IPT manager to establish some introduction points.
    ///
    /// No descriptors will be published until the `PublishStatus` of the reactor is changed to
    /// `UploadScheduled`.
    #[default]
    AwaitingIpts,
}
/// The backoff schedule for the task that publishes descriptors.
#[derive(Clone, Debug)]
struct PublisherBackoffSchedule<M: Mockable> {
    /// The retry-delay generator used for spacing out retried uploads.
    retry_delay: RetryDelay,
    /// The mockable reactor state, needed for obtaining an rng.
    mockable: M,
}
impl<M: Mockable> BackoffSchedule for PublisherBackoffSchedule<M> {
    // No retry-count limit: retries are bounded by the overall timeout instead.
    fn max_retries(&self) -> Option<usize> {
        None
    }
    // Give up on an upload entirely once OVERALL_UPLOAD_TIMEOUT has elapsed.
    fn overall_timeout(&self) -> Option<Duration> {
        Some(OVERALL_UPLOAD_TIMEOUT)
    }
    // Bound each individual upload attempt using the (mockable) timeout estimate.
    fn single_attempt_timeout(&self) -> Option<Duration> {
        Some(self.mockable.estimate_upload_timeout())
    }
    // Always produce a next delay (never requests an explicit stop).
    fn next_delay<E: RetriableError>(&mut self, _error: &E) -> Option<Duration> {
        Some(self.retry_delay.next_delay(&mut self.mockable.thread_rng()))
    }
}
impl RetriableError for UploadError {
128
    fn should_retry(&self) -> bool {
128
        match self {
128
            UploadError::Request(_) | UploadError::Circuit(_) | UploadError::Stream(_) => true,
            UploadError::Bug(_) => false,
        }
128
    }
}
/// The outcome of uploading a descriptor to the HSDirs from a particular time period.
#[derive(Debug, Clone)]
struct TimePeriodUploadResult {
    /// The time period the descriptor was uploaded for.
    time_period: TimePeriod,
    /// The upload results, one entry per HsDir we attempted to upload to.
    hsdir_result: Vec<HsDirUploadStatus>,
}
/// The outcome of uploading a descriptor to a particular HsDir.
#[derive(Clone, Debug)]
struct HsDirUploadStatus {
    /// The identity of the HsDir we attempted to upload the descriptor to.
    relay_ids: RelayIds,
    /// The outcome of this attempt (`Ok` on success, or the final retry error).
    upload_res: UploadResult,
    /// The revision counter of the descriptor we tried to upload.
    revision_counter: RevisionCounter,
}
/// The outcome of uploading a descriptor.
///
/// `Err` means the upload failed even after exhausting all retries.
type UploadResult = Result<(), DescUploadRetryError>;
impl From<BackoffError<UploadError>> for DescUploadRetryError {
    fn from(e: BackoffError<UploadError>) -> Self {
        use BackoffError as BE;
        use DescUploadRetryError as DURE;
        match e {
            BE::FatalError(e) => DURE::FatalError(e),
            BE::MaxRetryCountExceeded(e) => DURE::MaxRetryCountExceeded(e),
            BE::Timeout(e) => DURE::Timeout(e),
            BE::ExplicitStop(_) => {
                DURE::Bug(internal!("explicit stop in publisher backoff schedule?!"))
            }
        }
    }
}
// NOTE: the rest of the publisher tests live in publish.rs
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use tor_netdir::testnet;

    /// Create a `TimePeriodContext` from the specified upload results.
    fn create_time_period_ctx(
        params: &HsDirParams,
        upload_results: Vec<HsDirUploadStatus>,
    ) -> TimePeriodContext {
        TimePeriodContext {
            params: params.clone(),
            hs_dirs: vec![],
            last_successful: None,
            upload_results,
        }
    }

    /// Create a single `HsDirUploadStatus` with the specified outcome.
    fn create_upload_status(upload_res: UploadResult) -> HsDirUploadStatus {
        HsDirUploadStatus {
            relay_ids: RelayIds::empty(),
            upload_res,
            // An arbitrary revision counter; these tests don't depend on its value.
            revision_counter: RevisionCounter::from(13),
        }
    }

    /// Create a bunch of results, all with the specified `upload_res`.
    fn create_upload_results(upload_res: UploadResult) -> Vec<HsDirUploadStatus> {
        std::iter::repeat_with(|| create_upload_status(upload_res.clone()))
            .take(10)
            .collect()
    }

    /// Build a test network directory with two shared random values set,
    /// so that both a "primary" (current) and "secondary" time period exist.
    fn construct_netdir() -> NetDir {
        const SRV1: [u8; 32] = *b"The door refused to open.       ";
        const SRV2: [u8; 32] = *b"It said, 'Five cents, please.'  ";
        let dir = testnet::construct_custom_netdir(|_, _, bld| {
            bld.shared_rand_prev(7, SRV1.into(), None)
                .shared_rand_prev(7, SRV2.into(), None);
        })
        .unwrap();
        dir.unwrap_if_sufficient().unwrap()
    }

    #[test]
    fn upload_result_status_bootstrapping() {
        let netdir = construct_netdir();
        let all_params = netdir.hs_all_time_periods();
        let current_period = netdir.hs_time_period();
        let primary_params = all_params
            .iter()
            .find(|param| param.time_period() == current_period)
            .unwrap();
        // While either time period has no upload results yet,
        // the service counts as bootstrapping.
        let results = [
            (vec![], vec![]),
            (vec![], create_upload_results(Ok(()))),
            (create_upload_results(Ok(())), vec![]),
        ];
        for (primary_result, secondary_result) in results {
            let primary_ctx = create_time_period_ctx(primary_params, primary_result);
            let secondary_params = all_params
                .iter()
                .find(|param| param.time_period() != current_period)
                .unwrap();
            let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result);
            let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
            assert_eq!(status, State::Bootstrapping);
            assert!(err.is_none());
        }
    }

    #[test]
    fn upload_result_status_running() {
        let netdir = construct_netdir();
        let all_params = netdir.hs_all_time_periods();
        let current_period = netdir.hs_time_period();
        let primary_params = all_params
            .iter()
            .find(|param| param.time_period() == current_period)
            .unwrap();
        // All uploads succeeded for both time periods: the service is Running.
        let secondary_result = create_upload_results(Ok(()));
        let secondary_params = all_params
            .iter()
            .find(|param| param.time_period() != current_period)
            .unwrap();
        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result);
        let primary_result = create_upload_results(Ok(()));
        let primary_ctx = create_time_period_ctx(primary_params, primary_result);
        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
        assert_eq!(status, State::Running);
        assert!(err.is_none());
    }

    #[test]
    fn upload_result_status_reachable() {
        let netdir = construct_netdir();
        let all_params = netdir.hs_all_time_periods();
        let current_period = netdir.hs_time_period();
        let primary_params = all_params
            .iter()
            .find(|param| param.time_period() == current_period)
            .unwrap();
        let primary_result = create_upload_results(Ok(()));
        let primary_ctx = create_time_period_ctx(primary_params, primary_result);
        // The secondary TP has a mix of successful and failed uploads.
        let failed_res = create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
        let secondary_result = create_upload_results(Ok(()))
            .into_iter()
            .chain(failed_res)
            .collect();
        let secondary_params = all_params
            .iter()
            .find(|param| param.time_period() != current_period)
            .unwrap();
        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result);
        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
        // Degraded but reachable (because some of the secondary HsDir uploads failed).
        assert_eq!(status, State::DegradedReachable);
        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
    }

    #[test]
    fn upload_result_status_unreachable() {
        let netdir = construct_netdir();
        let all_params = netdir.hs_all_time_periods();
        let current_period = netdir.hs_time_period();
        let primary_params = all_params
            .iter()
            .find(|param| param.time_period() == current_period)
            .unwrap();
        let mut primary_result =
            create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
        // No secondary TP (we are unreachable).
        let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
        assert_eq!(status, State::DegradedUnreachable);
        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
        // Add a successful result
        primary_result.push(create_upload_status(Ok(())));
        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
        let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
        // Still degraded, and unreachable (because we don't have a TimePeriodContext
        // for the secondary TP)
        assert_eq!(status, State::DegradedUnreachable);
        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
        // If we add another time period where none of the uploads were successful,
        // we're *still* unreachable
        let secondary_result =
            create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
        let secondary_params = all_params
            .iter()
            .find(|param| param.time_period() != current_period)
            .unwrap();
        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result);
        let primary_ctx = create_time_period_ctx(primary_params, primary_result);
        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
        assert_eq!(status, State::DegradedUnreachable);
        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
    }
}