//! Traits and code to define different mechanisms for building Channels to
//! different kinds of targets.
3

            
4
use std::sync::{Arc, Mutex};
5

            
6
use crate::event::ChanMgrEventSender;
7
use async_trait::async_trait;
8
use tor_error::{HasKind, HasRetryTime, internal};
9
use tor_linkspec::{HasChanMethod, OwnedChanTarget, PtTransportName};
10
use tor_proto::channel::Channel;
11
use tor_proto::memquota::ChannelAccount;
12
use tracing::{debug, instrument};
13

            
14
#[cfg(feature = "relay")]
15
use safelog::Sensitive;
16

            
17
/// An opaque type that lets a `ChannelFactory` update the `ChanMgr` about bootstrap progress.
18
///
19
/// A future release of this crate might make this type less opaque.
20
// FIXME(eta): Do that.
21
#[derive(Clone)]
22
pub struct BootstrapReporter(pub(crate) Arc<Mutex<ChanMgrEventSender>>);
23

            
24
impl BootstrapReporter {
25
    #[cfg(test)]
26
    /// Create a useless version of this type to satisfy some test.
27
60
    pub(crate) fn fake() -> Self {
28
60
        let (snd, _rcv) = crate::event::channel();
29
60
        Self(Arc::new(Mutex::new(snd)))
30
60
    }
31
}
32

            
33
/// An object that knows how to build `Channels` to `ChanTarget`s.
34
///
35
/// This trait must be object-safe.
36
///
37
/// Every [`ChanMgr`](crate::ChanMgr) has a `ChannelFactory` that it uses to
38
/// construct all of its channels.
39
///
40
/// A `ChannelFactory` can be implemented in terms of a
41
/// [`TransportImplHelper`](crate::transport::TransportImplHelper), by wrapping it in a
42
/// `ChanBuilder`.
43
///
44
// FIXME(eta): Rectify the below situation.
45
/// (In fact, as of the time of writing, this is the *only* way to implement this trait
46
/// outside of this crate while keeping bootstrap status reporting, since `BootstrapReporter`
47
/// is an opaque type.)
48
#[async_trait]
49
pub trait ChannelFactory: Send + Sync {
50
    /// Open an authenticated channel to `target`.
51
    ///
52
    /// This method does does not necessarily handle retries or timeouts,
53
    /// although some of its implementations may.
54
    ///
55
    /// This method does not necessarily handle every kind of transport. If the
56
    /// caller provides a target with an unsupported
57
    /// [`TransportId`](tor_linkspec::TransportId), this method should return
58
    /// [`Error::NoSuchTransport`](crate::Error::NoSuchTransport).
59
    async fn connect_via_transport(
60
        &self,
61
        target: &OwnedChanTarget,
62
        reporter: BootstrapReporter,
63
        memquota: ChannelAccount,
64
    ) -> crate::Result<Arc<Channel>>;
65
}
66

            
67
/// Similar to [`ChannelFactory`], but for building channels from incoming streams.
68
// This is a separate trait since for some `ChannelFactory`s like the one returned from
69
// `tor_ptmgr::PtMgr::factory_for_transport`, it doesn't make sense to deal with incoming streams
70
// (all PT connections are outgoing).
71
#[async_trait]
72
pub trait IncomingChannelFactory: Send + Sync {
73
    /// The type of byte stream that's required to build channels for incoming connections.
74
    type Stream: Send + Sync + 'static;
75

            
76
    /// Open a channel from `peer` with the given `stream`. The channel may or may not be
77
    /// authenticated.
78
    #[cfg(feature = "relay")]
79
    async fn accept_from_transport(
80
        &self,
81
        peer: Sensitive<std::net::SocketAddr>,
82
        stream: Self::Stream,
83
        memquota: ChannelAccount,
84
    ) -> crate::Result<Arc<Channel>>;
85
}
86

            
87
#[async_trait]
88
impl<CF> crate::mgr::AbstractChannelFactory for CF
89
where
90
    CF: ChannelFactory + IncomingChannelFactory + Sync,
91
{
92
    type Channel = tor_proto::channel::Channel;
93
    type BuildSpec = OwnedChanTarget;
94
    type Stream = CF::Stream;
95

            
96
    #[instrument(skip_all, level = "trace")]
97
    async fn build_channel(
98
        &self,
99
        target: &Self::BuildSpec,
100
        reporter: BootstrapReporter,
101
        memquota: ChannelAccount,
102
    ) -> crate::Result<Arc<Self::Channel>> {
103
        debug!("Attempting to open a new channel to {target}");
104
        self.connect_via_transport(target, reporter, memquota).await
105
    }
106

            
107
    #[cfg(feature = "relay")]
108
    #[instrument(skip_all, level = "trace")]
109
    async fn build_channel_using_incoming(
110
        &self,
111
        peer: Sensitive<std::net::SocketAddr>,
112
        stream: Self::Stream,
113
        memquota: ChannelAccount,
114
    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
115
        debug!("Attempting to open a new channel from {peer}");
116
        self.accept_from_transport(peer, stream, memquota).await
117
    }
118
}
119

            
120
/// The error type returned by a pluggable transport manager.
121
pub trait AbstractPtError:
122
    std::error::Error + HasKind + HasRetryTime + Send + Sync + std::fmt::Debug
