1
//! Handler for CREATE* cells.
2

            
3
use crate::FlowCtrlParameters;
4
use crate::ccparams::{
5
    Algorithm, AlgorithmDiscriminants, CongestionControlParams, CongestionWindowParams,
6
    FixedWindowParams, RoundTripEstimatorParams, VegasParams,
7
};
8
use crate::channel::Channel;
9
use crate::circuit::CircuitRxSender;
10
use crate::circuit::UniqId;
11
use crate::circuit::celltypes::{CreateRequest, CreateResponse};
12
use crate::circuit::circhop::{HopNegotiationType, HopSettings};
13
use crate::client::circuit::CircParameters;
14
use crate::client::circuit::padding::PaddingController;
15
use crate::crypto::binding::CircuitBinding;
16
use crate::crypto::cell::CryptInit as _;
17
use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer, RelayLayer, tor1};
18
use crate::crypto::handshake::RelayHandshakeError;
19
use crate::crypto::handshake::ServerHandshake as _;
20
use crate::crypto::handshake::fast::CreateFastServer;
21
use crate::crypto::handshake::ntor::{NtorSecretKey, NtorServer};
22
use crate::memquota::SpecificAccount as _;
23
use crate::memquota::{ChannelAccount, CircuitAccount};
24
use crate::relay::channel_provider::ChannelProvider;
25
use crate::relay::reactor::Reactor;
26
use crate::relay::{IncomingStreamRequestFilter, RelayCirc};
27
use smallvec::SmallVec;
28
use std::sync::{Arc, RwLock, Weak};
29
use tor_cell::chancell::ChanMsg as _;
30
use tor_cell::chancell::CircId;
31
use tor_cell::chancell::msg::{
32
    CreateFast, Created2, CreatedFast, Destroy, DestroyReason, HandshakeType,
33
};
34
use tor_error::{Bug, ErrorKind, HasKind, debug_report, internal, into_internal};
35
use tor_linkspec::OwnedChanTarget;
36
use tor_llcrypto::cipher::aes::Aes128Ctr;
37
use tor_llcrypto::d::Sha1;
38
use tor_llcrypto::pk::ed25519::Ed25519Identity;
39
use tor_llcrypto::pk::rsa::RsaIdentity;
40
use tor_memquota::mq_queue::ChannelSpec as _;
41
use tor_memquota::mq_queue::MpscSpec;
42
use tor_relay_crypto::pk::{RelayNtorKeypair, RelayNtorKeys};
43
use tor_rtcompat::SpawnExt as _;
44
use tor_rtcompat::{DynTimeProvider, Runtime};
45
use tracing::warn;
46

            
47
/// Everything needed to handle CREATE* messages on channels.
48
#[derive(derive_more::Debug)]
49
pub struct CreateRequestHandler {
50
    /// Something that can launch channels. Typically the `ChanMgr`.
51
    chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
52
    /// Circuit-related network parameters.
53
    circ_net_params: RwLock<CircNetParameters>,
54
    /// The circuit extension keys.
55
    #[debug(skip)]
56
    ntor_keys: RwLock<RelayNtorKeys>,
57
    /// An [`IncomingStreamRequestFilter`] factory for checking whether the user wants
58
    /// this request, or wants to reject it immediately.
59
    ///
60
    /// Used for obtaining a current [`IncomingStreamRequestFilter`]
61
    /// for building a circuit reactor.
62
    //
63
    // TODO(relay): it's likely this will end up changing quite a bit once we start
64
    // figuring out exactly how the config/reconfigure() logic and IncomingStreamRequestFilter
65
    // should function for relays.
66
    #[debug(skip)]
67
    incoming_filter_factory: Box<dyn IncomingStreamRequestFilterFactory + Send + Sync>,
68
}
69

            
70
impl CreateRequestHandler {
71
    /// Build a new [`CreateRequestHandler`].
72
22
    pub fn new(
73
22
        chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
74
22
        circ_net_params: CircNetParameters,
75
22
        ntor_keys: RelayNtorKeys,
76
22
        incoming_filter_factory: Box<dyn IncomingStreamRequestFilterFactory + Send + Sync>,
77
22
    ) -> Self {
78
22
        Self {
79
22
            chan_provider,
80
22
            circ_net_params: RwLock::new(circ_net_params),
81
22
            ntor_keys: RwLock::new(ntor_keys),
82
22
            incoming_filter_factory,
83
22
        }
84
22
    }
85

            
86
    /// Update the circuit parameters from a network consensus.
87
    pub fn update_params(&self, circ_net_params: CircNetParameters) {
88
        *self.circ_net_params.write().expect("rwlock poisoned") = circ_net_params;
89
    }
90

            
91
    /// Update the handler with a new set of circuit extension keys.
92
    ///
93
    /// This is called periodically by the relay key rotation task.
94
    pub fn update_ntor_keys(&self, ntor_keys: RelayNtorKeys) {
95
        *self.ntor_keys.write().expect("rwlock poisoned") = ntor_keys;
96
    }
97

            
98
    /// Handle a CREATE* cell.
99
    ///
100
    /// This intentionally does not return a [`crate::Error`] so that we don't accidentally shut
101
    /// down the channel reactor when we really should be returning a DESTROY. Shutting down a
102
    /// channel may cause us to leak information about paths of circuits travelling through this
103
    /// relay. This is especially important here since we're handling data that is controllable from
104
    /// the other end of the circuit.
105
    #[allow(clippy::too_many_arguments)]
106
    pub(crate) fn handle_create<R: Runtime>(
107
        &self,
108
        runtime: &R,
109
        channel: &Arc<Channel>,
110
        our_ed25519_id: &Ed25519Identity,
111
        our_rsa_id: &RsaIdentity,
112
        circ_id: CircId,
113
        msg: &CreateRequest,
114
        memquota: &ChannelAccount,
115
        circ_unique_id: UniqId,
116
    ) -> Result<(CreateResponse, RelayCircComponents), Destroy> {
117
        let result = self.handle_create_inner(
118
            runtime,
119
            channel,
120
            our_ed25519_id,
121
            our_rsa_id,
122
            circ_id,
123
            msg,
124
            memquota,
125
            circ_unique_id,
126
        );
127

            
128
        match result {
129
            Ok(x) => Ok(x),
130
            Err(e) => {
131
                // TODO(relay): The log messages throughout could be very noisy, so should have rate limiting.
132
                let cmd = msg.cmd();
133
                debug_report!(&e, %cmd, "Failed to handle circuit create request");
134

            
135
                // `tor-spec/tearing-down-circuits.md`:
136
                //
137
                // > Implementations SHOULD always use the NONE reason to avoid side channels: [...]
138
                Err(Destroy::new(DestroyReason::NONE))
139
            }
140
        }
141
    }
142

            
143
    /// See [`Self::handle_create`].
144
    #[allow(clippy::too_many_arguments)]
145
    fn handle_create_inner<R: Runtime>(
146
        &self,
147
        runtime: &R,
148
        channel: &Arc<Channel>,
149
        our_ed25519_id: &Ed25519Identity,
150
        our_rsa_id: &RsaIdentity,
151
        circ_id: CircId,
152
        msg: &CreateRequest,
153
        memquota: &ChannelAccount,
154
        circ_unique_id: UniqId,
155
    ) -> Result<(CreateResponse, RelayCircComponents), HandleCreateError> {
156
        // Perform the handshake crypto and build the response.
157
        let handshake_components = match msg {
158
            CreateRequest::CreateFast(msg) => self.handle_create_fast(msg)?,
159
            CreateRequest::Create2(msg) => match msg.handshake_type() {
160
                HandshakeType::NTOR_V3 => self.handle_create2_ntorv3(msg.body(), our_ed25519_id)?,
161
                HandshakeType::NTOR => self.handle_create2_ntor(msg.body(), our_rsa_id)?,
162
                x @ HandshakeType::TAP | x => {
163
                    return Err(HandleCreateError::Create2HandshakeType(x));
164
                }
165
            },
166
        };
167

            
168
        let memquota = CircuitAccount::new(memquota)?;
169

            
170
        // We use a large mpsc queue here since a circuit should never block the channel,
171
        // and we hope that memquota will help us if an attacker intentionally fills this buffer.
172
        // We use `10_000_000` since `usize::MAX` causes `futures::channel::mpsc` to panic.
173
        // TODO(relay): We should switch to an unbounded queue, but the circuit reactor is expecting
174
        // a bounded queue.
175
        let time_provider = DynTimeProvider::new(runtime.clone());
176
        let account = memquota.as_raw_account();
177
        let (sender, receiver) =
178
            MpscSpec::new(10_000_000).new_mq(time_provider.clone(), account)?;
179
        let (sender, receiver) = crate::circuit::circ_sender::channel(sender, receiver);
180

            
181
        // TODO(relay): Do we really want a client padding machine here?
182
        let (padding_ctrl, padding_stream) =
183
            crate::client::circuit::padding::new_padding(DynTimeProvider::new(runtime.clone()));
184

            
185
        // Upgrade the channel provider, which in practice is the `ChanMgr` so this should not fail.
186
        let Some(chan_provider) = self.chan_provider.upgrade() else {
187
            return Err(internal!("Unable to upgrade weak `ChannelProvider`").into());
188
        };
189

            
190
        // Create an IncomingStreamRequestFilter for this circuit.
191
        // This will get applied to every stream request (BEGIN, BEGIN_DIR, RESOLVE)
192
        // arriving on the circuit.
193
        //
194
        // Note: once built, a circuit reactor's IncomingStreamRequestFilter cannot be changed
195
        // (it's fixed for the entire duration of the circuit).
196
        let incoming_filter = self.incoming_filter_factory.current_filter();
197

            
198
        // Build the relay circuit reactor.
199
        let (reactor, circ, _incoming_streams) = Reactor::new(
200
            runtime.clone(),
201
            channel,
202
            circ_id,
203
            circ_unique_id,
204
            receiver,
205
            handshake_components.crypto_in,
206
            handshake_components.crypto_out,
207
            &handshake_components.hop_settings,
208
            chan_provider,
209
            padding_ctrl.clone(),
210
            padding_stream,
211
            incoming_filter,
212
            &memquota,
213
        )
214
        .map_err(into_internal!("Failed to start circuit reactor"))?;
215

            
216
        // TODO(relay): send the incoming_streams stream to the handler in arti-relay
217

            
218
        // Start the reactor in a task.
219
        let () = runtime.spawn(async {
220
            match reactor.run().await {
221
                Ok(()) => {}
222
                Err(e) => {
223
                    debug_report!(e, "Relay circuit reactor exited with an error");
224
                }
225
            }
226
        })?;
227

            
228
        Ok((
229
            handshake_components.response,
230
            RelayCircComponents {
231
                circ,
232
                sender,
233
                padding_ctrl,
234
            },
235
        ))
236
    }
237

            
238
    /// The handshake code for a CREATE_FAST request.
239
    fn handle_create_fast(
240
        &self,
241
        msg: &CreateFast,
242
    ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
243
        // TODO(relay): We might want to offload this to a CPU worker in the future.
244
        let (keygen, handshake_msg) = CreateFastServer::server(
245
            &mut rand::rng(),
246
            // The CREATE_FAST handshake doesn't accept or return extensions,
247
            // so this `AuxDataReply` is a no-op.
248
            &mut |_: &()| Some(()),
249
            // The CREATE_FAST handshake doesn't use any keys.
250
            &[()],
251
            msg.handshake(),
252
        )?;
253

            
254
        let crypt = tor1::CryptStatePair::<Aes128Ctr, Sha1>::construct(keygen)
255
            .map_err(into_internal!("Circuit crypt state construction failed"))?;
256

            
257
        let circ_params = self
258
            .circ_net_params
259
            .read()
260
            .expect("rwlock poisoned")
261
            // CREATE_FAST always uses fixed-window flow control.
262
            .as_circ_parameters(AlgorithmDiscriminants::FixedWindow)?;
263

            
264
        // TODO(relay): I don't think that this is the right way to do this. It works for
265
        // CREATE_FAST, but we might want to rethink it for CREATE2.
266
        let protos = tor_protover::Protocols::default();
267
        let hop_settings =
268
            HopSettings::from_params_and_caps(HopNegotiationType::None, &circ_params, &protos)
269
                .map_err(into_internal!("Unable to build `HopSettings`"))?;
270

            
271
        let response = CreatedFast::new(handshake_msg);
272
        let response = CreateResponse::CreatedFast(response);
273

            
274
        let (crypto_out, crypto_in, _binding) = split_relay_layer(crypt);
275

            
276
        Ok(CompletedHandshakeComponents {
277
            response,
278
            hop_settings,
279
            crypto_out,
280
            crypto_in,
281
        })
282
    }
283

            
284
    /// The handshake code for a CREATE2 ntor (non-v3) request.
285
    fn handle_create2_ntor(
286
        &self,
287
        msg_body: &[u8],
288
        our_rsa_id: &RsaIdentity,
289
    ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
290
        let ntor_keys = self.ntor_keys(|k| {
291
            NtorSecretKey::new(k.secret().clone(), *k.public().inner(), *our_rsa_id)
292
        });
293

            
294
        // TODO(relay): We might want to offload this to a CPU worker in the future.
295
        let (keygen, handshake_msg) = NtorServer::server(
296
            &mut rand::rng(),
297
            // The ntor (non-v3) handshake doesn't accept or return extensions,
298
            // so this `AuxDataReply` is a no-op.
299
            &mut |_: &()| Some(()),
300
            ntor_keys.as_ref(),
301
            msg_body,
302
        )?;
303

            
304
        let crypt = tor1::CryptStatePair::<Aes128Ctr, Sha1>::construct(keygen)
305
            .map_err(into_internal!("Circuit crypt state construction failed"))?;
306

            
307
        let (crypto_out, crypto_in, _binding) = split_relay_layer(crypt);
308

            
309
        let circ_params = self
310
            .circ_net_params
311
            .read()
312
            .expect("rwlock poisoned")
313
            // CREATE2 with ntor (non-v3) always uses fixed-window flow control.
314
            .as_circ_parameters(AlgorithmDiscriminants::FixedWindow)?;
315

            
316
        // TODO(relay): I don't think that this is the right way to do this. It works for
317
        // ntor, but won't work well for ntor-v3.
318
        let protos = tor_protover::Protocols::default();
319
        let hop_settings =
320
            HopSettings::from_params_and_caps(HopNegotiationType::None, &circ_params, &protos)
321
                .map_err(into_internal!("Unable to build `HopSettings`"))?;
322

            
323
        let response = Created2::new(handshake_msg);
324
        let response = CreateResponse::Created2(response);
325

            
326
        Ok(CompletedHandshakeComponents {
327
            response,
328
            hop_settings,
329
            crypto_out,
330
            crypto_in,
331
        })
332
    }
333

            
334
    /// The handshake code for a CREATE2 ntor-v3 request.
335
    fn handle_create2_ntorv3(
336
        &self,
337
        _msg_body: &[u8],
338
        _our_ed25519_id: &Ed25519Identity,
339
    ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
340
        Err(HandleCreateError::Create2HandshakeType(
341
            HandshakeType::NTOR_V3,
342
        ))
343
    }
344

            
345
    /// Helper to get the ntor keypairs after some transformation `map`.
346
    ///
347
    /// The `map` transformation must be fast since it blocks a read lock.
348
    /// The returned keys are sorted with the most recent key first.
349
    ///
350
    /// It would be nice if this just returned an iterator,
351
    /// but the read lock prevents this.
352
    fn ntor_keys<T>(&self, map: impl FnMut(&RelayNtorKeypair) -> T) -> impl AsRef<[T]> {
353
        let ntor_keys = self.ntor_keys.read().expect("rwlock poisoned");
354
        let ntor_keys = [Some(ntor_keys.latest()), ntor_keys.previous()];
355
        ntor_keys
356
            .into_iter()
357
            .flatten()
358
            .map(map)
359
            .collect::<SmallVec<[T; 2]>>()
360
    }
361
}
362

            
363
/// Helper function to split a `RelayLayer` into forward and backward type-erased trait objects.
364
fn split_relay_layer<F, B>(
365
    crypt: impl RelayLayer<F, B>,
366
) -> (
367
    Box<dyn OutboundRelayLayer + Send>,
368
    Box<dyn InboundRelayLayer + Send>,
369
    CircuitBinding,
370
)
371
where
372
    F: OutboundRelayLayer + Send + 'static,
