1
//! Implement a concrete type to build channels over a transport.
2

            
3
use async_trait::async_trait;
4
use futures::{AsyncRead, AsyncWrite};
5
use std::io;
6
use std::sync::{Arc, Mutex};
7
use std::time::Duration;
8
use tracing::instrument;
9

            
10
use crate::factory::{BootstrapReporter, ChannelFactory, IncomingChannelFactory};
11
use crate::transport::TransportImplHelper;
12
use crate::{Error, event::ChanMgrEventSender};
13

            
14
use safelog::MaybeSensitive;
15
use tor_basic_utils::rand_hostname;
16
use tor_error::internal;
17
use tor_linkspec::{ChanTarget, HasChanMethod, IntoOwnedChanTarget, OwnedChanTarget};
18
use tor_proto::channel::ChannelType;
19
use tor_proto::channel::kist::KistParams;
20
use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
21
use tor_proto::memquota::ChannelAccount;
22
use tor_proto::peer::PeerAddr;
23
use tor_rtcompat::SpawnExt;
24
use tor_rtcompat::{CertifiedConn, Runtime, StreamOps, TlsProvider, tls::TlsConnector};
25

            
26
#[cfg(feature = "relay")]
27
use {
28
    safelog::Sensitive, std::net::SocketAddr, tor_error::bad_api_usage,
29
    tor_proto::RelayChannelAuthMaterial, tor_proto::relay::CreateRequestHandler,
30
};
31

            
32
/// TLS-based channel builder.
33
///
34
/// This is a separate type so that we can keep our channel management code
35
/// network-agnostic.
36
///
37
/// It uses a provided `TransportHelper` type to make a connection (possibly
38
/// directly over TCP, and possibly over some other protocol).  It then
39
/// negotiates TLS over that connection, and negotiates a Tor channel over that
40
/// TLS session.
41
///
42
/// This channel builder does not retry on failure, but it _does_ implement a
43
/// time-out.
44
pub struct ChanBuilder<R: Runtime, H: TransportImplHelper>
45
where
46
    R: tor_rtcompat::TlsProvider<H::Stream>,
47
{
48
    /// Asynchronous runtime for TLS, TCP, spawning, and timeouts.
49
    runtime: R,
50
    /// The transport object that we use to construct streams.
51
    transport: H,
52
    /// Object to build TLS connections.
53
    tls_connector: <R as TlsProvider<H::Stream>>::Connector,
54
    /// Object to accept TLS connections.
55
    #[cfg(feature = "relay")]
56
    tls_acceptor: Option<<R as TlsProvider<H::Stream>>::Acceptor>,
57
    /// Relay authentication key material needed for relay channels.
58
    #[cfg(feature = "relay")]
59
    auth_material: Option<Arc<RelayChannelAuthMaterial>>,
60
    /// Our address(es) to use in the NETINFO cell.
61
    // TODO: We might want one day to support updating the addresses here in the same way we
62
    // support updating the auth_material. One use case for this is the relay config reload.
63
    #[cfg(feature = "relay")]
64
    my_addrs: Vec<SocketAddr>,
65
    /// Provided to each new channel so that they can handle CREATE* requests.
66
    #[cfg(feature = "relay")]
67
    create_request_handler: Option<Arc<CreateRequestHandler>>,
68
}
69

            
70
impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
71
where
72
    R: TlsProvider<H::Stream>,
