1
//! Directory Mirror Operation.
2
//!
3
//! # Specifications
4
//!
5
//! * [Directory cache operation](https://spec.torproject.org/dir-spec/directory-cache-operation.html).
6
//!
7
//! # Rationale
8
//!
9
//! This module implements the "core operation" of a directory mirror.
10
//! "Core operation" primarily refers to the logic involved in downloading
11
//! network documents from an upstream authority and inserting them into the
12
//! database.  This module notably **DOES NOT** provide any public (in the HTTP
13
//! sense) endpoints for querying documents.  This is purposely behind a different
14
//! module, so that the directory authority implementation can also make use of it.
15
//! You can think of this module as the one implementing the things unique
16
//! to directory mirrors.
17

            
18
use std::{
19
    collections::{HashSet, VecDeque},
20
    net::SocketAddr,
21
};
22

            
23
use r2d2::Pool;
24
use r2d2_sqlite::SqliteConnectionManager;
25
use rand::Rng;
26
use rusqlite::Transaction;
27
use strum::IntoEnumIterator;
28
use tokio::net::TcpStream;
29
use tokio_util::compat::TokioAsyncReadCompatExt;
30
use tor_dirclient::request::{AuthCertRequest, ConsensusRequest, Requestable};
31
use tor_dircommon::{authority::AuthorityContacts, config::DirTolerance};
32
use tor_error::{internal, into_internal};
33
use tor_netdoc::{
34
    doc::{
35
        authcert::{AuthCertKeyIds, AuthCertUnverified},
36
        netstatus::ConsensusFlavor,
37
    },
38
    parse2::{
39
        self,
40
        poc::netstatus::{cons, md, NdiDirectorySignature},
41
        NetdocParseable, NetdocUnverified, ParseInput,
42
    },
43
};
44
use tor_rtcompat::PreferredRuntime;
45
use tracing::{debug, warn};
46

            
47
use crate::{
48
    database::{self as db, AuthCertMeta, ConsensusMeta, ContentEncoding, Timestamp},
49
    err::{AuthorityRequestError, DatabaseError, OperationError},
50
};
51

            
52
mod poc;
53

            
54
/// The various states for the [`StaticEngine`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::Display)]
enum State {
    /// Loads the most recent valid (and verified) consensus from the database
    /// into memory.
    ///
    /// Transitions from:
    /// * Start, if a recent valid consensus exists in the database.
    /// * [`State::StoreConsensus`], if successfully finished.
    ///
    /// Transitions into:
    /// * [`State::Descriptors`], if descriptors are missing.
    /// * [`State::Hibernate`], if no descriptors are missing.
    LoadConsensus,

    /// Downloads the most recent consensus from a directory authority.
    ///
    /// Transitions from:
    /// * Start, if no recent valid consensus exists in the database.
    /// * [`State::Descriptors`], if lifetime is over.
    /// * [`State::Hibernate`], if lifetime is over.
    ///
    /// Transitions into:
    /// * [`State::AuthCerts`], if we miss authority certificates.
    /// * [`State::StoreConsensus`], if all authority certificates exist in the
    ///   database.
    // TODO DIRMIRROR: What to do in the case of getting an invalid consensus
    // such as junk data?  The normal retry logic sounds reasonable here.
    FetchConsensus,

    /// Downloads, validates, and stores the missing authority certificates from
    /// the downloaded unvalidated consensus into the database.
    ///
    /// Transitions from:
    /// * [`State::FetchConsensus`], if we miss authority certificates.
    /// * [`State::AuthCerts`], if we still miss authority certificates.
    ///
    /// Transitions into:
    /// * [`State::AuthCerts`], if we still miss authority certificates.
    /// * [`State::StoreConsensus`], if we got all authority certificates.
    // TODO DIRMIRROR: What to do in the case of a MITM attack where an attacker
    // adds lots of invalid signature items at the bottom, leading to lots of
    // queries for directory authority certificates, which may succeed or not?
    // Best idea is probably to only download authcerts whose id fingerprints
    // are configured in our AuthorityContacts, because then we have an upper
    // limit.
    AuthCerts,

    /// Validates and stores the downloaded unvalidated consensus into the
    /// database.
    ///
    /// Transitions from:
    /// * [`State::FetchConsensus`], if we have all authority certificates.
    /// * [`State::AuthCerts`], if we have all authority certificates.
    ///
    /// Transitions into:
    /// * [`State::LoadConsensus`]
    StoreConsensus,

    /// Downloads missing network documents (descriptors) from a directory
    /// authority.
    ///
    /// Transitions from:
    /// * [`State::LoadConsensus`], if we initialize.
    /// * [`State::Descriptors`], if we still have missing descriptors left.
    ///
    /// Transitions into:
    /// * [`State::FetchConsensus`], if lifetime is over.
    /// * [`State::Descriptors`], if we still have missing descriptors left.
    /// * [`State::Hibernate`], if nothing is left.
    Descriptors,

