1
//! Experimental RPC support.
2

            
3
use anyhow::Result;
4
use arti_rpcserver::RpcMgr;
5
use derive_deftly::Deftly;
6
use fs_mistrust::Mistrust;
7
use futures::{AsyncReadExt, stream::StreamExt};
8
use session::ArtiRpcSession;
9
use std::collections::BTreeMap;
10
use std::{io::Result as IoResult, sync::Arc};
11
use tor_config::derive::prelude::*;
12
use tor_config_path::CfgPathResolver;
13
use tracing::{debug, info};
14

            
15
use arti_client::TorClient;
16
use tor_rtcompat::{NetStreamListener as _, Runtime, SpawnExt, general};
17

            
18
pub(crate) mod conntarget;
19
pub(crate) mod listener;
20
mod proxyinfo;
21
mod session;
22
mod superuser;
23

            
24
use listener::RpcListenerSetConfig;
25
pub(crate) use session::{RpcStateSender, RpcVisibleArtiState};
26

            
27
use crate::rpc::superuser::RpcSuperuser;
28

            
29
/// Configuration for Arti's RPC subsystem.
30
///
31
/// You cannot change this section on a running Arti client.
32
#[derive(Debug, Clone, Deftly, Eq, PartialEq)]
33
#[derive_deftly(TorConfig)]
34
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
35
#[cfg_attr(feature = "experimental-api", deftly(tor_config(vis = "pub")))]
36
pub(crate) struct RpcConfig {
37
    /// If true, then the RPC subsystem is enabled and will listen for connections.
38
    #[deftly(tor_config(default = "false"))] // TODO RPC make this true once we are stable.
39
    enable: bool,
40

            
41
    /// A set of named locations in which to find connect files.
42
    #[deftly(tor_config(map, default = "listener::listener_map_defaults()"))]
43
    listen: BTreeMap<String, RpcListenerSetConfig>,
44

            
45
    /// A list of default connect points to bind
46
    /// if no enabled connect points are found under `listen`.
47
    #[deftly(tor_config(list(element(clone)), default = "listen_defaults_defaults()"))]
48
    listen_default: Vec<String>,
49
}
50

            
51
/// Return default values for `RpcConfig.listen_default`
52
396
fn listen_defaults_defaults() -> Vec<String> {
53
396
    vec![tor_rpc_connect::USER_DEFAULT_CONNECT_POINT.to_string()]
54
396
}
55

            
56
/// Information about an incoming connection.
57
///
58
/// Yielded in a stream from our RPC listeners.
59
type IncomingConn = (
60
    general::Stream,
61
    general::SocketAddr,
62
    Arc<listener::RpcConnInfo>,
63
);
64

            
65
/// Bind to all configured RPC listeners in `cfg`.
66
///
67
/// On success, return a stream of `IncomingConn`.
68
#[allow(clippy::cognitive_complexity)] // TODO: Refactor?
69
async fn launch_all_listeners<R: Runtime>(
70
    runtime: &R,
71
    cfg: &RpcConfig,
72
    resolver: &CfgPathResolver,
73
    mistrust: &Mistrust,
74
) -> anyhow::Result<(
75
    impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin + use<R>,
76
    Vec<tor_rpc_connect::server::ListenerGuard>,
77
)> {
78
    let mut listeners = Vec::new();
79
    let mut guards = Vec::new();
80
    for (name, listener_cfg) in cfg.listen.iter() {
81
        for (lis, info, guard) in listener_cfg
82
            .bind(runtime, name.as_str(), resolver, mistrust)
83
            .await?
84
        {
85
            // (Note that `bind` only returns enabled listeners, so we don't need to check here.
86
            debug!(
87
                "Listening at {} for {}",
88
                lis.local_addr()
89
                    .expect("general::listener without address?")
90
                    .display_lossy(),
91
                info.name,
92
            );
93
            listeners.push((lis, info));
94
            guards.push(guard);
95
        }
96
    }
97
    if listeners.is_empty() {
98
        for (idx, connpt) in cfg.listen_default.iter().enumerate() {
99
            let display_index = idx + 1; // One-indexed values are more human-readable.
100
            let (lis, info, guard) =
101
                listener::bind_string(connpt, display_index, runtime, resolver, mistrust).await?;
102
            debug!(
103
                "Listening at {} for {}",
104
                lis.local_addr()
105
                    .expect("general::listener without address?")
106
                    .display_lossy(),
107
                info.name,
108
            );
109
            listeners.push((lis, info));
110
            guards.push(guard);
111
        }
112
    }
113
    if listeners.is_empty() {
114
        info!("No RPC listeners configured.");
115
    }
116

            
117
    let streams = listeners.into_iter().map(|(listener, info)| {
118
        listener
119
            .incoming()
120
            .map(move |accept_result| match accept_result {
121
                Ok((netstream, addr)) => Ok((netstream, addr, Arc::clone(&info))),
122
                Err(e) => Err(e),
123
            })
124
    });
125

            
126
    Ok((futures::stream::select_all(streams), guards))
127
}
128

            
129
/// Create an RPC manager, bind to connect points, and open a listener task to accept incoming
130
/// RPC connections.
131
pub(crate) async fn launch_rpc_mgr<R: Runtime>(
132
    runtime: &R,
133
    cfg: &RpcConfig,
134
    resolver: &CfgPathResolver,
135
    mistrust: &Mistrust,
136
    client: TorClient<R>,
137
) -> Result<Option<RpcProxySupport>> {
138
    if !cfg.enable {
139
        return Ok(None);
140
    }
141
    let (rpc_state, rpc_state_sender) = RpcVisibleArtiState::new();
142

            
143
    let rpc_mgr = RpcMgr::new()?;
144
    // Register methods. Needed since TorClient is generic.
145
    //
146
    // TODO: If we accumulate a large number of generics like this, we should do this elsewhere.
147
    rpc_mgr.register_rpc_methods(TorClient::<R>::rpc_methods());
148
    rpc_mgr.register_rpc_methods(arti_rpcserver::rpc_methods::<R>());
149
    rpc_mgr.register_rpc_methods(RpcSuperuser::<R>::rpc_methods());
150

            
151
    let rt_clone = runtime.clone();
152
    let rpc_mgr_clone = rpc_mgr.clone();
153

            
154
    let (incoming, guards) = launch_all_listeners(runtime, cfg, resolver, mistrust).await?;
155

            
156
    // TODO: Using spawn in this way makes it hard to report whether we
157
    // succeeded or not. This is something we should fix when we refactor
158
    // our service-launching code.
159
    runtime.spawn(async move {
160
        let result = run_rpc_listener(rt_clone, incoming, rpc_mgr_clone, client, rpc_state).await;
161
        if let Err(e) = result {
162
            tracing::warn!("RPC manager quit with an error: {}", e);
163
        }
164
        drop(guards);
165
    })?;
166
    Ok(Some(RpcProxySupport {
167
        rpc_mgr,
168
        rpc_state_sender,
169
    }))
170
}
171

            
172
/// Backend function to implement an RPC listener: runs in a loop.
173
async fn run_rpc_listener<R: Runtime>(
174
    runtime: R,
175
    mut incoming: impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin,
176
    rpc_mgr: Arc<RpcMgr>,
177
    client: TorClient<R>,
178
    rpc_state: Arc<RpcVisibleArtiState>,
179
) -> Result<()> {
180
    while let Some((stream, _addr, info)) = incoming.next().await.transpose()? {
181
        debug!("Received incoming RPC connection from {}", &info.name);
182

            
183
        let client_clone = client.clone();
184
        let rpc_state_clone = rpc_state.clone();
185
        let connection = rpc_mgr.new_connection(info.auth.clone(), move |auth| {
186
            ArtiRpcSession::new(auth, &client_clone, &rpc_state_clone, &info) as _
187
        });
188
        let (input, output) = stream.split();
189

            
190
        runtime.spawn(async {
191
            let result = connection.run(input, output).await;
192
            if let Err(e) = result {
193
                tracing::warn!("RPC session ended with an error: {}", e);
194
            }
195
        })?;
196
    }
197
    Ok(())
198
}
199

            
200
/// Information passed to a proxy or similar stream provider when running with RPC support.
201
pub(crate) struct RpcProxySupport {
202
    /// An RPC manager to use for looking up objects as possible stream targets.
203
    pub(crate) rpc_mgr: Arc<arti_rpcserver::RpcMgr>,
204
    /// An RPCStateSender to use for registering the list of known proxy ports.
205
    pub(crate) rpc_state_sender: RpcStateSender,
206
}
207

            
208
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use listener::{ConnectPointOptionsBuilder, RpcListenerSetConfigBuilder};
    use tor_config_path::CfgPath;
    use tor_rpc_connect::ParsedConnectPoint;

    use super::*;

    /// Every registered RPC method name must pass `check_method_names`.
    #[test]
    fn rpc_method_names() {
        // Running the check from this high-level module keeps as many method
        // names as possible in scope.
        let problems = tor_rpcbase::check_method_names([]);

        for (m, err) in &problems {
            eprintln!("Bad method name {m:?}: {err}");
        }
        assert!(problems.is_empty());
    }

    /// Each default connect point string must parse.
    #[test]
    fn parse_listener_defaults() {
        listen_defaults_defaults().iter().for_each(|connpt| {
            let _parsed = connpt.parse::<ParsedConnectPoint>().unwrap();
        });
    }

    /// An empty configuration yields the defaults, and overrides are applied per field.
    #[test]
    fn parsing_and_building() {
        /// Parse `s` as TOML into a builder, then build the `RpcConfig`.
        fn build(s: &str) -> anyhow::Result<RpcConfig> {
            Ok(toml::from_str::<RpcConfigBuilder>(s)?.build()?)
        }

        let mut user_defaults_builder = RpcListenerSetConfigBuilder::default();
        user_defaults_builder.listener_options().enable(true);
        user_defaults_builder.dir(CfgPath::new("${ARTI_LOCAL_DATA}/rpc/connect.d".to_string()));
        let mut system_defaults_builder = RpcListenerSetConfigBuilder::default();
        system_defaults_builder.listener_options().enable(false);
        system_defaults_builder.dir(CfgPath::new("/etc/arti-rpc/connect.d".to_string()));

        // Make sure that an empty configuration gets us the defaults.
        let parsed_defaults = build("").unwrap();
        let expected_defaults = RpcConfig {
            enable: false,
            listen: BTreeMap::from([
                (
                    "user-default".to_string(),
                    user_defaults_builder.build().unwrap(),
                ),
                (
                    "system-default".to_string(),
                    system_defaults_builder.build().unwrap(),
                ),
            ]),
            listen_default: listen_defaults_defaults(),
        };
        assert_eq!(parsed_defaults, expected_defaults);

        // Make sure that overriding specific options works as expected.
        let parsed_altered = build(
            r#"
[listen."user-default"]
enable = false
[listen."system-default"]
dir = "/usr/local/etc/arti-rpc/connect.d"
file_options = { "tmp.toml" = { "enable" = false } }
[listen."my-connpt"]
file = "/home/dante/.paradiso/connpt.toml"
"#,
        )
        .unwrap();

        // Expected "user-default": the defaults, but disabled.
        let mut altered_user_defaults = user_defaults_builder.clone();
        altered_user_defaults.listener_options().enable(false);

        // Expected "system-default": a new directory and one per-file override.
        let mut altered_system_defaults = system_defaults_builder.clone();
        altered_system_defaults.dir(CfgPath::new(
            "/usr/local/etc/arti-rpc/connect.d".to_string(),
        ));
        let mut tmp_file_opts = ConnectPointOptionsBuilder::default();
        tmp_file_opts.enable(false);
        altered_system_defaults
            .file_options()
            .insert("tmp.toml".to_string(), tmp_file_opts);

        // Expected "my-connpt": an entry that only names a file.
        let mut my_connpt = RpcListenerSetConfigBuilder::default();
        my_connpt.file(CfgPath::new(
            "/home/dante/.paradiso/connpt.toml".to_string(),
        ));

        let expected_altered = RpcConfig {
            enable: false,
            listen: BTreeMap::from([
                (
                    "user-default".to_string(),
                    altered_user_defaults.build().unwrap(),
                ),
                (
                    "system-default".to_string(),
                    altered_system_defaults.build().unwrap(),
                ),
                ("my-connpt".to_string(), my_connpt.build().unwrap()),
            ]),
            listen_default: listen_defaults_defaults(),
        };
        assert_eq!(parsed_altered, expected_altered);
    }
}