1
//! Declare DataStream, a type that wraps RawCellStream so as to be useful
2
//! for byte-oriented communication.
3

            
4
use crate::{Error, Result};
5
use static_assertions::assert_impl_all;
6
use tor_cell::relaycell::msg::EndReason;
7
use tor_cell::relaycell::{RelayCellFormat, RelayCmd};
8

            
9
use futures::io::{AsyncRead, AsyncWrite};
10
use futures::stream::StreamExt;
11
use futures::task::{Context, Poll};
12
use futures::{Future, Stream};
13
use pin_project::pin_project;
14
use postage::watch;
15

            
16
#[cfg(feature = "tokio")]
17
use tokio_crate::io::ReadBuf;
18
#[cfg(feature = "tokio")]
19
use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
20
#[cfg(feature = "tokio")]
21
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
22
use tor_cell::restricted_msg;
23

            
24
use std::fmt::Debug;
25
use std::io::Result as IoResult;
26
use std::num::NonZero;
27
use std::pin::Pin;
28
#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))]
29
use std::sync::Arc;
30
#[cfg(feature = "stream-ctrl")]
31
use std::sync::{Mutex, Weak};
32

            
33
use educe::Educe;
34

            
35
use crate::client::ClientTunnel;
36
use crate::client::stream::StreamReceiver;
37
use crate::memquota::StreamAccount;
38
use crate::stream::StreamTarget;
39
use crate::stream::cmdcheck::{AnyCmdChecker, CmdChecker, StreamStatus};
40
use crate::stream::flow_ctrl::state::StreamRateLimit;
41
use crate::stream::flow_ctrl::xon_xoff::reader::{BufferIsEmpty, XonXoffReader, XonXoffReaderCtrl};
42
use crate::util::token_bucket::dynamic_writer::DynamicRateLimitedWriter;
43
use crate::util::token_bucket::writer::{RateLimitedWriter, RateLimitedWriterConfig};
44
use tor_basic_utils::skip_fmt;
45
use tor_cell::relaycell::msg::Data;
46
use tor_error::internal;
47
use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider};
48

            
49
/// A stream of [`RateLimitedWriterConfig`] used to update a [`DynamicRateLimitedWriter`].
///
/// Unfortunately we need to store the result of a [`StreamExt::map`] and [`StreamExt::fuse`] in
/// [`DataWriter`], which leaves us with this ugly type.
/// We use a type alias to make `DataWriter` a little nicer.
// NOTE: the mapping function is a plain `fn` pointer (not a closure) so that the
// resulting `Map` adapter has a nameable type which can be stored in a struct field.
type RateConfigStream = futures::stream::Map<
    futures::stream::Fuse<watch::Receiver<StreamRateLimit>>,
    fn(StreamRateLimit) -> RateLimitedWriterConfig,
>;
58

            
59
/// An anonymized stream over the Tor network.
60
///
61
/// For most purposes, you can think of this type as an anonymized
62
/// TCP stream: it can read and write data, and get closed when it's done.
63
///
64
/// [`DataStream`] implements [`futures::io::AsyncRead`] and
65
/// [`futures::io::AsyncWrite`], so you can use it anywhere that those
66
/// traits are expected.
67
///
68
/// # Examples
69
///
70
/// Connecting to an HTTP server and sending a request, using
71
/// [`AsyncWriteExt::write_all`](futures::io::AsyncWriteExt::write_all):
72
///
73
/// ```ignore
74
/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
75
///
76
/// use futures::io::AsyncWriteExt;
77
///
78
/// stream
79
///     .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
80
///     .await?;
81
///
82
/// // Flushing the stream is important; see below!
83
/// stream.flush().await?;
84
/// ```
85
///
86
/// Reading the result, using [`AsyncReadExt::read_to_end`](futures::io::AsyncReadExt::read_to_end):
87
///
88
/// ```ignore
89
/// use futures::io::AsyncReadExt;
90
///
91
/// let mut buf = Vec::new();
92
/// stream.read_to_end(&mut buf).await?;
93
///
94
/// println!("{}", String::from_utf8_lossy(&buf));
95
/// ```
96
///
97
/// # Usage with Tokio
98
///
99
/// If the `tokio` crate feature is enabled, this type also implements
100
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) and
101
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
102
/// with code that expects those traits.
103
///
104
/// # Remember to call `flush`!
105
///
106
/// DataStream buffers data internally, in order to write as few cells
107
/// as possible onto the network.  In order to make sure that your
108
/// data has actually been sent, you need to make sure that
109
/// [`AsyncWrite::poll_flush`] runs to completion: probably via
110
/// [`AsyncWriteExt::flush`](futures::io::AsyncWriteExt::flush).
111
///
112
/// # Splitting the type
113
///
114
/// This type is internally composed of a [`DataReader`] and a [`DataWriter`]; the
115
/// `DataStream::split` method can be used to split it into those two parts, for more
116
/// convenient usage with e.g. stream combinators.
117
///
118
/// # How long does a stream live?
119
///
120
/// A `DataStream` will live until all references to it are dropped,
121
/// or until it is closed explicitly.
122
///
123
/// If you split the stream into a `DataReader` and a `DataWriter`, it
124
/// will survive until _both_ are dropped, or until it is closed
125
/// explicitly.
126
///
127
/// A stream can also close because of a network error,
128
/// or because the other side of the stream decided to close it.
129
///
130
// # Semver note
131
//
132
// Note that this type is re-exported as a part of the public API of
133
// the `arti-client` crate.  Any changes to its API here in
134
// `tor-proto` need to be reflected above.
135
#[derive(Debug)]
pub struct DataStream {
    /// Underlying writer for this stream
    w: DataWriter,
    /// Underlying reader for this stream
    r: DataReader,
    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: Arc<ClientDataStreamCtrl>,
}
// Compile-time check: `DataStream` is part of the public API and must remain
// usable across threads.
assert_impl_all! { DataStream: Send, Sync }
147

            
148
/// An object used to control and monitor a data stream.
///
/// # Notes
///
/// This is a separate type from [`DataStream`] because it's useful to have
/// multiple references to this object, whereas a [`DataReader`] and [`DataWriter`]
/// need to have a single owner for the `AsyncRead` and `AsyncWrite` APIs to
/// work correctly.
#[cfg(feature = "stream-ctrl")]
#[cfg_attr(
    feature = "rpc",
    derive(derive_deftly::Deftly),
    derive_deftly(tor_rpcbase::templates::Object)
)]
#[derive(Debug)]
pub struct ClientDataStreamCtrl {
    /// The circuit to which this stream is attached.
    ///
    /// Note that the stream's reader and writer halves each contain a `StreamTarget`,
    /// which in turn has a strong reference to the `ClientCirc`.  So as long as any
    /// one of those is alive, this reference will be present.
    ///
    /// We make this a Weak reference so that once the stream itself is closed,
    /// we can't leak circuits.
    tunnel: Weak<ClientTunnel>,