    /// Hibernate because nothing is left.
    ///
    /// Transitions from:
    /// * [`State::Descriptors`]
    /// * [`State::LoadConsensus`], if the loaded consensus already has no
    ///   missing descriptors.
    ///
    /// Transitions into:
    /// * [`State::FetchConsensus`], if the lifetime is over.
    Hibernate,
}
134

            
135
/// The execution engine for the finite state machine.
///
/// The states themselves are explained in [`State`].
///
/// This data structure itself is static and contains no state, but merely
/// configuration primitives that stay constant throughout the runtime of the
/// program, such as the [`ConsensusFlavor`], the [`AuthorityContacts`], and the
/// [`DirTolerance`].  It can be kept throughout the entire runtime and only
/// exists for convenience in order to not give each state machine related
/// (then static) method a super long signature containing these fields.
///
/// The state itself is computed fully deterministically from the data found
/// within the database and [`ConsensusBoundData`].
///
/// This is the reason why this structure is not called `StateMachine`,
/// because that would imply that the type in itself carries state, which is not
/// true, because the state is stored entirely externally, with this engine
/// only processing and modifying it.
///
/// See [`StaticEngine::determine_state()`] for more details.
#[derive(Debug)]
struct StaticEngine {
    /// The flavor of the consensus we are serving.
    flavor: ConsensusFlavor,

    /// The authorities we are acknowledging.
    authorities: AuthorityContacts,

    /// The document tolerance we are accepting.
    tolerance: DirTolerance,

    /// The preferred runtime for compatibility with other arti crates.
    ///
    /// Generally obtained through [`PreferredRuntime::current()`].
    rt: PreferredRuntime,
}
171

            
172
/// Additional state machine data concerning a single consensus.
///
/// This enum stores and keeps track of the consensus we are serving and in
/// which ✨state✨ it is currently in, such as whether it is verified or not,
/// or if we even have a state loaded in memory in the first place.
#[derive(Debug, Clone)]
enum ConsensusBoundData {
    /// No state is loaded in memory at the moment.
    None,

    /// We have downloaded a consensus but it is not yet verified.
    Unverified {
        /// The unverified parsed consensus we have.
        // TODO DIRMIRROR: Make this optional, see comment in
        // StaticEngine::execute.
        consensus: FlavoredConsensusSigned,

        /// The unparsed raw consensus we have.
        raw: String,
    },

    /// We have downloaded and verified a consensus.
    Verified {
        /// The verified consensus we have.
        consensus: FlavoredConsensus,

        /// When to stop dealing with this consensus and fetching a new one.
        lifetime: Timestamp,

        /// SHA-1 digests of the missing server descriptors in the consensus.
        server_queue: HashSet<db::Sha1>,

        /// SHA-1 digests of the missing extra-info descriptors in the server
        /// descriptors of the consensus.
        ///
        /// extra-info documents are only transitively related to a consensus
        /// through consensus -> server descriptors -> extra-info descriptors
        extra_queue: HashSet<db::Sha1>,

        /// SHA-256 digests of the missing micro descriptors in the consensus.
        ///
        /// This field is technically mutually exclusive to server_queue and
        /// extra_queue because micro descriptors are only found in
        /// [`ConsensusFlavor::Microdesc`] and server plus extra-info
        /// descriptors only in [`ConsensusFlavor::Plain`].  However, because
        /// we used a queue based design, we just leave the queue empty instead
        /// of wrapping this behind an enum variant for true mutual exclusivity.
        /// This makes coding much easier with less boilerplate and negligible
        /// additional runtime cost.
        micro_queue: HashSet<db::Sha256>,
    },
}
224

            
225
/// A [`ConsensusFlavor`]-like wrapper for verified network statuses.
///
/// This is required because we need to obtain, at least partial, data from
/// each consensus, such as the signature (although not this type), the router
/// descriptors, validity, and other information.
///
/// At the current moment, [`tor_netdoc`] itself does not offer things such as
/// a common trait for retrieving the common fields, making this structure
/// necessary, or alternatively lots of macro magic similar to [`tor_netdoc`].
///
/// TODO DIRMIRROR: Either add a trait for [`tor_netdoc`] or figure out if the
/// fields we require are all of the same type in both, so we can only store
/// the fields we are interested in, though this is probably only possible once
/// we reached later stages of code.
///
/// And no, [`std::any::Any`] is not an alternative I am willing to do.
#[derive(Debug, Clone)]
enum FlavoredConsensus {
    /// For plain consensuses.
    Ns(cons::NetworkStatus),

    /// For microdescriptor consensuses.
    Md(md::NetworkStatus),
}
249

            
250
/// A [`ConsensusFlavor`]-like wrapper for unverified network statuses.
///
/// TODO DIRMIRROR: See the [`FlavoredConsensus`] comment.
#[derive(Debug, Clone)]
enum FlavoredConsensusSigned {
    /// For plain consensuses.
    Ns(cons::NetworkStatusUnverified),

