1
//! Code to watch configuration files for any changes.
2

            
3
use std::sync::{Arc, Mutex, Weak};
4
use std::time::Duration;
5

            
6
use anyhow::Context;
7
use arti_client::TorClient;
8
use arti_client::config::Reconfigure;
9
use futures::StreamExt;
10
use futures::{FutureExt as _, Stream, select_biased};
11
use tor_config::file_watcher::{self, FileEventSender, FileWatcher, FileWatcherBuilder};
12
use tor_config::{ConfigurationSource, ConfigurationSources, sources::FoundConfigFiles};
13
use tor_rtcompat::Runtime;
14
use tor_rtcompat::SpawnExt;
15
use tracing::{debug, error, info, instrument, warn};
16

            
17
#[cfg(target_family = "unix")]
18
use crate::process::sighup_stream;
19

            
20
#[cfg(not(target_family = "unix"))]
21
use futures::stream;
22

            
23
use crate::{ArtiCombinedConfig, ArtiConfig};
24

            
25
/// Quiet period between receiving a file event and acting upon it.
///
/// The watcher loop sleeps for this long after a file event and then discards
/// whatever other file events are pending, so that a burst of changes causes
/// a single reload.
const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
27

            
28
/// An object that can be reconfigured when our configuration changes.
29
///
30
/// We use this trait so that we can represent abstract modules in our
31
/// application, and pass the configuration to each of them.
32
//
33
// TODO: It is very likely we will want to refactor this even further once we
34
// have a notion of what our modules truly are.
35
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
36
pub(crate) trait ReconfigurableModule: Send + Sync {
37
    /// Try to reconfigure this module according to a newly loaded configuration.
38
    ///
39
    /// By convention, this should only return fatal errors; any such error
40
    /// should cause the program to exit.  For other cases, we should just warn.
41
    //
42
    // TODO: This should probably take "how: Reconfigure" as an argument, and
43
    // pass it down as appropriate. See issue #1156.
44
    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()>;
45
}
46

            
47
/// Launch a thread to reload our configuration files.
48
///
49
/// If current configuration requires it, watch for changes in `sources`
50
/// and try to reload our configuration. On unix platforms, also watch
51
/// for SIGHUP and reload configuration then.
52
///
53
/// The modules are `Weak` references to prevent this background task
54
/// from keeping them alive.
55
///
56
/// See the [`FileWatcher`](FileWatcher#Limitations) docs for limitations.
57
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
58
#[instrument(level = "trace", skip_all)]
59
pub(crate) fn watch_for_config_changes<R: Runtime>(
60
    runtime: &R,
61
    sources: ConfigurationSources,
62
    config: &ArtiConfig,
63
    modules: Vec<Weak<dyn ReconfigurableModule>>,
64
) -> anyhow::Result<()> {
65
    let watch_file = config.application().watch_configuration;
66

            
67
    cfg_if::cfg_if! {
68
        if #[cfg(target_family = "unix")] {
69
            let sighup_stream = sighup_stream()?;
70
        } else {
71
            let sighup_stream = stream::pending();
72
        }
73
    }
74

            
75
    let rt = runtime.clone();
76
    let () = runtime
77
        .clone()
78
        .spawn(async move {
79
            let res: anyhow::Result<()> = run_watcher(
80
                rt,
81
                sources,
82
                modules,
83
                watch_file,
84
                sighup_stream,
85
                Some(DEBOUNCE_INTERVAL),
86
            )
87
            .await;
88

            
89
            match res {
90
                Ok(()) => debug!("Config watcher task exiting"),
91
                // TODO: warn_report does not work on anyhow::Error.
92
                Err(e) => error!("Config watcher task exiting: {}", tor_error::Report(e)),
93
            }
94
        })
95
        .context("failed to spawn task")?;
96

            
97
    Ok(())
