1
//! Wrap [tor_cell::chancell::codec::ChannelCodec] for use with the futures_codec
2
//! crate.
3

            
4
use digest::Digest;
5
use tor_bytes::Reader;
6
use tor_cell::chancell::{
7
    AnyChanCell, ChanCell, ChanCmd, ChanMsg, codec,
8
    msg::{self, AnyChanMsg},
9
};
10
use tor_error::internal;
11
use tor_llcrypto as ll;
12

            
13
use asynchronous_codec as futures_codec;
14
use bytes::BytesMut;
15

            
16
use crate::{channel::msg::LinkVersion, util::err::Error as ChanError};
17

            
18
use super::{ChannelType, msg::MessageFilter};
19

            
20
/// An authentication rolling digest value.
21
pub(crate) type AuthLogDigest = [u8; 32];
22
/// The rolling digest for the bytes sent on a channel. (Received by the responder).
23
#[derive(Debug, PartialEq)]
24
pub(crate) struct ClogDigest(AuthLogDigest);
25
/// The rolling digest for the bytes received on a channel. (Sent by the responder).
26
#[derive(Debug, PartialEq)]
27
pub(crate) struct SlogDigest(AuthLogDigest);
28

            
29
impl ClogDigest {
30
    /// Constructor from a `AuthLogDigest`
31
    pub(crate) fn new(digest: AuthLogDigest) -> Self {
32
        Self(digest)
33
    }
34
}
35

            
36
impl SlogDigest {
37
    /// Constructor from a `AuthLogDigest`
38
8
    pub(crate) fn new(digest: AuthLogDigest) -> Self {
39
8
        Self(digest)
40
8
    }
41
}
42

            
43
impl AsRef<[u8]> for ClogDigest {
44
    fn as_ref(&self) -> &[u8] {
45
        &self.0
46
    }
47
}
48
impl AsRef<[u8]> for SlogDigest {
49
    fn as_ref(&self) -> &[u8] {
50
        &self.0
51
    }
52
}
53

            
54
/// Channel cell handler which is always in three state.
55
///
56
/// This ALWAYS starts the handler at New. This can only be constructed from a [ChannelType] which
57
/// forces it to start at New.
58
///
59
/// From the New state, it will automatically transition to the right state as information is
60
/// attached to it (ex: link protocol version).
61
pub(crate) enum ChannelCellHandler {
62
    /// When a network connection opens to another endpoint, the channel is considered "New" and
63
    /// so we use this handler to start the handshake.
64
    New(NewChannelHandler),
65
    /// We opened and negotiated a VERSIONS cell. If successful, we transition to this cell handler
66
    /// with sole purpose to handle the handshake phase.
67
    Handshake(HandshakeChannelHandler),
68
    /// Once the handshake is successful, the channel is Open and we use this handler.
69
    Open(OpenChannelHandler),
70
}
71

            
72
/// This is the only way to construct a ChannelCellHandler, from the channel type which will always
73
/// start the handler at the New state.
74
impl From<super::ChannelType> for ChannelCellHandler {
75
142
    fn from(ty: ChannelType) -> Self {
76
142
        Self::New(ty.into())
77
142
    }
78
}
79

            
80
impl ChannelCellHandler {
81
    /// Return the [`ChannelType`] of the inner handler.
82
50
    pub(crate) fn channel_type(&self) -> ChannelType {
83
50
        match self {
84
            Self::New(h) => h.channel_type,
85
            Self::Handshake(h) => h.channel_type(),
86
50
            Self::Open(h) => h.channel_type(),
87
        }
88
50
    }
89

            
90
    /// Set link protocol for this channel cell handler. This transition the handler into the
91
    /// handshake handler state.
92
    ///
93
    /// An error is returned if the current handler is NOT the New one or if the link version is
94
    /// unknown.
95
134
    pub(crate) fn set_link_version(&mut self, link_version: u16) -> Result<(), ChanError> {
96
134
        let Self::New(new_handler) = self else {
97
            return Err(ChanError::Bug(internal!(
98
                "Setting link protocol without a new handler",
99
            )));
100
        };
101
134
        *self = Self::Handshake(new_handler.next_handler(link_version.try_into()?));
102
134
        Ok(())
103
134
    }
104

            
105
    /// This transition into the open handler state.
106
    ///
107
    /// An error is returned if the current handler is NOT the Handshake one.
108
74
    pub(crate) fn set_open(&mut self) -> Result<(), ChanError> {
109
74
        let Self::Handshake(handler) = self else {
110
            return Err(ChanError::Bug(internal!(
111
                "Setting open without a handshake handler"
112
            )));
113
        };
114
74
        *self = Self::Open(handler.next_handler());
115
74
        Ok(())
116
74
    }
117

            
118
    /// Mark this handler as authenticated.
119
    ///
120
    /// This can only happen during the Handshake process as a New handler can't be authenticated
121
    /// from the start and an Open handler can only be opened after authentication.
122
    pub(crate) fn set_authenticated(&mut self) -> Result<(), ChanError> {
123
        let Self::Handshake(handler) = self else {
124
            return Err(ChanError::Bug(internal!(
125
                "Setting authenticated without a handshake handler"
126
            )));
127
        };
128
        handler.set_authenticated();
129
        Ok(())
130
    }
131

            
132
    /// The digest of bytes sent on this channel.
133
    ///
134
    /// This should only ever be called once as it consumes the send log.
135
    ///
136
    /// This will return an error if one of:
137
    /// - The channel is not recording the send log.
138
    /// - The send log digest has already been taken.
139
    /// - This cell handler is not using a handshake handler.
140
12
    pub(crate) fn take_send_log_digest(&mut self) -> Result<AuthLogDigest, ChanError> {
141
12
        if let Self::Handshake(handler) = self {
142
12
            handler
143
12
                .take_send_log_digest()
144
12
                .ok_or(ChanError::Bug(internal!(
145
12
                    "No send log digest on channel, or already taken"
146
12
                )))
147
        } else {
148
            Err(ChanError::Bug(internal!(
149
                "Getting send log digest without a handshake handler"
150
            )))
151
        }
152
12
    }
153

            
154
    /// The digest of bytes received on this channel.
155
    ///
156
    /// This should only ever be called once as it consumes the receive log.
157
    ///
158
    /// This will return `None` if one of:
159
    /// - The channel is not recording the receive log.
160
    /// - The receive log digest has already been taken.
161
    /// - This cell handler is not using a handshake handler.
162
20
    pub(crate) fn take_recv_log_digest(&mut self) -> Result<AuthLogDigest, ChanError> {
163
20
        if let Self::Handshake(handler) = self {
164
20
            handler
165
20
                .take_recv_log_digest()
166
20
                .ok_or(ChanError::Bug(internal!(
167
20
                    "No recv log digest on channel, or already taken"
168
20
                )))
169
        } else {
170
            Err(ChanError::Bug(internal!(
171
                "Getting recv log digest without a handshake handler"
172
            )))
173
        }
174
20
    }
175
}
176

            
177
// Security Consideration.
178
//
179
// Here is an explanation on why AnyChanCell is used as Item in the Handshake and Open handler and
180
// thus the higher level ChannelCellHandler.
181
//
182
// Technically, we could use a restricted message set and so the decoding and encoding wouldn't do
183
// anything if the cell/data was not part of that set.
184
//
185
// However, with relay and client, we have multiple channel types which means we have now a lot
186
// more sets of restricted message (see msg.rs) and each of them are per link protocol version, per
187
// stage of the channel opening process and per direction (inbound or outbound).
188
//
189
// To go around this, we use [MessageFilter] in order to decode on the specific restricted message
190
// set but still return a [AnyChanCell].
191
//
192
// If someone wants to contribute a more elegant solution that wouldn't require us to duplicate
193
// code for each restricted message set, by all means, go for it :).
194

            
195
impl futures_codec::Decoder for ChannelCellHandler {
196
    type Item = AnyChanCell;
197
    type Error = ChanError;
198

            
199
1168
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
200
1168
        match self {
201
256
            Self::New(c) => c
202
256
                .decode(src)
203
313
                .map(|opt| opt.map(|msg| ChanCell::new(None, msg.into()))),
204
888
            Self::Handshake(c) => c.decode(src),
205
24
            Self::Open(c) => c.decode(src),
206
        }
207
1168
    }
