1
//! Wrap [tor_cell::chancell::codec::ChannelCodec] for use with the asynchronous_codec
2
//! crate.
3

            
4
use digest::Digest;
5
use tor_bytes::Reader;
6
use tor_cell::chancell::{
7
    AnyChanCell, ChanCell, ChanCmd, ChanMsg, codec,
8
    msg::{self, AnyChanMsg},
9
};
10
use tor_error::internal;
11
use tor_llcrypto as ll;
12

            
13
use bytes::BytesMut;
14

            
15
use crate::{channel::msg::LinkVersion, util::err::Error as ChanError};
16

            
17
use super::{ChannelType, msg::MessageFilter};
18

            
19
/// An authentication rolling digest value.
///
/// This is a plain 32-byte array; the newtypes below give it a direction.
pub(crate) type AuthLogDigest = [u8; 32];

/// The rolling digest for the bytes sent on a channel. (Received by the responder).
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct ClogDigest(AuthLogDigest);

/// The rolling digest for the bytes received on a channel. (Sent by the responder).
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct SlogDigest(AuthLogDigest);

impl ClogDigest {
    /// Wrap an already computed [`AuthLogDigest`] as a CLOG value.
    pub(crate) fn new(digest: AuthLogDigest) -> Self {
        Self(digest)
    }
}

impl SlogDigest {
    /// Wrap an already computed [`AuthLogDigest`] as a SLOG value.
    pub(crate) fn new(digest: AuthLogDigest) -> Self {
        Self(digest)
    }
}

