1
//! Net document storage backed by sqlite3.
2
//!
3
//! We store most objects in sqlite tables, except for very large ones,
4
//! which we store as "blob" files in a separate directory.
5

            
6
use super::ExpirationConfig;
7
use crate::docmeta::{AuthCertMeta, ConsensusMeta};
8
use crate::err::ReadOnlyStorageError;
9
use crate::storage::{InputString, Store};
10
use crate::{Error, Result};
11

            
12
use fs_mistrust::CheckedDir;
13
use tor_basic_utils::PathExt as _;
14
use tor_error::{into_internal, warn_report};
15
use tor_netdoc::doc::authcert::AuthCertKeyIds;
16
use tor_netdoc::doc::microdesc::MdDigest;
17
use tor_netdoc::doc::netstatus::{ConsensusFlavor, Lifetime};
18
#[cfg(feature = "routerdesc")]
19
use tor_netdoc::doc::routerdesc::RdDigest;
20
use web_time_compat::SystemTimeExt;
21

            
22
#[cfg(feature = "bridge-client")]
23
pub(crate) use {crate::storage::CachedBridgeDescriptor, tor_guardmgr::bridge::BridgeConfig};
24

            
25
use std::collections::{HashMap, HashSet};
26
use std::fs::OpenOptions;
27
use std::path::{Path, PathBuf};
28
use std::result::Result as StdResult;
29
use std::sync::Arc;
30
use std::time::SystemTime;
31

            
32
use rusqlite::{OpenFlags, OptionalExtension, Transaction, params};
33
use time::OffsetDateTime;
34
use tracing::{trace, warn};
35

            
36
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
37
use fslock::LockFile;
38

            
39
/// Local directory cache using a Sqlite3 connection.
40
pub(crate) struct SqliteStore {
41
    /// Connection to the sqlite3 database.
42
    conn: rusqlite::Connection,
43
    /// Location for the sqlite3 database; used to reopen it.
44
    sql_path: Option<PathBuf>,
45
    /// Location to store blob files.
46
    blob_dir: CheckedDir,
47
    /// Lockfile to prevent concurrent write attempts from different
48
    /// processes.
49
    ///
50
    /// If this is None we aren't using a lockfile.  Watch out!
51
    ///
52
    /// (sqlite supports that with connection locking, but we want to
53
    /// be a little more coarse-grained here)
54
    lockfile: Option<LockFile>,
55
}
56

            
57
/// Wasm-only: a placeholder for `LockFile` that provides no locking.
///
/// The only field is a [`void::Void`], so no value of this type can exist.
///
/// TODO #2106 -- remove this when we migrate to use File::lock.
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
struct LockFile(void::Void);
62

            
63
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
impl LockFile {
    /// Refuse to open a lockfile: always fails with `ErrorKind::Unsupported`.
    fn open<P: AsRef<Path>>(_path: P) -> std::io::Result<Self> {
        Err(std::io::ErrorKind::Unsupported.into())
    }

    /// Never callable: a `LockFile` value cannot exist on this target.
    fn try_lock(&mut self) -> std::io::Result<bool> {
        match self.0 {}
    }

    /// Never callable: a `LockFile` value cannot exist on this target.
    fn unlock(&mut self) -> std::io::Result<()> {
        match self.0 {}
    }

    /// Never callable: a `LockFile` value cannot exist on this target.
    fn owns_lock(&self) -> bool {
        match self.0 {}
    }
}
82

            
83
/// # Some notes on blob consistency, and the lack thereof.
84
///
85
/// We store large documents (currently, consensuses) in separate files,
86
/// called "blobs",
87
/// outside of the sqlite database.
88
/// We do this for performance reasons: for large objects,
89
/// mmap is far more efficient than sqlite in RAM and CPU.
90
///
91
/// In the sqlite database, we keep track of our blobs
92
/// using the ExtDocs table.
93
/// This scheme makes it possible for the blobs and the table
94
/// to get out of sync.
95
///
96
/// In summary:
97
///   - _Vanished_ blobs (ones present only in ExtDocs) are possible;
98
///     we try to tolerate them.
99
///   - _Orphaned_ blobs (ones present only on the disk) are possible;
100
///     we try to tolerate them.
101
///   - _Corrupted_ blobs (ones with the wrong contents) are possible
102
///     but (we hope) unlikely;
103
///     we do not currently try to tolerate them.
104
///
105
/// In more detail:
106
///
107
/// Here are the practices we use when _writing_ blobs:
108
///
109
/// - We always create a blob before updating the ExtDocs table,
110
///   and remove an entry from the ExtDocs before deleting the blob.
111
/// - If we decide to roll back the transaction that adds the row to ExtDocs,
112
///   we delete the blob after doing so.
113
/// - We use [`CheckedDir::write_and_replace`] to store blobs,
114
///   so a half-formed blob shouldn't be common.
115
///   (We assume that "close" and "rename" are serialized by the OS,
116
///   so that _if_ the rename happens, the file is completely written.)
117
/// - Blob filenames include a digest of the file contents,
118
///   so collisions are unlikely.
119
///
120
/// Here are the practices we use when _deleting_ blobs:
121
/// - First, we drop the row from the ExtDocs table.
122
///   Only then do we delete the file.
123
///
124
/// These practices can result in _orphaned_ blobs
125
/// (ones with no row in the ExtDoc table),
126
/// or in _half-written_ blob files with tempfile names
127
/// (which also have no row in the ExtDoc table).
128
/// This happens if we crash at the wrong moment.
129
/// Such blobs can be safely removed;
130
/// we do so in [`SqliteStore::remove_unreferenced_blobs`].
131
///
132
/// Despite our efforts, _vanished_ blobs
133
/// (entries in the ExtDoc table with no corresponding file)
134
/// are also possible.  They could happen for these reasons:
135
/// - The filesystem might not serialize or sync things in a way that's
136
///   consistent with the DB.
137
/// - An automatic process might remove random cache files.
138
/// - The user might run around deleting things to free space.
139
///
140
/// We try to tolerate vanished blobs.
141
///
142
/// _Corrupted_ blobs are also possible.  They can happen on FS corruption,
143
/// or on somebody messing around with the cache directory manually.
144
/// We do not attempt to tolerate corrupted blobs.
145
///
146
/// ## On trade-offs
147
///
148
/// TODO: The practices described above are more likely
149
/// to create _orphaned_ blobs than _vanished_ blobs.
150
/// We initially made this trade-off decision on the mistaken theory
151
/// that we could avoid vanished blobs entirely.
152
/// We _may_ want to revisit this choice,
153
/// on the rationale that we can respond to vanished blobs as soon as we notice they're gone,
154
/// whereas we can only handle orphaned blobs with a periodic cleanup.
155
/// On the other hand, since we need to handle both cases,
156
/// it may not matter very much in practice.
157
#[allow(unused)]
158
mod blob_consistency {}
159

            
160
/// The reason why we are not going to read a blob.
///
/// This is an internal type: it is never handed back to the user.
#[derive(Debug)]
enum AbsentBlob {
    /// The blob's file could not be found on disk.
    VanishedFile,
    /// ExtDocs did not even list a blob for us to read.
    NothingToRead,
}
170

            
171
impl SqliteStore {
172
    /// Construct or open a new SqliteStore at some location on disk.
173
    /// The provided location must be a directory, or a possible
174
    /// location for a directory: the directory will be created if
175
    /// necessary.
176
    ///
177
    /// If readonly is true, the result will be a read-only store.
178
    /// Otherwise, when readonly is false, the result may be
179
    /// read-only or read-write, depending on whether we can acquire
180
    /// the lock.
181
    ///
182
    /// # Limitations:
183
    ///
184
    /// The file locking that we use to ensure that only one dirmgr is
185
    /// writing to a given storage directory at a time is currently
186
    /// _per process_. Therefore, you might get unexpected results if
187
    /// two SqliteStores are created in the same process with the
188
    /// path.
189
411
    pub(crate) fn from_path_and_mistrust<P: AsRef<Path>>(
190
411
        path: P,
191
411
        mistrust: &fs_mistrust::Mistrust,
192
411
        mut readonly: bool,
193
411
    ) -> Result<Self> {
194
411
        let path = path.as_ref();
195
411
        let sqlpath = path.join("dir.sqlite3");
196
411
        let blobpath = path.join("dir_blobs/");
197
411
        let lockpath = path.join("dir.lock");
198

            
199
411
        let verifier = mistrust.verifier().permit_readable().check_content();
200

            
201
411
        let blob_dir = if readonly {
202
4
            verifier.secure_dir(blobpath)?
203
        } else {
204
407
            verifier.make_secure_dir(blobpath)?
205
        };
206

            
207
        // Check permissions on the sqlite and lock files; don't require them to
208
        // exist.
209
818
        for p in [&lockpath, &sqlpath] {
210
818
            match mistrust
211
818
                .verifier()
212
818
                .permit_readable()
213
818
                .require_file()
214
818
                .check(p)
215
            {
216
818
                Ok(()) | Err(fs_mistrust::Error::NotFound(_)) => {}
217
                Err(e) => return Err(e.into()),
218
            }
219
        }
220

            
221
409
        let mut lockfile = LockFile::open(&lockpath).map_err(Error::from_lockfile)?;
222
409
        if !readonly && !lockfile.try_lock().map_err(Error::from_lockfile)? {
223
            readonly = true; // we couldn't get the lock!
224
409
        };
225
409
        let flags = if readonly {
226
2
            OpenFlags::SQLITE_OPEN_READ_ONLY
227
        } else {
228
407
            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
229
        };
230
409
        let conn = rusqlite::Connection::open_with_flags(&sqlpath, flags)?;
231
409
        let mut store = SqliteStore::from_conn_internal(conn, blob_dir, readonly)?;
232
409
        store.sql_path = Some(sqlpath);
233
409
        store.lockfile = Some(lockfile);
234
409
        Ok(store)
235
411
    }
236

            
237
    /// Build a SqliteStore out of an existing database connection and a
    /// directory for blob files.
    ///
    /// Used for testing with a memory-backed database.
    ///
    /// Note: `blob_dir` must be dedicated to the blobs associated with this
    /// database, since we will freely remove unreferenced files from it.
    #[cfg(test)]
    fn from_conn(conn: rusqlite::Connection, blob_dir: CheckedDir) -> Result<Self> {
        // Stores built this way are always opened read-write.
        let readonly = false;
        Self::from_conn_internal(conn, blob_dir, readonly)
    }
248

            
249
    /// Construct a new SqliteStore from a database connection and a location
250
    /// for blob files.
251
    ///
252
    /// The `readonly` argument specifies whether the database connection should be read-only.
253
453
    fn from_conn_internal(
254
453
        conn: rusqlite::Connection,
255
453
        blob_dir: CheckedDir,
256
453
        readonly: bool,
257
453
    ) -> Result<Self> {
258
        // sqlite (as of Jun 2024) does not enforce foreign keys automatically unless you set this
259
        // pragma on the connection.
260
453
        conn.pragma_update(None, "foreign_keys", "ON")?;
261

            
262
453
        let mut result = SqliteStore {
263
453
            conn,
264
453
            blob_dir,
265
453
            lockfile: None,
266
453
            sql_path: None,
267
453
        };
268

            
269
453
        result.check_schema(readonly)?;
270

            
271
451
        Ok(result)
272
453
    }
273

            
274
    /// Check whether this database has a schema format we can read, and
275
    /// install or upgrade the schema if necessary.
276
453
    fn check_schema(&mut self, readonly: bool) -> Result<()> {
277
453
        let tx = self.conn.transaction()?;
278
453
        let db_n_tables: u32 = tx.query_row(
279
453
            "SELECT COUNT(name) FROM sqlite_master
280
453
             WHERE type='table'
281
453
             AND name NOT LIKE 'sqlite_%'",
282
453
            [],
283
453
            |row| row.get(0),
284
        )?;
285
453
        let db_exists = db_n_tables > 0;
286

            
287
        // Update the schema from current_vsn to the latest (does not commit)
288
494
        let update_schema = |tx: &rusqlite::Transaction, current_vsn| {
289
1335
            for (from_vsn, update) in UPDATE_SCHEMA.iter().enumerate() {
290
1335
                let from_vsn = u32::try_from(from_vsn).expect("schema version >2^32");
291
1335
                let new_vsn = from_vsn + 1;
292
1335
                if current_vsn < new_vsn {
293
1335
                    tx.execute_batch(update)?;
294
1335
                    tx.execute(UPDATE_SCHEMA_VERSION, params![new_vsn, new_vsn])?;
295
                }
296
            }
297
445
            Ok::<_, Error>(())
298
445
        };
299

            
300
453
        if !db_exists {
301
445
            if !readonly {
302
445
                tx.execute_batch(INSTALL_V0_SCHEMA)?;
303
445
                update_schema(&tx, 0)?;
304
445
                tx.commit()?;
305
            } else {
306
                // The other process should have created the database!
307
                return Err(Error::ReadOnlyStorage(ReadOnlyStorageError::NoDatabase));
308
            }
309
445
            return Ok(());
310
8
        }
311

            
312
8
        let (version, readable_by): (u32, u32) = tx.query_row(
313
8
            "SELECT version, readable_by FROM TorSchemaMeta
314
8
             WHERE name = 'TorDirStorage'",
315
8
            [],
316
8
            |row| Ok((row.get(0)?, row.get(1)?)),
317
        )?;
318

            
319
8
        if version < SCHEMA_VERSION {
320
            if !readonly {
321
                update_schema(&tx, version)?;
322
                tx.commit()?;
323
            } else {
324
                return Err(Error::ReadOnlyStorage(
325
                    ReadOnlyStorageError::IncompatibleSchema {
326
                        schema: version,
327
                        supported: SCHEMA_VERSION,
328
                    },
329
                ));
330
            }
331

            
332
            return Ok(());
333
8
        } else if readable_by > SCHEMA_VERSION {
334
2
            return Err(Error::UnrecognizedSchema {
335
2
                schema: readable_by,
336
2
                supported: SCHEMA_VERSION,
337
2
            });
338
6
        }
339

            
340
        // rolls back the transaction, but nothing was done.
341
6
        Ok(())
342
453
    }
343

            
344
    /// Read a blob from disk, mapping it if possible.
345
    ///
346
    /// Return `Ok(Err(.))` if the file for the blob was not found on disk;
347
    /// returns an error in other cases.
348
    ///
349
    /// (See [`blob_consistency`] for information on why the blob might be absent.)
350
22
    fn read_blob(&self, path: &str) -> Result<StdResult<InputString, AbsentBlob>> {
351
22
        let file = match self.blob_dir.open(path, OpenOptions::new().read(true)) {
352
22
            Ok(file) => file,
353
            Err(fs_mistrust::Error::NotFound(_)) => {
354
                warn!(
355
                    "{:?} was listed in the database, but its corresponding file had been deleted",
356
                    path
357
                );
358
                return Ok(Err(AbsentBlob::VanishedFile));
359
            }
360
            Err(e) => return Err(e.into()),
361
        };
362

            
363
22
        InputString::load(file)
364
22
            .map_err(|err| Error::CacheFile {
365
                action: "loading",
366
                fname: PathBuf::from(path),
367
                error: Arc::new(err),
368
            })
369
22
            .map(Ok)
370
22
    }
371

            
372
    /// Write a file to disk as a blob, and record it in the ExtDocs table.
373
    ///
374
    /// Return a SavedBlobHandle that describes where the blob is, and which
375
    /// can be used either to commit the blob or delete it.
376
    ///
377
    /// See [`blob_consistency`] for more information on guarantees.
378
34
    fn save_blob_internal(
379
34
        &mut self,
380
34
        contents: &[u8],
381
34
        doctype: &str,
382
34
        digest_type: &str,
383
34
        digest: &[u8],
384
34
        expires: OffsetDateTime,
385
34
    ) -> Result<blob_handle::SavedBlobHandle<'_>> {
386
34
        let digest = hex::encode(digest);
387
34
        let digeststr = format!("{}-{}", digest_type, digest);
388
34
        let fname = format!("{}_{}", doctype, digeststr);
389

            
390
34
        let full_path = self.blob_dir.join(&fname)?;
391
34
        let unlinker = blob_handle::Unlinker::new(&full_path);
392
34
        self.blob_dir
393
34
            .write_and_replace(&fname, contents)
394
34
            .map_err(|e| match e {
395
                fs_mistrust::Error::Io { err, .. } => Error::CacheFile {
396
                    action: "saving",
397
                    fname: full_path,
398
                    error: err,
399
                },
400
                err => err.into(),
401
            })?;
402

            
403
34
        let tx = self.conn.unchecked_transaction()?;
404
34
        tx.execute(INSERT_EXTDOC, params![digeststr, expires, doctype, fname])?;
405

            
406
34
        Ok(blob_handle::SavedBlobHandle::new(
407
34
            tx, fname, digeststr, unlinker,
408
34
        ))
409
34
    }