208
}
209

            
210
impl futures_codec::Encoder for ChannelCellHandler {
211
    type Item<'a> = AnyChanCell;
212
    type Error = ChanError;
213

            
214
176
    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
215
176
        match self {
216
104
            Self::New(c) => {
217
                // The new handler pins the only possible message to be a Versions. That is why we
218
                // extract it here and validate before else we can't pass Item to encode().
219
104
                let AnyChanMsg::Versions(versions) = item.into_circid_and_msg().1 else {
220
                    return Err(Self::Error::HandshakeProto(
221
                        "Non VERSIONS cell for new handler".into(),
222
                    ));
223
                };
224
104
                c.encode(versions, dst)
225
            }
226
60
            Self::Handshake(c) => c.encode(item, dst),
227
12
            Self::Open(c) => c.encode(item, dst),
228
        }
229
176
    }
230
}
231

            
232
/// A new channel handler used when a channel is created but before the handshake meaning there is no
233
/// link protocol version yet associated with it.
234
///
235
/// This handler only handles the VERSIONS cell.
236
pub(crate) struct NewChannelHandler {
237
    /// The channel type for this handler.
238
    channel_type: ChannelType,
239
    /// The digest of bytes sent on this channel.
240
    ///
241
    /// Will be used for the SLOG or CLOG of the AUTHENTICATE cell.
242
    send_log: Option<ll::d::Sha256>,
243
    /// The digest of bytes received on this channel.
244
    ///
245
    /// Will be used for the SLOG or CLOG of the AUTHENTICATE cell.
246
    recv_log: Option<ll::d::Sha256>,
247
}
248

            
249
impl NewChannelHandler {
250
    /// Return a handshake handler ready for the given link protocol.
251
134
    fn next_handler(&mut self, link_version: LinkVersion) -> HandshakeChannelHandler {
252
134
        HandshakeChannelHandler::new(self, link_version)
253
134
    }
254
}
255

            
256
impl From<ChannelType> for NewChannelHandler {
257
142
    fn from(channel_type: ChannelType) -> Self {
258
142
        match channel_type {
259
96
            ChannelType::ClientInitiator => Self {
260
96
                channel_type,
261
96
                send_log: None,
262
96
                recv_log: None,
263
96
            },
264
            // Relay responder might not need clog/slog but that is fine. We don't know until the
265
            // end of the handshake.
266
46
            ChannelType::RelayInitiator | ChannelType::RelayResponder { .. } => Self {
267
46
                channel_type,
268
46
                send_log: Some(ll::d::Sha256::new()),
269
46
                recv_log: Some(ll::d::Sha256::new()),
270
46
            },
271
        }
272
142
    }
273
}
274

            
275
impl futures_codec::Decoder for NewChannelHandler {
276
    type Item = msg::Versions;
277
    type Error = ChanError;
278

            
279
256
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
280
        // NOTE: Until the body can be extracted from src buffer, it MUST NOT be modified as in
281
        // advanced with the Buf trait or modified in any ways. Reason is that we can realize we
282
        // don't have enough bytes in the src buffer for the expected body length from the header
283
        // so we have to leave the src buffer untouched and wait for more bytes.
284

            
285
        // See tor-spec, starting a handshake, all cells are variable length so the first 5 bytes
286
        // are: CircId as u16, Command as u8, Length as u16 totalling 5 bytes.
287
        const HEADER_SIZE: usize = 5;
288

            
289
        // Below this amount, this is not a valid cell we can decode. This is important because we
290
        // can get an empty buffer in normal circumstances (see how Framed work) and so we have to
291
        // return that we weren't able to decode and thus no Item.
292
256
        if src.len() < HEADER_SIZE {
293
152
            return Ok(None);
294
104
        }
295

            
296
        // Get the CircID and Command from the header. This is safe due to the header size check
297
        // above.
298
104
        let circ_id = u16::from_be_bytes([src[0], src[1]]);
299
104
        if circ_id != 0 {
300
4
            return Err(Self::Error::HandshakeProto(
301
4
                "Invalid CircID in variable cell".into(),
302
4
            ));
303
100
        }
304

            
305
        // We are only expecting these specific commands. We have to do this by hand here as after
306
        // that we can use a proper codec.
307
100
        let cmd = ChanCmd::from(src[2]);
308
100
        if cmd != ChanCmd::VERSIONS {
309
            return Err(Self::Error::HandshakeProto(format!(
310
                "Invalid command {cmd} variable cell, expected a VERSIONS."
311
            )));
312
100
        }
313

            
314
        // Get the body length now from the next two bytes. This is still safe due to the first
315
        // header size check at the start.
316
100
        let body_len = u16::from_be_bytes([src[3], src[4]]) as usize;
317

            
318
        // See https://gitlab.torproject.org/tpo/core/tor/-/issues/10365. The gist is that because
319
        // version numbers are u16, an odd payload would mean we have a trailing byte that is
320
        // unused which shouldn't be and because we don't expect not controlled that byte, as maxi
321
        // precaution, we don't allow.
322
100
        if body_len % 2 == 1 {
323
            return Err(Self::Error::HandshakeProto(
324
                "VERSIONS cell body length is odd. Rejecting.".into(),
325
            ));
326
100
        }
327

            
328
        // Make sure we have enough bytes in our payload.
329
100
        let wanted_bytes = HEADER_SIZE + body_len;
330
100
        if src.len() < wanted_bytes {
331
            // We don't haven't received enough data to decode the expected length from the header
332
            // so return no Item.
333
            //
334
            // IMPORTANT: The src buffer here can't be advance before reaching this check.
335
            return Ok(None);
336
100
        }
337
        // Extract the exact data we will be looking at.
338
100
        let mut data = src.split_to(wanted_bytes);
339

            
340
        // Update the receive log digest with the entire cell up to the end of the payload hence the
341
        // data we are looking at (and not the whole source). Even on error, this doesn't matter
342
        // because if decoding fails, the channel is closed.
343
100
        if let Some(recv_log) = self.recv_log.as_mut() {
344
32
            recv_log.update(&data);
345
80
        }
346

            
347
        // Get the actual body from the data.
348
100
        let body = data.split_off(HEADER_SIZE).freeze();
349
100
        let mut reader = Reader::from_bytes(&body);
350

            
351
        // Decode the VERSIONS.
352
100
        let cell = msg::Versions::decode_from_reader(cmd, &mut reader)
353
100
            .map_err(|e| Self::Error::from_bytes_err(e, "new cell handler"))?;
354
100
        Ok(Some(cell))
355
256
    }