    /// Shared user-visible information about the state of this stream.
    ///
    /// TODO RPC: This will probably want to be a `postage::Watch` or something
    /// similar, if and when it stops moving around.
    //
    // (No per-field `#[cfg(feature = "stream-ctrl")]` here: the whole struct is
    // already gated on that feature, so the previous field-level gate was
    // redundant and has been removed.)
    status: Arc<Mutex<DataStreamStatus>>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    _memquota: StreamAccount,
}
186

            
187
/// The inner writer for [`DataWriter`].
///
/// This type is responsible for taking bytes and packaging them into cells.
/// Rate limiting is implemented in [`DataWriter`] to avoid making this type more complex.
#[derive(Debug)]
struct DataWriterInner {
    /// Internal state for this writer
    ///
    /// This is stored in an Option so that we can mutate it in the
    /// AsyncWrite functions.  It might be possible to do better here,
    /// and we should refactor if so.
    //
    // NOTE(review): this appears to be `None` only transiently, while a
    // method has temporarily taken the state — confirm in the impl below.
    state: Option<DataWriterState>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    // If we liked, we could make this conditional; see DataReaderInner.memquota
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: Arc<ClientDataStreamCtrl>,
}
211

            
212
/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
213
///
214
/// See the [`DataStream`] docs for more information. In particular, note
215
/// that this writer requires `poll_flush` to complete in order to guarantee that
216
/// all data has been written.
217
///
218
/// # Usage with Tokio
219
///
220
/// If the `tokio` crate feature is enabled, this type also implements
221
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
222
/// with code that expects that trait.
223
///
224
/// # Drop and close
225
///
226
/// Note that dropping a `DataWriter` has no special effect on its own:
227
/// if the `DataWriter` is dropped, the underlying stream will still remain open
228
/// until the `DataReader` is also dropped.
229
///
230
/// If you want the stream to close earlier, use [`close`](futures::io::AsyncWriteExt::close)
231
/// (or [`shutdown`](tokio_crate::io::AsyncWriteExt::shutdown) with `tokio`).
232
///
233
/// Remember that Tor does not support half-open streams:
234
/// If you `close` or `shutdown` a stream,
235
/// the other side will not see the stream as half-open,
236
/// and so will (probably) not finish sending you any in-progress data.
237
/// Do not use `close`/`shutdown` to communicate anything besides
238
/// "I am done using this stream."
239
///
240
// # Semver note
241
//
242
// Note that this type is re-exported as a part of the public API of
243
// the `arti-client` crate.  Any changes to its API here in
244
// `tor-proto` need to be reflected above.
245
#[derive(Debug)]
pub struct DataWriter {
    /// A wrapper around [`DataWriterInner`] that adds rate limiting.
    //
    // (See `RateConfigStream` for why the update-stream type parameter is
    // spelled out like this.)
    writer: DynamicRateLimitedWriter<DataWriterInner, RateConfigStream, DynTimeProvider>,
}
250

            
251
impl DataWriter {
252
    /// Create a new rate-limited [`DataWriter`] from a [`DataWriterInner`].
253
120
    fn new(
254
120
        inner: DataWriterInner,
255
120
        rate_limit_updates: watch::Receiver<StreamRateLimit>,
256
120
        time_provider: DynTimeProvider,
257
120
    ) -> Self {
258
        /// Converts a `rate` into a `RateLimitedWriterConfig`.
259
180
        fn rate_to_config(rate: StreamRateLimit) -> RateLimitedWriterConfig {
260
180
            let rate = rate.bytes_per_sec();
261
180
            RateLimitedWriterConfig {
262
180
                rate,        // bytes per second
263
180
                burst: rate, // bytes
264
180
                // This number is chosen arbitrarily, but the idea is that we want to balance
265
180
                // between throughput and latency. Assume the user tries to write a large buffer
266
180
                // (~600 bytes). If we set this too small (for example 1), we'll be waking up
267
180
                // frequently and writing a small number of bytes each time to the
268
180
                // `DataWriterInner`, even if this isn't enough bytes to send a cell. If we set this
269
180
                // too large (for example 510), we'll be waking up infrequently to write a larger
270
180
                // number of bytes each time. So even if the `DataWriterInner` has almost a full
271
180
                // cell's worth of data queued (for example 490) and only needs 509-490=19 more
272
180
                // bytes before a cell can be sent, it will block until the rate limiter allows 510
273
180
                // more bytes.
274
180
                //
275
180
                // TODO(arti#2028): Is there an optimal value here?
276
180
                wake_when_bytes_available: NonZero::new(200).expect("200 != 0"), // bytes
277
180
            }
278
180
        }
279

            
280
        // get the current rate from the `watch::Receiver`, which we'll use as the initial rate
281
120
        let initial_rate: StreamRateLimit = *rate_limit_updates.borrow();
282

            
283
        // map the rate update stream to the type required by `DynamicRateLimitedWriter`
284
120
        let rate_limit_updates = rate_limit_updates.fuse().map(rate_to_config as fn(_) -> _);
285

            
286
        // build the rate limiter
287
120
        let writer = RateLimitedWriter::new(inner, &rate_to_config(initial_rate), time_provider);
288
120
        let writer = DynamicRateLimitedWriter::new(writer, rate_limit_updates);
289

            
290
120
        Self { writer }
291
120
    }
292

            
293
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
294
    /// interact with this stream without holding the stream itself.
295
    #[cfg(feature = "stream-ctrl")]
296
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
297
        Some(self.writer.inner().client_stream_ctrl())
298
    }
