1
//! Module providing [`CtrlMsg`].
2

            
3
use super::circuit::extender::CircuitExtender;
4
use super::{
5
    CircuitHandshake, CloseStreamBehavior, MetaCellHandler, Reactor, ReactorResultChannel,
6
    RunOnceCmdInner, SendRelayCell,
7
};
8
use crate::Result;
9
use crate::circuit::celltypes::CreateResponse;
10
use crate::circuit::circhop::HopSettings;
11
#[cfg(feature = "circ-padding-manual")]
12
use crate::client::circuit::padding;
13
use crate::client::circuit::path;
14
use crate::client::reactor::{NoJoinPointError, NtorClient, ReactorError};
15
use crate::client::{HopLocation, TargetHop};
16
use crate::crypto::binding::CircuitBinding;
17
use crate::crypto::cell::{InboundClientLayer, OutboundClientLayer};
18
use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
19
use crate::stream::cmdcheck::AnyCmdChecker;
20
use crate::stream::flow_ctrl::state::StreamRateLimit;
21
use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
22
use crate::stream::queue::StreamQueueSender;
23
use crate::streammap;
24
use crate::util::notify::NotifySender;
25
use crate::util::skew::ClockSkew;
26
use crate::util::tunnel_activity::TunnelActivity;
27
#[cfg(test)]
28
use crate::{circuit::UniqId, client::circuit::CircParameters, crypto::cell::HopNum};
29
use postage::watch;
30
use tor_cell::chancell::msg::HandshakeType;
31
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
32
use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
33
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId};
34
use tor_error::{Bug, bad_api_usage, internal, into_bad_api_usage};
35
use tracing::{debug, trace};
36
#[cfg(feature = "hs-service")]
37
use {
38
    crate::client::reactor::IncomingStreamRequestHandler,
39
    crate::client::stream::IncomingStreamRequestFilter, crate::stream::incoming::StreamReqSender,
40
};
41

            
42
#[cfg(test)]
43
use tor_cell::relaycell::msg::SendmeTag;
44

            
45
#[cfg(feature = "conflux")]
46
use super::{Circuit, ConfluxLinkResultChannel};
47

            
48
use oneshot_fused_workaround as oneshot;
49

            
50
use crate::crypto::handshake::ntor::NtorPublicKey;
51
use crate::stream::StreamMpscReceiver;
52
use tor_linkspec::{EncodedLinkSpec, OwnedChanTarget};
53

            
54
use std::result::Result as StdResult;
55

            
56
/// A message telling the reactor to do something.
57
///
58
/// For each `CtrlMsg`, the reactor will send a cell on the underlying channel.
59
///
60
/// The difference between this and [`CtrlCmd`] is that `CtrlMsg`s
61
/// cause the reactor to send cells on the reactor's `chan_sender`,
62
/// whereas `CtrlCmd` do not.
63
#[derive(educe::Educe)]
64
#[educe(Debug)]
65
pub(crate) enum CtrlMsg {
66
    /// Create the first hop of this circuit.
67
    Create {
68
        /// A oneshot channel on which we'll receive the creation response.
69
        recv_created: oneshot::Receiver<CreateResponse>,
70
        /// The handshake type to use for the first hop.
71
        handshake: CircuitHandshake,
72
        /// Other parameters relevant for circuit creation.
73
        settings: HopSettings,
74
        /// Oneshot channel to notify on completion.
75
        done: ReactorResultChannel<()>,
76
    },
77
    /// Extend a circuit by one hop, using the ntor handshake.
78
    ExtendNtor {
79
        /// The peer that we're extending to.
80
        ///
81
        /// Used to extend our record of the circuit's path.
82
        peer_id: OwnedChanTarget,
83
        /// The handshake type to use for this hop.
84
        public_key: NtorPublicKey,
85
        /// Information about how to connect to the relay we're extending to.
86
        linkspecs: Vec<EncodedLinkSpec>,
87
        /// Other parameters we are negotiating.
88
        settings: HopSettings,
89
        /// Oneshot channel to notify on completion.
90
        done: ReactorResultChannel<()>,
91
    },
92
    /// Extend a circuit by one hop, using the ntorv3 handshake.
93
    ExtendNtorV3 {
94
        /// The peer that we're extending to.
95
        ///
96
        /// Used to extend our record of the circuit's path.
97
        peer_id: OwnedChanTarget,
98
        /// The handshake type to use for this hop.
99
        public_key: NtorV3PublicKey,
100
        /// Information about how to connect to the relay we're extending to.
101
        linkspecs: Vec<EncodedLinkSpec>,
102
        /// Other parameters we are negotiating.
103
        settings: HopSettings,
104
        /// Oneshot channel to notify on completion.
105
        done: ReactorResultChannel<()>,
106
    },
107
    /// Begin a stream with the provided hop in this circuit.
108
    ///
109
    /// Allocates a stream ID, and sends the provided message to that hop.
110
    BeginStream {
111
        /// The hop number to begin the stream with.
112
        hop: TargetHop,
113
        /// The message to send.
114
        message: AnyRelayMsg,
115
        /// A channel to send messages on this stream down.
116
        ///
117
        /// This sender shouldn't ever block, because we use congestion control and only send
118
        /// SENDME cells once we've read enough out of the other end. If it *does* block, we
119
        /// can assume someone is trying to send us more cells than they should, and abort
120
        /// the stream.
121
        sender: StreamQueueSender,
122
        /// A channel to receive messages to send on this stream from.
123
        rx: StreamMpscReceiver<AnyRelayMsg>,
124
        /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
125
        rate_limit_notifier: watch::Sender<StreamRateLimit>,
126
        /// Notifies the stream reader when it should send a new drain rate.
127
        drain_rate_requester: NotifySender<DrainRateRequest>,
128
        /// Oneshot channel to notify on completion, with the allocated stream ID.
129
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
130
        /// A `CmdChecker` to keep track of which message types are acceptable.
131
        cmd_checker: AnyCmdChecker,
132
    },
133
    /// Close the specified pending incoming stream, sending the provided END message.
134
    ///
135
    /// A stream is said to be pending if the message for initiating the stream was received but
136
    /// not has not been responded to yet.
137
    ///
138
    /// This should be used by responders for closing pending incoming streams initiated by the
139
    /// other party on the circuit.
140
    #[cfg(feature = "hs-service")]
141
    ClosePendingStream {
142
        /// The hop number the stream is on.
143
        hop: HopLocation,
144
        /// The stream ID to send the END for.
145
        stream_id: StreamId,
146
        /// The END message to send, if any.
147
        message: CloseStreamBehavior,
148
        /// Oneshot channel to notify on completion.
149
        done: ReactorResultChannel<()>,
150
    },
151
    /// Send a given control message on this circuit.
152
    #[cfg(feature = "send-control-msg")]
153
    SendMsg {
154
        /// The hop to receive this message.
155
        hop: TargetHop,
156
        /// The message to send.
157
        msg: AnyRelayMsg,
158
        /// A sender that we use to tell the caller that the message was sent
159
        /// and the handler installed.
160
        sender: oneshot::Sender<Result<()>>,
161
    },
162
    /// Send a given control message on this circuit, and install a control-message handler to
163
    /// receive responses.
164
    #[cfg(feature = "send-control-msg")]
165
    SendMsgAndInstallHandler {
166
        /// The message to send, if any
167
        msg: Option<AnyRelayMsgOuter>,
168
        /// A message handler to install.
169
        ///
170
        /// If this is `None`, there must already be a message handler installed
171
        #[educe(Debug(ignore))]
172
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
173
        /// A sender that we use to tell the caller that the message was sent
174
        /// and the handler installed.
175
        sender: oneshot::Sender<Result<()>>,
176
    },
177
    /// Inform the reactor that there's a flow control update for a given stream.
178
    ///
179
    /// The reactor will decide how to handle this update depending on the type of flow control and
180
    /// the current state of the stream.
181
    FlowCtrlUpdate {
182
        /// The type of flow control update, and any associated metadata.
183
        msg: FlowCtrlMsg,
184
        /// The stream ID that the update is for.
185
        stream_id: StreamId,
186
        /// The hop that the stream is on.
187
        hop: HopLocation,
188
    },
189
    /// Get the clock skew claimed by the first hop of the circuit.
190
    FirstHopClockSkew {
191
        /// Oneshot channel to return the clock skew.
192
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
193
    },
194
    /// Link the specified circuits into the current tunnel,
195
    /// to form a multi-path tunnel.
196
    #[cfg(feature = "conflux")]
197
    #[allow(unused)] // TODO(conflux)
198
    LinkCircuits {
199
        /// The circuits to link into the tunnel,
200
        #[educe(Debug(ignore))]
201
        circuits: Vec<Circuit>,
202
        /// Oneshot channel to notify sender when all the specified circuits have finished linking,
203
        /// or have failed to link.
204
        ///
205
        /// A client circuit is said to be fully linked once the `RELAY_CONFLUX_LINKED_ACK` is sent
206
        /// (see [set construction]).
207
        ///
208
        /// [set construction]: https://spec.torproject.org/proposals/329-traffic-splitting.html#set-construction
209
        answer: ConfluxLinkResultChannel,
210
    },
211
}
212

            
213
/// A message telling the reactor to do something.
214
///
215
/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
216
/// never cause cells to sent on the channel,
217
/// while `CtrlMsg`s potentially do: `CtrlMsg`s are mapped to [`RunOnceCmdInner`] commands,
218
/// some of which instruct the reactor to send cells down the channel.
219
#[derive(educe::Educe)]
220
#[educe(Debug)]
221
pub(crate) enum CtrlCmd {
222
    /// Shut down the reactor.
223
    Shutdown,
224
    /// Extend the circuit by one hop, in response to an out-of-band handshake.
225
    ///
226
    /// (This is used for onion services, where the negotiation takes place in
227
    /// INTRODUCE and RENDEZVOUS messages.)
228
    #[cfg(feature = "hs-common")]
229
    ExtendVirtual {
230
        /// The cryptographic algorithms and keys to use when communicating with
231
        /// the newly added hop.
232
        #[educe(Debug(ignore))]
233
        cell_crypto: (
234
            Box<dyn OutboundClientLayer + Send>,
235
            Box<dyn InboundClientLayer + Send>,
236
            Option<CircuitBinding>,
237
        ),
238
        /// A set of parameters to negotiate with this hop.
239
        settings: HopSettings,
240
        /// Oneshot channel to notify on completion.
241
        done: ReactorResultChannel<()>,
242
    },
243
    /// Resolve a given [`TargetHop`] into a precise [`HopLocation`].
244
    ResolveTargetHop {
245
        /// The target hop to resolve.
246
        hop: TargetHop,
247
        /// Oneshot channel to notify on completion.
248
        done: ReactorResultChannel<HopLocation>,
249
    },
250
    /// Begin accepting streams on this circuit.
251
    #[cfg(feature = "hs-service")]
252
    AwaitStreamRequest {
253
        /// A channel for sending information about an incoming stream request.
254
        incoming_sender: StreamReqSender,
255
        /// A `CmdChecker` to keep track of which message types are acceptable.
256
        cmd_checker: AnyCmdChecker,
257
        /// Oneshot channel to notify on completion.
258
        done: ReactorResultChannel<()>,
259
        /// The hop that is allowed to create streams.
260
        hop: TargetHop,
261
        /// A filter used to check requests before passing them on.
262
        #[educe(Debug(ignore))]
263
        #[cfg(feature = "hs-service")]
264
        filter: Box<dyn IncomingStreamRequestFilter>,
265
    },
266
    /// Request the binding key of a target hop.
267
    #[cfg(feature = "hs-service")]
268
    GetBindingKey {
269
        /// The hop for which we want the key.
270
        hop: TargetHop,
271
        /// Oneshot channel to notify on completion.
272
        done: ReactorResultChannel<Option<CircuitBinding>>,
273
    },
274
    /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography.
275
    #[cfg(test)]
276
    AddFakeHop {
277
        relay_cell_format: RelayCellFormat,
278
        fwd_lasthop: bool,
279
        rev_lasthop: bool,
280
        peer_id: path::HopDetail,
281
        params: CircParameters,
282
        done: ReactorResultChannel<()>,
283
    },
284
    /// (tests only) Get the send window and expected tags for a given hop.
285
    #[cfg(test)]
286
    QuerySendWindow {
287
        hop: HopNum,
288
        leg: UniqId,
289
        done: ReactorResultChannel<(u32, Vec<SendmeTag>)>,
290
    },
291
    /// Shut down the reactor, and return the underlying [`Circuit`],
292
    /// if the tunnel is not multi-path.
293
    ///
294
    /// Returns an error if called on a multi-path reactor.
295
    #[cfg(feature = "conflux")]
296
    #[allow(unused)] // TODO(conflux)
297
    ShutdownAndReturnCircuit {
298
        /// Oneshot channel to return the underlying [`Circuit`],
299
        /// or an error if the reactor's tunnel is multi-path.
300
        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
301
    },
302

            
303
    /// Install or remove a [`padding::CircuitPadder`] for a given hop.
304
    ///
305
    /// Any existing `CircuitPadder` at that hop is replaced.
306
    #[cfg(feature = "circ-padding-manual")]
307
    SetPadder {
308
        /// The hop to modify.
309
        hop: HopLocation,
310
        /// The Padder to install, or None to remove any existing padder.
311
        padder: Option<padding::CircuitPadder>,
312
        /// A sender to alert after we've changed the padding.
313
        sender: oneshot::Sender<Result<()>>,
314
    },
315

            
316
    /// Yield the most active [`TunnelActivity`] for any hop on any leg of this tunnel.
317
    GetTunnelActivity {
318
        /// A sender to receive the reply.
319
        sender: oneshot::Sender<TunnelActivity>,
320
    },
321
}
322

            
323
/// A flow control update message.
324
#[derive(Debug)]
325
pub(crate) enum FlowCtrlMsg {
326
    /// Send a SENDME message on this stream.
327
    Sendme,
328
    /// Send an XON message on this stream with the given rate.
329
    Xon(XonKbpsEwma),
330
}
331

            
332
/// A control message handler object. Keep a reference to the Reactor tying its lifetime to it.
333
///
334
/// Its `handle_msg` and `handle_cmd` handlers decide how messages and commands,
335
/// respectively, are handled.
336
pub(crate) struct ControlHandler<'a> {
337
    /// Reference to the reactor of this
