1
//! Client-specific types and implementation.
2

            
3
pub mod channel;
4
pub mod circuit;
5
pub mod stream;
6

            
7
#[cfg(feature = "send-control-msg")]
8
pub(crate) mod msghandler;
9
pub(crate) mod reactor;
10

            
11
use derive_deftly::Deftly;
12
use oneshot_fused_workaround as oneshot;
13
use std::net::IpAddr;
14
use std::sync::Arc;
15
use tracing::instrument;
16

            
17
use crate::circuit::UniqId;
18
#[cfg(feature = "circ-padding-manual")]
19
pub use crate::client::circuit::padding::{
20
    CircuitPadder, CircuitPadderConfig, CircuitPadderConfigError,
21
};
22
use crate::client::stream::{
23
    DataStream, OutboundDataCmdChecker, ResolveCmdChecker, ResolveStream, StreamParameters,
24
    StreamReceiver,
25
};
26
use crate::congestion::sendme::StreamRecvWindow;
27
use crate::crypto::cell::HopNum;
28
use crate::memquota::{SpecificAccount as _, StreamAccount};
29
use crate::stream::STREAM_READER_BUFFER;
30
use crate::stream::cmdcheck::AnyCmdChecker;
31
use crate::stream::flow_ctrl::state::StreamRateLimit;
32
use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
33
use crate::stream::queue::stream_queue;
34
use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamTarget, Tunnel};
35
use crate::util::notify::NotifySender;
36
use crate::{Error, ResolveError, Result};
37
use circuit::{CIRCUIT_BUFFER_SIZE, ClientCirc, Path};
38
use reactor::{CtrlCmd, CtrlMsg, FlowCtrlMsg, MetaCellHandler};
39

            
40
use postage::watch;
41
use tor_cell::relaycell::StreamId;
42
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
43
use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal};
44
use tor_error::bad_api_usage;
45
use tor_linkspec::OwnedChanTarget;
46
use tor_memquota::derive_deftly_template_HasMemoryCost;
47
use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
48

            
49
#[cfg(feature = "hs-service")]
50
use crate::stream::incoming::StreamReqInfo;
51

            
52
#[cfg(feature = "hs-service")]
53
use crate::client::stream::{IncomingCmdChecker, IncomingStream};
54

            
55
#[cfg(feature = "send-control-msg")]
56
use msghandler::{MsgHandler, UserMsgHandler};
57

            
58
/// Handle to use during an ongoing protocol exchange with a circuit's last hop
59
///
60
/// This is obtained from [`ClientTunnel::start_conversation`],
61
/// and used to send messages to the last hop relay.
62
//
63
// TODO(conflux): this should use ClientTunnel, and it should be moved into
64
// the tunnel module.
65
#[cfg(feature = "send-control-msg")]
66
pub struct Conversation<'r>(&'r ClientTunnel);
67

            
68
#[cfg(feature = "send-control-msg")]
69
impl Conversation<'_> {
70
    /// Send a protocol message as part of an ad-hoc exchange
71
    ///
72
    /// Responses are handled by the `UserMsgHandler` set up
73
    /// when the `Conversation` was created.
74
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
75
        self.send_internal(Some(msg), None).await
76
    }
77

            
78
    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
79
    ///
80
    /// The guts of `start_conversation` and `Conversation::send_msg`
81
    pub(crate) async fn send_internal(
82
        &self,
83
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
84
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
85
    ) -> Result<()> {
86
        let msg = msg.map(|msg| tor_cell::relaycell::AnyRelayMsgOuter::new(None, msg));
87
        let (sender, receiver) = oneshot::channel();
88

            
89
        let ctrl_msg = CtrlMsg::SendMsgAndInstallHandler {
90
            msg,
91
            handler,
92
            sender,
93
        };
94
        self.0
95
            .circ
96
            .control
97
            .unbounded_send(ctrl_msg)
98
            .map_err(|_| Error::CircuitClosed)?;
99

            
100
        receiver.await.map_err(|_| Error::CircuitClosed)?
101
    }
