1
//! Access to the database schema.
2
//!
3
//! This module is not intended to provide a high-level ORM, instead it serves
4
//! the purpose of initializing and upgrading the database, if necessary.
5
//!
6
//! # Synchronous or Asynchronous?
7
//!
8
//! The question on whether the database and access to it shall be synchronous
9
//! or asynchronous has been a fairly long debate that eventually got settled
10
//! after realizing that an asynchronous approach does not work.  This comment
11
//! should serve as a reminder for future devs, wondering why we use certain
12
//! synchronous primitives in an otherwise asynchronous codebase.
13
//!
14
//! Early on, it was clear that we would need some sort of connection pool,
15
//! primarily for two reasons:
16
//! 1. Performing frequent open and close calls in every task would be costly.
17
//! 2. Sharing a single connection object with a Mutex would be a waste
18
//!
19
//! Because the application itself is primarily asynchronous, we decided to go
20
//! with an asynchronous connection pool as well, leading to the choice of
21
//! `deadpool` initially.
22
//!
23
//! However, soon thereafter, problems with `deadpool` became evident.  Those
24
//! problems mostly stemmed from the synchronous nature of SQLite itself.  In our
25
//! case, this problem was initially triggered by figuring out a way to solve
26
//! `SQLITE_BUSY` handling.  In the end, we decided to settle upon the following
27
//! approach: Set `PRAGMA busy_timeout` to a certain value and create write
28
//! transactions with `BEGIN EXCLUSIVE`.  This way, SQLite would try to obtain
29
//! a write transaction for `busy_timeout` milliseconds by blocking the current
30
//! thread.  Due to this blocking, async no longer made any sense and was in
31
//! fact quite counter-productive because those potential sleeps could screw a
32
//! lot of things up, which became very evident while trying to test this.
33
//!
34
//! Besides, throughout refactoring the code base, we realized that, even while
35
//! still using `deadpool`, the actual "asynchronous" calls interfacing with the
36
//! database became smaller and smaller.  In the end, the asynchronous code just
37
//! involved parts of obtaining a connection and creating a transaction,
38
//! eventually resulting in calling a synchronous function taking the
39
//! transaction handle to perform the lion's share of the operation.
40

            
41
// TODO DIRMIRROR: This could benefit from methods by wrapping the pool into a
42
// custom type.
43

            
44
use std::{
45
    collections::HashSet,
46
    fmt::Display,
47
    io::{Cursor, Write},
48
    num::NonZero,
49
    ops::{Add, Sub},
50
    path::Path,
51
    time::{Duration, SystemTime},
52
};
53

            
54
use digest::Digest;
55
use flate2::write::{DeflateEncoder, GzEncoder};
56
use getset::CopyGetters;
57
use r2d2::Pool;
58
use r2d2_sqlite::SqliteConnectionManager;
59
use rand::Rng;
60
use rusqlite::{
61
    named_params, params,
62
    types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
63
    OptionalExtension, ToSql, Transaction, TransactionBehavior,
64
};
65
use saturating_time::SaturatingTime;
66
use tor_basic_utils::RngExt;
67
use tor_dircommon::config::DirTolerance;
68
use tor_error::into_internal;
69
use tor_netdoc::doc::{
70
    authcert::{AuthCert, AuthCertKeyIds},
71
    netstatus::ConsensusFlavor,
72
};
73

            
74
use crate::err::DatabaseError;
75

            
76
/// Version 1 of the database schema.
///
/// The statements live in `schema_v1.sql` next to this module and are embedded
/// into the binary at compile time.
///
/// TODO DIRMIRROR: Before the release, figure out where to use rowid and where
/// to use docid.
const V1_SCHEMA: &str = include_str!("schema_v1.sql");
81

            
82
/// Global options set in every connection.
///
/// * `journal_mode=WAL` enables write-ahead logging, allowing readers to
///   proceed concurrently with a single writer.
/// * `foreign_keys=ON` enforces foreign key constraints, which SQLite keeps
///   disabled by default for backwards compatibility.
/// * `busy_timeout=1000` makes SQLite block and retry for up to 1000 ms on a
///   locked database instead of immediately failing with `SQLITE_BUSY`; see
///   the module documentation for why we rely on this.
const GLOBAL_OPTIONS: &str = sql!(
    "
PRAGMA journal_mode=WAL;
PRAGMA foreign_keys=ON;
PRAGMA busy_timeout=1000;
"
);
90

            
91
/// Convenience macro for implementing a hash type in a rusqlite compatible fashion.
///
/// This macro accepts the following parameters:
/// 1. `name` for specifying an identifier of the type, such as [`Sha256`].
/// 2. `algo` for specifying the type from the rust-crypto [`digest`] ecosystem,
///    such as [`tor_llcrypto::d::Sha256`].
/// 3. The size in bytes of the hash output, such as `32` for [`Sha256`].
///     * Unfortunately, we cannot use something like [`Digest::output_size()`]
///       because it is not a constant.
///
/// It generates a struct with `name` as the identifier, which implements the
/// following methods:
/// * `digest` for wrapping around [`Digest::digest()`].
///
/// It also implements the following traits:
/// * [`Display`]
/// * [`FromSql`]
/// * [`ToSql`]
/// * [`PartialEq<&str>`] for base16 comparisons
/// * [`From<u8; $size>`] but only in tests
macro_rules! impl_hash_wrapper {
    ($name:ident, $algo:ty, $size:literal) => {
        /// Database wrapper type.
        #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
        pub(crate) struct $name([u8; $size]);

        impl $name {
            /// Computes the hash from arbitrary data.
            pub(crate) fn digest(data: &[u8]) -> Self {
                Self(<$algo>::digest(data).into())
            }
        }

        impl Display for $name {
            /// Formats the hash in uppercase hexadecimal.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "{}", hex::encode_upper(self.0))
            }
        }

        impl FromSql for $name {
            fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
                // We read the hash as a hexadecimal string from the database.
                // Convert it to binary data and check length afterwards.
                let data: [u8; $size] = value
                    .as_str()
                    .map(hex::decode)?
                    .map_err(|e| {
                        FromSqlError::Other(Box::new(tor_error::internal!(
                            "non hex data in database? {e}"
                        )))
                    })?
                    .try_into()
                    .map_err(|_| {
                        // Use `stringify!` so the concrete type name ends up in
                        // the message; a bare `$name` inside a string literal is
                        // not expanded by `macro_rules!` and would previously
                        // emit the literal text "$name".
                        FromSqlError::Other(Box::new(tor_error::internal!(
                            "{} with invalid length in database?",
                            stringify!($name)
                        )))
                    })?;

                Ok(Self(data))
            }
        }

        impl ToSql for $name {
            fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
                // Because Self is only constructed with FromSql and digest
                // data, it is safe to assume it is valid.
                Ok(ToSqlOutput::from(self.to_string()))
            }
        }

        impl PartialEq<&str> for $name {
            fn eq(&self, other: &&str) -> bool {
                self.to_string() == other.to_uppercase()
            }
        }

        #[cfg(test)]
        impl From<[u8; $size]> for $name {
            fn from(value: [u8; $size]) -> Self {
                Self(value)
            }
        }
    };
}
176

            
177
// Concrete hash wrapper types used by the database layer; the byte sizes must
// match the respective algorithm's output length.
impl_hash_wrapper!(Sha1, tor_llcrypto::d::Sha1, 20);
impl_hash_wrapper!(Sha256, tor_llcrypto::d::Sha256, 32);
impl_hash_wrapper!(Sha3_256, tor_llcrypto::d::Sha3_256, 32);
180

            
181
/// The identifier for documents in the content-addressable cache.
///
/// Right now, this is a [`Sha256`] hash, but this may change in future.
/// Prefer this alias over [`Sha256`] directly, so a future change of hash
/// function stays local to this definition.
pub(crate) type DocumentId = Sha256;
185

            
186
/// The supported content encodings.
///
/// The strum derives render ([`strum::Display`]) and parse
/// ([`strum::EnumString`], case-insensitively) the variants in kebab-case,
/// e.g. `XTorLzma` <-> `x-tor-lzma`.
#[derive(Debug, Clone, Copy, PartialEq, strum::EnumString, strum::Display, strum::EnumIter)]
#[strum(serialize_all = "kebab-case", ascii_case_insensitive)]
pub(crate) enum ContentEncoding {
    /// RFC2616 section 3.5.
    Identity,
    /// RFC2616 section 3.5.
    Deflate,
    /// RFC2616 section 3.5.
    Gzip,
    /// The zstandard compression algorithm (www.zstd.net).
    XZstd,
    /// The lzma compression algorithm with a "preset" value no higher than 6.
    XTorLzma,
}
201

            
202
/// A wrapper around [`SystemTime`] with convenient features.
///
/// Please use this type throughout the crate internally, instead of
/// [`SystemTime`].
///
/// # Conversion
///
/// This type can be safely converted from and into a [`SystemTime`], because
/// it is just a wrapper type.
///
/// # Saturating Arithmetic
///
/// This type implements [`Add`] and [`Sub`] for [`Duration`] and [`Timestamp`]
/// ([`Sub`] only) using saturating arithmetic from the [`saturating_time`]
/// crate.  It means that addition and subtraction can be safely performed
/// without the potential risk of an unexpected panic, instead wrapping to
/// a local maximum/minimum or [`Duration::ZERO`] depending on the type.
///
/// Note that we don't provide a saturating version of [`Duration`], so addition
/// or subtraction of two [`Duration`]s still needs care to avoid panics.
///
/// # SQLite Interaction
///
/// This type implements [`FromSql`] and [`ToSql`], making it convenient to
/// integrate into SQL statements, as the database schema represents timestamps
/// internally using a non-negative [`i64`] storing the seconds since the epoch.
/// Note that sub-second precision is therefore lost when round-tripping a
/// value through the database.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub(crate) struct Timestamp(SystemTime);
230

            
231
impl From<SystemTime> for Timestamp {
232
60
    fn from(value: SystemTime) -> Self {
233
60
        Self(value)
234
60
    }
235
}
236

            
237
impl From<Timestamp> for SystemTime {
238
18
    fn from(value: Timestamp) -> Self {
239
18
        value.0
240
18
    }
241
}
242

            
243
impl Add<Duration> for Timestamp {
244
    type Output = Self;
245

            
246
    /// Performs a saturating addition wrapping to [`SystemTime::max_value()`].
247
20020
    fn add(self, rhs: Duration) -> Self::Output {
248
20020
        Self(self.0.saturating_add(rhs))
249
20020
    }
250
}
251

            
252
impl Sub<Duration> for Timestamp {
253
    type Output = Self;
254

            
255
    /// Performs a saturating subtraction wrapping to [`SystemTime::min_value()`].
256
4
    fn sub(self, rhs: Duration) -> Self::Output {
257
4
        Self(self.0.saturating_sub(rhs))
258
4
    }
259
}
260

            
261
impl Sub<Timestamp> for Timestamp {
262
    type Output = Duration;
263

            
264
    /// Performs a saturating duration_since wrapping to [`Duration::ZERO`].
265
20002
    fn sub(self, rhs: Timestamp) -> Self::Output {
266
20002
        self.0.saturating_duration_since(rhs.0)
267
20002
    }
268
}
269

            
270
impl FromSql for Timestamp {
271
130
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
272
130
        let mut res = SystemTime::UNIX_EPOCH;
273
130
        res = res.saturating_add(Duration::from_secs(value.as_i64()?.try_into().unwrap_or(0)));
274
130
        Ok(Self(res))
275
130
    }
