//! Download Management for [`super`].
//!
//! This module consists of [`DownloadManager`], a helper for
//! downloading network documents.
//!
//! More information about the proper use can be found in the documentation
//! of the respective data type.
8

            
9
use std::{collections::VecDeque, fmt::Debug, net::SocketAddr};
10

            
11
use rand::{seq::IndexedRandom, Rng};
12
use retry_error::RetryError;
13
use tokio::net::TcpStream;
14
use tokio_util::compat::TokioAsyncReadCompatExt;
15
use tor_basic_utils::retry::RetryDelay;
16
use tor_dirclient::request::Requestable;
17
use tor_error::internal;
18
use tor_rtcompat::{PreferredRuntime, SleepProvider};
19
use tracing::{debug, warn};
20

            
21
use crate::err::AuthorityCommunicationError;
22

            
23
/// Download manager for authority requests.
24
///
25
/// This structure serves as the main interface for downloading documents from
26
/// a directory authority.  It implements the logic for retrying failed
27
/// downloads properly.
28
///
29
/// Technically, this structure does not need to be a structure and could be a
30
/// simple method instead.  However, because many settings stay the same across
31
/// all download attempts throughout the run-time of this program, making this
32
/// a separate structure is convenient for making the method signature smaller.
33
/// No state is kept *within* this structure, instead
34
/// [`DownloadManager::download()`] accepts an optional reference to a preferred
35
/// authority while returning the actual used authority, which the caller may
36
/// store in order to use that authority again in the future.
37
///
38
/// It may be worth to note that two round-robin loops, with one of them being
39
/// nested inside the other, are being used here.  The first serves as an
40
/// implementation of the specification in order to retry a download from a
41
/// different authority.  The second, inner, round-robin loop serves as an
42
/// implementation for happy-eyeballs, which, most commonly,  tries to connect
43
/// to both, the IPv4 and IPv6 (if present), utilizing the first one that
44
/// succeeds.  Keep in mind that error handling between these two is different.
45
/// The outer round-robin loop uses [`RetryError`], keeping track of all errors
46
/// in case that the download fails from all authorities, whereas the inner
47
/// round-robin loop uses [`TcpStream::connect()`], which only returns the error
48
/// of the last failed connection attempt, in the case that all attempts have
49
/// failed.
50
///
51
/// # Algorithm
52
///
53
/// 1. Shuffle the list of authorities in a randomized fashion.
54
/// 2. If there is a preferred authority, swap it with the first item in the list.
55
/// 3. Iterate through the list, calling [`tor_dirclient::send_request`].
56
///    3.1. If successful, set preferred authority to the current one and return.
57
///    3.2. If it failed, timeout with [`RetryDelay`] and go to 3.
58
///
59
/// # Specifications
60
///
61
/// * <https://spec.torproject.org/dir-spec/directory-cache-operation.html#general-download-behavior>
62
/// * <https://spec.torproject.org/dir-spec/directory-cache-operation.html#retry-as-cache>
63
#[derive(Debug)]
64
pub(super) struct DownloadManager<'a, 'b> {
65
    /// The list of download authorities.
66
    ///
67
    /// TODO DIRMIRROR: Consider accepting an AuthorityContacts and extract the
68
    /// download addresses ourselves?
69
    authorities: &'a Vec<Vec<SocketAddr>>,
70

            
71
    /// A handle to the runtime that is being used.
72
    rt: &'b PreferredRuntime,
73
}
74

            
75
impl<'a, 'b> DownloadManager<'a, 'b> {
76
    /// Creates a new [`DownloadManager`] with a set of download authorities.
77
8
    pub(super) fn new(authorities: &'a Vec<Vec<SocketAddr>>, rt: &'b PreferredRuntime) -> Self {
78
8
        Self { authorities, rt }
79
8
    }
80

            
81
    /// Performs a download to a single authority.
82
    ///
83
    /// To implement the retry algorithm from the spec, `endpoints` must be the
84
    /// available addresses (for all address families) for a single authority.
85
36
    async fn download_single<Req: Requestable + Debug>(
86
36
        &self,
87
36
        endpoints: &[SocketAddr],
88
36
        req: &Req,
89
36
    ) -> Result<Vec<u8>, AuthorityCommunicationError> {
90
        // This check is important because tokio will panic otherwise.
91
36
        if endpoints.is_empty() {
92
            return Err(AuthorityCommunicationError::Bug(internal!(
93
                "empty endpoints?"
94
            )));
95
36
        }
96

            
97
        // Fortunately, Tokio's TcpStream::connect already offers round-robin.
98
36
        let stream = TcpStream::connect(&endpoints).await.map_err(|error| {
99
            AuthorityCommunicationError::TcpConnect {
100
                endpoints: endpoints.to_vec(),
101
                error,
102
            }
103
        })?;
104

            
105
36
        debug!(
106
            "connected to {}",
107
            stream
108
                .peer_addr()
109
                .map(|x| x.to_string())
110
                .unwrap_or("N/A".to_string())
111
        );
112
36
        let mut stream = stream.compat();
113

            
114
        // Perform the actual request.
115
36
        match tor_dirclient::send_request(self.rt, req, &mut stream, None)
116
36
            .await
117
36
            .map(|resp| resp.into_output())
118
        {
119
6
            Ok(Ok(resp)) => Ok(resp),
120
14
            Ok(Err(e)) => Err(Box::new(tor_dirclient::Error::RequestFailed(e)).into()),
121
16
            Err(e) => Err(Box::new(e).into()),
122
        }
123
36
    }
124

            
125
    /// Downloads a [`Requestable`] from the download authorities.
126
    ///
127
    /// The relevant algorithm is non-trivial, but well-documented in the
128
    /// [`DownloadManager`], which is why we will leave it out here by
129
    /// just referencing to it.
130
    ///
131
    /// Returns the actual used authority as well as the response, or a
132
    /// collection of errors.
133
    #[allow(clippy::cognitive_complexity)]
134
8
    pub(super) async fn download<Req: Requestable + Debug, R: Rng>(
135
8
        &self,
136
8
        req: &Req,
137
8
        preferred: Option<&'a Vec<SocketAddr>>,
138
8
        rng: &mut R,
139
8
    ) -> Result<(&'a Vec<SocketAddr>, Vec<u8>), RetryError<AuthorityCommunicationError>> {
140
        // Because this is a round-robin approach, we want to collect errors.
141
8
        let mut err = RetryError::in_attempt_to("request to authority");
142

            
143
        // Use this struct to calculate delays between iterations.
144
8
        let mut retry_delay = RetryDelay::default();
145

            
146
        // Shuffle the list of authorities in a randomized order.
147
8
        let mut random_auths = self
148
8
            .authorities
149
8
            .choose_multiple(rng, self.authorities.len())
150
8
            .collect::<VecDeque<_>>();
151

            
152
        // If we have a preferred authority, move it to the front.
153
8
        if let Some(preferred) = preferred {
154
            // In this case, we first throw it out and insert it to the start.
155
            random_auths.retain(|x| *x != preferred);
156
            random_auths.push_front(preferred);
157
8
        }
158
8
        assert_eq!(random_auths.len(), self.authorities.len());
159

            
160
38
        for endpoints in random_auths {
161
36
            if endpoints.is_empty() {
162
                warn!("empty endpoints in authority?");
163
                continue;
164
36
            }
165

            
166
36
            match self.download_single(endpoints, req).await {
167
6
                Ok(resp) => {
168
6
                    debug!("request {req:?} to {endpoints:?} succeeded!");
169
6
                    return Ok((endpoints, resp));
170
                }
171
30
                Err(e) => {
172
30
                    let delay = retry_delay.next_delay(rng);
173
30
                    debug!("request {req:?} to {endpoints:?} failed: {e}");
174
30
                    debug!("retrying in {}s", delay.as_secs());
175
30
                    err.push_timed(e, self.rt.now(), Some(self.rt.wallclock()));
176
30
                    tokio::time::sleep(delay).await;
177
                }
178
            }
179
        }
180

            
181
2
        Err(err)
182
8
    }
183
}
184

            
185
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use std::{
        io::ErrorKind,
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc, Mutex,
        },
    };

    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpListener,
    };
    use tor_basic_utils::test_rng::testing_rng;
    use tor_dirclient::{request::ConsensusRequest, RequestError};
    use tor_netdoc::doc::netstatus::ConsensusFlavor;

    use super::*;

    /// Raw HTTP response a fake authority sends to signal success.
    const RESPONSE_OK: &[u8] = b"HTTP/1.0 200 OK\r\nContent-Length: 3\r\n\r\nfoo";

    /// Raw HTTP response a fake authority sends to signal failure.
    const RESPONSE_NOT_FOUND: &[u8] = b"HTTP/1.0 404 Not Found\r\n\r\n";

    /// Opens a listener on an OS-assigned port and returns it with its address.
    async fn listen() -> (TcpListener, SocketAddr) {
        let listener = TcpListener::bind("[::]:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        (listener, addr)
    }

    /// Testing a request that is immediately successful.
    #[tokio::test]
    async fn request_legit() {
        let (server, server_addr) = listen().await;

        // Serve exactly one request with a successful response.
        tokio::task::spawn(async move {
            let (mut conn, _) = server.accept().await.unwrap();
            let mut buf = vec![0; 1024];
            let _ = conn.read(&mut buf).await.unwrap();
            conn.write_all(RESPONSE_OK).await.unwrap();
        });

        let authorities = vec![vec![server_addr]];
        let rt = PreferredRuntime::current().unwrap();
        let mgr = DownloadManager::new(&authorities, &rt);

        let request = ConsensusRequest::new(ConsensusFlavor::Plain);
        let mut rng = testing_rng();
        let (preferred, resp) = mgr.download(&request, None, &mut rng).await.unwrap();

        assert_eq!(resp, b"foo");
        assert_eq!(preferred, &authorities[0]);
    }

    /// Testing for a request that initially fails by returning a 404 but later succeeds.
    #[tokio::test(start_paused = true)]
    async fn request_fail_but_succeed() {
        let mut server_addrs = Vec::new();
        // Number of requests answered so far, shared by all fake authorities.
        let served = Arc::new(AtomicUsize::new(0));
        // Address list of the fake authority that was contacted most recently.
        let last_hit = Arc::new(Mutex::new(Vec::new()));

        for _ in 0..8 {
            let (server, server_addr) = listen().await;
            let served = Arc::clone(&served);
            let last_hit = Arc::clone(&last_hit);
            server_addrs.push(vec![server_addr]);

            tokio::task::spawn(async move {
                loop {
                    let (mut conn, _) = server.accept().await.unwrap();

                    // Remember which server was contacted last, because only
                    // the last one will succeed.
                    *last_hit.lock().unwrap() = vec![server_addr];

                    // This read is important!
                    // Otherwise this server will terminate the connection with
                    // RST instead of FIN, causing everything to fail.
                    let mut buf = vec![0; 1024];
                    let _ = conn.read(&mut buf).await.unwrap();

                    // The first seven requests fail, every later one succeeds.
                    let reply = if served.fetch_add(1, Ordering::AcqRel) < 7 {
                        RESPONSE_NOT_FOUND
                    } else {
                        RESPONSE_OK
                    };
                    conn.write_all(reply).await.unwrap();
                }
            });
        }

        let rt = PreferredRuntime::current().unwrap();
        let mgr = DownloadManager::new(&server_addrs, &rt);

        let request = ConsensusRequest::new(ConsensusFlavor::Plain);
        let mut rng = testing_rng();
        let (preferred, resp) = mgr.download(&request, None, &mut rng).await.unwrap();

        assert_eq!(resp, b"foo");
        assert_eq!(*preferred, *last_hit.lock().unwrap());
    }

    /// Request that fails all the time.
    ///
    /// Failures are done by a server accept and then immediately closing the
    /// connection.
    #[tokio::test(start_paused = true)]
    async fn request_fail_ultimately() {
        let mut server_addrs = Vec::new();
        for _ in 0..8 {
            let (server, server_addr) = listen().await;
            server_addrs.push(vec![server_addr]);

            // Accept connections and drop them right away.
            tokio::task::spawn(async move {
                loop {
                    let _ = server.accept().await.unwrap();
                }
            });
        }

        let rt = PreferredRuntime::current().unwrap();
        let mgr = DownloadManager::new(&server_addrs, &rt);

        let request = ConsensusRequest::new(ConsensusFlavor::Plain);
        let mut rng = testing_rng();
        let errs = mgr
            .download(&request, None, &mut rng)
            .await
            .unwrap_err();

        // This is just a longer loop to assert all errors are either resets or truncated headers.
        //
        // Because the detection of TCP RST in itself tends to be stochastic at best,
        // we also check for TruncatedHeaders, which is what tor-dirclient will return
        // when it performs a successful(!) read(2) returning zero bytes, indicating
        // a successful closure of the connection.
        for err in errs {
            match err {
                AuthorityCommunicationError::Dirclient(dir_err) => match *dir_err {
                    tor_dirclient::Error::RequestFailed(failure) => match failure.error {
                        RequestError::IoError(io_err) => match io_err.kind() {
                            ErrorKind::ConnectionReset => {}
                            e => unreachable!("{e}"),
                        },
                        RequestError::TruncatedHeaders => {}
                        e => unreachable!("{e}"),
                    },
                    e => unreachable!("{e}"),
                },
                e => unreachable!("{e}"),
            }
        }
    }
}