356
}
357

            
358
impl futures_codec::Encoder for NewChannelHandler {
359
    type Item<'a> = msg::Versions;
360
    type Error = ChanError;
361

            
362
104
    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
363
104
        let encoded_bytes = item
364
104
            .encode_for_handshake()
365
104
            .map_err(|e| Self::Error::from_bytes_enc(e, "new cell handler"))?;
366
        // Update the send log digest.
367
104
        if let Some(send_log) = self.send_log.as_mut() {
368
34
            send_log.update(&encoded_bytes);
369
82
        }
370
        // Special encoding for the VERSIONS cell.
371
104
        dst.extend_from_slice(&encoded_bytes);
372
104
        Ok(())
373
104
    }
374
}
375

            
376
/// The handshake channel handler which is used to decode and encode cells onto a channel that is
377
/// handshaking with an endpoint.
378
pub(crate) struct HandshakeChannelHandler {
379
    /// Message filter used to allow or not a certain message.
380
    filter: MessageFilter,
381
    /// The cell codec that we'll use to encode and decode our cells.
382
    inner: codec::ChannelCodec,
383
    /// The digest of bytes sent on this channel.
384
    ///
385
    /// Will be used for the SLOG or CLOG of the AUTHENTICATE cell.
386
    send_log: Option<ll::d::Sha256>,
387
    /// The digest of bytes received on this channel.
388
    ///
389
    /// Will be used for the SLOG or CLOG of the AUTHENTICATE cell.
390
    recv_log: Option<ll::d::Sha256>,
391
}
392

            
393
impl HandshakeChannelHandler {
394
    /// Constructor
395
134
    fn new(new_handler: &mut NewChannelHandler, link_version: LinkVersion) -> Self {
396
134
        Self {
397
134
            filter: MessageFilter::new(
398
134
                link_version,
399
134
                new_handler.channel_type,
400
134
                super::msg::MessageStage::Handshake,
401
134
            ),
402
134
            send_log: new_handler.send_log.take(),
403
134
            recv_log: new_handler.recv_log.take(),
404
134
            inner: codec::ChannelCodec::new(link_version.value()),
405
134
        }
406
134
    }
407

            
408
    /// Internal helper: Take a SHA256 digest and finalize it if any. None is returned if no log
409
    /// digest is given.
410
32
    fn finalize_log(log: Option<ll::d::Sha256>) -> Option<[u8; 32]> {
411
48
        log.map(|sha256| sha256.finalize().into())
412
32
    }
413

            
414
    /// Return an open handshake handler.
415
74
    fn next_handler(&mut self) -> OpenChannelHandler {
416
74
        OpenChannelHandler::new(
417
74
            self.inner
418
74
                .link_version()
419
74
                .try_into()
420
74
                .expect("Channel Codec with unknown link version"),
421
74
            self.channel_type(),
422
        )
423
74
    }
424

            
425
    /// The digest of bytes sent on this channel.
426
    ///
427
    /// This should only ever be called once as it consumes the send log.
428
    ///
429
    /// This will return `None` if one of:
430
    /// - The channel is not recording the send log.
431
    /// - The send log digest has already been taken.
432
12
    pub(crate) fn take_send_log_digest(&mut self) -> Option<AuthLogDigest> {
433
12
        Self::finalize_log(self.send_log.take())
434
12
    }
435

            
436
    /// The digest of bytes received on this channel.
437
    ///
438
    /// This should only ever be called once as it consumes the receive log.
439
    ///
440
    /// This will return `None` if one of:
441
    /// - The channel is not recording the receive log.
442
    /// - The receive log digest has already been taken.
443
20
    pub(crate) fn take_recv_log_digest(&mut self) -> Option<AuthLogDigest> {
444
20
        Self::finalize_log(self.recv_log.take())
445
20
    }
446

            
447
    /// Return the [`ChannelType`] of this handler.
448
74
    pub(crate) fn channel_type(&self) -> ChannelType {
449
74
        self.filter.channel_type()
450
74
    }
451

            
452
    /// Mark this handler as authenticated.
453
    pub(crate) fn set_authenticated(&mut self) {
454
        self.filter.channel_type_mut().set_authenticated();
455
    }
456
}
457

            
458
impl futures_codec::Encoder for HandshakeChannelHandler {
459
    type Item<'a> = AnyChanCell;
460
    type Error = ChanError;
461

            
462
60
    fn encode(
463
60
        &mut self,
464
60
        item: Self::Item<'_>,
465
60
        dst: &mut BytesMut,
466
60
    ) -> std::result::Result<(), Self::Error> {
467
60
        let before_dst_len = dst.len();
468
60
        self.filter.encode_cell(item, &mut self.inner, dst)?;
469
60
        let after_dst_len = dst.len();
470
60
        if let Some(send_log) = self.send_log.as_mut() {
471
12
            // Only use what we actually wrote. Variable length cell are not padded and thus this
472
12
            // won't catch a bunch of padding.
473
12
            send_log.update(&dst[before_dst_len..after_dst_len]);
474
60
        }
475
60
        Ok(())
476
60
    }
477
}
478

            
479
impl futures_codec::Decoder for HandshakeChannelHandler {
480
    type Item = AnyChanCell;
481
    type Error = ChanError;
482

            
483
888
    fn decode(
484
888
        &mut self,
485
888
        src: &mut BytesMut,
486
888
    ) -> std::result::Result<Option<Self::Item>, Self::Error> {
487
888
        let orig = src.clone(); // NOTE: Not fun. But This is only done during handshake.
488
888
        let cell = self.filter.decode_cell(&mut self.inner, src)?;
489
880
        if let Some(recv_log) = self.recv_log.as_mut() {
490
56
            let n_used = orig.len() - src.len();
491
56
            recv_log.update(&orig[..n_used]);
492
824
        }
493
880
        Ok(cell)
494
888
    }
495
}
496

            
497
/// The open channel handler which is used to decode and encode cells onto an open Channel.
498
pub(crate) struct OpenChannelHandler {
499
    /// Message filter used to allow or not a certain message.
500
    filter: MessageFilter,
501
    /// The cell codec that we'll use to encode and decode our cells.
502
    inner: codec::ChannelCodec,
503
}
504

            
505
impl OpenChannelHandler {
506
    /// Constructor
507
98
    fn new(link_version: LinkVersion, channel_type: ChannelType) -> Self {
508
98
        Self {
509
98
            inner: codec::ChannelCodec::new(link_version.value()),
510
98
            filter: MessageFilter::new(link_version, channel_type, super::msg::MessageStage::Open),
511
98
        }
512
98
    }
513

            
514
    /// Return the [`ChannelType`] of this handler.
515
50
    fn channel_type(&self) -> ChannelType {
516
50
        self.filter.channel_type()
517
50
    }
518
}
519

            
520
impl futures_codec::Encoder for OpenChannelHandler {
521
    type Item<'a> = AnyChanCell;
522
    type Error = ChanError;
523

            
524
12
    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
525
12
        self.filter.encode_cell(item, &mut self.inner, dst)
526
12
    }
