1
//! Client-specific types and implementation.
2

            
3
pub mod channel;
4
pub mod circuit;
5
pub mod stream;
6

            
7
#[cfg(feature = "rpc")]
8
pub mod rpc;
9

            
10
#[cfg(feature = "send-control-msg")]
11
pub(crate) mod msghandler;
12
pub(crate) mod reactor;
13

            
14
use derive_deftly::Deftly;
15
use oneshot_fused_workaround as oneshot;
16
use std::net::IpAddr;
17
use std::sync::Arc;
18
use tracing::instrument;
19

            
20
use crate::circuit::UniqId;
21
use crate::circuit::circhop::ReactorStreamComponents;
22
#[cfg(feature = "circ-padding-manual")]
23
pub use crate::client::circuit::padding::{
24
    CircuitPadder, CircuitPadderConfig, CircuitPadderConfigError,
25
};
26
use crate::client::stream::{
27
    DataStream, OutboundDataCmdChecker, ResolveCmdChecker, ResolveStream, StreamParameters,
28
};
29
use crate::congestion::sendme::StreamRecvWindow;
30
use crate::crypto::cell::HopNum;
31
use crate::memquota::{SpecificAccount as _, StreamAccount};
32
use crate::stream::STREAM_READER_BUFFER;
33
use crate::stream::cmdcheck::AnyCmdChecker;
34
use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
35
use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamReceiver, StreamTarget, Tunnel};
36
use crate::{Error, ResolveError, Result};
37
use circuit::{ClientCirc, Path};
38
use reactor::{CtrlCmd, CtrlMsg, FlowCtrlMsg, MetaCellHandler};
39

            
40
use tor_cell::relaycell::StreamId;
41
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
42
use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal};
43
use tor_error::bad_api_usage;
44
use tor_linkspec::OwnedChanTarget;
45
use tor_memquota::derive_deftly_template_HasMemoryCost;
46
use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
47

            
48
#[cfg(feature = "hs-service")]
49
use crate::stream::{IncomingCmdChecker, IncomingStream, StreamReqInfo};
50

            
51
#[cfg(feature = "send-control-msg")]
52
use msghandler::{MsgHandler, UserMsgHandler};
53

            
54
/// A handle for an in-progress protocol exchange with a circuit's last hop.
///
/// You get one of these back from [`ClientTunnel::start_conversation`];
/// it lets you send further messages to the last hop relay.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
pub struct Conversation<'r>(&'r ClientTunnel);
63

            
64
#[cfg(feature = "send-control-msg")]
impl Conversation<'_> {
    /// Send a protocol message as part of an ad-hoc exchange
    ///
    /// Responses are handled by the `UserMsgHandler` set up
    /// when the `Conversation` was created.
    ///
    /// Returns [`Error::CircuitClosed`] if the reactor can no longer be reached;
    /// otherwise returns whatever outcome the reactor reports.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        self.send_internal(Some(msg), None).await
    }

    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
    ///
    /// The guts of `start_conversation` and `Conversation::send_message`.
    ///
    /// Both `msg` and `handler` are optional: `send_message` passes only a message,
    /// while `start_conversation` passes the handler and (possibly) a first message.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        // The message is wrapped without a stream ID.
        let msg = msg.map(|msg| tor_cell::relaycell::AnyRelayMsgOuter::new(None, msg));
        let (sender, receiver) = oneshot::channel();

        let ctrl_msg = CtrlMsg::SendMsgAndInstallHandler {
            msg,
            handler,
            sender,
        };
        self.0
            .circ
            .control
            .unbounded_send(ctrl_msg)
            .map_err(|_| Error::CircuitClosed)?;

        // A dropped sender means the reactor went away before answering.
        receiver.await.map_err(|_| Error::CircuitClosed)?
    }
}
99

            
100
/// A low-level client tunnel API.
101
///
102
/// This is a communication channel to the tunnel reactor, which manages 1 or more circuits.
103
///
104
/// Note: the tor-circmgr crates wrap this type in specialized *Tunnel types exposing only the
105
/// desired subset of functionality depending on purpose and path size.
106
///
107
/// Some API calls are for single path and some for multi path. A check with the underlying reactor
108
/// is done preventing for instance multi path calls to be used on a single path. Top level types
109
/// should prevent this and thus this object should never be used directly.
110
#[derive(Debug)]
111
#[cfg_attr(
112
    feature = "rpc",
