1
//! Channel for sending messages to [`StreamReactor`].
2

            
3
use crate::circuit::UniqId;
4
use crate::circuit::circhop::{CircHopOutbound, HopSettings};
5
use crate::circuit::reactor::circhop::CircHopList;
6
use crate::circuit::reactor::stream::{ReadyStreamMsg, StreamMsg, StreamReactor};
7
use crate::congestion::CongestionControl;
8
use crate::memquota::CircuitAccount;
9
use crate::util::err::ReactorError;
10
use crate::util::timeout::TimeoutEstimator;
11
use crate::{Error, HopNum, Result};
12

            
13
#[cfg(any(feature = "hs-service", feature = "relay"))]
14
use crate::stream::incoming::IncomingStreamRequestHandler;
15

            
16
use tor_error::internal;
17
use tor_rtcompat::Runtime;
18

            
19
use futures::SinkExt;
20
use futures::channel::mpsc;
21

            
22
use std::result::Result as StdResult;
23
use std::sync::{Arc, Mutex, RwLock};
24

            
25
/// The hop manager of a reactor.
26
///
27
/// This contains the per-hop state (e.g. congestion control information),
28
/// and a handle to the stream reactor of the hop.
29
///
30
/// The stream reactor of the hop is launched lazily,
31
/// when the first [`StreamMsg`] is sent via [`HopMgr::send`].
32
pub(crate) struct HopMgr<R: Runtime> {
33
    /// A handle to the runtime.
34
    runtime: R,
35
    /// Context used when spawning a stream reactor.
36
    ctx: StreamReactorContext,
37
    /// Sender for sending messages to BWD.
38
    ///
39
    /// The receiver is in BWD.
40
    ///
41
    /// A clone of this is passed to each spawned StreamReactor
42
    bwd_tx: mpsc::Sender<ReadyStreamMsg>,
43
    /// The underlying senders, indexed by [`HopNum`].
44
    ///
45
    /// Relays have at most one stream reactor per circuit.
46
    /// Clients have at most one stream reactor per circuit hop.
47
    ///
48
    /// This is shared with the backward reactor.
49
    /// The backward reactor only ever *reads* from this
50
    /// (it never mutates the list).
51
    ///
52
    // TODO: the backward reactor only ever reads from this.
53
    // Conceptually, it is the HopMgr that owns this list,
54
    // because only HopMgr can add hops to the list.
55
    //
56
    // Perhaps we need a specialized abstraction that only allows reading here.
57
    // This could be a wrapper over RwLock, providing a read-only API for the BWD.
58
    hops: Arc<RwLock<CircHopList>>,
59
    /// Memory quota account
60
    memquota: CircuitAccount,
61
}
62

            
63
/// State needed to build a stream reactor.
64
///
65
/// Used when spawning the stream reactor of a hop.
66
struct StreamReactorContext {
67
    /// An identifier for logging about this reactor's circuit.
68
    unique_id: UniqId,
69
    /// The incoming stream handler.
70
    ///
71
    /// This is shared with every StreamReactor.
72
    #[cfg(any(feature = "hs-service", feature = "relay"))]
73
    incoming: Arc<Mutex<Option<IncomingStreamRequestHandler>>>,
74
    /// The circuit timeout estimator.
75
    ///
76
    /// Used for computing half-stream expiration.
77
    timeouts: Arc<dyn TimeoutEstimator>,
78
}
79

            
80
impl<R: Runtime> HopMgr<R> {
81
    /// Create a new [`HopMgr`] with an empty hop list.
82
    ///
83
    /// Hops are added with [`HopMgr::add_hop`].
84
    pub(crate) fn new(
85
        runtime: R,
86
        unique_id: UniqId,
87
        timeouts: Arc<dyn TimeoutEstimator>,
88
        bwd_tx: mpsc::Sender<ReadyStreamMsg>,
89
        memquota: CircuitAccount,
90
    ) -> Self {
91
        // We don't spawn any stream reactors ahead of time.
92
        // Instead we spawn them lazily, when opening streams.
93
        let hops = Arc::new(RwLock::new(Default::default()));
94
        let ctx = StreamReactorContext {
95
            unique_id,
96
            #[cfg(any(feature = "hs-service", feature = "relay"))]
97
            incoming: Arc::new(Mutex::new(None)),
98
            timeouts,
99
        };
100

            
101
        Self {
102
            runtime,
103
            hops,
104
            ctx,
105
            bwd_tx,
106
            memquota,
107
        }
108
    }
109

            
110
    /// Return a reference to our hop list.
111
    pub(crate) fn hops(&self) -> &Arc<RwLock<CircHopList>> {
112
        &self.hops
113
    }
114

            
115
    /// Set the incoming stream handler for this reactor.
116
    ///
117
    /// There can only be one incoming stream handler per reactor,
118
    /// and each stream handler only pertains to a single hop (see expected_hop())
119
    //
120
    // TODO: eventually, we might want a different design here,
121
    // for example we might want to allow multiple stream handlers per reactor (one per hop).
122
    // However, for now, the implementation is intentionally kept similar to that
123
    // in the client reactor (to make it easier to migrate it to the new reactor design).
124
    //
125
    /// Returns an error if the hop manager already has a stream handler.
126
    ///
127
    /// Since the handler is shared with every hop's stream reactor,
128
    /// this function will update the handler for all of them.
129
    ///
130
    // TODO(DEDUP): almost identical to the client-side
131
    // CellHandlers::set_incoming_stream_req_handler()
132
    #[cfg(any(feature = "hs-service", feature = "relay"))]
133
    pub(crate) fn set_incoming_handler(&self, handler: IncomingStreamRequestHandler) -> Result<()> {
134
        let mut lock = self.ctx.incoming.lock().expect("poisoned lock");
135

            
136
        if lock.is_none() {
137
            *lock = Some(handler);
138
            Ok(())
139
        } else {
140
            Err(Error::from(internal!(
141
                "Tried to install a BEGIN cell handler before the old one was gone."
142
            )))
143
        }
144
    }
145

            
146
    /// Push a new hop to our hop list.
147
    ///
148
    /// Prepares a cc object for the hop, but does not spawn a stream reactor.
149
    ///
150
    /// Will return an error if the circuit already has [`u8::MAX`] hops.
151
    pub(crate) fn add_hop(&mut self, settings: HopSettings) -> Result<()> {
152
        let mut hops = self.hops.write().expect("poisoned lock");
153
        hops.add_hop(settings)
154
    }
155

            
156
    /// Send a message to the stream reactor of the specified `hop`,
157
    /// spawning it if necessary.
158
    pub(crate) async fn send(
159
        &mut self,
160
        hopnum: Option<HopNum>,
161
        msg: StreamMsg,
162
    ) -> StdResult<(), ReactorError> {
163
        let mut tx = self.get_or_spawn_stream_reactor(hopnum)?;
164

            
165
        tx.send(msg).await.map_err(|_| {
166
            // The stream reactor has shut down
167
            ReactorError::Shutdown
168
        })
169
    }
170

            
171
    /// Get a handle to the stream reactor, spawning it if necessary
172
    fn get_or_spawn_stream_reactor(
173
        &self,
174
        hopnum: Option<HopNum>,
175
    ) -> StdResult<mpsc::Sender<StreamMsg>, ReactorError> {
176
        let mut hops = self.hops.write().expect("poisoned lock");
177
        let hop = hops
178
            .get_mut(hopnum)
179
            .ok_or_else(|| internal!("tried to send cell to nonexistent hop?!"))?;
180

            
181
        let tx = match &hop.tx {
182
            Some(tx) => tx.clone(),
183
            None => {
184
                // If we don't have a handle to the stream reactor,
185
                // it means it hasn't been spawned yet, so we have to spawn it now.
186
                let tx =
187
                    self.spawn_stream_reactor(hopnum, &hop.settings, Arc::clone(&hop.ccontrol))?;
188

            
189
                hop.tx = Some(tx.clone());
190

            
191
                // Return a copy of this sender (can't borrow because the hop
192
                // is behind a Mutex, and we can't keep it locked across the send()
193
                // await point)
194
                tx
195
            }
196
        };
197

            
198
        Ok(tx)
199
    }
200

            
201
    /// Spawn a [`StreamReactor`] for the specified hop.
202
    fn spawn_stream_reactor(
203
        &self,
204
        hopnum: Option<HopNum>,
205
        settings: &HopSettings,
206
        ccontrol: Arc<Mutex<CongestionControl>>,
207
    ) -> StdResult<mpsc::Sender<StreamMsg>, ReactorError> {
208
        use tor_rtcompat::SpawnExt as _;
209

            
210
        // NOTE: not registering this channel with the memquota subsystem is okay,
211
        // because it has no buffering (if ever decide to make the size of this buffer
212
        // non-zero for whatever reason, we must remember to register it with memquota
213
        // so that it counts towards the total memory usage for the circuit.
214
        //
215
        // TODO(tuning): having zero buffering here is very likely suboptimal.
216
        // We should do *some* buffering here, and then figure out if we should it
217
        // up to memquota or not.
218
        #[allow(clippy::disallowed_methods)]
219
        let (fwd_stream_tx, fwd_stream_rx) = mpsc::channel(0);
220

            
221
        let flow_ctrl_params = Arc::new(settings.flow_ctrl_params.clone());
222
        let relay_format = settings.relay_crypt_protocol().relay_cell_format();
223
        let outbound = CircHopOutbound::new(ccontrol, relay_format, flow_ctrl_params, settings);
224

            
225
        let stream_reactor = StreamReactor::new(
226
            self.runtime.clone(),
227
            hopnum,
228
            outbound,
229
            self.ctx.unique_id,
230
            fwd_stream_rx,
231
            self.bwd_tx.clone(),
232
            Arc::clone(&self.ctx.timeouts),
233
            #[cfg(any(feature = "hs-service", feature = "relay"))]
234
            Arc::clone(&self.ctx.incoming),
235
            self.memquota.clone(),
236
        );
237

            
238
        self.runtime
239
            .spawn(async {
240
                let _ = stream_reactor.run().await;
241
            })
242
            .map_err(|_| ReactorError::Shutdown)?;
243

            
244
        Ok(fwd_stream_tx)
245
    }
246
}