1
//! A relay's view of the forward (away from the client, towards the exit) state of a circuit.
2

            
3
use crate::channel::{Channel, ChannelSender};
4
use crate::circuit::CircuitRxReceiver;
5
use crate::circuit::UniqId;
6
use crate::circuit::create::{Create2Wrap, CreateHandshakeWrap};
7
use crate::circuit::reactor::ControlHandler;
8
use crate::circuit::reactor::backward::BackwardReactorCmd;
9
use crate::circuit::reactor::forward::{ForwardCellDisposition, ForwardHandler};
10
use crate::circuit::reactor::hop_mgr::HopMgr;
11
use crate::crypto::cell::OutboundRelayLayer;
12
use crate::crypto::cell::RelayCellBody;
13
use crate::relay::RelayCircChanMsg;
14
use crate::util::err::ReactorError;
15
use crate::{Error, HopNum, Result};
16

            
17
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
18
use crate::client::circuit::padding::QueuedCellPaddingInfo;
19

            
20
use crate::relay::channel_provider::{ChannelProvider, ChannelResult, OutboundChanSender};
21
use crate::relay::reactor::CircuitAccount;
22
use tor_cell::chancell::msg::{AnyChanMsg, Destroy, PaddingNegotiate, Relay};
23
use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanMsg, CircId};
24
use tor_cell::relaycell::msg::{Extend2, Extended2, SendmeTag};
25
use tor_cell::relaycell::{RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg};
26
use tor_error::{internal, into_internal, warn_report};
27
use tor_linkspec::decode::Strictness;
28
use tor_linkspec::{OwnedChanTarget, OwnedChanTargetBuilder};
29
use tor_rtcompat::{Runtime, SpawnExt as _};
30

            
31
use futures::channel::mpsc;
32
use futures::{SinkExt as _, StreamExt as _, future};
33
use tracing::{debug, trace};
34

            
35
use std::result::Result as StdResult;
36
use std::sync::Arc;
37
use std::task::Poll;
38

            
39
/// Placeholder for the relay forward reactor's control message type.
///
/// There are no relay-specific control messages yet, hence the unit type.
type CtrlMsg = ();

/// Placeholder for the relay forward reactor's control command type.
///
/// There are no relay-specific control commands yet, hence the unit type.
type CtrlCmd = ();

