1
//! Declare DataStream, a type that wraps DataReader and DataWriter so as to be useful
2
//! for byte-oriented communication.
3

            
4
use crate::{Error, Result};
5
use static_assertions::assert_impl_all;
6
use tor_cell::relaycell::msg::EndReason;
7
use tor_cell::relaycell::{RelayCellFormat, RelayCmd};
8

            
9
use futures::io::{AsyncRead, AsyncWrite};
10
use futures::stream::StreamExt;
11
use futures::task::{Context, Poll};
12
use futures::{Future, Stream};
13
use pin_project::pin_project;
14
use postage::watch;
15

            
16
#[cfg(feature = "tokio")]
17
use tokio_crate::io::ReadBuf;
18
#[cfg(feature = "tokio")]
19
use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
20
#[cfg(feature = "tokio")]
21
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
22
use tor_cell::restricted_msg;
23

            
24
use std::fmt::Debug;
25
use std::io::Result as IoResult;
26
use std::num::NonZero;
27
use std::pin::Pin;
28
#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))]
29
use std::sync::Arc;
30
#[cfg(feature = "stream-ctrl")]
31
use std::sync::{Mutex, Weak};
32

            
33
use educe::Educe;
34

            
35
use crate::client::ClientTunnel;
36
use crate::memquota::StreamAccount;
37
use crate::stream::StreamReceiver;
38
use crate::stream::StreamTarget;
39
use crate::stream::cmdcheck::{AnyCmdChecker, CmdChecker, StreamStatus};
40
use crate::stream::flow_ctrl::state::StreamRateLimit;
41
use crate::stream::flow_ctrl::xon_xoff::reader::{BufferIsEmpty, XonXoffReader, XonXoffReaderCtrl};
42
use tor_async_utils::rate_limited_writer::{
43
    DynamicRateLimitedWriter, RateLimitedWriter, RateLimitedWriterConfig,
44
};
45
use tor_basic_utils::skip_fmt;
46
use tor_cell::relaycell::msg::Data;
47
use tor_error::internal;
48
use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider};
49

            
50
/// A stream of [`RateLimitedWriterConfig`] used to update a [`DynamicRateLimitedWriter`].
51
///
52
/// Unfortunately we need to store the result of a [`StreamExt::map`] and [`StreamExt::fuse`] in
53
/// [`DataWriter`], which leaves us with this ugly type.
54
/// We use a type alias to make `DataWriter` a little nicer.
55
type RateConfigStream = futures::stream::Map<
56
    futures::stream::Fuse<watch::Receiver<StreamRateLimit>>,
57
    fn(StreamRateLimit) -> RateLimitedWriterConfig,
58
>;
59

            
60
/// An anonymized stream over the Tor network.
61
///
62
/// For most purposes, you can think of this type as an anonymized
63
/// TCP stream: it can read and write data, and get closed when it's done.
64
///
65
/// [`DataStream`] implements [`futures::io::AsyncRead`] and
66
/// [`futures::io::AsyncWrite`], so you can use it anywhere that those
67
/// traits are expected.
68
///
69
/// # Examples
70
///
71
/// Connecting to an HTTP server and sending a request, using
72
/// [`AsyncWriteExt::write_all`](futures::io::AsyncWriteExt::write_all):
73
///
74
/// ```ignore
75
/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
76
///
77
/// use futures::io::AsyncWriteExt;
78
///
79
/// stream
80
///     .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
81
///     .await?;
82
///
83
/// // Flushing the stream is important; see below!
84
/// stream.flush().await?;
85
/// ```
86
///
87
/// Reading the result, using [`AsyncReadExt::read_to_end`](futures::io::AsyncReadExt::read_to_end):
88
///
89
/// ```ignore
90
/// use futures::io::AsyncReadExt;
91
///
92
/// let mut buf = Vec::new();
93
/// stream.read_to_end(&mut buf).await?;
94
///
95
/// println!("{}", String::from_utf8_lossy(&buf));
96
/// ```
97
///
98
/// # Usage with Tokio
99
///
100
/// If the `tokio` crate feature is enabled, this type also implements
101
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) and
102
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
103
/// with code that expects those traits.
104
///
105
/// # Remember to call `flush`!
106
///
107
/// DataStream buffers data internally, in order to write as few cells
108
/// as possible onto the network.  In order to make sure that your
109
/// data has actually been sent, you need to make sure that
110
/// [`AsyncWrite::poll_flush`] runs to completion: probably via
111
/// [`AsyncWriteExt::flush`](futures::io::AsyncWriteExt::flush).
112
///
113
/// # Splitting the type
114
///
115
/// This type is internally composed of a [`DataReader`] and a [`DataWriter`]; the
116
/// `DataStream::split` method can be used to split it into those two parts, for more
117
/// convenient usage with e.g. stream combinators.
118
///
119
/// # How long does a stream live?
120
///
121
/// A `DataStream` will live until all references to it are dropped,
122
/// or until it is closed explicitly.
123
///
124
/// If you split the stream into a `DataReader` and a `DataWriter`, it
125
/// will survive until _both_ are dropped, or until it is closed
126
/// explicitly.
127
///
128
/// A stream can also close because of a network error,
129
/// or because the other side of the stream decided to close it.
130
///
131
// # Semver note
132
//
133
// Note that this type is re-exported as a part of the public API of
134
// the `arti-client` crate.  Any changes to its API here in
135
// `tor-proto` need to be reflected above.
136
#[derive(Debug)]
137
pub struct DataStream {
138
    /// Underlying writer for this stream
139
    w: DataWriter,
140
    /// Underlying reader for this stream
141
    r: DataReader,
142
    /// A control object that can be used to monitor and control this stream
143
    /// without needing to own it.
144
    ///
145
    /// Set to `None` if this is not a client stream.
146
    #[cfg(feature = "stream-ctrl")]
147
    ctrl: Option<Arc<ClientDataStreamCtrl>>,
148
}
149
assert_impl_all! { DataStream: Send, Sync }
150

            
151
/// A handle for monitoring and controlling a data stream.
///
/// # Notes
///
/// This is deliberately a different type from [`DataStream`]: it is useful to
/// hand out many references to a control handle, whereas a [`DataReader`] or
/// [`DataWriter`] must have exactly one owner for the `AsyncRead` and
/// `AsyncWrite` APIs to behave correctly.
#[cfg(feature = "stream-ctrl")]
#[cfg_attr(
    feature = "rpc",
    derive(derive_deftly::Deftly),
    derive_deftly(tor_rpcbase::templates::Object)
)]
#[derive(Debug)]
pub struct ClientDataStreamCtrl {
    /// The circuit that this stream is attached to.
    ///
    /// Both the reader half and the writer half of the stream hold a
    /// `StreamTarget`, and each `StreamTarget` holds a strong reference to the
    /// `ClientCirc`.  This reference therefore stays upgradable for as long as
    /// either half is alive.
    ///
    /// It is a `Weak` reference so that a lingering control handle cannot leak
    /// the circuit after the stream itself has been closed.
    tunnel: Weak<ClientTunnel>,

