1
//! Entry point of a Tor relay, that is, the [`TorRelay`] object.
2

            
3
use std::net::SocketAddr;
4
use std::path::{Path, PathBuf};
5
use std::sync::{Arc, Weak};
6

            
7
use anyhow::Context;
8
use tokio::task::JoinSet;
9
use tor_proto::RelayChannelAuthMaterial;
10
use tracing::debug;
11
#[cfg(unix)]
12
use tracing::warn;
13

            
14
use fs_mistrust::Mistrust;
15
use tor_basic_utils::iter_join;
16
use tor_chanmgr::{ChanMgr, ChanMgrConfig, Dormancy};
17
use tor_config_path::CfgPathResolver;
18
use tor_dirmgr::DirMgrConfig;
19
use tor_keymgr::{ArtiNativeKeystore, KeyMgr, KeyMgrBuilder};
20
use tor_memquota::MemoryQuotaTracker;
21
use tor_netdir::params::NetParameters;
22
use tor_persist::state_dir::StateDirectory;
23
use tor_persist::{FsStateMgr, StateMgr};
24
use tor_proto::relay::CreateRequestHandler;
25
use tor_rtcompat::{NetStreamProvider, Runtime};
26

            
27
use crate::client::RelayClient;
28
use crate::config::TorRelayConfig;
29
use crate::tasks::channel::build_circ_net_params;
30

            
31
/// An initialized but unbootstrapped relay.
32
///
33
/// This intentionally does not have access to the runtime to prevent it from doing network io.
34
///
35
/// The idea is that we can build up the relay's components in an `InertTorRelay` without a runtime,
36
/// and then call `init()` on it and provide a runtime to turn it into a network-capable relay.
37
/// This gives us two advantages:
38
///
39
/// - We can initialize the internal data structures in the `InertTorRelay` (load the keystores,
40
///   configure memquota, etc), which leaves `TorRelay` to just "running" the relay (bootstrapping,
41
///   setting up listening sockets, etc). We don't need to combine the initialization and "running
42
///   the relay" all within the same object.
43
/// - We will likely want to share some of arti's key management subcommands in the future.
44
///   arti-client has an `InertTorClient` which is used so that arti subcommands can access the
45
///   keystore. If we do a similar thing here in arti-relay in the future, it might be nice to have
46
///   an `InertTorRelay` which has these internal data structures, but doesn't need a runtime or
47
///   have any networking capabilities.
48
///
49
/// Time will tell if this ends up being a bad design decision in practice, and we can always change
50
/// it later.
51
#[derive(Clone)]
52
pub(crate) struct InertTorRelay {
53
    /// The configuration options for the relay.
54
    config: TorRelayConfig,
55

            
56
    /// The configuration options for the client's directory manager.
57
    dirmgr_config: DirMgrConfig,
58

            
59
    /// Path resolver for expanding variables in [`CfgPath`](tor_config_path::CfgPath)s.
60
    #[expect(unused)] // TODO RELAY remove
61
    path_resolver: CfgPathResolver,
62

            
63
    /// State directory path.
64
    ///
65
    /// The [`StateDirectory`] stored in `state_dir` doesn't seem to have a way of getting the state
66
    /// directory path, so we need to store a copy of the path here.
67
    #[expect(unused)] // TODO RELAY remove
68
    state_path: PathBuf,
69

            
70
    /// Relay's state directory.
71
    #[expect(unused)] // TODO RELAY remove
72
    state_dir: StateDirectory,
73

            
74
    /// Location on disk where we store persistent data.
75
    state_mgr: FsStateMgr,
76

            
77
    /// Key manager holding all relay keys and certificates.
78
    keymgr: Arc<KeyMgr>,
79
}
80

            
81
impl InertTorRelay {
82
    /// Create a new Tor relay with the given configuration.
83
    pub(crate) fn new(
84
        config: TorRelayConfig,
85
        path_resolver: CfgPathResolver,
86
    ) -> anyhow::Result<Self> {
87
        let state_path = config.storage.state_dir(&path_resolver)?;
88
        let cache_path = config.storage.cache_dir(&path_resolver)?;
89

            
90
        let state_dir = StateDirectory::new(&state_path, config.storage.permissions())
91
            .context("Failed to create `StateDirectory`")?;
92
        let state_mgr =
93
            FsStateMgr::from_path_and_mistrust(&state_path, config.storage.permissions())
94
                .context("Failed to create `FsStateMgr`")?;
95

            
96
        // Try to take state ownership early, so we'll know if we have it.
97
        // Note that this `try_lock()` may return `Ok` even if we can't acquire the lock.
98
        // (At this point we don't yet care if we have it.)
99
        let _ignore_status = state_mgr
100
            .try_lock()
101
            .context("Failed to try locking the state manager")?;
102

            
103
        let keymgr = Self::create_keymgr(&state_path, config.storage.permissions())
104
            .context("Failed to create key manager")?;
105

            
106
        let dirmgr_config = DirMgrConfig {
107
            cache_dir: cache_path,
108
            cache_trust: config.storage.permissions().clone(),
109
            network: config.tor_network.clone(),
110
            schedule: Default::default(),
111
            tolerance: Default::default(),
112
            override_net_params: Default::default(),
113
            extensions: Default::default(),
114
        };
115

            
116
        Ok(Self {
117
            config,
118
            dirmgr_config,
119
            path_resolver,
120
            state_path,
121
            state_dir,
122
            state_mgr,
123
            keymgr,
124
        })
125
    }
126

            
127
    /// Connect the [`InertTorRelay`] to the Tor network.
128
    pub(crate) async fn init<R: Runtime>(self, runtime: R) -> anyhow::Result<TorRelay<R>> {
129
        // Attempt to generate any missing keys/cert from the KeyMgr.
130
        let auth_material = crate::tasks::crypto::try_generate_keys(&runtime, &self.keymgr)
131
            .context("Failed to generate keys")?;
132

            
133
        TorRelay::init(runtime, self, auth_material).await
134
    }
135

            
136
    /// Create the [key manager](KeyMgr).
137
    fn create_keymgr(state_path: &Path, mistrust: &Mistrust) -> anyhow::Result<Arc<KeyMgr>> {
138
        let key_store_dir = state_path.join("keystore");
139

            
140
        let persistent_store = ArtiNativeKeystore::from_path_and_mistrust(&key_store_dir, mistrust)
141
            .context("Failed to construct the native keystore")?;
142

            
143
        // Should only log fs paths at debug level or lower,
144
        // unless they're part of a diagnostic message.
145
        debug!("Using relay keystore from {key_store_dir:?}");
146

            
147
        let keymgr = KeyMgrBuilder::default()
148
            .primary_store(Box::new(persistent_store))
149
            .build()
150
            .context("Failed to build the 'KeyMgr'")?;
151
        let keymgr = Arc::new(keymgr);
152

            
153
        // TODO: support C-tor keystore
154

            
155
        Ok(keymgr)
156
    }
157
}
158

            
159
/// Represent an active Relay on the Tor network.
160
pub(crate) struct TorRelay<R: Runtime> {
161
    /// Asynchronous runtime object.
162
    runtime: R,
163

            
164
    /// Memory quota tracker.
165
    #[expect(unused)] // TODO RELAY remove
166
    memquota: Arc<MemoryQuotaTracker>,
167

            
168
    /// A "client" used by relays to construct circuits.
169
    client: RelayClient<R>,
170

            
171
    /// Channel manager, used by circuits etc.
172
    chanmgr: Arc<ChanMgr<R>>,
173

            
174
    /// Handles CREATE* requests on channels.
175
    ///
176
    /// Given to the [`ChanMgr`],
177
    /// which gives it to each channel.
178
    /// We can access this handler directly to update consensus parameters or keys.
179
    create_request_handler: Arc<CreateRequestHandler>,
180

            
181
    /// See [`InertTorRelay::keymgr`].
182
    keymgr: Arc<KeyMgr>,
183

            
184
    /// Listening OR ports.
185
    or_listeners: Vec<<R as NetStreamProvider<SocketAddr>>::Listener>,
186
}
187

            
188
impl<R: Runtime> TorRelay<R> {
189
    /// Create a new Tor relay with the given [`runtime`][tor_rtcompat].
190
    ///
191
    /// We use this to initialize components, open sockets, etc.
192
    /// Doing work with these components should happen in [`TorRelay::run()`].
193
    ///
194
    /// Expected to be called from [`InertTorRelay::init()`].
195
    async fn init(
196
        runtime: R,
197
        inert: InertTorRelay,
198
        auth_material: RelayChannelAuthMaterial,
199
    ) -> anyhow::Result<Self> {
200
        let memquota = MemoryQuotaTracker::new(&runtime, inert.config.system.memory.clone())
201
            .context("Failed to initialize memquota tracker")?;
202

            
203
        // Init the channel manager.
204
        let config = ChanMgrConfig::new(inert.config.channel.clone())
205
            .with_my_addrs(inert.config.relay.advertise.all_addr())
206
            .with_auth_material(Arc::new(auth_material));
207
        let chanmgr = Arc::new(
208
            ChanMgr::new(
209
                runtime.clone(),
210
                config,
211
                Dormancy::Active,
212
                // TODO: It seems wrong to start with the compiled-in defaults when we might have
213
                // a newer network status on disk that would provide a better initial value,
214
                // but `TorClient` does this too so let's not worry about it.
215
                &NetParameters::default(),
216
                memquota.clone(),
217
            )
218
            .context("Failed to build chan manager")?,
219
        );
220

            
221
        // Init the relay's client.
222
        let client = RelayClient::new(
223
            runtime.clone(),
224
            Arc::clone(&chanmgr),
225
            &inert.config,
226
            &inert.config,
227
            inert.dirmgr_config,
228
            inert.state_mgr,
229
        )
230
        .context("Failed to construct the relay's client")?;
231

            
232
        // Circuit-related network status parameters.
233
        let circ_net_params = build_circ_net_params(client.dirmgr().params().as_ref().as_ref())
234
            .context("Failed to build circuit parameters for CREATE* request handler")?;
235

            
236
        // A handler that will process CREATE* requests on channels.
237
        let create_request_handler =
238
            CreateRequestHandler::new(Arc::downgrade(&chanmgr) as Weak<_>, circ_net_params);
239
        let create_request_handler = Arc::new(create_request_handler);
240

            
241
        // Configure the channel manager to handle CREATE* requests.
242
        //
243
        // We do this once, and can later update its network parameters and keys using the
244
        // `Arc` handle that we store.
245
        // The `ChanMgr` will hold an `Arc<CreateRequestHandler>` and
246
        // the `CreateRequestHandler` will hold a `Weak<ChanMgr>`.
247
        //
248
        // We could technically do something fancier by creating the `ChanMgr` and handler
249
        // inside an `Arc::new_cyclic()` and pass the handler as part of the `ChanMgrConfig`,
250
        // but the code becomes a mess.
251
        chanmgr
252
            .set_create_request_handler(Arc::clone(&create_request_handler))
253
            .context("Failed to set the CREATE* request handler")?;
254

            
255
        // An iterator of `listen()` futures with some extra error handling.
256
        let or_listeners = inert.config.relay.listen.addrs().map(async |addr| {
257
            match runtime.listen(addr).await {
258
                Ok(x) => Some(Ok(x)),
259
                // If we don't support the address family (typically IPv6), only warn.
260
                #[cfg(unix)]
261
                Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
262
                    let message =
263
                        format!("Could not listen at {addr}: address family not supported");
264
                    if addr.is_ipv6() {
265
                        warn!("{message}");
266
                    } else {
267
                        // If we got `EAFNOSUPPORT` for a non-IPv6 address, then warn louder.
268
                        tor_error::warn_report!(e, "{message}");
269
                    }
270
                    None
271
                }
272
                Err(e) => {
273
                    Some(Err(e).with_context(|| format!("Failed to listen at address {addr}")))
274
                }
275
            }
276
        });
277

            
278
        // We await the futures sequentially rather than with something like `join_all` to make
279
        // errors more reproducible.
280
        let or_listeners = {
281
            let mut awaited_listeners = vec![];
282
            for listener in or_listeners {
283
                match listener.await {
284
                    Some(Ok(x)) => awaited_listeners.push(x),
285
                    Some(Err(e)) => return Err(e),
286
                    None => {}
287
                };
288
            }
289
            awaited_listeners
290
        };
291

            
292
        // Typically we would have returned with an error if we failed to listen on an address,
293
        // but we ignore `EAFNOSUPPORT` errors above, so it's possible that all failed with
294
        // `EAFNOSUPPORT` and we ended up here.
295
        if or_listeners.is_empty() {
296
            return Err(anyhow::anyhow!(
297
                "Could not listen at any OR port addresses: {}",
298
                iter_join(", ", inert.config.relay.listen.addrs()),
299
            ));
300
        }
301

            
302
        Ok(Self {
303
            runtime,
304
            memquota,
305
            client,
306
            chanmgr,
307
            create_request_handler,
308
            keymgr: inert.keymgr,
309
            or_listeners,
310
        })
311
    }
