1
//! RPC connection support, mainloop, and protocol implementation.
2

            
3
pub(crate) mod auth;
4
mod methods;
5
use std::{
6
    collections::HashMap,
7
    io::Error as IoError,
8
    pin::Pin,
9
    sync::{Arc, Mutex, RwLock, Weak},
10
};
11

            
12
use asynchronous_codec::JsonCodecError;
13
use derive_deftly::Deftly;
14
use futures::{
15
    AsyncWriteExt as _, FutureExt, Sink, SinkExt as _, StreamExt,
16
    channel::mpsc,
17
    stream::{FusedStream, FuturesUnordered},
18
};
19
use rpc::dispatch::BoxedUpdateSink;
20
use serde_json::error::Category as JsonErrorCategory;
21
use tor_async_utils::{SinkExt as _, mpsc_channel_no_memquota};
22

            
23
use crate::{
24
    RpcAuthentication,
25
    cancel::{self, Cancel, CancelHandle},
26
    err::RequestParseError,
27
    globalid::{GlobalId, MacKey},
28
    msgs::{BoxedResponse, FlexibleRequest, ReqMeta, Request, RequestId, ResponseBody},
29
    objmap::{GenIdx, ObjMap},
30
};
31

            
32
use tor_rpcbase::templates::*;
33
use tor_rpcbase::{self as rpc, RpcError};
34

            
35
/// A function we use to construct Session objects in response to authentication.
36
//
37
// TODO RPC: Perhaps this should return a Result?
38
type SessionFactory = Box<dyn Fn(&RpcAuthentication) -> Arc<dyn rpc::Object> + Send + Sync>;
39

            
40
/// An open connection from an RPC client.
41
///
42
/// Tracks information that persists from one request to another.
43
///
44
/// The client might not have authenticated;
45
/// access and permissions control is handled via the capability system.
46
/// Specifically, the `objects` table in `Inner` hold capabilities
47
/// that the client will use to do things,
48
/// including an `RpcSession`.
49
///
50
/// # In the Arti RPC System
51
///
52
/// A connection to Arti.
53
///
54
/// This object is available as soon as you open a connection to Arti RPC,
55
/// even before you authenticate.  Its ObjectId is always `"connection"`.
56
///
57
/// Because this object is available before authentication,
58
/// it provides only those methods that you need
59
/// in order to perform authentication
60
/// and receive an `RpcSession`.
61
///
62
/// Note that a connection can only be authenticated once:
63
/// If you drop the `RpcSession` returned by authenticating,
64
/// you cannot get another one on the same connection.
65
#[derive(Deftly)]
66
#[derive_deftly(Object)]
67
pub struct Connection {
68
    /// The mutable state of this connection.
69
    inner: Mutex<Inner>,
70

            
71
    /// Lookup table to find the implementations for methods
72
    /// based on RPC object and method types.
73
    ///
74
    /// **NOTE: observe the [Lock hierarchy](crate::mgr::Inner#lock-hierarchy)**
75
    dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
76

            
77
    /// A unique identifier for this connection.
78
    ///
79
    /// This kind of ID is used to refer to the connection from _outside_ of the
80
    /// context of an RPC connection: it can uniquely identify the connection
81
    /// from e.g. a SOCKS session so that clients can attach streams to it.
82
    connection_id: ConnectionId,
83

            
84
    /// A `MacKey` used to create `GlobalIds` for the objects whose identifiers
85
    /// need to exist outside this connection.
86
    global_id_mac_key: MacKey,
87

            
88
    /// The authentication type that's required in order to get a session.
89
    require_auth: tor_rpc_connect::auth::RpcAuth,
90
}
91

            
92
/// The inner, lock-protected part of an RPC connection.
93
struct Inner {
94
    /// Map from request ID to handles; used when we need to cancel a request.
95
    //
96
    // TODO: We have two options here for handling colliding IDs.  We can either turn
97
    // this into a multimap, or we can declare that cancelling a request only
98
    // cancels the most recent request sent with that ID.
99
    inflight: HashMap<RequestId, Option<CancelHandle>>,
100

            
101
    /// An object map used to look up most objects by ID, and keep track of
102
    /// which objects are owned by this connection.
103
    objects: ObjMap,
104

            
105
    /// A reference to this connection itself.
106
    ///
107
    /// Used when we're looking up the connection within the RPC system as an object.
108
    ///
109
    /// TODO RPC: Maybe there is an easier way to do this while keeping `context` object-save?
110
    this_connection: Option<Weak<Connection>>,
111

            
112
    /// A SessionFactory that will be used to create a session if authentication is successful.
113
    ///
114
    /// This is None if the connection has already been authenticated.
115
    session_factory: Option<SessionFactory>,
116
}
117

            
118
/// How many updates can be pending, per connection, before they start to block?
///
/// This is the capacity of the channel that carries updates and final responses
/// from in-progress requests to the connection's main loop.
const UPDATE_CHAN_SIZE: usize = 128;
120

            
121
/// A type-erased [`FusedStream`] yielding [`Request`]s.
122
//
123
// (We name this type and [`BoxedResponseSink`] below so as to keep the signature for run_loop
124
// nice and simple.)
125
pub(crate) type BoxedRequestStream = Pin<
126
    Box<dyn FusedStream<Item = Result<FlexibleRequest, asynchronous_codec::JsonCodecError>> + Send>,