impl AsRef<[u8]> for ClogDigest {
    /// Borrow the raw digest bytes.
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl AsRef<[u8]> for SlogDigest {
    /// Borrow the raw digest bytes.
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}
52

            
53
/// Channel cell handler which is always in three state.
54
///
55
/// This ALWAYS starts the handler at New. This can only be constructed from a [ChannelType] which
56
/// forces it to start at New.
57
///
58
/// From the New state, it will automatically transition to the right state as information is
59
/// attached to it (ex: link protocol version).
60
pub(crate) enum ChannelCellHandler {
61
    /// When a network connection opens to another endpoint, the channel is considered "New" and
62
    /// so we use this handler to start the handshake.
63
    New(NewChannelHandler),
64
    /// We opened and negotiated a VERSIONS cell. If successful, we transition to this cell handler
65
    /// with sole purpose to handle the handshake phase.
66
    Handshake(HandshakeChannelHandler),
67
    /// Once the handshake is successful, the channel is Open and we use this handler.
68
    Open(OpenChannelHandler),
69
}
70

            
71
/// This is the only way to construct a ChannelCellHandler, from the channel type which will always
72
/// start the handler at the New state.
73
impl From<super::ChannelType> for ChannelCellHandler {
74
145
    fn from(ty: ChannelType) -> Self {
75
145
        Self::New(ty.into())
76
145
    }
77
}
78

            
79
impl ChannelCellHandler {
80
    /// Return the [`ChannelType`] of the inner handler.
81
53
    pub(crate) fn channel_type(&self) -> ChannelType {
82
53
        match self {
83
            Self::New(h) => h.channel_type,
84
            Self::Handshake(h) => h.channel_type(),
85
53
            Self::Open(h) => h.channel_type(),
86
        }
87
53
    }
88

            
89
    /// Set link protocol for this channel cell handler. This transition the handler into the
90
    /// handshake handler state.
91
    ///
92
    /// An error is returned if the current handler is NOT the New one or if the link version is
93
    /// unknown.
94
137
    pub(crate) fn set_link_version(&mut self, link_version: u16) -> Result<(), ChanError> {
95
137
        let Self::New(new_handler) = self else {
96
            return Err(ChanError::Bug(internal!(
97
                "Setting link protocol without a new handler",
98
            )));
99
        };
100
137
        *self = Self::Handshake(new_handler.next_handler(link_version.try_into()?));
101
137
        Ok(())
102
137
    }
103

            
104
    /// This transition into the open handler state.
105
    ///
106
    /// An error is returned if the current handler is NOT the Handshake one.
107
77
    pub(crate) fn set_open(&mut self) -> Result<(), ChanError> {
108
77
        let Self::Handshake(handler) = self else {
109
            return Err(ChanError::Bug(internal!(
110
                "Setting open without a handshake handler"
111
            )));
112
        };
113
77
        *self = Self::Open(handler.next_handler());
114
77
        Ok(())
115
77
    }
116

            
117
    /// Mark this handler as authenticated.
118
    ///
119
    /// This can only happen during the Handshake process as a New handler can't be authenticated
120
    /// from the start and an Open handler can only be opened after authentication.
121
    pub(crate) fn set_authenticated(&mut self) -> Result<(), ChanError> {
122
        let Self::Handshake(handler) = self else {
123
            return Err(ChanError::Bug(internal!(
124
                "Setting authenticated without a handshake handler"
125
            )));
126
        };
127
        handler.set_authenticated();
128
        Ok(())
129
    }
130

            
131
    /// The digest of bytes sent on this channel.
132
    ///
133
    /// This should only ever be called once as it consumes the send log.
134
    ///
135
    /// This will return an error if one of:
136
    /// - The channel is not recording the send log.
137
    /// - The send log digest has already been taken.
138
    /// - This cell handler is not using a handshake handler.
139
12
    pub(crate) fn take_send_log_digest(&mut self) -> Result<AuthLogDigest, ChanError> {
140
12
        if let Self::Handshake(handler) = self {
141
12
            handler
142
12
                .take_send_log_digest()
143
12
                .ok_or(ChanError::Bug(internal!(
144
12
                    "No send log digest on channel, or already taken"
145
12
                )))
146
        } else {
147
            Err(ChanError::Bug(internal!(
148
                "Getting send log digest without a handshake handler"
149
            )))
150
        }
151
12
    }
152

            
153
    /// The digest of bytes received on this channel.
154
    ///
155
    /// This should only ever be called once as it consumes the receive log.
156
    ///
157
    /// This will return `None` if one of:
158
    /// - The channel is not recording the receive log.
159
    /// - The receive log digest has already been taken.
160
    /// - This cell handler is not using a handshake handler.
161
20
    pub(crate) fn take_recv_log_digest(&mut self) -> Result<AuthLogDigest, ChanError> {
162
20
        if let Self::Handshake(handler) = self {
163
20
            handler
164
20
                .take_recv_log_digest()
165
20
                .ok_or(ChanError::Bug(internal!(
166
20
                    "No recv log digest on channel, or already taken"
167
20
                )))
168
        } else {
169
            Err(ChanError::Bug(internal!(
170
                "Getting recv log digest without a handshake handler"
171
            )))
172
        }
173
20
    }
174
}
175

            
176
// Security Consideration.
//
// Here is an explanation of why AnyChanCell is used as Item in the Handshake and Open handlers
// and thus in the higher level ChannelCellHandler.
//
// Technically, we could use a restricted message set so that the decoding and encoding wouldn't
// do anything if the cell/data was not part of that set.
//
// However, with relay and client, we have multiple channel types, which means we now have a lot
// more sets of restricted messages (see msg.rs), and each of them is per link protocol version,
// per stage of the channel opening process and per direction (inbound or outbound).
//
// To work around this, we use [MessageFilter] in order to decode on the specific restricted
// message set but still return an [AnyChanCell].
//
// If someone wants to contribute a more elegant solution that wouldn't require us to duplicate
// code for each restricted message set, by all means, go for it :).
193

            
194
impl asynchronous_codec::Decoder for ChannelCellHandler {
195
    type Item = AnyChanCell;
196
    type Error = ChanError;
197

            
198
1225
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
199
1225
        match self {
200
265
            Self::New(c) => c
201
265
                .decode(src)
202
322
                .map(|opt| opt.map(|msg| ChanCell::new(None, msg.into()))),
203
936
            Self::Handshake(c) => c.decode(src),
204
24
            Self::Open(c) => c.decode(src),
205
        }
206
1225
    }
207
}
208

            
209
impl asynchronous_codec::Encoder for ChannelCellHandler {
210
    type Item<'a> = AnyChanCell;
211
    type Error = ChanError;
212

            
213
182
    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
214
182
        match self {
215
107
            Self::New(c) => {
216
                // The new handler pins the only possible message to be a Versions. That is why we
217
                // extract it here and validate before else we can't pass Item to encode().
218
107
                let AnyChanMsg::Versions(versions) = item.into_circid_and_msg().1 else {
219
                    return Err(Self::Error::HandshakeProto(
220
                        "Non VERSIONS cell for new handler".into(),
221
                    ));
222
                };
223
107
                c.encode(versions, dst)
224
            }
225
63
            Self::Handshake(c) => c.encode(item, dst),
226
12
            Self::Open(c) => c.encode(item, dst),
227
        }
228
182
    }