410

            
411
    /// As `latest_consensus`, but do not retry.
412
16
    fn latest_consensus_internal(
413
16
        &self,
414
16
        flavor: ConsensusFlavor,
415
16
        pending: Option<bool>,
416
16
    ) -> Result<StdResult<InputString, AbsentBlob>> {
417
16
        trace!(?flavor, ?pending, "Loading latest consensus from cache");
418
16
        let rv: Option<(OffsetDateTime, OffsetDateTime, String)> = match pending {
419
12
            None => self
420
12
                .conn
421
16
                .query_row(FIND_CONSENSUS, params![flavor.name()], |row| row.try_into())
422
12
                .optional()?,
423
4
            Some(pending_val) => self
424
4
                .conn
425
4
                .query_row(
426
4
                    FIND_CONSENSUS_P,
427
4
                    params![pending_val, flavor.name()],
428
2
                    |row| row.try_into(),
429
                )
430
4
                .optional()?,
431
        };
432

            
433
16
        if let Some((_va, _vu, filename)) = rv {
434
            // TODO blobs: If the cache is inconsistent (because this blob is _vanished_), and the cache has not yet
435
            // been cleaned, this may fail to find the latest consensus that we actually have.
436
10
            self.read_blob(&filename)
437
        } else {
438
6
            Ok(Err(AbsentBlob::NothingToRead))
439
        }
440
16
    }
441

            
442
    /// Write a blob to disk and commit it right away.
    ///
    /// Returns the file name under which the blob was stored.
    #[cfg(test)]
    fn save_blob(
        &mut self,
        contents: &[u8],
        doctype: &str,
        digest_type: &str,
        digest: &[u8],
        expires: OffsetDateTime,
    ) -> Result<String> {
        let handle = self.save_blob_internal(contents, doctype, digest_type, digest, expires)?;
        // Copy the name out first: committing consumes the handle.
        let saved_name = handle.fname().to_string();
        handle.commit()?;
        Ok(saved_name)
    }
457

            
458
    /// Return the valid-after time of the latest non-pending consensus,
    /// or `None` if `latest_consensus_meta` finds no consensus.
    #[cfg(test)]
    // We should revise the tests to use latest_consensus_meta instead.
    fn latest_consensus_time(&self, flavor: ConsensusFlavor) -> Result<Option<OffsetDateTime>> {
        let latest = self.latest_consensus_meta(flavor)?;
        Ok(latest.map(|meta| meta.lifetime().valid_after().into()))
    }
466

            
467
    /// Remove the blob with name `fname`, but do not give an error on failure.
468
    ///
469
    /// See [`blob_consistency`]: we should call this only having first ensured
470
    /// that the blob is removed from the ExtDocs table.
471
2
    fn remove_blob_or_warn<P: AsRef<Path>>(&self, fname: P) {
472
2
        let fname = fname.as_ref();
473
2
        if let Err(e) = self.blob_dir.remove_file(fname) {
474
            warn_report!(e, "Unable to remove {}", fname.display_lossy());
475
2
        }
476
2
    }
477

            
478
    /// Delete any blob files that are old enough, and not mentioned in the ExtDocs table.
479
    ///
480
    /// There shouldn't typically be any, but we don't want to let our cache grow infinitely
481
    /// if we have a bug.