73
{
74
    /// Construct a new client specific ChanBuilder.
75
60
    pub fn new_client(runtime: R, transport: H) -> Self {
76
60
        let tls_connector = <R as TlsProvider<H::Stream>>::tls_connector(&runtime);
77
60
        ChanBuilder {
78
60
            runtime,
79
60
            transport,
80
60
            tls_connector,
81
60
            #[cfg(feature = "relay")]
82
60
            tls_acceptor: None,
83
60
            #[cfg(feature = "relay")]
84
60
            auth_material: None,
85
60
            #[cfg(feature = "relay")]
86
60
            my_addrs: Vec::new(),
87
60
            #[cfg(feature = "relay")]
88
60
            create_request_handler: None,
89
60
        }
90
60
    }
91

            
92
    /// Create a `ChanBuilder` configured for relay use.
    ///
    /// A TLS acceptor is built from the key and certificate held by
    /// `auth_material`; the result is otherwise a client builder with the relay
    /// fields filled in.
    ///
    /// # Errors
    ///
    /// Returns an internal error if the TLS acceptor settings, or the acceptor
    /// itself, cannot be constructed.
    #[cfg(feature = "relay")]
    pub fn new_relay(
        runtime: R,
        transport: H,
        auth_material: Arc<RelayChannelAuthMaterial>,
        my_addrs: Vec<SocketAddr>,
        create_request_handler: Option<Arc<CreateRequestHandler>>,
    ) -> crate::Result<Self> {
        use tor_error::into_internal;
        use tor_rtcompat::tls::TlsAcceptorSettings;

        // Derive the acceptor from our TLS key and certificate.
        let key_and_cert = auth_material.tls_key_and_cert().clone();
        let settings = TlsAcceptorSettings::new(key_and_cert)
            .map_err(into_internal!("Unable to build TLS acceptor setting"))?;
        let acceptor = <R as TlsProvider<H::Stream>>::tls_acceptor(&runtime, settings)
            .map_err(into_internal!("Unable to build TLS acceptor"))?;

        // Start from a client builder, then install the relay-specific state.
        let mut relay_builder = Self::new_client(runtime, transport);
        relay_builder.auth_material = Some(auth_material);
        relay_builder.tls_acceptor = Some(acceptor);
        relay_builder.my_addrs = my_addrs;
        relay_builder.create_request_handler = create_request_handler;

        Ok(relay_builder)
    }
119

            
120
    /// Produce a fresh `ChanBuilder` that uses `auth_material`, with every other
    /// setting cloned from `self`.
    ///
    /// This exists because some relay keys rotate over time.
    ///
    /// # Errors
    ///
    /// Fails whenever [`new_relay`](Self::new_relay) fails.
    #[cfg(feature = "relay")]
    pub fn rebuild_with_auth_material(
        &self,
        auth_material: Arc<RelayChannelAuthMaterial>,
    ) -> crate::Result<Self>
    where
        H: Clone,
    {
        let runtime = self.runtime.clone();
        let transport = self.transport.clone();
        let my_addrs = self.my_addrs.clone();
        // Cloning the `Option<Arc<_>>` only bumps the reference count.
        let handler = self.create_request_handler.clone();

        Self::new_relay(runtime, transport, auth_material, my_addrs, handler)
    }
140

            
141
    /// Produce a fresh `ChanBuilder` that uses the CREATE* request `handler`,
    /// with every other setting cloned from `self`.
    ///
    /// The handler is not known yet when the builder is first created, so it
    /// has to be installed afterwards through this method.
    ///
    /// # Errors
    ///
    /// Returns an internal error if `self` holds no relay auth material, and
    /// fails whenever [`new_relay`](Self::new_relay) fails.
    #[cfg(feature = "relay")]
    pub fn rebuild_with_create_request_handler(
        &self,
        handler: Arc<CreateRequestHandler>,
    ) -> crate::Result<Self>
    where
        H: Clone,
    {
        // Without auth material this is a client builder, which has no use for a handler.
        let Some(auth_material) = self.auth_material.as_ref().map(Arc::clone) else {
            return Err(internal!(
                "Trying to set a CREATE* request handler for a non-relay channel builder"
            )
            .into());
        };

        Self::new_relay(
            self.runtime.clone(),
            self.transport.clone(),
            auth_material,
            self.my_addrs.clone(),
            Some(handler),
        )
    }
166

            
167
    /// Return the outbound channel type of this config.
168
    ///
169
    /// The channel type is used when creating outbound channels. Relays always initiate channels
170
    /// as "relay initiator" while client and bridges behave like a "client initiator".
171
    ///
172
    /// Important: The wrong channel type is returned if this is called before `with_identities()`
173
    /// is called.
174
6
    fn outbound_chan_type(&self) -> ChannelType {
175
        #[cfg(feature = "relay")]
176
6
        if self.auth_material.is_some() {
177
            return ChannelType::RelayInitiator;
178
6
        }
179
6
        ChannelType::ClientInitiator
180
6
    }
181
}
182

            
183
#[async_trait]
184
impl<R: Runtime, H: TransportImplHelper> ChannelFactory for ChanBuilder<R, H>
185
where
186
    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