127
>;
128

            
129
/// A type-erased [`Sink`] accepting [`BoxedResponse`]s.
130
pub(crate) type BoxedResponseSink =
131
    Pin<Box<dyn Sink<BoxedResponse, Error = asynchronous_codec::JsonCodecError> + Send>>;
132

            
133
/// A random value used to identify an connection.
134
#[derive(
135
    Copy,
136
    Clone,
137
    Debug,
138
    Eq,
139
    PartialEq,
140
    Hash,
141
    derive_more::From,
142
    derive_more::Into,
143
    derive_more::AsRef,
144
)]
145
pub(crate) struct ConnectionId([u8; 16]);
146

            
147
impl ConnectionId {
148
    /// The length of a ConnectionId.
149
    pub(crate) const LEN: usize = 16;
150
}
151

            
152
impl Connection {
153
    /// A special object ID that indicates the connection itself.
    ///
    /// On a fresh connection, this is the only ObjectId that exists.
    //
    // TODO: We might want to move responsibility for tracking this ID and its value into ObjMap.
    const CONNECTION_OBJ_ID: &'static str = "connection";
159

            
160
    /// Create a new connection.
161
    pub(crate) fn new(
162
        connection_id: ConnectionId,
163
        dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
164
        global_id_mac_key: MacKey,
165
        require_auth: tor_rpc_connect::auth::RpcAuth,
166
        session_factory: SessionFactory,
167
    ) -> Arc<Self> {
168
        Arc::new_cyclic(|this_connection| Self {
169
            inner: Mutex::new(Inner {
170
                inflight: HashMap::new(),
171
                objects: ObjMap::new(),
172
                this_connection: Some(Weak::clone(this_connection)),
173
                session_factory: Some(session_factory),
174
            }),
175
            dispatch_table,
176
            connection_id,
177
            global_id_mac_key,
178
            require_auth,
179
        })
180
    }
181

            
182
    /// Construct a new object to serve as the `session` for a connection.
183
    pub(crate) fn create_session(
184
        &self,
185
        auth: &RpcAuthentication,
186
    ) -> Result<Arc<dyn rpc::Object>, RpcError> {
187
        let mut inner = self.inner.lock().expect("lock poisoned");
188
        let session_factory = inner.session_factory.take().ok_or_else(|| {
189
            RpcError::new(
190
                "Cannot authenticate the same connection twice".into(),
191
                rpc::RpcErrorKind::RequestError,
192
            )
193
        })?;
194
        Ok((session_factory)(auth))
195
    }
196

            
197
    /// If possible, convert an `ObjectId` into a `GenIdx` that can be used in
198
    /// this connection's ObjMap.
199
    fn id_into_local_idx(&self, id: &rpc::ObjectId) -> Result<GenIdx, rpc::LookupError> {
200
        // Design note: It's not really necessary from a security POV to
201
        // check the MAC here; any possible GenIdx we return will either
202
        // refer to some object we're allowed to name in this session, or to
203
        // no object at all.  Still, we check anyway, since it shouldn't
204
        // hurt to do so.
205
        if let Some(global_id) = GlobalId::try_decode(&self.global_id_mac_key, id)? {
206
            // We have a GlobalId with a valid MAC. Let's make sure it applies
207
            // to this connection's ObjMap.  (We do not support referring to
208
            // anyone else's objects.)
209
            //
210
            // Design note: As above, this check is a protection against
211
            // accidental misuse, not a security feature: even if we removed
212
            // this check, we would still only allow objects that this session
213
            // is allowed to name.
214
            if global_id.connection == self.connection_id {
215
                Ok(global_id.local_id)
216
            } else {
217
                Err(rpc::LookupError::NoObject(id.clone()))
218
            }
219
        } else {
220
            // It's not a GlobalId; let's see if we can make sense of it as an
221
            // ObjMap index.
222
            Ok(GenIdx::try_decode(id)?)
223
        }
224
    }
225

            
226
    /// Look up a given object by its object ID relative to this connection.
227
    pub(crate) fn lookup_object(
228
        &self,
229
        id: &rpc::ObjectId,
230
    ) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
231
        if id.as_ref() == Self::CONNECTION_OBJ_ID {
232
            let this = self
233
                .inner
234
                .lock()
235
                .expect("lock poisoned")
236
                .this_connection
237
                .as_ref()
238
                .ok_or_else(|| rpc::LookupError::NoObject(id.clone()))?
239
                .upgrade()
240
                .ok_or_else(|| rpc::LookupError::NoObject(id.clone()))?;
241
            Ok(this as Arc<_>)
242
        } else {
243
            let local_id = self.id_into_local_idx(id)?;
244

            
245
            self.lookup_by_idx(local_id)
246
                .ok_or(rpc::LookupError::NoObject(id.clone()))
247
        }
248
    }