229
}
230

            
231
/// A new channel handler used when a channel is created but before the handshake meaning there is no
232
/// link protocol version yet associated with it.
233
///
234
/// This handler only handles the VERSIONS cell.
235
pub(crate) struct NewChannelHandler {
236
    /// The channel type for this handler.
237
    channel_type: ChannelType,
238
    /// The digest of bytes sent on this channel.
239
    ///
240
    /// Will be used for the SLOG or CLOG of the AUTHENTICATE cell.
241
    send_log: Option<ll::d::Sha256>,
242
    /// The digest of bytes received on this channel.
243
    ///
244
    /// Will be used for the SLOG or CLOG of the AUTHENTICATE cell.
245
    recv_log: Option<ll::d::Sha256>,
246
}
247

            
248
impl NewChannelHandler {
249
    /// Return a handshake handler ready for the given link protocol.
250
137
    fn next_handler(&mut self, link_version: LinkVersion) -> HandshakeChannelHandler {
251
137
        HandshakeChannelHandler::new(self, link_version)
252
137
    }
253
}
254

            
255
impl From<ChannelType> for NewChannelHandler {
256
145
    fn from(channel_type: ChannelType) -> Self {
257
145
        match channel_type {
258
99
            ChannelType::ClientInitiator => Self {
259
99
                channel_type,
260
99
                send_log: None,
261
99
                recv_log: None,
262
99
            },
263
            // Relay responder might not need clog/slog but that is fine. We don't know until the
264
            // end of the handshake.
265
46
            ChannelType::RelayInitiator | ChannelType::RelayResponder { .. } => Self {
266
46
                channel_type,
267
46
                send_log: Some(ll::d::Sha256::new()),
268
46
                recv_log: Some(ll::d::Sha256::new()),
269
46
            },
270
        }
271
145
    }
272
}
273

            
274
impl asynchronous_codec::Decoder for NewChannelHandler {
275
    type Item = msg::Versions;
276
    type Error = ChanError;
277

            
278
265
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
279
        // NOTE: Until the body can be extracted from src buffer, it MUST NOT be modified as in
280
        // advanced with the Buf trait or modified in any ways. Reason is that we can realize we
281
        // don't have enough bytes in the src buffer for the expected body length from the header
282
        // so we have to leave the src buffer untouched and wait for more bytes.
283

            
284
        // See tor-spec, starting a handshake, all cells are variable length so the first 5 bytes
285
        // are: CircId as u16, Command as u8, Length as u16 totalling 5 bytes.
286
        const HEADER_SIZE: usize = 5;
287

            
288
        // Below this amount, this is not a valid cell we can decode. This is important because we
289
        // can get an empty buffer in normal circumstances (see how Framed work) and so we have to
290
        // return that we weren't able to decode and thus no Item.
291
265
        if src.len() < HEADER_SIZE {
292
158
            return Ok(None);
293
107
        }
294

            
295
        // Get the CircID and Command from the header. This is safe due to the header size check
296
        // above.
297
107
        let circ_id = u16::from_be_bytes([src[0], src[1]]);
298
107
        if circ_id != 0 {
299
4
            return Err(Self::Error::HandshakeProto(
300
4
                "Invalid CircID in variable cell".into(),
301
4
            ));
302
103
        }
303

            
304
        // We are only expecting these specific commands. We have to do this by hand here as after
305
        // that we can use a proper codec.
306
103
        let cmd = ChanCmd::from(src[2]);
307
103
        if cmd != ChanCmd::VERSIONS {
308
            return Err(Self::Error::HandshakeProto(format!(
309
                "Invalid command {cmd} variable cell, expected a VERSIONS."
310
            )));
311
103
        }
312

            
313
        // Get the body length now from the next two bytes. This is still safe due to the first
314
        // header size check at the start.
315
103
        let body_len = u16::from_be_bytes([src[3], src[4]]) as usize;
316

            
317
        // See https://gitlab.torproject.org/tpo/core/tor/-/issues/10365. The gist is that because
318
        // version numbers are u16, an odd payload would mean we have a trailing byte that is
319
        // unused which shouldn't be and because we don't expect not controlled that byte, as maxi
320
        // precaution, we don't allow.
321
103
        if body_len % 2 == 1 {
322
            return Err(Self::Error::HandshakeProto(
323
                "VERSIONS cell body length is odd. Rejecting.".into(),
324
            ));
325
103
        }
326

            
327
        // Make sure we have enough bytes in our payload.
328
103
        let wanted_bytes = HEADER_SIZE + body_len;
329
103
        if src.len() < wanted_bytes {
330
            // We don't haven't received enough data to decode the expected length from the header
331
            // so return no Item.
332
            //
333
            // IMPORTANT: The src buffer here can't be advance before reaching this check.
334
            return Ok(None);
335
103
        }
336
        // Extract the exact data we will be looking at.
337
103
        let mut data = src.split_to(wanted_bytes);
338

            
339
        // Update the receive log digest with the entire cell up to the end of the payload hence the
340
        // data we are looking at (and not the whole source). Even on error, this doesn't matter
341
        // because if decoding fails, the channel is closed.
342
103
        if let Some(recv_log) = self.recv_log.as_mut() {
343
32
            recv_log.update(&data);
344
83
        }
345

            
346
        // Get the actual body from the data.
347
103
        let body = data.split_off(HEADER_SIZE).freeze();
348
103
        let mut reader = Reader::from_bytes(&body);
349

            
350
        // Decode the VERSIONS.
351
103
        let cell = msg::Versions::decode_from_reader(cmd, &mut reader)
352
103
            .map_err(|e| Self::Error::from_bytes_err(e, "new cell handler"))?;
353
103
        Ok(Some(cell))
354
265
    }
