1
//! Module providing [`CtrlMsg`].
2

            
3
use super::circuit::extender::CircuitExtender;
4
use super::{
5
    CircuitHandshake, CloseStreamBehavior, MetaCellHandler, Reactor, ReactorResultChannel,
6
    RunOnceCmdInner, SendRelayCell,
7
};
8
use crate::Result;
9
use crate::circuit::celltypes::CreateResponse;
10
use crate::circuit::circhop::HopSettings;
11
#[cfg(feature = "circ-padding-manual")]
12
use crate::client::circuit::padding;
13
use crate::client::circuit::path;
14
use crate::client::reactor::{NoJoinPointError, NtorClient, ReactorError};
15
use crate::client::{HopLocation, TargetHop};
16
use crate::crypto::binding::CircuitBinding;
17
use crate::crypto::cell::{InboundClientLayer, OutboundClientLayer};
18
use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
19
use crate::stream::cmdcheck::AnyCmdChecker;
20
use crate::stream::flow_ctrl::state::StreamRateLimit;
21
use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
22
use crate::stream::queue::StreamQueueSender;
23
use crate::streammap;
24
use crate::util::notify::NotifySender;
25
use crate::util::skew::ClockSkew;
26
use crate::util::tunnel_activity::TunnelActivity;
27
#[cfg(test)]
28
use crate::{circuit::UniqId, client::circuit::CircParameters, crypto::cell::HopNum};
29
use postage::watch;
30
use tor_cell::chancell::msg::HandshakeType;
31
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
32
use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
33
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId};
34
use tor_error::{Bug, bad_api_usage, internal, into_bad_api_usage};
35
use tracing::{debug, trace};
36
#[cfg(feature = "hs-service")]
37
use {
38
    crate::client::reactor::IncomingStreamRequestHandler,
39
    crate::client::stream::IncomingStreamRequestFilter, crate::stream::incoming::StreamReqSender,
40
};
41

            
42
#[cfg(test)]
43
use tor_cell::relaycell::msg::SendmeTag;
44

            
45
#[cfg(feature = "conflux")]
46
use super::{Circuit, ConfluxLinkResultChannel};
47

            
48
use oneshot_fused_workaround as oneshot;
49

            
50
use crate::crypto::handshake::ntor::NtorPublicKey;
51
use crate::stream::StreamMpscReceiver;
52
use tor_linkspec::{EncodedLinkSpec, OwnedChanTarget};
53

            
54
use std::result::Result as StdResult;
55

            
56
/// A message telling the reactor to do something.
57
///
58
/// For each `CtrlMsg`, the reactor will send a cell on the underlying channel.
59
///
60
/// The difference between this and [`CtrlCmd`] is that `CtrlMsg`s
61
/// cause the reactor to send cells on the reactor's `chan_sender`,
62
/// whereas `CtrlCmd` do not.
63
#[derive(educe::Educe)]
64
#[educe(Debug)]
65
pub(crate) enum CtrlMsg {
66
    /// Create the first hop of this circuit.
67
    Create {
68
        /// A oneshot channel on which we'll receive the creation response.
69
        recv_created: oneshot::Receiver<CreateResponse>,
70
        /// The handshake type to use for the first hop.
71
        handshake: CircuitHandshake,
72
        /// Other parameters relevant for circuit creation.
73
        settings: HopSettings,
74
        /// Oneshot channel to notify on completion.
75
        done: ReactorResultChannel<()>,
76
    },
77
    /// Extend a circuit by one hop, using the ntor handshake.
78
    ExtendNtor {
79
        /// The peer that we're extending to.
80
        ///
81
        /// Used to extend our record of the circuit's path.
82
        peer_id: OwnedChanTarget,
83
        /// The handshake type to use for this hop.
84
        public_key: NtorPublicKey,
85
        /// Information about how to connect to the relay we're extending to.
86
        linkspecs: Vec<EncodedLinkSpec>,
87
        /// Other parameters we are negotiating.
88
        settings: HopSettings,
89
        /// Oneshot channel to notify on completion.
90
        done: ReactorResultChannel<()>,
91
    },
92
    /// Extend a circuit by one hop, using the ntorv3 handshake.
93
    ExtendNtorV3 {
94
        /// The peer that we're extending to.
95
        ///
96
        /// Used to extend our record of the circuit's path.
97
        peer_id: OwnedChanTarget,
98
        /// The handshake type to use for this hop.
99
        public_key: NtorV3PublicKey,
100
        /// Information about how to connect to the relay we're extending to.
101
        linkspecs: Vec<EncodedLinkSpec>,
102
        /// Other parameters we are negotiating.
103
        settings: HopSettings,
104
        /// Oneshot channel to notify on completion.
105
        done: ReactorResultChannel<()>,
106
    },
107
    /// Begin a stream with the provided hop in this circuit.
108
    ///
109
    /// Allocates a stream ID, and sends the provided message to that hop.
110
    BeginStream {
111
        /// The hop number to begin the stream with.
112
        hop: TargetHop,
113
        /// The message to send.
114
        message: AnyRelayMsg,
115
        /// A channel to send messages on this stream down.
116
        ///
117
        /// This sender shouldn't ever block, because we use congestion control and only send
118
        /// SENDME cells once we've read enough out of the other end. If it *does* block, we
119
        /// can assume someone is trying to send us more cells than they should, and abort
120
        /// the stream.
121
        sender: StreamQueueSender,
122
        /// A channel to receive messages to send on this stream from.
123
        rx: StreamMpscReceiver<AnyRelayMsg>,
124
        /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
125
        rate_limit_notifier: watch::Sender<StreamRateLimit>,
126
        /// Notifies the stream reader when it should send a new drain rate.
127
        drain_rate_requester: NotifySender<DrainRateRequest>,
128
        /// Oneshot channel to notify on completion, with the allocated stream ID.
129
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
130
        /// A `CmdChecker` to keep track of which message types are acceptable.
131
        cmd_checker: AnyCmdChecker,
132
    },
133
    /// Close the specified pending incoming stream, sending the provided END message.
134
    ///
135
    /// A stream is said to be pending if the message for initiating the stream was received but
136
    /// not has not been responded to yet.
137
    ///
138
    /// This should be used by responders for closing pending incoming streams initiated by the
139
    /// other party on the circuit.
140
    #[cfg(feature = "hs-service")]
141
    ClosePendingStream {
142
        /// The hop number the stream is on.
143
        hop: HopLocation,
144
        /// The stream ID to send the END for.
145
        stream_id: StreamId,
146
        /// The END message to send, if any.
147
        message: CloseStreamBehavior,
148
        /// Oneshot channel to notify on completion.
149
        done: ReactorResultChannel<()>,
150
    },
151
    /// Send a given control message on this circuit.
152
    #[cfg(feature = "send-control-msg")]
153
    SendMsg {
154
        /// The hop to receive this message.
155
        hop: TargetHop,
156
        /// The message to send.
157
        msg: AnyRelayMsg,
158
        /// A sender that we use to tell the caller that the message was sent
159
        /// and the handler installed.
160
        sender: oneshot::Sender<Result<()>>,
161
    },
162
    /// Send a given control message on this circuit, and install a control-message handler to
163
    /// receive responses.
164
    #[cfg(feature = "send-control-msg")]
165
    SendMsgAndInstallHandler {
166
        /// The message to send, if any
167
        msg: Option<AnyRelayMsgOuter>,
168
        /// A message handler to install.
169
        ///
170
        /// If this is `None`, there must already be a message handler installed
171
        #[educe(Debug(ignore))]
172
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
173
        /// A sender that we use to tell the caller that the message was sent
174
        /// and the handler installed.
175
        sender: oneshot::Sender<Result<()>>,
176
    },
177
    /// Inform the reactor that there's a flow control update for a given stream.
178
    ///
179
    /// The reactor will decide how to handle this update depending on the type of flow control and
180
    /// the current state of the stream.
181
    FlowCtrlUpdate {
182
        /// The type of flow control update, and any associated metadata.
183
        msg: FlowCtrlMsg,
184
        /// The stream ID that the update is for.
185
        stream_id: StreamId,
186
        /// The hop that the stream is on.
187
        hop: HopLocation,
188
    },
189
    /// Get the clock skew claimed by the first hop of the circuit.
190
    FirstHopClockSkew {
191
        /// Oneshot channel to return the clock skew.
192
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
193
    },
194
    /// Link the specified circuits into the current tunnel,
195
    /// to form a multi-path tunnel.
196
    #[cfg(feature = "conflux")]
197
    #[allow(unused)] // TODO(conflux)
198
    LinkCircuits {
199
        /// The circuits to link into the tunnel,
200
        #[educe(Debug(ignore))]
201
        circuits: Vec<Circuit>,
202
        /// Oneshot channel to notify sender when all the specified circuits have finished linking,
203
        /// or have failed to link.
204
        ///
205
        /// A client circuit is said to be fully linked once the `RELAY_CONFLUX_LINKED_ACK` is sent
206
        /// (see [set construction]).
207
        ///
208
        /// [set construction]: https://spec.torproject.org/proposals/329-traffic-splitting.html#set-construction
209
        answer: ConfluxLinkResultChannel,
210
    },
211
}
212

            
213
/// A message telling the reactor to do something.
214
///
215
/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
216
/// never cause cells to sent on the channel,
217
/// while `CtrlMsg`s potentially do: `CtrlMsg`s are mapped to [`RunOnceCmdInner`] commands,
218
/// some of which instruct the reactor to send cells down the channel.
219
#[derive(educe::Educe)]
220
#[educe(Debug)]
221
pub(crate) enum CtrlCmd {
222
    /// Shut down the reactor.
223
    Shutdown,
224
    /// Extend the circuit by one hop, in response to an out-of-band handshake.
225
    ///
226
    /// (This is used for onion services, where the negotiation takes place in
227
    /// INTRODUCE and RENDEZVOUS messages.)
228
    #[cfg(feature = "hs-common")]
229
    ExtendVirtual {
230
        /// The cryptographic algorithms and keys to use when communicating with
231
        /// the newly added hop.
232
        #[educe(Debug(ignore))]
233
        cell_crypto: (
234
            Box<dyn OutboundClientLayer + Send>,
235
            Box<dyn InboundClientLayer + Send>,
236
            Option<CircuitBinding>,
237
        ),
238
        /// A set of parameters to negotiate with this hop.
239
        settings: HopSettings,
240
        /// Oneshot channel to notify on completion.
241
        done: ReactorResultChannel<()>,
242
    },
243
    /// Resolve a given [`TargetHop`] into a precise [`HopLocation`].
244
    ResolveTargetHop {
245
        /// The target hop to resolve.
246
        hop: TargetHop,
247
        /// Oneshot channel to notify on completion.
248
        done: ReactorResultChannel<HopLocation>,
249
    },
250
    /// Begin accepting streams on this circuit.
251
    #[cfg(feature = "hs-service")]
252
    AwaitStreamRequest {
253
        /// A channel for sending information about an incoming stream request.
254
        incoming_sender: StreamReqSender,
255
        /// A `CmdChecker` to keep track of which message types are acceptable.
256
        cmd_checker: AnyCmdChecker,
257
        /// Oneshot channel to notify on completion.
258
        done: ReactorResultChannel<()>,
259
        /// The hop that is allowed to create streams.
260
        hop: TargetHop,
261
        /// A filter used to check requests before passing them on.
262
        #[educe(Debug(ignore))]
263
        #[cfg(feature = "hs-service")]
264
        filter: Box<dyn IncomingStreamRequestFilter>,
265
    },
266
    /// Request the binding key of a target hop.
267
    #[cfg(feature = "hs-service")]
268
    GetBindingKey {
269
        /// The hop for which we want the key.
270
        hop: TargetHop,
271
        /// Oneshot channel to notify on completion.
272
        done: ReactorResultChannel<Option<CircuitBinding>>,
273
    },
274
    /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography.
275
    #[cfg(test)]
276
    AddFakeHop {
277
        relay_cell_format: RelayCellFormat,
278
        fwd_lasthop: bool,
279
        rev_lasthop: bool,
280
        peer_id: path::HopDetail,
281
        params: CircParameters,
282
        done: ReactorResultChannel<()>,
283
    },
284
    /// (tests only) Get the send window and expected tags for a given hop.
285
    #[cfg(test)]
286
    QuerySendWindow {
287
        hop: HopNum,
288
        leg: UniqId,
289
        done: ReactorResultChannel<(u32, Vec<SendmeTag>)>,
290
    },
291
    /// Shut down the reactor, and return the underlying [`Circuit`],
292
    /// if the tunnel is not multi-path.
293
    ///
294
    /// Returns an error if called on a multi-path reactor.
295
    #[cfg(feature = "conflux")]
296
    #[allow(unused)] // TODO(conflux)
297
    ShutdownAndReturnCircuit {
298
        /// Oneshot channel to return the underlying [`Circuit`],
299
        /// or an error if the reactor's tunnel is multi-path.
300
        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
301
    },
302

            
303
    /// Install or remove a [`padding::CircuitPadder`] for a given hop.
304
    ///
305
    /// Any existing `CircuitPadder` at that hop is replaced.
306
    #[cfg(feature = "circ-padding-manual")]
307
    SetPadder {
308
        /// The hop to modify.
309
        hop: HopLocation,
310
        /// The Padder to install, or None to remove any existing padder.
311
        padder: Option<padding::CircuitPadder>,
312
        /// A sender to alert after we've changed the padding.
313
        sender: oneshot::Sender<Result<()>>,
314
    },
315

            
316
    /// Yield the most active [`TunnelActivity`] for any hop on any leg of this tunnel.
317
    GetTunnelActivity {
318
        /// A sender to receive the reply.
319
        sender: oneshot::Sender<TunnelActivity>,
320
    },
321
}
322

            
323
/// A flow control update message.
324
#[derive(Debug)]
325
pub(crate) enum FlowCtrlMsg {
326
    /// Send a SENDME message on this stream.
327
    Sendme,
328
    /// Send an XON message on this stream with the given rate.
329
    Xon(XonKbpsEwma),
330
}
331

            
332
/// A control message handler object. Keep a reference to the Reactor tying its lifetime to it.
333
///
334
/// Its `handle_msg` and `handle_cmd` handlers decide how messages and commands,
335
/// respectively, are handled.
336
pub(crate) struct ControlHandler<'a> {
337
    /// Reference to the reactor of this