249

            
250
    /// As `lookup_object`, but expect a `GenIdx`.
251
    pub(crate) fn lookup_by_idx(&self, idx: crate::objmap::GenIdx) -> Option<Arc<dyn rpc::Object>> {
252
        let inner = self.inner.lock().expect("lock poisoned");
253
        inner.objects.lookup(idx)
254
    }
255

            
256
    /// Un-register the request `id` and stop tracking its information.
257
    fn remove_request(&self, id: &RequestId) {
258
        let mut inner = self.inner.lock().expect("lock poisoned");
259
        inner.inflight.remove(id);
260
    }
261

            
262
    /// Register the request `id` as a cancellable request.
263
    ///
264
    /// If `handle` is none, register it as an uncancellable request.
265
    fn register_request(&self, id: RequestId, handle: Option<CancelHandle>) {
266
        let mut inner = self.inner.lock().expect("lock poisoned");
267
        inner.inflight.insert(id, handle);
268
    }
269

            
270
    /// Try to cancel the request `id`.
271
    ///
272
    /// Return an error when `id` cannot be found, or cannot be cancelled.
273
    /// (These cases are indistinguishable.)
274
    fn cancel_request(&self, id: &RequestId) -> Result<(), CancelError> {
275
        let mut inner = self.inner.lock().expect("lock poisoned");
276
        match inner.inflight.remove(id) {
277
            Some(Some(handle)) => {
278
                drop(inner);
279
                handle.cancel()?;
280
                Ok(())
281
            }
282
            Some(None) => {
283
                // Put it back in case somebody tries again.
284
                inner.inflight.insert(id.clone(), None);
285
                Err(CancelError::CannotCancelRequest)
286
            }
287
            None => Err(CancelError::RequestNotFound),
288
        }
289
    }
290

            
291
    /// Run in a loop, decoding JSON requests from `input` and
292
    /// writing JSON responses onto `output`.
293
    pub async fn run<IN, OUT>(
294
        self: Arc<Self>,
295
        input: IN,
296
        mut output: OUT,
297
    ) -> Result<(), ConnectionError>
298
    where
299
        IN: futures::AsyncRead + Send + Sync + Unpin + 'static,
300
        OUT: futures::AsyncWrite + Send + Sync + Unpin + 'static,