355
}
356

            
357
impl asynchronous_codec::Encoder for NewChannelHandler {
358
    type Item<'a> = msg::Versions;
359
    type Error = ChanError;
360

            
361
107
    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
362
107
        let encoded_bytes = item
363
107
            .encode_for_handshake()
364
107
            .map_err(|e| Self::Error::from_bytes_enc(e, "new cell handler"))?;
365
        // Update the send log digest.
366
107
        if let Some(send_log) = self.send_log.as_mut() {
367
34
            send_log.update(&encoded_bytes);
368
85
        }
369
        // Special encoding for the VERSIONS cell.
370
107
        dst.extend_from_slice(&encoded_bytes);
371
107
        Ok(())
372
107
    }
373
}
374

            
375
/// The handshake channel handler which is used to decode and encode cells onto a channel that is
376
/// handshaking with an endpoint.
377
pub(crate) struct HandshakeChannelHandler {
378
    /// Message filter used to allow or not a certain message.
379
    filter: MessageFilter,
380
    /// The cell codec that we'll use to encode and decode our cells.
381
    inner: codec::ChannelCodec,
382
    /// The digest of bytes sent on this channel.
383
    ///
384
    /// Will be used for the SLOG or CLOG of the AUTHENTICATE cell.
385
    send_log: Option<ll::d::Sha256>,
386
    /// The digest of bytes received on this channel.
387
    ///
388
    /// Will be used for the SLOG or CLOG of the AUTHENTICATE cell.
389
    recv_log: Option<ll::d::Sha256>,
390
}
391

            
392
impl HandshakeChannelHandler {
393
    /// Constructor
394
137
    fn new(new_handler: &mut NewChannelHandler, link_version: LinkVersion) -> Self {
395
137
        Self {
396
137
            filter: MessageFilter::new(
397
137
                link_version,
398
137
                new_handler.channel_type,
399
137
                super::msg::MessageStage::Handshake,
400
137
            ),
401
137
            send_log: new_handler.send_log.take(),
402
137
            recv_log: new_handler.recv_log.take(),
403
137
            inner: codec::ChannelCodec::new(link_version.value()),
404
137
        }
405
137
    }
406

            
407
    /// Internal helper: Take a SHA256 digest and finalize it if any. None is returned if no log
408
    /// digest is given.
409
32
    fn finalize_log(log: Option<ll::d::Sha256>) -> Option<[u8; 32]> {
410
48
        log.map(|sha256| sha256.finalize().into())
411
32
    }
412

            
413
    /// Return an open handshake handler.
414
77
    fn next_handler(&mut self) -> OpenChannelHandler {
415
77
        OpenChannelHandler::new(
416
77
            self.inner
417
77
                .link_version()
418
77
                .try_into()
419
77
                .expect("Channel Codec with unknown link version"),
420
77
            self.channel_type(),
421
        )
422
77
    }
423

            
424
    /// The digest of bytes sent on this channel.
425
    ///
426
    /// This should only ever be called once as it consumes the send log.
427
    ///
428
    /// This will return `None` if one of:
429
    /// - The channel is not recording the send log.
430
    /// - The send log digest has already been taken.
431
12
    pub(crate) fn take_send_log_digest(&mut self) -> Option<AuthLogDigest> {
432
12
        Self::finalize_log(self.send_log.take())
433
12
    }
434

            
435
    /// The digest of bytes received on this channel.
436
    ///
437
    /// This should only ever be called once as it consumes the receive log.
438
    ///
439
    /// This will return `None` if one of:
440
    /// - The channel is not recording the receive log.
441
    /// - The receive log digest has already been taken.
442
20
    pub(crate) fn take_recv_log_digest(&mut self) -> Option<AuthLogDigest> {
443
20
        Self::finalize_log(self.recv_log.take())
444
20
    }
445

            
446
    /// Return the [`ChannelType`] of this handler.
447
77
    pub(crate) fn channel_type(&self) -> ChannelType {
448
77
        self.filter.channel_type()
449
77
    }
450

            
451
    /// Mark this handler as authenticated.
452
    pub(crate) fn set_authenticated(&mut self) {
453
        self.filter.channel_type_mut().set_authenticated();
454
    }
455
}
456

            
457
impl asynchronous_codec::Encoder for HandshakeChannelHandler {
458
    type Item<'a> = AnyChanCell;
459
    type Error = ChanError;
460

            
461
63
    fn encode(
462
63
        &mut self,
463
63
        item: Self::Item<'_>,
464
63
        dst: &mut BytesMut,
465
63
    ) -> std::result::Result<(), Self::Error> {
466
63
        let before_dst_len = dst.len();
467
63
        self.filter.encode_cell(item, &mut self.inner, dst)?;
468
63
        let after_dst_len = dst.len();
469
63
        if let Some(send_log) = self.send_log.as_mut() {
470
12
            // Only use what we actually wrote. Variable length cell are not padded and thus this
471
12
            // won't catch a bunch of padding.
472
12
            send_log.update(&dst[before_dst_len..after_dst_len]);
473
63
        }
474
63
        Ok(())
475
63
    }
476
}
477

            
478
impl asynchronous_codec::Decoder for HandshakeChannelHandler {
479
    type Item = AnyChanCell;
480
    type Error = ChanError;
481

            
482
936
    fn decode(
483
936
        &mut self,
484
936
        src: &mut BytesMut,
485
936
    ) -> std::result::Result<Option<Self::Item>, Self::Error> {
486
936
        let orig = src.clone(); // NOTE: Not fun. But This is only done during handshake.
487
936
        let cell = self.filter.decode_cell(&mut self.inner, src)?;
488
928
        if let Some(recv_log) = self.recv_log.as_mut() {
489
56
            let n_used = orig.len() - src.len();
490
56
            recv_log.update(&orig[..n_used]);
491
872
        }
492
928
        Ok(cell)
493
936
    }
494
}
495

            
496
/// The open channel handler which is used to decode and encode cells onto an open Channel.
497
pub(crate) struct OpenChannelHandler {
498
    /// Message filter used to allow or not a certain message.
499
    filter: MessageFilter,
500
    /// The cell codec that we'll use to encode and decode our cells.
501
    inner: codec::ChannelCodec,
502
}
503

            
504
impl OpenChannelHandler {
505
    /// Constructor
506
101
    fn new(link_version: LinkVersion, channel_type: ChannelType) -> Self {
507
101
        Self {
508
101
            inner: codec::ChannelCodec::new(link_version.value()),
509
101
            filter: MessageFilter::new(link_version, channel_type, super::msg::MessageStage::Open),
510
101
        }
511
101
    }
512

            
513
    /// Return the [`ChannelType`] of this handler.
514
53
    fn channel_type(&self) -> ChannelType {
515
53
        self.filter.channel_type()
516
53
    }
517
}
518

            
519
impl asynchronous_codec::Encoder for OpenChannelHandler {
520
    type Item<'a> = AnyChanCell;
521
    type Error = ChanError;
522

            
523
12
    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
524
12
        self.filter.encode_cell(item, &mut self.inner, dst)
525
12
    }
