1
//! Handler for CREATE* cells.
2

            
3
use crate::FlowCtrlParameters;
4
use crate::ccparams::{
5
    Algorithm, AlgorithmDiscriminants, CongestionControlParams, CongestionWindowParams,
6
    FixedWindowParams, RoundTripEstimatorParams, VegasParams,
7
};
8
use crate::channel::Channel;
9
use crate::circuit::CircuitRxSender;
10
use crate::circuit::UniqId;
11
use crate::circuit::celltypes::{CreateRequest, CreateResponse};
12
use crate::circuit::circhop::{HopNegotiationType, HopSettings};
13
use crate::client::circuit::CircParameters;
14
use crate::client::circuit::padding::PaddingController;
15
use crate::crypto::cell::CryptInit as _;
16
use crate::crypto::cell::RelayLayer as _;
17
use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer, tor1};
18
use crate::crypto::handshake::RelayHandshakeError;
19
use crate::crypto::handshake::ServerHandshake as _;
20
use crate::crypto::handshake::fast::CreateFastServer;
21
use crate::memquota::SpecificAccount as _;
22
use crate::memquota::{ChannelAccount, CircuitAccount};
23
use crate::relay::RelayCirc;
24
use crate::relay::channel_provider::ChannelProvider;
25
use crate::relay::reactor::Reactor;
26
use smallvec::{SmallVec, smallvec};
27
use std::sync::{Arc, RwLock, Weak};
28
use tor_cell::chancell::ChanMsg as _;
29
use tor_cell::chancell::CircId;
30
use tor_cell::chancell::msg::{CreateFast, CreatedFast, Destroy, DestroyReason};
31
use tor_error::{Bug, ErrorKind, HasKind, debug_report, internal, into_internal};
32
use tor_linkspec::OwnedChanTarget;
33
use tor_llcrypto::cipher::aes::Aes128Ctr;
34
use tor_llcrypto::d::Sha1;
35
use tor_llcrypto::pk::ed25519::Ed25519Identity;
36
use tor_memquota::mq_queue::ChannelSpec as _;
37
use tor_memquota::mq_queue::MpscSpec;
38
use tor_relay_crypto::pk::RelayNtorKeypair;
39
use tor_rtcompat::SpawnExt as _;
40
use tor_rtcompat::{DynTimeProvider, Runtime};
41
use tracing::warn;
42

            
43
/// The usual number of ntor keys.
44
const NTOR_KEY_COUNT: usize = 2;
45

            
46
/// Everything needed to handle CREATE* messages on channels.
47
#[derive(derive_more::Debug)]
48
pub struct CreateRequestHandler {
49
    /// Something that can launch channels. Typically the `ChanMgr`.
50
    chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
51
    /// Circuit-related network parameters.
52
    circ_net_params: RwLock<CircNetParameters>,
53
    /// The circuit extension keys.
54
    #[debug(skip)]
55
    ntor_keys: RwLock<SmallVec<[RelayNtorKeypair; NTOR_KEY_COUNT]>>,
56
}
57

            
58
impl CreateRequestHandler {
59
    /// Build a new [`CreateRequestHandler`].
60
22
    pub fn new(
61
22
        chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
62
22
        circ_net_params: CircNetParameters,
63
22
    ) -> Self {
64
22
        Self {
65
22
            chan_provider,
66
22
            circ_net_params: RwLock::new(circ_net_params),
67
22
            // Initially there are no keys. This will be updated when the keys are read/generated
68
22
            // by the crypto task.
69
22
            //
70
22
            // TODO(relay): I think it would be better to set these from the beginning,
71
22
            // but this will involve reading the ntor keys from the keystore
72
22
            // (or generating them, if they are missing)
73
22
            // outside of the relay crypto task, so it will require a bit of
74
22
            // refactoring to make sure we are not duplicating the logic from there.
75
22
            //
76
22
            // But I think this is worthwhile, because otherwise we could end up
77
22
            // starting to handle incoming CREATE2 requests before we the ntor keys are set.
78
22
            ntor_keys: RwLock::new(smallvec![]),
79
22
        }
80
22
    }
81

            
82
    /// Update the circuit parameters from a network consensus.
83
    pub fn update_params(&self, circ_net_params: CircNetParameters) {
84
        *self.circ_net_params.write().expect("rwlock poisoned") = circ_net_params;
85
    }
86

            
87
    /// Update the handler with a new set of circuit extension keys.
88
    ///
89
    /// This is called periodically by the relay key rotation task.
90
    pub fn update_ntor_keys(&self, ntor_keys: SmallVec<[RelayNtorKeypair; NTOR_KEY_COUNT]>) {
91
        *self.ntor_keys.write().expect("rwlock poisoned") = ntor_keys;
92
    }
93

            
94
    /// Handle a CREATE* cell.
95
    ///
96
    /// This intentionally does not return a [`crate::Error`] so that we don't accidentally shut
97
    /// down the channel reactor when we really should be returning a DESTROY. Shutting down a
98
    /// channel may cause us to leak information about paths of circuits travelling through this
99
    /// relay. This is especially important here since we're handling data that is controllable from
100
    /// the other end of the circuit.
101
    #[allow(clippy::too_many_arguments)]
102
    pub(crate) fn handle_create<R: Runtime>(
103
        &self,
104
        runtime: &R,
105
        channel: &Arc<Channel>,
106
        our_ed25519_id: &Ed25519Identity,
107
        circ_id: CircId,
108
        msg: &CreateRequest,
109
        memquota: &ChannelAccount,
110
        circ_unique_id: UniqId,
111
    ) -> Result<(CreateResponse, RelayCircComponents), Destroy> {
112
        let result = self.handle_create_inner(
113
            runtime,
114
            channel,
115
            our_ed25519_id,
116
            circ_id,
117
            msg,
118
            memquota,
119
            circ_unique_id,
120
        );
121

            
122
        match result {
123
            Ok(x) => Ok(x),
124
            Err(e) => {
125
                // TODO(relay): The log messages throughout could be very noisy, so should have rate limiting.
126
                let cmd = msg.cmd();
127
                debug_report!(&e, %cmd, "Failed to handle circuit create request");
128
                Err(Destroy::new(e.destroy_reason()))
129
            }
130
        }
131
    }
132

            
133
    /// See [`Self::handle_create`].
134
    #[allow(clippy::too_many_arguments)]
135
    fn handle_create_inner<R: Runtime>(
136
        &self,
137
        runtime: &R,
138
        channel: &Arc<Channel>,
139
        // TODO(relay): Use this for ntor handshakes.
140
        _our_ed25519_id: &Ed25519Identity,
141
        circ_id: CircId,
142
        msg: &CreateRequest,
143
        memquota: &ChannelAccount,
144
        circ_unique_id: UniqId,
145
    ) -> Result<(CreateResponse, RelayCircComponents), HandleCreateError> {
146
        // Perform the handshake crypto and build the response.
147
        let handshake_components = match msg {
148
            CreateRequest::CreateFast(msg) => self.handle_create_fast(msg)?,
149
            CreateRequest::Create2(_) => {
150
                // TODO(relay): We might want to offload this to a CPU worker in the future.
151
                // TODO(relay): Implement this.
152
                return Err(internal!("Not implemented").into());
153
            }
154
        };
155

            
156
        let memquota = CircuitAccount::new(memquota)?;
157

            
158
        // We use a large mpsc queue here since a circuit should never block the channel,
159
        // and we hope that memquota will help us if an attacker intentionally fills this buffer.
160
        // We use `10_000_000` since `usize::MAX` causes `futures::channel::mpsc` to panic.
161
        // TODO(relay): We should switch to an unbounded queue, but the circuit reactor is expecting
162
        // a bounded queue.
163
        let time_provider = DynTimeProvider::new(runtime.clone());
164
        let account = memquota.as_raw_account();
165
        let (sender, receiver) = MpscSpec::new(10_000_000).new_mq(time_provider, account)?;
166

            
167
        // TODO(relay): Do we really want a client padding machine here?
168
        let (padding_ctrl, padding_stream) =
169
            crate::client::circuit::padding::new_padding(DynTimeProvider::new(runtime.clone()));
170

            
171
        // Upgrade the channel provider, which in practice is the `ChanMgr` so this should not fail.
172
        let Some(chan_provider) = self.chan_provider.upgrade() else {
173
            return Err(internal!("Unable to upgrade weak `ChannelProvider`").into());
174
        };
175

            
176
        // Build the relay circuit reactor.
177
        let (reactor, circ) = Reactor::new(
178
            runtime.clone(),
179
            channel,
180
            circ_id,
181
            circ_unique_id,
182
            receiver,
183
            handshake_components.crypto_in,
184
            handshake_components.crypto_out,
185
            &handshake_components.hop_settings,
186
            chan_provider,
187
            padding_ctrl.clone(),
188
            padding_stream,
189
            &memquota,
190
        )
191
        .map_err(into_internal!("Failed to start circuit reactor"))?;
192

            
193
        // Start the reactor in a task.
194
        let () = runtime.spawn(async {
195
            match reactor.run().await {
196
                Ok(()) => {}
197
                Err(e) => {
198
                    debug_report!(e, "Relay circuit reactor exited with an error");
199
                }
200
            }
201
        })?;
202

            
203
        Ok((
204
            handshake_components.response,
205
            RelayCircComponents {
206
                circ,
207
                sender,
208
                padding_ctrl,
209
            },
210
        ))
211
    }
212

            
213
    /// The handshake code for a CREATE_FAST request.
214
    fn handle_create_fast(
215
        &self,
216
        msg: &CreateFast,
217
    ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
218
        // TODO(relay): We might want to offload this to a CPU worker in the future.
219
        let (keygen, handshake_msg) = CreateFastServer::server(
220
            &mut rand::rng(),
221
            &mut |_: &()| Some(()),
222
            &[()],
223
            msg.handshake(),
224
        )?;
225

            
226
        let crypt = tor1::CryptStatePair::<Aes128Ctr, Sha1>::construct(keygen)
227
            .map_err(into_internal!("Circuit crypt state construction failed"))?;
228

            
229
        let circ_params = self
230
            .circ_net_params
231
            .read()
232
            .expect("rwlock poisoned")
233
            // CREATE_FAST always uses fixed-window flow control.
234
            .as_circ_parameters(AlgorithmDiscriminants::FixedWindow)?;
235

            
236
        // TODO(relay): I think we might want to get these from the consensus instead?
237
        let protos = tor_protover::Protocols::default();
238

            
239
        // TODO(relay): I'm not sure if this is the right way to do this. It works for
240
        // CREATE_FAST, but we might want to rethink it for CREATE2.
241
        let hop_settings =
242
            HopSettings::from_params_and_caps(HopNegotiationType::None, &circ_params, &protos)
243
                .map_err(into_internal!("Unable to build `HopSettings`"))?;
244

            
245
        let response = CreatedFast::new(handshake_msg);
246
        let response = CreateResponse::CreatedFast(response);
247

            
248
        let (crypto_out, crypto_in, _binding) = crypt.split_relay_layer();
249
        let (crypto_out, crypto_in) = (Box::new(crypto_out), Box::new(crypto_in));
250

            
251
        Ok(CompletedHandshakeComponents {
252
            response,
253
            hop_settings,
254
            crypto_out,
255
            crypto_in,
256
        })
257
    }
258
}
259

            
260
/// An error that occurred while handling a CREATE* request.
261
#[derive(Debug, thiserror::Error)]
262
enum HandleCreateError {
263
    /// Circuit relay handshake failed.
264
    #[error("Circuit relay handshake failed")]
265
    Handshake(#[from] RelayHandshakeError),
266
    /// A memquota error.
267
    #[error("Memquota error")]
268
    Memquota(#[from] tor_memquota::Error),
269
    /// Error when spawning a task.
270
    #[error("Runtime task spawn error")]
271
    Spawn(#[from] futures::task::SpawnError),
272
    /// An internal error.
273
    ///
274
    /// Note that other variants (such as `Handshake` containing a [`RelayHandshakeError`])
275
    /// may themselves contain internal errors.
276
    #[error("Internal error")]
277
    Internal(#[from] tor_error::Bug),
278
}
279

            
280
impl HandleCreateError {
281
    /// The reason to use in a DESTROY message for this failure.
282
    fn destroy_reason(&self) -> DestroyReason {
283
        // Note that this may return an INTERNAL destroy reason even when
284
        // the inner error is not `ErrorKind::Internal`.
285
        match self {
286
            Self::Handshake(e) => e.destroy_reason(),
287
            Self::Memquota(_) => DestroyReason::INTERNAL,
288
            Self::Spawn(_) => DestroyReason::INTERNAL,
289
            Self::Internal(_) => DestroyReason::INTERNAL,
290
        }
291
    }
292
}
293

            
294
impl HasKind for HandleCreateError {
295
    fn kind(&self) -> ErrorKind {
296
        match self {
297
            Self::Handshake(e) => e.kind(),
298
            Self::Memquota(e) => e.kind(),
299
            Self::Spawn(e) => e.kind(),
300
            Self::Internal(_) => ErrorKind::Internal,
301
        }
302
    }
303
}
304

            
305
/// The components of a completed CREATE* handshake.
306
struct CompletedHandshakeComponents {
307
    /// The message to send in response.
308
    response: CreateResponse,
309
    /// The negotiated hop settings.
310
    hop_settings: HopSettings,
311
    /// Outbound onion crypto.
312
    crypto_out: Box<dyn OutboundRelayLayer + Send>,
313
    /// Inbound onion crypto.
314
    crypto_in: Box<dyn InboundRelayLayer + Send>,
315
}
316

            
317
/// A collection of objects built for a new relay circuit.
318
pub(crate) struct RelayCircComponents {
319
    /// The relay circuit handle.
320
    pub(crate) circ: Arc<RelayCirc>,
321
    /// Used to send data from the channel to the circuit reactor.
322
    pub(crate) sender: CircuitRxSender,
323
    /// The circuit's padding controller.
324
    pub(crate) padding_ctrl: PaddingController,
325
}
326

            
327
/// Congestion control network parameters.
328
#[derive(Debug, Clone)]
329
#[allow(clippy::exhaustive_structs)]
330
pub struct CongestionControlNetParams {
331
    /// Fixed-window algorithm parameters.
332
    pub fixed_window: FixedWindowParams,
333

            
334
    /// Vegas algorithm parameters for exit circuits.
335
    // NOTE: In this module we are handling CREATE* cells,
336
    // which only happens for non-hs circuits.
337
    // So we don't need to store the vegas hs parameters here.
338
    pub vegas_exit: VegasParams,
339

            
340
    /// Congestion window parameters.
341
    pub cwnd: CongestionWindowParams,
342

            
343
    /// RTT calculation parameters.
344
    pub rtt: RoundTripEstimatorParams,
345

            
346
    /// Flow control parameters to use for all streams on this circuit.
347
    pub flow_ctrl: FlowCtrlParameters,
348
}
349

            
350
impl CongestionControlNetParams {
351
    #[cfg(test)]
352
    // These have been copied from C-tor.
353
22
    pub(crate) fn defaults_for_tests() -> Self {
354
22
        Self {
355
22
            fixed_window: FixedWindowParams::defaults_for_tests(),
356
22
            vegas_exit: VegasParams::defaults_for_tests(),
357
22
            cwnd: CongestionWindowParams::defaults_for_tests(),
358
22
            rtt: RoundTripEstimatorParams::defaults_for_tests(),
359
22
            flow_ctrl: FlowCtrlParameters::defaults_for_tests(),
360
22
        }
361
22
    }
362
}
363

            
364
/// Network consensus parameters for handling incoming circuits.
365
///
366
/// Unlike `CircParameters`,
367
/// this is unopinionated and contains all relevant consensus parameters,
368
/// which is needed when handling an incoming CREATE* request where the
369
/// circuit origin chooses the type/settings
370
/// (for example congestion control type) of the circuit.
371
#[derive(Debug, Clone)]
372
#[allow(clippy::exhaustive_structs)]
373
pub struct CircNetParameters {
374
    /// Whether we should include ed25519 identities when we send EXTEND2 cells.
375
    pub extend_by_ed25519_id: bool,
376

            
377
    /// Congestion control network parameters.
378
    pub cc: CongestionControlNetParams,
379
}
380

            
381
impl CircNetParameters {
382
    /// Convert the [`CircNetParameters`] into a [`CircParameters`].
383
    ///
384
    /// We expect the circuit creation handshake to know what congestion control algorithm was
385
    /// negotiated, and provide that as `algorithm`.
386
    //
387
    // We disable `unused` warnings at the root of tor-proto,
388
    // but it's nice to have here so we re-enable it.
389
    #[warn(unused)]
390
    fn as_circ_parameters(&self, algorithm: AlgorithmDiscriminants) -> Result<CircParameters, Bug> {
391
        // Unpack everything to make sure that we aren't missing anything
392
        // (otherwise clippy would warn).
393
        let Self {
394
            extend_by_ed25519_id,
395
            cc:
396
                CongestionControlNetParams {
397
                    fixed_window,
398
                    vegas_exit,
399
                    cwnd,
400
                    rtt,
401
                    flow_ctrl,
402
                },
403
        } = self;
404

            
405
        let algorithm = match algorithm {
406
            AlgorithmDiscriminants::FixedWindow => Algorithm::FixedWindow(*fixed_window),
407
            AlgorithmDiscriminants::Vegas => Algorithm::Vegas(*vegas_exit),
408
        };
409

            
410
        // TODO(arti#2442): The builder pattern here seems like a footgun.
411
        let cc = CongestionControlParams::builder()
412
            .alg(algorithm)
413
            .fixed_window_params(*fixed_window)
414
            .cwnd_params(*cwnd)
415
            .rtt_params(rtt.clone())
416
            .build()
417
            .map_err(into_internal!("Could not build `CongestionControlParams`"))?;
418

            
419
        Ok(CircParameters::new(
420
            *extend_by_ed25519_id,
421
            cc,
422
            flow_ctrl.clone(),
423
        ))
424
    }
425
}