1
//! Client-specific types and implementation.
2

            
3
pub mod channel;
4
pub mod circuit;
5
pub mod stream;
6

            
7
#[cfg(feature = "rpc")]
8
pub mod rpc;
9

            
10
#[cfg(feature = "send-control-msg")]
11
pub(crate) mod msghandler;
12
pub(crate) mod reactor;
13

            
14
use derive_deftly::Deftly;
15
use oneshot_fused_workaround as oneshot;
16
use std::net::IpAddr;
17
use std::sync::Arc;
18
use tracing::instrument;
19

            
20
use crate::circuit::UniqId;
21
use crate::circuit::circhop::ReactorStreamComponents;
22
#[cfg(feature = "circ-padding-manual")]
23
pub use crate::client::circuit::padding::{
24
    CircuitPadder, CircuitPadderConfig, CircuitPadderConfigError,
25
};
26
use crate::client::stream::{
27
    DataStream, OutboundDataCmdChecker, ResolveCmdChecker, ResolveStream, StreamParameters,
28
    StreamReceiver,
29
};
30
use crate::congestion::sendme::StreamRecvWindow;
31
use crate::crypto::cell::HopNum;
32
use crate::memquota::{SpecificAccount as _, StreamAccount};
33
use crate::stream::STREAM_READER_BUFFER;
34
use crate::stream::cmdcheck::AnyCmdChecker;
35
use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
36
use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamTarget, Tunnel};
37
use crate::{Error, ResolveError, Result};
38
use circuit::{ClientCirc, Path};
39
use reactor::{CtrlCmd, CtrlMsg, FlowCtrlMsg, MetaCellHandler};
40

            
41
use tor_cell::relaycell::StreamId;
42
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
43
use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal};
44
use tor_error::bad_api_usage;
45
use tor_linkspec::OwnedChanTarget;
46
use tor_memquota::derive_deftly_template_HasMemoryCost;
47
use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
48

            
49
#[cfg(feature = "hs-service")]
50
use crate::stream::incoming::StreamReqInfo;
51

            
52
#[cfg(feature = "hs-service")]
53
use crate::client::stream::{IncomingCmdChecker, IncomingStream};
54

            
55
#[cfg(feature = "send-control-msg")]
56
use msghandler::{MsgHandler, UserMsgHandler};
57

            
58
/// Handle to use during an ongoing protocol exchange with a circuit's last hop
59
///
60
/// This is obtained from [`ClientTunnel::start_conversation`],
61
/// and used to send messages to the last hop relay.
62
//
63
// TODO(conflux): this should use ClientTunnel, and it should be moved into
64
// the tunnel module.
65
#[cfg(feature = "send-control-msg")]
66
pub struct Conversation<'r>(&'r ClientTunnel);
67

            
68
#[cfg(feature = "send-control-msg")]
69
impl Conversation<'_> {
70
    /// Send a protocol message as part of an ad-hoc exchange
71
    ///
72
    /// Responses are handled by the `UserMsgHandler` set up
73
    /// when the `Conversation` was created.
74
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
75
        self.send_internal(Some(msg), None).await
76
    }
77

            
78
    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
79
    ///
80
    /// The guts of `start_conversation` and `Conversation::send_msg`
81
    pub(crate) async fn send_internal(
82
        &self,
83
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
84
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
85
    ) -> Result<()> {
86
        let msg = msg.map(|msg| tor_cell::relaycell::AnyRelayMsgOuter::new(None, msg));
87
        let (sender, receiver) = oneshot::channel();
88

            
89
        let ctrl_msg = CtrlMsg::SendMsgAndInstallHandler {
90
            msg,
91
            handler,
92
            sender,
93
        };
94
        self.0
95
            .circ
96
            .control
97
            .unbounded_send(ctrl_msg)
98
            .map_err(|_| Error::CircuitClosed)?;
99

            
100
        receiver.await.map_err(|_| Error::CircuitClosed)?
101
    }