187
    H: Send + Sync,
188
{
189
    #[instrument(skip_all, level = "trace")]
190
    async fn connect_via_transport(
191
        &self,
192
        target: &OwnedChanTarget,
193
        reporter: BootstrapReporter,
194
        memquota: ChannelAccount,
195
    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
196
        use tor_rtcompat::SleepProviderExt;
197

            
198
        // TODO: make this an option.  And make a better value.
199
        let delay = if target.chan_method().is_direct() {
200
            std::time::Duration::new(5, 0)
201
        } else {
202
            std::time::Duration::new(10, 0)
203
        };
204

            
205
        self.runtime
206
            .timeout(delay, self.connect_no_timeout(target, reporter.0, memquota))
207
            .await
208
            .map_err(|_| Error::ChanTimeout {
209
                peer: target.to_logged(),
210
            })?
211
    }
212
}
213

            
214
#[async_trait]
215
impl<R: Runtime, H: TransportImplHelper> IncomingChannelFactory for ChanBuilder<R, H>
216
where
217
    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
218
    H: Send + Sync,
219
{
220
    type Stream = H::Stream;
221

            
222
    #[cfg(feature = "relay")]
223
    async fn accept_from_transport(
224
        &self,
225
        peer: Sensitive<std::net::SocketAddr>,
226
        stream: Self::Stream,
227
        memquota: ChannelAccount,
228
    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
229
        use tor_linkspec::OwnedChanTargetBuilder;
230
        use tor_proto::relay::MaybeVerifiableRelayResponderChannel;
231

            
232
        // Note that as we accept a connection, we don't expect any specific identities and so we
233
        // can only build a target from the peer address. This means that the verification process
234
        // will not match the identities seen (if a relay <-> relay channel) to this target which
235
        // is fine as we are a responder.
236
        let target_no_ids = OwnedChanTargetBuilder::default()
237
            .addrs(vec![peer.into_inner()])
238
            .build()
239
            .map_err(|e| internal!("Unable to build chan target from peer sockaddr: {e}"))?;
240
        // Convert into a PeerAddr but keep it sensitive, this can be a client/bridge.
241
        let peer_addr: MaybeSensitive<PeerAddr> =
242
            MaybeSensitive::sensitive(peer.into_inner().into());
243

            
244
        // Helpers: For error mapping.
245
        let map_ioe = |ioe, action| Error::Io {
246
            action,
247
            peer: peer_addr.clone(),
248
            source: ioe,
249
        };
250
        let map_proto = |source, target: &OwnedChanTarget, clock_skew| Error::Proto {
251
            source,
252
            peer: target.to_logged(),
253
            clock_skew,
254
        };
255

            
256
        let tls = self
257
            .tls_acceptor
258
            .as_ref()
259
            .ok_or(internal!("Accepting connection without TLS acceptor"))?
260
            .negotiate_unvalidated(stream, "ignored")
261
            .await
262
            .map_err(|e| map_ioe(e.into(), "TLS negotiation"))?;
263
        let auth_material = self
264
            .auth_material
265
            .as_ref()
266
            .ok_or(internal!(
267
                "Unable to build relay channel without auth material"
268
            ))?
269
            .clone();
270

            
271
        let our_cert = tls
272
            .own_certificate()
273
            .map_err(|e| map_ioe(e.into(), "TLS Certs"))?
274
            .ok_or_else(|| Error::Internal(internal!("TLS connection without our certificate")))?
275
            .into_owned();
276

            
277
        let create_request_handler = self.create_request_handler.as_ref().ok_or_else(|| {
278
            bad_api_usage!("Can't create a relay channel without a CREATE* request handler")
279
        })?;
280

            
281
        let builder = tor_proto::RelayChannelBuilder::new();
282

            
283
        let unverified = builder
284
            .accept(
285
                Sensitive::new(peer_addr.inner()),
286
                self.my_addrs.clone(),
287
                tls,
288
                self.runtime.clone(),
289
                auth_material,
290
                memquota,
291
                Arc::clone(create_request_handler),
292
            )
293
            .handshake(|| self.runtime.wallclock())
294
            .await
295
            .map_err(|e| map_proto(e, &target_no_ids, None))?;
296

            
297
        let (chan, reactor) = match unverified {
298
            MaybeVerifiableRelayResponderChannel::Verifiable(c) => {
299
                let clock_skew = c.clock_skew();
300
                let now = self.runtime.wallclock();
301
                c.verify(&target_no_ids, &our_cert, Some(now))
302
                    .map_err(|e| map_proto(e, &target_no_ids, Some(clock_skew)))?
303
                    .finish()
304
                    .await
305
                    .map_err(|e| map_proto(e, &target_no_ids, Some(clock_skew)))?
306
            }
307
            MaybeVerifiableRelayResponderChannel::NonVerifiable(c) => {
308
                c.finish().map_err(|e| map_proto(e, &target_no_ids, None))?
309
            }
310
        };
311

            
312
        // Launch a task to run the channel reactor.
313
        self.runtime
314
            .spawn(async {
315
                let _ = reactor.run().await;
316
            })
317
            .map_err(|e| Error::from_spawn("responder channel reactor", e))?;
318

            
319
        Ok(chan)
320
    }
321
}
322

            
323
impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
324
where
325
    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
