1
//! Client-specific types and implementation.
2

            
3
pub mod channel;
4
pub mod circuit;
5
pub mod stream;
6

            
7
#[cfg(feature = "rpc")]
8
pub mod rpc;
9

            
10
#[cfg(feature = "send-control-msg")]
11
pub(crate) mod msghandler;
12
pub(crate) mod reactor;
13

            
14
use derive_deftly::Deftly;
15
use oneshot_fused_workaround as oneshot;
16
use std::net::IpAddr;
17
use std::sync::Arc;
18
use tracing::instrument;
19

            
20
use crate::circuit::UniqId;
21
#[cfg(feature = "circ-padding-manual")]
22
pub use crate::client::circuit::padding::{
23
    CircuitPadder, CircuitPadderConfig, CircuitPadderConfigError,
24
};
25
use crate::client::stream::{
26
    DataStream, OutboundDataCmdChecker, ResolveCmdChecker, ResolveStream, StreamParameters,
27
    StreamReceiver,
28
};
29
use crate::congestion::sendme::StreamRecvWindow;
30
use crate::crypto::cell::HopNum;
31
use crate::memquota::{SpecificAccount as _, StreamAccount};
32
use crate::stream::STREAM_READER_BUFFER;
33
use crate::stream::cmdcheck::AnyCmdChecker;
34
use crate::stream::flow_ctrl::state::StreamRateLimit;
35
use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
36
use crate::stream::queue::stream_queue;
37
use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamTarget, Tunnel};
38
use crate::util::notify::NotifySender;
39
use crate::{Error, ResolveError, Result};
40
use circuit::{CIRCUIT_BUFFER_SIZE, ClientCirc, Path};
41
use reactor::{CtrlCmd, CtrlMsg, FlowCtrlMsg, MetaCellHandler};
42

            
43
use postage::watch;
44
use tor_cell::relaycell::StreamId;
45
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
46
use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal};
47
use tor_error::bad_api_usage;
48
use tor_linkspec::OwnedChanTarget;
49
use tor_memquota::derive_deftly_template_HasMemoryCost;
50
use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
51

            
52
#[cfg(feature = "hs-service")]
53
use crate::stream::incoming::StreamReqInfo;
54

            
55
#[cfg(feature = "hs-service")]
56
use crate::client::stream::{IncomingCmdChecker, IncomingStream};
57

            
58
#[cfg(feature = "send-control-msg")]
59
use msghandler::{MsgHandler, UserMsgHandler};
60

            
61
/// A handle for an ad-hoc protocol exchange that is in progress
/// with the last hop of a circuit.
///
/// A `Conversation` is returned by [`ClientTunnel::start_conversation`];
/// use [`send_message`](Conversation::send_message) to send further
/// messages to the last hop relay.
///
/// The handle borrows the [`ClientTunnel`] it was created from.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
pub struct Conversation<'r>(&'r ClientTunnel);
70

            
71
#[cfg(feature = "send-control-msg")]
impl Conversation<'_> {
    /// Send one more protocol message as part of this ad-hoc exchange.
    ///
    /// Any responses are handled by the `UserMsgHandler` that was set up
    /// when this `Conversation` was created.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        // Only a message is supplied here; no handler is passed along.
        self.send_internal(Some(msg), None).await
    }

    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome.
    ///
    /// This is the shared implementation behind `start_conversation`
    /// and [`Conversation::send_message`].
    ///
    /// Both `msg` and `handler` are optional and are forwarded to the reactor as given.
    ///
    /// Returns [`Error::CircuitClosed`] if the control message cannot be queued,
    /// or if the reply channel is dropped before an answer arrives;
    /// otherwise returns whatever result the reactor reported.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let (done_tx, done_rx) = oneshot::channel();

        let request = CtrlMsg::SendMsgAndInstallHandler {
            // The message is wrapped without a stream ID.
            msg: msg.map(|inner| tor_cell::relaycell::AnyRelayMsgOuter::new(None, inner)),
            handler,
            sender: done_tx,
        };

        if self.0.circ.control.unbounded_send(request).is_err() {
            return Err(Error::CircuitClosed);
        }

        match done_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