527
}
528

            
529
impl futures_codec::Decoder for OpenChannelHandler {
530
    type Item = AnyChanCell;
531
    type Error = ChanError;
532

            
533
24
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
534
24
        self.filter.decode_cell(&mut self.inner, src)
535
24
    }
536
}
537

            
538
#[cfg(test)]
539
pub(crate) mod test {
540
    #![allow(clippy::unwrap_used)]
541
    use bytes::BytesMut;
542
    use digest::Digest;
543
    use futures::io::{AsyncRead, AsyncWrite, Cursor, Result};
544
    use futures::sink::SinkExt;
545
    use futures::stream::StreamExt;
546
    use futures::task::{Context, Poll};
547
    use hex_literal::hex;
548
    use std::pin::Pin;
549

            
550
    use tor_bytes::Writer;
551
    use tor_llcrypto as ll;
552
    use tor_rtcompat::StreamOps;
553

            
554
    use crate::channel::msg::LinkVersion;
555
    use crate::channel::{ChannelType, new_frame};
556

            
557
    use super::{ChannelCellHandler, OpenChannelHandler, futures_codec};
558
    use tor_cell::chancell::{AnyChanCell, ChanCmd, ChanMsg, CircId, msg};
559

            
560
    /// Helper type for reading and writing bytes to/from buffers.
561
    pub(crate) struct MsgBuf {
562
        /// Data we have received as a reader.
563
        inbuf: futures::io::Cursor<Vec<u8>>,
564
        /// Data we write as a writer.
565
        outbuf: futures::io::Cursor<Vec<u8>>,
566
    }
567

            
568
    impl AsyncRead for MsgBuf {
569
        fn poll_read(
570
            mut self: Pin<&mut Self>,
571
            cx: &mut Context<'_>,
572
            buf: &mut [u8],
573
        ) -> Poll<Result<usize>> {
574
            Pin::new(&mut self.inbuf).poll_read(cx, buf)
575
        }
576
    }
577
    impl AsyncWrite for MsgBuf {
578
        fn poll_write(
579
            mut self: Pin<&mut Self>,
580
            cx: &mut Context<'_>,
581
            buf: &[u8],
582
        ) -> Poll<Result<usize>> {
583
            Pin::new(&mut self.outbuf).poll_write(cx, buf)
584
        }
585
        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
586
            Pin::new(&mut self.outbuf).poll_flush(cx)
587
        }
588
        fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
589
            Pin::new(&mut self.outbuf).poll_close(cx)
590
        }
