1
//! Code implementing version 1 proof-of-work for onion service hosts.
2
//!
3
//! Spec links:
4
//! * <https://spec.torproject.org/hspow-spec/common-protocol.html>
5
//! * <https://spec.torproject.org/hspow-spec/v1-equix.html>
6

            
7
use std::{
8
    collections::{BTreeSet, HashMap, VecDeque},
9
    sync::{Arc, Mutex, RwLock},
10
    task::Waker,
11
    time::{Duration, Instant, SystemTime},
12
};
13

            
14
use arrayvec::ArrayVec;
15
use equix::EquiXBuilder;
16
use futures::{SinkExt, StreamExt};
17
use futures::{Stream, channel::mpsc};
18
use num_traits::FromPrimitive;
19
use rand::{CryptoRng, RngCore};
20
use serde::{Deserialize, Serialize};
21
use thiserror::Error;
22
use tor_basic_utils::RngExt as _;
23
use tor_cell::relaycell::hs::pow::{ProofOfWork, v1::ProofOfWorkV1};
24
use tor_checkable::timed::TimerangeBound;
25
use tor_error::warn_report;
26
use tor_hscrypto::{
27
    pk::HsBlindIdKey,
28
    pow::v1::{
29
        Effort, Instance, RuntimeOption, Seed, SeedHead, Solution, SolutionErrorV1, Verifier,
30
    },
31
    time::TimePeriod,
32
};
33
use tor_keymgr::KeyMgr;
34
use tor_netdir::{NetDirProvider, NetdirProviderShutdown, params::NetParameters};
35
use tor_netdoc::doc::hsdesc::pow::{PowParams, v1::PowParamsV1};
36
use tor_persist::{
37
    hsnickname::HsNickname,
38
    state_dir::{InstanceRawSubdir, StorageHandle},
39
};
40
use tor_rtcompat::Runtime;
41
use tor_rtcompat::SpawnExt;
42

            
43
use crate::{
44
    BlindIdPublicKeySpecifier, OnionServiceConfig, RendRequest, ReplayError, StartupError,
45
    rend_handshake,
46
    replay::{OpenReplayLogError, PowNonceReplayLog},
47
    status::{PowManagerStatusSender, Problem, State as PowManagerState},
48
};
49

            
50
use super::NewPowManager;
51

            
52
/// Proof-of-Work manager type alias for production, using concrete [`RendRequest`].
pub(crate) type PowManager<R> = PowManagerGeneric<R, RendRequest>;

/// This is responsible for rotating Proof-of-Work seeds and doing verification of PoW solves.
///
/// All internal state lives behind a single [`RwLock`]; see [`State`] for the fields.
/// The `Q` type parameter is the rend-request type, kept generic so that tests can
/// substitute a mock (see [`MockableRendRequest`]).
pub(crate) struct PowManagerGeneric<R, Q>(RwLock<State<R, Q>>);
57

            
58
/// Internal state for [`PowManagerGeneric`].
struct State<R, Q> {
    /// The [`Seed`]s for a given [`TimePeriod`]
    ///
    /// The [`ArrayVec`] contains the current and previous seed, and the [`SystemTime`] is when the
    /// current seed will expire.
    seeds: HashMap<TimePeriod, SeedsForTimePeriod>,

    /// Verifiers for all the seeds that exist in `seeds`.
    ///
    /// Each verifier is paired with the nonce replay log used to reject reused nonces for
    /// that seed.
    verifiers: HashMap<SeedHead, (Verifier, Mutex<PowNonceReplayLog>)>,

    /// The nickname for this hidden service.
    ///
    /// We need this so we can get the blinded keys from the [`KeyMgr`].
    nickname: HsNickname,

    /// Directory used to store nonce replay log.
    instance_dir: InstanceRawSubdir,

    /// Key manager.
    keymgr: Arc<KeyMgr>,

    /// Current suggested effort that we publish in the pow-params line.
    ///
    /// This is only read by the PowManagerGeneric, and is written to by the [`RendRequestReceiver`].
    suggested_effort: Arc<Mutex<Effort>>,

    /// Runtime, used for spawning tasks, sleeping, and reading the current time.
    runtime: R,

    /// Handle for storing state we need to persist to disk.
    storage_handle: StorageHandle<PowManagerStateRecord>,

    /// Queue to tell the publisher to re-upload a descriptor for a given TP, since we've rotated
    /// that seed.
    publisher_update_tx: mpsc::Sender<TimePeriod>,

    /// The [`RendRequestReceiver`], which contains the queue of [`RendRequest`]s.
    ///
    /// We need a reference to this in order to tell it when to update the suggested_effort value.
    rend_request_rx: RendRequestReceiver<R, Q>,

    /// [`NetDirProvider`], used for getting consensus parameters for configuration values.
    netdir_provider: Arc<dyn NetDirProvider>,

    /// Sender for reporting back onion service status.
    status_tx: PowManagerStatusSender,

    /// Receiver for the current configuration.
    config_rx: postage::watch::Receiver<Arc<OnionServiceConfig>>,
}
109

            
110
#[derive(Serialize, Deserialize, Debug, Clone)]
/// Information about the current and previous [`Seed`] for a given [`TimePeriod`].
struct SeedsForTimePeriod {
    /// The previous and current [`Seed`].
    ///
    /// The last element in this array is the current seed.
    seeds: ArrayVec<Seed, 2>,

    /// When the current seed will expire.
    ///
    /// Rotation actually happens [`SEED_EARLY_ROTATION_TIME`] before this instant; see
    /// [`PowManagerGeneric`]'s seed-rotation logic.
    next_expiration_time: SystemTime,
}
121

            
122
#[derive(Debug)]
#[allow(unused)]
/// A PoW solve was invalid.
///
/// While this contains the reason for the failure, we probably just want to use that for
/// debugging, we shouldn't make any logical decisions based on what the particular error was.
pub(crate) enum PowSolveError {
    /// Seed head was not recognized, it may be expired.
    InvalidSeedHead,
    /// We have already seen a solve with this nonce
    NonceReplay(ReplayError),
    /// The bytes given as a solution do not form a valid Equi-X puzzle
    InvalidEquixSolution(SolutionErrorV1),
    /// The solution given was invalid.
    InvalidSolve(tor_hscrypto::pow::Error),
}
138

            
139
/// On-disk record of [`PowManagerGeneric`] state.
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct PowManagerStateRecord {
    /// Seeds for each time period.
    ///
    /// Conceptually, this is a map between TimePeriod and SeedsForTimePeriod, but since TimePeriod
    /// can't be serialized to a string, it's not very simple to use serde to serialize it like
    /// that, so we instead store it as a list of tuples, and convert it to/from the map when
    /// saving/loading.
    seeds: Vec<(TimePeriod, SeedsForTimePeriod)>,

    /// Most recently published suggested_effort value.
    ///
    /// `#[serde(default)]` lets state files that lack this field still deserialize.
    #[serde(default)]
    suggested_effort: Effort,
    // We don't persist any per-period state. While it might be sort of nice to, it's complex to
    // decide when to write the state out to disk. The disadvantage to not storing it is that when
    // we restart the process, we may be up to 5 minutes slower to update the suggested effort to a
    // new value, which isn't particularly bad. The only case it would be bad is if an attacker has
    // a way to cause the Arti process to restart (in which case they could do that just before the
    // update period to pin the suggested effort value at a specific value), but if they have that,
    // they have a much more valuable attack (including as a DoS vector) than just a PoW bypass.
}
161

            
162
impl<R: Runtime, Q> State<R, Q> {
163
    /// Make a [`PowManagerStateRecord`] for this state.
164
8
    pub(crate) fn to_record(&self) -> PowManagerStateRecord {
165
8
        PowManagerStateRecord {
166
8
            seeds: self.seeds.clone().into_iter().collect(),
167
8
            suggested_effort: *self.suggested_effort.lock().expect("Lock poisoned"),
168
8
        }
169
8
    }
170
}
171

            
172
/// How frequently the suggested effort should be recalculated.
const HS_UPDATE_PERIOD: Duration = Duration::from_secs(300);

/// When the suggested effort has changed by less than this much, we don't republish it.
///
/// Specified as "15 percent" in <https://spec.torproject.org/hspow-spec/common-protocol.html>
///
/// However, we may want to make this configurable in the future.
const SUGGESTED_EFFORT_DEADZONE: f64 = 0.15;