113
    derive(derive_deftly::Deftly),
114
    derive_deftly(tor_rpcbase::templates::Object)
115
)]
116
#[allow(dead_code)] // TODO(conflux)
117
pub struct ClientTunnel {
118
    /// The underlying handle to the reactor.
119
    circ: ClientCirc,
120
}
121

            
122
impl ClientTunnel {
123
    /// Return a handle to the `ClientCirc` of this `ClientTunnel`, if the tunnel is a single
124
    /// circuit tunnel.
125
    ///
126
    /// Returns an error if the tunnel has more than one circuit.
127
436
    pub fn as_single_circ(&self) -> Result<&ClientCirc> {
128
436
        if self.circ.is_multi_path {
129
4
            return Err(bad_api_usage!("Single circuit getter on multi path tunnel"))?;
130
432
        }
131
432
        Ok(&self.circ)
132
436
    }
133

            
134
    /// Return the channel target of the first hop.
135
    ///
136
    /// Can only be used for single path tunnel.
137
    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
138
        self.as_single_circ()?.first_hop()
139
    }
140

            
141
    /// Return true if the circuit reactor is closed meaning the circuit is unusable for both
142
    /// receiving or sending.
143
128
    pub fn is_closed(&self) -> bool {
144
128
        self.circ.is_closing()
145
128
    }
146

            
147
    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
148
    /// HopLocation with its id and hop number.
149
    ///
150
    /// Return an error if there is no last hop.
151
    pub fn last_hop(&self) -> Result<TargetHop> {
152
        let uniq_id = self.unique_id();
153
        let hop_num = self
154
            .circ
155
            .mutable
156
            .last_hop_num(uniq_id)?
157
            .ok_or_else(|| bad_api_usage!("no last hop"))?;
158
        Ok((uniq_id, hop_num).into())
159
    }
160

            
161
    /// Return a description of the last hop of the tunnel.
162
    ///
163
    /// Return None if the last hop is virtual; return an error
164
    /// if the tunnel has no circuits, or all of its circuits are zero length.
165
    ///
166
    ///
167
    /// # Panics
168
    ///
169
    /// Panics if there is no last hop.  (This should be impossible outside of
170
    /// the tor-proto crate, but within the crate it's possible to have a
171
    /// circuit with no hops.)
172
    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
173
        self.circ.last_hop_info()
174
    }
175

            
176
    /// Return the number of hops this tunnel as. Fail for a multi path.
177
96
    pub fn n_hops(&self) -> Result<usize> {
178
96
        self.as_single_circ()?.n_hops()
179
96
    }
180

            
181
    /// Return the [`Path`] objects describing all the hops
182
    /// of all the circuits in this tunnel.
183
    pub fn all_paths(&self) -> Vec<Arc<Path>> {
184
        self.circ.all_paths()
185
    }
186

            
187
    /// Return the Paths of all the circuits in this tunnel,
    /// keyed by each circuit's UniqId.
    ///
    /// This is only exposed for the RPC subsystem, where it is documented that the
    /// format of `UniqId` is not stable.
    #[cfg(feature = "rpc")]
    pub(crate) fn tagged_paths(&self) -> std::collections::HashMap<UniqId, Arc<Path>> {
        let mutable = &self.circ.mutable;
        mutable.tagged_paths()
    }
196

            
197
    /// Return a process-unique identifier for this tunnel.
198
    ///
