1
//! Declare the RPC session object as exposed from the RPC server run by the `arti` crate.
2

            
3
use arti_client::TorClient;
4
use arti_rpcserver::RpcAuthentication;
5
use derive_deftly::Deftly;
6
use futures::stream::StreamExt as _;
7
use std::sync::Arc;
8
use tor_async_utils::{DropNotifyEofSignallable, DropNotifyWatchSender};
9
use tor_rpc_connect::SuperuserPermission;
10
use tor_rpcbase::{self as rpc};
11
use tor_rtcompat::Runtime;
12

            
13
use crate::{
14
    proxy::port_info,
15
    reload_cfg::LaunchableTorClient,
16
    rpc::{listener::RpcConnInfo, superuser::RpcSuperuser},
17
};
18

            
19
use super::proxyinfo::{self, ProxyInfo};
20

            
21
/// A top-level RPC session object.
22
///
23
/// This is the first object that an RPC user receives upon authenticating;
24
/// It is returned by `auth:authenticate`.
25
///
26
/// Other objects (`TorClient`,`RpcDataStream`, etc)
27
/// are available using methods on this object.
28
/// (See the list of available methods.)
29
///
30
/// This type wraps and delegates to [`arti_rpcserver::RpcSession`],
31
/// but exposes additional functionality not available at the
32
/// level of [`arti_rpcserver`], including information about configured proxies.
33
///
34
/// This ObjectID for this object can be used as the target of a SOCKS stream.
35
#[derive(Deftly)]
36
#[derive_deftly(rpc::Object)]
37
#[deftly(rpc(
38
    delegate_with = "|this: &Self| Some(this.session.clone())",
39
    delegate_type = "arti_rpcserver::RpcSession"