338
    reactor: &'a mut Reactor,
339
}
340

            
341
impl<'a> ControlHandler<'a> {
342
    /// Constructor.
343
1064
    pub(crate) fn new(reactor: &'a mut Reactor) -> Self {
344
1064
        Self { reactor }
345
1064
    }
346

            
347
    /// Handle a control message.
348
240
    pub(super) fn handle_msg(&mut self, msg: CtrlMsg) -> Result<Option<RunOnceCmdInner>> {
349
240
        trace!(
350
            tunnel_id = %self.reactor.tunnel_id,
351
            msg = ?msg,
352
            "reactor received control message"
353
        );
354

            
355
240
        match msg {
356
            // This is handled earlier, since it requires blocking.
357
            CtrlMsg::Create { done, .. } => {
358
                if self.reactor.circuits.len() == 1 {
359
                    // This should've been handled in Reactor::run_once()
360
                    // (ControlHandler::handle_msg() is never called before wait_for_create()).
361
                    debug_assert!(self.reactor.circuits.single_leg()?.has_hops());
362
                    // Don't care if the receiver goes away
363
                    let _ = done.send(Err(tor_error::bad_api_usage!(
364
                        "cannot create first hop twice"
365
                    )
366
                    .into()));
367
                } else {
368
                    // Don't care if the receiver goes away
369
                    let _ = done.send(Err(tor_error::bad_api_usage!(
370
                        "cannot create first hop on multipath tunnel"
371
                    )
372
                    .into()));
373
                }
374

            
375
                Ok(None)
376
            }
377
            CtrlMsg::ExtendNtor {
378
60
                peer_id,
379
60
                public_key,
380
60
                linkspecs,
381
60
                settings,
382
60
                done,
383
            } => {
384
60
                let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
385
                    // Don't care if the receiver goes away
386
                    let _ = done.send(Err(tor_error::bad_api_usage!(
387
                        "cannot extend multipath tunnel"
388
                    )
389
                    .into()));
390

            
391
                    return Ok(None);
392
                };
393

            
394
60
                let (extender, cell) = CircuitExtender::<NtorClient>::begin(
395
60
                    peer_id,
396
                    HandshakeType::NTOR,
397
60
                    &public_key,
398
60
                    linkspecs,
399
60
                    settings,
400
60
                    &(),
401
60
                    circ,
402
60
                    done,
403
                )?;
404
60
                self.reactor
405
60
                    .cell_handlers
406
60
                    .set_meta_handler(Box::new(extender))?;
407

            
408
60
                Ok(Some(RunOnceCmdInner::Send {
409
60
                    leg: circ.unique_id(),
410
60
                    cell,
411
60
                    done: None,
412
60
                }))
413
            }
414
            CtrlMsg::ExtendNtorV3 {
415
12
                peer_id,
416
12
                public_key,
417
12
                linkspecs,
418
12
                settings,
419
12
                done,
420
            } => {
421
12
                let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
422
                    // Don't care if the receiver goes away
423
                    let _ = done.send(Err(tor_error::bad_api_usage!(
424
                        "cannot extend multipath tunnel"
425
                    )
426
                    .into()));
427

            
428
                    return Ok(None);
429
                };
430

            
431
12
                let client_extensions = settings.circuit_request_extensions()?;
432

            
433
12
                let (extender, cell) = CircuitExtender::<NtorV3Client>::begin(
434
12
                    peer_id,
435
                    HandshakeType::NTOR_V3,
436
12
                    &public_key,
437
12
                    linkspecs,
438
12
                    settings,
439
12
                    &client_extensions,
440
12
                    circ,
441
12
                    done,
442
                )?;
443
12
                self.reactor
444
12
                    .cell_handlers
445
12
                    .set_meta_handler(Box::new(extender))?;
446

            
447
12
                Ok(Some(RunOnceCmdInner::Send {
448
12
                    leg: circ.unique_id(),
449
12
                    cell,
450
12
                    done: None,
451
12
                }))
452
            }
453
            CtrlMsg::BeginStream {
454
96
                hop,
455
96
                message,
456
96
                sender,
457
96
                rx,
458
96
                rate_limit_notifier,
459
96
                drain_rate_requester,
460
96
                done,
461
96
                cmd_checker,
462
            } => {
463
                // If resolving the hop fails,
464
                // we want to report an error back to the initiator and not shut down the reactor.
465
96
                let hop_location = match self.reactor.resolve_target_hop(hop) {
466
96
                    Ok(x) => x,
467
                    Err(e) => {
468
                        let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
469
                        // don't care if receiver goes away
470
                        let _ = done.send(Err(e.into()));
471
                        return Ok(None);
472
                    }
473
                };
474
96
                let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop_location) {
475
96
                    Ok(x) => x,
476
                    Err(e) => {
477
                        let e = into_bad_api_usage!("Could not resolve {hop_location:?}")(e);
478
                        // don't care if receiver goes away
479
                        let _ = done.send(Err(e.into()));
480
                        return Ok(None);
481
                    }
482
                };
483
96
                let circ = match self.reactor.circuits.leg_mut(leg_id) {
484
96
                    Some(x) => x,
485
                    None => {
486
                        let e = bad_api_usage!("Circuit leg {leg_id:?} does not exist");
487
                        // don't care if receiver goes away
488
                        let _ = done.send(Err(e.into()));
489
                        return Ok(None);
490
                    }
491
                };
492

            
493
96
                let result = circ.begin_stream(
494
96
                    hop_num,
495
96
                    message,
496
96
                    sender,
497
96
                    rx,
498
96
                    rate_limit_notifier,
499
96
                    drain_rate_requester,
500
96
                    cmd_checker,
501
                );
502

            
503
96
                let (cell, stream_id) = match result {
504
96
                    Ok((cell, stream_id)) => (cell, stream_id),
505
                    Err(e) => {
506
                        // don't care if receiver goes away.
507
                        let _ = done.send(Err(e.clone()));
508
                        return Err(e);
509
                    }
510
                };
511

            
512
96
                Ok(Some(RunOnceCmdInner::BeginStream {
513
96
                    leg: leg_id,
514
96
                    cell,
515
96
                    stream_id,
516
96
                    hop: hop_location,
517
96
                    done,
518
96
                }))
519
            }
