1
//! A relay's view of the forward (away from the client, towards the exit) state of a circuit.
2

            
3
mod extend_handler;
4

            
5
use extend_handler::ExtendRequestHandler;
6

            
7
use crate::channel::{Channel, ChannelSender};
8
use crate::circuit::CircuitRxReceiver;
9
use crate::circuit::UniqId;
10
use crate::circuit::celltypes::RelayMaybeEarlyChanMsg;
11
use crate::circuit::reactor::ControlHandler;
12
use crate::circuit::reactor::backward::BackwardReactorCmd;
13
use crate::circuit::reactor::forward::{ForwardCellDisposition, ForwardHandler};
14
use crate::circuit::reactor::hop_mgr::HopMgr;
15
use crate::crypto::cell::OutboundRelayLayer;
16
use crate::crypto::cell::RelayCellBody;
17
use crate::relay::RelayCircChanMsg;
18
use crate::util::err::ReactorError;
19
use crate::{Error, HopNum, Result};
20

            
21
// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
22
use crate::client::circuit::padding::QueuedCellPaddingInfo;
23

            
24
use crate::relay::channel_provider::ChannelProvider;
25
use crate::relay::reactor::CircuitAccount;
26
use tor_cell::chancell::msg::{AnyChanMsg, Destroy, PaddingNegotiate, Relay};
27
use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanMsg, CircId};
28
use tor_cell::relaycell::msg::{Extended2, SendmeTag};
29
use tor_cell::relaycell::{RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg};
30
use tor_error::internal;
31
use tor_linkspec::OwnedChanTarget;
32
use tor_rtcompat::Runtime;
33

            
34
use futures::channel::mpsc;
35
use futures::{SinkExt as _, future};
36
use tracing::{debug, trace};
37

            
38
use std::result::Result as StdResult;
39
use std::sync::Arc;
40
use std::task::Poll;
41

            
42
/// Placeholder for our custom control message type.
///
/// Currently the unit type: the relay forward reactor defines no control messages yet.
type CtrlMsg = ();