    /// User-visible state of this stream, shared with the reader and writer.
    ///
    /// TODO RPC: This will probably want to be a `postage::Watch` or something
    /// similar, if and when it stops moving around.
    status: Arc<Mutex<DataStreamStatus>>,

    /// The memory quota account charged for this stream's data.
    ///
    /// Never read; held only so that the account stays alive.
    _memquota: StreamAccount,
}
189

            
190
/// The inner writer for [`DataWriter`].
191
///
192
/// This type is responsible for taking bytes and packaging them into cells.
193
/// Rate limiting is implemented in [`DataWriter`] to avoid making this type more complex.
194
#[derive(Debug)]
195
struct DataWriterInner {
196
    /// Internal state for this writer
197
    ///
198
    /// This is stored in an Option so that we can mutate it in the
199
    /// AsyncWrite functions.  It might be possible to do better here,
200
    /// and we should refactor if so.
201
    state: Option<DataWriterState>,
202

            
203
    /// The memory quota account that should be used for this stream's data
204
    ///
205
    /// Exists to keep the account alive
206
    // If we liked, we could make this conditional; see DataReaderInner.memquota
207
    _memquota: StreamAccount,
208

            
209
    /// A control object that can be used to monitor and control this stream
210
    /// without needing to own it.
211
    ///
212
    /// Set to `None` if this is not a client stream.
213
    #[cfg(feature = "stream-ctrl")]
214
    ctrl: Option<Arc<ClientDataStreamCtrl>>,
215
}
216

            
217
/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
218
///
219
/// See the [`DataStream`] docs for more information. In particular, note
220
/// that this writer requires `poll_flush` to complete in order to guarantee that
221
/// all data has been written.
222
///
223
/// # Usage with Tokio
224
///
225
/// If the `tokio` crate feature is enabled, this type also implements
226
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
227
/// with code that expects that trait.
228
///
229
/// # Drop and close
230
///
231
/// Note that dropping a `DataWriter` has no special effect on its own:
232
/// if the `DataWriter` is dropped, the underlying stream will still remain open
233
/// until the `DataReader` is also dropped.
234
///
235
/// If you want the stream to close earlier, use [`close`](futures::io::AsyncWriteExt::close)
236
/// (or [`shutdown`](tokio_crate::io::AsyncWriteExt::shutdown) with `tokio`).
237
///
238
/// Remember that Tor does not support half-open streams:
239
/// If you `close` or `shutdown` a stream,
240
/// the other side will not see the stream as half-open,
241
/// and so will (probably) not finish sending you any in-progress data.
242
/// Do not use `close`/`shutdown` to communicate anything besides
243
/// "I am done using this stream."
244
///
245
// # Semver note
246
//
247
// Note that this type is re-exported as a part of the public API of
248
// the `arti-client` crate.  Any changes to its API here in
249
// `tor-proto` need to be reflected above.
250
#[derive(Debug)]
251
pub struct DataWriter {
252
    /// A wrapper around [`DataWriterInner`] that adds rate limiting.
253
    writer: DynamicRateLimitedWriter<DataWriterInner, RateConfigStream, DynTimeProvider>,
254
}
255

            
256
impl DataWriter {
257
    /// Create a new rate-limited [`DataWriter`] from a [`DataWriterInner`].
258
124
    fn new(
259
124
        inner: DataWriterInner,
260
124
        rate_limit_updates: watch::Receiver<StreamRateLimit>,
261
124
        time_provider: DynTimeProvider,
262
124
    ) -> Self {
263
        /// Converts a `rate` into a `RateLimitedWriterConfig`.
264
184
        fn rate_to_config(rate: StreamRateLimit) -> RateLimitedWriterConfig {
265
184
            let rate = rate.bytes_per_sec();
266
184
            RateLimitedWriterConfig {
267
184
                rate,        // bytes per second
268
184
                burst: rate, // bytes
269
184
                // This number is chosen arbitrarily, but the idea is that we want to balance
270
184
                // between throughput and latency. Assume the user tries to write a large buffer
271
184
                // (~600 bytes). If we set this too small (for example 1), we'll be waking up
272
184
                // frequently and writing a small number of bytes each time to the
273
184
                // `DataWriterInner`, even if this isn't enough bytes to send a cell. If we set this
274
184
                // too large (for example 510), we'll be waking up infrequently to write a larger
275
184
                // number of bytes each time. So even if the `DataWriterInner` has almost a full
276
184
                // cell's worth of data queued (for example 490) and only needs 509-490=19 more
277
184
                // bytes before a cell can be sent, it will block until the rate limiter allows 510
278
184
                // more bytes.
279
184
                //
280
184
                // TODO(arti#2028): Is there an optimal value here?
281
184
                wake_when_bytes_available: NonZero::new(200).expect("200 != 0"), // bytes
282
184
            }
283
184
        }
284

            
285
        // get the current rate from the `watch::Receiver`, which we'll use as the initial rate
286
124
        let initial_rate: StreamRateLimit = *rate_limit_updates.borrow();
287

            
288
        // map the rate update stream to the type required by `DynamicRateLimitedWriter`
289
124
        let rate_limit_updates = rate_limit_updates.fuse().map(rate_to_config as fn(_) -> _);
290

            
291
        // build the rate limiter
292
124
        let writer = RateLimitedWriter::new(inner, &rate_to_config(initial_rate), time_provider);
293
124
        let writer = DynamicRateLimitedWriter::new(writer, rate_limit_updates);
294

            
295
124
        Self { writer }
296
124
    }
297

            
298
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
299
    /// interact with this stream without holding the stream itself.
300
    ///
301
    /// Returns `None` if this is not a client stream.
302
    #[cfg(feature = "stream-ctrl")]
303
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
304
        self.writer.inner().client_stream_ctrl()
305
    }
