1
//! A conflux-aware message handler.
2

            
3
use std::cmp::Ordering;
4
use std::sync::Arc;
5
use std::sync::atomic::{self, AtomicU64};
6
use std::time::{Duration, SystemTime};
7

            
8
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCmd, StreamId, UnparsedRelayMsg};
9
use tor_error::{Bug, internal};
10

            
11
use crate::Error;
12
use crate::crypto::cell::HopNum;
13

            
14
/// Cell handler for conflux cells.
15
///
16
/// One per Circuit.
17
//
18
// Note: this is not a `MetaCellHandler` because we need a slightly different API here.
19
// Perhaps we should redesign `MetaCellHandler` API to make it work for this too?
20
pub(crate) struct ConfluxMsgHandler {
21
    /// Inner message handler
22
    ///
23
    /// Customizes the cell handling logic,
24
    /// because clients and exits behave differently.
25
    ///
26
    /// TODO: can/should we avoid dynamic dispatch here?
27
    handler: Box<dyn AbstractConfluxMsgHandler + Send + Sync>,
28
    /// The absolute sequence number of the last message delivered to a stream.
29
    ///
30
    /// This is shared by all the circuits in a conflux set.
31
    last_seq_delivered: Arc<AtomicU64>,
32
}
33

            
34
impl ConfluxMsgHandler {
35
    /// Create a new message handler using the specified [`AbstractConfluxMsgHandler`].
36
    ///
37
    /// Clients and relays both use this function.
38
    ///
39
    // TODO(relay): exits will need to implement their own AbstractConfluxMsgHandler
40
104
    pub(crate) fn new(
41
104
        handler: Box<dyn AbstractConfluxMsgHandler + Send + Sync>,
42
104
        last_seq_delivered: Arc<AtomicU64>,
43
104
    ) -> Self {
44
104
        Self {
45
104
            handler,
46
104
            last_seq_delivered,
47
104
        }
48
104
    }
49

            
50
    /// Validate the specified source hop of a conflux cell.
51
    ///
52
    /// The client impl of this function returns an error if the hop is not the last hop.
53
    ///
54
    /// The exit impl of this function returns an error if the source hop is not the last hop,
55
    /// or if there are further hops after the source hop.
56
112
    fn validate_source_hop(&self, msg: &UnparsedRelayMsg, hop: HopNum) -> crate::Result<()> {
57
112
        self.handler.validate_source_hop(msg, hop)
58
112
    }
59

            
60
    /// Handle the specified conflux `msg`.
61
112
    pub(crate) fn handle_conflux_msg(
62
112
        &mut self,
63
112
        msg: UnparsedRelayMsg,
64
112
        hop: HopNum,
65
112
    ) -> Option<ConfluxCmd> {
66
168
        let res = (|| {
67
            // Ensure the conflux cell came from the expected hop
68
            // (see 4.2.1. Cell Injection Side Channel Mitigations in prop329).
69
112
            let () = self.validate_source_hop(&msg, hop)?;
70
112
            self.handler.handle_msg(msg, hop)
71
        })();
72

            
73
        // Returning an error here would cause the reactor to shut down,
74
        // so we return a ConfluxCmd::RemoveLeg command instead.
75
        //
76
        // After removing the leg, the reactor will decide whether it needs
77
        // to shut down or not.
78
112
        match res {
79
88
            Ok(cmd) => cmd,
80
24
            Err(e) => {
81
                // Tell the reactor to remove this leg from the conflux set,
82
                // and to notify the handshake initiator of the error
83
24
                Some(ConfluxCmd::RemoveLeg(RemoveLegReason::ConfluxHandshakeErr(
84
24
                    e,
85
24
                )))
86
            }
87
        }
88
112
    }
89

            
90
    /// Client-only: note that the LINK cell was sent.
91
    ///
92
    /// Used for the initial RTT measurement.
93
104
    pub(crate) fn note_link_sent(&mut self, ts: SystemTime) -> Result<(), Bug> {
94
104
        self.handler.note_link_sent(ts)
95
104
    }
96

            
97
    /// Get the wallclock time when the handshake on this circuit is supposed to time out.
98
    ///
99
    /// Returns `None` if the handshake is not currently in progress.
100
2904
    pub(crate) fn handshake_timeout(&self) -> Option<SystemTime> {
101
2904
        self.handler.handshake_timeout()
102
2904
    }
103

            
104
    /// Returns the conflux status of this handler.
105
1420
    pub(crate) fn status(&self) -> ConfluxStatus {
106
1420
        self.handler.status()
107
1420
    }
108

            
109
    /// Check our sequence numbers to see if the current msg is in order.
110
    ///
111
    /// Returns an internal error if the relative seqno is lower than the absolute seqno.
112
76
    fn is_msg_in_order(&self) -> Result<bool, Bug> {
113
76
        let last_seq_delivered = self.last_seq_delivered.load(atomic::Ordering::Acquire);
114
76
        match self.handler.last_seq_recv().cmp(&(last_seq_delivered + 1)) {
115
            Ordering::Less => {
116
                // Our internal accounting is busted!
117
                Err(internal!(
118
                    "Got a conflux cell with a sequence number less than the last delivered"
119
                ))
120
            }
121
60
            Ordering::Equal => Ok(true),
122
16
            Ordering::Greater => Ok(false),
123
        }
124
76
    }
125

            
126
    /// Return a [`OooRelayMsg`] for the reactor to buffer.
127
16
    fn prepare_ooo_entry(
128
16
        &self,
129
16
        hopnum: HopNum,
130
16
        cell_counts_towards_windows: bool,
131
16
        streamid: StreamId,
132
16
        msg: UnparsedRelayMsg,
133
16
    ) -> OooRelayMsg {
134
16
        OooRelayMsg {
135
16
            seqno: self.handler.last_seq_recv(),
136
16
            hopnum,
137
16
            cell_counts_towards_windows,
138
16
            streamid,
139
16
            msg,
140
16
        }
141
16
    }
142

            
143
    /// Check the sequence number of the specified `msg`,
144
    /// and decide whether it should be delivered or buffered.
145
    #[cfg(feature = "conflux")]
146
76
    pub(crate) fn action_for_msg(
147
76
        &mut self,
148
76
        hopnum: HopNum,
149
76
        cell_counts_towards_windows: bool,
150
76
        streamid: StreamId,
151
76
        msg: UnparsedRelayMsg,
152
76
    ) -> Result<ConfluxAction, Bug> {
153
76
        if !super::cmd_counts_towards_seqno(msg.cmd()) {
154
            return Ok(ConfluxAction::Deliver(msg));
155
76
        }
156

            
157
        // Increment the relative seqno on this leg.
158
76
        self.handler.inc_last_seq_recv();
159

            
160
76
        let action = if self.is_msg_in_order()? {
161
60
            ConfluxAction::Deliver(msg)
162
        } else {
163
16
            ConfluxAction::Enqueue(self.prepare_ooo_entry(
164
16
                hopnum,
165
16
                cell_counts_towards_windows,
166
16
                streamid,
167
16
                msg,
168
16
            ))
169
        };
170

            
171
76
        Ok(action)
172
76
    }
173

            
174
    /// Increment the absolute "delivered" seqno for this conflux set
175
    /// if the specified message counts towards sequence numbers.
176
76
    pub(crate) fn inc_last_seq_delivered(&self, msg: &UnparsedRelayMsg) {
177
76
        if super::cmd_counts_towards_seqno(msg.cmd()) {
178
76
            self.last_seq_delivered
179
76
                .fetch_add(1, atomic::Ordering::AcqRel);
180
76
        }
181
76
    }
182

            
183
    /// Returns the initial RTT measured by this handler.
184
1512
    pub(crate) fn init_rtt(&self) -> Option<Duration> {
185
1512
        self.handler.init_rtt()
186
1512
    }
187

            
188
    /// Return the sequence number of the last message sent on this leg.
189
48
    pub(crate) fn last_seq_sent(&self) -> u64 {
190
48
        self.handler.last_seq_sent()
191
48
    }
192

            
193
    /// Set the sequence number of the last message sent on this leg.
194
8
    pub(crate) fn set_last_seq_sent(&mut self, n: u64) {
195
8
        self.handler.set_last_seq_sent(n);
196
8
    }
197

            
198
    /// Return the sequence number of the last message received on this leg.
199
32
    pub(crate) fn last_seq_recv(&self) -> u64 {
200
32
        self.handler.last_seq_recv()
201
32
    }
202

            
203
    /// Note that a cell has been sent.
204
    ///
205
    /// Updates the internal sequence numbers.
206
1388
    pub(crate) fn note_cell_sent(&mut self, cmd: RelayCmd) {
207
1388
        if super::cmd_counts_towards_seqno(cmd) {
208
1212
            self.handler.inc_last_seq_sent();
209
1212
        }
210
1388
    }
211
}
212

            
213
/// An action to take after processing a potentially out of order message.
///
/// Returned by `ConfluxMsgHandler::action_for_msg`.
#[derive(Debug)]
#[cfg(feature = "conflux")]
pub(crate) enum ConfluxAction {
    /// Deliver the message to its corresponding stream
    Deliver(UnparsedRelayMsg),
    /// Enqueue the specified entry in the out-of-order queue.
    Enqueue(OooRelayMsg),
}
222

            
223
/// An object that can process conflux relay messages
224
/// and manage the conflux state of a circuit.
225
///
226
/// This is indirectly used by the circuit reactor (via `ConfluxSet`)
227
/// for conflux handling.
228
pub(crate) trait AbstractConfluxMsgHandler {
229
    /// Validate the specified source hop of a conflux cell.
230
    fn validate_source_hop(&self, msg: &UnparsedRelayMsg, hop: HopNum) -> crate::Result<()>;
231

            
232
    /// Handle the specified conflux `msg`.
233
    fn handle_msg(
234
        &mut self,
235
        msg: UnparsedRelayMsg,
236
        hop: HopNum,
237
    ) -> crate::Result<Option<ConfluxCmd>>;
238

            
239
    /// Returns the conflux status of this handler.
240
    fn status(&self) -> ConfluxStatus;
241

            
242
    /// Client-only: note that the LINK cell was sent.
243
    fn note_link_sent(&mut self, ts: SystemTime) -> Result<(), Bug>;
244

            
245
    /// Get the wallclock time when the handshake on this circuit is supposed to time out.
246
    ///
247
    /// Returns `None` if the handshake is not currently in progress.
248
    fn handshake_timeout(&self) -> Option<SystemTime>;
249

            
250
    /// Returns the initial RTT measured by this handler.
251
    fn init_rtt(&self) -> Option<Duration>;
252

            
253
    /// Return the sequence number of the last message received on this leg.
254
    fn last_seq_recv(&self) -> u64;
255

            
256
    /// Return the sequence number of the last message sent on this leg.
257
    fn last_seq_sent(&self) -> u64;
258

            
259
    /// Set the sequence number of the last message sent on this leg.
260
    fn set_last_seq_sent(&mut self, n: u64);
261

            
262
    /// Increment the sequence number of the last message received on this leg.
263
    fn inc_last_seq_recv(&mut self);
264

            
265
    /// Increment the sequence number of the last message sent on this leg.
266
    fn inc_last_seq_sent(&mut self);
267
}
268

            
269
/// An out-of-order message.
270
#[derive(Debug)]
271
pub(crate) struct OooRelayMsg {
272
    /// The sequence number of the message.
273
    pub(crate) seqno: u64,
274
    /// The hop this message originated from.
275
    pub(crate) hopnum: HopNum,
276
    /// Whether the cell this message originated from counts towards
277
    /// the stream-level SENDME window.
278
    ///
279
    /// See "SENDME window accounting" in prop340.
280
    pub(crate) cell_counts_towards_windows: bool,
281
    /// The stream ID of this message.
282
    pub(crate) streamid: StreamId,
283
    /// The actual message.
284
    pub(crate) msg: UnparsedRelayMsg,
285
}
286

            
287
impl Ord for OooRelayMsg {
288
4
    fn cmp(&self, other: &Self) -> Ordering {
289
4
        self.seqno.cmp(&other.seqno).reverse()
290
4
    }
291
}
292

            
293
impl PartialOrd for OooRelayMsg {
294
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
295
        Some(self.cmp(other))
296
    }
