1
//! RPC connection support, mainloop, and protocol implementation.
2

            
3
pub(crate) mod auth;
4
mod methods;
5
use std::{
6
    collections::HashMap,
7
    io::Error as IoError,
8
    pin::Pin,
9
    sync::{Arc, Mutex, RwLock, Weak},
10
};
11

            
12
use asynchronous_codec::JsonCodecError;
13
use derive_deftly::Deftly;
14
use futures::{
15
    AsyncWriteExt as _, FutureExt, Sink, SinkExt as _, StreamExt,
16
    channel::mpsc,
17
    stream::{FusedStream, FuturesUnordered},
18
};
19
use rpc::dispatch::BoxedUpdateSink;
20
use serde_json::error::Category as JsonErrorCategory;
21
use tor_async_utils::{SinkExt as _, mpsc_channel_no_memquota};
22

            
23
use crate::{
24
    RpcMgr,
25
    cancel::{self, Cancel, CancelHandle},
26
    err::RequestParseError,
27
    globalid::{GlobalId, MacKey},
28
    msgs::{BoxedResponse, FlexibleRequest, ReqMeta, Request, RequestId, ResponseBody},
29
    objmap::{GenIdx, ObjMap},
30
};
31

            
32
use tor_rpcbase::templates::*;
33
use tor_rpcbase::{self as rpc, RpcError};
34

            
35
/// An open connection from an RPC client.
36
///
37
/// Tracks information that persists from one request to another.
38
///
39
/// The client might not have authenticated;
40
/// access and permissions control is handled via the capability system.
41
/// Specifically, the `objects` table in `Inner` hold capabilities
42
/// that the client will use to do things,
43
/// including an `RpcSession`.
44
///
45
/// # In the Arti RPC System
46
///
47
/// A connection to Arti.
48
///
49
/// This object is available as soon as you open a connection to Arti RPC,
50
/// even before you authenticate.  Its ObjectId is always `"connection"`.
51
///
52
/// Because this object is available before authentication,
53
/// it provides only those methods that you need
54
/// in order to perform authentication
55
/// and receive an `RpcSession`.
56
#[derive(Deftly)]
57
#[derive_deftly(Object)]
58
pub struct Connection {
59
    /// The mutable state of this connection.
60
    inner: Mutex<Inner>,
61

            
62
    /// Lookup table to find the implementations for methods
63
    /// based on RPC object and method types.
64
    ///
65
    /// **NOTE: observe the [Lock hierarchy](crate::mgr::Inner#lock-hierarchy)**
66
    dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
67

            
68
    /// A unique identifier for this connection.
69
    ///
70
    /// This kind of ID is used to refer to the connection from _outside_ of the
71
    /// context of an RPC connection: it can uniquely identify the connection
72
    /// from e.g. a SOCKS session so that clients can attach streams to it.
73
    connection_id: ConnectionId,
74

            
75
    /// A `MacKey` used to create `GlobalIds` for the objects whose identifiers
76
    /// need to exist outside this connection.
77
    global_id_mac_key: MacKey,
78

            
79
    /// A reference to the manager associated with this session.
80
    mgr: Weak<RpcMgr>,
81

            
82
    /// The authentication type that's required in order to get a session.
83
    require_auth: tor_rpc_connect::auth::RpcAuth,
84
}
85

            
86
/// The inner, lock-protected part of an RPC connection.
87
struct Inner {
88
    /// Map from request ID to handles; used when we need to cancel a request.
89
    //
90
    // TODO: We have two options here for handling colliding IDs.  We can either turn
91
    // this into a multimap, or we can declare that cancelling a request only
92
    // cancels the most recent request sent with that ID.
93
    inflight: HashMap<RequestId, Option<CancelHandle>>,
94

            
95
    /// An object map used to look up most objects by ID, and keep track of
96
    /// which objects are owned by this connection.
97
    objects: ObjMap,
98

            
99
    /// A reference to this connection itself.
100
    ///
101
    /// Used when we're looking up the connection within the RPC system as an object.
102
    ///
103
    /// TODO RPC: Maybe there is an easier way to do this while keeping `context` object-save?
104
    this_connection: Option<Weak<Connection>>,
105
}
106

            
107
/// How many updates can be pending, per connection, before they start to block?
108
const UPDATE_CHAN_SIZE: usize = 128;
109

            
110
/// A type-erased [`FusedStream`] yielding [`Request`]s.
111
//
112
// (We name this type and [`BoxedResponseSink`] below so as to keep the signature for run_loop
113
// nice and simple.)
114
pub(crate) type BoxedRequestStream = Pin<
115
    Box<dyn FusedStream<Item = Result<FlexibleRequest, asynchronous_codec::JsonCodecError>> + Send>,