/// Upper bound on the number of RELAY_EARLY cells a single circuit may carry.
///
/// Receiving more than this many RELAY_EARLY cells is treated as a protocol violation.
// TODO(relay): should we come up with a consensus parameter for this? (arti#2349)
const MAX_RELAY_EARLY_CELLS_PER_CIRCUIT: usize = 8;
49

            
50
/// Relay-specific state for the forward reactor.
51
pub(crate) struct Forward {
52
    /// An identifier for logging about this reactor's circuit.
53
    unique_id: UniqId,
54
    /// The outbound view of this circuit, if we are not the last hop.
55
    ///
56
    /// Delivers cells towards the exit.
57
    ///
58
    /// Only set for middle relays.
59
    outbound: Option<Outbound>,
60
    /// The cryptographic state for this circuit for inbound cells.
61
    crypto_out: Box<dyn OutboundRelayLayer + Send>,
62
    /// A handle to a [`ChannelProvider`], used for initiating outgoing Tor channels.
63
    ///
64
    /// Note: all circuit reactors of a relay need to be initialized
65
    /// with the *same* underlying Tor channel provider (`ChanMgr`),
66
    /// to enable the reuse of existing Tor channels where possible.
67
    chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
68
    /// Whether we have received an EXTEND2 on this circuit.
69
    ///
70
    // TODO(relay): bools can be finicky.
71
    // Maybe we should combine this bool and the optional
72
    // outbound into a new state machine type
73
    // (with states Initial -> Extending -> Extended(Outbound))?
74
    // But should not do this if it turns out more convoluted than the bool-based approach.
75
    have_seen_extend2: bool,
76
    /// The number of RELAY_EARLY cells we have seen so far on this circuit.
77
    ///
78
    /// If we see more than [`MAX_RELAY_EARLY_CELLS_PER_CIRCUIT`] RELAY_EARLY cells, we tear down the circuit.
79
    relay_early_count: usize,
80
    /// A stream of events to be read from the main loop of the reactor.
81
    event_tx: mpsc::Sender<CircEvent>,
82
    /// Memory quota account
83
    memquota: CircuitAccount,
84
}
85

            
86
/// A type of event issued by the relay forward reactor.
87
pub(crate) enum CircEvent {
88
    /// The outcome of an EXTEND2 request.
89
    ExtendResult(StdResult<ExtendResult, ReactorError>),
90
}
91

            
92
/// A successful circuit extension result.
93
pub(crate) struct ExtendResult {
94
    /// The EXTENDED2 cell to send back to the client.
95
    extended2: Extended2,
96
    /// The outbound channel.
97
    outbound: Outbound,
98
    /// The reading end of the outbound Tor channel, if we are not the last hop.
99
    ///
100
    /// Yields cells moving from the exit towards the client, if we are a middle relay.
101
    outbound_chan_rx: CircuitRxReceiver,
102
}
103

            
104
/// The outbound view of a relay circuit.
105
struct Outbound {
106
    /// The circuit identifier on the outbound Tor channel.
107
    circ_id: CircId,
108
    /// The outbound Tor channel.
109
    channel: Arc<Channel>,
110
    /// The sending end of the outbound Tor channel.
111
    outbound_chan_tx: ChannelSender,
112
}
113

            
114
/// The outcome of `decode_relay_cell`.
115
enum CellDecodeResult {
116
    /// A decrypted cell.
117
    Recognized(SendmeTag, RelayCellDecoderResult),
118
    /// A cell we could not decrypt.
119
    Unrecognizd(RelayCellBody),
120
}
121

            
122
impl Forward {
123
    /// Create a new [`Forward`].
124
16
    pub(crate) fn new(
125
16
        unique_id: UniqId,
126
16
        crypto_out: Box<dyn OutboundRelayLayer + Send>,
127
16
        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
128
16
        event_tx: mpsc::Sender<CircEvent>,
129
16
        memquota: CircuitAccount,
130
16
    ) -> Self {
131
16
        Self {
132
16
            unique_id,
133
16
            // Initially, we are the last hop in the circuit.
134
16
            outbound: None,
135
16
            crypto_out,
136
16
            chan_provider,
137
16
            have_seen_extend2: false,
138
16
            relay_early_count: 0,
139
16
            event_tx,
140
16
            memquota,
141
16
        }
142
16
    }
143

            
144
    /// Decode `cell`, returning its corresponding hop number, tag and decoded body.
145
24
    fn decode_relay_cell<R: Runtime>(
146
24
        &mut self,
147
24
        hop_mgr: &mut HopMgr<R>,
148
24
        cell: Relay,
149
24
    ) -> Result<(Option<HopNum>, CellDecodeResult)> {
150
        // Note: the client reactor will return the actual source hopnum
151
24
        let hopnum = None;
152
24
        let cmd = cell.cmd();
153
24
        let mut body = cell.into_relay_body().into();
154
24
        let Some(tag) = self.crypto_out.decrypt_outbound(cmd, &mut body) else {
155
12
            return Ok((hopnum, CellDecodeResult::Unrecognizd(body)));
156
        };
157

            
158
        // The message is addressed to us! Now it's time to handle it...
159
12
        let mut hops = hop_mgr.hops().write().expect("poisoned lock");
160
12
        let decode_res = hops
161
12
            .get_mut(hopnum)
162
12
            .ok_or_else(|| internal!("msg from non-existent hop???"))?
163
            .inbound
164
12
            .decode(body.into())?;
165

            
166
12
        Ok((hopnum, CellDecodeResult::Recognized(tag, decode_res)))
167
24
    }
168

            
169
    /// Handle a DROP message.
170
    #[allow(clippy::unnecessary_wraps)] // Returns Err if circ-padding is enabled
171
    fn handle_drop(&mut self) -> StdResult<(), ReactorError> {
172
        cfg_if::cfg_if! {
173
            if #[cfg(feature = "circ-padding")] {
174
                Err(internal!("relay circuit padding not yet supported").into())
175
            } else {
176
                Ok(())
177
            }
178
        }
179
    }
