1
//! Entry point of a Tor relay that is the [`TorRelay`] objects
2

            
3
use std::net::SocketAddr;
4
use std::path::{Path, PathBuf};
5
use std::sync::{Arc, Weak};
6

            
7
use anyhow::Context;
8
use tokio::task::JoinSet;
9
use tracing::debug;
10
#[cfg(unix)]
11
use tracing::warn;
12

            
13
use fs_mistrust::Mistrust;
14
use tor_basic_utils::iter_join;
15
use tor_chanmgr::{ChanMgr, ChanMgrConfig, Dormancy};
16
use tor_config_path::CfgPathResolver;
17
use tor_dirmgr::DirMgrConfig;
18
use tor_keymgr::{ArtiNativeKeystore, KeyMgr, KeyMgrBuilder};
19
use tor_memquota::MemoryQuotaTracker;
20
use tor_netdir::params::NetParameters;
21
use tor_persist::state_dir::StateDirectory;
22
use tor_persist::{FsStateMgr, StateMgr};
23
use tor_proto::relay::CreateRequestHandler;
24
use tor_rtcompat::{NetStreamProvider, Runtime};
25

            
26
use crate::client::RelayClient;
27
use crate::config::TorRelayConfig;
28
use crate::stream::RequestFilter;
29
use crate::tasks::channel::build_circ_net_params;
30
use crate::tasks::crypto::InitKeyMaterial;
31

            
32
/// An initialized but unbootstrapped relay.
33
///
34
/// This intentionally does not have access to the runtime to prevent it from doing network io.
35
///
36
/// The idea is that we can build up the relay's components in an `InertTorRelay` without a runtime,
37
/// and then call `init()` on it and provide a runtime to turn it into a network-capable relay.
38
/// This gives us two advantages:
39
///
40
/// - We can initialize the internal data structures in the `InertTorRelay` (load the keystores,
41
///   configure memquota, etc), which leaves `TorRelay` to just "running" the relay (bootstrapping,
42
///   setting up listening sockets, etc). We don't need to combine the initialization and "running
43
///   the relay" all within the same object.
44
/// - We will likely want to share some of arti's key management subcommands in the future.
45
///   arti-client has an `InertTorClient` which is used so that arti subcommands can access the
46
///   keystore. If we do a similar thing here in arti-relay in the future, it might be nice to have
47
///   an `InertTorRelay` which has these internal data structures, but doesn't need a runtime or
48
///   have any networking capabilities.
49
///
50
/// Time will tell if this ends up being a bad design decision in practice, and we can always change
51
/// it later.
52
pub(crate) struct InertTorRelay {
53
    /// The configuration options for the relay.
54
    config: TorRelayConfig,
55

            
56
    /// The configuration options for the client's directory manager.
57
    dirmgr_config: DirMgrConfig,
58

            
59
    /// Path resolver for expanding variables in [`CfgPath`](tor_config_path::CfgPath)s.
60
    #[expect(unused)] // TODO RELAY remove
61
    path_resolver: CfgPathResolver,
62

            
63
    /// State directory path.
64
    ///
65
    /// The [`StateDirectory`] stored in `state_dir` doesn't seem to have a way of getting the state
66
    /// directory path, so we need to store a copy of the path here.
67
    #[expect(unused)] // TODO RELAY remove
68
    state_path: PathBuf,
69

            
70
    /// Relay's state directory.
71
    #[expect(unused)] // TODO RELAY remove
72
    state_dir: StateDirectory,
73

            
74
    /// Location on disk where we store persistent data.
75
    state_mgr: FsStateMgr,
76

            
77
    /// Key manager. The ownership is shared between the crypto task and the main task
78
    /// [`TorRelay`].
79
    ///
80
    // NOTE: In a future world, would be great if this wouldn't be an Arc<> and we could move it to
81
    // the crypto task so nobody has access to it. For now, this is the compromise for simplicity.
82
    keymgr: Arc<KeyMgr>,
83
}
84

            
85
impl InertTorRelay {
86
    /// Create a new Tor relay with the given configuration.
87
    pub(crate) fn new(
88
        config: TorRelayConfig,
89
        path_resolver: CfgPathResolver,
90
    ) -> anyhow::Result<Self> {
91
        let state_path = config.storage.state_dir(&path_resolver)?;
92
        let cache_path = config.storage.cache_dir(&path_resolver)?;
93

            
94
        let state_dir = StateDirectory::new(&state_path, config.storage.permissions())
95
            .context("Failed to create `StateDirectory`")?;
96
        let state_mgr =
97
            FsStateMgr::from_path_and_mistrust(&state_path, config.storage.permissions())
98
                .context("Failed to create `FsStateMgr`")?;
99

            
100
        // Try to take state ownership early, so we'll know if we have it.
101
        // Note that this `try_lock()` may return `Ok` even if we can't acquire the lock.
102
        // (At this point we don't yet care if we have it.)
103
        let _ignore_status = state_mgr
104
            .try_lock()
105
            .context("Failed to try locking the state manager")?;
106

            
107
        let keymgr = Self::create_keymgr(&state_path, config.storage.permissions())
108
            .context("Failed to create key manager")?;
109

            
110
        let dirmgr_config = DirMgrConfig {
111
            cache_dir: cache_path,
112
            cache_trust: config.storage.permissions().clone(),
113
            network: config.tor_network.clone(),
114
            schedule: Default::default(),
115
            tolerance: Default::default(),
116
            override_net_params: Default::default(),
117
            extensions: Default::default(),
118
        };
119

            
120
        Ok(Self {
121
            config,
122
            dirmgr_config,
123
            path_resolver,
124
            state_path,
125
            state_dir,
126
            state_mgr,
127
            keymgr,
128
        })
129
    }
130

            
131
    /// Connect the [`InertTorRelay`] to the Tor network.
132
    pub(crate) async fn init<R: Runtime>(self, runtime: R) -> anyhow::Result<TorRelay<R>> {
133
        // Attempt to generate any missing keys/cert from the KeyMgr.
134
        let init_key_material = crate::tasks::crypto::init_keys(&runtime, Arc::clone(&self.keymgr))
135
            .context("Failed to generate keys")?;
136

            
137
        TorRelay::init(runtime, self, init_key_material).await
138
    }
139

            
140
    /// Create the [key manager](KeyMgr).
141
    fn create_keymgr(state_path: &Path, mistrust: &Mistrust) -> anyhow::Result<Arc<KeyMgr>> {
142
        let key_store_dir = state_path.join("keystore");
143

            
144
        let persistent_store = ArtiNativeKeystore::from_path_and_mistrust(&key_store_dir, mistrust)
145
            .context("Failed to construct the native keystore")?;
146

            
147
        // Should only log fs paths at debug level or lower,
148
        // unless they're part of a diagnostic message.
149
        debug!("Using relay keystore from {key_store_dir:?}");
150

            
151
        let keymgr = KeyMgrBuilder::default()
152
            .primary_store(Box::new(persistent_store))
153
            .build()
154
            .context("Failed to build the 'KeyMgr'")?;
155
        let keymgr = Arc::new(keymgr);
156

            
157
        // TODO: support C-tor keystore
158

            
159
        Ok(keymgr)
160
    }
161
}
162

            
163
/// Represent an active Relay on the Tor network.
164
pub(crate) struct TorRelay<R: Runtime> {
165
    /// Asynchronous runtime object.
166
    runtime: R,
167

            
168
    /// Memory quota tracker.
169
    #[expect(unused)] // TODO RELAY remove
170
    memquota: Arc<MemoryQuotaTracker>,
171

            
172
    /// A "client" used by relays to construct circuits.
173
    client: RelayClient<R>,
174

            
175
    /// Channel manager, used by circuits etc.
176
    chanmgr: Arc<ChanMgr<R>>,
177

            
178
    /// Handles CREATE* requests on channels.
179
    ///
180
    /// Given to the [`ChanMgr`],
181
    /// which gives it to each channel.
182
    /// We can access this handler directly to update consensus parameters or keys.
183
    create_request_handler: Arc<CreateRequestHandler>,
184

            
185
    /// See [`InertTorRelay::keymgr`].
186
    keymgr: Arc<KeyMgr>,
187

            
188
    /// Listening OR ports.
189
    or_listeners: Vec<<R as NetStreamProvider<SocketAddr>>::Listener>,
190
}
191

            
192
impl<R: Runtime> TorRelay<R> {
193
    /// Create a new Tor relay with the given [`runtime`][tor_rtcompat].
194
    ///
195
    /// We use this to initialize components, open sockets, etc.
196
    /// Doing work with these components should happen in [`TorRelay::run()`].
197
    ///
198
    /// Expected to be called from [`InertTorRelay::init()`].
199
    async fn init(
200
        runtime: R,
201
        inert: InertTorRelay,
202
        init_key_material: InitKeyMaterial,
203
    ) -> anyhow::Result<Self> {
204
        let memquota = MemoryQuotaTracker::new(&runtime, inert.config.system.memory.clone())
205
            .context("Failed to initialize memquota tracker")?;
206

            
207
        // Init the channel manager.
208
        let config = ChanMgrConfig::new(inert.config.channel.clone())
209
            .with_my_addrs(inert.config.relay.advertise.all_addr())
210
            .with_auth_material(Arc::new(init_key_material.chan_auth_keys));
211
        let chanmgr = Arc::new(
212
            ChanMgr::new(
213
                runtime.clone(),
214
                config,
215
                Dormancy::Active,
216
                // TODO: It seems wrong to start with the compiled-in defaults when we might have
217
                // a newer network status on disk that would provide a better initial value,
218
                // but `TorClient` does this too so let's not worry about it.
219
                &NetParameters::default(),
220
                memquota.clone(),
221
            )
222
            .context("Failed to build chan manager")?,
223
        );
224

            
225
        // Init the relay's client.
226
        let client = RelayClient::new(
227
            runtime.clone(),
228
            Arc::clone(&chanmgr),
229
            &inert.config,
230
            &inert.config,
231
            inert.dirmgr_config,
232
            inert.state_mgr,
233
        )
234
        .context("Failed to construct the relay's client")?;
235

            
236
        // Circuit-related network status parameters.
237
        let circ_net_params = build_circ_net_params(client.dirmgr().params().as_ref().as_ref())
238
            .context("Failed to build circuit parameters for CREATE* request handler")?;
239

            
240
        // A handler that will process CREATE* requests on channels.
241
        let create_request_handler = CreateRequestHandler::new(
242
            Arc::downgrade(&chanmgr) as Weak<_>,
243
            circ_net_params,
244
            init_key_material.ntor_keys,
245
            Box::new(|| Box::new(RequestFilter::default()) as Box<_>),
246
        );
247
        let create_request_handler = Arc::new(create_request_handler);
248

            
249
        // Configure the channel manager to handle CREATE* requests.
250
        //
251
        // We do this once, and can later update its network parameters and keys using the
252
        // `Arc` handle that we store.
253
        // The `ChanMgr` will hold an `Arc<CreateRequestHandler>` and
254
        // the `CreateRequestHandler` will hold a `Weak<ChanMgr>`.
255
        //
256
        // We could technically do something fancier by creating the `ChanMgr` and handler
257
        // inside an `Arc::new_cyclic()` and pass the handler as part of the `ChanMgrConfig`,
258
        // but the code becomes a mess.
259
        chanmgr
260
            .set_create_request_handler(Arc::clone(&create_request_handler))
261
            .context("Failed to set the CREATE* request handler")?;
262

            
263
        // We don't use any custom options on the listening socket.
264
        let listen_options = Default::default();
265

            
266
        // An iterator of `listen()` futures with some extra error handling.
267
        let or_listeners = inert.config.relay.listen.addrs().map(async |addr| {
268
            match runtime.listen(addr, &listen_options).await {
269
                Ok(x) => Some(Ok(x)),
270
                // If we don't support the address family (typically IPv6), only warn.
271
                #[cfg(unix)]
272
                Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
273
                    let message =
274
                        format!("Could not listen at {addr}: address family not supported");
275
                    if addr.is_ipv6() {
276
                        warn!("{message}");
277
                    } else {
278
                        // If we got `EAFNOSUPPORT` for a non-IPv6 address, then warn louder.
279
                        tor_error::warn_report!(e, "{message}");
280
                    }
281
                    None
282
                }
283
                Err(e) => {
284
                    Some(Err(e).with_context(|| format!("Failed to listen at address {addr}")))
285
                }
286
            }
287
        });
288

            
289
        // We await the futures sequentially rather than with something like `join_all` to make
290
        // errors more reproducible.
291
        let or_listeners = {
292
            let mut awaited_listeners = vec![];
293
            for listener in or_listeners {
294
                match listener.await {
295
                    Some(Ok(x)) => awaited_listeners.push(x),
296
                    Some(Err(e)) => return Err(e),
297
                    None => {}
298
                };
299
            }
300
            awaited_listeners
301
        };
302

            
303
        // Typically we would have returned with an error if we failed to listen on an address,
304
        // but we ignore `EAFNOSUPPORT` errors above, so it's possible that all failed with
305
        // `EAFNOSUPPORT` and we ended up here.
306
        if or_listeners.is_empty() {
307
            return Err(anyhow::anyhow!(
308
                "Could not listen at any OR port addresses: {}",
309
                iter_join(", ", inert.config.relay.listen.addrs()),
310
            ));
311
        }
312

            
313
        Ok(Self {
314
            runtime,
315
            memquota,
316
            client,
317
            chanmgr,
318
            create_request_handler,
319
            keymgr: inert.keymgr,
320
            or_listeners,
321
        })
322
    }