/// Placeholder for our custom control command type.
///
/// Currently the unit type: the relay forward reactor defines no control commands yet.
type CtrlCmd = ();
47

            
48
/// The maximum number of RELAY_EARLY cells allowed on a circuit.
///
/// The limit is inclusive: a circuit is only torn down (with a protocol error)
/// once it has delivered *more* than this many RELAY_EARLY cells.
//
// TODO(relay): should we come up with a consensus parameter for this? (arti#2349)
const MAX_RELAY_EARLY_CELLS_PER_CIRCUIT: usize = 8;
52

            
53
/// Relay-specific state for the forward reactor.
54
pub(crate) struct Forward {
55
    /// An identifier for logging about this reactor's circuit.
56
    unique_id: UniqId,
57
    /// The outbound view of this circuit, if we are not the last hop.
58
    ///
59
    /// Delivers cells towards the exit.
60
    ///
61
    /// Only set for middle relays.
62
    outbound: Option<Outbound>,
63
    /// The cryptographic state for this circuit for inbound cells.
64
    crypto_out: Box<dyn OutboundRelayLayer + Send>,
65
    /// The number of RELAY_EARLY cells we have seen so far on this circuit.
66
    ///
67
    /// If we see more than [`MAX_RELAY_EARLY_CELLS_PER_CIRCUIT`] RELAY_EARLY cells, we tear down the circuit.
68
    relay_early_count: usize,
69
    /// Helper for handling circuit extension requests.
70
    ///
71
    /// Used for validating EXTEND2 cells.
72
    extend_handler: ExtendRequestHandler,
73
}
74

            
75
/// A type of event issued by the relay forward reactor.
76
pub(crate) enum CircEvent {
77
    /// The outcome of an EXTEND2 request.
78
    ExtendResult(StdResult<ExtendResult, ReactorError>),
79
}
80

            
81
/// A successful circuit extension result.
82
pub(crate) struct ExtendResult {
83
    /// The EXTENDED2 cell to send back to the client.
84
    extended2: Extended2,
85
    /// The outbound channel.
86
    outbound: Outbound,
87
    /// The reading end of the outbound Tor channel, if we are not the last hop.
88
    ///
89
    /// Yields cells moving from the exit towards the client, if we are a middle relay.
90
    outbound_chan_rx: CircuitRxReceiver,
91
}
92

            
93
/// The outbound view of a relay circuit.
94
struct Outbound {
95
    /// The circuit identifier on the outbound Tor channel.
96
    circ_id: CircId,
97
    /// The outbound Tor channel.
98
    channel: Arc<Channel>,
99
    /// The sending end of the outbound Tor channel.
100
    outbound_chan_tx: ChannelSender,
101
}
102

            
103
/// The outcome of `decode_relay_cell`.
104
enum CellDecodeResult {
105
    /// A decrypted cell.
106
    Recognized(SendmeTag, RelayCellDecoderResult),
107
    /// A cell we could not decrypt.
108
    Unrecognizd(RelayCellBody),
109
}
110

            
111
impl Forward {
112
    /// Create a new [`Forward`].
113
40
    pub(crate) fn new(
114
40
        inbound_chan: &Arc<Channel>,
115
40
        unique_id: UniqId,
116
40
        crypto_out: Box<dyn OutboundRelayLayer + Send>,
117
40
        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
118
40
        event_tx: mpsc::Sender<CircEvent>,
119
40
        memquota: CircuitAccount,
120
40
    ) -> Self {
121
40
        let inbound_peer = Arc::clone(inbound_chan.peer_info());
122
40
        let extend_handler =
123
40
            ExtendRequestHandler::new(unique_id, chan_provider, inbound_peer, event_tx, memquota);
124

            
125
40
        Self {
126
40
            unique_id,
127
40
            // Initially, we are the last hop in the circuit.
128
40
            outbound: None,
129
40
            crypto_out,
130
40
            relay_early_count: 0,
131
40
            extend_handler,
132
40
        }
133
40
    }
134

            
135
    /// Decode `cell`, returning its corresponding hop number, tag and decoded body.
136
52
    fn decode_relay_cell<R: Runtime>(
137
52
        &mut self,
138
52
        hop_mgr: &mut HopMgr<R>,
139
52
        cell: RelayMaybeEarlyChanMsg,
140
52
    ) -> Result<(Option<HopNum>, CellDecodeResult)> {
141
        // Note: the client reactor will return the actual source hopnum
142
52
        let hopnum = None;
143
52
        let cmd = cell.cmd();
144
52
        let mut body = cell.into_relay_body().into();
145
52
        let Some(tag) = self.crypto_out.decrypt_outbound(cmd, &mut body) else {
146
12
            return Ok((hopnum, CellDecodeResult::Unrecognizd(body)));
147
        };
148

            
149
        // The message is addressed to us! Now it's time to handle it...
150
40
        let mut hops = hop_mgr.hops().write().expect("poisoned lock");
151
40
        let decode_res = hops
152
40
            .get_mut(hopnum)
153
40
            .ok_or_else(|| internal!("msg from non-existent hop???"))?
154
            .inbound
155
40
            .decode(body.into())?;
156

            
157
40
        Ok((hopnum, CellDecodeResult::Recognized(tag, decode_res)))
158
52
    }
159

            
160
    /// Handle a DROP message.
161
    #[allow(clippy::unnecessary_wraps)] // Returns Err if circ-padding is enabled
162
    fn handle_drop(&mut self) -> StdResult<(), ReactorError> {
163
        cfg_if::cfg_if! {
164
            if #[cfg(feature = "circ-padding")] {
165
                Err(internal!("relay circuit padding not yet supported").into())
166
            } else {
167
                Ok(())
168
            }
169
        }
170
    }
171

            
172
    /// Handle the outcome of handling an EXTEND2.
173
8
    fn handle_extend_result(
174
8
        &mut self,
175
8
        res: StdResult<ExtendResult, ReactorError>,
176
8
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
177
        let ExtendResult {
178
8
            extended2,
179
8
            outbound,
180
8
            outbound_chan_rx,
181
8
        } = res?;
182

            
183
8
        self.outbound = Some(outbound);
184

            
185
8
        Ok(Some(BackwardReactorCmd::HandleCircuitExtended {
186
8
            hop: None,
187
8
            extended2,
188
8
            outbound_chan_rx,
189
8
        }))
190
8
    }
