1
//! Directory Mirror Operation.
2
//!
3
//! # Specifications
4
//!
5
//! * [Directory cache operation](https://spec.torproject.org/dir-spec/directory-cache-operation.html).
6
//!
7
//! # Rationale
8
//!
9
//! This module implements the "core operation" of a directory mirror.
10
//! "Core operation" primarily refers to the logic involved in downloading
11
//! network documents from an upstream authority and inserting them into the
12
//! database.  This module notably **DOES NOT** provide any public (in the HTTP
13
//! sense) endpoints for querying documents.  This is purposely behind a different
14
//! module, so that the directory authority implementation can also make use of it.
15
//! You can think of this module as the one implementing the things unique
16
//! to directory mirrors.
17

            
18
use std::{
19
    collections::{HashSet, VecDeque},
20
    net::SocketAddr,
21
};
22

            
23
use r2d2::Pool;
24
use r2d2_sqlite::SqliteConnectionManager;
25
use rand::Rng;
26
use rusqlite::Transaction;
27
use strum::IntoEnumIterator;
28
use tokio::net::TcpStream;
29
use tokio_util::compat::TokioAsyncReadCompatExt;
30
use tor_checkable::Timebound;
31
use tor_dirclient::request::{AuthCertRequest, ConsensusRequest, Requestable};
32
use tor_dircommon::{authority::AuthorityContacts, config::DirTolerance};
33
use tor_error::{internal, into_internal};
34
use tor_netdoc::{
35
    doc::{
36
        authcert::{AuthCertKeyIds, AuthCertUnverified},
37
        netstatus::ConsensusFlavor,
38
    },
39
    parse2::{
40
        self,
41
        poc::netstatus::{cons, md},
42
        NetdocParseable, NetdocParseableUnverified, ParseInput,
43
    },
44
};
45
use tor_rtcompat::PreferredRuntime;
46
use tracing::{debug, warn};
47

            
48
use crate::{
49
    database::{self as db, AuthCertMeta, ConsensusMeta, ContentEncoding, Timestamp},
50
    err::{AuthorityRequestError, DatabaseError, OperationError},
51
};
52

            
53
mod poc;
54

            
55
/// The various states for the [`StaticEngine`].
56
#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::Display)]
57
enum State {
58
    /// Loads the most recent valid (and verified) consensus from the database
59
    /// into memory.
60
    ///
61
    /// Transitions from:
62
    /// * Start, if a recent valid consensus exists in the database.
63
    /// * [`State::StoreConsensus`], if successfully finished.
64
    ///
65
    /// Transitions into:
66
    /// * [`State::Descriptors`]
67
    LoadConsensus,
68

            
69
    /// Downloads the most recent consensus from a directory authority.
70
    ///
71
    /// Transitions from:
72
    /// * Start, if no recent valid consensus exists in the database.
73
    /// * [`State::Descriptors`], if lifetime is over.
74
    /// * [`State::Hibernate`], if lifetime is over.
75
    ///
76
    /// Transitions into:
77
    /// * [`State::AuthCerts`], if we miss authority certificates.
78
    /// * [`State::StoreConsensus`], if all authority certificates exist in the
79
    ///   database.
80
    // TODO DIRMIRROR: What to do in the case of getting an invalid consensus
81
    // such as junk data?  The normal retry logic sounds reasonable here.
82
    FetchConsensus,
83

            
84
    /// Downloads, validates, and stores the missing authority certificates from
85
    /// the downloaded unvalidated consensus into the database.
86
    ///
87
    /// Transitions from:
88
    /// * [`State::FetchConsensus`], if we miss authority certificates.
89
    /// * [`State::AuthCerts`], if we still miss authority certificates.
90
    ///
91
    /// Transitions into:
92
    /// * [`State::AuthCerts`], if we still miss authority certificates.
93
    /// * [`State::StoreConsensus`], if we got all authority certificates.
94
    // TODO DIRMIRROR: What to do in the case of a MITM attack where an attacker
95
    // adds lots of invalid signature items at the bottom, leading to lots of
96
    // queries for directory authority certificates, which may succeed or not?
97
    // Best idea is probably to only download authcerts whose id fingerprints
98
    // are configured in our AuthorityContacts, because then we have an upper
99
    // limit.
100
    AuthCerts,
101

            
102
    /// Validates and stores the downloaded unvalidated consensus into the
103
    /// database.
104
    ///
105
    /// Transitions from:
106
    /// * [`State::FetchConsensus`], if we have all authority certificates.
107
    /// * [`State::AuthCerts`], if we have all authority certificates.
108
    ///
109
    /// Transitions into:
110
    /// * [`State::LoadConsensus`]
111
    StoreConsensus,
112

            
113
    /// Downloads missing network documents (descriptors) from a directory
114
    /// authority.
115
    ///
116
    /// Transitions from:
117
    /// * [`State::LoadConsensus`], if we initialize.
118
    /// * [`State::Descriptors`], if we still have missing descriptors left.
119
    ///
120
    /// Transitions into:
121
    /// * [`State::FetchConsensus`], if lifetime is over.
122
    /// * [`State::Descriptors`], if we still have missing descriptors left.
123
    /// * [`State::Hibernate`], if nothing is left.
124
    Descriptors,
125

            
126
    /// Hibernate because nothing is left.
127
    ///
128
    /// Transitions from:
129
    /// * [`State::Descriptors`]
130
    ///
131
    /// Transitions into:
132
    /// * [`State::FetchConsensus`], if the lifetime is over.
133
    Hibernate,
134
}
135

            
136
/// The execution engine for the finite state machine.
137
///
138
/// The states themselves are explained in [`State`].
139
///
140
/// This data structure itself is static and contains no state, but merely
141
/// configuration primitives that stay constant throughout the runtime of the
142
/// program, such as the [`ConsensusFlavor`], the [`AuthorityContacts`], and the
143
/// [`DirTolerance`].  It can be kept throughout the entire runtime and only
144
/// consists for convenience in order to not give each state machine related
145
/// (then static) method a super long signature containing these fields.
146
///
147
/// The state itself is computed fully deterministically from the data found
148
/// within the database and [`ConsensusBoundData`].
149
///
150
/// This is the reason on why this structure is not called `StateMachine`,
151
/// because this implies that the type in itself carries state, which is not
152
/// true, because the state is stored entirely external, with this engine
153
/// only processing and modifying it.
154
///
155
/// See [`StaticEngine::determine_state()`] for more details.
156
#[derive(Debug)]
157
struct StaticEngine {
158
    /// The flavor of the consensus we are serving.
159
    flavor: ConsensusFlavor,
160

            
161
    /// The authorities we are acknowledging.
162
    authorities: AuthorityContacts,
163

            
164
    /// The document tolerance we are accepting.
165
    tolerance: DirTolerance,
166

            
167
    /// The preferred runtime for compatibility with other arti crates.
168
    ///
169
    /// Generally obtained through [`PreferredRuntime::current()`].
170
    rt: PreferredRuntime,
171
}
172

            
173
/// Additional state machine data concerning a single consensus.
174
///
175
/// This enum stores and keeps track of the consensus we are serving and in
176
/// which ✨state✨ it is currently in, such as whether it is verified or not,
177
/// or if we even have a state loaded in memory in the first place.
178
#[derive(Debug, Clone)]
179
enum ConsensusBoundData {
180
    /// No state is loaded in memory at the moment.
181
    None,
182

            
183
    /// We have downloaded a consensus but it is not yet verified.
184
    Unverified {
185
        /// The unverified parsed consensus we have.
186
        // TODO DIRMIRROR: Make this optional, see comment in
187
        // StaticEngine::execute.
188
        consensus: FlavoredConsensusSigned,
189

            
190
        /// The unparsed raw consensus we have.
191
        raw: String,
192
    },
193

            
194
    /// We have downloaded and verified a consensus.
195
    Verified {
196
        /// The verified consensus we have.
197
        consensus: FlavoredConsensus,
198

            
199
        /// When to stop dealing with this consensus and fetching a new one.
200
        lifetime: Timestamp,
201

            
202
        /// SHA-1 digests of the missing server descriptors in the consensus.
203
        server_queue: HashSet<db::Sha1>,
204

            
205
        /// SHA-1 digests of the missing extra-info descriptors in the server
206
        /// descriptors of the consensus.
207
        ///
208
        /// extra-info documents are only transitively related to a consensus
209
        /// through consensus -> server descriptors -> extra-info descriptors
210
        extra_queue: HashSet<db::Sha1>,
211

            
212
        /// SHA-256 digests of the missing micro descriptors in the consensus.
213
        ///
214
        /// This field is technically mutually exclusive to server_queue and
215
        /// extra_queue because micro descriptors are only found in
216
        /// [`ConsensusFlavor::Microdesc`] and server plus extra-info
217
        /// descriptors only in [`ConsensusFlavor::Plain`].  However, because
218
        /// we used a queue based design, we just leave the queue empty instead
219
        /// of wrapping this behind an enum variant for true mutual exclusivity.
220
        /// This makes coding much easier with less boilerplate and neglectable
221
        /// additional runtime cost.
222
        micro_queue: HashSet<db::Sha256>,
223
    },
224
}
225

            
226
/// A [`ConsensusFlavor`]-like wrapper for verified network statuses.
227
///
228
/// This is required because we need to obtain, at least partial, data from
229
/// each consensus, such as the signature (although not this type), the router
230
/// descriptors, validity, and other information.
231
///
232
/// At the current moment, [`tor_netdoc`] itself does not offer things such as
233
/// a common trait for retrieving the common fields, making this structure
234
/// necessary, or alternatively lots of macro magic similar to [`tor_netdoc`].
235
///
236
/// TODO DIRMIRROR: Either add a trait for [`tor_netdoc`] or figure out if the
237
/// fields we require are all of the same type in both, so we can only store
238
/// the fields we are interested in, though this is probably only possible once
239
/// we reached later stages of code.
240
///
241
/// And no, [`std::any::Any`] is not an alternative I am willing to do.
242
#[derive(Debug, Clone)]
243
enum FlavoredConsensus {
244
    /// For plain consensuses.
245
    Ns(cons::NetworkStatus),
246

            
247
    /// For microdescriptor consensuses.
248
    Md(md::NetworkStatus),
249
}
250

            
251
/// A [`ConsensusFlavor`]-like wrapper for unverified network statuses.
252
///
253
/// TODO DIRMIRROR: See the [`FlavoredConsensus`] trait comment.
254
#[derive(Debug, Clone)]
255
enum FlavoredConsensusSigned {
256
    /// For plain consensuses.
257
    Ns(cons::NetworkStatusUnverified),
258

            
259
    /// For microdescriptor consensus.
260
    Md(md::NetworkStatusUnverified),
261
}
262

            
263
impl StaticEngine {
264
    /// Determines the [`State`] only from the database and [`ConsensusBoundData`].
265
    ///
266
    /// This method is fully idempotent, meaning it only depends upon the data
267
    /// found within the database and the [`ConsensusBoundData`]; there is no
268
    /// internal `state` variable or something contained within [`StaticEngine`].
269
6
    fn determine_state(
270
6
        &self,
271
6
        tx: &Transaction<'_>,
272
6
        data: &ConsensusBoundData,
273
6
        now: Timestamp,
274
6
    ) -> Result<State, DatabaseError> {
275
        // Determine the state primarily upon ConsensusBoundData combined with
276
        // a few database queries, as well as the current time of course.
277
6
        let state = match data {
278
            // ConsensusBoundData::None means that we currently have no
279
            // consensus in memory.  This may be the case because we just
280
            // started up or because we just downloaded, validated, and inserted
281
            // a consensus into the database and reset ConsensusBoundData to
282
            // None afterwards.
283
            ConsensusBoundData::None => {
284
                // Check whether there is a valid consensus in the database at all.
285
                //
286
                // Yes, it is kinda redundant querying a consensus here
287
                // and potentially again when loading the consensus, but SQLite
288
                // is very fast and having to maintain two different queries,
289
                // one for checking and one for selecting, is prone to get
290
                // out-of-sync.
291
2
                match ConsensusMeta::query_recent(tx, self.flavor, &self.tolerance, now)? {
292
                    // Some consensus means we can load it.
293
                    Some(_) => State::LoadConsensus,
294

            
295
                    // None means we must download it.
296
2
                    None => State::FetchConsensus,
297
                }
298
            }
299

            
300
            // ConsensusBoundData::Unverified means that we recently downloaded
301
            // a consensus through State::FetchConsensus.  It is not fully
302
            // validated yet and we may not even be able due to missing
303
            // authority certificates.
304
4
            ConsensusBoundData::Unverified { consensus, .. } => {
305
                // Check whether there any missing authority certificates that
306
                // have signed the consensus.
307
4
                let missing_certs = !AuthCertMeta::query_recent(
308
4
                    tx,
309
4
                    &consensus.signatories(),
310
4
                    &self.tolerance,
311
4
                    now,
312
                )?
313
                .1
314
4
                .is_empty();
315

            
316
4
                if missing_certs {
317
                    // Missing authority certificates means we must download
318
                    // them.
319
2
                    State::AuthCerts
320
                } else {
321
                    // If we have all authority certificates, we can validate
322
                    // and store it inside the database.
323
2
                    State::StoreConsensus
324
                }
325
            }
326

            
327
            // ConsensusBoundData::Verified means that we have successfully
328
            // loaded a recent valid consensus from the database using
329
            // State::LoadConsensus.  Depending on this, we download the missing
330
            // network documents (descriptors) from a directory authority, if
331
            // any.
332
            ConsensusBoundData::Verified {
333
                lifetime,
334
                server_queue: servers,
335
                extra_queue: extras,
336
                micro_queue: micros,
337
                ..
338
            } => {
339
                if *lifetime <= now {
340
                    // The lifetime has been surpassed, download a new
341
                    // consensus.  It is very important TO NOT transition to
342
                    // State::LoadConsensus here, because the current consensus
343
                    // may still be valid but not fresh anymore, in which case
344
                    // State::LoadConsensus will continue to obtain it from the
345
                    // database until valid-after has been surpassed, which is
346
                    // most definitely not what we want.
347
                    State::FetchConsensus
348
                } else if servers.is_empty() && extras.is_empty() && micros.is_empty() {
349
                    // All queues are empty, meaning we are done, until lifetime
350
                    // ends.
351
                    State::Hibernate
352
                } else {
353
                    // The lifetime has not been surpassed and we have stuff
354
                    // to download, so we need to obtain the descriptors.
355
                    State::Descriptors
356
                }
357
            }
358
        };
359
6
        Ok(state)
360
6
    }
361

            
362
    /// Executes a single state iteration in the finite state machine.
363
    ///
364
    /// The return value is of type [`Result<(), OperationError>`].
365
    /// The success type is not of much interest for calling applications.
366
    /// However, the error case itself should be passed towards
367
    /// [`crate::err::IsFatal::is_fatal()`] in order to either abort the
368
    /// application or retry with an appropriate timeout.
369
    ///
370
    // TODO: Use tracing instrumentation here.
371
    // TODO DIRMIRROR: Document the state transition check which we have to do
372
    // because of database invariances no longer holding true.
373
    async fn execute<R: Rng>(
374
        &self,
375
        pool: &Pool<SqliteConnectionManager>,
376
        data: &mut ConsensusBoundData,
377
        endpoint: &[SocketAddr],
378
        now: Timestamp,
379
        rng: &mut R,
380
    ) -> Result<(), OperationError> {
381
        // TODO: Should we return DatabaseError or something like
382
        // StateDeterminationError?  Either way, both cases should be seriously
383
        // fatal.
384
        let state = db::read_tx(pool, |tx| self.determine_state(tx, data, now))??;
385
        debug!("state is {state}");
386

            
387
        match state {
388
            State::LoadConsensus => self.load_consensus(pool, data, now, rng),
389
            State::FetchConsensus => Ok(self.fetch_consensus(data, endpoint).await?),
390
            State::AuthCerts => self.auth_certs(pool, data, endpoint, now).await,
391
            State::StoreConsensus => todo!(),
392
            State::Descriptors => todo!(),
393
            State::Hibernate => self.hibernate(data, now).await,
394
        }
395
    }
396

            
397
    /// Executes [`State::LoadConsensus`].
398
    ///
399
    /// This method does the following:
400
    /// * Load the most recent valid consensus from the database.
401
    /// * Compute the lifetime for it.
402
    /// * Compute the missing descriptors for it.
403
2
    fn load_consensus<R: Rng>(
404
2
        &self,
405
2
        pool: &Pool<SqliteConnectionManager>,
406
2
        data: &mut ConsensusBoundData,
407
2
        now: Timestamp,
408
2
        rng: &mut R,
409
2
    ) -> Result<(), OperationError> {
410
        // Load the most recent valid consensus from the database.
411
        //
412
        // If there is no consensus, we should have not entered the state, which
413
        // means that the database must have been externally verified.
414
        // In this case, it is probably better to return a bug, as external
415
        // applications arbitrarily modifying the database while we are running
416
        // leaves too much room for wrong/weird behavior.
417
2
        let (server_queue, extra_queue, micro_queue, lifetime, consensus) =
418
2
            db::read_tx(pool, |tx| {
419
2
                let meta = ConsensusMeta::query_recent(tx, self.flavor, &self.tolerance, now)?
420
2
                    .ok_or(internal!("database externally modified?"))?;
421
2
                let server_queue = meta.missing_servers(tx)?;
422
2
                let extra_queue = meta.missing_extras(tx)?;
423
2
                let micro_queue = meta.missing_micros(tx)?;
424
2
                let lifetime = meta.lifetime(rng);
425
2
                let consensus = meta.data(tx)?;
426
2
                Ok::<_, DatabaseError>((
427
2
                    server_queue,
428
2
                    extra_queue,
429
2
                    micro_queue,
430
2
                    lifetime,
431
2
                    consensus,
432
2
                ))
433
2
            })??;
434

            
435
        // Parse the most recent valid consensus from the database.
436
        //
437
        // TODO DIRMIRROR:
438
        // Because only valid documents may exist in the database, it should
439
        // succeed.  However, there is this weird edge-case where we may have
440
        // inserted a document with a field we do not understand because of
441
        // using an old version.  After upgrading our version we may now
442
        // understand the field and realize it is wrong, leading to a violation
443
        // of this constraint.  Handling this is not very easy; I suppose adding
444
        // an additional column to the meta table storing the last used crate
445
        // version is a sensible idea, with upgrades and downgrades leading to
446
        // a parsing of all network documents within the database, throwing the
447
        // ones out we do not understand (anymore).
448
        //
449
        // See also the relevant MR discussion:
450
        // <https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3664#note_3352723>
451
2
        let consensus = match self.flavor {
452
            ConsensusFlavor::Plain => FlavoredConsensus::Ns(
453
2
                parse2::parse_netdoc::<cons::NetworkStatusUnverified>(&ParseInput::new(
454
2
                    &consensus, "",
455
2
                ))
456
2
                .map_err(into_internal!("invalid netdoc in database?"))?
457
                // TODO DIRMIRROR: explain why this is OK, or re-verify the signatures
458
2
                .unwrap_unverified()
459
                .0,
460
            ),
461
            ConsensusFlavor::Microdesc => FlavoredConsensus::Md(
462
                parse2::parse_netdoc::<md::NetworkStatusUnverified>(&ParseInput::new(
463
                    &consensus, "",
464
                ))
465
                .map_err(into_internal!("invalid netdoc in database?"))?
466
                // TODO DIRMIRROR: explain why this is OK, or re-verify the signatures
467
                .unwrap_unverified()
468
                .0,
469
            ),
470
        };
471

            
472
2
        *data = ConsensusBoundData::Verified {
473
2
            consensus,
474
2
            lifetime,
475
2
            server_queue,
476
2
            extra_queue,
477
2
            micro_queue,
478
2
        };
479
2
        Ok(())
480
2
    }
481

            
482
    /// Fetches a consensus from an upstream authority.
483
    // TODO DIRMIRROR: Add logging.
484
    #[allow(clippy::string_slice)] // TODO
485
2
    async fn fetch_consensus(
486
2
        &self,
487
2
        data: &mut ConsensusBoundData,
488
2
        endpoint: &[SocketAddr],
489
3
    ) -> Result<(), AuthorityRequestError> {
490
        // Obtain the consensus.
491
2
        let mut consensus: VecDeque<_> = match self.flavor {
492
2
            ConsensusFlavor::Plain => self
493
2
                .send_request(endpoint, ConsensusRequest::new(self.flavor))
494
2
                .await
495
2
                .map(|(raw, doc)| {
496
2
                    doc.into_iter()
497
2
                        .map(|(doc, start, end)| {
498
2
                            (raw[start..end].to_owned(), FlavoredConsensusSigned::Ns(doc))
499
2
                        })
500
2
                        .collect()
501
2
                }),
502
            ConsensusFlavor::Microdesc => self
503
                .send_request(endpoint, ConsensusRequest::new(self.flavor))
504
                .await
505
                .map(|(raw, doc)| {
506
                    doc.into_iter()
507
                        .map(|(doc, start, end)| {
508
                            (raw[start..end].to_owned(), FlavoredConsensusSigned::Md(doc))
509
                        })
510
                        .collect()
511
                }),
512
        }?;
513

            
514
        // Check for the correct number of results.
515
2
        if consensus.len() != 1 {
516
            return Err(AuthorityRequestError::Response(
517
                "invalid number of consensus?",
518
            ));
519
2
        }
520

            
521
        // expect is fine because we checked the length for one above.
522
2
        let (raw, consensus) = consensus.pop_front().expect("pop_front");
523

            
524
        // And store it.
525
2
        *data = ConsensusBoundData::Unverified { consensus, raw };
526

            
527
2
        Ok(())
528
2
    }
529

            
530
    /// Fetches, validates, and stores authority certificates.
531
    //
532
    // TODO DIRMIRROR: Right now, there is a torspec DoS issue.
533
    // An attacker may add lots of garbage signatures and we will fetch them
534
    // Even checking the ID PK against v3idents is not useful because an
535
    // attacker may still use the same ID PK dozens of times with various
536
    // SK PKs.  A good fix would include checking that no ID PK is duplicate
537
    // AND to ignore all ID PKs we do not recognize.  Also, it would probably
538
    // be best to move the v3idents structure to a HashMap based implementation,
539
    // as well as the signatories result.
540
    #[allow(clippy::string_slice)] // TODO
541
2
    async fn auth_certs(
542
2
        &self,
543
2
        pool: &Pool<SqliteConnectionManager>,
544
2
        data: &mut ConsensusBoundData,
545
2
        endpoint: &[SocketAddr],
546
2
        now: Timestamp,
547
3
    ) -> Result<(), OperationError> {
548
        // Obtain the signatories of the current unverified consensus.
549
2
        let signatories = match data {
550
2
            ConsensusBoundData::Unverified { consensus, .. } => consensus.signatories(),
551
            _ => return Err(OperationError::Bug(internal!("data is not unverified"))),
552
        };
553

            
554
        // Obtain the missing certificate identifiers.
555
2
        let (_, missing) = db::read_tx(pool, |tx| {
556
2
            AuthCertMeta::query_recent(tx, &signatories, &self.tolerance, now)
557
2
        })??;
558
2
        if missing.is_empty() {
559
            // Although not technically fatal, retrying when the database was
560
            // externally modified does not make much sense.
561
            return Err(OperationError::Bug(internal!(
562
                "database externally modified?"
563
            )));
564
2
        }
565

            
566
        // Compose the request.
567
2
        let mut requ = AuthCertRequest::new();
568
18
        for kp in missing.iter().copied() {
569
18
            requ.push(kp);
570
18
        }
571

            
572
        // Fire it off.
573
2
        let (resp, certs) = self
574
2
            .send_request::<_, AuthCertUnverified>(endpoint, requ)
575
2
            .await?;
576

            
577
        // Verify each certificate.  Invalid certificates and other problems get
578
        // logged and filtered out, with the result being then inserted into
579
        // the database.
580
2
        let certs = certs
581
2
            .into_iter()
582
34
            .filter_map(|(unverified, start, end)| {
583
34
                let unverified_body = unverified.inspect_unverified().0;
584
34
                let kp = AuthCertKeyIds {
585
34
                    id_fingerprint: unverified_body.dir_identity_key.to_rsa_identity(),
586
34
                    sk_fingerprint: unverified_body.dir_signing_key.to_rsa_identity(),
587
34
                };
588

            
589
                // Skip certificates we did not asked for.
590
                //
591
                // Not much of an issue because certificate verification will
592
                // usually fail anyways, except for this weird edge-case where we
593
                // actually have that id fingerprint in the v3idents.
594
34
                if !missing.contains(&kp) {
595
16
                    debug!("authority returned certificate we did not asked for: {kp:?}");
596
16
                    return None;
597
18
                }
598

            
599
18
                let verified = unverified
600
18
                    .verify(self.authorities.v3idents())
601
18
                    .and_then(|v| {
602
18
                        Ok(self
603
18
                            .tolerance
604
18
                            .extend_tolerance(v)
605
18
                            .check_valid_at(&now.into())?)
606
18
                    });
607
18
                let verified = match verified {
608
18
                    Ok(v) => v,
609
                    Err(e) => {
610
                        // TODO DIRMIRROR: Log the actual cert.
611
                        warn!("received invalid auth cert: {e}",);
612
                        return None;
613
                    }
614
                };
615

            
616
18
                Some((verified, &resp[start..end]))
617
34
            })
618
2
            .collect::<Vec<_>>();
619

            
620
        // When we have reached this, it means that this call made no progress,
621
        // i.e. the authority only returned certificates we were not interested
622
        // in.
623
2
        if certs.is_empty() {
624
            Err(Box::new(AuthorityRequestError::Response(
625
                "response lead to no progress",
626
            )))?;
627
2
        }
628

            
629
        // Finally, insert them all into the database.
630
2
        db::rw_tx(pool, |tx| {
631
18
            for (cert, data) in certs {
632
18
                AuthCertMeta::insert(tx, ContentEncoding::iter(), &cert, data)?;
633
            }
634
2
            Ok::<_, DatabaseError>(())
635
2
        })??;
636

            
637
2
        Ok(())
638
2
    }
639

            
640
    /// Hibernates for the remaining lifetime of the consensus.
641
    async fn hibernate(
642
        &self,
643
        data: &mut ConsensusBoundData,
644
        now: Timestamp,
645
    ) -> Result<(), OperationError> {
646
        match data {
647
            ConsensusBoundData::None | ConsensusBoundData::Unverified { .. } => {
648
                // This should not happen, we only enter hibernation in a state
649
                // that already has a verified consensus.
650
                return Err(internal!("hibernating without a verified consensus?").into());
651
            }
652
            ConsensusBoundData::Verified { lifetime, .. } => {
653
                let timeout = *lifetime - now;
654
                debug!("hibernating for {}s", timeout.as_secs());
655
                tokio::time::sleep(timeout).await;
656
            }
657
        }
658

            
659
        Ok(())
660
    }
661

            
662
    /// Convenience wrapper around [`tor_dirclient::send_request()`].
663
    ///
664
    /// It opens a TCP connection, performs the request, and parses the result.
665
    ///
666
    /// Returns the raw response alongside the output of
667
    /// [`parse2::parse_netdoc_multiple_with_offsets()`].
668
    ///
669
    /// The output is required because we need the raw document alongside the
670
    /// offsets to have the actual data we will insert into the database later
671
    /// on.
672
4
    async fn send_request<R: Requestable, T: NetdocParseable>(
673
4
        &self,
674
4
        endpoint: &[SocketAddr],
675
4
        requ: R,
676
4
    ) -> Result<(String, Vec<(T, usize, usize)>), AuthorityRequestError> {
677
        // The check is required to not let Tokio panic.
678
4
        if endpoint.is_empty() {
679
            return Err(AuthorityRequestError::Bug(internal!("empty endpoint?")));
680
4
        }
681

            
682
        // Open the TCP connection.
683
4
        let mut stream = TcpStream::connect(endpoint)
684
4
            .await
685
4
            .map_err(AuthorityRequestError::TcpConnect)?
686
4
            .compat();
687

            
688
        // Perform the request and map the result nicely.
689
4
        let resp = tor_dirclient::send_request(&self.rt, &requ, &mut stream, None)
690
4
            .await
691
4
            .map(|resp| resp.output_string().map(|resp| resp.to_owned()));
692

            
693
        // We can immediately drop the connection now, no need to occupy even
694
        // more resources from the authority.  Doing so is fine, it is HTTP/1.0
695
        // and there is no connection reuse anyways.
696
4
        drop(stream);
697

            
698
        // Returning all request failed errors is okay; they all imply that
699
        // retrying from a different authority is fine.
700
        // TODO MSRV: If possible, use Result::flatten once MSRV 1.89.
701
4
        let resp = match resp {
702
4
            Ok(Ok(r)) => Ok(r),
703
            Ok(Err(e)) => Err(e),
704
            Err(tor_dirclient::Error::RequestFailed(e)) => Err(e),
705
            Err(e) => {
706
                return Err(AuthorityRequestError::Bug(internal!(
707
                    "unhandled dirclient error: {e}"
708
                )))
709
            }
710
        }?;
711

            
712
        // Parse the response.
713
4
        let parsed = parse2::parse_netdoc_multiple_with_offsets(&ParseInput::new(&resp, ""))?;
714

            
715
4
        Ok((resp, parsed))
716
4
    }
717
}
718

            
719
impl FlavoredConsensusSigned {
720
    /// Wrapper to obtain the signatories of a flavored consensus.
721
8
    fn signatories(&self) -> Vec<AuthCertKeyIds> {
722
8
        let sigs = match &self {
723
8
            Self::Ns(ns) => &ns.sigs.sigs.directory_signature,
724
            Self::Md(md) => &md.sigs.sigs.directory_signature,
725
        };
726
8
        sigs.iter().map(|sig| sig.key_ids).collect()
727
8
    }
728
}
729

            
730
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use std::time::{Duration, SystemTime};

    use rusqlite::named_params;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpListener,
    };
    use tor_basic_utils::test_rng::testing_rng;
    use tor_netdoc::parse2::NetdocParseableUnverified;

    use crate::database::sql;

    use super::*;

    /// Reference time (seconds since the UNIX epoch) used by the authority
    /// certificate test: Mon Feb  9 12:17:34 UTC 2026.
    const AUTHCERT_TEST_TIME_SECS: u64 = 1770639454;

    /// Return [`AUTHCERT_TEST_TIME_SECS`] as a [`SystemTime`].
    fn authcert_test_time() -> SystemTime {
        SystemTime::UNIX_EPOCH + Duration::from_secs(AUTHCERT_TEST_TIME_SECS)
    }

    /// Build a [`StaticEngine`] for the plain (`ns`) flavor with the default
    /// authorities and directory tolerance.
    ///
    /// This unwraps `PreferredRuntime::current()`, so it is only called from
    /// inside the `#[tokio::test]` functions below.
    fn dummy_engine() -> StaticEngine {
        StaticEngine {
            flavor: ConsensusFlavor::Plain,
            authorities: AuthorityContacts::default(),
            tolerance: DirTolerance::default(),
            rt: PreferredRuntime::current().unwrap(),
        }
    }

    /// Parse `testdata/consensus-ns` into a (not yet verified) `ns` consensus.
    fn parse_consensus_ns() -> FlavoredConsensusSigned {
        FlavoredConsensusSigned::Ns(
            parse2::parse_netdoc(&ParseInput::new(
                include_str!("../../testdata/consensus-ns"),
                "",
            ))
            .unwrap(),
        )
    }

    /// Spawn a one-shot HTTP/1.0 server that answers a single request with
    /// `body` (identity encoding) and return the address it listens on.
    ///
    /// The listener is bound to the IPv4 loopback interface: connecting to
    /// the unspecified address that a wildcard (`[::]`) listener reports is
    /// not portable, and tests should not listen on external interfaces.
    ///
    /// The complete request header is consumed before responding, so no
    /// unread request bytes are left behind when the server drops the
    /// connection (closing a socket with unread data may reset it).
    async fn spawn_http_server(body: &'static str) -> SocketAddr {
        let server = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let saddr = server.local_addr().unwrap();
        tokio::spawn(async move {
            let (mut stream, _) = server.accept().await.unwrap();

            // Read until the blank line that terminates the request header,
            // or until the client closes its side.
            let mut request: Vec<u8> = Vec::new();
            while !request.windows(4).any(|w| w == b"\r\n\r\n") {
                if stream.read_buf(&mut request).await.unwrap() == 0 {
                    break;
                }
            }

            let resp = format!(
                "HTTP/1.0 200 OK\r\nContent-Encoding: identity\r\nContent-Length: {}\r\n\r\n{body}",
                body.len()
            );
            stream.write_all(resp.as_bytes()).await.unwrap();
        });
        saddr
    }

    /// Open a database via `db::open("")` and populate it with:
    ///
    /// * an `ns` consensus (`testdata/consensus-ns`) whose members are the
    ///   router descriptors `descriptor1-ns` and `descriptor2-ns`,
    /// * the router descriptor `descriptor1-ns` and its extra-info document.
    ///
    /// `descriptor2-ns` is deliberately not stored, so it is the only
    /// descriptor still missing for the consensus.
    fn create_dummy_db() -> Pool<SqliteConnectionManager> {
        let pool = db::open("").unwrap();

        let mut conn = pool.get().unwrap();
        let tx = conn.transaction().unwrap();

        let cons_docid = db::store_insert(
            &tx,
            include_bytes!("../../testdata/consensus-ns"),
            std::iter::empty(),
        )
        .unwrap();
        let ns1_docid = db::store_insert(
            &tx,
            include_bytes!("../../testdata/descriptor1-ns"),
            std::iter::empty(),
        )
        .unwrap();
        let extra1_docid = db::store_insert(
            &tx,
            include_bytes!("../../testdata/descriptor1-extra-info"),
            std::iter::empty(),
        )
        .unwrap();

        tx.execute(
            sql!(
                "
                INSERT INTO router_extra_info (docid, unsigned_sha1, kp_relay_id_rsa_sha1)
                VALUES
                (:docid, :sha1, :fingerprint)
                "
            ),
            named_params! {
                ":docid": extra1_docid,
                ":sha1": db::Sha1::digest(include_bytes!("../../testdata/descriptor1-extra-info-unsigned")),
                ":fingerprint": "000004ACBB9D29BCBA17256BB35928DDBFC8ABA9"
            },
        )
        .unwrap();
        tx.execute(
            sql!(
                "
                INSERT INTO router_descriptor
                (docid, unsigned_sha1, unsigned_sha2, kp_relay_id_rsa_sha1, flavor, extra_unsigned_sha1)
                VALUES
                (:docid, :sha1, :sha2, :fingerprint, 'ns', :extra)
                "
            ),
            named_params! {
                ":docid": ns1_docid,
                ":sha1": db::Sha1::digest(include_bytes!("../../testdata/descriptor1-ns-unsigned")),
                ":sha2": db::Sha256::digest(include_bytes!("../../testdata/descriptor1-ns-unsigned")),
                ":fingerprint": "000004ACBB9D29BCBA17256BB35928DDBFC8ABA9",
                ":extra": db::Sha1::digest(include_bytes!("../../testdata/descriptor1-extra-info-unsigned")),
            },
        )
        .unwrap();

        // valid-after 15:00, fresh-until 16:00, valid-until 18:00
        // (UTC, 2026-01-29).
        tx.execute(
            sql!(
                "
                INSERT INTO consensus
                (docid, unsigned_sha3_256, flavor, valid_after, fresh_until, valid_until)
                VALUES
                (:docid, :sha3, 'ns', :valid_after, :fresh_until, :valid_until)
                "
            ),
            named_params! {
                ":docid": cons_docid,
                ":sha3": "0000000000000000000000000000000000000000000000000000000000000000",
                ":valid_after": 1769698800,
                ":fresh_until": 1769702400,
                ":valid_until": 1769709600,
            },
        )
        .unwrap();

        tx.execute(
            sql!(
                "
                INSERT INTO consensus_router_descriptor_member
                (consensus_docid, unsigned_sha1, unsigned_sha2)
                VALUES
                (:cons_docid, :ns1_sha1, NULL),
                (:cons_docid, :ns2_sha1, NULL)
                "
            ),
            named_params! {
                ":cons_docid": cons_docid,
                ":ns1_sha1": db::Sha1::digest(include_bytes!("../../testdata/descriptor1-ns-unsigned")),
                ":ns2_sha1": db::Sha1::digest(include_bytes!("../../testdata/descriptor2-ns-unsigned")),
            },
        )
        .unwrap();

        tx.commit().unwrap();

        pool
    }

    #[tokio::test]
    async fn state_load_consensus() {
        let pool = create_dummy_db();
        let mut data = ConsensusBoundData::None;
        let engine = dummy_engine();

        // Half an hour before the stored consensus' fresh-until (16:00); the
        // computed lifetime must fall within the hour following fresh-until.
        let time = SystemTime::UNIX_EPOCH + Duration::from_secs(1769700600); // 2026-01-29 15:30:00
        let time: Timestamp = time.into();
        let fresh_until = time + Duration::from_secs(60 * 30);
        let fresh_until_half = fresh_until + Duration::from_secs(60 * 60);

        engine
            .load_consensus(&pool, &mut data, time, &mut testing_rng())
            .unwrap();

        // El-cheapo assert_eq due to lack of PartialEq for tor-netdoc poc.
        match data {
            ConsensusBoundData::Verified {
                consensus,
                lifetime,
                server_queue,
                extra_queue,
                micro_queue,
            } => {
                match consensus {
                    FlavoredConsensus::Ns(_) => {}
                    _ => panic!("consensus not ns"),
                }
                // Only descriptor2 is missing from the database.
                assert_eq!(
                    server_queue,
                    HashSet::from([db::Sha1::digest(include_bytes!(
                        "../../testdata/descriptor2-ns-unsigned"
                    ))])
                );
                assert!(lifetime >= fresh_until);
                assert!(lifetime <= fresh_until_half);
                assert!(extra_queue.is_empty());
                assert!(micro_queue.is_empty());
            }
            _ => panic!("data is not verified"),
        }
    }

    #[tokio::test]
    async fn state_fetch_consensus() {
        let pool = create_dummy_db();
        let mut data = ConsensusBoundData::None;
        let engine = dummy_engine();

        let state = db::read_tx(&pool, |tx| {
            engine.determine_state(tx, &data, SystemTime::UNIX_EPOCH.into())
        })
        .unwrap()
        .unwrap();
        assert_eq!(state, State::FetchConsensus);

        let saddr = spawn_http_server(include_str!("../../testdata/consensus-ns")).await;

        engine.fetch_consensus(&mut data, &[saddr]).await.unwrap();
        match data {
            ConsensusBoundData::Unverified { consensus, raw } => match consensus {
                FlavoredConsensusSigned::Ns(ns) => {
                    // El-cheapo verification, this is not a parser unit test.
                    assert_eq!(ns.unwrap_unverified().0.r.len(), 2);
                    assert_eq!(raw, include_str!("../../testdata/consensus-ns"));
                }
                _ => panic!("data is not unverified ns consensus"),
            },
            _ => panic!("data is not unverified"),
        }
    }

    #[tokio::test]
    async fn state_auth_certs() {
        let pool = create_dummy_db();
        let mut data = ConsensusBoundData::Unverified {
            consensus: parse_consensus_ns(),
            raw: include_str!("../../testdata/consensus-ns").to_owned(),
        };
        let engine = dummy_engine();

        assert_eq!(
            db::read_tx(&pool, |tx| engine.determine_state(
                tx,
                &data,
                SystemTime::UNIX_EPOCH.into()
            ))
            .unwrap()
            .unwrap(),
            State::AuthCerts
        );

        let saddr = spawn_http_server(include_str!("../../testdata/authcert-all")).await;

        // Fetch all authcerts.
        engine
            .auth_certs(&pool, &mut data, &[saddr], authcert_test_time().into())
            .await
            .unwrap();

        // Check whether we are done with all authcerts.
        assert_eq!(
            db::read_tx(&pool, |tx| engine.determine_state(
                tx,
                &data,
                authcert_test_time().into()
            ))
            .unwrap()
            .unwrap(),
            State::StoreConsensus
        );
        let recent_authcerts = db::read_tx(&pool, |tx| {
            AuthCertMeta::query_recent(
                tx,
                &parse_consensus_ns().signatories(),
                &DirTolerance::default(),
                authcert_test_time().into(),
            )
        })
        .unwrap()
        .unwrap();
        // TODO DIRMIRROR: Compare more than just length.
        assert_eq!(
            recent_authcerts.0.len(),
            engine.authorities.v3idents().len()
        );
        assert!(recent_authcerts.1.is_empty());
    }
}