102
}
103

            
104
/// A low-level client tunnel API.
105
///
106
/// This is a communication channel to the tunnel reactor, which manages 1 or more circuits.
107
///
108
/// Note: the tor-circmgr crates wrap this type in specialized *Tunnel types exposing only the
109
/// desired subset of functionality depending on purpose and path size.
110
///
111
/// Some API calls are for single path and some for multi path. A check with the underlying reactor
112
/// is done preventing for instance multi path calls to be used on a single path. Top level types
113
/// should prevent this and thus this object should never be used directly.
114
#[derive(Debug)]
115
#[allow(dead_code)] // TODO(conflux)
116
pub struct ClientTunnel {
117
    /// The underlying handle to the reactor.
118
    circ: ClientCirc,
119
}
120

            
121
impl ClientTunnel {
122
    /// Return a handle to the `ClientCirc` of this `ClientTunnel`, if the tunnel is a single
123
    /// circuit tunnel.
124
    ///
125
    /// Returns an error if the tunnel has more than one circuit.
126
436
    pub fn as_single_circ(&self) -> Result<&ClientCirc> {
127
436
        if self.circ.is_multi_path {
128
4
            return Err(bad_api_usage!("Single circuit getter on multi path tunnel"))?;
129
432
        }
130
432
        Ok(&self.circ)
131
436
    }
132

            
133
    /// Return the channel target of the first hop.
134
    ///
135
    /// Can only be used for single path tunnel.
136
    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
137
        self.as_single_circ()?.first_hop()
138
    }
139

            
140
    /// Return true if the circuit reactor is closed meaning the circuit is unusable for both
141
    /// receiving or sending.
142
128
    pub fn is_closed(&self) -> bool {
143
128
        self.circ.is_closing()
144
128
    }
145

            
146
    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
147
    /// HopLocation with its id and hop number.
148
    ///
149
    /// Return an error if there is no last hop.
150
    pub fn last_hop(&self) -> Result<TargetHop> {
151
        let uniq_id = self.unique_id();
152
        let hop_num = self
153
            .circ
154
            .mutable
155
            .last_hop_num(uniq_id)?
156
            .ok_or_else(|| bad_api_usage!("no last hop"))?;
157
        Ok((uniq_id, hop_num).into())
158
    }
159

            
160
    /// Return a description of the last hop of the tunnel.
161
    ///
162
    /// Return None if the last hop is virtual; return an error
163
    /// if the tunnel has no circuits, or all of its circuits are zero length.
164
    ///
165
    ///
166
    /// # Panics
167
    ///
168
    /// Panics if there is no last hop.  (This should be impossible outside of
169
    /// the tor-proto crate, but within the crate it's possible to have a
170
    /// circuit with no hops.)
171
    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
172
        self.circ.last_hop_info()
173
    }
174

            
175
    /// Return the number of hops this tunnel as. Fail for a multi path.
176
96
    pub fn n_hops(&self) -> Result<usize> {
177
96
        self.as_single_circ()?.n_hops()
178
96
    }
179

            
180
    /// Return the [`Path`] objects describing all the hops
181
    /// of all the circuits in this tunnel.
182
    pub fn all_paths(&self) -> Vec<Arc<Path>> {
183
        self.circ.all_paths()
184
    }
185

            
186
    /// Return a process-unique identifier for this tunnel.
187
    ///
188
    /// Returns the reactor unique ID of the main reactor.
189
128
    pub fn unique_id(&self) -> UniqId {
190
128
        self.circ.unique_id()
191
128
    }
192

            
193
    /// Return the time at which this tunnel last had any open streams.
194
    ///
195
    /// Returns `None` if this tunnel has never had any open streams,
196
    /// or if it currently has open streams.
197
    ///
198
    /// NOTE that the Instant returned by this method is not affected by
199
    /// any runtime mocking; it is the output of an ordinary call to
200
    /// `Instant::now()`.
201
    pub async fn disused_since(&self) -> Result<Option<std::time::Instant>> {
202
        self.circ.disused_since().await
203
    }