199
    /// Returns the reactor unique ID of the main reactor.
200
128
    pub fn unique_id(&self) -> UniqId {
201
128
        self.circ.unique_id()
202
128
    }
203

            
204
    /// Return the time at which this tunnel last had any open streams.
205
    ///
206
    /// Returns `None` if this tunnel has never had any open streams,
207
    /// or if it currently has open streams.
208
    ///
209
    /// NOTE that the Instant returned by this method is not affected by
210
    /// any runtime mocking; it is the output of an ordinary call to
211
    /// `Instant::now()`.
212
    pub async fn disused_since(&self) -> Result<Option<web_time_compat::Instant>> {
213
        self.circ.disused_since().await
214
    }
215

            
216
    /// Return a future that will resolve once the underlying circuit reactor has closed.
217
    ///
218
    /// Note that this method does not itself cause the tunnel to shut down.
219
    pub fn wait_for_close(
220
        self: &Arc<Self>,
221
    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
222
        self.circ.wait_for_close()
223
    }
224

            
225
    /// Single-path tunnel only. Multi path onion service is not supported yet.
    ///
    /// Tell this tunnel to begin allowing the final hop of the tunnel to try
    /// to create new Tor streams, and to return those pending requests in an
    /// asynchronous stream.
    ///
    /// Ordinarily, these requests are rejected.
    ///
    /// There can only be one [`Stream`](futures::Stream) of this type created on a given tunnel.
    /// If such a [`Stream`](futures::Stream) already exists, this method will return
    /// an error.
    ///
    /// After this method has been called on a tunnel, the tunnel is expected
    /// to receive requests of this type indefinitely, until it is finally closed.
    /// If the `Stream` is dropped, the next request on this tunnel will cause it to close.
    ///
    /// Only onion services (and eventually) exit relays should call this
    /// method.
    ///
    /// # Errors
    ///
    /// Returns an error *without contacting the reactor* if this is a multi-path tunnel,
    /// or if `hop` is not a [`TargetHop::Hop`] carrying a precise [`HopLocation`].
    /// Returns [`Error::CircuitClosed`] if the reactor cannot be reached,
    /// or the reactor's own error if it rejects the request.
    ///
    /// # Panics
    ///
    /// The returned stream panics if the reactor ever delivers a request whose hop
    /// differs from the hop requested here.
    //
    // TODO: Someday, we might want to allow a stream request handler to be
    // un-registered.  However, nothing in the Tor protocol requires it.
    //
    // Any incoming request handlers installed on the other circuits
    // (which are shutdown using CtrlCmd::ShutdownAndReturnCircuit)
    // will be discarded (along with the reactor of that circuit)
    #[cfg(feature = "hs-service")]
    #[allow(unreachable_code, unused_variables)] // TODO(conflux)
    pub async fn allow_stream_requests<'a, FILT>(
        self: &Arc<Self>,
        allow_commands: &'a [tor_cell::relaycell::RelayCmd],
        hop: TargetHop,
        filter: FILT,
    ) -> Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
    where
        FILT: crate::stream::IncomingStreamRequestFilter + 'a,
    {
        use futures::stream::StreamExt;

        /// The size of the channel receiving IncomingStreamRequestContexts.
        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;

        // TODO(#2002): support onion service conflux
        let circ = self.as_single_circ().map_err(tor_error::into_internal!(
            "Cannot allow stream requests on a multi-path tunnel"
        ))?;

        // Validate the hop *before* asking the reactor to install anything.
        // Otherwise a bad `hop` could leave the (one-per-tunnel) request handler
        // registered while we return an error and drop its receiving end.
        let allowed_hop_loc: HopLocation = match hop {
            TargetHop::Hop(loc) => loc,
            _ => return Err(bad_api_usage!("Expected TargetHop with HopLocation").into()),
        };

        let time_prov = circ.time_provider.clone();
        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
        let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
            .new_mq(time_prov.clone(), circ.memquota.as_raw_account())?;
        let (tx, rx) = oneshot::channel();

        circ.command
            .unbounded_send(CtrlCmd::AwaitStreamRequest {
                cmd_checker,
                incoming_sender,
                hop,
                done: tx,
                filter: Box::new(filter),
            })
            .map_err(|_| Error::CircuitClosed)?;

        // Check whether the AwaitStreamRequest was processed successfully.
        rx.await.map_err(|_| Error::CircuitClosed)??;

        let tunnel = self.clone();
        Ok(incoming_receiver.map(move |req_ctx| {
            // Destructure so that we don't forget to use any fields.
            let StreamReqInfo {
                req,
                stream_id,
                hop,
                stream_components:
                    ReactorStreamComponents {
                        stream_inbound_rx,
                        stream_outbound_tx,
                        rate_limit_rx,
                        drain_rate_request_rx,
                    },
                memquota,
                relay_cell_format,
            } = req_ctx;

            // We already enforce this in handle_incoming_stream_request; this
            // assertion is just here to make sure that we don't ever
            // accidentally remove or fail to enforce that check, since it is
            // security-critical.
            assert_eq!(Some(allowed_hop_loc), hop);

            // TODO(#2002): figure out what this is going to look like
            // for onion services (perhaps we should forbid this function
            // from being called on a multipath circuit?)
            //
            // See also:
            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
            let target = StreamTarget {
                tunnel: Tunnel::Client(Arc::clone(&tunnel)),
                tx: stream_outbound_tx,
                hop: Some(allowed_hop_loc),
                stream_id,
                relay_cell_format,
                rate_limit_stream: rate_limit_rx,
            };

            // can be used to build a reader that supports XON/XOFF flow control
            let xon_xoff_reader_ctrl =
                XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());

            let reader = StreamReceiver {
                target: target.clone(),
                receiver: stream_inbound_rx,
                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
                ended: false,
            };

            let components = StreamComponents {
                stream_receiver: reader,
                target,
                memquota,
                xon_xoff_reader_ctrl,
            };

            IncomingStream::new(time_prov.clone(), req, components)
        }))
    }
