1
//! Code to handle incoming cells on a channel.
2
//!
3
//! The role of this code is to run in a separate asynchronous task,
4
//! and routes cells to the right circuits.
5
//!
6
//! TODO: I have zero confidence in the close-and-cleanup behavior here,
7
//! or in the error handling behavior.
8

            
9
use super::circmap::{CircEnt, CircMap};
10
use crate::circuit::CircuitRxSender;
11
use crate::client::circuit::halfcirc::HalfCirc;
12
use crate::client::circuit::padding::{
13
    PaddingController, PaddingEvent, PaddingEventStream, SendPadding, StartBlocking,
14
};
15
use crate::util::err::ReactorError;
16
use crate::util::oneshot_broadcast;
17
use crate::{Error, HopNum, Result};
18
use tor_async_utils::SinkPrepareExt as _;
19
use tor_cell::chancell::ChanMsg;
20
use tor_cell::chancell::msg::{Destroy, DestroyReason, Padding, PaddingNegotiate};
21
use tor_cell::chancell::{AnyChanCell, CircId, msg::AnyChanMsg};
22
use tor_error::debug_report;
23
use tor_rtcompat::{DynTimeProvider, Runtime};
24

            
25
#[cfg_attr(not(target_os = "linux"), allow(unused))]
26
use tor_error::error_report;
27
#[cfg_attr(not(target_os = "linux"), allow(unused))]
28
use tor_rtcompat::StreamOps;
29

            
30
use futures::channel::mpsc;
31
use oneshot_fused_workaround as oneshot;
32

            
33
use futures::Sink;
34
use futures::StreamExt as _;
35
use futures::sink::SinkExt;
36
use futures::stream::Stream;
37
use futures::{select, select_biased};
38
use tor_error::internal;
39

            
40
use std::fmt;
41
use std::pin::Pin;
42
use std::sync::Arc;
43

            
44
use crate::channel::{ChannelDetails, CloseInfo, kist::KistParams, padding, params::*, unique_id};
45
use crate::circuit::celltypes::CreateResponse;
46
use tracing::{debug, instrument, trace};
47

            
48
#[cfg(feature = "relay")]
49
use {
50
    crate::channel::Channel,
51
    crate::circuit::celltypes::CreateRequest,
52
    crate::relay::channel::create_handler::{CreateRequestHandler, RelayCircComponents},
53
    std::sync::Weak,
54
    tor_llcrypto::pk::ed25519::Ed25519Identity,
55
    tor_llcrypto::pk::rsa::RsaIdentity,
56
};
57

            
58
/// A boxed trait object that can provide `ChanCell`s.
59
pub(super) type BoxedChannelStream =
60
    Box<dyn Stream<Item = std::result::Result<AnyChanCell, Error>> + Send + Unpin + 'static>;
61
/// A boxed trait object that can sink `ChanCell`s.
62
pub(super) type BoxedChannelSink =
63
    Box<dyn Sink<AnyChanCell, Error = Error> + Send + Unpin + 'static>;
64
/// A boxed trait object that can provide additional `StreamOps` on a `BoxedChannelStream`.
65
pub(super) type BoxedChannelStreamOps = Box<dyn StreamOps + Send + Unpin + 'static>;
66
/// The type of a oneshot channel used to inform reactor users of the result of an operation.
67
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
68

            
69
cfg_if::cfg_if! {
70
    if #[cfg(feature = "circ-padding")] {
71
        use crate::util::sink_blocker::{SinkBlocker, CountingPolicy};
72
        /// Type used by a channel reactor to send cells to the network.
73
        pub(super) type ChannelOutputSink = SinkBlocker<BoxedChannelSink, CountingPolicy>;
74
    } else {
75
        /// Type used by a channel reactor to send cells to the network.
76
        pub(super) type ChannelOutputSink = BoxedChannelSink;
77
    }