306
}
307

            
308
impl AsyncWrite for DataWriter {
309
5540
    fn poll_write(
310
5540
        mut self: Pin<&mut Self>,
311
5540
        cx: &mut Context<'_>,
312
5540
        buf: &[u8],
313
5540
    ) -> Poll<IoResult<usize>> {
314
5540
        AsyncWrite::poll_write(Pin::new(&mut self.writer), cx, buf)
315
5540
    }
316

            
317
32
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
318
32
        AsyncWrite::poll_flush(Pin::new(&mut self.writer), cx)
319
32
    }
320

            
321
16
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
322
16
        AsyncWrite::poll_close(Pin::new(&mut self.writer), cx)
323
16
    }
324
}
325

            
326
// Tokio support: adapt the `futures` implementation through `tokio_util`'s
// compatibility wrapper and forward each call to it.
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut compat = self.compat_write();
        TokioAsyncWrite::poll_write(Pin::new(&mut compat), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut compat = self.compat_write();
        TokioAsyncWrite::poll_flush(Pin::new(&mut compat), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut compat = self.compat_write();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut compat), cx)
    }
}
340

            
341
/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
342
///
343
/// See the [`DataStream`] docs for more information.
344
///
345
/// # Usage with Tokio
346
///
347
/// If the `tokio` crate feature is enabled, this type also implements
348
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) for easier integration
349
/// with code that expects that trait.
350
//
351
// # Semver note
352
//
353
// Note that this type is re-exported as a part of the public API of
354
// the `arti-client` crate.  Any changes to its API here in
355
// `tor-proto` need to be reflected above.
356
#[derive(Debug)]
357
pub struct DataReader {
358
    /// The [`DataReaderInner`] with a wrapper to support XON/XOFF flow control.
359
    reader: XonXoffReader<DataReaderInner>,
360
}
361

            
362
impl DataReader {
363
    /// Create a new [`DataReader`].
364
124
    fn new(reader: DataReaderInner, xon_xoff_reader_ctrl: XonXoffReaderCtrl) -> Self {
365
124
        Self {
366
124
            reader: XonXoffReader::new(xon_xoff_reader_ctrl, reader),
367
124
        }
368
124
    }
369

            
370
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
371
    /// interact with this stream without holding the stream itself.
372
    ///
373
    /// Returns `None` if this is not a client stream.
374
    #[cfg(feature = "stream-ctrl")]
375
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
376
        self.reader.inner().client_stream_ctrl()
377
    }
378
}
379

            
380
impl AsyncRead for DataReader {
381
242
    fn poll_read(
382
242
        mut self: Pin<&mut Self>,
383
242
        cx: &mut Context<'_>,
384
242
        buf: &mut [u8],
385
242
    ) -> Poll<IoResult<usize>> {
386
242
        AsyncRead::poll_read(Pin::new(&mut self.reader), cx, buf)
387
242
    }
388

            
389
    fn poll_read_vectored(
390
        mut self: Pin<&mut Self>,
391
        cx: &mut Context<'_>,
392
        bufs: &mut [std::io::IoSliceMut<'_>],
393
    ) -> Poll<IoResult<usize>> {
394
        AsyncRead::poll_read_vectored(Pin::new(&mut self.reader), cx, bufs)
395
    }
396
}
397

            
398
// Tokio support: adapt the `futures` implementation through `tokio_util`'s
// compatibility wrapper and forward the call to it.
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut compat = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut compat), cx, buf)
    }
}
408

            
409
/// The inner reader for [`DataReader`].
410
///
411
/// This type is responsible for taking stream messages and extracting the stream data from them.
412
/// Flow control logic is implemented in [`DataReader`] to avoid making this type more complex.
413
#[derive(Debug)]
414
pub(crate) struct DataReaderInner {
415
    /// Internal state for this reader.
416
    ///
417
    /// This is stored in an Option so that we can mutate it in
418
    /// poll_read().  It might be possible to do better here, and we
419
    /// should refactor if so.
420
    state: Option<DataReaderState>,
421

            
422
    /// The memory quota account that should be used for this stream's data
423
    ///
424
    /// Exists to keep the account alive
425
    // If we liked, we could make this conditional on not(cfg(feature = "stream-ctrl"))
426
    // since, ClientDataStreamCtrl contains a StreamAccount clone too.  But that seems fragile.
427
    _memquota: StreamAccount,
428

            
429
    /// A control object that can be used to monitor and control this stream
430
    /// without needing to own it.
431
    ///
432
    /// Set to `None` if this is not a client stream.
433
    #[cfg(feature = "stream-ctrl")]
434
    ctrl: Option<Arc<ClientDataStreamCtrl>>,
435
}
436

            
437
impl BufferIsEmpty for DataReaderInner {
438
    /// The result will become stale,
439
    /// so is most accurate immediately after a [`poll_read`](AsyncRead::poll_read).
440
    fn is_empty(mut self: Pin<&mut Self>) -> bool {
441
        match self
442
            .state
443
            .as_mut()
444
            .expect("forgot to put `DataReaderState` back")
445
        {
446
            DataReaderState::Open(imp) => {
447
                // check if the partial cell in `pending` is empty,
448
                // and if the message stream is empty
449
                imp.pending[imp.offset..].is_empty() && imp.s.is_empty()
450
            }
451
            // closed, so any data should have been discarded
452
            DataReaderState::Closed => true,
453
        }
454
    }
455
}
456

            
457
/// Shared flags that track the status of a `DataStream`.
///
/// This is expected to be refactored somewhat, so none of it is exposed.
//
// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus
// from various points in this module, we should instead construct
// DataStreamStatus as needed from information available elsewhere.  In any
// case, we should really eliminate as much duplicate state here as we can.
// (See discussions at !1198 for some challenges with this.)
#[cfg(feature = "stream-ctrl")]
#[derive(Clone, Debug, Default)]
struct DataStreamStatus {
    /// Set once a CONNECTED message has been received.
    //
    // TODO: This is redundant with `connected` in DataReaderImpl.
    received_connected: bool,
    /// Set once we have decided to send an END message.
    //
    // TODO RPC: There is not an easy way to set this from this module!  Really,
    // the decision to send an "end" is made when the StreamTarget object is
    // dropped, but we don't currently have any way to see when that happens.
    // Perhaps we need a different shared StreamStatus object that the
    // StreamTarget holds?
    sent_end: bool,
    /// Set once an END message telling us to close the stream has been received.
    received_end: bool,
    /// Set once an error has been received.
    ///
    /// (Neither a subset nor a superset of `received_end`: some errors are END
    /// messages and some are not; some END messages are errors and some are not.)
    received_err: bool,
}
489

            
490
#[cfg(feature = "stream-ctrl")]
impl DataStreamStatus {
    /// Note that a CONNECTED message has arrived.
    fn record_connected(&mut self) {
        self.received_connected = true;
    }