299
}
300

            
301
impl AsyncWrite for DataWriter {
302
5540
    fn poll_write(
303
5540
        mut self: Pin<&mut Self>,
304
5540
        cx: &mut Context<'_>,
305
5540
        buf: &[u8],
306
5540
    ) -> Poll<IoResult<usize>> {
307
5540
        AsyncWrite::poll_write(Pin::new(&mut self.writer), cx, buf)
308
5540
    }
309

            
310
32
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
311
32
        AsyncWrite::poll_flush(Pin::new(&mut self.writer), cx)
312
32
    }
313

            
314
16
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
315
16
        AsyncWrite::poll_close(Pin::new(&mut self.writer), cx)
316
16
    }
317
}
318

            
319
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
    // Each method adapts `self` via `FuturesAsyncWriteCompatExt::compat_write`
    // and forwards to the tokio-flavored trait on the compatibility wrapper.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat_write()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat_write()), cx)
    }

    // tokio's name for `close`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat_write()), cx)
    }
}
333

            
334
/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
335
///
336
/// See the [`DataStream`] docs for more information.
337
///
338
/// # Usage with Tokio
339
///
340
/// If the `tokio` crate feature is enabled, this type also implements
341
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) for easier integration
342
/// with code that expects that trait.
343
//
344
// # Semver note
345
//
346
// Note that this type is re-exported as a part of the public API of
347
// the `arti-client` crate.  Any changes to its API here in
348
// `tor-proto` need to be reflected above.
349
#[derive(Debug)]
pub struct DataReader {
    /// The [`DataReaderInner`] with a wrapper to support XON/XOFF flow control.
    //
    // (Installed by `DataReader::new`.)
    reader: XonXoffReader<DataReaderInner>,
}
354

            
355
impl DataReader {
356
    /// Create a new [`DataReader`].
357
120
    fn new(reader: DataReaderInner, xon_xoff_reader_ctrl: XonXoffReaderCtrl) -> Self {
358
120
        Self {
359
120
            reader: XonXoffReader::new(xon_xoff_reader_ctrl, reader),
360
120
        }
361
120
    }
362

            
363
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
364
    /// interact with this stream without holding the stream itself.
365
    #[cfg(feature = "stream-ctrl")]
366
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
367
        Some(self.reader.inner().client_stream_ctrl())
368
    }
369
}
370

            
371
impl AsyncRead for DataReader {
372
230
    fn poll_read(
373
230
        mut self: Pin<&mut Self>,
374
230
        cx: &mut Context<'_>,
375
230
        buf: &mut [u8],
376
230
    ) -> Poll<IoResult<usize>> {
377
230
        AsyncRead::poll_read(Pin::new(&mut self.reader), cx, buf)
378
230
    }
379

            
380
    fn poll_read_vectored(
381
        mut self: Pin<&mut Self>,
382
        cx: &mut Context<'_>,
383
        bufs: &mut [std::io::IoSliceMut<'_>],
384
    ) -> Poll<IoResult<usize>> {
385
        AsyncRead::poll_read_vectored(Pin::new(&mut self.reader), cx, bufs)
386
    }
387
}
388

            
389
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
    // Adapt `self` via `FuturesAsyncReadCompatExt::compat` and forward to the
    // tokio-flavored trait on the compatibility wrapper.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}
399

            
400
/// The inner reader for [`DataReader`].
///
/// This type is responsible for taking stream messages and extracting the stream data from them.
/// Flow control logic is implemented in [`DataReader`] to avoid making this type more complex.
#[derive(Debug)]
pub(crate) struct DataReaderInner {
    /// Internal state for this reader.
    ///
    /// This is stored in an Option so that we can mutate it in
    /// poll_read().  It might be possible to do better here, and we
    /// should refactor if so.
    //
    // `DataStream::wait_for_connection` also takes this state and restores it
    // before returning; anyone who forgets to put it back trips the
    // "Missing state" / "forgot to put `DataReaderState` back" expects.
    state: Option<DataReaderState>,

    /// The memory quota account that should be used for this stream's data
    ///
    /// Exists to keep the account alive
    // If we liked, we could make this conditional on not(cfg(feature = "stream-ctrl"))
    // since, ClientDataStreamCtrl contains a StreamAccount clone too.  But that seems fragile.
    _memquota: StreamAccount,

