1
//! Experimental RPC support.
2

            
3
use anyhow::Result;
4
use arti_rpcserver::RpcMgr;
5
use derive_deftly::Deftly;
6
use fs_mistrust::Mistrust;
7
use futures::{AsyncReadExt, stream::StreamExt};
8
use session::ArtiRpcSession;
9
use std::collections::BTreeMap;
10
use std::{io::Result as IoResult, sync::Arc};
11
use tor_config::derive::prelude::*;
12
use tor_config_path::CfgPathResolver;
13
use tracing::{debug, info};
14

            
15
use arti_client::TorClient;
16
use tor_rtcompat::{NetStreamListener as _, Runtime, SpawnExt, general};
17

            
18
pub(crate) mod conntarget;
19
pub(crate) mod listener;
20
mod proxyinfo;
21
mod session;
22
mod superuser;
23

            
24
use listener::RpcListenerSetConfig;
25
pub(crate) use session::{RpcStateSender, RpcVisibleArtiState};
26

            
27
use crate::reload_cfg::LaunchableTorClient;
28
use crate::rpc::superuser::RpcSuperuser;
29

            
30
/// Configuration for Arti's RPC subsystem.
///
/// You cannot change this section on a running Arti client.
#[derive(Debug, Clone, Deftly, Eq, PartialEq)]
#[derive_deftly(TorConfig)]
// With the `experimental-api` feature enabled, this struct is made `pub`
// rather than `pub(crate)`; the `vis = "pub"` setting presumably does the same
// for the items that the `TorConfig` template generates -- TODO confirm.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
#[cfg_attr(feature = "experimental-api", deftly(tor_config(vis = "pub")))]
pub(crate) struct RpcConfig {
    /// If true, then the RPC subsystem is enabled and will listen for connections.
    ///
    /// When this is false, `launch_rpc_mgr` returns `Ok(None)` without binding anything.
    #[deftly(tor_config(default = "false"))] // TODO RPC make this true once we are stable.
    enable: bool,

    /// A set of named locations in which to find connect files.
    ///
    /// The key of each entry is the name that is handed to that entry's `bind` method
    /// when the listeners are launched.
    #[deftly(tor_config(map, default = "listener::listener_map_defaults()"))]
    listen: BTreeMap<String, RpcListenerSetConfig>,

