1
//! Module providing [`CtrlMsg`].
2

            
3
use super::circuit::extender::CircuitExtender;
4
use super::{
5
    CircuitHandshake, CloseStreamBehavior, MetaCellHandler, Reactor, ReactorResultChannel,
6
    RunOnceCmdInner, SendRelayCell,
7
};
8
use crate::Result;
9
use crate::circuit::celltypes::CreateResponse;
10
use crate::circuit::circhop::{HopSettings, ReactorStreamComponents};
11
#[cfg(feature = "circ-padding-manual")]
12
use crate::client::circuit::padding;
13
use crate::client::circuit::path;
14
use crate::client::reactor::{NoJoinPointError, NtorClient, ReactorError};
15
use crate::client::{HopLocation, TargetHop};
16
use crate::crypto::binding::CircuitBinding;
17
use crate::crypto::cell::{InboundClientLayer, OutboundClientLayer};
18
use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
19
use crate::memquota::StreamAccount;
20
use crate::stream::cmdcheck::AnyCmdChecker;
21
use crate::streammap;
22
use crate::util::skew::ClockSkew;
23
use crate::util::tunnel_activity::TunnelActivity;
24
#[cfg(test)]
25
use crate::{circuit::UniqId, client::circuit::CircParameters, crypto::cell::HopNum};
26
use tor_cell::chancell::msg::HandshakeType;
27
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
28
use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
29
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId};
30
use tor_error::{Bug, bad_api_usage, internal, into_bad_api_usage};
31
use tracing::{debug, trace};
32
#[cfg(feature = "hs-service")]
33
use {
34
    crate::client::reactor::IncomingStreamRequestHandler,
35
    crate::client::stream::IncomingStreamRequestFilter, crate::stream::incoming::StreamReqSender,
36
};
37

            
38
#[cfg(test)]
39
use tor_cell::relaycell::msg::SendmeTag;
40

            
41
#[cfg(feature = "conflux")]
42
use super::{Circuit, ConfluxLinkResultChannel};
43

            
44
use oneshot_fused_workaround as oneshot;
45

            
46
use crate::crypto::handshake::ntor::NtorPublicKey;
47
use tor_linkspec::{EncodedLinkSpec, OwnedChanTarget};
48

            
49
use std::result::Result as StdResult;
50

            
51
/// A message telling the reactor to do something.
52
///
53
/// For each `CtrlMsg`, the reactor will send a cell on the underlying channel.
54
///
55
/// The difference between this and [`CtrlCmd`] is that `CtrlMsg`s
56
/// cause the reactor to send cells on the reactor's `chan_sender`,
57
/// whereas `CtrlCmd` do not.
58
#[derive(educe::Educe)]
59
#[educe(Debug)]
60
pub(crate) enum CtrlMsg {
61
    /// Create the first hop of this circuit.
62
    Create {
63
        /// A oneshot channel on which we'll receive the creation response.
64
        recv_created: oneshot::Receiver<CreateResponse>,
65
        /// The handshake type to use for the first hop.
66
        handshake: CircuitHandshake,
67
        /// Other parameters relevant for circuit creation.
68
        settings: HopSettings,
69
        /// Oneshot channel to notify on completion.
70
        done: ReactorResultChannel<()>,
71
    },
72
    /// Extend a circuit by one hop, using the ntor handshake.
73
    ExtendNtor {
74
        /// The peer that we're extending to.
75
        ///
76
        /// Used to extend our record of the circuit's path.
77
        peer_id: OwnedChanTarget,
78
        /// The handshake type to use for this hop.
79
        public_key: NtorPublicKey,
80
        /// Information about how to connect to the relay we're extending to.
81
        linkspecs: Vec<EncodedLinkSpec>,
82
        /// Other parameters we are negotiating.
83
        settings: HopSettings,
84
        /// Oneshot channel to notify on completion.
85
        done: ReactorResultChannel<()>,
86
    },
87
    /// Extend a circuit by one hop, using the ntorv3 handshake.
88
    ExtendNtorV3 {
89
        /// The peer that we're extending to.
90
        ///
91
        /// Used to extend our record of the circuit's path.
92
        peer_id: OwnedChanTarget,
93
        /// The handshake type to use for this hop.
94
        public_key: NtorV3PublicKey,
95
        /// Information about how to connect to the relay we're extending to.
96
        linkspecs: Vec<EncodedLinkSpec>,
97
        /// Other parameters we are negotiating.
98
        settings: HopSettings,
99
        /// Oneshot channel to notify on completion.
100
        done: ReactorResultChannel<()>,
101
    },
102
    /// Begin a stream with the provided hop in this circuit.
103
    ///
104
    /// Allocates a stream ID, and sends the provided message to that hop.
105
    BeginStream {
106
        /// The hop number to begin the stream with.
107
        hop: TargetHop,
108
        /// The message to send.
109
        message: AnyRelayMsg,
110
        /// The stream account to use for anything we allocate for the purpose of this stream.
111
        memquota: StreamAccount,
112
        /// Oneshot channel to notify on completion, with the allocated stream ID.
113
        done: ReactorResultChannel<(
114
            StreamId,
115
            HopLocation,
116
            RelayCellFormat,
117
            ReactorStreamComponents,
118
        )>,
119
        /// A `CmdChecker` to keep track of which message types are acceptable.
120
        cmd_checker: AnyCmdChecker,
121
    },
122
    /// Close the specified pending incoming stream, sending the provided END message.
123
    ///
124
    /// A stream is said to be pending if the message for initiating the stream was received but
125
    /// not has not been responded to yet.
126
    ///
127
    /// This should be used by responders for closing pending incoming streams initiated by the
128
    /// other party on the circuit.
129
    #[cfg(feature = "hs-service")]
130
    ClosePendingStream {
131
        /// The hop number the stream is on.
132
        hop: HopLocation,
133
        /// The stream ID to send the END for.
134
        stream_id: StreamId,
135
        /// The END message to send, if any.
136
        message: CloseStreamBehavior,
137
        /// Oneshot channel to notify on completion.
138
        done: ReactorResultChannel<()>,
139
    },
140
    /// Send a given control message on this circuit.
141
    #[cfg(feature = "send-control-msg")]
142
    SendMsg {
143
        /// The hop to receive this message.
144
        hop: TargetHop,
145
        /// The message to send.
146
        msg: AnyRelayMsg,
147
        /// A sender that we use to tell the caller that the message was sent
148
        /// and the handler installed.
149
        sender: oneshot::Sender<Result<()>>,
150
    },
151
    /// Send a given control message on this circuit, and install a control-message handler to
152
    /// receive responses.
153
    #[cfg(feature = "send-control-msg")]
154
    SendMsgAndInstallHandler {
155
        /// The message to send, if any
156
        msg: Option<AnyRelayMsgOuter>,
157
        /// A message handler to install.
158
        ///
159
        /// If this is `None`, there must already be a message handler installed
160
        #[educe(Debug(ignore))]
161
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
162
        /// A sender that we use to tell the caller that the message was sent
163
        /// and the handler installed.
164
        sender: oneshot::Sender<Result<()>>,
165
    },
166
    /// Inform the reactor that there's a flow control update for a given stream.
167
    ///
168
    /// The reactor will decide how to handle this update depending on the type of flow control and
169
    /// the current state of the stream.
170
    FlowCtrlUpdate {
171
        /// The type of flow control update, and any associated metadata.
172
        msg: FlowCtrlMsg,
173
        /// The stream ID that the update is for.
174
        stream_id: StreamId,
175
        /// The hop that the stream is on.
176
        hop: HopLocation,
177
    },
178
    /// Get the clock skew claimed by the first hop of the circuit.
179
    FirstHopClockSkew {
180
        /// Oneshot channel to return the clock skew.
181
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
182
    },
183
    /// Link the specified circuits into the current tunnel,
184
    /// to form a multi-path tunnel.
185
    #[cfg(feature = "conflux")]
186
    #[allow(unused)] // TODO(conflux)
187
    LinkCircuits {
188
        /// The circuits to link into the tunnel,
189
        #[educe(Debug(ignore))]
190
        circuits: Vec<Circuit>,
191
        /// Oneshot channel to notify sender when all the specified circuits have finished linking,
192
        /// or have failed to link.
193
        ///
194
        /// A client circuit is said to be fully linked once the `RELAY_CONFLUX_LINKED_ACK` is sent
195
        /// (see [set construction]).
196
        ///
197
        /// [set construction]: https://spec.torproject.org/proposals/329-traffic-splitting.html#set-construction
198
        answer: ConfluxLinkResultChannel,
199
    },
200
}
201

            
202
/// A message telling the reactor to do something.
203
///
204
/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
205
/// never cause cells to sent on the channel,
206
/// while `CtrlMsg`s potentially do: `CtrlMsg`s are mapped to [`RunOnceCmdInner`] commands,
207
/// some of which instruct the reactor to send cells down the channel.
208
#[derive(educe::Educe)]
209
#[educe(Debug)]
210
pub(crate) enum CtrlCmd {
211
    /// Shut down the reactor.
212
    Shutdown,
213
    /// Extend the circuit by one hop, in response to an out-of-band handshake.
214
    ///
215
    /// (This is used for onion services, where the negotiation takes place in
216
    /// INTRODUCE and RENDEZVOUS messages.)
217
    #[cfg(feature = "hs-common")]
218
    ExtendVirtual {
219
        /// The cryptographic algorithms and keys to use when communicating with
220
        /// the newly added hop.
221
        #[educe(Debug(ignore))]
222
        cell_crypto: (
223
            Box<dyn OutboundClientLayer + Send>,
224
            Box<dyn InboundClientLayer + Send>,
225
            Option<CircuitBinding>,
226
        ),
227
        /// A set of parameters to negotiate with this hop.
228
        settings: HopSettings,
229
        /// Oneshot channel to notify on completion.
230
        done: ReactorResultChannel<()>,
231
    },
232
    /// Resolve a given [`TargetHop`] into a precise [`HopLocation`].
233
    ResolveTargetHop {
234
        /// The target hop to resolve.
235
        hop: TargetHop,
236
        /// Oneshot channel to notify on completion.
237
        done: ReactorResultChannel<HopLocation>,
238
    },
239
    /// Begin accepting streams on this circuit.
240
    #[cfg(feature = "hs-service")]
241
    AwaitStreamRequest {
242
        /// A channel for sending information about an incoming stream request.
243
        incoming_sender: StreamReqSender,
244
        /// A `CmdChecker` to keep track of which message types are acceptable.
245
        cmd_checker: AnyCmdChecker,
246
        /// Oneshot channel to notify on completion.
247
        done: ReactorResultChannel<()>,
248
        /// The hop that is allowed to create streams.
249
        hop: TargetHop,
250
        /// A filter used to check requests before passing them on.
251
        #[educe(Debug(ignore))]
252
        #[cfg(feature = "hs-service")]
253
        filter: Box<dyn IncomingStreamRequestFilter>,
254
    },
255
    /// Request the binding key of a target hop.
256
    #[cfg(feature = "hs-service")]
257
    GetBindingKey {
258
        /// The hop for which we want the key.
259
        hop: TargetHop,
260
        /// Oneshot channel to notify on completion.
261
        done: ReactorResultChannel<Option<CircuitBinding>>,
262
    },
263
    /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography.
264
    #[cfg(test)]
265
    AddFakeHop {
266
        relay_cell_format: RelayCellFormat,
267
        fwd_lasthop: bool,
268
        rev_lasthop: bool,
269
        peer_id: path::HopDetail,
270
        params: CircParameters,
271
        done: ReactorResultChannel<()>,
272
    },
273
    /// (tests only) Get the send window and expected tags for a given hop.
274
    #[cfg(test)]
275
    QuerySendWindow {
276
        hop: HopNum,
277
        leg: UniqId,
278
        done: ReactorResultChannel<(u32, Vec<SendmeTag>)>,
279
    },
280
    /// Shut down the reactor, and return the underlying [`Circuit`],
281
    /// if the tunnel is not multi-path.
282
    ///
283
    /// Returns an error if called on a multi-path reactor.
284
    #[cfg(feature = "conflux")]
285
    #[allow(unused)] // TODO(conflux)
286
    ShutdownAndReturnCircuit {
287
        /// Oneshot channel to return the underlying [`Circuit`],
288
        /// or an error if the reactor's tunnel is multi-path.
289
        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
290
    },
291

            
292
    /// Install or remove a [`padding::CircuitPadder`] for a given hop.
293
    ///
294
    /// Any existing `CircuitPadder` at that hop is replaced.
295
    #[cfg(feature = "circ-padding-manual")]
296
    SetPadder {
297
        /// The hop to modify.
298
        hop: HopLocation,
299
        /// The Padder to install, or None to remove any existing padder.
300
        padder: Option<padding::CircuitPadder>,
301
        /// A sender to alert after we've changed the padding.
302
        sender: oneshot::Sender<Result<()>>,
303
    },
304

            
305
    /// Yield the most active [`TunnelActivity`] for any hop on any leg of this tunnel.
306
    GetTunnelActivity {
307
        /// A sender to receive the reply.
308
        sender: oneshot::Sender<TunnelActivity>,
309
    },
310
}
311

            
312
/// A flow control update message.
313
#[derive(Debug)]
314
pub(crate) enum FlowCtrlMsg {
315
    /// Send a SENDME message on this stream.
316
    Sendme,
317
    /// Send an XON message on this stream with the given rate.
318
    Xon(XonKbpsEwma),
319
}
320

            
321
/// A control message handler object. Keep a reference to the Reactor tying its lifetime to it.
322
///
323
/// Its `handle_msg` and `handle_cmd` handlers decide how messages and commands,
324
/// respectively, are handled.
325
pub(crate) struct ControlHandler<'a> {
326
    /// Reference to the reactor of this