591
    }
592

            
593
    impl StreamOps for MsgBuf {}
594

            
595
    impl MsgBuf {
596
        pub(crate) fn new<T: Into<Vec<u8>>>(output: T) -> Self {
597
            let inbuf = Cursor::new(output.into());
598
            let outbuf = Cursor::new(Vec::new());
599
            MsgBuf { inbuf, outbuf }
600
        }
601

            
602
        pub(crate) fn consumed(&self) -> usize {
603
            self.inbuf.position() as usize
604
        }
605

            
606
        pub(crate) fn all_consumed(&self) -> bool {
607
            self.inbuf.get_ref().len() == self.consumed()
608
        }
609

            
610
        pub(crate) fn into_response(self) -> Vec<u8> {
611
            self.outbuf.into_inner()
612
        }
613
    }
614

            
615
    fn new_client_open_frame(mbuf: MsgBuf) -> futures_codec::Framed<MsgBuf, ChannelCellHandler> {
616
        let open_handler = ChannelCellHandler::Open(OpenChannelHandler::new(
617
            LinkVersion::V5,
618
            ChannelType::ClientInitiator,
619
        ));
620
        futures_codec::Framed::new(mbuf, open_handler)
621
    }
622

            
623
    #[test]
624
    fn check_client_encoding() {
625
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
626
            let mb = MsgBuf::new(&b""[..]);
627
            let mut framed = new_client_open_frame(mb);
628

            
629
            let destroycell = msg::Destroy::new(2.into());
630
            framed
631
                .send(AnyChanCell::new(CircId::new(7), destroycell.into()))
632
                .await
633
                .unwrap();
634

            
635
            framed.flush().await.unwrap();
636

            
637
            let data = framed.into_inner().into_response();
638

            
639
            assert_eq!(&data[0..10], &hex!("00000007 04 0200000000")[..]);
640
        });
