1
//! Code to watch configuration files for any changes.
2

            
3
use std::sync::Weak;
4
use std::time::Duration;
5

            
6
use anyhow::Context;
7
use arti_client::config::Reconfigure;
8
use arti_client::TorClient;
9
use futures::{select_biased, FutureExt as _, Stream};
10
use tor_config::file_watcher::{self, FileWatcherBuilder, FileEventSender, FileWatcher};
11
use tor_config::{sources::FoundConfigFiles, ConfigurationSource, ConfigurationSources};
12
use tor_rtcompat::Runtime;
13
use tracing::{debug, error, info, instrument, warn};
14
use tor_rtcompat::SpawnExt;
15
use futures::StreamExt;
16

            
17
#[cfg(target_family = "unix")]
18
use crate::process::sighup_stream;
19

            
20
#[cfg(not(target_family = "unix"))]
21
use futures::stream;
22

            
23
use crate::{ArtiCombinedConfig, ArtiConfig};
24

            
25
/// How long to wait after an event got received, before we try to process it.
26
const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
27

            
28
/// An object that can be reconfigured when our configuration changes.
29
///
30
/// We use this trait so that we can represent abstract modules in our
31
/// application, and pass the configuration to each of them.
32
//
33
// TODO: It is very likely we will want to refactor this even further once we
34
// have a notion of what our modules truly are.
35
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
36
pub(crate) trait ReconfigurableModule: Send + Sync {
37
    /// Try to reconfigure this module according to a newly loaded configuration.
38
    ///
39
    /// By convention, this should only return fatal errors; any such error
40
    /// should cause the program to exit.  For other cases, we should just warn.
41
    //
42
    // TODO: This should probably take "how: Reconfigure" as an argument, and
43
    // pass it down as appropriate. See issue #1156.
44
    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()>;
45
}
46

            
47
/// Launch a thread to reload our configuration files.
48
///
49
/// If current configuration requires it, watch for changes in `sources`
50
/// and try to reload our configuration. On unix platforms, also watch
51
/// for SIGHUP and reload configuration then.
52
///
53
/// The modules are `Weak` references to prevent this background task
54
/// from keeping them alive.
55
///
56
/// See the [`FileWatcher`](FileWatcher#Limitations) docs for limitations.
57
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
58
#[instrument(level = "trace", skip_all)]
59
pub(crate) fn watch_for_config_changes<R: Runtime>(
60
    runtime: &R,
61
    sources: ConfigurationSources,
62
    config: &ArtiConfig,
63
    modules: Vec<Weak<dyn ReconfigurableModule>>,
64
) -> anyhow::Result<()> {
65
    let watch_file = config.application().watch_configuration;
66

            
67
    cfg_if::cfg_if! {
68
        if #[cfg(target_family = "unix")] {
69
            let sighup_stream = sighup_stream()?;
70
        } else {
71
            let sighup_stream = stream::pending();
72
        }
73
    }
74

            
75
    let rt = runtime.clone();
76
    let () = runtime.clone().spawn(async move {
77
        let res: anyhow::Result<()> = run_watcher(
78
            rt,
79
            sources,
80
            modules,
81
            watch_file,
82
            sighup_stream,
83
            Some(DEBOUNCE_INTERVAL)
84
        ).await;
85

            
86
        match res {
87
            Ok(()) => debug!("Config watcher task exiting"),
88
            // TODO: warn_report does not work on anyhow::Error.
89
            Err(e) => error!("Config watcher task exiting: {}", tor_error::Report(e)),
90
        }
91
    }).context("failed to spawn task")?;
92

            
93
    Ok(())