    /// A control object that can be used to monitor and control this stream
    /// without needing to own it.
    #[cfg(feature = "stream-ctrl")]
    ctrl: Arc<ClientDataStreamCtrl>,
}
425

            
426
impl BufferIsEmpty for DataReaderInner {
427
    /// The result will become stale,
428
    /// so is most accurate immediately after a [`poll_read`](AsyncRead::poll_read).
429
    fn is_empty(mut self: Pin<&mut Self>) -> bool {
430
        match self
431
            .state
432
            .as_mut()
433
            .expect("forgot to put `DataReaderState` back")
434
        {
435
            DataReaderState::Open(imp) => {
436
                // check if the partial cell in `pending` is empty,
437
                // and if the message stream is empty
438
                imp.pending[imp.offset..].is_empty() && imp.s.is_empty()
439
            }
440
            // closed, so any data should have been discarded
441
            DataReaderState::Closed => true,
442
        }
443
    }
444
}
445

            
446
/// Shared status flags for tracking the status of a `DataStream`.
///
/// We expect to refactor this a bit, so it's not exposed at all.
//
// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus
// from various points in this module, we should instead construct
// DataStreamStatus as needed from information available elsewhere.  In any
// case, we should really eliminate as much duplicate state here as we can.
// (See discussions at !1198 for some challenges with this.)
#[cfg(feature = "stream-ctrl")]
#[derive(Clone, Debug, Default)]
struct DataStreamStatus {
    /// True if we've received a CONNECTED message.
    //
    // TODO: This is redundant with `connected` in DataReaderImpl.
    received_connected: bool,
    /// True if we have decided to send an END message.
    //
    // TODO RPC: There is not an easy way to set this from this module!  Really,
    // the decision to send an "end" is made when the StreamTarget object is
    // dropped, but we don't currently have any way to see when that happens.
    // Perhaps we need a different shared StreamStatus object that the
    // StreamTarget holds?
    sent_end: bool,
    /// True if we have received an END message telling us to close the stream.
    received_end: bool,
    /// True if we have received an error.
    ///
    /// (This is not a subset or superset of received_end; some errors are END
    /// messages but some aren't; some END messages are errors but some aren't.)
    received_err: bool,
}
478

            
479
#[cfg(feature = "stream-ctrl")]
impl DataStreamStatus {
    /// Note that a CONNECTED message has arrived.
    fn record_connected(&mut self) {
        self.received_connected = true;
    }

    /// Note that we've received an error of some kind.
    fn record_error(&mut self, e: &Error) {
        // TODO: Probably we should remember the actual error in a box or
        // something.  But that means making a redundant copy of the error
        // even if nobody will want it.  Do we care?
        if let Error::EndReceived(reason) = e {
            // Any END message closes the stream...
            self.received_end = true;
            // ...but only a non-DONE reason counts as an error.
            if !matches!(*reason, EndReason::DONE) {
                self.received_err = true;
            }
        } else {
            self.received_err = true;
        }
    }
}
501

            
502
// Defines a message enum restricted to the listed relay message variants.
restricted_msg! {
    /// An allowable incoming message on a client data stream.
    enum ClientDataStreamMsg:RelayMsg {
        // SENDME is handled by the reactor.
        Data, End, Connected,
    }
}
509

            
510
// TODO RPC: Should we also implement this trait for everything that holds a
// ClientDataStreamCtrl?
#[cfg(feature = "stream-ctrl")]
impl super::ctrl::ClientStreamCtrl for ClientDataStreamCtrl {
    fn tunnel(&self) -> Option<Arc<ClientTunnel>> {
        // Upgrade the weak reference; yields `None` once the tunnel is gone.
        Weak::upgrade(&self.tunnel)
    }
}
518

            
519
#[cfg(feature = "stream-ctrl")]
impl ClientDataStreamCtrl {
    /// Return true if the underlying stream is connected. (That is, if it has
    /// received a `CONNECTED` message, and has not been closed.)
    pub fn is_connected(&self) -> bool {
        let status = self.status.lock().expect("poisoned lock");
        let closed = status.sent_end || status.received_end || status.received_err;
        status.received_connected && !closed
    }

    // TODO RPC: Add more functions once we have the desired API more nailed
    // down.
}
531

            
532
impl DataStream {
    /// Wrap raw stream receiver and target parts as a DataStream.
    ///
    /// For a non-optimistic stream, `wait_for_connection`
    /// must be called afterwards to make sure CONNECTED is received.
    pub(crate) fn new<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        memquota: StreamAccount,
    ) -> Self {
        Self::new_inner(
            time_provider,
            receiver,
            xon_xoff_reader_ctrl,
            target,
            false,
            memquota,
        )
    }