    /// Note that an error of some kind has arrived.
    ///
    /// Any `EndReceived` error sets `received_end`.  Every error other than an
    /// `EndReceived` carrying `EndReason::DONE` sets `received_err`.
    fn record_error(&mut self, e: &Error) {
        // TODO: Probably we should remember the actual error in a box or
        // something.  But that means making a redundant copy of the error
        // even if nobody will want it.  Do we care?
        let got_end = matches!(e, Error::EndReceived(_));
        let clean_end = matches!(e, Error::EndReceived(EndReason::DONE));

        if got_end {
            self.received_end = true;
        }
        if !clean_end {
            self.received_err = true;
        }
    }
}
512

            
513
restricted_msg! {
    /// A message that is allowed to arrive on a client data stream.
    enum ClientDataStreamMsg:RelayMsg {
        // SENDME is not listed here: the reactor handles it.
        Data, End, Connected,
    }
}
520

            
521
// TODO RPC: Should we also implement this trait for everything that holds a
// ClientDataStreamCtrl?
#[cfg(feature = "stream-ctrl")]
impl super::ctrl::ClientStreamCtrl for ClientDataStreamCtrl {
    // Yields `None` once the weakly-held tunnel can no longer be upgraded.
    fn tunnel(&self) -> Option<Arc<ClientTunnel>> {
        Weak::upgrade(&self.tunnel)
    }
}
529

            
530
#[cfg(feature = "stream-ctrl")]
impl ClientDataStreamCtrl {
    /// Return true if the underlying stream is connected. (That is, if it has
    /// received a `CONNECTED` message, and has not been closed.)
    ///
    /// # Panics
    ///
    /// Panics if the status lock is poisoned.
    pub fn is_connected(&self) -> bool {
        let status = self.status.lock().expect("poisoned lock");
        // Closed means: we sent an END, received an END, or received an error.
        let closed = status.sent_end || status.received_end || status.received_err;
        status.received_connected && !closed
    }