326
    H: Send + Sync,
327
{
328
    /// Perform the work of `connect_via_transport`, but without enforcing a timeout.
329
    ///
330
    /// Return a [`Channel`](tor_proto::channel::Channel) on success.
331
    #[instrument(skip_all, level = "trace")]
332
2
    async fn connect_no_timeout(
333
2
        &self,
334
2
        target: &OwnedChanTarget,
335
2
        event_sender: Arc<Mutex<ChanMgrEventSender>>,
336
2
        memquota: ChannelAccount,
337
2
    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
338
        use tor_rtcompat::tls::CertifiedConn;
339

            
340
        {
341
            event_sender.lock().expect("Lock poisoned").record_attempt();
342
        }
343

            
344
        // Before actually doing the connect, we need to validate the channel target based on this
345
        // build outbound channel type. There are restrictions we need to apply.
346
        self.validate_target(target)?;
347

            
348
        // 1a. Negotiate the TCP connection or other stream.
349

            
350
        // The returned PeerAddr is the actual address we are connected to.
351
        let (peer_addr, stream) = self.transport.connect(target).await?;
352
        // The peer could be a bridge/guard or a relay. We have to shield it right away to avoid
353
        // leaking the info in the logs but we also want the info for a relay<-> relay.
354
        let peer_addr = match self.outbound_chan_type() {
355
            ChannelType::ClientInitiator => MaybeSensitive::sensitive(peer_addr),
356
            ChannelType::RelayInitiator => MaybeSensitive::not_sensitive(peer_addr),
357
            _ => return Err(Error::Internal(internal!("Unknown outbound channel type"))),
358
        };
359

            
360
4
        let map_ioe = |action: &'static str| {
361
4
            let peer = peer_addr.clone();
362
            move |ioe: io::Error| Error::Io {
363
                action,
364
                peer,
365
                source: ioe.into(),
366
            }
367
4
        };
368

            
369
        {
370
            // TODO(nickm): At some point, it would be helpful to the
371
            // bootstrapping logic if we could distinguish which
372
            // transport just succeeded.
373
            event_sender
374
                .lock()
375
                .expect("Lock poisoned")
376
                .record_tcp_success();
377
        }
378

            
379
        // 1b. Negotiate TLS.
380

            
381
        let hostname = rand_hostname::random_hostname(&mut rand::rng());
382

            
383
        let tls = self
384
            .tls_connector
385
            .negotiate_unvalidated(stream, hostname.as_str())
386
            .await
387
            .map_err(map_ioe("TLS negotiation"))?;
388

            
389
        let peer_tls_cert = tls
390
            .peer_certificate()
391
            .map_err(map_ioe("TLS certs"))?
392
            .ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?
393
            // Note: we could skip this "into_owned" if we computed any necessary digest on the
394
            // certificate earlier.  That would require changing out channel negotiation APIs,
395
            // though, and might not be worth it.
396
            .into_owned();
397

            
398
        {
399
            event_sender
400
                .lock()
401
                .expect("Lock poisoned")
402
                .record_tls_finished();
403
        }
404

            
405
        // Store this so we can log it in case we don't recognize it.
406
        let outbound_chan_type = self.outbound_chan_type();
407
        let chan = match outbound_chan_type {
408
            ChannelType::ClientInitiator => {
409
                self.build_client_channel(
410
                    tls,
411
                    peer_addr,
412
                    target,
413
                    &peer_tls_cert,
414
                    memquota,
415
                    event_sender.clone(),
416
                )
417
                .await?
418
            }
419
            #[cfg(feature = "relay")]
420
            ChannelType::RelayInitiator => {
421
                self.build_relay_channel(
422
                    tls,
423
                    peer_addr,
424
                    target,
425
                    &peer_tls_cert,
426
                    memquota,
427
                    event_sender.clone(),
428
                )
429
                .await?
430
            }
431
            _ => {
432
                return Err(Error::Internal(internal!(
433
                    "Unusable channel type for outbound: {outbound_chan_type}",
434
                )));
435
            }
436
        };
437

            
438
        event_sender
439
            .lock()
440
            .expect("Lock poisoned")
441
            .record_handshake_done();
442

            
443
        Ok(chan)
444
2
    }