191

            
192
    /// Handle a RELAY or RELAY_EARLY cell.
193
52
    fn handle_relay_cell<R: Runtime>(
194
52
        &mut self,
195
52
        hop_mgr: &mut HopMgr<R>,
196
52
        cell: RelayMaybeEarlyChanMsg,
197
52
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
198
52
        let early = matches!(cell, RelayMaybeEarlyChanMsg::RelayEarly(_));
199

            
200
52
        if early {
201
20
            self.relay_early_count += 1;
202

            
203
20
            if self.relay_early_count > MAX_RELAY_EARLY_CELLS_PER_CIRCUIT {
204
                return Err(
205
                    Error::CircProto("Circuit received too many RELAY_EARLY cells".into()).into(),
206
                );
207
20
            }
208
32
        }
209

            
210
52
        let (hopnum, res) = self.decode_relay_cell(hop_mgr, cell)?;
211
52
        let (tag, decode_res) = match res {
212
12
            CellDecodeResult::Unrecognizd(body) => {
213
12
                self.handle_unrecognized_cell(body, None, early)?;
214
8
                return Ok(None);
215
            }
216
40
            CellDecodeResult::Recognized(tag, res) => (tag, res),
217
        };
218

            
219
40
        Ok(Some(ForwardCellDisposition::HandleRecognizedRelay {
220
40
            cell: decode_res,
221
40
            early,
222
40
            hopnum,
223
40
            tag,
224
40
        }))
225
52
    }
226

            
227
    /// Handle a forward cell that we could not decrypt.
228
12
    fn handle_unrecognized_cell(
229
12
        &mut self,
230
12
        body: RelayCellBody,
231
12
        info: Option<QueuedCellPaddingInfo>,
232
12
        early: bool,
233
12
    ) -> StdResult<(), ReactorError> {
234
        // TODO(relay): remove this log once we add some tests
235
        // and confirm relaying cells works as expected
236
        // (in practice it will be too noisy to be useful, even at trace level).
237
12
        trace!(
238
            circ_id = %self.unique_id,
239
            "Forwarding unrecognized cell"
240
        );
241

            
242
12
        let Some(chan) = self.outbound.as_mut() else {
243
            // The client shouldn't try to send us any cells before it gets
244
            // an EXTENDED2 cell from us
245
4
            return Err(Error::CircProto(
246
4
                "Asked to forward cell before the circuit was extended?!".into(),
247
4
            )
248
4
            .into());
249
        };
250

            
251
8
        let msg = Relay::from(BoxedCellBody::from(body));
252
8
        let relay = if early {
253
4
            AnyChanMsg::RelayEarly(msg.into())
254
        } else {
255
4
            AnyChanMsg::Relay(msg)
256
        };
257
8
        let cell = AnyChanCell::new(Some(chan.circ_id), relay);
258

            
259
        // Note: this future is always `Ready`, because we checked the sink for readiness
260
        // before polling the input channel, so await won't block.
261
8
        chan.outbound_chan_tx.start_send_unpin((cell, info))?;
262

            
263
8
        Ok(())
264
12
    }
265

            
266
    /// Handle a TRUNCATE cell.
267
4
    fn handle_truncate(&mut self) -> StdResult<(), ReactorError> {
268
        // This is not strictly spec compliant,
269
        // but since none of our implementations use TRUNCATE,
270
        // we deem it a proto violation and shut down the circuit.
271
        //
272
        // TODO(spec): codify this in the spec
273
4
        Err(Error::CircProto("TRUNCATE not allowed".into()).into())
274
4
    }
275

            
276
    /// Handle a DESTROY cell originating from the client.
277
4
    fn handle_destroy_cell(&mut self, cell: &Destroy) -> StdResult<(), ReactorError> {
278
4
        debug!(
279
            circ_id = %self.unique_id,
280
            reason = %cell.reason(),
281
            "Received outbound DESTROY, circuit shutting down",
282
        );
283

            
284
        // We don't need to send a DESTROY cell down the channel,
285
        // because that's handled implicitly by our Drop implementation
286
4
        Err(ReactorError::Shutdown)
287
4
    }