106

            
107
/// A low-level client tunnel API.
108
///
109
/// This is a communication channel to the tunnel reactor, which manages 1 or more circuits.
110
///
111
/// Note: the tor-circmgr crates wrap this type in specialized *Tunnel types exposing only the
112
/// desired subset of functionality depending on purpose and path size.
113
///
114
/// Some API calls are for single path and some for multi path. A check with the underlying reactor
115
/// is done preventing for instance multi path calls to be used on a single path. Top level types
116
/// should prevent this and thus this object should never be used directly.
117
#[derive(Debug)]
118
#[cfg_attr(
119
    feature = "rpc",
120
    derive(derive_deftly::Deftly),
121
    derive_deftly(tor_rpcbase::templates::Object)
122
)]
123
#[allow(dead_code)] // TODO(conflux)
124
pub struct ClientTunnel {
125
    /// The underlying handle to the reactor.
126
    circ: ClientCirc,
127
}
128

            
129
impl ClientTunnel {
130
    /// Return a handle to the `ClientCirc` of this `ClientTunnel`, if the tunnel is a single
131
    /// circuit tunnel.
132
    ///
133
    /// Returns an error if the tunnel has more than one circuit.
134
436
    pub fn as_single_circ(&self) -> Result<&ClientCirc> {
135
436
        if self.circ.is_multi_path {
136
4
            return Err(bad_api_usage!("Single circuit getter on multi path tunnel"))?;
137
432
        }
138
432
        Ok(&self.circ)
139
436
    }
140

            
141
    /// Return the channel target of the first hop.
142
    ///
143
    /// Can only be used for single path tunnel.
144
    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
145
        self.as_single_circ()?.first_hop()
146
    }
147

            
148
    /// Return true if the circuit reactor is closed meaning the circuit is unusable for both
149
    /// receiving or sending.
150
128
    pub fn is_closed(&self) -> bool {
151
128
        self.circ.is_closing()
152
128
    }
153

            
154
    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
155
    /// HopLocation with its id and hop number.
156
    ///
157
    /// Return an error if there is no last hop.
158
    pub fn last_hop(&self) -> Result<TargetHop> {
159
        let uniq_id = self.unique_id();
160
        let hop_num = self
161
            .circ
162
            .mutable
163
            .last_hop_num(uniq_id)?
164
            .ok_or_else(|| bad_api_usage!("no last hop"))?;
165
        Ok((uniq_id, hop_num).into())
166
    }
167

            
168
    /// Return a description of the last hop of the tunnel.
169
    ///
170
    /// Return None if the last hop is virtual; return an error
171
    /// if the tunnel has no circuits, or all of its circuits are zero length.
172
    ///
173
    ///
174
    /// # Panics
175
    ///
176
    /// Panics if there is no last hop.  (This should be impossible outside of
177
    /// the tor-proto crate, but within the crate it's possible to have a
178
    /// circuit with no hops.)
179
    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
180
        self.circ.last_hop_info()
181
    }
182

            
183
    /// Return the number of hops this tunnel as. Fail for a multi path.
184
96
    pub fn n_hops(&self) -> Result<usize> {
185
96
        self.as_single_circ()?.n_hops()
186
96
    }
187

            
188
    /// Return the [`Path`] objects describing all the hops
189
    /// of all the circuits in this tunnel.
190
    pub fn all_paths(&self) -> Vec<Arc<Path>> {
191
        self.circ.all_paths()
192
    }
193

            
194
    /// Return the paths of all the circuits in this tunnel,
    /// as a map from each circuit's `UniqId` to its [`Path`].
    ///
    /// This is only exposed for the RPC subsystem, where it is documented that the
    /// format of `UniqId` is not stable.
    #[cfg(feature = "rpc")]
    pub(crate) fn tagged_paths(&self) -> std::collections::HashMap<UniqId, Arc<Path>> {
        let mutable = &self.circ.mutable;
        mutable.tagged_paths()
    }
