1
//! Code for talking directly (over a TLS connection) to a Tor client or relay.
2
//!
3
//! Channels form the basis of the rest of the Tor protocol: they are
4
//! the only way for two Tor instances to talk.
5
//!
6
//! Channels are not useful directly for application requests: after
7
//! making a channel, it needs to get used to build circuits, and the
8
//! circuits are used to anonymize streams.  The streams are the
9
//! objects corresponding to directory requests.
10
//!
11
//! In general, you shouldn't try to manage channels on your own;
12
//! use the `tor-chanmgr` crate instead.
13
//!
14
//! To launch a channel:
15
//!
16
//!  * Create a TLS connection as an object that implements AsyncRead +
17
//!    AsyncWrite + StreamOps, and pass it to a channel builder. This will
18
//!    yield an [crate::client::channel::handshake::ClientInitiatorHandshake] that represents
19
//!    the state of the handshake.
20
//!  * Call [crate::client::channel::handshake::ClientInitiatorHandshake::connect] on the result
21
//!    to negotiate the rest of the handshake.  This will verify
22
//!    syntactic correctness of the handshake, but not its cryptographic
23
//!    integrity.
24
//!  * Call handshake::UnverifiedChannel::check on the result.  This
25
//!    finishes the cryptographic checks.
26
//!  * Call handshake::VerifiedChannel::finish on the result. This
27
//!    completes the handshake and produces an open channel and Reactor.
28
//!  * Launch an asynchronous task to call the reactor's run() method.
29
//!
30
//! Once you have a running channel, you can create circuits on it with
31
//! its [Channel::new_tunnel] method.  See
32
//! [crate::client::circuit::PendingClientTunnel] for information on how to
33
//! proceed from there.
34
//!
35
//! # Design
36
//!
37
//! For now, this code splits the channel into two pieces: a "Channel"
38
//! object that can be used by circuits to write cells onto the
39
//! channel, and a "Reactor" object that runs as a task in the
40
//! background, to read channel cells and pass them to circuits as
41
//! appropriate.
42
//!
43
//! I'm not at all sure that's the best way to do that, but it's what
44
//! I could think of.
45
//!
46
//! # Limitations
47
//!
48
//! TODO: There is no rate limiting or fairness.
49

            
50
/// The size of the channel buffer for communication between `Channel` and its reactor.
///
/// This is the bound passed to the cell queue when a channel is constructed.
pub const CHANNEL_BUFFER_SIZE: usize = 128;
52

            
53
pub(crate) mod circmap;
54
pub(crate) mod handler;
55
pub(crate) mod handshake;
56
pub mod kist;
57
mod msg;
58
pub mod padding;
59
pub mod params;
60
mod reactor;
61
mod unique_id;
62

            
63
pub use crate::channel::params::*;
64
pub(crate) use crate::channel::reactor::Reactor;
65
use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream};
66
pub use crate::channel::unique_id::UniqId;
67
use crate::client::circuit::PendingClientTunnel;
68
use crate::client::circuit::padding::{PaddingController, QueuedCellPaddingInfo};
69
use crate::memquota::{ChannelAccount, CircuitAccount, SpecificAccount as _};
70
use crate::peer::PeerInfo;
71
use crate::util::err::ChannelClosed;
72
use crate::util::oneshot_broadcast;
73
use crate::util::timeout::TimeoutEstimator;
74
use crate::util::ts::AtomicOptTimestamp;
75
use crate::{ClockSkew, client};
76
use crate::{Error, Result};
77
use cfg_if::cfg_if;
78
use reactor::BoxedChannelStreamOps;
79
use safelog::{MaybeSensitive, sensitive as sv};
80
use std::future::{Future, IntoFuture};
81
use std::net::IpAddr;
82
use std::pin::Pin;
83
use std::sync::{Mutex, MutexGuard};
84
use std::time::Duration;
85
use tor_cell::chancell::ChanMsg;
86
use tor_cell::chancell::{AnyChanCell, CircId, msg::Netinfo, msg::PaddingNegotiate};
87
use tor_error::internal;
88
use tor_linkspec::{HasRelayIds, OwnedChanTarget};
89
use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
90
use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, Runtime, SleepProvider};
91

            
92
#[cfg(feature = "circ-padding")]
93
use tor_async_utils::counting_streams::{self, CountingSink, CountingStream};
94

            
95
#[cfg(feature = "relay")]
96
use {
97
    crate::channel::reactor::CreateRequestHandlerAndData, crate::circuit::CircuitRxReceiver,
98
    crate::relay::channel::create_handler::CreateRequestHandler,
99
    tor_llcrypto::pk::ed25519::Ed25519Identity, tor_llcrypto::pk::rsa::RsaIdentity,
100
};
101

            
102
/// Imports that are re-exported pub if feature `testing` is enabled
103
///
104
/// Putting them together in a little module like this allows us to select the
105
/// visibility for all of these things together.
106
mod testing_exports {
107
    #![allow(unreachable_pub)]
108
    pub use super::reactor::CtrlMsg;
109
    pub use crate::circuit::celltypes::CreateResponse;
110
}
111
#[cfg(feature = "testing")]
112
pub use testing_exports::*;
113
#[cfg(not(feature = "testing"))]
114
use testing_exports::*;
115

            
116
use asynchronous_codec;
117
use futures::channel::mpsc;
118
use futures::io::{AsyncRead, AsyncWrite};
119
use oneshot_fused_workaround as oneshot;
120

            
121
use educe::Educe;
122
use futures::{FutureExt as _, Sink};
123
use std::result::Result as StdResult;
124
use std::sync::Arc;
125
use std::task::{Context, Poll};
126

            
127
use tracing::{instrument, trace};
128

            
129
// reexport
130
pub use super::client::channel::handshake::ClientInitiatorHandshake;
131
#[cfg(feature = "relay")]
132
pub use super::relay::channel::handshake::RelayInitiatorHandshake;
133
pub(crate) use crate::channel::handler::{ClogDigest, SlogDigest};
134
use crate::channel::unique_id::CircUniqIdContext;
135

            
136
use kist::KistParams;
137

            
138
/// This indicate what type of channel it is. It allows us to decide for the correct channel cell
139
/// state machines and authentication process (if any).
140
///
141
/// It is created when a channel is requested for creation which means the subsystem wanting to
142
/// open a channel needs to know what type it wants.
143
#[derive(Clone, Copy, Debug, derive_more::Display)]
144
#[non_exhaustive]
145
pub enum ChannelType {
146
    /// Client: Initiated from a client to a relay. Client is unauthenticated and relay is
147
    /// authenticated.
148
    ClientInitiator,
149
    /// Relay: Initiating as a relay to a relay. Both sides are authenticated.
150
    RelayInitiator,
151
    /// Relay: Responding as a relay to a relay or client. Authenticated or Unauthenticated.
152
    RelayResponder {
153
        /// Indicate if the channel is authenticated. Responding as a relay can be either from a
154
        /// Relay (authenticated) or a Client/Bridge (Unauthenticated). We only know this
155
        /// information once the handshake is completed.
156
        ///
157
        /// This side is always authenticated, the other side can be if a relay or not if
158
        /// bridge/client. This is set to false unless we end up authenticating the other side
159
        /// meaning a relay.
160
        authenticated: bool,
161
    },
162
}
163

            
164
impl ChannelType {
165
    /// Set that this channel type is now authenticated. This only applies to RelayResponder.
166
    pub(crate) fn set_authenticated(&mut self) {
167
        if let Self::RelayResponder { authenticated } = self {
168
            *authenticated = true;
169
        }
170
    }
171
}
172

            
173
/// A channel cell frame used for sending and receiving cells on a channel. The handler takes care
174
/// of the cell codec transition depending in which state the channel is.
175
///
176
/// ChannelFrame is used to basically handle all in and outbound cells on a channel for its entire
177
/// lifetime.
178
pub(crate) type ChannelFrame<T> = asynchronous_codec::Framed<T, handler::ChannelCellHandler>;
179

            
180
/// An entry in a channel's queue of cells to be flushed.
181
pub(crate) type ChanCellQueueEntry = (AnyChanCell, Option<QueuedCellPaddingInfo>);
182

            
183
/// Helper: Return a new channel frame [ChannelFrame] from an object implementing AsyncRead + AsyncWrite. In the
184
/// tor context, it is always a TLS stream.
185
///
186
/// The ty (type) argument needs to be able to transform into a [handler::ChannelCellHandler] which would
187
/// generally be a [ChannelType].
188
94
pub(crate) fn new_frame<T, I>(tls: T, ty: I) -> ChannelFrame<T>
189
94
where
190
94
    T: AsyncRead + AsyncWrite,
