1
//! A relay's view of the forward (away from the client, towards the exit) state of a circuit.
2

            
3
mod extend_handler;
4

            
5
use extend_handler::ExtendRequestHandler;
6

            
7
use crate::channel::{Channel, ChannelSender};
8
use crate::circuit::CircuitRxReceiver;
9
use crate::circuit::UniqId;
10
use crate::circuit::reactor::ControlHandler;
11
use crate::circuit::reactor::backward::BackwardReactorCmd;
12
use crate::circuit::reactor::forward::{ForwardCellDisposition, ForwardHandler};
13
use crate::circuit::reactor::hop_mgr::HopMgr;
14
use crate::crypto::cell::OutboundRelayLayer;
15
use crate::crypto::cell::RelayCellBody;
16
use crate::relay::RelayCircChanMsg;
17
use crate::util::err::ReactorError;
18
use crate::{Error, HopNum, Result};
19

            
20
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
21
use crate::client::circuit::padding::QueuedCellPaddingInfo;
22

            
23
use crate::relay::channel_provider::ChannelProvider;
24
use crate::relay::reactor::CircuitAccount;
25
use tor_cell::chancell::msg::{AnyChanMsg, Destroy, PaddingNegotiate, Relay};
26
use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanMsg, CircId};
27
use tor_cell::relaycell::msg::{Extended2, SendmeTag};
28
use tor_cell::relaycell::{RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg};
29
use tor_error::internal;
30
use tor_linkspec::OwnedChanTarget;
31
use tor_rtcompat::Runtime;
32

            
33
use futures::channel::mpsc;
34
use futures::{SinkExt as _, future};
35
use tracing::trace;
36

            
37
use std::result::Result as StdResult;
38
use std::sync::Arc;
39
use std::task::Poll;
40

            
41
/// Stand-in for the control message type of this reactor.
///
/// No custom control messages exist yet, so this is the unit type.
type CtrlMsg = ();

/// Stand-in for the control command type of this reactor.
///
/// No custom control commands exist yet, so this is the unit type.
type CtrlCmd = ();