203

            
204
    /// Return a process-unique identifier for this tunnel.
205
    ///
206
    /// Returns the reactor unique ID of the main reactor.
207
128
    pub fn unique_id(&self) -> UniqId {
208
128
        self.circ.unique_id()
209
128
    }
210

            
211
    /// Return the time at which this tunnel last had any open streams.
212
    ///
213
    /// Returns `None` if this tunnel has never had any open streams,
214
    /// or if it currently has open streams.
215
    ///
216
    /// NOTE that the Instant returned by this method is not affected by
217
    /// any runtime mocking; it is the output of an ordinary call to
218
    /// `Instant::now()`.
219
    pub async fn disused_since(&self) -> Result<Option<web_time_compat::Instant>> {
220
        self.circ.disused_since().await
221
    }
222

            
223
    /// Return a future that will resolve once the underlying circuit reactor has closed.
224
    ///
225
    /// Note that this method does not itself cause the tunnel to shut down.
226
    pub fn wait_for_close(
227
        self: &Arc<Self>,
228
    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
229
        self.circ.wait_for_close()
230
    }
231

            
232
    /// Single-path tunnel only. Multi path onion service is not supported yet.
    ///
    /// Tell this tunnel to begin allowing the final hop of the tunnel to try
    /// to create new Tor streams, and to return those pending requests in an
    /// asynchronous stream.
    ///
    /// Ordinarily, these requests are rejected.
    ///
    /// There can only be one [`Stream`](futures::Stream) of this type created on a given tunnel.
    /// If such a [`Stream`](futures::Stream) already exists, this method will return
    /// an error.
    ///
    /// After this method has been called on a tunnel, the tunnel is expected
    /// to receive requests of this type indefinitely, until it is finally closed.
    /// If the `Stream` is dropped, the next request on this tunnel will cause it to close.
    ///
    /// Only onion services (and eventually) exit relays should call this
    /// method.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    ///
    ///  * the tunnel is a multi-path tunnel;
    ///  * `hop` is not a [`TargetHop::Hop`] carrying a precise [`HopLocation`]
    ///    (this is checked before anything is sent to the reactor,
    ///    so a rejected call does not install a request handler);
    ///  * the incoming-request queue cannot be created;
    ///  * the reactor has gone away ([`Error::CircuitClosed`]),
    ///    or it reports a failure while processing the request.
    //
    // TODO: Someday, we might want to allow a stream request handler to be
    // un-registered.  However, nothing in the Tor protocol requires it.
    //
    // Any incoming request handlers installed on the other circuits
    // (which are shutdown using CtrlCmd::ShutdownAndReturnCircuit)
    // will be discarded (along with the reactor of that circuit)
    #[cfg(feature = "hs-service")]
    #[allow(unreachable_code, unused_variables)] // TODO(conflux)
    pub async fn allow_stream_requests<'a, FILT>(
        self: &Arc<Self>,
        allow_commands: &'a [tor_cell::relaycell::RelayCmd],
        hop: TargetHop,
        filter: FILT,
    ) -> Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
    where
        FILT: crate::client::stream::IncomingStreamRequestFilter + 'a,
    {
        use futures::stream::StreamExt;

        /// The size of the channel receiving IncomingStreamRequestContexts.
        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;

        // TODO(#2002): support onion service conflux
        let circ = self.as_single_circ().map_err(tor_error::into_internal!(
            "Cannot allow stream requests on a multi-path tunnel"
        ))?;

        // Validate the target *before* talking to the reactor.
        //
        // Only one incoming-stream handler may ever be registered on a tunnel,
        // so we must not register one and then fail: that would leave a
        // handler installed whose receiver has already been dropped.
        let TargetHop::Hop(allowed_hop_loc) = hop else {
            return Err(bad_api_usage!("Expected TargetHop with HopLocation").into());
        };

        let time_prov = circ.time_provider.clone();
        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
        let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
            .new_mq(time_prov.clone(), circ.memquota.as_raw_account())?;
        let (tx, rx) = oneshot::channel();

        circ.command
            .unbounded_send(CtrlCmd::AwaitStreamRequest {
                cmd_checker,
                incoming_sender,
                hop,
                done: tx,
                filter: Box::new(filter),
            })
            .map_err(|_| Error::CircuitClosed)?;

        // Check whether the AwaitStreamRequest was processed successfully.
        rx.await.map_err(|_| Error::CircuitClosed)??;

        let tunnel = Arc::clone(self);
        Ok(incoming_receiver.map(move |req_ctx| {
            let StreamReqInfo {
                req,
                stream_id,
                hop,
                receiver,
                msg_tx,
                rate_limit_stream,
                drain_rate_request_stream,
                memquota,
                relay_cell_format,
            } = req_ctx;

            // We already enforce this in handle_incoming_stream_request; this
            // assertion is just here to make sure that we don't ever
            // accidentally remove or fail to enforce that check, since it is
            // security-critical.
            assert_eq!(Some(allowed_hop_loc), hop);

            // TODO(#2002): figure out what this is going to look like
            // for onion services (perhaps we should forbid this function
            // from being called on a multipath circuit?)
            //
            // See also:
            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
            let target = StreamTarget {
                tunnel: Tunnel::Client(Arc::clone(&tunnel)),
                tx: msg_tx,
                hop: Some(allowed_hop_loc),
                stream_id,
                relay_cell_format,
                rate_limit_stream,
            };

            // can be used to build a reader that supports XON/XOFF flow control
            let xon_xoff_reader_ctrl =
                XonXoffReaderCtrl::new(drain_rate_request_stream, target.clone());

            let reader = StreamReceiver {
                target: target.clone(),
                receiver,
                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
                ended: false,
            };

            let components = StreamComponents {
                stream_receiver: reader,
                target,
                memquota,
                xon_xoff_reader_ctrl,
            };

            IncomingStream::new(time_prov.clone(), req, components)
        }))
    }