180

            
181
    /// Handle an EXTEND2 cell.
182
    ///
183
    /// This spawns a background task for dealing with the circuit extension,
184
    /// which then reports back the result via the [`Self::event_tx`] MPSC stream.
185
    /// Note that this MPSC stream is polled from the `ForwardReactor` main loop,
186
    /// and each `CircEvent` is passed back to [`Self::handle_event()`[ for handling.
187
8
    fn handle_extend2<R: Runtime>(
188
8
        &mut self,
189
8
        runtime: &R,
190
8
        early: bool,
191
8
        msg: UnparsedRelayMsg,
192
8
    ) -> StdResult<(), ReactorError> {
193
        // TODO(relay): this should be allowed if the AllowNonearlyExtend consensus
194
        // param is set (arti#2349)
195
8
        if !early {
196
4
            return Err(Error::CircProto("got EXTEND2 in a RELAY cell?!".into()).into());
197
4
        }
198

            
199
        // Check if we're in the right state before parsing the EXTEND2
200
4
        if self.have_seen_extend2 {
201
            return Err(Error::CircProto("got 2 EXTEND2 on the same circuit?!".into()).into());
202
4
        }
203

            
204
4
        self.have_seen_extend2 = true;
205

            
206
4
        let to_bytes_err = |e| Error::from_bytes_err(e, "EXTEND2 message");
207

            
208
4
        let extend2 = msg.decode::<Extend2>().map_err(to_bytes_err)?.into_msg();
209

            
210
4
        let chan_target = OwnedChanTargetBuilder::from_encoded_linkspecs(
211
4
            Strictness::Standard,
212
4
            extend2.linkspecs(),
213
        )
214
4
        .map_err(|err| Error::LinkspecDecodeErr {
215
            object: "EXTEND2",
216
            err,
217
        })?
218
4
        .build()
219
4
        .map_err(|_| {
220
            // TODO: should we include the error in the circ proto error context?
221
            Error::CircProto("Invalid channel target".into())
222
        })?;
223

            
224
        // Note: we don't do any further validation on the EXTEND2 here,
225
        // under the assumption it will be handled by the ChannelProvider.
226

            
227
4
        let (chan_tx, chan_rx) = mpsc::unbounded();
228

            
229
4
        let chan_tx = OutboundChanSender(chan_tx);
230
4
        Arc::clone(&self.chan_provider).get_or_launch(self.unique_id, chan_target, chan_tx)?;
231

            
232
4
        let mut result_tx = self.event_tx.clone();
233
4
        let rt = runtime.clone();
234
4
        let unique_id = self.unique_id;
235
4
        let memquota = self.memquota.clone();
236

            
237
        // TODO(relay): because we dispatch this the entire EXTEND2 handling to a background task,
238
        // we don't really need the channel provider to send us the outcome via an MPSC channel,
239
        // because get_or_launch() could simply be async (it wouldn't block the reactor,
240
        // because it runs in another task). Maybe we need to rethink the ChannelProvider API?
241
4
        runtime
242
4
            .spawn(async move {
243
4
                let res = Self::extend_circuit(rt, unique_id, extend2, chan_rx, memquota).await;
244

            
245
                // Discard the error if the reactor shut down before we had
246
                // a chance to complete the extend handshake
247
4
                let _ = result_tx.send(CircEvent::ExtendResult(res)).await;
248
4
            })
249
4
            .map_err(into_internal!("failed to spawn extend task?!"))?;
250

            
251
4
        Ok(())
252
8
    }
253

            
254
    /// Handle the outcome of handling an EXTEND2.
255
4
    fn handle_extend_result(
256
4
        &mut self,
257
4
        res: StdResult<ExtendResult, ReactorError>,
258
4
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
259
        let ExtendResult {
260
4
            extended2,
261
4
            outbound,
262
4
            outbound_chan_rx,
263
4
        } = res?;
264

            
265
4
        self.outbound = Some(outbound);
266

            
267
4
        Ok(Some(BackwardReactorCmd::HandleCircuitExtended {
268
4
            hop: None,
269
4
            extended2,
270
4
            outbound_chan_rx,
271
4
        }))
272
4
    }