301
    {
302
        /// Banner line to send, indicating that Arti is ready to receive requests.
303
        ///
304
        /// The key in this json object is mandatory; the value can be anything.
305
        const BANNER: &[u8] = b"{\"arti_rpc\":{}}\n";
306

            
307
        output
308
            .write_all(BANNER)
309
            .await
310
            .map_err(|e| ConnectionError::WriteFailed(Arc::new(e)))?;
311

            
312
        let write = Box::pin(asynchronous_codec::FramedWrite::new(
313
            output,
314
            crate::codecs::JsonLinesEncoder::<BoxedResponse>::default(),
315
        ));
316

            
317
        let read = Box::pin(
318
            asynchronous_codec::FramedRead::new(
319
                input,
320
                asynchronous_codec::JsonCodec::<(), FlexibleRequest>::new(),
321
            )
322
            .fuse(),
323
        );
324

            
325
        self.run_loop(read, write).await
326
    }
327

            
328
    /// Run in a loop, handling requests from `request_stream` and writing
329
    /// responses onto `response_stream`.
330
    ///
331
    /// After this returns, even if it returns `Ok(())`, the connection must no longer be used.
332
    pub(crate) async fn run_loop(
333
        self: Arc<Self>,
334
        mut request_stream: BoxedRequestStream,
335
        mut response_sink: BoxedResponseSink,
336
    ) -> Result<(), ConnectionError> {
337
        // This function will multiplex on three streams:
338
        // * `request_stream` -- a stream of incoming requests from the client.
339
        // * `finished_requests` -- a stream of requests that are done.
340
        // * `rx_response` -- a stream of updates and final responses sent from
341
        //   in-progress tasks. (We put updates and final responsese onto the
342
        //   same channel to ensure that they stay in-order for each method
343
        //   invocation.
344
        //
345
        // Note that the blocking behavior here is deliberate: We want _all_ of
346
        // these reads to start blocking when response_sink.send is blocked.
347

            
348
        // TODO RPC should this queue participate in memquota?
349
        let (tx_response, mut rx_response) =
350
            mpsc_channel_no_memquota::<BoxedResponse>(UPDATE_CHAN_SIZE);
351

            
352
        let mut finished_requests = FuturesUnordered::new();
353
        finished_requests.push(futures::future::pending().boxed());
354

            
355
        /// Helper: enforce an explicit "continue".
356
        struct Continue;
357

            
358
        // We create a separate async block here and immediately await it,
359
        // so that any internal `returns` and `?`s do not escape the function.
360
        let outcome = async {
361
            loop {
362
                let _: Continue = futures::select! {
363
                    r = finished_requests.next() => {
364
                        // A task is done, so we can forget about it.
365
                        let () = r.expect("Somehow, future::pending() terminated.");
366
                        Continue
367
                    }
368

            
369
                    r = rx_response.next() => {
370
                        // The future for some request has sent a response (success,
371
                        // failure, or update), so we can inform the client.
372
                        let update = r.expect("Somehow, tx_update got closed.");
373
                        // Calling `await` here (and below) is deliberate: we _want_
374
                        // to stop reading the client's requests if the client is
375
                        // not reading their responses (or not) reading them fast
376
                        // enough.
377
                        response_sink.send(update).await.map_err(ConnectionError::writing)?;
378
                        Continue
379
                    }
380

            
381
                    req = request_stream.next() => {
382
                        match req {
383
                            None => {
384
                                // We've reached the end of the stream of requests;
385
                                // time to close.
386
                                return Ok(());
387
                            }
388
                            Some(Err(e)) => {
389
                                // We got a non-recoverable error from the JSON codec.
390
                                return Err(ConnectionError::from_read_error(e));
391

            
392
                            }
393
                            Some(Ok(FlexibleRequest::Invalid(bad_req))) => {
394
                                // We decoded the request as Json, but not as a `Valid`` request.
395
                                // Send back a response indicating what was wrong with it.
396
                                let response = BoxedResponse::from_error(
397
                                    bad_req.id().cloned(), bad_req.error()
398
                                );
399
                                response_sink
400
                                    .send(response)
401
                                    .await
402
                                    .map_err( ConnectionError::writing)?;
403
                                if bad_req.id().is_none() {
404
                                    // The spec says we must close the connection in this case.
405
                                    return Err(bad_req.error().into());
406
                                }
407
                                Continue
408

            
409
                            }
410
                            Some(Ok(FlexibleRequest::Valid(req))) => {
411
                                // We have a request. Time to launch it!
412
                                let tx = tx_response.clone();
413
                                let fut = self.run_method_and_deliver_response(tx, req);
414
                                finished_requests.push(fut.boxed());
415
                                Continue
416
                            }
417
                        }
418
                    }
419
                };
420
            }
421
        }
422
        .await;
423

            
424
        match outcome {
425
            Err(e) if e.is_connection_close() => Ok(()),
426
            other => other,
427
        }
428
    }
