1
//! Handler for CREATE* cells.
2

            
3
use crate::FlowCtrlParameters;
4
use crate::ccparams::{
5
    Algorithm, AlgorithmDiscriminants, CongestionControlParams, CongestionWindowParams,
6
    FixedWindowParams, RoundTripEstimatorParams, VegasParams,
7
};
8
use crate::channel::Channel;
9
use crate::circuit::CircuitRxSender;
10
use crate::circuit::UniqId;
11
use crate::circuit::celltypes::{CreateRequest, CreateResponse};
12
use crate::circuit::circhop::{HopNegotiationType, HopSettings};
13
use crate::client::circuit::CircParameters;
14
use crate::client::circuit::padding::PaddingController;
15
use crate::crypto::cell::CryptInit as _;
16
use crate::crypto::cell::RelayLayer as _;
17
use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer, tor1};
18
use crate::crypto::handshake::RelayHandshakeError;
19
use crate::crypto::handshake::ServerHandshake as _;
20
use crate::crypto::handshake::fast::CreateFastServer;
21
use crate::memquota::SpecificAccount as _;
22
use crate::memquota::{ChannelAccount, CircuitAccount};
23
use crate::relay::RelayCirc;
24
use crate::relay::channel_provider::ChannelProvider;
25
use crate::relay::reactor::Reactor;
26
use std::sync::{Arc, RwLock, Weak};
27
use tor_cell::chancell::ChanMsg as _;
28
use tor_cell::chancell::CircId;
29
use tor_cell::chancell::msg::{CreateFast, CreatedFast, Destroy, DestroyReason};
30
use tor_error::{Bug, ErrorKind, HasKind, debug_report, internal, into_internal};
31
use tor_linkspec::OwnedChanTarget;
32
use tor_llcrypto::cipher::aes::Aes128Ctr;
33
use tor_llcrypto::d::Sha1;
34
use tor_llcrypto::pk::ed25519::Ed25519Identity;
35
use tor_llcrypto::pk::rsa::RsaIdentity;
36
use tor_memquota::mq_queue::ChannelSpec as _;
37
use tor_memquota::mq_queue::MpscSpec;
38
use tor_relay_crypto::pk::RelayNtorKeys;
39
use tor_rtcompat::SpawnExt as _;
40
use tor_rtcompat::{DynTimeProvider, Runtime};
41
use tracing::warn;
42

            
43
/// Everything needed to handle CREATE* messages on channels.
44
#[derive(derive_more::Debug)]
45
pub struct CreateRequestHandler {
46
    /// Something that can launch channels. Typically the `ChanMgr`.
47
    chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
48
    /// Circuit-related network parameters.
49
    circ_net_params: RwLock<CircNetParameters>,
50
    /// The circuit extension keys.
51
    #[debug(skip)]
52
    ntor_keys: RwLock<RelayNtorKeys>,
53
}
54

            
55
impl CreateRequestHandler {
56
    /// Build a new [`CreateRequestHandler`].
57
22
    pub fn new(
58
22
        chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
59
22
        circ_net_params: CircNetParameters,
60
22
        ntor_keys: RelayNtorKeys,
61
22
    ) -> Self {
62
22
        Self {
63
22
            chan_provider,
64
22
            circ_net_params: RwLock::new(circ_net_params),
65
22
            ntor_keys: RwLock::new(ntor_keys),
66
22
        }
67
22
    }
68

            
69
    /// Update the circuit parameters from a network consensus.
70
    pub fn update_params(&self, circ_net_params: CircNetParameters) {
71
        *self.circ_net_params.write().expect("rwlock poisoned") = circ_net_params;
72
    }
73

            
74
    /// Update the handler with a new set of circuit extension keys.
75
    ///
76
    /// This is called periodically by the relay key rotation task.
77
    pub fn update_ntor_keys(&self, ntor_keys: RelayNtorKeys) {
78
        *self.ntor_keys.write().expect("rwlock poisoned") = ntor_keys;
79
    }
80

            
81
    /// Handle a CREATE* cell.
82
    ///
83
    /// This intentionally does not return a [`crate::Error`] so that we don't accidentally shut
84
    /// down the channel reactor when we really should be returning a DESTROY. Shutting down a
85
    /// channel may cause us to leak information about paths of circuits travelling through this
86
    /// relay. This is especially important here since we're handling data that is controllable from
87
    /// the other end of the circuit.
88
    #[allow(clippy::too_many_arguments)]
89
    pub(crate) fn handle_create<R: Runtime>(
90
        &self,
91
        runtime: &R,
92
        channel: &Arc<Channel>,
93
        our_ed25519_id: &Ed25519Identity,
94
        our_rsa_id: &RsaIdentity,
95
        circ_id: CircId,
96
        msg: &CreateRequest,
97
        memquota: &ChannelAccount,
98
        circ_unique_id: UniqId,
99
    ) -> Result<(CreateResponse, RelayCircComponents), Destroy> {
100
        let result = self.handle_create_inner(
101
            runtime,
102
            channel,
103
            our_ed25519_id,
104
            our_rsa_id,
105
            circ_id,
106
            msg,
107
            memquota,
108
            circ_unique_id,
109
        );
110

            
111
        match result {
112
            Ok(x) => Ok(x),
113
            Err(e) => {
114
                // TODO(relay): The log messages throughout could be very noisy, so should have rate limiting.
115
                let cmd = msg.cmd();
116
                debug_report!(&e, %cmd, "Failed to handle circuit create request");
117

            
118
                // `tor-spec/tearing-down-circuits.md`:
119
                //
120
                // > Implementations SHOULD always use the NONE reason to avoid side channels: [...]
121
                Err(Destroy::new(DestroyReason::NONE))
122
            }
123
        }
124
    }
125

            
126
    /// See [`Self::handle_create`].
127
    #[allow(clippy::too_many_arguments)]
128
    fn handle_create_inner<R: Runtime>(
129
        &self,
130
        runtime: &R,
131
        channel: &Arc<Channel>,
132
        // TODO(relay): Use these for ntor handshakes.
133
        _our_ed25519_id: &Ed25519Identity,
134
        _our_rsa_id: &RsaIdentity,
135
        circ_id: CircId,
136
        msg: &CreateRequest,
137
        memquota: &ChannelAccount,
138
        circ_unique_id: UniqId,
139
    ) -> Result<(CreateResponse, RelayCircComponents), HandleCreateError> {
140
        // Perform the handshake crypto and build the response.
141
        let handshake_components = match msg {
142
            CreateRequest::CreateFast(msg) => self.handle_create_fast(msg)?,
143
            CreateRequest::Create2(_) => {
144
                // TODO(relay): We might want to offload this to a CPU worker in the future.
145
                // TODO(relay): Implement this.
146
                return Err(internal!("Not implemented").into());
147
            }
148
        };
149

            
150
        let memquota = CircuitAccount::new(memquota)?;
151

            
152
        // We use a large mpsc queue here since a circuit should never block the channel,
153
        // and we hope that memquota will help us if an attacker intentionally fills this buffer.
154
        // We use `10_000_000` since `usize::MAX` causes `futures::channel::mpsc` to panic.
155
        // TODO(relay): We should switch to an unbounded queue, but the circuit reactor is expecting
156
        // a bounded queue.
157
        let time_provider = DynTimeProvider::new(runtime.clone());
158
        let account = memquota.as_raw_account();
159
        let (sender, receiver) = MpscSpec::new(10_000_000).new_mq(time_provider, account)?;
160
        let (sender, receiver) = crate::circuit::circ_sender::channel(sender, receiver);
161

            
162
        // TODO(relay): Do we really want a client padding machine here?
163
        let (padding_ctrl, padding_stream) =
164
            crate::client::circuit::padding::new_padding(DynTimeProvider::new(runtime.clone()));
165

            
166
        // Upgrade the channel provider, which in practice is the `ChanMgr` so this should not fail.
167
        let Some(chan_provider) = self.chan_provider.upgrade() else {
168
            return Err(internal!("Unable to upgrade weak `ChannelProvider`").into());
169
        };
170

            
171
        // Build the relay circuit reactor.
172
        let (reactor, circ) = Reactor::new(
173
            runtime.clone(),
174
            channel,
175
            circ_id,
176
            circ_unique_id,
177
            receiver,
178
            handshake_components.crypto_in,
179
            handshake_components.crypto_out,
180
            &handshake_components.hop_settings,
181
            chan_provider,
182
            padding_ctrl.clone(),
183
            padding_stream,
184
            &memquota,
185
        )
186
        .map_err(into_internal!("Failed to start circuit reactor"))?;
187

            
188
        // Start the reactor in a task.
189
        let () = runtime.spawn(async {
190
            match reactor.run().await {
191
                Ok(()) => {}
192
                Err(e) => {
193
                    debug_report!(e, "Relay circuit reactor exited with an error");
194
                }
195
            }
196
        })?;
197

            
198
        Ok((
199
            handshake_components.response,
200
            RelayCircComponents {
201
                circ,
202
                sender,
203
                padding_ctrl,
204
            },
205
        ))
206
    }
207

            
208
    /// The handshake code for a CREATE_FAST request.
209
    fn handle_create_fast(
210
        &self,
211
        msg: &CreateFast,
212
    ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
213
        // TODO(relay): We might want to offload this to a CPU worker in the future.
214
        let (keygen, handshake_msg) = CreateFastServer::server(
215
            &mut rand::rng(),
216
            &mut |_: &()| Some(()),
217
            &[()],
218
            msg.handshake(),
219
        )?;
220

            
221
        let crypt = tor1::CryptStatePair::<Aes128Ctr, Sha1>::construct(keygen)
222
            .map_err(into_internal!("Circuit crypt state construction failed"))?;
223

            
224
        let circ_params = self
225
            .circ_net_params
226
            .read()
227
            .expect("rwlock poisoned")
228
            // CREATE_FAST always uses fixed-window flow control.
229
            .as_circ_parameters(AlgorithmDiscriminants::FixedWindow)?;
230

            
231
        // TODO(relay): I think we might want to get these from the consensus instead?
232
        let protos = tor_protover::Protocols::default();
233

            
234
        // TODO(relay): I'm not sure if this is the right way to do this. It works for
235
        // CREATE_FAST, but we might want to rethink it for CREATE2.
236
        let hop_settings =
237
            HopSettings::from_params_and_caps(HopNegotiationType::None, &circ_params, &protos)
238
                .map_err(into_internal!("Unable to build `HopSettings`"))?;
239

            
240
        let response = CreatedFast::new(handshake_msg);
241
        let response = CreateResponse::CreatedFast(response);
242

            
243
        let (crypto_out, crypto_in, _binding) = crypt.split_relay_layer();
244
        let (crypto_out, crypto_in) = (Box::new(crypto_out), Box::new(crypto_in));
245

            
246
        Ok(CompletedHandshakeComponents {
247
            response,
248
            hop_settings,
249
            crypto_out,
250
            crypto_in,
251
        })
252
    }
253
}
254

            
255
/// An error that occurred while handling a CREATE* request.
256
#[derive(Debug, thiserror::Error)]
257
enum HandleCreateError {
258
    /// Circuit relay handshake failed.
259
    #[error("Circuit relay handshake failed")]
260
    Handshake(#[from] RelayHandshakeError),
261
    /// A memquota error.
262
    #[error("Memquota error")]
263
    Memquota(#[from] tor_memquota::Error),
264
    /// Error when spawning a task.
265
    #[error("Runtime task spawn error")]
266
    Spawn(#[from] futures::task::SpawnError),
267
    /// An internal error.
268
    ///
269
    /// Note that other variants (such as `Handshake` containing a [`RelayHandshakeError`])
270
    /// may themselves contain internal errors.
271
    #[error("Internal error")]
272
    Internal(#[from] tor_error::Bug),
273
}
274

            
275
impl HasKind for HandleCreateError {
276
    fn kind(&self) -> ErrorKind {
277
        match self {
278
            Self::Handshake(e) => e.kind(),
279
            Self::Memquota(e) => e.kind(),
280
            Self::Spawn(e) => e.kind(),
281
            Self::Internal(_) => ErrorKind::Internal,
282
        }
283
    }
284
}
285

            
286
/// The components of a completed CREATE* handshake.
287
struct CompletedHandshakeComponents {
288
    /// The message to send in response.
289
    response: CreateResponse,
290
    /// The negotiated hop settings.
291
    hop_settings: HopSettings,
292
    /// Outbound onion crypto.
293
    crypto_out: Box<dyn OutboundRelayLayer + Send>,
294
    /// Inbound onion crypto.
295
    crypto_in: Box<dyn InboundRelayLayer + Send>,
296
}
297

            
298
/// A collection of objects built for a new relay circuit.
299
pub(crate) struct RelayCircComponents {
300
    /// The relay circuit handle.
301
    pub(crate) circ: Arc<RelayCirc>,
302
    /// Used to send data from the channel to the circuit reactor.
303
    pub(crate) sender: CircuitRxSender,
304
    /// The circuit's padding controller.
305
    pub(crate) padding_ctrl: PaddingController,
306
}
307

            
308
/// Congestion control network parameters.
309
#[derive(Debug, Clone)]
310
#[allow(clippy::exhaustive_structs)]
311
pub struct CongestionControlNetParams {
312
    /// Fixed-window algorithm parameters.
313
    pub fixed_window: FixedWindowParams,
314

            
315
    /// Vegas algorithm parameters for exit circuits.
316
    // NOTE: In this module we are handling CREATE* cells,
317
    // which only happens for non-hs circuits.
318
    // So we don't need to store the vegas hs parameters here.
319
    pub vegas_exit: VegasParams,
320

            
321
    /// Congestion window parameters.
322
    pub cwnd: CongestionWindowParams,
323

            
324
    /// RTT calculation parameters.
325
    pub rtt: RoundTripEstimatorParams,
326

            
327
    /// Flow control parameters to use for all streams on this circuit.
328
    pub flow_ctrl: FlowCtrlParameters,
329
}
330

            
331
impl CongestionControlNetParams {
332
    #[cfg(test)]
333
    // These have been copied from C-tor.
334
22
    pub(crate) fn defaults_for_tests() -> Self {
335
22
        Self {
336
22
            fixed_window: FixedWindowParams::defaults_for_tests(),
337
22
            vegas_exit: VegasParams::defaults_for_tests(),
338
22
            cwnd: CongestionWindowParams::defaults_for_tests(),
339
22
            rtt: RoundTripEstimatorParams::defaults_for_tests(),
340
22
            flow_ctrl: FlowCtrlParameters::defaults_for_tests(),
341
22
        }
342
22
    }
343
}
344

            
345
/// Network consensus parameters for handling incoming circuits.
346
///
347
/// Unlike `CircParameters`,
348
/// this is unopinionated and contains all relevant consensus parameters,
349
/// which is needed when handling an incoming CREATE* request where the
350
/// circuit origin chooses the type/settings
351
/// (for example congestion control type) of the circuit.
352
#[derive(Debug, Clone)]
353
#[allow(clippy::exhaustive_structs)]
354
pub struct CircNetParameters {
355
    /// Whether we should include ed25519 identities when we send EXTEND2 cells.
356
    pub extend_by_ed25519_id: bool,
357

            
358
    /// Congestion control network parameters.
359
    pub cc: CongestionControlNetParams,
360
}
361

            
362
impl CircNetParameters {
363
    /// Convert the [`CircNetParameters`] into a [`CircParameters`].
364
    ///
365
    /// We expect the circuit creation handshake to know what congestion control algorithm was
366
    /// negotiated, and provide that as `algorithm`.
367
    //
368
    // We disable `unused` warnings at the root of tor-proto,
369
    // but it's nice to have here so we re-enable it.
370
    #[warn(unused)]
371
    fn as_circ_parameters(&self, algorithm: AlgorithmDiscriminants) -> Result<CircParameters, Bug> {
372
        // Unpack everything to make sure that we aren't missing anything
373
        // (otherwise clippy would warn).
374
        let Self {
375
            extend_by_ed25519_id,
376
            cc:
377
                CongestionControlNetParams {
378
                    fixed_window,
379
                    vegas_exit,
380
                    cwnd,
381
                    rtt,
382
                    flow_ctrl,
383
                },
384
        } = self;
385

            
386
        let algorithm = match algorithm {
387
            AlgorithmDiscriminants::FixedWindow => Algorithm::FixedWindow(*fixed_window),
388
            AlgorithmDiscriminants::Vegas => Algorithm::Vegas(*vegas_exit),
389
        };
390

            
391
        // TODO(arti#2442): The builder pattern here seems like a footgun.
392
        let cc = CongestionControlParams::builder()
393
            .alg(algorithm)
394
            .fixed_window_params(*fixed_window)
395
            .cwnd_params(*cwnd)
396
            .rtt_params(rtt.clone())
397
            .build()
398
            .map_err(into_internal!("Could not build `CongestionControlParams`"))?;
399

            
400
        Ok(CircParameters::new(
401
            *extend_by_ed25519_id,
402
            cc,
403
            flow_ctrl.clone(),
404
        ))
405
    }
406
}