1
//! Access to the database schema.
2
//!
3
//! This module is not intended to provide a high-level ORM, instead it serves
4
//! the purpose of initializing and upgrading the database, if necessary.
5
//!
6
//! # Synchronous or Asynchronous?
7
//!
8
//! The question on whether the database and access to it shall be synchronous
9
//! or asynchronous has been a fairly long debate that eventually got settled
10
//! after realizing that an asynchronous approach does not work.  This comment
11
//! should serve as a reminder for future devs, wondering why we use certain
12
//! synchronous primitives in an otherwise asynchronous codebase.
13
//!
14
//! Early on, it was clear that we would need some sort of connection pool,
15
//! primarily for two reasons:
16
//! 1. Performing frequent open and close calls in every task would be costly.
17
//! 2. Sharing a single connection object with a Mutex would be a waste
18
//!
19
//! Because the application itself is primarily asynchronous, we decided to go
20
//! with an asynchronous connection pool as well, leading to the choice of
21
//! `deadpool` initially.
22
//!
23
//! However, soon thereafter, problems with `deadpool` became evident.  Those
24
//! problems mostly stemmed from the synchronous nature of SQLite itself.  In our
25
//! case, this problem was initially triggered by figuring out a way to solve
26
//! `SQLITE_BUSY` handling.  In the end, we decided to settle upon the following
27
//! approach: Set `PRAGMA busy_timeout` to a certain value and create write
28
//! transactions with `BEGIN EXCLUSIVE`.  This way, SQLite would try to obtain
29
//! a write transaction for `busy_timeout` milliseconds by blocking the current
30
//! thread.  Due to this blocking, async no longer made any sense and was in
31
//! fact quite counter-productive because those potential sleeps could screw a
32
//! lot of things up, which became very evident while trying to test this.
33
//!
34
//! Besides, throughout refactoring the code base, we realized that, even while
35
//! still using `deadpool`, the actual "asynchronous" calls interfacing with the
36
//! database became smaller and smaller.  In the end, the asynchronous code just
37
//! involved parts of obtaining a connection and creating a transaction,
38
//! eventually resulting in a call to a synchronous function taking the
39
//! transaction handle to perform the lion's share of the operation.
40

            
41
// TODO DIRMIRROR: This could benefit from methods by wrapping the pool into a
42
// custom type.
43

            
44
use std::{
45
    collections::HashSet,
46
    fmt::Display,
47
    io::{Cursor, Write},
48
    num::NonZero,
49
    ops::{Add, Sub},
50
    path::Path,
51
    time::{Duration, SystemTime},
52
};
53

            
54
use digest::Digest;
55
use flate2::write::{DeflateEncoder, GzEncoder};
56
use getset::CopyGetters;
57
use r2d2::Pool;
58
use r2d2_sqlite::SqliteConnectionManager;
59
use rand::Rng;
60
use rusqlite::{
61
    named_params, params,
62
    types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
63
    OptionalExtension, ToSql, Transaction, TransactionBehavior,
64
};
65
use saturating_time::SaturatingTime;
66
use tor_basic_utils::RngExt;
67
use tor_dircommon::config::DirTolerance;
68
use tor_error::into_internal;
69
use tor_netdoc::doc::{
70
    authcert::{AuthCert, AuthCertKeyIds},
71
    netstatus::ConsensusFlavor,
72
};
73

            
74
use crate::err::DatabaseError;
75

            
76
/// Version 1 of the database schema.
///
/// The schema is embedded verbatim into the binary at compile time from
/// `schema_v1.sql`.
///
/// TODO DIRMIRROR: Before the release, figure out where to use rowid and where
/// to use docid.
const V1_SCHEMA: &str = include_str!("schema_v1.sql");
81

            
82
/// Global options set in every connection.
///
/// * `journal_mode=WAL` enables write-ahead logging.
/// * `foreign_keys=ON` enforces foreign key constraints, which SQLite leaves
///   off by default for backwards compatibility.
/// * `busy_timeout=1000` makes SQLite block up to 1000 ms waiting for a lock
///   before returning `SQLITE_BUSY`; see the module documentation for why we
///   rely on this blocking behavior.
const GLOBAL_OPTIONS: &str = sql!(
    "
PRAGMA journal_mode=WAL;
PRAGMA foreign_keys=ON;
PRAGMA busy_timeout=1000;
"
);
90

            
91
/// Convenience macro for implementing a hash type in a rusqlite compatible fashion.
///
/// This macro accepts the following parameters:
/// 1. `name` for specifying an identifier of the type, such as [`Sha256`].
/// 2. `algo` for specifying the type from the rust-crypto [`digest`] ecosystem,
///    such as [`tor_llcrypto::d::Sha256`].
/// 3. The size in bytes of the hash output, such as `32` for [`Sha256`].
///     * Unfortunately, we cannot use something like [`Digest::output_size()`]
///       because it is not a constant.
///
/// It generates a struct with `name` as the identifier, which implements the
/// following methods:
/// * `digest` for wrapping around [`Digest::digest()`].
///
/// It also implements the following traits:
/// * [`Display`]
/// * [`FromSql`]
/// * [`ToSql`]
/// * [`PartialEq<&str>`] for base16 comparisons
/// * [`From<u8; $size>`] but only in tests
macro_rules! impl_hash_wrapper {
    ($name:ident, $algo:ty, $size:literal) => {
        /// Database wrapper type.
        #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
        pub(crate) struct $name([u8; $size]);

        impl $name {
            /// Computes the hash from arbitrary data.
            pub(crate) fn digest(data: &[u8]) -> Self {
                Self(<$algo>::digest(data).into())
            }
        }

        impl Display for $name {
            /// Formats the hash in uppercase hexadecimal.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "{}", hex::encode_upper(self.0))
            }
        }

        impl FromSql for $name {
            fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
                // We read the hash as a hexadecimal string from the database.
                // Convert it to binary data and check length afterwards.
                let data: [u8; $size] = value
                    .as_str()
                    .map(hex::decode)?
                    .map_err(|e| {
                        FromSqlError::Other(Box::new(tor_error::internal!(
                            "non hex data in database? {e}"
                        )))
                    })?
                    .try_into()
                    .map_err(|_| {
                        // NOTE: macro metavariables are not expanded inside
                        // string literals, so the previous literal "$name ..."
                        // produced an error message containing a literal
                        // "$name".  Interpolate `stringify!($name)` instead so
                        // the message names the actual type.
                        FromSqlError::Other(Box::new(tor_error::internal!(
                            "{} with invalid length in database?",
                            stringify!($name)
                        )))
                    })?;

                Ok(Self(data))
            }
        }

        impl ToSql for $name {
            fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
                // Because Self is only constructed with FromSql and digest
                // data, it is safe to assume it is valid.
                Ok(ToSqlOutput::from(self.to_string()))
            }
        }

        impl PartialEq<&str> for $name {
            fn eq(&self, other: &&str) -> bool {
                self.to_string() == other.to_uppercase()
            }
        }

        #[cfg(test)]
        impl From<[u8; $size]> for $name {
            fn from(value: [u8; $size]) -> Self {
                Self(value)
            }
        }
    };
}
176

            
177
// Concrete hash wrapper types used by the database layer; the byte size must
// match the output size of the respective digest algorithm.
impl_hash_wrapper!(Sha1, tor_llcrypto::d::Sha1, 20);
impl_hash_wrapper!(Sha256, tor_llcrypto::d::Sha256, 32);
impl_hash_wrapper!(Sha3_256, tor_llcrypto::d::Sha3_256, 32);