359

            
360
    /// Single and Multi path helper, used to begin a stream.
361
    ///
362
    /// This function allocates a stream ID, and sends the message
363
    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
364
    ///
365
    /// The caller will typically want to see the first cell in response,
366
    /// to see whether it is e.g. an END or a CONNECTED.
367
96
    async fn begin_stream_impl(
368
96
        self: &Arc<Self>,
369
96
        begin_msg: AnyRelayMsg,
370
96
        cmd_checker: AnyCmdChecker,
371
144
    ) -> Result<StreamComponents> {
372
        // TODO: Possibly this should take a hop, rather than just
373
        // assuming it's the last hop.
374
96
        let hop = TargetHop::LastHop;
375

            
376
96
        let time_prov = self.circ.time_provider.clone();
377

            
378
96
        let memquota = StreamAccount::new(self.circ.mq_account())?;
379
96
        let (sender, receiver) = stream_queue(
380
            #[cfg(not(feature = "flowctl-cc"))]
381
            STREAM_READER_BUFFER,
382
96
            &memquota,
383
96
            &time_prov,
384
        )?;
385
96
        let (tx, rx) = oneshot::channel();
386
96
        let (msg_tx, msg_rx) =
387
96
            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
388

            
389
96
        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
390

            
391
        // A channel for the reactor to request a new drain rate from the reader.
392
        // Typically this notification will be sent after an XOFF is sent so that the reader can
393
        // send us a new drain rate when the stream data queue becomes empty.
394
96
        let mut drain_rate_request_tx = NotifySender::new_typed();
395
96
        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
396

            
397
96
        self.circ
398
96
            .control
399
96
            .unbounded_send(CtrlMsg::BeginStream {
400
96
                hop,
401
96
                message: begin_msg,
402
96
                sender,
403
96
                rx: msg_rx,
404
96
                rate_limit_notifier: rate_limit_tx,
405
96
                drain_rate_requester: drain_rate_request_tx,
406
96
                done: tx,
407
96
                cmd_checker,
408
96
            })
409
96
            .map_err(|_| Error::CircuitClosed)?;
410

            
411
96
        let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
412

            
413
96
        let target = StreamTarget {
414
96
            tunnel: Tunnel::Client(self.clone()),
415
96
            tx: msg_tx,
416
96
            hop: Some(hop),
417
96
            stream_id,
418
96
            relay_cell_format,
419
96
            rate_limit_stream: rate_limit_rx,
420
96
        };
421

            
422
        // can be used to build a reader that supports XON/XOFF flow control
423
96
        let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
424

            
425
96
        let stream_receiver = StreamReceiver {
426
96
            target: target.clone(),
427
96
            receiver,
428
96
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
429
96
            ended: false,
430
96
        };
431

            
432
96
        let components = StreamComponents {
433
96
            stream_receiver,
434
96
            target,
435
96
            memquota,
436
96
            xon_xoff_reader_ctrl,
437
96
        };
438

            
439
96
        Ok(components)
440
96
    }