    /// Wrap raw stream receiver and target parts as a connected DataStream.
    ///
    /// Unlike [`DataStream::new`], this creates a `DataStream` that does not expect to receive a
    /// CONNECTED cell.
    ///
    /// This is used by hidden services, exit relays, and directory servers to accept streams.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    pub(crate) fn new_connected<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        memquota: StreamAccount,
    ) -> Self {
        Self::new_inner(
            time_provider,
            receiver,
            xon_xoff_reader_ctrl,
            target,
            true,
            memquota,
        )
    }

    /// The shared implementation of the `new*()` functions.
    ///
    /// `connected` is true iff the stream should be treated as already open
    /// (that is, no CONNECTED message is expected).
    fn new_inner<P: SleepProvider + CoarseTimeProvider>(
        time_provider: P,
        receiver: StreamReceiver,
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
        target: StreamTarget,
        connected: bool,
        memquota: StreamAccount,
    ) -> Self {
        // The write buffer is sized to the maximum body of a single DATA
        // message in the negotiated relay cell format.
        let relay_cell_format = target.relay_cell_format();
        let out_buf_len = Data::max_body_len(relay_cell_format);
        let rate_limit_stream = target.rate_limit_stream().clone();

        // Shared status record; both halves of the stream update it.
        #[cfg(feature = "stream-ctrl")]
        let status = {
            let mut data_stream_status = DataStreamStatus::default();
            if connected {
                data_stream_status.record_connected();
            }
            Arc::new(Mutex::new(data_stream_status))
        };

        // The control object keeps only a weak reference to the tunnel, so it
        // does not keep the circuit alive on its own (see `ClientDataStreamCtrl`).
        #[cfg(feature = "stream-ctrl")]
        let ctrl = {
            let tunnel = match target.tunnel() {
                crate::stream::Tunnel::Client(t) => Arc::downgrade(t),
                #[cfg(feature = "relay")]
                crate::stream::Tunnel::Relay(_) => panic!("created a relay tunnel in the client?!"),
            };

            Arc::new(ClientDataStreamCtrl {
                tunnel,
                status: status.clone(),
                _memquota: memquota.clone(),
            })
        };
        // Reader half: starts Open with an empty partial-cell buffer.
        let r = DataReaderInner {
            state: Some(DataReaderState::Open(DataReaderImpl {
                s: receiver,
                pending: Vec::new(),
                offset: 0,
                connected,
                #[cfg(feature = "stream-ctrl")]
                status: status.clone(),
            })),
            _memquota: memquota.clone(),
            #[cfg(feature = "stream-ctrl")]
            ctrl: ctrl.clone(),
        };
        // Writer half: starts Ready with a zeroed, cell-sized buffer.
        let w = DataWriterInner {
            state: Some(DataWriterState::Ready(DataWriterImpl {
                s: target,
                buf: vec![0; out_buf_len].into_boxed_slice(),
                n_pending: 0,
                #[cfg(feature = "stream-ctrl")]
                status,
                relay_cell_format,
            })),
            _memquota: memquota,
            #[cfg(feature = "stream-ctrl")]
            ctrl: ctrl.clone(),
        };

        let time_provider = DynTimeProvider::new(time_provider);

        DataStream {
            w: DataWriter::new(w, rate_limit_stream, time_provider),
            r: DataReader::new(r, xon_xoff_reader_ctrl),
            #[cfg(feature = "stream-ctrl")]
            ctrl,
        }
    }

    /// Divide this DataStream into its constituent parts.
    pub fn split(self) -> (DataReader, DataWriter) {
        (self.r, self.w)
    }

    /// Wait until a CONNECTED cell is received, or some other cell
    /// is received to indicate an error.
    ///
    /// Does nothing if this stream is already connected.
    pub async fn wait_for_connection(&mut self) -> Result<()> {
        // We must put state back before returning
        //
        // NOTE(review): if this future is dropped at the `.await` below, the
        // state stays `None`, and a later use of the reader will hit the
        // "Missing state" expect — confirm callers never reuse the stream
        // after cancelling this call.
        let state = self
            .r
            .reader
            .inner_mut()
            .state
            .take()
            .expect("Missing state in DataReaderInner");

        if let DataReaderState::Open(mut imp) = state {
            let result = if imp.connected {
                Ok(())
            } else {
                // This succeeds if the cell is CONNECTED, and fails otherwise.
                std::future::poll_fn(|cx| Pin::new(&mut imp).read_cell(cx)).await
            };
            // Restore the state: Closed on error, Open again on success.
            self.r.reader.inner_mut().state = Some(match result {
                Err(_) => DataReaderState::Closed,
                Ok(_) => DataReaderState::Open(imp),
            });
            result
        } else {
            // Any state other than Open here is an internal logic error.
            Err(Error::from(internal!(
                "Expected ready state, got {:?}",
                state
            )))
        }
    }

    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
    /// interact with this stream without holding the stream itself.
    #[cfg(feature = "stream-ctrl")]
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
        Some(&self.ctrl)
    }
}
697

            
698
impl AsyncRead for DataStream {
699
230
    fn poll_read(
700
230
        mut self: Pin<&mut Self>,
701
230
        cx: &mut Context<'_>,
702
230
        buf: &mut [u8],
703
230
    ) -> Poll<IoResult<usize>> {
704
230
        AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
705
230
    }
706
}
707

            
708
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
    // Adapt `self` via `FuturesAsyncReadCompatExt::compat` and forward to the
    // tokio-flavored trait on the compatibility wrapper.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
    }
}
718

            
719
impl AsyncWrite for DataStream {
720
5540
    fn poll_write(
721
5540
        mut self: Pin<&mut Self>,
722
5540
        cx: &mut Context<'_>,
723
5540
        buf: &[u8],
724
5540
    ) -> Poll<IoResult<usize>> {
725
5540
        AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
726
5540
    }
727
32
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
728
32
        AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
729
32
    }
730
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
731
        AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
732
    }
733
}
734

            
735
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
    // Each method adapts `self` via `FuturesAsyncWriteCompatExt`/`compat` and
    // forwards to the tokio-flavored trait on the compatibility wrapper.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        TokioAsyncWrite::poll_write(Pin::new(&mut self.compat()), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat()), cx)
    }

    // tokio's name for `close`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat()), cx)
    }
}
749

            
750
/// Helper type: Like BoxFuture, but also requires that the future be Sync.
//
// (Needed so that types holding one of these — e.g. `DataWriterState::Flushing` —
// can themselves be `Sync`; see the `assert_impl_all!` on `DataStream`.)
type BoxSyncFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
752

            
753
/// An enumeration for the state of a DataWriter.
///
/// We have to use an enum here because, for as long as we're waiting
/// for a flush operation to complete, the future returned by
/// `flush_cell()` owns the DataWriterImpl.
///
/// The owning writer stores this inside an `Option`, so that its poll
/// methods can `take()` the state, consume it by value, and store a
/// (possibly different) state back before returning.
#[derive(Educe)]
#[educe(Debug)]
enum DataWriterState {
    /// The writer has closed or gotten an error: nothing more to do.
    Closed,
    /// The writer is not currently flushing; more data can get queued
    /// immediately.
    Ready(DataWriterImpl),
    /// The writer is flushing a cell.
    ///
    /// The future resolves to the `DataWriterImpl` (handed back so we can
    /// return to `Ready`) along with the outcome of the flush.
    Flushing(
        #[educe(Debug(method = "skip_fmt"))] //
        BoxSyncFuture<'static, (DataWriterImpl, Result<()>)>,
    ),
}
772

            
773
/// Internal: the write part of a DataStream
#[derive(Educe)]
#[educe(Debug)]
struct DataWriterImpl {
    /// The underlying StreamTarget object.
    s: StreamTarget,