    /// A list of default connect points to bind
    /// if no enabled connect points are found under `listen`.
    ///
    /// Each element is a connect point given as a string.
    #[deftly(tor_config(list(element(clone)), default = "listen_defaults_defaults()"))]
    listen_default: Vec<String>,
}
51

            
52
/// Return default values for `RpcConfig.listen_default`
53
396
fn listen_defaults_defaults() -> Vec<String> {
54
396
    vec![tor_rpc_connect::USER_DEFAULT_CONNECT_POINT.to_string()]
55
396
}
56

            
57
/// Information about an incoming connection.
///
/// Yielded in a stream from our RPC listeners.
///
/// The elements are, in order:
///   * the stream for the newly accepted connection;
///   * the address that the listener reported along with that stream
///     (presumably the peer's address -- TODO confirm);
///   * information about the listener that accepted the connection,
///     shared by every connection accepted from that listener.
type IncomingConn = (
    general::Stream,
    general::SocketAddr,
    Arc<listener::RpcConnInfo>,
);
65

            
66
/// Bind to all configured RPC listeners in `cfg`.
67
///
68
/// On success, return a stream of `IncomingConn`.
69
#[allow(clippy::cognitive_complexity)] // TODO: Refactor?
70
async fn launch_all_listeners<R: Runtime>(
71
    runtime: &R,
72
    cfg: &RpcConfig,
73
    resolver: &CfgPathResolver,
74
    mistrust: &Mistrust,
75
) -> anyhow::Result<(
76
    impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin + use<R>,
77
    Vec<tor_rpc_connect::server::ListenerGuard>,
78
)> {
79
    let mut listeners = Vec::new();
80
    let mut guards = Vec::new();
81
    for (name, listener_cfg) in cfg.listen.iter() {
82
        for (lis, info, guard) in listener_cfg
83
            .bind(runtime, name.as_str(), resolver, mistrust)
84
            .await?
85
        {
86
            // (Note that `bind` only returns enabled listeners, so we don't need to check here.
87
            debug!(
88
                "Listening at {} for {}",
89
                lis.local_addr()
90
                    .expect("general::listener without address?")
91
                    .display_lossy(),
92
                info.name,
93
            );
94
            listeners.push((lis, info));
95
            guards.push(guard);
96
        }
97
    }
98
    if listeners.is_empty() {
99
        for (idx, connpt) in cfg.listen_default.iter().enumerate() {
100
            let display_index = idx + 1; // One-indexed values are more human-readable.
101
            let (lis, info, guard) =
102
                listener::bind_string(connpt, display_index, runtime, resolver, mistrust).await?;
103
            debug!(
104
                "Listening at {} for {}",
105
                lis.local_addr()
106
                    .expect("general::listener without address?")
107
                    .display_lossy(),
108
                info.name,
109
            );
110
            listeners.push((lis, info));
111
            guards.push(guard);
112
        }
113
    }
114
    if listeners.is_empty() {
115
        info!("No RPC listeners configured.");
116
    }
117

            
118
    let streams = listeners.into_iter().map(|(listener, info)| {
119
        listener
120
            .incoming()
121
            .map(move |accept_result| match accept_result {
122
                Ok((netstream, addr)) => Ok((netstream, addr, Arc::clone(&info))),
123
                Err(e) => Err(e),
124
            })
125
    });
126

            
127
    Ok((futures::stream::select_all(streams), guards))
128
}
129

            
130
/// Create an RPC manager, bind to connect points, and open a listener task to accept incoming
131
/// RPC connections.
132
pub(crate) async fn launch_rpc_mgr<R: Runtime>(
133
    runtime: &R,
134
    cfg: &RpcConfig,
135
    resolver: &CfgPathResolver,
136
    mistrust: &Mistrust,
137
    client: Arc<TorClient<R>>,
138
    launchable: Arc<LaunchableTorClient<R>>,
139
) -> Result<Option<RpcProxySupport>> {
140
    if !cfg.enable {
141
        return Ok(None);
142
    }
143
    let (rpc_state, rpc_state_sender) = RpcVisibleArtiState::new();
144

            
145
    let rpc_mgr = RpcMgr::new()?;
146
    // Register methods. Needed since TorClient is generic.
147
    //
148
    // TODO: If we accumulate a large number of generics like this, we should do this elsewhere.
149
    rpc_mgr.register_rpc_methods(TorClient::<R>::rpc_methods());
150
    rpc_mgr.register_rpc_methods(arti_rpcserver::rpc_methods::<R>());
151
    rpc_mgr.register_rpc_methods(RpcSuperuser::<R>::rpc_methods());
152

            
153
    let rt_clone = runtime.clone();
154
    let rpc_mgr_clone = rpc_mgr.clone();
155

            
156
    let (incoming, guards) = launch_all_listeners(runtime, cfg, resolver, mistrust).await?;
157

            
158
    // TODO: Using spawn in this way makes it hard to report whether we
159
    // succeeded or not. This is something we should fix when we refactor
160
    // our service-launching code.
161
    runtime.spawn(async move {
162
        let result = run_rpc_listener(
163
            rt_clone,
164
            incoming,
165
            rpc_mgr_clone,
166
            client,
167
            launchable,
168
            rpc_state,
169
        )
170
        .await;
171
        if let Err(e) = result {
172
            tracing::warn!("RPC manager quit with an error: {}", e);
173
        }
174
        drop(guards);
175
    })?;
176
    Ok(Some(RpcProxySupport {
177
        rpc_mgr,
178
        rpc_state_sender,
179
    }))
180
}
181

            
182
/// Backend function to implement an RPC listener: runs in a loop.
183
async fn run_rpc_listener<R: Runtime>(
184
    runtime: R,
185
    mut incoming: impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin,
186
    rpc_mgr: Arc<RpcMgr>,
187
    client: Arc<TorClient<R>>,
188
    launchable: Arc<LaunchableTorClient<R>>,
189
    rpc_state: Arc<RpcVisibleArtiState>,
190
) -> Result<()> {
191
    while let Some((stream, _addr, info)) = incoming.next().await.transpose()? {
192
        debug!("Received incoming RPC connection from {}", &info.name);
193

            
194
        let client_clone = client.clone();
195
        let rpc_state_clone = rpc_state.clone();
196
        let launchable = launchable.clone();
197
        let connection = rpc_mgr.new_connection(info.auth.clone(), move |auth| {
198
            ArtiRpcSession::new(auth, &client_clone, &launchable, &rpc_state_clone, &info) as _
199
        });
200
        let (input, output) = stream.split();
201

            
202
        runtime.spawn(async {
203
            let result = connection.run(input, output).await;
204
            if let Err(e) = result {
205
                tracing::warn!("RPC session ended with an error: {}", e);
206
            }
207
        })?;
208
    }
209
    Ok(())
210
}
211

            
212
/// Information passed to a proxy or similar stream provider when running with RPC support.
///
/// Returned by `launch_rpc_mgr` when the RPC subsystem is enabled.
pub(crate) struct RpcProxySupport {
    /// An RPC manager to use for looking up objects as possible stream targets.
    pub(crate) rpc_mgr: Arc<arti_rpcserver::RpcMgr>,
    /// An `RpcStateSender` to use for registering the list of known proxy ports.
    pub(crate) rpc_state_sender: RpcStateSender,
}
219

            
220
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use listener::{ConnectPointOptionsBuilder, RpcListenerSetConfigBuilder};
    use tor_config_path::CfgPath;
    use tor_rpc_connect::ParsedConnectPoint;

    use super::*;

    // Fail if `tor_rpcbase` reports a problem with any RPC method name.
    #[test]
    fn rpc_method_names() {
        // We run this from a nice high level module, to ensure that as many method names as
        // possible will be in-scope.
        //
        // (The empty list is presumably a set of names to exempt -- TODO confirm.)
        let problems = tor_rpcbase::check_method_names([]);

        // Print every problem before asserting, so that a failure lists all of them.
        for (m, err) in &problems {
            eprintln!("Bad method name {m:?}: {err}");
        }
        assert!(problems.is_empty());
    }

    // Every built-in default for `listen_default` must parse as a connect point.
    #[test]
    fn parse_listener_defaults() {
        for string in listen_defaults_defaults() {
            let _parsed: ParsedConnectPoint = string.parse().unwrap();
        }
    }

    // Check the defaults of `RpcConfig`, and check that TOML overrides are merged into them.
    #[test]
    fn parsing_and_building() {
        // Deserialize `s` as TOML into a builder, then build the configuration from it.
        fn build(s: &str) -> Result<RpcConfig, anyhow::Error> {
            let b: RpcConfigBuilder = toml::from_str(s)?;
            Ok(b.build()?)
        }

        // Builders describing the two entries that we expect in the default `listen` map.
        let mut user_defaults_builder = RpcListenerSetConfigBuilder::default();
        user_defaults_builder.listener_options().enable(true);
        user_defaults_builder.dir(CfgPath::new("${ARTI_LOCAL_DATA}/rpc/connect.d".to_string()));
        let mut system_defaults_builder = RpcListenerSetConfigBuilder::default();
        system_defaults_builder.listener_options().enable(false);
        system_defaults_builder.dir(CfgPath::new("/etc/arti-rpc/connect.d".to_string()));

        // Make sure that an empty configuration gets us the defaults.
        let defaults = build("").unwrap();
        assert_eq!(
            defaults,
            RpcConfig {
                enable: false,
                listen: vec![
                    (
                        "user-default".to_string(),
                        user_defaults_builder.build().unwrap()
                    ),
                    (
                        "system-default".to_string(),
                        system_defaults_builder.build().unwrap()
                    ),
                ]
                .into_iter()
                .collect(),
                listen_default: listen_defaults_defaults()
            }
        );

        // Make sure that overriding specific options works as expected.
        let altered = build(
            r#"
[listen."user-default"]
enable = false
[listen."system-default"]
dir = "/usr/local/etc/arti-rpc/connect.d"
file_options = { "tmp.toml" = { "enable" = false } }
[listen."my-connpt"]
file = "/home/dante/.paradiso/connpt.toml"
"#,
        )
        .unwrap();
        // Expected "user-default": the default entry, but disabled.
        let mut altered_user_defaults = user_defaults_builder.clone();
        altered_user_defaults.listener_options().enable(false);
        // Expected "system-default": the default entry with a new directory,
        // plus a per-file option that disables "tmp.toml".
        let mut altered_system_defaults = system_defaults_builder.clone();
        altered_system_defaults.dir(CfgPath::new(
            "/usr/local/etc/arti-rpc/connect.d".to_string(),
        ));
        let mut opt = ConnectPointOptionsBuilder::default();
        opt.enable(false);
        altered_system_defaults
            .file_options()
            .insert("tmp.toml".to_string(), opt);
        // Expected "my-connpt": a brand-new entry with only `file` set.
        let mut my_connpt = RpcListenerSetConfigBuilder::default();
        my_connpt.file(CfgPath::new(
            "/home/dante/.paradiso/connpt.toml".to_string(),
        ));

        // The top-level `enable` and `listen_default` were not overridden,
        // so they should keep their default values.
        assert_eq!(
            altered,
            RpcConfig {
                enable: false,
                listen: vec![
                    (
                        "user-default".to_string(),
                        altered_user_defaults.build().unwrap()
                    ),
                    (
                        "system-default".to_string(),
                        altered_system_defaults.build().unwrap()
                    ),
                    ("my-connpt".to_string(), my_connpt.build().unwrap()),
                ]
                .into_iter()
                .collect(),
                listen_default: listen_defaults_defaults()
            }
        );
    }
}