327
    reactor: &'a mut Reactor,
328
}
329

            
330
impl<'a> ControlHandler<'a> {
331
    /// Constructor.
332
1064
    pub(crate) fn new(reactor: &'a mut Reactor) -> Self {
333
1064
        Self { reactor }
334
1064
    }
335

            
336
    /// Handle a control message.
337
240
    pub(super) fn handle_msg(&mut self, msg: CtrlMsg) -> Result<Option<RunOnceCmdInner>> {
338
240
        trace!(
339
            tunnel_id = %self.reactor.tunnel_id,
340
            msg = ?msg,
341
            "reactor received control message"
342
        );
343

            
344
240
        match msg {
345
            // This is handled earlier, since it requires blocking.
346
            CtrlMsg::Create { done, .. } => {
347
                if self.reactor.circuits.len() == 1 {
348
                    // This should've been handled in Reactor::run_once()
349
                    // (ControlHandler::handle_msg() is never called before wait_for_create()).
350
                    debug_assert!(self.reactor.circuits.single_leg()?.has_hops());
351
                    // Don't care if the receiver goes away
352
                    let _ = done.send(Err(tor_error::bad_api_usage!(
353
                        "cannot create first hop twice"
354
                    )
355
                    .into()));
356
                } else {
357
                    // Don't care if the receiver goes away
358
                    let _ = done.send(Err(tor_error::bad_api_usage!(
359
                        "cannot create first hop on multipath tunnel"
360
                    )
361
                    .into()));
362
                }
363

            
364
                Ok(None)
365
            }
366
            CtrlMsg::ExtendNtor {
367
60
                peer_id,
368
60
                public_key,
369
60
                linkspecs,
370
60
                settings,
371
60
                done,
372
            } => {
373
60
                let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
374
                    // Don't care if the receiver goes away
375
                    let _ = done.send(Err(tor_error::bad_api_usage!(
376
                        "cannot extend multipath tunnel"
377
                    )
378
                    .into()));
379

            
380
                    return Ok(None);
381
                };
382

            
383
60
                let (extender, cell) = CircuitExtender::<NtorClient>::begin(
384
60
                    peer_id,
385
                    HandshakeType::NTOR,
386
60
                    &public_key,
387
60
                    linkspecs,
388
60
                    settings,
389
60
                    &(),
390
60
                    circ,
391
60
                    done,
392
                )?;
393
60
                self.reactor
394
60
                    .cell_handlers
395
60
                    .set_meta_handler(Box::new(extender))?;
396

            
397
60
                Ok(Some(RunOnceCmdInner::Send {
398
60
                    leg: circ.unique_id(),
399
60
                    cell,
400
60
                    done: None,
401
60
                }))
402
            }
403
            CtrlMsg::ExtendNtorV3 {
404
12
                peer_id,
405
12
                public_key,
406
12
                linkspecs,
407
12
                settings,
408
12
                done,
409
            } => {
410
12
                let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
411
                    // Don't care if the receiver goes away
412
                    let _ = done.send(Err(tor_error::bad_api_usage!(
413
                        "cannot extend multipath tunnel"
414
                    )
415
                    .into()));
416

            
417
                    return Ok(None);
418
                };
419

            
420
12
                let client_extensions = settings.circuit_request_extensions()?;
421

            
422
12
                let (extender, cell) = CircuitExtender::<NtorV3Client>::begin(
423
12
                    peer_id,
424
                    HandshakeType::NTOR_V3,
425
12
                    &public_key,
426
12
                    linkspecs,
427
12
                    settings,
428
12
                    &client_extensions,
429
12
                    circ,
430
12
                    done,
431
                )?;
432
12
                self.reactor
433
12
                    .cell_handlers
434
12
                    .set_meta_handler(Box::new(extender))?;
435

            
436
12
                Ok(Some(RunOnceCmdInner::Send {
437
12
                    leg: circ.unique_id(),
438
12
                    cell,
439
12
                    done: None,
440
12
                }))
441
            }
442
            CtrlMsg::BeginStream {
443
96
                hop,
444
96
                message,
445
96
                memquota,
446
96
                done,
447
96
                cmd_checker,
448
            } => {
449
                // If resolving the hop fails,
450
                // we want to report an error back to the initiator and not shut down the reactor.
451
96
                let hop_location = match self.reactor.resolve_target_hop(hop) {
452
96
                    Ok(x) => x,
453
                    Err(e) => {
454
                        let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
455
                        // don't care if receiver goes away
456
                        let _ = done.send(Err(e.into()));
457
                        return Ok(None);
458
                    }
459
                };
460
96
                let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop_location) {
461
96
                    Ok(x) => x,
462
                    Err(e) => {
463
                        let e = into_bad_api_usage!("Could not resolve {hop_location:?}")(e);
464
                        // don't care if receiver goes away
465
                        let _ = done.send(Err(e.into()));
466
                        return Ok(None);
467
                    }
468
                };
469
96
                let circ = match self.reactor.circuits.leg_mut(leg_id) {
470
96
                    Some(x) => x,
471
                    None => {
472
                        let e = bad_api_usage!("Circuit leg {leg_id:?} does not exist");
473
                        // don't care if receiver goes away
474
                        let _ = done.send(Err(e.into()));
475
                        return Ok(None);
476
                    }
477
                };
478

            
479
96
                let result = circ.begin_stream(
480
96
                    hop_num,
481
96
                    message,
482
96
                    &self.reactor.runtime,
483
96
                    cmd_checker,
484
96
                    &memquota,
485
                );
486

            
487
96
                let (cell, stream_id, stream_components) = match result {
488
96
                    Ok((cell, stream_id, receiver)) => (cell, stream_id, receiver),
489
                    Err(e) => {
490
                        // don't care if receiver goes away.
491
                        let _ = done.send(Err(e.clone()));
492
                        return Err(e);
493
                    }
494
                };
495

            
496
96
                Ok(Some(RunOnceCmdInner::BeginStream {
497
96
                    cell,
498
96
                    stream_id,
499
96
                    hop: hop_location,
500
96
                    leg: leg_id,
501
96
                    stream_components,
502
96
                    done,
503
96
                }))
504
            }
