//! A relay's view of the forward (away from the client, towards the exit) state of a circuit.

mod extend_handler;

use extend_handler::ExtendRequestHandler;

use crate::channel::{Channel, ChannelSender};
use crate::circuit::CircuitRxReceiver;
use crate::circuit::UniqId;
use crate::circuit::reactor::ControlHandler;
use crate::circuit::reactor::backward::BackwardReactorCmd;
use crate::circuit::reactor::forward::{ForwardCellDisposition, ForwardHandler};
use crate::circuit::reactor::hop_mgr::HopMgr;
use crate::crypto::cell::OutboundRelayLayer;
use crate::crypto::cell::RelayCellBody;
use crate::relay::RelayCircChanMsg;
use crate::util::err::ReactorError;
use crate::{Error, HopNum, Result};

// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
use crate::client::circuit::padding::QueuedCellPaddingInfo;

use crate::relay::channel_provider::ChannelProvider;
use crate::relay::reactor::CircuitAccount;
use tor_cell::chancell::msg::{AnyChanMsg, Destroy, PaddingNegotiate, Relay};
use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanMsg, CircId};
use tor_cell::relaycell::msg::{Extended2, SendmeTag};
use tor_cell::relaycell::{RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg};
use tor_error::internal;
use tor_linkspec::OwnedChanTarget;
use tor_rtcompat::Runtime;

use futures::channel::mpsc;
use futures::{SinkExt as _, future};
use tracing::{debug, trace};