276
}
277

            
278
impl ToSql for Timestamp {
279
202
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
280
202
        Ok(ToSqlOutput::from(
281
202
            self.0
282
202
                .saturating_duration_since(SystemTime::UNIX_EPOCH)
283
202
                .as_secs()
284
202
                .try_into()
285
202
                .unwrap_or(i64::MAX),
286
202
        ))
287
202
    }
288
}
289

            
290
/// Representation of consensus metadata from the database.
///
/// The [`CopyGetters`] derive generates a `pub(crate)` getter for each field.
#[derive(Debug, Clone, Copy, PartialEq, Eq, CopyGetters)]
#[get_copy = "pub(crate)"]
pub(crate) struct ConsensusMeta {
    /// The document id uniquely identifying the consensus.
    docid: DocumentId,

    /// The SHA3 of the unsigned part of the consensus.
    unsigned_sha3_256: Sha3_256,

    /// The flavor of the consensus.
    flavor: ConsensusFlavor,

    /// The time after which this consensus is valid.
    valid_after: Timestamp,

    /// The time after which this consensus stops being fresh.
    fresh_until: Timestamp,

    /// The time after which this consensus stops being valid.
    valid_until: Timestamp,
}
312

            
313
impl ConsensusMeta {
    /// Obtains the most recent valid consensus from the database.
    ///
    /// This function queries the database using a [`Transaction`] in order to
    /// have a consistent view upon it.  It will return an [`Option`] containing
    /// a consensus.  In order to obtain a *valid* consensus, a [`Timestamp`]
    /// plus a [`DirTolerance`] are supplied, which will be used for querying
    /// the database in a time-constrained fashion.
    ///
    /// The [`None`] case implies that no valid consensus has been found, that
    /// is, no consensus at all or no consensus whose `valid-before` or
    /// `valid-after` lies within the range composed by `now` and `tolerance`.
    pub(crate) fn query_recent(
        tx: &Transaction,
        flavor: ConsensusFlavor,
        tolerance: &DirTolerance,
        now: Timestamp,
    ) -> Result<Option<Self>, DatabaseError> {
        // Select the most recent flavored consensus document from the database.
        //
        // The `valid_after` and `valid_until` cells must be a member of the range:
        // `[valid_after - pre_valid_tolerance; valid_after + post_valid_tolerance]`
        // (inclusively).
        let mut meta_stmt = tx.prepare_cached(sql!(
            "
            SELECT docid, unsigned_sha3_256, valid_after, fresh_until, valid_until
            FROM consensus
            WHERE
              flavor = :flavor
              AND :now >= valid_after - :pre_valid
              AND :now <= valid_until + :post_valid
            ORDER BY valid_after DESC
            LIMIT 1
            "
        ))?;

        // Actually execute the query; a None is totally valid and considered as
        // no consensus being present in the current database.
        //
        // The tolerances saturate to `i64::MAX` seconds instead of failing on
        // absurdly large `Duration` values.
        let res = meta_stmt.query_one(named_params! {
            ":flavor": flavor.name(),
            ":now": now,
            ":pre_valid": tolerance.pre_valid_tolerance().as_secs().try_into().unwrap_or(i64::MAX),
            ":post_valid": tolerance.post_valid_tolerance().as_secs().try_into().unwrap_or(i64::MAX),
        }, |row| {
            Ok(Self {
                docid: row.get(0)?,
                unsigned_sha3_256: row.get(1)?,
                flavor,
                valid_after: row.get(2)?,
                fresh_until: row.get(3)?,
                valid_until: row.get(4)?,
            })
        }).optional()?;

        Ok(res)
    }

