//! Publish and maintain onion service descriptors
//!
//! See the [`reactor`] module-level documentation for more details.

mod backoff;
mod descriptor;
mod reactor;
mod reupload_timer;

use crate::config::restricted_discovery::RestrictedDiscoveryKeys;
use crate::internal_prelude::*;
use crate::pow::PowManager;

use backoff::{BackoffError, BackoffSchedule, RetriableError, Runner};
use descriptor::{DescriptorStatus, VersionedDescriptor, build_sign};
use reactor::Reactor;
use reactor::read_blind_id_keypair;
use reupload_timer::ReuploadTimer;

use tor_config_path::CfgPathResolver;

pub use reactor::UploadError;
pub(crate) use reactor::{Mockable, OVERALL_UPLOAD_TIMEOUT, Real};

/// A handle for the HsDir publisher for an onion service.
///
/// This handle represents a set of tasks that identify the HsDirs for each
/// relevant time period, construct descriptors, publish them, and keep them
/// up-to-date.
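///
/// A rough sketch of how a publisher is created and started; this is not compiled,
/// and the argument values are placeholders assumed to come from the surrounding
/// onion-service setup:
///
/// ```rust,ignore
/// let publisher = Publisher::new(
///     runtime,
///     nickname,
///     dir_provider,
///     mockable,
///     ipt_watcher,
///     config_rx,
///     status_tx,
///     keymgr,
///     path_resolver,
///     pow_manager,
///     update_from_pow_manager_rx,
/// );
/// // Nothing is uploaded until the reactor is launched.
/// publisher.launch()?;
/// ```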
#[must_use = "If you don't call launch() on the publisher, it won't publish any descriptors."]
pub(crate) struct Publisher<R: Runtime, M: Mockable> {
    /// The runtime.
    runtime: R,
    /// The service for which we're publishing descriptors.
    nickname: HsNickname,
    /// A source for new network directories that we use to determine
    /// our HsDirs.
    dir_provider: Arc<dyn NetDirProvider>,
    /// Mockable state.
    ///
    /// This is used for launching circuits and for obtaining random number generators.
    mockable: M,
    /// The onion service config.
    config: Arc<OnionServiceConfig>,
    /// A channel for receiving IPT change notifications.
    ipt_watcher: IptsPublisherView,
    /// A channel for receiving onion service config change notifications.
    config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
    /// The key manager.
    keymgr: Arc<KeyMgr>,
    /// A sender for updating the status of the onion service.
    status_tx: PublisherStatusSender,
    /// Path resolver for configuration files.
    path_resolver: Arc<CfgPathResolver>,
    /// Proof-of-work state.
    pow_manager: Arc<PowManager<R>>,
    /// Queue on which we receive messages from the [`PowManager`] telling us that a seed has
    /// rotated and thus we need to republish the descriptor for a particular time period.
    update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
}

impl<R: Runtime, M: Mockable> Publisher<R, M> {
    /// Create a new publisher.
    ///
    /// When it launches, it will not yet know any keys or introduction points,
    /// and will therefore not upload any descriptors.
    ///
    /// The publisher won't start publishing until you call [`Publisher::launch`].
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        runtime: R,
        nickname: HsNickname,
        dir_provider: Arc<dyn NetDirProvider>,
        mockable: impl Into<M>,
        ipt_watcher: IptsPublisherView,
        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
        status_tx: PublisherStatusSender,
        keymgr: Arc<KeyMgr>,
        path_resolver: Arc<CfgPathResolver>,
        pow_manager: Arc<PowManager<R>>,
        update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
    ) -> Self {
        let config = config_rx.borrow().clone();
        Self {
            runtime,
            nickname,
            dir_provider,
            mockable: mockable.into(),
            config,
            ipt_watcher,
            config_rx,
            status_tx,
            keymgr,
            path_resolver,
            pow_manager,
            update_from_pow_manager_rx,
        }
    }

    /// Launch the publisher reactor.
    pub(crate) fn launch(self) -> Result<(), StartupError> {
        let Publisher {
            runtime,
            nickname,
            dir_provider,
            mockable,
            config,
            ipt_watcher,
            config_rx,
            status_tx,
            keymgr,
            path_resolver,
            pow_manager,
            update_from_pow_manager_rx: publisher_update_rx,
        } = self;

        let reactor = Reactor::new(
            runtime.clone(),
            nickname,
            dir_provider,
            mockable,
            &config,
            ipt_watcher,
            config_rx,
            status_tx,
            keymgr,
            path_resolver,
            pow_manager,
            publisher_update_rx,
        );

        runtime
            .spawn(async move {
                match reactor.run().await {
                    Ok(()) => debug!("the publisher reactor has shut down"),
                    Err(e) => warn_report!(e, "the publisher reactor has shut down"),
                }
            })
            .map_err(|e| StartupError::Spawn {
                spawning: "publisher reactor task",
                cause: e.into(),
            })?;

        Ok(())
    }
}

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;

    use std::collections::HashMap;
    use std::io;
    use std::path::Path;
    use std::pin::Pin;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::task::{Context, Poll};

    use async_trait::async_trait;
    use fs_mistrust::Mistrust;
    use futures::{AsyncRead, AsyncWrite};
    use tempfile::{TempDir, tempdir};
    use test_temp_dir::test_temp_dir;

    use tor_basic_utils::test_rng::{TestingRng, testing_rng};

    use tor_hscrypto::pk::{HsBlindId, HsDescSigningKeypair, HsId, HsIdKey, HsIdKeypair};
    use tor_key_forge::ToEncodableKey;
    use tor_keymgr::{ArtiNativeKeystore, KeyMgrBuilder, KeySpecifier};
    use tor_llcrypto::pk::{ed25519, rsa};
    use tor_netdir::testprovider::TestNetDirProvider;
    use tor_netdir::{NetDir, testnet};
    use tor_netdoc::doc::hsdesc::test_data;
    use tor_rtcompat::ToplevelBlockOn;
    use tor_rtmock::MockRuntime;

    use crate::HsNickname;
    use crate::config::OnionServiceConfigBuilder;
    use crate::ipt_set::{IptInSet, IptSet, ipts_channel};
    use crate::pow::NewPowManager;
    use crate::publish::reactor::MockableDirTunnel;
    use crate::status::{OnionServiceStatus, StatusSender};
    use crate::test::create_storage_handles;
    use crate::{
        BlindIdKeypairSpecifier, BlindIdPublicKeySpecifier, DescSigningKeypairSpecifier,
        HsIdKeypairSpecifier, HsIdPublicKeySpecifier,
    };

    /// The nickname of the test service.
    const TEST_SVC_NICKNAME: &str = "test-svc";

    /// The HTTP response the HSDir returns if everything went well.
    const OK_RESPONSE: &str = "HTTP/1.1 200 OK\r\n\r\n";

    /// The HTTP response the HSDir returns if something went wrong.
    const ERR_RESPONSE: &str = "HTTP/1.1 500 UH_OH\r\n\r\n";

    /// The result of a mocked `poll_read`; the error value doesn't matter (we return a dummy
    /// io::Error from poll_read).
    ///
    /// NOTE: ideally, this would be an io::Result, but io::Error isn't Clone (the tests need to
    /// clone the iterator over these Results for each HSDir).
    type PollReadResult<T> = Result<T, ()>;
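    // For example, a response iterator over `[Err(()), Ok(OK_RESPONSE.to_string())]`
    // models an HSDir that fails the first upload attempt and then accepts the retry
    // (this is the pattern exercised in `publish_after_ipt_change_with_errors` below).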

    /// A trait for our poll_read response iterator.
    trait PollReadIter:
        Iterator<Item = PollReadResult<String>> + Send + Sync + Clone + Unpin + 'static
    {
    }

    impl<I> PollReadIter for I where
        I: Iterator<Item = PollReadResult<String>> + Send + Sync + Clone + Unpin + 'static
    {
    }

    #[derive(Clone, Debug, Default)]
    struct MockReactorState<I: PollReadIter> {
        /// The number of `POST /tor/hs/3/publish` requests sent by the reactor.
        publish_count: Arc<AtomicUsize>,
        /// The values returned by `DataStream::poll_read` when uploading to an HSDir.
        ///
        /// The values represent the HTTP response (or lack thereof) each HSDir sends upon
        /// receiving a POST request for uploading a descriptor.
        ///
        /// Note: this field is only used for populating responses_for_hsdir. Each time
        /// get_or_launch_hs_dir is called for a new CircTarget, this iterator is cloned and
        /// added to the responses_for_hsdir entry corresponding to the new CircTarget (HSDir).
        poll_read_responses: I,
        /// The responses that will be returned by each test HSDir (identified by its RsaIdentity).
        ///
        /// Used for testing whether the reactor correctly retries on failure.
        responses_for_hsdir: Arc<Mutex<HashMap<rsa::RsaIdentity, I>>>,
    }

    #[async_trait]
    impl<I: PollReadIter> Mockable for MockReactorState<I> {
        type Rng = TestingRng;
        type Tunnel = MockClientCirc<I>;

        fn thread_rng(&self) -> Self::Rng {
            testing_rng()
        }

        async fn get_or_launch_hs_dir<T>(
            &self,
            _netdir: &tor_netdir::NetDir,
            target: T,
        ) -> Result<Self::Tunnel, tor_circmgr::Error>
        where
            T: tor_linkspec::CircTarget + Send + Sync,
        {
            // Look up the next poll_read value to return for this relay.
            let id = target.rsa_identity().unwrap();
            let mut map = self.responses_for_hsdir.lock().unwrap();
            let poll_read_responses = map
                .entry(*id)
                .or_insert_with(|| self.poll_read_responses.clone());

            Ok(MockClientCirc {
                publish_count: Arc::clone(&self.publish_count),
                poll_read_responses: poll_read_responses.clone(),
            })
        }

        fn estimate_upload_timeout(&self) -> Duration {
            // chosen arbitrarily for testing.
            Duration::from_secs(30)
        }
    }

    #[derive(Debug, Clone)]
    struct MockClientCirc<I: PollReadIter> {
        /// The number of `POST /tor/hs/3/publish` requests sent by the reactor.
        publish_count: Arc<AtomicUsize>,
        /// The values to return from `poll_read`.
        ///
        /// Used for testing whether the reactor correctly retries on failure.
        poll_read_responses: I,
    }

    #[async_trait]
    impl<I: PollReadIter> MockableDirTunnel for MockClientCirc<I> {
        type DataStream = MockDataStream<I>;

        async fn begin_dir_stream(&self) -> Result<Self::DataStream, tor_circmgr::Error> {
            Ok(MockDataStream {
                publish_count: Arc::clone(&self.publish_count),
                // TODO: this will need to change when we start reusing circuits (currently,
                // we only ever create one data stream per circuit).
                poll_read_responses: self.poll_read_responses.clone(),
                at_start: true,
            })
        }

        fn source_info(&self) -> tor_proto::Result<Option<tor_dirclient::SourceInfo>> {
            Ok(None)
        }
    }

    #[derive(Debug)]
    struct MockDataStream<I: PollReadIter> {
        /// The number of `POST /tor/hs/3/publish` requests sent by the reactor.
        publish_count: Arc<AtomicUsize>,
        /// The values to return from `poll_read`.
        ///
        /// Used for testing whether the reactor correctly retries on failure.
        poll_read_responses: I,
        /// Are we at the start of a stream?
        ///
        /// (We keep track of this so we can assert that streams begin with a reasonable
        /// HTTP header.)
        at_start: bool,
    }

    impl<I: PollReadIter> AsyncRead for MockDataStream<I> {
        fn poll_read(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            match self.as_mut().poll_read_responses.next() {
                Some(res) => {
                    match res {
                        Ok(res) => {
                            buf[..res.len()].copy_from_slice(res.as_bytes());

                            Poll::Ready(Ok(res.len()))
                        }
                        Err(()) => {
                            // Return an error. This should cause the reactor to reattempt the
                            // upload.
                            Poll::Ready(Err(io::Error::other("test error")))
                        }
                    }
                }
                None => Poll::Ready(Ok(0)),
            }
        }
    }

    impl<I: PollReadIter> AsyncWrite for MockDataStream<I> {
        fn poll_write(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let request = std::str::from_utf8(buf).unwrap();

            if self.at_start {
                assert!(request.starts_with("POST /tor/hs/3/publish HTTP/1.0\r\n"));
                let _prev = self.publish_count.fetch_add(1, Ordering::SeqCst);
                self.as_mut().at_start = false;
            }

            Poll::Ready(Ok(request.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    /// Insert the specified key into the keystore.
    fn insert_svc_key<K>(key: K, keymgr: &KeyMgr, svc_key_spec: &dyn KeySpecifier)
    where
        K: ToEncodableKey,
    {
        keymgr
            .insert(
                key,
                svc_key_spec,
                tor_keymgr::KeystoreSelector::Primary,
                true,
            )
            .unwrap();
    }

    /// Create a new `KeyMgr`, provisioning its keystore with the necessary keys.
396
    fn init_keymgr(
397
        keystore_dir: &TempDir,
398
        nickname: &HsNickname,
399
        netdir: &NetDir,
400
    ) -> (HsId, HsBlindId, Arc<KeyMgr>) {
401
        let period = netdir.hs_time_period();
402

            
403
        let mut rng = testing_rng();
404
        let keypair = ed25519::Keypair::generate(&mut rng);
405
        let id_pub = HsIdKey::from(keypair.verifying_key());
406
        let id_keypair = HsIdKeypair::from(ed25519::ExpandedKeypair::from(&keypair));
407

            
408
        let (hs_blind_id_key, hs_blind_id_kp, _subcredential) =
409
            id_keypair.compute_blinded_key(period).unwrap();
410

            
411
        let keystore = ArtiNativeKeystore::from_path_and_mistrust(
412
            keystore_dir,
413
            &Mistrust::new_dangerously_trust_everyone(),
414
        )
415
        .unwrap();
416

            
417
        // Provision the keystore with the necessary keys:
418
        let keymgr = KeyMgrBuilder::default()
419
            .primary_store(Box::new(keystore))
420
            .build()
421
            .unwrap();
422

            
423
        insert_svc_key(
424
            id_keypair,
425
            &keymgr,
426
            &HsIdKeypairSpecifier::new(nickname.clone()),
427
        );
428

            
429
        insert_svc_key(
430
            id_pub.clone(),
431
            &keymgr,
432
            &HsIdPublicKeySpecifier::new(nickname.clone()),
433
        );
434

            
435
        insert_svc_key(
436
            hs_blind_id_kp,
437
            &keymgr,
438
            &BlindIdKeypairSpecifier::new(nickname.clone(), period),
439
        );
440

            
441
        insert_svc_key(
442
            hs_blind_id_key.clone(),
443
            &keymgr,
444
            &BlindIdPublicKeySpecifier::new(nickname.clone(), period),
445
        );
446

            
447
        insert_svc_key(
448
            HsDescSigningKeypair::from(ed25519::Keypair::generate(&mut rng)),
449
            &keymgr,
450
            &DescSigningKeypairSpecifier::new(nickname.clone(), period),
451
        );
452

            
453
        let hs_id = id_pub.into();
454
        (hs_id, hs_blind_id_key.into(), keymgr.into())
455
    }
456

            
457
    fn build_test_config(nickname: HsNickname) -> OnionServiceConfig {
458
        OnionServiceConfigBuilder::default()
459
            .nickname(nickname)
460
            .rate_limit_at_intro(None)
461
            .build()
462
            .unwrap()
463
    }
464

            
    /// Test helper: launch a [`Publisher`] against a mock network, trigger `reactor_event`,
    /// and check how many descriptor uploads (and reuploads) it performs.
    #[allow(clippy::too_many_arguments)]
    fn run_test<I: PollReadIter>(
        runtime: MockRuntime,
        nickname: HsNickname,
        keymgr: Arc<KeyMgr>,
        pv: IptsPublisherView,
        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
        status_tx: StatusSender,
        netdir: NetDir,
        reactor_event: impl FnOnce(),
        poll_read_responses: I,
        expected_upload_count: usize,
        republish_count: usize,
        expect_errors: bool,
    ) {
        runtime.clone().block_on(async move {
            let netdir_provider: Arc<dyn NetDirProvider> =
                Arc::new(TestNetDirProvider::from(netdir));
            let publish_count = Default::default();
            let circpool = MockReactorState {
                publish_count: Arc::clone(&publish_count),
                poll_read_responses,
                responses_for_hsdir: Arc::new(Mutex::new(Default::default())),
            };

            let temp_dir = test_temp_dir!();
            let state_dir = temp_dir.subdir_untracked("state_dir");
            let mistrust = fs_mistrust::Mistrust::new_dangerously_trust_everyone();
            let state_dir = StateDirectory::new(state_dir, &mistrust).unwrap();
            let state_handle = state_dir.acquire_instance(&nickname).unwrap();
            let pow_nonce_dir = state_handle.raw_subdir("pow_nonces").unwrap();
            let pow_manager_storage_handle = state_handle.storage_handle("pow_manager").unwrap();

            let NewPowManager {
                pow_manager,
                rend_req_tx: _,
                rend_req_rx: _,
                publisher_update_rx: update_from_pow_manager_rx,
            } = PowManager::new(
                runtime.clone(),
                nickname.clone(),
                pow_nonce_dir,
                keymgr.clone(),
                pow_manager_storage_handle,
                netdir_provider.clone(),
                status_tx.clone().into(),
                config_rx.clone(),
            )
            .unwrap();
            let mut status_rx = status_tx.subscribe();
            let publisher: Publisher<MockRuntime, MockReactorState<_>> = Publisher::new(
                runtime.clone(),
                nickname,
                netdir_provider,
                circpool,
                pv,
                config_rx,
                status_tx.into(),
                keymgr,
                Arc::new(CfgPathResolver::default()),
                pow_manager,
                update_from_pow_manager_rx,
            );

            publisher.launch().unwrap();
            runtime.progress_until_stalled().await;
            let status = status_rx.next().await.unwrap().publisher_status();
            assert_eq!(State::Shutdown, status.state());
            assert!(status.current_problem().is_none());

            // Check that we haven't published anything yet.
            assert_eq!(publish_count.load(Ordering::SeqCst), 0);

            reactor_event();

            runtime.progress_until_stalled().await;

            // We need to manually advance the time, because some of our tests check that the
            // failed uploads are retried, and there's a sleep() between the retries
            // (see BackoffSchedule::next_delay).
            runtime.advance_by(Duration::from_secs(1)).await;
            runtime.progress_until_stalled().await;

            let initial_publish_count = publish_count.load(Ordering::SeqCst);
            assert_eq!(initial_publish_count, expected_upload_count);

            let status = status_rx.next().await.unwrap().publisher_status();
            if expect_errors {
                // The upload results aren't ready yet.
                assert_eq!(State::Bootstrapping, status.state());
            } else {
                // The test network doesn't have an SRV for the previous TP,
                // so we are "unreachable".
                assert_eq!(State::DegradedUnreachable, status.state());
            }
            assert!(status.current_problem().is_none());

            if republish_count > 0 {
                /// The latest time the descriptor can be republished.
                const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 120);

                // Wait until the reactor triggers the necessary number of reuploads.
                runtime
                    .advance_by(MAX_TIMEOUT * (republish_count as u32))
                    .await;
                runtime.progress_until_stalled().await;

                let min_upload_count = expected_upload_count * republish_count;
                // There will be twice as many reuploads if the publisher happens
                // to reupload every hour (as opposed to every 2h).
                let max_upload_count = 2 * min_upload_count;
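                // Illustrative arithmetic (hypothetical numbers): with expected_upload_count = 6
                // and republish_count = 4, any total between 24 and 48 reuploads is accepted.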
                let publish_count_now = publish_count.load(Ordering::SeqCst);
                // This is the total number of reuploads (i.e. the number of times
                // we published the descriptor to an HsDir).
                let actual_reupload_count = publish_count_now - initial_publish_count;

                assert!((min_upload_count..=max_upload_count).contains(&actual_reupload_count));
            }
        });
    }

    /// Test that the publisher publishes the descriptor when the IPTs change.
    ///
    /// The `poll_read_responses` are returned by each HSDir, in order, in response to each POST
    /// request received from the publisher.
    ///
    /// The `multiplier` is the factor by which the number of HSDirs is multiplied to obtain
    /// the total expected number of uploads (this works because the test "HSDirs" all behave
    /// the same, so the number of uploads is the number of HSDirs multiplied by the number of
    /// upload attempts each HSDir receives).
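    ///
    /// For example, with 6 HSDirs and a `multiplier` of 2 (each HSDir rejects the first
    /// upload attempt and accepts the retry), the expected upload count is `6 * 2 = 12`.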
    fn publish_after_ipt_change<I: PollReadIter>(
        temp_dir: &Path,
        poll_read_responses: I,
        multiplier: usize,
        republish_count: usize,
        expect_errors: bool,
    ) {
        let runtime = MockRuntime::new();
        let nickname = HsNickname::try_from(TEST_SVC_NICKNAME.to_string()).unwrap();
        let config = build_test_config(nickname.clone());
        let (_config_tx, config_rx) = watch::channel_with(Arc::new(config));

        let (mut mv, pv) = ipts_channel(&runtime, create_storage_handles(temp_dir).1).unwrap();
        let update_ipts = || {
            let ipts: Vec<IptInSet> = test_data::test_parsed_hsdesc()
                .unwrap()
                .intro_points()
                .iter()
                .enumerate()
                .map(|(i, ipt)| IptInSet {
                    ipt: ipt.clone(),
                    lid: [i.try_into().unwrap(); 32].into(),
                })
                .collect();

            mv.borrow_for_update(runtime.clone()).ipts = Some(IptSet {
                ipts,
                lifetime: Duration::from_secs(20),
            });
        };

        let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
        let keystore_dir = tempdir().unwrap();

        let (_hsid, blind_id, keymgr) = init_keymgr(&keystore_dir, &nickname, &netdir);

        let hsdir_count = netdir
            .hs_dirs_upload(blind_id, netdir.hs_time_period())
            .unwrap()
            .collect::<Vec<_>>()
            .len();

        assert!(hsdir_count > 0);

        // If any of the uploads fail, they will be retried. Note that the upload failure will
        // affect _each_ hsdir, so the expected number of uploads is a multiple of hsdir_count.
        let expected_upload_count = hsdir_count * multiplier;
        let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown());

        run_test(
            runtime.clone(),
            nickname,
            keymgr,
            pv,
            config_rx,
            status_tx,
            netdir,
            update_ipts,
            poll_read_responses,
            expected_upload_count,
            republish_count,
            expect_errors,
        );
    }

    #[test]
    fn publish_after_ipt_change_no_errors() {
        // The HSDirs always respond with 200 OK, so we expect to publish hsdir_count times.
        let poll_reads = [Ok(OK_RESPONSE.into())].into_iter();

        test_temp_dir!().used_by(|dir| publish_after_ipt_change(dir, poll_reads, 1, 0, false));
    }

    #[test]
    fn publish_after_ipt_change_with_errors() {
        let err_responses = vec![
            // The HSDir closed the connection without sending a response.
            Err(()),
            // The HSDir responded with an internal server error.
            Ok(ERR_RESPONSE.to_string()),
        ];

        for error_res in err_responses.into_iter() {
            let poll_reads = vec![
                // Each HSDir first responds with an error, which causes the publisher to retry the
                // upload. The HSDir then responds with "200 OK".
                //
                // We expect to publish hsdir_count * 2 times (for each HSDir, the first upload
                // attempt fails, but the second succeeds).
                error_res,
                Ok(OK_RESPONSE.to_string()),
            ]
            .into_iter();

            test_temp_dir!().used_by(|dir| publish_after_ipt_change(dir, poll_reads, 2, 0, true));
        }
    }

    #[test]
    fn reupload_after_publishing() {
        let poll_reads = [Ok(OK_RESPONSE.into())].into_iter();
        // Test that 4 reuploads happen after the initial upload.
        const REUPLOAD_COUNT: usize = 4;

        test_temp_dir!()
            .used_by(|dir| publish_after_ipt_change(dir, poll_reads, 1, REUPLOAD_COUNT, false));
    }

    // TODO (#1120): test that the descriptor is republished when the config changes

    // TODO (#1120): test that the descriptor is reuploaded only to the HSDirs that need it (i.e. the
    // ones for which it's dirty)

    // TODO (#1120): test that rate-limiting works correctly

    // TODO (#1120): test that the uploaded descriptor contains the expected values

    // TODO (#1120): test that the publisher stops publishing if the IPT manager sets the IPTs to
    // `None`.
}