use std::result::Result as StdResult;
use std::sync::Arc;
use std::task::Poll;

            
/// Placeholder for our custom control message type.
///
/// No relay-specific control messages exist yet, so the unit type stands in for them.
type CtrlMsg = ();

            
/// Placeholder for our custom control command type.
///
/// No relay-specific control commands exist yet, so the unit type stands in for them.
type CtrlCmd = ();

            
/// The maximum number of RELAY_EARLY cells allowed on a circuit.
///
/// A circuit that receives more RELAY_EARLY cells than this is torn down
/// with a protocol violation error.
//
// TODO(relay): should we come up with a consensus parameter for this? (arti#2349)
const MAX_RELAY_EARLY_CELLS_PER_CIRCUIT: usize = 8;

            
52
/// Relay-specific state for the forward reactor.
53
pub(crate) struct Forward {
54
    /// An identifier for logging about this reactor's circuit.
55
    unique_id: UniqId,
56
    /// The outbound view of this circuit, if we are not the last hop.
57
    ///
58
    /// Delivers cells towards the exit.
59
    ///
60
    /// Only set for middle relays.
61
    outbound: Option<Outbound>,
62
    /// The cryptographic state for this circuit for inbound cells.
63
    crypto_out: Box<dyn OutboundRelayLayer + Send>,
64
    /// The number of RELAY_EARLY cells we have seen so far on this circuit.
65
    ///
66
    /// If we see more than [`MAX_RELAY_EARLY_CELLS_PER_CIRCUIT`] RELAY_EARLY cells, we tear down the circuit.
67
    relay_early_count: usize,
68
    /// Helper for handling circuit extension requests.
69
    ///
70
    /// Used for validating EXTEND2 cells.
71
    extend_handler: ExtendRequestHandler,
72
}
73

            
74
/// A type of event issued by the relay forward reactor.
75
pub(crate) enum CircEvent {
76
    /// The outcome of an EXTEND2 request.
77
    ExtendResult(StdResult<ExtendResult, ReactorError>),
78
}
79

            
80
/// A successful circuit extension result.
81
pub(crate) struct ExtendResult {
82
    /// The EXTENDED2 cell to send back to the client.
83
    extended2: Extended2,
84
    /// The outbound channel.
85
    outbound: Outbound,
86
    /// The reading end of the outbound Tor channel, if we are not the last hop.
87
    ///
88
    /// Yields cells moving from the exit towards the client, if we are a middle relay.
89
    outbound_chan_rx: CircuitRxReceiver,
90
}
91

            
92
/// The outbound view of a relay circuit.
93
struct Outbound {
94
    /// The circuit identifier on the outbound Tor channel.
95
    circ_id: CircId,
96
    /// The outbound Tor channel.
97
    channel: Arc<Channel>,
98
    /// The sending end of the outbound Tor channel.
99
    outbound_chan_tx: ChannelSender,
100
}
101

            
102
/// The outcome of `decode_relay_cell`.
103
enum CellDecodeResult {
104
    /// A decrypted cell.
105
    Recognized(SendmeTag, RelayCellDecoderResult),
106
    /// A cell we could not decrypt.
107
    Unrecognizd(RelayCellBody),
108
}
109

            
110
impl Forward {
111
    /// Create a new [`Forward`].
112
36
    pub(crate) fn new(
113
36
        inbound_chan: &Arc<Channel>,
114
36
        unique_id: UniqId,
115
36
        crypto_out: Box<dyn OutboundRelayLayer + Send>,
116
36
        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
117
36
        event_tx: mpsc::Sender<CircEvent>,
118
36
        memquota: CircuitAccount,
119
36
    ) -> Self {
120
36
        let inbound_peer = Arc::clone(inbound_chan.peer_info());
121
36
        let extend_handler =
122
36
            ExtendRequestHandler::new(unique_id, chan_provider, inbound_peer, event_tx, memquota);
123

            
124
36
        Self {
125
36
            unique_id,
126
36
            // Initially, we are the last hop in the circuit.
127
36
            outbound: None,
128
36
            crypto_out,
129
36
            relay_early_count: 0,
130
36
            extend_handler,
131
36
        }
132
36
    }
133

            
134
    /// Decode `cell`, returning its corresponding hop number, tag and decoded body.
135
44
    fn decode_relay_cell<R: Runtime>(
136
44
        &mut self,
137
44
        hop_mgr: &mut HopMgr<R>,
138
44
        cell: Relay,
139
44
    ) -> Result<(Option<HopNum>, CellDecodeResult)> {
140
        // Note: the client reactor will return the actual source hopnum
141
44
        let hopnum = None;
142
44
        let cmd = cell.cmd();
143
44
        let mut body = cell.into_relay_body().into();
144
44
        let Some(tag) = self.crypto_out.decrypt_outbound(cmd, &mut body) else {
145
12
            return Ok((hopnum, CellDecodeResult::Unrecognizd(body)));
146
        };
147

            
148
        // The message is addressed to us! Now it's time to handle it...
149
32
        let mut hops = hop_mgr.hops().write().expect("poisoned lock");
150
32
        let decode_res = hops
151
32
            .get_mut(hopnum)
152
32
            .ok_or_else(|| internal!("msg from non-existent hop???"))?
153
            .inbound
154
32
            .decode(body.into())?;
155

            
156
32
        Ok((hopnum, CellDecodeResult::Recognized(tag, decode_res)))
157
44
    }
158

            
159
    /// Handle a DROP message.
160
    #[allow(clippy::unnecessary_wraps)] // Returns Err if circ-padding is enabled
161
    fn handle_drop(&mut self) -> StdResult<(), ReactorError> {
162
        cfg_if::cfg_if! {
163
            if #[cfg(feature = "circ-padding")] {
164
                Err(internal!("relay circuit padding not yet supported").into())
165
            } else {
166
                Ok(())
167
            }
168
        }
169
    }
170

            
171
    /// Handle the outcome of handling an EXTEND2.
172
8
    fn handle_extend_result(
173
8
        &mut self,
174
8
        res: StdResult<ExtendResult, ReactorError>,
175
8
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
176
        let ExtendResult {
177
8
            extended2,
178
8
            outbound,
179
8
            outbound_chan_rx,
180
8
        } = res?;
181

            
182
8
        self.outbound = Some(outbound);
183

            
184
8
        Ok(Some(BackwardReactorCmd::HandleCircuitExtended {
185
8
            hop: None,
186
8
            extended2,
187
8
            outbound_chan_rx,
188
8
        }))
189
8
    }
190

            
191
    /// Handle a RELAY or RELAY_EARLY cell.