191
94
    I: Into<handler::ChannelCellHandler>,
192
{
193
94
    let mut framed = asynchronous_codec::Framed::new(tls, ty.into());
194
94
    framed.set_send_high_water_mark(32 * 1024);
195
94
    framed
196
94
}
197

            
198
/// Canonical state between this channel and its peer. This is inferred from the [`Netinfo`]
/// received during the channel handshake.
///
/// A connection is "canonical" if the TCP connection's peer IP address matches an address
/// that the relay itself claims in its [`Netinfo`] cell.
#[derive(Debug)]
pub(crate) struct Canonicity {
    /// The peer has proven this connection is canonical for its address: at least one NETINFO "my
    /// address" matches the observed TCP peer address.
    pub(crate) peer_is_canonical: bool,
    /// We appear canonical from the peer's perspective: its NETINFO "other address" matches our
    /// advertised relay address.
    pub(crate) canonical_to_peer: bool,
}
212

            
213
impl Canonicity {
214
    /// Using a [`Netinfo`], build the canonicity object with the given addresses.
215
    ///
216
    /// The `my_addrs` are the advertised address of this relay or empty if a client/bridge as they
217
    /// do not advertise or expose a reachable address.
218
    ///
219
    /// The `peer_addr` is the IP address we believe the peer has. In other words, it is either the
220
    /// IP we used to connect to or the address we see in the accept() phase of the connection.
221
    ///
222
    /// It can be None if we used a non-IP address to connect to the peer (PT).
223
52
    pub(crate) fn from_netinfo(
224
52
        netinfo: &Netinfo,
225
52
        my_addrs: &[IpAddr],
226
52
        peer_addr: Option<IpAddr>,
227
52
    ) -> Self {
228
        Self {
229
            // The "other addr" (our address as seen by the peer) matches the one we advertised.
230
52
            canonical_to_peer: netinfo
231
52
                .their_addr()
232
54
                .is_some_and(|a: &IpAddr| my_addrs.contains(a)),
233
            // The "my addresses" (the peer addresses that it claims to have) matches the one we
234
            // see on the connection or that we attempted to connect to.
235
52
            peer_is_canonical: peer_addr
236
54
                .map(|a| netinfo.my_addrs().contains(&a))
237
52
                .unwrap_or_default(),
238
        }
239
52
    }
240

            
241
    /// Construct a fully canonical object.
242
    #[cfg(any(test, feature = "testing"))]
243
1716
    pub(crate) fn new_canonical() -> Self {
244
1716
        Self {
245
1716
            peer_is_canonical: true,
246
1716
            canonical_to_peer: true,
247
1716
        }
248
1716
    }
249
}
250

            
251
/// An open client channel, ready to send and receive Tor cells.
252
///
253
/// A channel is a direct connection to a Tor relay, implemented using TLS.
254
///
255
/// This struct is a frontend that can be used to send cells
256
/// and otherwise control the channel.  The main state is
257
/// in the Reactor object.
258
///
259
/// (Users need a mutable reference because of the types in `Sink`, and
260
/// ultimately because `cell_tx: mpsc::Sender` doesn't work without mut.
261
///
262
/// # Channel life cycle
263
///
264
/// Channels can be created directly here through a channel builder (client or relay) API.
265
/// For a higher-level API (with better support for TLS, pluggable transports,
266
/// and channel reuse) see the `tor-chanmgr` crate.
267
///
268
/// After a channel is created, it will persist until it is closed in one of
269
/// four ways:
270
///    1. A remote error occurs.
271
///    2. The other side of the channel closes the channel.
272
///    3. Someone calls [`Channel::terminate`] on the channel.
273
///    4. The last reference to the `Channel` is dropped. (Note that every circuit
274
///       on a `Channel` keeps a reference to it, which will in turn keep the
275
///       channel from closing until all those circuits have gone away.)
276
///
277
/// Note that in cases 1-3, the [`Channel`] object itself will still exist: it
278
/// will just be unusable for most purposes.  Most operations on it will fail
279
/// with an error.
280
pub struct Channel {
281
    /// A channel used to send control messages to the Reactor.
282
    control: mpsc::UnboundedSender<CtrlMsg>,
283
    /// A channel used to send cells to the Reactor.
284
    cell_tx: CellTx,
285

            
286
    /// A receiver that indicates whether the channel is closed.
287
    ///
288
    /// Awaiting will return a `CancelledError` event when the reactor is dropped.
289
    /// Read to decide if operations may succeed, and is returned by `wait_for_close`.
290
    reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,
291

            
292
    /// Padding controller, used to report when data is queued for this channel.
293
    padding_ctrl: PaddingController,
294

            
295
    /// A unique identifier for this channel.
296
    unique_id: UniqId,
297
    /// Target identity and address information for this peer.
298
    peer_id: OwnedChanTarget,
299
    /// Validated information for this peer.
300
    peer: MaybeSensitive<Arc<PeerInfo>>,
301
    /// The declared clock skew on this channel, at the time when this channel was
302
    /// created.
303
    clock_skew: ClockSkew,
304
    /// The time when this channel was successfully completed
305
    opened_at: coarsetime::Instant,
306
    /// Mutable state used by the `Channel.
307
    mutable: Mutex<MutableDetails>,
308
    /// Information shared with the reactor
309
    details: Arc<ChannelDetails>,
310
    /// Canonicity of this channel.
311
    canonicity: Canonicity,
312
}
313

            
314
/// This is information shared between the reactor and the frontend (`Channel` object).
315
///
316
/// `control` can't be here because we rely on it getting dropped when the last user goes away.
317
#[derive(Debug)]
318
pub(crate) struct ChannelDetails {
319
    /// Since when the channel became unused.
320
    ///
321
    /// If calling `time_since_update` returns None,
322
    /// this channel is still in use by at least one circuit.
323
    ///
324
    /// Set by reactor when a circuit is added or removed.
325
    /// Read from `Channel::duration_unused`.
326
    unused_since: AtomicOptTimestamp,
327
    /// Memory quota account
328
    ///
329
    /// This is here partly because we need to ensure it lives as long as the channel,
330
    /// as otherwise the memquota system will tear the account down.
331
    #[allow(dead_code)]
332
    memquota: ChannelAccount,
333
}
334

            
335
/// Mutable details (state) used by the `Channel` (frontend)
336
#[derive(Debug, Default)]
337
struct MutableDetails {
338
    /// State used to control padding
339
    padding: PaddingControlState,
340
}
341

            
342
/// State used to control padding
343
///
344
/// We store this here because:
345
///
346
///  1. It must be per-channel, because it depends on channel usage.  So it can't be in
347
///     (for example) `ChannelPaddingInstructionsUpdate`.
348
///
349
///  2. It could be in the channel manager's per-channel state but (for code flow reasons
350
///     there, really) at the point at which the channel manager concludes for a pending
351
///     channel that it ought to update the usage, it has relinquished the lock on its own data
352
///     structure.
353
///     And there is actually no need for this to be global: a per-channel lock is better than
354
///     reacquiring the global one.
355
///
356
///  3. It doesn't want to be in the channel reactor since that's super hot.
357
///
358
/// See also the overview at [`tor_proto::channel::padding`](padding)
359
#[derive(Debug, Educe)]
360
#[educe(Default)]
361
enum PaddingControlState {
362
    /// No usage of this channel, so far, implies sending or negotiating channel padding.
363
    ///
364
    /// This means we do not send (have not sent) any `ChannelPaddingInstructionsUpdates` to the reactor,
365
    /// with the following consequences:
366
    ///
367
    ///  * We don't enable our own padding.
368
    ///  * We don't do any work to change the timeout distribution in the padding timer,
369
    ///    (which is fine since this timer is not enabled).
370
    ///  * We don't send any PADDING_NEGOTIATE cells.  The peer is supposed to come to the
371
    ///    same conclusions as us, based on channel usage: it should also not send padding.
372
    #[educe(Default)]
373
    UsageDoesNotImplyPadding {
374
        /// The last padding parameters (from reparameterize)
375
        ///
376
        /// We keep this so that we can send it if and when
377
        /// this channel starts to be used in a way that implies (possibly) sending padding.
378
        padding_params: ChannelPaddingInstructionsUpdates,
379
    },
380

            
381
    /// Some usage of this channel implies possibly sending channel padding
382
    ///
383
    /// The required padding timer, negotiation cell, etc.,
384
    /// have been communicated to the reactor via a `CtrlMsg::ConfigUpdate`.
385
    ///
386
    /// Once we have set this variant, it remains this way forever for this channel,
387
    /// (the spec speaks of channels "only used for" certain purposes not getting padding).
388
    PaddingConfigured,
389
}
390

            
391
use PaddingControlState as PCS;
392

            
393
// With `circ-padding` enabled, the queue endpoints are wrapped in counting
// adaptors; without it, the plain memquota-tracked MPSC queue types are used.
cfg_if! {
    if #[cfg(feature="circ-padding")] {
        /// Implementation type for a ChannelSender.
        type CellTx = CountingSink<mq_queue::Sender<ChanCellQueueEntry, mq_queue::MpscSpec>>;

        /// Implementation type for a cell queue held by a reactor.
        type CellRx = CountingStream<mq_queue::Receiver<ChanCellQueueEntry, mq_queue::MpscSpec>>;
    } else {
        /// Implementation type for a ChannelSender.
        type CellTx = mq_queue::Sender<ChanCellQueueEntry, mq_queue::MpscSpec>;

        /// Implementation type for a cell queue held by a reactor.
        type CellRx = mq_queue::Receiver<ChanCellQueueEntry, mq_queue::MpscSpec>;
    }
}
408

            
409
/// A handle to a [`Channel`]` that can be used, by circuits, to send channel cells.
410
#[derive(Debug)]
411
pub(crate) struct ChannelSender {
412
    /// MPSC sender to send cells.
413
    cell_tx: CellTx,
414
    /// A receiver used to check if the channel is closed.
415
    reactor_closed_rx: oneshot_broadcast::Receiver<Result<CloseInfo>>,
416
    /// Unique ID for this channel. For logging.
417
    unique_id: UniqId,
418
    /// Padding controller for this channel:
419
    /// used to report when we queue data that will eventually wind up on the channel.
420
    padding_ctrl: PaddingController,
421
}
422

            
423
impl ChannelSender {
424
    /// Check whether a cell type is permissible to be _sent_ on an
425
    /// open client channel.
426
4686
    fn check_cell(&self, cell: &AnyChanCell) -> Result<()> {
427
        use tor_cell::chancell::msg::AnyChanMsg::*;
428
4686
        let msg = cell.msg();
429
4686
        match msg {
430
12
            Created(_) | Created2(_) | CreatedFast(_) => Err(Error::from(internal!(
431
12
                "Can't send {} cell on client channel",
432
12
                msg.cmd()
433
12
            ))),
434
            Certs(_) | Versions(_) | Authenticate(_) | AuthChallenge(_) | Netinfo(_) => {
435
12
                Err(Error::from(internal!(
436
12
                    "Can't send {} cell after handshake is done",
437
12
                    msg.cmd()
438
12
                )))
439
            }
440
4662
            _ => Ok(()),
441
        }
442
4686
    }
443

            
444
    /// Obtain a reference to the `ChannelSender`'s [`DynTimeProvider`]
445
    ///
446
    /// (This can sometimes be used to avoid having to keep
447
    /// a separate clone of the time provider.)
448
72
    pub(crate) fn time_provider(&self) -> &DynTimeProvider {
449
        cfg_if! {
450
            if #[cfg(feature="circ-padding")] {
451
72
                self.cell_tx.inner().time_provider()
452
            } else {
453
                self.cell_tx.time_provider()
454
            }
455
        }
456
72
    }