445

            
446
    /// Check that `target` is acceptable for a relay-to-relay outbound channel.
    ///
    /// The target is rejected when it uses a non-direct (pluggable transport)
    /// channel method, when any of its addresses is not allowed for outgoing
    /// channels, when this builder has no auth material, or when the target
    /// shares a relay identity with us.
    #[cfg(feature = "relay")]
    fn validate_target_as_relay<CT>(&self, target: &CT) -> crate::Result<()>
    where
        CT: ChanTarget,
    {
        use tor_linkspec::HasRelayIds;

        // Shared shape of the protocol-level refusals below.
        let refuse = |reason: &'static str| Error::Proto {
            source: tor_proto::Error::ChanProto(reason.into()),
            peer: target.to_owned().into(),
            clock_skew: None,
        };

        // Relay channels never go through a pluggable transport.  Being asked to use one
        // is a code-flow problem in the caller rather than a protocol violation, so it is
        // reported as an unusable target carrying a bad-API-usage error.
        if !target.chan_method().is_direct() {
            return Err(Error::UnusableTarget(tor_error::bad_api_usage!(
                "Relays don't support outbound PT channels"
            )));
        }

        // Every address must be allowed for an EXTEND.  For instance, connecting into a
        // local/private network is refused for security reasons: it could enable network
        // scanning by measuring the latency difference between a successful connect() and
        // a failure.
        //
        // A target without addresses passes this check.
        if !target.all_addrs_allowed_for_outgoing_channels() {
            return Err(refuse("Target address is not allowed for outgoing channels"));
        }

        // A relay cannot connect to another relay without key material.  This should never
        // happen, so it is reported as an internal error.
        let Some(auth_material) = &self.auth_material else {
            return Err(Error::Internal(tor_error::bad_api_usage!(
                "Relay initiating a channel without key auth material"
            )));
        };

        // If the target carries any of our own identities, we would be connecting to
        // ourselves: refuse.
        if auth_material.has_any_relay_id_from(target) {
            return Err(refuse("Refusing to build channel to ourself"));
        }

        Ok(())
    }
497

            
498
    /// Validate the given target as in if it is fine to connect to it.
499
    ///
500
    ///There are several rules to follow based on the channel outbound type.
501
2
    fn validate_target<CT>(&self, target: &CT) -> crate::Result<()>
502
2
    where
503
2
        CT: ChanTarget,