116
>;
117

            
118
/// A type-erased [`Sink`] accepting [`BoxedResponse`]s.
119
pub(crate) type BoxedResponseSink =
120
    Pin<Box<dyn Sink<BoxedResponse, Error = asynchronous_codec::JsonCodecError> + Send>>;
121

            
122
/// A random value used to identify an connection.
123
#[derive(
124
    Copy,
125
    Clone,
126
    Debug,
127
    Eq,
128
    PartialEq,
129
    Hash,
130
    derive_more::From,
131
    derive_more::Into,
132
    derive_more::AsRef,
133
)]
134
pub(crate) struct ConnectionId([u8; 16]);
135

            
136
impl ConnectionId {
137
    /// The length of a ConnectionId.
138
    pub(crate) const LEN: usize = 16;
139
}
140

            
141
impl Connection {
142
    /// A special object ID that indicates the connection itself.
143
    ///
144
    /// On a fresh connection, this is the only ObjectId that exists.
145
    //
146
    // TODO: We might want to move responsibility for tracking this ID and its value into ObjMap.
147
    const CONNECTION_OBJ_ID: &'static str = "connection";
148

            
149
    /// Create a new connection.
150
    pub(crate) fn new(
151
        connection_id: ConnectionId,
152
        dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
153
        global_id_mac_key: MacKey,
154
        mgr: Weak<RpcMgr>,
155
        require_auth: tor_rpc_connect::auth::RpcAuth,
156
    ) -> Arc<Self> {
157
        Arc::new_cyclic(|this_connection| Self {
158
            inner: Mutex::new(Inner {
159
                inflight: HashMap::new(),
160
                objects: ObjMap::new(),
161
                this_connection: Some(Weak::clone(this_connection)),
162
            }),
163
            dispatch_table,
164
            connection_id,
165
            global_id_mac_key,
166
            mgr,
167
            require_auth,
168
        })
169
    }
170

            
171
    /// If possible, convert an `ObjectId` into a `GenIdx` that can be used in
172
    /// this connection's ObjMap.
173
    fn id_into_local_idx(&self, id: &rpc::ObjectId) -> Result<GenIdx, rpc::LookupError> {
174
        // Design note: It's not really necessary from a security POV to
175
        // check the MAC here; any possible GenIdx we return will either
176
        // refer to some object we're allowed to name in this session, or to
177
        // no object at all.  Still, we check anyway, since it shouldn't
178
        // hurt to do so.
179
        if let Some(global_id) = GlobalId::try_decode(&self.global_id_mac_key, id)? {
180
            // We have a GlobalId with a valid MAC. Let's make sure it applies
181
            // to this connection's ObjMap.  (We do not support referring to
182
            // anyone else's objects.)
183
            //
184
            // Design note: As above, this check is a protection against
185
            // accidental misuse, not a security feature: even if we removed
186
            // this check, we would still only allow objects that this session
187
            // is allowed to name.
188
            if global_id.connection == self.connection_id {
189
                Ok(global_id.local_id)
190
            } else {
191
                Err(rpc::LookupError::NoObject(id.clone()))
192
            }
193
        } else {
194
            // It's not a GlobalId; let's see if we can make sense of it as an
195
            // ObjMap index.
196
            Ok(GenIdx::try_decode(id)?)
197
        }
198
    }
199

            
200
    /// Look up a given object by its object ID relative to this connection.
201
    pub(crate) fn lookup_object(
202
        &self,
203
        id: &rpc::ObjectId,
204
    ) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
205
        if id.as_ref() == Self::CONNECTION_OBJ_ID {
206
            let this = self
207
                .inner
208
                .lock()
209
                .expect("lock poisoned")
210
                .this_connection
211
                .as_ref()
212
                .ok_or_else(|| rpc::LookupError::NoObject(id.clone()))?
213
                .upgrade()
214
                .ok_or_else(|| rpc::LookupError::NoObject(id.clone()))?;
215
            Ok(this as Arc<_>)
216
        } else {
217
            let local_id = self.id_into_local_idx(id)?;
218

            
219
            self.lookup_by_idx(local_id)
220
                .ok_or(rpc::LookupError::NoObject(id.clone()))
221
        }
222
    }