    // TODO RPC: Add more functions once we have the desired API more nailed
    // down.
}
542

            
543
impl DataStream {
544
    /// Wrap raw stream receiver and target parts as a DataStream.
545
    ///
546
    /// For non-optimistic stream, function `wait_for_connection`
547
    /// must be called after to make sure CONNECTED is received.
548
96
    pub(crate) fn new<P: SleepProvider + CoarseTimeProvider>(
549
96
        time_provider: P,
550
96
        receiver: StreamReceiver,
551
96
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
552
96
        target: StreamTarget,
553
96
        memquota: StreamAccount,
554
96
    ) -> Self {
555
96
        Self::new_inner(
556
96
            time_provider,
557
96
            receiver,
558
96
            xon_xoff_reader_ctrl,
559
96
            target,
560
            false,
561
96
            memquota,
562
        )
563
96
    }
564

            
565
    /// Wrap raw stream receiver and target parts as a connected DataStream.
566
    ///
567
    /// Unlike [`DataStream::new`], this creates a `DataStream` that does not expect to receive a
568
    /// CONNECTED cell.
569
    ///
570
    /// This is used by hidden services, exit relays, and directory servers to accept streams.
571
    #[cfg(any(feature = "hs-service", feature = "relay"))]
572
28
    pub(crate) fn new_connected<P: SleepProvider + CoarseTimeProvider>(
573
28
        time_provider: P,
574
28
        receiver: StreamReceiver,
575
28
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
576
28
        target: StreamTarget,
577
28
        memquota: StreamAccount,
578
28
    ) -> Self {
579
28
        Self::new_inner(
580
28
            time_provider,
581
28
            receiver,
582
28
            xon_xoff_reader_ctrl,
583
28
            target,
584
            true,
585
28
            memquota,
586
        )
587
28
    }
588

            
589
    /// The shared implementation of the `new*()` functions.
590
124
    fn new_inner<P: SleepProvider + CoarseTimeProvider>(
591
124
        time_provider: P,
592
124
        receiver: StreamReceiver,
593
124
        xon_xoff_reader_ctrl: XonXoffReaderCtrl,
594
124
        target: StreamTarget,
595
124
        connected: bool,
596
124
        memquota: StreamAccount,
597
124
    ) -> Self {
598
124
        let relay_cell_format = target.relay_cell_format();
599
124
        let out_buf_len = Data::max_body_len(relay_cell_format);
600
124
        let rate_limit_stream = target.rate_limit_stream().clone();
601

            
602
        #[cfg(feature = "stream-ctrl")]
603
124
        let status = {
604
124
            let mut data_stream_status = DataStreamStatus::default();
605
124
            if connected {
606
28
                data_stream_status.record_connected();
607
96
            }
608
124
            Arc::new(Mutex::new(data_stream_status))
609
        };
610

            
611
        #[cfg(feature = "stream-ctrl")]
612
124
        let ctrl = {
613
124
            let tunnel = match target.tunnel() {
614
120
                crate::stream::Tunnel::Client(t) => Some(Arc::downgrade(t)),
615
                #[cfg(feature = "relay")]
616
4
                crate::stream::Tunnel::Relay(_) => None,
617
            };
618

            
619
124
            tunnel.map(|tunnel| {
620
120
                Arc::new(ClientDataStreamCtrl {
621
120
                    tunnel,
622
120
                    status: status.clone(),
623
120
                    _memquota: memquota.clone(),
624
120
                })
625
120
            })
626
        };
627
124
        let r = DataReaderInner {
628
124
            state: Some(DataReaderState::Open(DataReaderImpl {
629
124
                s: receiver,
630
124
                pending: Vec::new(),
631
124
                offset: 0,
632
124
                connected,
633
124
                #[cfg(feature = "stream-ctrl")]
634
124
                status: status.clone(),
635
124
            })),
636
124
            _memquota: memquota.clone(),
637
124
            #[cfg(feature = "stream-ctrl")]
638
124
            ctrl: ctrl.clone(),
639
124
        };
640
124
        let w = DataWriterInner {
641
124
            state: Some(DataWriterState::Ready(DataWriterImpl {
642
124
                s: target,
643
124
                buf: vec![0; out_buf_len].into_boxed_slice(),
644
124
                n_pending: 0,
645
124
                #[cfg(feature = "stream-ctrl")]
646
124
                status,
647
124
                relay_cell_format,
648
124
            })),
649
124
            _memquota: memquota,
650
124
            #[cfg(feature = "stream-ctrl")]
651
124
            ctrl: ctrl.clone(),
652
124
        };
653

            
654
124
        let time_provider = DynTimeProvider::new(time_provider);
655

            
656
124
        DataStream {
657
124
            w: DataWriter::new(w, rate_limit_stream, time_provider),
658
124
            r: DataReader::new(r, xon_xoff_reader_ctrl),
659
124
            #[cfg(feature = "stream-ctrl")]
660
124
            ctrl,
661
124
        }
662
124
    }
663

            
664
    /// Divide this DataStream into its constituent parts.
665
28
    pub fn split(self) -> (DataReader, DataWriter) {
666
28
        (self.r, self.w)
667
28
    }
668

            
669
    /// Wait until a CONNECTED cell is received, or some other cell
670
    /// is received to indicate an error.
671
    ///
672
    /// Does nothing if this stream is already connected.
673
126
    pub async fn wait_for_connection(&mut self) -> Result<()> {
674
        // We must put state back before returning
675
84
        let state = self
676
84
            .r
677
84
            .reader
678
84
            .inner_mut()
679
84
            .state
680
84
            .take()
681
84
            .expect("Missing state in DataReaderInner");
682

            
683
84
        if let DataReaderState::Open(mut imp) = state {
684
84
            let result = if imp.connected {
685
                Ok(())
686
            } else {
687
                // This succeeds if the cell is CONNECTED, and fails otherwise.
688
186
                std::future::poll_fn(|cx| Pin::new(&mut imp).read_cell(cx)).await
689
            };
690
84
            self.r.reader.inner_mut().state = Some(match result {
691
                Err(_) => DataReaderState::Closed,
692
84
                Ok(_) => DataReaderState::Open(imp),
693
            });
694
84
            result
695
        } else {
696
            Err(Error::from(internal!(
697
                "Expected ready state, got {:?}",
698
                state
699
            )))
700
        }
701
84
    }
702

            
703
    /// Return a [`ClientDataStreamCtrl`] object that can be used to monitor and
704
    /// interact with this stream without holding the stream itself.
705
    #[cfg(feature = "stream-ctrl")]
706
    pub fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
707
        self.ctrl.as_ref()
708
    }
709
}
710

            
711
impl AsyncRead for DataStream {
712
242
    fn poll_read(
713
242
        mut self: Pin<&mut Self>,
714
242
        cx: &mut Context<'_>,
715
242
        buf: &mut [u8],
716
242
    ) -> Poll<IoResult<usize>> {
717
242
        AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
718
242
    }
719
}
720

            
721
// Tokio support: adapt the `futures` implementation through `tokio_util`'s
// compatibility wrapper and forward the call to it.
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut compat = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut compat), cx, buf)
    }
}
731

            
732
impl AsyncWrite for DataStream {
733
5540
    fn poll_write(
734
5540
        mut self: Pin<&mut Self>,
735
5540
        cx: &mut Context<'_>,
736
5540
        buf: &[u8],
737
5540
    ) -> Poll<IoResult<usize>> {
738
5540
        AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
739
5540
    }
740
32
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
741
32
        AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
742
32
    }
743
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
744
        AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
745
    }
746
}
747

            
748
// Tokio support: adapt the `futures` implementation through `tokio_util`'s
// compatibility wrapper and forward each call to it.
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut compat = self.compat();
        TokioAsyncWrite::poll_write(Pin::new(&mut compat), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut compat = self.compat();
        TokioAsyncWrite::poll_flush(Pin::new(&mut compat), cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut compat = self.compat();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut compat), cx)
    }
}
762

            
763
/// Helper alias: a pinned, boxed future in the spirit of `BoxFuture`, except
/// that the boxed future must be `Sync` as well as `Send`.
type BoxSyncFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
765

            
766
/// An enumeration for the state of a DataWriter.
767
///
768
/// We have to use an enum here because, for as long as we're waiting
769
/// for a flush operation to complete, the future returned by
770
/// `flush_cell()` owns the DataWriterImpl.
771
#[derive(Educe)]
772
#[educe(Debug)]
773
enum DataWriterState {
774
    /// The writer has closed or gotten an error: nothing more to do.
775
    Closed,
776
    /// The writer is not currently flushing; more data can get queued
777
    /// immediately.
778
    Ready(DataWriterImpl),
779
    /// The writer is flushing a cell.
780
    Flushing(
781
        #[educe(Debug(method = "skip_fmt"))] //
782
        BoxSyncFuture<'static, (DataWriterImpl, Result<()>)>,
783
    ),
784
}
785

            
786
/// Internal: the write part of a DataStream
787
#[derive(Educe)]
788
#[educe(Debug)]
789
struct DataWriterImpl {
790
    /// The underlying StreamTarget object.
791
    s: StreamTarget,
792

            
793
    /// Buffered data to send over the connection.
794
    ///
795
    /// This buffer is currently allocated using a number of bytes
796
    /// equal to the maximum that we can package at a time.
797
    //
798
    // TODO: this buffer is probably smaller than we want, but it's good
799
    // enough for now.  If we _do_ make it bigger, we'll have to change
800
    // our use of Data::split_from to handle the case where we can't fit
801
    // all the data.
802
    #[educe(Debug(method = "skip_fmt"))]
803
    buf: Box<[u8]>,
804

            
805
    /// Number of unflushed bytes in buf.
806
    n_pending: usize,
807

            
808
    /// Relay cell format in use
809
    relay_cell_format: RelayCellFormat,
810

            
811
    /// Shared user-visible information about the state of this stream.
812
    #[cfg(feature = "stream-ctrl")]
813
    status: Arc<Mutex<DataStreamStatus>>,
814
}
815

            
816
impl DataWriterInner {
817
    /// See [`DataWriter::client_stream_ctrl`].
818
    #[cfg(feature = "stream-ctrl")]
819
    fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
820
        self.ctrl.as_ref()
821
    }