520
            #[cfg(feature = "hs-service")]
521
            CtrlMsg::ClosePendingStream {
522
12
                hop,
523
12
                stream_id,
524
12
                message,
525
12
                done,
526
12
            } => Ok(Some(RunOnceCmdInner::CloseStream {
527
12
                hop,
528
12
                sid: stream_id,
529
12
                behav: message,
530
12
                reason: streammap::TerminateReason::ExplicitEnd,
531
12
                done: Some(done),
532
12
            })),
533
            CtrlMsg::FlowCtrlUpdate {
534
                msg,
535
                stream_id,
536
                hop,
537
            } => {
538
                match msg {
539
                    FlowCtrlMsg::Sendme => {
540
                        let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop) {
541
                            Ok(x) => x,
542
                            Err(NoJoinPointError) => {
543
                                // A stream tried to send a stream-level SENDME message to the join point of
544
                                // a tunnel that has never had a join point. Currently in arti, only a
545
                                // `StreamTarget` asks us to send a stream-level SENDME, and this tunnel
546
                                // originally created the `StreamTarget` to begin with. So this is a
547
                                // legitimate bug somewhere in the tunnel code.
548
                                return Err(
549
                                    internal!(
550
                                        "Could not send a stream-level SENDME to a join point on a tunnel without a join point",
551
                                    )
552
                                    .into()
553
                                );
554
                            }
555
                        };
556

            
557
                        // Congestion control decides if we can send stream level SENDMEs or not.
558
                        let sendme_required = match self.reactor.uses_stream_sendme(leg_id, hop_num)
559
                        {
560
                            Some(x) => x,
561
                            None => {
562
                                // The leg/hop has disappeared. This is fine since the stream may have ended
563
                                // and been cleaned up while this `CtrlMsg::SendSendme` message was queued.
564
                                // It is possible that is a bug and this is an incorrect leg/hop number, but
565
                                // it's not currently possible to differentiate between an incorrect leg/hop
566
                                // number and a circuit hop that has been closed.
567
                                debug!(
568
                                    "Could not send a stream-level SENDME on a hop that does not exist. Ignoring."
569
                                );
570
                                return Ok(None);
571
                            }
572
                        };
573

            
574
                        if !sendme_required {
575
                            // Nothing to do, so discard the SENDME.
576
                            return Ok(None);
577
                        }
578

            
579
                        let sendme = Sendme::new_empty();
580
                        let cell = AnyRelayMsgOuter::new(Some(stream_id), sendme.into());
581

            
582
                        let cell = SendRelayCell {
583
                            hop: Some(hop_num),
584
                            early: false,
585
                            cell,
586
                        };
587

            
588
                        Ok(Some(RunOnceCmdInner::Send {
589
                            leg: leg_id,
590
                            cell,
591
                            done: None,
592
                        }))
593
                    }
594
                    FlowCtrlMsg::Xon(rate) => Ok(Some(RunOnceCmdInner::MaybeSendXon {
595
                        rate,
596
                        hop,
597
                        stream_id,
598
                    })),
599
                }