323

            
324
    /// Run the actual relay.
325
    ///
326
    /// This only returns if something has gone wrong.
327
    /// Otherwise it runs forever.
328
    pub(crate) async fn run(self) -> anyhow::Result<void::Void> {
329
        let mut task_handles = JoinSet::new();
330

            
331
        // Channel housekeeping task.
332
        task_handles.spawn({
333
            let mut t = crate::tasks::ChannelHouseKeepingTask::new(&self.chanmgr);
334
            async move {
335
                t.start()
336
                    .await
337
                    .context("Failed to run channel house keeping task")
338
            }
339
        });
340

            
341
        // Update the CREATE* request handler when there are new network parameters.
342
        task_handles.spawn({
343
            let create_request_handler = Arc::clone(&self.create_request_handler);
344
            let dir_provider = Arc::clone(self.client.dirmgr());
345
            async {
346
                crate::tasks::channel::update_create_request_handler_netparams(
347
                    create_request_handler,
348
                    dir_provider as Arc<_>,
349
                )
350
                .await
351
                .context("Failed to run create request handler update task")
352
            }
353
        });
354

            
355
        // Listen for new Tor (OR) connections.
356
        task_handles.spawn({
357
            let runtime = self.runtime.clone();
358
            let chanmgr = Arc::clone(&self.chanmgr);
359
            async {
360
                // TODO: Should we give all tasks a `start` method?
361
                crate::tasks::listeners::or_listener(runtime, chanmgr, self.or_listeners)
362
                    .await
363
                    .context("Failed to run OR listener task")
364
            }
365
        });
366

            
367
        // Start the crypto task.
368
        task_handles.spawn({
369
            let reactor = crate::tasks::crypto::Reactor::new(
370
                self.runtime.clone(),
371
                self.chanmgr.clone(),
372
                self.create_request_handler.clone(),
373
                self.keymgr,
374
                self.client.dirmgr().clone(),
375
            )?;
376
            async {
377
                reactor
378
                    .run()
379
                    .await
380
                    .context("Failed to run key rotation task")
381
            }
382
        });
383

            
384
        // Launch client tasks.
385
        //
386
        // We need to hold on to these handles until the relay stops, otherwise dropping these
387
        // handles would stop the background tasks.
388
        //
389
        // These are `tor_rtcompat::scheduler::TaskHandle`s, which don't notify us if they
390
        // stop/crash.
391
        //
392
        // TODO: Whose responsibility is it to ensure that these background tasks don't crash?
393
        // Should we have a way of monitoring these tasks? Or should the circuit manager re-launch
394
        // crashed tasks?
395
        let _client_task_handles = self.client.launch_background_tasks();
396

            
397
        // TODO: More tasks will be spawned here.
398

            
399
        // Now that background tasks are started, bootstrap the client.
400
        self.client
401
            .bootstrap()
402
            .await
403
            .context("Failed to bootstrap the relay's client")?;
404

            
405
        // We block until facism is erradicated or a task ends which means the relay will shutdown
406
        // and facism will have one more chance.
407
        let void = task_handles
408
            .join_next()
409
            .await
410
            .context("Relay task set is empty")?
411
            .context("Relay task join failed")?
412
            .context("Relay task stopped unexpectedly")?;
413

            
414
        // We can never get here since a `Void` cannot be constructed.
415
        void::unreachable(void);
416
    }
417
}