429

            
430
    /// Invoke `request` and send all of its responses to `tx_response`.
431
    async fn run_method_and_deliver_response(
432
        self: &Arc<Self>,
433
        mut tx_response: mpsc::Sender<BoxedResponse>,
434
        request: Request,
435
    ) {
436
        let Request {
437
            id,
438
            obj,
439
            meta,
440
            method,
441
        } = request;
442

            
443
        let update_sender: BoxedUpdateSink = if meta.updates {
444
            let id_clone = id.clone();
445
            let sink =
446
                tx_response
447
                    .clone()
448
                    .with_fn(move |obj: Box<dyn erased_serde::Serialize + Send>| {
449
                        Result::<BoxedResponse, _>::Ok(BoxedResponse {
450
                            id: Some(id_clone.clone()),
451
                            body: ResponseBody::Update(obj),
452
                        })
453
                    });
454
            Box::pin(sink)
455
        } else {
456
            let sink = futures::sink::drain().sink_err_into();
457
            Box::pin(sink)
458
        };
459

            
460
        let is_cancellable = method.is_cancellable();
461

            
462
        // Create `run_method_lowlevel` future, and make it cancellable.
463
        let fut = self.run_method_lowlevel(update_sender, obj, method, meta);
464

            
465
        // Optionally register the future as cancellable.  Then run it to completion.
466
        let outcome = if is_cancellable {
467
            let (handle, fut) = Cancel::new(fut);
468
            self.register_request(id.clone(), Some(handle));
469
            fut.await
470
        } else {
471
            self.register_request(id.clone(), None);
472
            Ok(fut.await)
473
        };
474

            
475
        // Figure out how to respond.
476
        let body = match outcome {
477
            Ok(Ok(value)) => ResponseBody::Success(value),
478
            // TODO: If we're going to box this, let's do so earlier.
479
            Ok(Err(err)) => {
480
                if err.is_internal() {
481
                    tracing::warn!(
482
                        "Reporting an internal error on an RPC connection: {:?}",
483
                        err
484
                    );
485
                }
486
                ResponseBody::Error(Box::new(err))
487
            }
488
            Err(_cancelled) => ResponseBody::Error(Box::new(rpc::RpcError::from(RequestCancelled))),
489
        };
490

            
491
        // Send the response.
492
        //
493
        // (It's okay to ignore the error here, since it can only mean that the
494
        // RPC connection has closed.)
495
        let _ignore_err = tx_response
496
            .send(BoxedResponse {
497
                id: Some(id.clone()),
498
                body,
499
            })
500
            .await;
501

            
502
        // Unregister the request.
503
        //
504
        // TODO: This may unregister a different request if the user sent
505
        // in another request with the same ID.
506
        self.remove_request(&id);
507
    }
508

            
509
    /// Run a single method, and return its final response.
510
    ///
511
    /// If `tx_updates` is provided, and this method generates updates, it
512
    /// should send those updates on `tx_updates`
513
    ///
514
    /// Note that this function is able to send responses with IDs that do not
515
    /// match the original.  It should enforce correct IDs on whatever response
516
    /// it generates.
517
    async fn run_method_lowlevel(
518
        self: &Arc<Self>,
519
        tx_updates: rpc::dispatch::BoxedUpdateSink,
520
        obj_id: rpc::ObjectId,
521
        method: Box<dyn rpc::DeserMethod>,
522
        meta: ReqMeta,
523
    ) -> Result<Box<dyn erased_serde::Serialize + Send + 'static>, rpc::RpcError> {
524
        let obj = self.lookup_object(&obj_id)?;
525

            
526
        if !meta.require.is_empty() {
527
            // TODO RPC: Eventually, we will need a way to tell which "features" are actually
528
            // available.  But for now, we have no features, so if the require list is nonempty,
529
            // we can safely reject the request.
530
            return Err(MissingFeaturesError(meta.require).into());
531
        }
532

            
533
        let context: Arc<dyn rpc::Context> = self.clone() as Arc<_>;