441

            
442
    /// Install `padder` as the [`CircuitPadder`] for the listed `hop`.
    ///
    /// Any padder previously installed at that hop is replaced.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn start_padding_at_hop(
        self: &Arc<Self>,
        hop: HopLocation,
        padder: CircuitPadder,
    ) -> Result<()> {
        let new_padder = Some(padder);
        self.circ.set_padder_impl(hop, new_padder).await
    }
453

            
454
    /// Remove the [`CircuitPadder`], if any, installed at the listed `hop`.
    ///
    /// Has no effect if no padder was installed there.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn stop_padding_at_hop(self: &Arc<Self>, hop: HopLocation) -> Result<()> {
        let circ = &self.circ;
        circ.set_padder_impl(hop, None).await
    }
461

            
462
    /// Start a DataStream (anonymized connection) to the given
463
    /// address and port, using a BEGIN cell.
464
96
    async fn begin_data_stream(
465
96
        self: &Arc<Self>,
466
96
        msg: AnyRelayMsg,
467
96
        optimistic: bool,
468
144
    ) -> Result<DataStream> {
469
96
        let components = self
470
96
            .begin_stream_impl(msg, OutboundDataCmdChecker::new_any())
471
96
            .await?;
472

            
473
        let StreamComponents {
474
96
            stream_receiver,
475
96
            target,
476
96
            memquota,
477
96
            xon_xoff_reader_ctrl,
478
96
        } = components;
479

            
480
96
        let mut stream = DataStream::new(
481
96
            self.circ.time_provider.clone(),
482
96
            stream_receiver,
483
96
            xon_xoff_reader_ctrl,
484
96
            target,
485
96
            memquota,
486
        );
487
96
        if !optimistic {
488
84
            stream.wait_for_connection().await?;
489
12
        }
490
96
        Ok(stream)
491
96
    }
492

            
493
    /// Single and multi path helper.
494
    ///
495
    /// Start a stream to the given address and port, using a BEGIN
496
    /// cell.
497
    ///
498
    /// The use of a string for the address is intentional: you should let
499
    /// the remote Tor relay do the hostname lookup for you.
500
    #[instrument(level = "trace", skip_all)]
501
84
    pub async fn begin_stream(
502
84
        self: &Arc<Self>,
503
84
        target: &str,
504
84
        port: u16,
505
84
        parameters: Option<StreamParameters>,
506
126
    ) -> Result<DataStream> {
507
        let parameters = parameters.unwrap_or_default();
508
        let begin_flags = parameters.begin_flags();
509
        let optimistic = parameters.is_optimistic();
510
        let target = if parameters.suppressing_hostname() {
511
            ""
512
        } else {
513
            target
514
        };
515
        let beginmsg = Begin::new(target, port, begin_flags)
516
            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
517
        self.begin_data_stream(beginmsg.into(), optimistic).await
518
84
    }
519

            
520
    /// Start a new stream to the last relay in the tunnel, using
521
    /// a BEGIN_DIR cell.
522
18
    pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
523
        // Note that we always open begindir connections optimistically.