457

            
458
    /// Return an approximate count of the number of outbound cells queued for this channel.
459
    ///
460
    /// This count is necessarily approximate,
461
    /// because the underlying count can be modified by other senders and receivers
462
    /// between when this method is called and when its return value is used.
463
    ///
464
    /// Does not include cells that have already been passed to the TLS connection.
465
    ///
466
    /// Circuit padding uses this count to determine
467
    /// when messages are already outbound for the first hop of a circuit.
468
    #[cfg(feature = "circ-padding")]
469
    pub(crate) fn approx_count(&self) -> usize {
470
        self.cell_tx.approx_count()
471
    }
472

            
473
    /// Note that a cell has been queued that will eventually be placed onto this sender.
474
    ///
475
    /// We use this as an input for padding machines.
476
4638
    pub(crate) fn note_cell_queued(&self) {
477
4638
        self.padding_ctrl.queued_data(crate::HopNum::from(0));
478
4638
    }
479
}
480

            
481
impl Sink<ChanCellQueueEntry> for ChannelSender {
482
    type Error = Error;
483

            
484
18898
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
485
18898
        let this = self.get_mut();
486
18898
        Pin::new(&mut this.cell_tx)
487
18898
            .poll_ready(cx)
488
18904
            .map_err(|_| ChannelClosed.into())
489
18898
    }