94
}
95

            
96
/// Start watching for configuration changes.
97
///
98
/// Spawned from `watch_for_config_changes`.
99
#[allow(clippy::cognitive_complexity)] // TODO: Refactor? Partly due to tracing.
100
#[instrument(level = "trace", skip_all)]
101
2
async fn run_watcher<R: Runtime>(
102
2
    runtime: R,
103
2
    sources: ConfigurationSources,
104
2
    modules: Vec<Weak<dyn ReconfigurableModule>>,
105
2
    watch_file: bool,
106
2
    mut sighup_stream: impl Stream<Item = ()> + Unpin,
107
2
    debounce_interval: Option<Duration>,
108
2
) -> anyhow::Result<()> {
109
    let (tx, mut rx) = file_watcher::channel();
110
    let mut watcher = if watch_file {
111
        let mut watcher = FileWatcher::builder(runtime.clone());
112
        prepare(&mut watcher, &sources)?;
113
        Some(watcher.start_watching(tx.clone())?)
114
    } else {
115
        None
116
    };
117

            
118
    debug!("Entering FS event loop");
119

            
120
    loop {
121
        select_biased! {
122
            event = sighup_stream.next().fuse() => {
123
                let Some(()) = event else {
124
                    break;
125
                };
126

            
127
                info!("Received SIGHUP");
128

            
129
                watcher = reload_configuration(
130
                    runtime.clone(),
131
                    watcher,
132
                    &sources,
133
                    &modules,
134
                    tx.clone()
135
                ).await?;
136
            },
137
            event = rx.next().fuse() => {
138
2
                if let Some(debounce_interval) = debounce_interval {
139
                    runtime.sleep(debounce_interval).await;
140
                }
141

            
142
                while let Some(_ignore) = rx.try_recv() {
143
                    // Discard other events, so that we only reload once.
144
                    //
145
                    // We can afford to treat both error cases from try_recv [Empty
146
                    // and Disconnected] as meaning that we've discarded other
147
                    // events: if we're disconnected, we'll notice it when we next
148
                    // call recv() in the outer loop.
149
                }
150
                debug!("Config reload event {:?}: reloading configuration.", event);
151
                watcher = reload_configuration(
152
                    runtime.clone(),
153
                    watcher,
154
                    &sources,
155
                    &modules,
156
                    tx.clone()
157
                ).await?;
158
            },
159
        }
160
    }
161

            
162
    Ok(())
163
2
}
164

            
165
/// Reload the configuration.
166
#[allow(clippy::cognitive_complexity)] // TODO: Refactor? Partly due to tracing.
167
#[instrument(level = "trace", skip_all)]
168
6
async fn reload_configuration<R: Runtime>(
169
6
    runtime: R,
170
6
    mut watcher: Option<FileWatcher>,
171
6
    sources: &ConfigurationSources,
172
6
    modules: &[Weak<dyn ReconfigurableModule>],
173
6
    tx: FileEventSender,
174
6
) -> anyhow::Result<Option<FileWatcher>> {
175

            
176
    let found_files = if watcher.is_some() {
177
        let mut new_watcher = FileWatcher::builder(runtime.clone());
178
        let found_files = prepare(&mut new_watcher, sources)
179
            .context("FS watch: failed to rescan config and re-establish watch")?;
180
        let new_watcher = new_watcher
181
            .start_watching(tx.clone())
182
            .context("FS watch: failed to start watching config")?;
183
        watcher = Some(new_watcher);
184
        found_files
185
    } else {
186
        sources
187
            .scan()
188
            .context("FS watch: failed to rescan config")?
189
    };
190

            
191
    match reconfigure(found_files, modules) {
192
        Ok(watch) => {
193
            info!("Successfully reloaded configuration.");
194
            if watch && watcher.is_none() {
195
                info!("Starting watching over configuration.");
196
                let mut new_watcher = FileWatcher::builder(runtime.clone());
197
                let _found_files = prepare(&mut new_watcher, sources).context(
198
                    "FS watch: failed to rescan config and re-establish watch: {}",
199
                )?;
200
                let new_watcher = new_watcher.start_watching(tx.clone()).context(
201
                    "FS watch: failed to rescan config and re-establish watch: {}",
202
                )?;
203
                watcher = Some(new_watcher);
204
            } else if !watch && watcher.is_some() {
205
                info!("Stopped watching over configuration.");
206
                watcher = None;
207
            }
208
        }
209
        // TODO: warn_report does not work on anyhow::Error.
210
        Err(e) => warn!("Couldn't reload configuration: {}", tor_error::Report(e)),
211
    }
212

            
213
    Ok(watcher)
214
6
}
215

            
216
impl<R: Runtime> ReconfigurableModule for TorClient<R> {
217
    #[instrument(level = "trace", skip_all)]
218
    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
219
        TorClient::reconfigure(self, &new.1, Reconfigure::WarnOnFailures)?;
220
        Ok(())
221
    }