78
}
79

            
80
/// A message telling the channel reactor to do something.
81
#[cfg_attr(docsrs, doc(cfg(feature = "testing")))]
82
#[derive(Debug)]
83
#[allow(unreachable_pub)] // Only `pub` with feature `testing`; otherwise, visible in crate
84
#[allow(clippy::exhaustive_enums, private_interfaces)]
85
pub enum CtrlMsg {
86
    /// Shut down the reactor.
87
    Shutdown,
88
    /// Tell the reactor that a given circuit has gone away.
89
    CloseCircuit(CircId),
90
    /// Allocate a new circuit in this channel's circuit map, generating an ID for it
91
    /// and registering senders for messages received for the circuit.
92
    AllocateCircuit {
93
        /// Channel to send the circuit's `CreateResponse` down.
94
        created_sender: oneshot::Sender<CreateResponse>,
95
        /// Channel to send other messages from this circuit down.
96
        sender: CircuitRxSender,
97
        /// Oneshot channel to send the new circuit's identifiers down.
98
        tx: ReactorResultChannel<(
99
            CircId,
100
            crate::circuit::UniqId,
101
            PaddingController,
102
            PaddingEventStream,
103
        )>,
104
    },
105
    /// Enable/disable/reconfigure channel padding
106
    ///
107
    /// The sender of these messages is responsible for the optimisation of
108
    /// ensuring that "no-change" messages are elided.
109
    /// (This is implemented in `ChannelsParamsUpdatesBuilder`.)
110
    ///
111
    /// These updates are done via a control message to avoid adding additional branches to the
112
    /// main reactor `select!`.
113
    ConfigUpdate(Arc<ChannelPaddingInstructionsUpdates>),
114
    /// Enable/disable/reconfigure KIST.
115
    ///
116
    /// Like in the case of `ConfigUpdate`,
117
    /// the sender of these messages is responsible for the optimisation of
118
    /// ensuring that "no-change" messages are elided.
119
    KistConfigUpdate(KistParams),
120
    /// Change the current padding implementation to the one provided.
121
    #[cfg(feature = "circ-padding-manual")]
122
    SetChannelPadder {
123
        /// The padder to install, or None to remove any existing padder.
124
        padder: Option<crate::client::CircuitPadder>,
125
        /// A oneshot channel to use in reporting the outcome.
126
        sender: oneshot::Sender<Result<()>>,
127
    },
128
}
129

            
130
/// Object to handle incoming cells and background tasks on a channel.
131
///
132
/// This type is returned when you finish a channel; you need to spawn a
133
/// new task that calls `run()` on it.
134
#[must_use = "If you don't call run() on a reactor, the channel won't work."]
135
pub struct Reactor<R: Runtime> {
136
    /// Underlying runtime we use for generating sleep futures and telling time.
137
    pub(super) runtime: R,
138
    /// A receiver for control messages from `Channel` objects.
139
    pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
140
    /// A oneshot sender that is used to alert other tasks when this reactor is
141
    /// finally dropped.
142
    pub(super) reactor_closed_tx: oneshot_broadcast::Sender<Result<CloseInfo>>,
143
    /// A receiver for cells to be sent on this reactor's sink.
144
    ///
145
    /// `Channel` objects have a sender that can send cells here.
146
    pub(super) cells: super::CellRx,
147
    /// A Stream from which we can read `ChanCell`s.
148
    ///
149
    /// This should be backed by a TLS connection if you want it to be secure.
150
    pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
151
    /// A Sink to which we can write `ChanCell`s.
152
    ///
153
    /// This should also be backed by a TLS connection if you want it to be secure.
154
    pub(super) output: ChannelOutputSink,
155
    /// A handler for setting stream options on the underlying stream.
156
    #[cfg_attr(not(target_os = "linux"), allow(unused))]
157
    pub(super) streamops: BoxedChannelStreamOps,
158
    /// A handler and associated data for CREATE2/CREATE_FAST messages,
159
    /// if this channel should handle them.
160
    #[cfg(feature = "relay")]
161
    pub(super) create_request_handler: Option<CreateRequestHandlerAndData>,
162
    /// Timer tracking when to generate channel padding.
163
    ///
164
    /// Note that this is _distinct_ from the experimental maybenot-based padding
165
    /// implemented with padding_ctrl and padding_stream.
166
    /// This is the existing per-channel padding
167
    /// in the tor protocol used to resist netflow attacks.
168
    pub(super) padding_timer: Pin<Box<padding::Timer<R>>>,
169
    /// Outgoing cells introduced at the channel reactor
170
    pub(super) special_outgoing: SpecialOutgoing,
171
    /// A map from circuit ID to Sinks on which we can deliver cells.
172
    pub(super) circs: CircMap,
173
    /// A unique identifier for this channel.
174
    pub(super) unique_id: super::UniqId,
175
    /// Information shared with the frontend
176
    pub(super) details: Arc<ChannelDetails>,
177
    /// Context for allocating unique circuit log identifiers.
178
    pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
179
    /// A padding controller to which padding-related events should be reported.
180
    ///
181
    /// (This is used for experimental maybenot-based padding.)
182
    //
183
    // TODO: It would be good to use S here instead of DynTimeProvider,
184
    // but we still need the latter for the clones of padding_ctrl that we hand out
185
    // inside ChannelSender.
186
    pub(super) padding_ctrl: PaddingController<DynTimeProvider>,
187
    /// An event stream telling us about padding-related events.
188
    ///
189
    /// (This is used for experimental maybenot-based padding.)
190
    pub(super) padding_event_stream: PaddingEventStream<DynTimeProvider>,
191
    /// If present, the current rules for blocking the output based on the padding framework.
192
    pub(super) padding_blocker: Option<StartBlocking>,
193
    /// What link protocol is the channel using?
194
    #[allow(dead_code)] // We don't support protocols where this would matter
195
    pub(super) link_protocol: u16,
196
}
197

            
198
/// Outgoing cells introduced at the channel reactor
199
#[derive(Default, Debug, Clone)]
200
pub(super) struct SpecialOutgoing {
201
    /// If we must send a `PaddingNegotiate`, this is present.
202
    padding_negotiate: Option<PaddingNegotiate>,
203
    /// A number of pending PADDING cells that we have to send, once there is space.
204
    n_padding: u16,
205
}
206

            
207
impl SpecialOutgoing {
208
    /// Do we have a special cell to send?
209
    ///
210
    /// Called by the reactor before looking for cells from the reactor's clients.
211
    /// The returned message *must* be sent by the caller, not dropped!
212
    #[must_use = "SpecialOutgoing::next()'s return value must be actually sent"]
213
5050
    fn next(&mut self) -> Option<AnyChanCell> {
214
        // If this gets more cases, consider making SpecialOutgoing into a #[repr(C)]
215
        // enum, so that we can fast-path the usual case of "no special message to send".
216
5050
        if let Some(p) = self.padding_negotiate.take() {
217
            return Some(p.into());
218
5050
        }
219
5050
        if self.n_padding > 0 {
220
            self.n_padding -= 1;
221
            return Some(Padding::new().into());
222
5050
        }
223
5050
        None
224
5050
    }
225

            
226
    /// Try to queue a padding cell to be sent.
227
    fn queue_padding_cell(&mut self) {
228
        self.n_padding = self.n_padding.saturating_add(1);
229
    }
230
}
231

            
232
/// Allows us to just say debug!("{}: Reactor did a thing", &self, ...)
233
///
234
/// There is no risk of confusion because no-one would try to print a
235
/// Reactor for some other reason.
236
impl<R: Runtime> fmt::Display for Reactor<R> {
237
1140
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
238
1140
        fmt::Debug::fmt(&self.unique_id, f)
239
1140
    }
