1
//! Entry point of a Tor relay: the [`InertTorRelay`] and [`TorRelay`] objects.
2

            
3
use std::net::SocketAddr;
4
use std::path::{Path, PathBuf};
5
use std::sync::Arc;
6

            
7
use anyhow::Context;
8
use tokio::task::JoinSet;
9
use tor_proto::RelayIdentities;
10
use tracing::{debug, warn};
11

            
12
use fs_mistrust::Mistrust;
13
use tor_basic_utils::iter_join;
14
use tor_chanmgr::{ChanMgr, ChanMgrConfig, Dormancy};
15
use tor_config_path::CfgPathResolver;
16
use tor_dirmgr::DirMgrConfig;
17
use tor_keymgr::{ArtiNativeKeystore, KeyMgr, KeyMgrBuilder};
18
use tor_memquota::MemoryQuotaTracker;
19
use tor_netdir::params::NetParameters;
20
use tor_persist::state_dir::StateDirectory;
21
use tor_persist::{FsStateMgr, StateMgr};
22
use tor_rtcompat::{NetStreamProvider, Runtime};
23

            
24
use crate::client::RelayClient;
25
use crate::config::TorRelayConfig;
26

            
27
/// An initialized but unbootstrapped relay.
28
///
29
/// This intentionally does not have access to the runtime to prevent it from doing network io.
30
///
31
/// The idea is that we can build up the relay's components in an `InertTorRelay` without a runtime,
32
/// and then call `init()` on it and provide a runtime to turn it into a network-capable relay.
33
/// This gives us two advantages:
34
///
35
/// - We can initialize the internal data structures in the `InertTorRelay` (load the keystores,
36
///   configure memquota, etc), which leaves `TorRelay` to just "running" the relay (bootstrapping,
37
///   setting up listening sockets, etc). We don't need to combine the initialization and "running
38
///   the relay" all within the same object.
39
/// - We will likely want to share some of arti's key management subcommands in the future.
40
///   arti-client has an `InertTorClient` which is used so that arti subcommands can access the
41
///   keystore. If we do a similar thing here in arti-relay in the future, it might be nice to have
42
///   an `InertTorRelay` which has these internal data structures, but doesn't need a runtime or
43
///   have any networking capabilities.
44
///
45
/// Time will tell if this ends up being a bad design decision in practice, and we can always change
46
/// it later.
47
#[derive(Clone)]
48
pub(crate) struct InertTorRelay {
49
    /// The configuration options for the relay.
50
    config: TorRelayConfig,
51

            
52
    /// The configuration options for the client's directory manager.
53
    dirmgr_config: DirMgrConfig,
54

            
55
    /// Path resolver for expanding variables in [`CfgPath`](tor_config_path::CfgPath)s.
56
    #[expect(unused)] // TODO RELAY remove
57
    path_resolver: CfgPathResolver,
58

            
59
    /// State directory path.
60
    ///
61
    /// The [`StateDirectory`] stored in `state_dir` doesn't seem to have a way of getting the state
62
    /// directory path, so we need to store a copy of the path here.
63
    #[expect(unused)] // TODO RELAY remove
64
    state_path: PathBuf,
65

            
66
    /// Relay's state directory.
67
    #[expect(unused)] // TODO RELAY remove
68
    state_dir: StateDirectory,
69

            
70
    /// Location on disk where we store persistent data.
71
    state_mgr: FsStateMgr,
72

            
73
    /// Key manager holding all relay keys and certificates.
74
    keymgr: Arc<KeyMgr>,
75
}
76

            
77
impl InertTorRelay {
78
    /// Create a new Tor relay with the given configuration.
79
    pub(crate) fn new(
80
        config: TorRelayConfig,
81
        path_resolver: CfgPathResolver,
82
    ) -> anyhow::Result<Self> {
83
        let state_path = config.storage.state_dir(&path_resolver)?;
84
        let cache_path = config.storage.cache_dir(&path_resolver)?;
85

            
86
        let state_dir = StateDirectory::new(&state_path, config.storage.permissions())
87
            .context("Failed to create `StateDirectory`")?;
88
        let state_mgr =
89
            FsStateMgr::from_path_and_mistrust(&state_path, config.storage.permissions())
90
                .context("Failed to create `FsStateMgr`")?;
91

            
92
        // Try to take state ownership early, so we'll know if we have it.
93
        // Note that this `try_lock()` may return `Ok` even if we can't acquire the lock.
94
        // (At this point we don't yet care if we have it.)
95
        let _ignore_status = state_mgr
96
            .try_lock()
97
            .context("Failed to try locking the state manager")?;
98

            
99
        let keymgr = Self::create_keymgr(&state_path, config.storage.permissions())
100
            .context("Failed to create key manager")?;
101

            
102
        let dirmgr_config = DirMgrConfig {
103
            cache_dir: cache_path,
104
            cache_trust: config.storage.permissions().clone(),
105
            network: config.tor_network.clone(),
106
            schedule: Default::default(),
107
            tolerance: Default::default(),
108
            override_net_params: Default::default(),
109
            extensions: Default::default(),
110
        };
111

            
112
        Ok(Self {
113
            config,
114
            dirmgr_config,
115
            path_resolver,
116
            state_path,
117
            state_dir,
118
            state_mgr,
119
            keymgr,
120
        })
121
    }
122

            
123
    /// Connect the [`InertTorRelay`] to the Tor network.
124
    pub(crate) async fn init<R: Runtime>(self, runtime: R) -> anyhow::Result<TorRelay<R>> {
125
        // Attempt to generate any missing keys/cert from the KeyMgr.
126
        let identities = crate::tasks::crypto::try_generate_keys(&self.keymgr)
127
            .context("Failed to generate keys")?;
128

            
129
        TorRelay::init(runtime, self, identities).await
130
    }
131

            
132
    /// Create the [key manager](KeyMgr).
133
    fn create_keymgr(state_path: &Path, mistrust: &Mistrust) -> anyhow::Result<Arc<KeyMgr>> {
134
        let key_store_dir = state_path.join("keystore");
135

            
136
        let persistent_store = ArtiNativeKeystore::from_path_and_mistrust(&key_store_dir, mistrust)
137
            .context("Failed to construct the native keystore")?;
138

            
139
        // Should only log fs paths at debug level or lower,
140
        // unless they're part of a diagnostic message.
141
        debug!("Using relay keystore from {key_store_dir:?}");
142

            
143
        let keymgr = KeyMgrBuilder::default()
144
            .primary_store(Box::new(persistent_store))
145
            .build()
146
            .context("Failed to build the 'KeyMgr'")?;
147
        let keymgr = Arc::new(keymgr);
148

            
149
        // TODO: support C-tor keystore
150

            
151
        Ok(keymgr)
152
    }
153
}
154

            
155
/// Represent an active Relay on the Tor network.
156
pub(crate) struct TorRelay<R: Runtime> {
157
    /// Asynchronous runtime object.
158
    runtime: R,
159

            
160
    /// Memory quota tracker.
161
    #[expect(unused)] // TODO RELAY remove
162
    memquota: Arc<MemoryQuotaTracker>,
163

            
164
    /// A "client" used by relays to construct circuits.
165
    client: RelayClient<R>,
166

            
167
    /// Channel manager, used by circuits etc.
168
    chanmgr: Arc<ChanMgr<R>>,
169

            
170
    /// See [`InertTorRelay::keymgr`].
171
    keymgr: Arc<KeyMgr>,
172

            
173
    /// Listening OR ports.
174
    or_listeners: Vec<<R as NetStreamProvider<SocketAddr>>::Listener>,
175
}
176

            
177
impl<R: Runtime> TorRelay<R> {
178
    /// Create a new Tor relay with the given [`runtime`][tor_rtcompat].
179
    ///
180
    /// We use this to initialize components, open sockets, etc.
181
    /// Doing work with these components should happen in [`TorRelay::run()`].
182
    ///
183
    /// Expected to be called from [`InertTorRelay::init()`].
184
    async fn init(
185
        runtime: R,
186
        inert: InertTorRelay,
187
        identities: RelayIdentities,
188
    ) -> anyhow::Result<Self> {
189
        let memquota = MemoryQuotaTracker::new(&runtime, inert.config.system.memory.clone())
190
            .context("Failed to initialize memquota tracker")?;
191

            
192
        let config = ChanMgrConfig::new(inert.config.channel.clone())
193
            .with_my_addrs(inert.config.relay.advertise.all_ips())
194
            .with_identities(Arc::new(identities));
195
        let chanmgr = Arc::new(
196
            ChanMgr::new(
197
                runtime.clone(),
198
                config,
199
                Dormancy::Active,
200
                &NetParameters::default(),
201
                memquota.clone(),
202
            )
203
            .context("Failed to build chan manager")?,
204
        );
205

            
206
        let client = RelayClient::new(
207
            runtime.clone(),
208
            Arc::clone(&chanmgr),
209
            &inert.config,
210
            &inert.config,
211
            inert.dirmgr_config,
212
            inert.state_mgr,
213
        )
214
        .context("Failed to construct the relay's client")?;
215

            
216
        // An iterator of `listen()` futures with some extra error handling.
217
        let or_listeners = inert.config.relay.listen.addrs().map(async |addr| {
218
            match runtime.listen(addr).await {
219
                Ok(x) => Some(Ok(x)),
220
                // If we don't support the address family (typically IPv6), only warn.
221
                #[cfg(unix)]
222
                Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
223
                    let message =
224
                        format!("Could not listen at {addr}: address family not supported");
225
                    if addr.is_ipv6() {
226
                        warn!("{message}");
227
                    } else {
228
                        // If we got `EAFNOSUPPORT` for a non-IPv6 address, then warn louder.
229
                        tor_error::warn_report!(e, "{message}");
230
                    }
231
                    None
232
                }
233
                Err(e) => {
234
                    Some(Err(e).with_context(|| format!("Failed to listen at address {addr}")))
235
                }
236
            }
237
        });
238

            
239
        // We await the futures sequentially rather than with something like `join_all` to make
240
        // errors more reproducible.
241
        let or_listeners = {
242
            let mut awaited_listeners = vec![];
243
            for listener in or_listeners {
244
                match listener.await {
245
                    Some(Ok(x)) => awaited_listeners.push(x),
246
                    Some(Err(e)) => return Err(e),
247
                    None => {}
248
                };
249
            }
250
            awaited_listeners
251
        };
252

            
253
        // Typically we would have returned with an error if we failed to listen on an address,
254
        // but we ignore `EAFNOSUPPORT` errors above, so it's possible that all failed with
255
        // `EAFNOSUPPORT` and we ended up here.
256
        if or_listeners.is_empty() {
257
            return Err(anyhow::anyhow!(
258
                "Could not listen at any OR port addresses: {}",
259
                iter_join(", ", inert.config.relay.listen.addrs()),
260
            ));
261
        }
262

            
263
        Ok(Self {
264
            runtime,
265
            memquota,
266
            client,
267
            chanmgr,
268
            keymgr: inert.keymgr,
269
            or_listeners,
270
        })
271
    }