102
}
103

            
104
/// A low-level client tunnel API.
105
///
106
/// This is a communication channel to the tunnel reactor, which manages 1 or more circuits.
107
///
108
/// Note: the tor-circmgr crates wrap this type in specialized *Tunnel types exposing only the
109
/// desired subset of functionality depending on purpose and path size.
110
///
111
/// Some API calls are for single path and some for multi path. A check with the underlying reactor
112
/// is done preventing for instance multi path calls to be used on a single path. Top level types
113
/// should prevent this and thus this object should never be used directly.
114
#[derive(Debug)]
115
#[cfg_attr(
116
    feature = "rpc",
117
    derive(derive_deftly::Deftly),
118
    derive_deftly(tor_rpcbase::templates::Object)
119
)]
120
#[allow(dead_code)] // TODO(conflux)
121
pub struct ClientTunnel {
122
    /// The underlying handle to the reactor.
123
    circ: ClientCirc,
124
}
125

            
126
impl ClientTunnel {
127
    /// Return a handle to the `ClientCirc` of this `ClientTunnel`, if the tunnel is a single
128
    /// circuit tunnel.
129
    ///
130
    /// Returns an error if the tunnel has more than one circuit.
131
436
    pub fn as_single_circ(&self) -> Result<&ClientCirc> {
132
436
        if self.circ.is_multi_path {
133
4
            return Err(bad_api_usage!("Single circuit getter on multi path tunnel"))?;
134
432
        }
135
432
        Ok(&self.circ)
136
436
    }
137

            
138
    /// Return the channel target of the first hop.
139
    ///
140
    /// Can only be used for single path tunnel.
141
    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
142
        self.as_single_circ()?.first_hop()
143
    }
144

            
145
    /// Return true if the circuit reactor is closed meaning the circuit is unusable for both
146
    /// receiving or sending.
147
128
    pub fn is_closed(&self) -> bool {
148
128
        self.circ.is_closing()
149
128
    }
150

            
151
    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
152
    /// HopLocation with its id and hop number.
153
    ///
154
    /// Return an error if there is no last hop.
155
    pub fn last_hop(&self) -> Result<TargetHop> {
156
        let uniq_id = self.unique_id();
157
        let hop_num = self
158
            .circ
159
            .mutable
160
            .last_hop_num(uniq_id)?
161
            .ok_or_else(|| bad_api_usage!("no last hop"))?;
162
        Ok((uniq_id, hop_num).into())
163
    }
164

            
165
    /// Return a description of the last hop of the tunnel.
166
    ///
167
    /// Return None if the last hop is virtual; return an error
168
    /// if the tunnel has no circuits, or all of its circuits are zero length.
169
    ///
170
    ///
171
    /// # Panics
172
    ///
173
    /// Panics if there is no last hop.  (This should be impossible outside of
174
    /// the tor-proto crate, but within the crate it's possible to have a
175
    /// circuit with no hops.)
176
    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
177
        self.circ.last_hop_info()
178
    }
179

            
180
    /// Return the number of hops this tunnel as. Fail for a multi path.
181
96
    pub fn n_hops(&self) -> Result<usize> {
182
96
        self.as_single_circ()?.n_hops()
183
96
    }
184

            
185
    /// Return the [`Path`] objects describing all the hops
186
    /// of all the circuits in this tunnel.
187
    pub fn all_paths(&self) -> Vec<Arc<Path>> {
188
        self.circ.all_paths()
189
    }
190

            
191
    /// Return a representation of the Paths for all the circuits in this tunnel,
192
    /// as a map from each circuits' UniqId to its path.
193
    ///
194
    /// This is only exposed for the RPC subsystem, where it is documented that the
195
    /// format of `UniqId` is not stable.
196
    #[cfg(feature = "rpc")]
197
    pub(crate) fn tagged_paths(&self) -> std::collections::HashMap<UniqId, Arc<Path>> {
198
        self.circ.mutable.tagged_paths()
199
    }
200

            
201
    /// Return a process-unique identifier for this tunnel.
202
    ///
203
    /// Returns the reactor unique ID of the main reactor.