204

            
205
    /// Return a future that will resolve once the underlying circuit reactor has closed.
206
    ///
207
    /// Note that this method does not itself cause the tunnel to shut down.
208
    pub fn wait_for_close(
209
        self: &Arc<Self>,
210
    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
211
        self.circ.wait_for_close()
212
    }
213

            
214
    /// Single-path tunnel only. Multi path onion service is not supported yet.
215
    ///
216
    /// Tell this tunnel to begin allowing the final hop of the tunnel to try
217
    /// to create new Tor streams, and to return those pending requests in an
218
    /// asynchronous stream.
219
    ///
220
    /// Ordinarily, these requests are rejected.
221
    ///
222
    /// There can only be one [`Stream`](futures::Stream) of this type created on a given tunnel.
223
    /// If a such a [`Stream`](futures::Stream) already exists, this method will return
224
    /// an error.
225
    ///
226
    /// After this method has been called on a tunnel, the tunnel is expected
227
    /// to receive requests of this type indefinitely, until it is finally closed.
228
    /// If the `Stream` is dropped, the next request on this tunnel will cause it to close.
229
    ///
230
    /// Only onion services (and eventually) exit relays should call this
231
    /// method.
232
    //
233
    // TODO: Someday, we might want to allow a stream request handler to be
234
    // un-registered.  However, nothing in the Tor protocol requires it.
235
    //
236
    // Any incoming request handlers installed on the other circuits
237
    // (which are are shutdown using CtrlCmd::ShutdownAndReturnCircuit)
238
    // will be discarded (along with the reactor of that circuit)
239
    #[cfg(feature = "hs-service")]
240
    #[allow(unreachable_code, unused_variables)] // TODO(conflux)
241
64
    pub async fn allow_stream_requests<'a, FILT>(
242
64
        self: &Arc<Self>,
243
64
        allow_commands: &'a [tor_cell::relaycell::RelayCmd],
244
64
        hop: TargetHop,
245
64
        filter: FILT,
246
64
    ) -> Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
247
64
    where
248
64
        FILT: crate::client::stream::IncomingStreamRequestFilter + 'a,
249
64
    {
250
        use futures::stream::StreamExt;
251

            
252
        /// The size of the channel receiving IncomingStreamRequestContexts.
253
        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
254

            
255
        // TODO(#2002): support onion service conflux
256
64
        let circ = self.as_single_circ().map_err(tor_error::into_internal!(
257
            "Cannot allow stream requests on a multi-path tunnel"
258
4
        ))?;
259

            
260
60
        let time_prov = circ.time_provider.clone();
261
60
        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
262
60
        let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
263
60
            .new_mq(time_prov.clone(), circ.memquota.as_raw_account())?;
264
60
        let (tx, rx) = oneshot::channel();
265

            
266
60
        circ.command
267
60
            .unbounded_send(CtrlCmd::AwaitStreamRequest {
268
60
                cmd_checker,
269
60
                incoming_sender,
270
60
                hop,
271
60
                done: tx,
272
60
                filter: Box::new(filter),
273
60
            })
274
60
            .map_err(|_| Error::CircuitClosed)?;
275

            
276
        // Check whether the AwaitStreamRequest was processed successfully.
277
60
        rx.await.map_err(|_| Error::CircuitClosed)??;
278

            
279
48
        let allowed_hop_loc: HopLocation = match hop {
280
48
            TargetHop::Hop(loc) => Some(loc),
281
            _ => None,
282
        }
283
48
        .ok_or_else(|| bad_api_usage!("Expected TargetHop with HopLocation"))?;
284

            
285
48
        let tunnel = self.clone();
286
48
        Ok(incoming_receiver.map(move |req_ctx| {
287
            let StreamReqInfo {
288
36
                req,
289
36
                stream_id,
290
36
                hop,
291
36
                receiver,
292
36
                msg_tx,
293
36
                rate_limit_stream,
294
36
                drain_rate_request_stream,
295
36
                memquota,
296
36
                relay_cell_format,
297
36
            } = req_ctx;
298

            
299
            // We already enforce this in handle_incoming_stream_request; this
300
            // assertion is just here to make sure that we don't ever
301
            // accidentally remove or fail to enforce that check, since it is
302
            // security-critical.
303
36
            assert_eq!(Some(allowed_hop_loc), hop);
304

            
305
            // TODO(#2002): figure out what this is going to look like
306
            // for onion services (perhaps we should forbid this function
307
            // from being called on a multipath circuit?)
308
            //
309
            // See also:
310
            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
311
36
            let target = StreamTarget {
312
36
                tunnel: Tunnel::Client(Arc::clone(&tunnel)),
313
36
                tx: msg_tx,
314
36
                hop: Some(allowed_hop_loc),
315
36
                stream_id,
316
36
                relay_cell_format,
317
36
                rate_limit_stream,
318
36
            };
319

            
320
            // can be used to build a reader that supports XON/XOFF flow control
321
36
            let xon_xoff_reader_ctrl =
322
36
                XonXoffReaderCtrl::new(drain_rate_request_stream, target.clone());
323

            
324
36
            let reader = StreamReceiver {
325
36
                target: target.clone(),
326
36
                receiver,
327
36
                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
328
36
                ended: false,
329
36
            };
330

            
331
36
            let components = StreamComponents {
332
36
                stream_receiver: reader,
333
36
                target,
334
36
                memquota,
335
36
                xon_xoff_reader_ctrl,
336
36
            };
337

            
338
36
            IncomingStream::new(time_prov.clone(), req, components)
339
36
        }))
340
64
    }