/// Upper bound on the number of RELAY_EARLY cells a single circuit may receive.
//
// TODO(relay): should we come up with a consensus parameter for this? (arti#2349)
const MAX_RELAY_EARLY_CELLS_PER_CIRCUIT: usize = 8;
51

            
52
/// Relay-specific state for the forward reactor.
53
pub(crate) struct Forward {
54
    /// An identifier for logging about this reactor's circuit.
55
    unique_id: UniqId,
56
    /// The outbound view of this circuit, if we are not the last hop.
57
    ///
58
    /// Delivers cells towards the exit.
59
    ///
60
    /// Only set for middle relays.
61
    outbound: Option<Outbound>,
62
    /// The cryptographic state for this circuit for inbound cells.
63
    crypto_out: Box<dyn OutboundRelayLayer + Send>,
64
    /// The number of RELAY_EARLY cells we have seen so far on this circuit.
65
    ///
66
    /// If we see more than [`MAX_RELAY_EARLY_CELLS_PER_CIRCUIT`] RELAY_EARLY cells, we tear down the circuit.
67
    relay_early_count: usize,
68
    /// Helper for handling circuit extension requests.
69
    ///
70
    /// Used for validating EXTEND2 cells.
71
    extend_handler: ExtendRequestHandler,
72
}
73

            
74
/// A type of event issued by the relay forward reactor.
75
pub(crate) enum CircEvent {
76
    /// The outcome of an EXTEND2 request.
77
    ExtendResult(StdResult<ExtendResult, ReactorError>),
78
}
79

            
80
/// A successful circuit extension result.
81
pub(crate) struct ExtendResult {
82
    /// The EXTENDED2 cell to send back to the client.
83
    extended2: Extended2,
84
    /// The outbound channel.
85
    outbound: Outbound,
86
    /// The reading end of the outbound Tor channel, if we are not the last hop.
87
    ///
88
    /// Yields cells moving from the exit towards the client, if we are a middle relay.
89
    outbound_chan_rx: CircuitRxReceiver,
90
}
91

            
92
/// The outbound view of a relay circuit.
93
struct Outbound {
94
    /// The circuit identifier on the outbound Tor channel.
95
    circ_id: CircId,
96
    /// The outbound Tor channel.
97
    channel: Arc<Channel>,
98
    /// The sending end of the outbound Tor channel.
99
    outbound_chan_tx: ChannelSender,
100
}
101

            
102
/// The outcome of `decode_relay_cell`.
103
enum CellDecodeResult {
104
    /// A decrypted cell.
105
    Recognized(SendmeTag, RelayCellDecoderResult),
106
    /// A cell we could not decrypt.
107
    Unrecognizd(RelayCellBody),
108
}
109

            
110
impl Forward {
111
    /// Create a new [`Forward`].
112
20
    pub(crate) fn new(
113
20
        inbound_chan: &Arc<Channel>,
114
20
        unique_id: UniqId,
115
20
        crypto_out: Box<dyn OutboundRelayLayer + Send>,
116
20
        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
117
20
        event_tx: mpsc::Sender<CircEvent>,
118
20
        memquota: CircuitAccount,
119
20
    ) -> Self {
120
20
        let inbound_peer = Arc::clone(inbound_chan.peer_info());
121
20
        let extend_handler =
122
20
            ExtendRequestHandler::new(unique_id, chan_provider, inbound_peer, event_tx, memquota);
123

            
124
20
        Self {
125
20
            unique_id,
126
20
            // Initially, we are the last hop in the circuit.
127
20
            outbound: None,
128
20
            crypto_out,
129
20
            relay_early_count: 0,
130
20
            extend_handler,
131
20
        }
132
20
    }
133

            
134
    /// Decode `cell`, returning its corresponding hop number, tag and decoded body.
135
28
    fn decode_relay_cell<R: Runtime>(
136
28
        &mut self,
137
28
        hop_mgr: &mut HopMgr<R>,
138
28
        cell: Relay,
139
28
    ) -> Result<(Option<HopNum>, CellDecodeResult)> {
140
        // Note: the client reactor will return the actual source hopnum
141
28
        let hopnum = None;
142
28
        let cmd = cell.cmd();
143
28
        let mut body = cell.into_relay_body().into();
144
28
        let Some(tag) = self.crypto_out.decrypt_outbound(cmd, &mut body) else {
145
12
            return Ok((hopnum, CellDecodeResult::Unrecognizd(body)));
146
        };
147

            
148
        // The message is addressed to us! Now it's time to handle it...
149
16
        let mut hops = hop_mgr.hops().write().expect("poisoned lock");
150
16
        let decode_res = hops
151
16
            .get_mut(hopnum)
152
16
            .ok_or_else(|| internal!("msg from non-existent hop???"))?
153
            .inbound
154
16
            .decode(body.into())?;
155

            
156
16
        Ok((hopnum, CellDecodeResult::Recognized(tag, decode_res)))
157
28
    }
158

            
159
    /// Handle a DROP message.
160
    #[allow(clippy::unnecessary_wraps)] // Returns Err if circ-padding is enabled
161
    fn handle_drop(&mut self) -> StdResult<(), ReactorError> {
162
        cfg_if::cfg_if! {
163
            if #[cfg(feature = "circ-padding")] {
164
                Err(internal!("relay circuit padding not yet supported").into())
165
            } else {
166
                Ok(())
167
            }
168
        }
169
    }
170

            
171
    /// Handle the outcome of handling an EXTEND2.
172
4
    fn handle_extend_result(
173
4
        &mut self,
174
4
        res: StdResult<ExtendResult, ReactorError>,
175
4
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
176
        let ExtendResult {
177
4
            extended2,
178
4
            outbound,
179
4
            outbound_chan_rx,
180
4
        } = res?;
181

            
182
4
        self.outbound = Some(outbound);
183

            
184
4
        Ok(Some(BackwardReactorCmd::HandleCircuitExtended {
185
4
            hop: None,
186
4
            extended2,
187
4
            outbound_chan_rx,
188
4
        }))
189
4
    }