204
128
    pub fn unique_id(&self) -> UniqId {
205
128
        self.circ.unique_id()
206
128
    }
207

            
208
    /// Return the time at which this tunnel last had any open streams.
209
    ///
210
    /// Returns `None` if this tunnel has never had any open streams,
211
    /// or if it currently has open streams.
212
    ///
213
    /// NOTE that the Instant returned by this method is not affected by
214
    /// any runtime mocking; it is the output of an ordinary call to
215
    /// `Instant::now()`.
216
    pub async fn disused_since(&self) -> Result<Option<web_time_compat::Instant>> {
217
        self.circ.disused_since().await
218
    }
219

            
220
    /// Return a future that will resolve once the underlying circuit reactor has closed.
221
    ///
222
    /// Note that this method does not itself cause the tunnel to shut down.
223
    pub fn wait_for_close(
224
        self: &Arc<Self>,
225
    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
226
        self.circ.wait_for_close()
227
    }
228

            
229
    /// Single-path tunnel only. Multi path onion service is not supported yet.
230
    ///
231
    /// Tell this tunnel to begin allowing the final hop of the tunnel to try
232
    /// to create new Tor streams, and to return those pending requests in an
233
    /// asynchronous stream.
234
    ///
235
    /// Ordinarily, these requests are rejected.
236
    ///
237
    /// There can only be one [`Stream`](futures::Stream) of this type created on a given tunnel.
238
    /// If a such a [`Stream`](futures::Stream) already exists, this method will return
239
    /// an error.
240
    ///
241
    /// After this method has been called on a tunnel, the tunnel is expected
242
    /// to receive requests of this type indefinitely, until it is finally closed.
243
    /// If the `Stream` is dropped, the next request on this tunnel will cause it to close.
244
    ///
245
    /// Only onion services (and eventually) exit relays should call this
246
    /// method.
247
    //
248
    // TODO: Someday, we might want to allow a stream request handler to be
249
    // un-registered.  However, nothing in the Tor protocol requires it.
250
    //
251
    // Any incoming request handlers installed on the other circuits
252
    // (which are shutdown using CtrlCmd::ShutdownAndReturnCircuit)
253
    // will be discarded (along with the reactor of that circuit)
254
    #[cfg(feature = "hs-service")]
255
    #[allow(unreachable_code, unused_variables)] // TODO(conflux)
256
64
    pub async fn allow_stream_requests<'a, FILT>(
257
64
        self: &Arc<Self>,
258
64
        allow_commands: &'a [tor_cell::relaycell::RelayCmd],
259
64
        hop: TargetHop,
260
64
        filter: FILT,
261
64
    ) -> Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
262
64
    where
263
64
        FILT: crate::client::stream::IncomingStreamRequestFilter + 'a,