822

            
823
    /// Helper for poll_flush() and poll_close(): Performs a flush, then
824
    /// closes the stream if should_close is true.
825
48
    fn poll_flush_impl(
826
48
        mut self: Pin<&mut Self>,
827
48
        cx: &mut Context<'_>,
828
48
        should_close: bool,
829
48
    ) -> Poll<IoResult<()>> {
830
48
        let state = self.state.take().expect("Missing state in DataWriter");
831

            
832
        // TODO: this whole function is a bit copy-pasted.
833
48
        let mut future: BoxSyncFuture<_> = match state {
834
48
            DataWriterState::Ready(imp) => {
835
48
                if imp.n_pending == 0 {
836
                    // Nothing to flush!
837
24
                    if should_close {
838
                        // We need to actually continue with this function to do the closing.
839
                        // Thus, make a future that does nothing and is ready immediately.
840
16
                        Box::pin(futures::future::ready((imp, Ok(()))))
841
                    } else {
842
                        // There's nothing more to do; we can return.
843
8
                        self.state = Some(DataWriterState::Ready(imp));
844
8
                        return Poll::Ready(Ok(()));
845
                    }
846
                } else {
847
                    // We need to flush the buffer's contents; Make a future for that.
848
24
                    Box::pin(imp.flush_buf())
849
                }
850
            }
851
            DataWriterState::Flushing(fut) => fut,
852
            DataWriterState::Closed => {
853
                self.state = Some(DataWriterState::Closed);
854
                return Poll::Ready(Err(Error::NotConnected.into()));
855
            }
856
        };
857

            
858
40
        match future.as_mut().poll(cx) {
859
            Poll::Ready((_imp, Err(e))) => {
860
                self.state = Some(DataWriterState::Closed);
861
                Poll::Ready(Err(e.into()))
862
            }
863
40
            Poll::Ready((mut imp, Ok(()))) => {
864
40
                if should_close {
865
16
                    // Tell the StreamTarget to close, so that the reactor
866
16
                    // realizes that we are done sending. (Dropping `imp.s` does not
867
16
                    // suffice, since there may be other clones of it.  In particular,
868
16
                    // the StreamReceiver has one, which it uses to keep the stream
869
16
                    // open, among other things.)
870
16
                    imp.s.close();
871
16

            
872
16
                    #[cfg(feature = "stream-ctrl")]
873
16
                    {
874
16
                        // TODO RPC:  This is not sufficient to track every case
875
16
                        // where we might have sent an End.  See note on the
876
16
                        // `sent_end` field.
877
16
                        imp.status.lock().expect("lock poisoned").sent_end = true;
878
16
                    }
879
16
                    self.state = Some(DataWriterState::Closed);
880
24
                } else {
881
24
                    self.state = Some(DataWriterState::Ready(imp));
882
24
                }
883
40
                Poll::Ready(Ok(()))
884
            }
885
            Poll::Pending => {
886
                self.state = Some(DataWriterState::Flushing(future));
887
                Poll::Pending
888
            }
889
        }
890
48
    }
891
}
892

            
893
impl AsyncWrite for DataWriterInner {
894
5540
    fn poll_write(
895
5540
        mut self: Pin<&mut Self>,
896
5540
        cx: &mut Context<'_>,
897
5540
        buf: &[u8],
898
5540
    ) -> Poll<IoResult<usize>> {
899
5540
        if buf.is_empty() {
900
            return Poll::Ready(Ok(0));
901
5540
        }
902

            
903
5540
        let state = self.state.take().expect("Missing state in DataWriter");
904

            
905
5540
        let mut future = match state {
906
5500
            DataWriterState::Ready(mut imp) => {
907
5500
                let n_queued = imp.queue_bytes(buf);
908
5500
                if n_queued != 0 {
909
1220
                    self.state = Some(DataWriterState::Ready(imp));
910
1220
                    return Poll::Ready(Ok(n_queued));
911
4280
                }
912
                // we couldn't queue anything, so the current cell must be full.
913
4280
                Box::pin(imp.flush_buf())
914
            }
915
40
            DataWriterState::Flushing(fut) => fut,
916
            DataWriterState::Closed => {
917
                self.state = Some(DataWriterState::Closed);
918
                return Poll::Ready(Err(Error::NotConnected.into()));
919
            }
920
        };
921

            
922
4320
        match future.as_mut().poll(cx) {
923
            Poll::Ready((_imp, Err(e))) => {
924
                #[cfg(feature = "stream-ctrl")]
925
                {
926
                    _imp.status.lock().expect("lock poisoned").record_error(&e);
927
                }
928
                self.state = Some(DataWriterState::Closed);
929
                Poll::Ready(Err(e.into()))
930
            }
931
4280
            Poll::Ready((mut imp, Ok(()))) => {
932
                // Great!  We're done flushing.  Queue as much as we can of this
933
                // cell.
934
4280
                let n_queued = imp.queue_bytes(buf);
935
4280
                self.state = Some(DataWriterState::Ready(imp));
936
4280
                Poll::Ready(Ok(n_queued))
937
            }
938
            Poll::Pending => {
939
40
                self.state = Some(DataWriterState::Flushing(future));
940
40
                Poll::Pending
941
            }
942
        }
943
5540
    }
944

            
945
32
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
946
32
        self.poll_flush_impl(cx, false)
947
32
    }