600
            }
601
            // TODO(conflux): this should specify which leg to send the msg on
602
            // (currently we send it down the primary leg).
603
            //
604
            // This will involve updating ClientCIrc::send_raw_msg() to take a
605
            // leg id argument (which is a breaking change.
606
            #[cfg(feature = "send-control-msg")]
607
            CtrlMsg::SendMsg { hop, msg, sender } => {
608
                let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
609
                    // Don't care if receiver goes away
610
                    let _ = sender.send(Err(bad_api_usage!("Unknown {hop:?}").into()));
611
                    return Ok(None);
612
                };
613

            
614
                let cell = AnyRelayMsgOuter::new(None, msg);
615
                let cell = SendRelayCell {
616
                    hop: Some(hop_num),
617
                    early: false,
618
                    cell,
619
                };
620

            
621
                Ok(Some(RunOnceCmdInner::Send {
622
                    leg: leg_id,
623
                    cell,
624
                    done: Some(sender),
625
                }))
626
            }
627
            // TODO(conflux): this should specify which leg to send the msg on
628
            // (currently we send it down the primary leg)
629
            #[cfg(feature = "send-control-msg")]
630
            CtrlMsg::SendMsgAndInstallHandler {
631
                msg,
632
                handler,
633
                sender,
634
            } => Ok(Some(RunOnceCmdInner::SendMsgAndInstallHandler {
635
                msg,
636
                handler,
637
                done: sender,
638
            })),