341

            
342
    /// Single and Multi path helper, used to begin a stream.
343
    ///
344
    /// This function allocates a stream ID, and sends the message
345
    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
346
    ///
347
    /// The caller will typically want to see the first cell in response,
348
    /// to see whether it is e.g. an END or a CONNECTED.
349
96
    async fn begin_stream_impl(
350
96
        self: &Arc<Self>,
351
96
        begin_msg: AnyRelayMsg,
352
96
        cmd_checker: AnyCmdChecker,
353
144
    ) -> Result<StreamComponents> {
354
        // TODO: Possibly this should take a hop, rather than just
355
        // assuming it's the last hop.
356
96
        let hop = TargetHop::LastHop;
357

            
358
96
        let time_prov = self.circ.time_provider.clone();
359

            
360
96
        let memquota = StreamAccount::new(self.circ.mq_account())?;
361
96
        let (sender, receiver) = stream_queue(
362
            #[cfg(not(feature = "flowctl-cc"))]
363
            STREAM_READER_BUFFER,
364
96
            &memquota,
365
96
            &time_prov,
366
        )?;
367
96
        let (tx, rx) = oneshot::channel();
368
96
        let (msg_tx, msg_rx) =
369
96
            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
370

            
371
96
        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
372

            
373
        // A channel for the reactor to request a new drain rate from the reader.
374
        // Typically this notification will be sent after an XOFF is sent so that the reader can
375
        // send us a new drain rate when the stream data queue becomes empty.
376
96
        let mut drain_rate_request_tx = NotifySender::new_typed();
377
96
        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
378

            
379
96
        self.circ
380
96
            .control
381
96
            .unbounded_send(CtrlMsg::BeginStream {
382
96
                hop,
383
96
                message: begin_msg,
384
96
                sender,
385
96
                rx: msg_rx,
386
96
                rate_limit_notifier: rate_limit_tx,
387
96
                drain_rate_requester: drain_rate_request_tx,
388
96
                done: tx,
389
96
                cmd_checker,
390
96
            })
391
96
            .map_err(|_| Error::CircuitClosed)?;
392

            
393
96
        let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
394

            
395
96
        let target = StreamTarget {
396
96
            tunnel: Tunnel::Client(self.clone()),
397
96
            tx: msg_tx,
398
96
            hop: Some(hop),
399
96
            stream_id,
400
96
            relay_cell_format,
401
96
            rate_limit_stream: rate_limit_rx,
402
96
        };
403

            
404
        // can be used to build a reader that supports XON/XOFF flow control
405
96
        let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
406

            
407
96
        let stream_receiver = StreamReceiver {
408
96
            target: target.clone(),
409
96
            receiver,
410
96
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
411
96
            ended: false,
412
96
        };
413

            
414
96
        let components = StreamComponents {
415
96
            stream_receiver,
416
96
            target,
417
96
            memquota,
418
96
            xon_xoff_reader_ctrl,
419
96
        };
420

            
421
96
        Ok(components)
422
96
    }