222
}
223

            
224
/// Internal type to represent the Arti application as a `ReconfigurableModule`.
225
pub(crate) struct Application {
226
    /// The configuration that Arti had at startup.
227
    ///
228
    /// We use this to check whether the user is asking for any impermissible
229
    /// transitions.
230
    original_config: ArtiConfig,
231
}
232

            
233
impl Application {
234
    /// Construct a new `Application` to receive configuration changes for the
235
    /// arti application.
236
    pub(crate) fn new(cfg: ArtiConfig) -> Self {
237
        Self {
238
            original_config: cfg,
239
        }
240
    }
241
}
242

            
243
impl ReconfigurableModule for Application {
244
    // TODO: This should probably take "how: Reconfigure" as an argument, and
245
    // pass it down as appropriate. See issue #1156.
246
    #[allow(clippy::cognitive_complexity)]
247
    #[instrument(level = "trace", skip_all)]
248
    fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
249
        let original = &self.original_config;
250
        let config = &new.0;
251

            
252
        if config.proxy() != original.proxy() {
253
            warn!("Can't (yet) reconfigure proxy settings while arti is running.");
254
        }
255
        if config.logging() != original.logging() {
256
            warn!("Can't (yet) reconfigure logging settings while arti is running.");
257
        }
258
        #[cfg(feature = "rpc")]
259
        if config.rpc != original.rpc {
260
            warn!("Can't (yet) change RPC settings while arti is running.");
261
        }
262
        if config.application().permit_debugging && !original.application().permit_debugging {
263
            warn!("Cannot disable application hardening when it has already been enabled.");
264
        }
265

            
266
        // Note that this is the only config transition we actually perform so far.
267
        if !config.application().permit_debugging {
268
            #[cfg(feature = "harden")]
269
            crate::process::enable_process_hardening()?;
270
        }
271

            
272
        Ok(())
273
    }
274
}
275

            
276
/// Find the configuration files and prepare the watcher
277
8
fn prepare<'a, R: Runtime>(
278
8
    watcher: &mut FileWatcherBuilder<R>,
279
8
    sources: &'a ConfigurationSources,
280
8
) -> anyhow::Result<FoundConfigFiles<'a>> {
281
8
    let sources = sources.scan()?;
282
8
    for source in sources.iter() {
283
8
        match source {
284
            ConfigurationSource::Dir(dir) => watcher.watch_dir(dir, "toml")?,
285
8
            ConfigurationSource::File(file) => watcher.watch_path(file)?,
286
            ConfigurationSource::Verbatim(_) => {}
287
        }
288
    }
289
8
    Ok(sources)