/// How soon before a seed's expiration time we should rotate it and publish a new seed.
const SEED_EARLY_ROTATION_TIME: Duration = Duration::from_secs(60 * 5);

/// Minimum seed expiration time in minutes. See:
/// <https://spec.torproject.org/hspow-spec/v1-equix.html#parameter-descriptor>
const EXPIRATION_TIME_MINS_MIN: u64 = 105;

/// Maximum seed expiration time in minutes. See:
/// <https://spec.torproject.org/hspow-spec/v1-equix.html#parameter-descriptor>
const EXPIRATION_TIME_MINS_MAX: u64 = 120;

/// Enforce, at compile time, that early rotation time is less than or equal to min expiration
/// time.
const _: () = assert!(
    SEED_EARLY_ROTATION_TIME.as_secs() <= EXPIRATION_TIME_MINS_MIN * 60,
    "Early rotation time must be less than minimum expiration time"
);

/// Enforce, at compile time, that min expiration time is less than or equal to max.
const _: () = assert!(
    EXPIRATION_TIME_MINS_MIN <= EXPIRATION_TIME_MINS_MAX,
    "Minimum expiration time must be less than or equal to max"
);

/// Depth of the queue used to signal the publisher that it needs to update a given time period.
///
/// 32 is likely way larger than we need but the messages are tiny so we might as well.
const PUBLISHER_UPDATE_QUEUE_DEPTH: usize = 32;
209

            
210
#[derive(Error, Debug, Clone)]
#[allow(dead_code)] // We want to show fields in Debug even if we don't use them.
#[non_exhaustive]
/// Error within the PoW subsystem.
pub enum PowError {
    /// We don't have a key that is needed.
    #[error("Missing required key.")]
    MissingKey,
    /// Error in the underlying storage layer.
    #[error(transparent)]
    StorageError(#[from] tor_persist::Error),
    /// Error from the ReplayLog.
    #[error(transparent)]
    OpenReplayLog(#[from] OpenReplayLogError),
    /// NetDirProvider has shut down
    #[error(transparent)]
    NetdirProviderShutdown(#[from] NetdirProviderShutdown),
}
228

            
229
impl<R: Runtime, Q: MockableRendRequest + Send + 'static> PowManagerGeneric<R, Q> {
230
    /// Create a new [`PowManagerGeneric`].
    ///
    /// Loads any previously-persisted seeds via `storage_handle`, rebuilds a [`Verifier`] (and
    /// nonce replay log) for every seed we can still find a blinded key for, and wires up the
    /// channels shared with the publisher and the rend-request queue.
    ///
    /// Returns a [`NewPowManager`] bundling the manager with the queue endpoints the caller
    /// needs. The background seed-rotation task is *not* started here; see [`Self::launch`].
    #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
    pub(crate) fn new(
        runtime: R,
        nickname: HsNickname,
        instance_dir: InstanceRawSubdir,
        keymgr: Arc<KeyMgr>,
        storage_handle: StorageHandle<PowManagerStateRecord>,
        netdir_provider: Arc<dyn NetDirProvider>,
        status_tx: PowManagerStatusSender,
        config_rx: postage::watch::Receiver<Arc<OnionServiceConfig>>,
    ) -> Result<NewPowManager<R>, StartupError> {
        // Load persisted seeds (if any); a missing state file just means a fresh start.
        let on_disk_state = storage_handle
            .load()
            .map_err(StartupError::LoadState)?
            .unwrap_or(PowManagerStateRecord::default());

        let seeds: HashMap<TimePeriod, SeedsForTimePeriod> =
            on_disk_state.seeds.into_iter().collect();
        let suggested_effort = Arc::new(Mutex::new(on_disk_state.suggested_effort));

        // Rebuild a verifier + replay log for every persisted seed. Failures here are logged
        // and skipped rather than fatal: a missing key or log degrades service but should not
        // prevent startup.
        let mut verifiers = HashMap::new();
        for (tp, seeds_for_tp) in seeds.clone().into_iter() {
            for seed in seeds_for_tp.seeds {
                let verifier = match Self::make_verifier(
                    &keymgr,
                    nickname.clone(),
                    tp,
                    seed.clone(),
                    &config_rx.borrow(),
                ) {
                    Some(verifier) => verifier,
                    None => {
                        tracing::warn!(
                            "Couldn't construct verifier (key not available?). We will continue without this key, but this may prevent clients from connecting..."
                        );
                        continue;
                    }
                };
                let replay_log = match PowNonceReplayLog::new_logged(&instance_dir, &seed) {
                    Ok(replay_log) => replay_log,
                    Err(err) => {
                        warn_report!(
                            err,
                            "Error constructing replay log. We will continue without the log, but be aware that this may allow attackers to bypass PoW defenses..."
                        );
                        continue;
                    }
                };
                verifiers.insert(seed.head(), (verifier, Mutex::new(replay_log)));
            }
        }

        // This queue is extremely small, and we only make one of it per onion service, so it's
        // fine to not use memquota tracking.
        let (publisher_update_tx, publisher_update_rx) =
            crate::mpsc_channel_no_memquota(PUBLISHER_UPDATE_QUEUE_DEPTH);

        let (rend_req_tx, rend_req_rx_channel) = super::make_rend_queue();
        let rend_req_rx = RendRequestReceiver::new(
            runtime.clone(),
            nickname.clone(),
            suggested_effort.clone(),
            netdir_provider.clone(),
            status_tx.clone(),
            config_rx.clone(),
        );

        let state = State {
            seeds,
            nickname,
            instance_dir,
            keymgr,
            publisher_update_tx,
            verifiers,
            suggested_effort: suggested_effort.clone(),
            runtime: runtime.clone(),
            storage_handle,
            rend_request_rx: rend_req_rx.clone(),
            netdir_provider,
            status_tx,
            config_rx,
        };
        let pow_manager = Arc::new(PowManagerGeneric(RwLock::new(state)));

        // The receiver needs a handle back to the manager so it can verify solves.
        rend_req_rx.start_accept_thread(runtime, pow_manager.clone(), rend_req_rx_channel);

        Ok(NewPowManager {
            pow_manager,
            rend_req_tx,
            rend_req_rx: Box::pin(rend_req_rx),
            publisher_update_rx,
        })
    }
324

            
325
    /// Launch background task to rotate seeds.
326
    pub(crate) fn launch(self: &Arc<Self>) -> Result<(), StartupError> {
327
        let pow_manager = self.clone();
328
        let runtime = pow_manager.0.read().expect("Lock poisoned").runtime.clone();
329

            
330
        runtime
331
            .spawn(pow_manager.main_loop_error_wrapper())
332
            .map_err(|cause| StartupError::Spawn {
333
                spawning: "pow manager",
334
                cause: cause.into(),
335
            })?;
336

            
337
        self.0
338
            .write()
339
            .expect("Lock poisoned")
340
            .status_tx
341
            .send(PowManagerState::Running, None);
342
        Ok(())
343
    }
344

            
345
    /// Run [`Self::main_loop_task`], reporting any errors.
346
    async fn main_loop_error_wrapper(self: Arc<Self>) {
347
        if let Err(err) = self.clone().main_loop_task().await {
348
            self.0
349
                .write()
350
                .expect("Lock poisoned")
351
                .status_tx
352
                .send_broken(Problem::Pow(err));
353
        }
354
    }
355

            
356
    /// Main loop for rotating seeds.
357
    async fn main_loop_task(self: Arc<Self>) -> Result<(), PowError> {
358
        let runtime = self.0.write().expect("Lock poisoned").runtime.clone();
359

            
360
        let mut last_suggested_effort_update = runtime.now();
361
        let mut last_published_suggested_effort: u32 = (*self
362
            .0
363
            .read()
364
            .expect("Lock poisoned")
365
            .suggested_effort
366
            .lock()
367
            .expect("Lock poisoned"))
368
        .into();
369

            
370
        let netdir_provider = self
371
            .0
372
            .read()
373
            .expect("Lock poisoned")
374
            .netdir_provider
375
            .clone();
376
        let net_params = netdir_provider
377
            .wait_for_netdir(tor_netdir::Timeliness::Timely)
378
            .await?
379
            .params()
380
            .clone();
381

            
382
        loop {
383
            let next_update_time = self.rotate_seeds_if_expiring().await;
384

            
385
            // Update the suggested effort, if needed
386
            if runtime.now() - last_suggested_effort_update >= HS_UPDATE_PERIOD {
387
                let (tps_to_update, mut publisher_update_tx) = {
388
                    let mut tps_to_update = vec![];
389

            
390
                    let inner = self.0.read().expect("Lock poisoned");
391

            
392
                    inner.rend_request_rx.update_suggested_effort(&net_params);
393
                    last_suggested_effort_update = runtime.now();
394
                    let new_suggested_effort: u32 =
395
                        (*inner.suggested_effort.lock().expect("Lock poisoned")).into();
396

            
397
                    let percent_change =
398
                        f64::from(new_suggested_effort - last_published_suggested_effort)
399
                            / f64::from(last_published_suggested_effort);
400
                    if percent_change.abs() >= SUGGESTED_EFFORT_DEADZONE {
401
                        last_published_suggested_effort = new_suggested_effort;
402

            
403
                        tps_to_update = inner.seeds.iter().map(|x| *x.0).collect();
404
                    }
405

            
406
                    let publisher_update_tx = inner.publisher_update_tx.clone();
407
                    (tps_to_update, publisher_update_tx)
408
                };
409

            
410
                for time_period in tps_to_update {
411
                    let _ = publisher_update_tx.send(time_period).await;
412
                }
413
            }
414

            
415
            let suggested_effort_update_delay = HS_UPDATE_PERIOD.saturating_sub(
416
                runtime
417
                    .now()
418
                    .saturating_duration_since(last_suggested_effort_update),
419
            );
420

            
421
            // A new TimePeriod that we don't know about (and thus that isn't in next_update_time)
422
            // might get added at any point. Making sure that our maximum delay is the minimum
423
            // amount of time that it might take for a seed to expire means that we can be sure
424
            // that we will rotate newly-added seeds properly.
425
            const MAX_DELAY: Duration = Duration::from_secs(EXPIRATION_TIME_MINS_MIN * 60)
426
                .checked_sub(SEED_EARLY_ROTATION_TIME)
427
                .expect("SEED_EARLY_ROTATION_TIME too high, or EXPIRATION_TIME_MINS_MIN too low.");
428
            let delay = next_update_time
429
                .map(|x| x.duration_since(SystemTime::now()).unwrap_or(MAX_DELAY))
430
                .unwrap_or(MAX_DELAY)
431
                .min(MAX_DELAY)
432
                .min(suggested_effort_update_delay);
433

            
434
            tracing::debug!(next_wakeup = ?delay, "Recalculated PoW seeds.");
435

            
436
            runtime.sleep(delay).await;
437
        }
438
    }
439

            
440
    /// Make a randomized seed expiration time.
441
8
    fn make_next_expiration_time<Rng: RngCore + CryptoRng>(rng: &mut Rng) -> SystemTime {
442
8
        SystemTime::now()
443
8
            + Duration::from_secs(
444
8
                60 * rng
445
8
                    .gen_range_checked(EXPIRATION_TIME_MINS_MIN..=EXPIRATION_TIME_MINS_MAX)
446
8
                    .expect("Can't generate expiration_time"),
447
8
            )
448
8
    }
449

            
450
    /// Make a new [`Verifier`] for a given [`TimePeriod`] and [`Seed`].
    ///
    /// If a key is not available for this TP, returns None.
    ///
    /// This takes individual arguments instead of `&self` to avoid getting into any trouble with
    /// locking.
    fn make_verifier(
        keymgr: &Arc<KeyMgr>,
        nickname: HsNickname,
        time_period: TimePeriod,
        seed: Seed,
        config: &OnionServiceConfig,
    ) -> Option<Verifier> {
        let blind_id_spec = BlindIdPublicKeySpecifier::new(nickname, time_period);
        // A KeyMgr error is logged but treated the same as "no key": we return None below.
        let blind_id_key = match keymgr.get::<HsBlindIdKey>(&blind_id_spec) {
            Ok(blind_id_key) => blind_id_key,
            Err(err) => {
                warn_report!(err, "KeyMgr error when getting blinded ID key for PoW");
                None
            }
        };
        let instance = Instance::new(blind_id_key?.id(), seed);
        let mut equix = EquiXBuilder::default();
        // Configuration may force the interpreted Equi-X runtime instead of compiled code.
        if *config.disable_pow_compilation() {
            equix.runtime(RuntimeOption::InterpretOnly);
        }
        Some(Verifier::new_with_equix(instance, equix))
    }
478

            
479
    /// Calculate a time when we want to rotate a seed, slightly before it expires, in order to
480
    /// ensure that clients don't ever download a seed that is already out of date.
481
    fn calculate_early_rotation_time(expiration_time: SystemTime) -> SystemTime {
482
        // Underflow cannot happen because:
483
        //
484
        // * We set the expiration time to the current time plus at least the minimum
485
        //   expiration time
486
        // * We know (backed up by a compile-time assertion) that SEED_EARLY_ROTATION_TIME is
487
        //   less than the minimum expiration time.
488
        //
489
        // Thus, the only way this subtraction can underflow is if the system time at the
490
        // moment we set the expiration time was before the epoch, which is not possible on
491
        // reasonable platforms.
492
        expiration_time
493
            .checked_sub(SEED_EARLY_ROTATION_TIME)
494
            .expect("PoW seed expiration underflow")
495
    }
496

            
497
    /// Rotate any seeds that will expire soon.
    ///
    /// This also pokes the publisher when needed to cause rotated seeds to be published.
    ///
    /// Returns the next time this function should be called again.
    #[allow(clippy::cognitive_complexity)]
    async fn rotate_seeds_if_expiring(&self) -> Option<SystemTime> {
        // Bookkeeping collected under the lock, applied before the lock is released.
        let mut expired_verifiers = vec![];
        let mut new_verifiers = vec![];

        let mut update_times = vec![];
        let mut updated_tps = vec![];
        let mut expired_tps = vec![];

        // Everything inside this block holds the write lock; we only keep the cloned sender
        // afterwards, so we never `.await` while holding the lock.
        let mut publisher_update_tx = {
            let mut state = self.0.write().expect("Lock poisoned");

            let config = state.config_rx.borrow().clone();
            let keymgr = state.keymgr.clone();
            let nickname = state.nickname.clone();

            for (time_period, info) in state.seeds.iter_mut() {
                let rotation_time = Self::calculate_early_rotation_time(info.next_expiration_time);
                update_times.push(rotation_time);

                if rotation_time <= SystemTime::now() {
                    // This does not allow for easy testing, but because we're in a async function, it's
                    // non-trivial to pass in a Rng from the outside world. If we end up writing tests that
                    // require that, we can take a function to generate a Rng, but for now, just using the
                    // thread rng is fine.
                    let mut rng = rand::rng();

                    let seed = Seed::new(&mut rng, None);
                    let verifier = match Self::make_verifier(
                        &keymgr,
                        nickname.clone(),
                        *time_period,
                        seed.clone(),
                        &config,
                    ) {
                        Some(verifier) => verifier,
                        None => {
                            // We use not having a key for a given TP as the signal that we should
                            // stop keeping track of seeds for that TP.
                            expired_tps.push(*time_period);
                            continue;
                        }
                    };

                    // Drop the oldest seed if we already hold two (previous + current).
                    let expired_seed = if info.seeds.is_full() {
                        info.seeds.pop_at(0)
                    } else {
                        None
                    };
                    // .push() is safe, since we just made space above.
                    info.seeds.push(seed.clone());
                    info.next_expiration_time = Self::make_next_expiration_time(&mut rng);
                    update_times.push(info.next_expiration_time);

                    // Make a note to add the new verifier and remove the old one.
                    new_verifiers.push((seed, verifier));
                    if let Some(expired_seed) = expired_seed {
                        expired_verifiers.push(expired_seed.head());
                    }

                    // Tell the publisher to update this TP
                    updated_tps.push(*time_period);

                    tracing::debug!(time_period = ?time_period, "Rotated PoW seed");
                }
            }

            // Forget all state (seeds and verifiers) for time periods we no longer have keys for.
            for time_period in expired_tps {
                if let Some(seeds) = state.seeds.remove(&time_period) {
                    for seed in seeds.seeds {
                        state.verifiers.remove(&seed.head());
                    }
                }
            }

            // Register freshly-created verifiers along with a new nonce replay log per seed.
            for (seed, verifier) in new_verifiers {
                let replay_log = Mutex::new(
                    PowNonceReplayLog::new_logged(&state.instance_dir, &seed)
                        .expect("Couldn't make ReplayLog."),
                );
                state.verifiers.insert(seed.head(), (verifier, replay_log));
            }

            for seed_head in expired_verifiers {
                state.verifiers.remove(&seed_head);
            }

            // Persist the updated seed set; failure to save is logged but not fatal.
            let record = state.to_record();
            if let Err(err) = state.storage_handle.store(&record) {
                warn_report!(err, "Error saving PoW state");
            }

            state.publisher_update_tx.clone()
        };

        // Lock released; now it is safe to await on the publisher queue.
        for time_period in updated_tps {
            if let Err(err) = publisher_update_tx.send(time_period).await {
                warn_report!(err, "Couldn't send update message to publisher");
            }
        }

        update_times.iter().min().cloned()
    }
605

            
606
    /// Get [`PowParams`] for a given [`TimePeriod`].
    ///
    /// If we don't have any [`Seed`]s for the requested period, generate them. This is the only
    /// way that [`PowManagerGeneric`] learns about new [`TimePeriod`]s.
    ///
    /// Returns [`PowError::MissingKey`] if no blinded key is available for the period when a
    /// fresh seed must be created.
    pub(crate) fn get_pow_params<Rng: RngCore + CryptoRng>(
        self: &Arc<Self>,
        time_period: TimePeriod,
        rng: &mut Rng,
    ) -> Result<PowParams, PowError> {
        // Fast path: read-lock only, grab the current seed and effort if we have them.
        let (seed_and_expiration, suggested_effort) = {
            let state = self.0.read().expect("Lock poisoned");
            let seed = state
                .seeds
                .get(&time_period)
                .and_then(|x| Some((x.seeds.last()?.clone(), x.next_expiration_time)));
            let suggested_effort = *state.suggested_effort.lock().expect("Lock poisoned");
            (seed, suggested_effort)
        };

        let (seed, expiration) = match seed_and_expiration {
            Some(seed) => seed,
            None => {
                // We don't have a seed for this time period, so we need to generate one.
                //
                // NOTE(review): the read lock is dropped before the write lock is taken below,
                // so two racing callers could each generate a seed for the same period and the
                // later insert would overwrite the earlier — confirm this is acceptable.

                let seed = Seed::new(rng, None);
                let next_expiration_time = Self::make_next_expiration_time(rng);

                let mut seeds = ArrayVec::new();
                seeds.push(seed.clone());

                let mut state = self.0.write().expect("Lock poisoned");

                state.seeds.insert(
                    time_period,
                    SeedsForTimePeriod {
                        seeds,
                        next_expiration_time,
                    },
                );

                let verifier = Self::make_verifier(
                    &state.keymgr,
                    state.nickname.clone(),
                    time_period,
                    seed.clone(),
                    &state.config_rx.borrow(),
                )
                .ok_or(PowError::MissingKey)?;

                let replay_log =
                    Mutex::new(PowNonceReplayLog::new_logged(&state.instance_dir, &seed)?);
                state.verifiers.insert(seed.head(), (verifier, replay_log));

                // Persist the newly-learned time period and its seed.
                let record = state.to_record();
                state.storage_handle.store(&record)?;

                (seed, next_expiration_time)
            }
        };

        Ok(PowParams::V1(PowParamsV1::new(
            TimerangeBound::new(seed, ..expiration),
            suggested_effort,
        )))
    }
671

            
672
    /// Verify a PoW solve.
    ///
    /// Returns `Ok(())` for a valid, never-before-seen solve; otherwise a [`PowSolveError`]
    /// describing why the solve was rejected.
    fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
        // Note that we put the nonce into the replay log before we check the solve. While this
        // might not be ideal, it's not a problem and is probably the most reasonable thing to do.
        // See commit bc5b313028 for a more full explanation.
        {
            let state = self.0.write().expect("Lock poisoned");
            let mut replay_log = match state.verifiers.get(&solve.seed_head()) {
                Some((_, replay_log)) => replay_log.lock().expect("Lock poisoned"),
                None => return Err(PowSolveError::InvalidSeedHead),
            };
            replay_log
                .check_for_replay(solve.nonce())
                .map_err(PowSolveError::NonceReplay)?;
        }

        // TODO: Once RwLock::downgrade is stabilized, it would make sense to use it here...

        let state = self.0.read().expect("Lock poisoned");
        // Look the verifier up again: the seed may have been rotated away between the two lock
        // acquisitions above, in which case we reject the solve.
        let verifier = match state.verifiers.get(&solve.seed_head()) {
            Some((verifier, _)) => verifier,
            None => return Err(PowSolveError::InvalidSeedHead),
        };

        let solution = match Solution::try_from_bytes(
            solve.nonce().clone(),
            solve.effort(),
            solve.seed_head(),
            solve.solution(),
        ) {
            Ok(solution) => solution,
            Err(err) => return Err(PowSolveError::InvalidEquixSolution(err)),
        };

        match verifier.check(&solution) {
            Ok(()) => Ok(()),
            Err(err) => Err(PowSolveError::InvalidSolve(err)),
        }
    }
711
}
712

            
713
/// Trait to allow mocking PowManagerGeneric in tests.
trait MockablePowManager {
    /// Verify a PoW solve.
    ///
    /// Returns `Ok(())` if the solve is acceptable, or a [`PowSolveError`] explaining the
    /// rejection.
    fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError>;
}
718

            
719
impl<R: Runtime> MockablePowManager for PowManager<R> {
    fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
        // Delegate to the inherent `PowManager::check_solve`; the fully-qualified path makes
        // it unambiguous that we are not recursing into this trait method.
        PowManager::check_solve(self, solve)
    }
}
724

            
725
/// Trait to allow mocking RendRequest in tests.
pub(crate) trait MockableRendRequest {
    /// Get the proof-of-work extension associated with this request.
    ///
    /// Returns `Ok(None)` when the request carries no PoW extension; errors come from
    /// decoding the underlying introduction request.
    fn proof_of_work(&self) -> Result<Option<&ProofOfWork>, rend_handshake::IntroRequestError>;
}
730

            
731
impl MockableRendRequest for RendRequest {
732
    fn proof_of_work(&self) -> Result<Option<&ProofOfWork>, rend_handshake::IntroRequestError> {
733
        Ok(self
734
            .intro_request()?
735
            .intro_payload()
736
            .proof_of_work_extension())
737
    }
738
}
739

            
740
/// Wrapper around [`RendRequest`] that implements [`std::cmp::Ord`] to sort by [`Effort`] and time.
#[derive(Debug)]
struct RendRequestOrdByEffort<Q> {
    /// The underlying request.
    request: Q,
    /// The proof-of-work options, if given.
    pow: Option<ProofOfWorkV1>,
    /// The maximum effort allowed. If the effort of this request is higher than this, it will be
    /// treated as though it is this value.
    max_effort: Effort,
    /// When this request was received, used for ordering if the effort values are the same.
    recv_time: Instant,
    /// Unique number for this request, which is used for ordering among requests with the same
    /// timestamp.
    ///
    /// This is intended to be monotonically increasing, although it may overflow. Overflows are
    /// not handled in any special way, given that they are an edge case of an edge case, and
    /// ordering among requests that came in at the same instant is not important.
    request_num: u64,
}
760

            
761
impl<Q: MockableRendRequest> RendRequestOrdByEffort<Q> {
762
    /// Create a new [`RendRequestOrdByEffort`].
763
1476
    fn new(
764
1476
        request: Q,
765
1476
        max_effort: Effort,
766
1476
        request_num: u64,
767
1476
    ) -> Result<Self, rend_handshake::IntroRequestError> {
768
1476
        let pow = match request.proof_of_work()?.cloned() {
769
1444
            Some(ProofOfWork::V1(pow)) => Some(pow),
770
32
            None | Some(_) => None,
771
        };
772

            
773
1476
        Ok(Self {
774
1476
            request,
775
1476
            pow,
776
1476
            max_effort,
777
1476
            recv_time: Instant::now(),
778
1476
            request_num,
779
1476
        })
780
1476
    }
781
}
782

            
783
impl<Q: MockableRendRequest> Ord for RendRequestOrdByEffort<Q> {
784
6780
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
785
6780
        let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| {
786
6764
            Effort::min(pow.effort(), self.max_effort)
787
6764
        });