240
}
241

            
242
impl<R: Runtime> Reactor<R> {
243
    /// Launch the reactor, and run until the channel closes or we
244
    /// encounter an error.
245
    ///
246
    /// Once this function returns, the channel is dead, and can't be
247
    /// used again.
248
    #[instrument(level = "trace", skip_all)]
249
422
    pub async fn run(mut self) -> Result<()> {
250
        trace!(channel_id = %self, "Running reactor");
251
        let result: Result<()> = loop {
252
            match self.run_once().await {
253
                Ok(()) => (),
254
                Err(ReactorError::Shutdown) => break Ok(()),
255
                Err(ReactorError::Err(e)) => break Err(e),
256
            }
257
        };
258

            
259
        // Log that the reactor stopped, possibly with the associated error as a report.
260
        // May log at a higher level depending on the error kind.
261
        const MSG: &str = "Reactor stopped";
262
        match &result {
263
            Ok(()) => debug!(channel_id = %self, "{MSG}"),
264
            Err(e) => debug_report!(e, channel_id = %self, "{MSG}"),
265
        }
266

            
267
        // Inform any waiters that the channel has closed.
268
        let close_msg = result.as_ref().map_err(Clone::clone).map(|()| CloseInfo);
269
        self.reactor_closed_tx.send(close_msg);
270
        result
271
332
    }
272

            
273
    /// Helper for run(): handles only one action.
274
    #[instrument(level = "trace", skip_all)]
275
5296
    async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
276
        select! {
277

            
278
            // See if the output sink can have cells written to it yet.
279
            // If so, see if we have to-be-transmitted cells.
280
5004
            ret = self.output.prepare_send_from(async {
281
                // This runs if we will be able to write, so try to obtain a cell:
282

            
283
5004
                if let Some(l) = self.special_outgoing.next() {
284
                    // See reasoning below.
285
                    // eprintln!("PADDING - SENDING NEOGIATION: {:?}", &l);
286
                    self.padding_timer.as_mut().note_cell_sent();
287
                    return Some((l, None));
288
5004
                }
289

            
290
5004
                select_biased! {
291
5004
                    n = self.cells.next() => {
292
                        // Note transmission on *input* to the reactor, not ultimate
293
                        // transmission.  Ideally we would tap into the TCP stream at the far
294
                        // end of our TLS or perhaps during encoding on entry to the TLS, but
295
                        // both of those would involve quite some plumbing.  Doing it here in
296
                        // the reactor avoids additional inter-task communication, mutexes,
297
                        // etc.  (And there is no real difference between doing it here on
298
                        // input, to just below, on enquieing into the `sendable`.)
299
                        //
300
                        // Padding is sent when the output channel is idle, and the effect of
301
                        // buffering is just that we might sent it a little early because we
302
                        // measure idleness when we last put something into the output layers.
303
                        //
304
                        // We can revisit this if measurement shows it to be bad in practice.
305
                        //
306
                        // (We in any case need padding that we generate when idle to make it
307
                        // through to the output promptly, or it will be late and ineffective.)
308
4434
                        self.padding_timer.as_mut().note_cell_sent();
309
4434
                        n
310
                    },
311
5004
                    p = self.padding_timer.as_mut().next() => {
312
                        // eprintln!("PADDING - SENDING PADDING: {:?}", &p);
313

            
314
                        // Note that we treat padding from the padding_timer as a normal cell,
315
                        // since it doesn't have a padding machine.
316
                        self.padding_ctrl.queued_data(HopNum::from(0));
317

            
318
                        self.padding_timer.as_mut().note_cell_sent();
319
                        Some((p.into(), None))
320
                    },
321
                }
322
4434
            }) => {
323
                self.padding_ctrl.flushed_channel_cell();
324
                let (queued, sendable) = ret?;
325
                let (msg, cell_padding_info) = queued.ok_or(ReactorError::Shutdown)?;
326
                // Tell the relevant circuit padder that this cell is getting flushed.
327
                // Note that, technically, it won't go onto the network for a while longer:
328
                // it has to go through the TLS buffer, and the kernel TCP buffer.
329
                // We've got to live with that.
330
                // TODO: conceivably we could defer this even longer, but it would take
331
                // some tricky hacking!
332
                if let (Some(cell_padding_info), Some(circid)) = (cell_padding_info, msg.circid()) {
333
                    self.circs.note_cell_flushed(circid, cell_padding_info);
334
                }
335
                sendable.send(msg)?;
336
            }
337

            
338
            ret = self.control.next() => {
339
                let ctrl = match ret {
340
                    None | Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
341
                    Some(x) => x,
342
                };
343
                self.handle_control(ctrl).await?;
344
            }
345

            
346
            ret = self.padding_event_stream.next() => {
347
                let event = ret.ok_or_else(|| Error::from(internal!("Padding event stream was exhausted")))?;
348
                self.handle_padding_event(event).await?;
349
            }
350

            
351
            ret = self.input.next() => {
352
                let item = ret
353
                    .ok_or(ReactorError::Shutdown)??;
354
                crate::note_incoming_traffic();
355
                self.handle_cell(item).await?;
356
            }
357

            
358
        }
359
        Ok(()) // Run again.
360
5244
    }
361

            
362
    /// Handle a CtrlMsg other than Shutdown.
363
    #[instrument(level = "trace", skip(self))] // Intentionally omitting skip_all, msg is useful and not sensitive
364
128
    async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
365
        trace!(
366
            channel_id = %self,
367
            msg = ?msg,
368
            "reactor received control message"
369
        );
370

            
371
128
        match msg {
372
            CtrlMsg::Shutdown => panic!(), // was handled in reactor loop.
373
            CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
374
            CtrlMsg::AllocateCircuit {
375
                created_sender,
376
                sender,
377
                tx,
378
            } => {
379
                let mut rng = rand::rng();
380
128
                let my_unique_id = self.unique_id;
381
                let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
382
                // NOTE: This is a very weird place to be calling new_padding, but:
383
                //  - we need to do it here or earlier, so we can add it as part of the CircEnt to
384
                //    our map.
385
                //  - We need to do it at some point where we have a runtime, which implies in a
386
                //    reactor.
387
                //
388
                // TODO circpad: We might want to lazy-allocate this somehow, or try harder to make
389
                // it a no-op when we aren't padding on a particular circuit.
390
                let (padding_ctrl, padding_stream) = crate::client::circuit::padding::new_padding(
391
                    // TODO: avoid using DynTimeProvider at some point, and re-parameterize for efficiency.
392
                    DynTimeProvider::new(self.runtime.clone()),
393
                );
394
                let ret: Result<_> = self
395
                    .circs
396
                    .add_origin_ent(&mut rng, created_sender, sender, padding_ctrl.clone())
397
16
                    .map(|id| (id, circ_unique_id, padding_ctrl, padding_stream));
398
                let _ = tx.send(ret); // don't care about other side going away
399
                self.update_disused_since();
400
            }
401
            CtrlMsg::ConfigUpdate(updates) => {
402
                if self.link_protocol == 4 {
403
                    // Link protocol 4 does not permit sending, or negotiating, link padding.
404
                    // We test for == 4 so that future updates to handshake.rs LINK_PROTOCOLS
405
                    // keep doing padding things.
406
                    return Ok(());
407
                }
408

            
409
                let ChannelPaddingInstructionsUpdates {
410
                    // List all the fields explicitly; that way the compiler will warn us
411
                    // if one is added and we fail to handle it here.
412
                    padding_enable,
413
                    padding_parameters,
414
                    padding_negotiate,
415
                } = &*updates;
416
                if let Some(parameters) = padding_parameters {
417
                    self.padding_timer.as_mut().reconfigure(parameters)?;
418
                }
419
                if let Some(enable) = padding_enable {
420
                    if *enable {
421
                        self.padding_timer.as_mut().enable();
422
                    } else {
423
                        self.padding_timer.as_mut().disable();
424
                    }
425
                }
426
                if let Some(padding_negotiate) = padding_negotiate {
427
                    // This replaces any previous PADDING_NEGOTIATE cell that we were
428
                    // told to send, but which we didn't manage to send yet.
429
                    // It doesn't make sense to queue them up.
430
                    self.special_outgoing.padding_negotiate = Some(padding_negotiate.clone());
431
                }
432
            }
433
            CtrlMsg::KistConfigUpdate(kist) => self.apply_kist_params(&kist),
434
            #[cfg(feature = "circ-padding-manual")]
435
            CtrlMsg::SetChannelPadder { padder, sender } => {
436
                self.padding_ctrl
437
                    .install_padder_padding_at_hop(HopNum::from(0), padder);
438
                let _ignore = sender.send(Ok(()));
439
            }
440
        }
441
        Ok(())
442
128
    }
443

            
444
    /// Take the padding action described in `action`.
445
    ///
446
    /// (With circuit padding disabled, PaddingEvent can't be constructed.)
