1
//! Objects that can become or wrap a [`arti_client::DataStream`].
2

            
3
use arti_client::rpc::{
4
    ClientConnectionResult, ConnectWithPrefs, ResolvePtrWithPrefs, ResolveWithPrefs,
5
};
6
use derive_deftly::Deftly;
7
use std::{
8
    net::IpAddr,
9
    sync::{Arc, Mutex},
10
};
11
use tor_error::into_internal;
12
use tor_proto::client::stream::ClientDataStreamCtrl;
13
use tor_rpcbase::{self as rpc, templates::*};
14

            
15
use crate::RpcSession;
16

            
17
/// An RPC object representing a single-use client that captures a data-stream.
///
/// This object is returned by the `arti:new_oneshot_client` method, and starts out with
/// enough information to know how to create a `DataStream`, or to respond
/// to some other SOCKS request.
/// When this object is the target of a SOCKS request,
/// it takes its target address, port, and isolation parameters from the SOCKS handshake,
/// and launches a data stream.
/// It then becomes interchangeable with the stream that was launched.
///
/// This object is single-use: once a SOCKS request has referred to it,
/// it cannot be used for any other SOCKS request.
/// (Otherwise, it could not be usable interchangeably with the `DataStream` it creates.)
///
/// The ObjectID for this object can be used as the target of a SOCKS request.
//
// NOTE(review): `delegate_with` names `get_ctrl`, which yields a
// `ClientDataStreamCtrl` only once a stream has been constructed.  Presumably the
// `Object` template forwards otherwise-unhandled RPC methods to that delegate;
// confirm against the tor_rpcbase template documentation.
#[derive(Deftly)]
#[derive_deftly(Object)]
#[deftly(rpc(
    delegate_with = "|this: &Self| this.get_ctrl()",
    delegate_type = "tor_proto::client::stream::ClientDataStreamCtrl",
    expose_outside_of_session
))]
pub(crate) struct OneshotClient {
    /// The inner state of this object.
    ///
    /// Every read or transition of the state goes through this mutex.
    inner: Mutex<Inner>,
}
43

            
44
/// The inner state of an `OneshotClient`.
///
/// A stream is created in the "Unused" state.
enum Inner {
    /// Newly constructed: Waiting for a SOCKS command.
    ///
    /// This is the initial state for every OneshotClient.
    ///
    /// The payload is the connector object that was handed to `OneshotClient::new`;
    /// it is taken out of this state when the client is first used.
    ///
    /// It may become `Launching` or `UsedToResolve`.
    Unused(Arc<dyn rpc::Object>),

    /// The actual connection is being made, i.e. we are within `connect_with_prefs`.
    ///
    /// If the state is `Launching`, no one except `connect_with_prefs` may change it.
    ///
    /// From this state, a stream may become `Stream`, or `StreamFailed`.
    Launching,

    /// Stream constructed; may or may not be connected.
    ///
    /// A stream does not exit this state.  Even if the stream is closed or fails,
    /// its `ClientDataStreamCtrl` remains until it is dropped.
    Stream(Arc<ClientDataStreamCtrl>),

    /// Stream was used for a resolve or resolve_ptr request; there is no underlying stream.
    ///
    /// A stream does not exit this state, even if the resolve request fails.
    //
    // TODO RPC: We may want to make this state hold more information if someday we
    // make DNS requests into objects that we can inspect while they are running.
    UsedToResolve,

