1
//! A relay's view of the forward (away from the client, towards the exit) state of a circuit.
2

            
3
use crate::channel::{Channel, ChannelSender};
4
use crate::circuit::CircuitRxReceiver;
5
use crate::circuit::UniqId;
6
use crate::circuit::create::{Create2Wrap, CreateHandshakeWrap};
7
use crate::circuit::reactor::ControlHandler;
8
use crate::circuit::reactor::backward::BackwardReactorCmd;
9
use crate::circuit::reactor::forward::{ForwardCellDisposition, ForwardHandler};
10
use crate::circuit::reactor::hop_mgr::HopMgr;
11
use crate::crypto::cell::OutboundRelayLayer;
12
use crate::crypto::cell::RelayCellBody;
13
use crate::relay::RelayCircChanMsg;
14
use crate::util::err::ReactorError;
15
use crate::{Error, HopNum, Result};
16

            
17
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
18
use crate::client::circuit::padding::QueuedCellPaddingInfo;
19

            
20
use crate::relay::channel_provider::{ChannelProvider, ChannelResult, OutboundChanSender};
21
use tor_cell::chancell::msg::{AnyChanMsg, Destroy, PaddingNegotiate, Relay};
22
use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanMsg, CircId};
23
use tor_cell::relaycell::msg::{Extend2, Extended2, SendmeTag};
24
use tor_cell::relaycell::{RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg};
25
use tor_error::{internal, into_internal, warn_report};
26
use tor_linkspec::decode::Strictness;
27
use tor_linkspec::{OwnedChanTarget, OwnedChanTargetBuilder};
28
use tor_rtcompat::{Runtime, SpawnExt as _};
29

            
30
use futures::channel::mpsc;
31
use futures::{SinkExt as _, StreamExt as _, future};
32

            
33
use std::result::Result as StdResult;
34
use std::sync::Arc;
35
use std::task::Poll;
36

            
37
/// Placeholder for our custom control message type.
38
type CtrlMsg = ();
39

            
40
/// Placeholder for our custom control command type.
41
type CtrlCmd = ();
42

            
43
/// The maximum number of RELAY_EARLY cells allowed on a circuit.
44
///
45
// TODO(relay): should we come up with a consensus parameter for this? (arti#2349)
46
const MAX_RELAY_EARLY_CELLS_PER_CIRCUIT: usize = 8;
47

            
48
/// Relay-specific state for the forward reactor.
49
pub(crate) struct Forward {
50
    /// An identifier for logging about this reactor's circuit.
51
    unique_id: UniqId,
52
    /// The outbound view of this circuit, if we are not the last hop.
53
    ///
54
    /// Delivers cells towards the exit.
55
    ///
56
    /// Only set for middle relays.
57
    outbound: Option<Outbound>,
58
    /// The cryptographic state for this circuit for inbound cells.
59
    crypto_out: Box<dyn OutboundRelayLayer + Send>,
60
    /// A handle to a [`ChannelProvider`], used for initiating outgoing Tor channels.
61
    ///
62
    /// Note: all circuit reactors of a relay need to be initialized
63
    /// with the *same* underlying Tor channel provider (`ChanMgr`),
64
    /// to enable the reuse of existing Tor channels where possible.
65
    chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send>,
66
    /// Whether we have received an EXTEND2 on this circuit.
67
    ///
68
    // TODO(relay): bools can be finicky.
69
    // Maybe we should combine this bool and the optional
70
    // outbound into a new state machine type
71
    // (with states Initial -> Extending -> Extended(Outbound))?
72
    // But should not do this if it turns out more convoluted than the bool-based approach.
73
    have_seen_extend2: bool,
74
    /// The number of RELAY_EARLY cells we have seen so far on this circuit.
75
    ///
76
    /// If we see more than [`MAX_RELAY_EARLY_CELLS_PER_CIRCUIT`] RELAY_EARLY cells, we tear down the circuit.
77
    relay_early_count: usize,
78
    /// A stream of events to be read from the main loop of the reactor.
79
    event_tx: mpsc::Sender<CircEvent>,
80
}
81

            
82
/// A type of event issued by the relay forward reactor.
83
pub(crate) enum CircEvent {
84
    /// The outcome of an EXTEND2 request.
85
    ExtendResult(StdResult<ExtendResult, ReactorError>),
86
}
87

            
88
/// A successful circuit extension result.
89
pub(crate) struct ExtendResult {
90
    /// The EXTENDED2 cell to send back to the client.
91
    extended2: Extended2,
92
    /// The outbound channel.
93
    outbound: Outbound,
94
    /// The reading end of the outbound Tor channel, if we are not the last hop.
95
    ///
96
    /// Yields cells moving from the exit towards the client, if we are a middle relay.
97
    outbound_chan_rx: CircuitRxReceiver,
98
}
99

            
100
/// The outbound view of a relay circuit.
101
struct Outbound {
102
    /// The circuit identifier on the outbound Tor channel.
103
    circ_id: CircId,
104
    /// The outbound Tor channel.
105
    channel: Arc<Channel>,
106
    /// The sending end of the outbound Tor channel.
107
    outbound_chan_tx: ChannelSender,
108
}
109

            
110
/// The outcome of `decode_relay_cell`.
111
enum CellDecodeResult {
112
    /// A decrypted cell.
113
    Recognized(SendmeTag, RelayCellDecoderResult),
114
    /// A cell we could not decrypt.
115
    Unrecognizd(RelayCellBody),
116
}
117

            
118
impl Forward {
119
    /// Create a new [`Forward`].
120
    pub(crate) fn new(
121
        unique_id: UniqId,
122
        crypto_out: Box<dyn OutboundRelayLayer + Send>,
123
        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send>,
124
        event_tx: mpsc::Sender<CircEvent>,
125
    ) -> Self {
126
        Self {
127
            unique_id,
128
            // Initially, we are the last hop in the circuit.
129
            outbound: None,
130
            crypto_out,
131
            chan_provider,
132
            have_seen_extend2: false,
133
            relay_early_count: 0,
134
            event_tx,
135
        }
136
    }
137

            
138
    /// Decode `cell`, returning its corresponding hop number, tag and decoded body.
139
    fn decode_relay_cell<R: Runtime>(
140
        &mut self,
141
        hop_mgr: &mut HopMgr<R>,
142
        cell: Relay,
143
    ) -> Result<(Option<HopNum>, CellDecodeResult)> {
144
        // Note: the client reactor will return the actual source hopnum
145
        let hopnum = None;
146
        let cmd = cell.cmd();
147
        let mut body = cell.into_relay_body().into();
148
        let Some(tag) = self.crypto_out.decrypt_outbound(cmd, &mut body) else {
149
            return Ok((hopnum, CellDecodeResult::Unrecognizd(body)));
150
        };
151

            
152
        // The message is addressed to us! Now it's time to handle it...
153
        let mut hops = hop_mgr.hops().write().expect("poisoned lock");
154
        let decode_res = hops
155
            .get_mut(hopnum)
156
            .ok_or_else(|| internal!("msg from non-existant hop???"))?
157
            .inbound
158
            .decode(body.into())?;
159

            
160
        Ok((hopnum, CellDecodeResult::Recognized(tag, decode_res)))
161
    }
162

            
163
    /// Handle a DROP message.
164
    #[allow(clippy::unnecessary_wraps)] // Returns Err if circ-padding is enabled
165
    fn handle_drop(&mut self) -> StdResult<(), ReactorError> {
166
        cfg_if::cfg_if! {
167
            if #[cfg(feature = "circ-padding")] {
168
                Err(internal!("relay circuit padding not yet supported").into())
169
            } else {
170
                Ok(())
171
            }
172
        }
173
    }