423

            
424
    /// Install a [`CircuitPadder`] at the listed `hop`.
425
    ///
426
    /// Replaces any previous padder installed at that hop.
427
    #[cfg(feature = "circ-padding-manual")]
428
    pub async fn start_padding_at_hop(
429
        self: &Arc<Self>,
430
        hop: HopLocation,
431
        padder: CircuitPadder,
432
    ) -> Result<()> {
433
        self.circ.set_padder_impl(hop, Some(padder)).await
434
    }
435

            
436
    /// Remove any [`CircuitPadder`] at the listed `hop`.
437
    ///
438
    /// Does nothing if there was not a padder installed there.
439
    #[cfg(feature = "circ-padding-manual")]
440
    pub async fn stop_padding_at_hop(self: &Arc<Self>, hop: HopLocation) -> Result<()> {
441
        self.circ.set_padder_impl(hop, None).await
442
    }
443

            
444
    /// Start a DataStream (anonymized connection) to the given
445
    /// address and port, using a BEGIN cell.
446
96
    async fn begin_data_stream(
447
96
        self: &Arc<Self>,
448
96
        msg: AnyRelayMsg,
449
96
        optimistic: bool,
450
144
    ) -> Result<DataStream> {
451
96
        let components = self
452
96
            .begin_stream_impl(msg, OutboundDataCmdChecker::new_any())
453
96
            .await?;
454

            
455
        let StreamComponents {
456
96
            stream_receiver,
457
96
            target,
458
96
            memquota,
459
96
            xon_xoff_reader_ctrl,
460
96
        } = components;
461

            
462
96
        let mut stream = DataStream::new(
463
96
            self.circ.time_provider.clone(),
464
96
            stream_receiver,
465
96
            xon_xoff_reader_ctrl,
466
96
            target,
467
96
            memquota,
468
        );
469
96
        if !optimistic {
470
84
            stream.wait_for_connection().await?;
471
12
        }
472
96
        Ok(stream)
473
96
    }
474

            
475
    /// Single and multi path helper.
476
    ///
477
    /// Start a stream to the given address and port, using a BEGIN
478
    /// cell.
479
    ///
480
    /// The use of a string for the address is intentional: you should let
481
    /// the remote Tor relay do the hostname lookup for you.
482
    #[instrument(level = "trace", skip_all)]
483
84
    pub async fn begin_stream(
484
84
        self: &Arc<Self>,
485
84
        target: &str,
486
84
        port: u16,
487
84
        parameters: Option<StreamParameters>,
488
126
    ) -> Result<DataStream> {
489
        let parameters = parameters.unwrap_or_default();
490
        let begin_flags = parameters.begin_flags();
491
        let optimistic = parameters.is_optimistic();
492
        let target = if parameters.suppressing_hostname() {
493
            ""
494
        } else {
495
            target
496
        };
497
        let beginmsg = Begin::new(target, port, begin_flags)
498
            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
499
        self.begin_data_stream(beginmsg.into(), optimistic).await
500
84
    }
501

            
502
    /// Start a new stream to the last relay in the tunnel, using
503
    /// a BEGIN_DIR cell.
504
18
    pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
505
        // Note that we always open begindir connections optimistically.
506
        // Since they are local to a relay that we've already authenticated
507
        // with and built a tunnel to, there should be no additional checks
508
        // we need to perform to see whether the BEGINDIR will succeed.