40
))]
41
#[deftly(rpc(expose_outside_of_session))]
42
pub(super) struct ArtiRpcSession {
43
    /// State about the `arti` server, as seen by the Rpc system.
44
    pub(super) arti_state: Arc<RpcVisibleArtiState>,
45
    /// The underlying RpcSession object that we delegate to.
46
    session: Arc<arti_rpcserver::RpcSession>,
47
}
48

            
49
/// Information about the current global top-level Arti state,
50
/// as exposed to an Rpc Session.
51
//
52
// TODO: This type is dangerously close to being a collection of globals.
53
// We should refactor it aggressively when we refactor the `arti` crate.
54
//
55
// TODO: Right now this is constructed in the same form that it's used in
56
// ArtiRpcSession.  Later on, we could split it into one type that
57
// the rest of this crate constructs, and another type that the
58
// ArtiRpcSession actually uses. We should do that if the needs seem to diverge.
59
pub(crate) struct RpcVisibleArtiState {
60
    /// A `ProxyInfo` that we hand out when asked to list our proxy ports.
61
    ///
62
    /// Right now it only lists Socks; in the future it may list more.
63
    proxy_info: postage::watch::Receiver<ProxyInfoState>,
64
}
65

            
66
/// Handle to set RPC state across RPC sessions.  (See `RpcVisibleArtiState`.)
67
#[derive(Debug)]
68
pub(crate) struct RpcStateSender {
69
    /// Sender for setting our list of proxy ports.
70
    proxy_info_sender: DropNotifyWatchSender<ProxyInfoState>,
71
}
72

            
73
impl ArtiRpcSession {
74
    /// Construct a new `ArtiRpcSession`.
75
    ///
76
    /// Privileges on the session (if any) are derived from `auth`, which describes
77
    /// how the user authenticated.
78
    ///
79
    /// The session receives a new isolated TorClient, based on `client_root`.
80
    pub(super) fn new<R: Runtime>(
81
        auth: &RpcAuthentication,
82
        client_root: &Arc<TorClient<R>>,
83
        launchable_client: &Arc<LaunchableTorClient<R>>,
84
        arti_state: &Arc<RpcVisibleArtiState>,
85
        listener_info: &RpcConnInfo,
86
    ) -> Arc<Self> {
87
        let _ = auth; // This is currently unused; any authentication gives the same result.
88
        let client = client_root.isolated_client();
89
        let session = arti_rpcserver::RpcSession::new_with_client(client);
90
        if listener_info.allow_superuser == SuperuserPermission::Allowed {
91
            session.provide_superuser_permission(Arc::new(RpcSuperuser::new(
92
                client_root.clone(),
93
                launchable_client.clone(),
94
            )) as _);
95
        }
96
        Arc::new(ArtiRpcSession {
97
            session,
98
            arti_state: arti_state.clone(),
99
        })
100
    }
101
}
102

            
103
/// Possible state for a watched proxy_info.
104
#[derive(Debug, Clone)]
105
enum ProxyInfoState {
106
    /// We haven't set it yet.
107
    Unset,
108
    /// We've set it to a given value.
109
    Set(Arc<ProxyInfo>),
110
    /// The sender has been dropped.
111
    Eof,
112
}
113

            
114
impl DropNotifyEofSignallable for ProxyInfoState {
115
4
    fn eof() -> Self {
116
4
        Self::Eof
117
4
    }
118
}
119

            
120
impl RpcVisibleArtiState {
121
    /// Construct a new `RpcVisibleArtiState`.
122
4
    pub(crate) fn new() -> (Arc<Self>, RpcStateSender) {
123
4
        let (proxy_info_sender, proxy_info) = postage::watch::channel_with(ProxyInfoState::Unset);
124
4
        let proxy_info_sender = DropNotifyWatchSender::new(proxy_info_sender);
125
4
        (
126
4
            Arc::new(Self { proxy_info }),
127
4
            RpcStateSender { proxy_info_sender },
128
4
        )
129
4
    }
130

            
131
    /// Return the latest proxy info, waiting until it is set.
132
    ///
133
    /// Return an error if the sender has been closed.
134
12
    pub(super) async fn get_proxy_info(&self) -> Result<Arc<ProxyInfo>, ()> {
135
8
        let mut proxy_info = self.proxy_info.clone();
136
12
        while let Some(v) = proxy_info.next().await {
137
12
            match v {
138
4
                ProxyInfoState::Unset => {
139
4
                    // Not yet set, try again.
140
4
                }
141
8
                ProxyInfoState::Set(proxyinfo) => return Ok(Arc::clone(&proxyinfo)),
142
                ProxyInfoState::Eof => return Err(()),
143
            }
144
        }
145
        Err(())
146
8
    }
147
}
148

            
149
impl RpcStateSender {
150
    /// Set the list of stream listener addresses on this state.
151
    ///
152
    /// This method may only be called once per state.
153
4
    pub(crate) fn set_stream_listeners(&mut self, ports: &[port_info::Port]) {
154
4
        let info = ProxyInfo {
155
4
            proxies: ports
156
4
                .iter()
157
6
                .filter_map(|port| {
158
                    Some(proxyinfo::Proxy {
159
4
                        listener: proxyinfo::ProxyListener::try_from_portinfo(port)?,
160
                    })
161
4
                })
162
4
                .collect(),
163
        };
164
4
        *self.proxy_info_sender.borrow_mut() = ProxyInfoState::Set(Arc::new(info));
165
4
    }
166
}
167

            
168
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use tor_rtcompat::SpawnExt as _;
    use tor_rtmock::MockRuntime;

    use super::*;

    /// Check that `get_proxy_info` yields the value set through the sender,
    /// both on the first call and on a later call.
    #[test]
    fn set_proxy_info() {
        MockRuntime::test_with_various(|rt| async move {
            let (state, mut sender) = RpcVisibleArtiState::new();
            let _task = rt.clone().spawn_with_handle(async move {
                let socks_port = port_info::Port {
                    protocol: port_info::SupportedProtocol::Socks,
                    address: "8.8.8.8:40".parse().unwrap(),
                };
                sender.set_stream_listeners(&[socks_port]);
                sender // keep sender alive
            });

            let first = state.get_proxy_info().await;

            // At this point, we've returned once, so this will test that we get a fresh answer even
            // if we already set the inner value.
            let second = state.get_proxy_info().await;
            assert_eq!(first.unwrap(), second.unwrap());
        });
    }
}