174

            
175
    /// Handle an EXTEND2 cell.
176
    ///
177
    /// This spawns a background task for dealing with the circuit extension,
178
    /// which then reports back the result via the [`Self::event_tx`] MPSC stream.
179
    /// Note that this MPSC stream is polled from the `ForwardReactor` main loop,
180
    /// and each `CircEvent` is passed back to [`Self::handle_event()`[ for handling.
181
    fn handle_extend2<R: Runtime>(
182
        &mut self,
183
        runtime: &R,
184
        early: bool,
185
        msg: UnparsedRelayMsg,
186
    ) -> StdResult<(), ReactorError> {
187
        // TODO(relay): this should be allowed if the AllowNonearlyExtend consensus
188
        // param is set (arti#2349)
189
        if !early {
190
            return Err(Error::CircProto("got EXTEND2 in a RELAY cell?!".into()).into());
191
        }
192

            
193
        // Check if we're in the right state before parsing the EXTEND2
194
        if self.have_seen_extend2 {
195
            return Err(Error::CircProto("got 2 EXTEND2 on the same circuit?!".into()).into());
196
        }
197

            
198
        self.have_seen_extend2 = true;
199

            
200
        let to_bytes_err = |e| Error::from_bytes_err(e, "EXTEND2 message");
201

            
202
        let extend2 = msg.decode::<Extend2>().map_err(to_bytes_err)?.into_msg();
203

            
204
        let chan_target = OwnedChanTargetBuilder::from_encoded_linkspecs(
205
            Strictness::Standard,
206
            extend2.linkspecs(),
207
        )
208
        .map_err(|err| Error::LinkspecDecodeErr {
209
            object: "EXTEND2",
210
            err,
211
        })?
212
        .build()
213
        .map_err(|_| {
214
            // TODO: should we include the error in the circ proto error context?
215
            Error::CircProto("Invalid channel target".into())
216
        })?;
217

            
218
        // Note: we don't do any further validation on the EXTEND2 here,
219
        // under the assumption it will be handled by the ChannelProvider.
220

            
221
        let (chan_tx, chan_rx) = mpsc::unbounded();
222

            
223
        let chan_tx = OutboundChanSender(chan_tx);
224
        Arc::clone(&self.chan_provider).get_or_launch(self.unique_id, chan_target, chan_tx)?;
225

            
226
        let mut result_tx = self.event_tx.clone();
227
        let rt = runtime.clone();
228

            
229
        // TODO(relay): because we dispatch this the entire EXTEND2 handling to a background task,
230
        // we don't really need the channel provider to send us the outcome via an MPSC channel,
231
        // because get_or_launch() could simply be async (it wouldn't block the reactor,
232
        // because it runs in another task). Maybe we need to rethink the ChannelProvider API?
233
        runtime
234
            .spawn(async move {
235
                let res = Self::extend_circuit(rt, extend2, chan_rx).await;
236

            
237
                // Discard the error if the reactor shut down before we had
238
                // a chance to complete the extend handshake
239
                let _ = result_tx.send(CircEvent::ExtendResult(res)).await;
240
            })
241
            .map_err(into_internal!("failed to spawn extend task?!"))?;
242

            
243
        Ok(())
244
    }