505
            #[cfg(feature = "hs-service")]
506
            CtrlMsg::ClosePendingStream {
507
12
                hop,
508
12
                stream_id,
509
12
                message,
510
12
                done,
511
12
            } => Ok(Some(RunOnceCmdInner::CloseStream {
512
12
                hop,
513
12
                sid: stream_id,
514
12
                behav: message,
515
12
                reason: streammap::TerminateReason::ExplicitEnd,
516
12
                done: Some(done),
517
12
            })),
518
            CtrlMsg::FlowCtrlUpdate {
519
                msg,
520
                stream_id,
521
                hop,
522
            } => {
523
                match msg {
524
                    FlowCtrlMsg::Sendme => {
525
                        let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop) {
526
                            Ok(x) => x,
527
                            Err(NoJoinPointError) => {
528
                                // A stream tried to send a stream-level SENDME message to the join point of
529
                                // a tunnel that has never had a join point. Currently in arti, only a
530
                                // `StreamTarget` asks us to send a stream-level SENDME, and this tunnel
531
                                // originally created the `StreamTarget` to begin with. So this is a
532
                                // legitimate bug somewhere in the tunnel code.
533
                                return Err(
534
                                    internal!(
535
                                        "Could not send a stream-level SENDME to a join point on a tunnel without a join point",
536
                                    )
537
                                    .into()
538
                                );
539
                            }
540
                        };
541

            
542
                        // Congestion control decides if we can send stream level SENDMEs or not.
543
                        let sendme_required = match self.reactor.uses_stream_sendme(leg_id, hop_num)
544
                        {
545
                            Some(x) => x,
546
                            None => {
547
                                // The leg/hop has disappeared. This is fine since the stream may have ended
548
                                // and been cleaned up while this `CtrlMsg::SendSendme` message was queued.
549
                                // It is possible that is a bug and this is an incorrect leg/hop number, but
550
                                // it's not currently possible to differentiate between an incorrect leg/hop
551
                                // number and a circuit hop that has been closed.
552
                                debug!(
553
                                    "Could not send a stream-level SENDME on a hop that does not exist. Ignoring."
554
                                );
555
                                return Ok(None);
556
                            }
557
                        };
558

            
559
                        if !sendme_required {
560
                            // Nothing to do, so discard the SENDME.
561
                            return Ok(None);
562
                        }
563

            
564
                        let sendme = Sendme::new_empty();
565
                        let cell = AnyRelayMsgOuter::new(Some(stream_id), sendme.into());
566

            
567
                        let cell = SendRelayCell {
568
                            hop: Some(hop_num),
569
                            early: false,
570
                            cell,
571
                        };
572

            
573
                        Ok(Some(RunOnceCmdInner::Send {
574
                            leg: leg_id,
575
                            cell,
576
                            done: None,
577
                        }))
578
                    }
579
                    FlowCtrlMsg::Xon(rate) => Ok(Some(RunOnceCmdInner::MaybeSendXon {
580
                        rate,
581
                        hop,
582
                        stream_id,
583
                    })),
584
                }