98
}
99

            
100
/// Start watching for configuration changes.
101
///
102
/// Spawned from `watch_for_config_changes`.
103
#[allow(clippy::cognitive_complexity)] // TODO: Refactor? Partly due to tracing.
104
#[instrument(level = "trace", skip_all)]
105
2
async fn run_watcher<R: Runtime>(
106
2
    runtime: R,
107
2
    sources: ConfigurationSources,
108
2
    modules: Vec<Weak<dyn ReconfigurableModule>>,
109
2
    watch_file: bool,
110
2
    mut sighup_stream: impl Stream<Item = ()> + Unpin,
111
2
    debounce_interval: Option<Duration>,
112
2
) -> anyhow::Result<()> {
113
    let (tx, mut rx) = file_watcher::channel();
114
    let mut watcher = if watch_file {
115
        let mut watcher = FileWatcher::builder(runtime.clone());
116
        prepare(&mut watcher, &sources)?;
117
        Some(watcher.start_watching(tx.clone())?)
118
    } else {
119
        None
120
    };
121

            
122
    debug!("Entering FS event loop");
123

            
124
    loop {
125
        select_biased! {
126
            event = sighup_stream.next().fuse() => {
127
                let Some(()) = event else {
128
                    break;
129
                };
130

            
131
                info!("Received SIGHUP");
132
            },
133
            event = rx.next().fuse() => {
134
2
                if let Some(debounce_interval) = debounce_interval {
135
                    runtime.sleep(debounce_interval).await;
136
                }
137

            
138
                while let Some(_ignore) = rx.try_recv() {
139
                    // Discard other events, so that we only reload once.
140
                    //
141
                    // We can afford to treat both error cases from try_recv [Empty
142
                    // and Disconnected] as meaning that we've discarded other
143
                    // events: if we're disconnected, we'll notice it when we next
144
                    // call recv() in the outer loop.
145
                }
146
                debug!("Config reload event {:?}: reloading configuration.", event);
147
            },
148
        }
149

            
150
        watcher =
151
            reload_configuration(runtime.clone(), watcher, &sources, &modules, tx.clone()).await?;
152
    }
153

            
154
    Ok(())
155
2
}
156

            
157
/// Reload the configuration.
158
#[allow(clippy::cognitive_complexity)] // TODO: Refactor? Partly due to tracing.
159
#[instrument(level = "trace", skip_all)]
160
6
async fn reload_configuration<R: Runtime>(
161
6
    runtime: R,
162
6
    mut watcher: Option<FileWatcher>,
163
6
    sources: &ConfigurationSources,
164
6
    modules: &[Weak<dyn ReconfigurableModule>],
165
6
    tx: FileEventSender,
166
6
) -> anyhow::Result<Option<FileWatcher>> {
167
    // TODO RPC: Take 'how' as an argument.
168
    let found_files = if watcher.is_some() {
169
        let mut new_watcher = FileWatcher::builder(runtime.clone());
170
        let found_files = prepare(&mut new_watcher, sources)
171
            .context("FS watch: failed to rescan config and re-establish watch")?;
172
        let new_watcher = new_watcher
173
            .start_watching(tx.clone())
174
            .context("FS watch: failed to start watching config")?;
175
        watcher = Some(new_watcher);
176
        found_files
177
    } else {
178
        sources
179
            .scan()
180
            .context("FS watch: failed to rescan config")?
181
    };
182

            
183
    match reconfigure(found_files, modules) {
184
        Ok(watch) => {
185
            info!("Successfully reloaded configuration.");
186
            if watch && watcher.is_none() {
187
                info!("Starting watching over configuration.");
188
                let mut new_watcher = FileWatcher::builder(runtime.clone());
189
                let _found_files = prepare(&mut new_watcher, sources)
190
                    .context("FS watch: failed to rescan config and re-establish watch: {}")?;
191
                let new_watcher = new_watcher
192
                    .start_watching(tx.clone())
193
                    .context("FS watch: failed to rescan config and re-establish watch: {}")?;
194
                watcher = Some(new_watcher);
195
            } else if !watch && watcher.is_some() {
196
                info!("Stopped watching over configuration.");
197
                watcher = None;
198
            }
199
        }
200
        // TODO: warn_report does not work on anyhow::Error.
201
        Err(e) => warn!("Couldn't reload configuration: {}", tor_error::Report(e)),
202
    }
203

            
204
    Ok(watcher)
205
6
}
206

            
207
/// A TorClient that we may or may not have told to start bootstrapping.
208
pub(crate) struct LaunchableTorClient<R: Runtime> {
209
    /// Original value of defer_bootstrap.
210
    orig_defer_bootstrap: bool,
211

            
212
    /// True if we have launched bootstrapping on the the client.
213
    have_launched: Mutex<bool>,
214

            
215
    /// The client itself.
216
    client: Arc<TorClient<R>>,
217
}
218

            
219
impl<R: Runtime> ReconfigurableModule for LaunchableTorClient<R> {
220
    #[instrument(level = "trace", skip_all)]
221
    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
222
        // TODO RPC: Take 'how' as an argument.
223

            
224
        if new.0.application().defer_bootstrap && !self.orig_defer_bootstrap {
225
            warn!("Cannot enable defer_bootstrap while arti is running.");
226
        }
227
        if !new.0.application().defer_bootstrap {
228
            self.ensure_bootstrap_launched()?;
229
        }
230

            
231
        TorClient::reconfigure(&self.client, &new.1, Reconfigure::WarnOnFailures)?;
232
        Ok(())
233
    }