639
            CtrlMsg::FirstHopClockSkew { answer } => {
640
                Ok(Some(RunOnceCmdInner::FirstHopClockSkew { answer }))
641
            }
642
            #[cfg(feature = "conflux")]
643
60
            CtrlMsg::LinkCircuits { circuits, answer } => {
644
60
                Ok(Some(RunOnceCmdInner::Link { circuits, answer }))
645
            }
646
        }
647
240
    }
648

            
649
    /// Handle a control command.
650
    #[allow(clippy::needless_pass_by_value)] // Needed when conflux is enabled
651
824
    pub(super) fn handle_cmd(&mut self, msg: CtrlCmd) -> StdResult<(), ReactorError> {
652
824
        trace!(
653
            tunnel_id = %self.reactor.tunnel_id,
654
            msg = ?msg,
655
            "reactor received control command"
656
        );
657

            
658
824
        match msg {
659
            CtrlCmd::Shutdown => self.reactor.handle_shutdown().map(|_| ()),
660
            #[cfg(feature = "hs-common")]
661
            #[allow(unreachable_code)]
662
            CtrlCmd::ExtendVirtual {
663
                cell_crypto,
664
                settings,
665
                done,
666
            } => {
667
                let (outbound, inbound, binding) = cell_crypto;
668

            
669
                // TODO HS: Perhaps this should describe the onion service, or
670
                // describe why the virtual hop was added, or something?
671
                let peer_id = path::HopDetail::Virtual;
672

            
673
                let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
674
                    // Don't care if the receiver goes away
675
                    let _ = done.send(Err(tor_error::bad_api_usage!(
676
                        "cannot extend multipath tunnel"
677
                    )
678
                    .into()));
679

            
680
                    return Ok(());
681
                };
682

            
683
                leg.add_hop(peer_id, outbound, inbound, binding, &settings)?;
684
                let _ = done.send(Ok(()));
685

            
686
                Ok(())
687
            }
688
48
            CtrlCmd::ResolveTargetHop { hop, done } => {
689
48
                let _ = done.send(
690
48
                    self.reactor
691
48
                        .resolve_target_hop(hop)
692
48
                        .map_err(|_| crate::util::err::Error::NoSuchHop),
693
                );
694
48
                Ok(())
695
            }
696
            #[cfg(feature = "hs-service")]
697
            CtrlCmd::AwaitStreamRequest {
698
60
                cmd_checker,
699
60
                incoming_sender,
700
60
                hop,
701
60
                done,
702
60
                filter,
703
            } => {
704
60
                let Some((_, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
705
                    let _ = done.send(Err(crate::Error::NoSuchHop));
706
                    return Ok(());
707
                };
708
                // TODO: At some point we might want to add a CtrlCmd for
709
                // de-registering the handler.  See comments on `allow_stream_requests`.
710
60
                let handler = IncomingStreamRequestHandler {
711
60
                    incoming_sender,
712
60
                    cmd_checker,
713
60
                    hop_num: Some(hop_num),
714
60
                    filter,
715
60
                };
716

            
717
60
                let ret = self
718
60
                    .reactor
719
60
                    .cell_handlers
720
60
                    .set_incoming_stream_req_handler(handler);
721
60
                let _ = done.send(ret); // don't care if the corresponding receiver goes away.
722

            
723
60
                Ok(())
724
            }
725
            #[cfg(feature = "hs-service")]
726
            CtrlCmd::GetBindingKey { hop, done } => {
727
                let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
728
                    let _ = done.send(Err(tor_error::internal!(
729
                        "Unknown TargetHop when getting binding key"
730
                    )
731
                    .into()));
732
                    return Ok(());
733
                };
734
                let Some(circuit) = self.reactor.circuits.leg(leg_id) else {
735
                    let _ = done.send(Err(tor_error::bad_api_usage!(
736
                        "Unknown circuit id {leg_id} when getting binding key"
737
                    )
738
                    .into()));
739
                    return Ok(());
740
                };
741
                // Get the binding key from the mutable state and send it back.
742
                let key = circuit.mutable().binding_key(hop_num);
743
                let _ = done.send(Ok(key));
744

            
745
                Ok(())
746
            }
747
            #[cfg(test)]
748
            CtrlCmd::AddFakeHop {
749
632
                relay_cell_format,
750
632
                fwd_lasthop,
751
632
                rev_lasthop,
752
632
                peer_id,
753
632
                params,
754
632
                done,
755
            } => {
756
632
                let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
757
                    // Don't care if the receiver goes away
758
                    let _ = done.send(Err(tor_error::bad_api_usage!(
759
                        "cannot add fake hop to multipath tunnel"
760
                    )
761
                    .into()));
762

            
763
                    return Ok(());
764
                };
765

            
766
632
                leg.handle_add_fake_hop(
767
632
                    relay_cell_format,
768
632
                    fwd_lasthop,
769
632
                    rev_lasthop,
770
632
                    peer_id,
771
632
                    &params,
772
632
                    done,
773
                );
774

            
775
632
                Ok(())
776
            }