482
10
    fn remove_unreferenced_blobs(
483
10
        &self,
484
10
        now: OffsetDateTime,
485
10
        expiration: &ExpirationConfig,
486
10
    ) -> Result<()> {
487
        // Now, look for any unreferenced blobs that are a bit old.
488
10
        for ent in self.blob_dir.read_directory(".")?.flatten() {
489
8
            let md_error = |io_error| Error::CacheFile {
490
                action: "getting metadata",
491
                fname: ent.file_name().into(),
492
                error: Arc::new(io_error),
493
            };
494
8
            if ent
495
8
                .metadata()
496
8
                .map_err(md_error)?
497
8
                .modified()
498
8
                .map_err(md_error)?
499
8
                + expiration.consensuses
500
8
                >= now
501
            {
502
                // this file is sufficiently recent that we should not remove it, just to be cautious.
503
6
                continue;
504
2
            }
505
2
            let filename = match ent.file_name().into_string() {
506
2
                Ok(s) => s,
507
                Err(os_str) => {
508
                    // This filename wasn't utf-8.  We will never create one of these.
509
                    warn!(
510
                        "Removing bizarre file '{}' from blob store.",
511
                        os_str.to_string_lossy()
512
                    );
513
                    self.remove_blob_or_warn(ent.file_name());
514
                    continue;
515
                }
516
            };
517
2
            let found: (u32,) =
518
2
                self.conn
519
3
                    .query_row(COUNT_EXTDOC_BY_PATH, params![&filename], |row| {
520
2
                        row.try_into()
521
2
                    })?;
522
2
            if found == (0,) {
523
2
                warn!("Removing unreferenced file '{}' from blob store", &filename);
524
2
                self.remove_blob_or_warn(ent.file_name());
525
            }
526
        }
527

            
528
10
        Ok(())
529
10
    }
530

            
531
    /// Remove any entry in the ExtDocs table for which a blob file is vanished.
532
    ///
533
    /// This method is `O(n)` in the size of the ExtDocs table and the size of the directory.
534
    /// It doesn't take self, to avoid problems with the borrow checker.
535
2
    fn remove_entries_for_vanished_blobs<'a>(
536
2
        blob_dir: &CheckedDir,
537
2
        tx: &Transaction<'a>,
538
2
    ) -> Result<usize> {
539
2
        let in_directory: HashSet<PathBuf> = blob_dir
540
2
            .read_directory(".")?
541
2
            .flatten()
542
9
            .map(|dir_entry| PathBuf::from(dir_entry.file_name()))
543
2
            .collect();
544
2
        let in_db: Vec<String> = tx
545
2
            .prepare(FIND_ALL_EXTDOC_FILENAMES)?
546
17
            .query_map([], |row| row.get::<_, String>(0))?
547
2
            .collect::<StdResult<Vec<String>, _>>()?;
548

            
549
2
        let mut n_removed = 0;
550
18
        for fname in in_db {
551
16
            if in_directory.contains(Path::new(&fname)) {
552
                // The blob is present; great!
553
8
                continue;
554
8
            }
555

            
556
8
            n_removed += tx.execute(DELETE_EXTDOC_BY_FILENAME, [fname])?;
557
        }
558

            
559
2
        Ok(n_removed)
560
2
    }