509
12
        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
510
12
            .await
511
12
    }
512

            
513
    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
514
    /// in this tunnel.
515
    ///
516
    /// Note that this function does not check for timeouts; that's
517
    /// the caller's responsibility.
518
    pub async fn resolve(self: &Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
519
        let resolve_msg = Resolve::new(hostname);
520

            
521
        let resolved_msg = self.try_resolve(resolve_msg).await?;
522

            
523
        resolved_msg
524
            .into_answers()
525
            .into_iter()
526
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
527
                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
528
                Ok(_) => None,
529
                Err(e) => Some(Err(e)),
530
            })
531
            .collect()
532
    }
533

            
534
    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
535
    /// the last relay on this tunnel.
536
    ///
537
    /// Note that this function does not check for timeouts; that's
538
    /// the caller's responsibility.
539
    pub async fn resolve_ptr(self: &Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
540
        let resolve_ptr_msg = Resolve::new_reverse(&addr);
541

            
542
        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
543

            
544
        resolved_msg
545
            .into_answers()
546
            .into_iter()
547
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
548
                Ok(ResolvedVal::Hostname(v)) => Some(
549
                    String::from_utf8(v)
550
                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
551
                ),
552
                Ok(_) => None,
553
                Err(e) => Some(Err(e)),
554
            })
555
            .collect()
556
    }
557

            
558
    /// Send an ad-hoc message to a given hop on the circuit, without expecting
559
    /// a reply.
560
    ///
561
    /// (If you want to handle one or more possible replies, see
562
    /// [`ClientTunnel::start_conversation`].)
563
    // TODO(conflux): Change this to use the ReactorHandle for the control commands.
564
    #[cfg(feature = "send-control-msg")]
565
    pub async fn send_raw_msg(
566
        &self,
567
        msg: tor_cell::relaycell::msg::AnyRelayMsg,
568
        hop: TargetHop,
569
    ) -> Result<()> {
570
        let (sender, receiver) = oneshot::channel();
571
        let ctrl_msg = CtrlMsg::SendMsg { hop, msg, sender };
572
        self.circ
573
            .control
574
            .unbounded_send(ctrl_msg)
575
            .map_err(|_| Error::CircuitClosed)?;
576

            
577
        receiver.await.map_err(|_| Error::CircuitClosed)?
578
    }
579

            
580
    /// Start an ad-hoc protocol exchange to the specified hop on this tunnel.
581
    ///
582
    /// To use this:
583
    ///
584
    ///  0. Create an inter-task channel you'll use to receive
585
    ///     the outcome of your conversation,
586
    ///     and bundle it into a [`UserMsgHandler`].
587
    ///
588
    ///  1. Call `start_conversation`.
589
    ///     This will install a your handler, for incoming messages,
590
    ///     and send the outgoing message (if you provided one).
591
    ///     After that, each message on the circuit
592
    ///     that isn't handled by the core machinery
593
    ///     is passed to your provided `reply_handler`.
594
    ///
595
    ///  2. Possibly call `send_msg` on the [`Conversation`],
596
    ///     from the call site of `start_conversation`,
597
    ///     possibly multiple times, from time to time,
598
    ///     to send further desired messages to the peer.
599
    ///
600
    ///  3. In your [`UserMsgHandler`], process the incoming messages.
601
    ///     You may respond by
602
    ///     sending additional messages
603
    ///     When the protocol exchange is finished,
604
    ///     `UserMsgHandler::handle_msg` should return
605
    ///     [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
606
    ///
607
    /// If you don't need the `Conversation` to send followup messages,
608
    /// you may simply drop it,
609
    /// and rely on the responses you get from your handler,
610
    /// on the channel from step 0 above.
611
    /// Your handler will remain installed and able to process incoming messages
612
    /// until it returns `ConversationFinished`.
613
    ///
614
    /// (If you don't want to accept any replies at all, it may be
615
    /// simpler to use [`ClientTunnel::send_raw_msg`].)
616
    ///
617
    /// Note that it is quite possible to use this function to violate the tor