777
            #[cfg(test)]
778
20
            CtrlCmd::QuerySendWindow { hop, leg, done } => {
779
                // Immediately invoked function means that errors will be sent to the channel.
780
30
                let _ = done.send((|| {
781
20
                    let leg = self.reactor.circuits.leg_mut(leg).ok_or_else(|| {
782
                        bad_api_usage!("cannot query send window of non-existent circuit")
783
                    })?;
784

            
785
20
                    let hop = leg.hop_mut(hop).ok_or(bad_api_usage!(
786
                        "received QuerySendWindow for unknown hop {}",
787
20
                        hop.display()
788
                    ))?;
789

            
790
20
                    Ok(hop.send_window_and_expected_tags())
791
                })());
792

            
793
20
                Ok(())
794
            }
795
            #[cfg(feature = "conflux")]
796
64
            CtrlCmd::ShutdownAndReturnCircuit { answer } => {
797
64
                self.reactor.handle_shutdown_and_return_circuit(answer)
798
            }
799
            #[cfg(feature = "circ-padding-manual")]
800
            CtrlCmd::SetPadder {
801
                hop,
802
                padder,
803
                sender,
804
            } => {
805
                let result = self.reactor.set_padding_at_hop(hop, padder);
806
                let _ = sender.send(result);
807
                Ok(())
808
            }
809
            CtrlCmd::GetTunnelActivity { sender } => {
810
                let count = self.reactor.circuits.tunnel_activity();
811
                let _ = sender.send(count);
812
                Ok(())
813
            }
814
        }
815
824
    }
816
}