1
//! Experimental RPC support.
2

            
3
use anyhow::Result;
4
use arti_rpcserver::RpcMgr;
5
use derive_deftly::Deftly;
6
use fs_mistrust::Mistrust;
7
use futures::{AsyncReadExt, stream::StreamExt};
8
use session::ArtiRpcSession;
9
use std::collections::BTreeMap;
10
use std::{io::Result as IoResult, sync::Arc};
11
use tor_config::derive::prelude::*;
12
use tor_config_path::CfgPathResolver;
13
use tracing::{debug, info};
14

            
15
use arti_client::TorClient;
16
use tor_rtcompat::{NetStreamListener as _, Runtime, SpawnExt, general};
17

            
18
pub(crate) mod conntarget;
19
pub(crate) mod listener;
20
mod proxyinfo;
21
mod session;
22

            
23
use listener::RpcListenerSetConfig;
24
pub(crate) use session::{RpcStateSender, RpcVisibleArtiState};
25

            
26
/// Configuration for Arti's RPC subsystem.
27
///
28
/// You cannot change this section on a running Arti client.
29
#[derive(Debug, Clone, Deftly, Eq, PartialEq)]
30
#[derive_deftly(TorConfig)]
31
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
32
#[cfg_attr(feature = "experimental-api", deftly(tor_config(vis = "pub")))]
33
pub(crate) struct RpcConfig {
34
    /// If true, then the RPC subsystem is enabled and will listen for connections.
35
    #[deftly(tor_config(default = "false"))] // TODO RPC make this true once we are stable.
36
    enable: bool,
37

            
38
    /// A set of named locations in which to find connect files.
39
    #[deftly(tor_config(map, default = "listener::listener_map_defaults()"))]
40
    listen: BTreeMap<String, RpcListenerSetConfig>,
41

            
42
    /// A list of default connect points to bind
43
    /// if no enabled connect points are found under `listen`.
44
    #[deftly(tor_config(list(element(clone)), default = "listen_defaults_defaults()"))]
45
    listen_default: Vec<String>,
46
}
47

            
48
/// Return default values for `RpcConfig.listen_default`
49
396
fn listen_defaults_defaults() -> Vec<String> {
50
396
    vec![tor_rpc_connect::USER_DEFAULT_CONNECT_POINT.to_string()]
51
396
}
52

            
53
/// Information about an incoming connection.
54
///
55
/// Yielded in a stream from our RPC listeners.
56
type IncomingConn = (
57
    general::Stream,
58
    general::SocketAddr,
59
    Arc<listener::RpcConnInfo>,
60
);
61

            
62
/// Bind to all configured RPC listeners in `cfg`.
63
///
64
/// On success, return a stream of `IncomingConn`.
65
#[allow(clippy::cognitive_complexity)] // TODO: Refactor?
66
async fn launch_all_listeners<R: Runtime>(
67
    runtime: &R,
68
    cfg: &RpcConfig,
69
    resolver: &CfgPathResolver,
70
    mistrust: &Mistrust,
71
) -> anyhow::Result<(
72
    impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin + use<R>,
73
    Vec<tor_rpc_connect::server::ListenerGuard>,
74
)> {
75
    let mut listeners = Vec::new();
76
    let mut guards = Vec::new();
77
    for (name, listener_cfg) in cfg.listen.iter() {
78
        for (lis, info, guard) in listener_cfg
79
            .bind(runtime, name.as_str(), resolver, mistrust)
80
            .await?
81
        {
82
            // (Note that `bind` only returns enabled listeners, so we don't need to check here.
83
            debug!(
84
                "Listening at {} for {}",
85
                lis.local_addr()
86
                    .expect("general::listener without address?")
87
                    .display_lossy(),
88
                info.name,
89
            );
90
            listeners.push((lis, info));
91
            guards.push(guard);
92
        }
93
    }
94
    if listeners.is_empty() {
95
        for (idx, connpt) in cfg.listen_default.iter().enumerate() {
96
            let display_index = idx + 1; // One-indexed values are more human-readable.
97
            let (lis, info, guard) =
98
                listener::bind_string(connpt, display_index, runtime, resolver, mistrust).await?;
99
            debug!(
100
                "Listening at {} for {}",
101
                lis.local_addr()
102
                    .expect("general::listener without address?")
103
                    .display_lossy(),
104
                info.name,
105
            );
106
            listeners.push((lis, info));
107
            guards.push(guard);
108
        }
109
    }
110
    if listeners.is_empty() {
111
        info!("No RPC listeners configured.");
112
    }
113

            
114
    let streams = listeners.into_iter().map(|(listener, info)| {
115
        listener
116
            .incoming()
117
            .map(move |accept_result| match accept_result {
118
                Ok((netstream, addr)) => Ok((netstream, addr, Arc::clone(&info))),
119
                Err(e) => Err(e),
120
            })
121
    });
122

            
123
    Ok((futures::stream::select_all(streams), guards))
124
}
125

            
126
/// Create an RPC manager, bind to connect points, and open a listener task to accept incoming
127
/// RPC connections.
128
pub(crate) async fn launch_rpc_mgr<R: Runtime>(
129
    runtime: &R,
130
    cfg: &RpcConfig,
131
    resolver: &CfgPathResolver,
132
    mistrust: &Mistrust,
133
    client: TorClient<R>,
134
) -> Result<Option<RpcProxySupport>> {
135
    if !cfg.enable {
136
        return Ok(None);
137
    }
138
    let (rpc_state, rpc_state_sender) = RpcVisibleArtiState::new();
139

            
140
    let rpc_mgr = RpcMgr::new(move |auth| ArtiRpcSession::new(auth, &client, &rpc_state))?;
141
    // Register methods. Needed since TorClient is generic.
142
    //
143
    // TODO: If we accumulate a large number of generics like this, we should do this elsewhere.
144
    rpc_mgr.register_rpc_methods(TorClient::<R>::rpc_methods());
145
    rpc_mgr.register_rpc_methods(arti_rpcserver::rpc_methods::<R>());
146

            
147
    let rt_clone = runtime.clone();
148
    let rpc_mgr_clone = rpc_mgr.clone();
149

            
150
    let (incoming, guards) = launch_all_listeners(runtime, cfg, resolver, mistrust).await?;
151

            
152
    // TODO: Using spawn in this way makes it hard to report whether we
153
    // succeeded or not. This is something we should fix when we refactor
154
    // our service-launching code.
155
    runtime.spawn(async move {
156
        let result = run_rpc_listener(rt_clone, incoming, rpc_mgr_clone).await;
157
        if let Err(e) = result {
158
            tracing::warn!("RPC manager quit with an error: {}", e);
159
        }
160
        drop(guards);
161
    })?;
162
    Ok(Some(RpcProxySupport {
163
        rpc_mgr,
164
        rpc_state_sender,
165
    }))
166
}
167

            
168
/// Backend function to implement an RPC listener: runs in a loop.
169
async fn run_rpc_listener<R: Runtime>(
170
    runtime: R,
171
    mut incoming: impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin,
172
    rpc_mgr: Arc<RpcMgr>,
173
) -> Result<()> {
174
    while let Some((stream, _addr, info)) = incoming.next().await.transpose()? {
175
        debug!("Received incoming RPC connection from {}", &info.name);
176

            
177
        let connection = rpc_mgr.new_connection(info.auth.clone());
178
        let (input, output) = stream.split();
179

            
180
        runtime.spawn(async {
181
            let result = connection.run(input, output).await;
182
            if let Err(e) = result {
183
                tracing::warn!("RPC session ended with an error: {}", e);
184
            }
185
        })?;
186
    }
187
    Ok(())
188
}
189

            
190
/// Information passed to a proxy or similar stream provider when running with RPC support.
191
pub(crate) struct RpcProxySupport {
192
    /// An RPC manager to use for looking up objects as possible stream targets.
193
    pub(crate) rpc_mgr: Arc<arti_rpcserver::RpcMgr>,
194
    /// An RPCStateSender to use for registering the list of known proxy ports.
195
    pub(crate) rpc_state_sender: RpcStateSender,
196
}
197

            
198
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::listener::{ConnectPointOptionsBuilder, RpcListenerSetConfigBuilder};
    use tor_config_path::CfgPath;
    use tor_rpc_connect::ParsedConnectPoint;

    use super::*;

    /// Every registered RPC method name must pass `check_method_names`.
    #[test]
    fn rpc_method_names() {
        // This check lives in a high-level module so that as many method names as
        // possible are in scope when it runs.
        let problems = tor_rpcbase::check_method_names([]);

        for (m, err) in &problems {
            eprintln!("Bad method name {m:?}: {err}");
        }
        assert!(problems.is_empty());
    }

    /// Every default `listen_default` entry must parse as a connect point.
    #[test]
    fn parse_listener_defaults() {
        listen_defaults_defaults().into_iter().for_each(|connpt| {
            let _parsed: ParsedConnectPoint = connpt.parse().unwrap();
        });
    }

    /// An empty config yields the defaults, and overrides land where expected.
    #[test]
    fn parsing_and_building() {
        /// Parse `s` as TOML into a builder, then build the configuration.
        fn build(s: &str) -> anyhow::Result<RpcConfig> {
            let builder = toml::from_str::<RpcConfigBuilder>(s)?;
            let config = builder.build()?;
            Ok(config)
        }

        /// Make a `CfgPath` out of a string slice.
        fn path(s: &str) -> CfgPath {
            CfgPath::new(s.to_string())
        }

        let mut user_builder = RpcListenerSetConfigBuilder::default();
        user_builder.listener_options().enable(true);
        user_builder.dir(path("${ARTI_LOCAL_DATA}/rpc/connect.d"));
        let mut system_builder = RpcListenerSetConfigBuilder::default();
        system_builder.listener_options().enable(false);
        system_builder.dir(path("/etc/arti-rpc/connect.d"));

        // Make sure that an empty configuration gets us the defaults.
        let defaults = build("").unwrap();
        let expected_defaults = RpcConfig {
            enable: false,
            listen: BTreeMap::from([
                ("user-default".to_string(), user_builder.build().unwrap()),
                (
                    "system-default".to_string(),
                    system_builder.build().unwrap(),
                ),
            ]),
            listen_default: listen_defaults_defaults(),
        };
        assert_eq!(defaults, expected_defaults);

        // Make sure that overriding specific options works as expected.
        let altered = build(
            r#"
[listen."user-default"]
enable = false
[listen."system-default"]
dir = "/usr/local/etc/arti-rpc/connect.d"
file_options = { "tmp.toml" = { "enable" = false } }
[listen."my-connpt"]
file = "/home/dante/.paradiso/connpt.toml"
"#,
        )
        .unwrap();

        let mut altered_user = user_builder.clone();
        altered_user.listener_options().enable(false);

        let mut altered_system = system_builder.clone();
        altered_system.dir(path("/usr/local/etc/arti-rpc/connect.d"));
        let mut tmp_options = ConnectPointOptionsBuilder::default();
        tmp_options.enable(false);
        altered_system
            .file_options()
            .insert("tmp.toml".to_string(), tmp_options);

        let mut my_connpt = RpcListenerSetConfigBuilder::default();
        my_connpt.file(path("/home/dante/.paradiso/connpt.toml"));

        let expected_altered = RpcConfig {
            enable: false,
            listen: BTreeMap::from([
                ("user-default".to_string(), altered_user.build().unwrap()),
                (
                    "system-default".to_string(),
                    altered_system.build().unwrap(),
                ),
                ("my-connpt".to_string(), my_connpt.build().unwrap()),
            ]),
            listen_default: listen_defaults_defaults(),
        };
        assert_eq!(altered, expected_altered);
    }
}