447
    #[cfg(not(feature = "circ-padding"))]
448
    #[allow(clippy::unused_async)] // for symmetry with the version below
449
    async fn handle_padding_event(&mut self, action: PaddingEvent) -> Result<()> {
450
        void::unreachable(action.0)
451
    }
452

            
453
    /// Take the padding action described in `action`.
454
    #[cfg(feature = "circ-padding")]
455
    async fn handle_padding_event(&mut self, action: PaddingEvent) -> Result<()> {
456
        use PaddingEvent as PE;
457
        match action {
458
            PE::SendPadding(send_padding) => {
459
                self.handle_send_padding(send_padding).await?;
460
            }
461
            PE::StartBlocking(start_blocking) => {
462
                if self.output.is_unlimited() {
463
                    self.output.set_blocked();
464
                }
465
                self.padding_blocker = Some(start_blocking);
466
            }
467
            PE::StopBlocking => {
468
                self.output.set_unlimited();
469
            }
470
        }
471
        Ok(())
472
    }
473

            
474
    /// Send the padding described in `padding`.
475
    #[cfg(feature = "circ-padding")]
476
    async fn handle_send_padding(&mut self, padding: SendPadding) -> Result<()> {
477
        // TODO circpad: This is somewhat duplicative of the logic in `Circuit::send_padding` and
478
        // `Circuit::padding_disposition`.  It might be good to unify them at some point.
479
        // For now (Oct 2025), though, they have slightly different inputs and behaviors.
480

            
481
        use crate::client::circuit::padding::{Bypass::*, Replace::*};
482
        // multihop padding belongs in circuit padders, not here.
483
        let hop = HopNum::from(0);
484
        assert_eq!(padding.hop, hop);
485

            
486
        // If true, there is blocking, but we are allowed to bypass it.
487
        let blocking_bypassed = matches!(
488
            (&self.padding_blocker, padding.may_bypass_block()),
489
            (
490
                Some(StartBlocking {
491
                    is_bypassable: true
492
                }),
493
                BypassBlocking
494
            )
495
        );
496
        // If true, there is blocking, and we can't bypass it.
497
        let this_padding_blocked = self.padding_blocker.is_some() && !blocking_bypassed;
498

            
499
        if padding.may_replace_with_data() == Replaceable {
500
            if self.output_is_full().await? {
501
                // When the output buffer is full,
502
                // we _always_ treat it as satisfying our replaceable padding.
503
                //
504
                // TODO circpad: It would be better to check whether
505
                // the output has any bytes at all, but futures_codec doesn't seem to give us a
506
                // way to check that.  If we manage to do so in the future, we should change the
507
                // logic in this function.
508
                self.padding_ctrl
509
                    .replaceable_padding_already_queued(hop, padding);
510
                return Ok(());
511
            } else if self.cells.approx_count() > 0 {
512
                // We can replace the padding with outbound cells!
513
                if this_padding_blocked {
514
                    // In the blocked case, we just declare that the pending data _is_ the queued padding.
515
                    self.padding_ctrl
516
                        .replaceable_padding_already_queued(hop, padding);
517
                } else {
518
                    // Otherwise we report that queued data _became_ padding,
519
                    // and we allow it to pass any blocking that's present.
520
                    self.padding_ctrl.queued_data_as_padding(hop, padding);
521
                    if blocking_bypassed {
522
                        self.output.allow_n_additional_items(1);
523
                    }
524
                }
525
                return Ok(());
526
            } else {
527
                // There's nothing to replace this with, so fall through.
528
            }
529
        }
530

            
531
        // There's no replacement, so we queue unconditionally.
532
        self.special_outgoing.queue_padding_cell();
533
        self.padding_ctrl.queued_padding(hop, padding);
534
        if blocking_bypassed {
535
            self.output.allow_n_additional_items(1);
536
        }
537

            
538
        Ok(())
539
    }
540

            
541
    /// Return true if the output stream is full.
542
    ///
543
    /// We use this in circuit padding to implement replaceable padding.
544
    //
545
    // TODO circpad: We'd rather check whether there is any data at all queued in self.output,
546
    // but futures_codec doesn't give us a way to do that.
547
    #[cfg(feature = "circ-padding")]
548
    async fn output_is_full(&mut self) -> Result<bool> {
549
        use futures::future::poll_fn;
550
        use std::task::Poll;
551
        // We use poll_fn to get a cx that we can pass to poll_ready_unpin.
552
        poll_fn(|cx| {
553
            Poll::Ready(match self.output.poll_ready_unpin(cx) {
554
                // If if's ready to send, it isn't full.
555
                Poll::Ready(Ok(())) => Ok(false),
556
                // If it isn't ready to send, it's full.
557
                Poll::Pending => Ok(true),
558
                // Propagate errors:
559
                Poll::Ready(Err(e)) => Err(e),
560
            })
561
        })
562
        .await
563
    }
564

            
565
    /// Helper: process a cell on a channel.  Most cell types get ignored
566
    /// or rejected; a few get delivered to circuits.
567
    #[instrument(level = "trace", skip_all)]
568
436
    async fn handle_cell(&mut self, cell: AnyChanCell) -> Result<()> {
569
        let (circid, msg) = cell.into_circid_and_msg();
570
        use AnyChanMsg::*;
571

            
572
        match msg {
573
            Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
574
            _ => trace!(
575
                channel_id = %self,
576
                "received {} for {}",
577
                msg.cmd(),
578
                CircId::get_or_zero(circid)
579
            ),
580
        }
581

            
582
        // Report the message to the padding controller.
583
        match msg {
584
            Padding(_) | Vpadding(_) => {
585
                // We always accept channel padding, even if we haven't negotiated any.
586
                let _always_acceptable = self.padding_ctrl.decrypted_padding(HopNum::from(0));
587
            }
588
            _ => self.padding_ctrl.decrypted_data(HopNum::from(0)),
589
        }
590

            
591
        match msg {
592
            // These are allowed, and need to be handled.
593
            Relay(_) => self.deliver_relay(circid, msg).await,
594

            
595
            // The 'if' guard is important as we should not consider this branch if we're not
596
            // supposed to handle CREATE* cells (and therefore RELAY_EARLY),
597
            // regardless of whether the "relay" feature is set.
598
            #[cfg(feature = "relay")]
599
            RelayEarly(_) if self.create_request_handler.is_some() => {
600
                self.deliver_relay(circid, msg).await
601
            }
602

            
603
            Destroy(_) => self.deliver_destroy(circid, msg).await,
604

            
605
            // The 'if' guard is important as we should not consider this branch if we're not
606
            // supposed to handle CREATE* cells, regardless of whether the "relay" feature is set.
607
            // We should instead fall through to the wildcard pattern.
608
            //
609
            // Clients that enable the "relay" feature, and outgoing channels for bridges,
610
            // will not have a handler set.
611
            #[cfg(feature = "relay")]
612
            CreateFast(msg) if self.create_request_handler.is_some() => {
613
                self.handle_create(circid, CreateRequest::CreateFast(msg))
614
                    .await
615
            }
616
            #[cfg(feature = "relay")]
617
            Create2(msg) if self.create_request_handler.is_some() => {
618
                self.handle_create(circid, CreateRequest::Create2(msg))
619
                    .await
620
            }
621

            
622
            CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg),
623

            
624
            // These are always ignored.
625
            Padding(_) | Vpadding(_) => Ok(()),
626
            _ => Err(Error::ChanProto(format!("Unexpected cell: {msg:?}"))),
627
        }
