1
//! Client-side conflux message handling.
2

            
3
use std::sync::Arc;
4
use std::sync::atomic::{self, AtomicU64};
5
use std::time::{Duration, SystemTime};
6

            
7
use tor_cell::relaycell::conflux::V1Nonce;
8
use tor_cell::relaycell::msg::{ConfluxLinked, ConfluxLinkedAck, ConfluxSwitch};
9
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCmd, UnparsedRelayMsg};
10
use tor_error::{Bug, internal, warn_report};
11
use tor_rtcompat::{DynTimeProvider, SleepProvider as _};
12

            
13
use crate::Error;
14
use crate::client::HopNum;
15
use crate::client::reactor::circuit::unsupported_client_cell;
16
use crate::congestion::params::CongestionWindowParams;
17

            
18
use crate::conflux::msghandler::{AbstractConfluxMsgHandler, ConfluxCmd, ConfluxStatus};
19

            
20
/// Client-side implementation of a conflux message handler.
21
pub(crate) struct ClientConfluxMsgHandler {
22
    /// The current state this leg is in.
23
    state: ConfluxState,
24
    /// The nonce associated with the circuits from this set.
25
    nonce: V1Nonce,
26
    /// The expected conflux join point.
27
    join_point: HopNum,
28
    //// The initial round-trip time measurement,
29
    /// measured during the conflux handshake.
30
    ///
31
    /// On the client side, this is the RTT between
32
    /// `RELAY_CONFLUX_LINK` and `RELAY_CONFLUX_LINKED`.
33
    init_rtt: Option<Duration>,
34
    /// The time when the handshake was initiated.
35
    link_sent: Option<SystemTime>,
36
    /// A handle to the time provider.
37
    runtime: DynTimeProvider,
38
    /// The sequence number of the last message received on this leg.
39
    ///
40
    /// This is a *relative* number.
41
    ///
42
    /// Incremented by the [`ConfluxMsgHandler`](super::ConfluxMsgHandler::action_for_msg)
43
    /// each time a cell that counts towards sequence numbers is received on this leg.
44
    last_seq_recv: u64,
45
    /// The sequence number of the last message sent on this leg.
46
    ///
47
    /// This is a *relative* number.
48
    ///
49
    /// Incremented by the [`ConfluxMsgHandler`](super::ConfluxMsgHandler::note_cell_sent)
50
    /// each time a cell that counts towards sequence numbers is sent on this leg.
51
    last_seq_sent: u64,
52
    /// The absolute sequence number of the last message delivered to a stream.
53
    ///
54
    /// This is shared by all the circuits in a conflux set,
55
    ///
56
    /// Incremented by the [`ConfluxMsgHandler`](super::ConfluxMsgHandler::inc_last_seq_delivered)
57
    /// upon delivering the cell to its corresponding stream.
58
    last_seq_delivered: Arc<AtomicU64>,
59
    /// Whether we have processed any SWITCH cells on the leg this handler is installed on.
60
    have_seen_switch: bool,
61
    /// The number of cells that count towards the conflux seqnos
62
    /// received on this leg since the last SWITCH.
63
    cells_since_switch: usize,
64
    /// The congestion window parameters.
65
    ///
66
    /// Used for SWITCH cell validation.
67
    cwnd_params: CongestionWindowParams,
68
}
69

            
70
/// The state of a client circuit from a conflux set.
71
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
72
enum ConfluxState {
73
    /// The circuit is not linked.
74
    Unlinked,
75
    /// The LINK cell was sent, awaiting response.
76
    AwaitingLink(V1Nonce),
77
    /// The circuit is linked.
78
    Linked,
79
}
80

            
81
impl AbstractConfluxMsgHandler for ClientConfluxMsgHandler {
82
112
    fn validate_source_hop(&self, msg: &UnparsedRelayMsg, hop: HopNum) -> crate::Result<()> {
83
112
        if hop != self.join_point {
84
            return Err(Error::CircProto(format!(
85
                "Received {} cell from unexpected hop {} on client conflux circuit",
86
                msg.cmd(),
87
                hop.display(),
88
            )));
89
112
        }
90

            
91
112
        Ok(())
92
112
    }
93

            
94
112
    fn handle_msg(
95
112
        &mut self,
96
112
        msg: UnparsedRelayMsg,
97
112
        hop: HopNum,
98
112
    ) -> crate::Result<Option<ConfluxCmd>> {
99
112
        match msg.cmd() {
100
4
            RelayCmd::CONFLUX_LINK => self.handle_conflux_link(msg, hop),
101
72
            RelayCmd::CONFLUX_LINKED => self.handle_conflux_linked(msg, hop),
102
            RelayCmd::CONFLUX_LINKED_ACK => self.handle_conflux_linked_ack(msg, hop),
103
36
            RelayCmd::CONFLUX_SWITCH => self.handle_conflux_switch(msg, hop),
104
            _ => Err(internal!("received non-conflux cell in conflux handler?!").into()),
105
        }
106
112
    }
107

            
108
1420
    fn status(&self) -> ConfluxStatus {
109
1420
        match self.state {
110
104
            ConfluxState::Unlinked => ConfluxStatus::Unlinked,
111
32
            ConfluxState::AwaitingLink(_) => ConfluxStatus::Pending,
112
1284
            ConfluxState::Linked => ConfluxStatus::Linked,
113
        }
114
1420
    }
115

            
116
104
    fn note_link_sent(&mut self, ts: SystemTime) -> Result<(), Bug> {
117
104
        match self.state {
118
104
            ConfluxState::Unlinked => {
119
104
                self.state = ConfluxState::AwaitingLink(self.nonce);
120
104
            }
121
            ConfluxState::AwaitingLink(_) | ConfluxState::Linked => {
122
                return Err(internal!("Sent duplicate LINK cell?!"));
123
            }
124
        }
125

            
126
104
        self.link_sent = Some(ts);
127
104
        Ok(())
128
104
    }
129

            
130
    /// Get the wallclock time when the handshake on this circuit is supposed to time out.
131
    ///
132
    /// Returns `None` if the handshake is not currently in progress.
133
2904
    fn handshake_timeout(&self) -> Option<SystemTime> {
134
        /// The maximum amount of time to wait for the LINKED cell to arrive.
135
        //
136
        // TODO(conflux-tuning): we should pick a less arbitrary timeout
137
        //
138
        // "This timeout MUST be no larger than the normal SOCKS/stream timeout in
139
        // use for RELAY_BEGIN, but MAY be the Circuit Build Timeout value, instead.
140
        // (The C-Tor implementation currently uses Circuit Build Timeout)".
141
        const LINK_TIMEOUT: Duration = Duration::from_secs(60);
142

            
143
2904
        if matches!(self.state, ConfluxState::AwaitingLink(_)) {
144
120
            debug_assert!(
145
120
                self.link_sent.is_some(),
146
                "awaiting LINKED, but LINK not sent?!"
147
            );
148
180
            self.link_sent.map(|link_sent| link_sent + LINK_TIMEOUT)
149
        } else {
150
2784
            None
151
        }
152
2904
    }
153

            
154
    /// Returns the initial RTT measured by this handler.
155
1512
    fn init_rtt(&self) -> Option<Duration> {
156
1512
        self.init_rtt
157
1512
    }
158

            
159
124
    fn last_seq_recv(&self) -> u64 {
160
124
        self.last_seq_recv
161
124
    }
162

            
163
48
    fn last_seq_sent(&self) -> u64 {
164
48
        self.last_seq_sent
165
48
    }
166

            
167
8
    fn set_last_seq_sent(&mut self, n: u64) {
168
8
        self.last_seq_sent = n;
169
8
    }
170

            
171
76
    fn inc_last_seq_recv(&mut self) {
172
76
        self.last_seq_recv += 1;
173
76
        self.cells_since_switch += 1;
174
76
    }
175

            
176
1212
    fn inc_last_seq_sent(&mut self) {
177
1212
        self.last_seq_sent += 1;
178
1212
    }
179
}
180

            
181
impl ClientConfluxMsgHandler {
182
    /// Create a new client conflux message handler.
183
104
    pub(crate) fn new(
184
104
        join_point: HopNum,
185
104
        nonce: V1Nonce,
186
104
        last_seq_delivered: Arc<AtomicU64>,
187
104
        cwnd_params: CongestionWindowParams,
188
104
        runtime: DynTimeProvider,
189
104
    ) -> Self {
190
104
        Self {
191
104
            state: ConfluxState::Unlinked,
192
104
            nonce,
193
104
            last_seq_delivered,
194
104
            join_point,
195
104
            link_sent: None,
196
104
            runtime,
197
104
            init_rtt: None,
198
104
            last_seq_recv: 0,
199
104
            last_seq_sent: 0,
200
104
            have_seen_switch: false,
201
104
            cells_since_switch: 0,
202
104
            cwnd_params,
203
104
        }
204
104
    }
205

            
206
    /// Handle a conflux LINK request coming from the specified hop.
207
    #[allow(clippy::needless_pass_by_value)]
208
4
    fn handle_conflux_link(
209
4
        &mut self,
210
4
        msg: UnparsedRelayMsg,
211
4
        hop: HopNum,
212
4
    ) -> crate::Result<Option<ConfluxCmd>> {
213
4
        unsupported_client_cell!(msg, hop)
214
4
    }
215

            
216
    /// Handle a conflux LINKED response coming from the specified `hop`.
217
    ///
218
    /// The caller must validate the `hop` the cell originated from.
219
    ///
220
    /// To prevent [DROPMARK] attacks, this returns a protocol error
221
    /// if any of the following conditions are not met:
222
    ///
223
    ///   * this is an unlinked circuit that sent a LINK command
224
    ///   * that the nonce matches the nonce used in the LINK command
225
    ///
226
    /// [DROPMARK]: https://www.petsymposium.org/2018/files/papers/issue2/popets-2018-0011.pdf
227
72
    fn handle_conflux_linked(
228
72
        &mut self,
229
72
        msg: UnparsedRelayMsg,
230
72
        hop: HopNum,
231
72
    ) -> crate::Result<Option<ConfluxCmd>> {
232
        // See [SIDE_CHANNELS] for rules for when to reject unexpected handshake cells.
233

            
234
72
        let Some(link_sent) = self.link_sent else {
235
            return Err(Error::CircProto(
236
                "Received CONFLUX_LINKED cell before sending CONFLUX_LINK?!".into(),
237
            ));
238
        };
239

            
240
72
        let expected_nonce = match self.state {
241
            ConfluxState::Unlinked => {
242
                return Err(Error::CircProto(
243
                    "Received CONFLUX_LINKED cell before sending CONFLUX_LINK?!".into(),
244
                ));
245
            }
246
72
            ConfluxState::AwaitingLink(expected_nonce) => expected_nonce,
247
            ConfluxState::Linked => {
248
                return Err(Error::CircProto(
249
                    "Received CONFLUX_LINKED on already linked circuit".into(),
250
                ));
251
            }
252
        };
253

            
254
72
        let linked = msg
255
72
            .decode::<ConfluxLinked>()
256
72
            .map_err(|e| Error::from_bytes_err(e, "linked message"))?
257
72
            .into_msg();
258

            
259
72
        let linked_nonce = *linked.payload().nonce();
260

            
261
72
        if expected_nonce == linked_nonce {
262
68
            self.state = ConfluxState::Linked;
263
68
        } else {
264
4
            return Err(Error::CircProto(
265
4
                "Received CONFLUX_LINKED cell with mismatched nonce".into(),
266
4
            ));
267
        }
268

            
269
68
        let now = self.runtime.wallclock();
270
        // Measure the initial RTT between the time we sent the LINK and received the LINKED
271
68
        self.init_rtt = Some(now.duration_since(link_sent).unwrap_or_else(|e| {
272
            warn_report!(e, "failed to calculate initial RTT for conflux circuit",);
273

            
274
            // TODO(conflux): this is terrible, because SystemTime is not monotonic.
275
            // Can we somehow use Instant instead of SystemTime?
276
            // (DynTimeProvider doesn't have a way of sleeping until an Instant,
277
            // it only has sleep_until_wallclock)
278
            Duration::from_secs(u64::MAX)
279
        }));
280

            
281
68
        let linked_ack = ConfluxLinkedAck::default();
282
68
        let cell = AnyRelayMsgOuter::new(None, linked_ack.into());
283

            
284
68
        Ok(Some(ConfluxCmd::HandshakeComplete {
285
68
            hop,
286
68
            early: false,
287
68
            cell,
288
68
        }))
289
72
    }
290

            
291
    /// Handle a conflux LINKED acknowledgment coming from the specified hop.
292
    #[allow(clippy::needless_pass_by_value)]
293
    fn handle_conflux_linked_ack(
294
        &mut self,
295
        msg: UnparsedRelayMsg,
296
        hop: HopNum,
297
    ) -> crate::Result<Option<ConfluxCmd>> {
298
        unsupported_client_cell!(msg, hop)
299
    }
300

            
301
    /// Handle a conflux SWITCH coming from the specified hop.
302
36
    fn handle_conflux_switch(
303
36
        &mut self,
304
36
        msg: UnparsedRelayMsg,
305
36
        _hop: HopNum,
306
36
    ) -> crate::Result<Option<ConfluxCmd>> {
307
36
        if self.state != ConfluxState::Linked {
308
4
            return Err(Error::CircProto(
309
4
                "Received CONFLUX_SWITCH on unlinked circuit?!".into(),
310
4
            ));
311
32
        }
312

            
313
32
        if self.have_seen_switch && self.cells_since_switch == 0 {
314
4
            return Err(Error::CircProto(
315
4
                "Received consecutive SWITCH cells on circuit?!".into(),
316
4
            ));
317
28
        }
318

            
319
28
        let switch = msg
320
28
            .decode::<ConfluxSwitch>()
321
28
            .map_err(|e| Error::from_bytes_err(e, "switch message"))?
322
28
            .into_msg();
323

            
324
28
        let rel_seqno = switch.seqno();
325

            
326
28
        self.validate_switch_seqno(rel_seqno)?;
327

            
328
        // Update the absolute sequence number on this leg by the delta.
329
        // Since this cell is not multiplexed, we do not count it towards
330
        // absolute sequence numbers. We only increment the sequence
331
        // numbers for multiplexed cells. Hence there is no +1 here.
332
20
        self.last_seq_recv += u64::from(rel_seqno);
333
        // Note that we've received at least one SWITCH on this leg.
334
20
        self.have_seen_switch = true;
335
        // Reset our counter for the number of relevant (DATA, etc.) cells
336
        // received since the last SWITCH
337
20
        self.cells_since_switch = 0;
338

            
339
20
        Ok(None)
340
36
    }
341

            
342
    /// Validate the relative sequence number specified in a switch command.
343
    ///
344
    /// Returns an error if
345
    ///
346
    ///   * `rel_seqno` is 0 (i.e. the SWITCH cell does not actually increment
347
    ///     the `last_seq_recv` seqno on this leg)
348
    ///   * the tunnel has not yet received any data and `rel_seqno` is greater
349
    ///     than the initial congestion window,
350
28
    fn validate_switch_seqno(&self, rel_seqno: u32) -> crate::Result<()> {
351
        // The sequence number from the switch must be non-zero.
352
28
        if rel_seqno == 0 {
353
4
            return Err(Error::CircProto(
354
4
                "Received SWITCH cell with seqno = 0".into(),
355
4
            ));
356
24
        }
357

            
358
24
        let no_data = self.last_seq_delivered.load(atomic::Ordering::Acquire) == 0;
359
24
        let is_first_switch = !self.have_seen_switch;
360

            
361
        // If we haven't received any DATA cells on this tunnel,
362
        // the seqno delta from the first SWITCH can't possibly
363
        // exceed the initial congestion window.
364
24
        if no_data && is_first_switch && rel_seqno > self.cwnd_params.cwnd_init() {
365
4
            return Err(Error::CircProto(
366
4
                "SWITCH cell seqno exceeds initial cwnd".into(),
367
4
            ));
368
20
        }
369

            
370
20
        Ok(())
371
28
    }
372
}