561
}
562

            
563
impl Store for SqliteStore {
564
38
    fn is_readonly(&self) -> bool {
565
38
        match &self.lockfile {
566
14
            Some(f) => !f.owns_lock(),
567
24
            None => false,
568
        }
569
38
    }
570
4
    fn upgrade_to_readwrite(&mut self) -> Result<bool> {
571
4
        let Some(sql_path) = self.sql_path.as_ref() else {
572
            return Ok(true);
573
        };
574

            
575
4
        if self.is_readonly() {
576
2
            let lf = self
577
2
                .lockfile
578
2
                .as_mut()
579
2
                .expect("No lockfile open; cannot upgrade to read-write storage");
580
2
            if !lf.try_lock().map_err(Error::from_lockfile)? {
581
                // Somebody else has the lock.
582
                return Ok(false);
583
2
            }
584
2
            match rusqlite::Connection::open(sql_path) {
585
2
                Ok(conn) => {
586
2
                    self.conn = conn;
587
2
                }
588
                Err(e) => {
589
                    if let Err(e2) = lf.unlock() {
590
                        warn_report!(
591
                            e2,
592
                            "Unable to release lock file while upgrading DB to read/write"
593
                        );
594
                    }
595
                    return Err(e.into());
596
                }
597
            }
598
2
        }
599
4
        Ok(true)
600
4
    }
601
8
    fn expire_all(&mut self, expiration: &ExpirationConfig) -> Result<()> {
602
8
        let tx = self.conn.transaction()?;
603
        // This works around a false positive; see
604
        //   https://github.com/rust-lang/rust-clippy/issues/8114
605
        #[allow(clippy::let_and_return)]
606
8
        let expired_blobs: Vec<String> = {
607
8
            let mut stmt = tx.prepare(FIND_EXPIRED_EXTDOCS)?;
608
8
            let names: Vec<String> = stmt
609
9
                .query_map([], |row| row.get::<_, String>(0))?
610
8
                .collect::<StdResult<Vec<String>, _>>()?;
611
8
            names
612
        };
613

            
614
8
        let now = now_utc();
615
8
        tx.execute(DROP_OLD_EXTDOCS, [])?;
616

            
617
        // In theory bad system clocks might generate table rows with times far in the future.
618
        // However, for data which is cached here which comes from the network consensus,
619
        // we rely on the fact that no consensus from the future exists, so this can't happen.
620
8
        tx.execute(DROP_OLD_MICRODESCS, [now - expiration.microdescs])?;
621
8
        tx.execute(DROP_OLD_AUTHCERTS, [now - expiration.authcerts])?;
622
8
        tx.execute(DROP_OLD_CONSENSUSES, [now - expiration.consensuses])?;
623
8
        tx.execute(DROP_OLD_ROUTERDESCS, [now - expiration.router_descs])?;
624

            
625
        // Bridge descriptors come from bridges and bridges might send crazy times,
626
        // so we need to discard any that look like they are from the future,
627
        // since otherwise wrong far-future timestamps might live in our DB indefinitely.
628
        #[cfg(feature = "bridge-client")]
629
8
        tx.execute(DROP_OLD_BRIDGEDESCS, [now, now])?;
630

            
631
        // Find all consensus blobs that are no longer referenced,
632
        // and delete their entries from extdocs.
633
8
        let remove_consensus_blobs = {
634
            // TODO: This query can be O(n); but that won't matter for clients.
635
            // For relays, we may want to add an index to speed it up, if we use this code there too.
636
8
            let mut stmt = tx.prepare(FIND_UNREFERENCED_CONSENSUS_EXTDOCS)?;
637
8
            let filenames: Vec<String> = stmt
638
9
                .query_map([], |row| row.get::<_, String>(0))?
639
8
                .collect::<StdResult<Vec<String>, _>>()?;
640
8
            drop(stmt);
641
8
            let mut stmt = tx.prepare(DELETE_EXTDOC_BY_FILENAME)?;
642
8
            for fname in filenames.iter() {
643
2
                stmt.execute([fname])?;
644
            }
645
8
            filenames
646
        };
647

            
648
8
        tx.commit()?;
649
        // Now that the transaction has been committed, these blobs are
650
        // unreferenced in the ExtDocs table, and we can remove them from disk.
651
8
        let mut remove_blob_files: HashSet<_> = expired_blobs.iter().collect();
652
8
        remove_blob_files.extend(remove_consensus_blobs.iter());
653

            
654
12
        for name in remove_blob_files {
655
4
            let fname = self.blob_dir.join(name);
656
4
            if let Ok(fname) = fname {
657
4
                if let Err(e) = std::fs::remove_file(&fname) {
658
                    warn_report!(
659
                        e,
660
                        "Couldn't remove orphaned blob file {}",
661
                        fname.display_lossy()
662
                    );
663
4
                }
664
            }
665
        }
666

            
667
8
        self.remove_unreferenced_blobs(now, expiration)?;
668

            
669
8
        Ok(())
670
8
    }
671

            
672
    // Note: We cannot, and do not, call this function when a transaction already exists.
673
16
    fn latest_consensus(
674
16
        &self,
675
16
        flavor: ConsensusFlavor,
676
16
        pending: Option<bool>,
677
16
    ) -> Result<Option<InputString>> {
678
16
        match self.latest_consensus_internal(flavor, pending)? {
679
10
            Ok(s) => return Ok(Some(s)),
680
6
            Err(AbsentBlob::NothingToRead) => return Ok(None),
681
            Err(AbsentBlob::VanishedFile) => {
682
                // If we get here, the file was vanished.  Clean up the DB and try again.
683
            }
684
        }
685

            
686
        // We use unchecked_transaction() here because this API takes a non-mutable `SqliteStore`.
687
        // `unchecked_transaction()` will give an error if it is used
688
        // when a transaction already exists.
689
        // That's fine: We don't call this function from inside this module,
690
        // when a transaction might exist,
691
        // and we can't call multiple SqliteStore functions at once: it isn't sync.
692
        // Here we enforce that:
693
        static_assertions::assert_not_impl_any!(SqliteStore: Sync);
694

            
695
        // If we decide that this is unacceptable,
696
        // then since sqlite doesn't really support concurrent use of a connection,
697
        // we _could_ change the Store::latest_consensus API take &mut self,
698
        // or we could add a mutex,
699
        // or we could just not use a transaction object.
700
        let tx = self.conn.unchecked_transaction()?;
701
        Self::remove_entries_for_vanished_blobs(&self.blob_dir, &tx)?;
702
        tx.commit()?;
703

            
704
        match self.latest_consensus_internal(flavor, pending)? {
705
            Ok(s) => Ok(Some(s)),
706
            Err(AbsentBlob::NothingToRead) => Ok(None),
707
            Err(AbsentBlob::VanishedFile) => {
708
                warn!("Somehow remove_entries_for_vanished_blobs didn't resolve a VanishedFile");
709
                Ok(None)
710
            }
711
        }
712
16
    }
713

            
714
14
    fn latest_consensus_meta(&self, flavor: ConsensusFlavor) -> Result<Option<ConsensusMeta>> {
715
14
        let mut stmt = self.conn.prepare(FIND_LATEST_CONSENSUS_META)?;
716
14
        let mut rows = stmt.query(params![flavor.name()])?;
717
14
        if let Some(row) = rows.next()? {
718
6
            Ok(Some(cmeta_from_row(row)?))
719
        } else {
720
8
            Ok(None)
721
        }
722
14
    }
723
    /// Load the text of the consensus described by `cmeta`.
    ///
    /// Fails with [`Error::CacheCorruption`] if no such consensus is found.
    #[cfg(test)]
    fn consensus_by_meta(&self, cmeta: &ConsensusMeta) -> Result<InputString> {
        let digest = cmeta.sha3_256_of_signed();
        match self.consensus_by_sha3_digest_of_signed_part(digest)? {
            Some((text, _meta)) => Ok(text),
            None => Err(Error::CacheCorruption(
                "couldn't find a consensus we thought we had.",
            )),
        }
    }
735
16
    fn consensus_by_sha3_digest_of_signed_part(
736
16
        &self,
737
16
        d: &[u8; 32],
738
16
    ) -> Result<Option<(InputString, ConsensusMeta)>> {
739
16
        let digest = hex::encode(d);
740
16
        let mut stmt = self
741
16
            .conn
742
16
            .prepare(FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED)?;
743
16
        let mut rows = stmt.query(params![digest])?;
744
16
        if let Some(row) = rows.next()? {
745
10
            let meta = cmeta_from_row(row)?;
746
10
            let fname: String = row.get(5)?;
747
10
            if let Ok(text) = self.read_blob(&fname)? {
748
10
                return Ok(Some((text, meta)));
749
            }
750
6
        }
751
6
        Ok(None)
752
16
    }
753
10
    fn store_consensus(
754
10
        &mut self,
755
10
        cmeta: &ConsensusMeta,
756
10
        flavor: ConsensusFlavor,
757
10
        pending: bool,
758
10
        contents: &str,
759
10
    ) -> Result<()> {
760
10
        let lifetime = cmeta.lifetime();
761
10
        let sha3_of_signed = cmeta.sha3_256_of_signed();
762
10
        let sha3_of_whole = cmeta.sha3_256_of_whole();
763
10
        let valid_after: OffsetDateTime = lifetime.valid_after().into();
764
10
        let fresh_until: OffsetDateTime = lifetime.fresh_until().into();
765
10
        let valid_until: OffsetDateTime = lifetime.valid_until().into();
766

            
767
        /// How long to keep a consensus around after it has expired
768
        const CONSENSUS_LIFETIME: time::Duration = time::Duration::days(4);
769

            
770
        // After a few days have passed, a consensus is no good for
771
        // anything at all, not even diffs.
772
10
        let expires = valid_until + CONSENSUS_LIFETIME;
773

            
774
10
        let doctype = format!("con_{}", flavor.name());
775

            
776
10
        let h = self.save_blob_internal(
777
10
            contents.as_bytes(),
778
10
            &doctype,
779
10
            "sha3-256",
780
10
            &sha3_of_whole[..],
781
10
            expires,
782
        )?;
783
10
        h.tx().execute(
784
10
            INSERT_CONSENSUS,
785
10
            params![
786
10
                valid_after,
787
10
                fresh_until,
788
10
                valid_until,
789
10
                flavor.name(),
790
10
                pending,
791
10
                hex::encode(sha3_of_signed),
792
10
                h.digest_string()
793
10
            ],
794
10
        )?;
795
10
        h.commit()?;
796
10
        Ok(())
797
10
    }
798
2
    /// Clear the `pending` flag on the consensus described by `cmeta`.
    ///
    /// The row is selected by its digest string, built here as
    /// `sha3-256-` followed by the hex digest of the whole document.
    fn mark_consensus_usable(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
        let digest = format!("sha3-256-{}", hex::encode(cmeta.sha3_256_of_whole()));

        let tx = self.conn.transaction()?;
        let updated = tx.execute(MARK_CONSENSUS_NON_PENDING, params![digest])?;
        trace!("Marked {} consensuses usable", updated);
        tx.commit()?;

        Ok(())
    }
809
2
    /// Remove the `Consensuses` row for the consensus described by `cmeta`.
    ///
    /// The row is selected by its digest string, built here as
    /// `sha3-256-` followed by the hex digest of the whole document.
    fn delete_consensus(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
        let digest = format!("sha3-256-{}", hex::encode(cmeta.sha3_256_of_whole()));

        // TODO: We should probably remove the blob as well, but for now
        // this is enough.
        let tx = self.conn.transaction()?;
        tx.execute(REMOVE_CONSENSUS, params![digest])?;
        tx.commit()?;

        Ok(())
    }
821

            
822
8
    fn authcerts(&self, certs: &[AuthCertKeyIds]) -> Result<HashMap<AuthCertKeyIds, String>> {
823
8
        let mut result = HashMap::new();
824
        // TODO(nickm): Do I need to get a transaction here for performance?
825
8
        let mut stmt = self.conn.prepare(FIND_AUTHCERT)?;
826

            
827
18
        for ids in certs {
828
10
            let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
829
10
            let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
830
10
            if let Some(contents) = stmt
831
13
                .query_row(params![id_digest, sk_digest], |row| row.get::<_, String>(0))
832
10
                .optional()?
833
6
            {
834
6
                result.insert(*ids, contents);
835
6
            }
836
        }
837

            
838
8
        Ok(result)
839
8
    }
840
6
    fn store_authcerts(&mut self, certs: &[(AuthCertMeta, &str)]) -> Result<()> {
841
6
        let tx = self.conn.transaction()?;
842
6
        let mut stmt = tx.prepare(INSERT_AUTHCERT)?;
843
14
        for (meta, content) in certs {
844
8
            let ids = meta.key_ids();
845
8
            let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
846
8
            let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
847
8
            let published: OffsetDateTime = meta.published().into();
848
8
            let expires: OffsetDateTime = meta.expires().into();
849
8
            stmt.execute(params![id_digest, sk_digest, published, expires, content])?;
850
        }
851
6
        stmt.finalize()?;
852
6
        tx.commit()?;
853
6
        Ok(())
854
6
    }
855

            
856
24
    fn microdescs(&self, digests: &[MdDigest]) -> Result<HashMap<MdDigest, String>> {
857
24
        let mut result = HashMap::new();
858
24
        let mut stmt = self.conn.prepare(FIND_MD)?;
859

            
860
        // TODO(nickm): Should I speed this up with a transaction, or
861
        // does it not matter for queries?
862
82
        for md_digest in digests {
863
58
            let h_digest = hex::encode(md_digest);
864
58
            if let Some(contents) = stmt
865
80
                .query_row(params![h_digest], |row| row.get::<_, String>(0))
866
58
                .optional()?
867
44
            {
868
44
                result.insert(*md_digest, contents);
869
44
            }
870
        }
871

            
872
24
        Ok(result)
873
24
    }
874
22
    fn store_microdescs(&mut self, digests: &[(&str, &MdDigest)], when: SystemTime) -> Result<()> {
875
22
        let when: OffsetDateTime = when.into();
876

            
877
22
        let tx = self.conn.transaction()?;
878
22
        let mut stmt = tx.prepare(INSERT_MD)?;
879

            
880
56
        for (content, md_digest) in digests {
881
34
            let h_digest = hex::encode(md_digest);
882
34
            stmt.execute(params![h_digest, when, content])?;
883
        }
884
22
        stmt.finalize()?;
885
22
        tx.commit()?;
886
22
        Ok(())
887
22
    }
888
4
    fn update_microdescs_listed(&mut self, digests: &[MdDigest], when: SystemTime) -> Result<()> {
889
4
        let tx = self.conn.transaction()?;
890
4
        let mut stmt = tx.prepare(UPDATE_MD_LISTED)?;
891
4
        let when: OffsetDateTime = when.into();
892

            
893
8
        for md_digest in digests {
894
4
            let h_digest = hex::encode(md_digest);
895
4
            stmt.execute(params![when, h_digest])?;
896
        }
897

            
898
4
        stmt.finalize()?;
899
4
        tx.commit()?;
900
4
        Ok(())
901
4
    }
902

            
903
    /// Fetch the stored router descriptors for each of `digests`.
    ///
    /// Digests with no matching row are simply left out of the returned map.
    #[cfg(feature = "routerdesc")]
    fn routerdescs(&self, digests: &[RdDigest]) -> Result<HashMap<RdDigest, String>> {
        let mut lookup = self.conn.prepare(FIND_RD)?;
        let mut found = HashMap::new();

        // TODO(nickm): Should I speed this up with a transaction, or
        // does it not matter for queries?
        for rd_digest in digests {
            let hex_digest = hex::encode(rd_digest);
            let contents: Option<String> = lookup
                .query_row(params![hex_digest], |row| row.get(0))
                .optional()?;
            if let Some(contents) = contents {
                found.insert(*rd_digest, contents);
            }
        }

        Ok(found)
    }
922
    /// Insert (or replace) every router descriptor in `digests`, each with
    /// its own publication time, all within a single transaction.
    #[cfg(feature = "routerdesc")]
    fn store_routerdescs(&mut self, digests: &[(&str, SystemTime, &RdDigest)]) -> Result<()> {
        let tx = self.conn.transaction()?;
        let mut insert = tx.prepare(INSERT_RD)?;

        for (content, when, rd_digest) in digests {
            let published = OffsetDateTime::from(*when);
            insert.execute(params![hex::encode(rd_digest), published, content])?;
        }

        // The statement borrows the transaction, so it has to go first.
        insert.finalize()?;
        tx.commit()?;
        Ok(())
    }
936

            
937
    /// Look up the cached descriptor for `bridge`, keyed by the bridge's
    /// string form.  Returns `Ok(None)` if no row matches.
    #[cfg(feature = "bridge-client")]
    fn lookup_bridgedesc(&self, bridge: &BridgeConfig) -> Result<Option<CachedBridgeDescriptor>> {
        let bridge_line = bridge.to_string();
        let cached = self
            .conn
            .query_row(FIND_BRIDGEDESC, params![bridge_line], |row| {
                let (fetched, document): (OffsetDateTime, _) = row.try_into()?;
                Ok(CachedBridgeDescriptor {
                    fetched: fetched.into(),
                    document,
                })
            })
            .optional()?;
        Ok(cached)
    }
949

            
950
    /// Cache `entry` as the descriptor for `bridge`, recording `until`
    /// alongside it.
    ///
    /// Does nothing (and reports success) when the store is read-only.
    #[cfg(feature = "bridge-client")]
    fn store_bridgedesc(
        &mut self,
        bridge: &BridgeConfig,
        entry: CachedBridgeDescriptor,
        until: SystemTime,
    ) -> Result<()> {
        if self.is_readonly() {
            // Hopefully whoever *does* have the lock will update the cache.
            // Otherwise it will contain a stale entry forever
            // (which we'll ignore, but waste effort on).
            return Ok(());
        }

        let bridge_line = bridge.to_string();
        let fetched = OffsetDateTime::from(entry.fetched);
        let until = OffsetDateTime::from(until);
        self.conn.execute(
            INSERT_BRIDGEDESC,
            params![bridge_line, fetched, until, entry.document],
        )?;
        Ok(())
    }
973

            
974
    /// Remove the cached descriptor for `bridge`, if any.
    ///
    /// Does nothing (and reports success) when the store is read-only.
    #[cfg(feature = "bridge-client")]
    fn delete_bridgedesc(&mut self, bridge: &BridgeConfig) -> Result<()> {
        // This is called when we find corrupted or stale cache entries,
        // to stop us wasting time on them next time.
        // Hopefully whoever *does* have the lock will do this.
        if !self.is_readonly() {
            let bridge_line = bridge.to_string();
            self.conn.execute(DELETE_BRIDGEDESC, params![bridge_line])?;
        }
        Ok(())
    }
986

            
987
4
    fn update_protocol_recommendations(
988
4
        &mut self,
989
4
        valid_after: SystemTime,
990
4
        protocols: &tor_netdoc::doc::netstatus::ProtoStatuses,
991
4
    ) -> Result<()> {
992
4
        let json =
993
4
            serde_json::to_string(&protocols).map_err(into_internal!("Cannot encode protocols"))?;
994
4
        let params = params![OffsetDateTime::from(valid_after), json];
995
4
        self.conn.execute(UPDATE_PROTOCOL_STATUS, params)?;
996
4
        Ok(())
997
4
    }
998

            
999
405
    fn cached_protocol_recommendations(
405
        &self,
405
    ) -> Result<Option<(SystemTime, tor_netdoc::doc::netstatus::ProtoStatuses)>> {
405
        let opt_row: Option<(OffsetDateTime, String)> = self
405
            .conn
407
            .query_row(FIND_LATEST_PROTOCOL_STATUS, [], |row| {
4
                Ok((row.get(0)?, row.get(1)?))
4
            })
405
            .optional()?;
405
        let (date, json) = match opt_row {
4
            Some(v) => v,
401
            None => return Ok(None),
        };
4
        let date = date.into();
4
        let statuses: tor_netdoc::doc::netstatus::ProtoStatuses =
4
            serde_json::from_str(json.as_str()).map_err(|e| Error::BadJsonInCache(Arc::new(e)))?;
4
        Ok(Some((date, statuses)))
405
    }
}
/// Functionality related to uncommitted blobs.
mod blob_handle {
    use std::path::{Path, PathBuf};
    use crate::Result;
    use rusqlite::Transaction;
    use tor_basic_utils::PathExt as _;
    use tor_error::warn_report;
    /// Handle to a blob that we have saved to disk but
    /// not yet committed to
    /// the database, and the database transaction where we added a reference to it.
    ///
    /// Used to either commit the blob (by calling [`SavedBlobHandle::commit`]),
    /// or roll it back (by dropping the [`SavedBlobHandle`] without committing it.)
    #[must_use]
    pub(super) struct SavedBlobHandle<'a> {
        /// Transaction we're using to add the blob to the ExtDocs table.
        ///
        /// Note that struct fields are dropped in declaration order,
        /// so when we drop an uncommitted SavedBlobHandle,
        /// we roll back the transaction before we delete the file.
        /// (In practice, either order would be fine.)
        tx: Transaction<'a>,
        /// Filename for the file, with respect to the blob directory.
        fname: String,
        /// Declared digest string for this blob. Of the format
        /// "digesttype-hexstr".
        digeststr: String,
        /// An 'unlinker' for the blob file.
        unlinker: Unlinker,
    }
    impl<'a> SavedBlobHandle<'a> {
        /// Construct a SavedBlobHandle from its parts.
34
        pub(super) fn new(
34
            tx: Transaction<'a>,
34
            fname: String,
34
            digeststr: String,
34
            unlinker: Unlinker,
34
        ) -> Self {
34
            Self {
34
                tx,
34
                fname,
34
                digeststr,
34
                unlinker,
34
            }
34
        }
        /// Return a reference to the underlying database transaction.
10
        pub(super) fn tx(&self) -> &Transaction<'a> {
10
            &self.tx
10
        }
        /// Return the digest string of the saved blob.
        /// Other tables use this as a foreign key into ExtDocs.digest
