1
//! Entry point of a Tor relay that is the [`TorRelay`] objects
2

            
3
use std::net::SocketAddr;
4
use std::path::{Path, PathBuf};
5
use std::sync::{Arc, Weak};
6

            
7
use anyhow::Context;
8
use tokio::task::JoinSet;
9
use tracing::debug;
10
#[cfg(unix)]
11
use tracing::warn;
12

            
13
use fs_mistrust::Mistrust;
14
use tor_basic_utils::iter_join;
15
use tor_chanmgr::{ChanMgr, ChanMgrConfig, Dormancy};
16
use tor_config_path::CfgPathResolver;
17
use tor_dirmgr::DirMgrConfig;
18
use tor_keymgr::{ArtiNativeKeystore, KeyMgr, KeyMgrBuilder};
19
use tor_memquota::MemoryQuotaTracker;
20
use tor_netdir::params::NetParameters;
21
use tor_persist::state_dir::StateDirectory;
22
use tor_persist::{FsStateMgr, StateMgr};
23
use tor_proto::relay::CreateRequestHandler;
24
use tor_rtcompat::{NetStreamProvider, Runtime};
25

            
26
use crate::client::RelayClient;
27
use crate::config::TorRelayConfig;
28
use crate::tasks::channel::build_circ_net_params;
29
use crate::tasks::crypto::InitKeyMaterial;
30

            
31
/// An initialized but unbootstrapped relay.
32
///
33
/// This intentionally does not have access to the runtime to prevent it from doing network io.
34
///
35
/// The idea is that we can build up the relay's components in an `InertTorRelay` without a runtime,
36
/// and then call `init()` on it and provide a runtime to turn it into a network-capable relay.
37
/// This gives us two advantages:
38
///
39
/// - We can initialize the internal data structures in the `InertTorRelay` (load the keystores,
40
///   configure memquota, etc), which leaves `TorRelay` to just "running" the relay (bootstrapping,
41
///   setting up listening sockets, etc). We don't need to combine the initialization and "running
42
///   the relay" all within the same object.
43
/// - We will likely want to share some of arti's key management subcommands in the future.
44
///   arti-client has an `InertTorClient` which is used so that arti subcommands can access the
45
///   keystore. If we do a similar thing here in arti-relay in the future, it might be nice to have
46
///   an `InertTorRelay` which has these internal data structures, but doesn't need a runtime or
47
///   have any networking capabilities.
48
///
49
/// Time will tell if this ends up being a bad design decision in practice, and we can always change
50
/// it later.
51
pub(crate) struct InertTorRelay {
52
    /// The configuration options for the relay.
53
    config: TorRelayConfig,
54

            
55
    /// The configuration options for the client's directory manager.
56
    dirmgr_config: DirMgrConfig,
57

            
58
    /// Path resolver for expanding variables in [`CfgPath`](tor_config_path::CfgPath)s.
59
    #[expect(unused)] // TODO RELAY remove
60
    path_resolver: CfgPathResolver,
61

            
62
    /// State directory path.
63
    ///
64
    /// The [`StateDirectory`] stored in `state_dir` doesn't seem to have a way of getting the state
65
    /// directory path, so we need to store a copy of the path here.
66
    #[expect(unused)] // TODO RELAY remove
67
    state_path: PathBuf,
68

            
69
    /// Relay's state directory.
70
    #[expect(unused)] // TODO RELAY remove
71
    state_dir: StateDirectory,
72

            
73
    /// Location on disk where we store persistent data.
74
    state_mgr: FsStateMgr,
75

            
76
    /// Key manager. The ownership is shared between the crypto task and the main task
77
    /// [`TorRelay`].
78
    ///
79
    // NOTE: In a future world, would be great if this wouldn't be an Arc<> and we could move it to
80
    // the crypto task so nobody has access to it. For now, this is the compromise for simplicity.
81
    keymgr: Arc<KeyMgr>,
82
}
83

            
84
impl InertTorRelay {
85
    /// Create a new Tor relay with the given configuration.
86
    pub(crate) fn new(
87
        config: TorRelayConfig,
88
        path_resolver: CfgPathResolver,
89
    ) -> anyhow::Result<Self> {
90
        let state_path = config.storage.state_dir(&path_resolver)?;
91
        let cache_path = config.storage.cache_dir(&path_resolver)?;
92

            
93
        let state_dir = StateDirectory::new(&state_path, config.storage.permissions())
94
            .context("Failed to create `StateDirectory`")?;
95
        let state_mgr =
96
            FsStateMgr::from_path_and_mistrust(&state_path, config.storage.permissions())
97
                .context("Failed to create `FsStateMgr`")?;
98

            
99
        // Try to take state ownership early, so we'll know if we have it.
100
        // Note that this `try_lock()` may return `Ok` even if we can't acquire the lock.
101
        // (At this point we don't yet care if we have it.)
102
        let _ignore_status = state_mgr
103
            .try_lock()
104
            .context("Failed to try locking the state manager")?;
105

            
106
        let keymgr = Self::create_keymgr(&state_path, config.storage.permissions())
107
            .context("Failed to create key manager")?;
108

            
109
        let dirmgr_config = DirMgrConfig {
110
            cache_dir: cache_path,
111
            cache_trust: config.storage.permissions().clone(),
112
            network: config.tor_network.clone(),
113
            schedule: Default::default(),
114
            tolerance: Default::default(),
115
            override_net_params: Default::default(),
116
            extensions: Default::default(),
117
        };
118

            
119
        Ok(Self {
120
            config,
121
            dirmgr_config,
122
            path_resolver,
123
            state_path,
124
            state_dir,
125
            state_mgr,
126
            keymgr,
127
        })
128
    }
129

            
130
    /// Connect the [`InertTorRelay`] to the Tor network.
131
    pub(crate) async fn init<R: Runtime>(self, runtime: R) -> anyhow::Result<TorRelay<R>> {
132
        // Attempt to generate any missing keys/cert from the KeyMgr.
133
        let init_key_material = crate::tasks::crypto::init_keys(&runtime, Arc::clone(&self.keymgr))
134
            .context("Failed to generate keys")?;
135

            
136
        TorRelay::init(runtime, self, init_key_material).await
137
    }
138

            
139
    /// Create the [key manager](KeyMgr).
140
    fn create_keymgr(state_path: &Path, mistrust: &Mistrust) -> anyhow::Result<Arc<KeyMgr>> {
141
        let key_store_dir = state_path.join("keystore");
142

            
143
        let persistent_store = ArtiNativeKeystore::from_path_and_mistrust(&key_store_dir, mistrust)
144
            .context("Failed to construct the native keystore")?;
145

            
146
        // Should only log fs paths at debug level or lower,
147
        // unless they're part of a diagnostic message.
148
        debug!("Using relay keystore from {key_store_dir:?}");
149

            
150
        let keymgr = KeyMgrBuilder::default()
151
            .primary_store(Box::new(persistent_store))
152
            .build()
153
            .context("Failed to build the 'KeyMgr'")?;
154
        let keymgr = Arc::new(keymgr);
155

            
156
        // TODO: support C-tor keystore
157

            
158
        Ok(keymgr)
159
    }
160
}
161

            
162
/// Represent an active Relay on the Tor network.
163
pub(crate) struct TorRelay<R: Runtime> {
164
    /// Asynchronous runtime object.
165
    runtime: R,
166

            
167
    /// Memory quota tracker.
168
    #[expect(unused)] // TODO RELAY remove
169
    memquota: Arc<MemoryQuotaTracker>,
170

            
171
    /// A "client" used by relays to construct circuits.
172
    client: RelayClient<R>,
173

            
174
    /// Channel manager, used by circuits etc.
175
    chanmgr: Arc<ChanMgr<R>>,
176

            
177
    /// Handles CREATE* requests on channels.
178
    ///
179
    /// Given to the [`ChanMgr`],
180
    /// which gives it to each channel.
181
    /// We can access this handler directly to update consensus parameters or keys.
182
    create_request_handler: Arc<CreateRequestHandler>,
183

            
184
    /// See [`InertTorRelay::keymgr`].
185
    keymgr: Arc<KeyMgr>,
186

            
187
    /// Listening OR ports.
188
    or_listeners: Vec<<R as NetStreamProvider<SocketAddr>>::Listener>,
189
}
190

            
191
impl<R: Runtime> TorRelay<R> {
192
    /// Create a new Tor relay with the given [`runtime`][tor_rtcompat].
193
    ///
194
    /// We use this to initialize components, open sockets, etc.
195
    /// Doing work with these components should happen in [`TorRelay::run()`].
196
    ///
197
    /// Expected to be called from [`InertTorRelay::init()`].
198
    async fn init(
199
        runtime: R,
200
        inert: InertTorRelay,
201
        init_key_material: InitKeyMaterial,
202
    ) -> anyhow::Result<Self> {
203
        let memquota = MemoryQuotaTracker::new(&runtime, inert.config.system.memory.clone())
204
            .context("Failed to initialize memquota tracker")?;
205

            
206
        // Init the channel manager.
207
        let config = ChanMgrConfig::new(inert.config.channel.clone())
208
            .with_my_addrs(inert.config.relay.advertise.all_addr())
209
            .with_auth_material(Arc::new(init_key_material.chan_auth_keys));
210
        let chanmgr = Arc::new(
211
            ChanMgr::new(
212
                runtime.clone(),
213
                config,
214
                Dormancy::Active,
215
                // TODO: It seems wrong to start with the compiled-in defaults when we might have
216
                // a newer network status on disk that would provide a better initial value,
217
                // but `TorClient` does this too so let's not worry about it.
218
                &NetParameters::default(),
219
                memquota.clone(),
220
            )
221
            .context("Failed to build chan manager")?,
222
        );
223

            
224
        // Init the relay's client.
225
        let client = RelayClient::new(
226
            runtime.clone(),
227
            Arc::clone(&chanmgr),
228
            &inert.config,
229
            &inert.config,
230
            inert.dirmgr_config,
231
            inert.state_mgr,
232
        )
233
        .context("Failed to construct the relay's client")?;
234

            
235
        // Circuit-related network status parameters.
236
        let circ_net_params = build_circ_net_params(client.dirmgr().params().as_ref().as_ref())
237
            .context("Failed to build circuit parameters for CREATE* request handler")?;
238

            
239
        // A handler that will process CREATE* requests on channels.
240
        let create_request_handler = CreateRequestHandler::new(
241
            Arc::downgrade(&chanmgr) as Weak<_>,
242
            circ_net_params,
243
            init_key_material.ntor_keys,
244
        );
245
        let create_request_handler = Arc::new(create_request_handler);
246

            
247
        // Configure the channel manager to handle CREATE* requests.
248
        //
249
        // We do this once, and can later update its network parameters and keys using the
250
        // `Arc` handle that we store.
251
        // The `ChanMgr` will hold an `Arc<CreateRequestHandler>` and
252
        // the `CreateRequestHandler` will hold a `Weak<ChanMgr>`.
253
        //
254
        // We could technically do something fancier by creating the `ChanMgr` and handler
255
        // inside an `Arc::new_cyclic()` and pass the handler as part of the `ChanMgrConfig`,
256
        // but the code becomes a mess.
257
        chanmgr
258
            .set_create_request_handler(Arc::clone(&create_request_handler))
259
            .context("Failed to set the CREATE* request handler")?;
260

            
261
        // We don't use any custom options on the listening socket.
262
        let listen_options = Default::default();
263

            
264
        // An iterator of `listen()` futures with some extra error handling.
265
        let or_listeners = inert.config.relay.listen.addrs().map(async |addr| {
266
            match runtime.listen(addr, &listen_options).await {
267
                Ok(x) => Some(Ok(x)),
268
                // If we don't support the address family (typically IPv6), only warn.
269
                #[cfg(unix)]
270
                Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
271
                    let message =
272
                        format!("Could not listen at {addr}: address family not supported");
273
                    if addr.is_ipv6() {
274
                        warn!("{message}");
275
                    } else {
276
                        // If we got `EAFNOSUPPORT` for a non-IPv6 address, then warn louder.
277
                        tor_error::warn_report!(e, "{message}");
278
                    }
279
                    None
280
                }
281
                Err(e) => {
282
                    Some(Err(e).with_context(|| format!("Failed to listen at address {addr}")))
283
                }
284
            }
285
        });
286

            
287
        // We await the futures sequentially rather than with something like `join_all` to make
288
        // errors more reproducible.
289
        let or_listeners = {
290
            let mut awaited_listeners = vec![];
291
            for listener in or_listeners {
292
                match listener.await {
293
                    Some(Ok(x)) => awaited_listeners.push(x),
294
                    Some(Err(e)) => return Err(e),
295
                    None => {}
296
                };
297
            }
298
            awaited_listeners
299
        };
300

            
301
        // Typically we would have returned with an error if we failed to listen on an address,
302
        // but we ignore `EAFNOSUPPORT` errors above, so it's possible that all failed with
303
        // `EAFNOSUPPORT` and we ended up here.
304
        if or_listeners.is_empty() {
305
            return Err(anyhow::anyhow!(
306
                "Could not listen at any OR port addresses: {}",
307
                iter_join(", ", inert.config.relay.listen.addrs()),
308
            ));
309
        }
310

            
311
        Ok(Self {
312
            runtime,
313
            memquota,
314
            client,
315
            chanmgr,
316
            create_request_handler,
317
            keymgr: inert.keymgr,
318
            or_listeners,
319
        })
320
    }