192
44
    fn handle_relay_cell<R: Runtime>(
193
44
        &mut self,
194
44
        hop_mgr: &mut HopMgr<R>,
195
44
        cell: Relay,
196
44
        early: bool,
197
44
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
198
44
        if early {
199
20
            self.relay_early_count += 1;
200

            
201
20
            if self.relay_early_count > MAX_RELAY_EARLY_CELLS_PER_CIRCUIT {
202
                return Err(
203
                    Error::CircProto("Circuit received too many RELAY_EARLY cells".into()).into(),
204
                );
205
20
            }
206
24
        }
207

            
208
44
        let (hopnum, res) = self.decode_relay_cell(hop_mgr, cell)?;
209
44
        let (tag, decode_res) = match res {
210
12
            CellDecodeResult::Unrecognizd(body) => {
211
12
                self.handle_unrecognized_cell(body, None, early)?;
212
8
                return Ok(None);
213
            }
214
32
            CellDecodeResult::Recognized(tag, res) => (tag, res),
215
        };
216

            
217
32
        Ok(Some(ForwardCellDisposition::HandleRecognizedRelay {
218
32
            cell: decode_res,
219
32
            early,
220
32
            hopnum,
221
32
            tag,
222
32
        }))
223
44
    }
224

            
225
    /// Handle a forward cell that we could not decrypt.
226
12
    fn handle_unrecognized_cell(
227
12
        &mut self,
228
12
        body: RelayCellBody,
229
12
        info: Option<QueuedCellPaddingInfo>,
230
12
        early: bool,
231
12
    ) -> StdResult<(), ReactorError> {
232
        // TODO(relay): remove this log once we add some tests
233
        // and confirm relaying cells works as expected
234
        // (in practice it will be too noisy to be useful, even at trace level).
235
12
        trace!(
236
            circ_id = %self.unique_id,
237
            "Forwarding unrecognized cell"
238
        );
239

            
240
12
        let Some(chan) = self.outbound.as_mut() else {
241
            // The client shouldn't try to send us any cells before it gets
242
            // an EXTENDED2 cell from us
243
4
            return Err(Error::CircProto(
244
4
                "Asked to forward cell before the circuit was extended?!".into(),
245
4
            )
246
4
            .into());
247
        };
248

            
249
8
        let msg = Relay::from(BoxedCellBody::from(body));
250
8
        let relay = if early {
251
4
            AnyChanMsg::RelayEarly(msg.into())
252
        } else {
253
4
            AnyChanMsg::Relay(msg)
254
        };
255
8
        let cell = AnyChanCell::new(Some(chan.circ_id), relay);
256

            
257
        // Note: this future is always `Ready`, because we checked the sink for readiness
258
        // before polling the input channel, so await won't block.
259
8
        chan.outbound_chan_tx.start_send_unpin((cell, info))?;
260

            
261
8
        Ok(())
262
12
    }
263

            
264
    /// Handle a TRUNCATE cell.
265
4
    fn handle_truncate(&mut self) -> StdResult<(), ReactorError> {
266
        // This is not strictly spec compliant,
267
        // but since none of our implementations use TRUNCATE,
268
        // we deem it a proto violation and shut down the circuit.
269
        //
270
        // TODO(spec): codify this in the spec
271
4
        Err(Error::CircProto("TRUNCATE not allowed".into()).into())
272
4
    }
273

            
274
    /// Handle a DESTROY cell originating from the client.
275
4
    fn handle_destroy_cell(&mut self, cell: &Destroy) -> StdResult<(), ReactorError> {
276
4
        debug!(
277
            circ_id = %self.unique_id,
278
            reason = %cell.reason(),
279
            "Received outbound DESTROY, circuit shutting down",
280
        );
281

            
282
        // We don't need to send a DESTROY cell down the channel,
283
        // because that's handled implicitly by our Drop implementation
284
4
        Err(ReactorError::Shutdown)
285
4
    }
286

            
287
    /// Handle a PADDING_NEGOTIATE cell originating from the client.