    /// Queries the raw data of a [`ConsensusMeta`].
    ///
    /// Fetches the document blob addressed by our `docid` from the `store`
    /// table and decodes it as UTF-8; non-UTF-8 content is treated as an
    /// internal error, as consensus documents are textual.
    pub(crate) fn data(&self, tx: &Transaction<'_>) -> Result<String, DatabaseError> {
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT content
            FROM store
            WHERE docid = :docid
            "
        ))?;

        let raw = stmt.query_one(named_params! {":docid": self.docid}, |row| {
            row.get::<_, Vec<u8>>(0)
        })?;
        let raw = String::from_utf8(raw).map_err(into_internal!("utf-8 constraint violated?"))?;
        Ok(raw)
    }

    /// Calculates the [`Timestamp`] at which the authorities will be queried again.
    ///
    /// The result is `fresh_until` plus a random offset of at most half the
    /// interval between `fresh_until` and `valid_until`, per the linked
    /// specification.
    ///
    /// # Panics
    ///
    /// Panics if `fresh_until` is not strictly before `valid_until`, which
    /// would indicate a broken consensus lifetime.
    ///
    /// # Specifications
    ///
    /// * <https://spec.torproject.org/dir-spec/directory-cache-operation.html#download-ns-from-auth>
    pub(crate) fn lifetime<R: Rng>(&self, rng: &mut R) -> Timestamp {
        assert!(self.fresh_until < self.valid_until);

        // The range is never empty thanks to the assert above, so
        // `gen_range_checked` cannot fail here.
        let offset = rng
            .gen_range_checked(0..=((self.valid_until - self.fresh_until).as_secs() / 2))
            .expect("invalid range?");

        self.fresh_until + Duration::from_secs(offset)
    }

    /// Returns the missing server descriptors for this consensus.
    ///
    /// Only meaningful for the [`ConsensusFlavor::Plain`] flavor; for any
    /// other flavor an empty set is returned immediately.
    pub(crate) fn missing_servers(
        &self,
        tx: &Transaction<'_>,
    ) -> Result<HashSet<Sha1>, DatabaseError> {
        if self.flavor != ConsensusFlavor::Plain {
            return Ok(HashSet::new());
        }

        // Select the missing router descriptors.
        //
        // A router descriptor is considered missing if it exists in
        // `consensus_router_descriptor_member` but not in `router_descriptor`
        // because the first entry is added once the consensus got parsed,
        // whereas the second entry is added once we have actually retrieved it.
        //
        // It works by doing a left join on router_descriptor and filtering for
        // all entries where the join is NULL, as that implies we are aware of
        // the descriptor but not have it stored.
        //
        // Parameters:
        // :docid - The docid of the consensus.
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT cr.unsigned_sha1
            FROM consensus_router_descriptor_member AS cr
              LEFT JOIN router_descriptor AS server ON cr.unsigned_sha1 = server.unsigned_sha1
            WHERE
              cr.consensus_docid = :docid
              AND server.unsigned_sha1 IS NULL
            "
        ))?;

        // Collect all digests, aborting on the first row that fails to decode.
        let missing = stmt
            .query_map(named_params! {":docid": self.docid}, |row| row.get(0))?
            .collect::<Result<HashSet<_>, _>>()?;
        Ok(missing)
    }

    /// Returns the missing extra infos for this consensus to the best of our abilities.
    ///
    /// Keep in mind that this does not return **all** missing extra infos but
    /// only the missing extra infos of server descriptors we have.
    ///
    /// Only meaningful for the [`ConsensusFlavor::Plain`] flavor; for any
    /// other flavor an empty set is returned immediately.
    pub(crate) fn missing_extras(
        &self,
        tx: &Transaction<'_>,
    ) -> Result<HashSet<Sha1>, DatabaseError> {
        if self.flavor != ConsensusFlavor::Plain {
            return Ok(HashSet::new());
        }

        // Select the missing extra infos for this consensus.
        //
        // This return value is not complete because we only know the missing
        // extra-infos to the best of our abilities.  In other words: We are
        // only aware of a missing extra-info if we have parsed the respective
        // server descriptor.
        //
        // It works by doing an inner join from
        // `consensus_router_descriptor_member` to `router_descriptor` because
        // we can only know about the extra-infos of which we have the server
        // descriptors from.  Afterwards, we do a left join with the
        // `router_extra_info` table and filter for all results where the left
        // join result is null, hence where we have a server descriptor but not
        // the respective extra-info.
        //
        // Parameters:
        // :docid - The docid of the consensus.
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT server.extra_unsigned_sha1
            FROM consensus_router_descriptor_member AS cr
              INNER JOIN router_descriptor AS server ON cr.unsigned_sha1 = server.unsigned_sha1
              LEFT JOIN router_extra_info AS extra ON server.extra_unsigned_sha1 = extra.unsigned_sha1
            WHERE
              cr.consensus_docid = :docid
              AND extra.unsigned_sha1 IS NULL
            "
        ))?;

        // Collect all digests, aborting on the first row that fails to decode.
        let missing = stmt
            .query_map(named_params! {":docid": self.docid}, |row| row.get(0))?
            .collect::<Result<HashSet<_>, _>>()?;
        Ok(missing)
    }

    /// Returns the missing micro descriptors for this consensus.
    ///
    /// Only meaningful for the [`ConsensusFlavor::Microdesc`] flavor; for any
    /// other flavor an empty set is returned immediately.
    pub(crate) fn missing_micros(
        &self,
        tx: &Transaction<'_>,
    ) -> Result<HashSet<Sha256>, DatabaseError> {
        if self.flavor != ConsensusFlavor::Microdesc {
            return Ok(HashSet::new());
        }

        // Select the missing micro descriptors.
        //
        // A micro descriptor is considered missing if it exists in
        // `consensus_router_descriptor_member` but not in `router_descriptor`
        // because the first entry is added once the consensus got parsed,
        // whereas the second entry is added once we have actually retrieved it.
        //
        // It works by doing a left join on router_descriptor and filtering for
        // all entries where the join is NULL, as that implies we are aware of
        // the descriptor but not have it stored.
        //
        // Parameters:
        // :docid - The docid of the consensus.
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT cr.unsigned_sha2
            FROM consensus_router_descriptor_member AS cr
              LEFT JOIN router_descriptor AS micro ON cr.unsigned_sha2 = micro.unsigned_sha2
            WHERE
              cr.consensus_docid = :docid
              AND micro.unsigned_sha2 IS NULL
            "
        ))?;

        // Collect all digests, aborting on the first row that fails to decode.
        let missing = stmt
            .query_map(named_params! {":docid": self.docid}, |row| row.get(0))?
            .collect::<Result<HashSet<_>, _>>()?;
        Ok(missing)
    }
}
527

            
528
/// Representation of authority certificate metadata from the database.
///
/// The [`CopyGetters`] derive generates a `pub(crate)` getter for each field.
#[derive(Debug, Clone, Copy, PartialEq, Eq, CopyGetters)]
#[get_copy = "pub(crate)"]
pub(crate) struct AuthCertMeta {
    /// The document id uniquely identifying the certificate.
    docid: DocumentId,

    /// The SHA-1 fingerprint of the identity key.
    // TODO DIRMIRROR: Change this to RsaIdentity.
    kp_auth_id_rsa_sha1: Sha1,

    /// The SHA-1 fingerprint of the signing key.
    // TODO DIRMIRROR: Change this to RsaIdentity.
    kp_auth_sign_rsa_sha1: Sha1,

    /// The timestamp after which this certificate will be valid.
    dir_key_published: Timestamp,

    /// The timestamp until this certificate will be valid.
    dir_key_expires: Timestamp,
}
549

            
550
impl AuthCertMeta {
    /// Obtain the most recently published and valid certificate for each authority.
    ///
    /// Returns the found [`AuthCertMeta`] items as well as the missing
    /// [`AuthCertKeyIds`].
    ///
    /// # Performance
    ///
    /// This function has a performance between `O(n * log n)` and `O(n^2)`
    /// because it performs `signatories.len()` database queries, with each
    /// database query potentially taking something between `O(log n)` to
    /// `O(n)` to execute.  However, given that this respective value is
    /// oftentimes fairly small, it should not be much of a big concern.
    pub(crate) fn query_recent(
        tx: &Transaction,
        signatories: &[AuthCertKeyIds],
        tolerance: &DirTolerance,
        now: Timestamp,
    ) -> Result<(Vec<Self>, Vec<AuthCertKeyIds>), DatabaseError> {
        // For every key pair in `signatories`, get the most recent valid cert.
        //
        // This query selects the most recent timestamp valid certificate from
        // the database for a single given key pair.  It means that this query
        // has to be executed as many times as there are entries in
        // `signatories`.
        //
        // Unfortunately, there is no neater way to do this, because the
        // alternative would involve using a nested set which SQLite does not
        // support, even with the carray extension.  An alternative might be to
        // precompute that string and then insert it here using `format!` but
        // that feels hacky, error- and injection-prone.
        //
        // Parameters:
        // :id_rsa: The RSA identity key fingerprint in uppercase hexadecimal.
        // :sk_rsa: The RSA signing key fingerprint in uppercase hexadecimal.
        // :now: The current system timestamp.
        // :pre_tolerance: The tolerance for not-yet-valid certificates.
        // :post_tolerance: The tolerance for expired certificates.
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT docid, kp_auth_id_rsa_sha1, kp_auth_sign_rsa_sha1,
              dir_key_published, dir_key_expires
            FROM authority_key_certificate
            WHERE
              (:id_rsa, :sk_rsa) = (kp_auth_id_rsa_sha1, kp_auth_sign_rsa_sha1)
              AND :now >= dir_key_published - :pre_tolerance
              AND :now <= dir_key_expires + :post_tolerance
            ORDER BY dir_key_published DESC
            LIMIT 1
            "
        ))?;

        // Keep track of the found (and parsed) certificates and the missing ones.
        let mut found = Vec::new();
        let mut missing = Vec::new();

        // Iterate over every key pair and query it, adding it to found if it exists
        // and was parsed successfully or to missing if it does not exist within the
        // database.
        for kp in signatories {
            // Query the certificate from the database.  The tolerances saturate
            // to `i64::MAX` seconds instead of failing on absurdly large values.
            let res = stmt
            .query_one(
                named_params! {
                    ":id_rsa": kp.id_fingerprint.as_hex_upper(),
                    ":sk_rsa": kp.sk_fingerprint.as_hex_upper(),
                    ":now": now,
                    ":pre_tolerance": tolerance.pre_valid_tolerance().as_secs().try_into().unwrap_or(i64::MAX),
                    ":post_tolerance": tolerance.post_valid_tolerance().as_secs().try_into().unwrap_or(i64::MAX),
                },
                |row| Ok(Self {
                    docid: row.get(0)?,
                    kp_auth_id_rsa_sha1: row.get(1)?,
                    kp_auth_sign_rsa_sha1: row.get(2)?,
                    dir_key_published: row.get(3)?,
                    dir_key_expires: row.get(4)?,
                })
            )
            .optional()?;

            match res {
                Some(cert) => found.push(cert),
                None => missing.push(*kp),
            }
        }

        Ok((found, missing))
    }

    /// Queries the raw data of an [`AuthCertMeta`].
    ///
    /// Fetches the document blob addressed by our `docid` from the `store`
    /// table and decodes it as UTF-8; non-UTF-8 content is treated as an
    /// internal error, as certificates are textual.
    pub(crate) fn data(&self, tx: &Transaction<'_>) -> Result<String, DatabaseError> {
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT content
            FROM store
            WHERE docid = :docid
            "
        ))?;

        let raw = stmt.query_one(named_params! {":docid": self.docid}, |row| {
            row.get::<_, Vec<u8>>(0)
        })?;
        let raw = String::from_utf8(raw).map_err(into_internal!("utf-8 constraint violated?"))?;
        Ok(raw)
    }

    /// Inserts a new authority certificate into the database.
    ///
    /// Keep in mind that the data in the [`AuthCert`] should correspond to the
    /// data found in `data`, as this method performs no parsing.
    pub(crate) fn insert<I: Iterator<Item = ContentEncoding>>(
        tx: &Transaction<'_>,
        encodings: I,
        cert: &AuthCert,
        data: &str,
    ) -> Result<(), DatabaseError> {
        // Inserts a new certificate into the meta table.
        //
        // Parameters:
        // :docid - The document id.
        // :id_rsa - The identity key fingerprint.
        // :sign_rsa - The signing key fingerprint
        // :published - The published timestamp.
        // :expires - The expires timestamp.
        let mut stmt = tx.prepare_cached(sql!(
            "
            INSERT INTO authority_key_certificate
            (docid, kp_auth_id_rsa_sha1, kp_auth_sign_rsa_sha1, dir_key_published, dir_key_expires)
            VALUES
            (:docid, :id_rsa, :sign_rsa, :published, :expires)
            "
        ))?;

        // Store the raw document first and reference its id from the meta row;
        // NOTE(review): `store_insert` presumably writes `data` to the
        // content-addressable store in the given encodings and returns the
        // resulting docid — confirm against its definition.
        let docid = store_insert(tx, data.as_bytes(), encodings)?;
        stmt.execute(named_params! {
            ":docid": docid,
            ":id_rsa": cert.dir_identity_key.to_rsa_identity().as_hex_upper(),
            ":sign_rsa": cert.dir_signing_key.to_rsa_identity().as_hex_upper(),
            ":published": Timestamp::from(cert.dir_key_published.0),
            ":expires": Timestamp::from(cert.dir_key_expires.0),
        })?;

        Ok(())
    }
}
695

            
696
/// A no-op macro that hands back the supplied literal unchanged.
///
/// Its sole purpose is to semantically tag [`str`] literals as SQL statements
/// for human readers.
///
/// Note that the compiler cannot enforce its usage: forgetting to wrap a
/// statement goes unnoticed, so applying it consistently remains a manual
/// convention.
macro_rules! sql {
    ($stmt:literal) => {
        $stmt
    };
}