490

            
491
4650
    fn start_send(self: Pin<&mut Self>, cell: ChanCellQueueEntry) -> Result<()> {
492
4650
        let this = self.get_mut();
493
4650
        if this.reactor_closed_rx.is_ready() {
494
            return Err(ChannelClosed.into());
495
4650
        }
496
4650
        this.check_cell(&cell.0)?;
497
        {
498
            use tor_cell::chancell::msg::AnyChanMsg::*;
499
4650
            match cell.0.msg() {
500
4522
                Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
501
128
                _ => trace!(
502
                    channel_id = %this.unique_id,
503
                    "Sending {} for {}",
504
                    cell.0.msg().cmd(),
505
                    CircId::get_or_zero(cell.0.circid())
506
                ),
507
            }
508
        }
509

            
510
4650
        Pin::new(&mut this.cell_tx)
511
4650
            .start_send(cell)
512
4650
            .map_err(|_| ChannelClosed.into())
513
4650
    }
514

            
515
88
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
516
88
        let this = self.get_mut();
517
88
        Pin::new(&mut this.cell_tx)
518
88
            .poll_flush(cx)
519
88
            .map_err(|_| ChannelClosed.into())
520
88
    }
521

            
522
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
523
        let this = self.get_mut();
524
        Pin::new(&mut this.cell_tx)
525
            .poll_close(cx)
526
            .map_err(|_| ChannelClosed.into())
527
    }
528
}
529

            
530
impl Channel {
531
    /// Construct a channel and reactor.
532
    ///
533
    /// Internal method, called to finalize the channel when we've
534
    /// sent our netinfo cell, received the peer's netinfo cell, and
535
    /// we're finally ready to create circuits.
536
    ///
537
    /// Quick note on the allow clippy. This is has one call site so for now, it is fine that we
538
    /// bust the mighty 7 arguments.
539
    #[allow(clippy::too_many_arguments)] // TODO consider if we want a builder
540
500
    fn new<R>(
541
500
        channel_mode: ChannelMode,
542
500
        link_protocol: u16,
543
500
        sink: BoxedChannelSink,
544
500
        stream: BoxedChannelStream,
545
500
        streamops: BoxedChannelStreamOps,
546
500
        unique_id: UniqId,
547
500
        peer_id: OwnedChanTarget,
548
500
        peer: MaybeSensitive<PeerInfo>,
549
500
        clock_skew: ClockSkew,
550
500
        runtime: R,
551
500
        memquota: ChannelAccount,
552
500
        canonicity: Canonicity,
553
500
    ) -> Result<(Arc<Self>, reactor::Reactor<R>)>
554
500
    where
555
500
        R: Runtime,
556
    {
557
        use circmap::{CircIdRange, CircMap};
558
500
        let circid_range = match channel_mode {
559
            // client channels always originate here
560
500
            ChannelMode::Client => CircIdRange::High,
561
            #[cfg(feature = "relay")]
562
            ChannelMode::Relay { circ_id_range, .. } => circ_id_range,
563
        };
564
500
        let circmap = CircMap::new(circid_range);
565
500
        let dyn_time = DynTimeProvider::new(runtime.clone());
566

            
567
500
        let (control_tx, control_rx) = mpsc::unbounded();
568
500
        let (cell_tx, cell_rx) = mq_queue::MpscSpec::new(CHANNEL_BUFFER_SIZE)
569
500
            .new_mq(dyn_time.clone(), memquota.as_raw_account())?;
570
        #[cfg(feature = "circ-padding")]
571
500
        let (cell_tx, cell_rx) = counting_streams::channel(cell_tx, cell_rx);
572
500
        let unused_since = AtomicOptTimestamp::new();
573
500
        unused_since.update();
574

            
575
500
        let mutable = MutableDetails::default();
576
500
        let (reactor_closed_tx, reactor_closed_rx) = oneshot_broadcast::channel();
577

            
578
500
        let details = ChannelDetails {
579
500
            unused_since,
580
500
            memquota,
581
500
        };
582
500
        let details = Arc::new(details);
583

            
584
        // We might be using experimental maybenot padding; this creates the padding framework for that.
585
        //
586
        // TODO: This backend is currently optimized for circuit padding,
587
        // so it might allocate a bit more than necessary to account for multiple hops.
588
        // We should tune it when we deploy padding in production.
589
500
        let (padding_ctrl, padding_event_stream) =
590
500
            client::circuit::padding::new_padding(DynTimeProvider::new(runtime.clone()));
591

            
592
500
        let channel = Arc::new(Channel {
593
500
            control: control_tx,
594
500
            cell_tx,
595
500
            reactor_closed_rx,
596
500
            padding_ctrl: padding_ctrl.clone(),
597
500
            unique_id,
598
500
            peer_id,
599
500
            peer: peer.map(Arc::new),
600
500
            clock_skew,
601
500
            opened_at: coarsetime::Instant::now(),
602
500
            mutable: Mutex::new(mutable),
603
500
            details: Arc::clone(&details),
604
500
            canonicity,
605
500
        });
606

            
607
        // We start disabled; the channel manager will `reconfigure` us soon after creation.
608
500
        let padding_timer = Box::pin(padding::Timer::new_disabled(runtime.clone(), None)?);
609

            
610
        cfg_if! {
611
            if #[cfg(feature = "circ-padding")] {
612
                use crate::util::sink_blocker::{SinkBlocker,CountingPolicy};
613
500
                let sink = SinkBlocker::new(sink, CountingPolicy::new_unlimited());
614
            }
615
        }
616

            
617
        #[cfg(feature = "relay")]
618
500
        let create_request_handler: Option<_> = match channel_mode {
619
            ChannelMode::Relay {
620
                create_request_handler,
621
                our_ed25519_id,
622
                our_rsa_id,
623
                ..
624
            } => Some(CreateRequestHandlerAndData {
625
                handler: create_request_handler,
626
                channel: Arc::downgrade(&channel),
627
                our_ed25519_id,
628
                our_rsa_id,
629
            }),
630
500
            ChannelMode::Client => None,
631
        };
632
        // clippy wants us to consume `channel_mode` (`needless_pass_by_value`)
633
        #[cfg(not(feature = "relay"))]
634
        #[expect(clippy::drop_non_drop)]
635
        drop(channel_mode);
636

            
637
500
        let reactor = Reactor {
638
500
            runtime,
639
500
            control: control_rx,
640
500
            cells: cell_rx,
641
500
            reactor_closed_tx,
642
500
            input: futures::StreamExt::fuse(stream),
643
500
            output: sink,
644
500
            streamops,
645
500
            circs: circmap,
646
500
            circ_unique_id_ctx: CircUniqIdContext::new(),
647
500
            link_protocol,
648
500
            unique_id,
649
500
            details,
650
500
            #[cfg(feature = "relay")]
651
500
            create_request_handler,
652
500
            padding_timer,
653
500
            padding_ctrl,
654
500
            padding_event_stream,
655
500
            padding_blocker: None,
656
500
            special_outgoing: Default::default(),
657
500
        };
658

            
659
500
        Ok((channel, reactor))
660
500
    }