264
64
    {
265
        use futures::stream::StreamExt;
266

            
267
        /// The size of the channel receiving IncomingStreamRequestContexts.
268
        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
269

            
270
        // TODO(#2002): support onion service conflux
271
64
        let circ = self.as_single_circ().map_err(tor_error::into_internal!(
272
            "Cannot allow stream requests on a multi-path tunnel"
273
4
        ))?;
274

            
275
60
        let time_prov = circ.time_provider.clone();
276
60
        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
277
60
        let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
278
60
            .new_mq(time_prov.clone(), circ.memquota.as_raw_account())?;
279
60
        let (tx, rx) = oneshot::channel();
280

            
281
60
        circ.command
282
60
            .unbounded_send(CtrlCmd::AwaitStreamRequest {
283
60
                cmd_checker,
284
60
                incoming_sender,
285
60
                hop,
286
60
                done: tx,
287
60
                filter: Box::new(filter),
288
60
            })
289
60
            .map_err(|_| Error::CircuitClosed)?;
290

            
291
        // Check whether the AwaitStreamRequest was processed successfully.
292
60
        rx.await.map_err(|_| Error::CircuitClosed)??;
293

            
294
48
        let allowed_hop_loc: HopLocation = match hop {
295
48
            TargetHop::Hop(loc) => Some(loc),
296
            _ => None,
297
        }
298
48
        .ok_or_else(|| bad_api_usage!("Expected TargetHop with HopLocation"))?;
299

            
300
48
        let tunnel = self.clone();
301
48
        Ok(incoming_receiver.map(move |req_ctx| {
302
            let StreamReqInfo {
303
36
                req,
304
36
                stream_id,
305
36
                hop,
306
                stream_components:
307
                    ReactorStreamComponents {
308
36
                        stream_inbound_rx,
309
36
                        stream_outbound_tx,
310
36
                        rate_limit_rx,
311
36
                        drain_rate_request_rx,
312
                    },
313
36
                memquota,
314
36
                relay_cell_format,
315
36
            } = req_ctx;
316

            
317
            // We already enforce this in handle_incoming_stream_request; this
318
            // assertion is just here to make sure that we don't ever
319
            // accidentally remove or fail to enforce that check, since it is
320
            // security-critical.
321
36
            assert_eq!(Some(allowed_hop_loc), hop);
322

            
323
            // TODO(#2002): figure out what this is going to look like
324
            // for onion services (perhaps we should forbid this function
325
            // from being called on a multipath circuit?)
326
            //
327
            // See also:
328
            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
329
36
            let target = StreamTarget {
330
36
                tunnel: Tunnel::Client(Arc::clone(&tunnel)),
331
36
                tx: stream_outbound_tx,
332
36
                hop: Some(allowed_hop_loc),
333
36
                stream_id,
334
36
                relay_cell_format,
335
36
                rate_limit_stream: rate_limit_rx,
336
36
            };
337

            
338
            // can be used to build a reader that supports XON/XOFF flow control
339
36
            let xon_xoff_reader_ctrl =
340
36
                XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
341

            
342
36
            let reader = StreamReceiver {
343
36
                target: target.clone(),
344
36
                receiver: stream_inbound_rx,
345
36
                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
346
36
                ended: false,
347
36
            };
348

            
349
36
            let components = StreamComponents {
350
36
                stream_receiver: reader,
351
36
                target,
352
36
                memquota,
353
36
                xon_xoff_reader_ctrl,
354
36
            };
355

            
356
36
            IncomingStream::new(time_prov.clone(), req, components)
357
36
        }))
358
64
    }
359

            
360
    /// Single and Multi path helper, used to begin a stream.
361
    ///
362
    /// This function allocates a stream ID, and sends the message
363
    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
364
    ///
365
    /// The caller will typically want to see the first cell in response,
366
    /// to see whether it is e.g. an END or a CONNECTED.
367
96
    async fn begin_stream_impl(
368
96
        self: &Arc<Self>,
369
96
        begin_msg: AnyRelayMsg,
370
96
        cmd_checker: AnyCmdChecker,
371
144
    ) -> Result<StreamComponents> {
372
        // TODO: Possibly this should take a hop, rather than just
373
        // assuming it's the last hop.
374
96
        let hop = TargetHop::LastHop;
375

            
376
96
        let memquota = StreamAccount::new(self.circ.mq_account())?;
377
96
        let (tx, rx) = oneshot::channel();
378

            
379
96
        self.circ
380
96
            .control
381
96
            .unbounded_send(CtrlMsg::BeginStream {
382
96
                hop,
383
96
                message: begin_msg,
384
96
                memquota: memquota.clone(),
385
96
                done: tx,
386
96
                cmd_checker,
387
96
            })
388
96
            .map_err(|_| Error::CircuitClosed)?;
389

            
390
96
        let (stream_id, hop, relay_cell_format, stream_components) =
391
96
            rx.await.map_err(|_| Error::CircuitClosed)??;
392

            
393
        // Destructure so that we don't forget to use any fields.
394
        let ReactorStreamComponents {
395
96
            stream_inbound_rx,
396
96
            stream_outbound_tx,
397
96
            rate_limit_rx,
398
96
            drain_rate_request_rx,
399
96
        } = stream_components;
400

            
401
96
        let target = StreamTarget {
402
96
            tunnel: Tunnel::Client(self.clone()),
403
96
            tx: stream_outbound_tx,
404
96
            hop: Some(hop),
405
96
            stream_id,
406
96
            relay_cell_format,
407
96
            rate_limit_stream: rate_limit_rx,
408
96
        };
409

            
410
        // can be used to build a reader that supports XON/XOFF flow control
411
96
        let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
412

            
413
96
        let stream_receiver = StreamReceiver {
414
96
            target: target.clone(),
415
96
            receiver: stream_inbound_rx,
416
96
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
417
96
            ended: false,
418
96
        };
419

            
420
96
        let components = StreamComponents {
421
96
            stream_receiver,
422
96
            target,
423
96
            memquota,
424
96
            xon_xoff_reader_ctrl,
425
96
        };
426

            
427
96
        Ok(components)
428
96
    }