/// The identifier for documents in the content-addressable cache.
///
/// Right now, this is a [`Sha256`] hash, but this may change in future.
pub(crate) type DocumentId = Sha256;
185

            
186
/// The supported content encodings.
///
/// The string representation (via strum) is kebab-case and parsed
/// case-insensitively, matching the HTTP `Content-Encoding` tokens.
#[derive(Debug, Clone, Copy, PartialEq, strum::EnumString, strum::Display, strum::EnumIter)]
#[strum(serialize_all = "kebab-case", ascii_case_insensitive)]
pub(crate) enum ContentEncoding {
    /// RFC2616 section 3.5.
    Identity,
    /// RFC2616 section 3.5.
    Deflate,
    /// RFC2616 section 3.5.
    Gzip,
    /// The zstandard compression algorithm (www.zstd.net).
    XZstd,
    /// The lzma compression algorithm with a "preset" value no higher than 6.
    XTorLzma,
}
201

            
202
/// A wrapper around [`SystemTime`] with convenient features.
///
/// Please use this type throughout the crate internally, instead of
/// [`SystemTime`].
///
/// # Conversion
///
/// This type can be safely converted from and into a [`SystemTime`], because
/// it is just a wrapper type.  The inner value is private; use the [`From`]
/// conversions.
///
/// # Saturating Arithmetic
///
/// This type implements [`Add`] and [`Sub`] for [`Duration`] and [`Timestamp`]
/// ([`Sub`] only) using saturating arithmetic from the [`saturating_time`]
/// crate.  It means that addition and subtraction can be safely performed
/// without the potential risk of an unexpected panic, instead wrapping to
/// a local maximum/minimum or [`Duration::ZERO`] depending on the type.
///
/// Note that we don't provide a saturating version of [`Duration`], so addition
/// or subtraction of two [`Duration`]s still needs care to avoid panics.
///
/// # SQLite Interaction
///
/// This type implements [`FromSql`] and [`ToSql`], making it convenient to
/// integrate into SQL statements, as the database schema represents timestamps
/// internally using a non-negative [`i64`] storing the seconds since the epoch.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub(crate) struct Timestamp(SystemTime);
230

            
231
impl From<SystemTime> for Timestamp {
232
60
    fn from(value: SystemTime) -> Self {
233
60
        Self(value)
234
60
    }
235
}
236

            
237
impl From<Timestamp> for SystemTime {
238
18
    fn from(value: Timestamp) -> Self {
239
18
        value.0
240
18
    }
241
}
242

            
243
impl Add<Duration> for Timestamp {
244
    type Output = Self;
245

            
246
    /// Performs a saturating addition wrapping to [`SystemTime::max_value()`].
247
20020
    fn add(self, rhs: Duration) -> Self::Output {
248
20020
        Self(self.0.saturating_add(rhs))
249
20020
    }
250
}
251

            
252
impl Sub<Duration> for Timestamp {
253
    type Output = Self;
254

            
255
    /// Performs a saturating subtraction wrapping to [`SystemTime::min_value()`].
256
4
    fn sub(self, rhs: Duration) -> Self::Output {
257
4
        Self(self.0.saturating_sub(rhs))
258
4
    }
259
}
260

            
261
impl Sub<Timestamp> for Timestamp {
    type Output = Duration;

    /// Performs a saturating duration_since wrapping to [`Duration::ZERO`].
    fn sub(self, rhs: Timestamp) -> Self::Output {
        // `saturating_duration_since` comes from the `SaturatingTime` trait;
        // the allow silences the lint about a possible future std method of
        // the same name shadowing the trait method.
        #[allow(unstable_name_collisions)]
        self.0.saturating_duration_since(rhs.0)
    }
}
270

            
271
impl FromSql for Timestamp {
272
130
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
273
130
        let mut res = SystemTime::UNIX_EPOCH;
274
130
        res = res.saturating_add(Duration::from_secs(value.as_i64()?.try_into().unwrap_or(0)));
275
130
        Ok(Self(res))
276
130
    }
277
}
278

            
279
impl ToSql for Timestamp {
280
202
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
281
        #[allow(unstable_name_collisions)]
282
202
        Ok(ToSqlOutput::from(
283
202
            self.0
284
202
                .saturating_duration_since(SystemTime::UNIX_EPOCH)
285
202
                .as_secs()
286
202
                .try_into()
287
202
                .unwrap_or(i64::MAX),
288
202
        ))
289
202
    }
290
}
291

            
292
/// Representation of consensus metadata from the database.
#[derive(Debug, Clone, Copy, PartialEq, Eq, CopyGetters)]
#[get_copy = "pub(crate)"]
pub(crate) struct ConsensusMeta {
    /// The document id uniquely identifying the consensus.
    docid: DocumentId,

    /// The SHA3-256 digest of the unsigned part of the consensus.
    unsigned_sha3_256: Sha3_256,

    /// The flavor of the consensus.
    flavor: ConsensusFlavor,

    /// The time after which this consensus is valid.
    valid_after: Timestamp,

    /// The time after which this consensus stops being fresh.
    fresh_until: Timestamp,

    /// The time after which this consensus stops being valid.
    valid_until: Timestamp,
}
314

            
315
impl ConsensusMeta {
    /// Obtains the most recent valid consensus from the database.
    ///
    /// This function queries the database using a [`Transaction`] in order to
    /// have a consistent view upon it.  It will return an [`Option`] containing
    /// a consensus.  In order to obtain a *valid* consensus, a [`Timestamp`]
    /// plus a [`DirTolerance`] are supplied, which will be used for querying
    /// the database in a time-constrained fashion.
    ///
    /// The [`None`] case implies that no valid consensus has been found, that
    /// is, no consensus at all or no consensus whose `valid-after` or
    /// `valid-until` lies within the range composed by `now` and `tolerance`.
    pub(crate) fn query_recent(
        tx: &Transaction,
        flavor: ConsensusFlavor,
        tolerance: &DirTolerance,
        now: Timestamp,
    ) -> Result<Option<Self>, DatabaseError> {
        // Select the most recent flavored consensus document from the database.
        //
        // `:now` must be a member of the range:
        // `[valid_after - pre_valid_tolerance; valid_until + post_valid_tolerance]`
        // (inclusively), as expressed by the two AND conditions below.
        let mut meta_stmt = tx.prepare_cached(sql!(
            "
            SELECT docid, unsigned_sha3_256, valid_after, fresh_until, valid_until
            FROM consensus
            WHERE
              flavor = :flavor
              AND :now >= valid_after - :pre_valid
              AND :now <= valid_until + :post_valid
            ORDER BY valid_after DESC
            LIMIT 1
            "
        ))?;

        // Actually execute the query; a None is totally valid and considered as
        // no consensus being present in the current database.
        let res = meta_stmt.query_one(named_params! {
            ":flavor": flavor.name(),
            ":now": now,
            // Tolerances are passed as whole seconds, clamped to i64::MAX.
            ":pre_valid": tolerance.pre_valid_tolerance().as_secs().try_into().unwrap_or(i64::MAX),
            ":post_valid": tolerance.post_valid_tolerance().as_secs().try_into().unwrap_or(i64::MAX),
        }, |row| {
            Ok(Self {
                docid: row.get(0)?,
                unsigned_sha3_256: row.get(1)?,
                flavor,
                valid_after: row.get(2)?,
                fresh_until: row.get(3)?,
                valid_until: row.get(4)?,
            })
        }).optional()?;

        Ok(res)
    }