504
    {
505
        // Make sure that each address has a valid port.
506
2
        if !target.has_all_nonzero_port() {
507
            return Err(Error::Proto {
508
                source: tor_proto::Error::ChanProto("Target address port is invalid".into()),
509
                peer: target.to_owned().into(),
510
                clock_skew: None,
511
            });
512
2
        }
513

            
514
        // Make sure no target address is ourself including the port. A relay is allowed to connect
515
        // to its IP address but on a different port. We also want to avoid a client bridge to
516
        // connect back to itself.
517
        #[cfg(feature = "relay")]
518
2
        for addr in target.addrs() {
519
2
            if self.my_addrs.contains(&addr) {
520
                return Err(Error::Proto {
521
                    source: tor_proto::Error::ChanProto("Target address is ours".into()),
522
                    peer: target.to_owned().into(),
523
                    clock_skew: None,
524
                });
525
2
            }
526
        }
527

            
528
2
        let chan_type = self.outbound_chan_type();
529
2
        match chan_type {
530
            // This is a client connecting to a relay.
531
2
            ChannelType::ClientInitiator => Ok(()),
532
            // This is a relay connecting to a relay.
533
            #[cfg(feature = "relay")]
534
            ChannelType::RelayInitiator => self.validate_target_as_relay(target),
535
            // ChannelType is non_exhaustive but also we only cover outbound channels.
536
            _ => Err(Error::UnusableTarget(tor_error::bad_api_usage!(
537
                "Channel type can't be used as a target: {chan_type}"
538
            ))),
539
        }
540
2
    }
541

            
542
    /// Build a client channel (always initiator).
543
    ///
544
    /// This spawns the Reactor and return the [`tor_proto::channel::Channel`].
545
2
    async fn build_client_channel<T>(
546
2
        &self,
547
2
        tls: T,
548
2
        peer_addr: MaybeSensitive<PeerAddr>,
549
2
        target: &OwnedChanTarget,
550
2
        peer_tls_cert: &[u8],
551
2
        memquota: ChannelAccount,
552
2
        event_sender: Arc<Mutex<ChanMgrEventSender>>,
553
2
    ) -> crate::Result<Arc<tor_proto::channel::Channel>>
554
2
    where
555
2
        T: AsyncRead + AsyncWrite + CertifiedConn + StreamOps + Send + Unpin + 'static,
556
2
    {
557
        // Helper to map protocol level error.
558
        //
559
        // We are logging the `target` here as these protocol error happens during the handshake
560
        // and we need to log the identities that are being tried but it will honor safe logging
561
        // for the relay <-> relay case which is not ideal but a tradeoff in complexity.
562
2
        let map_proto = |source, target: &OwnedChanTarget, clock_skew| Error::Proto {
563
            source,
564
            peer: target.to_logged(),
565
            clock_skew,
566
        };
567

            
568
2
        let now = self.runtime.wallclock();
569

            
570
        // Get the client specific channel builder.
571
2
        let mut builder = tor_proto::ClientChannelBuilder::new();
572
2
        builder.set_declared_method(target.chan_method());
573

            
574
2
        let unverified = builder
575
2
            .launch(
576
2
                tls,
577
2
                self.runtime.clone(), /* TODO provide ZST SleepProvider instead */
578
2
                memquota,
579
            )
580
2
            .connect(|| self.runtime.wallclock())
581
2
            .await
582
2
            .map_err(|e| Error::from_proto_no_skew(e, target))?;
583

            
584
2
        let clock_skew = unverified.clock_skew();
585
2
        let (chan, reactor) = unverified
586
2
            .verify(target, peer_tls_cert, Some(now))
587
2
            .map_err(|source| match &source {
588
                tor_proto::Error::HandshakeCertsExpired { .. } => {
589
                    event_sender
590
                        .lock()
591
                        .expect("Lock poisoned")
592
                        .record_handshake_done_with_skewed_clock();
593
                    map_proto(source, target, Some(clock_skew))
594
                }
595
                _ => Error::from_proto_no_skew(source, target),
596
            })?
597
2
            .finish(peer_addr)
598
2
            .await
599
2
            .map_err(|e| map_proto(e, target, Some(clock_skew)))?;
600

            
601
        // Launch a task to run the channel reactor.
602
2
        self.runtime
603
2
            .spawn(async {
604
2
                let _ = reactor.run().await;
605
2
            })
606
2
            .map_err(|e| Error::from_spawn("client channel reactor", e))?;
607
2
        Ok(chan)
608
2
    }