321

            
322
    /// Run the actual relay.
323
    ///
324
    /// This only returns if something has gone wrong.
325
    /// Otherwise it runs forever.
326
    pub(crate) async fn run(self) -> anyhow::Result<void::Void> {
327
        let mut task_handles = JoinSet::new();
328

            
329
        // Channel housekeeping task.
330
        task_handles.spawn({
331
            let mut t = crate::tasks::ChannelHouseKeepingTask::new(&self.chanmgr);
332
            async move {
333
                t.start()
334
                    .await
335
                    .context("Failed to run channel house keeping task")
336
            }
337
        });
338

            
339
        // Update the CREATE* request handler when there are new network parameters.
340
        task_handles.spawn({
341
            let create_request_handler = Arc::clone(&self.create_request_handler);
342
            let dir_provider = Arc::clone(self.client.dirmgr());
343
            async {
344
                crate::tasks::channel::update_create_request_handler_netparams(
345
                    create_request_handler,
346
                    dir_provider as Arc<_>,
347
                )
348
                .await
349
                .context("Failed to run create request handler update task")
350
            }
351
        });
352

            
353
        // Listen for new Tor (OR) connections.
354
        task_handles.spawn({
355
            let runtime = self.runtime.clone();
356
            let chanmgr = Arc::clone(&self.chanmgr);
357
            async {
358
                // TODO: Should we give all tasks a `start` method?
359
                crate::tasks::listeners::or_listener(runtime, chanmgr, self.or_listeners)
360
                    .await
361
                    .context("Failed to run OR listener task")
362
            }
363
        });
364

            
365
        // Start the crypto task.
366
        task_handles.spawn({
367
            let reactor = crate::tasks::crypto::Reactor::new(
368
                self.runtime.clone(),
369
                self.chanmgr.clone(),
370
                self.create_request_handler.clone(),
371
                self.keymgr,
372
                self.client.dirmgr().clone(),
373
            )?;
374
            async {
375
                reactor
376
                    .run()
377
                    .await
378
                    .context("Failed to run key rotation task")
379
            }
380
        });
381

            
382
        // Launch client tasks.
383
        //
384
        // We need to hold on to these handles until the relay stops, otherwise dropping these
385
        // handles would stop the background tasks.
386
        //
387
        // These are `tor_rtcompat::scheduler::TaskHandle`s, which don't notify us if they
388
        // stop/crash.
389
        //
390
        // TODO: Whose responsibility is it to ensure that these background tasks don't crash?
391
        // Should we have a way of monitoring these tasks? Or should the circuit manager re-launch
392
        // crashed tasks?
393
        let _client_task_handles = self.client.launch_background_tasks();
394

            
395
        // TODO: More tasks will be spawned here.
396

            
397
        // Now that background tasks are started, bootstrap the client.
398
        self.client
399
            .bootstrap()
400
            .await
401
            .context("Failed to bootstrap the relay's client")?;
402

            
403
        // We block until facism is erradicated or a task ends which means the relay will shutdown
404
        // and facism will have one more chance.
405
        let void = task_handles
406
            .join_next()
407
            .await
408
            .context("Relay task set is empty")?
409
            .context("Relay task join failed")?
410
            .context("Relay task stopped unexpectedly")?;
411

            
412
        // We can never get here since a `Void` cannot be constructed.
413
        void::unreachable(void);
414
    }
415
}