273

            
274
    /// Extend this circuit on the channel received on `chan_rx`.
275
    ///
276
    /// Note: this gets spawned in a background task from
277
    /// [`Self::handle_extend2`] so as not to block the reactor main loop.
278
    ///
279
    #[allow(unused_variables)] // will become used once we implement CREATED2 timeouts
280
4
    async fn extend_circuit<R: Runtime>(
281
4
        _runtime: R,
282
4
        unique_id: UniqId,
283
4
        extend2: Extend2,
284
4
        mut chan_rx: mpsc::UnboundedReceiver<ChannelResult>,
285
4
        memquota: CircuitAccount,
286
4
    ) -> StdResult<ExtendResult, ReactorError> {
287
        // We expect the channel build timeout to be enforced by the ChannelProvider
288
4
        let chan_res = chan_rx
289
4
            .next()
290
4
            .await
291
4
            .ok_or_else(|| internal!("channel provider task exited"))?;
292

            
293
4
        let channel = match chan_res {
294
4
            Ok(c) => c,
295
            Err(e) => {
296
                warn_report!(e, "Failed to launch outgoing channel");
297
                // Note: retries are handled within
298
                // get_or_launch(), so if we receive an
299
                // error at this point, we need to bail
300
                return Err(ReactorError::Shutdown);
301
            }
302
        };
303

            
304
4
        debug!(
305
            circ_id = %unique_id,
306
            "Launched channel to the next hop"
307
        );
308

            
309
        // Now that we finally have a forward Tor channel,
310
        // it's time to forward the onion skin and extend the circuit...
311
        //
312
        // Note: the only reason we need to await here is because internally
313
        // new_outbound_circ() sends a control message to the channel reactor handles,
314
        // which is handled asynchronously. In practice, we're not actually waiting on
315
        // the network here, so in theory we shouldn't need a timeout for this operation.
316
4
        let (circ_id, outbound_chan_rx, createdreceiver) =
317
4
            channel.new_outbound_circ(memquota).await?;
318

            
319
        // We have allocated a circuit in the channel's circmap,
320
        // now it's time to send the CREATE2 and wait for the response.
321
4
        let create2_wrap = Create2Wrap {
322
4
            handshake_type: extend2.handshake_type(),
323
4
        };
324
4
        let create2 = create2_wrap.to_chanmsg(extend2.handshake().into());
325

            
326
        // Time to write the CREATE2 to the outbound channel...
327
4
        let mut outbound_chan_tx = channel.sender();
328
4
        let cell = AnyChanCell::new(Some(circ_id), create2);
329

            
330
4
        trace!(
331
            circ_id = %unique_id,
332
            "Sending CREATE2 to the next hop"
333
        );
334

            
335
4
        outbound_chan_tx.send((cell, None)).await?;
336

            
337
        // TODO(relay): we need a timeout here, otherwise we might end up waiting forever
338
        // for the CREATED2 to arrive.
339
        //
340
        // There is some complexity here, see
341
        // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3648#note_3340125
342
4
        let response = createdreceiver
343
4
            .await
344
4
            .map_err(|_| internal!("channel disappeared?"))?;
345

            
346
4
        trace!(
347
            circ_id = %unique_id,
348
            "Got CREATED2 response from next hop"
349
        );
350

            
351
4
        let outbound = Outbound {
352
4
            circ_id,
353
4
            channel: Arc::clone(&channel),
354
4
            outbound_chan_tx,
355
4
        };
356

            
357
        // If we reach this point, it means we have extended
358
        // the circuit by one hop, so we need to take the contents
359
        // of the CREATE/CREATED2 cell, and package an EXTEND/EXTENDED2
360
        // to send back to the client.
361
4
        let created2_body = create2_wrap.decode_chanmsg(response)?;
362
4
        let extended2 = Extended2::new(created2_body);
363

            
364
4
        Ok(ExtendResult {
365
4
            extended2,
366
4
            outbound,
367
4
            outbound_chan_rx,
368
4
        })
369
4
    }