223

            
224
    /// As `lookup_object`, but expect a `GenIdx`.
225
    pub(crate) fn lookup_by_idx(&self, idx: crate::objmap::GenIdx) -> Option<Arc<dyn rpc::Object>> {
226
        let inner = self.inner.lock().expect("lock poisoned");
227
        inner.objects.lookup(idx)
228
    }
229

            
230
    /// Un-register the request `id` and stop tracking its information.
231
    fn remove_request(&self, id: &RequestId) {
232
        let mut inner = self.inner.lock().expect("lock poisoned");
233
        inner.inflight.remove(id);
234
    }
235

            
236
    /// Register the request `id` as a cancellable request.
237
    ///
238
    /// If `handle` is none, register it as an uncancellable request.
239
    fn register_request(&self, id: RequestId, handle: Option<CancelHandle>) {
240
        let mut inner = self.inner.lock().expect("lock poisoned");
241
        inner.inflight.insert(id, handle);
242
    }
243

            
244
    /// Try to cancel the request `id`.
245
    ///
246
    /// Return an error when `id` cannot be found, or cannot be cancelled.
247
    /// (These cases are indistinguishable.)
248
    fn cancel_request(&self, id: &RequestId) -> Result<(), CancelError> {
249
        let mut inner = self.inner.lock().expect("lock poisoned");
250
        match inner.inflight.remove(id) {
251
            Some(Some(handle)) => {
252
                drop(inner);
253
                handle.cancel()?;
254
                Ok(())
255
            }
256
            Some(None) => {
257
                // Put it back in case somebody tries again.
258
                inner.inflight.insert(id.clone(), None);
259
                Err(CancelError::CannotCancelRequest)
260
            }
261
            None => Err(CancelError::RequestNotFound),
262
        }
263
    }
264

            
265
    /// Run in a loop, decoding JSON requests from `input` and
266
    /// writing JSON responses onto `output`.
267
    pub async fn run<IN, OUT>(
268
        self: Arc<Self>,
269
        input: IN,
270
        mut output: OUT,
271
    ) -> Result<(), ConnectionError>
272
    where
273
        IN: futures::AsyncRead + Send + Sync + Unpin + 'static,
274
        OUT: futures::AsyncWrite + Send + Sync + Unpin + 'static,
275
    {
276
        /// Banner line to send, indicating that Arti is ready to receive requests.
277
        ///
278
        /// The key in this json object is mandatory; the value can be anything.
279
        const BANNER: &[u8] = b"{\"arti_rpc\":{}}\n";
280

            
281
        output
282
            .write_all(BANNER)
283
            .await
284
            .map_err(|e| ConnectionError::WriteFailed(Arc::new(e)))?;
285

            
286
        let write = Box::pin(asynchronous_codec::FramedWrite::new(
287
            output,
288
            crate::codecs::JsonLinesEncoder::<BoxedResponse>::default(),
289
        ));
290

            
291
        let read = Box::pin(
292
            asynchronous_codec::FramedRead::new(
293
                input,
294
                asynchronous_codec::JsonCodec::<(), FlexibleRequest>::new(),
295
            )
296
            .fuse(),
297
        );
298

            
299
        self.run_loop(read, write).await
300
    }
301

            
302
    /// Run in a loop, handling requests from `request_stream` and writing
303
    /// responses onto `response_stream`.
304
    ///
305
    /// After this returns, even if it returns `Ok(())`, the connection must no longer be used.