245

            
246
    /// Handle the outcome of handling an EXTEND2.
247
    fn handle_extend_result(
248
        &mut self,
249
        res: StdResult<ExtendResult, ReactorError>,
250
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
251
        let ExtendResult {
252
            extended2,
253
            outbound,
254
            outbound_chan_rx,
255
        } = res?;
256

            
257
        self.outbound = Some(outbound);
258

            
259
        Ok(Some(BackwardReactorCmd::HandleCircuitExtended {
260
            hop: None,
261
            extended2,
262
            outbound_chan_rx,
263
        }))
264
    }
265

            
266
    /// Extend this circuit on the channel received on `chan_rx`.
267
    ///
268
    /// Note: this gets spawned in a background task from
269
    /// [`Self::handle_extend2`] so as not to block the reactor main loop.
270
    ///
271
    #[allow(unused_variables)] // will become used once we implement CREATED2 timeouts
272
    async fn extend_circuit<R: Runtime>(
273
        _runtime: R,
274
        extend2: Extend2,
275
        mut chan_rx: mpsc::UnboundedReceiver<ChannelResult>,
276
    ) -> StdResult<ExtendResult, ReactorError> {
277
        // We expect the channel build timeout to be enforced by the ChannelProvider
278
        let chan_res = chan_rx
279
            .next()
280
            .await
281
            .ok_or_else(|| internal!("channel provider task exited"))?;
282

            
283
        let channel = match chan_res {
284
            Ok(c) => c,
285
            Err(e) => {
286
                warn_report!(e, "Failed to launch outgoing channel");
287
                // Note: retries are handled within
288
                // get_or_launch(), so if we receive an
289
                // error at this point, we need to bail
290
                return Err(ReactorError::Shutdown);
291
            }
292
        };
293

            
294
        // Now that we finally have a forward Tor channel,
295
        // it's time to forward the onion skin and extend the circuit...
296
        //
297
        // Note: the only reason we need to await here is because internally
298
        // new_outbound_circ() sends a control message to the channel reactor handles,
299
        // which is handled asynchronously. In practice, we're not actually waiting on
300
        // the network here, so in theory we shouldn't need a timeout for this operation.
301
        let (circ_id, outbound_chan_rx, createdreceiver) = channel.new_outbound_circ().await?;
302

            
303
        // We have allocated a circuit in the channel's circmap,
304
        // now it's time to send the CREATE2 and wait for the response.
305
        let create2_wrap = Create2Wrap {
306
            handshake_type: extend2.handshake_type(),
307
        };
308
        let create2 = create2_wrap.to_chanmsg(extend2.handshake().into());
309

            
310
        // Time to write the CREATE2 to the outbound channel...
311
        let mut outbound_chan_tx = channel.sender();
312
        let cell = AnyChanCell::new(Some(circ_id), create2);
313
        outbound_chan_tx.send((cell, None)).await?;
314

            
315
        // TODO(relay): we need a timeout here, otherwise we might end up waiting forever
316
        // for the CREATED2 to arrive.
317
        //
318
        // There is some complexity here, see
319
        // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3648#note_3340125
320
        let response = createdreceiver
321
            .await
322
            .map_err(|_| internal!("channel disappeared?"))?;
323

            
324
        let outbound = Outbound {
325
            circ_id,
326
            channel: Arc::clone(&channel),
327
            outbound_chan_tx,
328
        };
329

            
330
        // If we reach this point, it means we have extended
331
        // the circuit by one hop, so we need to take the contents
332
        // of the CREATE/CREATED2 cell, and package an EXTEND/EXTENDED2
333
        // to send back to the client.
334
        let created2_body = create2_wrap.decode_chanmsg(response)?;
335
        let extended2 = Extended2::new(created2_body);
336

            
337
        Ok(ExtendResult {
338
            extended2,
339
            outbound,
340
            outbound_chan_rx,
341
        })
342
    }
