1
//! Entry point of a Tor relay that is the [`TorRelay`] objects
2

            
3
use std::net::SocketAddr;
4
use std::path::{Path, PathBuf};
5
use std::sync::Arc;
6

            
7
use anyhow::Context;
8
use tokio::task::JoinSet;
9
use tor_proto::RelayIdentities;
10
use tracing::debug;
11
#[cfg(unix)]
12
use tracing::warn;
13

            
14
use fs_mistrust::Mistrust;
15
use tor_basic_utils::iter_join;
16
use tor_chanmgr::{ChanMgr, ChanMgrConfig, Dormancy};
17
use tor_config_path::CfgPathResolver;
18
use tor_dirmgr::DirMgrConfig;
19
use tor_keymgr::{ArtiNativeKeystore, KeyMgr, KeyMgrBuilder};
20
use tor_memquota::MemoryQuotaTracker;
21
use tor_netdir::params::NetParameters;
22
use tor_persist::state_dir::StateDirectory;
23
use tor_persist::{FsStateMgr, StateMgr};
24
use tor_rtcompat::{NetStreamProvider, Runtime};
25

            
26
use crate::client::RelayClient;
27
use crate::config::TorRelayConfig;
28

            
29
/// An initialized but unbootstrapped relay.
30
///
31
/// This intentionally does not have access to the runtime to prevent it from doing network io.
32
///
33
/// The idea is that we can build up the relay's components in an `InertTorRelay` without a runtime,
34
/// and then call `init()` on it and provide a runtime to turn it into a network-capable relay.
35
/// This gives us two advantages:
36
///
37
/// - We can initialize the internal data structures in the `InertTorRelay` (load the keystores,
38
///   configure memquota, etc), which leaves `TorRelay` to just "running" the relay (bootstrapping,
39
///   setting up listening sockets, etc). We don't need to combine the initialization and "running
40
///   the relay" all within the same object.
41
/// - We will likely want to share some of arti's key management subcommands in the future.
42
///   arti-client has an `InertTorClient` which is used so that arti subcommands can access the
43
///   keystore. If we do a similar thing here in arti-relay in the future, it might be nice to have
44
///   an `InertTorRelay` which has these internal data structures, but doesn't need a runtime or
45
///   have any networking capabilities.
46
///
47
/// Time will tell if this ends up being a bad design decision in practice, and we can always change
48
/// it later.
49
#[derive(Clone)]
50
pub(crate) struct InertTorRelay {
51
    /// The configuration options for the relay.
52
    config: TorRelayConfig,
53

            
54
    /// The configuration options for the client's directory manager.
55
    dirmgr_config: DirMgrConfig,
56

            
57
    /// Path resolver for expanding variables in [`CfgPath`](tor_config_path::CfgPath)s.
58
    #[expect(unused)] // TODO RELAY remove
59
    path_resolver: CfgPathResolver,
60

            
61
    /// State directory path.
62
    ///
63
    /// The [`StateDirectory`] stored in `state_dir` doesn't seem to have a way of getting the state
64
    /// directory path, so we need to store a copy of the path here.
65
    #[expect(unused)] // TODO RELAY remove
66
    state_path: PathBuf,
67

            
68
    /// Relay's state directory.
69
    #[expect(unused)] // TODO RELAY remove
70
    state_dir: StateDirectory,
71

            
72
    /// Location on disk where we store persistent data.
73
    state_mgr: FsStateMgr,
74

            
75
    /// Key manager holding all relay keys and certificates.
76
    keymgr: Arc<KeyMgr>,
77
}
78

            
79
impl InertTorRelay {
80
    /// Create a new Tor relay with the given configuration.
81
    pub(crate) fn new(
82
        config: TorRelayConfig,
83
        path_resolver: CfgPathResolver,
84
    ) -> anyhow::Result<Self> {
85
        let state_path = config.storage.state_dir(&path_resolver)?;
86
        let cache_path = config.storage.cache_dir(&path_resolver)?;
87

            
88
        let state_dir = StateDirectory::new(&state_path, config.storage.permissions())
89
            .context("Failed to create `StateDirectory`")?;
90
        let state_mgr =
91
            FsStateMgr::from_path_and_mistrust(&state_path, config.storage.permissions())
92
                .context("Failed to create `FsStateMgr`")?;
93

            
94
        // Try to take state ownership early, so we'll know if we have it.
95
        // Note that this `try_lock()` may return `Ok` even if we can't acquire the lock.
96
        // (At this point we don't yet care if we have it.)
97
        let _ignore_status = state_mgr
98
            .try_lock()
99
            .context("Failed to try locking the state manager")?;
100

            
101
        let keymgr = Self::create_keymgr(&state_path, config.storage.permissions())
102
            .context("Failed to create key manager")?;
103

            
104
        let dirmgr_config = DirMgrConfig {
105
            cache_dir: cache_path,
106
            cache_trust: config.storage.permissions().clone(),
107
            network: config.tor_network.clone(),
108
            schedule: Default::default(),
109
            tolerance: Default::default(),
110
            override_net_params: Default::default(),
111
            extensions: Default::default(),
112
        };
113

            
114
        Ok(Self {
115
            config,
116
            dirmgr_config,
117
            path_resolver,
118
            state_path,
119
            state_dir,
120
            state_mgr,
121
            keymgr,
122
        })
123
    }
124

            
125
    /// Connect the [`InertTorRelay`] to the Tor network.
126
    pub(crate) async fn init<R: Runtime>(self, runtime: R) -> anyhow::Result<TorRelay<R>> {
127
        // Attempt to generate any missing keys/cert from the KeyMgr.
128
        let identities = crate::tasks::crypto::try_generate_keys(&runtime, &self.keymgr)
129
            .context("Failed to generate keys")?;
130

            
131
        TorRelay::init(runtime, self, identities).await
132
    }
133

            
134
    /// Create the [key manager](KeyMgr).
135
    fn create_keymgr(state_path: &Path, mistrust: &Mistrust) -> anyhow::Result<Arc<KeyMgr>> {
136
        let key_store_dir = state_path.join("keystore");
137

            
138
        let persistent_store = ArtiNativeKeystore::from_path_and_mistrust(&key_store_dir, mistrust)
139
            .context("Failed to construct the native keystore")?;
140

            
141
        // Should only log fs paths at debug level or lower,
142
        // unless they're part of a diagnostic message.
143
        debug!("Using relay keystore from {key_store_dir:?}");
144

            
145
        let keymgr = KeyMgrBuilder::default()
146
            .primary_store(Box::new(persistent_store))
147
            .build()
148
            .context("Failed to build the 'KeyMgr'")?;
149
        let keymgr = Arc::new(keymgr);
150

            
151
        // TODO: support C-tor keystore
152

            
153
        Ok(keymgr)
154
    }
155
}
156

            
157
/// Represent an active Relay on the Tor network.
158
pub(crate) struct TorRelay<R: Runtime> {
159
    /// Asynchronous runtime object.
160
    runtime: R,
161

            
162
    /// Memory quota tracker.
163
    #[expect(unused)] // TODO RELAY remove
164
    memquota: Arc<MemoryQuotaTracker>,
165

            
166
    /// A "client" used by relays to construct circuits.
167
    client: RelayClient<R>,
168

            
169
    /// Channel manager, used by circuits etc.
170
    chanmgr: Arc<ChanMgr<R>>,
171

            
172
    /// See [`InertTorRelay::keymgr`].
173
    keymgr: Arc<KeyMgr>,
174

            
175
    /// Listening OR ports.
176
    or_listeners: Vec<<R as NetStreamProvider<SocketAddr>>::Listener>,
177
}
178

            
179
impl<R: Runtime> TorRelay<R> {
180
    /// Create a new Tor relay with the given [`runtime`][tor_rtcompat].
181
    ///
182
    /// We use this to initialize components, open sockets, etc.
183
    /// Doing work with these components should happen in [`TorRelay::run()`].
184
    ///
185
    /// Expected to be called from [`InertTorRelay::init()`].
186
    async fn init(
187
        runtime: R,
188
        inert: InertTorRelay,
189
        identities: RelayIdentities,
190
    ) -> anyhow::Result<Self> {
191
        let memquota = MemoryQuotaTracker::new(&runtime, inert.config.system.memory.clone())
192
            .context("Failed to initialize memquota tracker")?;
193

            
194
        let config = ChanMgrConfig::new(inert.config.channel.clone())
195
            .with_my_addrs(inert.config.relay.advertise.all_ips())
196
            .with_identities(Arc::new(identities));
197
        let chanmgr = Arc::new(
198
            ChanMgr::new(
199
                runtime.clone(),
200
                config,
201
                Dormancy::Active,
202
                &NetParameters::default(),
203
                memquota.clone(),
204
            )
205
            .context("Failed to build chan manager")?,
206
        );
207

            
208
        let client = RelayClient::new(
209
            runtime.clone(),
210
            Arc::clone(&chanmgr),
211
            &inert.config,
212
            &inert.config,
213
            inert.dirmgr_config,
214
            inert.state_mgr,
215
        )
216
        .context("Failed to construct the relay's client")?;
217

            
218
        // An iterator of `listen()` futures with some extra error handling.
219
        let or_listeners = inert.config.relay.listen.addrs().map(async |addr| {
220
            match runtime.listen(addr).await {
221
                Ok(x) => Some(Ok(x)),
222
                // If we don't support the address family (typically IPv6), only warn.
223
                #[cfg(unix)]
224
                Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
225
                    let message =
226
                        format!("Could not listen at {addr}: address family not supported");
227
                    if addr.is_ipv6() {
228
                        warn!("{message}");
229
                    } else {
230
                        // If we got `EAFNOSUPPORT` for a non-IPv6 address, then warn louder.
231
                        tor_error::warn_report!(e, "{message}");
232
                    }
233
                    None
234
                }
235
                Err(e) => {
236
                    Some(Err(e).with_context(|| format!("Failed to listen at address {addr}")))
237
                }
238
            }
239
        });
240

            
241
        // We await the futures sequentially rather than with something like `join_all` to make
242
        // errors more reproducible.
243
        let or_listeners = {
244
            let mut awaited_listeners = vec![];
245
            for listener in or_listeners {
246
                match listener.await {
247
                    Some(Ok(x)) => awaited_listeners.push(x),
248
                    Some(Err(e)) => return Err(e),
249
                    None => {}
250
                };
251
            }
252
            awaited_listeners
253
        };
254

            
255
        // Typically we would have returned with an error if we failed to listen on an address,
256
        // but we ignore `EAFNOSUPPORT` errors above, so it's possible that all failed with
257
        // `EAFNOSUPPORT` and we ended up here.
258
        if or_listeners.is_empty() {
259
            return Err(anyhow::anyhow!(
260
                "Could not listen at any OR port addresses: {}",
261
                iter_join(", ", inert.config.relay.listen.addrs()),
262
            ));
263
        }
264

            
265
        Ok(Self {
266
            runtime,
267
            memquota,
268
            client,
269
            chanmgr,
270
            keymgr: inert.keymgr,
271
            or_listeners,
272
        })
273
    }