429

            
430
    /// Install a [`CircuitPadder`] at the listed `hop`.
431
    ///
432
    /// Replaces any previous padder installed at that hop.
433
    #[cfg(feature = "circ-padding-manual")]
434
    pub async fn start_padding_at_hop(
435
        self: &Arc<Self>,
436
        hop: HopLocation,
437
        padder: CircuitPadder,
438
    ) -> Result<()> {
439
        self.circ.set_padder_impl(hop, Some(padder)).await
440
    }
441

            
442
    /// Remove any [`CircuitPadder`] at the listed `hop`.
443
    ///
444
    /// Does nothing if there was not a padder installed there.
445
    #[cfg(feature = "circ-padding-manual")]
446
    pub async fn stop_padding_at_hop(self: &Arc<Self>, hop: HopLocation) -> Result<()> {
447
        self.circ.set_padder_impl(hop, None).await
448
    }
449

            
450
    /// Start a DataStream (anonymized connection) to the given
451
    /// address and port, using a BEGIN cell.
452
96
    async fn begin_data_stream(
453
96
        self: &Arc<Self>,
454
96
        msg: AnyRelayMsg,
455
96
        optimistic: bool,
456
144
    ) -> Result<DataStream> {
457
96
        let components = self
458
96
            .begin_stream_impl(msg, OutboundDataCmdChecker::new_any())
459
96
            .await?;
460

            
461
        let StreamComponents {
462
96
            stream_receiver,
463
96
            target,
464
96
            memquota,
465
96
            xon_xoff_reader_ctrl,
466
96
        } = components;
467

            
468
96
        let mut stream = DataStream::new(
469
96
            self.circ.time_provider.clone(),
470
96
            stream_receiver,
471
96
            xon_xoff_reader_ctrl,
472
96
            target,
473
96
            memquota,
474
        );
475
96
        if !optimistic {
476
84
            stream.wait_for_connection().await?;
477
12
        }
478
96
        Ok(stream)
479
96
    }
480

            
481
    /// Single and multi path helper.
482
    ///
483
    /// Start a stream to the given address and port, using a BEGIN
484
    /// cell.
485
    ///
486
    /// The use of a string for the address is intentional: you should let
487
    /// the remote Tor relay do the hostname lookup for you.
488
    #[instrument(level = "trace", skip_all)]
489
84
    pub async fn begin_stream(
490
84
        self: &Arc<Self>,
491
84
        target: &str,
492
84
        port: u16,
493
84
        parameters: Option<StreamParameters>,
494
126
    ) -> Result<DataStream> {
495
        let parameters = parameters.unwrap_or_default();
496
        let begin_flags = parameters.begin_flags();
497
        let optimistic = parameters.is_optimistic();
498
        let target = if parameters.suppressing_hostname() {
499
            ""
500
        } else {
501
            target
502
        };
503
        let beginmsg = Begin::new(target, port, begin_flags)
504
            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
505
        self.begin_data_stream(beginmsg.into(), optimistic).await
506
84
    }
507

            
508
    /// Start a new stream to the last relay in the tunnel, using
509
    /// a BEGIN_DIR cell.
510
18
    pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
511
        // Note that we always open begindir connections optimistically.
512
        // Since they are local to a relay that we've already authenticated