788
6780
        let other_effort = other.pow.as_ref().map_or(Effort::zero(), |pow| {
789
6764
            Effort::min(pow.effort(), other.max_effort)
790
6764
        });
791
6780
        match self_effort.cmp(&other_effort) {
792
            std::cmp::Ordering::Equal => {
793
                // Flip ordering, since we want the oldest ones to be handled first.
794
2968
                match other.recv_time.cmp(&self.recv_time) {
795
                    // Use request_num as a final tiebreaker, also flipping ordering (since
796
                    // lower-numbered requests should be older and thus come first)
797
                    std::cmp::Ordering::Equal => other.request_num.cmp(&self.request_num),
798
2968
                    not_equal => not_equal,
799
                }
800
            }
801
3812
            not_equal => not_equal,
802
        }
803
6780
    }
804
}
805

            
806
impl<Q: MockableRendRequest> PartialOrd for RendRequestOrdByEffort<Q> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // The ordering is total, so always defer to `Ord`.
        Some(Ord::cmp(self, other))
    }
}
811

            
812
impl<Q: MockableRendRequest> PartialEq for RendRequestOrdByEffort<Q> {
    /// Equality, defined as agreement with [`Ord`]: two requests are equal exactly when
    /// `cmp` returns [`std::cmp::Ordering::Equal`].
    ///
    /// This must stay in sync with the `Ord` impl. A previous version compared only the
    /// capped effort and `recv_time`, ignoring `request_num`; that let `eq` report two
    /// requests as equal while `cmp` ordered them, violating the consistency contract
    /// between `PartialEq` and `Ord` that `BTreeSet` (our priority queue) relies on.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}
823

            
824
// Marker impl: the ordering on `RendRequestOrdByEffort` is total, so full `Eq` holds.
impl<Q: MockableRendRequest> Eq for RendRequestOrdByEffort<Q> {}
825

            
826
/// Implements [`Stream`] for incoming [`RendRequest`]s, using a priority queue system to dequeue
/// high-[`Effort`] requests first.
///
/// This is implemented on top of a [`mpsc::Receiver`]. There is a thread that dequeues from the
/// [`mpsc::Receiver`], checks the PoW solve, and if it is correct, adds it to a [`BTreeSet`],
/// which the [`Stream`] implementation reads from.
///
/// This is not particularly optimized — queueing and dequeuing use a [`Mutex`], so there may be
/// some contention there. It's possible there may be some fancy lockless (or more optimized)
/// priority queue that we could use, but we should properly benchmark things before trying to make
/// an optimization like that.
pub(crate) struct RendRequestReceiver<R, Q>(Arc<Mutex<RendRequestReceiverInner<R, Q>>>);
838

            
839
impl<R, Q> Clone for RendRequestReceiver<R, Q> {
840
120
    fn clone(&self) -> Self {
841
120
        Self(self.0.clone())
842
120
    }
843
}
844

            
845
/// Inner implementation for [`RendRequestReceiver`].
struct RendRequestReceiverInner<R, Q> {
    /// Internal priority queue of requests.
    queue: BTreeSet<RendRequestOrdByEffort<Q>>,