290
8
}
291

            
292
/// Reload the configuration files, apply the runtime configuration, and
293
/// reconfigure the client as much as we can.
294
///
295
/// Return true if we should be watching for configuration changes.
296
//
297
// TODO: This should probably take "how: Reconfigure" as an argument, and
298
// pass it down as appropriate. See issue #1156.
299
#[instrument(level = "trace", skip_all)]
300
6
fn reconfigure(
301
6
    found_files: FoundConfigFiles<'_>,
302
6
    reconfigurable: &[Weak<dyn ReconfigurableModule>],
303
6
) -> anyhow::Result<bool> {
304
6
    let _ = reconfigurable;
305
6
    let config = found_files.load()?;
306
6
    let config = tor_config::resolve::<ArtiCombinedConfig>(config)?;
307

            
308
    // Filter out the modules that have been dropped
309
6
    let reconfigurable = reconfigurable.iter().flat_map(Weak::upgrade);
310
    // If there are no more modules, we should exit.
311
6
    let mut has_modules = false;
312

            
313
12
    for module in reconfigurable {
314
6
        has_modules = true;
315
6
        module.reconfigure(&config)?;
316
    }
317

            
318
6
    Ok(has_modules && config.0.application().watch_configuration)
319
6
}
320

            
321
#[cfg(test)]
322
mod test {
323
    // @@ begin test lint list maintained by maint/add_warning @@
324
    #![allow(clippy::bool_assert_comparison)]
325
    #![allow(clippy::clone_on_copy)]
326
    #![allow(clippy::dbg_macro)]
327
    #![allow(clippy::mixed_attributes_style)]
328
    #![allow(clippy::print_stderr)]
329
    #![allow(clippy::print_stdout)]
330
    #![allow(clippy::single_char_pattern)]
331
    #![allow(clippy::unwrap_used)]
332
    #![allow(clippy::unchecked_time_subtraction)]
333
    #![allow(clippy::useless_vec)]
334
    #![allow(clippy::needless_pass_by_value)]
335
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
336

            
337
    use crate::ArtiConfigBuilder;
338

            
339
    use super::*;
340
    use futures::channel::mpsc;
341
    use futures::SinkExt as _;
342
    use tor_config::sources::MustRead;
343
    use std::path::PathBuf;
344
    use std::sync::{Arc, Mutex};
345
    use test_temp_dir::{test_temp_dir, TestTempDir};
346
    use postage::watch;
347
    use tor_async_utils::PostageWatchSenderExt;
348

            
349
    /// Filename for config1
350
    const CONFIG_NAME1: &str = "config1.toml";
351
    /// Filename for config2
352
    const CONFIG_NAME2: &str = "config2.toml";
353
    /// Filename for config3
354
    const CONFIG_NAME3: &str = "config3.toml";
355

            
356
    struct TestModule {
357
        // A sender for sending the new config to the test function
358
        tx: Arc<Mutex<watch::Sender<ArtiCombinedConfig>>>,
359
    }
360

            
361
    impl ReconfigurableModule for TestModule {
362
        fn reconfigure(&self, new: &ArtiCombinedConfig) -> anyhow::Result<()> {
363
            let config = new.clone();
364
            self.tx.lock().unwrap().maybe_send(|_| config);
365

            
366
            Ok(())
367
        }
368
    }
369

            
370
    /// Create a test reconfigurable module.
371
    ///
372
    /// Returns the module and a channel on which the new configs received by the module are sent.
373
    async fn create_module(
374
    ) -> (Arc<dyn ReconfigurableModule>, watch::Receiver<ArtiCombinedConfig>) {
375
        let (tx, mut rx) = watch::channel();
376
        // Read the initial value from the postage::watch stream
377
        // (the first observed value on this test stream is always the default config)
378
        let _: ArtiCombinedConfig = rx.next().await.unwrap();
379

            
380
        (Arc::new(TestModule { tx: Arc::new(Mutex::new(tx)) }), rx)
381
    }
382

            
383
    /// Write `data` to file `name` within `dir`.
384
    fn write_file(dir: &TestTempDir, name: &str, data: &[u8]) -> PathBuf {
385
        let tmp = dir.as_path_untracked().join("tmp");
386
        std::fs::write(&tmp, data).unwrap();
387
        let path = dir.as_path_untracked().join(name);
388
        // Atomically write the config file
389
        std::fs::rename(tmp, &path).unwrap();
390
        path
391
    }
392

            
393
    /// Write an `ArtiConfigBuilder` to a file within `dir`.
394
    fn write_config(dir: &TestTempDir, name: &str, config: &ArtiConfigBuilder) -> PathBuf {
395
        let s = toml::to_string(&config).unwrap();
396
        write_file(dir, name, s.as_bytes())
397
    }
398

            
399
    #[test]
400
    fn watch_single_file() {
401
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
402
            let temp_dir = test_temp_dir!();
403
            let mut config_builder =  ArtiConfigBuilder::default();
404
            config_builder.application().watch_configuration(true);
405

            
406
            let cfg_file = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
407
            let mut cfg_sources = ConfigurationSources::new_empty();
408
            cfg_sources.push_source(ConfigurationSource::File(cfg_file), MustRead::MustRead);
409

            
410
            let (module, mut rx) = create_module().await;
411

            
412
            config_builder.logging().log_sensitive_information(true);
413
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
414

            
415
            // Use a fake sighup stream to wait until run_watcher()'s select_biased!
416
            // loop is entered
417
            let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
418
            let runtime = rt.clone();
419
            let () = rt.spawn(async move {
420
                run_watcher(
421
                    runtime,
422
                    cfg_sources,
423
                    vec![Arc::downgrade(&module)],
424
                    true,
425
                    sighup_rx,
426
                    None,
427
                ).await.unwrap();
428
            }).unwrap();
429

            
430
            sighup_tx.send(()).await.unwrap();
431

            
432
            // The reconfigurable modules should've been reloaded in response to sighup
433
            let config = rx.next().await.unwrap();
434

            
435
            assert_eq!(config.0, config_builder.build().unwrap());
436

            
437
            // Overwrite the config
438
            config_builder.logging().log_sensitive_information(false);
439
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder);
440
            // The reconfigurable modules should've been reloaded in response to the config change
441
            let config = rx.next().await.unwrap();
442
            assert_eq!(config.0, config_builder.build().unwrap());
443

            
444
        });