288
    #[allow(clippy::needless_pass_by_value)] // TODO(relay)
289
    fn handle_padding_negotiate(&mut self, _cell: PaddingNegotiate) -> StdResult<(), ReactorError> {
290
        Err(internal!("PADDING_NEGOTIATE is not implemented").into())
291
    }
292
}
293

            
294
impl ForwardHandler for Forward {
295
    type BuildSpec = OwnedChanTarget;
296
    type CircChanMsg = RelayCircChanMsg;
297
    type CircEvent = CircEvent;
298

            
299
20
    async fn handle_meta_msg<R: Runtime>(
300
20
        &mut self,
301
20
        runtime: &R,
302
20
        early: bool,
303
20
        _hopnum: Option<HopNum>,
304
20
        msg: UnparsedRelayMsg,
305
20
        _relay_cell_format: RelayCellFormat,
306
20
    ) -> StdResult<(), ReactorError> {
307
20
        match msg.cmd() {
308
            RelayCmd::DROP => self.handle_drop(),
309
16
            RelayCmd::EXTEND2 => self.extend_handler.handle_extend2(runtime, early, msg),
310
4
            RelayCmd::TRUNCATE => self.handle_truncate(),
311
            cmd => Err(internal!("relay cmd {cmd} not supported").into()),
312
        }
313
20
    }
314

            
315
48
    async fn handle_forward_cell<R: Runtime>(
316
48
        &mut self,
317
48
        hop_mgr: &mut HopMgr<R>,
318
48
        cell: RelayCircChanMsg,
319
48
    ) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
320
        use RelayCircChanMsg::*;
321

            
322
48
        match cell {
323
24
            Relay(r) => self.handle_relay_cell(hop_mgr, r, false),
324
20
            RelayEarly(r) => self.handle_relay_cell(hop_mgr, r.into(), true),
325
4
            Destroy(d) => {
326
4
                self.handle_destroy_cell(&d)?;
327
                Ok(None)
328
            }
329
            PaddingNegotiate(p) => {
330
                self.handle_padding_negotiate(p)?;
331
                Ok(None)
332
            }
333
        }
334
48
    }
335

            
336
8
    fn handle_event(
337
8
        &mut self,
338
8
        event: Self::CircEvent,
339
8
    ) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
340
8
        match event {
341
8
            CircEvent::ExtendResult(res) => self.handle_extend_result(res),
342
        }
343
8
    }
344

            
345
111
    async fn outbound_chan_ready(&mut self) -> Result<()> {
346
74
        future::poll_fn(|cx| match &mut self.outbound {
347
16
            Some(chan) => {
348
16
                let _ = chan.outbound_chan_tx.poll_flush_unpin(cx);
349

            
350
16
                chan.outbound_chan_tx.poll_ready_unpin(cx)
351
            }
352
            None => {
353
                // Pedantically, if the channel doesn't exist, it can't be ready,
354
                // but we have no choice here than to return Ready
355
                // (returning Pending would cause the reactor to lock up).
356
                //
357
                // Returning ready here means the base reactor is allowed to read
358
                // from its inbound channel. This is OK, because if we *do*
359
                // read a cell from that channel and find ourselves needing to
360
                // forward it to the next hop, we simply return a proto violation error,
361
                // shutting down the reactor.
362
58
                Poll::Ready(Ok(()))
363
            }
364
74
        })
365
74
        .await
366
74
    }
367
}
368

            
369
impl ControlHandler for Forward {
370
    type CtrlMsg = CtrlMsg;
371
    type CtrlCmd = CtrlCmd;
372

            
373
    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError> {
374
        let () = cmd;
375
        Ok(())
376
    }
377

            
378
    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError> {
379
        let () = msg;
380
        Ok(())
381
    }
382
}
383

            
384
impl Drop for Forward {
385
36
    fn drop(&mut self) {
386
36
        if let Some(outbound) = self.outbound.as_mut() {
387
8
            // This will send a DESTROY down the outbound channel
388
8
            let _ = outbound.channel.close_circuit(outbound.circ_id);
389
28
        }
390
36
    }
391
}