    /// Internal FIFO queue of requests used when PoW is disabled.
    ///
    /// We have this here to support switching back and forth between PoW enabled and disabled at
    /// runtime, although that isn't currently supported.
    queue_pow_disabled: VecDeque<Q>,

    /// Waker to inform async readers when there is a new message on the queue.
    waker: Option<Waker>,

    /// Runtime, used to get current time in a testable way.
    runtime: R,

    /// Nickname, used when reporting metrics.
    nickname: HsNickname,

    /// [`NetDirProvider`], for getting configuration values in consensus parameters.
    netdir_provider: Arc<dyn NetDirProvider>,

    /// Current configuration, used to see whether PoW is enabled or not.
    config_rx: postage::watch::Receiver<Arc<OnionServiceConfig>>,

    /// When the current update period started.
    update_period_start: Instant,
    /// Number of requests that were enqueued during the current update period, and had an effort
    /// greater than or equal to the suggested effort.
    num_enqueued_gte_suggested: usize,
    /// Number of requests that were dequeued during the current update period.
    num_dequeued: u32,
    /// Amount of time during the current update period that we spent with no requests in the
    /// queue.
    idle_time: Duration,
    /// Time that the queue last went from having items in it to not having items in it, or vice
    /// versa. This is used to update idle_time.
    last_transition: Instant,
    /// Sum of all effort values that were validated and enqueued during the current update period.
    total_effort: u64,

