//! This module contains a WIP relay tunnel reactor.
//!
//! The initial version will duplicate some of the logic from
//! the client tunnel reactor.
//!
//! TODO(relay): refactor the relay tunnel
//! to share the same base tunnel implementation
//! as the client tunnel (to reduce code duplication).
//!
//! See the design notes at doc/dev/notes/relay-reactor.md

            
12
pub(crate) mod channel;
13
#[allow(unreachable_pub)] // TODO(relay): use in tor-chanmgr(?)
14
pub mod channel_provider;
15
pub(crate) mod reactor;
16

            
17
pub use channel::MaybeVerifiableRelayResponderChannel;
18
pub use channel::create_handler::{
19
    CircNetParameters, CongestionControlNetParams, CreateRequestHandler,
20
};
21

            
22
use derive_deftly::Deftly;
23
use futures::StreamExt as _;
24
use oneshot_fused_workaround as oneshot;
25

            
26
use tor_cell::chancell::msg::{self as chanmsg};
27
use tor_cell::relaycell::StreamId;
28
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
29
use tor_memquota::derive_deftly_template_HasMemoryCost;
30
use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
31

            
32
use crate::Error;
33
use crate::circuit::celltypes::derive_deftly_template_RestrictedChanMsgSet;
34
use crate::circuit::circhop::ReactorStreamComponents;
35
use crate::circuit::reactor::CircReactorHandle;
36
use crate::circuit::reactor::{CtrlCmd, forward};
37
use crate::congestion::sendme::StreamRecvWindow;
38
use crate::memquota::SpecificAccount;
39
use crate::relay::reactor::backward::Backward;
40
use crate::relay::reactor::forward::Forward;
41
use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
42
use crate::stream::incoming::{
43
    IncomingCmdChecker, IncomingStream, IncomingStreamRequestFilter, StreamReqInfo,
44
};
45
use crate::stream::raw::StreamReceiver;
46
use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamTarget, Tunnel};
47

            
48
use std::sync::Arc;
49

            
50
/// The subset of channel messages that may legitimately arrive on a live
/// relay circuit (that is, a circuit on which a CREATE* has been received).
///
/// The conversion and memory-cost plumbing for this type is generated by the
/// `RestrictedChanMsgSet` and `HasMemoryCost` derive-deftly templates.
#[derive(Debug, Deftly)]
#[derive_deftly(HasMemoryCost)]
#[derive_deftly(RestrictedChanMsgSet)]
#[deftly(usage = "on an open relay circuit")]
#[cfg(feature = "relay")]
#[cfg_attr(not(test), allow(unused))] // TODO(relay)
pub(crate) enum RelayCircChanMsg {
    /// A relay cell, conveying some kind of remote command from some
    /// party on the circuit.
    Relay(chanmsg::Relay),
    /// A relay-early cell, which is permitted to carry a CREATE message.
    RelayEarly(chanmsg::RelayEarly),
    /// A request to tear down the circuit.
    Destroy(chanmsg::Destroy),
    /// A request to turn channel padding on or off.
    PaddingNegotiate(chanmsg::PaddingNegotiate),
}
69

            
70
/// A handle for interacting with a relay circuit.
71
#[allow(unused)] // TODO(relay)
72
#[derive(Debug)]
73
pub struct RelayCirc(pub(crate) CircReactorHandle<Forward, Backward>);
74

            
75
impl RelayCirc {
76
    /// Shut down this circuit, along with all streams that are using it.
77
    /// Happens asynchronously (i.e. the tunnel won't necessarily be done shutting down
78
    /// immediately after this function returns!).
79
    ///
80
    /// Note that other references to this tunnel may exist.
81
    /// If they do, they will stop working after you call this function.
82
    ///
83
    /// It's not necessary to call this method if you're just done with a circuit:
84
    /// the circuit should close on its own once nothing is using it any more.
85
    pub fn terminate(&self) {
86
        let _ = self.0.command.unbounded_send(CtrlCmd::Shutdown);
87
    }
88

            
89
    /// Return true if this circuit is closed and therefore unusable.
90
32
    pub fn is_closing(&self) -> bool {
91
32
        self.0.control.is_closed()
92
32
    }
93

            
94
    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
95
    ///
96
    /// Typically the circuit reactor would send this new rate in an XON message to the other end of
97
    /// the stream.
98
    /// But it may decide not to, and may discard this update.
99
    /// For example the stream may have a large amount of buffered data, and the reactor may not
100
    /// want to send an XON while the buffer is large.
101
    ///
102
    /// This sends a message to inform the circuit reactor of the new drain rate,
103
    /// but it does not block or wait for a response from the reactor.
104
    /// An error is only returned if we are unable to send the update.
105
    //
106
    // TODO(relay): this duplicates the ClientTunnel API and docs. Do we care?
107
    pub(crate) fn drain_rate_update(
108
        &self,
109
        _stream_id: StreamId,
110
        _rate: XonKbpsEwma,
111
    ) -> crate::Result<()> {
112
        todo!()
113
    }
114

            
115
    /// Request to send a SENDME cell for this stream.
116
    ///
117
    /// This sends a request to the circuit reactor to send a stream-level SENDME, but it does not
118
    /// block or wait for a response from the circuit reactor.
119
    /// An error is only returned if we are unable to send the request.
120
    /// This means that if the circuit reactor is unable to send the SENDME, we are not notified of
121
    /// this here and an error will not be returned.
122
    //
123
    // TODO(relay): this duplicates the ClientTunnel API and docs. Do we care?
124
    pub(crate) fn send_sendme(&self, _stream_id: StreamId) -> crate::Result<()> {
125
        todo!()
126
    }
127

            
128
    /// Close the pending stream that owns this StreamTarget, delivering the specified
129
    /// END message (if any)
130
    ///
131
    /// The stream is closed by sending a control message (`ClosePendingStream`)
132
    /// to the reactor.
133
    ///
134
    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
135
    ///
136
    /// The StreamTarget will set the correct stream ID and pick the
137
    /// right hop, but will not validate that the message is well-formed
138
    /// or meaningful in context.
139
    ///
140
    /// Note that in many cases, the actual contents of an END message can leak unwanted
141
    /// information. Please consider carefully before sending anything but an
142
    /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientTunnel`.
143
    /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
144
    ///
145
    /// In addition to sending the END message, this function also ensures
146
    /// the state of the stream map entry of this stream is updated
147
    /// accordingly.
148
    ///
149
    /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
150
    /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
151
    /// function is for closing pending incoming streams (a stream is said to be pending if we have
152
    /// received the message initiating the stream but have not responded to it yet).
153
    ///
154
    /// **NOTE**: This function should be called at most once per request.
155
    /// Calling it twice is an error.
156
    //
157
    // TODO(relay): this duplicates the ClientTunnel API and docs. Do we care?
158
    pub(crate) fn close_pending(
159
        &self,
160
        _stream_id: StreamId,
161
        _message: crate::stream::CloseStreamBehavior,
162
    ) -> crate::Result<oneshot::Receiver<crate::Result<()>>> {
163
        todo!()
164
    }
165

            
166
    /// Tell this reactor to begin allowing incoming stream requests,
167
    /// and to return those pending requests in an asynchronous stream.
168
    ///
169
    /// Ordinarily, these requests are rejected.
170
    ///
171
    /// Needed for exits. Middle relays should reject every incoming stream,
172
    /// either through the `filter` provided in `filter`,
173
    /// or by explicitly calling .reject() on each received stream.
174
    ///
175
    // TODO(relay): I think we will prefer using the .reject() approach
176
    // for this, because the filter is only meant for inexpensive quick
177
    // checks that are done immediately in the reactor (any blocking
178
    // in the filter will block the relay reactor main loop!).
179
    ///
180
    /// The user of the reactor **must** handle this stream
181
    /// (either by .accept()ing and opening and proxying the corresponding
182
    /// streams as appropriate, or by .reject()ing).
183
    ///
184
    // TODO: declare a type-alias for the return type when support for
185
    // impl in type aliases gets stabilized.
186
    //
187
    // See issue #63063 <https://github.com/rust-lang/rust/issues/63063>
188
    //
189
    /// There can only be one [`Stream`](futures::Stream) of this type created on a given reactor.
190
    /// If a such a [`Stream`](futures::Stream) already exists, this method will return
191
    /// an error.
192
    ///
193
    /// After this method has been called on a reactor, the reactor is expected
194
    /// to receive requests of this type indefinitely, until it is finally closed.
195
    /// If the `Stream` is dropped, the next request on this reactor will cause it to close.
196
    ///
197
    // TODO: Someday, we might want to allow a stream request handler to be
198
    // un-registered.  However, nothing in the Tor protocol requires it.
199
    //
200
    // TODO(DEDUP): *very* similar to ServiceOnionServiceDataTunnel::allow_stream_requests
201
    #[allow(unused)] // TODO(relay): call this from the task that creates the circ
202
8
    pub(crate) async fn allow_stream_requests<'a, FILT>(
203
8
        self: Arc<Self>,
204
8
        allow_commands: &'a [tor_cell::relaycell::RelayCmd],
205
8
        filter: FILT,
206
8
    ) -> crate::Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
207
8
    where
208
8
        FILT: IncomingStreamRequestFilter,
209
8
    {
210
8
        let tunnel = Arc::clone(&self);
211
        /// The size of the channel receiving IncomingStreamRequestContexts.
212
        ///
213
        // TODO(relay-tuning): buffer size
214
        const INCOMING_BUFFER: usize = crate::stream::STREAM_READER_BUFFER;
215

            
216
8
        let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER).new_mq(
217
8
            self.0.time_provider.clone(),
218
8
            tunnel.0.memquota.as_raw_account(),
219
        )?;
220

            
221
8
        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
222
8
        let (tx, rx) = oneshot::channel();
223
8
        let cmd = forward::CtrlCmd::AwaitStreamRequests {
224
8
            incoming_sender,
225
8
            cmd_checker,
226
8
            hop: None,
227
8
            filter: Box::new(filter),
228
8
            done: tx,
229
8
        };
230

            
231
8
        tunnel
232
8
            .0
233
8
            .command
234
8
            .unbounded_send(CtrlCmd::Forward(cmd))
235
8
            .map_err(|_| Error::CircuitClosed)?;
236

            
237
        // Check whether the AwaitStreamRequest was processed successfully.
238
8
        rx.await.map_err(|_| Error::CircuitClosed)??;
239

            
240
        // TODO(relay): this is more or less copy-pasta from client code
241
8
        let stream = incoming_receiver.map(move |req_ctx| {
242
            let StreamReqInfo {
243
4
                req,
244
4
                stream_id,
245
4
                hop,
246
                stream_components:
247
                    ReactorStreamComponents {
248
4
                        stream_inbound_rx,
249
4
                        stream_outbound_tx,
250
4
                        rate_limit_rx,
251
4
                        drain_rate_request_rx,
252
                    },
253
4
                memquota,
254
4
                relay_cell_format,
255
4
            } = req_ctx;
256

            
257
            // There is no originating hop if we're a relay
258
4
            debug_assert!(hop.is_none());
259

            
260
4
            let target = StreamTarget {
261
4
                tunnel: Tunnel::Relay(Arc::clone(&tunnel)),
262
4
                tx: stream_outbound_tx,
263
4
                hop: None,
264
4
                stream_id,
265
4
                relay_cell_format,
266
4
                rate_limit_stream: rate_limit_rx,
267
4
            };
268

            
269
            // can be used to build a reader that supports XON/XOFF flow control
270
4
            let xon_xoff_reader_ctrl =
271
4
                XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
272

            
273
4
            let reader = StreamReceiver {
274
4
                target: target.clone(),
275
4
                receiver: stream_inbound_rx,
276
4
                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
277
4
                ended: false,
278
4
            };
279

            
280
4
            let components = StreamComponents {
281
4
                stream_receiver: reader,
282
4
                target,
283
4
                memquota,
284
4
                xon_xoff_reader_ctrl,
285
4
            };
286

            
287
4
            IncomingStream::new(self.0.time_provider.clone(), req, components)
288
4
        });
289

            
290
8
        Ok(stream)
291
8
    }
292
}
293

            
294
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    /// Check which channel messages convert into a `RelayCircChanMsg`:
    /// DESTROY, RELAY, RELAY_EARLY and PADDING_NEGOTIATE must be accepted,
    /// while CREATED_FAST, CREATED2 and VERSIONS must be rejected.
    #[test]
    fn relay_circ_chan_msg() {
        use crate::relay::RelayCircChanMsg;
        use tor_cell::chancell::msg::{self, AnyChanMsg};

        /// Report whether `m` is accepted by the `RelayCircChanMsg` conversion.
        fn accepted(m: AnyChanMsg) -> bool {
            RelayCircChanMsg::try_from(m).is_ok()
        }

        assert!(accepted(msg::Destroy::new(2.into()).into()));
        assert!(!accepted(
            msg::CreatedFast::new(&b"The great globular mass"[..]).into()
        ));
        assert!(!accepted(
            msg::Created2::new(&b"of protoplasmic slush"[..]).into()
        ));
        assert!(accepted(
            msg::Relay::new(&b"undulated slightly,"[..]).into()
        ));
        assert!(accepted(msg::AnyChanMsg::RelayEarly(
            msg::Relay::new(&b"as if aware of him"[..]).into(),
        )));
        assert!(!accepted(msg::Versions::new([1, 2, 3]).unwrap().into()));
        assert!(accepted(msg::PaddingNegotiate::start_default().into()));
        assert!(accepted(
            msg::RelayEarly::from(msg::Relay::new(b"snail-like unipedular organism")).into()
        ));
    }
}