618
    /// protocol; most users of this API will not need to call it.  It is used
619
    /// to implement most of the onion service handshake.
620
    ///
621
    /// # Limitations
622
    ///
623
    /// Only one conversation may be active at any one time,
624
    /// for any one circuit.
625
    /// This generally means that this function should not be called
626
    /// on a tunnel which might be shared with anyone else.
627
    ///
628
    /// Likewise, it is forbidden to try to extend the tunnel,
629
    /// while the conversation is in progress.
630
    ///
631
    /// After the conversation has finished, the tunnel may be extended.
632
    /// Or, `start_conversation` may be called again;
633
    /// but, in that case there will be a gap between the two conversations,
634
    /// during which no `UserMsgHandler` is installed,
635
    /// and unexpected incoming messages would close the tunnel.
636
    ///
637
    /// If these restrictions are violated, the tunnel will be closed with an error.
638
    ///
639
    /// ## Precise definition of the lifetime of a conversation
640
    ///
641
    /// A conversation is in progress from entry to `start_conversation`,
642
    /// until entry to the body of the [`UserMsgHandler::handle_msg`](MsgHandler::handle_msg)
643
    /// call which returns [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
644
    /// (*Entry* since `handle_msg` is synchronously embedded
645
    /// into the incoming message processing.)
646
    /// So you may start a new conversation as soon as you have the final response
647
    /// via your inter-task channel from (0) above.
648
    ///
649
    /// The lifetime relationship of the [`Conversation`],
650
    /// vs the handler returning `ConversationFinished`
651
    /// is not enforced by the type system.
652
    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
653
    // at least while allowing sending followup messages from outside the handler.
654
    #[cfg(feature = "send-control-msg")]
655
    pub async fn start_conversation(
656
        &self,
657
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
658
        reply_handler: impl MsgHandler + Send + 'static,
659
        hop: TargetHop,
660
    ) -> Result<Conversation<'_>> {
661
        // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
662
        // the right Leg/Hop with inbound cell.
663
        let (sender, receiver) = oneshot::channel();
664
        self.circ
665
            .command
666
            .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
667
            .map_err(|_| Error::CircuitClosed)?;
668
        let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;
669
        let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
670
        let conversation = Conversation(self);
671
        conversation.send_internal(msg, Some(handler)).await?;
672
        Ok(conversation)
673
    }
674

            
675
    /// Shut down this tunnel, along with all streams that are using it. Happens asynchronously
676
    /// (i.e. the tunnel won't necessarily be done shutting down immediately after this function
677
    /// returns!).
678
    ///
679
    /// Note that other references to this tunnel may exist. If they do, they will stop working
680
    /// after you call this function.
681
    ///
682
    /// It's not necessary to call this method if you're just done with a tunnel: the tunnel should
683
    /// close on its own once nothing is using it any more.
684
    // TODO(conflux): This should use the ReactorHandle instead.
685
    pub fn terminate(&self) {
686
        let _ = self.circ.command.unbounded_send(CtrlCmd::Shutdown);
687
    }
688

            
689
    /// Helper: Send the resolve message, and read resolved message from
690
    /// resolve stream.
691
    async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
692
        let components = self
693
            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
694
            .await?;
695

            
696
        let StreamComponents {
697
            stream_receiver,
698
            target: _,
699
            memquota,
700
            xon_xoff_reader_ctrl: _,
701
        } = components;
702

            
703
        let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
704
        resolve_stream.read_msg().await
705
    }
706

            
707
    // TODO(conflux)
708
}
709

            
710
// TODO(conflux): We will likely need to enforce some invariants here, for example that the `circ`
711
// has the expected (non-zero) number of hops.
712
impl TryFrom<ClientCirc> for ClientTunnel {
713
    type Error = Error;
714

            
715
364
    fn try_from(circ: ClientCirc) -> std::result::Result<Self, Self::Error> {
716
364
        Ok(Self { circ })
717
364
    }
718
}
719

            
720
/// Convert a [`ResolvedVal`] into a Result, based on whether or not
721
/// it represents an error.
722
fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
723
    match val {
724
        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
725
        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
726
        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
727
        _ => Ok(val),
728
    }