445
    }
446

            
447
    // TODO: Ignored until #1607 is fixed
448
    #[test]
449
    #[ignore]
450
    fn watch_multiple() {
451
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
452
            let temp_dir = test_temp_dir!();
453
            let mut config_builder1 =  ArtiConfigBuilder::default();
454
            config_builder1.application().watch_configuration(true);
455

            
456
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder1);
457
            let mut cfg_sources = ConfigurationSources::new_empty();
458
            cfg_sources.push_source(
459
                ConfigurationSource::Dir(temp_dir.as_path_untracked().to_path_buf()),
460
                MustRead::MustRead
461
            );
462

            
463
            let (module, mut rx) = create_module().await;
464
            // Use a fake sighup stream to wait until run_watcher()'s select_biased!
465
            // loop is entered
466
            let (mut sighup_tx, sighup_rx) = mpsc::unbounded();
467
            let runtime = rt.clone();
468
            let () = rt.spawn(async move {
469
                run_watcher(
470
                    runtime,
471
                    cfg_sources,
472
                    vec![Arc::downgrade(&module)],
473
                    true,
474
                    sighup_rx,
475
                    None,
476
                ).await.unwrap();
477
            }).unwrap();
478

            
479
            config_builder1.logging().log_sensitive_information(true);
480
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME1, &config_builder1);
481
            sighup_tx.send(()).await.unwrap();
482
            // The reconfigurable modules should've been reloaded in response to sighup
483
            let config = rx.next().await.unwrap();
484
            assert_eq!(config.0, config_builder1.build().unwrap());
485

            
486
            let mut config_builder2 =  ArtiConfigBuilder::default();
487
            config_builder2.application().watch_configuration(true);
488
            // Write another config file...
489
            config_builder2.system().max_files(0_u64);
490
            let _: PathBuf = write_config(&temp_dir, CONFIG_NAME2, &config_builder2);
491
            // Check that the 2 config files are merged
492
            let mut config_builder_combined = config_builder1.clone();
493
            config_builder_combined.system().max_files(0_u64);
494
            let config = rx.next().await.unwrap();
495
            assert_eq!(config.0, config_builder_combined.build().unwrap());
496
            // Now write a new config file to the watched dir
497
            config_builder2.logging().console("foo".to_string());
498
            let mut config_builder_combined2 = config_builder_combined.clone();
499
            config_builder_combined2.logging().console("foo".to_string());
500
            let config3: PathBuf = write_config(&temp_dir, CONFIG_NAME3, &config_builder2);
501
            let config = rx.next().await.unwrap();
502
            assert_eq!(config.0, config_builder_combined2.build().unwrap());
503

            
504
            // Removing the file should also trigger an event
505
            std::fs::remove_file(config3).unwrap();
506
            let config = rx.next().await.unwrap();
507
            assert_eq!(config.0, config_builder_combined.build().unwrap());
508
        });
509
    }
510
}