585
            }
586
            // TODO(conflux): this should specify which leg to send the msg on
587
            // (currently we send it down the primary leg).
588
            //
589
            // This will involve updating ClientCIrc::send_raw_msg() to take a
590
            // leg id argument (which is a breaking change.
591
            #[cfg(feature = "send-control-msg")]
592
            CtrlMsg::SendMsg { hop, msg, sender } => {
593
                let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
594
                    // Don't care if receiver goes away
595
                    let _ = sender.send(Err(bad_api_usage!("Unknown {hop:?}").into()));
596
                    return Ok(None);
597
                };
598

            
599
                let cell = AnyRelayMsgOuter::new(None, msg);
600
                let cell = SendRelayCell {
601
                    hop: Some(hop_num),
602
                    early: false,
603
                    cell,
604
                };
605

            
606
                Ok(Some(RunOnceCmdInner::Send {
607
                    leg: leg_id,
608
                    cell,
609
                    done: Some(sender),
610
                }))
611
            }
612
            // TODO(conflux): this should specify which leg to send the msg on
613
            // (currently we send it down the primary leg)
614
            #[cfg(feature = "send-control-msg")]
615
            CtrlMsg::SendMsgAndInstallHandler {
616
                msg,
617
                handler,
618
                sender,
619
            } => Ok(Some(RunOnceCmdInner::SendMsgAndInstallHandler {
620
                msg,
621
                handler,
622
                done: sender,
623
            })),