338
    reactor: &'a mut Reactor,
339
}
340

            
341
impl<'a> ControlHandler<'a> {
342
    /// Constructor.
343
1064
    pub(crate) fn new(reactor: &'a mut Reactor) -> Self {
344
1064
        Self { reactor }
345
1064
    }
346

            
347
    /// Handle a control message.
348
240
    pub(super) fn handle_msg(&mut self, msg: CtrlMsg) -> Result<Option<RunOnceCmdInner>> {
349
240
        trace!(
350
            tunnel_id = %self.reactor.tunnel_id,
351
            msg = ?msg,
352
            "reactor received control message"
353
        );
354

            
355
240
        match msg {
356
            // This is handled earlier, since it requires blocking.
357
            CtrlMsg::Create { done, .. } => {
358
                if self.reactor.circuits.len() == 1 {
359
                    // This should've been handled in Reactor::run_once()
360
                    // (ControlHandler::handle_msg() is never called before wait_for_create()).
361
                    debug_assert!(self.reactor.circuits.single_leg()?.has_hops());
362
                    // Don't care if the receiver goes away
363
                    let _ = done.send(Err(tor_error::bad_api_usage!(
364
                        "cannot create first hop twice"
365
                    )
366
                    .into()));
367
                } else {
368
                    // Don't care if the receiver goes away
369
                    let _ = done.send(Err(tor_error::bad_api_usage!(
370
                        "cannot create first hop on multipath tunnel"
371
                    )
372
                    .into()));
373
                }
374

            
375
                Ok(None)
376
            }
377
            CtrlMsg::ExtendNtor {
378
60
                peer_id,
379
60
                public_key,
380
60
                linkspecs,
381
60
                settings,
382
60
                done,
383
            } => {
384
60
                let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
385
                    // Don't care if the receiver goes away
386
                    let _ = done.send(Err(tor_error::bad_api_usage!(
387
                        "cannot extend multipath tunnel"
388
                    )
389
                    .into()));
390

            
391
                    return Ok(None);
392
                };
393

            
394
60
                let (extender, cell) = CircuitExtender::<NtorClient>::begin(
395
60
                    peer_id,
396
                    HandshakeType::NTOR,
397
60
                    &public_key,
398
60
                    linkspecs,
399
60
                    settings,
400
60
                    &(),
401
60
                    circ,
402
60
                    done,
403
                )?;
404
60
                self.reactor
405
60
                    .cell_handlers
406
60
                    .set_meta_handler(Box::new(extender))?;
407

            
408
60
                Ok(Some(RunOnceCmdInner::Send {
409
60
                    leg: circ.unique_id(),
410
60
                    cell,
411
60
                    done: None,
412
60
                }))
413
            }
414
            CtrlMsg::ExtendNtorV3 {
415
12
                peer_id,
416
12
                public_key,
417
12
                linkspecs,
418
12
                settings,
419
12
                done,
420
            } => {
421
12
                let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
422
                    // Don't care if the receiver goes away
423
                    let _ = done.send(Err(tor_error::bad_api_usage!(
424
                        "cannot extend multipath tunnel"
425
                    )
426
                    .into()));
427

            
428
                    return Ok(None);
429
                };
430

            
431
12
                let client_extensions = settings.circuit_request_extensions()?;
432

            
433
12
                let (extender, cell) = CircuitExtender::<NtorV3Client>::begin(
434
12
                    peer_id,
435
                    HandshakeType::NTOR_V3,
436
12
                    &public_key,
437
12
                    linkspecs,
438
12
                    settings,
439
12
                    &client_extensions,
440
12
                    circ,
441
12
                    done,
442
                )?;
443
12
                self.reactor
444
12
                    .cell_handlers
445
12
                    .set_meta_handler(Box::new(extender))?;
446

            
447
12
                Ok(Some(RunOnceCmdInner::Send {
448
12
                    leg: circ.unique_id(),
449
12
                    cell,
450
12
                    done: None,
451
12
                }))
452
            }
453
            CtrlMsg::BeginStream {
454
96
                hop,
455
96
                message,
456
96
                sender,
457
96
                rx,
458
96
                rate_limit_notifier,
459
96
                drain_rate_requester,
460
96
                done,
461
96
                cmd_checker,
462
            } => {
463
                // If resolving the hop fails,
464
                // we want to report an error back to the initiator and not shut down the reactor.
465
96
                let hop_location = match self.reactor.resolve_target_hop(hop) {
466
96
                    Ok(x) => x,
467
                    Err(e) => {
468
                        let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
469
                        // don't care if receiver goes away
470
                        let _ = done.send(Err(e.into()));
471
                        return Ok(None);
472
                    }
473
                };
474
96
                let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop_location) {
475
96
                    Ok(x) => x,
476
                    Err(e) => {
477
                        let e = into_bad_api_usage!("Could not resolve {hop_location:?}")(e);
478
                        // don't care if receiver goes away
479
                        let _ = done.send(Err(e.into()));
480
                        return Ok(None);
481
                    }
482
                };
483
96
                let circ = match self.reactor.circuits.leg_mut(leg_id) {
484
96
                    Some(x) => x,
485
                    None => {
486
                        let e = bad_api_usage!("Circuit leg {leg_id:?} does not exist");
487
                        // don't care if receiver goes away
488
                        let _ = done.send(Err(e.into()));
489
                        return Ok(None);
490
                    }
491
                };
492

            
493
96
                let cell = circ.begin_stream(
494
96
                    hop_num,
495
96
                    message,
496
96
                    sender,
497
96
                    rx,
498
96
                    rate_limit_notifier,
499
96
                    drain_rate_requester,
500
96
                    cmd_checker,
501
                )?;
502
96
                Ok(Some(RunOnceCmdInner::BeginStream {
503
96
                    leg: leg_id,
504
96
                    cell,
505
96
                    hop: hop_location,
506
96
                    done,
507
96
                }))
508
            }