526
}
527

            
528
impl asynchronous_codec::Decoder for OpenChannelHandler {
529
    type Item = AnyChanCell;
530
    type Error = ChanError;
531

            
532
24
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
533
24
        self.filter.decode_cell(&mut self.inner, src)
534
24
    }
535
}
536

            
537
#[cfg(test)]
538
pub(crate) mod test {
539
    #![allow(clippy::unwrap_used)]
540
    use bytes::BytesMut;
541
    use digest::Digest;
542
    use futures::io::{AsyncRead, AsyncWrite, Cursor, Result};
543
    use futures::sink::SinkExt;
544
    use futures::stream::StreamExt;
545
    use futures::task::{Context, Poll};
546
    use hex_literal::hex;
547
    use std::pin::Pin;
548

            
549
    use tor_bytes::Writer;
550
    use tor_llcrypto as ll;
551
    use tor_rtcompat::StreamOps;
552

            
553
    use crate::channel::msg::LinkVersion;
554
    use crate::channel::{ChannelType, new_frame};
555

            
556
    use super::{ChannelCellHandler, OpenChannelHandler};
557
    use tor_cell::chancell::{AnyChanCell, ChanCmd, ChanMsg, CircId, msg};
558

            
559
    /// Helper type for reading and writing bytes to/from buffers.
560
    pub(crate) struct MsgBuf {
561
        /// Data we have received as a reader.
562
        inbuf: futures::io::Cursor<Vec<u8>>,
563
        /// Data we write as a writer.
564
        outbuf: futures::io::Cursor<Vec<u8>>,
565
    }
566

            
567
    impl AsyncRead for MsgBuf {
568
        fn poll_read(
569
            mut self: Pin<&mut Self>,
570
            cx: &mut Context<'_>,
571
            buf: &mut [u8],
572
        ) -> Poll<Result<usize>> {
573
            Pin::new(&mut self.inbuf).poll_read(cx, buf)
574
        }
575
    }
576
    impl AsyncWrite for MsgBuf {
577
        fn poll_write(
578
            mut self: Pin<&mut Self>,
579
            cx: &mut Context<'_>,
580
            buf: &[u8],
581
        ) -> Poll<Result<usize>> {
582
            Pin::new(&mut self.outbuf).poll_write(cx, buf)
583
        }
584
        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
585
            Pin::new(&mut self.outbuf).poll_flush(cx)
586
        }
587
        fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
588
            Pin::new(&mut self.outbuf).poll_close(cx)
589
        }