355

            
356
    /// Single and Multi path helper, used to begin a stream.
357
    ///
358
    /// This function allocates a stream ID, and sends the message
359
    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
360
    ///
361
    /// The caller will typically want to see the first cell in response,
362
    /// to see whether it is e.g. an END or a CONNECTED.
363
96
    async fn begin_stream_impl(
364
96
        self: &Arc<Self>,
365
96
        begin_msg: AnyRelayMsg,
366
96
        cmd_checker: AnyCmdChecker,
367
144
    ) -> Result<StreamComponents> {
368
        // TODO: Possibly this should take a hop, rather than just
369
        // assuming it's the last hop.
370
96
        let hop = TargetHop::LastHop;
371

            
372
96
        let memquota = StreamAccount::new(self.circ.mq_account())?;
373
96
        let (tx, rx) = oneshot::channel();
374

            
375
96
        self.circ
376
96
            .control
377
96
            .unbounded_send(CtrlMsg::BeginStream {
378
96
                hop,
379
96
                message: begin_msg,
380
96
                memquota: memquota.clone(),
381
96
                done: tx,
382
96
                cmd_checker,
383
96
            })
384
96
            .map_err(|_| Error::CircuitClosed)?;
385

            
386
96
        let (stream_id, hop, relay_cell_format, stream_components) =
387
96
            rx.await.map_err(|_| Error::CircuitClosed)??;
388

            
389
        // Destructure so that we don't forget to use any fields.
390
        let ReactorStreamComponents {
391
96
            stream_inbound_rx,
392
96
            stream_outbound_tx,
393
96
            rate_limit_rx,
394
96
            drain_rate_request_rx,
395
96
        } = stream_components;
396

            
397
96
        let target = StreamTarget {
398
96
            tunnel: Tunnel::Client(self.clone()),
399
96
            tx: stream_outbound_tx,
400
96
            hop: Some(hop),
401
96
            stream_id,
402
96
            relay_cell_format,
403
96
            rate_limit_stream: rate_limit_rx,
404
96
        };
405

            
406
        // can be used to build a reader that supports XON/XOFF flow control
407
96
        let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
408

            
409
96
        let stream_receiver = StreamReceiver {
410
96
            target: target.clone(),
411
96
            receiver: stream_inbound_rx,
412
96
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
413
96
            ended: false,
414
96
        };
415

            
416
96
        let components = StreamComponents {
417
96
            stream_receiver,
418
96
            target,
419
96
            memquota,
420
96
            xon_xoff_reader_ctrl,
421
96
        };
422

            
423
96
        Ok(components)
424
96
    }