524
        // Since they are local to a relay that we've already authenticated
525
        // with and built a tunnel to, there should be no additional checks
526
        // we need to perform to see whether the BEGINDIR will succeed.
527
12
        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
528
12
            .await
529
12
    }
530

            
531
    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
532
    /// in this tunnel.
533
    ///
534
    /// Note that this function does not check for timeouts; that's
535
    /// the caller's responsibility.
536
    pub async fn resolve(self: &Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
537
        let resolve_msg = Resolve::new(hostname);
538

            
539
        let resolved_msg = self.try_resolve(resolve_msg).await?;
540

            
541
        resolved_msg
542
            .into_answers()
543
            .into_iter()
544
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
545
                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
546
                Ok(_) => None,
547
                Err(e) => Some(Err(e)),
548
            })
549
            .collect()
550
    }
551

            
552
    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
553
    /// the last relay on this tunnel.
554
    ///
555
    /// Note that this function does not check for timeouts; that's
556
    /// the caller's responsibility.
557
    pub async fn resolve_ptr(self: &Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
558
        let resolve_ptr_msg = Resolve::new_reverse(&addr);
559

            
560
        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
561

            
562
        resolved_msg
563
            .into_answers()
564
            .into_iter()
565
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
566
                Ok(ResolvedVal::Hostname(v)) => Some(
567
                    String::from_utf8(v)
568
                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
569
                ),
570
                Ok(_) => None,
571
                Err(e) => Some(Err(e)),
572
            })
573
            .collect()
574
    }
575

            
576
    /// Send an ad-hoc message to the given hop on the circuit, without
    /// expecting a reply.
    ///
    /// (If you want to handle one or more possible replies, see
    /// [`ClientTunnel::start_conversation`].)
    ///
    /// Returns [`Error::CircuitClosed`] if the message cannot be handed to
    /// the reactor, or if the reactor goes away before reporting an outcome.
    // TODO(conflux): Change this to use the ReactorHandle for the control commands.
    #[cfg(feature = "send-control-msg")]
    pub async fn send_raw_msg(
        &self,
        msg: tor_cell::relaycell::msg::AnyRelayMsg,
        hop: TargetHop,
    ) -> Result<()> {
        let (done_tx, done_rx) = oneshot::channel();
        let request = CtrlMsg::SendMsg {
            hop,
            msg,
            sender: done_tx,
        };

        if self.circ.control.unbounded_send(request).is_err() {
            return Err(Error::CircuitClosed);
        }

        match done_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
597

            
598
    /// Start an ad-hoc protocol exchange to the specified hop on this tunnel.
    ///
    /// To use this:
    ///
    ///  0. Create an inter-task channel you'll use to receive
    ///     the outcome of your conversation,
    ///     and bundle it into a [`UserMsgHandler`].
    ///
    ///  1. Call `start_conversation`.
    ///     This will install your handler, for incoming messages,
    ///     and send the outgoing message (if you provided one).
    ///     After that, each message on the circuit
    ///     that isn't handled by the core machinery
    ///     is passed to your provided `reply_handler`.
    ///
    ///  2. Possibly call `send_message` on the [`Conversation`],
    ///     from the call site of `start_conversation`,
    ///     possibly multiple times, from time to time,
    ///     to send further desired messages to the peer.
    ///
    ///  3. In your [`UserMsgHandler`], process the incoming messages.
    ///     You may respond by
    ///     sending additional messages.
    ///     When the protocol exchange is finished,
    ///     `UserMsgHandler::handle_msg` should return
    ///     [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
    ///
    /// If you don't need the `Conversation` to send followup messages,
    /// you may simply drop it,
    /// and rely on the responses you get from your handler,
    /// on the channel from step 0 above.
    /// Your handler will remain installed and able to process incoming messages
    /// until it returns `ConversationFinished`.
    ///
    /// (If you don't want to accept any replies at all, it may be
    /// simpler to use [`ClientTunnel::send_raw_msg`].)
    ///
    /// Note that it is quite possible to use this function to violate the tor
    /// protocol; most users of this API will not need to call it.  It is used
    /// to implement most of the onion service handshake.
    ///
    /// # Limitations
    ///
    /// Only one conversation may be active at any one time,
    /// for any one circuit.
    /// This generally means that this function should not be called
    /// on a tunnel which might be shared with anyone else.
    ///
    /// Likewise, it is forbidden to try to extend the tunnel,
    /// while the conversation is in progress.
    ///
    /// After the conversation has finished, the tunnel may be extended.
    /// Or, `start_conversation` may be called again;
    /// but, in that case there will be a gap between the two conversations,
    /// during which no `UserMsgHandler` is installed,
    /// and unexpected incoming messages would close the tunnel.
    ///
    /// If these restrictions are violated, the tunnel will be closed with an error.
    ///
    /// ## Precise definition of the lifetime of a conversation
    ///
    /// A conversation is in progress from entry to `start_conversation`,
    /// until entry to the body of the [`UserMsgHandler::handle_msg`](MsgHandler::handle_msg)
    /// call which returns [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
    /// (*Entry* since `handle_msg` is synchronously embedded
    /// into the incoming message processing.)
    /// So you may start a new conversation as soon as you have the final response
    /// via your inter-task channel from (0) above.
    ///
    /// The lifetime relationship of the [`Conversation`],
    /// vs the handler returning `ConversationFinished`
    /// is not enforced by the type system.
    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
    // at least while allowing sending followup messages from outside the handler.
    #[cfg(feature = "send-control-msg")]
    pub async fn start_conversation(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
        hop: TargetHop,
    ) -> Result<Conversation<'_>> {
        // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
        // the right Leg/Hop with inbound cell.
        let (resolved_tx, resolved_rx) = oneshot::channel();
        let resolve_cmd = CtrlCmd::ResolveTargetHop {
            hop,
            done: resolved_tx,
        };
        if self.circ.command.unbounded_send(resolve_cmd).is_err() {
            return Err(Error::CircuitClosed);
        }
        let hop_location = match resolved_rx.await {
            Ok(resolved) => resolved?,
            Err(_) => return Err(Error::CircuitClosed),
        };

        // Install the user's handler and, if one was given, send the first message.
        let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
        let conversation = Conversation(self);
        conversation.send_internal(msg, Some(handler)).await?;
        Ok(conversation)
    }