370

            
371
    /// Handle a RELAY or RELAY_EARLY cell.
372
24
    fn handle_relay_cell<R: Runtime>(
373
24
        &mut self,
374
24
        hop_mgr: &mut HopMgr<R>,
375
24
        cell: Relay,
376
24
        early: bool,
377
24
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
378
24
        if early {
379
12
            self.relay_early_count += 1;
380

            
381
12
            if self.relay_early_count > MAX_RELAY_EARLY_CELLS_PER_CIRCUIT {
382
                return Err(
383
                    Error::CircProto("Circuit received too many RELAY_EARLY cells".into()).into(),
384
                );
385
12
            }
386
12
        }
387

            
388
24
        let (hopnum, res) = self.decode_relay_cell(hop_mgr, cell)?;
389
24
        let (tag, decode_res) = match res {
390
12
            CellDecodeResult::Unrecognizd(body) => {
391
12
                self.handle_unrecognized_cell(body, None, early)?;
392
8
                return Ok(None);
393
            }
394
12
            CellDecodeResult::Recognized(tag, res) => (tag, res),
395
        };
396

            
397
12
        Ok(Some(ForwardCellDisposition::HandleRecognizedRelay {
398
12
            cell: decode_res,
399
12
            early,
400
12
            hopnum,
401
12
            tag,
402
12
        }))
403
24
    }
404

            
405
    /// Handle a forward cell that we could not decrypt.
406
12
    fn handle_unrecognized_cell(
407
12
        &mut self,
408
12
        body: RelayCellBody,
409
12
        info: Option<QueuedCellPaddingInfo>,
410
12
        early: bool,
411
12
    ) -> StdResult<(), ReactorError> {
412
        // TODO(relay): remove this log once we add some tests
413
        // and confirm relaying cells works as expected
414
        // (in practice it will be too noisy to be useful, even at trace level).
415
12
        trace!(
416
            circ_id = %self.unique_id,
417
            "Forwarding unrecognized cell"
418
        );
419

            
420
12
        let Some(chan) = self.outbound.as_mut() else {
421
            // The client shouldn't try to send us any cells before it gets
422
            // an EXTENDED2 cell from us
423
4
            return Err(Error::CircProto(
424
4
                "Asked to forward cell before the circuit was extended?!".into(),
425
4
            )
426
4
            .into());
427
        };
428

            
429
8
        let msg = Relay::from(BoxedCellBody::from(body));
430
8
        let relay = if early {
431
4
            AnyChanMsg::RelayEarly(msg.into())
432
        } else {
433
4
            AnyChanMsg::Relay(msg)
434
        };
435
8
        let cell = AnyChanCell::new(Some(chan.circ_id), relay);
436

            
437
        // Note: this future is always `Ready`, because we checked the sink for readiness
438
        // before polling the input channel, so await won't block.
439
8
        chan.outbound_chan_tx.start_send_unpin((cell, info))?;
440

            
441
8
        Ok(())
442
12
    }
443

            
444
    /// Handle a TRUNCATE cell.
445
    #[allow(clippy::unused_async)] // TODO(relay)
446
    async fn handle_truncate(&mut self) -> StdResult<(), ReactorError> {
447
        // TODO(relay): when we implement this, we should try to do better than C Tor:
448
        // if we have some cells queued for the next hop in the circuit,
449
        // we should try to flush them *before* tearing it down.
450
        //
451
        // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3487#note_3296035
452
        Err(internal!("TRUNCATE is not implemented").into())
453
    }
454

            
455
    /// Handle a DESTROY cell originating from the client.
456
    #[allow(clippy::needless_pass_by_value)] // TODO(relay)
457
    fn handle_destroy_cell(&mut self, _cell: Destroy) -> StdResult<(), ReactorError> {
458
        Err(internal!("DESTROY is not implemented").into())
459
    }
460

            
461
    /// Handle a PADDING_NEGOTIATE cell originating from the client.