661

            
662
    /// Return a process-unique identifier for this channel.
663
4
    pub fn unique_id(&self) -> UniqId {
664
4
        self.unique_id
665
4
    }
666

            
667
    /// Return a reference to the memory tracking account for this Channel
668
    pub fn mq_account(&self) -> &ChannelAccount {
669
        &self.details.memquota
670
    }
671

            
672
    /// Obtain a reference to the `Channel`'s [`DynTimeProvider`]
673
    ///
674
    /// (This can sometimes be used to avoid having to keep
675
    /// a separate clone of the time provider.)
676
392
    pub fn time_provider(&self) -> &DynTimeProvider {
677
        cfg_if! {
678
            if #[cfg(feature="circ-padding")] {
679
392
                self.cell_tx.inner().time_provider()
680
            } else {
681
                self.cell_tx.time_provider()
682
            }
683
        }
684
392
    }
685

            
686
    /// Return an OwnedChanTarget representing the actual handshake used to
687
    /// create this channel.
688
1252
    pub fn target(&self) -> &OwnedChanTarget {
689
1252
        &self.peer_id
690
1252
    }
691

            
692
    /// Return the amount of time that has passed since this channel became open.
693
    pub fn age(&self) -> Duration {
694
        self.opened_at.elapsed().into()
695
    }
696

            
697
    /// Return a ClockSkew declaring how much clock skew the other side of this channel
698
    /// claimed that we had when we negotiated the connection.
699
    pub fn clock_skew(&self) -> ClockSkew {
700
        self.clock_skew
701
    }
702

            
703
    /// Send a control message
704
    #[instrument(level = "trace", skip_all)]
705
3100
    fn send_control(&self, msg: CtrlMsg) -> StdResult<(), ChannelClosed> {
706
3100
        self.control
707
3100
            .unbounded_send(msg)
708
3100
            .map_err(|_| ChannelClosed)?;
709
3002
        Ok(())
710
3100
    }
711

            
712
    /// Acquire the lock on `mutable` (and handle any poison error)
713
1200
    fn mutable(&self) -> MutexGuard<MutableDetails> {
714
1200
        self.mutable.lock().expect("channel details poisoned")
715
1200
    }
716

            
717
    /// Specify that this channel should do activities related to channel padding
718
    ///
719
    /// Initially, the channel does nothing related to channel padding:
720
    /// it neither sends any padding, nor sends any PADDING_NEGOTIATE cells.
721
    ///
722
    /// After this function has been called, it will do both,
723
    /// according to the parameters specified through `reparameterize`.
724
    /// Note that this might include *disabling* padding
725
    /// (for example, by sending a `PADDING_NEGOTIATE`).
726
    ///
727
    /// Idempotent.
728
    ///
729
    /// There is no way to undo the effect of this call.
730
    #[instrument(level = "trace", skip_all)]
731
1200
    pub fn engage_padding_activities(&self) {
732
1200
        let mut mutable = self.mutable();
733

            
734
1200
        match &mutable.padding {
735
            PCS::UsageDoesNotImplyPadding {
736
1200
                padding_params: params,
737
            } => {
738
                // Well, apparently the channel usage *does* imply padding now,
739
                // so we need to (belatedly) enable the timer,
740
                // send the padding negotiation cell, etc.
741
1200
                let mut params = params.clone();
742

            
743
                // Except, maybe the padding we would be requesting is precisely default,
744
                // so we wouldn't actually want to send that cell.
745
1200
                if params.padding_negotiate == Some(PaddingNegotiate::start_default()) {
746
                    params.padding_negotiate = None;
747
1200
                }
748

            
749
1200
                match self.send_control(CtrlMsg::ConfigUpdate(Arc::new(params))) {
750
1200
                    Ok(()) => {}
751
                    Err(ChannelClosed) => return,
752
                }
753

            
754
1200
                mutable.padding = PCS::PaddingConfigured;
755
            }
756

            
757
            PCS::PaddingConfigured => {
758
                // OK, nothing to do
759
            }
760
        }
761

            
762
1200
        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
763
1200
    }
764

            
765
    /// Reparameterise (update parameters; reconfigure)
766
    ///
767
    /// Returns `Err` if the channel was closed earlier
768
    #[instrument(level = "trace", skip_all)]
769
2700
    pub fn reparameterize(&self, params: Arc<ChannelPaddingInstructionsUpdates>) -> Result<()> {
770
2700
        let mut mutable = self
771
2700
            .mutable
772
2700
            .lock()
773
2700
            .map_err(|_| internal!("channel details poisoned"))?;
774

            
775
2700
        match &mut mutable.padding {
776
            PCS::PaddingConfigured => {
777
1500
                self.send_control(CtrlMsg::ConfigUpdate(params))?;
778
            }
779
1200
            PCS::UsageDoesNotImplyPadding { padding_params } => {
780
1200
                padding_params.combine(&params);
781
1200
            }
782
        }
783

            
784
2700
        drop(mutable); // release the lock now: lock span covers the send, ensuring ordering
785
2700
        Ok(())
786
2700
    }
787

            
788
    /// Update the KIST parameters.
789
    ///
790
    /// Returns `Err` if the channel is closed.
791
    #[instrument(level = "trace", skip_all)]
792
    pub fn reparameterize_kist(&self, kist_params: KistParams) -> Result<()> {
793
        Ok(self.send_control(CtrlMsg::KistConfigUpdate(kist_params))?)
794
    }
795

            
796
    /// Return an error if this channel is somehow mismatched with the
797
    /// given target.
798
42
    pub fn check_match<T: HasRelayIds + ?Sized>(&self, target: &T) -> Result<()> {
799
42
        check_id_match_helper(&self.peer_id, target)
800
42
    }