641
    }
642

            
643
    #[test]
644
    fn check_client_decoding() {
645
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
646
            let mut dat = Vec::new();
647
            // DESTROY cell.
648
            dat.extend_from_slice(&hex!("00000007 04 0200000000")[..]);
649
            dat.resize(514, 0);
650
            let mb = MsgBuf::new(&dat[..]);
651
            let mut framed = new_client_open_frame(mb);
652

            
653
            let destroy = framed.next().await.unwrap().unwrap();
654

            
655
            let circ_id = CircId::new(7);
656
            assert_eq!(destroy.circid(), circ_id);
657
            assert_eq!(destroy.msg().cmd(), ChanCmd::DESTROY);
658

            
659
            assert!(framed.into_inner().all_consumed());
660
        });
661
    }
662

            
663
    #[test]
664
    fn handler_transition() {
665
        // Start as a client initiating a channel to a relay.
666
        let mut handler: ChannelCellHandler = ChannelType::ClientInitiator.into();
667
        assert!(matches!(handler, ChannelCellHandler::New(_)));
668

            
669
        // Set the link version protocol. Should transition to Handshake.
670
        let r = handler.set_link_version(5);
671
        assert!(r.is_ok());
672
        assert!(matches!(handler, ChannelCellHandler::Handshake(_)));
673

            
674
        // Set the link version protocol.
675
        let r = handler.set_open();
676
        assert!(r.is_ok());
677
        assert!(matches!(handler, ChannelCellHandler::Open(_)));
678
    }