624
            CtrlMsg::FirstHopClockSkew { answer } => {
625
                Ok(Some(RunOnceCmdInner::FirstHopClockSkew { answer }))
626
            }
627
            #[cfg(feature = "conflux")]
628
60
            CtrlMsg::LinkCircuits { circuits, answer } => {
629
60
                Ok(Some(RunOnceCmdInner::Link { circuits, answer }))
630
            }
631
        }
632
240
    }
633

            
634
    /// Handle a control command.
635
    #[allow(clippy::needless_pass_by_value)] // Needed when conflux is enabled
636
824
    pub(super) fn handle_cmd(&mut self, msg: CtrlCmd) -> StdResult<(), ReactorError> {
637
824
        trace!(
638
            tunnel_id = %self.reactor.tunnel_id,
639
            msg = ?msg,
640
            "reactor received control command"
641
        );
642

            
643
824
        match msg {
644
            CtrlCmd::Shutdown => self.reactor.handle_shutdown().map(|_| ()),
645
            #[cfg(feature = "hs-common")]
646
            #[allow(unreachable_code)]
647
            CtrlCmd::ExtendVirtual {
648
                cell_crypto,
649
                settings,
650
                done,
651
            } => {
652
                let (outbound, inbound, binding) = cell_crypto;
653

            
654
                // TODO HS: Perhaps this should describe the onion service, or
655
                // describe why the virtual hop was added, or something?
656
                let peer_id = path::HopDetail::Virtual;
657

            
658
                let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
659
                    // Don't care if the receiver goes away
660
                    let _ = done.send(Err(tor_error::bad_api_usage!(
661
                        "cannot extend multipath tunnel"
662
                    )
663
                    .into()));
664

            
665
                    return Ok(());
666
                };
667

            
668
                leg.add_hop(peer_id, outbound, inbound, binding, &settings)?;
669
                let _ = done.send(Ok(()));
670

            
671
                Ok(())
672
            }
673
48
            CtrlCmd::ResolveTargetHop { hop, done } => {
674
48
                let _ = done.send(
675
48
                    self.reactor
676
48
                        .resolve_target_hop(hop)
677
48
                        .map_err(|_| crate::util::err::Error::NoSuchHop),
678
                );
679
48
                Ok(())
680
            }
681
            #[cfg(feature = "hs-service")]
682
            CtrlCmd::AwaitStreamRequest {
683
60
                cmd_checker,
684
60
                incoming_sender,
685
60
                hop,
686
60
                done,
687
60
                filter,
688
            } => {
689
60
                let Some((_, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
690
                    let _ = done.send(Err(crate::Error::NoSuchHop));
691
                    return Ok(());
692
                };
693
                // TODO: At some point we might want to add a CtrlCmd for
694
                // de-registering the handler.  See comments on `allow_stream_requests`.
695
60
                let handler = IncomingStreamRequestHandler {
696
60
                    incoming_sender,
697
60
                    cmd_checker,
698
60
                    hop_num: Some(hop_num),
699
60
                    filter,
700
60
                };
701

            
702
60
                let ret = self
703
60
                    .reactor
704
60
                    .cell_handlers
705
60
                    .set_incoming_stream_req_handler(handler);
706
60
                let _ = done.send(ret); // don't care if the corresponding receiver goes away.
707

            
708
60
                Ok(())
709
            }
710
            #[cfg(feature = "hs-service")]
711
            CtrlCmd::GetBindingKey { hop, done } => {
712
                let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
713
                    let _ = done.send(Err(tor_error::internal!(
714
                        "Unknown TargetHop when getting binding key"
715
                    )
716
                    .into()));
717
                    return Ok(());
718
                };
719
                let Some(circuit) = self.reactor.circuits.leg(leg_id) else {
720
                    let _ = done.send(Err(tor_error::bad_api_usage!(
721
                        "Unknown circuit id {leg_id} when getting binding key"
722
                    )
723
                    .into()));
724
                    return Ok(());
725
                };
726
                // Get the binding key from the mutable state and send it back.
727
                let key = circuit.mutable().binding_key(hop_num);
728
                let _ = done.send(Ok(key));
729

            
730
                Ok(())
731
            }
732
            #[cfg(test)]
733
            CtrlCmd::AddFakeHop {
734
632
                relay_cell_format,
735
632
                fwd_lasthop,
736
632
                rev_lasthop,
737
632
                peer_id,
738
632
                params,
739
632
                done,
740
            } => {
741
632
                let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
742
                    // Don't care if the receiver goes away
743
                    let _ = done.send(Err(tor_error::bad_api_usage!(
744
                        "cannot add fake hop to multipath tunnel"
745
                    )
746
                    .into()));
747

            
748
                    return Ok(());
749
                };
750

            
751
632
                leg.handle_add_fake_hop(
752
632
                    relay_cell_format,
753
632
                    fwd_lasthop,
754
632
                    rev_lasthop,
755
632
                    peer_id,
756
632
                    &params,
757
632
                    done,
758
                );
759

            
760
632
                Ok(())
761
            }