509
            #[cfg(feature = "hs-service")]
510
            CtrlMsg::ClosePendingStream {
511
12
                hop,
512
12
                stream_id,
513
12
                message,
514
12
                done,
515
12
            } => Ok(Some(RunOnceCmdInner::CloseStream {
516
12
                hop,
517
12
                sid: stream_id,
518
12
                behav: message,
519
12
                reason: streammap::TerminateReason::ExplicitEnd,
520
12
                done: Some(done),
521
12
            })),
522
            CtrlMsg::FlowCtrlUpdate {
523
                msg,
524
                stream_id,
525
                hop,
526
            } => {
527
                match msg {
528
                    FlowCtrlMsg::Sendme => {
529
                        let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop) {
530
                            Ok(x) => x,
531
                            Err(NoJoinPointError) => {
532
                                // A stream tried to send a stream-level SENDME message to the join point of
533
                                // a tunnel that has never had a join point. Currently in arti, only a
534
                                // `StreamTarget` asks us to send a stream-level SENDME, and this tunnel
535
                                // originally created the `StreamTarget` to begin with. So this is a
536
                                // legitimate bug somewhere in the tunnel code.
537
                                return Err(
538
                                    internal!(
539
                                        "Could not send a stream-level SENDME to a join point on a tunnel without a join point",
540
                                    )
541
                                    .into()
542
                                );
543
                            }
544
                        };
545

            
546
                        // Congestion control decides if we can send stream level SENDMEs or not.
547
                        let sendme_required = match self.reactor.uses_stream_sendme(leg_id, hop_num)
548
                        {
549
                            Some(x) => x,
550
                            None => {
551
                                // The leg/hop has disappeared. This is fine since the stream may have ended
552
                                // and been cleaned up while this `CtrlMsg::SendSendme` message was queued.
553
                                // It is possible that is a bug and this is an incorrect leg/hop number, but
554
                                // it's not currently possible to differentiate between an incorrect leg/hop
555
                                // number and a circuit hop that has been closed.
556
                                debug!(
557
                                    "Could not send a stream-level SENDME on a hop that does not exist. Ignoring."
558
                                );
559
                                return Ok(None);
560
                            }
561
                        };
562

            
563
                        if !sendme_required {
564
                            // Nothing to do, so discard the SENDME.
565
                            return Ok(None);
566
                        }
567

            
568
                        let sendme = Sendme::new_empty();
569
                        let cell = AnyRelayMsgOuter::new(Some(stream_id), sendme.into());
570

            
571
                        let cell = SendRelayCell {
572
                            hop: Some(hop_num),
573
                            early: false,
574
                            cell,
575
                        };
576

            
577
                        Ok(Some(RunOnceCmdInner::Send {
578
                            leg: leg_id,
579
                            cell,
580
                            done: None,
581
                        }))
582
                    }
583
                    FlowCtrlMsg::Xon(rate) => Ok(Some(RunOnceCmdInner::MaybeSendXon {
584
                        rate,
585
                        hop,
586
                        stream_id,
587
                    })),
588
                }