123
{
124
}
125

            
126
/// A pluggable transport manager.
127
///
128
/// We can't directly reference the `PtMgr` type from `tor-ptmgr`, because of dependency resolution
129
/// constraints, so this defines the interface for what one should look like.
130
#[async_trait]
131
pub trait AbstractPtMgr: Send + Sync {
132
    /// Get a `ChannelFactory` for the provided `PtTransportName`.
133
    async fn factory_for_transport(
134
        &self,
135
        transport: &PtTransportName,
136
    ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>>;
137
}
138

            
139
#[async_trait]
140
impl<P> AbstractPtMgr for Option<P>
141
where
142
    P: AbstractPtMgr,
143
{
144
    async fn factory_for_transport(
145
        &self,
146
        transport: &PtTransportName,
147
    ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
148
        match self {
149
            Some(mgr) => mgr.factory_for_transport(transport).await,
150
            None => Ok(None),
151
        }
152
    }
153
}
154

            
155
/// A ChannelFactory built from an optional PtMgr to use for pluggable transports, and a
/// ChannelFactory to use for everything else.
pub(crate) struct CompoundFactory<CF> {
    /// The PtMgr to use for pluggable transports
    ///
    /// This field only exists when the `pt-client` feature is enabled.
    #[cfg(feature = "pt-client")]
    ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>,
    /// The factory to use for everything else
    default_factory: Arc<CF>,
}
164

            
165
impl<CF> Clone for CompoundFactory<CF> {
166
    fn clone(&self) -> Self {
167
        Self {
168
            #[cfg(feature = "pt-client")]
169
            ptmgr: self.ptmgr.as_ref().map(Arc::clone),
170
            default_factory: Arc::clone(&self.default_factory),
171
        }
172
    }
173
}
174

            
175
#[async_trait]
176
impl<CF: ChannelFactory> ChannelFactory for CompoundFactory<CF> {
177
    #[instrument(skip_all, level = "trace")]
178
    async fn connect_via_transport(
179
        &self,
180
        target: &OwnedChanTarget,
181
        reporter: BootstrapReporter,
182
        memquota: ChannelAccount,
183
    ) -> crate::Result<Arc<Channel>> {
184
        use tor_linkspec::ChannelMethod::*;
185
        let factory = match target.chan_method() {
186
            Direct(_) => self.default_factory.clone(),
187
            #[cfg(feature = "pt-client")]
188
            Pluggable(a) => match self.ptmgr.as_ref() {
189
                Some(mgr) => mgr
190
                    .factory_for_transport(a.transport())
191
                    .await
192
                    .map_err(crate::Error::Pt)?
193
                    .ok_or_else(|| crate::Error::NoSuchTransport(a.transport().clone().into()))?,
194
                None => return Err(crate::Error::NoSuchTransport(a.transport().clone().into())),
195
            },
196
            #[allow(unreachable_patterns)]
197
            _ => {
198
                return Err(crate::Error::Internal(internal!(
199
                    "No support for channel method"
200
                )));
201
            }
202
        };
203

            
204
        factory
205
            .connect_via_transport(target, reporter, memquota)
206
            .await
207
    }
208
}
209

            
210
#[async_trait]
211
impl<CF: IncomingChannelFactory> IncomingChannelFactory for CompoundFactory<CF> {
212
    type Stream = CF::Stream;
213

            
214
    #[cfg(feature = "relay")]
215
    async fn accept_from_transport(
216
        &self,
217
        peer: Sensitive<std::net::SocketAddr>,
218
        stream: Self::Stream,
219
        memquota: ChannelAccount,
220
    ) -> crate::Result<Arc<Channel>> {
221
        self.default_factory
222
            .accept_from_transport(peer, stream, memquota)
223
            .await
224
    }
225
}
226

            
227
impl<CF: ChannelFactory + 'static> CompoundFactory<CF> {
228
    /// Create a new `Factory` that will try to use `ptmgr` to handle pluggable
229
    /// transports requests, and `default_factory` to handle everything else.
230
58
    pub(crate) fn new(
231
58
        default_factory: Arc<CF>,
232
58
        #[cfg(feature = "pt-client")] ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>,
233
58
    ) -> Self {
234
58
        Self {
235
58
            default_factory,
236
58
            #[cfg(feature = "pt-client")]
237
58
            ptmgr,
238
58
        }
239
58
    }
240

            
241
    #[cfg(feature = "pt-client")]
242
    /// Replace the PtMgr in this object.
243
22
    pub(crate) fn replace_ptmgr(&mut self, ptmgr: Arc<dyn AbstractPtMgr + 'static>) {
244
22
        self.ptmgr = Some(ptmgr);
245
22
    }
246

            
247
    /// Return a reference to the default channel factory.
248
    #[cfg(feature = "relay")]
249
    pub(crate) fn default_factory(&self) -> &CF {
250
        &self.default_factory
251
    }
252

            
253
    /// Replace the default channel factory.
254
    #[cfg(feature = "relay")]
255
    pub(crate) fn replace_default_factory(&mut self, factory: Arc<CF>) {
256
        self.default_factory = factory;
257
    }
258
}