10
        pub(super) fn digest_string(&self) -> &str {
10
            self.digeststr.as_ref()
10
        }
        /// Return the filename of this blob within the blob directory.
        #[allow(unused)] // used for testing.
24
        pub(super) fn fname(&self) -> &str {
24
            self.fname.as_ref()
24
        }
        /// Commit the relevant database transaction.
34
        pub(super) fn commit(self) -> Result<()> {
            // The blob has been written to disk, so it is safe to
            // commit the transaction.
            // If the commit returns an error, self.unlinker will remove the blob.
            // (This could result in a vanished blob if the commit reports an error,
            // but the transaction is still visible in the database.)
34
            self.tx.commit()?;
            // If we reach this point, we don't want to remove the file.
34
            self.unlinker.forget();
34
            Ok(())
34
        }
    }
    /// Handle to a file which we might have to delete.
    ///
    /// When this handle is dropped, the file gets deleted, unless you have
    /// first called [`Unlinker::forget`].
    pub(super) struct Unlinker {
        /// The location of the file to remove, or None if we shouldn't
        /// remove it.
        p: Option<PathBuf>,
    }
    impl Unlinker {
        /// Make a new Unlinker for a given filename.
34
        pub(super) fn new<P: AsRef<Path>>(p: P) -> Self {
34
            Unlinker {
34
                p: Some(p.as_ref().to_path_buf()),
34
            }
34
        }
        /// Forget about this unlinker, so that the corresponding file won't
        /// get dropped.
34
        fn forget(mut self) {
34
            self.p = None;
34
        }
    }
    impl Drop for Unlinker {
34
        fn drop(&mut self) {
34
            if let Some(p) = self.p.take() {
                if let Err(e) = std::fs::remove_file(&p) {
                    warn_report!(
                        e,
                        "Couldn't remove rolled-back blob file {}",
                        p.display_lossy()
                    );
                }
34
            }
34
        }
    }
}
/// Convert a hexadecimal sha3-256 digest from the database into an array.
32
fn digest_from_hex(s: &str) -> Result<[u8; 32]> {
32
    let mut bytes = [0_u8; 32];
32
    hex::decode_to_slice(s, &mut bytes[..]).map_err(Error::BadHexInCache)?;
32
    Ok(bytes)
32
}
/// Convert a hexadecimal sha3-256 "digest string" as used in the
/// digest column from the database into an array.
16
fn digest_from_dstr(s: &str) -> Result<[u8; 32]> {
16
    if let Some(stripped) = s.strip_prefix("sha3-256-") {
16
        digest_from_hex(stripped)
    } else {
        Err(Error::CacheCorruption("Invalid digest in database"))
    }
16
}
/// Create a ConsensusMeta from a `Row` returned by one of
/// `FIND_LATEST_CONSENSUS_META` or `FIND_CONSENSUS_AND_META_BY_DIGEST`.
16
fn cmeta_from_row(row: &rusqlite::Row<'_>) -> Result<ConsensusMeta> {
16
    let va: OffsetDateTime = row.get(0)?;
16
    let fu: OffsetDateTime = row.get(1)?;
16
    let vu: OffsetDateTime = row.get(2)?;
16
    let d_signed: String = row.get(3)?;
16
    let d_all: String = row.get(4)?;
16
    let lifetime = Lifetime::new(va.into(), fu.into(), vu.into())
16
        .map_err(|_| Error::CacheCorruption("inconsistent lifetime in database"))?;
16
    Ok(ConsensusMeta::new(
16
        lifetime,
16
        digest_from_hex(&d_signed)?,
16
        digest_from_dstr(&d_all)?,
    ))
16
}
/// Return `SystemTime::get()` as an OffsetDateTime in UTC.
24
fn now_utc() -> OffsetDateTime {
24
    SystemTime::get().into()
24
}
/// Set up the tables for the arti cache schema in a sqlite database.
///
/// This is the version-0 schema.  It creates the `TorSchemaMeta`,
/// `ExtDocs`, `Microdescs`, `Authcerts`, and `Consensuses` tables
/// (plus an index on `Consensuses.valid_until`), and records
/// version 0 / readable_by 0 under the name 'TorDirStorage'.
///
/// Tables added by later schema versions are not created here;
/// see [`UPDATE_SCHEMA`].
const INSTALL_V0_SCHEMA: &str = "
  -- Helps us version the schema.  The schema here corresponds to a
  -- version number called 'version', and it should be readable by
  -- anybody who is compliant with versions of at least 'readable_by'.
  CREATE TABLE TorSchemaMeta (
     name TEXT NOT NULL PRIMARY KEY,
     version INTEGER NOT NULL,
     readable_by INTEGER NOT NULL
  );
  INSERT INTO TorSchemaMeta (name, version, readable_by) VALUES ( 'TorDirStorage', 0, 0 );
  -- Keeps track of external blobs on disk.
  CREATE TABLE ExtDocs (
    -- Records a digest of the file contents, in the form '<digest_type>-hexstr'
    digest TEXT PRIMARY KEY NOT NULL,
    -- When was this file created?
    created DATE NOT NULL,
    -- After what time will this file definitely be useless?
    expires DATE NOT NULL,
    -- What is the type of this file? Currently supported are 'con_<flavor>'.
    --   (Before tor-dirmgr ~0.28.0, we would erroneously record 'con_flavor' as 'sha3-256';
    --   Nothing depended on this yet, but will be used in the future
    --   as we add more large-document types.)
    type TEXT NOT NULL,
    -- Filename for this file within our blob directory.
    filename TEXT NOT NULL
  );
  -- All the microdescriptors we know about.
  CREATE TABLE Microdescs (
    sha256_digest TEXT PRIMARY KEY NOT NULL,
    last_listed DATE NOT NULL,
    contents BLOB NOT NULL
  );
  -- All the authority certificates we know.
  CREATE TABLE Authcerts (
    id_digest TEXT NOT NULL,
    sk_digest TEXT NOT NULL,
    published DATE NOT NULL,
    expires DATE NOT NULL,
    contents BLOB NOT NULL,
    PRIMARY KEY (id_digest, sk_digest)
  );
  -- All the consensuses we're storing.
  CREATE TABLE Consensuses (
    valid_after DATE NOT NULL,
    fresh_until DATE NOT NULL,
    valid_until DATE NOT NULL,
    flavor TEXT NOT NULL,
    pending BOOLEAN NOT NULL,
    sha3_of_signed_part TEXT NOT NULL,
    digest TEXT NOT NULL,
    FOREIGN KEY (digest) REFERENCES ExtDocs (digest) ON DELETE CASCADE
  );
  CREATE INDEX Consensuses_vu on CONSENSUSES(valid_until);