425

            
426
    /// Install the given [`CircuitPadder`] at `hop`.
    ///
    /// Any padder previously installed at that hop is replaced.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn start_padding_at_hop(
        self: &Arc<Self>,
        hop: HopLocation,
        padder: CircuitPadder,
    ) -> Result<()> {
        let new_padder = Some(padder);
        self.circ.set_padder_impl(hop, new_padder).await
    }
437

            
438
    /// Remove whatever [`CircuitPadder`] is installed at `hop`.
    ///
    /// Has no effect if no padder was installed there.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn stop_padding_at_hop(self: &Arc<Self>, hop: HopLocation) -> Result<()> {
        let circ = &self.circ;
        circ.set_padder_impl(hop, None).await
    }
445

            
446
    /// Start a DataStream (anonymized connection) to the given
447
    /// address and port, using a BEGIN cell.
448
96
    async fn begin_data_stream(
449
96
        self: &Arc<Self>,
450
96
        msg: AnyRelayMsg,
451
96
        optimistic: bool,
452
144
    ) -> Result<DataStream> {
453
96
        let components = self
454
96
            .begin_stream_impl(msg, OutboundDataCmdChecker::new_any())
455
96
            .await?;
456

            
457
        let StreamComponents {
458
96
            stream_receiver,
459
96
            target,
460
96
            memquota,
461
96
            xon_xoff_reader_ctrl,
462
96
        } = components;
463

            
464
96
        let mut stream = DataStream::new(
465
96
            self.circ.time_provider.clone(),
466
96
            stream_receiver,
467
96
            xon_xoff_reader_ctrl,
468
96
            target,
469
96
            memquota,
470
        );
471
96
        if !optimistic {
472
84
            stream.wait_for_connection().await?;
473
12
        }
474
96
        Ok(stream)
475
96
    }
476

            
477
    /// Single and multi path helper.
478
    ///
479
    /// Start a stream to the given address and port, using a BEGIN
480
    /// cell.
481
    ///
482
    /// The use of a string for the address is intentional: you should let
483
    /// the remote Tor relay do the hostname lookup for you.
484
    #[instrument(level = "trace", skip_all)]
485
84
    pub async fn begin_stream(
486
84
        self: &Arc<Self>,
487
84
        target: &str,
488
84
        port: u16,
489
84
        parameters: Option<StreamParameters>,
490
126
    ) -> Result<DataStream> {
491
        let parameters = parameters.unwrap_or_default();
492
        let begin_flags = parameters.begin_flags();
493
        let optimistic = parameters.is_optimistic();
494
        let target = if parameters.suppressing_hostname() {
495
            ""
496
        } else {
497
            target
498
        };
499
        let beginmsg = Begin::new(target, port, begin_flags)
500
            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
501
        self.begin_data_stream(beginmsg.into(), optimistic).await
502
84
    }
503

            
504
    /// Start a new stream to the last relay in the tunnel, using
505
    /// a BEGIN_DIR cell.
506
18
    pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
507
        // Note that we always open begindir connections optimistically.