534

            
535
        let invoke_future =
536
            rpc::invoke_rpc_method(context, &obj_id, obj, method.upcast_box(), tx_updates)?;
537

            
538
        // Note that we drop the read lock before we await this future!
539
        invoke_future.await
540
    }
541
}
542

            
543
/// An error returned when an RPC request lists some feature as required,
544
/// but we don't have every such feature.
545
#[derive(Clone, Debug, thiserror::Error)]
546
#[error("Required features not available")]
547
struct MissingFeaturesError(
548
    /// A list of the features that were requested but not available.
549
    Vec<String>,
550
);
551

            
552
impl From<MissingFeaturesError> for RpcError {
553
    fn from(err: MissingFeaturesError) -> Self {
554
        let mut e = RpcError::new(
555
            err.to_string(),
556
            tor_rpcbase::RpcErrorKind::FeatureNotPresent,
557
        );
558
        e.set_datum("rpc:unsupported_features".to_string(), err.0)
559
            .expect("invalid keyword");
560
        e
561
    }
562
}
563

            
564
/// A failure that results in closing a [`Connection`].
565
#[derive(Clone, Debug, thiserror::Error)]
566
#[non_exhaustive]
567
pub enum ConnectionError {
568
    /// Unable to write to our connection.
569
    #[error("Could not write to connection")]
570
    WriteFailed(#[source] Arc<IoError>),
571
    /// Read error from connection.
572
    #[error("Problem reading from connection")]
573
    ReadFailed(#[source] Arc<IoError>),
574
    /// Read something that we could not decode.
575
    #[error("Unable to decode request from connection")]
576
    DecodeFailed(#[source] Arc<serde_json::Error>),
577
    /// Unable to write our response as json.
578
    #[error("Unable to encode response onto connection")]
579
    EncodeFailed(#[source] Arc<serde_json::Error>),
580
    /// We encountered a problem when parsing a request that was (in our judgment)
581
    /// too severe to recover from.
582
    #[error("Unrecoverable problem from parsed request")]
583
    RequestParseFailed(#[from] RequestParseError),
584
}
585

            
586
impl ConnectionError {
587
    /// Construct a new `ConnectionError` from a `JsonCodecError` that has occurred while writing.
588
    fn writing(error: JsonCodecError) -> Self {
589
        match error {
590
            JsonCodecError::Io(e) => Self::WriteFailed(Arc::new(e)),
591
            JsonCodecError::Json(e) => Self::EncodeFailed(Arc::new(e)),
592
        }
593
    }
594

            
595
    /// Return true if this error is (or might be) due to the peer closing the connection.
596
    ///
597
    /// Such errors should be tolerated without much complaint;
598
    /// other errors should at least be logged somewhere.
599
    fn is_connection_close(&self) -> bool {
600
        use JsonErrorCategory as JK;
601
        use std::io::ErrorKind as IK;
602
        #[allow(clippy::match_like_matches_macro)]
603
        match self {
604
            Self::ReadFailed(e) | Self::WriteFailed(e) => match e.kind() {
605
                IK::UnexpectedEof | IK::ConnectionAborted | IK::BrokenPipe => true,
606
                _ => false,
607
            },
608
            Self::DecodeFailed(e) => match e.classify() {
609
                JK::Eof => true,
610
                _ => false,
611
            },
612
            _ => false,
613
        }
614
    }
615

            
616
    /// Construct a `ConnectionError` from a JsonCodecError that occurred while reading.
617
    fn from_read_error(error: JsonCodecError) -> Self {
618
        match error {
619
            JsonCodecError::Io(e) => Self::ReadFailed(Arc::new(e)),
620
            JsonCodecError::Json(e) => Self::DecodeFailed(Arc::new(e)),
621
        }
622
    }
623
}
624

            
625
impl rpc::Context for Connection {
626
    fn lookup_object(&self, id: &rpc::ObjectId) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
627
        Connection::lookup_object(self, id)
628
    }
629

            
630
    fn register_owned(&self, object: Arc<dyn rpc::Object>) -> rpc::ObjectId {
631
        let use_global_id = object.expose_outside_of_session();
632
        let local_id = self
633
            .inner
634
            .lock()
635
            .expect("Lock poisoned")
636
            .objects
637
            .insert_strong(object);
638

            
639
        // Design note: It is a deliberate decision to _always_ use GlobalId for
640
        // objects whose IDs are _ever_ exported for use in SOCKS requests.  Some
641
        // alternatives would be to use GlobalId conditionally, or to have a
642
        // separate Method to create a new GlobalId given an existing LocalId.
643
        if use_global_id {
644
            GlobalId::new(self.connection_id, local_id).encode(&self.global_id_mac_key)
645
        } else {
646
            local_id.encode()
647
        }
648
    }
649

            
650
    fn release_owned(&self, id: &rpc::ObjectId) -> Result<(), rpc::LookupError> {
651
        let removed_some = if id.as_ref() == Self::CONNECTION_OBJ_ID {
652
            self.inner
653
                .lock()
654
                .expect("Lock poisoned")
655
                .this_connection
656
                .take()
657
                .is_some()
658
        } else {
659
            let idx = self.id_into_local_idx(id)?;
660

            
661
            if !idx.is_strong() {
662
                return Err(rpc::LookupError::WrongType(id.clone()));
663
            }
664

            
665
            self.inner
666
                .lock()
667
                .expect("Lock poisoned")
668
                .objects
669
                .remove(idx)
670
                .is_some()
671
        };
672

            
673
        if removed_some {
674
            Ok(())
675
        } else {
676
            Err(rpc::LookupError::NoObject(id.clone()))
677
        }
678
    }
679

            
680
    fn dispatch_table(&self) -> &Arc<std::sync::RwLock<rpc::DispatchTable>> {
681
        &self.dispatch_table
682
    }
683
}
684

            
685
/// An error given when an RPC request is cancelled.
686
///
687
/// This is a separate type from [`crate::cancel::Cancelled`] since eventually
688
/// we want to move that type into a general-purpose location, and make it not
689
/// RPC-specific.
690
#[derive(thiserror::Error, Clone, Debug, serde::Serialize)]
691
#[error("RPC request was cancelled")]
692
pub(crate) struct RequestCancelled;
693

            
694
impl From<RequestCancelled> for RpcError {
695
2
    fn from(_: RequestCancelled) -> Self {
696
2
        RpcError::new(
697
2
            "Request cancelled".into(),
698
2
            rpc::RpcErrorKind::RequestCancelled,
699
        )
700
2
    }
701
}
702

            
703
/// An error given when we attempt to cancel an RPC request, but cannot.
704
///
705
#[derive(thiserror::Error, Clone, Debug, serde::Serialize)]
706
pub(crate) enum CancelError {
707
    /// We didn't find any request with the provided ID.
708
    ///
709
    /// Since we don't keep track of requests after they finish or are cancelled,
710
    /// we cannot distinguish the cases where a request has finished,
711
    /// where the request has been cancelled,
712
    /// or where the request never existed.
713
    /// Therefore we collapse them into a single error type.
714
    #[error("RPC request not found")]
715
    RequestNotFound,
716

            
717
    /// This kind of request cannot be cancelled.
718
    #[error("Uncancellable request")]
719
    CannotCancelRequest,
720

            
721
    /// We tried to cancel a request but found out it was already cancelled.
722
    ///
723
    /// This error should be impossible.
724
    #[error("Request somehow cancelled twice!")]
725
    AlreadyCancelled,
726
}
727

            
728
impl From<cancel::CannotCancel> for CancelError {
729
    fn from(value: cancel::CannotCancel) -> Self {
730
        use CancelError as CE;
731
        use cancel::CannotCancel as CC;
732
        match value {
733
            CC::Cancelled => CE::AlreadyCancelled,
734
            // We map "finished" to RequestNotFound since it is not in the general case
735
            // distinguishable from it; see documentation on RequestNotFound.
736
            CC::Finished => CE::RequestNotFound,
737
        }
738
    }
739
}
740

            
741
impl From<CancelError> for RpcError {
742
    fn from(err: CancelError) -> Self {
743
        use CancelError as CE;
744
        use rpc::RpcErrorKind as REK;
745
        let code = match err {
746
            CE::RequestNotFound => REK::RequestError,
747
            CE::CannotCancelRequest => REK::RequestError,
748
            CE::AlreadyCancelled => REK::InternalError,
749
        };
750
        RpcError::new(err.to_string(), code)
751
    }
752
}