801

            
802
    /// Return true if this channel is closed and therefore unusable.
803
126
    pub fn is_closing(&self) -> bool {
804
126
        self.reactor_closed_rx.is_ready()
805
126
    }
806

            
807
    /// Return true iff this channel is considered canonical by us.
808
    pub fn is_canonical(&self) -> bool {
809
        self.canonicity.peer_is_canonical
810
    }
811

            
812
    /// Return true if we think the peer considers this channel as canonical.
813
    pub fn is_canonical_to_peer(&self) -> bool {
814
        self.canonicity.canonical_to_peer
815
    }
816

            
817
    /// If the channel is not in use, return the amount of time
818
    /// it has had with no circuits.
819
    ///
820
    /// Return `None` if the channel is currently in use.
821
188
    pub fn duration_unused(&self) -> Option<std::time::Duration> {
822
188
        self.details
823
188
            .unused_since
824
188
            .time_since_update()
825
188
            .map(Into::into)
826
188
    }
827

            
828
    /// Return a new [`ChannelSender`] to transmit cells on this channel.
829
436
    pub(crate) fn sender(&self) -> ChannelSender {
830
436
        ChannelSender {
831
436
            cell_tx: self.cell_tx.clone(),
832
436
            reactor_closed_rx: self.reactor_closed_rx.clone(),
833
436
            unique_id: self.unique_id,
834
436
            padding_ctrl: self.padding_ctrl.clone(),
835
436
        }
836
436
    }
837

            
838
    /// Borrow the [`PeerInfo`] recorded for this channel.
    #[cfg(feature = "relay")]
    pub(crate) fn peer_info(&self) -> &Arc<PeerInfo> {
        let Self { peer, .. } = self;
        peer
    }
843

            
844
    /// Return a newly allocated PendingClientTunnel object with
845
    /// a corresponding tunnel reactor. A circuit ID is allocated, but no
846
    /// messages are sent, and no cryptography is done.
847
    ///
848
    /// To use the results of this method, call Reactor::run() in a
849
    /// new task, then use the methods of
850
    /// [crate::client::circuit::PendingClientTunnel] to build the circuit.
851
    #[instrument(level = "trace", skip_all)]
852
12
    pub async fn new_tunnel(
853
12
        self: &Arc<Self>,
854
12
        timeouts: Arc<dyn TimeoutEstimator>,
855
18
    ) -> Result<(PendingClientTunnel, client::reactor::Reactor)> {
856
        if self.is_closing() {
857
            return Err(ChannelClosed.into());
858
        }
859

            
860
        let time_prov = self.time_provider().clone();
861
        let memquota = CircuitAccount::new(&self.details.memquota)?;
862

            
863
        // TODO: blocking is risky, but so is unbounded.
864
        let (sender, receiver) =
865
            MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?;
866
        let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();
867

            
868
        let (tx, rx) = oneshot::channel();
869
        self.send_control(CtrlMsg::AllocateCircuit {
870
            created_sender: createdsender,
871
            sender,
872
            tx,
873
        })?;
874
        let (id, circ_unique_id, padding_ctrl, padding_stream) =
875
            rx.await.map_err(|_| ChannelClosed)??;
876

            
877
        trace!("{}: Allocated CircId {}", circ_unique_id, id);
878

            
879
        Ok(PendingClientTunnel::new(
880
            id,
881
            self.clone(),
882
            createdreceiver,
883
            receiver,
884
            circ_unique_id,
885
            time_prov,
886
            memquota,
887
            padding_ctrl,
888
            padding_stream,
889
            timeouts,
890
        ))
891
12
    }
892

            
893
    /// Allocate a new outbound relay circuit on this channel.
    ///
    /// A circuit ID is allocated, but no messages are sent, and no cryptography is done.
    ///
    /// On success, returns the allocated circuit ID, the receiver for the
    /// circuit's queue, and the receiver for the `CreateResponse`.
    ///
    // TODO(relay): this duplicates much of new_tunnel above, but I expect
    // the implementations to diverge once we introduce a new CtrlMsg for
    // allocating relay circuits.
    #[cfg(feature = "relay")]
    pub(crate) async fn new_outbound_circ(
        self: &Arc<Self>,
        memquota: CircuitAccount,
    ) -> Result<(CircId, CircuitRxReceiver, oneshot::Receiver<CreateResponse>)> {
        if self.is_closing() {
            return Err(ChannelClosed.into());
        }

        let time_prov = self.time_provider().clone();

        // TODO: blocking is risky, but so is unbounded.
        let (sender, receiver) = MpscSpec::new(128).new_mq(time_prov, memquota.as_raw_account())?;
        let (created_tx, created_rx) = oneshot::channel::<CreateResponse>();

        let (reply_tx, reply_rx) = oneshot::channel();

        self.send_control(CtrlMsg::AllocateCircuit {
            created_sender: created_tx,
            sender,
            tx: reply_tx,
        })?;

        // TODO(relay): I don't think we need circuit-level padding on this side of the circuit.
        // This just drops the padding controller and corresponding event stream,
        // but maybe it would be better to just not set it up in the first place?
        // This suggests we might need a different control command for allocating
        // the outbound relay circuits...
        let allocation = reply_rx.await.map_err(|_| ChannelClosed)?;
        let (id, circ_unique_id, _padding_ctrl, _padding_stream) = allocation?;

        // Link the memquota circuit account with the outbound channel account:
        let channel_account = self.details.memquota.as_raw_account();
        memquota.as_raw_account().add_parent(channel_account)?;

        trace!("{}: Allocated CircId {}", circ_unique_id, id);

        Ok((id, receiver, created_rx))
    }
940

            
941
    /// Shut down this channel immediately, along with all circuits that
942
    /// are using it.
943
    ///
944
    /// Note that other references to this channel may exist.  If they
945
    /// do, they will stop working after you call this function.
946
    ///
947
    /// It's not necessary to call this method if you're just done
948
    /// with a channel: the channel should close on its own once nothing
949
    /// is using it any more.
950
    #[instrument(level = "trace", skip_all)]
951
36
    pub fn terminate(&self) {
952
36
        let _ = self.send_control(CtrlMsg::Shutdown);
953
36
    }
954

            
955
    /// Tell the reactor that the circuit with the given ID has gone away.
956
    #[instrument(level = "trace", skip_all)]
957
348
    pub fn close_circuit(&self, circid: CircId) -> Result<()> {
958
348
        self.send_control(CtrlMsg::CloseCircuit(circid))?;
959
250
        Ok(())
960
348
    }
961

            
962
    /// Return a future that will resolve once this channel has closed.
963
    ///
964
    /// Note that this method does not _cause_ the channel to shut down on its own.
965
36
    pub fn wait_for_close(
966
36
        &self,
967
36
    ) -> impl Future<Output = StdResult<CloseInfo, ClosedUnexpectedly>> + Send + Sync + 'static + use<>