729
}
730

            
731
/// A precise position in a tunnel.
732
#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
733
#[derive_deftly(HasMemoryCost)]
734
#[non_exhaustive]
735
pub enum HopLocation {
736
    /// A specific position in a tunnel.
737
    Hop((UniqId, HopNum)),
738
    /// The join point of a multi-path tunnel.
739
    JoinPoint,
740
}
741

            
742
/// A position in a tunnel.
743
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
744
#[non_exhaustive]
745
pub enum TargetHop {
746
    /// A specific position in a tunnel.
747
    Hop(HopLocation),
748
    /// The last hop of a tunnel.
749
    ///
750
    /// This should be used only when you don't care about what specific hop is used.
751
    /// Some tunnels may be extended or truncated,
752
    /// which means that the "last hop" may change at any time.
753
    LastHop,
754
}
755

            
756
impl From<(UniqId, HopNum)> for HopLocation {
757
172
    fn from(v: (UniqId, HopNum)) -> Self {
758
172
        HopLocation::Hop(v)
759
172
    }
760
}
761

            
762
impl From<(UniqId, HopNum)> for TargetHop {
763
16
    fn from(v: (UniqId, HopNum)) -> Self {
764
16
        TargetHop::Hop(v.into())
765
16
    }
766
}
767

            
768
impl HopLocation {
769
    /// Return the hop number if not a JointPoint.
770
    pub fn hop_num(&self) -> Option<HopNum> {
771
        match self {
772
            Self::Hop((_, hop_num)) => Some(*hop_num),
773
            Self::JoinPoint => None,
774
        }
775
    }
776
}
777

            
778
impl ClientTunnel {
779
    /// Close the pending stream that owns this StreamTarget, delivering the specified
780
    /// END message (if any)
781
    ///
782
    /// See [`StreamTarget::close_pending`].
783
    #[cfg(feature = "hs-service")]
784
12
    pub(crate) fn close_pending(
785
12
        &self,
786
12
        stream_id: StreamId,
787
12
        hop: Option<HopLocation>,
788
12
        message: crate::stream::CloseStreamBehavior,
789
12
    ) -> Result<oneshot::Receiver<Result<()>>> {
790
12
        let (tx, rx) = oneshot::channel();
791

            
792
12
        self.circ
793
12
            .control
794
12
            .unbounded_send(CtrlMsg::ClosePendingStream {
795
12
                stream_id,
796
12
                hop: hop.expect("missing stream hop for client tunnel"),
797
12
                message,
798
12
                done: tx,
799
12
            })
800
12
            .map_err(|_| Error::CircuitClosed)?;
801

            
802
12
        Ok(rx)
803
12
    }
804

            
805
    /// Request to send a SENDME cell for this stream.
806
    ///
807
    /// See [`StreamTarget::send_sendme`].
808
    pub(crate) fn send_sendme(&self, stream_id: StreamId, hop: Option<HopLocation>) -> Result<()> {
809
        self.circ
810
            .control
811
            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
812
                msg: FlowCtrlMsg::Sendme,
813
                stream_id,
814
                hop: hop.expect("missing stream hop for client tunnel"),
815
            })
816
            .map_err(|_| Error::CircuitClosed)
817
    }
818

            
819
    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
820
    ///
821
    /// See [`StreamTarget::drain_rate_update`].
822
    pub(crate) fn drain_rate_update(
823
        &self,
824
        stream_id: StreamId,
825
        hop: Option<HopLocation>,
826
        rate: XonKbpsEwma,
827
    ) -> Result<()> {
828
        self.circ
829
            .control
830
            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
831
                msg: FlowCtrlMsg::Xon(rate),
832
                stream_id,
833
                hop: hop.expect("missing stream hop for client tunnel"),
834
            })
835
            .map_err(|_| Error::CircuitClosed)
836
    }
837
}