589
            }
590
            // TODO(conflux): this should specify which leg to send the msg on
591
            // (currently we send it down the primary leg).
592
            //
593
            // This will involve updating ClientCIrc::send_raw_msg() to take a
594
            // leg id argument (which is a breaking change.
595
            #[cfg(feature = "send-control-msg")]
596
            CtrlMsg::SendMsg { hop, msg, sender } => {
597
                let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
598
                    // Don't care if receiver goes away
599
                    let _ = sender.send(Err(bad_api_usage!("Unknown {hop:?}").into()));
600
                    return Ok(None);
601
                };
602

            
603
                let cell = AnyRelayMsgOuter::new(None, msg);
604
                let cell = SendRelayCell {
605
                    hop: Some(hop_num),
606
                    early: false,
607
                    cell,
608
                };
609

            
610
                Ok(Some(RunOnceCmdInner::Send {
611
                    leg: leg_id,
612
                    cell,
613
                    done: Some(sender),
614
                }))
615
            }
616
            // TODO(conflux): this should specify which leg to send the msg on
617
            // (currently we send it down the primary leg)
618
            #[cfg(feature = "send-control-msg")]
619
            CtrlMsg::SendMsgAndInstallHandler {
620
                msg,
621
                handler,
622
                sender,
623
            } => Ok(Some(RunOnceCmdInner::SendMsgAndInstallHandler {
624
                msg,
625
                handler,
626
                done: sender,
627
            })),