948

            
949
16
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
950
16
        self.poll_flush_impl(cx, true)
951
16
    }
952
}
953

            
954
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriterInner {
    /// Tokio-flavored write: wrap this writer with `compat_write()` and
    /// write through the resulting adapter.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    /// Tokio-flavored flush, performed through the `compat_write()` adapter.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    /// Tokio-flavored shutdown, performed through the `compat_write()` adapter.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
968

            
969
impl DataWriterImpl {
970
    /// Try to flush the current buffer contents as a data cell.
971
6456
    async fn flush_buf(mut self) -> (Self, Result<()>) {
972
4304
        let result = if let Some((cell, remainder)) =
973
4304
            Data::try_split_from(self.relay_cell_format, &self.buf[..self.n_pending])
974
        {
975
            // TODO: Eventually we may want a larger buffer; if we do,
976
            // this invariant will become false.
977
4304
            assert!(remainder.is_empty());
978
4304
            self.n_pending = 0;
979
4304
            self.s.send(cell.into()).await
980
        } else {
981
            Ok(())
982
        };
983

            
984
4304
        (self, result)
985
4304
    }
986

            
987
    /// Add as many bytes as possible from `b` to our internal buffer;
988
    /// return the number we were able to add.
989
9780
    fn queue_bytes(&mut self, b: &[u8]) -> usize {
990
9780
        let empty_space = &mut self.buf[self.n_pending..];
991
9780
        if empty_space.is_empty() {
992
            // that is, len == 0
993
4280
            return 0;
994
5500
        }
995

            
996
5500
        let n_to_copy = std::cmp::min(b.len(), empty_space.len());
997
5500
        empty_space[..n_to_copy].copy_from_slice(&b[..n_to_copy]);
998
5500
        self.n_pending += n_to_copy;
999
5500
        n_to_copy
9780
    }
}
impl DataReaderInner {
    /// Borrow the [`ClientDataStreamCtrl`] handle for this stream, if it has one.
    ///
    /// The handle lets a caller monitor and interact with the stream
    /// without holding the stream itself.
    #[cfg(feature = "stream-ctrl")]
    pub(crate) fn client_stream_ctrl(&self) -> Option<&Arc<ClientDataStreamCtrl>> {
        Option::as_ref(&self.ctrl)
    }
}
/// An enumeration for the state of a [`DataReaderInner`].
///
/// `DataReaderInner::poll_read` takes this value out of the reader while it
/// works, and stores the (possibly changed) state back before it returns.
// TODO: We don't need to implement the state in this way anymore now that we've removed the saved
// future. There are a few ways we could simplify this. See:
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3076#note_3218210
#[derive(Educe)]
#[educe(Debug)]
// We allow this since it's expected that streams will spend most of their time in the `Open` state,
// and will be cleaned up shortly after closing.
#[allow(clippy::large_enum_variant)]
enum DataReaderState {
    /// In this state we have received an end cell or an error.
    ///
    /// Reads attempted in this state fail with [`Error::NotConnected`].
    Closed,
    /// In this state the reader is open.
    ///
    /// The wrapped [`DataReaderImpl`] holds the receiver and any buffered bytes.
    Open(DataReaderImpl),
}
/// Wrapper for the read part of a [`DataStream`].
///
/// Incoming messages are pulled from `s`; the bytes of each data message are
/// kept in `pending` until the caller has read them.
#[derive(Educe)]
#[educe(Debug)]
#[pin_project]
struct DataReaderImpl {
    /// The underlying StreamReceiver object.
    ///
    /// Polled as a stream of incoming messages by `read_cell`.
    #[educe(Debug(method = "skip_fmt"))]
    #[pin]
    s: StreamReceiver,
    /// If present, data that we received on this stream but have not
    /// been able to send to the caller yet.
    ///
    /// Only the bytes from `offset` onwards are still unread.
    // TODO: This data structure is probably not what we want, but
    // it's good enough for now.
    #[educe(Debug(method = "skip_fmt"))]
    pending: Vec<u8>,
    /// Index into pending to show what we've already read.
    ///
    /// When this equals `pending.len()`, there are no buffered bytes left.
    offset: usize,
    /// If true, we have received a CONNECTED cell on this stream.
    ///
    /// Data messages are rejected as a protocol error until this is set.
    connected: bool,
    /// Shared user-visible information about the state of this stream.
    #[cfg(feature = "stream-ctrl")]
    status: Arc<Mutex<DataStreamStatus>>,
}
impl AsyncRead for DataReaderInner {
242
    fn poll_read(
242
        mut self: Pin<&mut Self>,
242
        cx: &mut Context<'_>,
242
        buf: &mut [u8],
242
    ) -> Poll<IoResult<usize>> {
        // We're pulling the state object out of the reader.  We MUST
        // put it back before this function returns.
242
        let mut state = self.state.take().expect("Missing state in DataReaderInner");
        loop {
346
            let mut imp = match state {
346
                DataReaderState::Open(mut imp) => {
                    // There may be data to read already.
346
                    let n_copied = imp.extract_bytes(buf);
346
                    if n_copied != 0 || buf.is_empty() {
                        // We read data into the buffer, or the buffer was 0-len to begin with.
                        // Tell the caller.
92
                        self.state = Some(DataReaderState::Open(imp));
92
                        return Poll::Ready(Ok(n_copied));
254
                    }
                    // No data available!  We have to try reading.
254
                    imp
                }
                DataReaderState::Closed => {
                    // TODO: Why are we returning an error rather than continuing to return EOF?
                    self.state = Some(DataReaderState::Closed);
                    return Poll::Ready(Err(Error::NotConnected.into()));
                }
            };
            // See if a cell is ready.
254
            match Pin::new(&mut imp).read_cell(cx) {
24
                Poll::Ready(Err(e)) => {
                    // There aren't any survivable errors in the current
                    // design.
24
                    self.state = Some(DataReaderState::Closed);
                    #[cfg(feature = "stream-ctrl")]
24
                    {
24
                        imp.status.lock().expect("lock poisoned").record_error(&e);
24
                    }
24
                    let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
24
                        Ok(0)
                    } else {
                        Err(e.into())
                    };
24
                    return Poll::Ready(result);
                }
104
                Poll::Ready(Ok(())) => {
104
                    // It read a cell!  Continue the loop.
104
                    state = DataReaderState::Open(imp);
104
                }
                Poll::Pending => {
                    // No cells ready, so tell the
                    // caller to get back to us later.
126
                    self.state = Some(DataReaderState::Open(imp));
126
                    return Poll::Pending;
                }
            }
        }
242
    }
}
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReaderInner {
    /// Tokio-flavored read: wrap this reader with `compat()` and read
    /// through the resulting adapter.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
impl DataReaderImpl {
    /// Pull as many bytes as we can off of self.pending, and return that
    /// number of bytes.
346
    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
346
        let remainder = &self.pending[self.offset..];
346
        let n_to_copy = std::cmp::min(buf.len(), remainder.len());
346
        buf[..n_to_copy].copy_from_slice(&remainder[..n_to_copy]);
346
        self.offset += n_to_copy;
346
        n_to_copy
346
    }
    /// Return true iff there are no buffered bytes here to yield
92
    fn buf_is_empty(&self) -> bool {
92
        self.pending.len() == self.offset
92
    }
    /// Load self.pending with the contents of a new data cell.
440
    fn read_cell(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        use ClientDataStreamMsg::*;
440
        let msg = match self.as_mut().project().s.poll_next(cx) {
228
            Poll::Pending => return Poll::Pending,
212
            Poll::Ready(Some(Ok(unparsed))) => match unparsed.decode::<ClientDataStreamMsg>() {
212
                Ok(cell) => cell.into_msg(),
                Err(e) => {
                    self.s.protocol_error();
                    return Poll::Ready(Err(Error::from_bytes_err(e, "message on a data stream")));
                }
            },
            Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)),
            // TODO: This doesn't seem right to me, but seems to be the behaviour of the code before
            // the refactoring, so I've kept the same behaviour. I think if the cell stream is
            // terminated, we should be returning `None` here and not considering it as an error.
            // The `StreamReceiver` will have already returned an error if the cell stream was
            // terminated without an END message.
            Poll::Ready(None) => return Poll::Ready(Err(Error::NotConnected)),
        };