    /// Buffered data to send over the connection.
    ///
    /// This buffer is currently allocated using a number of bytes
    /// equal to the maximum that we can package at a time.
    ///
    /// Invariant: `n_pending <= buf.len()` (maintained by `queue_bytes`,
    /// relied upon by the slicing in `flush_buf`).
    //
    // TODO: this buffer is probably smaller than we want, but it's good
    // enough for now.  If we _do_ make it bigger, we'll have to change
    // our use of Data::split_from to handle the case where we can't fit
    // all the data.
    #[educe(Debug(method = "skip_fmt"))]
    buf: Box<[u8]>,

    /// Number of unflushed bytes in buf.
    n_pending: usize,

    /// Relay cell format in use
    relay_cell_format: RelayCellFormat,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
802

            
803
impl DataWriterInner {
    /// See [`DataWriter::client_stream_ctrl`].
    #[cfg(feature = "stream-ctrl")]
    fn client_stream_ctrl(&self) -> &Arc<ClientDataStreamCtrl> {
        &self.ctrl
    }

    /// Helper for poll_flush() and poll_close(): Performs a flush, then
    /// closes the stream if should_close is true.
    ///
    /// Returns `Poll::Pending` while the flush future is still running (the
    /// future is stashed back in `DataWriterState::Flushing` so the next poll
    /// resumes it), `Ok(())` once the flush (and optional close) completed,
    /// and an error if the writer was already closed or the flush failed.
    fn poll_flush_impl(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        should_close: bool,
    ) -> Poll<IoResult<()>> {
        // NOTE: We take the state out of the Option by value; every return
        // path below MUST store some state back into `self.state` first.
        let state = self.state.take().expect("Missing state in DataWriter");

        // TODO: this whole function is a bit copy-pasted.
        let mut future: BoxSyncFuture<_> = match state {
            DataWriterState::Ready(imp) => {
                if imp.n_pending == 0 {
                    // Nothing to flush!
                    if should_close {
                        // We need to actually continue with this function to do the closing.
                        // Thus, make a future that does nothing and is ready immediately.
                        Box::pin(futures::future::ready((imp, Ok(()))))
                    } else {
                        // There's nothing more to do; we can return.
                        self.state = Some(DataWriterState::Ready(imp));
                        return Poll::Ready(Ok(()));
                    }
                } else {
                    // We need to flush the buffer's contents; Make a future for that.
                    Box::pin(imp.flush_buf())
                }
            }
            // A previous poll left a flush in progress: resume it below.
            DataWriterState::Flushing(fut) => fut,
            DataWriterState::Closed => {
                self.state = Some(DataWriterState::Closed);
                return Poll::Ready(Err(Error::NotConnected.into()));
            }
        };

        match future.as_mut().poll(cx) {
            Poll::Ready((_imp, Err(e))) => {
                // The flush failed; the writer is unusable from here on.
                self.state = Some(DataWriterState::Closed);
                Poll::Ready(Err(e.into()))
            }
            Poll::Ready((mut imp, Ok(()))) => {
                if should_close {
                    // Tell the StreamTarget to close, so that the reactor
                    // realizes that we are done sending. (Dropping `imp.s` does not
                    // suffice, since there may be other clones of it.  In particular,
                    // the StreamReceiver has one, which it uses to keep the stream
                    // open, among other things.)
                    imp.s.close();

                    #[cfg(feature = "stream-ctrl")]
                    {
                        // TODO RPC:  This is not sufficient to track every case
                        // where we might have sent an End.  See note on the
                        // `sent_end` field.
                        imp.status.lock().expect("lock poisoned").sent_end = true;
                    }
                    self.state = Some(DataWriterState::Closed);
                } else {
                    self.state = Some(DataWriterState::Ready(imp));
                }
                Poll::Ready(Ok(()))
            }
            Poll::Pending => {
                // Not done yet: park the future so the next poll resumes it.
                self.state = Some(DataWriterState::Flushing(future));
                Poll::Pending
            }
        }
    }
}
879

            
880
impl AsyncWrite for DataWriterInner {
    /// Queue as much of `buf` as fits into the current cell buffer; if the
    /// buffer is full, flush it as a DATA cell first.
    ///
    /// May return `Ok(n)` with `n < buf.len()` (a short write): after a
    /// flush completes we queue what fits into the newly emptied buffer and
    /// report that count to the caller.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<IoResult<usize>> {
        if buf.is_empty() {
            // Zero-length writes succeed trivially and must not trigger a flush.
            return Poll::Ready(Ok(0));
        }

        // NOTE: We take the state by value; every return path below MUST
        // store some state back into `self.state` first.
        let state = self.state.take().expect("Missing state in DataWriter");

        let mut future = match state {
            DataWriterState::Ready(mut imp) => {
                let n_queued = imp.queue_bytes(buf);
                if n_queued != 0 {
                    self.state = Some(DataWriterState::Ready(imp));
                    return Poll::Ready(Ok(n_queued));
                }
                // we couldn't queue anything, so the current cell must be full.
                Box::pin(imp.flush_buf())
            }
            // A previous poll left a flush in progress: resume it below.
            DataWriterState::Flushing(fut) => fut,
            DataWriterState::Closed => {
                self.state = Some(DataWriterState::Closed);
                return Poll::Ready(Err(Error::NotConnected.into()));
            }
        };