679

            
680
    #[test]
681
    fn clog_digest() {
682
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
683
            let mut our_clog = ll::d::Sha256::new();
684
            let mbuf = MsgBuf::new(*b"");
685
            let mut frame = new_frame(mbuf, ChannelType::RelayInitiator);
686

            
687
            // This is a VERSIONS cell with value 5 in it.
688
            our_clog.update(hex!("0000 07 0002 0005"));
689
            let version_cell = AnyChanCell::new(
690
                None,
691
                msg::Versions::new(vec![5]).expect("Fail VERSIONS").into(),
692
            );
693
            let _ = frame.send(version_cell).await.unwrap();
694

            
695
            frame
696
                .codec_mut()
697
                .set_link_version(5)
698
                .expect("Fail link version set");
699

            
700
            // This is what an empty CERTS cell looks like.
701
            our_clog.update(hex!("0000 0000 81 0001 00"));
702
            let certs_cell = msg::Certs::new_empty();
703
            frame
704
                .send(AnyChanCell::new(None, certs_cell.into()))
705
                .await
706
                .unwrap();
707

            
708
            // Final CLOG should match.
709
            let clog_hash: [u8; 32] = our_clog.finalize().into();
710
            assert_eq!(frame.codec_mut().take_send_log_digest().unwrap(), clog_hash);
711
        });