513
        // with and built a tunnel to, there should be no additional checks
514
        // we need to perform to see whether the BEGINDIR will succeed.
515
12
        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
516
12
            .await
517
12
    }
518

            
519
    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
520
    /// in this tunnel.
521
    ///
522
    /// Note that this function does not check for timeouts; that's
523
    /// the caller's responsibility.
524
    pub async fn resolve(self: &Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
525
        let resolve_msg = Resolve::new(hostname);
526

            
527
        let resolved_msg = self.try_resolve(resolve_msg).await?;
528

            
529
        resolved_msg
530
            .into_answers()
531
            .into_iter()
532
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
533
                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
534
                Ok(_) => None,
535
                Err(e) => Some(Err(e)),
536
            })
537
            .collect()
538
    }
539

            
540
    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
541
    /// the last relay on this tunnel.
542
    ///
543
    /// Note that this function does not check for timeouts; that's
544
    /// the caller's responsibility.
545
    pub async fn resolve_ptr(self: &Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
546
        let resolve_ptr_msg = Resolve::new_reverse(&addr);
547

            
548
        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
549

            
550
        resolved_msg
551
            .into_answers()
552
            .into_iter()
553
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
554
                Ok(ResolvedVal::Hostname(v)) => Some(
555
                    String::from_utf8(v)
556
                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
557
                ),
558
                Ok(_) => None,
559
                Err(e) => Some(Err(e)),
560
            })
561
            .collect()
562
    }
563

            
564
    /// Send an ad-hoc message to a given hop on the circuit, without expecting
565
    /// a reply.
566
    ///
567
    /// (If you want to handle one or more possible replies, see
568
    /// [`ClientTunnel::start_conversation`].)
569
    // TODO(conflux): Change this to use the ReactorHandle for the control commands.
570
    #[cfg(feature = "send-control-msg")]
571
    pub async fn send_raw_msg(
572
        &self,
573
        msg: tor_cell::relaycell::msg::AnyRelayMsg,
574
        hop: TargetHop,
575
    ) -> Result<()> {
576
        let (sender, receiver) = oneshot::channel();
577
        let ctrl_msg = CtrlMsg::SendMsg { hop, msg, sender };
578
        self.circ
579
            .control
580
            .unbounded_send(ctrl_msg)
581
            .map_err(|_| Error::CircuitClosed)?;
582

            
583
        receiver.await.map_err(|_| Error::CircuitClosed)?
584
    }
585

            
586
    /// Start an ad-hoc protocol exchange to the specified hop on this tunnel.
587
    ///
588
    /// To use this:
589
    ///
590
    ///  0. Create an inter-task channel you'll use to receive
591
    ///     the outcome of your conversation,
592
    ///     and bundle it into a [`UserMsgHandler`].
593
    ///
594
    ///  1. Call `start_conversation`.
595
    ///     This will install a your handler, for incoming messages,
596
    ///     and send the outgoing message (if you provided one).
597
    ///     After that, each message on the circuit
598
    ///     that isn't handled by the core machinery
599
    ///     is passed to your provided `reply_handler`.
600
    ///
601
    ///  2. Possibly call `send_msg` on the [`Conversation`],
602
    ///     from the call site of `start_conversation`,
603
    ///     possibly multiple times, from time to time,
604
    ///     to send further desired messages to the peer.
605
    ///
606
    ///  3. In your [`UserMsgHandler`], process the incoming messages.
607
    ///     You may respond by
608
    ///     sending additional messages
609
    ///     When the protocol exchange is finished,
610
    ///     `UserMsgHandler::handle_msg` should return
611
    ///     [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
612
    ///
613
    /// If you don't need the `Conversation` to send followup messages,
614
    /// you may simply drop it,
615
    /// and rely on the responses you get from your handler,
616
    /// on the channel from step 0 above.
617
    /// Your handler will remain installed and able to process incoming messages
618
    /// until it returns `ConversationFinished`.
619
    ///
620
    /// (If you don't want to accept any replies at all, it may be
621
    /// simpler to use [`ClientTunnel::send_raw_msg`].)