";
/// Update the database schema, from each version to the next.
///
/// The entry at index `i` holds the SQL that upgrades a version-`i`
/// database to version `i + 1`:
///
///  * index 0 adds the `RouterDescs` table,
///  * index 1 adds the `BridgeDescs` table,
///  * index 2 adds the single-row `ProtocolStatus` table.
///
/// The length of this slice defines [`SCHEMA_VERSION`], so new upgrade
/// steps must only ever be appended.
const UPDATE_SCHEMA: &[&str] = &["
  -- Update the database schema from version 0 to version 1.
  CREATE TABLE RouterDescs (
    sha1_digest TEXT PRIMARY KEY NOT NULL,
    published DATE NOT NULL,
    contents BLOB NOT NULL
  );
","
  -- Update the database schema from version 1 to version 2.
  -- We create this table even if the bridge-client feature is disabled, but then don't touch it at all.
  CREATE TABLE BridgeDescs (
    bridge_line TEXT PRIMARY KEY NOT NULL,
    fetched DATE NOT NULL,
    until DATE NOT NULL,
    contents BLOB NOT NULL
  );
","
 -- Update the database schema from version 2 to version 3.
 -- Table to hold our latest ProtocolStatuses object, to tell us if we're obsolete.
 -- We hold this independently from our consensus,
 -- since we want to read it very early in our startup process,
 -- even if the consensus is expired.
 CREATE TABLE ProtocolStatus (
    -- Enforce that there is only one row in this table.
    -- (This is a bit kludgy, but I am assured that it is a common practice.)
    zero INTEGER PRIMARY KEY NOT NULL,
    -- valid-after date of the consensus from which we got this status
    date DATE NOT NULL,
    -- ProtoStatuses object, encoded as json
    statuses TEXT NOT NULL
 );
"];
/// Update the database schema version tracking, from each version to the next.
///
/// Takes two parameters: the version number to record, and an upper
/// bound; only rows whose current version is below that bound are changed.
const UPDATE_SCHEMA_VERSION: &str = "
  UPDATE TorSchemaMeta SET version=? WHERE version<?;
";
/// Version number used for this version of the arti cache schema.
///
/// This is the number of entries in [`UPDATE_SCHEMA`]: the base schema is
/// version 0, and each entry raises the version by one.
const SCHEMA_VERSION: u32 = UPDATE_SCHEMA.len() as u32;
/// Query: find the latest-expiring consensus with a given pending status
/// and flavor.
///
/// Parameters: `pending`, `flavor`.
/// Yields `valid_after`, `valid_until`, and the blob `filename`.
const FIND_CONSENSUS_P: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE pending = ? AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";
/// Query: find the latest-expiring consensus of a given flavor, regardless
/// of pending status.
///
/// Parameter: `flavor`.
/// Yields `valid_after`, `valid_until`, and the blob `filename`.
const FIND_CONSENSUS: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";
/// Query: Find the metadata for the latest-expiring non-pending consensus
/// of a given flavor.
///
/// Parameter: `flavor`.
/// The five result columns are in the order that `cmeta_from_row` reads them.
const FIND_LATEST_CONSENSUS_META: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, digest
  FROM Consensuses
  WHERE pending = 0 AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";
/// Look up a consensus by its digest-of-signed-part string.
///
/// Parameter: the hex-encoded digest of the signed part.
/// Columns 0 through 4 are in the order that `cmeta_from_row` reads them;
/// column 5 is the blob `filename`.
const FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, Consensuses.digest, filename
  FROM Consensuses
  INNER JOIN ExtDocs on ExtDocs.digest = Consensuses.digest
  WHERE Consensuses.sha3_of_signed_part = ?
  LIMIT 1;
";
/// Query: Update the consensus whose digest field is 'digest' to call it
/// no longer pending.
///
/// Parameter: the digest string (of the form `sha3-256-<hex>`).
const MARK_CONSENSUS_NON_PENDING: &str = "
  UPDATE Consensuses
  SET pending = 0
  WHERE digest = ?;
";
/// Query: Remove the consensus with a given digest field.
///
/// Parameter: the digest string (of the form `sha3-256-<hex>`).
/// Only the `Consensuses` row is removed; `ExtDocs` is not touched here.
#[allow(dead_code)]
const REMOVE_CONSENSUS: &str = "
  DELETE FROM Consensuses
  WHERE digest = ?;
";
/// Query: Find the authority certificate with given key digests.
///
/// Parameters: hex-encoded `id_digest`, hex-encoded `sk_digest`.
/// (The table is created as `Authcerts`; SQLite matches table names
/// case-insensitively, so `AuthCerts` here refers to the same table.)
const FIND_AUTHCERT: &str = "
  SELECT contents FROM AuthCerts WHERE id_digest = ? AND sk_digest = ?;
";
/// Query: find the microdescriptor with a given hex-encoded sha256 digest
const FIND_MD: &str = "
  SELECT contents
  FROM Microdescs
  WHERE sha256_digest = ?
";
/// Query: find the router descriptor with a given hex-encoded sha1 digest
#[cfg(feature = "routerdesc")]
const FIND_RD: &str = "
  SELECT contents
  FROM RouterDescs
  WHERE sha1_digest = ?
";
/// Query: find the filename of every ExtDocs member that has expired,
/// judged against the database's own notion of the current time.
const FIND_EXPIRED_EXTDOCS: &str = "
  SELECT filename FROM ExtDocs where expires < datetime('now');
";
/// Query: find whether an ExtDoc is listed.
///
/// Parameter: the filename.  Yields the number of matching rows.
const COUNT_EXTDOC_BY_PATH: &str = "
  SELECT COUNT(*) FROM ExtDocs WHERE filename = ?;
";
/// Query: Add a new entry to ExtDocs, replacing any existing entry with
/// the same digest.
///
/// Parameters: `digest`, `expires`, `type`, `filename`.
/// (`created` is filled in by the database with the current time.)
const INSERT_EXTDOC: &str = "
  INSERT OR REPLACE INTO ExtDocs ( digest, created, expires, type, filename )
  VALUES ( ?, datetime('now'), ?, ?, ? );
";
/// Query: Add a new consensus.
///
/// Parameters, in order: `valid_after`, `fresh_until`, `valid_until`,
/// `flavor`, `pending`, `sha3_of_signed_part`, `digest`.
const INSERT_CONSENSUS: &str = "
  INSERT OR REPLACE INTO Consensuses
    ( valid_after, fresh_until, valid_until, flavor, pending, sha3_of_signed_part, digest )
  VALUES ( ?, ?, ?, ?, ?, ?, ? );
";
/// Query: Add a new AuthCert, replacing any existing one with the same
/// key digests.
///
/// Parameters, in order: `id_digest`, `sk_digest`, `published`,
/// `expires`, `contents`.
const INSERT_AUTHCERT: &str = "
  INSERT OR REPLACE INTO Authcerts
    ( id_digest, sk_digest, published, expires, contents)
  VALUES ( ?, ?, ?, ?, ? );
";
/// Query: Add a new microdescriptor, replacing any existing one with the
/// same digest.
///
/// Parameters, in order: `sha256_digest`, `last_listed`, `contents`.
const INSERT_MD: &str = "
  INSERT OR REPLACE INTO Microdescs ( sha256_digest, last_listed, contents )
  VALUES ( ?, ?, ? );
";
/// Query: Add a new router descriptor, replacing any existing one with the
/// same digest.
///
/// Parameters, in order: `sha1_digest`, `published`, `contents`.
#[allow(unused)]
#[cfg(feature = "routerdesc")]
const INSERT_RD: &str = "
  INSERT OR REPLACE INTO RouterDescs ( sha1_digest, published, contents )
  VALUES ( ?, ?, ? );
";
/// Query: Change the time when a given microdescriptor was last listed.
///
/// Parameters, in order: the new listed time, `sha256_digest`.
/// The stored time only ever moves forward: the larger of the old and
/// new values is kept.
const UPDATE_MD_LISTED: &str = "
  UPDATE Microdescs
  SET last_listed = max(last_listed, ?)
  WHERE sha256_digest = ?;
";
/// Query: Find a cached bridge descriptor
///
/// Parameter: `bridge_line`.  Yields `fetched` and `contents`.
#[cfg(feature = "bridge-client")]
const FIND_BRIDGEDESC: &str = "SELECT fetched, contents FROM BridgeDescs WHERE bridge_line = ?;";
/// Query: Record a cached bridge descriptor, replacing any existing entry
/// for the same bridge line.
///
/// Parameters, in order: `bridge_line`, `fetched`, `until`, `contents`.
#[cfg(feature = "bridge-client")]
const INSERT_BRIDGEDESC: &str = "
  INSERT OR REPLACE INTO BridgeDescs ( bridge_line, fetched, until, contents )
  VALUES ( ?, ?, ?, ? );
";
/// Query: Remove a cached bridge descriptor
///
/// Parameter: `bridge_line`.
#[cfg(feature = "bridge-client")]
#[allow(dead_code)]
const DELETE_BRIDGEDESC: &str = "DELETE FROM BridgeDescs WHERE bridge_line = ?;";
/// Query: Find all consensus extdocs that are not referenced in the consensus table.
///
/// Note: treating `sha3-256` as a synonym for `con_%` is a workaround,
/// so that entries whose type was recorded as 'sha3-256' are matched too.
const FIND_UNREFERENCED_CONSENSUS_EXTDOCS: &str = "
    SELECT filename FROM ExtDocs WHERE
         (type LIKE 'con_%' OR type = 'sha3-256')
    AND NOT EXISTS
         (SELECT digest FROM Consensuses WHERE Consensuses.digest = ExtDocs.digest);";
/// Query: Discard every expired extdoc, judged against the database's own
/// notion of the current time.
///
/// External documents aren't exposed through [`Store`].
const DROP_OLD_EXTDOCS: &str = "DELETE FROM ExtDocs WHERE expires < datetime('now');";
/// Query: Discard an extdoc with a given path.
///
/// Parameter: the filename.
const DELETE_EXTDOC_BY_FILENAME: &str = "DELETE FROM ExtDocs WHERE filename = ?;";
/// Query: List all extdoc filenames.
const FIND_ALL_EXTDOC_FILENAMES: &str = "SELECT filename FROM ExtDocs;";
/// Query: Get the latest protocol status.
///
/// Yields `date` and `statuses` from the single row whose key is 0.
const FIND_LATEST_PROTOCOL_STATUS: &str = "SELECT date, statuses FROM ProtocolStatus WHERE zero=0;";
/// Query: Update the latest protocol status.
///
/// Parameters, in order: `date`, `statuses`.  The row key is always 0,
/// so this replaces the one existing row.
const UPDATE_PROTOCOL_STATUS: &str = "INSERT OR REPLACE INTO ProtocolStatus VALUES ( 0, ?, ? );";
/// Query: Discard every router descriptor published before a cutoff time.
///
/// Parameter: the cutoff.
/// NOTE(review): this was documented as "hasn't been listed for 3 months",
/// but the age is chosen by whoever supplies the cutoff — confirm
/// against the caller.
// TODO: Choose a more realistic time.
const DROP_OLD_ROUTERDESCS: &str = "DELETE FROM RouterDescs WHERE published < ?;";
/// Query: Discard every microdescriptor last listed before a cutoff time.
///
/// Parameter: the cutoff.
/// NOTE(review): this was documented as "hasn't been listed for 3 months",
/// but the age is chosen by whoever supplies the cutoff — confirm
/// against the caller.
// TODO: Choose a more realistic time.
const DROP_OLD_MICRODESCS: &str = "DELETE FROM Microdescs WHERE last_listed < ?;";
/// Query: Discard every authority certificate that expired before a
/// cutoff time.
///
/// Parameter: the cutoff.
const DROP_OLD_AUTHCERTS: &str = "DELETE FROM Authcerts WHERE expires < ?;";
/// Query: Discard every consensus whose `valid_until` is before a
/// cutoff time.
///
/// Parameter: the cutoff.
/// NOTE(review): this was documented as "expired for at least two days",
/// but the age is chosen by whoever supplies the cutoff — confirm
/// against the caller.
const DROP_OLD_CONSENSUSES: &str = "DELETE FROM Consensuses WHERE valid_until < ?;";
/// Query: Discard every bridge descriptor that is too old, or from the future.  (Both ?=now.)
#[cfg(feature = "bridge-client")]
const DROP_OLD_BRIDGEDESCS: &str = "DELETE FROM BridgeDescs WHERE ? > until OR fetched > ?;";
#[cfg(test)]
pub(crate) mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::storage::EXPIRATION_DEFAULTS;
    use digest::Digest;
    use hex_literal::hex;
    use tempfile::{TempDir, tempdir};
    use time::ext::NumericalDuration;
    use tor_llcrypto::d::Sha3_256;
    /// Construct a new [`SqliteStore`] inside a freshly created temporary directory.
    ///
    /// The database is opened at `db.sql`, and blobs are kept in `blobs`, both
    /// directly inside that directory.  The [`TempDir`] is handed back along
    /// with the store so that the caller decides how long the directory lives.
    pub(crate) fn new_empty() -> Result<(TempDir, SqliteStore)> {
        let scratch = tempdir().unwrap();
        let root = scratch.path();

        let db = rusqlite::Connection::open(root.join("db.sql"))?;

        // Build the blob directory with a configuration that trusts everyone.
        let mistrust = fs_mistrust::Mistrust::builder()
            .dangerously_trust_everyone()
            .build()
            .unwrap();
        let blobs = mistrust
            .verifier()
            .make_secure_dir(root.join("blobs"))
            .unwrap();

        let store = SqliteStore::from_conn(db, blobs)?;
        Ok((scratch, store))
    }
    /// Re-opening one database file keeps working, until the schema metadata
    /// claims a `readable_by` version that is too new for us.
    #[test]
    fn init() -> Result<()> {
        let scratch = tempdir().unwrap();
        let mistrust = fs_mistrust::Mistrust::builder()
            .dangerously_trust_everyone()
            .build()
            .unwrap();
        let blobs = mistrust.verifier().secure_dir(&scratch).unwrap();
        let db_file = scratch.path().join("db.sql");

        // Open a brand-new connection to the same database file.
        let connect = || rusqlite::Connection::open(&db_file);

        // First pass sets everything up; the second pass finds an existing
        // schema.  Both must succeed.
        for _pass in 0..2 {
            let _store = SqliteStore::from_conn(connect()?, blobs.clone())?;
        }

        // Third pass: bumping only `version` must not stop us from opening
        // the database.
        {
            let db = connect()?;
            db.execute_batch("UPDATE TorSchemaMeta SET version = 9002;")?;
            let _store = SqliteStore::from_conn(db, blobs.clone())?;
        }

        // Fourth pass: now `readable_by` says we can't read it, so opening
        // has to fail.
        let db = connect()?;
        db.execute_batch("UPDATE TorSchemaMeta SET readable_by = 9001;")?;
        assert!(SqliteStore::from_conn(db, blobs).is_err());

        Ok(())
    }
    /// The blob directory accepts plain file names (even ones containing
    /// dots), and rejects names that would lead outside of the directory.
    #[test]
    fn bad_blob_fname() -> Result<()> {
        let (_tmp_dir, store) = new_empty()?;

        for accepted in ["abcd", "abcd..", "..abcd..", ".abcd"] {
            assert!(store.blob_dir.join(accepted).is_ok());
        }
        for rejected in ["..", "../abcd", "/abcd"] {
            assert!(store.blob_dir.join(rejected).is_err());
        }

        Ok(())
    }
    /// Saved blobs can be read back, both from disk and through the store;
    /// `expire_all` then removes the expired blob from the blob directory and
    /// from the `ExtDocs` table, leaving the other one alone.
    #[test]
    fn blobs() -> Result<()> {
        /// Count the rows currently in the `ExtDocs` table.
        fn extdoc_rows(store: &SqliteStore) -> Result<u32> {
            let n: u32 = store
                .conn
                .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
            Ok(n)
        }
        /// Try to read the file called `fname` directly from the blob directory.
        fn on_disk(store: &SqliteStore, fname: &str) -> Result<std::io::Result<Vec<u8>>> {
            Ok(std::fs::read(store.blob_dir.join(fname)?))
        }

        let (_tmp_dir, mut store) = new_empty()?;
        let now = now_utc();
        let week = 1.weeks();

        // One blob that expires a week from now...
        let live = store.save_blob(
            b"Hello world",
            "greeting",
            "sha1",
            &hex!("7b502c3a1f48c8609ae212cdfb639dee39673f5e"),
            now + week,
        )?;
        // ...and one that expired a week ago.
        let stale = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now - week,
        )?;

        assert_eq!(
            live,
            "greeting_sha1-7b502c3a1f48c8609ae212cdfb639dee39673f5e"
        );
        assert_eq!(on_disk(&store, &live)?.unwrap(), b"Hello world");
        assert_eq!(on_disk(&store, &stale)?.unwrap(), b"Goodbye, dear friends");
        assert_eq!(extdoc_rows(&store)?, 2);

        let via_store = store.read_blob(&stale)?.unwrap();
        assert_eq!(via_store.as_str().unwrap(), "Goodbye, dear friends");

        // Now expire: only the already-expired blob should go away.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        assert_eq!(on_disk(&store, &live)?.unwrap(), b"Hello world");
        assert!(on_disk(&store, &stale)?.is_err());
        assert_eq!(extdoc_rows(&store)?, 1);

        Ok(())
    }
    /// A stored consensus only counts as non-pending (and only yields a
    /// "latest consensus time") after it has been marked usable.  It can be
    /// found by its metadata or by the digest of its signed part, and it is
    /// gone again once deleted.
    #[test]
    fn consensus() -> Result<()> {
        use tor_netdoc::doc::netstatus;

        /// The flavor used for every call in this test.
        const FLAVOR: ConsensusFlavor = ConsensusFlavor::Microdesc;
        /// Placeholder text for the stored document.
        const TEXT: &str = "Pretend this is a consensus";
        /// Digest under which the stored consensus is looked up.
        const STORED_DIGEST: [u8; 32] = [0xAB; 32];
        /// A digest that never gets stored.
        const UNKNOWN_DIGEST: [u8; 32] = [0x99; 32];

        let (_tmp_dir, mut store) = new_empty()?;
        let now = now_utc();
        let hour = 1.hours();
        // Every metadata object below is built from the same three times:
        // now, one hour from now, and two hours from now.
        let lifetime = || {
            netstatus::Lifetime::new(
                now.into(),
                (now + hour).into(),
                SystemTime::from(now + hour * 2),
            )
            .unwrap()
        };

        // An empty store has no consensus time to report.
        assert_eq!(store.latest_consensus_time(FLAVOR)?, None);

        let cmeta = ConsensusMeta::new(lifetime(), STORED_DIGEST, [0xBC; 32]);
        store.store_consensus(&cmeta, FLAVOR, true, TEXT)?;

        // Before being marked usable: found when we don't filter, but not
        // found when we ask with `Some(false)`; and still no consensus time.
        assert_eq!(store.latest_consensus_time(FLAVOR)?, None);
        assert_eq!(
            store.latest_consensus(FLAVOR, None)?.unwrap().as_str()?,
            TEXT
        );
        assert!(store.latest_consensus(FLAVOR, Some(false))?.is_none());

        store.mark_consensus_usable(&cmeta)?;

        // After being marked usable: found either way, and the time is set.
        assert_eq!(store.latest_consensus_time(FLAVOR)?, now.into());
        assert_eq!(
            store.latest_consensus(FLAVOR, None)?.unwrap().as_str()?,
            TEXT
        );
        assert_eq!(
            store
                .latest_consensus(FLAVOR, Some(false))?
                .unwrap()
                .as_str()?,
            TEXT
        );

        // Lookups by metadata and by digest find what we stored...
        assert_eq!(store.consensus_by_meta(&cmeta)?.as_str()?, TEXT);
        {
            let (found, _found_meta) = store
                .consensus_by_sha3_digest_of_signed_part(&STORED_DIGEST)?
                .unwrap();
            assert_eq!(found.as_str()?, TEXT);
        }

        // ...and don't find what we never stored.
        let never_stored = ConsensusMeta::new(lifetime(), UNKNOWN_DIGEST, UNKNOWN_DIGEST);
        assert!(store.consensus_by_meta(&never_stored).is_err());
        assert!(
            store
                .consensus_by_sha3_digest_of_signed_part(&UNKNOWN_DIGEST)?
                .is_none()
        );

        // Deleting makes the stored consensus unreachable by digest.
        assert!(
            store
                .consensus_by_sha3_digest_of_signed_part(&STORED_DIGEST)?
                .is_some()
        );
        store.delete_consensus(&cmeta)?;
        assert!(
            store
                .consensus_by_sha3_digest_of_signed_part(&STORED_DIGEST)?
                .is_none()
        );

        Ok(())
    }
    /// A stored authority certificate is returned for its own key IDs, and
    /// nothing is returned for key IDs that were never stored.
    #[test]
    fn authcerts() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = now_utc();

        let stored_ids = AuthCertKeyIds {
            id_fingerprint: [3; 20].into(),
            sk_fingerprint: [4; 20].into(),
        };
        // Same two fingerprints, swapped: never stored.
        let absent_ids = AuthCertKeyIds {
            id_fingerprint: [4; 20].into(),
            sk_fingerprint: [3; 20].into(),
        };

        let expires = SystemTime::from(now + 24.hours());
        let meta = AuthCertMeta::new(stored_ids, now.into(), expires);
        store.store_authcerts(&[(meta, "Pretend this is a cert")])?;

        let found = store.authcerts(&[stored_ids, absent_ids])?;
        assert_eq!(found.len(), 1);
        assert_eq!(found.get(&stored_ids).unwrap(), "Pretend this is a cert");

        Ok(())
    }
    /// Microdescriptors are returned only for the digests that were asked for
    /// and stored; after expiry, only the recently listed one survives.
    #[test]
    fn microdescs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = now_utc();
        let long_ago: OffsetDateTime = now - 100.days();

        // Three digests that get stored, and one (`d4`) that never does.
        let [d1, d2, d3, d4] = [5_u8, 7, 42, 99].map(|byte| [byte; 32]);

        store.store_microdescs(
            &[
                ("Fake micro 1", &d1),
                ("Fake micro 2", &d2),
                ("Fake micro 3", &d3),
            ],
            long_ago.into(),
        )?;
        // Only d2 gets an up-to-date listing time.
        store.update_microdescs_listed(&[d2], now.into())?;

        let found = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(found.len(), 2);
        assert!(!found.contains_key(&d1));
        assert_eq!(found.get(&d2).unwrap(), "Fake micro 2");
        assert_eq!(found.get(&d3).unwrap(), "Fake micro 3");
        assert!(!found.contains_key(&d4));

        // Now we'll expire.  That should drop everything but d2.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        let found = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(found.len(), 1);
        assert_eq!(found.get(&d2).unwrap(), "Fake micro 2");

        Ok(())
    }
    /// Router descriptors are returned only for the digests that were asked
    /// for and stored; after expiry, only the recently published one survives.
    #[test]
    #[cfg(feature = "routerdesc")]
    fn routerdescs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = now_utc();
        let long_ago: OffsetDateTime = now - 100.days();
        let recently = now - 1.days();

        // Three digests that get stored, and one (`d4`) that never does.
        let [d1, d2, d3, d4] = [5_u8, 7, 42, 99].map(|byte| [byte; 20]);

        store.store_routerdescs(&[
            ("Fake routerdesc 1", long_ago.into(), &d1),
            ("Fake routerdesc 2", recently.into(), &d2),
            ("Fake routerdesc 3", long_ago.into(), &d3),
        ])?;

        let found = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(found.len(), 2);
        assert!(!found.contains_key(&d1));
        assert_eq!(found.get(&d2).unwrap(), "Fake routerdesc 2");
        assert_eq!(found.get(&d3).unwrap(), "Fake routerdesc 3");
        assert!(!found.contains_key(&d4));

        // Now we'll expire.  That should drop everything but d2.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        let found = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(found.len(), 1);
        assert_eq!(found.get(&d2).unwrap(), "Fake routerdesc 2");

        Ok(())
    }
    /// A store can't be opened read-only before it exists; once a read-write
    /// open has created it, a read-only store can be opened and upgraded.
    #[test]
    fn from_path_rw() -> Result<()> {
        let tmp = tempdir().unwrap();
        let mistrust = fs_mistrust::Mistrust::new_dangerously_trust_everyone();
        let blob_subdir = tmp.path().join("dir_blobs");

        // Nothing there yet: a read-only open fails, and creates nothing.
        assert!(SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, true).is_err());
        assert!(!blob_subdir.try_exists().unwrap());

        // Opening it read-write will create the files.
        {
            let mut rw = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, false)?;
            assert!(blob_subdir.is_dir());
            assert!(rw.lockfile.is_some());
            assert!(!rw.is_readonly());
            // Already read-write, so this is a no-op.
            assert!(rw.upgrade_to_readwrite()?);
        }

        // At this point, we can successfully make a read-only connection.
        {
            let mut ro = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, true)?;
            assert!(ro.is_readonly());
            // Nobody else is locking this, so the upgrade succeeds.
            assert!(ro.upgrade_to_readwrite()?);
            assert!(!ro.is_readonly());
        }

        Ok(())
    }
    /// Of two files in the blob directory that the store didn't save itself,
    /// `remove_unreferenced_blobs` removes exactly one: the test expects it to
    /// be the one with the week-old mtime.
    #[test]
    fn orphaned_blobs() -> Result<()> {
        /// Count the entries currently in the blob directory.
        fn n_entries(store: &SqliteStore) -> Result<usize> {
            Ok(store.blob_dir.read_directory(".")?.count())
        }

        let (_tmp_dir, mut store) = new_empty()?;
        assert_eq!(n_entries(&store)?, 0);

        let now = now_utc();
        let week = 1.weeks();

        // A properly saved blob.
        let _fname_good = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now + week,
        )?;
        assert_eq!(n_entries(&store)?, 1);

        // Now create two orphaned blobs: one keeps its fresh timestamp, and
        // the other gets its mtime moved a week into the past.
        store
            .blob_dir
            .write_and_replace("fairly_new", b"new contents will stay")?;
        store
            .blob_dir
            .write_and_replace("fairly_old", b"old contents will be removed")?;
        let old_path = store.blob_dir.join("fairly_old")?;
        filetime::set_file_mtime(old_path, SystemTime::from(now - week).into())
            .expect("Can't adjust mtime");
        assert_eq!(n_entries(&store)?, 3);

        store.remove_unreferenced_blobs(now, &EXPIRATION_DEFAULTS)?;
        assert_eq!(n_entries(&store)?, 2);

        Ok(())
    }
    /// A blob whose type looks like a consensus, but which has no entry in
    /// the consensus table, is removed by `expire_all` even though it has not
    /// expired yet.
    #[test]
    fn unreferenced_consensus_blob() -> Result<()> {
        /// Count the entries currently in the blob directory.
        fn n_entries(store: &SqliteStore) -> Result<usize> {
            Ok(store.blob_dir.read_directory(".")?.count())
        }
        /// Count the rows currently in the `ExtDocs` table.
        fn extdoc_rows(store: &SqliteStore) -> Result<u32> {
            let n: u32 = store
                .conn
                .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
            Ok(n)
        }

        let (_tmp_dir, mut store) = new_empty()?;
        let expiry = now_utc() + 1.weeks();

        // Make a blob that claims to be a consensus, and which has not yet
        // expired, but which is not listed in the consensus table.
        let fname = store.save_blob(
            b"pretend this is a consensus",
            "con_fake",
            "sha1",
            &hex!("803e5a45eea7766a62a735e051a25a50ffb9b1cf"),
            expiry,
        )?;

        // It is there to begin with, on disk and in the database.
        assert_eq!(n_entries(&store)?, 1);
        let contents = std::fs::read(store.blob_dir.join(&fname)?).unwrap();
        assert_eq!(contents, b"pretend this is a consensus");
        assert_eq!(extdoc_rows(&store)?, 1);

        // After expiring, it is gone from both.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        assert_eq!(n_entries(&store)?, 0);
        assert_eq!(extdoc_rows(&store)?, 0);

        Ok(())
    }
    /// When blob files disappear from disk, `remove_entries_for_vanished_blobs`
    /// drops exactly the `ExtDocs` rows that refer to them.
    #[test]
    fn vanished_blob_cleanup() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let expiry = now_utc() + 1.weeks();

        // Make a few blobs.
        let mut fnames = Vec::with_capacity(8);
        for idx in 0..8 {
            let content = format!("Example {idx}");
            let bytes = content.as_bytes();
            let digest = Sha3_256::digest(bytes);
            fnames.push(store.save_blob(bytes, "blob", "sha3-256", digest.as_slice(), expiry)?);
        }

        // Delete the odd-numbered blobs (indices 1, 3, 5, 7) from disk.
        for fname in fnames.iter().skip(1).step_by(2) {
            store.blob_dir.remove_file(fname)?;
        }

        let tx = store.conn.transaction()?;
        let n_removed = SqliteStore::remove_entries_for_vanished_blobs(&store.blob_dir, &tx)?;
        tx.commit()?;
        assert_eq!(n_removed, 4);

        // Make sure that it was the _odd-numbered_ ones that got deleted from the DB.
        let rows_for = |fname: &String| -> Result<u32> {
            let (n,): (u32,) =
                store
                    .conn
                    .query_row(COUNT_EXTDOC_BY_PATH, params![fname], |row| row.try_into())?;
            Ok(n)
        };
        let n_1 = rows_for(&fnames[1])?;
        let n_2 = rows_for(&fnames[2])?;
        assert_eq!(n_1, 0);
        assert_eq!(n_2, 1);

        Ok(())
    }
    /// Protocol recommendations are absent from a new store, and each update
    /// replaces whatever was cached before (both the time and the contents).
    #[test]
    fn protocol_statuses() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;

        let first_time = SystemTime::get();
        let second_time = first_time + 1.hours();

        let first_json = r#"{
            "client":{
                "required":"Link=5 LinkAuth=3",
                "recommended":"Link=1-5 LinkAuth=2-5"
            },
            "relay":{
                "required":"Wombat=20-22 Knish=25-27",
                "recommended":"Wombat=20-30 Knish=20-30"
            }
            }"#;
        let second_json = r#"{
            "client":{
                "required":"Link=5 ",
                "recommended":"Link=1-5"
            },
            "relay":{
                "required":"Wombat=20",
                "recommended":"Cons=6"
            }
            }"#;

        // Nothing is cached to begin with.
        assert!(store.cached_protocol_recommendations()?.is_none());

        // Each update must be what we read back afterwards.
        for (valid_after, json) in [(first_time, first_json), (second_time, second_json)] {
            let protocols = serde_json::from_str(json).unwrap();
            store.update_protocol_recommendations(valid_after, &protocols)?;

            let cached = store.cached_protocol_recommendations()?.unwrap();
            assert_eq!(cached.0, valid_after);
            assert_eq!(
                serde_json::to_string(&protocols).unwrap(),
                serde_json::to_string(&cached.1).unwrap()
            );
        }

        Ok(())
    }
}