312

            
313
    /// Run the actual relay.
314
    ///
315
    /// This only returns if something has gone wrong.
316
    /// Otherwise it runs forever.
317
    pub(crate) async fn run(self) -> anyhow::Result<void::Void> {
318
        let mut task_handles = JoinSet::new();
319

            
320
        // Channel housekeeping task.
321
        task_handles.spawn({
322
            let mut t = crate::tasks::ChannelHouseKeepingTask::new(&self.chanmgr);
323
            async move {
324
                t.start()
325
                    .await
326
                    .context("Failed to run channel house keeping task")
327
            }
328
        });
329

            
330
        // Update the CREATE* request handler when there are new network parameters.
331
        task_handles.spawn({
332
            let create_request_handler = Arc::clone(&self.create_request_handler);
333
            let dir_provider = Arc::clone(self.client.dirmgr());
334
            async {
335
                crate::tasks::channel::update_create_request_handler_netparams(
336
                    create_request_handler,
337
                    dir_provider as Arc<_>,
338
                )
339
                .await
340
                .context("Failed to run create request handler update task")
341
            }
342
        });
343

            
344
        // Listen for new Tor (OR) connections.
345
        task_handles.spawn({
346
            let runtime = self.runtime.clone();
347
            let chanmgr = Arc::clone(&self.chanmgr);
348
            async {
349
                // TODO: Should we give all tasks a `start` method?
350
                crate::tasks::listeners::or_listener(runtime, chanmgr, self.or_listeners)
351
                    .await
352
                    .context("Failed to run OR listener task")
353
            }
354
        });
355

            
356
        // Start the key rotation tasks.
357
        task_handles.spawn({
358
            let runtime = self.runtime.clone();
359
            let keymgr = self.keymgr.clone();
360
            let chanmgr = self.chanmgr.clone();
361
            let create_request_handler = Arc::clone(&self.create_request_handler);
362
            async {
363
                crate::tasks::crypto::rotate_keys_task(
364
                    runtime,
365
                    keymgr,
366
                    chanmgr,
367
                    create_request_handler,
368
                )
369
                .await
370
                .context("Failed to run key rotation task")
371
            }
372
        });
373

            
374
        // Launch client tasks.
375
        //
376
        // We need to hold on to these handles until the relay stops, otherwise dropping these
377
        // handles would stop the background tasks.
378
        //
379
        // These are `tor_rtcompat::scheduler::TaskHandle`s, which don't notify us if they
380
        // stop/crash.
381
        //
382
        // TODO: Whose responsibility is it to ensure that these background tasks don't crash?
383
        // Should we have a way of monitoring these tasks? Or should the circuit manager re-launch
384
        // crashed tasks?
385
        let _client_task_handles = self.client.launch_background_tasks();
386

            
387
        // TODO: More tasks will be spawned here.
388

            
389
        // Now that background tasks are started, bootstrap the client.
390
        self.client
391
            .bootstrap()
392
            .await
393
            .context("Failed to bootstrap the relay's client")?;
394

            
395
        // We block until facism is erradicated or a task ends which means the relay will shutdown
396
        // and facism will have one more chance.
397
        let void = task_handles
398
            .join_next()
399
            .await
400
            .context("Relay task set is empty")?
401
            .context("Relay task join failed")?
402
            .context("Relay task stopped unexpectedly")?;
403

            
404
        // We can never get here since a `Void` cannot be constructed.
405
        void::unreachable(void);
406
    }
407

            
408
    /// Access the relay's key manager.
409
    pub(crate) fn keymgr(&self) -> &Arc<KeyMgr> {
410
        &self.keymgr
411
    }
412
}