628
436
    }
629

            
630
    /// Give the RELAY (or possibly RELAY_EARLY) cell `msg` to the appropriate circuit.
631
360
    async fn deliver_relay(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
632
360
        let Some(circid) = circid else {
633
            return Err(Error::ChanProto("Relay cell without circuit ID".into()));
634
        };
635

            
636
360
        let mut ent = self
637
360
            .circs
638
360
            .get_mut(circid)
639
360
            .ok_or_else(|| Error::ChanProto("Relay cell on nonexistent circuit".into()))?;
640

            
641
336
        match &mut *ent {
642
12
            CircEnt::OpenOrigin { cell_sender: s, .. } => {
643
                // There's an open circuit; we can give it the RELAY cell.
644
12
                if s.send(msg).await.is_err() {
645
                    drop(ent);
646
                    // The circuit's receiver went away, so we should destroy the circuit.
647
                    self.outbound_destroy_circ(circid).await?;
648
12
                }
649
12
                Ok(())
650
            }
651
            #[cfg(feature = "relay")]
652
            CircEnt::OpenRelay { cell_sender: s, .. } => {
653
                // There's an open circuit; we can give it the RELAY cell.
654
                if s.send(msg).await.is_err() {
655
                    drop(ent);
656
                    // The circuit's receiver went away, so we should destroy the circuit.
657
                    // We send a DESTROY on our own channel, and the circuit reactor should have
658
                    // taken care of sending a DESTROY on the other channel.
659
                    self.outbound_destroy_circ(circid).await?;
660
                }
661
                Ok(())
662
            }
663
12
            CircEnt::Opening { .. } => Err(Error::ChanProto(
664
12
                "Relay cell on pending circuit before CREATED* received".into(),
665
12
            )),
666
312
            CircEnt::DestroySent(hs) => hs.receive_cell(),
667
        }
668
360
    }
669

            
670
    /// Handle a CREATE* cell `msg`.
671
    #[cfg(feature = "relay")]
672
    async fn handle_create(&mut self, circid: Option<CircId>, msg: CreateRequest) -> Result<()> {
673
        let Some(ref create_request_handler) = self.create_request_handler else {
674
            // We should have checked this in an 'if' guard in 'handle_cell()'.
675
            return Err(internal!("Called 'deliver_relay()', but handler isn't set").into());
676
        };
677

            
678
        let Some(circid) = circid else {
679
            let err = format!("Received {} cell without circuit ID", msg.cmd());
680
            return Err(Error::ChanProto(err));
681
        };
682

            
683
        let Some(chan) = create_request_handler.channel.upgrade() else {
684
            // This can happen if the last `Arc<Channel>` was dropped before the reactor had a
685
            // chance to notice.
686
            // We'll just try to reject the new circuit request and let the reactor shut down
687
            // normally, rather than return an error.
688
            let destroy = Destroy::new(DestroyReason::CHANNEL_CLOSED);
689
            let destroy = AnyChanCell::new(Some(circid), destroy.into());
690

            
691
            debug!(
692
                "Unable to upgrade weak `Channel` while handling {}; sending {}",
693
                msg.cmd(),
694
                destroy.msg().cmd(),
695
            );
696
            return self.send_cell(destroy).await;
697
        };
698

            
699
        // Allocate an internal circuit ID, regardless of if the create fails or not.
700
        // We expect that this will not overflow since it would require an attacker to send
701
        // 500*2^32 bytes (~2 TiB) worth of cells.
702
        let circ_uniq_id = self.circ_unique_id_ctx.next(self.unique_id);
703

            
704
        // Build the relay circuit.
705
        let create_result = create_request_handler.handler.handle_create(
706
            &self.runtime,
707
            &chan,
708
            &create_request_handler.our_ed25519_id,
709
            &create_request_handler.our_rsa_id,
710
            circid,
711
            &msg,
712
            &self.details.memquota,
713
            circ_uniq_id,
714
        );
715

            
716
        // Add the circuit to the circuit map.
717
        let response = match create_result {
718
            Ok((response, components)) => {
719
                let RelayCircComponents {
720
                    circ,
721
                    sender,
722
                    padding_ctrl,
723
                } = components;
724

            
725
                if let Err(reason) = self.circs.add_relay_ent(circid, circ, sender, padding_ctrl) {
726
                    debug!("Unable to add circuit map entry for incoming circuit: {reason}");
727
                    CreateResponse::Destroy(Destroy::new(reason))
728
                } else {
729
                    response
730
                }
731
            }
732
            Err(destroy) => CreateResponse::Destroy(destroy),
733
        };
734

            
735
        let response = AnyChanCell::new(Some(circid), response.into());
736
        self.send_cell(response).await
737
    }
738

            
739
    /// Handle a CREATED{_FAST,2} cell by passing it on to the appropriate
740
    /// circuit, if that circuit is waiting for one.
741
28
    fn deliver_created(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
742
28
        let Some(circid) = circid else {
743
            return Err(Error::ChanProto("'Created' cell without circuit ID".into()));
744
        };
745

            
746
28
        let target = self.circs.advance_from_opening(circid)?;
747
4
        let created = msg.try_into()?;
748
        // TODO(nickm) I think that this one actually means the other side
749
        // is closed. See arti#269.
750
4
        target.send(created).map_err(|_| {
751
            Error::from(internal!(
752
                "Circuit queue rejected created message. Is it closing?"
753
            ))
754
        })
755
28
    }
756

            
757
    /// Handle a DESTROY cell by removing the corresponding circuit
758
    /// from the map, and passing the destroy cell onward to the circuit.