590
    }
591

            
592
    // Empty impl: `MsgBuf` overrides nothing and relies on what `StreamOps` provides by default.
    impl StreamOps for MsgBuf {}
593

            
594
    impl MsgBuf {
595
        pub(crate) fn new<T: Into<Vec<u8>>>(output: T) -> Self {
596
            let inbuf = Cursor::new(output.into());
597
            let outbuf = Cursor::new(Vec::new());
598
            MsgBuf { inbuf, outbuf }
599
        }
600

            
601
        pub(crate) fn consumed(&self) -> usize {
602
            self.inbuf.position() as usize
603
        }
604

            
605
        pub(crate) fn all_consumed(&self) -> bool {
606
            self.inbuf.get_ref().len() == self.consumed()
607
        }
608

            
609
        pub(crate) fn into_response(self) -> Vec<u8> {
610
            self.outbuf.into_inner()
611
        }
612
    }
613

            
614
    /// Wrap `mbuf` in a `Framed` whose codec is already in the Open state, as a client initiator
    /// using link version 5.
    fn new_client_open_frame(
        mbuf: MsgBuf,
    ) -> asynchronous_codec::Framed<MsgBuf, ChannelCellHandler> {
        let inner = OpenChannelHandler::new(LinkVersion::V5, ChannelType::ClientInitiator);
        let codec = ChannelCellHandler::Open(inner);
        asynchronous_codec::Framed::new(mbuf, codec)
    }
