1
//! Entry point of a Tor relay, that is, the [`InertTorRelay`] and [`TorRelay`] objects.
2

            
3
use std::net::SocketAddr;
4
use std::path::{Path, PathBuf};
5
use std::sync::Arc;
6

            
7
use anyhow::Context;
8
use tokio::task::JoinSet;
9
use tor_proto::RelayIdentities;
10
use tracing::{debug, warn};
11

            
12
use fs_mistrust::Mistrust;
13
use tor_chanmgr::{ChanMgr, ChanMgrConfig, Dormancy};
14
use tor_config_path::CfgPathResolver;
15
use tor_dirmgr::DirMgrConfig;
16
use tor_keymgr::{ArtiNativeKeystore, KeyMgr, KeyMgrBuilder};
17
use tor_memquota::MemoryQuotaTracker;
18
use tor_netdir::params::NetParameters;
19
use tor_persist::state_dir::StateDirectory;
20
use tor_persist::{FsStateMgr, StateMgr};
21
use tor_rtcompat::{NetStreamProvider, Runtime};
22

            
23
use crate::client::RelayClient;
24
use crate::config::TorRelayConfig;
25

            
26
/// An initialized but unbootstrapped relay.
27
///
28
/// This intentionally does not have access to the runtime to prevent it from doing network io.
29
///
30
/// The idea is that we can build up the relay's components in an `InertTorRelay` without a runtime,
31
/// and then call `init()` on it and provide a runtime to turn it into a network-capable relay.
32
/// This gives us two advantages:
33
///
34
/// - We can initialize the internal data structures in the `InertTorRelay` (load the keystores,
35
///   configure memquota, etc), which leaves `TorRelay` to just "running" the relay (bootstrapping,
36
///   setting up listening sockets, etc). We don't need to combine the initialization and "running
37
///   the relay" all within the same object.
38
/// - We will likely want to share some of arti's key management subcommands in the future.
39
///   arti-client has an `InertTorClient` which is used so that arti subcommands can access the
40
///   keystore. If we do a similar thing here in arti-relay in the future, it might be nice to have
41
///   an `InertTorRelay` which has these internal data structures, but doesn't need a runtime or
42
///   have any networking capabilities.
43
///
44
/// Time will tell if this ends up being a bad design decision in practice, and we can always change
45
/// it later.
46
#[derive(Clone)]
47
pub(crate) struct InertTorRelay {
48
    /// The configuration options for the relay.
49
    config: TorRelayConfig,
50

            
51
    /// The configuration options for the client's directory manager.
52
    dirmgr_config: DirMgrConfig,
53

            
54
    /// Path resolver for expanding variables in [`CfgPath`](tor_config_path::CfgPath)s.
55
    #[expect(unused)] // TODO RELAY remove
56
    path_resolver: CfgPathResolver,
57

            
58
    /// State directory path.
59
    ///
60
    /// The [`StateDirectory`] stored in `state_dir` doesn't seem to have a way of getting the state
61
    /// directory path, so we need to store a copy of the path here.
62
    #[expect(unused)] // TODO RELAY remove
63
    state_path: PathBuf,
64

            
65
    /// Relay's state directory.
66
    #[expect(unused)] // TODO RELAY remove
67
    state_dir: StateDirectory,
68

            
69
    /// Location on disk where we store persistent data.
70
    state_mgr: FsStateMgr,
71

            
72
    /// Key manager holding all relay keys and certificates.
73
    keymgr: Arc<KeyMgr>,
74
}
75

            
76
impl InertTorRelay {
77
    /// Create a new Tor relay with the given configuration.
78
    pub(crate) fn new(
79
        config: TorRelayConfig,
80
        path_resolver: CfgPathResolver,
81
    ) -> anyhow::Result<Self> {
82
        let state_path = config.storage.state_dir(&path_resolver)?;
83
        let cache_path = config.storage.cache_dir(&path_resolver)?;
84

            
85
        let state_dir = StateDirectory::new(&state_path, config.storage.permissions())
86
            .context("Failed to create `StateDirectory`")?;
87
        let state_mgr =
88
            FsStateMgr::from_path_and_mistrust(&state_path, config.storage.permissions())
89
                .context("Failed to create `FsStateMgr`")?;
90

            
91
        // Try to take state ownership early, so we'll know if we have it.
92
        // Note that this `try_lock()` may return `Ok` even if we can't acquire the lock.
93
        // (At this point we don't yet care if we have it.)
94
        let _ignore_status = state_mgr
95
            .try_lock()
96
            .context("Failed to try locking the state manager")?;
97

            
98
        let keymgr = Self::create_keymgr(&state_path, config.storage.permissions())
99
            .context("Failed to create key manager")?;
100

            
101
        let dirmgr_config = DirMgrConfig {
102
            cache_dir: cache_path,
103
            cache_trust: config.storage.permissions().clone(),
104
            network: config.tor_network.clone(),
105
            schedule: Default::default(),
106
            tolerance: Default::default(),
107
            override_net_params: Default::default(),
108
            extensions: Default::default(),
109
        };
110

            
111
        Ok(Self {
112
            config,
113
            dirmgr_config,
114
            path_resolver,
115
            state_path,
116
            state_dir,
117
            state_mgr,
118
            keymgr,
119
        })
120
    }
121

            
122
    /// Connect the [`InertTorRelay`] to the Tor network.
123
    pub(crate) async fn init<R: Runtime>(self, runtime: R) -> anyhow::Result<TorRelay<R>> {
124
        // Attempt to generate any missing keys/cert from the KeyMgr.
125
        let identities = crate::tasks::crypto::try_generate_keys(&self.keymgr)
126
            .context("Failed to generate keys")?;
127

            
128
        TorRelay::init(runtime, self, identities).await
129
    }
130

            
131
    /// Create the [key manager](KeyMgr).
132
    fn create_keymgr(state_path: &Path, mistrust: &Mistrust) -> anyhow::Result<Arc<KeyMgr>> {
133
        let key_store_dir = state_path.join("keystore");
134

            
135
        let persistent_store = ArtiNativeKeystore::from_path_and_mistrust(&key_store_dir, mistrust)
136
            .context("Failed to construct the native keystore")?;
137

            
138
        // Should only log fs paths at debug level or lower,
139
        // unless they're part of a diagnostic message.
140
        debug!("Using relay keystore from {key_store_dir:?}");
141

            
142
        let keymgr = KeyMgrBuilder::default()
143
            .primary_store(Box::new(persistent_store))
144
            .build()
145
            .context("Failed to build the 'KeyMgr'")?;
146
        let keymgr = Arc::new(keymgr);
147

            
148
        // TODO: support C-tor keystore
149

            
150
        Ok(keymgr)
151
    }
152
}
153

            
154
/// Represent an active Relay on the Tor network.
155
pub(crate) struct TorRelay<R: Runtime> {
156
    /// Asynchronous runtime object.
157
    runtime: R,
158

            
159
    /// Memory quota tracker.
160
    #[expect(unused)] // TODO RELAY remove
161
    memquota: Arc<MemoryQuotaTracker>,
162

            
163
    /// A "client" used by relays to construct circuits.
164
    client: RelayClient<R>,
165

            
166
    /// Channel manager, used by circuits etc.
167
    chanmgr: Arc<ChanMgr<R>>,
168

            
169
    /// See [`InertTorRelay::keymgr`].
170
    keymgr: Arc<KeyMgr>,
171

            
172
    /// Listening OR ports.
173
    or_listeners: Vec<<R as NetStreamProvider<SocketAddr>>::Listener>,
174
}
175

            
176
impl<R: Runtime> TorRelay<R> {
177
    /// Create a new Tor relay with the given [`runtime`][tor_rtcompat].
178
    ///
179
    /// We use this to initialize components, open sockets, etc.
180
    /// Doing work with these components should happen in [`TorRelay::run()`].
181
    ///
182
    /// Expected to be called from [`InertTorRelay::init()`].
183
    async fn init(
184
        runtime: R,
185
        inert: InertTorRelay,
186
        identities: RelayIdentities,
187
    ) -> anyhow::Result<Self> {
188
        let memquota = MemoryQuotaTracker::new(&runtime, inert.config.system.memory.clone())
189
            .context("Failed to initialize memquota tracker")?;
190

            
191
        let config = ChanMgrConfig::new(inert.config.channel.clone())
192
            .with_my_addrs(inert.config.relay.advertise.all_ips())
193
            .with_identities(Arc::new(identities));
194
        let chanmgr = Arc::new(
195
            ChanMgr::new(
196
                runtime.clone(),
197
                config,
198
                Dormancy::Active,
199
                &NetParameters::default(),
200
                memquota.clone(),
201
            )
202
            .context("Failed to build chan manager")?,
203
        );
204

            
205
        let client = RelayClient::new(
206
            runtime.clone(),
207
            Arc::clone(&chanmgr),
208
            &inert.config,
209
            &inert.config,
210
            inert.dirmgr_config,
211
            inert.state_mgr,
212
        )
213
        .context("Failed to construct the relay's client")?;
214

            
215
        // An iterator of `listen()` futures with some extra error handling.
216
        let or_listeners = inert.config.relay.listen.addrs().map(async |addr| {
217
            match runtime.listen(addr).await {
218
                Ok(x) => Some(Ok(x)),
219
                // If we don't support the address family (typically IPv6), only warn.
220
                #[cfg(unix)]
221
                Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
222
                    let message =
223
                        format!("Could not listen at {addr}: address family not supported");
224
                    if addr.is_ipv6() {
225
                        warn!("{message}");
226
                    } else {
227
                        // If we got `EAFNOSUPPORT` for a non-IPv6 address, then warn louder.
228
                        tor_error::warn_report!(e, "{message}");
229
                    }
230
                    None
231
                }
232
                Err(e) => {
233
                    Some(Err(e).with_context(|| format!("Failed to listen at address {addr}")))
234
                }
235
            }
236
        });
237

            
238
        // We await the futures sequentially rather than with something like `join_all` to make
239
        // errors more reproducible.
240
        let or_listeners = {
241
            let mut awaited_listeners = vec![];
242
            for listener in or_listeners {
243
                match listener.await {
244
                    Some(Ok(x)) => awaited_listeners.push(x),
245
                    Some(Err(e)) => return Err(e),
246
                    None => {}
247
                };
248
            }
249
            awaited_listeners
250
        };
251

            
252
        // Typically we would have returned with an error if we failed to listen on an address,
253
        // but we ignore `EAFNOSUPPORT` errors above, so it's possible that all failed with
254
        // `EAFNOSUPPORT` and we ended up here.
255
        if or_listeners.is_empty() {
256
            return Err(anyhow::anyhow!(
257
                "Could not listen at any OR port addresses: {}",
258
                crate::util::iter_join(", ", inert.config.relay.listen.addrs()),
259
            ));
260
        }
261

            
262
        Ok(Self {
263
            runtime,
264
            memquota,
265
            client,
266
            chanmgr,
267
            keymgr: inert.keymgr,
268
            or_listeners,
269
        })
270
    }