        match future.as_mut().poll(cx) {
            Poll::Ready((_imp, Err(e))) => {
                #[cfg(feature = "stream-ctrl")]
                {
                    // Record the failure where stream-ctrl observers can see it.
                    _imp.status.lock().expect("lock poisoned").record_error(&e);
                }
                self.state = Some(DataWriterState::Closed);
                Poll::Ready(Err(e.into()))
            }
            Poll::Ready((mut imp, Ok(()))) => {
                // Great!  We're done flushing.  Queue as much as we can of this
                // cell.
                let n_queued = imp.queue_bytes(buf);
                self.state = Some(DataWriterState::Ready(imp));
                Poll::Ready(Ok(n_queued))
            }
            Poll::Pending => {
                // Flush not finished: park the future for the next poll.
                self.state = Some(DataWriterState::Flushing(future));
                Poll::Pending
            }
        }
    }

    /// Flush any buffered bytes without closing the stream.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        self.poll_flush_impl(cx, false)
    }

    /// Flush any buffered bytes, then close the stream.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        self.poll_flush_impl(cx, true)
    }
}
940

            
941
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriterInner {
    /// Write via the futures->tokio write-compatibility adapter.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    /// Flush via the futures->tokio write-compatibility adapter.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    /// Shut down (close) via the futures->tokio write-compatibility adapter.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
955

            
956
impl DataWriterImpl {
957
    /// Try to flush the current buffer contents as a data cell.
958
6456
    async fn flush_buf(mut self) -> (Self, Result<()>) {
959
4304
        let result = if let Some((cell, remainder)) =
960
4304
            Data::try_split_from(self.relay_cell_format, &self.buf[..self.n_pending])
961
        {
962
            // TODO: Eventually we may want a larger buffer; if we do,
963
            // this invariant will become false.
964
4304
            assert!(remainder.is_empty());
965
4304
            self.n_pending = 0;
966
4304
            self.s.send(cell.into()).await
967
        } else {
968
            Ok(())
969
        };
970

            
971
4304
        (self, result)
972
4304
    }
973

            
974
    /// Add as many bytes as possible from `b` to our internal buffer;
975
    /// return the number we were able to add.
976
9780
    fn queue_bytes(&mut self, b: &[u8]) -> usize {
977
9780
        let empty_space = &mut self.buf[self.n_pending..];
978
9780
        if empty_space.is_empty() {
979
            // that is, len == 0
980
4280
            return 0;
981
5500
        }
982

            
983
5500
        let n_to_copy = std::cmp::min(b.len(), empty_space.len());
984
5500
        empty_space[..n_to_copy].copy_from_slice(&b[..n_to_copy]);
985
5500
        self.n_pending += n_to_copy;
986
5500
        n_to_copy
987
9780
    }
988
}
989

            
990
impl DataReaderInner {
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
    /// interact with this stream without holding the stream itself.
    ///
    /// (Callers can clone the returned `Arc` to keep the control handle
    /// alive independently of this reader.)
    #[cfg(feature = "stream-ctrl")]
    pub(crate) fn client_stream_ctrl(&self) -> &Arc<ClientDataStreamCtrl> {
        &self.ctrl
    }
}
998

            
999
/// An enumeration for the state of a [`DataReaderInner`].
///
/// Stored inside an `Option` on the reader, so that `poll_read` can
/// `take()` it, work on it by value, and put it back before returning.
// TODO: We don't need to implement the state in this way anymore now that we've removed the saved
// future. There are a few ways we could simplify this. See:
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3076#note_3218210
#[derive(Educe)]
#[educe(Debug)]
// We allow this since it's expected that streams will spend most of their time in the `Open` state,
// and will be cleaned up shortly after closing.
#[allow(clippy::large_enum_variant)]
enum DataReaderState {
    /// In this state we have received an end cell or an error.
    Closed,
    /// In this state the reader is open.
    Open(DataReaderImpl),
}
/// Wrapper for the read part of a [`DataStream`].
#[derive(Educe)]
#[educe(Debug)]
#[pin_project]
struct DataReaderImpl {
    /// The underlying StreamReceiver object.
    #[educe(Debug(method = "skip_fmt"))]
    #[pin]
    s: StreamReceiver,

    /// If present, data that we received on this stream but have not
    /// been able to send to the caller yet.
    // TODO: This data structure is probably not what we want, but
    // it's good enough for now.
    #[educe(Debug(method = "skip_fmt"))]
    pending: Vec<u8>,

    /// Index into pending to show what we've already read.
    ///
    /// Invariant: `offset <= pending.len()` (maintained by `extract_bytes`,
    /// which clamps the copy length to what remains).
    offset: usize,

    /// If true, we have received a CONNECTED cell on this stream.
    connected: bool,

    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
impl AsyncRead for DataReaderInner {
    /// Fill `buf` from our pending buffer, pulling in new cells from the
    /// stream as needed.
    ///
    /// A clean END (`EndReason::DONE`) is reported as `Ok(0)` (EOF);
    /// any other error closes the reader and is surfaced as an `io::Error`.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        // We're pulling the state object out of the reader.  We MUST
        // put it back before this function returns.
        let mut state = self.state.take().expect("Missing state in DataReaderInner");

        loop {
            let mut imp = match state {
                DataReaderState::Open(mut imp) => {
                    // There may be data to read already.
                    let n_copied = imp.extract_bytes(buf);
                    if n_copied != 0 || buf.is_empty() {
                        // We read data into the buffer, or the buffer was 0-len to begin with.
                        // Tell the caller.
                        self.state = Some(DataReaderState::Open(imp));
                        return Poll::Ready(Ok(n_copied));
                    }
                    // No data available!  We have to try reading.
                    imp
                }
                DataReaderState::Closed => {
                    // TODO: Why are we returning an error rather than continuing to return EOF?
                    self.state = Some(DataReaderState::Closed);
                    return Poll::Ready(Err(Error::NotConnected.into()));
                }
            };

            // See if a cell is ready.
            match Pin::new(&mut imp).read_cell(cx) {
                Poll::Ready(Err(e)) => {
                    // There aren't any survivable errors in the current
                    // design.
                    self.state = Some(DataReaderState::Closed);

                    #[cfg(feature = "stream-ctrl")]
                    {
                        imp.status.lock().expect("lock poisoned").record_error(&e);
                    }

                    // An orderly END is end-of-stream, not an I/O failure.
                    let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
                        Ok(0)
                    } else {
                        Err(e.into())
                    };
                    return Poll::Ready(result);
                }
                Poll::Ready(Ok(())) => {
                    // It read a cell!  Continue the loop.
                    state = DataReaderState::Open(imp);
                }
                Poll::Pending => {
                    // No cells ready, so tell the
                    // caller to get back to us later.
                    self.state = Some(DataReaderState::Open(imp));
                    return Poll::Pending;
                }
            }
        }
    }
}
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReaderInner {
    /// Read into a tokio `ReadBuf` via the futures->tokio compat adapter.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
impl DataReaderImpl {
    /// Pull as many bytes as we can off of self.pending, and return that
    /// number of bytes.
330
    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
330
        let remainder = &self.pending[self.offset..];
330
        let n_to_copy = std::cmp::min(buf.len(), remainder.len());
330
        buf[..n_to_copy].copy_from_slice(&remainder[..n_to_copy]);
330
        self.offset += n_to_copy;
330
        n_to_copy
330
    }
    /// Return true iff there are no buffered bytes here to yield
88
    fn buf_is_empty(&self) -> bool {
88
        self.pending.len() == self.offset
88
    }
    /// Load self.pending with the contents of a new data cell.
426
    fn read_cell(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        use ClientDataStreamMsg::*;
426
        let msg = match self.as_mut().project().s.poll_next(cx) {
218
            Poll::Pending => return Poll::Pending,
208
            Poll::Ready(Some(Ok(unparsed))) => match unparsed.decode::<ClientDataStreamMsg>() {
208
                Ok(cell) => cell.into_msg(),
                Err(e) => {
                    self.s.protocol_error();
                    return Poll::Ready(Err(Error::from_bytes_err(e, "message on a data stream")));
                }
            },
            Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)),
            // TODO: This doesn't seem right to me, but seems to be the behaviour of the code before
            // the refactoring, so I've kept the same behaviour. I think if the cell stream is
            // terminated, we should be returning `None` here and not considering it as an error.
            // The `StreamReceiver` will have already returned an error if the cell stream was
            // terminated without an END message.
            Poll::Ready(None) => return Poll::Ready(Err(Error::NotConnected)),
        };