190

            
191
    /// Handle a RELAY or RELAY_EARLY cell.
192
28
    fn handle_relay_cell<R: Runtime>(
193
28
        &mut self,
194
28
        hop_mgr: &mut HopMgr<R>,
195
28
        cell: Relay,
196
28
        early: bool,
197
28
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
198
28
        if early {
199
16
            self.relay_early_count += 1;
200

            
201
16
            if self.relay_early_count > MAX_RELAY_EARLY_CELLS_PER_CIRCUIT {
202
                return Err(
203
                    Error::CircProto("Circuit received too many RELAY_EARLY cells".into()).into(),
204
                );
205
16
            }
206
12
        }
207

            
208
28
        let (hopnum, res) = self.decode_relay_cell(hop_mgr, cell)?;
209
28
        let (tag, decode_res) = match res {
210
12
            CellDecodeResult::Unrecognizd(body) => {
211
12
                self.handle_unrecognized_cell(body, None, early)?;
212
8
                return Ok(None);
213
            }
214
16
            CellDecodeResult::Recognized(tag, res) => (tag, res),
215
        };
216

            
217
16
        Ok(Some(ForwardCellDisposition::HandleRecognizedRelay {
218
16
            cell: decode_res,
219
16
            early,
220
16
            hopnum,
221
16
            tag,
222
16
        }))
223
28
    }
224

            
225
    /// Handle a forward cell that we could not decrypt.
226
12
    fn handle_unrecognized_cell(
227
12
        &mut self,
228
12
        body: RelayCellBody,
229
12
        info: Option<QueuedCellPaddingInfo>,
230
12
        early: bool,
231
12
    ) -> StdResult<(), ReactorError> {
232
        // TODO(relay): remove this log once we add some tests
233
        // and confirm relaying cells works as expected
234
        // (in practice it will be too noisy to be useful, even at trace level).
235
12
        trace!(
236
            circ_id = %self.unique_id,
237
            "Forwarding unrecognized cell"
238
        );
239

            
240
12
        let Some(chan) = self.outbound.as_mut() else {
241
            // The client shouldn't try to send us any cells before it gets
242
            // an EXTENDED2 cell from us
243
4
            return Err(Error::CircProto(
244
4
                "Asked to forward cell before the circuit was extended?!".into(),
245
4
            )
246
4
            .into());
247
        };
248

            
249
8
        let msg = Relay::from(BoxedCellBody::from(body));
250
8
        let relay = if early {
251
4
            AnyChanMsg::RelayEarly(msg.into())
252
        } else {
253
4
            AnyChanMsg::Relay(msg)
254
        };
255
8
        let cell = AnyChanCell::new(Some(chan.circ_id), relay);
256

            
257
        // Note: this future is always `Ready`, because we checked the sink for readiness
258
        // before polling the input channel, so await won't block.
259
8
        chan.outbound_chan_tx.start_send_unpin((cell, info))?;
260

            
261
8
        Ok(())
262
12
    }
263

            
264
    /// Handle a TRUNCATE cell.
265
    #[allow(clippy::unused_async)] // TODO(relay)
266
    async fn handle_truncate(&mut self) -> StdResult<(), ReactorError> {
267
        // TODO(relay): when we implement this, we should try to do better than C Tor:
268
        // if we have some cells queued for the next hop in the circuit,
269
        // we should try to flush them *before* tearing it down.
270
        //
271
        // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3487#note_3296035
272
        Err(internal!("TRUNCATE is not implemented").into())
273
    }
274

            
275
    /// Handle a DESTROY cell originating from the client.
276
    #[allow(clippy::needless_pass_by_value)] // TODO(relay)
277
    fn handle_destroy_cell(&mut self, _cell: Destroy) -> StdResult<(), ReactorError> {
278
        Err(internal!("DESTROY is not implemented").into())
279
    }
280

            
281
    /// Handle a PADDING_NEGOTIATE cell originating from the client.