272

            
273
    /// Run the actual relay.
274
    ///
275
    /// This only returns if something has gone wrong.
276
    /// Otherwise it runs forever.
277
    pub(crate) async fn run(self) -> anyhow::Result<void::Void> {
278
        let mut task_handles = JoinSet::new();
279

            
280
        // Channel housekeeping task.
281
        task_handles.spawn({
282
            let mut t = crate::tasks::ChannelHouseKeepingTask::new(&self.chanmgr);
283
            async move {
284
                t.start()
285
                    .await
286
                    .context("Failed to run channel house keeping task")
287
            }
288
        });
289

            
290
        // Listen for new Tor (OR) connections.
291
        task_handles.spawn({
292
            let runtime = self.runtime.clone();
293
            let chanmgr = Arc::clone(&self.chanmgr);
294
            async {
295
                // TODO: Should we give all tasks a `start` method?
296
                crate::tasks::listeners::or_listener(runtime, chanmgr, self.or_listeners)
297
                    .await
298
                    .context("Failed to run OR listener task")
299
            }
300
        });
301

            
302
        // Start the key rotation tasks.
303
        task_handles.spawn({
304
            let runtime = self.runtime.clone();
305
            let keymgr = self.keymgr.clone();
306
            let chanmgr = self.chanmgr.clone();
307
            async {
308
                crate::tasks::crypto::rotate_keys_task(runtime, keymgr, chanmgr)
309
                    .await
310
                    .context("Failed to run key rotation task")
311
            }
312
        });
313

            
314
        // Launch client tasks.
315
        //
316
        // We need to hold on to these handles until the relay stops, otherwise dropping these
317
        // handles would stop the background tasks.
318
        //
319
        // These are `tor_rtcompat::scheduler::TaskHandle`s, which don't notify us if they
320
        // stop/crash.
321
        //
322
        // TODO: Whose responsibility is it to ensure that these background tasks don't crash?
323
        // Should we have a way of monitoring these tasks? Or should the circuit manager re-launch
324
        // crashed tasks?
325
        let _client_task_handles = self.client.launch_background_tasks();
326

            
327
        // TODO: More tasks will be spawned here.
328

            
329
        // Now that background tasks are started, bootstrap the client.
330
        self.client
331
            .bootstrap()
332
            .await
333
            .context("Failed to bootstrap the relay's client")?;
334

            
335
        // We block until facism is erradicated or a task ends which means the relay will shutdown
336
        // and facism will have one more chance.
337
        let void = task_handles
338
            .join_next()
339
            .await
340
            .context("Relay task set is empty")?
341
            .context("Relay task join failed")?
342
            .context("Relay task stopped unexpectedly")?;
343

            
344
        // We can never get here since a `Void` cannot be constructed.
345
        void::unreachable(void);
346
    }
347
}