968
    {
969
36
        self.reactor_closed_rx
970
36
            .clone()
971
36
            .into_future()
972
48
            .map(|recv| match recv {
973
12
                Ok(Ok(info)) => Ok(info),
974
12
                Ok(Err(e)) => Err(ClosedUnexpectedly::ReactorError(e)),
975
12
                Err(oneshot_broadcast::SenderDropped) => Err(ClosedUnexpectedly::ReactorDropped),
976
36
            })
977
36
    }
978

            
979
    /// Install `padder` as the [`CircuitPadder`](client::CircuitPadder) for this channel.
    ///
    /// Any previously installed padder is replaced.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn start_padding(self: &Arc<Self>, padder: client::CircuitPadder) -> Result<()> {
        let replacement = Some(padder);
        self.set_padder_impl(replacement).await
    }
986

            
987
    /// Uninstall whatever [`CircuitPadder`](client::CircuitPadder) is installed for this channel.
    ///
    /// Has no effect if no padder was installed.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn stop_padding(self: &Arc<Self>) -> Result<()> {
        let no_padder = None;
        self.set_padder_impl(no_padder).await
    }
994

            
995
    /// Set the [`CircuitPadder`](client::CircuitPadder) installed for this channel to `padder`
    /// (or remove it, if `padder` is `None`).
    ///
    /// Sends a `SetChannelPadder` control message and waits for the reply.
    /// Fails with `Error::ChannelClosed` if the message can't be sent
    /// or if the reply sender is dropped.
    #[cfg(feature = "circ-padding-manual")]
    async fn set_padder_impl(
        self: &Arc<Self>,
        padder: Option<client::CircuitPadder>,
    ) -> Result<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = CtrlMsg::SetChannelPadder {
            padder,
            sender: reply_tx,
        };
        if self.control.unbounded_send(request).is_err() {
            return Err(Error::ChannelClosed(ChannelClosed));
        }
        match reply_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::ChannelClosed(ChannelClosed)),
        }
    }
    /// Construct a fake channel with no reactor.  For testing only, obviously.
    ///
    /// Returns the channel together with the receiver end of the control message mpsc.
    ///
    /// Suitable for external callers who want to test behaviour
    /// of layers including the logic in the channel frontend
    /// (`Channel` object methods).
    //
    // This differs from test::fake_channel as follows:
    //  * It returns the mpsc Receiver
    //  * It does not require explicit specification of details
    #[cfg(feature = "testing")]
    pub fn new_fake(
        rt: impl SleepProvider + CoarseTimeProvider,
        _channel_type: ChannelType,
    ) -> (Channel, mpsc::UnboundedReceiver<CtrlMsg>) {
        let (ctrl_tx, ctrl_rx) = mpsc::unbounded();
        let details = fake_channel_details();
        let unique_id = UniqId::new();
        // A fixed, arbitrary pair of identities for the fake peer.
        let peer_id = OwnedChanTarget::builder()
            .ed_identity([6_u8; 32].into())
            .rsa_identity([10_u8; 20].into())
            .build()
            .expect("Couldn't construct peer id");
        // This will make rx trigger immediately.
        let (_tx, reactor_closed_rx) = oneshot_broadcast::channel();
        let (padding_ctrl, _) = client::circuit::padding::new_padding(DynTimeProvider::new(rt));
        // Only the sending half of the fake cell queue is kept.
        let (cell_tx, _) = fake_mpsc();

        let fake = Channel {
            control: ctrl_tx,
            cell_tx,
            reactor_closed_rx,
            padding_ctrl,
            unique_id,
            peer_id,
            peer: MaybeSensitive::not_sensitive(Arc::new(PeerInfo::EMPTY)),
            clock_skew: ClockSkew::None,
            opened_at: coarsetime::Instant::now(),
            mutable: Default::default(),
            details,
            canonicity: Canonicity::new_canonical(),
        };
        (fake, ctrl_rx)
    }
}
/// If there is any identity in `wanted_ident` that is not present in
/// `my_ident`, return a ChanMismatch error.
///
/// This is a helper for [`Channel::check_match`] and
/// UnverifiedChannel::check_internal.
56
fn check_id_match_helper<T, U>(my_ident: &T, wanted_ident: &U) -> Result<()>
56
where
56
    T: HasRelayIds + ?Sized,
56
    U: HasRelayIds + ?Sized,
{
82
    for desired in wanted_ident.identities() {
82
        let id_type = desired.id_type();
82
        match my_ident.identity(id_type) {
82
            Some(actual) if actual == desired => {}
8
            Some(actual) => {
8
                return Err(Error::ChanMismatch(format!(
8
                    "Identity {} does not match target {}",
8
                    sv(actual),
8
                    sv(desired)
8
                )));
            }
            None => {
                return Err(Error::ChanMismatch(format!(
                    "Peer does not have {} identity",
                    id_type
                )));
            }
        }
    }
48
    Ok(())
56
}
impl HasRelayIds for Channel {
4850
    fn identity(
4850
        &self,
4850
        key_type: tor_linkspec::RelayIdType,
4850
    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
4850
        self.peer_id.identity(key_type)
4850
    }
}
/// The status of a channel which was closed successfully.
///
/// This is the success value produced by the future from `Channel::wait_for_close`.
///
/// **Note:** This doesn't have any associated data,
/// but may be expanded in the future.
// I can't think of any info we'd want to return to waiters,
// but this type leaves the possibility open without requiring any backwards-incompatible changes.
// (It is `#[non_exhaustive]` for the same reason.)
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct CloseInfo;
/// The status of a channel which closed unexpectedly.
///
/// This is the error value produced by the future from `Channel::wait_for_close`.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ClosedUnexpectedly {
    /// The channel reactor was dropped or panicked before completing.
    ///
    /// (`wait_for_close` reports this when the sending side of the
    /// reactor-closed notification was dropped without sending a result.)
    #[error("channel reactor was dropped or panicked before completing")]
    ReactorDropped,
    /// The channel reactor had an internal error.
    ///
    /// The contained [`Error`] is the one the reactor reported.
    #[error("channel reactor had an internal error")]
    ReactorError(Error),
}
/// Whether the channel is operating in "client" or "relay" mode,
/// and some mode-specific parameters.
///
/// The `Relay` variant only exists when the `relay` feature is enabled.
pub(crate) enum ChannelMode {
    /// An incoming channel,
    /// or an outgoing channel made by a non-bridge relay.
    #[cfg(feature = "relay")]
    Relay {
        /// A handler for CREATE2/CREATE_FAST messages.
        create_request_handler: Arc<CreateRequestHandler>,
        /// Our Ed25519 identity.
        our_ed25519_id: Ed25519Identity,
        /// Our RSA identity.
        our_rsa_id: RsaIdentity,
        /// The range of circuit IDs that we allocate for new circuits.
        ///
        /// (`check_agrees_with_type` requires this to be consistent
        /// with the `ChannelType`.)
        circ_id_range: circmap::CircIdRange,
    },
    /// An outgoing channel made by a client or bridge relay.
    Client,
}
impl ChannelMode {
    /// Returns an error if the mode doesn't agree with the channel type.
52
    pub(crate) fn check_agrees_with_type(
52
        &self,
52
        channel_type: ChannelType,
52
    ) -> StdResult<(), tor_error::Bug> {
        use ChannelType::*;
        use circmap::CircIdRange::*;
52
        match (channel_type, self) {
52
            (ClientInitiator, Self::Client) => {}
            #[cfg(feature = "relay")]
            #[rustfmt::skip]
            (RelayInitiator, Self::Relay { circ_id_range: High, .. }) => {}
            #[cfg(feature = "relay")]
            #[rustfmt::skip]
            (RelayResponder { .. }, Self::Relay { circ_id_range: Low, .. }) => {}
            _ => return Err(internal!("`ChannelMode` doesn't agree with `ChannelType`")),
        }
52
        Ok(())
52
    }
}
/// Build a fake `ChannelDetails` (for testing only!)
///
/// It has a fresh `unused_since` timestamp holder and a fake memory-quota account.
#[cfg(any(test, feature = "testing"))]
fn fake_channel_details() -> Arc<ChannelDetails> {
    let details = ChannelDetails {
        unused_since: AtomicOptTimestamp::new(),
        memquota: crate::util::fake_mq(),
    };
    Arc::new(details)
}
/// Build a fake MPSC queue for testing, of the same type that Channels use
///
/// With the `circ-padding` feature, both ends are additionally passed through
/// `counting_streams::channel`.
#[cfg(any(test, feature = "testing"))] // Used by Channel::new_fake which is also feature=testing
pub(crate) fn fake_mpsc() -> (CellTx, CellRx) {
    let (cell_tx, cell_rx) = crate::fake_mpsc(CHANNEL_BUFFER_SIZE);
    #[cfg(feature = "circ-padding")]
    let (cell_tx, cell_rx) = counting_streams::channel(cell_tx, cell_rx);
    (cell_tx, cell_rx)
}
#[cfg(test)]
pub(crate) mod test {
    // The bulk of this module is exercised by tests that also cover the
    // reactor code; only a few remaining cases are examined here.
    #![allow(clippy::unwrap_used)]
    use super::*;
    pub(crate) use crate::channel::reactor::test::{CodecResult, new_reactor};
    use tor_cell::chancell::msg::HandshakeType;
    use tor_cell::chancell::{AnyChanCell, msg};
    use tor_rtcompat::test_with_one_runtime;