    /// Most recent published suggested effort value.
    ///
    /// We write to this, which is then published in the pow-params line by [`PowManagerGeneric`].
    suggested_effort: Arc<Mutex<Effort>>,

    /// Sender for reporting back onion service status.
    status_tx: PowManagerStatusSender,
}
895

            
896
impl<R: Runtime, Q: MockableRendRequest + Send + 'static> RendRequestReceiver<R, Q> {
    /// Create a new [`RendRequestReceiver`].
    ///
    /// All bookkeeping counters start at zero, and both the current update period and the
    /// idle-time transition marker start at the runtime's current time.
    fn new(
        runtime: R,
        nickname: HsNickname,
        suggested_effort: Arc<Mutex<Effort>>,
        netdir_provider: Arc<dyn NetDirProvider>,
        status_tx: PowManagerStatusSender,
        config_rx: postage::watch::Receiver<Arc<OnionServiceConfig>>,
    ) -> Self {
        let now = runtime.now();
        RendRequestReceiver(Arc::new(Mutex::new(RendRequestReceiverInner {
            queue: BTreeSet::new(),
            queue_pow_disabled: VecDeque::new(),
            waker: None,
            runtime,
            nickname,
            netdir_provider,
            config_rx,
            update_period_start: now,
            num_enqueued_gte_suggested: 0,
            num_dequeued: 0,
            idle_time: Duration::new(0, 0),
            last_transition: now,
            total_effort: 0,
            suggested_effort,
            status_tx,
        })))
    }

