1
//! Handler for EXTEND2 cells.
2

            
3
use super::{CircEvent, ExtendResult, Outbound};
4

            
5
use crate::Error;
6
use crate::circuit::UniqId;
7
use crate::circuit::create::{Create2Wrap, CreateHandshakeWrap};
8
use crate::peer::PeerInfo;
9
use crate::relay::channel_provider::{ChannelProvider, ChannelResult, OutboundChanSender};
10
use crate::relay::reactor::CircuitAccount;
11
use crate::util::err::ReactorError;
12
use tor_cell::chancell::AnyChanCell;
13
use tor_cell::relaycell::UnparsedRelayMsg;
14
use tor_cell::relaycell::msg::{Extend2, Extended2};
15
use tor_error::{internal, into_internal, warn_report};
16
use tor_linkspec::decode::Strictness;
17
use tor_linkspec::{HasRelayIds, OwnedChanTarget, OwnedChanTargetBuilder};
18
use tor_rtcompat::{Runtime, SpawnExt as _};
19

            
20
use futures::channel::mpsc;
21
use futures::{SinkExt as _, StreamExt as _};
22
use tracing::{debug, trace};
23

            
24
use std::result::Result as StdResult;
25
use std::sync::Arc;
26

            
27
/// Helper for handling EXTEND2 cells.
28
pub(super) struct ExtendRequestHandler {
29
    /// An identifier for logging about this handler.
30
    unique_id: UniqId,
31
    /// Whether we have received an EXTEND2 on this circuit.
32
    ///
33
    // TODO(relay): bools can be finicky.
34
    // Maybe we should combine this bool and the optional
35
    // outbound into a new state machine type
36
    // (with states Initial -> Extending -> Extended(Outbound))?
37
    // But should not do this if it turns out more convoluted than the bool-based approach.
38
    have_seen_extend2: bool,
39
    /// A handle to a [`ChannelProvider`], used for initiating outgoing Tor channels.
40
    ///
41
    /// Note: all circuit reactors of a relay need to be initialized
42
    /// with the *same* underlying Tor channel provider (`ChanMgr`),
43
    /// to enable the reuse of existing Tor channels where possible.
44
    chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
45
    /// The identity of the inbound relay (the previous hop).
46
    inbound_peer: Arc<PeerInfo>,
47
    /// A stream of events to be read from the main loop of the reactor.
48
    event_tx: mpsc::Sender<CircEvent>,
49
    /// Memory quota account
50
    memquota: CircuitAccount,
51
}
52

            
53
impl ExtendRequestHandler {
54
    /// Create a new [`ExtendRequestHandler`].
55
20
    pub(super) fn new(
56
20
        unique_id: UniqId,
57
20
        chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
58
20
        inbound_peer: Arc<PeerInfo>,
59
20
        event_tx: mpsc::Sender<CircEvent>,
60
20
        memquota: CircuitAccount,
61
20
    ) -> Self {
62
20
        Self {
63
20
            unique_id,
64
20
            have_seen_extend2: false,
65
20
            chan_provider,
66
20
            inbound_peer,
67
20
            event_tx,
68
20
            memquota,
69
20
        }
70
20
    }
71

            
72
    /// Handle an EXTEND2 cell.
73
    ///
74
    /// This spawns a background task for dealing with the circuit extension,
75
    /// which then reports back the result via the [`Self::event_tx`] MPSC stream.
76
    /// Note that this MPSC stream is polled from the `ForwardReactor` main loop,
77
    /// and each `CircEvent` is passed back to [`Forward`](super::Forward)'s
78
    /// [`ForwardHandler::handle_event`](crate::circuit::reactor::forward::ForwardHandler::handle_event)
79
    /// implementation for handling.
80
12
    pub(super) fn handle_extend2<R: Runtime>(
81
12
        &mut self,
82
12
        runtime: &R,
83
12
        early: bool,
84
12
        msg: UnparsedRelayMsg,
85
12
    ) -> StdResult<(), ReactorError> {
86
        // TODO(relay): this should be allowed if the AllowNonearlyExtend consensus
87
        // param is set (arti#2349)
88
12
        if !early {
89
4
            return Err(Error::CircProto("got EXTEND2 in a RELAY cell?!".into()).into());
90
8
        }
91

            
92
        // Check if we're in the right state before parsing the EXTEND2
93
8
        if self.have_seen_extend2 {
94
            return Err(Error::CircProto("got 2 EXTEND2 on the same circuit?!".into()).into());
95
8
        }
96

            
97
8
        self.have_seen_extend2 = true;
98

            
99
8
        let to_bytes_err = |e| Error::from_bytes_err(e, "EXTEND2 message");
100

            
101
8
        let extend2 = msg.decode::<Extend2>().map_err(to_bytes_err)?.into_msg();
102

            
103
8
        let chan_target = OwnedChanTargetBuilder::from_encoded_linkspecs(
104
8
            Strictness::Standard,
105
8
            extend2.linkspecs(),
106
        )
107
8
        .map_err(|err| Error::LinkspecDecodeErr {
108
            object: "EXTEND2",
109
            err,
110
        })?
111
8
        .build()
112
8
        .map_err(|_| {
113
            // TODO: should we include the error in the circ proto error context?
114
            Error::CircProto("Invalid channel target".into())
115
        })?;
116

            
117
8
        if chan_target.has_any_relay_id_from(&*self.inbound_peer) {
118
4
            return Err(Error::CircProto("Cannot extend circuit to previous hop".into()).into());
119
4
        }
120

            
121
        // Note: we don't do any further validation on the EXTEND2 here,
122
        // under the assumption it will be handled by the ChannelProvider.
123

            
124
4
        let (chan_tx, chan_rx) = mpsc::unbounded();
125

            
126
4
        let chan_tx = OutboundChanSender(chan_tx);
127
4
        Arc::clone(&self.chan_provider).get_or_launch(self.unique_id, chan_target, chan_tx)?;
128

            
129
4
        let mut result_tx = self.event_tx.clone();
130
4
        let rt = runtime.clone();
131
4
        let unique_id = self.unique_id;
132
4
        let memquota = self.memquota.clone();
133

            
134
        // TODO(relay): because we dispatch this the entire EXTEND2 handling to a background task,
135
        // we don't really need the channel provider to send us the outcome via an MPSC channel,
136
        // because get_or_launch() could simply be async (it wouldn't block the reactor,
137
        // because it runs in another task). Maybe we need to rethink the ChannelProvider API?
138
4
        runtime
139
4
            .spawn(async move {
140
4
                let res = Self::extend_circuit(rt, unique_id, extend2, chan_rx, memquota).await;
141

            
142
                // Discard the error if the reactor shut down before we had
143
                // a chance to complete the extend handshake
144
4
                let _ = result_tx.send(CircEvent::ExtendResult(res)).await;
145
4
            })
146
4
            .map_err(into_internal!("failed to spawn extend task?!"))?;
147

            
148
4
        Ok(())
149
12
    }
150

            
151
    /// Extend this circuit on the channel received on `chan_rx`.
152
    ///
153
    /// Note: this gets spawned in a background task from
154
    /// [`Self::handle_extend2`] so as not to block the reactor main loop.
155
4
    async fn extend_circuit<R: Runtime>(
156
4
        _runtime: R,
157
4
        unique_id: UniqId,
158
4
        extend2: Extend2,
159
4
        mut chan_rx: mpsc::UnboundedReceiver<ChannelResult>,
160
4
        memquota: CircuitAccount,
161
4
    ) -> StdResult<ExtendResult, ReactorError> {
162
        // We expect the channel build timeout to be enforced by the ChannelProvider
163
4
        let chan_res = chan_rx
164
4
            .next()
165
4
            .await
166
4
            .ok_or_else(|| internal!("channel provider task exited"))?;
167

            
168
4
        let channel = match chan_res {
169
4
            Ok(c) => c,
170
            Err(e) => {
171
                warn_report!(e, "Failed to launch outgoing channel");
172
                // Note: retries are handled within
173
                // get_or_launch(), so if we receive an
174
                // error at this point, we need to bail
175
                return Err(ReactorError::Shutdown);
176
            }
177
        };
178

            
179
4
        debug!(
180
            circ_id = %unique_id,
181
            "Launched channel to the next hop"
182
        );
183

            
184
        // Now that we finally have a forward Tor channel,
185
        // it's time to forward the onion skin and extend the circuit...
186
        //
187
        // Note: the only reason we need to await here is because internally
188
        // new_outbound_circ() sends a control message to the channel reactor handles,
189
        // which is handled asynchronously. In practice, we're not actually waiting on
190
        // the network here, so in theory we shouldn't need a timeout for this operation.
191
4
        let (circ_id, outbound_chan_rx, createdreceiver) =
192
4
            channel.new_outbound_circ(memquota).await?;
193

            
194
        // We have allocated a circuit in the channel's circmap,
195
        // now it's time to send the CREATE2 and wait for the response.
196
4
        let create2_wrap = Create2Wrap {
197
4
            handshake_type: extend2.handshake_type(),
198
4
        };
199
4
        let create2 = create2_wrap.to_chanmsg(extend2.handshake().into());
200

            
201
        // Time to write the CREATE2 to the outbound channel...
202
4
        let mut outbound_chan_tx = channel.sender();
203
4
        let cell = AnyChanCell::new(Some(circ_id), create2);
204

            
205
4
        trace!(
206
            circ_id = %unique_id,
207
            "Sending CREATE2 to the next hop"
208
        );
209

            
210
4
        outbound_chan_tx.send((cell, None)).await?;
211

            
212
        // TODO(relay): we need a timeout here, otherwise we might end up waiting forever
213
        // for the CREATED2 to arrive.
214
        //
215
        // There is some complexity here, see
216
        // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3648#note_3340125
217
4
        let response = createdreceiver
218
4
            .await
219
4
            .map_err(|_| internal!("channel disappeared?"))?;
220

            
221
4
        trace!(
222
            circ_id = %unique_id,
223
            "Got CREATED2 response from next hop"
224
        );
225

            
226
4
        let outbound = Outbound {
227
4
            circ_id,
228
4
            channel: Arc::clone(&channel),
229
4
            outbound_chan_tx,
230
4
        };
231

            
232
        // If we reach this point, it means we have extended
233
        // the circuit by one hop, so we need to take the contents
234
        // of the CREATE/CREATED2 cell, and package an EXTEND/EXTENDED2
235
        // to send back to the client.
236
4
        let created2_body = create2_wrap.decode_chanmsg(response)?;
237
4
        let extended2 = Extended2::new(created2_body);
238

            
239
4
        Ok(ExtendResult {
240
4
            extended2,
241
4
            outbound,
242
4
            outbound_chan_rx,
243
4
        })
244
4
    }
245
}