623

            
624
    #[test]
    fn check_client_encoding() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            // Nothing to read: this test only looks at what gets written.
            let mut framed = new_client_open_frame(MsgBuf::new(&b""[..]));

            // Send a DESTROY cell on circuit 7.
            let destroy = msg::Destroy::new(2.into());
            let cell = AnyChanCell::new(CircId::new(7), destroy.into());
            framed.send(cell).await.unwrap();
            framed.flush().await.unwrap();

            // The first ten bytes on the wire must be the circuit ID, command and reason.
            let written = framed.into_inner().into_response();
            assert_eq!(&written[..10], &hex!("00000007 04 0200000000")[..]);
        });
    }
643

            
644
    #[test]
    fn check_client_decoding() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            // A DESTROY cell on circuit 7, zero-filled up to 514 bytes.
            let mut wire = hex!("00000007 04 0200000000").to_vec();
            wire.resize(514, 0);
            let mut framed = new_client_open_frame(MsgBuf::new(&wire[..]));

            let cell = framed.next().await.unwrap().unwrap();

            assert_eq!(cell.circid(), CircId::new(7));
            assert_eq!(cell.msg().cmd(), ChanCmd::DESTROY);

            // The whole input must have been read.
            assert!(framed.into_inner().all_consumed());
        });
    }
