1
//! This module contains a WIP relay tunnel reactor.
2
//!
3
//! The initial version will duplicate some of the logic from
4
//! the client tunnel reactor.
5
//!
6
//! TODO(relay): refactor the relay tunnel
7
//! to share the same base tunnel implementation
8
//! as the client tunnel (to reduce code duplication).
9
//!
10
//! See the design notes at doc/dev/notes/relay-reactor.md
11

            
12
pub(crate) mod channel;
13
#[allow(unreachable_pub)] // TODO(relay): use in tor-chanmgr(?)
14
pub mod channel_provider;
15
pub(crate) mod reactor;
16

            
17
pub use channel::MaybeVerifiableRelayResponderChannel;
18
pub use channel::create_handler::{
19
    CircNetParameters, CongestionControlNetParams, CreateRequestHandler,
20
    IncomingStreamRequestFilterFactory,
21
};
22

            
23
use derive_deftly::Deftly;
24
use oneshot_fused_workaround as oneshot;
25

            
26
use tor_cell::chancell::msg::{self as chanmsg};
27
use tor_cell::relaycell::StreamId;
28
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
29
use tor_memquota::derive_deftly_template_HasMemoryCost;
30

            
31
use crate::Error;
32
use crate::circuit::celltypes::derive_deftly_template_RestrictedChanMsgSet;
33
use crate::circuit::reactor::CircReactorHandle;
34
use crate::circuit::reactor::CtrlCmd;
35
use crate::circuit::reactor::forward;
36
use crate::relay::reactor::backward::Backward;
37
use crate::relay::reactor::forward::Forward;
38
use crate::stream::incoming::IncomingStreamRequestFilter;
39

            
40
/// The subset of ChanMsg that may legitimately arrive on a live relay
/// circuit, that is, a circuit on which a CREATE* has already been received.
#[cfg(feature = "relay")]
#[cfg_attr(not(test), allow(unused))] // TODO(relay)
#[derive(Debug, Deftly)]
#[derive_deftly(HasMemoryCost)]
#[derive_deftly(RestrictedChanMsgSet)]
#[deftly(usage = "on an open relay circuit")]
pub(crate) enum RelayCircChanMsg {
    /// A relay cell, carrying some kind of remote command from some
    /// party on the circuit.
    Relay(chanmsg::Relay),
    /// A relay early cell; these are allowed to contain a CREATE message.
    RelayEarly(chanmsg::RelayEarly),
    /// A cell instructing us to destroy the circuit.
    Destroy(chanmsg::Destroy),
    /// A cell instructing us to enable or disable channel padding.
    PaddingNegotiate(chanmsg::PaddingNegotiate),
}
59

            
60
/// A handle for interacting with a relay circuit.
61
#[allow(unused)] // TODO(relay)
62
#[derive(Debug)]
63
pub struct RelayCirc(pub(crate) CircReactorHandle<Forward, Backward>);
64

            
65
impl RelayCirc {
66
    /// Shut down this circuit, along with all streams that are using it.
67
    /// Happens asynchronously (i.e. the tunnel won't necessarily be done shutting down
68
    /// immediately after this function returns!).
69
    ///
70
    /// Note that other references to this tunnel may exist.
71
    /// If they do, they will stop working after you call this function.
72
    ///
73
    /// It's not necessary to call this method if you're just done with a circuit:
74
    /// the circuit should close on its own once nothing is using it any more.
75
    pub fn terminate(&self) {
76
        let _ = self.0.command.unbounded_send(CtrlCmd::Shutdown);
77
    }
78

            
79
    /// Return true if this circuit is closed and therefore unusable.
80
32
    pub fn is_closing(&self) -> bool {
81
32
        self.0.control.is_closed()
82
32
    }
83

            
84
    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
85
    ///
86
    /// Typically the circuit reactor would send this new rate in an XON message to the other end of
87
    /// the stream.
88
    /// But it may decide not to, and may discard this update.
89
    /// For example the stream may have a large amount of buffered data, and the reactor may not
90
    /// want to send an XON while the buffer is large.
91
    ///
92
    /// This sends a message to inform the circuit reactor of the new drain rate,
93
    /// but it does not block or wait for a response from the reactor.
94
    /// An error is only returned if we are unable to send the update.
95
    //
96
    // TODO(relay): this duplicates the ClientTunnel API and docs. Do we care?
97
    pub(crate) fn drain_rate_update(
98
        &self,
99
        _stream_id: StreamId,
100
        _rate: XonKbpsEwma,
101
    ) -> crate::Result<()> {
102
        todo!()
103
    }
104

            
105
    /// Request to send a SENDME cell for this stream.
106
    ///
107
    /// This sends a request to the circuit reactor to send a stream-level SENDME, but it does not
108
    /// block or wait for a response from the circuit reactor.
109
    /// An error is only returned if we are unable to send the request.
110
    /// This means that if the circuit reactor is unable to send the SENDME, we are not notified of
111
    /// this here and an error will not be returned.
112
    //
113
    // TODO(relay): this duplicates the ClientTunnel API and docs. Do we care?
114
    pub(crate) fn send_sendme(&self, _stream_id: StreamId) -> crate::Result<()> {
115
        todo!()
116
    }
117

            
118
    /// Close the pending stream that owns this StreamTarget, delivering the specified
119
    /// END message (if any)
120
    ///
121
    /// The stream is closed by sending a control message (`ClosePendingStream`)
122
    /// to the reactor.
123
    ///
124
    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
125
    ///
126
    /// The StreamTarget will set the correct stream ID and pick the
127
    /// right hop, but will not validate that the message is well-formed
128
    /// or meaningful in context.
129
    ///
130
    /// Note that in many cases, the actual contents of an END message can leak unwanted
131
    /// information. Please consider carefully before sending anything but an
132
    /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientTunnel`.
133
    /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
134
    ///
135
    /// In addition to sending the END message, this function also ensures
136
    /// the state of the stream map entry of this stream is updated
137
    /// accordingly.
138
    ///
139
    /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
140
    /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
141
    /// function is for closing pending incoming streams (a stream is said to be pending if we have
142
    /// received the message initiating the stream but have not responded to it yet).
143
    ///
144
    /// **NOTE**: This function should be called at most once per request.
145
    /// Calling it twice is an error.
146
    //
147
    // TODO(relay): this duplicates the ClientTunnel API and docs. Do we care?
148
4
    pub(crate) fn close_pending(
149
4
        &self,
150
4
        stream_id: StreamId,
151
4
        message: crate::stream::CloseStreamBehavior,
152
4
    ) -> crate::Result<oneshot::Receiver<crate::Result<()>>> {
153
4
        let (tx, rx) = oneshot::channel();
154

            
155
4
        let cmd = forward::CtrlCmd::ClosePendingStream {
156
4
            stream_id,
157
4
            hop: None,
158
4
            message,
159
4
            done: tx,
160
4
        };
161

            
162
4
        self.0
163
4
            .command
164
4
            .unbounded_send(CtrlCmd::Forward(cmd))
165
4
            .map_err(|_| Error::CircuitClosed)?;
166

            
167
4
        Ok(rx)
168
4
    }
169
}
170

            
171
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    /// Check which channel messages can be converted into a `RelayCircChanMsg`.
    #[test]
    fn relay_circ_chan_msg() {
        use crate::relay::RelayCircChanMsg;
        use tor_cell::chancell::msg::{self, AnyChanMsg};

        /// Return true if `m` converts successfully into a `RelayCircChanMsg`.
        fn accepted(m: AnyChanMsg) -> bool {
            RelayCircChanMsg::try_from(m).is_ok()
        }

        // Messages for which the conversion must succeed.
        let allowed: [AnyChanMsg; 5] = [
            msg::Destroy::new(2.into()).into(),
            msg::Relay::new(&b"undulated slightly,"[..]).into(),
            msg::AnyChanMsg::RelayEarly(msg::Relay::new(&b"as if aware of him"[..]).into()),
            msg::PaddingNegotiate::start_default().into(),
            msg::RelayEarly::from(msg::Relay::new(b"snail-like unipedular organism")).into(),
        ];

        // Messages for which the conversion must fail.
        let rejected: [AnyChanMsg; 3] = [
            msg::CreatedFast::new(&b"The great globular mass"[..]).into(),
            msg::Created2::new(&b"of protoplasmic slush"[..]).into(),
            msg::Versions::new([1, 2, 3]).unwrap().into(),
        ];

        for m in allowed {
            assert!(accepted(m));
        }
        for m in rejected {
            assert!(!accepted(m));
        }
    }
}