274

            
275
    /// Run the actual relay.
276
    ///
277
    /// This only returns if something has gone wrong.
278
    /// Otherwise it runs forever.
279
    pub(crate) async fn run(self) -> anyhow::Result<void::Void> {
280
        let mut task_handles = JoinSet::new();
281

            
282
        // Channel housekeeping task.
283
        task_handles.spawn({
284
            let mut t = crate::tasks::ChannelHouseKeepingTask::new(&self.chanmgr);
285
            async move {
286
                t.start()
287
                    .await
288
                    .context("Failed to run channel house keeping task")
289
            }
290
        });
291

            
292
        // Listen for new Tor (OR) connections.
293
        task_handles.spawn({
294
            let runtime = self.runtime.clone();
295
            let chanmgr = Arc::clone(&self.chanmgr);
296
            async {
297
                // TODO: Should we give all tasks a `start` method?
298
                crate::tasks::listeners::or_listener(runtime, chanmgr, self.or_listeners)
299
                    .await
300
                    .context("Failed to run OR listener task")
301
            }
302
        });
303

            
304
        // Start the key rotation tasks.
305
        task_handles.spawn({
306
            let runtime = self.runtime.clone();
307
            let keymgr = self.keymgr.clone();
308
            let chanmgr = self.chanmgr.clone();
309
            async {
310
                crate::tasks::crypto::rotate_keys_task(runtime, keymgr, chanmgr)
311
                    .await
312
                    .context("Failed to run key rotation task")
313
            }
314
        });
315

            
316
        // Launch client tasks.
317
        //
318
        // We need to hold on to these handles until the relay stops, otherwise dropping these
319
        // handles would stop the background tasks.
320
        //
321
        // These are `tor_rtcompat::scheduler::TaskHandle`s, which don't notify us if they
322
        // stop/crash.
323
        //
324
        // TODO: Whose responsibility is it to ensure that these background tasks don't crash?
325
        // Should we have a way of monitoring these tasks? Or should the circuit manager re-launch
326
        // crashed tasks?
327
        let _client_task_handles = self.client.launch_background_tasks();
328

            
329
        // TODO: More tasks will be spawned here.
330

            
331
        // Now that background tasks are started, bootstrap the client.
332
        self.client
333
            .bootstrap()
334
            .await
335
            .context("Failed to bootstrap the relay's client")?;
336

            
337
        // We block until facism is erradicated or a task ends which means the relay will shutdown
338
        // and facism will have one more chance.
339
        let void = task_handles
340
            .join_next()
341
            .await
342
            .context("Relay task set is empty")?
343
            .context("Relay task join failed")?
344
            .context("Relay task stopped unexpectedly")?;
345

            
346
        // We can never get here since a `Void` cannot be constructed.
347
        void::unreachable(void);
348
    }
349

            
350
    /// Access the relay's key manager.
351
    pub(crate) fn keymgr(&self) -> &Arc<KeyMgr> {
352
        &self.keymgr
353
    }
354
}