288

            
289
    /// Handle a PADDING_NEGOTIATE cell originating from the client.
290
    #[allow(clippy::needless_pass_by_value)] // TODO(relay)
291
    fn handle_padding_negotiate(&mut self, _cell: PaddingNegotiate) -> StdResult<(), ReactorError> {
292
        Err(internal!("PADDING_NEGOTIATE is not implemented").into())
293
    }
294
}
295

            
296
impl ForwardHandler for Forward {
297
    type BuildSpec = OwnedChanTarget;
298
    type CircChanMsg = RelayCircChanMsg;
299
    type CircEvent = CircEvent;
300

            
301
20
    async fn handle_meta_msg<R: Runtime>(
302
20
        &mut self,
303
20
        runtime: &R,
304
20
        early: bool,
305
20
        _hopnum: Option<HopNum>,
306
20
        msg: UnparsedRelayMsg,
307
20
        _relay_cell_format: RelayCellFormat,
308
20
    ) -> StdResult<(), ReactorError> {
309
20
        match msg.cmd() {
310
            RelayCmd::DROP => self.handle_drop(),
311
16
            RelayCmd::EXTEND2 => self.extend_handler.handle_extend2(runtime, early, msg),
312
4
            RelayCmd::TRUNCATE => self.handle_truncate(),
313
            cmd => Err(internal!("relay cmd {cmd} not supported").into()),
314
        }
315
20
    }
316

            
317
56
    async fn handle_forward_cell<R: Runtime>(
318
56
        &mut self,
319
56
        hop_mgr: &mut HopMgr<R>,
320
56
        cell: RelayCircChanMsg,
321
56
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
322
        use RelayCircChanMsg::*;
323

            
324
56
        match cell {
325
32
            Relay(r) => self.handle_relay_cell(hop_mgr, r.into()),
326
20
            RelayEarly(r) => self.handle_relay_cell(hop_mgr, r.into()),
327
4
            Destroy(d) => {
328
4
                self.handle_destroy_cell(&d)?;
329
                Ok(None)
330
            }
331
            PaddingNegotiate(p) => {
332
                self.handle_padding_negotiate(p)?;
333
                Ok(None)
334
            }
335
        }
336
56
    }
337

            
338
8
    fn handle_event(
339
8
        &mut self,
340
8
        event: Self::CircEvent,
341
8
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
342
8
        match event {
343
8
            CircEvent::ExtendResult(res) => self.handle_extend_result(res),
344
        }
345
8
    }
346

            
347
123
    async fn outbound_chan_ready(&mut self) -> Result<()> {
348
82
        future::poll_fn(|cx| match &mut self.outbound {
349
16
            Some(chan) => {
350
16
                let _ = chan.outbound_chan_tx.poll_flush_unpin(cx);
351

            
352
16
                chan.outbound_chan_tx.poll_ready_unpin(cx)
353
            }
354
            None => {
355
                // Pedantically, if the channel doesn't exist, it can't be ready,
356
                // but we have no choice here than to return Ready
357
                // (returning Pending would cause the reactor to lock up).
358
                //
359
                // Returning ready here means the base reactor is allowed to read
360
                // from its inbound channel. This is OK, because if we *do*
361
                // read a cell from that channel and find ourselves needing to
362
                // forward it to the next hop, we simply return a proto violation error,
363
                // shutting down the reactor.
364
66
                Poll::Ready(Ok(()))
365
            }
366
82
        })
367
82
        .await
368
82
    }
369
}
370

            
371
impl ControlHandler for Forward {
372
    type CtrlMsg = CtrlMsg;
373
    type CtrlCmd = CtrlCmd;
374

            
375
    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError> {
376
        let () = cmd;
377
        Ok(())
378
    }
379

            
380
    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError> {
381
        let () = msg;
382
        Ok(())
383
    }
384
}
385

            
386
impl Drop for Forward {
387
40
    fn drop(&mut self) {
388
40
        if let Some(outbound) = self.outbound.as_mut() {
389
8
            // This will send a DESTROY down the outbound channel
390
8
            let _ = outbound.channel.close_circuit(outbound.circ_id);
391
32
        }
392
40
    }
393
}