    /// Queries the raw data of a [`ConsensusMeta`].
    ///
    /// Returns an internal error if the stored blob is not valid UTF-8.
    pub(crate) fn data(&self, tx: &Transaction<'_>) -> Result<String, DatabaseError> {
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT content
            FROM store
            WHERE docid = :docid
            "
        ))?;

        let raw = stmt.query_one(named_params! {":docid": self.docid}, |row| {
            row.get::<_, Vec<u8>>(0)
        })?;
        let raw = String::from_utf8(raw).map_err(into_internal!("utf-8 constraint violated?"))?;
        Ok(raw)
    }

    /// Calculates the [`Timestamp`] at which the authorities will be queried again.
    ///
    /// # Panics
    ///
    /// Panics if `fresh_until` is not strictly before `valid_until`.
    ///
    /// # Specifications
    ///
    /// * <https://spec.torproject.org/dir-spec/directory-cache-operation.html#download-ns-from-auth>
    pub(crate) fn lifetime<R: Rng>(&self, rng: &mut R) -> Timestamp {
        assert!(self.fresh_until < self.valid_until);

        // Pick a uniform random offset within the first half of the interval
        // between fresh-until and valid-until, per the specification above.
        let offset = rng
            .gen_range_checked(0..=((self.valid_until - self.fresh_until).as_secs() / 2))
            .expect("invalid range?");

        self.fresh_until + Duration::from_secs(offset)
    }

    /// Returns the missing server descriptors for this consensus.
    ///
    /// For non-plain flavors this is trivially empty.
    pub(crate) fn missing_servers(
        &self,
        tx: &Transaction<'_>,
    ) -> Result<HashSet<Sha1>, DatabaseError> {
        if self.flavor != ConsensusFlavor::Plain {
            return Ok(HashSet::new());
        }

        // Select the missing router descriptors.
        //
        // A router descriptor is considered missing if it exists in
        // `consensus_router_descriptor_member` but not in `router_descriptor`
        // because the first entry is added once the consensus got parsed,
        // whereas the second entry is added once we have actually retrieved it.
        //
        // It works by doing a left join on router_descriptor and filtering for
        // all entries where the join is NULL, as that implies we are aware of
        // the descriptor but not have it stored.
        //
        // Parameters:
        // :docid - The docid of the consensus.
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT cr.unsigned_sha1
            FROM consensus_router_descriptor_member AS cr
              LEFT JOIN router_descriptor AS server ON cr.unsigned_sha1 = server.unsigned_sha1
            WHERE
              cr.consensus_docid = :docid
              AND cr.unsigned_sha1 IS NOT NULL
              AND server.unsigned_sha1 IS NULL
            "
        ))?;

        let missing = stmt
            .query_map(named_params! {":docid": self.docid}, |row| row.get(0))?
            .collect::<Result<HashSet<_>, _>>()?;
        Ok(missing)
    }

    /// Returns the missing extra infos for this consensus to the best of our abilities.
    ///
    /// Keep in mind that this does not return **all** missing extra infos but
    /// only the missing extra infos of server descriptors we have.
    pub(crate) fn missing_extras(
        &self,
        tx: &Transaction<'_>,
    ) -> Result<HashSet<Sha1>, DatabaseError> {
        if self.flavor != ConsensusFlavor::Plain {
            return Ok(HashSet::new());
        }

        // Select the missing extra infos for this consensus.
        //
        // This return value is not complete because we only know the missing
        // extra-infos to the best of our abilities.  In other words: We are
        // only aware of a missing extra-info if we have parsed the respective
        // server descriptor.
        //
        // It works by doing an inner join from
        // `consensus_router_descriptor_member` to `router_descriptor` because
        // we can only know about the extra-infos of which we have the server
        // descriptors from.  Afterwards, we do a left join with the
        // `router_extra_info` table and filter for all results where the left
        // join result is null, hence where we have a server descriptor but not
        // the respective extra-info.
        //
        // Parameters:
        // :docid - The docid of the consensus.
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT server.extra_unsigned_sha1
            FROM consensus_router_descriptor_member AS cr
              INNER JOIN router_descriptor AS server ON cr.unsigned_sha1 = server.unsigned_sha1
              LEFT JOIN router_extra_info AS extra ON server.extra_unsigned_sha1 = extra.unsigned_sha1
            WHERE
              cr.consensus_docid = :docid
              AND extra.unsigned_sha1 IS NULL
            "
        ))?;

        let missing = stmt
            .query_map(named_params! {":docid": self.docid}, |row| row.get(0))?
            .collect::<Result<HashSet<_>, _>>()?;
        Ok(missing)
    }

    /// Returns the missing micro descriptors for this consensus.
    ///
    /// For non-microdesc flavors this is trivially empty.
    pub(crate) fn missing_micros(
        &self,
        tx: &Transaction<'_>,
    ) -> Result<HashSet<Sha256>, DatabaseError> {
        if self.flavor != ConsensusFlavor::Microdesc {
            return Ok(HashSet::new());
        }

        // Select the missing micro descriptors.
        //
        // A micro descriptor is considered missing if it exists in
        // `consensus_router_descriptor_member` but not in `router_descriptor`
        // because the first entry is added once the consensus got parsed,
        // whereas the second entry is added once we have actually retrieved it.
        //
        // It works by doing a left join on router_descriptor and filtering for
        // all entries where the join is NULL, as that implies we are aware of
        // the descriptor but not have it stored.
        //
        // Parameters:
        // :docid - The docid of the consensus.
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT cr.unsigned_sha2
            FROM consensus_router_descriptor_member AS cr
              LEFT JOIN router_descriptor AS micro ON cr.unsigned_sha2 = micro.unsigned_sha2
            WHERE
              cr.consensus_docid = :docid
              AND cr.unsigned_sha2 IS NOT NULL
              AND micro.unsigned_sha2 IS NULL
            "
        ))?;

        let missing = stmt
            .query_map(named_params! {":docid": self.docid}, |row| row.get(0))?
            .collect::<Result<HashSet<_>, _>>()?;
        Ok(missing)
    }
}
531

            
532
/// Representation of authority certificate metadata from the database.
#[derive(Debug, Clone, Copy, PartialEq, Eq, CopyGetters)]
#[get_copy = "pub(crate)"]
pub(crate) struct AuthCertMeta {
    /// The document id uniquely identifying the consensus.
    docid: DocumentId,

    /// The SHA-1 fingerprint of the identity key.
    // TODO DIRMIRROR: Change this to RsaIdentity.
    kp_auth_id_rsa_sha1: Sha1,

    /// The SHA-1 fingerprint of the signing key.
    // TODO DIRMIRROR: Change this to RsaIdentity.
    kp_auth_sign_rsa_sha1: Sha1,

    /// The timestamp after which this certificate will be valid.
    dir_key_published: Timestamp,

    /// The timestamp until this certificate will be valid.
    dir_key_expires: Timestamp,
}
553

            
554
impl AuthCertMeta {
    /// Obtain the most recently published and valid certificate for each authority.
    ///
    /// Returns the found [`AuthCertMeta`] items as well as the missing
    /// [`AuthCertKeyIds`].
    ///
    /// # Performance
    ///
    /// This function has a performance between `O(n * log n)` and `O(n^2)`
    /// because it performs `signatories.len()` database queries, with each
    /// database query potentially taking something between `O(log n)` to
    /// `O(n)` to execute.  However, given that this respective value is
    /// oftentimes fairly small, it should not be much of a big concern.
    pub(crate) fn query_recent(
        tx: &Transaction,
        signatories: &[AuthCertKeyIds],
        tolerance: &DirTolerance,
        now: Timestamp,
    ) -> Result<(Vec<Self>, Vec<AuthCertKeyIds>), DatabaseError> {
        // For every key pair in `signatories`, get the most recent valid cert.
        //
        // This query selects the most recent timestamp valid certificate from
        // the database for a single given key pair.  It means that this query
        // has to be executed as many times as there are entries in
        // `signatories`.
        //
        // Unfortunately, there is no neater way to do this, because the
        // alternative would involve using a nested set which SQLite does not
        // support, even with the carray extension.  An alternative might be to
        // precompute that string and then insert it here using `format!` but
        // that feels hacky, error- and injection-prone.
        //
        // Parameters:
        // :id_rsa: The RSA identity key fingerprint in uppercase hexadecimal.
        // :sk_rsa: The RSA signing key fingerprint in uppercase hexadecimal.
        // :now: The current system timestamp.
        // :pre_tolerance: The tolerance for not-yet-valid certificates.
        // :post_tolerance: The tolerance for expired certificates.
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT docid, kp_auth_id_rsa_sha1, kp_auth_sign_rsa_sha1,
              dir_key_published, dir_key_expires
            FROM authority_key_certificate
            WHERE
              (:id_rsa, :sk_rsa) = (kp_auth_id_rsa_sha1, kp_auth_sign_rsa_sha1)
              AND :now >= dir_key_published - :pre_tolerance
              AND :now <= dir_key_expires + :post_tolerance
            ORDER BY dir_key_published DESC
            LIMIT 1
            "
        ))?;

        // Keep track of the found (and parsed) certificates and the missing ones.
        let mut found = Vec::new();
        let mut missing = Vec::new();

        // Iterate over every key pair and query it, adding it to found if it exists
        // and was parsed successfully or to missing if it does not exist within the
        // database.
        for kp in signatories {
            // Query the certificate from the database.
            let res = stmt
            .query_one(
                named_params! {
                    ":id_rsa": kp.id_fingerprint.as_hex_upper(),
                    ":sk_rsa": kp.sk_fingerprint.as_hex_upper(),
                    ":now": now,
                    // Tolerances are passed as whole seconds, clamped to i64::MAX.
                    ":pre_tolerance": tolerance.pre_valid_tolerance().as_secs().try_into().unwrap_or(i64::MAX),
                    ":post_tolerance": tolerance.post_valid_tolerance().as_secs().try_into().unwrap_or(i64::MAX),
                },
                |row| Ok(Self {
                    docid: row.get(0)?,
                    kp_auth_id_rsa_sha1: row.get(1)?,
                    kp_auth_sign_rsa_sha1: row.get(2)?,
                    dir_key_published: row.get(3)?,
                    dir_key_expires: row.get(4)?,
                })
            )
            .optional()?;

            match res {
                Some(cert) => found.push(cert),
                None => missing.push(*kp),
            }
        }

        Ok((found, missing))
    }

    /// Queries the raw data of an [`AuthCertMeta`].
    ///
    /// Returns an internal error if the stored blob is not valid UTF-8.
    pub(crate) fn data(&self, tx: &Transaction<'_>) -> Result<String, DatabaseError> {
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT content
            FROM store
            WHERE docid = :docid
            "
        ))?;

        let raw = stmt.query_one(named_params! {":docid": self.docid}, |row| {
            row.get::<_, Vec<u8>>(0)
        })?;
        let raw = String::from_utf8(raw).map_err(into_internal!("utf-8 constraint violated?"))?;
        Ok(raw)
    }

    /// Inserts a new authority certificate into the database.
    ///
    /// Keep in mind that the data in the [`AuthCert`] should correspond to the
    /// data found in `data`, as this method performs no parsing.
    pub(crate) fn insert<I: Iterator<Item = ContentEncoding>>(
        tx: &Transaction<'_>,
        encodings: I,
        cert: &AuthCert,
        data: &str,
    ) -> Result<(), DatabaseError> {
        // Inserts a new certificate into the meta table.
        //
        // Parameters:
        // :docid - The document id.
        // :id_rsa - The identity key fingerprint.
        // :sign_rsa - The signing key fingerprint.
        // :published - The published timestamp.
        // :expires - The expires timestamp.
        let mut stmt = tx.prepare_cached(sql!(
            "
            INSERT INTO authority_key_certificate
            (docid, kp_auth_id_rsa_sha1, kp_auth_sign_rsa_sha1, dir_key_published, dir_key_expires)
            VALUES
            (:docid, :id_rsa, :sign_rsa, :published, :expires)
            "
        ))?;

        // Store the raw document first; the returned docid links this metadata
        // row to the content-addressable store.
        let docid = store_insert(tx, data.as_bytes(), encodings)?;
        stmt.execute(named_params! {
            ":docid": docid,
            ":id_rsa": cert.dir_identity_key.to_rsa_identity().as_hex_upper(),
            ":sign_rsa": cert.dir_signing_key.to_rsa_identity().as_hex_upper(),
            ":published": Timestamp::from(cert.dir_key_published.0),
            ":expires": Timestamp::from(cert.dir_key_expires.0),
        })?;

        Ok(())
    }
}
699

            
700
/// A no-op macro just returning the supplied literal.
///
/// The purpose of this macro is to semantically mark [`str`] literals to be
/// SQL statements.
///
/// Keep in mind that the compiler will not notice if you forget this macro.
/// Unfortunately, you have to ensure it yourself.
macro_rules! sql {
    ($s:literal) => {
        $s
    };
}