508
        // Since they are local to a relay that we've already authenticated
509
        // with and built a tunnel to, there should be no additional checks
510
        // we need to perform to see whether the BEGINDIR will succeed.
511
12
        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
512
12
            .await
513
12
    }
514

            
515
    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
516
    /// in this tunnel.
517
    ///
518
    /// Note that this function does not check for timeouts; that's
519
    /// the caller's responsibility.
520
    pub async fn resolve(self: &Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
521
        let resolve_msg = Resolve::new(hostname);
522

            
523
        let resolved_msg = self.try_resolve(resolve_msg).await?;
524

            
525
        resolved_msg
526
            .into_answers()
527
            .into_iter()
528
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
529
                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
530
                Ok(_) => None,
531
                Err(e) => Some(Err(e)),
532
            })
533
            .collect()
534
    }
535

            
536
    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
537
    /// the last relay on this tunnel.
538
    ///
539
    /// Note that this function does not check for timeouts; that's
540
    /// the caller's responsibility.
541
    pub async fn resolve_ptr(self: &Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
542
        let resolve_ptr_msg = Resolve::new_reverse(&addr);
543

            
544
        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
545

            
546
        resolved_msg
547
            .into_answers()
548
            .into_iter()
549
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
550
                Ok(ResolvedVal::Hostname(v)) => Some(
551
                    String::from_utf8(v)
552
                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
553
                ),
554
                Ok(_) => None,
555
                Err(e) => Some(Err(e)),
556
            })
557
            .collect()
558
    }