306
    pub(crate) async fn run_loop(
307
        self: Arc<Self>,
308
        mut request_stream: BoxedRequestStream,
309
        mut response_sink: BoxedResponseSink,
310
    ) -> Result<(), ConnectionError> {
311
        // This function will multiplex on three streams:
312
        // * `request_stream` -- a stream of incoming requests from the client.
313
        // * `finished_requests` -- a stream of requests that are done.
314
        // * `rx_response` -- a stream of updates and final responses sent from
315
        //   in-progress tasks. (We put updates and final responsese onto the
316
        //   same channel to ensure that they stay in-order for each method
317
        //   invocation.
318
        //
319
        // Note that the blocking behavior here is deliberate: We want _all_ of
320
        // these reads to start blocking when response_sink.send is blocked.
321

            
322
        // TODO RPC should this queue participate in memquota?
323
        let (tx_response, mut rx_response) =
324
            mpsc_channel_no_memquota::<BoxedResponse>(UPDATE_CHAN_SIZE);
325

            
326
        let mut finished_requests = FuturesUnordered::new();
327
        finished_requests.push(futures::future::pending().boxed());
328

            
329
        /// Helper: enforce an explicit "continue".
330
        struct Continue;
331

            
332
        // We create a separate async block here and immediately await it,
333
        // so that any internal `returns` and `?`s do not escape the function.
334
        let outcome = async {
335
            loop {
336
                let _: Continue = futures::select! {
337
                    r = finished_requests.next() => {
338
                        // A task is done, so we can forget about it.
339
                        let () = r.expect("Somehow, future::pending() terminated.");
340
                        Continue
341
                    }
342

            
343
                    r = rx_response.next() => {
344
                        // The future for some request has sent a response (success,
345
                        // failure, or update), so we can inform the client.
346
                        let update = r.expect("Somehow, tx_update got closed.");
347
                        // Calling `await` here (and below) is deliberate: we _want_
348
                        // to stop reading the client's requests if the client is
349
                        // not reading their responses (or not) reading them fast
350
                        // enough.
351
                        response_sink.send(update).await.map_err(ConnectionError::writing)?;
352
                        Continue
353
                    }
354

            
355
                    req = request_stream.next() => {
356
                        match req {
357
                            None => {
358
                                // We've reached the end of the stream of requests;
359
                                // time to close.
360
                                return Ok(());
361
                            }
362
                            Some(Err(e)) => {
363
                                // We got a non-recoverable error from the JSON codec.
364
                                return Err(ConnectionError::from_read_error(e));
365

            
366
                            }
367
                            Some(Ok(FlexibleRequest::Invalid(bad_req))) => {
368
                                // We decoded the request as Json, but not as a `Valid`` request.
369
                                // Send back a response indicating what was wrong with it.
370
                                let response = BoxedResponse::from_error(
371
                                    bad_req.id().cloned(), bad_req.error()
372
                                );
373
                                response_sink
374
                                    .send(response)
375
                                    .await
376
                                    .map_err( ConnectionError::writing)?;
377
                                if bad_req.id().is_none() {
378
                                    // The spec says we must close the connection in this case.
379
                                    return Err(bad_req.error().into());
380
                                }
381
                                Continue
382

            
383
                            }
384
                            Some(Ok(FlexibleRequest::Valid(req))) => {
385
                                // We have a request. Time to launch it!
386
                                let tx = tx_response.clone();
387
                                let fut = self.run_method_and_deliver_response(tx, req);
388
                                finished_requests.push(fut.boxed());
389
                                Continue
390
                            }
391
                        }
392
                    }
393
                };
394
            }
395
        }
396
        .await;
397

            
398
        match outcome {
399
            Err(e) if e.is_connection_close() => Ok(()),
400
            other => other,
401
        }
402
    }
403

            
404
    /// Invoke `request` and send all of its responses to `tx_response`.