234
}
235

            
236
impl<R: Runtime> LaunchableTorClient<R> {
237
    /// Create a new LaunchableTorClient.
238
    ///
239
    /// We assume that it has (or has not) been told to bootstrap itself based on `cfg`.
240
    pub(crate) fn new(client: Arc<TorClient<R>>, cfg: &crate::ApplicationConfig) -> Self {
241
        Self {
242
            orig_defer_bootstrap: cfg.defer_bootstrap,
243
            have_launched: Mutex::new(!cfg.defer_bootstrap),
244
            client,
245
        }
246
    }
247

            
248
    /// If we have not already told this LaunchableTorClient to bootstrap itself, do so.
249
    fn ensure_bootstrap_launched(&self) -> anyhow::Result<()> {
250
        let mut have_launched = self.have_launched.lock().expect("lock poisoned");
251

            
252
        if *have_launched {
253
            return Ok(());
254
        }
255

            
256
        let client = Arc::clone(&self.client);
257
        // We spawn this as a new task since `bootstrap` is very much async,
258
        // but this needs to be called from `reconfigure`, which is not.
259
        self.client
260
            .runtime()
261
            .spawn(async move {
262
                let _outcome = client.bootstrap().await;
263
            })
264
            .context("Launching bootstrap")?;
265

            
266
        *have_launched = true;
267
        Ok(())
268
    }
269

            
270
    /// As [`TorClient::bootstrap`], but performs necessary bookkeeping to remember
271
    /// that we have launched a bootstrap attempt.
272
    pub(crate) async fn bootstrap(&self) -> arti_client::Result<()> {
273
        *self.have_launched.lock().expect("lock poisoned") = true;
274

            
275
        self.client.bootstrap().await
276
    }
277
}
278

            
279
/// Internal type to represent the Arti application as a `ReconfigurableModule`.
280
pub(crate) struct Application {
281
    /// The configuration that Arti had at startup.
282
    ///
283
    /// We use this to check whether the user is asking for any impermissible
284
    /// transitions.
285
    original_config: ArtiConfig,
286
}
287

            
288
impl Application {
289
    /// Construct a new `Application` to receive configuration changes for the
290
    /// arti application.
291
    pub(crate) fn new(cfg: ArtiConfig) -> Self {
292
        Self {
293
            original_config: cfg,
294
        }
295
    }
296
}
297

            
298
impl ReconfigurableModule for Application {
299
    // TODO: This should probably take "how: Reconfigure" as an argument, and
300
    // pass it down as appropriate. See issue #1156.
301
    #[allow(clippy::cognitive_complexity)]
302
    #[instrument(level = "trace", skip_all)]
303
    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
304
        let original = &self.original_config;
305
        let config = &new.0;
306

            
307
        if config.proxy() != original.proxy() {
308
            warn!("Can't (yet) reconfigure proxy settings while arti is running.");
309
        }
310
        if config.logging() != original.logging() {
311
            warn!("Can't (yet) reconfigure logging settings while arti is running.");
312
        }
313
        #[cfg(feature = "rpc")]
314
        if config.rpc != original.rpc {
315
            warn!("Can't (yet) change RPC settings while arti is running.");
316
        }
317
        if config.application().permit_debugging && !original.application().permit_debugging {
318
            warn!("Cannot disable application hardening when it has already been enabled.");
319
        }
320
        // Note that this is the only config transition we actually perform so far.
321
        if !config.application().permit_debugging {
322
            #[cfg(feature = "harden")]
323
            crate::process::enable_process_hardening()?;
324
        }
325

            
326
        Ok(())
327
    }