559

            
560
    /// Send an ad-hoc message to the given hop on the circuit, with no reply
    /// expected.
    ///
    /// (To handle one or more possible replies, see
    /// [`ClientTunnel::start_conversation`].)
    // TODO(conflux): Change this to use the ReactorHandle for the control commands.
    #[cfg(feature = "send-control-msg")]
    pub async fn send_raw_msg(
        &self,
        msg: tor_cell::relaycell::msg::AnyRelayMsg,
        hop: TargetHop,
    ) -> Result<()> {
        let (sender, outcome) = oneshot::channel();
        let request = CtrlMsg::SendMsg { hop, msg, sender };
        if self.circ.control.unbounded_send(request).is_err() {
            return Err(Error::CircuitClosed);
        }

        // A dropped sender means the reactor went away before answering.
        match outcome.await {
            Ok(result) => result,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
581

            
582
    /// Start an ad-hoc protocol exchange to the specified hop on this tunnel.
    ///
    /// To use this:
    ///
    ///  0. Create an inter-task channel you'll use to receive
    ///     the outcome of your conversation,
    ///     and bundle it into a [`UserMsgHandler`].
    ///
    ///  1. Call `start_conversation`.
    ///     This will install your handler, for incoming messages,
    ///     and send the outgoing message (if you provided one).
    ///     After that, each message on the circuit
    ///     that isn't handled by the core machinery
    ///     is passed to your provided `reply_handler`.
    ///
    ///  2. Possibly call [`send_message`](Conversation::send_message) on the [`Conversation`],
    ///     from the call site of `start_conversation`,
    ///     possibly multiple times, from time to time,
    ///     to send further desired messages to the peer.
    ///
    ///  3. In your [`UserMsgHandler`], process the incoming messages.
    ///     You may respond by
    ///     sending additional messages.
    ///     When the protocol exchange is finished,
    ///     `UserMsgHandler::handle_msg` should return
    ///     [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
    ///
    /// If you don't need the `Conversation` to send followup messages,
    /// you may simply drop it,
    /// and rely on the responses you get from your handler,
    /// on the channel from step 0 above.
    /// Your handler will remain installed and able to process incoming messages
    /// until it returns `ConversationFinished`.
    ///
    /// (If you don't want to accept any replies at all, it may be
    /// simpler to use [`ClientTunnel::send_raw_msg`].)
    ///
    /// Note that it is quite possible to use this function to violate the tor
    /// protocol; most users of this API will not need to call it.  It is used
    /// to implement most of the onion service handshake.
    ///
    /// # Limitations
    ///
    /// Only one conversation may be active at any one time,
    /// for any one circuit.
    /// This generally means that this function should not be called
    /// on a tunnel which might be shared with anyone else.
    ///
    /// Likewise, it is forbidden to try to extend the tunnel,
    /// while the conversation is in progress.
    ///
    /// After the conversation has finished, the tunnel may be extended.
    /// Or, `start_conversation` may be called again;
    /// but, in that case there will be a gap between the two conversations,
    /// during which no `UserMsgHandler` is installed,
    /// and unexpected incoming messages would close the tunnel.
    ///
    /// If these restrictions are violated, the tunnel will be closed with an error.
    ///
    /// ## Precise definition of the lifetime of a conversation
    ///
    /// A conversation is in progress from entry to `start_conversation`,
    /// until entry to the body of the [`UserMsgHandler::handle_msg`](MsgHandler::handle_msg)
    /// call which returns [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
    /// (*Entry* since `handle_msg` is synchronously embedded
    /// into the incoming message processing.)
    /// So you may start a new conversation as soon as you have the final response
    /// via your inter-task channel from (0) above.
    ///
    /// The lifetime relationship of the [`Conversation`],
    /// vs the handler returning `ConversationFinished`
    /// is not enforced by the type system.
    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
    // at least while allowing sending followup messages from outside the handler.
    #[cfg(feature = "send-control-msg")]
    pub async fn start_conversation(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
        hop: TargetHop,
    ) -> Result<Conversation<'_>> {
        // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
        // the right Leg/Hop with inbound cell.
        let (sender, receiver) = oneshot::channel();
        self.circ
            .command
            .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
            .map_err(|_| Error::CircuitClosed)?;
        let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;

        // Install the handler and send the initial message (if any) in a single reactor request.
        let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
        let conversation = Conversation(self);
        conversation.send_internal(msg, Some(handler)).await?;
        Ok(conversation)
    }
676

            
677
    /// Shut down this tunnel, along with all streams that are using it. Happens asynchronously
678
    /// (i.e. the tunnel won't necessarily be done shutting down immediately after this function
679
    /// returns!).
680
    ///
681
    /// Note that other references to this tunnel may exist. If they do, they will stop working
682
    /// after you call this function.
683
    ///
684
    /// It's not necessary to call this method if you're just done with a tunnel: the tunnel should
685
    /// close on its own once nothing is using it any more.
686
    // TODO(conflux): This should use the ReactorHandle instead.
687
    pub fn terminate(&self) {
688
        let _ = self.circ.command.unbounded_send(CtrlCmd::Shutdown);
689
    }
690

            
691
    /// Helper: Send the resolve message, and read resolved message from
692
    /// resolve stream.
693
    async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
694
        let components = self
695
            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
696
            .await?;
697

            
698
        let StreamComponents {
699
            stream_receiver,
700
            target: _,
701
            memquota,
702
            xon_xoff_reader_ctrl: _,
703
        } = components;
704

            
705
        let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
706
        resolve_stream.read_msg().await
707
    }
708

            
709
    // TODO(conflux)
710
}
711

            
712
// TODO(conflux): We will likely need to enforce some invariants here, for example that the `circ`
713
// has the expected (non-zero) number of hops.
714
impl TryFrom<ClientCirc> for ClientTunnel {
715
    type Error = Error;
716

            
717
364
    fn try_from(circ: ClientCirc) -> std::result::Result<Self, Self::Error> {
718
364
        Ok(Self { circ })
719
364
    }
720
}
721

            
722
/// Convert a [`ResolvedVal`] into a Result, based on whether or not
723
/// it represents an error.
724
fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
725
    match val {
726
        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
727
        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
728
        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
729
        _ => Ok(val),
730
    }