282
    #[allow(clippy::needless_pass_by_value)] // TODO(relay)
283
    fn handle_padding_negotiate(&mut self, _cell: PaddingNegotiate) -> StdResult<(), ReactorError> {
284
        Err(internal!("PADDING_NEGOTIATE is not implemented").into())
285
    }
286
}
287

            
288
impl ForwardHandler for Forward {
289
    type BuildSpec = OwnedChanTarget;
290
    type CircChanMsg = RelayCircChanMsg;
291
    type CircEvent = CircEvent;
292

            
293
12
    async fn handle_meta_msg<R: Runtime>(
294
12
        &mut self,
295
12
        runtime: &R,
296
12
        early: bool,
297
12
        _hopnum: Option<HopNum>,
298
12
        msg: UnparsedRelayMsg,
299
12
        _relay_cell_format: RelayCellFormat,
300
12
    ) -> StdResult<(), ReactorError> {
301
12
        match msg.cmd() {
302
            RelayCmd::DROP => self.handle_drop(),
303
12
            RelayCmd::EXTEND2 => self.extend_handler.handle_extend2(runtime, early, msg),
304
            RelayCmd::TRUNCATE => self.handle_truncate().await,
305
            cmd => Err(internal!("relay cmd {cmd} not supported").into()),
306
        }
307
12
    }
308

            
309
28
    async fn handle_forward_cell<R: Runtime>(
310
28
        &mut self,
311
28
        hop_mgr: &mut HopMgr<R>,
312
28
        cell: RelayCircChanMsg,
313
28
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
314
        use RelayCircChanMsg::*;
315

            
316
28
        match cell {
317
12
            Relay(r) => self.handle_relay_cell(hop_mgr, r, false),
318
16
            RelayEarly(r) => self.handle_relay_cell(hop_mgr, r.into(), true),
319
            Destroy(d) => {
320
                self.handle_destroy_cell(d)?;
321
                Ok(None)
322
            }
323
            PaddingNegotiate(p) => {
324
                self.handle_padding_negotiate(p)?;
325
                Ok(None)
326
            }
327
        }
328
28
    }
329

            
330
4
    fn handle_event(
331
4
        &mut self,
332
4
        event: Self::CircEvent,
333
4
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
334
4
        match event {
335
4
            CircEvent::ExtendResult(res) => self.handle_extend_result(res),
336
        }
337
4
    }
338

            
339
60
    async fn outbound_chan_ready(&mut self) -> Result<()> {
340
40
        future::poll_fn(|cx| match &mut self.outbound {
341
12
            Some(chan) => {
342
12
                let _ = chan.outbound_chan_tx.poll_flush_unpin(cx);
343

            
344
12
                chan.outbound_chan_tx.poll_ready_unpin(cx)
345
            }
346
            None => {
347
                // Pedantically, if the channel doesn't exist, it can't be ready,
348
                // but we have no choice here than to return Ready
349
                // (returning Pending would cause the reactor to lock up).
350
                //
351
                // Returning ready here means the base reactor is allowed to read
352
                // from its inbound channel. This is OK, because if we *do*
353
                // read a cell from that channel and find ourselves needing to
354
                // forward it to the next hop, we simply return a proto violation error,
355
                // shutting down the reactor.
356
28
                Poll::Ready(Ok(()))
357
            }
358
40
        })
359
40
        .await
360
40
    }
361
}
362

            
363
impl ControlHandler for Forward {
364
    type CtrlMsg = CtrlMsg;
365
    type CtrlCmd = CtrlCmd;
366

            
367
    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError> {
368
        let () = cmd;
369
        Ok(())
370
    }
371

            
372
    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError> {
373
        let () = msg;
374
        Ok(())
375
    }
376
}
377

            
378
impl Drop for Forward {
379
20
    fn drop(&mut self) {
380
20
        if let Some(outbound) = self.outbound.as_mut() {
381
4
            // This will send a DESTROY down the outbound channel
382
4
            let _ = outbound.channel.close_circuit(outbound.circ_id);
383
16
        }
384
20
    }
385
}