628
            CtrlMsg::FirstHopClockSkew { answer } => {
629
                Ok(Some(RunOnceCmdInner::FirstHopClockSkew { answer }))
630
            }
631
            #[cfg(feature = "conflux")]
632
60
            CtrlMsg::LinkCircuits { circuits, answer } => {
633
60
                Ok(Some(RunOnceCmdInner::Link { circuits, answer }))
634
            }
635
        }
636
240
    }
637

            
638
    /// Handle a control command.
639
    #[allow(clippy::needless_pass_by_value)] // Needed when conflux is enabled
640
824
    pub(super) fn handle_cmd(&mut self, msg: CtrlCmd) -> StdResult<(), ReactorError> {
641
824
        trace!(
642
            tunnel_id = %self.reactor.tunnel_id,
643
            msg = ?msg,
644
            "reactor received control command"
645
        );
646

            
647
824
        match msg {
648
            CtrlCmd::Shutdown => self.reactor.handle_shutdown().map(|_| ()),
649
            #[cfg(feature = "hs-common")]
650
            #[allow(unreachable_code)]
651
            CtrlCmd::ExtendVirtual {
652
                cell_crypto,
653
                settings,
654
                done,
655
            } => {
656
                let (outbound, inbound, binding) = cell_crypto;
657

            
658
                // TODO HS: Perhaps this should describe the onion service, or
659
                // describe why the virtual hop was added, or something?
660
                let peer_id = path::HopDetail::Virtual;
661

            
662
                let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
663
                    // Don't care if the receiver goes away
664
                    let _ = done.send(Err(tor_error::bad_api_usage!(
665
                        "cannot extend multipath tunnel"
666
                    )
667
                    .into()));
668

            
669
                    return Ok(());
670
                };
671

            
672
                leg.add_hop(peer_id, outbound, inbound, binding, &settings)?;
673
                let _ = done.send(Ok(()));
674

            
675
                Ok(())
676
            }
677
48
            CtrlCmd::ResolveTargetHop { hop, done } => {
678
48
                let _ = done.send(
679
48
                    self.reactor
680
48
                        .resolve_target_hop(hop)
681
48
                        .map_err(|_| crate::util::err::Error::NoSuchHop),
682
                );
683
48
                Ok(())
684
            }
685
            #[cfg(feature = "hs-service")]
686
            CtrlCmd::AwaitStreamRequest {
687
60
                cmd_checker,
688
60
                incoming_sender,
689
60
                hop,
690
60
                done,
691
60
                filter,
692
            } => {
693
60
                let Some((_, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
694
                    let _ = done.send(Err(crate::Error::NoSuchHop));
695
                    return Ok(());
696
                };
697
                // TODO: At some point we might want to add a CtrlCmd for
698
                // de-registering the handler.  See comments on `allow_stream_requests`.
699
60
                let handler = IncomingStreamRequestHandler {
700
60
                    incoming_sender,
701
60
                    cmd_checker,
702
60
                    hop_num: Some(hop_num),
703
60
                    filter,
704
60
                };
705

            
706
60
                let ret = self
707
60
                    .reactor
708
60
                    .cell_handlers
709
60
                    .set_incoming_stream_req_handler(handler);
710
60
                let _ = done.send(ret); // don't care if the corresponding receiver goes away.
711

            
712
60
                Ok(())
713
            }
714
            #[cfg(feature = "hs-service")]
715
            CtrlCmd::GetBindingKey { hop, done } => {
716
                let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
717
                    let _ = done.send(Err(tor_error::internal!(
718
                        "Unknown TargetHop when getting binding key"
719
                    )
720
                    .into()));
721
                    return Ok(());
722
                };
723
                let Some(circuit) = self.reactor.circuits.leg(leg_id) else {
724
                    let _ = done.send(Err(tor_error::bad_api_usage!(
725
                        "Unknown circuit id {leg_id} when getting binding key"
726
                    )
727
                    .into()));
728
                    return Ok(());
729
                };
730
                // Get the binding key from the mutable state and send it back.
731
                let key = circuit.mutable().binding_key(hop_num);
732
                let _ = done.send(Ok(key));
733

            
734
                Ok(())
735
            }
736
            #[cfg(test)]
737
            CtrlCmd::AddFakeHop {
738
632
                relay_cell_format,
739
632
                fwd_lasthop,
740
632
                rev_lasthop,
741
632
                peer_id,
742
632
                params,
743
632
                done,
744
            } => {
745
632
                let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
746
                    // Don't care if the receiver goes away
747
                    let _ = done.send(Err(tor_error::bad_api_usage!(
748
                        "cannot add fake hop to multipath tunnel"
749
                    )
750
                    .into()));
751

            
752
                    return Ok(());
753
                };
754

            
755
632
                leg.handle_add_fake_hop(
756
632
                    relay_cell_format,
757
632
                    fwd_lasthop,
758
632
                    rev_lasthop,
759
632
                    peer_id,
760
632
                    &params,
761
632
                    done,
762
                );
763

            
764
632
                Ok(())
765
            }