692

            
693
    /// Shut down this tunnel, along with all streams that are using it. Happens asynchronously
694
    /// (i.e. the tunnel won't necessarily be done shutting down immediately after this function
695
    /// returns!).
696
    ///
697
    /// Note that other references to this tunnel may exist. If they do, they will stop working
698
    /// after you call this function.
699
    ///
700
    /// It's not necessary to call this method if you're just done with a tunnel: the tunnel should
701
    /// close on its own once nothing is using it any more.
702
    // TODO(conflux): This should use the ReactorHandle instead.
703
    pub fn terminate(&self) {
704
        let _ = self.circ.command.unbounded_send(CtrlCmd::Shutdown);
705
    }
706

            
707
    /// Helper: Send the resolve message, and read resolved message from
708
    /// resolve stream.
709
    async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
710
        let components = self
711
            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
712
            .await?;
713

            
714
        let StreamComponents {
715
            stream_receiver,
716
            target: _,
717
            memquota,
718
            xon_xoff_reader_ctrl: _,
719
        } = components;
720

            
721
        let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
722
        resolve_stream.read_msg().await
723
    }
724

            
725
    // TODO(conflux)
726
}
727

            
728
// TODO(conflux): We will likely need to enforce some invariants here, for example that the `circ`
729
// has the expected (non-zero) number of hops.
730
impl TryFrom<ClientCirc> for ClientTunnel {
731
    type Error = Error;
732

            
733
364
    fn try_from(circ: ClientCirc) -> std::result::Result<Self, Self::Error> {
734
364
        Ok(Self { circ })
735
364
    }
736
}
737

            
738
/// Convert a [`ResolvedVal`] into a Result, based on whether or not
739
/// it represents an error.
740
fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
741
    match val {
742
        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
743
        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
744
        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
745
        _ => Ok(val),
746
    }