pub(crate) use sql;
710

            
711
/// Opens a database from disk, creating a [`Pool`] for it.
712
///
713
/// This function should be the entry point for all things requiring a database
714
/// handle, as this function prepares all necessary steps required for operating
715
/// on the database correctly, such as:
716
/// * Schema initialization.
717
/// * Schema upgrade.
718
/// * Setting connection specific settings.
719
///
720
/// # `SQLITE_BUSY` Caveat
721
///
722
/// There is a problem with the handling of `SQLITE_BUSY` when opening an
723
/// SQLite database.  In WAL, opening a database might acquire an exclusive lock
724
/// for a very short amount of time, in order to perform clean-up from previous
725
/// connections alongside other tasks for maintaining database integrity.  This
726
/// means, that opening multiple SQLite databases simultaneously will result in
727
/// a busy error regardless of a busy handler, as setting a busy handler will
728
/// require an existing connection, something we are unable to obtain in the
729
/// first place.
730
///
731
/// In order to mitigate this issue, the recommended way in the SQLite community
732
/// is to simply ensure that database connections are opened sequentially,
733
/// by urging calling applications to just use a single [`Pool`] instance.
734
///
735
/// Testing this is hard unfortunately.
736
36
pub(crate) fn open<P: AsRef<Path>>(
737
36
    path: P,
738
36
) -> Result<Pool<SqliteConnectionManager>, DatabaseError> {
739
36
    let num_cores = std::thread::available_parallelism()
740
36
        .unwrap_or(NonZero::new(8).expect("8 == 0?"))
741
36
        .get() as u32;
742

            
743
36
    let manager = r2d2_sqlite::SqliteConnectionManager::file(&path);
744
36
    let pool = Pool::builder().max_size(num_cores).build(manager)?;
745

            
746
36
    rw_tx(&pool, |tx| {
747
        // Prepare the database, doing the following steps:
748
        // 1. Checking the database schema.
749
        // 2. Upgrading (in future) or initializing the database schema (if empty).
750

            
751
36
        let has_arti_dirserver_schema_version = match tx.query_one(
752
            sql!(
753
36
                "
754
36
                SELECT name
755
36
                FROM sqlite_master
756
36
                  WHERE type = 'table'
757
36
                    AND name = 'arti_dirserver_schema_version'
758
36
                "
759
            ),
760
36
            params![],
761
2
            |_| Ok(()),
762
        ) {
763
2
            Ok(()) => true,
764
34
            Err(rusqlite::Error::QueryReturnedNoRows) => false,
765
            Err(e) => return Err(DatabaseError::LowLevel(e)),
766
        };
767

            
768
36
        if has_arti_dirserver_schema_version {
769
2
            let version = tx.query_one(
770
2
                sql!("SELECT version FROM arti_dirserver_schema_version WHERE rowid = 1"),
771
2
                params![],
772
2
                |row| row.get::<_, String>(0),
773
            )?;
774

            
775
2
            match version.as_ref() {
776
2
                "1" => {}
777
2
                unknown => {
778
2
                    return Err(DatabaseError::IncompatibleSchema {
779
2
                        version: unknown.into(),
780
2
                    })
781
                }
782
            }
783
        } else {
784
34
            tx.execute_batch(V1_SCHEMA)?;
785
        }
786

            
787
34
        Ok::<_, DatabaseError>(())
788
36
    })??;
789

            
790
34
    Ok(pool)
791
36
}
792

            
793
/// Executes a closure `op` with a given read-only [`Transaction`].
794
///
795
/// The [`Transaction`] always gets rolled back the moment `op` returns.
796
///
797
/// The [`Transaction`] gets initialized with the global pragma options set.
798
///
799
/// **The closure shall not perform write operations!**
800
/// Not only do they get rolled back anyways, but upgrading the [`Transaction`]
801
/// from a read to a write transaction will lead to other simultaneous write upgrades
802
/// to fail.  Unfortunately, there is no real programmatic way to ensure this.
803
50
pub(crate) fn read_tx<U, F>(pool: &Pool<SqliteConnectionManager>, op: F) -> Result<U, DatabaseError>
804
50
where
805
50
    F: FnOnce(&Transaction<'_>) -> U,
806
{
807
50
    let mut conn = pool.get()?;
808
50
    conn.execute_batch(GLOBAL_OPTIONS)?;
809
50
    let tx = conn.transaction_with_behavior(TransactionBehavior::Deferred)?;
810
50
    let res = op(&tx);
811
50
    tx.rollback()?;
812
50
    Ok(res)
813
50
}
814

            
815
/// Executes a closure `op` with a given read-write [`Transaction`].
816
///
817
/// The [`Transaction`] always gets committed the moment `op` returns.
818
///
819
/// The [`Transaction`] gets initialized with the global pragma options set.
820
///
821
/// The [`Transaction`] gets created with [`TransactionBehavior::Immediate`],
822
/// meaning it will immediately exist as a write connection, retrying in the
823
/// case of a [`rusqlite::ErrorCode::DatabaseBusy`] until it failed after 1s.
824
68
pub(crate) fn rw_tx<U, F>(pool: &Pool<SqliteConnectionManager>, op: F) -> Result<U, DatabaseError>
825
68
where
826
68
    F: FnOnce(&Transaction<'_>) -> U,
827
{
828
68
    let mut conn = pool.get()?;
829
68
    conn.execute_batch(GLOBAL_OPTIONS)?;
830
68
    let tx = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?;
831
66
    let res = op(&tx);
832
66
    tx.commit()?;
833
66
    Ok(res)
834
68
}
835

            
836
/// Inserts `data` into store while also compressing it with given encodings.
837
///
838
/// Returns the [`DocumentId`] of `data`.
839
///
840
/// This function inserts `data` into store and also compresses it into all
841
/// given compression formats.
842
///
843
/// Duplicates get re-encoded and replaced in the database, including
844
/// [`ContentEncoding::Identity`].
845
42
pub(crate) fn store_insert<I: Iterator<Item = ContentEncoding>>(
846
42
    tx: &Transaction,
847
42
    data: &[u8],
848
42
    encodings: I,
849
42
) -> Result<DocumentId, DatabaseError> {
850
    // The statement to insert some data into the store.
851
    //
852
    // Parameters:
853
    // :docid - The docid.
854
    // :content - The binary data.
855
42
    let mut store_stmt = tx.prepare_cached(sql!(
856
42
        "
857
42
        INSERT OR REPLACE INTO store (docid, content)
858
42
        VALUES
859
42
        (:docid, :content)
860
42
        "
861
42
    ))?;
862

            
863
    // The statement to insert a compressed document into the metatable.
864
    //
865
    // Parameters:
866
    // :algorithm - The name of the encoding algorithm.
867
    // :identity_docid - The docid of the plain-text document in the store.
868
    // :compressed_docid - The docid of the encoded document in the store.
869
42
    let mut compressed_stmt = tx.prepare_cached(sql!(
870
42
        "
871
42
        INSERT OR REPLACE INTO compressed_document (algorithm, identity_docid, compressed_docid)
872
42
        VALUES
873
42
        (:algorithm, :identity_docid, :compressed_docid)
874
42
        "
875
42
    ))?;
876

            
877
    // Insert the plain document into the store.
878
42
    let identity_docid = DocumentId::digest(data);
879
42
    store_stmt.execute(named_params! {
880
42
        ":docid": identity_docid,
881
42
        ":content": data
882
42
    })?;
883

            
884
    // Compress it into all formats and insert it into store and compressed.
885
162
    for encoding in encodings {
886
120
        if encoding == ContentEncoding::Identity {
887
            // Ignore identity because we inserted that above.
888
24
            continue;
889
96
        }
890

            
891
        // We map a compression error to a bug because there is no good reason
892
        // on why it should fail, given that we compress from memory data to
893
        // memory data.  Probably because it uses the std::io::Writer interface
894
        // which itself demands use of std::io::Result.
895
96
        let compressed = compress(data, encoding).map_err(into_internal!("{encoding} failed?"))?;
896
96
        let compressed_docid = DocumentId::digest(&compressed);
897
96
        store_stmt.execute(named_params! {
898
96
            ":docid": compressed_docid,
899
96
            ":content": compressed,
900
96
        })?;
901
96
        compressed_stmt.execute(named_params! {
902
96
            ":algorithm": encoding.to_string(),
903
96
            ":identity_docid": identity_docid,
904
96
            ":compressed_docid": compressed_docid,
905
96
        })?;
906
    }
907

            
908
42
    Ok(identity_docid)
909
42
}
910

            
911
/// Compresses `data` into a specified [`ContentEncoding`].
912
///
913
/// Returns a [`Vec`] containing the encoded data.
914
106
fn compress(data: &[u8], encoding: ContentEncoding) -> Result<Vec<u8>, std::io::Error> {
915
106
    match encoding {
916
2
        ContentEncoding::Identity => Ok(data.to_vec()),
917
        ContentEncoding::Deflate => {
918
26
            let mut w = DeflateEncoder::new(Vec::new(), Default::default());
919
26
            w.write_all(data)?;
920
26
            w.finish()
921
        }
922
        ContentEncoding::Gzip => {
923
26
            let mut w = GzEncoder::new(Vec::new(), Default::default());
924
26
            w.write_all(data)?;
925
26
            w.finish()
926
        }
927
26
        ContentEncoding::XZstd => zstd::encode_all(data, Default::default()),
928
        ContentEncoding::XTorLzma => {
929
26
            let mut res = Vec::new();
930
26
            lzma_rs::lzma_compress(&mut Cursor::new(data), &mut res)?;
931
26
            Ok(res)
932
        }
933
    }
934
106
}
935

            
936
#[cfg(test)]
937
mod test {
938
    // @@ begin test lint list maintained by maint/add_warning @@
939
    #![allow(clippy::bool_assert_comparison)]
940
    #![allow(clippy::clone_on_copy)]
941
    #![allow(clippy::dbg_macro)]
942
    #![allow(clippy::mixed_attributes_style)]
943
    #![allow(clippy::print_stderr)]
944
    #![allow(clippy::print_stdout)]
945
    #![allow(clippy::single_char_pattern)]
946
    #![allow(clippy::unwrap_used)]
947
    #![allow(clippy::unchecked_time_subtraction)]
948
    #![allow(clippy::useless_vec)]
949
    #![allow(clippy::needless_pass_by_value)]
950
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
951
    use std::{
952
        collections::HashSet,
953
        io::Read,
954
        sync::{Arc, Once},
955
    };
956

            
957
    use flate2::read::{DeflateDecoder, GzDecoder};
958
    use lazy_static::lazy_static;
959
    use rusqlite::Connection;
960
    use strum::IntoEnumIterator;
961
    use tempfile::tempdir;
962
    use tor_basic_utils::test_rng::testing_rng;
963
    use tor_dircommon::config::DirToleranceBuilder;
964
    use tor_llcrypto::pk::rsa::RsaIdentity;
965

            
966
    use super::*;
967

            
968
    // Fixed timestamps used by the fixture consensuses below.  They form the
    // lifetime interval [VALID_AFTER, VALID_UNTIL] with FRESH_UNTIL and
    // FRESH_UNTIL_HALF in between.
    lazy_static! {
    /// Wed Jan 01 2020 00:00:00 GMT+0000
    static ref VALID_AFTER: Timestamp =
        (SystemTime::UNIX_EPOCH + Duration::from_secs(1577836800)).into();

    /// Wed Jan 01 2020 01:00:00 GMT+0000
    static ref FRESH_UNTIL: Timestamp =
        *VALID_AFTER + Duration::from_secs(60 * 60);

    /// Wed Jan 01 2020 02:00:00 GMT+0000
    static ref FRESH_UNTIL_HALF: Timestamp =
        *FRESH_UNTIL + Duration::from_secs(60 * 60);

    /// Wed Jan 01 2020 03:00:00 GMT+0000
    static ref VALID_UNTIL: Timestamp =
        *FRESH_UNTIL + Duration::from_secs(60 * 60 * 2);
    }

    // Dummy document bodies.  Note that the two consensus strings differ only
    // in their final punctuation so that they hash to distinct docids.
    const CONSENSUS_CONTENT: &str = "Lorem ipsum dolor sit amet.";
    const CONSENSUS_MD_CONTENT: &str = "Lorem ipsum dolor sit amet!";
    const CERT_CONTENT: &[u8] = include_bytes!("../testdata/authcert-longclaw");

    // Docids (content digests) of the dummy documents above.
    lazy_static! {
        static ref CONSENSUS_DOCID: DocumentId = DocumentId::digest(CONSENSUS_CONTENT.as_bytes());
        static ref CONSENSUS_MD_DOCID: DocumentId =
            DocumentId::digest(CONSENSUS_MD_CONTENT.as_bytes());
        static ref CERT_DOCID: DocumentId = DocumentId::digest(CERT_CONTENT);
    }
996

            
997
    /// Builds a database pre-populated with test fixtures.
    ///
    /// Inserts a plain and a microdesc consensus, one router descriptor per
    /// flavor, an extra-info document, and an authority key certificate, so
    /// that query tests have realistic rows to work against.
    fn create_dummy_db() -> Pool<SqliteConnectionManager> {
        // NOTE(review): an empty path appears to rely on SQLite creating a
        // private temporary database -- confirm this is intended.
        let pool = open("").unwrap();
        rw_tx(&pool, |tx| {
            // Raw document bodies go into the generic store table first.
            tx.execute(
                sql!("INSERT INTO store (docid, content) VALUES (?1, ?2)"),
                params![*CONSENSUS_DOCID, CONSENSUS_CONTENT.as_bytes()],
            )
            .unwrap();
            tx.execute(
                sql!("INSERT INTO store (docid, content) VALUES (?1, ?2)"),
                params![*CONSENSUS_MD_DOCID, CONSENSUS_MD_CONTENT.as_bytes()],
            )
            .unwrap();
            tx.execute(
                sql!("INSERT INTO store (docid, content) VALUES (?1, ?2)"),
                params![*CERT_DOCID, CERT_CONTENT],
            )
            .unwrap();
            tx.execute(
                sql!("INSERT INTO store (docid, content) VALUES (?1, ?2)"),
                params![
                    DocumentId::digest(include_bytes!("../testdata/descriptor1-ns")),
                    include_bytes!("../testdata/descriptor1-ns")
                ]
            ).unwrap();
            tx.execute(
                sql!("INSERT INTO store (docid, content) VALUES (?1, ?2)"),
                params![
                    DocumentId::digest(include_bytes!("../testdata/descriptor1-extra-info")),
                    include_bytes!("../testdata/descriptor1-extra-info")
                ]
            ).unwrap();
            tx.execute(
                sql!("INSERT INTO store (docid, content) VALUES (?1, ?2)"),
                params![
                    DocumentId::digest(include_bytes!("../testdata/descriptor1-md")),
                    include_bytes!("../testdata/descriptor1-md"),
            ]).unwrap();
            // Insert descriptor into router_extra_info.
            tx.execute(sql!(
                "
                INSERT INTO router_extra_info
                (docid, unsigned_sha1, kp_relay_id_rsa_sha1)
                VALUES (?1, ?2, ?3)
                "
            ), params![
                Sha256::digest(include_bytes!("../testdata/descriptor1-extra-info")),
                Sha1::digest(include_bytes!("../testdata/descriptor1-extra-info-unsigned")),
                "000004ACBB9D29BCBA17256BB35928DDBFC8ABA9",
            ]).unwrap();
            // We only insert descriptor1 here.
            tx.execute(sql!(
                "
                INSERT INTO router_descriptor
                (docid, unsigned_sha1, unsigned_sha2, kp_relay_id_rsa_sha1, flavor, extra_unsigned_sha1)
                VALUES
                (?1, ?2, ?3, ?4, 'ns', ?5)
                "
            ), params![
                DocumentId::digest(include_bytes!("../testdata/descriptor1-ns")),
                Sha1::digest(include_bytes!("../testdata/descriptor1-ns-unsigned")),
                Sha256::digest(include_bytes!("../testdata/descriptor1-ns-unsigned")),
                Sha1::from([0, 0, 4, 172, 187, 157, 41, 188, 186, 23, 37, 107, 179, 89, 40, 221, 191, 200, 171, 169]),
                Sha1::digest(include_bytes!("../testdata/descriptor1-extra-info-unsigned")),
            ]).unwrap();
            // Only insert descriptor1's md
            tx.execute(sql!(
                "
                INSERT INTO router_descriptor
                (docid, unsigned_sha1, unsigned_sha2, kp_relay_id_rsa_sha1, flavor)
                VALUES (?1, ?2, ?3, ?4, 'microdesc')
                "
            ), params![
                DocumentId::digest(include_bytes!("../testdata/descriptor1-md")),
                Sha1::digest(include_bytes!("../testdata/descriptor1-md")),
                Sha256::digest(include_bytes!("../testdata/descriptor1-md")),
                Sha1::from([0, 0, 4, 172, 187, 157, 41, 188, 186, 23, 37, 107, 179, 89, 40, 221, 191, 200, 171, 169]),
            ]).unwrap();
            // One consensus row per flavor, sharing the fixture lifetime.
            tx.execute(
                sql!(
                    "
                    INSERT INTO consensus
                    (docid, unsigned_sha3_256, flavor, valid_after, fresh_until, valid_until)
                    VALUES
                    (?1, ?2, ?3, ?4, ?5, ?6)
                    "
                ),
                params![
                    *CONSENSUS_DOCID,
                    "0000000000000000000000000000000000000000000000000000000000000000", // not the correct hash
                    ConsensusFlavor::Plain.name(),
                    *VALID_AFTER,
                    *FRESH_UNTIL,
                    *VALID_UNTIL,
                ],
            )
            .unwrap();
            tx.execute(
                sql!(
                    "
                    INSERT INTO consensus
                    (docid, unsigned_sha3_256, flavor, valid_after, fresh_until, valid_until)
                    VALUES
                    (?1, ?2, ?3, ?4, ?5, ?6)
                    "
                ),
                params![
                    *CONSENSUS_MD_DOCID,
                    "0000000000000000000000000000000000000000000000000000000000000001", // not the correct hash
                    ConsensusFlavor::Microdesc.name(),
                    *VALID_AFTER,
                    *FRESH_UNTIL,
                    *VALID_UNTIL,
                ],
            )
            .unwrap();
            // Link both descriptors (1 and 2) to each consensus; descriptor2
            // deliberately has no router_descriptor row of its own.
            tx.execute(sql!(
                "
                INSERT INTO consensus_router_descriptor_member
                (consensus_docid, unsigned_sha1, unsigned_sha2)
                VALUES
                (?1, ?2, ?3),
                (?1, ?4, ?5)
                "
            ), params![
                *CONSENSUS_DOCID,
                Sha1::digest(include_bytes!("../testdata/descriptor1-ns-unsigned")),
                Sha256::digest(include_bytes!("../testdata/descriptor1-ns-unsigned")),
                Sha1::digest(include_bytes!("../testdata/descriptor2-ns-unsigned")),
                Sha256::digest(include_bytes!("../testdata/descriptor2-ns-unsigned")),
            ]).unwrap();
            tx.execute(sql!(
                "
                INSERT INTO consensus_router_descriptor_member
                (consensus_docid, unsigned_sha1, unsigned_sha2)
                VALUES
                (?1, ?2, ?3),
                (?1, ?4, ?5)
                "
            ), params![
                *CONSENSUS_MD_DOCID,
                Sha1::digest(include_bytes!("../testdata/descriptor1-md")),
                Sha256::digest(include_bytes!("../testdata/descriptor1-md")),
                Sha1::digest(include_bytes!("../testdata/descriptor2-md")),
                Sha256::digest(include_bytes!("../testdata/descriptor2-md")),
            ]).unwrap();
            // Finally, the metadata row for the authority key certificate.
            tx.execute(sql!(
                "
                INSERT INTO authority_key_certificate
                  (docid, kp_auth_id_rsa_sha1, kp_auth_sign_rsa_sha1, dir_key_published, dir_key_expires)
                VALUES
                  (:docid, :id_rsa, :sk_rsa, :published, :expires)
                "
                ),
                named_params! {
                ":docid": *CERT_DOCID,
                ":id_rsa": "49015F787433103580E3B66A1707A00E60F2D15B",
                ":sk_rsa": "C5D153A6F0DA7CC22277D229DCBBF929D0589FE0",
                ":published": 1764543578,
                ":expires": 1772492378,
            }).unwrap();
        })
        .unwrap();
        pool
    }
    /// Checks that `open` initializes the schema version and rejects
    /// databases with an unknown version.
    #[test]
    fn open_test() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("db");
        open(&path).unwrap();
        // Inspect the freshly initialized database directly.
        let conn = Connection::open(&path).unwrap();
        let version = conn
            .query_one(
                "SELECT version FROM arti_dirserver_schema_version WHERE rowid = 1",
                params![],
                |row| row.get::<_, String>(0),
            )
            .unwrap();
        assert_eq!(version, "1");
        // Corrupt the version and verify that reopening fails.
        conn.execute(
            "UPDATE arti_dirserver_schema_version SET version = 42",
            params![],
        )
        .unwrap();
        drop(conn);
        assert_eq!(
            open(&path).unwrap_err().to_string(),
            "incompatible schema version: 42"
        );
    }
    /// Checks that `read_tx` rolls back writes and serves reads.
    #[test]
    fn read_tx_test() {
        let db_dir = tempdir().unwrap();
        let db_path = db_dir.path().join("db");
        let pool = open(&db_path).unwrap();
        // Do a write transaction despite forbidden.  Within the transaction
        // the deletion is visible, so the subsequent SELECT finds no rows.
        read_tx(&pool, |tx| {
            tx.execute_batch("DELETE FROM arti_dirserver_schema_version")
                .unwrap();
            let e = tx
                .query_one(
                    sql!("SELECT version FROM arti_dirserver_schema_version"),
                    params![],
                    |row| row.get::<_, String>(0),
                )
                .unwrap_err();
            assert_eq!(e, rusqlite::Error::QueryReturnedNoRows);
        })
        .unwrap();
        // Normal check: the rollback restored the row, so the version is
        // still "1".
        let version: String = read_tx(&pool, |tx| {
            tx.query_one(
                sql!("SELECT version FROM arti_dirserver_schema_version"),
                params![],
                |row| row.get(0),
            )
            .unwrap()
        })
        .unwrap();
        assert_eq!(version, "1");
    }
    /// Checks that `rw_tx` commits its writes.
    #[test]
    fn rw_tx_test() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("db");
        let pool = open(&path).unwrap();
        // Perform a write and commit it.
        rw_tx(&pool, |tx| {
            tx.execute_batch("DELETE FROM arti_dirserver_schema_version")
                .unwrap();
        })
        .unwrap();
        // The deletion must be visible in a later read transaction.
        read_tx(&pool, |tx| {
            let err = tx
                .query_one(
                    sql!("SELECT version FROM arti_dirserver_schema_version"),
                    params![],
                    |row| row.get::<_, String>(0),
                )
                .unwrap_err();
            assert_eq!(err, rusqlite::Error::QueryReturnedNoRows);
        })
        .unwrap();
    }
    /// Tests whether our SQLite busy error handling works in normal situations.
    ///
    /// A normal situation means a situation where a lock is never held for
    /// more than 1000ms.  In our case, we will work with two threads.
    /// t1 will acquire an exclusive lock and inform t2 about it.  t2 waits
    /// until t1 has acquired this lock and then immediately informs t1, that
    /// it will now wait for a lock too.  Now, t1 will immediately terminate,
    /// thereby releasing the lock and leading t2 to eventually acquire it.
    #[test]
    fn rw_tx_busy_timeout_working() {
        let db_dir = tempdir().unwrap();
        let db_path = db_dir.path().join("db");
        let pool = open(db_path).unwrap();
        // t2 will wait on this before it starts doing stuff.
        let t1_acquired_lock = Arc::new(Once::new());
        // t1 will wait on this in order to terminate properly.
        let t2_is_waiting = Arc::new(Once::new());
        let t1 = std::thread::spawn({
            let pool = pool.clone();
            let t1_acquired_lock = t1_acquired_lock.clone();
            let t2_is_waiting = t2_is_waiting.clone();
            move || {
                rw_tx(&pool, move |_tx| {
                    // Inform t2 we have write lock.
                    t1_acquired_lock.call_once(|| ());
                    println!("t1 acquired write lock");
                    // Wait for t2 to start waiting.
                    t2_is_waiting.wait();
                })
                .unwrap();
                // It is t1 releasing the lock here (the message previously
                // said "t2", cf. the matching line in the busy test below).
                println!("t1 released write lock");
            }
        });
        println!("t2 waits for t1 to acquire write lock");
        t1_acquired_lock.wait();
        t2_is_waiting.call_once(|| ());
        rw_tx(&pool, |_| ()).unwrap();
        println!("t2 acquired and released write lock");
        t1.join().unwrap();
    }
    /// Tests whether our SQLite busy error handling fails as expected.
    ///
    /// We configure SQLite to fail after 1000ms.  This test works with two
    /// threads.  t1 will acquire an exclusive lock on the database and will
    /// inform t2 about it, which itself will wait until t1 has acquired the
    /// lock.  t2 will then immediately try to also obtain an exclusive lock,
    /// which should fail after about 1000ms.  After the failure, t2 informs
    /// t1 that it has failed, causing t1 to terminate.
    #[test]
    fn rw_tx_busy_timeout_busy() {
        let db_dir = tempdir().unwrap();
        let db_path = db_dir.path().join("db");
        let pool = open(db_path).unwrap();
        // t2 will wait on this before it starts doing stuff.
        let t1_acquired_lock = Arc::new(Once::new());
        // t1 will wait on this in order to terminate properly.
        let t2_gave_up = Arc::new(Once::new());
        let t1 = std::thread::spawn({
            let pool = pool.clone();
            let t1_acquired_lock = t1_acquired_lock.clone();
            let t2_gave_up = t2_gave_up.clone();
            move || {
                rw_tx(&pool, move |_tx| {
                    // Inform t2 we have the write lock.
                    t1_acquired_lock.call_once(|| ());
                    println!("t1 acquired write lock");
                    // Wait for t2 to give up before we release (how mean from us).
                    t2_gave_up.wait();
                })
                .unwrap();
                println!("t1 released write lock");
            }
        });
        println!("t2 waits for t1 to acquire write lock");
        t1_acquired_lock.wait();
        // t1 holds the lock for > 1000ms, so this must time out with a busy
        // error.
        let e = rw_tx(&pool, |_| ()).unwrap_err();
        assert_eq!(
            e.to_string(),
            "low-level rusqlite error: database is locked"
        );
        println!("t2 gave up on acquiring write lock");
        t2_gave_up.call_once(|| ());
        t1.join().unwrap();
    }
    /// Exercises `store_insert`: fresh insert, duplicate insert, and
    /// re-insertion after deleting some compressed entries.
    #[test]
    fn store_insert_test() {
        let db_dir = tempdir().unwrap();
        let db_path = db_dir.path().join("db");
        open(&db_path).unwrap();
        let mut conn = Connection::open(&db_path).unwrap();
        let tx = conn.transaction().unwrap();
        let docid = store_insert(&tx, "foobar".as_bytes(), ContentEncoding::iter()).unwrap();
        // The docid is the hex digest of "foobar".
        assert_eq!(
            docid,
            "C3AB8FF13720E8AD9047DD39466B3C8974E592C2FA383D4A3960714CAEF0C4F2"
        );
        // The plain content must be retrievable under that docid.
        let res = tx
            .query_one(
                sql!(
                    "
                    SELECT content
                    FROM store
                    WHERE docid = 'C3AB8FF13720E8AD9047DD39466B3C8974E592C2FA383D4A3960714CAEF0C4F2'
                    "
                ),
                params![],
                |row| row.get::<_, Vec<u8>>(0),
            )
            .unwrap();
        assert_eq!(res, "foobar".as_bytes());
        // All non-identity encodings must have a compressed_document row.
        let mut stmt = tx.prepare_cached(sql!(
            "
            SELECT algorithm
            FROM compressed_document
            WHERE identity_docid = 'C3AB8FF13720E8AD9047DD39466B3C8974E592C2FA383D4A3960714CAEF0C4F2'
            "
        )).unwrap();
        let algorithms = stmt
            .query_map(params![], |row| row.get::<_, String>(0))
            .unwrap();
        let algorithms = algorithms.map(|x| x.unwrap()).collect::<HashSet<_>>();
        assert_eq!(
            algorithms,
            HashSet::from([
                "deflate".to_string(),
                "gzip".to_string(),
                "x-zstd".to_string(),
                "x-tor-lzma".to_string()
            ])
        );
        // Now insert the same thing a second time again and see whether the
        // INSERT OR REPLACE magic works.
        let docid_second = store_insert(&tx, "foobar".as_bytes(), ContentEncoding::iter()).unwrap();
        assert_eq!(docid, docid_second);
        // Remove a few compressed entries and get them again.
        let n = tx
            .execute(
                sql!(
                    "
                    DELETE FROM
                    compressed_document
                    WHERE algorithm IN ('deflate', 'x-zstd')
                    "
                ),
                params![],
            )
            .unwrap();
        assert_eq!(n, 2);
        // Re-inserting must restore the deleted compressed entries.
        let docid_third = store_insert(&tx, "foobar".as_bytes(), ContentEncoding::iter()).unwrap();
        assert_eq!(docid, docid_third);
        let algorithms = stmt
            .query_map(params![], |row| row.get::<_, String>(0))
            .unwrap();
        let algorithms = algorithms.map(|x| x.unwrap()).collect::<HashSet<_>>();
        assert_eq!(
            algorithms,
            HashSet::from([
                "deflate".to_string(),
                "gzip".to_string(),
                "x-zstd".to_string(),
                "x-tor-lzma".to_string()
            ])
        );
    }
    /// Round-trips `compress` through every `ContentEncoding` variant.
    #[test]
    fn compress_test() {
        /// Asserts that `res` contains `encoding`.
        fn contains(encoding: ContentEncoding, res: &[(ContentEncoding, Vec<u8>)]) {
            assert!(res.iter().any(|x| x.0 == encoding));
        }
        const INPUT: &[u8] = "foobar".as_bytes();
        // Check whether everything was encoded.
        let res = ContentEncoding::iter()
            .map(|encoding| (encoding, compress(INPUT, encoding).unwrap()))
            .collect::<Vec<_>>();
        assert_eq!(res.len(), 5);
        contains(ContentEncoding::Identity, &res);
        contains(ContentEncoding::Deflate, &res);
        contains(ContentEncoding::Gzip, &res);
        contains(ContentEncoding::XTorLzma, &res);
        contains(ContentEncoding::XZstd, &res);
        // Check if we can decode it: each decoder must reproduce INPUT.
        for (encoding, compressed) in res {
            let mut decompressed = Vec::new();
            match encoding {
                ContentEncoding::Identity => decompressed = compressed,
                ContentEncoding::Deflate => {
                    DeflateDecoder::new(Cursor::new(compressed))
                        .read_to_end(&mut decompressed)
                        .unwrap();
                }
                ContentEncoding::Gzip => {
                    GzDecoder::new(Cursor::new(compressed))
                        .read_to_end(&mut decompressed)
                        .unwrap();
                }
                ContentEncoding::XTorLzma => {
                    lzma_rs::lzma_decompress(&mut Cursor::new(compressed), &mut decompressed)
                        .unwrap();
                }
                ContentEncoding::XZstd => {
                    decompressed = zstd::decode_all(Cursor::new(compressed)).unwrap();
                }
            }
            assert_eq!(decompressed, INPUT);
        }
    }
    #[test]
    fn recent_consensus() {
        // Check that `ConsensusMeta::query_recent` honors the configured
        // directory tolerance when deciding whether the stored consensus is
        // still "recent" at a given point in time.
        let pool = create_dummy_db();
        // Accept the consensus only within its exact validity interval.
        let strict = DirToleranceBuilder::default()
            .pre_valid_tolerance(Duration::ZERO)
            .post_valid_tolerance(Duration::ZERO)
            .build()
            .unwrap();
        // Extend the validity interval by one hour on both sides.
        let relaxed = DirToleranceBuilder::default()
            .pre_valid_tolerance(Duration::from_secs(60 * 60)) // 1h before
            .post_valid_tolerance(Duration::from_secs(60 * 60)) // 1h after
            .build()
            .unwrap();
        read_tx(&pool, move |tx| {
            // The metadata we expect for every successful lookup below.
            let expected = ConsensusMeta {
                docid: *CONSENSUS_DOCID,
                unsigned_sha3_256: Sha3_256::from([0; 32]),
                flavor: ConsensusFlavor::Plain,
                valid_after: *VALID_AFTER,
                fresh_until: *FRESH_UNTIL,
                valid_until: *VALID_UNTIL,
            };
            // Way before valid-after: no match.
            assert!(ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &strict,
                SystemTime::UNIX_EPOCH.into(),
            )
            .unwrap()
            .is_none());
            // Way behind valid-until: no match.
            assert!(ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &strict,
                *VALID_UNTIL + Duration::from_secs(60 * 60 * 24 * 365),
            )
            .unwrap()
            .is_none());
            // One second before valid-after: no match.
            assert!(ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &strict,
                *VALID_AFTER - Duration::from_secs(1),
            )
            .unwrap()
            .is_none());
            // One second behind valid-until: no match.
            assert!(ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &strict,
                *VALID_UNTIL + Duration::from_secs(1),
            )
            .unwrap()
            .is_none());
            // Lower bound, upper bound, and a point inside the interval all
            // yield the stored consensus.
            let at_lower = ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &strict,
                *VALID_AFTER,
            )
            .unwrap()
            .unwrap();
            let at_upper = ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &strict,
                *VALID_UNTIL,
            )
            .unwrap()
            .unwrap();
            let inside = ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &strict,
                *VALID_AFTER + Duration::from_secs(60 * 30),
            )
            .unwrap()
            .unwrap();
            assert_eq!(at_lower, expected);
            assert_eq!(at_upper, expected);
            assert_eq!(inside, expected);
            // With the relaxed tolerance, timestamps slightly outside the
            // interval still yield the stored consensus.
            let before = ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &relaxed,
                *VALID_AFTER - Duration::from_secs(60 * 30),
            )
            .unwrap()
            .unwrap();
            let behind = ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &relaxed,
                *VALID_UNTIL + Duration::from_secs(60 * 30),
            )
            .unwrap()
            .unwrap();
            assert_eq!(before, expected);
            assert_eq!(behind, expected);
        })
        .unwrap();
    }
    #[test]
    fn sync_timeout() {
        // `lifetime` picks a randomized refresh point; sample it many times
        // and check every sample falls within [fresh-until, fresh-until-half].
        let cons = ConsensusMeta {
            docid: *CONSENSUS_DOCID,
            unsigned_sha3_256: Sha3_256::from([0; 32]),
            flavor: ConsensusFlavor::Plain,
            valid_after: *VALID_AFTER,
            fresh_until: *FRESH_UNTIL,
            valid_until: *VALID_UNTIL,
        };
        // Create the RNG once; re-creating it per iteration would sample the
        // same value 10000 times if the test RNG is deterministically seeded.
        let mut rng = testing_rng();
        for _ in 0..10000 {
            let when = cons.lifetime(&mut rng);
            assert!(when >= *FRESH_UNTIL);
            assert!(when <= *FRESH_UNTIL_HALF);
        }
    }
    #[test]
    fn get_auth_cert() {
        /// Build an `AuthCertKeyIds` from two hex-encoded RSA fingerprints.
        fn key_ids(id: &str, sk: &str) -> AuthCertKeyIds {
            AuthCertKeyIds {
                id_fingerprint: RsaIdentity::from_hex(id).unwrap(),
                sk_fingerprint: RsaIdentity::from_hex(sk).unwrap(),
            }
        }
        let pool = create_dummy_db();
        // Querying with no key ids yields neither matches nor misses.
        let (found, missing) = read_tx(&pool, |tx| {
            AuthCertMeta::query_recent(
                tx,
                &[],
                &DirTolerance::default(),
                (SystemTime::UNIX_EPOCH + Duration::from_secs(1765900013)).into(),
            )
        })
        .unwrap()
        .unwrap();
        assert!(found.is_empty());
        assert!(missing.is_empty());
        // Query one key id that is present plus two that are not.
        let queried = [
            // Present in the dummy database.
            key_ids(
                "49015F787433103580E3B66A1707A00E60F2D15B",
                "C5D153A6F0DA7CC22277D229DCBBF929D0589FE0",
            ),
            // Absent.
            key_ids(
                "0000000000000000000000000000000000000000",
                "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
            ),
            // Absent.
            key_ids(
                "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
                "0000000000000000000000000000000000000000",
            ),
        ];
        let (found, missing) = read_tx(&pool, |tx| {
            AuthCertMeta::query_recent(
                tx,
                &queried,
                &DirTolerance::default(),
                (SystemTime::UNIX_EPOCH + Duration::from_secs(1765900013)).into(),
            )
        })
        .unwrap()
        .unwrap();
        // The present key id maps onto the stored certificate metadata ...
        assert_eq!(
            found,
            vec![AuthCertMeta {
                docid: DocumentId::digest(CERT_CONTENT),
                kp_auth_id_rsa_sha1: Sha1::from([
                    73, 1, 95, 120, 116, 51, 16, 53, 128, 227, 182, 106, 23, 7, 160, 14, 96, 242,
                    209, 91
                ]),
                kp_auth_sign_rsa_sha1: Sha1::from([
                    197, 209, 83, 166, 240, 218, 124, 194, 34, 119, 210, 41, 220, 187, 249, 41,
                    208, 88, 159, 224
                ]),
                dir_key_published: (SystemTime::UNIX_EPOCH + Duration::from_secs(1764543578))
                    .into(),
                dir_key_expires: (SystemTime::UNIX_EPOCH + Duration::from_secs(1772492378)).into()
            }]
        );
        // ... and the two absent ones are reported as missing.
        assert_eq!(
            missing,
            vec![
                key_ids(
                    "0000000000000000000000000000000000000000",
                    "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
                ),
                key_ids(
                    "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
                    "0000000000000000000000000000000000000000",
                ),
            ]
        );
    }
    #[test]
    fn missing_server_descriptors() {
        let pool = create_dummy_db();
        // Fetch the plain consensus stored in the dummy database.
        let meta = read_tx(&pool, |tx| {
            ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &DirTolerance::default(),
                *VALID_AFTER,
            )
        })
        .unwrap()
        .unwrap()
        .unwrap();
        let first = Sha1::digest(include_bytes!("../testdata/descriptor1-ns-unsigned"));
        let second = Sha1::digest(include_bytes!("../testdata/descriptor2-ns-unsigned"));
        // Initially, only the second server descriptor is absent.
        let missing = read_tx(&pool, |tx| meta.missing_servers(tx))
            .unwrap()
            .unwrap();
        assert_eq!(missing, HashSet::from([second]));
        // After wiping the router_descriptor table, both are reported missing.
        rw_tx(&pool, |tx| {
            tx.execute(sql!("DELETE FROM router_descriptor"), params![])
        })
        .unwrap()
        .unwrap();
        let missing = read_tx(&pool, |tx| meta.missing_servers(tx))
            .unwrap()
            .unwrap();
        assert_eq!(missing, HashSet::from([first, second]));
    }
    #[test]
    fn missing_extra_infos() {
        let pool = create_dummy_db();
        // Fetch the plain consensus stored in the dummy database.
        let meta = read_tx(&pool, |tx| {
            ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Plain,
                &DirTolerance::default(),
                *VALID_AFTER,
            )
        })
        .unwrap()
        .unwrap()
        .unwrap();
        // We should have no missing extra-infos.
        // Technically extra-info of the second relay is missing too, but we
        // cannot know that.
        let missing_extras = read_tx(&pool, |tx| meta.missing_extras(tx))
            .unwrap()
            .unwrap();
        assert!(missing_extras.is_empty());
        // Now delete the record of router_extra_info.  Go through `rw_tx`
        // like the other tests, rather than executing on a raw pooled
        // connection, so the delete uses the module's write-transaction path.
        rw_tx(&pool, |tx| {
            tx.execute(sql!("DELETE FROM router_extra_info"), params![])
        })
        .unwrap()
        .unwrap();
        // Now we should get a single missing extra-info.
        let missing_extras = read_tx(&pool, |tx| meta.missing_extras(tx))
            .unwrap()
            .unwrap();
        assert_eq!(
            missing_extras,
            HashSet::from([Sha1::digest(include_bytes!(
                "../testdata/descriptor1-extra-info-unsigned"
            ))])
        );
    }
    #[test]
    fn missing_micro_descriptors() {
        let pool = create_dummy_db();
        // Fetch the microdesc-flavored consensus stored in the dummy database.
        let meta = read_tx(&pool, |tx| {
            ConsensusMeta::query_recent(
                tx,
                ConsensusFlavor::Microdesc,
                &DirTolerance::default(),
                *VALID_AFTER,
            )
        })
        .unwrap()
        .unwrap()
        .unwrap();
        // Only one should be returned.
        let missing_micros = read_tx(&pool, |tx| meta.missing_micros(tx))
            .unwrap()
            .unwrap();
        assert_eq!(
            missing_micros,
            HashSet::from([Sha256::digest(include_bytes!("../testdata/descriptor2-md"))])
        );
        // If we delete all router descriptors we have, we should get both.
        rw_tx(&pool, |tx| {
            tx.execute(sql!("DELETE FROM router_descriptor"), params![])
        })
        .unwrap()
        .unwrap();
        // Now both should be returned.
        let missing_micros = read_tx(&pool, |tx| meta.missing_micros(tx))
            .unwrap()
            .unwrap();
        assert_eq!(
            missing_micros,
            HashSet::from([
                Sha256::digest(include_bytes!("../testdata/descriptor1-md")),
                Sha256::digest(include_bytes!("../testdata/descriptor2-md"))
            ])
        );
    }
}