    // spawn_blocking executes immediately, but some of our abstractions make clippy not
    // realize this.
    #[allow(clippy::let_underscore_future)]
    /// Start helper thread to accept and validate [`RendRequest`]s.
    ///
    /// Spawns two blocking tasks: one running [`Self::accept_loop`] and one running
    /// [`Self::expire_old_requests_loop`]. If either loop exits with an error, the
    /// service status is marked broken via `status_tx`.
    fn start_accept_thread<P: MockablePowManager + Send + Sync + 'static>(
        &self,
        runtime: R,
        pow_manager: Arc<P>,
        inner_receiver: mpsc::Receiver<Q>,
    ) {
        let receiver = self.clone();
        let runtime_clone = runtime.clone();
        let _ = runtime.clone().spawn_blocking(move || {
            if let Err(err) =
                receiver
                    .clone()
                    .accept_loop(&runtime_clone, &pow_manager, inner_receiver)
            {
                warn_report!(err, "PoW accept loop error!");
                receiver
                    .0
                    .lock()
                    .expect("Lock poisoned")
                    .status_tx
                    .send_broken(Problem::Pow(err));
            }
        });

        let receiver = self.clone();
        let _ = runtime.clone().spawn_blocking(move || {
            if let Err(err) = receiver.clone().expire_old_requests_loop(&runtime) {
                warn_report!(err, "PoW request expiration loop error!");
                receiver
                    .0
                    .lock()
                    .expect("Lock poisoned")
                    .status_tx
                    .send_broken(Problem::Pow(err));
            }
        });
    }

    /// Update the suggested effort value, as per the algorithm in prop362.
    ///
    /// Called once per update period. If any requests were dequeued this period, the
    /// suggested effort is raised (when the queue is falling behind) or decayed (when we
    /// kept up); in all cases the per-period counters are then reset for the next period.
    fn update_suggested_effort(&self, net_params: &NetParameters) {
        let mut inner = self.0.lock().expect("Lock poisoned");

        let decay_adjustment_fraction = net_params.hs_pow_v1_default_decay_adjustment.as_fraction();

        if inner.num_dequeued != 0 {
            let update_period_duration = inner.runtime.now() - inner.update_period_start;
            let avg_request_duration = update_period_duration / inner.num_dequeued;
            // If the queue is currently empty, fold the in-progress idle stretch into
            // idle_time before computing the idle fraction.
            if inner.queue.is_empty() {
                let now = inner.runtime.now();
                let last_transition = inner.last_transition;
                inner.idle_time += now - last_transition;
            }
            // Discount idle time by the time it would take to drain the requests still
            // queued, so a backlog doesn't look like idleness.
            let adjusted_idle_time = Duration::saturating_sub(
                inner.idle_time,
                avg_request_duration * inner.queue.len().try_into().expect("Queue too large."),
            );
            // TODO: use as_millis_f64 when stable
            let idle_fraction = f64::from_u128(adjusted_idle_time.as_millis())
                .expect("Conversion error")
                / f64::from_u128(update_period_duration.as_millis()).expect("Conversion error");
            let busy_fraction = 1.0 - idle_fraction;

            let mut suggested_effort = inner.suggested_effort.lock().expect("Lock poisoned");
            let suggested_effort_inner: u32 = (*suggested_effort).into();

            if busy_fraction == 0.0 {
                // Fully idle period: decay the suggested effort multiplicatively.
                let new_suggested_effort =
                    u32::from_f64(f64::from(suggested_effort_inner) * decay_adjustment_fraction)
                        .expect("Conversion error");
                *suggested_effort = Effort::from(new_suggested_effort);
            } else {
                // Extrapolate how many requests we could have served had we been busy the
                // whole period.
                let theoretical_num_dequeued =
                    f64::from(inner.num_dequeued) * (1.0 / busy_fraction);
                let num_enqueued_gte_suggested_f64 =
                    f64::from_usize(inner.num_enqueued_gte_suggested).expect("Conversion error");
                if num_enqueued_gte_suggested_f64 >= theoretical_num_dequeued {
                    // More qualifying requests arrived than we could serve: raise the
                    // suggested effort to at least the mean effort per dequeued request,
                    // and strictly above the previous value.
                    let effort_per_dequeued = u32::from_f64(
                        f64::from_u64(inner.total_effort).expect("Conversion error")
                            / f64::from(inner.num_dequeued),
                    )
                    .expect("Conversion error");
                    *suggested_effort = Effort::from(std::cmp::max(
                        effort_per_dequeued,
                        suggested_effort_inner + 1,
                    ));
                } else {
                    // We kept up: decay the suggested effort, scaled by how close demand
                    // came to our theoretical capacity.
                    let decay = num_enqueued_gte_suggested_f64 / theoretical_num_dequeued;
                    let adjusted_decay = decay + ((1.0 - decay) * decay_adjustment_fraction);
                    let new_suggested_effort =
                        u32::from_f64(f64::from(suggested_effort_inner) * adjusted_decay)
                            .expect("Conversion error");
                    *suggested_effort = Effort::from(new_suggested_effort);
                }
            }
            drop(suggested_effort);
        }
        // Reset all per-period accounting for the next update period.
        let now = inner.runtime.now();
        inner.update_period_start = now;
        inner.num_enqueued_gte_suggested = 0;
        inner.num_dequeued = 0;
        inner.idle_time = Duration::new(0, 0);
        inner.last_transition = now;
        inner.total_effort = 0;
    }

    /// Loop to accept messages from the wrapped [`mpsc::Receiver`], validate PoW solves, and
    /// enqueue onto the priority queue.
    ///
    /// Runs until the sending side of the channel is closed, at which point it reports a
    /// clean shutdown via `status_tx` and returns `Ok(())`.
    #[allow(clippy::cognitive_complexity)]
    fn accept_loop<P: MockablePowManager>(
        self,
        runtime: &R,
        pow_manager: &Arc<P>,
        mut receiver: mpsc::Receiver<Q>,
    ) -> Result<(), PowError> {
        let mut request_num = 0;
        let netdir_provider = self
            .0
            .lock()
            .expect("Lock poisoned")
            .netdir_provider
            .clone();
        let net_params = runtime
            .reenter_block_on(netdir_provider.wait_for_netdir(tor_netdir::Timeliness::Timely))?
            .params()
            .clone();
        let max_effort: u32 = net_params
            .hs_pow_v1_max_effort
            .get()
            .try_into()
            .expect("Bounded i32 not in range of u32?!");
        let max_effort = Effort::from(max_effort);
        let config_rx = self.0.lock().expect("Lock poisoned").config_rx.clone();
        let nickname = self.0.lock().expect("Lock poisoned").nickname.to_string();
        cfg_if::cfg_if! {
            if #[cfg(feature = "metrics")] {
                let counter_rendrequest_error_total = metrics::counter!("arti_hss_pow_rendrequest_error_total", "nickname" => nickname.clone());
                let counter_rendrequest_verification_failure = metrics::counter!("arti_hss_pow_rendrequest_verification_failure_total", "nickname" => nickname.clone());
                let counter_rend_queue_overflow = metrics::counter!("arti_hss_pow_rend_queue_overflow_total", "nickname" => nickname.clone());
                let counter_rendrequest_enqueued = metrics::counter!("arti_hss_pow_rendrequest_enqueued_total", "nickname" => nickname.clone());
                let histogram_rendrequest_effort = metrics::histogram!("arti_hss_pow_rendrequest_effort_hist", "nickname" => nickname.clone());
            }
        }
        loop {
            // Block until the next request arrives; `None` means the channel closed and we
            // should shut down cleanly.
            let rend_request = if let Some(rend_request) = runtime.reenter_block_on(receiver.next())
            {
                rend_request
            } else {
                self.0
                    .lock()
                    .expect("Lock poisoned")
                    .status_tx
                    .send_shutdown();
                return Ok(());
            };
            if config_rx.borrow().enable_pow {
                let rend_request =
                    match RendRequestOrdByEffort::new(rend_request, max_effort, request_num) {
                        Ok(rend_request) => rend_request,
                        Err(err) => {
                            #[cfg(feature = "metrics")]
                            counter_rendrequest_error_total.increment(1);
                            tracing::trace!(?err, "Error processing RendRequest");
                            continue;
                        }
                    };
                // Wrapping is fine here: request_num only breaks ties among requests with
                // identical effort and receive time.
                request_num = request_num.wrapping_add(1);
                // Requests with an invalid solve are dropped entirely.
                if let Some(ref pow) = rend_request.pow {
                    if let Err(err) = pow_manager.check_solve(pow) {
                        tracing::debug!(?err, "PoW verification failed");
                        #[cfg(feature = "metrics")]
                        counter_rendrequest_verification_failure.increment(1);
                        continue;
                    } else {
                        #[cfg(feature = "metrics")]
                        {
                            let effort: u32 = pow.effort().into();
                            histogram_rendrequest_effort.record(effort);
                        }
                    }
                }
                let mut inner = self.0.lock().expect("Lock poisoned");
                // The queue was empty until now: close out the idle stretch.
                if inner.queue.is_empty() {
                    let now = runtime.now();
                    let last_transition = inner.last_transition;
                    inner.idle_time += now - last_transition;
                    inner.last_transition = now;
                }
                // Per-period accounting for the suggested-effort update algorithm.
                if let Some(ref request_pow) = rend_request.pow {
                    if request_pow.effort()
                        >= *inner.suggested_effort.lock().expect("Lock poisoned")
                    {
                        inner.num_enqueued_gte_suggested += 1;
                        let effort: u32 = request_pow.effort().into();
                        if let Some(total_effort) = inner.total_effort.checked_add(effort.into()) {
                            inner.total_effort = total_effort;
                        } else {
                            tracing::warn!(
                                "PoW total_effort would overflow. The total effort has been capped, but this is not expected to happen - please file a bug report with logs and information about the circumstances under which this occured."
                            );
                            inner.total_effort = u64::MAX;
                        }
                    }
                }
                // Queue full: evict the lowest-priority entry to make room. Note that the
                // incoming request is inserted afterwards even if it would itself be the
                // lowest-priority element.
                if inner.queue.len() >= config_rx.borrow().pow_rend_queue_depth {
                    let dropped_request = inner.queue.pop_first();
                    #[cfg(feature = "metrics")]
                    counter_rend_queue_overflow.increment(1);
                    tracing::debug!(
                        dropped_effort = ?dropped_request.map(|x| x.pow.map(|x| x.effort())),
                        "RendRequest queue full, dropping request."
                    );
                }
                inner.queue.insert(rend_request);
                #[cfg(feature = "metrics")]
                counter_rendrequest_enqueued.increment(1);
                // Wake any reader blocked in `poll_next`.
                if let Some(waker) = &inner.waker {
                    waker.wake_by_ref();
                }
            } else {
                // TODO (#2082): when allowing enable_pow to be toggled at runtime, we will need to
                // do bookkeeping here, as above. Perhaps it can be refactored nicely so the
                // bookkeeping code can be the same in both cases.
                let mut inner = self.0.lock().expect("Lock poisoned");
                inner.queue_pow_disabled.push_back(rend_request);
                #[cfg(feature = "metrics")]
                counter_rendrequest_enqueued.increment(1);
                if let Some(waker) = &inner.waker {
                    waker.wake_by_ref();
                }
            }
        }
    }

    /// Loop to check for messages that are older than our timeout and remove them from the queue.
    fn expire_old_requests_loop(self, runtime: &R) -> Result<(), PowError> {
        let netdir_provider = self
            .0
            .lock()
            .expect("Lock poisoned")
            .netdir_provider
            .clone();
        let net_params = runtime
            .reenter_block_on(netdir_provider.wait_for_netdir(tor_netdir::Timeliness::Timely))?
            .params()
            .clone();
        let max_age: Duration = net_params
            .hs_pow_v1_service_intro_timeout
            .try_into()
            .expect(
                "Couldn't convert HiddenServiceProofOfWorkV1ServiceIntroTimeoutSeconds to Duration",
            );
        let nickname = self.0.lock().expect("Lock poisoned").nickname.to_string();
        #[cfg(feature = "metrics")]
        let counter_rendrequest_expired = metrics::counter!("arti_hss_pow_rendrequest_expired_total", "nickname" => nickname.clone());
        loop {
            let inner = self.0.lock().expect("Lock poisoned");
            // Wake up when the oldest request will reach the expiration age, or, if there are no
            // items currently in the queue, wait for the maximum age.
            //
            // NOTE(review): `queue.first()` is the minimum of the effort-ordered BTreeSet
            // (lowest priority), not necessarily the oldest request, so this sleep is a
            // heuristic; correctness still comes from the `retain` below, which removes all
            // expired entries regardless. Confirm this imprecision is acceptable.
            let wait_time = inner
                .queue
                .first()
                .map(|r| {
                    max_age.saturating_sub(runtime.now().saturating_duration_since(r.recv_time))
                })
                .unwrap_or(max_age);
            // Release the lock while sleeping so enqueue/dequeue can proceed.
            drop(inner);
            runtime.reenter_block_on(runtime.sleep(wait_time));
            let mut inner = self.0.lock().expect("Lock poisoned");
            let now = runtime.now();
            let prev_len = inner.queue.len();
            inner.queue.retain(|r| now - r.recv_time < max_age);
            let dropped = prev_len - inner.queue.len();
            tracing::trace!(dropped, "Expired timed out RendRequests");
            #[cfg(feature = "metrics")]
            counter_rendrequest_expired
                .increment(dropped.try_into().expect("usize overflowed u64!"));
        }
    }
}
impl<R: Runtime, Q: MockableRendRequest> Stream for RendRequestReceiver<R, Q> {
    type Item = Q;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let mut inner = self.get_mut().0.lock().expect("Lock poisoned");
        if inner.config_rx.borrow().enable_pow {
            // PoW enabled: dequeue the greatest element of the ordered set, i.e. the
            // highest-effort (and, among ties, oldest) request.
            match inner.queue.pop_last() {
                Some(item) => {
                    inner.num_dequeued += 1;
                    if inner.queue.is_empty() {
                        // Queue just became empty: mark the transition for idle-time
                        // accounting in update_suggested_effort.
                        inner.last_transition = inner.runtime.now();
                    }
                    std::task::Poll::Ready(Some(item.request))
                }
                None => {
                    // Nothing queued: register our waker so the accept thread can wake us
                    // when it enqueues a request.
                    inner.waker = Some(cx.waker().clone());
                    std::task::Poll::Pending
                }
            }
        } else if let Some(request) = inner.queue_pow_disabled.pop_front() {
            // TODO (#2082): when we allow changing enable_pow at runtime, we will need to do
            // bookkeeping here.
            std::task::Poll::Ready(Some(request))
        } else {
            inner.waker = Some(cx.waker().clone());
            std::task::Poll::Pending
        }
    }
}
#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use crate::config::OnionServiceConfigBuilder;
    use crate::status::{OnionServiceStatus, StatusSender};
    use super::*;
    use futures::FutureExt;
    use tor_hscrypto::pow::v1::{Nonce, SolutionByteArray};
    use tor_netdir::{testnet, testprovider::TestNetDirProvider};
    use tor_rtmock::MockRuntime;
    struct MockPowManager;
    #[derive(Debug)]
    struct MockRendRequest {
        id: usize,
        pow: Option<ProofOfWork>,
    }
    impl MockablePowManager for MockPowManager {
        fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
            // For testing, treat all zeros as the only valid solve. Error is chosen arbitrarily.
            if solve.solution() == &[0; 16] {
                Ok(())
            } else {
                Err(PowSolveError::InvalidSeedHead)
            }
        }
    }
    impl MockableRendRequest for MockRendRequest {
        fn proof_of_work(&self) -> Result<Option<&ProofOfWork>, rend_handshake::IntroRequestError> {
            Ok(self.pow.as_ref())
        }
    }
    fn make_req(id: usize, effort: Option<u32>) -> MockRendRequest {
        MockRendRequest {
            id,
            pow: effort.map(|e| {
                ProofOfWork::V1(ProofOfWorkV1::new(
                    Nonce::from([0; 16]),
                    Effort::from(e),
                    SeedHead::from([0; 4]),
                    SolutionByteArray::from([0; 16]),
                ))
            }),
        }
    }
    fn make_req_invalid(id: usize, effort: u32) -> MockRendRequest {
        MockRendRequest {
            id,
            pow: Some(ProofOfWork::V1(ProofOfWorkV1::new(
                Nonce::from([0; 16]),
                Effort::from(effort),
                SeedHead::from([0; 4]),
                SolutionByteArray::from([1; 16]),
            ))),
        }
    }
    #[allow(clippy::type_complexity)]
    fn make_test_receiver(
        runtime: &MockRuntime,
        netdir_params: Vec<(String, i32)>,
        config: Option<OnionServiceConfig>,
    ) -> (
        RendRequestReceiver<MockRuntime, MockRendRequest>,
        mpsc::Sender<MockRendRequest>,
        Arc<Mutex<Effort>>,
        NetParameters,
        postage::watch::Sender<Arc<OnionServiceConfig>>,
    ) {
        let pow_manager = Arc::new(MockPowManager);
        let suggested_effort = Arc::new(Mutex::new(Effort::zero()));
        let netdir = testnet::construct_custom_netdir_with_params(
            testnet::simple_net_func,
            netdir_params,
            None,
        )
        .unwrap()
        .unwrap_if_sufficient()
        .unwrap();
        let net_params = netdir.params().clone();
        let netdir_provider: Arc<TestNetDirProvider> = Arc::new(netdir.into());
        let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown()).into();
        let nickname = HsNickname::new("test-hs".to_string()).unwrap();
        let (config_tx, config_rx) = postage::watch::channel_with(Arc::new(
            config.unwrap_or(
                OnionServiceConfigBuilder::default()
                    .nickname(nickname.clone())
                    .enable_pow(true)
                    .build()
                    .unwrap(),
            ),
        ));
        let receiver: RendRequestReceiver<_, MockRendRequest> = RendRequestReceiver::new(
            runtime.clone(),
            nickname.clone(),
            suggested_effort.clone(),
            netdir_provider,
            status_tx,
            config_rx,
        );
        let (tx, rx) = mpsc::channel(32);
        receiver.start_accept_thread(runtime.clone(), pow_manager, rx);
        (receiver, tx, suggested_effort, net_params, config_tx)
    }
    #[test]
    fn test_basic_pow_ordering() {
        MockRuntime::test_with_various(|runtime| async move {
            let (mut receiver, mut tx, _suggested_effort, _net_params, _config_tx) =
                make_test_receiver(&runtime, vec![], None);
            // Request with no PoW
            tx.send(make_req(0, None)).await.unwrap();
            assert_eq!(receiver.next().await.unwrap().id, 0);
            // Request with PoW
            tx.send(make_req(1, Some(0))).await.unwrap();
            assert_eq!(receiver.next().await.unwrap().id, 1);
            // Request with effort is before request with zero effort
            tx.send(make_req(2, Some(0))).await.unwrap();
            tx.send(make_req(3, Some(16))).await.unwrap();
            runtime.progress_until_stalled().await;
            assert_eq!(receiver.next().await.unwrap().id, 3);
            assert_eq!(receiver.next().await.unwrap().id, 2);
            // Invalid solves are dropped
            tx.send(make_req_invalid(4, 32)).await.unwrap();
            tx.send(make_req(5, Some(16))).await.unwrap();
            runtime.progress_until_stalled().await;
            assert_eq!(receiver.next().await.unwrap().id, 5);
            assert_eq!(receiver.0.lock().unwrap().queue.len(), 0);
        });
    }
    /// Exercise the suggested-effort feedback controller end to end:
    /// effort stays at zero while the queue drains on time, increases while
    /// requests back up, and decays back down to zero once the service
    /// catches up again.
    #[test]
    fn test_suggested_effort_increase() {
        MockRuntime::test_with_various(|runtime| async move {
            // Use a very long intro timeout so queued requests never age out
            // during the mock-time advances below; we only want to observe
            // the effort controller, not the timeout path.
            let (mut receiver, mut tx, suggested_effort, net_params, _config_tx) =
                make_test_receiver(
                    &runtime,
                    vec![(
                        "HiddenServiceProofOfWorkV1ServiceIntroTimeoutSeconds".to_string(),
                        60000,
                    )],
                    None,
                );
            // Get through all the requests in plenty of time, no increase
            for n in 0..128 {
                tx.send(make_req(n, Some(0))).await.unwrap();
            }
            runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
            for _ in 0..128 {
                receiver.next().await.unwrap();
            }
            runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
            receiver.update_suggested_effort(&net_params);
            // Queue fully drained within the update period => no pressure,
            // suggested effort remains zero.
            assert_eq!(suggested_effort.lock().unwrap().clone(), Effort::zero());
            // Requests left in the queue with zero suggested effort, suggested effort should
            // increase
            for n in 0..128 {
                tx.send(make_req(n, Some(0))).await.unwrap();
            }
            runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
            // Only service half of the requests, leaving a backlog.
            for _ in 0..64 {
                receiver.next().await.unwrap();
            }
            runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
            receiver.update_suggested_effort(&net_params);
            let mut new_suggested_effort = *suggested_effort.lock().unwrap();
            assert!(new_suggested_effort > Effort::zero());
            // We keep on being behind, effort should increase again.
            for n in 0..64 {
                tx.send(make_req(n, Some(new_suggested_effort.into())))
                    .await
                    .unwrap();
            }
            // Service just one request, then let a full update period pass.
            receiver.next().await.unwrap();
            runtime.advance_by(HS_UPDATE_PERIOD).await;
            receiver.update_suggested_effort(&net_params);
            let mut old_suggested_effort = new_suggested_effort;
            new_suggested_effort = *suggested_effort.lock().unwrap();
            assert!(new_suggested_effort > old_suggested_effort);
            // We catch up now, effort should start dropping, but not be zero immediately.
            for n in 0..32 {
                tx.send(make_req(n, Some(new_suggested_effort.into())))
                    .await
                    .unwrap();
            }
            runtime.advance_by(HS_UPDATE_PERIOD / 16 * 15).await;
            // Drain everything that is immediately available without blocking.
            while receiver.next().now_or_never().is_some() {
                // Keep going...
            }
            runtime.advance_by(HS_UPDATE_PERIOD / 16).await;
            receiver.update_suggested_effort(&net_params);
            old_suggested_effort = new_suggested_effort;
            new_suggested_effort = *suggested_effort.lock().unwrap();
            // Effort decays gradually: lower than before, but not yet zero.
            assert!(new_suggested_effort < old_suggested_effort);
            assert!(new_suggested_effort > Effort::zero());
            // Effort will drop to zero eventually
            let mut num_loops = 0;
            loop {
                // Keep offering a trickle of work at the current effort and
                // draining it promptly, so effort monotonically decreases.
                tx.send(make_req(0, Some(new_suggested_effort.into())))
                    .await
                    .unwrap();
                runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
                while receiver.next().now_or_never().is_some() {
                    // Keep going...
                }
                runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
                receiver.update_suggested_effort(&net_params);
                old_suggested_effort = new_suggested_effort;
                new_suggested_effort = *suggested_effort.lock().unwrap();
                assert!(new_suggested_effort < old_suggested_effort);
                if new_suggested_effort == Effort::zero() {
                    break;
                }
                // Bound the decay: reaching zero should take only a handful
                // of update periods.
                num_loops += 1;
                if num_loops > 5 {
                    panic!("Took too long for suggested effort to fall!");
                }
            }
        });
    }
    /// A queued rendezvous request that is not serviced before the intro
    /// timeout elapses must be silently dropped from the queue.
    #[test]
    fn test_rendrequest_timeout() {
        MockRuntime::test_with_various(|runtime| async move {
            let (receiver, mut tx, _suggested_effort, net_params, _config_tx) =
                make_test_receiver(&runtime, vec![], None);
            // The maximum age a request may sit in the queue before expiry.
            let timeout: Duration = net_params
                .hs_pow_v1_service_intro_timeout
                .try_into()
                .unwrap();
            // Enqueue one effort-less request...
            tx.send(MockRendRequest { id: 0, pow: None }).await.unwrap();
            // ...then advance mock time well past the timeout without ever
            // servicing it.
            runtime.advance_by(timeout * 2).await;
            // The stale request must have been discarded.
            assert_eq!(receiver.0.lock().unwrap().queue.len(), 0);
        });
    }
    /// When the service config disables PoW, effort values on incoming
    /// requests are ignored and requests are delivered strictly FIFO.
    #[test]
    fn test_pow_disabled() {
        MockRuntime::test_with_various(|runtime| async move {
            // Build a service config with PoW explicitly switched off.
            let config = OnionServiceConfigBuilder::default()
                .nickname(HsNickname::new("test-hs".to_string()).unwrap())
                .enable_pow(false)
                .build()
                .unwrap();
            let (mut receiver, mut tx, _suggested_effort, _net_params, _config_tx) =
                make_test_receiver(&runtime, vec![], Some(config));
            // A mix of requests: no PoW at all, then assorted effort levels
            // that would reorder the queue if PoW were enabled.
            tx.send(make_req(0, None)).await.unwrap();
            tx.send(make_req(1, Some(0))).await.unwrap();
            tx.send(make_req(2, Some(20))).await.unwrap();
            tx.send(make_req(3, Some(10))).await.unwrap();
            runtime.progress_until_stalled().await;
            // With PoW disabled, delivery order matches arrival order.
            for expected_id in 0..4 {
                assert_eq!(receiver.next().await.unwrap().id, expected_id);
            }
        });
    }
    /// The rend-request queue honors its configured maximum depth, and a
    /// runtime config update raising the limit takes effect immediately.
    #[test]
    fn test_rend_queue_max_depth() {
        MockRuntime::test_with_various(|runtime| async move {
            // Helper: a PoW-enabled config with the given rend-queue depth.
            let config_with_depth = |depth| {
                OnionServiceConfigBuilder::default()
                    .nickname(HsNickname::new("test-hs".to_string()).unwrap())
                    .enable_pow(true)
                    .pow_rend_queue_depth(depth)
                    .build()
                    .unwrap()
            };
            let (mut receiver, mut tx, _suggested_effort, _net_params, mut config_tx) =
                make_test_receiver(&runtime, vec![], Some(config_with_depth(2)));
            // Three requests against a depth-2 queue: the overflow is dropped.
            for id in 0..3 {
                tx.send(make_req(id, None)).await.unwrap();
            }
            runtime.progress_until_stalled().await;
            assert!(receiver.next().await.is_some());
            assert!(receiver.next().await.is_some());
            // Only two survived; nothing left queued.
            assert_eq!(receiver.0.lock().unwrap().queue.len(), 0);
            // Check that increasing queue size at runtime works...
            config_tx
                .send(Arc::new(config_with_depth(8)))
                .await
                .unwrap();
            for id in 0..3 {
                tx.send(make_req(id, None)).await.unwrap();
            }
            runtime.progress_until_stalled().await;
            // With depth 8, all three requests fit and are delivered.
            for _ in 0..3 {
                assert!(receiver.next().await.is_some());
            }
        });
    }
}