759
48
    async fn deliver_destroy(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
760
48
        let Some(circid) = circid else {
761
            return Err(Error::ChanProto("'Destroy' cell without circuit ID".into()));
762
        };
763

            
764
        /// Helper to send DESTROY cell.
765
18
        async fn send_destroy(mut sender: CircuitRxSender, msg: AnyChanMsg) -> Result<()> {
766
12
            sender
767
12
                .send(msg)
768
12
                .await
769
                // TODO(nickm) I think that this one actually means the other side
770
                // is closed. See arti#269.
771
12
                .map_err(|_| internal!("open circuit wasn't interested in destroy cell?").into())
772
12
        }
773

            
774
        // Remove the circuit from the map: nothing more can be done with it.
775
48
        let entry = self.circs.remove(circid);
776
48
        self.update_disused_since();
777
36
        match entry {
778
            // If the circuit is waiting for CREATED, tell it that it
779
            // won't get one.
780
            Some(CircEnt::Opening {
781
12
                create_response_sender,
782
                ..
783
            }) => {
784
12
                trace!(channel_id = %self, "Passing destroy to pending circuit {}", circid);
785
12
                create_response_sender
786
12
                    .send(msg.try_into()?)
787
                    // TODO(nickm) I think that this one actually means the other side
788
                    // is closed. See arti#269.
789
12
                    .map_err(|_| {
790
                        internal!("pending circuit wasn't interested in destroy cell?").into()
791
                    })
792
            }
793
            // It's an open origin circuit: tell it that it got a DESTROY cell.
794
12
            Some(CircEnt::OpenOrigin { cell_sender, .. }) => {
795
12
                trace!(channel_id = %self, "Passing destroy to open origin circuit {}", circid);
796
12
                send_destroy(cell_sender, msg).await
797
            }
798
            // It's an open relay circuit: tell it that it got a DESTROY cell.
799
            #[cfg(feature = "relay")]
800
            Some(CircEnt::OpenRelay { cell_sender, .. }) => {
801
                trace!(channel_id = %self, "Passing destroy to open relay circuit {}", circid);
802
                send_destroy(cell_sender, msg).await
803
            }
804
            // We've sent a destroy; we can leave this circuit removed.
805
12
            Some(CircEnt::DestroySent(_)) => Ok(()),
806
            // Got a DESTROY cell for a circuit we don't have.
807
            None => {
808
12
                trace!(channel_id = %self, "Destroy for nonexistent circuit {}", circid);
809
12
                Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
810
            }
811
        }
812
48
    }
813

            
814
    /// Helper: send a cell on the outbound sink.
815
112
    async fn send_cell(&mut self, cell: AnyChanCell) -> Result<()> {
816
112
        self.output.send(cell).await?;
817
64
        Ok(())
818
112
    }
819

            
820
    /// Called when a circuit goes away: sends a DESTROY cell and removes
821
    /// the circuit.
822
112
    async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
823
112
        trace!(channel_id = %self, "Circuit {} is gone; sending DESTROY", id);
824
        // Remove the circuit's entry from the map: nothing more
825
        // can be done with it.
826
        // TODO: It would be great to have a tighter upper bound for
827
        // the number of relay cells we'll receive.
828
112
        self.circs.destroy_sent(id, HalfCirc::new(3000));
829
112
        self.update_disused_since();
830
112
        let destroy = Destroy::new(DestroyReason::NONE).into();
831
112
        let cell = AnyChanCell::new(Some(id), destroy);
832
112
        self.send_cell(cell).await?;
833

            
834
64
        Ok(())
835
112
    }
836

            
837
    /// Update disused timestamp with current time if this channel is no longer used
838
176
    fn update_disused_since(&self) {
839
176
        if self.circs.open_ent_count() == 0 {
840
160
            // Update disused_since if it still indicates that the channel is in use
841
160
            self.details.unused_since.update_if_none();
842
160
        } else {
843
16
            // Mark this channel as in use
844
16
            self.details.unused_since.clear();
845
16
        }
846
176
    }
847

            
848
    /// Use the new KIST parameters.
849
    #[cfg(target_os = "linux")]
850
    fn apply_kist_params(&self, params: &KistParams) {
851
        use super::kist::KistMode;
852

            
853
        let set_tcp_notsent_lowat = |v: u32| {
854
            if let Err(e) = self.streamops.set_tcp_notsent_lowat(v) {
855
                // This is bad, but not fatal: not setting the KIST options
856
                // comes with a performance penalty, but we don't have to crash.
857
                error_report!(e, "Failed to set KIST socket options");
858
            }
859
        };
860

            
861
        match params.kist_enabled() {
862
            KistMode::TcpNotSentLowat => set_tcp_notsent_lowat(params.tcp_notsent_lowat()),
863
            KistMode::Disabled => set_tcp_notsent_lowat(u32::MAX),
864
        }
865
    }
866

            
867
    /// Use the new KIST parameters.
868
    #[cfg(not(target_os = "linux"))]
869
    fn apply_kist_params(&self, params: &KistParams) {
870
        use super::kist::KistMode;
871

            
872
        if params.kist_enabled() != KistMode::Disabled {
873
            tracing::warn!("KIST not currently supported on non-linux platforms");
874
        }
875
    }
876
}
877

            
878
/// If the channel is configured to handle CREATE* requests,
879
/// this contains anything that is needed solely for this purpose.
880
#[cfg(feature = "relay")]
881
pub(super) struct CreateRequestHandlerAndData {
882
    /// A handler for CREATE2/CREATE_FAST messages.
883
    pub(super) handler: Arc<CreateRequestHandler>,
884
    /// The [`Channel`] associated with this reactor.
885
    ///
886
    /// We don't want the channel reactor to access its `Channel` directly
887
    /// (shared data should use its [`ChannelDetails`] instead),
888
    /// but we need it to pass it to new circuit reactors,
889
    /// so we store a copy here.
890
    pub(super) channel: Weak<Channel>,
891
    /// Our Ed25519 identity for ntor-v3 handshakes.
892
    pub(super) our_ed25519_id: Ed25519Identity,
893
    /// Our RSA identity for ntor handshakes.
894
    pub(super) our_rsa_id: RsaIdentity,
895
}
896

            
897
#[cfg(test)]
898
pub(crate) mod test {
899
    #![allow(clippy::unwrap_used)]
900
    use super::*;
901
    use crate::channel::{Canonicity, ChannelMode, ClosedUnexpectedly, UniqId};
902
    use crate::client::circuit::CircParameters;
903
    use crate::client::circuit::padding::new_padding;
904
    use crate::fake_mpsc;
905
    use crate::peer::PeerInfo;
906
    use crate::util::{DummyTimeoutEstimator, fake_mq};
907
    use futures::sink::SinkExt;
908
    use futures::stream::StreamExt;
909
    use tor_cell::chancell::msg;
910
    use tor_linkspec::OwnedChanTarget;
911
    use tor_rtcompat::SpawnExt;
912
    use tor_rtcompat::{DynTimeProvider, NoOpStreamOpsHandle, Runtime};
913

            
914
    pub(crate) type CodecResult = std::result::Result<AnyChanCell, Error>;
915

            
916
    pub(crate) fn new_reactor<R: Runtime>(
917
        runtime: R,
918
    ) -> (
919
        Arc<crate::channel::Channel>,
920
        Reactor<R>,
921
        mpsc::Receiver<AnyChanCell>,
922
        mpsc::Sender<CodecResult>,
923
    ) {
924
        let link_protocol = 4;
925
        let (send1, recv1) = mpsc::channel(32);
926
        let (send2, recv2) = mpsc::channel(32);
927
        let unique_id = UniqId::new();
928
        let dummy_target = OwnedChanTarget::builder()
929
            .ed_identity([6; 32].into())
930
            .rsa_identity([10; 20].into())
931
            .build()
932
            .unwrap();
933
        let send1 = send1.sink_map_err(|e| {
934
            trace!("got sink error: {:?}", e);
935
            Error::CellDecodeErr {
936
                object: "reactor test",
937
                err: tor_cell::Error::ChanProto("dummy message".into()),
938
            }
939
        });
940
        let stream_ops = NoOpStreamOpsHandle::default();
941
        let (chan, reactor) = crate::channel::Channel::new(
942
            ChannelMode::Client,
943
            link_protocol,
944
            Box::new(send1),
945
            Box::new(recv2),
946
            Box::new(stream_ops),
947
            unique_id,
948
            dummy_target,
949
            safelog::MaybeSensitive::not_sensitive(PeerInfo::EMPTY),
950
            crate::ClockSkew::None,
951
            runtime,
952
            fake_mq(),
953
            Canonicity::new_canonical(),
954
        )
955
        .expect("channel create failed");
956
        (chan, reactor, recv1, send2)
957
    }
958

            
959
    // Try shutdown from inside run_once..
960
    #[test]
961
    fn shutdown() {
962
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
963
            let (chan, mut reactor, _output, _input) = new_reactor(rt);
964

            
965
            chan.terminate();
966
            let r = reactor.run_once().await;
967
            assert!(matches!(r, Err(ReactorError::Shutdown)));
968
        });