609

            
610
    /// Run the relay initiator channel handshake over `tls`.
    ///
    /// On success the channel reactor is spawned on the runtime and the
    /// [`tor_proto::channel::Channel`] is returned.  If verification fails
    /// because the handshake certificates are expired, that is also recorded on
    /// `event_sender` as a handshake completed with a skewed clock.
    ///
    /// # Errors
    ///
    /// Returns an internal error when this builder has no relay auth material,
    /// a bad-API-usage error when no CREATE* request handler is installed, and
    /// protocol or spawn errors from the corresponding steps.
    #[cfg(feature = "relay")]
    async fn build_relay_channel<T>(
        &self,
        tls: T,
        peer_addr: MaybeSensitive<PeerAddr>,
        target: &OwnedChanTarget,
        peer_tls_cert: &[u8],
        memquota: ChannelAccount,
        event_sender: Arc<Mutex<ChanMgrEventSender>>,
    ) -> crate::Result<Arc<tor_proto::channel::Channel>>
    where
        T: AsyncRead + AsyncWrite + CertifiedConn + StreamOps + Send + Unpin + 'static,
    {
        // Maps a protocol-level error, logging the `target` and attaching the clock skew.
        let proto_err = |source, clock_skew| Error::Proto {
            source,
            peer: target.to_logged(),
            clock_skew,
        };

        let builder = tor_proto::RelayChannelBuilder::new();

        // Build the "missing auth material" error lazily: it is only needed on failure.
        let auth_material = self
            .auth_material
            .as_ref()
            .ok_or_else(|| internal!("Unable to build relay channel without auth material"))?
            .clone();

        let create_request_handler = self.create_request_handler.as_ref().ok_or_else(|| {
            bad_api_usage!("Can't create a relay channel without a CREATE* request handler")
        })?;

        let unverified = builder
            .launch(
                tls,
                self.runtime.clone(), /* TODO provide ZST SleepProvider instead */
                auth_material,
                self.my_addrs.clone(),
                target,
                memquota,
                Arc::clone(create_request_handler),
            )
            .connect(|| self.runtime.wallclock())
            .await
            .map_err(|e| Error::from_proto_no_skew(e, target))?;

        let now = self.runtime.wallclock();
        let clock_skew = unverified.clock_skew();
        let (chan, reactor) = unverified
            .verify(target, peer_tls_cert, Some(now))
            .map_err(|source| {
                if matches!(source, tor_proto::Error::HandshakeCertsExpired { .. }) {
                    // Expired certificates point at a skewed clock: record that, and
                    // include the skew in the error.
                    event_sender
                        .lock()
                        .expect("Lock poisoned")
                        .record_handshake_done_with_skewed_clock();
                    proto_err(source, Some(clock_skew))
                } else {
                    Error::from_proto_no_skew(source, target)
                }
            })?
            // Peer address is not sensitive as this is a relay<->relay connection.
            .finish(peer_addr.inner())
            .await
            .map_err(|source| proto_err(source, Some(clock_skew)))?;

        // Launch a task to run the channel reactor.
        self.runtime
            .spawn(async {
                let _ = reactor.run().await;
            })
            .map_err(|e| Error::from_spawn("relay channel reactor", e))?;

        Ok(chan)
    }
689
}
690

            
691
impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
692
    fn is_canonical(&self) -> bool {
693
        self.is_canonical()
694
    }
695
    fn is_canonical_to_peer(&self) -> bool {
696
        self.is_canonical_to_peer()
697
    }
698
2
    fn is_usable(&self) -> bool {
699
2
        !self.is_closing()
700
2
    }
701
2
    fn duration_unused(&self) -> Option<Duration> {
702
2
        self.duration_unused()
703
2
    }
704
108
    fn reparameterize(
705
108
        &self,
706
108
        updates: Arc<ChannelPaddingInstructionsUpdates>,
707
108
    ) -> tor_proto::Result<()> {
708
108
        tor_proto::channel::Channel::reparameterize(self, updates)
709
108
    }
710
    fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()> {
711
        tor_proto::channel::Channel::reparameterize_kist(self, kist_params)
712
    }
713
36
    fn engage_padding_activities(&self) {
714
36
        tor_proto::channel::Channel::engage_padding_activities(self);
715
36
    }