622
    ///
623
    /// Note that it is quite possible to use this function to violate the tor
624
    /// protocol; most users of this API will not need to call it.  It is used
625
    /// to implement most of the onion service handshake.
626
    ///
627
    /// # Limitations
628
    ///
629
    /// Only one conversation may be active at any one time,
630
    /// for any one circuit.
631
    /// This generally means that this function should not be called
632
    /// on a tunnel which might be shared with anyone else.
633
    ///
634
    /// Likewise, it is forbidden to try to extend the tunnel,
635
    /// while the conversation is in progress.
636
    ///
637
    /// After the conversation has finished, the tunnel may be extended.
638
    /// Or, `start_conversation` may be called again;
639
    /// but, in that case there will be a gap between the two conversations,
640
    /// during which no `UserMsgHandler` is installed,
641
    /// and unexpected incoming messages would close the tunnel.
642
    ///
643
    /// If these restrictions are violated, the tunnel will be closed with an error.
644
    ///
645
    /// ## Precise definition of the lifetime of a conversation
646
    ///
647
    /// A conversation is in progress from entry to `start_conversation`,
648
    /// until entry to the body of the [`UserMsgHandler::handle_msg`](MsgHandler::handle_msg)
649
    /// call which returns [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
650
    /// (*Entry* since `handle_msg` is synchronously embedded
651
    /// into the incoming message processing.)
652
    /// So you may start a new conversation as soon as you have the final response
653
    /// via your inter-task channel from (0) above.
654
    ///
655
    /// The lifetime relationship of the [`Conversation`],
656
    /// vs the handler returning `ConversationFinished`
657
    /// is not enforced by the type system.
658
    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
659
    // at least while allowing sending followup messages from outside the handler.
660
    #[cfg(feature = "send-control-msg")]
661
    pub async fn start_conversation(
662
        &self,
663
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
664
        reply_handler: impl MsgHandler + Send + 'static,
665
        hop: TargetHop,
666
    ) -> Result<Conversation<'_>> {
667
        // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
668
        // the right Leg/Hop with inbound cell.
669
        let (sender, receiver) = oneshot::channel();
670
        self.circ
671
            .command
672
            .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
673
            .map_err(|_| Error::CircuitClosed)?;
674
        let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;
675
        let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
676
        let conversation = Conversation(self);
677
        conversation.send_internal(msg, Some(handler)).await?;
678
        Ok(conversation)
679
    }
680

            
681
    /// Shut down this tunnel, along with all streams that are using it. Happens asynchronously
682
    /// (i.e. the tunnel won't necessarily be done shutting down immediately after this function
683
    /// returns!).
684
    ///
685
    /// Note that other references to this tunnel may exist. If they do, they will stop working
686
    /// after you call this function.
687
    ///
688
    /// It's not necessary to call this method if you're just done with a tunnel: the tunnel should
689
    /// close on its own once nothing is using it any more.
690
    // TODO(conflux): This should use the ReactorHandle instead.
691
    pub fn terminate(&self) {
692
        let _ = self.circ.command.unbounded_send(CtrlCmd::Shutdown);
693
    }
694

            
695
    /// Helper: Send the resolve message, and read resolved message from
696
    /// resolve stream.
697
    async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
698
        let components = self
699
            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
700
            .await?;
701

            
702
        let StreamComponents {
703
            stream_receiver,
704
            target: _,
705
            memquota,
706
            xon_xoff_reader_ctrl: _,
707
        } = components;
708

            
709
        let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
710
        resolve_stream.read_msg().await
711
    }
712

            
713
    // TODO(conflux)
714
}
715

            
716
// TODO(conflux): We will likely need to enforce some invariants here, for example that the `circ`
717
// has the expected (non-zero) number of hops.
718
impl TryFrom<ClientCirc> for ClientTunnel {
719
    type Error = Error;
720

            
721
364
    fn try_from(circ: ClientCirc) -> std::result::Result<Self, Self::Error> {
722
364
        Ok(Self { circ })
723
364
    }
724
}
725

            
726
/// Convert a [`ResolvedVal`] into a Result, based on whether or not
727
/// it represents an error.
728
fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
729
    match val {
730
        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
731
        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
732
        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
733
        _ => Ok(val),
734
    }