    /// Construct a fake channel with no reactor.  For testing only, obviously.
    pub(crate) fn fake_channel(
        rt: impl SleepProvider + CoarseTimeProvider,
        _channel_type: ChannelType,
    ) -> Channel {
        let unique_id = UniqId::new();
        // A fixed, arbitrary pair of identities for the fake peer.
        let peer_id = OwnedChanTarget::builder()
            .ed_identity([6_u8; 32].into())
            .rsa_identity([10_u8; 20].into())
            .build()
            .expect("Couldn't construct peer id");
        // This will make rx trigger immediately.
        let (_tx, reactor_closed_rx) = oneshot_broadcast::channel();
        let (padding_ctrl, _) = client::circuit::padding::new_padding(DynTimeProvider::new(rt));
        // Only the sending halves of the control and cell queues are kept.
        let (control, _) = mpsc::unbounded();
        let (cell_tx, _) = fake_mpsc();

        Channel {
            control,
            cell_tx,
            reactor_closed_rx,
            padding_ctrl,
            unique_id,
            peer_id,
            peer: MaybeSensitive::not_sensitive(Arc::new(PeerInfo::EMPTY)),
            clock_skew: ClockSkew::None,
            opened_at: coarsetime::Instant::now(),
            mutable: Default::default(),
            details: fake_channel_details(),
            canonicity: Canonicity::new_canonical(),
        }
    }

    /// `check_cell` rejects cells that may not be sent, and accepts a CREATE2.
    #[test]
    fn send_bad() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            use std::error::Error;
            let chan = fake_channel(rt, ChannelType::ClientInitiator);

            // A CREATED2 cell must be rejected.
            let cell = AnyChanCell::new(CircId::new(7), msg::Created2::new(&b"hihi"[..]).into());
            let rejected = chan.sender().check_cell(&cell).unwrap_err();
            let reason = rejected.source().unwrap().to_string();
            assert!(reason.contains("Can't send CREATED2 cell on client channel"));

            // A CERTS cell must be rejected.
            let cell = AnyChanCell::new(None, msg::Certs::new_empty().into());
            let rejected = chan.sender().check_cell(&cell).unwrap_err();
            let reason = rejected.source().unwrap().to_string();
            assert!(reason.contains("Can't send CERTS cell after handshake is done"));

            // A CREATE2 cell is fine.
            let cell = AnyChanCell::new(
                CircId::new(5),
                msg::Create2::new(HandshakeType::NTOR, &b"abc"[..]).into(),
            );
            assert!(chan.sender().check_cell(&cell).is_ok());
            // FIXME(eta): more difficult to test that sending works now that it has to go via reactor
            // let got = output.next().await.unwrap();
            // assert!(matches!(got.msg(), ChanMsg::Create2(_)));
        });
    }

    /// `check_match` accepts only a target whose identities equal the fake peer's.
    #[test]
    fn check_match() {
        test_with_one_runtime!(|rt| async move {
            let chan = fake_channel(rt, ChannelType::ClientInitiator);

            // Build a target whose ed25519 and RSA identities are each one repeated byte.
            let target = |ed: u8, rsa: u8| {
                OwnedChanTarget::builder()
                    .ed_identity([ed; 32].into())
                    .rsa_identity([rsa; 20].into())
                    .build()
                    .unwrap()
            };

            let same_ids = target(6, 10);
            let other_ids = target(1, 3);
            let more_other_ids = target(3, 2);

            assert!(chan.check_match(&same_ids).is_ok());
            assert!(chan.check_match(&other_ids).is_err());
            assert!(chan.check_match(&more_other_ids).is_err());
        });
    }

    /// Two separately constructed channels get different unique IDs.
    #[test]
    fn unique_id() {
        test_with_one_runtime!(|rt| async move {
            let first = fake_channel(rt.clone(), ChannelType::ClientInitiator);
            let second = fake_channel(rt, ChannelType::ClientInitiator);
            assert_ne!(first.unique_id(), second.unique_id());
        });
    }

    /// After `unused_since` is updated, `duration_unused` reports a duration.
    #[test]
    fn duration_unused_at() {
        test_with_one_runtime!(|rt| async move {
            let shared_details = fake_channel_details();
            let mut chan = fake_channel(rt, ChannelType::ClientInitiator);
            chan.details = shared_details.clone();
            shared_details.unused_since.update();
            assert!(chan.duration_unused().is_some());
        });
    }
}