405
    async fn run_method_and_deliver_response(
406
        self: &Arc<Self>,
407
        mut tx_response: mpsc::Sender<BoxedResponse>,
408
        request: Request,
409
    ) {
410
        let Request {
411
            id,
412
            obj,
413
            meta,
414
            method,
415
        } = request;
416

            
417
        let update_sender: BoxedUpdateSink = if meta.updates {
418
            let id_clone = id.clone();
419
            let sink =
420
                tx_response
421
                    .clone()
422
                    .with_fn(move |obj: Box<dyn erased_serde::Serialize + Send>| {
423
                        Result::<BoxedResponse, _>::Ok(BoxedResponse {
424
                            id: Some(id_clone.clone()),
425
                            body: ResponseBody::Update(obj),
426
                        })
427
                    });
428
            Box::pin(sink)
429
        } else {
430
            let sink = futures::sink::drain().sink_err_into();
431
            Box::pin(sink)
432
        };
433

            
434
        let is_cancellable = method.is_cancellable();
435

            
436
        // Create `run_method_lowlevel` future, and make it cancellable.
437
        let fut = self.run_method_lowlevel(update_sender, obj, method, meta);
438

            
439
        // Optionally register the future as cancellable.  Then run it to completion.
440
        let outcome = if is_cancellable {
441
            let (handle, fut) = Cancel::new(fut);
442
            self.register_request(id.clone(), Some(handle));
443
            fut.await
444
        } else {
445
            self.register_request(id.clone(), None);
446
            Ok(fut.await)
447
        };
448

            
449
        // Figure out how to respond.
450
        let body = match outcome {
451
            Ok(Ok(value)) => ResponseBody::Success(value),
452
            // TODO: If we're going to box this, let's do so earlier.
453
            Ok(Err(err)) => {
454
                if err.is_internal() {
455
                    tracing::warn!(
456
                        "Reporting an internal error on an RPC connection: {:?}",
457
                        err
458
                    );
459
                }
460
                ResponseBody::Error(Box::new(err))
461
            }
462
            Err(_cancelled) => ResponseBody::Error(Box::new(rpc::RpcError::from(RequestCancelled))),
463
        };
464

            
465
        // Send the response.
466
        //
467
        // (It's okay to ignore the error here, since it can only mean that the
468
        // RPC connection has closed.)
469
        let _ignore_err = tx_response
470
            .send(BoxedResponse {
471
                id: Some(id.clone()),
472
                body,
473
            })
474
            .await;
475

            
476
        // Unregister the request.
477
        //
478
        // TODO: This may unregister a different request if the user sent
479
        // in another request with the same ID.
480
        self.remove_request(&id);
481
    }
482

            
483
    /// Run a single method, and return its final response.
484
    ///
485
    /// If `tx_updates` is provided, and this method generates updates, it
486
    /// should send those updates on `tx_updates`
487
    ///
488
    /// Note that this function is able to send responses with IDs that do not
489
    /// match the original.  It should enforce correct IDs on whatever response
490
    /// it generates.
491
    async fn run_method_lowlevel(
492
        self: &Arc<Self>,
493
        tx_updates: rpc::dispatch::BoxedUpdateSink,
494
        obj_id: rpc::ObjectId,
495
        method: Box<dyn rpc::DeserMethod>,
496
        meta: ReqMeta,
497
    ) -> Result<Box<dyn erased_serde::Serialize + Send + 'static>, rpc::RpcError> {
498
        let obj = self.lookup_object(&obj_id)?;
499

            
500
        if !meta.require.is_empty() {
501
            // TODO RPC: Eventually, we will need a way to tell which "features" are actually
502
            // available.  But for now, we have no features, so if the require list is nonempty,
503
            // we can safely reject the request.
504
            return Err(MissingFeaturesError(meta.require).into());
505
        }
506

            
507
        let context: Arc<dyn rpc::Context> = self.clone() as Arc<_>;
508

            
509
        let invoke_future =
510
            rpc::invoke_rpc_method(context, &obj_id, obj, method.upcast_box(), tx_updates)?;
511

            
512
        // Note that we drop the read lock before we await this future!
513
        invoke_future.await
514
    }
515

            
516
    /// Try to get a strong reference to the RpcMgr for this connection, and
517
    /// return an error if we can't.
518
    pub(crate) fn mgr(&self) -> Result<Arc<RpcMgr>, MgrDisappearedError> {
519
        self.mgr
520
            .upgrade()
521
            .ok_or(MgrDisappearedError::RpcMgrDisappeared)
522
    }