// Re-export so the macro can be used path-based anywhere in the crate,
// regardless of its textual position in this file.
pub(crate) use sql;
714

            
715
/// Opens a database from disk, creating a [`Pool`] for it.
716
///
717
/// This function should be the entry point for all things requiring a database
718
/// handle, as this function prepares all necessary steps required for operating
719
/// on the database correctly, such as:
720
/// * Schema initialization.
721
/// * Schema upgrade.
722
/// * Setting connection specific settings.
723
///
724
/// # `SQLITE_BUSY` Caveat
725
///
726
/// There is a problem with the handling of `SQLITE_BUSY` when opening an
727
/// SQLite database.  In WAL, opening a database might acquire an exclusive lock
728
/// for a very short amount of time, in order to perform clean-up from previous
729
/// connections alongside other tasks for maintaining database integrity?  This
730
/// means, that opening multiple SQLite databases simultaneously will result in
731
/// a busy error regardless of a busy handler, as setting a busy handler will
732
/// require an existing connection, something we are unable to obtain in the
733
/// first place.
734
///
735
/// In order to mitigate this issue, the recommended way in the SQLite community
736
/// is to simply ensure that database connections are opened sequentially,
737
/// by urging calling applications to just use a single [`Pool`] instance.
738
///
739
/// Testing this is hard unfortunately.
740
36
pub(crate) fn open<P: AsRef<Path>>(
741
36
    path: P,
742
36
) -> Result<Pool<SqliteConnectionManager>, DatabaseError> {
743
36
    let num_cores = std::thread::available_parallelism()
744
36
        .unwrap_or(NonZero::new(8).expect("8 == 0?"))
745
36
        .get() as u32;
746

            
747
36
    let manager = r2d2_sqlite::SqliteConnectionManager::file(&path);
748
36
    let pool = Pool::builder().max_size(num_cores).build(manager)?;
749

            
750
36
    rw_tx(&pool, |tx| {
751
        // Prepare the database, doing the following steps:
752
        // 1. Checking the database schema.
753
        // 2. Upgrading (in future) or initializing the database schema (if empty).
754

            
755
36
        let has_arti_dirserver_schema_version = match tx.query_one(
756
            sql!(
757
36
                "
758
36
                SELECT name
759
36
                FROM sqlite_master
760
36
                  WHERE type = 'table'
761
36
                    AND name = 'arti_dirserver_schema_version'
762
36
                "
763
            ),
764
36
            params![],
765
2
            |_| Ok(()),
766
        ) {
767
2
            Ok(()) => true,
768
34
            Err(rusqlite::Error::QueryReturnedNoRows) => false,
769
            Err(e) => return Err(DatabaseError::LowLevel(e)),
770
        };
771

            
772
36
        if has_arti_dirserver_schema_version {
773
2
            let version = tx.query_one(
774
2
                sql!("SELECT version FROM arti_dirserver_schema_version WHERE rowid = 1"),
775
2
                params![],
776
2
                |row| row.get::<_, String>(0),
777
            )?;
778

            
779
2
            match version.as_ref() {
780
2
                "1" => {}
781
2
                unknown => {
782
2
                    return Err(DatabaseError::IncompatibleSchema {
783
2
                        version: unknown.into(),
784
2
                    })
785
                }
786
            }
787
        } else {
788
34
            tx.execute_batch(V1_SCHEMA)?;
789
        }
790

            
791
34
        Ok::<_, DatabaseError>(())
792
36
    })??;
793

            
794
34
    Ok(pool)
795
36
}
796

            
797
/// Executes a closure `op` with a given read-only [`Transaction`].
798
///
799
/// The [`Transaction`] always gets rolled back the moment `op` returns.
800
///
801
/// The [`Transaction`] gets initialized with the global pragma options set.
802
///
803
/// **The closure shall not perform write operations!**
804
/// Not only do they get rolled back anyways, but upgrading the [`Transaction`]
805
/// from a read to a write transaction will lead to other simultaneous write upgrades
806
/// to fail.  Unfortunately, there is no real programmatic way to ensure this.
807
50
pub(crate) fn read_tx<U, F>(pool: &Pool<SqliteConnectionManager>, op: F) -> Result<U, DatabaseError>
808
50
where
809
50
    F: FnOnce(&Transaction<'_>) -> U,
810
{
811
50
    let mut conn = pool.get()?;
812
50
    conn.execute_batch(GLOBAL_OPTIONS)?;
813
50
    let tx = conn.transaction_with_behavior(TransactionBehavior::Deferred)?;
814
50
    let res = op(&tx);
815
50
    tx.rollback()?;
816
50
    Ok(res)
817
50
}
818

            
819
/// Executes a closure `op` with a given read-write [`Transaction`].
820
///
821
/// The [`Transaction`] always gets committed the moment `op` returns.
822
///
823
/// The [`Transaction`] gets initialized with the global pragma options set.
824
///
825
/// The [`Transaction`] gets created with [`TransactionBehavior::Immediate`],
826
/// meaning it will immediately exist as a write connection, retrying in the
827
/// case of a [`rusqlite::ErrorCode::DatabaseBusy`] until it failed after 1s.
828
68
pub(crate) fn rw_tx<U, F>(pool: &Pool<SqliteConnectionManager>, op: F) -> Result<U, DatabaseError>
829
68
where
830
68
    F: FnOnce(&Transaction<'_>) -> U,
831
{
832
68
    let mut conn = pool.get()?;
833
68
    conn.execute_batch(GLOBAL_OPTIONS)?;
834
68
    let tx = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?;
835
66
    let res = op(&tx);
836
66
    tx.commit()?;
837
66
    Ok(res)
838
68
}
839

            
840
/// Inserts `data` into store while also compressing it with given encodings.
841
///
842
/// Returns the [`DocumentId`] of `data`.
843
///
844
/// This function inserts `data` into store and also compresses it into all
845
/// given compression formats.
846
///
847
/// Duplicates get re-encoded and replaced in the database, including
848
/// [`ContentEncoding::Identity`].
849
42
pub(crate) fn store_insert<I: Iterator<Item = ContentEncoding>>(
850
42
    tx: &Transaction,
851
42
    data: &[u8],
852
42
    encodings: I,
853
42
) -> Result<DocumentId, DatabaseError> {
854
    // The statement to insert some data into the store.
855
    //
856
    // Parameters:
857
    // :docid - The docid.
858
    // :content - The binary data.
859
42
    let mut store_stmt = tx.prepare_cached(sql!(
860
42
        "
861
42
        INSERT OR REPLACE INTO store (docid, content)
862
42
        VALUES
863
42
        (:docid, :content)
864
42
        "
865
42
    ))?;
866

            
867
    // The statement to insert a compressed document into the metatable.
868
    //
869
    // Parameters:
870
    // :algorithm - The name of the encoding algorithm.
871
    // :identity_docid - The docid of the plain-text document in the store.
872
    // :compressed_docid - The docid of the encoded document in the store.
873
42
    let mut compressed_stmt = tx.prepare_cached(sql!(
874
42
        "
875
42
        INSERT OR REPLACE INTO compressed_document (algorithm, identity_docid, compressed_docid)
876
42
        VALUES
877
42
        (:algorithm, :identity_docid, :compressed_docid)
878
42
        "
879
42
    ))?;
880

            
881
    // Insert the plain document into the store.
882
42
    let identity_docid = DocumentId::digest(data);
883
42
    store_stmt.execute(named_params! {
884
42
        ":docid": identity_docid,
885
42
        ":content": data
886
42
    })?;
887

            
888
    // Compress it into all formats and insert it into store and compressed.
889
120
    for encoding in encodings {
890
120
        if encoding == ContentEncoding::Identity {
891
            // Ignore identity because we inserted that above.
892
24
            continue;
893
96
        }
894

            
895
        // We map a compression error to a bug because there is no good reason
896
        // on why it should fail, given that we compress from memory data to
897
        // memory data.  Probably because it uses the std::io::Writer interface
898
        // which itself demands use of std::io::Result.
899
96
        let compressed = compress(data, encoding).map_err(into_internal!("{encoding} failed?"))?;
900
96
        let compressed_docid = DocumentId::digest(&compressed);
901
96
        store_stmt.execute(named_params! {
902
96
            ":docid": compressed_docid,
903
96
            ":content": compressed,
904
96
        })?;
905
96
        compressed_stmt.execute(named_params! {
906
96
            ":algorithm": encoding.to_string(),
907
96
            ":identity_docid": identity_docid,
908
96
            ":compressed_docid": compressed_docid,
909
96
        })?;
910
    }
911

            
912
42
    Ok(identity_docid)
913
42
}
914

            
915
/// Compresses `data` into a specified [`ContentEncoding`].
916
///
917
/// Returns a [`Vec`] containing the encoded data.
918
106
fn compress(data: &[u8], encoding: ContentEncoding) -> Result<Vec<u8>, std::io::Error> {
919
106
    match encoding {
920
2
        ContentEncoding::Identity => Ok(data.to_vec()),
921
        ContentEncoding::Deflate => {
922
26
            let mut w = DeflateEncoder::new(Vec::new(), Default::default());
923
26
            w.write_all(data)?;
924
26
            w.finish()
925
        }
926
        ContentEncoding::Gzip => {
927
26
            let mut w = GzEncoder::new(Vec::new(), Default::default());
928
26
            w.write_all(data)?;
929
26
            w.finish()
930
        }
931
26
        ContentEncoding::XZstd => zstd::encode_all(data, Default::default()),
932
        ContentEncoding::XTorLzma => {
933
26
            let mut res = Vec::new();
934
26
            lzma_rs::lzma_compress(&mut Cursor::new(data), &mut res)?;
935
26
            Ok(res)
936
        }
937
    }
938
106
}
939

            
940
#[cfg(test)]
941
mod test {
942
    // @@ begin test lint list maintained by maint/add_warning @@
943
    #![allow(clippy::bool_assert_comparison)]
944
    #![allow(clippy::clone_on_copy)]
945
    #![allow(clippy::dbg_macro)]
946
    #![allow(clippy::mixed_attributes_style)]
947
    #![allow(clippy::print_stderr)]
948
    #![allow(clippy::print_stdout)]
949
    #![allow(clippy::single_char_pattern)]
950
    #![allow(clippy::unwrap_used)]
951
    #![allow(clippy::unchecked_time_subtraction)]
952
    #![allow(clippy::useless_vec)]
953
    #![allow(clippy::needless_pass_by_value)]
954
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
955
    use std::{
956
        collections::HashSet,
957
        io::Read,
958
        sync::{Arc, Once},
959
    };
960

            
961
    use flate2::read::{DeflateDecoder, GzDecoder};
962
    use lazy_static::lazy_static;
963
    use rusqlite::Connection;
964
    use strum::IntoEnumIterator;
965
    use tempfile::tempdir;
966
    use tor_basic_utils::test_rng::testing_rng;
967
    use tor_dircommon::config::DirToleranceBuilder;
968
    use tor_llcrypto::pk::rsa::RsaIdentity;
969

            
970
    use super::*;
971

            
972
    lazy_static! {
        /// Wed Jan 01 2020 00:00:00 GMT+0000.
        static ref VALID_AFTER: Timestamp =
            (SystemTime::UNIX_EPOCH + Duration::from_secs(1577836800)).into();

        /// Wed Jan 01 2020 01:00:00 GMT+0000 (`VALID_AFTER` + 1h).
        static ref FRESH_UNTIL: Timestamp =
            *VALID_AFTER + Duration::from_secs(60 * 60);

        /// Wed Jan 01 2020 02:00:00 GMT+0000 (`FRESH_UNTIL` + 1h).
        static ref FRESH_UNTIL_HALF: Timestamp =
            *FRESH_UNTIL + Duration::from_secs(60 * 60);

        /// Wed Jan 01 2020 03:00:00 GMT+0000 (`FRESH_UNTIL` + 2h).
        static ref VALID_UNTIL: Timestamp =
            *FRESH_UNTIL + Duration::from_secs(60 * 60 * 2);
    }
989

            
990
    const CONSENSUS_CONTENT: &str = "Lorem ipsum dolor sit amet.";
991
    const CONSENSUS_MD_CONTENT: &str = "Lorem ipsum dolor sit amet!";
992
    const CERT_CONTENT: &[u8] = include_bytes!("../testdata/authcert-longclaw");
993

            
994
    lazy_static! {
995
        static ref CONSENSUS_DOCID: DocumentId = DocumentId::digest(CONSENSUS_CONTENT.as_bytes());
996
        static ref CONSENSUS_MD_DOCID: DocumentId =
997
            DocumentId::digest(CONSENSUS_MD_CONTENT.as_bytes());
998
        static ref CERT_DOCID: DocumentId = DocumentId::digest(CERT_CONTENT);
999
    }
    /// Builds a throw-away database preloaded with fixture documents.
    ///
    /// The fixture consists of the raw bodies in `store` (two consensuses,
    /// one authority key certificate, and three router descriptor variants),
    /// the metadata rows referencing them, and the consensus membership
    /// links.
    ///
    /// NOTE(review): `open("")` asks SQLite for a private temporary database,
    /// which is per-connection — TODO confirm the pool always hands tests the
    /// same connection that was used for seeding.
    fn create_dummy_db() -> Pool<SqliteConnectionManager> {
        let pool = open("").unwrap();
        rw_tx(&pool, |tx| {
            // Raw document bodies go into the generic store table first;
            // all metadata tables below reference them by docid.
            tx.execute(
                sql!("INSERT INTO store (docid, content) VALUES (?1, ?2)"),
                params![*CONSENSUS_DOCID, CONSENSUS_CONTENT.as_bytes()],
            )
            .unwrap();
            tx.execute(
                sql!("INSERT INTO store (docid, content) VALUES (?1, ?2)"),
                params![*CONSENSUS_MD_DOCID, CONSENSUS_MD_CONTENT.as_bytes()],
            )
            .unwrap();
            tx.execute(
                sql!("INSERT INTO store (docid, content) VALUES (?1, ?2)"),
                params![*CERT_DOCID, CERT_CONTENT],
            )
            .unwrap();
            tx.execute(
                sql!("INSERT INTO store (docid, content) VALUES (?1, ?2)"),
                params![
                    DocumentId::digest(include_bytes!("../testdata/descriptor1-ns")),
                    include_bytes!("../testdata/descriptor1-ns")
                ]
            ).unwrap();
            tx.execute(
                sql!("INSERT INTO store (docid, content) VALUES (?1, ?2)"),
                params![
                    DocumentId::digest(include_bytes!("../testdata/descriptor1-extra-info")),
                    include_bytes!("../testdata/descriptor1-extra-info")
                ]
            ).unwrap();
            tx.execute(
                sql!("INSERT INTO store (docid, content) VALUES (?1, ?2)"),
                params![
                    DocumentId::digest(include_bytes!("../testdata/descriptor1-md")),
                    include_bytes!("../testdata/descriptor1-md"),
            ]).unwrap();
            // Insert descriptor into router_extra_info.
            tx.execute(sql!(
                "
                INSERT INTO router_extra_info
                (docid, unsigned_sha1, kp_relay_id_rsa_sha1)
                VALUES (?1, ?2, ?3)
                "
            ), params![
                Sha256::digest(include_bytes!("../testdata/descriptor1-extra-info")),
                Sha1::digest(include_bytes!("../testdata/descriptor1-extra-info-unsigned")),
                "000004ACBB9D29BCBA17256BB35928DDBFC8ABA9",
            ]).unwrap();
            // We only insert descriptor1 here.
            tx.execute(sql!(
                "
                INSERT INTO router_descriptor
                (docid, unsigned_sha1, unsigned_sha2, kp_relay_id_rsa_sha1, flavor, extra_unsigned_sha1)
                VALUES
                (?1, ?2, ?3, ?4, 'ns', ?5)
                "
            ), params![
                DocumentId::digest(include_bytes!("../testdata/descriptor1-ns")),
                Sha1::digest(include_bytes!("../testdata/descriptor1-ns-unsigned")),
                Sha256::digest(include_bytes!("../testdata/descriptor1-ns-unsigned")),
                Sha1::from([0, 0, 4, 172, 187, 157, 41, 188, 186, 23, 37, 107, 179, 89, 40, 221, 191, 200, 171, 169]),
                Sha1::digest(include_bytes!("../testdata/descriptor1-extra-info-unsigned")),
            ]).unwrap();
            // Only insert descriptor1's md
            tx.execute(sql!(
                "
                INSERT INTO router_descriptor
                (docid, unsigned_sha1, unsigned_sha2, kp_relay_id_rsa_sha1, flavor)
                VALUES (?1, ?2, ?3, ?4, 'microdesc')
                "
            ), params![
                DocumentId::digest(include_bytes!("../testdata/descriptor1-md")),
                Sha1::digest(include_bytes!("../testdata/descriptor1-md")),
                Sha256::digest(include_bytes!("../testdata/descriptor1-md")),
                Sha1::from([0, 0, 4, 172, 187, 157, 41, 188, 186, 23, 37, 107, 179, 89, 40, 221, 191, 200, 171, 169]),
            ]).unwrap();
            // Consensus metadata rows, one per flavor, sharing the lifetime
            // defined by the VALID_AFTER/FRESH_UNTIL/VALID_UNTIL fixtures.
            tx.execute(
                sql!(
                    "
                    INSERT INTO consensus
                    (docid, unsigned_sha3_256, flavor, valid_after, fresh_until, valid_until)
                    VALUES
                    (?1, ?2, ?3, ?4, ?5, ?6)
                    "
                ),
                params![
                    *CONSENSUS_DOCID,
                    "0000000000000000000000000000000000000000000000000000000000000000", // not the correct hash
                    ConsensusFlavor::Plain.name(),
                    *VALID_AFTER,
                    *FRESH_UNTIL,
                    *VALID_UNTIL,
                ],
            )
            .unwrap();
            tx.execute(
                sql!(
                    "
                    INSERT INTO consensus
                    (docid, unsigned_sha3_256, flavor, valid_after, fresh_until, valid_until)
                    VALUES
                    (?1, ?2, ?3, ?4, ?5, ?6)
                    "
                ),
                params![
                    *CONSENSUS_MD_DOCID,
                    "0000000000000000000000000000000000000000000000000000000000000001", // not the correct hash
                    ConsensusFlavor::Microdesc.name(),
                    *VALID_AFTER,
                    *FRESH_UNTIL,
                    *VALID_UNTIL,
                ],
            )
            .unwrap();
            // Membership links: the plain consensus references descriptors by
            // SHA-1, the microdesc consensus by SHA-256.
            tx.execute(sql!(
                "
                INSERT INTO consensus_router_descriptor_member
                (consensus_docid, unsigned_sha1, unsigned_sha2)
                VALUES
                (?1, ?2, NULL),
                (?1, ?3, NULL)
                "
            ), params![
                *CONSENSUS_DOCID,
                Sha1::digest(include_bytes!("../testdata/descriptor1-ns-unsigned")),
                Sha1::digest(include_bytes!("../testdata/descriptor2-ns-unsigned")),
            ]).unwrap();
            tx.execute(sql!(
                "
                INSERT INTO consensus_router_descriptor_member
                (consensus_docid, unsigned_sha1, unsigned_sha2)
                VALUES
                (?1, NULL, ?2),
                (?1, NULL, ?3)
                "
            ), params![
                *CONSENSUS_MD_DOCID,
                Sha256::digest(include_bytes!("../testdata/descriptor1-md")),
                Sha256::digest(include_bytes!("../testdata/descriptor2-md")),
            ]).unwrap();
            // Metadata row for the authority key certificate fixture.
            tx.execute(sql!(
                "
                INSERT INTO authority_key_certificate
                  (docid, kp_auth_id_rsa_sha1, kp_auth_sign_rsa_sha1, dir_key_published, dir_key_expires)
                VALUES
                  (:docid, :id_rsa, :sk_rsa, :published, :expires)
                "
                ),
                named_params! {
                ":docid": *CERT_DOCID,
                ":id_rsa": "49015F787433103580E3B66A1707A00E60F2D15B",
                ":sk_rsa": "C5D153A6F0DA7CC22277D229DCBBF929D0589FE0",
                ":published": 1764543578,
                ":expires": 1772492378,
            }).unwrap();
        })
        .unwrap();
        pool
    }
    #[test]
    fn open_test() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("db");
        // The first open initializes the schema from scratch.
        open(&path).unwrap();
        // Inspect the stored schema version through a raw connection.
        let conn = Connection::open(&path).unwrap();
        let version = conn
            .query_one(
                "SELECT version FROM arti_dirserver_schema_version WHERE rowid = 1",
                params![],
                |row| row.get::<_, String>(0),
            )
            .unwrap();
        assert_eq!(version, "1");
        // Corrupt the version and verify that re-opening is refused.
        conn.execute(
            "UPDATE arti_dirserver_schema_version SET version = 42",
            params![],
        )
        .unwrap();
        drop(conn);
        assert_eq!(
            open(&path).unwrap_err().to_string(),
            "incompatible schema version: 42"
        );
    }
    #[test]
    fn read_tx_test() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("db");
        let pool = open(&path).unwrap();
        // Perform a (forbidden) write inside a read transaction; the version
        // row vanishes within the transaction's own view.
        read_tx(&pool, |tx| {
            tx.execute_batch("DELETE FROM arti_dirserver_schema_version")
                .unwrap();
            let err = tx
                .query_one(
                    sql!("SELECT version FROM arti_dirserver_schema_version"),
                    params![],
                    |row| row.get::<_, String>(0),
                )
                .unwrap_err();
            assert_eq!(err, rusqlite::Error::QueryReturnedNoRows);
        })
        .unwrap();
        // After the rollback, the schema version must be intact again.
        let version: String = read_tx(&pool, |tx| {
            tx.query_one(
                sql!("SELECT version FROM arti_dirserver_schema_version"),
                params![],
                |row| row.get(0),
            )
            .unwrap()
        })
        .unwrap();
        assert_eq!(version, "1");
    }
    #[test]
    fn rw_tx_test() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("db");
        let pool = open(&path).unwrap();
        // Delete the schema version row inside a write transaction.
        rw_tx(&pool, |tx| {
            tx.execute_batch("DELETE FROM arti_dirserver_schema_version")
                .unwrap();
        })
        .unwrap();
        // The commit must have persisted the deletion.
        read_tx(&pool, |tx| {
            let err = tx
                .query_one(
                    sql!("SELECT version FROM arti_dirserver_schema_version"),
                    params![],
                    |row| row.get::<_, String>(0),
                )
                .unwrap_err();
            assert_eq!(err, rusqlite::Error::QueryReturnedNoRows);
        })
        .unwrap();
    }
    /// Tests whether our SQLite busy error handling works in normal situations.
    ///
    /// A normal situation means one where a lock is never held for more than
    /// 1000ms.  In our case, we will work with two threads.
    /// t1 will acquire an exclusive lock and inform t2 about it.  t2 waits
    /// until t1 has acquired this lock and then immediately informs t1, that
    /// it will now wait for a lock too.  Now, t1 will immediately terminate,
    /// thereby releasing the lock and leading t2 to eventually acquire it.
    #[test]
    fn rw_tx_busy_timeout_working() {
        let db_dir = tempdir().unwrap();
        let db_path = db_dir.path().join("db");
        let pool = open(db_path).unwrap();
        // t2 will wait on this before it starts doing stuff.
        let t1_acquired_lock = Arc::new(Once::new());
        // t1 will wait on this in order to terminate properly.
        let t2_is_waiting = Arc::new(Once::new());
        let t1 = std::thread::spawn({
            let pool = pool.clone();
            let t1_acquired_lock = t1_acquired_lock.clone();
            let t2_is_waiting = t2_is_waiting.clone();
            move || {
                rw_tx(&pool, move |_tx| {
                    // Inform t2 we have write lock.
                    t1_acquired_lock.call_once(|| ());
                    println!("t1 acquired write lock");
                    // Wait for t2 to start waiting.
                    t2_is_waiting.wait();
                })
                .unwrap();
                // This message is printed by t1 (was mislabeled "t2" before).
                println!("t1 released write lock");
            }
        });
        println!("t2 waits for t1 to acquire write lock");
        t1_acquired_lock.wait();
        t2_is_waiting.call_once(|| ());
        rw_tx(&pool, |_| ()).unwrap();
        println!("t2 acquired and released write lock");
        t1.join().unwrap();
    }
    /// Tests whether our SQLite busy error handling fails as expected.
    ///
    /// We configure SQLite to fail after 1000ms.  This test works with two
    /// threads.  t1 acquires an exclusive lock on the database and informs
    /// t2 about it, which itself waits until t1 has acquired the lock.
    /// t2 then immediately tries to also obtain an exclusive lock, which
    /// should fail after about 1000ms.  After the failure, t2 informs t1
    /// that it has failed, causing t1 to terminate.
    #[test]
    fn rw_tx_busy_timeout_busy() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("db");
        let pool = open(path).unwrap();
        // Signalled once t1 holds the write lock; t2 waits on it first.
        let lock_acquired = Arc::new(Once::new());
        // Signalled once t2 has given up; t1 terminates afterwards.
        let gave_up = Arc::new(Once::new());
        let t1 = std::thread::spawn({
            let pool = pool.clone();
            let lock_acquired = lock_acquired.clone();
            let gave_up = gave_up.clone();
            move || {
                rw_tx(&pool, move |_tx| {
                    // Tell t2 the write lock is ours.
                    lock_acquired.call_once(|| ());
                    println!("t1 acquired write lock");
                    // Keep holding the lock until t2 gave up (how mean from us).
                    gave_up.wait();
                })
                .unwrap();
                println!("t1 released write lock");
            }
        });
        println!("t2 waits for t1 to acquire write lock");
        lock_acquired.wait();
        // This attempt must time out because t1 still holds the lock.
        let e = rw_tx(&pool, |_| ()).unwrap_err();
        assert_eq!(
            e.to_string(),
            "low-level rusqlite error: database is locked"
        );
        println!("t2 gave up on acquiring write lock");
        gave_up.call_once(|| ());
        t1.join().unwrap();
    }
    /// End-to-end test of [`store_insert`]: first insertion, idempotent
    /// re-insertion, and re-creation of deleted compressed variants.
    #[test]
    fn store_insert_test() {
        let db_dir = tempdir().unwrap();
        let db_path = db_dir.path().join("db");
        open(&db_path).unwrap();
        let mut conn = Connection::open(&db_path).unwrap();
        let tx = conn.transaction().unwrap();
        // Insert "foobar" with every supported encoding; the returned docid
        // is the digest of the plain content.
        let docid = store_insert(&tx, "foobar".as_bytes(), ContentEncoding::iter()).unwrap();
        assert_eq!(
            docid,
            "C3AB8FF13720E8AD9047DD39466B3C8974E592C2FA383D4A3960714CAEF0C4F2"
        );
        // The plain bytes must be retrievable from the store as-is.
        let res = tx
            .query_one(
                sql!(
                    "
                    SELECT content
                    FROM store
                    WHERE docid = 'C3AB8FF13720E8AD9047DD39466B3C8974E592C2FA383D4A3960714CAEF0C4F2'
                    "
                ),
                params![],
                |row| row.get::<_, Vec<u8>>(0),
            )
            .unwrap();
        assert_eq!(res, "foobar".as_bytes());
        // Every non-identity encoding must have a compressed_document link.
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT algorithm
            FROM compressed_document
            WHERE identity_docid = 'C3AB8FF13720E8AD9047DD39466B3C8974E592C2FA383D4A3960714CAEF0C4F2'
            "
        )).unwrap();
        let algorithms = stmt
            .query_map(params![], |row| row.get::<_, String>(0))
            .unwrap();
        let algorithms = algorithms.map(|x| x.unwrap()).collect::<HashSet<_>>();
        assert_eq!(
            algorithms,
            HashSet::from([
                "deflate".to_string(),
                "gzip".to_string(),
                "x-zstd".to_string(),
                "x-tor-lzma".to_string()
            ])
        );
        // Now insert the same thing a second time again and see whether the
        // ON CONFLICT magic works.
        let docid_second = store_insert(&tx, "foobar".as_bytes(), ContentEncoding::iter()).unwrap();
        assert_eq!(docid, docid_second);
        // Remove a few compressed entries and get them again.
        let n = tx
            .execute(
                sql!(
                    "
                    DELETE FROM
                    compressed_document
                    WHERE algorithm IN ('deflate', 'x-zstd')
                    "
                ),
                params![],
            )
            .unwrap();
        assert_eq!(n, 2);
        // Re-inserting must restore the deleted compressed variants.
        let docid_third = store_insert(&tx, "foobar".as_bytes(), ContentEncoding::iter()).unwrap();
        assert_eq!(docid, docid_third);
        let algorithms = stmt
            .query_map(params![], |row| row.get::<_, String>(0))
            .unwrap();
        let algorithms = algorithms.map(|x| x.unwrap()).collect::<HashSet<_>>();
        assert_eq!(
            algorithms,
            HashSet::from([
                "deflate".to_string(),
                "gzip".to_string(),
                "x-zstd".to_string(),
                "x-tor-lzma".to_string()
            ])
        );
    }
    #[test]
    fn compress_test() {
        /// Asserts that `encoded` has an entry for `encoding`.
        fn assert_present(encoding: ContentEncoding, encoded: &[(ContentEncoding, Vec<u8>)]) {
            assert!(encoded.iter().any(|(e, _)| *e == encoding));
        }
        const INPUT: &[u8] = "foobar".as_bytes();
        // Encode the input with every supported encoding.
        let encoded = ContentEncoding::iter()
            .map(|encoding| (encoding, compress(INPUT, encoding).unwrap()))
            .collect::<Vec<_>>();
        assert_eq!(encoded.len(), 5);
        assert_present(ContentEncoding::Identity, &encoded);
        assert_present(ContentEncoding::Deflate, &encoded);
        assert_present(ContentEncoding::Gzip, &encoded);
        assert_present(ContentEncoding::XTorLzma, &encoded);
        assert_present(ContentEncoding::XZstd, &encoded);
        // Round-trip: every encoding must decode back to the original input.
        for (encoding, data) in encoded {
            let mut plain = Vec::new();
            match encoding {
                ContentEncoding::Identity => plain = data,
                ContentEncoding::Deflate => {
                    DeflateDecoder::new(Cursor::new(data))
                        .read_to_end(&mut plain)
                        .unwrap();
                }
                ContentEncoding::Gzip => {
                    GzDecoder::new(Cursor::new(data))
                        .read_to_end(&mut plain)
                        .unwrap();
                }
                ContentEncoding::XTorLzma => {
                    lzma_rs::lzma_decompress(&mut Cursor::new(data), &mut plain)
                        .unwrap();
                }
                ContentEncoding::XZstd => {
                    plain = zstd::decode_all(Cursor::new(data)).unwrap();
                }
            }
            assert_eq!(plain, INPUT);
        }
    }
    #[test]
    fn recent_consensus() {
        let pool = create_dummy_db();
        let no_tolerance = DirToleranceBuilder::default()
            .pre_valid_tolerance(Duration::ZERO)
            .post_valid_tolerance(Duration::ZERO)
            .build()
            .unwrap();
        let liberal_tolerance = DirToleranceBuilder::default()
            .pre_valid_tolerance(Duration::from_secs(60 * 60)) // 1h before
            .post_valid_tolerance(Duration::from_secs(60 * 60)) // 1h after
            .build()
            .unwrap();
        read_tx(&pool, move |tx| {
            // Get None by being way before valid-after.
            assert!(ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &no_tolerance,
                SystemTime::UNIX_EPOCH.into(),
            )
            .unwrap()
            .is_none());
            // Get None by being way behind valid-until.
            assert!(ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &no_tolerance,
                *VALID_UNTIL + Duration::from_secs(60 * 60 * 24 * 365),
            )
            .unwrap()
            .is_none());
            // Get None by being minimally before valid-after.
            assert!(ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &no_tolerance,
                *VALID_AFTER - Duration::from_secs(1),
            )
            .unwrap()
            .is_none());
            // Get None by being minimally behind valid-until.
            assert!(ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &no_tolerance,
                *VALID_UNTIL + Duration::from_secs(1),
            )
            .unwrap()
            .is_none());
            // Get a valid consensus by being in the interval.
            let res1 = ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &no_tolerance,
                *VALID_AFTER,
            )
            .unwrap()
            .unwrap();
            let res2 = ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &no_tolerance,
                *VALID_UNTIL,
            )
            .unwrap()
            .unwrap();
            let res3 = ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &no_tolerance,
                *VALID_AFTER + Duration::from_secs(60 * 30),
            )
            .unwrap()
            .unwrap();
            assert_eq!(
                res1,
                ConsensusMeta {
                    docid: *CONSENSUS_DOCID,
                    unsigned_sha3_256: Sha3_256::from([0; 32]),
                    flavor: ConsensusFlavor::Plain,
                    valid_after: *VALID_AFTER,
                    fresh_until: *FRESH_UNTIL,
                    valid_until: *VALID_UNTIL,
                }
            );
            assert_eq!(res1, res2);
            assert_eq!(res2, res3);
            // Get a valid consensus using a liberal dir tolerance.
            let res1 = ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &liberal_tolerance,
                *VALID_AFTER - Duration::from_secs(60 * 30),
            )
            .unwrap()
            .unwrap();
            let res2 = ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &liberal_tolerance,
                *VALID_UNTIL + Duration::from_secs(60 * 30),
            )
            .unwrap()
            .unwrap();
            assert_eq!(
                res1,
                ConsensusMeta {
                    docid: *CONSENSUS_DOCID,
                    unsigned_sha3_256: Sha3_256::from([0; 32]),
                    flavor: ConsensusFlavor::Plain,
                    valid_after: *VALID_AFTER,
                    fresh_until: *FRESH_UNTIL,
                    valid_until: *VALID_UNTIL,
                }
            );
            assert_eq!(res1, res2);
        })
        .unwrap();
    }
    #[test]
    fn sync_timeout() {
        let cons = ConsensusMeta {
            docid: *CONSENSUS_DOCID,
            unsigned_sha3_256: Sha3_256::from([0; 32]),
            flavor: ConsensusFlavor::Plain,
            valid_after: *VALID_AFTER,
            fresh_until: *FRESH_UNTIL,
            valid_until: *VALID_UNTIL,
        };
        // We repeat the test a few thousand times to go over many random values.
        //
        // NOTE(review): reuse one RNG across all iterations.  Constructing
        // `testing_rng()` anew inside the loop would, assuming it is
        // deterministically seeded (TODO confirm), make every iteration sample
        // the very same value, defeating the point of repeating the test.
        let mut rng = testing_rng();
        for _ in 0..10000 {
            let when = cons.lifetime(&mut rng);
            assert!(when >= *FRESH_UNTIL);
            assert!(when <= *FRESH_UNTIL_HALF);
        }
    }
    #[test]
    fn get_auth_cert() {
        let pool = create_dummy_db();
        // Helper building a pair of key ids from two hex fingerprints.
        let key_ids = |id: &str, sk: &str| AuthCertKeyIds {
            id_fingerprint: RsaIdentity::from_hex(id).unwrap(),
            sk_fingerprint: RsaIdentity::from_hex(sk).unwrap(),
        };
        // Reference time used by all queries below.
        let now = || (SystemTime::UNIX_EPOCH + Duration::from_secs(1765900013)).into();
        // Querying with no key ids yields neither hits nor misses.
        let (found, missing) = read_tx(&pool, |tx| {
            AuthCertMeta::query_recent(tx, &[], &DirTolerance::default(), now())
        })
        .unwrap()
        .unwrap();
        assert!(found.is_empty());
        assert!(missing.is_empty());
        // One key id pair is present in the database, two are not.
        let (found, missing) = read_tx(&pool, |tx| {
            AuthCertMeta::query_recent(
                tx,
                &[
                    // Found one.
                    key_ids(
                        "49015F787433103580E3B66A1707A00E60F2D15B",
                        "C5D153A6F0DA7CC22277D229DCBBF929D0589FE0",
                    ),
                    // Missing.
                    key_ids(
                        "0000000000000000000000000000000000000000",
                        "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
                    ),
                    // Missing.
                    key_ids(
                        "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
                        "0000000000000000000000000000000000000000",
                    ),
                ],
                &DirTolerance::default(),
                now(),
            )
        })
        .unwrap()
        .unwrap();
        assert_eq!(
            found,
            vec![AuthCertMeta {
                docid: DocumentId::digest(CERT_CONTENT),
                kp_auth_id_rsa_sha1: Sha1::from([
                    73, 1, 95, 120, 116, 51, 16, 53, 128, 227, 182, 106, 23, 7, 160, 14, 96, 242,
                    209, 91
                ]),
                kp_auth_sign_rsa_sha1: Sha1::from([
                    197, 209, 83, 166, 240, 218, 124, 194, 34, 119, 210, 41, 220, 187, 249, 41,
                    208, 88, 159, 224
                ]),
                dir_key_published: (SystemTime::UNIX_EPOCH + Duration::from_secs(1764543578))
                    .into(),
                dir_key_expires: (SystemTime::UNIX_EPOCH + Duration::from_secs(1772492378)).into()
            }]
        );
        assert_eq!(
            missing,
            vec![
                key_ids(
                    "0000000000000000000000000000000000000000",
                    "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
                ),
                key_ids(
                    "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
                    "0000000000000000000000000000000000000000",
                ),
            ]
        );
    }
    #[test]
    fn missing_server_descriptors() {
        let pool = create_dummy_db();
        // Load the consensus metadata whose referenced servers we check.
        let meta = read_tx(&pool, |tx| {
            ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &DirTolerance::default(),
                *VALID_AFTER,
            )
        })
        .unwrap()
        .unwrap()
        .unwrap();
        // Initially, exactly one server descriptor is absent.
        let missing = read_tx(&pool, |tx| meta.missing_servers(tx))
            .unwrap()
            .unwrap();
        assert_eq!(
            missing,
            HashSet::from([Sha1::digest(include_bytes!(
                "../testdata/descriptor2-ns-unsigned"
            ))])
        );
        // After wiping all router descriptors, both must be reported missing.
        rw_tx(&pool, |tx| {
            tx.execute(sql!("DELETE FROM router_descriptor"), params![])
        })
        .unwrap()
        .unwrap();
        let missing = read_tx(&pool, |tx| meta.missing_servers(tx))
            .unwrap()
            .unwrap();
        assert_eq!(
            missing,
            HashSet::from([
                Sha1::digest(include_bytes!("../testdata/descriptor1-ns-unsigned")),
                Sha1::digest(include_bytes!("../testdata/descriptor2-ns-unsigned"))
            ])
        );
    }
    #[test]
    fn missing_extra_infos() {
        let pool = create_dummy_db();
        let meta = read_tx(&pool, |tx| {
            ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &DirTolerance::default(),
                *VALID_AFTER,
            )
        })
        .unwrap()
        .unwrap()
        .unwrap();
        // We should have no missing extra-infos.
        // Technically extra-info of the second relay is missing too, but we
        // cannot know that.
        let missing_extras = read_tx(&pool, |tx| meta.missing_extras(tx))
            .unwrap()
            .unwrap();
        assert!(missing_extras.is_empty());
        // Now delete the record of router_extra_info.  We go through `rw_tx`
        // for consistency with the sibling tests, rather than executing the
        // statement on a raw pooled connection.
        rw_tx(&pool, |tx| {
            tx.execute(sql!("DELETE FROM router_extra_info"), params![])
        })
        .unwrap()
        .unwrap();
        // Now we should get a single missing extra-info.
        let missing_extras = read_tx(&pool, |tx| meta.missing_extras(tx))
            .unwrap()
            .unwrap();
        assert_eq!(
            missing_extras,
            HashSet::from([Sha1::digest(include_bytes!(
                "../testdata/descriptor1-extra-info-unsigned"
            ))])
        );
    }
    #[test]
    fn missing_micro_descriptors() {
        let pool = create_dummy_db();
        let meta = read_tx(&pool, |tx| {
            ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Microdesc,
                &DirTolerance::default(),
                *VALID_AFTER,
            )
        })
        .unwrap()
        .unwrap()
        .unwrap();
        // Only one should be returned.
        let missing_micros = read_tx(&pool, |tx| meta.missing_micros(tx))
            .unwrap()
            .unwrap();
        assert_eq!(
            missing_micros,
            HashSet::from([Sha256::digest(include_bytes!("../testdata/descriptor2-md"))])
        );
        // If we delete all router descriptors we have, we should get both.
        rw_tx(&pool, |tx| {
            tx.execute(sql!("DELETE FROM router_descriptor"), params![])
        })
        .unwrap()
        .unwrap();
        // Now both should be returned.
        // (Fixed copy-paste: this local was previously named `missing_servers`
        // although it holds missing microdescriptor digests.)
        let missing_micros = read_tx(&pool, |tx| meta.missing_micros(tx))
            .unwrap()
            .unwrap();
        assert_eq!(
            missing_micros,
            HashSet::from([
                Sha256::digest(include_bytes!("../testdata/descriptor1-md")),
                Sha256::digest(include_bytes!("../testdata/descriptor2-md"))
            ])
        );
    }
}