208
        let result = match msg {
96
            Connected(_) if !self.connected => {
96
                self.connected = true;
                #[cfg(feature = "stream-ctrl")]
96
                {
96
                    self.status
96
                        .lock()
96
                        .expect("poisoned lock")
96
                        .record_connected();
96
                }
96
                Ok(())
            }
            Connected(_) => {
                self.s.protocol_error();
                Err(Error::StreamProto(
                    "Received a second connect cell on a data stream".to_string(),
                ))
            }
88
            Data(d) if self.connected => {
88
                self.add_data(d.into());
88
                Ok(())
            }
            Data(_) => {
                self.s.protocol_error();
                Err(Error::StreamProto(
                    "Received a data cell an unconnected stream".to_string(),
                ))
            }
24
            End(e) => Err(Error::EndReceived(e.reason())),
        };
208
        Poll::Ready(result)
426
    }
    /// Add the data from `d` to the end of our pending bytes.
88
    fn add_data(&mut self, mut d: Vec<u8>) {
88
        if self.buf_is_empty() {
88
            // No data pending?  Just take d as the new pending.
88
            self.pending = d;
88
            self.offset = 0;
88
        } else {
            // TODO(nickm) This has potential to grow `pending` without bound.
            // Fortunately, we don't currently read cells or call this
            // `add_data` method when pending is nonempty—but if we do in the
            // future, we'll have to be careful here.
            self.pending.append(&mut d);
        }
88
    }
}
/// A `CmdChecker` that enforces invariants for outbound data streams.
#[derive(Debug)]
pub(crate) struct OutboundDataCmdChecker {
    /// True if we are expecting to receive a CONNECTED message on this stream.
    ///
    /// (Starts out true — see the `Default` impl — and is cleared by the
    /// first CONNECTED message.)
    expecting_connected: bool,
}
impl Default for OutboundDataCmdChecker {
362
    fn default() -> Self {
362
        Self {
362
            expecting_connected: true,
362
        }
362
    }
}
impl CmdChecker for OutboundDataCmdChecker {
240
    fn check_msg(&mut self, msg: &tor_cell::relaycell::UnparsedRelayMsg) -> Result<StreamStatus> {
        use StreamStatus::*;
240
        match msg.cmd() {
            RelayCmd::CONNECTED => {
106
                if !self.expecting_connected {
4
                    Err(Error::StreamProto(
4
                        "Received CONNECTED twice on a stream.".into(),
4
                    ))
                } else {
102
                    self.expecting_connected = false;
102
                    Ok(Open)
                }
            }
            RelayCmd::DATA => {
108
                if !self.expecting_connected {
108
                    Ok(Open)
                } else {
                    Err(Error::StreamProto(
                        "Received DATA before CONNECTED on a stream".into(),
                    ))
                }
            }
24
            RelayCmd::END => Ok(Closed),
2
            _ => Err(Error::StreamProto(format!(
2
                "Unexpected {} on a data stream!",
2
                msg.cmd()
2
            ))),
        }
240
    }
48
    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
48
        let _ = msg
48
            .decode::<ClientDataStreamMsg>()
48
            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
48
        Ok(())
48
    }
}
impl OutboundDataCmdChecker {
    /// Return a new boxed `DataCmdChecker` in a state suitable for a newly
    /// constructed connection.
362
    pub(crate) fn new_any() -> AnyCmdChecker {
362
        Box::<Self>::default()
362
    }
}