462
    #[allow(clippy::needless_pass_by_value)] // TODO(relay)
463
    fn handle_padding_negotiate(&mut self, _cell: PaddingNegotiate) -> StdResult<(), ReactorError> {
464
        Err(internal!("PADDING_NEGOTIATE is not implemented").into())
465
    }
466
}
467

            
468
impl ForwardHandler for Forward {
469
    type BuildSpec = OwnedChanTarget;
470
    type CircChanMsg = RelayCircChanMsg;
471
    type CircEvent = CircEvent;
472

            
473
8
    async fn handle_meta_msg<R: Runtime>(
474
8
        &mut self,
475
8
        runtime: &R,
476
8
        early: bool,
477
8
        _hopnum: Option<HopNum>,
478
8
        msg: UnparsedRelayMsg,
479
8
        _relay_cell_format: RelayCellFormat,
480
8
    ) -> StdResult<(), ReactorError> {
481
8
        match msg.cmd() {
482
            RelayCmd::DROP => self.handle_drop(),
483
8
            RelayCmd::EXTEND2 => self.handle_extend2(runtime, early, msg),
484
            RelayCmd::TRUNCATE => self.handle_truncate().await,
485
            cmd => Err(internal!("relay cmd {cmd} not supported").into()),
486
        }
487
8
    }
488

            
489
24
    async fn handle_forward_cell<R: Runtime>(
490
24
        &mut self,
491
24
        hop_mgr: &mut HopMgr<R>,
492
24
        cell: RelayCircChanMsg,
493
24
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
494
        use RelayCircChanMsg::*;
495

            
496
24
        match cell {
497
12
            Relay(r) => self.handle_relay_cell(hop_mgr, r, false),
498
12
            RelayEarly(r) => self.handle_relay_cell(hop_mgr, r.into(), true),
499
            Destroy(d) => {
500
                self.handle_destroy_cell(d)?;
501
                Ok(None)
502
            }
503
            PaddingNegotiate(p) => {
504
                self.handle_padding_negotiate(p)?;
505
                Ok(None)
506
            }
507
        }
508
24
    }
509

            
510
4
    fn handle_event(
511
4
        &mut self,
512
4
        event: Self::CircEvent,
513
4
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
514
4
        match event {
515
4
            CircEvent::ExtendResult(res) => self.handle_extend_result(res),
516
        }
517
4
    }
518

            
519
54
    async fn outbound_chan_ready(&mut self) -> Result<()> {
520
36
        future::poll_fn(|cx| match &mut self.outbound {
521
12
            Some(chan) => {
522
12
                let _ = chan.outbound_chan_tx.poll_flush_unpin(cx);
523

            
524
12
                chan.outbound_chan_tx.poll_ready_unpin(cx)
525
            }
526
            None => {
527
                // Pedantically, if the channel doesn't exist, it can't be ready,
528
                // but we have no choice here than to return Ready
529
                // (returning Pending would cause the reactor to lock up).
530
                //
531
                // Returning ready here means the base reactor is allowed to read
532
                // from its inbound channel. This is OK, because if we *do*
533
                // read a cell from that channel and find ourselves needing to
534
                // forward it to the next hop, we simply return a proto violation error,
535
                // shutting down the reactor.
536
24
                Poll::Ready(Ok(()))
537
            }
538
36
        })
539
36
        .await
540
36
    }
541
}
542

            
543
impl ControlHandler for Forward {
544
    type CtrlMsg = CtrlMsg;
545
    type CtrlCmd = CtrlCmd;
546

            
547
    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError> {
548
        let () = cmd;
549
        Ok(())
550
    }
551

            
552
    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError> {
553
        let () = msg;
554
        Ok(())
555
    }
556
}
557

            
558
impl Drop for Forward {
559
16
    fn drop(&mut self) {
560
16
        if let Some(outbound) = self.outbound.as_mut() {
561
4
            // This will send a DESTROY down the outbound channel
562
4
            let _ = outbound.channel.close_circuit(outbound.circ_id);
563
12
        }
564
16
    }
565
}