1
//! Handler for CREATE* cells.
2

            
3
use crate::FlowCtrlParameters;
4
use crate::ccparams::{
5
    Algorithm, AlgorithmDiscriminants, CongestionControlParams, CongestionWindowParams,
6
    FixedWindowParams, RoundTripEstimatorParams, VegasParams,
7
};
8
use crate::channel::Channel;
9
use crate::circuit::CircuitRxSender;
10
use crate::circuit::UniqId;
11
use crate::circuit::celltypes::{CreateRequest, CreateResponse};
12
use crate::circuit::circhop::{HopNegotiationType, HopSettings};
13
use crate::client::circuit::CircParameters;
14
use crate::client::circuit::padding::PaddingController;
15
use crate::crypto::cell::CryptInit as _;
16
use crate::crypto::cell::RelayLayer as _;
17
use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer, tor1};
18
use crate::crypto::handshake::RelayHandshakeError;
19
use crate::crypto::handshake::ServerHandshake as _;
20
use crate::crypto::handshake::fast::CreateFastServer;
21
use crate::memquota::SpecificAccount as _;
22
use crate::memquota::{ChannelAccount, CircuitAccount};
23
use crate::relay::RelayCirc;
24
use crate::relay::channel_provider::ChannelProvider;
25
use crate::relay::reactor::Reactor;
26
use std::sync::{Arc, RwLock, Weak};
27
use tor_cell::chancell::ChanMsg as _;
28
use tor_cell::chancell::CircId;
29
use tor_cell::chancell::msg::{CreateFast, CreatedFast, Destroy, DestroyReason};
30
use tor_error::{Bug, ErrorKind, HasKind, debug_report, internal, into_internal};
31
use tor_linkspec::OwnedChanTarget;
32
use tor_llcrypto::cipher::aes::Aes128Ctr;
33
use tor_llcrypto::d::Sha1;
34
use tor_llcrypto::pk::ed25519::Ed25519Identity;
35
use tor_llcrypto::pk::rsa::RsaIdentity;
36
use tor_memquota::mq_queue::ChannelSpec as _;
37
use tor_memquota::mq_queue::MpscSpec;
38
use tor_relay_crypto::pk::RelayNtorKeys;
39
use tor_rtcompat::SpawnExt as _;
40
use tor_rtcompat::{DynTimeProvider, Runtime};
41
use tracing::warn;
42

            
43
/// Everything needed to handle CREATE* messages on channels.
44
#[derive(derive_more::Debug)]
45
pub struct CreateRequestHandler {
46
    /// Something that can launch channels. Typically the `ChanMgr`.
47
    chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
48
    /// Circuit-related network parameters.
49
    circ_net_params: RwLock<CircNetParameters>,
50
    /// The circuit extension keys.
51
    #[debug(skip)]
52
    ntor_keys: RwLock<RelayNtorKeys>,
53
}
54

            
55
impl CreateRequestHandler {
56
    /// Build a new [`CreateRequestHandler`].
57
22
    pub fn new(
58
22
        chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
59
22
        circ_net_params: CircNetParameters,
60
22
        ntor_keys: RelayNtorKeys,
61
22
    ) -> Self {
62
22
        Self {
63
22
            chan_provider,
64
22
            circ_net_params: RwLock::new(circ_net_params),
65
22
            ntor_keys: RwLock::new(ntor_keys),
66
22
        }
67
22
    }
68

            
69
    /// Update the circuit parameters from a network consensus.
70
    pub fn update_params(&self, circ_net_params: CircNetParameters) {
71
        *self.circ_net_params.write().expect("rwlock poisoned") = circ_net_params;
72
    }
73

            
74
    /// Update the handler with a new set of circuit extension keys.
75
    ///
76
    /// This is called periodically by the relay key rotation task.
77
    pub fn update_ntor_keys(&self, ntor_keys: RelayNtorKeys) {
78
        *self.ntor_keys.write().expect("rwlock poisoned") = ntor_keys;
79
    }
80

            
81
    /// Handle a CREATE* cell.
82
    ///
83
    /// This intentionally does not return a [`crate::Error`] so that we don't accidentally shut
84
    /// down the channel reactor when we really should be returning a DESTROY. Shutting down a
85
    /// channel may cause us to leak information about paths of circuits travelling through this
86
    /// relay. This is especially important here since we're handling data that is controllable from
87
    /// the other end of the circuit.
88
    #[allow(clippy::too_many_arguments)]
89
    pub(crate) fn handle_create<R: Runtime>(
90
        &self,
91
        runtime: &R,
92
        channel: &Arc<Channel>,
93
        our_ed25519_id: &Ed25519Identity,
94
        our_rsa_id: &RsaIdentity,
95
        circ_id: CircId,
96
        msg: &CreateRequest,
97
        memquota: &ChannelAccount,
98
        circ_unique_id: UniqId,
99
    ) -> Result<(CreateResponse, RelayCircComponents), Destroy> {
100
        let result = self.handle_create_inner(
101
            runtime,
102
            channel,
103
            our_ed25519_id,
104
            our_rsa_id,
105
            circ_id,
106
            msg,
107
            memquota,
108
            circ_unique_id,
109
        );
110

            
111
        match result {
112
            Ok(x) => Ok(x),
113
            Err(e) => {
114
                // TODO(relay): The log messages throughout could be very noisy, so should have rate limiting.
115
                let cmd = msg.cmd();
116
                debug_report!(&e, %cmd, "Failed to handle circuit create request");
117
                Err(Destroy::new(e.destroy_reason()))
118
            }
119
        }
120
    }
121

            
122
    /// See [`Self::handle_create`].
123
    #[allow(clippy::too_many_arguments)]
124
    fn handle_create_inner<R: Runtime>(
125
        &self,
126
        runtime: &R,
127
        channel: &Arc<Channel>,
128
        // TODO(relay): Use these for ntor handshakes.
129
        _our_ed25519_id: &Ed25519Identity,
130
        _our_rsa_id: &RsaIdentity,
131
        circ_id: CircId,
132
        msg: &CreateRequest,
133
        memquota: &ChannelAccount,
134
        circ_unique_id: UniqId,
135
    ) -> Result<(CreateResponse, RelayCircComponents), HandleCreateError> {
136
        // Perform the handshake crypto and build the response.
137
        let handshake_components = match msg {
138
            CreateRequest::CreateFast(msg) => self.handle_create_fast(msg)?,
139
            CreateRequest::Create2(_) => {
140
                // TODO(relay): We might want to offload this to a CPU worker in the future.
141
                // TODO(relay): Implement this.
142
                return Err(internal!("Not implemented").into());
143
            }
144
        };
145

            
146
        let memquota = CircuitAccount::new(memquota)?;
147

            
148
        // We use a large mpsc queue here since a circuit should never block the channel,
149
        // and we hope that memquota will help us if an attacker intentionally fills this buffer.
150
        // We use `10_000_000` since `usize::MAX` causes `futures::channel::mpsc` to panic.
151
        // TODO(relay): We should switch to an unbounded queue, but the circuit reactor is expecting
152
        // a bounded queue.
153
        let time_provider = DynTimeProvider::new(runtime.clone());
154
        let account = memquota.as_raw_account();
155
        let (sender, receiver) = MpscSpec::new(10_000_000).new_mq(time_provider, account)?;
156

            
157
        // TODO(relay): Do we really want a client padding machine here?
158
        let (padding_ctrl, padding_stream) =
159
            crate::client::circuit::padding::new_padding(DynTimeProvider::new(runtime.clone()));
160

            
161
        // Upgrade the channel provider, which in practice is the `ChanMgr` so this should not fail.
162
        let Some(chan_provider) = self.chan_provider.upgrade() else {
163
            return Err(internal!("Unable to upgrade weak `ChannelProvider`").into());
164
        };
165

            
166
        // Build the relay circuit reactor.
167
        let (reactor, circ) = Reactor::new(
168
            runtime.clone(),
169
            channel,
170
            circ_id,
171
            circ_unique_id,
172
            receiver,
173
            handshake_components.crypto_in,
174
            handshake_components.crypto_out,
175
            &handshake_components.hop_settings,
176
            chan_provider,
177
            padding_ctrl.clone(),
178
            padding_stream,
179
            &memquota,
180
        )
181
        .map_err(into_internal!("Failed to start circuit reactor"))?;
182

            
183
        // Start the reactor in a task.
184
        let () = runtime.spawn(async {
185
            match reactor.run().await {
186
                Ok(()) => {}
187
                Err(e) => {
188
                    debug_report!(e, "Relay circuit reactor exited with an error");
189
                }
190
            }
191
        })?;
192

            
193
        Ok((
194
            handshake_components.response,
195
            RelayCircComponents {
196
                circ,
197
                sender,
198
                padding_ctrl,
199
            },
200
        ))
201
    }
202

            
203
    /// The handshake code for a CREATE_FAST request.
204
    fn handle_create_fast(
205
        &self,
206
        msg: &CreateFast,
207
    ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
208
        // TODO(relay): We might want to offload this to a CPU worker in the future.
209
        let (keygen, handshake_msg) = CreateFastServer::server(
210
            &mut rand::rng(),
211
            &mut |_: &()| Some(()),
212
            &[()],
213
            msg.handshake(),
214
        )?;
215

            
216
        let crypt = tor1::CryptStatePair::<Aes128Ctr, Sha1>::construct(keygen)
217
            .map_err(into_internal!("Circuit crypt state construction failed"))?;
218

            
219
        let circ_params = self
220
            .circ_net_params
221
            .read()
222
            .expect("rwlock poisoned")
223
            // CREATE_FAST always uses fixed-window flow control.
224
            .as_circ_parameters(AlgorithmDiscriminants::FixedWindow)?;
225

            
226
        // TODO(relay): I think we might want to get these from the consensus instead?
227
        let protos = tor_protover::Protocols::default();
228

            
229
        // TODO(relay): I'm not sure if this is the right way to do this. It works for
230
        // CREATE_FAST, but we might want to rethink it for CREATE2.
231
        let hop_settings =
232
            HopSettings::from_params_and_caps(HopNegotiationType::None, &circ_params, &protos)
233
                .map_err(into_internal!("Unable to build `HopSettings`"))?;
234

            
235
        let response = CreatedFast::new(handshake_msg);
236
        let response = CreateResponse::CreatedFast(response);
237

            
238
        let (crypto_out, crypto_in, _binding) = crypt.split_relay_layer();
239
        let (crypto_out, crypto_in) = (Box::new(crypto_out), Box::new(crypto_in));
240

            
241
        Ok(CompletedHandshakeComponents {
242
            response,
243
            hop_settings,
244
            crypto_out,
245
            crypto_in,
246
        })
247
    }
248
}
249

            
250
/// An error that occurred while handling a CREATE* request.
251
#[derive(Debug, thiserror::Error)]
252
enum HandleCreateError {
253
    /// Circuit relay handshake failed.
254
    #[error("Circuit relay handshake failed")]
255
    Handshake(#[from] RelayHandshakeError),
256
    /// A memquota error.
257
    #[error("Memquota error")]
258
    Memquota(#[from] tor_memquota::Error),
259
    /// Error when spawning a task.
260
    #[error("Runtime task spawn error")]
261
    Spawn(#[from] futures::task::SpawnError),
262
    /// An internal error.
263
    ///
264
    /// Note that other variants (such as `Handshake` containing a [`RelayHandshakeError`])
265
    /// may themselves contain internal errors.
266
    #[error("Internal error")]
267
    Internal(#[from] tor_error::Bug),
268
}
269

            
270
impl HandleCreateError {
271
    /// The reason to use in a DESTROY message for this failure.
272
    fn destroy_reason(&self) -> DestroyReason {
273
        // Note that this may return an INTERNAL destroy reason even when
274
        // the inner error is not `ErrorKind::Internal`.
275
        match self {
276
            Self::Handshake(e) => e.destroy_reason(),
277
            Self::Memquota(_) => DestroyReason::INTERNAL,
278
            Self::Spawn(_) => DestroyReason::INTERNAL,
279
            Self::Internal(_) => DestroyReason::INTERNAL,
280
        }
281
    }
282
}
283

            
284
impl HasKind for HandleCreateError {
285
    fn kind(&self) -> ErrorKind {
286
        match self {
287
            Self::Handshake(e) => e.kind(),
288
            Self::Memquota(e) => e.kind(),
289
            Self::Spawn(e) => e.kind(),
290
            Self::Internal(_) => ErrorKind::Internal,
291
        }
292
    }
293
}
294

            
295
/// The components of a completed CREATE* handshake.
296
struct CompletedHandshakeComponents {
297
    /// The message to send in response.
298
    response: CreateResponse,
299
    /// The negotiated hop settings.
300
    hop_settings: HopSettings,
301
    /// Outbound onion crypto.
302
    crypto_out: Box<dyn OutboundRelayLayer + Send>,
303
    /// Inbound onion crypto.
304
    crypto_in: Box<dyn InboundRelayLayer + Send>,
305
}
306

            
307
/// A collection of objects built for a new relay circuit.
308
pub(crate) struct RelayCircComponents {
309
    /// The relay circuit handle.
310
    pub(crate) circ: Arc<RelayCirc>,
311
    /// Used to send data from the channel to the circuit reactor.
312
    pub(crate) sender: CircuitRxSender,
313
    /// The circuit's padding controller.
314
    pub(crate) padding_ctrl: PaddingController,
315
}
316

            
317
/// Congestion control network parameters.
318
#[derive(Debug, Clone)]
319
#[allow(clippy::exhaustive_structs)]
320
pub struct CongestionControlNetParams {
321
    /// Fixed-window algorithm parameters.
322
    pub fixed_window: FixedWindowParams,
323

            
324
    /// Vegas algorithm parameters for exit circuits.
325
    // NOTE: In this module we are handling CREATE* cells,
326
    // which only happens for non-hs circuits.
327
    // So we don't need to store the vegas hs parameters here.
328
    pub vegas_exit: VegasParams,
329

            
330
    /// Congestion window parameters.
331
    pub cwnd: CongestionWindowParams,
332

            
333
    /// RTT calculation parameters.
334
    pub rtt: RoundTripEstimatorParams,
335

            
336
    /// Flow control parameters to use for all streams on this circuit.
337
    pub flow_ctrl: FlowCtrlParameters,
338
}
339

            
340
impl CongestionControlNetParams {
341
    #[cfg(test)]
342
    // These have been copied from C-tor.
343
22
    pub(crate) fn defaults_for_tests() -> Self {
344
22
        Self {
345
22
            fixed_window: FixedWindowParams::defaults_for_tests(),
346
22
            vegas_exit: VegasParams::defaults_for_tests(),
347
22
            cwnd: CongestionWindowParams::defaults_for_tests(),
348
22
            rtt: RoundTripEstimatorParams::defaults_for_tests(),
349
22
            flow_ctrl: FlowCtrlParameters::defaults_for_tests(),
350
22
        }
351
22
    }
352
}
353

            
354
/// Network consensus parameters for handling incoming circuits.
355
///
356
/// Unlike `CircParameters`,
357
/// this is unopinionated and contains all relevant consensus parameters,
358
/// which is needed when handling an incoming CREATE* request where the
359
/// circuit origin chooses the type/settings
360
/// (for example congestion control type) of the circuit.
361
#[derive(Debug, Clone)]
362
#[allow(clippy::exhaustive_structs)]
363
pub struct CircNetParameters {
364
    /// Whether we should include ed25519 identities when we send EXTEND2 cells.
365
    pub extend_by_ed25519_id: bool,
366

            
367
    /// Congestion control network parameters.
368
    pub cc: CongestionControlNetParams,
369
}
370

            
371
impl CircNetParameters {
372
    /// Convert the [`CircNetParameters`] into a [`CircParameters`].
373
    ///
374
    /// We expect the circuit creation handshake to know what congestion control algorithm was
375
    /// negotiated, and provide that as `algorithm`.
376
    //
377
    // We disable `unused` warnings at the root of tor-proto,
378
    // but it's nice to have here so we re-enable it.
379
    #[warn(unused)]
380
    fn as_circ_parameters(&self, algorithm: AlgorithmDiscriminants) -> Result<CircParameters, Bug> {
381
        // Unpack everything to make sure that we aren't missing anything
382
        // (otherwise clippy would warn).
383
        let Self {
384
            extend_by_ed25519_id,
385
            cc:
386
                CongestionControlNetParams {
387
                    fixed_window,
388
                    vegas_exit,
389
                    cwnd,
390
                    rtt,
391
                    flow_ctrl,
392
                },
393
        } = self;
394

            
395
        let algorithm = match algorithm {
396
            AlgorithmDiscriminants::FixedWindow => Algorithm::FixedWindow(*fixed_window),
397
            AlgorithmDiscriminants::Vegas => Algorithm::Vegas(*vegas_exit),
398
        };
399

            
400
        // TODO(arti#2442): The builder pattern here seems like a footgun.
401
        let cc = CongestionControlParams::builder()
402
            .alg(algorithm)
403
            .fixed_window_params(*fixed_window)
404
            .cwnd_params(*cwnd)
405
            .rtt_params(rtt.clone())
406
            .build()
407
            .map_err(into_internal!("Could not build `CongestionControlParams`"))?;
408

            
409
        Ok(CircParameters::new(
410
            *extend_by_ed25519_id,
411
            cc,
412
            flow_ctrl.clone(),
413
        ))
414
    }
415
}