271

            
272
    /// Run the actual relay.
273
    ///
274
    /// This only returns if something has gone wrong.
275
    /// Otherwise it runs forever.
276
    pub(crate) async fn run(self) -> anyhow::Result<void::Void> {
277
        let mut task_handles = JoinSet::new();
278

            
279
        // Channel housekeeping task.
280
        task_handles.spawn({
281
            let mut t = crate::tasks::ChannelHouseKeepingTask::new(&self.chanmgr);
282
            async move {
283
                t.start()
284
                    .await
285
                    .context("Failed to run channel house keeping task")
286
            }
287
        });
288

            
289
        // Listen for new Tor (OR) connections.
290
        task_handles.spawn({
291
            let runtime = self.runtime.clone();
292
            let chanmgr = Arc::clone(&self.chanmgr);
293
            async {
294
                // TODO: Should we give all tasks a `start` method?
295
                crate::tasks::listeners::or_listener(runtime, chanmgr, self.or_listeners)
296
                    .await
297
                    .context("Failed to run OR listener task")
298
            }
299
        });
300

            
301
        // Start the key rotation tasks.
302
        task_handles.spawn({
303
            let runtime = self.runtime.clone();
304
            let keymgr = self.keymgr.clone();
305
            let chanmgr = self.chanmgr.clone();
306
            async {
307
                crate::tasks::crypto::rotate_keys_task(runtime, keymgr, chanmgr)
308
                    .await
309
                    .context("Failed to run key rotation task")
310
            }
311
        });
312

            
313
        // Launch client tasks.
314
        //
315
        // We need to hold on to these handles until the relay stops, otherwise dropping these
316
        // handles would stop the background tasks.
317
        //
318
        // These are `tor_rtcompat::scheduler::TaskHandle`s, which don't notify us if they
319
        // stop/crash.
320
        //
321
        // TODO: Whose responsibility is it to ensure that these background tasks don't crash?
322
        // Should we have a way of monitoring these tasks? Or should the circuit manager re-launch
323
        // crashed tasks?
324
        let _client_task_handles = self.client.launch_background_tasks();
325

            
326
        // TODO: More tasks will be spawned here.
327

            
328
        // Now that background tasks are started, bootstrap the client.
329
        self.client
330
            .bootstrap()
331
            .await
332
            .context("Failed to bootstrap the relay's client")?;
333

            
334
        // We block until facism is erradicated or a task ends which means the relay will shutdown
335
        // and facism will have one more chance.
336
        let void = task_handles
337
            .join_next()
338
            .await
339
            .context("Relay task set is empty")?
340
            .context("Relay task join failed")?
341
            .context("Relay task stopped unexpectedly")?;
342

            
343
        // We can never get here since a `Void` cannot be constructed.
344
        void::unreachable(void);
345
    }
346
}