523
}
524

            
525
/// An error returned when an RPC request lists some feature as required,
526
/// but we don't have every such feature.
527
#[derive(Clone, Debug, thiserror::Error)]
528
#[error("Required features not available")]
529
struct MissingFeaturesError(
530
    /// A list of the features that were requested but not available.
531
    Vec<String>,
532
);
533

            
534
impl From<MissingFeaturesError> for RpcError {
535
    fn from(err: MissingFeaturesError) -> Self {
536
        let mut e = RpcError::new(
537
            err.to_string(),
538
            tor_rpcbase::RpcErrorKind::FeatureNotPresent,
539
        );
540
        e.set_datum("rpc:unsupported_features".to_string(), err.0)
541
            .expect("invalid keyword");
542
        e
543
    }
544
}
545

            
546
/// A failure that results in closing a [`Connection`].
547
#[derive(Clone, Debug, thiserror::Error)]
548
#[non_exhaustive]
549
pub enum ConnectionError {
550
    /// Unable to write to our connection.
551
    #[error("Could not write to connection")]
552
    WriteFailed(#[source] Arc<IoError>),
553
    /// Read error from connection.
554
    #[error("Problem reading from connection")]
555
    ReadFailed(#[source] Arc<IoError>),
556
    /// Read something that we could not decode.
557
    #[error("Unable to decode request from connection")]
558
    DecodeFailed(#[source] Arc<serde_json::Error>),
559
    /// Unable to write our response as json.
560
    #[error("Unable to encode response onto connection")]
561
    EncodeFailed(#[source] Arc<serde_json::Error>),
562
    /// We encountered a problem when parsing a request that was (in our judgment)
563
    /// too severe to recover from.
564
    #[error("Unrecoverable problem from parsed request")]
565
    RequestParseFailed(#[from] RequestParseError),
566
}
567

            
568
impl ConnectionError {
569
    /// Construct a new `ConnectionError` from a `JsonCodecError` that has occurred while writing.
570
    fn writing(error: JsonCodecError) -> Self {
571
        match error {
572
            JsonCodecError::Io(e) => Self::WriteFailed(Arc::new(e)),
573
            JsonCodecError::Json(e) => Self::EncodeFailed(Arc::new(e)),
574
        }
575
    }
576

            
577
    /// Return true if this error is (or might be) due to the peer closing the connection.
578
    ///
579
    /// Such errors should be tolerated without much complaint;
580
    /// other errors should at least be logged somewhere.
581
    fn is_connection_close(&self) -> bool {
582
        use JsonErrorCategory as JK;
583
        use std::io::ErrorKind as IK;
584
        #[allow(clippy::match_like_matches_macro)]
585
        match self {
586
            Self::ReadFailed(e) | Self::WriteFailed(e) => match e.kind() {
587
                IK::UnexpectedEof | IK::ConnectionAborted | IK::BrokenPipe => true,
588
                _ => false,
589
            },
590
            Self::DecodeFailed(e) => match e.classify() {
591
                JK::Eof => true,
592
                _ => false,
593
            },
594
            _ => false,
595
        }
596
    }
597

            
598
    /// Construct a `ConnectionError` from a JsonCodecError that occurred while reading.
599
    fn from_read_error(error: JsonCodecError) -> Self {
600
        match error {
601
            JsonCodecError::Io(e) => Self::ReadFailed(Arc::new(e)),
602
            JsonCodecError::Json(e) => Self::DecodeFailed(Arc::new(e)),
603
        }
604
    }
605
}
606

            
607
/// A failure from trying to upgrade a `Weak<RpcMgr>`.
608
#[derive(Clone, Debug, thiserror::Error, serde::Serialize)]
609
pub(crate) enum MgrDisappearedError {
610
    /// We tried to upgrade our reference to the RpcMgr, and failed.
611
    #[error("RPC manager disappeared; Arti is shutting down?")]
612
    RpcMgrDisappeared,
613
}
614
impl tor_error::HasKind for MgrDisappearedError {
615
    fn kind(&self) -> tor_error::ErrorKind {
616
        tor_error::ErrorKind::ArtiShuttingDown
617
    }
618
}
619

            
620
impl rpc::Context for Connection {
621
    fn lookup_object(&self, id: &rpc::ObjectId) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
622
        Connection::lookup_object(self, id)
623
    }
624

            
625
    fn register_owned(&self, object: Arc<dyn rpc::Object>) -> rpc::ObjectId {
626
        let use_global_id = object.expose_outside_of_session();
627
        let local_id = self
628
            .inner
629
            .lock()
630
            .expect("Lock poisoned")
631
            .objects
632
            .insert_strong(object);
633

            
634
        // Design note: It is a deliberate decision to _always_ use GlobalId for
635
        // objects whose IDs are _ever_ exported for use in SOCKS requests.  Some
636
        // alternatives would be to use GlobalId conditionally, or to have a
637
        // separate Method to create a new GlobalId given an existing LocalId.
638
        if use_global_id {
639
            GlobalId::new(self.connection_id, local_id).encode(&self.global_id_mac_key)
640
        } else {
641
            local_id.encode()
642
        }
643
    }
644

            
645
    fn release_owned(&self, id: &rpc::ObjectId) -> Result<(), rpc::LookupError> {
646
        let removed_some = if id.as_ref() == Self::CONNECTION_OBJ_ID {
647
            self.inner
648
                .lock()
649
                .expect("Lock poisoned")
650
                .this_connection
651
                .take()
652
                .is_some()
653
        } else {
654
            let idx = self.id_into_local_idx(id)?;
655

            
656
            if !idx.is_strong() {
657
                return Err(rpc::LookupError::WrongType(id.clone()));
658
            }
659

            
660
            self.inner
661
                .lock()
662
                .expect("Lock poisoned")
663
                .objects
664
                .remove(idx)
665
                .is_some()
666
        };
667

            
668
        if removed_some {
669
            Ok(())
670
        } else {
671
            Err(rpc::LookupError::NoObject(id.clone()))
672
        }
673
    }
674

            
675
    fn dispatch_table(&self) -> &Arc<std::sync::RwLock<rpc::DispatchTable>> {
676
        &self.dispatch_table
677
    }
678
}
679

            
680
/// An error given when an RPC request is cancelled.
681
///
682
/// This is a separate type from [`crate::cancel::Cancelled`] since eventually
683
/// we want to move that type into a general-purpose location, and make it not
684
/// RPC-specific.
685
#[derive(thiserror::Error, Clone, Debug, serde::Serialize)]
686
#[error("RPC request was cancelled")]
687
pub(crate) struct RequestCancelled;
688

            
689
impl From<RequestCancelled> for RpcError {
690
2
    fn from(_: RequestCancelled) -> Self {
691
2
        RpcError::new(
692
2
            "Request cancelled".into(),
693
2
            rpc::RpcErrorKind::RequestCancelled,
694
        )
695
2
    }
696
}
697

            
698
/// An error given when we attempt to cancel an RPC request, but cannot.
699
///
700
#[derive(thiserror::Error, Clone, Debug, serde::Serialize)]
701
pub(crate) enum CancelError {
702
    /// We didn't find any request with the provided ID.
703
    ///
704
    /// Since we don't keep track of requests after they finish or are cancelled,
705
    /// we cannot distinguish the cases where a request has finished,
706
    /// where the request has been cancelled,
707
    /// or where the request never existed.
708
    /// Therefore we collapse them into a single error type.
709
    #[error("RPC request not found")]
710
    RequestNotFound,
711

            
712
    /// This kind of request cannot be cancelled.
713
    #[error("Uncancellable request")]
714
    CannotCancelRequest,
715

            
716
    /// We tried to cancel a request but found out it was already cancelled.
717
    ///
718
    /// This error should be impossible.
719
    #[error("Request somehow cancelled twice!")]
720
    AlreadyCancelled,
721
}
722

            
723
impl From<cancel::CannotCancel> for CancelError {
724
    fn from(value: cancel::CannotCancel) -> Self {
725
        use CancelError as CE;
726
        use cancel::CannotCancel as CC;
727
        match value {
728
            CC::Cancelled => CE::AlreadyCancelled,
729
            // We map "finished" to RequestNotFound since it is not in the general case
730
            // distinguishable from it; see documentation on RequestNotFound.
731
            CC::Finished => CE::RequestNotFound,
732
        }
733
    }
734
}
735

            
736
impl From<CancelError> for RpcError {
737
    fn from(err: CancelError) -> Self {
738
        use CancelError as CE;
739
        use rpc::RpcErrorKind as REK;
740
        let code = match err {
741
            CE::RequestNotFound => REK::RequestError,
742
            CE::CannotCancelRequest => REK::RequestError,
743
            CE::AlreadyCancelled => REK::InternalError,
744
        };
745
        RpcError::new(err.to_string(), code)
746
    }
747
}