297
}
298

            
299
impl PartialEq for OooRelayMsg {
300
    fn eq(&self, other: &Self) -> bool {
301
        self.seqno == other.seqno
302
    }
303
}
304

            
305
// Equality is by `seqno` only (see the `PartialEq` impl), and `u64` equality is total.
impl Eq for OooRelayMsg {}
306

            
307
/// The outcome of [`AbstractConfluxMsgHandler::handle_msg`].
308
#[derive(Debug)]
309
pub(crate) enum ConfluxCmd {
310
    /// Remove this circuit from the conflux set.
311
    ///
312
    /// Returned by `ConfluxMsgHandler::handle_conflux_msg` for invalid messages
313
    /// (originating from wrong hop), and for messages that are rejected
314
    /// by its inner `AbstractMsgHandler`.
315
    RemoveLeg(RemoveLegReason),
316

            
317
    /// This circuit has completed the conflux handshake,
318
    /// and wants to send the specified cell.
319
    ///
320
    /// Returned by an `AbstractMsgHandler` to signal to the reactor that
321
    /// the conflux handshake is complete.
322
    HandshakeComplete {
323
        /// The hop number.
324
        hop: HopNum,
325
        /// Whether to use a RELAY_EARLY cell.
326
        early: bool,
327
        /// The cell to send.
328
        cell: AnyRelayMsgOuter,
329
    },
330
}
331

            
332
/// The reason for removing a circuit leg from the conflux set.
333
#[derive(Debug, derive_more::Display)]
334
pub(crate) enum RemoveLegReason {
335
    /// The conflux handshake timed out.
336
    ///
337
    /// On the client-side, this means we didn't receive
338
    /// the CONFLUX_LINKED response in time.
339
    #[display("conflux handshake timed out")]
340
    ConfluxHandshakeTimeout,
341
    /// An error occurred during conflux handshake.
342
    #[display("{}", _0)]
343
    ConfluxHandshakeErr(Error),
344
    /// The channel was closed.
345
    #[display("channel closed")]
346
    ChannelClosed,
347
}
348

            
349
/// The conflux status of a conflux circuit.
///
/// Reported by [`AbstractConfluxMsgHandler::status`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum ConfluxStatus {
    /// Circuit has not begun the conflux handshake yet.
    Unlinked,
    /// Conflux handshake is in progress.
    Pending,
    /// A linked conflux circuit.
    Linked,
}