716
}
717

            
718
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::{
        Result,
        mgr::{AbstractChannel, AbstractChannelFactory},
    };
    use futures::StreamExt as _;
    use std::net::SocketAddr;
    use std::time::{Duration, SystemTime};
    use tor_linkspec::{ChannelMethod, HasRelayIds, RelayIdType};
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
    use tor_llcrypto::pk::rsa::RsaIdentity;
    use tor_proto::channel::Channel;
    use tor_proto::memquota::{ChannelAccount, SpecificAccount as _};
    use tor_rtcompat::{NetStreamListener, test_with_one_runtime};
    use tor_rtmock::{io::LocalStream, net::MockNetwork};

    #[allow(deprecated)] // TODO #1885
    use tor_rtmock::MockSleepRuntime;

    // Check that the builder can produce a working channel end to end.
    //
    // The test runs a mock network with a listener bound to the address
    // from `testing::msgs`, moves the client's clock to the timestamp from
    // that module, and has the listener side answer via
    // `testing::answer_channel_req`.
    #[test]
    fn build_ok() -> Result<()> {
        use crate::testing::msgs;

        // Identity and address material, all taken from the canned messages.
        let relay_addr: SocketAddr = msgs::ADDR.parse().unwrap();
        let ed_id: Ed25519Identity = msgs::ED_ID.into();
        let rsa_id: RsaIdentity = msgs::RSA_ID.into();
        let client_ip = "192.0.2.17".parse().unwrap();
        let relay_cert = msgs::X509_CERT.into();
        let fake_now = SystemTime::UNIX_EPOCH + Duration::from_secs(msgs::NOW);

        // The channel target that the client will try to reach.
        let target = OwnedChanTarget::builder()
            .addrs(vec![relay_addr])
            .method(ChannelMethod::Direct(vec![relay_addr]))
            .ed_identity(ed_id)
            .rsa_identity(rsa_id)
            .build()
            .unwrap();

        test_with_one_runtime!(|rt| async move {
            // A mock network stands in for the internet.
            let net = MockNetwork::new();

            // Client side: a runtime with its own IP, wrapped so that its
            // clock can be moved to the timestamp we want it to believe in.
            let plain_client_rt = net.builder().add_address(client_ip).runtime(rt.clone());
            #[allow(deprecated)] // TODO #1885
            let client_rt = MockSleepRuntime::new(plain_client_rt);
            client_rt.jump_to(fake_now);

            // Relay side: a runtime on the relay's IP with a fake TLS
            // listener waiting for the client's request.
            let relay_rt = net
                .builder()
                .add_address(relay_addr.ip())
                .runtime(rt.clone());
            let listener = relay_rt
                .mock_net()
                .listen_tls(&relay_addr, relay_cert)
                .unwrap();

            // The channel builder under test.
            let transport = crate::transport::DefaultTransport::new(client_rt.clone(), None);
            let chan_builder = ChanBuilder::new_client(client_rt, transport);

            let (client_res, relay_res): (Result<Arc<Channel>>, Result<LocalStream>) = futures::join!(
                async {
                    // Client: ask the builder for a channel to the target.
                    let reporter = BootstrapReporter::fake();
                    let memquota = ChannelAccount::new_noop();
                    chan_builder
                        .build_channel(&target, reporter, memquota)
                        .await
                },
                async {
                    // Relay: accept the connection and reply to the
                    // client's channel request.
                    let accepted = listener.incoming().next().await;
                    let (mut conn, peer) = accepted.expect("Closed?").expect("accept failed");
                    assert_eq!(client_ip, peer.ip());
                    crate::testing::answer_channel_req(&mut conn)
                        .await
                        .expect("answer failed");
                    Ok(conn)
                }
            );

            let channel = client_res.unwrap();
            assert_eq!(
                channel.identity(RelayIdType::Ed25519),
                Some((&ed_id).into())
            );
            assert!(channel.is_usable());

            // Time may advance between the three reads, so they are only
            // required to be non-decreasing, not equal.
            let unused_first = Channel::duration_unused(&channel);
            let unused_second = AbstractChannel::duration_unused(channel.as_ref());
            let unused_third = Channel::duration_unused(&channel);
            assert!(unused_first.unwrap() <= unused_second.unwrap());
            assert!(unused_second.unwrap() <= unused_third.unwrap());

            // The relay-side result (and the stream it holds) is only
            // consumed here, after the channel assertions above.
            relay_res.unwrap();
            Ok(())
        })
    }

    // TODO: Add tests for the timeout logic once that logic is smarter.
}