328
}
329

            
330
/// Find the configuration files and prepare the watcher
331
8
fn prepare<'a, R: Runtime>(
332
8
    watcher: &mut FileWatcherBuilder<R>,
333
8
    sources: &'a ConfigurationSources,
334
8
) -> anyhow::Result<FoundConfigFiles<'a>> {
335
8
    let sources = sources.scan()?;
336
8
    for source in sources.iter() {
337
8
        match source {
338
            ConfigurationSource::Dir(dir) => watcher.watch_dir(dir, "toml")?,
339
8
            ConfigurationSource::File(file) => watcher.watch_path(file)?,
340
            ConfigurationSource::Verbatim(_) => {}
341
        }
342
    }
343
8
    Ok(sources)
344
8
}
345

            
346
/// Reload the configuration files, apply the runtime configuration, and
347
/// reconfigure the client as much as we can.
348
///
349
/// Return true if we should be watching for configuration changes.
350
//
351
// TODO: This should probably take "how: Reconfigure" as an argument, and
352
// pass it down as appropriate. See issue #1156.
353
#[instrument(level = "trace", skip_all)]
354
6
fn reconfigure(
355
6
    found_files: FoundConfigFiles<'_>,
356
6
    reconfigurable: &[Weak<dyn ReconfigurableModule>],
357
6
) -> anyhow::Result<bool> {
358
6
    let _ = reconfigurable;
359
6
    let config = found_files.load()?;
360
6
    let config = tor_config::resolve::<ArtiCombinedConfig>(config)?;
361

            
362
    // Filter out the modules that have been dropped
363
6
    let reconfigurable = reconfigurable.iter().flat_map(Weak::upgrade);
364
    // If there are no more modules, we should exit.
365
6
    let mut has_modules = false;
366

            
367
6
    for module in reconfigurable {
368
6
        has_modules = true;
369
6
        module.reconfigure(&config)?;
370
    }
371

            
372
6
    Ok(has_modules && config.0.application().watch_configuration)
373
6
}
374

            
375
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use crate::ArtiConfigBuilder;

    use super::*;
    use futures::SinkExt as _;
    use futures::channel::mpsc;
    use postage::watch;
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};
    use test_temp_dir::{TestTempDir, test_temp_dir};
    use tor_async_utils::PostageWatchSenderExt;
    use tor_config::sources::MustRead;

    /// Name of the first config file
    const CONFIG_NAME1: &str = "config1.toml";
    /// Name of the second config file
    const CONFIG_NAME2: &str = "config2.toml";
    /// Name of the third config file
    const CONFIG_NAME3: &str = "config3.toml";

    /// A reconfigurable module that forwards every config it is given to the test.
    struct TestModule {
        /// Sender on which each new config is passed to the test function
        tx: Arc<Mutex<watch::Sender<ArtiCombinedConfig>>>,
    }

    impl ReconfigurableModule for TestModule {
        fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
            let received = new.clone();
            self.tx.lock().unwrap().maybe_send(|_| received);

            Ok(())
        }
    }

    /// Build a reconfigurable module for use in tests.
    ///
    /// Returns the module, along with the channel on which the module reports
    /// the new configs that it receives.
    async fn create_module() -> (
        Arc<dyn ReconfigurableModule>,
        watch::Receiver<ArtiCombinedConfig>,
    ) {
        let (tx, mut rx) = watch::channel();
        // Consume the initial value of the postage::watch stream
        // (the first value seen on this test stream is always the default config)
        let _: ArtiCombinedConfig = rx.next().await.unwrap();

        let module = TestModule {
            tx: Arc::new(Mutex::new(tx)),
        };

        (Arc::new(module), rx)
    }

    /// Store `data` in the file called `name` inside `dir`.
    fn write_file(dir: &TestTempDir, name: &str, data: &[u8]) -> PathBuf {
        let scratch = dir.as_path_untracked().join("tmp");
        std::fs::write(&scratch, data).unwrap();
        let target = dir.as_path_untracked().join(name);
        // Move the finished file into place, so that the config file is replaced atomically
        std::fs::rename(scratch, &target).unwrap();
        target
    }

    /// Serialize an `ArtiConfigBuilder` into the file called `name` inside `dir`.
    fn write_config(dir: &TestTempDir, name: &str, config: &ArtiConfigBuilder) -> PathBuf {
        let serialized = toml::to_string(&config).unwrap();
        write_file(dir, name, serialized.as_bytes())
    }

    #[test]
    fn watch_single_file() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir = test_temp_dir!();
            let mut builder = ArtiConfigBuilder::default();
            builder.application().watch_configuration(true);

            let config_path = write_config(&temp_dir, CONFIG_NAME1, &builder);
            let mut sources = ConfigurationSources::new_empty();
            sources.push_source(ConfigurationSource::File(config_path), MustRead::MustRead);

            let (module, mut rx) = create_module().await;

            builder.logging().log_sensitive_information(true);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &builder);

            // A fake sighup stream lets us wait until the select_biased!
            // loop of run_watcher() has been entered
            let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
            let runtime = rt.clone();
            let () = rt
                .spawn(async move {
                    run_watcher(
                        runtime,
                        sources,
                        vec![Arc::downgrade(&module)],
                        true,
                        sighup_rx,
                        None,
                    )
                    .await
                    .unwrap();
                })
                .unwrap();

            sighup_tx.send(()).await.unwrap();

            // The sighup should have caused the reconfigurable modules to be reloaded
            let reloaded = rx.next().await.unwrap();

            assert_eq!(reloaded.0, builder.build().unwrap());

            // Replace the config file
            builder.logging().log_sensitive_information(false);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &builder);
            // The config change should have caused the reconfigurable modules to be reloaded
            let reloaded = rx.next().await.unwrap();
            assert_eq!(reloaded.0, builder.build().unwrap());
        });
    }

    // TODO: Ignored until #1607 is fixed
    #[test]
    #[ignore]
    fn watch_multiple() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let temp_dir = test_temp_dir!();
            let mut builder1 = ArtiConfigBuilder::default();
            builder1.application().watch_configuration(true);

            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &builder1);
            let mut sources = ConfigurationSources::new_empty();
            sources.push_source(
                ConfigurationSource::Dir(temp_dir.as_path_untracked().to_path_buf()),
                MustRead::MustRead,
            );

            let (module, mut rx) = create_module().await;
            // A fake sighup stream lets us wait until the select_biased!
            // loop of run_watcher() has been entered
            let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
            let runtime = rt.clone();
            let () = rt
                .spawn(async move {
                    run_watcher(
                        runtime,
                        sources,
                        vec![Arc::downgrade(&module)],
                        true,
                        sighup_rx,
                        None,
                    )
                    .await
                    .unwrap();
                })
                .unwrap();

            builder1.logging().log_sensitive_information(true);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &builder1);
            sighup_tx.send(()).await.unwrap();
            // The sighup should have caused the reconfigurable modules to be reloaded
            let reloaded = rx.next().await.unwrap();
            assert_eq!(reloaded.0, builder1.build().unwrap());

            let mut builder2 = ArtiConfigBuilder::default();
            builder2.application().watch_configuration(true);
            // Add a second config file...
            builder2.system().max_files(0_u64);
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME2, &builder2);
            // ...and check that both config files get merged
            let mut combined = builder1.clone();
            combined.system().max_files(0_u64);
            let reloaded = rx.next().await.unwrap();
            assert_eq!(reloaded.0, combined.build().unwrap());
            // Now add yet another config file to the watched dir
            builder2.logging().console("foo".to_string());
            let mut combined2 = combined.clone();
            combined2.logging().console("foo".to_string());
            let third_path: PathBuf = write_config(&temp_dir, CONFIG_NAME3, &builder2);
            let reloaded = rx.next().await.unwrap();
            assert_eq!(reloaded.0, combined2.build().unwrap());

            // Deleting the file should trigger an event as well
            std::fs::remove_file(third_path).unwrap();
            let reloaded = rx.next().await.unwrap();
            assert_eq!(reloaded.0, combined.build().unwrap());
        });
    }
}