    /// For microdescriptor consensuses.
    Md(md::NetworkStatusUnverified),
}
261

            
262
impl StaticEngine {
263
    /// Determines the [`State`] only from the database and [`ConsensusBoundData`].
264
    ///
265
    /// This method is fully idempotent, meaning it only depends upon the data
266
    /// found within the database and the [`ConsensusBoundData`]; there is no
267
    /// internal `state` variable or something contained within [`StaticEngine`].
268
6
    fn determine_state(
269
6
        &self,
270
6
        tx: &Transaction<'_>,
271
6
        data: &ConsensusBoundData,
272
6
        now: Timestamp,
273
6
    ) -> Result<State, DatabaseError> {
274
        // Determine the state primarily upon ConsensusBoundData combined with
275
        // a few database queries, as well as the current time of course.
276
6
        let state = match data {
277
            // ConsensusBoundData::None means that we currently have no
278
            // consensus in memory.  This may be the case because we just
279
            // started up or because we just downloaded, validated, and inserted
280
            // a consensus into the database and reset ConsensusBoundData to
281
            // None afterwards.
282
            ConsensusBoundData::None => {
283
                // Check whether there is a valid consensus in the database at all.
284
                //
285
                // Yes, it is kinda redundant querying a consensus here
286
                // and potentially again when loading the consensus, but SQLite
287
                // is very fast and having to maintain two different queries,
288
                // one for checking and one for selecting, is prone to get
289
                // out-of-sync.
290
2
                match ConsensusMeta::query_recent(tx, self.flavor, &self.tolerance, now)? {
291
                    // Some consensus means we can load it.
292
                    Some(_) => State::LoadConsensus,
293

            
294
                    // None means we must download it.
295
2
                    None => State::FetchConsensus,
296
                }
297
            }
298

            
299
            // ConsensusBoundData::Unverified means that we recently downloaded
300
            // a consensus through State::FetchConsensus.  It is not fully
301
            // validated yet and we may not even be able due to missing
302
            // authority certificates.
303
4
            ConsensusBoundData::Unverified { consensus, .. } => {
304
                // Check whether there any missing authority certificates that
305
                // have signed the consensus.
306
4
                let missing_certs = !AuthCertMeta::query_recent(
307
4
                    tx,
308
4
                    &consensus.signatories(),
309
4
                    &self.tolerance,
310
4
                    now,
311
                )?
312
                .1
313
4
                .is_empty();
314

            
315
4
                if missing_certs {
316
                    // Missing authority certificates means we must download
317
                    // them.
318
2
                    State::AuthCerts
319
                } else {
320
                    // If we have all authority certificates, we can validate
321
                    // and store it inside the database.
322
2
                    State::StoreConsensus
323
                }
324
            }
325

            
326
            // ConsensusBoundData::Verified means that we have successfully
327
            // loaded a recent valid consensus from the database using
328
            // State::LoadConsensus.  Depending on this, we download the missing
329
            // network documents (descriptors) from a directory authority, if
330
            // any.
331
            ConsensusBoundData::Verified {
332
                lifetime,
333
                server_queue: servers,
334
                extra_queue: extras,
335
                micro_queue: micros,
336
                ..
337
            } => {
338
                if *lifetime <= now {
339
                    // The lifetime has been surpassed, download a new
340
                    // consensus.  It is very important TO NOT transition to
341
                    // State::LoadConsensus here, because the current consensus
342
                    // may still be valid but not fresh anymore, in which case
343
                    // State::LoadConsensus will continue to obtain it from the
344
                    // database until valid-after has been surpassed, which is
345
                    // most definitely not what we want.
346
                    State::FetchConsensus
347
                } else if servers.is_empty() && extras.is_empty() && micros.is_empty() {
348
                    // All queues are empty, meaning we are done, until lifetime
349
                    // ends.
350
                    State::Hibernate
351
                } else {
352
                    // The lifetime has not been surpassed and we have stuff
353
                    // to download, so we need to obtain the descriptors.
354
                    State::Descriptors
355
                }
356
            }
357
        };
358
6
        Ok(state)
359
6
    }
360

            
361
    /// Executes a single state iteration in the finite state machine.
362
    ///
363
    /// The return value is of type [`Result<(), OperationError>`].
364
    /// The success type is not of much interest for calling applications.
365
    /// However, the error case itself should be passed towards
366
    /// [`crate::err::IsFatal::is_fatal()`] in order to either abort the
367
    /// application or retry with an appropriate timeout.
368
    ///
369
    // TODO: Use tracing instrumentation here.
370
    // TODO DIRMIRROR: Document the state transition check which we have to do
371
    // because of database invariances no longer holding true.
372
    async fn execute<R: Rng>(
373
        &self,
374
        pool: &Pool<SqliteConnectionManager>,
375
        data: &mut ConsensusBoundData,
376
        endpoint: &[SocketAddr],
377
        now: Timestamp,
378
        rng: &mut R,
379
    ) -> Result<(), OperationError> {
380
        // TODO: Should we return DatabaseError or something like
381
        // StateDeterminationError?  Either way, both cases should be seriously
382
        // fatal.
383
        let state = db::read_tx(pool, |tx| self.determine_state(tx, data, now))??;
384
        debug!("state is {state}");
385

            
386
        match state {
387
            State::LoadConsensus => self.load_consensus(pool, data, now, rng),
388
            State::FetchConsensus => Ok(self.fetch_consensus(data, endpoint).await?),
389
            State::AuthCerts => self.auth_certs(pool, data, endpoint, now).await,
390
            State::StoreConsensus => todo!(),
391
            State::Descriptors => todo!(),
392
            State::Hibernate => self.hibernate(data, now).await,
393
        }
394
    }
395

            
396
    /// Executes [`State::LoadConsensus`].
397
    ///
398
    /// This method does the following:
399
    /// * Load the most recent valid consensus from the database.
400
    /// * Compute the lifetime for it.
401
    /// * Compute the missing descriptors for it.
402
2
    fn load_consensus<R: Rng>(
403
2
        &self,
404
2
        pool: &Pool<SqliteConnectionManager>,
405
2
        data: &mut ConsensusBoundData,
406
2
        now: Timestamp,
407
2
        rng: &mut R,
408
2
    ) -> Result<(), OperationError> {
409
        // Load the most recent valid consensus from the database.
410
        //
411
        // If there is no consensus, we should have not entered the state, which
412
        // means that the database must have been externally verified.
413
        // In this case, it is probably better to return a bug, as external
414
        // applications arbitrarily modifying the database while we are running
415
        // leaves too much room for wrong/weird behavior.
416
2
        let (server_queue, extra_queue, micro_queue, lifetime, consensus) =
417
2
            db::read_tx(pool, |tx| {
418
2
                let meta = ConsensusMeta::query_recent(tx, self.flavor, &self.tolerance, now)?
419
2
                    .ok_or(internal!("database externally modified?"))?;
420
2
                let server_queue = meta.missing_servers(tx)?;
421
2
                let extra_queue = meta.missing_extras(tx)?;
422
2
                let micro_queue = meta.missing_micros(tx)?;
423
2
                let lifetime = meta.lifetime(rng);
424
2
                let consensus = meta.data(tx)?;
425
2
                Ok::<_, DatabaseError>((
426
2
                    server_queue,
427
2
                    extra_queue,
428
2
                    micro_queue,
429
2
                    lifetime,
430
2
                    consensus,
431
2
                ))
432
2
            })??;
433

            
434
        // Parse the most recent valid consensus from the database.
435
        //
436
        // TODO DIRMIRROR:
437
        // Because only valid documents may exist in the database, it should
438
        // succeed.  However, there is this weird edge-case where we may have
439
        // inserted a document with a field we do not understand because of
440
        // using an old version.  After upgrading our version we may now
441
        // understand the field and realize it is wrong, leading to a violation
442
        // of this constraint.  Handling this is not very easy; I suppose adding
443
        // an additional column to the meta table storing the last used crate
444
        // version is a sensible idea, with upgrades and downgrades leading to
445
        // a parsing of all network documents within the database, throwing the
446
        // ones out we do not understand (anymore).
447
        //
448
        // See also the relevant MR discussion:
449
        // <https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3664#note_3352723>
450
2
        let consensus = match self.flavor {
451
            ConsensusFlavor::Plain => FlavoredConsensus::Ns(
452
2
                parse2::parse_netdoc::<cons::NetworkStatusUnverified>(&ParseInput::new(
453
2
                    &consensus, "",
454
2
                ))
455
2
                .map_err(into_internal!("invalid netdoc in database?"))?
456
                // TODO DIRMIRROR: explain why this is OK, or re-verify the signatures
457
2
                .unwrap_unverified()
458
                .0,
459
            ),
460
            ConsensusFlavor::Microdesc => FlavoredConsensus::Md(
461
                parse2::parse_netdoc::<md::NetworkStatusUnverified>(&ParseInput::new(
462
                    &consensus, "",
463
                ))
464
                .map_err(into_internal!("invalid netdoc in database?"))?
465
                // TODO DIRMIRROR: explain why this is OK, or re-verify the signatures
466
                .unwrap_unverified()
467
                .0,
468
            ),
469
        };
470

            
471
2
        *data = ConsensusBoundData::Verified {
472
2
            consensus,
473
2
            lifetime,
474
2
            server_queue,
475
2
            extra_queue,
476
2
            micro_queue,
477
2
        };
478
2
        Ok(())
479
2
    }
480

            
481
    /// Fetches a consensus from an upstream authority.
482
    // TODO DIRMIRROR: Add logging.
483
2
    async fn fetch_consensus(
484
2
        &self,
485
2
        data: &mut ConsensusBoundData,
486
2
        endpoint: &[SocketAddr],
487
3
    ) -> Result<(), AuthorityRequestError> {
488
        // Obtain the consensus.
489
2
        let mut consensus: VecDeque<_> = match self.flavor {
490
2
            ConsensusFlavor::Plain => self
491
2
                .send_request(endpoint, ConsensusRequest::new(self.flavor))
492
2
                .await
493
2
                .map(|(raw, doc)| {
494
2
                    doc.into_iter()
495
2
                        .map(|(doc, start, end)| {
496
2
                            (raw[start..end].to_owned(), FlavoredConsensusSigned::Ns(doc))
497
2
                        })
498
2
                        .collect()
499
2
                }),
500
            ConsensusFlavor::Microdesc => self
501
                .send_request(endpoint, ConsensusRequest::new(self.flavor))
502
                .await
503
                .map(|(raw, doc)| {
504
                    doc.into_iter()
505
                        .map(|(doc, start, end)| {
506
                            (raw[start..end].to_owned(), FlavoredConsensusSigned::Md(doc))
507
                        })
508
                        .collect()
509
                }),
510
        }?;
511

            
512
        // Check for the correct number of results.
513
2
        if consensus.len() != 1 {
514
            return Err(AuthorityRequestError::Response(
515
                "invalid number of consensus?",
516
            ));
517
2
        }
518

            
519
        // expect is fine because we checked the length for one above.
520
2
        let (raw, consensus) = consensus.pop_front().expect("pop_front");
521

            
522
        // And store it.
523
2
        *data = ConsensusBoundData::Unverified { consensus, raw };
524

            
525
2
        Ok(())
526
2
    }
527

            
528
    /// Fetches, validates, and stores authority certificates.
529
    //
530
    // TODO DIRMIRROR: Right now, there is a torspec DoS issue.
531
    // An attacker may add lots of garbage signatures and we will fetch them
532
    // Even checking the ID PK against v3idents is not useful because an
533
    // attacker may still use the same ID PK dozens of times with various
534
    // SK PKs.  A good fix would include checking that no ID PK is duplicate
535
    // AND to ignore all ID PKs we do not recognize.  Also, it would probably
536
    // be best to move the v3idents structure to a HashMap based implementation,
537
    // as well as the signatories result.
538
2
    async fn auth_certs(
539
2
        &self,
540
2
        pool: &Pool<SqliteConnectionManager>,
541
2
        data: &mut ConsensusBoundData,
542
2
        endpoint: &[SocketAddr],
543
2
        now: Timestamp,
544
3
    ) -> Result<(), OperationError> {
545
        // Obtain the signatories of the current unverified consensus.
546
2
        let signatories = match data {
547
2
            ConsensusBoundData::Unverified { consensus, .. } => consensus.signatories(),
548
            _ => return Err(OperationError::Bug(internal!("data is not unverified"))),
549
        };
550

            
551
        // Obtain the missing certificate identifiers.
552
2
        let (_, missing) = db::read_tx(pool, |tx| {
553
2
            AuthCertMeta::query_recent(tx, &signatories, &self.tolerance, now)
554
2
        })??;
555
2
        if missing.is_empty() {
556
            // Although not technically fatal, retrying when the database was
557
            // externally modified does not make much sense.
558
            return Err(OperationError::Bug(internal!(
559
                "database externally modified?"
560
            )));
561
2
        }
562

            
563
        // Compose the request.
564
2
        let mut requ = AuthCertRequest::new();
565
18
        for kp in missing.iter().copied() {
566
18
            requ.push(kp);
567
18
        }
568

            
569
        // Fire it off.
570
2
        let (resp, certs) = self
571
2
            .send_request::<_, AuthCertUnverified>(endpoint, requ)
572
2
            .await?;
573

            
574
        // Verify each certificate.  Invalid certificates and other problems get
575
        // logged and filtered out, with the result being then inserted into
576
        // the database.
577
2
        let certs = certs
578
2
            .into_iter()
579
34
            .filter_map(|(unverified, start, end)| {
580
34
                let unverified_body = unverified.inspect_unverified().0;
581
34
                let kp = AuthCertKeyIds {
582
34
                    id_fingerprint: unverified_body.dir_identity_key.to_rsa_identity(),
583
34
                    sk_fingerprint: unverified_body.dir_signing_key.to_rsa_identity(),
584
34
                };
585

            
586
                // Skip certificates we did not asked for.
587
                //
588
                // Not much of an issue because certificate verification will
589
                // usually fail anyways, except for this weird edge-case where we
590
                // actually have that id fingerprint in the v3idents.
591
34
                if !missing.contains(&kp) {
592
16
                    debug!("authority returned certificate we did not asked for: {kp:?}");
593
16
                    return None;
594
18
                }
595

            
596
18
                let verified = unverified.verify_self_signed(
597
18
                    self.authorities.v3idents(),
598
18
                    self.tolerance.pre_valid_tolerance(),
599
18
                    self.tolerance.post_valid_tolerance(),
600
18
                    now.into(),
601
                );
602
18
                let verified = match verified {
603
18
                    Ok(v) => v,
604
                    Err(e) => {
605
                        // TODO DIRMIRROR: Log the actual cert.
606
                        warn!("received invalid auth cert: {e}",);
607
                        return None;
608
                    }
609
                };
610

            
611
18
                Some((verified, &resp[start..end]))
612
34
            })
613
2
            .collect::<Vec<_>>();
614

            
615
        // When we have reached this, it means that this call made no progress,
616
        // i.e. the authority only returned certificates we were not interested
617
        // in.
618
2
        if certs.is_empty() {
619
            Err(Box::new(AuthorityRequestError::Response(
620
                "response lead to no progress",
621
            )))?;
622
2
        }
623

            
624
        // Finally, insert them all into the database.
625
2
        db::rw_tx(pool, |tx| {
626
20
            for (cert, data) in certs {
627
18
                AuthCertMeta::insert(tx, ContentEncoding::iter(), &cert, data)?;
628
            }
629
2
            Ok::<_, DatabaseError>(())
630
2
        })??;
631

            
632
2
        Ok(())
633
2
    }
634

            
635
    /// Hibernates for the remaining lifetime of the consensus.
636
    async fn hibernate(
637
        &self,
638
        data: &mut ConsensusBoundData,
639
        now: Timestamp,
640
    ) -> Result<(), OperationError> {
641
        match data {
642
            ConsensusBoundData::None | ConsensusBoundData::Unverified { .. } => {
643
                // This should not happen, we only enter hibernation in a state
644
                // that already has a verified consensus.
645
                return Err(internal!("hibernating without a verified consensus?").into());
646
            }
647
            ConsensusBoundData::Verified { lifetime, .. } => {
648
                let timeout = *lifetime - now;
649
                debug!("hibernating for {}s", timeout.as_secs());
650
                tokio::time::sleep(timeout).await;
651
            }
652
        }
653

            
654
        Ok(())
655
    }
656

            
657
    /// Convenience wrapper around [`tor_dirclient::send_request()`].
658
    ///
659
    /// It opens a TCP connection, performs the request, and parses the result.
660
    ///
661
    /// Returns the raw response alongside the output of
662
    /// [`parse2::parse_netdoc_multiple_with_offsets()`].
663
    ///
664
    /// The output is required because we need the raw document alongside the
665
    /// offsets to have the actual data we will insert into the database later
666
    /// on.
667
4
    async fn send_request<R: Requestable, T: NetdocParseable>(
668
4
        &self,
669
4
        endpoint: &[SocketAddr],
670
4
        requ: R,
671
4
    ) -> Result<(String, Vec<(T, usize, usize)>), AuthorityRequestError> {
672
        // The check is required to not let Tokio panic.
673
4
        if endpoint.is_empty() {
674
            return Err(AuthorityRequestError::Bug(internal!("empty endpoint?")));
675
4
        }
676

            
677
        // Open the TCP connection.
678
4
        let mut stream = TcpStream::connect(endpoint)
679
4
            .await
680
4
            .map_err(AuthorityRequestError::TcpConnect)?
681
4
            .compat();
682

            
683
        // Perform the request and map the result nicely.
684
4
        let resp = tor_dirclient::send_request(&self.rt, &requ, &mut stream, None)
685
4
            .await
686
4
            .map(|resp| resp.output_string().map(|resp| resp.to_owned()));
687

            
688
        // We can immediately drop the connection now, no need to occupy even
689
        // more resources from the authority.  Doing so is fine, it is HTTP/1.0
690
        // and there is no connection reuse anyways.
691
4
        drop(stream);
692

            
693
        // Returning all request failed errors is okay; they all imply that
694
        // retrying from a different authority is fine.
695
        // TODO MSRV: If possible, use Result::flatten once MSRV 1.89.
696
4
        let resp = match resp {
697
4
            Ok(Ok(r)) => Ok(r),
698
            Ok(Err(e)) => Err(e),
699
            Err(tor_dirclient::Error::RequestFailed(e)) => Err(e),
700
            Err(e) => {
701
                return Err(AuthorityRequestError::Bug(internal!(
702
                    "unhandled dirclient error: {e}"
703
                )))
704
            }
705
        }?;
706

            
707
        // Parse the response.
708
4
        let parsed = parse2::parse_netdoc_multiple_with_offsets(&ParseInput::new(&resp, ""))?;
709

            
710
4
        Ok((resp, parsed))
711
4
    }
712
}
713

            
714
impl FlavoredConsensusSigned {
715
    /// Wrapper to obtain the signatories of a flavored consensus.
716
8
    fn signatories(&self) -> Vec<AuthCertKeyIds> {
717
8
        let sigs = match &self {
718
8
            Self::Ns(ns) => &ns.sigs.sigs.directory_signature,
719
            Self::Md(md) => &md.sigs.sigs.directory_signature,
720
        };
721
8
        sigs.iter()
722
76
            .filter_map(|sig| match sig {
723
                NdiDirectorySignature::Known {
724
72
                    h_kp_auth_id_rsa,
725
72
                    h_kp_auth_sign_rsa,
726
                    ..
727
72
                } => Some(AuthCertKeyIds {
728
72
                    id_fingerprint: *h_kp_auth_id_rsa,
729
72
                    sk_fingerprint: *h_kp_auth_sign_rsa,
730
72
                }),
731
                // TODO DIRMIRROR: This is inappropriate, but because we are
732
                // using poc, we have to refactor this either way.
733
                _ => None,
734
72
            })
735
8
            .collect()
736
8
    }
737
}
738

            
739
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use std::time::{Duration, SystemTime};

    use rusqlite::named_params;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpListener,
    };
    use tor_basic_utils::test_rng::testing_rng;
    use tor_netdoc::parse2::NetdocUnverified;

    use crate::database::sql;

    use super::*;

    /// Build a throwaway database pre-populated with fixture data:
    /// one ns-flavored consensus (valid 2026-01-29 15:00–18:00 UTC) that
    /// lists two router descriptors, of which only descriptor 1 (plus its
    /// extra-info document) is actually stored — descriptor 2 is referenced
    /// by digest only, so loaders see it as "missing".
    // NOTE(review): db::open("") — presumably an in-memory/temporary
    // database; confirm against the database module.
    fn create_dummy_db() -> Pool<SqliteConnectionManager> {
        let pool = db::open("").unwrap();

        let mut conn = pool.get().unwrap();
        let tx = conn.transaction().unwrap();

        // Insert the raw document blobs into the content store first;
        // the returned docids tie the metadata rows below to the blobs.
        let cons_docid = db::store_insert(
            &tx,
            include_bytes!("../../testdata/consensus-ns"),
            std::iter::empty(),
        )
        .unwrap();
        let ns1_docid = db::store_insert(
            &tx,
            include_bytes!("../../testdata/descriptor1-ns"),
            std::iter::empty(),
        )
        .unwrap();
        let extra1_docid = db::store_insert(
            &tx,
            include_bytes!("../../testdata/descriptor1-extra-info"),
            std::iter::empty(),
        )
        .unwrap();

        // Register descriptor 1's extra-info document, keyed by the digest
        // of its unsigned portion and the relay's RSA identity fingerprint.
        tx.execute(
            sql!(
                "
                INSERT INTO router_extra_info (docid, unsigned_sha1, kp_relay_id_rsa_sha1)
                VALUES
                (:docid, :sha1, :fingerprint)
                "
            ),
            named_params! {
                ":docid": extra1_docid,
                ":sha1": db::Sha1::digest(include_bytes!("../../testdata/descriptor1-extra-info-unsigned")),
                ":fingerprint": "000004ACBB9D29BCBA17256BB35928DDBFC8ABA9"
            },
        )
        .unwrap();
        // Register descriptor 1 itself, linking it to its extra-info
        // document through the extra-info's unsigned SHA-1 digest.
        tx.execute(
            sql!(
                "
                INSERT INTO router_descriptor
                (docid, unsigned_sha1, unsigned_sha2, kp_relay_id_rsa_sha1, flavor, extra_unsigned_sha1)
                VALUES
                (:docid, :sha1, :sha2, :fingerprint, 'ns', :extra)
                "
            ),
            named_params! {
                ":docid": ns1_docid,
                ":sha1": db::Sha1::digest(include_bytes!("../../testdata/descriptor1-ns-unsigned")),
                ":sha2": db::Sha256::digest(include_bytes!("../../testdata/descriptor1-ns-unsigned")),
                ":fingerprint": "000004ACBB9D29BCBA17256BB35928DDBFC8ABA9",
                ":extra": db::Sha1::digest(include_bytes!("../../testdata/descriptor1-extra-info-unsigned")),
            },
        )
        .unwrap();

        // Register the consensus metadata.  The SHA3-256 digest is a
        // deliberate all-zero dummy; the lifetime timestamps are
        // 2026-01-29 15:00 / 16:00 / 18:00 UTC.
        tx.execute(
            sql!(
                "
                INSERT INTO consensus
                (docid, unsigned_sha3_256, flavor, valid_after, fresh_until, valid_until)
                VALUES
                (:docid, :sha3, 'ns', :valid_after, :fresh_until, :valid_until)
                "
            ),
            named_params! {
                ":docid": cons_docid,
                ":sha3": "0000000000000000000000000000000000000000000000000000000000000000",
                ":valid_after": 1769698800,
                ":fresh_until": 1769702400,
                ":valid_until": 1769709600,
            },
        )
        .unwrap();

        // Record both descriptors as members of the consensus.  Only
        // descriptor 1 has rows above, so descriptor 2 is intentionally a
        // dangling reference (only its digests exist).
        tx.execute(
            sql!(
                "
                INSERT INTO consensus_router_descriptor_member
                (consensus_docid, unsigned_sha1, unsigned_sha2)
                VALUES
                (:cons_docid, :ns1_sha1, :ns1_sha2),
                (:cons_docid, :ns2_sha1, :ns2_sha2)
                "
            ),
            named_params! {
                ":cons_docid": cons_docid,
                ":ns1_sha1": db::Sha1::digest(include_bytes!("../../testdata/descriptor1-ns-unsigned")),
                ":ns1_sha2": db::Sha256::digest(include_bytes!("../../testdata/descriptor1-ns-unsigned")),
                ":ns2_sha1": db::Sha1::digest(include_bytes!("../../testdata/descriptor2-ns-unsigned")),
                ":ns2_sha2": db::Sha256::digest(include_bytes!("../../testdata/descriptor2-ns-unsigned")),
            },
        )
        .unwrap();

        tx.commit().unwrap();

        pool
    }

    /// Loading a stored, still-fresh consensus from the database must yield
    /// verified consensus data whose download queue contains exactly the
    /// one descriptor missing from the fixture DB (descriptor 2).
    #[tokio::test]
    async fn state_load_consensus() {
        let pool = create_dummy_db();
        let mut data = ConsensusBoundData::None;
        let engine = StaticEngine {
            flavor: ConsensusFlavor::Plain,
            authorities: AuthorityContacts::default(),
            tolerance: DirTolerance::default(),
            rt: PreferredRuntime::current().unwrap(),
        };

        // A point in time inside the fixture consensus's fresh interval.
        let time = SystemTime::UNIX_EPOCH + Duration::from_secs(1769700600); // 2026-01-29 15:30:00
        let time: Timestamp = time.into();
        // Bounds for the randomized lifetime check below: the fixture's
        // fresh_until, and that plus a further hour.
        let fresh_until = time + Duration::from_secs(60 * 30);
        let fresh_until_half = fresh_until + Duration::from_secs(60 * 60);

        engine
            .load_consensus(&pool, &mut data, time, &mut testing_rng())
            .unwrap();

        // El-cheapo assert_eq due to lack of PartialEq for tor-netdoc poc.
        match data {
            ConsensusBoundData::Verified {
                consensus,
                lifetime,
                server_queue,
                extra_queue,
                micro_queue,
            } => {
                match consensus {
                    FlavoredConsensus::Ns(_) => {}
                    _ => panic!("consensus not ns"),
                }
                // Only the dangling descriptor 2 should need fetching.
                assert_eq!(
                    server_queue,
                    HashSet::from([db::Sha1::digest(include_bytes!(
                        "../../testdata/descriptor2-ns-unsigned"
                    ))])
                );
                // The (randomized) lifetime must fall inside the window.
                assert!(lifetime >= fresh_until);
                assert!(lifetime <= fresh_until_half);
                assert!(extra_queue.is_empty());
                assert!(micro_queue.is_empty());
            }
            _ => panic!("data is not verified"),
        }
    }

    /// With no usable consensus (time = epoch, far before the fixture's
    /// validity window), the engine must decide to fetch one, and fetching
    /// from a mock HTTP/1.0 server must produce unverified consensus data.
    #[tokio::test]
    async fn state_fetch_consensus() {
        let pool = create_dummy_db();
        let mut data = ConsensusBoundData::None;
        let engine = StaticEngine {
            flavor: ConsensusFlavor::Plain,
            authorities: AuthorityContacts::default(),
            tolerance: DirTolerance::default(),
            rt: PreferredRuntime::current().unwrap(),
        };

        let state = db::read_tx(&pool, |tx| {
            engine.determine_state(tx, &data, SystemTime::UNIX_EPOCH.into())
        })
        .unwrap()
        .unwrap();
        assert_eq!(state, State::FetchConsensus);

        // Mock directory server: answer a single request with the fixture
        // consensus over plain HTTP/1.0.
        let server = TcpListener::bind("[::]:0").await.unwrap();
        let saddr = server.local_addr().unwrap();
        tokio::spawn(async move {
            let (mut stream, _) = server.accept().await.unwrap();
            let mut buf = vec![0; 1024];
            // Drain (part of) the request; its contents are irrelevant here.
            let _ = stream.read(&mut buf).await.unwrap();

            let consensus = include_str!("../../testdata/consensus-ns");
            let resp = format!(
                "HTTP/1.0 200 OK\r\nContent-Encoding: identity\r\nContent-Length: {}\r\n\r\n{consensus}",
                consensus.len()
            );
            stream.write_all(resp.as_bytes()).await.unwrap();
        });

        engine.fetch_consensus(&mut data, &[saddr]).await.unwrap();
        match data {
            ConsensusBoundData::Unverified { consensus, raw } => match consensus {
                FlavoredConsensusSigned::Ns(ns) => {
                    // El-cheapo verification, this is not a parser unit test.
                    assert_eq!(ns.unwrap_unverified().0.r.len(), 2);
                    assert_eq!(raw, include_str!("../../testdata/consensus-ns"));
                }
                _ => panic!("data is not unverified ns consensus"),
            },
            _ => panic!("data is not unverified"),
        }
    }

    /// An unverified consensus must drive the engine into the AuthCerts
    /// state; after fetching certificates from a mock server it must
    /// advance to StoreConsensus, and all signatories' certs must be
    /// retrievable as "recent".
    #[tokio::test]
    async fn state_auth_certs() {
        let pool = create_dummy_db();
        // Start from an already-fetched but unverified fixture consensus.
        let mut data = ConsensusBoundData::Unverified {
            consensus: FlavoredConsensusSigned::Ns(
                parse2::parse_netdoc(&ParseInput::new(
                    include_str!("../../testdata/consensus-ns"),
                    "",
                ))
                .unwrap(),
            ),
            raw: include_str!("../../testdata/consensus-ns").to_owned(),
        };
        let engine = StaticEngine {
            flavor: ConsensusFlavor::Plain,
            authorities: AuthorityContacts::default(),
            tolerance: DirTolerance::default(),
            rt: PreferredRuntime::current().unwrap(),
        };

        assert_eq!(
            db::read_tx(&pool, |tx| engine.determine_state(
                tx,
                &data,
                SystemTime::UNIX_EPOCH.into()
            ))
            .unwrap()
            .unwrap(),
            State::AuthCerts
        );

        // Mock server answering with the full authcert fixture bundle.
        let server = TcpListener::bind("[::]:0").await.unwrap();
        let saddr = server.local_addr().unwrap();
        tokio::spawn(async move {
            let mut buf = [0; 1024];
            let (mut stream, _) = server.accept().await.unwrap();
            let _ = stream.read(&mut buf).await.unwrap();
            let authcerts = include_str!("../../testdata/authcert-all");
            stream.write_all(format!(
                "HTTP/1.0 200 OK\r\nContent-Encoding: identity\r\nContent-Length: {}\r\n\r\n{authcerts}",
                authcerts.len()
            ).as_bytes()).await.unwrap();
        });
        // Fetch all authcerts.
        engine
            .auth_certs(
                &pool,
                &mut data,
                &[saddr],
                (SystemTime::UNIX_EPOCH + Duration::from_secs(1770639454)).into(), // Mon Feb  9 12:17:34 UTC 2026
            )
            .await
            .unwrap();
        // Check whether we are done with all authcerts.
        assert_eq!(
            db::read_tx(&pool, |tx| engine.determine_state(
                tx,
                &data,
                (SystemTime::UNIX_EPOCH + Duration::from_secs(1770639454)).into(), // Mon Feb  9 12:17:34 UTC 2026
            ))
            .unwrap()
            .unwrap(),
            State::StoreConsensus
        );
        // Re-query the stored certs for the consensus's signatories.
        let recent_authcerts = db::read_tx(&pool, |tx| {
            AuthCertMeta::query_recent(
                tx,
                &FlavoredConsensusSigned::Ns(
                    parse2::parse_netdoc(&ParseInput::new(
                        include_str!("../../testdata/consensus-ns"),
                        "",
                    ))
                    .unwrap(),
                )
                .signatories(),
                &DirTolerance::default(),
                (SystemTime::UNIX_EPOCH + Duration::from_secs(1770639454)).into(), // Mon Feb  9 12:17:34 UTC 2026
            )
        })
        .unwrap()
        .unwrap();
        // TODO DIRMIRROR: Compare more than just length.
        assert_eq!(
            recent_authcerts.0.len(),
            engine.authorities.v3idents().len()
        );
        assert!(recent_authcerts.1.is_empty());
    }
}