735
}
736

            
737
/// A precise position in a tunnel.
738
#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
739
#[derive_deftly(HasMemoryCost)]
740
#[non_exhaustive]
741
pub enum HopLocation {
742
    /// A specific position in a tunnel.
743
    Hop((UniqId, HopNum)),
744
    /// The join point of a multi-path tunnel.
745
    JoinPoint,
746
}
747

            
748
/// A position in a tunnel.
749
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
750
#[non_exhaustive]
751
pub enum TargetHop {
752
    /// A specific position in a tunnel.
753
    Hop(HopLocation),
754
    /// The last hop of a tunnel.
755
    ///
756
    /// This should be used only when you don't care about what specific hop is used.
757
    /// Some tunnels may be extended or truncated,
758
    /// which means that the "last hop" may change at any time.
759
    LastHop,
760
}
761

            
762
impl From<(UniqId, HopNum)> for HopLocation {
763
172
    fn from(v: (UniqId, HopNum)) -> Self {
764
172
        HopLocation::Hop(v)
765
172
    }
766
}
767

            
768
impl From<(UniqId, HopNum)> for TargetHop {
769
16
    fn from(v: (UniqId, HopNum)) -> Self {
770
16
        TargetHop::Hop(v.into())
771
16
    }
772
}
773

            
774
impl HopLocation {
775
    /// Return the hop number if not a JointPoint.
776
    pub fn hop_num(&self) -> Option<HopNum> {
777
        match self {
778
            Self::Hop((_, hop_num)) => Some(*hop_num),
779
            Self::JoinPoint => None,
780
        }
781
    }
782
}
783

            
784
impl ClientTunnel {
785
    /// Close the pending stream that owns this StreamTarget, delivering the specified
786
    /// END message (if any)
787
    ///
788
    /// See [`StreamTarget::close_pending`].
789
    #[cfg(feature = "hs-service")]
790
12
    pub(crate) fn close_pending(
791
12
        &self,
792
12
        stream_id: StreamId,
793
12
        hop: Option<HopLocation>,
794
12
        message: crate::stream::CloseStreamBehavior,
795
12
    ) -> Result<oneshot::Receiver<Result<()>>> {
796
12
        let (tx, rx) = oneshot::channel();
797

            
798
12
        self.circ
799
12
            .control
800
12
            .unbounded_send(CtrlMsg::ClosePendingStream {
801
12
                stream_id,
802
12
                hop: hop.expect("missing stream hop for client tunnel"),
803
12
                message,
804
12
                done: tx,
805
12
            })
806
12
            .map_err(|_| Error::CircuitClosed)?;
807

            
808
12
        Ok(rx)
809
12
    }
810

            
811
    /// Request to send a SENDME cell for this stream.
812
    ///
813
    /// See [`StreamTarget::send_sendme`].
814
    pub(crate) fn send_sendme(&self, stream_id: StreamId, hop: Option<HopLocation>) -> Result<()> {
815
        self.circ
816
            .control
817
            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
818
                msg: FlowCtrlMsg::Sendme,
819
                stream_id,
820
                hop: hop.expect("missing stream hop for client tunnel"),
821
            })
822
            .map_err(|_| Error::CircuitClosed)
823
    }
824

            
825
    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
826
    ///
827
    /// See [`StreamTarget::drain_rate_update`].
828
    pub(crate) fn drain_rate_update(
829
        &self,
830
        stream_id: StreamId,
831
        hop: Option<HopLocation>,
832
        rate: XonKbpsEwma,
833
    ) -> Result<()> {
834
        self.circ
835
            .control
836
            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
837
                msg: FlowCtrlMsg::Xon(rate),
838
                stream_id,
839
                hop: hop.expect("missing stream hop for client tunnel"),
840
            })
841
            .map_err(|_| Error::CircuitClosed)
842
    }
843
}