663

            
664
    #[test]
    fn handler_transition() {
        // A client initiating a channel to a relay always starts in the New state.
        let mut handler = ChannelCellHandler::from(ChannelType::ClientInitiator);
        assert!(matches!(handler, ChannelCellHandler::New(_)));

        // Setting the link protocol version moves the handler to the Handshake state.
        assert!(handler.set_link_version(5).is_ok());
        assert!(matches!(handler, ChannelCellHandler::Handshake(_)));

        // Marking the channel as open moves the handler to the Open state.
        assert!(handler.set_open().is_ok());
        assert!(matches!(handler, ChannelCellHandler::Open(_)));
    }
680

            
681
    #[test]
    fn clog_digest() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            // Digest computed by hand over the exact bytes we expect to be sent.
            let mut expected_clog = ll::d::Sha256::new();
            let mut frame = new_frame(MsgBuf::new(*b""), ChannelType::RelayInitiator);

            // A VERSIONS cell carrying the single value 5.
            expected_clog.update(hex!("0000 07 0002 0005"));
            let versions = msg::Versions::new(vec![5]).expect("Fail VERSIONS");
            frame
                .send(AnyChanCell::new(None, versions.into()))
                .await
                .unwrap();

            // Move the codec to the Handshake state so that it can encode a CERTS cell.
            frame
                .codec_mut()
                .set_link_version(5)
                .expect("Fail link version set");

            // This is what an empty CERTS cell looks like.
            expected_clog.update(hex!("0000 0000 81 0001 00"));
            let certs = AnyChanCell::new(None, msg::Certs::new_empty().into());
            frame.send(certs).await.unwrap();

            // The send log kept by the codec must match our own digest.
            let expected: [u8; 32] = expected_clog.finalize().into();
            let actual = frame.codec_mut().take_send_log_digest().unwrap();
            assert_eq!(actual, expected);
        });
    }
714

            
715
    #[test]
    fn slog_digest() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            // Digest computed by hand over the exact bytes we expect to be received.
            let mut expected_slog = ll::d::Sha256::new();

            // First incoming cell: a VERSIONS cell.
            let versions_wire = msg::Versions::new(vec![5])
                .unwrap()
                .encode_for_handshake()
                .expect("Fail VERSIONS encoding");
            let mut versions_buf = BytesMut::new();
            versions_buf.extend_from_slice(versions_wire.as_slice());
            expected_slog.update(&versions_buf);

            let mut frame = new_frame(MsgBuf::new(versions_buf), ChannelType::RelayInitiator);

            // Receive the VERSIONS
            let _ = frame.next().await.transpose().expect("Fail to get cell");
            // Setting the link version moves the handler to the Handshake state, which is what
            // allows decoding the AUTH_CHALLENGE below.
            frame
                .codec_mut()
                .set_link_version(5)
                .expect("Fail link version set");

            // Second incoming cell: a variable length cell with a wide circ ID of 0.
            let mut challenge_buf = BytesMut::new();
            challenge_buf.write_u32(0);
            challenge_buf.write_u8(ChanCmd::AUTH_CHALLENGE.into());
            challenge_buf.write_u16(36); // This is the length of the payload.
            msg::AuthChallenge::new([42_u8; 32], vec![3])
                .encode_onto(&mut challenge_buf)
                .expect("Fail AUTH_CHALLENGE encoding");
            expected_slog.update(&challenge_buf);

            // Swap the I/O part of the Framed for a buffer holding this new cell.
            *frame = MsgBuf::new(challenge_buf);
            // Receive the AUTH_CHALLENGE
            let _ = frame.next().await.transpose().expect("Fail to get cell");

            // The receive log kept by the codec must match our own digest.
            let expected: [u8; 32] = expected_slog.finalize().into();
            let actual = frame.codec_mut().take_recv_log_digest().unwrap();
            assert_eq!(actual, expected);
        });
    }
764
}