747
}
748

            
749
/// A precise position in a tunnel.
750
#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
751
#[derive_deftly(HasMemoryCost)]
752
#[non_exhaustive]
753
pub enum HopLocation {
754
    /// A specific position in a tunnel.
755
    Hop((UniqId, HopNum)),
756
    /// The join point of a multi-path tunnel.
757
    JoinPoint,
758
}
759

            
760
/// A position in a tunnel.
761
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
762
#[non_exhaustive]
763
pub enum TargetHop {
764
    /// A specific position in a tunnel.
765
    Hop(HopLocation),
766
    /// The last hop of a tunnel.
767
    ///
768
    /// This should be used only when you don't care about what specific hop is used.
769
    /// Some tunnels may be extended or truncated,
770
    /// which means that the "last hop" may change at any time.
771
    LastHop,
772
}
773

            
774
impl From<(UniqId, HopNum)> for HopLocation {
    /// Build a [`HopLocation::Hop`] from a `(UniqId, HopNum)` pair.
    fn from(pair: (UniqId, HopNum)) -> Self {
        Self::Hop(pair)
    }
}
779

            
780
impl From<(UniqId, HopNum)> for TargetHop {
781
16
    fn from(v: (UniqId, HopNum)) -> Self {
782
16
        TargetHop::Hop(v.into())
783
16
    }
784
}
785

            
786
impl HopLocation {
787
    /// Return the hop number if not a JointPoint.
788
    pub fn hop_num(&self) -> Option<HopNum> {
789
        match self {
790
            Self::Hop((_, hop_num)) => Some(*hop_num),
791
            Self::JoinPoint => None,
792
        }
793
    }
794
}
795

            
796
impl ClientTunnel {
797
    /// Close the pending stream that owns this StreamTarget, delivering the specified
798
    /// END message (if any)
799
    ///
800
    /// See [`StreamTarget::close_pending`].
801
    #[cfg(feature = "hs-service")]
802
12
    pub(crate) fn close_pending(
803
12
        &self,
804
12
        stream_id: StreamId,
805
12
        hop: Option<HopLocation>,
806
12
        message: crate::stream::CloseStreamBehavior,
807
12
    ) -> Result<oneshot::Receiver<Result<()>>> {
808
12
        let (tx, rx) = oneshot::channel();
809

            
810
12
        self.circ
811
12
            .control
812
12
            .unbounded_send(CtrlMsg::ClosePendingStream {
813
12
                stream_id,
814
12
                hop: hop.expect("missing stream hop for client tunnel"),
815
12
                message,
816
12
                done: tx,
817
12
            })
818
12
            .map_err(|_| Error::CircuitClosed)?;
819

            
820
12
        Ok(rx)
821
12
    }
822

            
823
    /// Request to send a SENDME cell for this stream.
824
    ///
825
    /// See [`StreamTarget::send_sendme`].
826
    pub(crate) fn send_sendme(&self, stream_id: StreamId, hop: Option<HopLocation>) -> Result<()> {
827
        self.circ
828
            .control
829
            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
830
                msg: FlowCtrlMsg::Sendme,
831
                stream_id,
832
                hop: hop.expect("missing stream hop for client tunnel"),
833
            })
834
            .map_err(|_| Error::CircuitClosed)
835
    }
836

            
837
    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
838
    ///
839
    /// See [`StreamTarget::drain_rate_update`].
840
    pub(crate) fn drain_rate_update(
841
        &self,
842
        stream_id: StreamId,
843
        hop: Option<HopLocation>,
844
        rate: XonKbpsEwma,
845
    ) -> Result<()> {
846
        self.circ
847
            .control
848
            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
849
                msg: FlowCtrlMsg::Xon(rate),
850
                stream_id,
851
                hop: hop.expect("missing stream hop for client tunnel"),
852
            })
853
            .map_err(|_| Error::CircuitClosed)
854
    }
855
}