373
    B: InboundRelayLayer + Send + 'static,
374
{
375
    let (crypto_out, crypto_in, binding) = crypt.split_relay_layer();
376
    let (crypto_out, crypto_in) = (Box::new(crypto_out), Box::new(crypto_in));
377

            
378
    (crypto_out, crypto_in, binding)
379
}
380

            
381
/// An error that occurred while handling a CREATE* request.
382
#[derive(Debug, thiserror::Error)]
383
enum HandleCreateError {
384
    /// Circuit relay handshake failed.
385
    #[error("Circuit relay handshake failed")]
386
    Handshake(#[from] RelayHandshakeError),
387
    /// The requested handshake type is unsupported.
388
    #[error("Unsupported handshake type {0}")]
389
    Create2HandshakeType(HandshakeType),
390
    /// A memquota error.
391
    #[error("Memquota error")]
392
    Memquota(#[from] tor_memquota::Error),
393
    /// Error when spawning a task.
394
    #[error("Runtime task spawn error")]
395
    Spawn(#[from] futures::task::SpawnError),
396
    /// An internal error.
397
    ///
398
    /// Note that other variants (such as `Handshake` containing a [`RelayHandshakeError`])
399
    /// may themselves contain internal errors.
400
    #[error("Internal error")]
401
    Internal(#[from] tor_error::Bug),
402
}
403

            
404
impl HasKind for HandleCreateError {
405
    fn kind(&self) -> ErrorKind {
406
        match self {
407
            Self::Handshake(e) => e.kind(),
408
            Self::Create2HandshakeType(_) => ErrorKind::NotImplemented,
409
            Self::Memquota(e) => e.kind(),
410
            Self::Spawn(e) => e.kind(),
411
            Self::Internal(_) => ErrorKind::Internal,
412
        }
413
    }
414
}
415

            
416
/// The components of a completed CREATE* handshake.
417
struct CompletedHandshakeComponents {
418
    /// The message to send in response.
419
    response: CreateResponse,
420
    /// The negotiated hop settings.
421
    hop_settings: HopSettings,
422
    /// Outbound onion crypto.
423
    crypto_out: Box<dyn OutboundRelayLayer + Send>,
424
    /// Inbound onion crypto.
425
    crypto_in: Box<dyn InboundRelayLayer + Send>,
426
}
427

            
428
/// A collection of objects built for a new relay circuit.
429
pub(crate) struct RelayCircComponents {
430
    /// The relay circuit handle.
431
    pub(crate) circ: Arc<RelayCirc>,
432
    /// Used to send data from the channel to the circuit reactor.
433
    pub(crate) sender: CircuitRxSender,
434
    /// The circuit's padding controller.
435
    pub(crate) padding_ctrl: PaddingController,
436
}
437

            
438
/// Congestion control network parameters.
439
#[derive(Debug, Clone)]
440
#[allow(clippy::exhaustive_structs)]
441
pub struct CongestionControlNetParams {
442
    /// Fixed-window algorithm parameters.
443
    pub fixed_window: FixedWindowParams,
444

            
445
    /// Vegas algorithm parameters for exit circuits.
446
    // NOTE: In this module we are handling CREATE* cells,
447
    // which only happens for non-hs circuits.
448
    // So we don't need to store the vegas hs parameters here.
449
    pub vegas_exit: VegasParams,
450

            
451
    /// Congestion window parameters.
452
    pub cwnd: CongestionWindowParams,
453

            
454
    /// RTT calculation parameters.
455
    pub rtt: RoundTripEstimatorParams,
456

            
457
    /// Flow control parameters to use for all streams on this circuit.
458
    pub flow_ctrl: FlowCtrlParameters,
459
}
460

            
461
impl CongestionControlNetParams {
462
    #[cfg(test)]
463
    // These have been copied from C-tor.
464
22
    pub(crate) fn defaults_for_tests() -> Self {
465
22
        Self {
466
22
            fixed_window: FixedWindowParams::defaults_for_tests(),
467
22
            vegas_exit: VegasParams::defaults_for_tests(),
468
22
            cwnd: CongestionWindowParams::defaults_for_tests(),
469
22
            rtt: RoundTripEstimatorParams::defaults_for_tests(),
470
22
            flow_ctrl: FlowCtrlParameters::defaults_for_tests(),
471
22
        }
472
22
    }
473
}
474

            
475
/// Network consensus parameters for handling incoming circuits.
476
///
477
/// Unlike `CircParameters`,
478
/// this is unopinionated and contains all relevant consensus parameters,
479
/// which is needed when handling an incoming CREATE* request where the
480
/// circuit origin chooses the type/settings
481
/// (for example congestion control type) of the circuit.
482
#[derive(Debug, Clone)]
483
#[allow(clippy::exhaustive_structs)]
484
pub struct CircNetParameters {
485
    /// Whether we should include ed25519 identities when we send EXTEND2 cells.
486
    pub extend_by_ed25519_id: bool,
487

            
488
    /// Congestion control network parameters.
489
    pub cc: CongestionControlNetParams,
490
}
491

            
492
impl CircNetParameters {
493
    /// Convert the [`CircNetParameters`] into a [`CircParameters`].
494
    ///
495
    /// We expect the circuit creation handshake to know what congestion control algorithm was
496
    /// negotiated, and provide that as `algorithm`.
497
    //
498
    // We disable `unused` warnings at the root of tor-proto,
499
    // but it's nice to have here so we re-enable it.
500
    #[warn(unused)]
501
    fn as_circ_parameters(&self, algorithm: AlgorithmDiscriminants) -> Result<CircParameters, Bug> {
502
        // Unpack everything to make sure that we aren't missing anything
503
        // (otherwise clippy would warn).
504
        let Self {
505
            extend_by_ed25519_id,
506
            cc:
507
                CongestionControlNetParams {
508
                    fixed_window,
509
                    vegas_exit,
510
                    cwnd,
511
                    rtt,
512
                    flow_ctrl,
513
                },
514
        } = self;
515

            
516
        let algorithm = match algorithm {
517
            AlgorithmDiscriminants::FixedWindow => Algorithm::FixedWindow(*fixed_window),
518
            AlgorithmDiscriminants::Vegas => Algorithm::Vegas(*vegas_exit),
519
        };
520

            
521
        // TODO(arti#2442): The builder pattern here seems like a footgun.
522
        let cc = CongestionControlParams::builder()
523
            .alg(algorithm)
524
            .fixed_window_params(*fixed_window)
525
            .cwnd_params(*cwnd)
526
            .rtt_params(rtt.clone())
527
            .build()
528
            .map_err(into_internal!("Could not build `CongestionControlParams`"))?;
529

            
530
        Ok(CircParameters::new(
531
            *extend_by_ed25519_id,
532
            cc,
533
            flow_ctrl.clone(),
534
        ))
535
    }
536
}
537

            
538
/// An [`IncomingStreamRequestFilter`] factory for building [`IncomingStreamRequestFilter`]s.
539
///
540
/// Each time a new circuit is opened, the [`CreateRequestHandler`] calls
541
/// [`IncomingStreamRequestFilterFactory::current_filter`] to build
542
/// an [`IncomingStreamRequestFilter`] for the circuit.
543
pub trait IncomingStreamRequestFilterFactory {
544
    /// Return the [`IncomingStreamRequestFilter`] to apply to the incoming stream requests
545
    /// arriving on a circuit.
546
    fn current_filter(&self) -> Box<dyn IncomingStreamRequestFilter>;
547
}
548

            
549
impl<F> IncomingStreamRequestFilterFactory for F
550
where
551
    F: Fn() -> Box<dyn IncomingStreamRequestFilter>,
552
{
553
    fn current_filter(&self) -> Box<dyn IncomingStreamRequestFilter> {
554
        (self)()
555
    }
556
}