1
//! [`oneshot`] channel between a circuit control message handler and the main code
2
//!
3
//! Wraps up a [`oneshot`] and deals with some error handling.
4
//!
5
//! Used by [`connect`](crate::connect)
6

            
7
use oneshot_fused_workaround as oneshot;
8
use tor_cell::relaycell::RelayMsg;
9
use tor_cell::relaycell::msg::AnyRelayMsg;
10
use tor_error::internal;
11
use tor_proto::MetaCellDisposition;
12

            
13
use crate::FailedAttemptError;
14

            
15
/// Sender, owned by the circuit message handler
16
///
17
/// Also records whether the message has been sent.
18
/// Forms part of the state for message handler's state machine.
19
pub(crate) struct Sender<M>(
20
    /// This is an `Option` so that we can `send` without consuming.
21
    ///
22
    /// Needed because `oneshot`'s send consumes, but the message handler gets `&mut self`.
23
    Option<oneshot::Sender<Result<M, tor_proto::Error>>>,
24
);
25

            
26
/// Receiver for awaiting the protocol message when the circuit handler sends it
27
pub(crate) struct Receiver<M>(
28
    oneshot::Receiver<Result<M, tor_proto::Error>>, // (force rustfmt to do this like Sender)
29
);
30

            
31
/// Create a new [`proto_oneshot::Sender`](Sender) and [`proto_oneshot::Receiver`](Receiver)
32
510
pub(crate) fn channel<M>() -> (Sender<M>, Receiver<M>) {
33
510
    let (tx, rx) = oneshot::channel();
34
510
    (Sender(Some(tx)), Receiver(rx))
35
510
}
36

            
37
impl<M> Sender<M> {
38
    /// Has this `Sender` yet to be used?
39
    ///
40
    /// Returns `true` until the first call to `deliver_expected_message`,
41
    /// then `false` .
42
340
    pub(crate) fn still_expected(&self) -> bool {
43
340
        self.0.is_some()
44
340
    }
45

            
46
    /// Try to decode `msg` as message of type `M`, and to send the outcome on the
47
    /// oneshot taken from `reply_tx`.
48
    ///
49
    /// Gives an error if `reply_tx` is None, or if an error occurs.
50
    ///
51
    /// Where possible, errors are also reported via the `oneshot`.
52
510
    pub(crate) fn deliver_expected_message(
53
510
        &mut self,
54
510
        msg: AnyRelayMsg,
55
510
        disposition_on_success: MetaCellDisposition,
56
510
    ) -> Result<MetaCellDisposition, tor_proto::Error>
57
510
    where
58
510
        M: RelayMsg + Clone + TryFrom<AnyRelayMsg, Error = tor_cell::Error>,
59
    {
60
510
        let reply_tx = self
61
510
            .0
62
510
            .take()
63
510
            .ok_or_else(|| internal!("Tried to handle two messages of the same type"))?;
64

            
65
510
        let outcome = M::try_from(msg).map_err(|err| tor_proto::Error::CellDecodeErr {
66
            object: "rendezvous-related cell",
67
            err,
68
        });
69

            
70
        #[allow(clippy::unnecessary_lazy_evaluations)] // want to state the Err type
71
510
        reply_tx
72
510
            .send(outcome.clone())
73
            // If the caller went away, we just drop the outcome
74
510
            .unwrap_or_else(|_: Result<M, _>| ());
75

            
76
510
        outcome.map(|_| disposition_on_success)
77
510
    }
78
}
79

            
80
impl<M> Receiver<M> {
81
    /// Receive the message `M`
82
    ///
83
    /// Waits for the call to `deliver_expected_message`, and converts the
84
    /// resulting error to a `FailedAttemptError` using `handle_proto_error`.
85
344
    pub(crate) async fn recv(
86
344
        self,
87
344
        handle_proto_error: impl Fn(tor_proto::Error) -> FailedAttemptError + Copy,
88
344
    ) -> Result<M, FailedAttemptError> {
89
344
        self.0
90
344
            .await
91
            // If the circuit collapsed, we don't get an error from tor_proto; make one up
92
344
            .map_err(|_: oneshot::Canceled| tor_proto::Error::CircuitClosed)
93
344
            .map_err(handle_proto_error)?
94
344
            .map_err(handle_proto_error)
95
344
    }
96
}