766
            #[cfg(test)]
767
20
            CtrlCmd::QuerySendWindow { hop, leg, done } => {
768
                // Immediately invoked function means that errors will be sent to the channel.
769
30
                let _ = done.send((|| {
770
20
                    let leg = self.reactor.circuits.leg_mut(leg).ok_or_else(|| {
771
                        bad_api_usage!("cannot query send window of non-existent circuit")
772
                    })?;
773

            
774
20
                    let hop = leg.hop_mut(hop).ok_or(bad_api_usage!(
775
20
                        "received QuerySendWindow for unknown hop {}",
776
20
                        hop.display()
777
                    ))?;
778

            
779
20
                    Ok(hop.send_window_and_expected_tags())
780
                })());
781

            
782
20
                Ok(())
783
            }
784
            #[cfg(feature = "conflux")]
785
64
            CtrlCmd::ShutdownAndReturnCircuit { answer } => {
786
64
                self.reactor.handle_shutdown_and_return_circuit(answer)
787
            }
788
            #[cfg(feature = "circ-padding-manual")]
789
            CtrlCmd::SetPadder {
790
                hop,
791
                padder,
792
                sender,
793
            } => {
794
                let result = self.reactor.set_padding_at_hop(hop, padder);
795
                let _ = sender.send(result);
796
                Ok(())
797
            }
798
            CtrlCmd::GetTunnelActivity { sender } => {
799
                let count = self.reactor.circuits.tunnel_activity();
800
                let _ = sender.send(count);
801
                Ok(())
802
            }
803
        }
804
824
    }
805
}