731
}
732

            
733
/// A precise position in a tunnel.
734
#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
735
#[derive_deftly(HasMemoryCost)]
736
#[non_exhaustive]
737
pub enum HopLocation {
738
    /// A specific position in a tunnel.
739
    Hop((UniqId, HopNum)),
740
    /// The join point of a multi-path tunnel.
741
    JoinPoint,
742
}
743

            
744
/// A position in a tunnel.
745
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
746
#[non_exhaustive]
747
pub enum TargetHop {
748
    /// A specific position in a tunnel.
749
    Hop(HopLocation),
750
    /// The last hop of a tunnel.
751
    ///
752
    /// This should be used only when you don't care about what specific hop is used.
753
    /// Some tunnels may be extended or truncated,
754
    /// which means that the "last hop" may change at any time.
755
    LastHop,
756
}
757

            
758
impl From<(UniqId, HopNum)> for HopLocation {
    /// Build a [`HopLocation::Hop`] from a circuit id and a hop number.
    fn from((uniq_id, hop_num): (UniqId, HopNum)) -> Self {
        Self::Hop((uniq_id, hop_num))
    }
}
763

            
764
impl From<(UniqId, HopNum)> for TargetHop {
765
16
    fn from(v: (UniqId, HopNum)) -> Self {
766
16
        TargetHop::Hop(v.into())
767
16
    }
768
}
769

            
770
impl HopLocation {
771
    /// Return the hop number if not a JointPoint.
772
    pub fn hop_num(&self) -> Option<HopNum> {
773
        match self {
774
            Self::Hop((_, hop_num)) => Some(*hop_num),
775
            Self::JoinPoint => None,
776
        }
777
    }
778
}
779

            
780
impl ClientTunnel {
781
    /// Close the pending stream that owns this StreamTarget, delivering the specified
782
    /// END message (if any)
783
    ///
784
    /// See [`StreamTarget::close_pending`].
785
    #[cfg(feature = "hs-service")]
786
12
    pub(crate) fn close_pending(
787
12
        &self,
788
12
        stream_id: StreamId,
789
12
        hop: Option<HopLocation>,
790
12
        message: crate::stream::CloseStreamBehavior,
791
12
    ) -> Result<oneshot::Receiver<Result<()>>> {
792
12
        let (tx, rx) = oneshot::channel();
793

            
794
12
        self.circ
795
12
            .control
796
12
            .unbounded_send(CtrlMsg::ClosePendingStream {
797
12
                stream_id,
798
12
                hop: hop.expect("missing stream hop for client tunnel"),
799
12
                message,
800
12
                done: tx,
801
12
            })
802
12
            .map_err(|_| Error::CircuitClosed)?;
803

            
804
12
        Ok(rx)
805
12
    }
806

            
807
    /// Request to send a SENDME cell for this stream.
808
    ///
809
    /// See [`StreamTarget::send_sendme`].
810
    pub(crate) fn send_sendme(&self, stream_id: StreamId, hop: Option<HopLocation>) -> Result<()> {
811
        self.circ
812
            .control
813
            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
814
                msg: FlowCtrlMsg::Sendme,
815
                stream_id,
816
                hop: hop.expect("missing stream hop for client tunnel"),
817
            })
818
            .map_err(|_| Error::CircuitClosed)
819
    }
820

            
821
    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
822
    ///
823
    /// See [`StreamTarget::drain_rate_update`].
824
    pub(crate) fn drain_rate_update(
825
        &self,
826
        stream_id: StreamId,
827
        hop: Option<HopLocation>,
828
        rate: XonKbpsEwma,
829
    ) -> Result<()> {
830
        self.circ
831
            .control
832
            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
833
                msg: FlowCtrlMsg::Xon(rate),
834
                stream_id,
835
                hop: hop.expect("missing stream hop for client tunnel"),
836
            })
837
            .map_err(|_| Error::CircuitClosed)
838
    }
839
}