212
        let result = match msg {
96
            Connected(_) if !self.connected => {
96
                self.connected = true;
                #[cfg(feature = "stream-ctrl")]
96
                {
96
                    self.status
96
                        .lock()
96
                        .expect("poisoned lock")
96
                        .record_connected();
96
                }
96
                Ok(())
            }
            Connected(_) => {
                self.s.protocol_error();
                Err(Error::StreamProto(
                    "Received a second connect cell on a data stream".to_string(),
                ))
            }
92
            Data(d) if self.connected => {
92
                self.add_data(d.into());
92
                Ok(())
            }
            Data(_) => {
                self.s.protocol_error();
                Err(Error::StreamProto(
                    "Received a data cell an unconnected stream".to_string(),
                ))
            }
24
            End(e) => Err(Error::EndReceived(e.reason())),
        };
212
        Poll::Ready(result)
440
    }
    /// Add the data from `d` to the end of our pending bytes.
92
    fn add_data(&mut self, mut d: Vec<u8>) {
92
        if self.buf_is_empty() {
92
            // No data pending?  Just take d as the new pending.
92
            self.pending = d;
92
            self.offset = 0;
92
        } else {
            // TODO(nickm) This has potential to grow `pending` without bound.
            // Fortunately, we don't currently read cells or call this
            // `add_data` method when pending is nonempty—but if we do in the
            // future, we'll have to be careful here.
            self.pending.append(&mut d);
        }
92
    }
}
/// A `CmdChecker` that enforces invariants for outbound data streams.
///
/// It accepts a single CONNECTED message, DATA messages only after that
/// CONNECTED, and END; any other command is rejected as a protocol error.
#[derive(Debug)]
pub(crate) struct OutboundDataCmdChecker {
    /// True if we are expecting to receive a CONNECTED message on this stream.
    ///
    /// Starts out true, and is cleared when the first CONNECTED is accepted.
    expecting_connected: bool,
}
impl Default for OutboundDataCmdChecker {
362
    fn default() -> Self {
362
        Self {
362
            expecting_connected: true,
362
        }
362
    }
}
impl CmdChecker for OutboundDataCmdChecker {
1200
    fn check_msg(&mut self, msg: &tor_cell::relaycell::UnparsedRelayMsg) -> Result<StreamStatus> {
        use StreamStatus::*;
1200
        match msg.cmd() {
            RelayCmd::CONNECTED => {
106
                if !self.expecting_connected {
4
                    Err(Error::StreamProto(
4
                        "Received CONNECTED twice on a stream.".into(),
4
                    ))
                } else {
102
                    self.expecting_connected = false;
102
                    Ok(Open)
                }
            }
            RelayCmd::DATA => {
1068
                if !self.expecting_connected {
1068
                    Ok(Open)
                } else {
                    Err(Error::StreamProto(
                        "Received DATA before CONNECTED on a stream".into(),
                    ))
                }
            }
24
            RelayCmd::END => Ok(Closed),
2
            _ => Err(Error::StreamProto(format!(
2
                "Unexpected {} on a data stream!",
2
                msg.cmd()
2
            ))),
        }
1200
    }
1008
    fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
1008
        let _ = msg
1008
            .decode::<ClientDataStreamMsg>()
1008
            .map_err(|err| Error::from_bytes_err(err, "cell on half-closed stream"))?;
1008
        Ok(())
1008
    }
}
impl OutboundDataCmdChecker {
    /// Return a new boxed `DataCmdChecker` in a state suitable for a newly
    /// constructed connection.
362
    pub(crate) fn new_any() -> AnyCmdChecker {
362
        Box::<Self>::default()
362
    }
}