343

            
344
    /// Handle a RELAY or RELAY_EARLY cell.
345
    fn handle_relay_cell<R: Runtime>(
346
        &mut self,
347
        hop_mgr: &mut HopMgr<R>,
348
        cell: Relay,
349
        early: bool,
350
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
351
        if early {
352
            self.relay_early_count += 1;
353

            
354
            if self.relay_early_count > MAX_RELAY_EARLY_CELLS_PER_CIRCUIT {
355
                return Err(
356
                    Error::CircProto("Circuit received too many RELAY_EARLY cells".into()).into(),
357
                );
358
            }
359
        }
360

            
361
        let (hopnum, res) = self.decode_relay_cell(hop_mgr, cell)?;
362
        let (tag, decode_res) = match res {
363
            CellDecodeResult::Unrecognizd(body) => {
364
                self.handle_unrecognized_cell(body, None)?;
365
                return Ok(None);
366
            }
367
            CellDecodeResult::Recognized(tag, res) => (tag, res),
368
        };
369

            
370
        Ok(Some(ForwardCellDisposition::HandleRecognizedRelay {
371
            cell: decode_res,
372
            early,
373
            hopnum,
374
            tag,
375
        }))
376
    }
377

            
378
    /// Handle a TRUNCATE cell.
379
    #[allow(clippy::unused_async)] // TODO(relay)
380
    async fn handle_truncate(&mut self) -> StdResult<(), ReactorError> {
381
        // TODO(relay): when we implement this, we should try to do better than C Tor:
382
        // if we have some cells queued for the next hop in the circuit,
383
        // we should try to flush them *before* tearing it down.
384
        //
385
        // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3487#note_3296035
386
        Err(internal!("TRUNCATE is not implemented").into())
387
    }
388

            
389
    /// Handle a DESTROY cell originating from the client.
390
    #[allow(clippy::needless_pass_by_value)] // TODO(relay)
391
    fn handle_destroy_cell(&mut self, _cell: Destroy) -> StdResult<(), ReactorError> {
392
        Err(internal!("DESTROY is not implemented").into())
393
    }
394

            
395
    /// Handle a PADDING_NEGOTIATE cell originating from the client.
396
    #[allow(clippy::needless_pass_by_value)] // TODO(relay)
397
    fn handle_padding_negotiate(&mut self, _cell: PaddingNegotiate) -> StdResult<(), ReactorError> {
398
        Err(internal!("PADDING_NEGOTIATE is not implemented").into())
399
    }