    /// Failed to construct the tor_proto::DataStream object.
    ///
    /// A stream does not exit this state.
    StreamFailed,
}
81

            
82
/// Error returned by an operation on a `OneshotClient`.
#[derive(Debug, Clone, thiserror::Error)]
enum OneshotClientError {
    /// Application tried to use a OneshotClient (to open a stream or to resolve),
    /// but that OneshotClient had already been used previously.
    #[error("Data stream object already used")]
    AlreadyUsed,
}
90

            
91
impl tor_error::HasKind for OneshotClientError {
    /// Report the `tor_error::ErrorKind` that corresponds to each variant.
    fn kind(&self) -> tor_error::ErrorKind {
        match self {
            // TODO RPC: is this the correct ErrorKind?
            Self::AlreadyUsed => tor_error::ErrorKind::BadApiUsage,
        }
    }
}
100

            
101
impl OneshotClient {
102
    /// Construct a new unused OneshotClient that will make its connection
103
    /// with `connector`.
104
    ///
105
    /// The `connector` object should implement at least one of ConnectWithPrefs, ResolveWithPrefs,
106
    /// or ResolvePtrWithPrefs, or else it won't actually be useful for anything.
107
    pub(crate) fn new(connector: Arc<dyn rpc::Object>) -> Self {
108
        Self {
109
            inner: Mutex::new(Inner::Unused(connector)),
110
        }
111
    }
112

            
113
    /// If this `OneshotClient` is in state Unused, replace its state with `new_state`
114
    /// and return the ClientConnectionTarget.  Otherwise, leave its state unchanged
115
    /// and return an error.
116
    fn take_connector(&self, new_state: Inner) -> Result<Arc<dyn rpc::Object>, OneshotClientError> {
117
        let mut inner = self.inner.lock().expect("poisoned lock");
118
        let val = std::mem::replace(&mut *inner, new_state);
119
        if let Inner::Unused(conn) = val {
120
            Ok(conn)
121
        } else {
122
            *inner = val;
123
            Err(OneshotClientError::AlreadyUsed)
124
        }
125
    }
126

            
127
    /// Return the `ClientDataStreamCtrl` for this stream, if it has one.
128
    fn get_ctrl(&self) -> Option<Arc<ClientDataStreamCtrl>> {
129
        let inner = self.inner.lock().expect("poisoned lock");
130
        if let Inner::Stream(s) = &*inner {
131
            Some(s.clone())
132
        } else {
133
            None
134
        }
135
    }
136
}
137

            
138
/// Invoke ConnectWithPrefs on an OneshotClient.
139
///
140
/// Unlike the other methods on OneshotClient, this one is somewhat complex, since it must
141
/// re-register the resulting datastream once it has one.
142
async fn oneshot_client_connect_with_prefs(
143
    rpc_data_stream: Arc<OneshotClient>,
144
    mut method: Box<ConnectWithPrefs>,
145
    ctx: Arc<dyn rpc::Context>,
146
) -> ClientConnectionResult<arti_client::DataStream> {
147
    // Extract the connector.
148
    //
149
    // As we do this, we put this OneshotClient into a Launching state.
150
    //
151
    // (`Launching`` wouldn't need to exist if we `connect_with_prefs` were synchronous,
152
    // but it isn't synchronous, so `Launching` is an observable state.)
153
    let connector = rpc_data_stream
154
        .take_connector(Inner::Launching)
155
        .map_err(|e| Box::new(e) as _)?;
156

            
157
    // Internally, we're going to tell tor-proto to make an optimistic stream.
158
    // The only effect here is that the DataStream will be returned immediately by
159
    // our invoke_special_method call, which would otherwise call `wait_for_connection`
160
    // if the stream was _not_ originally optimistic.
161
    //
162
    // We use `was_optimistic` to remember whether the prefs was _originally_
163
    // configured to give an optimistic stream,
164
    // so that we know whether _we_ should do the `wait_for_connection``.
165
    //
166
    // From the POV of the SOCKS proxy code that is calling this function,
167
    // it will still receive the requested optimistic or non-optimistic behavior,
168
    // since the `wait_for_connection` call will still happen (or not happen)
169
    // as requested, causing _this_ function to possibly wait.
170
    //
171
    // The only observable impact here is that this object
172
    // will immediately transition to its new state,
173
    // so that other RPC calls will see a `DataStreamCtrl` object.
174
    let was_optimistic = method.prefs.is_optimistic();
175
    method.prefs.optimistic();
176

            
177
    // Now, launch the connection.  Since we marked it as optimistic,
178
    // this call should return almost immediately.
179
    let stream: Result<arti_client::DataStream, _> =
180
        *rpc::invoke_special_method(ctx, connector, method)
181
            .await
182
            .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
183

            
184
    // Pick the new state for this object, and install it.
185
    let new_obj = match &stream {
186
        Ok(s) => Inner::Stream(
187
            s.client_stream_ctrl()
188
                .expect("Created a client stream with no ClientDataStreamCtrl!?")
189
                .clone(),
190
        ),
191
        Err(_) => Inner::StreamFailed, // TODO RPC: Remember some error information here.
192
    };
193
    {
194
        let mut inner = rpc_data_stream.inner.lock().expect("poisoned lock");
195
        *inner = new_obj;
196
    }
197
    // Return early on failure.
198
    let mut stream = stream?;
199

            
200
    if !was_optimistic {
201
        // Implement non-optimistic behavior, if that is what was originally configured.
202
        stream
203
            .wait_for_connection()
204
            .await
205
            .map_err(|e| Box::new(e) as _)?;
206
    }
207

            
208
    // Return the stream; the SOCKS layer will take it from here.
209
    Ok(stream)
210
}
211

            
212
/// Invoke ResolveWithPrefs on an OneshotClient
213
async fn oneshot_client_resolve_with_prefs(
214
    rpc_data_stream: Arc<OneshotClient>,
215
    method: Box<ResolveWithPrefs>,
216
    ctx: Arc<dyn rpc::Context>,
217
) -> ClientConnectionResult<Vec<IpAddr>> {
218
    let connector = rpc_data_stream
219
        .take_connector(Inner::UsedToResolve)
220
        .map_err(|e| Box::new(e) as _)?;
221

            
222
    let result = rpc::invoke_special_method(ctx, connector, method)
223
        .await
224
        .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
225

            
226
    *result
227
}
228

            
229
/// Invoke ResolvePtrWithPrefs on an OneshotClient
230
async fn oneshot_client_resolve_ptr_with_prefs(
231
    rpc_data_stream: Arc<OneshotClient>,
232
    method: Box<ResolvePtrWithPrefs>,
233
    ctx: Arc<dyn rpc::Context>,
234
) -> ClientConnectionResult<Vec<String>> {
235
    let connector = rpc_data_stream
236
        .take_connector(Inner::UsedToResolve)
237
        .map_err(|e| Box::new(e) as _)?;
238

            
239
    let result = rpc::invoke_special_method(ctx, connector, method)
240
        .await
241
        .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
242

            
243
    *result
244
}
245

            
246
/// Create a new `OneshotClient` to wait for a SOCKS request.
///
/// The resulting ObjectID will be a handle to an `OneshotClient`.
/// It can be used as the target of a single SOCKS request.
///
/// Once used for a SOCKS connect request,
/// the object will become a handle for the underlying DataStream
/// that was created with the request.
#[derive(Debug, serde::Deserialize, serde::Serialize, Deftly)]
#[derive_deftly(DynMethod)]
#[deftly(rpc(method_name = "arti:new_oneshot_client"))]
pub(crate) struct NewOneshotClient {}
258

            
259
// `arti:new_oneshot_client` replies with a single object ID (the new
// `OneshotClient`) and currently sends no incremental updates.
impl rpc::RpcMethod for NewOneshotClient {
    type Output = rpc::SingleIdResponse;
    type Update = rpc::NoUpdates; // TODO actually, updates are quite suitable here.
}
263

            
264
/// Helper: construct and register an OneshotClient.
265
fn new_oneshot_client_impl(
266
    connector: Arc<dyn rpc::Object>,
267
    ctx: &dyn rpc::Context,
268
) -> rpc::ObjectId {
269
    let rpc_stream = Arc::new(OneshotClient::new(connector));
270
    ctx.register_owned(rpc_stream as _)
271
}
272

            
273
/// Implement NewOneshotClient for clients.
274
pub(crate) async fn new_oneshot_client_on_client<R: tor_rtcompat::Runtime>(
275
    client: Arc<arti_client::TorClient<R>>,
276
    _method: Box<NewOneshotClient>,
277
    ctx: Arc<dyn rpc::Context>,
278
) -> Result<rpc::SingleIdResponse, rpc::RpcError> {
279
    Ok(new_oneshot_client_impl(client, ctx.as_ref()).into())
280
}
281

            
282
/// Implement NewOneshotClient for RpcSession.
283
async fn new_oneshot_client_on_session(
284
    session: Arc<RpcSession>,
285
    _method: Box<NewOneshotClient>,
286
    ctx: Arc<dyn rpc::Context>,
287
) -> Result<rpc::SingleIdResponse, rpc::RpcError> {
288
    Ok(new_oneshot_client_impl(session, ctx.as_ref()).into())
289
}
290
// Hand this module's non-generic RPC entry points to `static_rpc_invoke_fn!`:
// the session-level `arti:new_oneshot_client` handler, plus the three
// `@special` handlers that let an `OneshotClient` serve connect, resolve, and
// resolve_ptr requests.
//
// NOTE(review): `new_oneshot_client_on_client` is generic over the runtime and
// is not listed here; presumably it is registered wherever the concrete
// runtime type is known.  Confirm against the caller.
rpc::static_rpc_invoke_fn! {
    new_oneshot_client_on_session;
    @special oneshot_client_connect_with_prefs;
    @special oneshot_client_resolve_with_prefs;
    @special oneshot_client_resolve_ptr_with_prefs;
}