762
            #[cfg(test)]
763
20
            CtrlCmd::QuerySendWindow { hop, leg, done } => {
764
                // Immediately invoked function means that errors will be sent to the channel.
765
30
                let _ = done.send((|| {
766
20
                    let leg = self.reactor.circuits.leg_mut(leg).ok_or_else(|| {
767
                        bad_api_usage!("cannot query send window of non-existent circuit")
768
                    })?;
769

            
770
20
                    let hop = leg.hop_mut(hop).ok_or(bad_api_usage!(
771
                        "received QuerySendWindow for unknown hop {}",
772
20
                        hop.display()
773
                    ))?;
774

            
775
20
                    Ok(hop.send_window_and_expected_tags())
776
                })());
777

            
778
20
                Ok(())
779
            }
780
            #[cfg(feature = "conflux")]
781
64
            CtrlCmd::ShutdownAndReturnCircuit { answer } => {
782
64
                self.reactor.handle_shutdown_and_return_circuit(answer)
783
            }
784
            #[cfg(feature = "circ-padding-manual")]
785
            CtrlCmd::SetPadder {
786
                hop,
787
                padder,
788
                sender,
789
            } => {
790
                let result = self.reactor.set_padding_at_hop(hop, padder);
791
                let _ = sender.send(result);
792
                Ok(())
793
            }
794
            CtrlCmd::GetTunnelActivity { sender } => {
795
                let count = self.reactor.circuits.tunnel_activity();
796
                let _ = sender.send(count);
797
                Ok(())
798
            }
799
        }
800
824
    }
801
}