400
}
401

            
402
impl ForwardHandler for Forward {
403
    type BuildSpec = OwnedChanTarget;
404
    type CircChanMsg = RelayCircChanMsg;
405
    type CircEvent = CircEvent;
406

            
407
    async fn handle_meta_msg<R: Runtime>(
408
        &mut self,
409
        runtime: &R,
410
        early: bool,
411
        _hopnum: Option<HopNum>,
412
        msg: UnparsedRelayMsg,
413
        _relay_cell_format: RelayCellFormat,
414
    ) -> StdResult<(), ReactorError> {
415
        match msg.cmd() {
416
            RelayCmd::DROP => self.handle_drop(),
417
            RelayCmd::EXTEND2 => self.handle_extend2(runtime, early, msg),
418
            RelayCmd::TRUNCATE => self.handle_truncate().await,
419
            cmd => Err(internal!("relay cmd {cmd} not supported").into()),
420
        }
421
    }
422

            
423
    fn handle_unrecognized_cell(
424
        &mut self,
425
        body: RelayCellBody,
426
        info: Option<QueuedCellPaddingInfo>,
427
    ) -> StdResult<(), ReactorError> {
428
        let Some(chan) = self.outbound.as_mut() else {
429
            // The client shouldn't try to send us any cells before it gets
430
            // an EXTENDED2 cell from us
431
            return Err(Error::CircProto(
432
                "Asked to forward cell before the circuit was extended?!".into(),
433
            )
434
            .into());
435
        };
436

            
437
        let msg = Relay::from(BoxedCellBody::from(body));
438
        let relay = AnyChanMsg::Relay(msg);
439
        let cell = AnyChanCell::new(Some(chan.circ_id), relay);
440

            
441
        // Note: this future is always `Ready`, because we checked the sink for readiness
442
        // before polling the input channel, so await won't block.
443
        chan.outbound_chan_tx.start_send_unpin((cell, info))?;
444

            
445
        Ok(())
446
    }
447

            
448
    async fn handle_forward_cell<R: Runtime>(
449
        &mut self,
450
        hop_mgr: &mut HopMgr<R>,
451
        cell: RelayCircChanMsg,
452
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
453
        use RelayCircChanMsg::*;
454

            
455
        match cell {
456
            Relay(r) => self.handle_relay_cell(hop_mgr, r, false),
457
            RelayEarly(r) => self.handle_relay_cell(hop_mgr, r.into(), true),
458
            Destroy(d) => {
459
                self.handle_destroy_cell(d)?;
460
                Ok(None)
461
            }
462
            PaddingNegotiate(p) => {
463
                self.handle_padding_negotiate(p)?;
464
                Ok(None)
465
            }
466
        }
467
    }
468

            
469
    fn handle_event(
470
        &mut self,
471
        event: Self::CircEvent,
472
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
473
        match event {
474
            CircEvent::ExtendResult(res) => self.handle_extend_result(res),
475
        }
476
    }
477

            
478
    async fn outbound_chan_ready(&mut self) -> Result<()> {
479
        future::poll_fn(|cx| match &mut self.outbound {
480
            Some(chan) => {
481
                let _ = chan.outbound_chan_tx.poll_flush_unpin(cx);
482

            
483
                chan.outbound_chan_tx.poll_ready_unpin(cx)
484
            }
485
            None => {
486
                // Pedantically, if the channel doesn't exist, it can't be ready,
487
                // but we have no choice here than to return Ready
488
                // (returning Pending would cause the reactor to lock up).
489
                //
490
                // Returning ready here means the base reactor is allowed to read
491
                // from its inbound channel. This is OK, because if we *do*
492
                // read a cell from that channel and find ourselves needing to
493
                // forward it to the next hop, we simply return a proto violation error,
494
                // shutting down the reactor.
495
                Poll::Ready(Ok(()))
496
            }
497
        })
498
        .await
499
    }
500
}
501

            
502
impl ControlHandler for Forward {
503
    type CtrlMsg = CtrlMsg;
504
    type CtrlCmd = CtrlCmd;
505

            
506
    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError> {
507
        let () = cmd;
508
        Ok(())
509
    }
510

            
511
    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError> {
512
        let () = msg;
513
        Ok(())
514
    }
515
}
516

            
517
impl Drop for Forward {
518
    fn drop(&mut self) {
519
        if let Some(outbound) = self.outbound.as_mut() {
520
            // This will send a DESTROY down the outbound channel
521
            let _ = outbound.channel.close_circuit(outbound.circ_id);
522
        }
523
    }
524
}