969
    }
970

            
971
    // Try shutdown while reactor is running.
972
    #[test]
973
    fn shutdown2() {
974
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
975
            // TODO: Ask a rust person if this is how to do this.
976

            
977
            use futures::future::FutureExt;
978
            use futures::join;
979

            
980
            let (chan, reactor, _output, _input) = new_reactor(rt);
981
            // Let's get the reactor running...
982
            let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
983

            
984
            let rr = run_reactor.clone();
985

            
986
            let exit_then_check = async {
987
                assert!(rr.peek().is_none());
988
                // ... and terminate the channel while that's happening.
989
                chan.terminate();
990
            };
991

            
992
            let (rr_s, _) = join!(run_reactor, exit_then_check);
993

            
994
            // Now let's see. The reactor should not _still_ be running.
995
            assert!(rr_s);
996
        });
997
    }
998

            
999
    #[test]
    fn new_circ_closed() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut reactor, mut output, _input) = new_reactor(rt.clone());
            assert!(chan.duration_unused().is_some()); // unused yet
            let (ret, reac) = futures::join!(
                chan.new_tunnel(Arc::new(DummyTimeoutEstimator)),
                reactor.run_once()
            );
            let (pending, circr) = ret.unwrap();
            rt.spawn(async {
                let _ignore = circr.run().await;
            })
            .unwrap();
            assert!(reac.is_ok());
            let id = pending.peek_circid();
            let ent = reactor.circs.get_mut(id);
            assert!(matches!(*ent.unwrap(), CircEnt::Opening { .. }));
            assert!(chan.duration_unused().is_none()); // in use
            // Now drop the circuit; this should tell the reactor to remove
            // the circuit from the map.
            drop(pending);
            reactor.run_once().await.unwrap();
            let ent = reactor.circs.get_mut(id);
            assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
            let cell = output.next().await.unwrap();
            assert_eq!(cell.circid(), Some(id));
            assert!(matches!(cell.msg(), AnyChanMsg::Destroy(_)));
            assert!(chan.duration_unused().is_some()); // unused again
        });
    }
    // Test proper delivery of a created cell that doesn't make a channel
    #[test]
    #[ignore] // See bug #244: re-enable this test once it passes reliably.
    fn new_circ_create_failure() {
        use std::time::Duration;
        use tor_rtcompat::SleepProvider;
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut reactor, mut output, mut input) = new_reactor(rt.clone());
            let (ret, reac) = futures::join!(
                chan.new_tunnel(Arc::new(DummyTimeoutEstimator)),
                reactor.run_once()
            );
            let (pending, circr) = ret.unwrap();
            rt.spawn(async {
                let _ignore = circr.run().await;
            })
            .unwrap();
            assert!(reac.is_ok());
            let circparams = CircParameters::default();
            let id = pending.peek_circid();
            let ent = reactor.circs.get_mut(id);
            assert!(matches!(*ent.unwrap(), CircEnt::Opening { .. }));
            #[allow(clippy::clone_on_copy)]
            let rtc = rt.clone();
            let send_response = async {
                rtc.sleep(Duration::from_millis(100)).await;
                trace!("sending createdfast");
                // We'll get a bad handshake result from this createdfast cell.
                let created_cell = AnyChanCell::new(Some(id), msg::CreatedFast::new(*b"x").into());
                input.send(Ok(created_cell)).await.unwrap();
                reactor.run_once().await.unwrap();
            };
            let (circ, _) = futures::join!(pending.create_firsthop_fast(circparams), send_response);
            // Make sure statuses are as expected.
            assert!(matches!(circ.err().unwrap(), Error::BadCircHandshakeAuth));
            reactor.run_once().await.unwrap();
            // Make sure that the createfast cell got sent
            let cell_sent = output.next().await.unwrap();
            assert!(matches!(cell_sent.msg(), msg::AnyChanMsg::CreateFast(_)));
            // But the next run if the reactor will make the circuit get closed.
            let ent = reactor.circs.get_mut(id);
            assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
        });
    }
    // Try incoming cells that shouldn't arrive on channels.
    #[test]
    fn bad_cells() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
            // shouldn't get created2 cells for nonexistent circuits
            let created2_cell = msg::Created2::new(*b"hihi").into();
            input
                .send(Ok(AnyChanCell::new(CircId::new(7), created2_cell)))
                .await
                .unwrap();
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                format!("{}", e),
                "Channel protocol violation: Unexpected CREATED* cell not on opening circuit"
            );
            // Can't get a relay cell on a circuit we've never heard of.
            let relay_cell = msg::Relay::new(b"abc").into();
            input
                .send(Ok(AnyChanCell::new(CircId::new(4), relay_cell)))
                .await
                .unwrap();
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                format!("{}", e),
                "Channel protocol violation: Relay cell on nonexistent circuit"
            );
            // There used to be tests here for other types, but now that we only
            // accept OpenClientChanCell, we know that the codec can't even try
            // to give us e.g. VERSIONS or CREATE.
        });
    }
    #[test]
    fn deliver_relay() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            use oneshot_fused_workaround as oneshot;
            let (_chan, mut reactor, _output, mut input) = new_reactor(rt.clone());
            let (padding_ctrl, _padding_stream) = new_padding(DynTimeProvider::new(rt));
            let (_circ_stream_7, mut circ_stream_13) = {
                let (snd1, _rcv1) = oneshot::channel();
                let (snd2, rcv2) = fake_mpsc(64);
                reactor.circs.put_unchecked(
                    CircId::new(7).unwrap(),
                    CircEnt::Opening {
                        create_response_sender: snd1,
                        cell_sender: snd2,
                        padding_ctrl: padding_ctrl.clone(),
                    },
                );
                let (snd3, rcv3) = fake_mpsc(64);
                reactor.circs.put_unchecked(
                    CircId::new(13).unwrap(),
                    CircEnt::OpenOrigin {
                        cell_sender: snd3,
                        padding_ctrl,
                    },
                );
                reactor.circs.put_unchecked(
                    CircId::new(23).unwrap(),
                    CircEnt::DestroySent(HalfCirc::new(25)),
                );
                (rcv2, rcv3)
            };
            // If a relay cell is sent on an open channel, the correct circuit
            // should get it.
            let relaycell: AnyChanMsg = msg::Relay::new(b"do you suppose").into();
            input
                .send(Ok(AnyChanCell::new(CircId::new(13), relaycell.clone())))
                .await
                .unwrap();
            reactor.run_once().await.unwrap();
            let got = circ_stream_13.next().await.unwrap();
            assert!(matches!(got, AnyChanMsg::Relay(_)));
            // If a relay cell is sent on an opening channel, that's an error.
            input
                .send(Ok(AnyChanCell::new(CircId::new(7), relaycell.clone())))
                .await
                .unwrap();
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                format!("{}", e),
                "Channel protocol violation: Relay cell on pending circuit before CREATED* received"
            );
            // If a relay cell is sent on a non-existent channel, that's an error.
            input
                .send(Ok(AnyChanCell::new(CircId::new(101), relaycell.clone())))
                .await
                .unwrap();
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                format!("{}", e),
                "Channel protocol violation: Relay cell on nonexistent circuit"
            );
            // It's fine to get a relay cell on a DestroySent channel: that happens
            // when the other side hasn't noticed the Destroy yet.
            // We can do this 25 more times according to our setup:
            for _ in 0..25 {
                input
                    .send(Ok(AnyChanCell::new(CircId::new(23), relaycell.clone())))
                    .await
                    .unwrap();
                reactor.run_once().await.unwrap(); // should be fine.
            }
            // This one will fail.
            input
                .send(Ok(AnyChanCell::new(CircId::new(23), relaycell.clone())))
                .await
                .unwrap();
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                format!("{}", e),
                "Channel protocol violation: Too many cells received on destroyed circuit"
            );
        });
    }
    #[test]
    fn deliver_destroy() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            use crate::circuit::celltypes::*;
            use oneshot_fused_workaround as oneshot;
            let (_chan, mut reactor, _output, mut input) = new_reactor(rt.clone());
            let (padding_ctrl, _padding_stream) = new_padding(DynTimeProvider::new(rt));
            let (circ_oneshot_7, mut circ_stream_13) = {
                let (snd1, rcv1) = oneshot::channel();
                let (snd2, _rcv2) = fake_mpsc(64);
                reactor.circs.put_unchecked(
                    CircId::new(7).unwrap(),
                    CircEnt::Opening {
                        create_response_sender: snd1,
                        cell_sender: snd2,
                        padding_ctrl: padding_ctrl.clone(),
                    },
                );
                let (snd3, rcv3) = fake_mpsc(64);
                reactor.circs.put_unchecked(
                    CircId::new(13).unwrap(),
                    CircEnt::OpenOrigin {
                        cell_sender: snd3,
                        padding_ctrl: padding_ctrl.clone(),
                    },
                );
                reactor.circs.put_unchecked(
                    CircId::new(23).unwrap(),
                    CircEnt::DestroySent(HalfCirc::new(25)),
                );
                (rcv1, rcv3)
            };
            // Destroying an opening circuit is fine.
            let destroycell: AnyChanMsg = msg::Destroy::new(0.into()).into();
            input
                .send(Ok(AnyChanCell::new(CircId::new(7), destroycell.clone())))
                .await
                .unwrap();
            reactor.run_once().await.unwrap();
            let msg = circ_oneshot_7.await;
            assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
            // Destroying an open circuit is fine.
            input
                .send(Ok(AnyChanCell::new(CircId::new(13), destroycell.clone())))
                .await
                .unwrap();
            reactor.run_once().await.unwrap();
            let msg = circ_stream_13.next().await.unwrap();
            assert!(matches!(msg, AnyChanMsg::Destroy(_)));
            // Destroying a DestroySent circuit is fine.
            input
                .send(Ok(AnyChanCell::new(CircId::new(23), destroycell.clone())))
                .await
                .unwrap();
            reactor.run_once().await.unwrap();
            // Destroying a nonexistent circuit is an error.
            input
                .send(Ok(AnyChanCell::new(CircId::new(101), destroycell.clone())))
                .await
                .unwrap();
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                format!("{}", e),
                "Channel protocol violation: Destroy for nonexistent circuit"
            );
        });
    }
    #[test]
    fn closing_if_reactor_dropped() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, reactor, _output, _input) = new_reactor(rt);
            assert!(!chan.is_closing());
            drop(reactor);
            assert!(chan.is_closing());
            assert!(matches!(
                chan.wait_for_close().await,
                Err(ClosedUnexpectedly::ReactorDropped),
            ));
        });
    }
    #[test]
    fn closing_if_reactor_shutdown() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, reactor, _output, _input) = new_reactor(rt);
            assert!(!chan.is_closing());
            chan.terminate();
            assert!(!chan.is_closing());
            let r = reactor.run().await;
            assert!(r.is_ok());
            assert!(chan.is_closing());
            assert!(chan.wait_for_close().await.is_ok());
        });
    }
    #[test]
    fn reactor_error_wait_for_close() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, reactor, _output, mut input) = new_reactor(rt);
            // force an error by sending created2 cell for nonexistent circuit
            let created2_cell = msg::Created2::new(*b"hihi").into();
            input
                .send(Ok(AnyChanCell::new(CircId::new(7), created2_cell)))
                .await
                .unwrap();
            // `reactor.run()` should return an error
            let run_error = reactor.run().await.unwrap_err();
            // `chan.wait_for_close()` should return the same error
            let Err(ClosedUnexpectedly::ReactorError(wait_error)) = chan.wait_for_close().await
            else {
                panic!("Expected a 'ReactorError'");
            };
            // `Error` doesn't implement `PartialEq`, so best we can do is to compare the strings
            assert_eq!(run_error.to_string(), wait_error.to_string());
        });
    }
}