712
    }
713

            
714
    #[test]
715
    fn slog_digest() {
716
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
717
            let mut our_slog = ll::d::Sha256::new();
718

            
719
            // Build a VERSIONS cell to start with.
720
            let mut data = BytesMut::new();
721
            data.extend_from_slice(
722
                msg::Versions::new(vec![5])
723
                    .unwrap()
724
                    .encode_for_handshake()
725
                    .expect("Fail VERSIONS encoding")
726
                    .as_slice(),
727
            );
728
            our_slog.update(&data);
729

            
730
            let mbuf = MsgBuf::new(data);
731
            let mut frame = new_frame(mbuf, ChannelType::RelayInitiator);
732

            
733
            // Receive the VERSIONS
734
            let _ = frame.next().await.transpose().expect("Fail to get cell");
735
            // Set the link version which will move the handler to Handshake state and then we'll be
736
            // able to decode the AUTH_CHALLENGE.
737
            frame
738
                .codec_mut()
739
                .set_link_version(5)
740
                .expect("Fail link version set");
741

            
742
            // Setup a new buffer for the next cell.
743
            let mut data = BytesMut::new();
744
            // This is a variable length cell with a wide circ ID of 0.
745
            data.write_u32(0);
746
            data.write_u8(ChanCmd::AUTH_CHALLENGE.into());
747
            data.write_u16(36); // This is the length of the payload.
748
            msg::AuthChallenge::new([42_u8; 32], vec![3])
749
                .encode_onto(&mut data)
750
                .expect("Fail AUTH_CHALLENGE encoding");
751
            our_slog.update(&data);
752

            
753
            // Change the I/O part of the Framed with this new buffer containing our new cell.
754
            *frame = MsgBuf::new(data);
755
            // Receive the AUTH_CHALLENGE
756
            let _ = frame.next().await.transpose().expect("Fail to get cell");
757

            
758
            // Final SLOG should match.
759
            let slog_hash: [u8; 32] = our_slog.finalize().into();
760
            assert_eq!(frame.codec_mut().take_recv_log_digest().unwrap(), slog_hash);
761
        });
762
    }
763
}