1
//! Net document storage backed by sqlite3.
2
//!
3
//! We store most objects in sqlite tables, except for very large ones,
4
//! which we store as "blob" files in a separate directory.
5

            
6
use super::ExpirationConfig;
7
use crate::docmeta::{AuthCertMeta, ConsensusMeta};
8
use crate::err::ReadOnlyStorageError;
9
use crate::storage::{InputString, Store};
10
use crate::{Error, Result};
11

            
12
use fs_mistrust::CheckedDir;
13
use tor_basic_utils::PathExt as _;
14
use tor_error::{internal, into_internal, warn_report};
15
use tor_netdoc::doc::authcert::AuthCertKeyIds;
16
use tor_netdoc::doc::microdesc::MdDigest;
17
use tor_netdoc::doc::netstatus::{ConsensusFlavor, Lifetime};
18
#[cfg(feature = "routerdesc")]
19
use tor_netdoc::doc::routerdesc::RdDigest;
20
use web_time_compat::SystemTimeExt;
21

            
22
#[cfg(feature = "bridge-client")]
23
pub(crate) use {crate::storage::CachedBridgeDescriptor, tor_guardmgr::bridge::BridgeConfig};
24

            
25
use std::collections::{HashMap, HashSet};
26
use std::fs::OpenOptions;
27
use std::path::{Path, PathBuf};
28
use std::result::Result as StdResult;
29
use std::sync::Arc;
30
use std::time::SystemTime;
31

            
32
use fslock_guard::LockFileGuard;
33
use rusqlite::{OpenFlags, OptionalExtension, Transaction, params};
34
use time::OffsetDateTime;
35
use tracing::{trace, warn};
36

            
37
/// Possible status of a lockfile.
///
/// (Sqlite does its own locking, but we would like to cover the blobs directory
/// as well)
enum LockFile {
    /// We are not even trying to lock, but permitting write operations
    /// regardless.
    ///
    /// This is the implementation we use for ephemeral testing databases.
    /// Don't use it in production!
    NotLocking,

    /// We aren't locked.
    ///
    /// The provided path is the path to the lockfile that we will try to open if
    /// we later attempt to upgrade this store to read-write access.
    Unlocked(PathBuf),

    /// We have the lock.
    ///
    Locked(
        // We never need to read this field; we only need to hold it so that the
        // lock file isn't closed.
        #[allow(unused)] LockFileGuard,
    ),
}
63

            
64
/// Local directory cache using a Sqlite3 connection.
pub(crate) struct SqliteStore {
    /// Connection to the sqlite3 database.
    conn: rusqlite::Connection,
    /// Location for the sqlite3 database; used to reopen it.
    ///
    /// This is `None` for an ephemeral (memory-backed) database, which has no
    /// on-disk representation to reopen.
    sql_path: Option<PathBuf>,
    /// Location to store blob files.
    blob_dir: CheckedDir,
    /// Lockfile to prevent concurrent write attempts from different
    /// processes.
    ///
    /// If this is LockFile::NotLocking we aren't using a lockfile.  Watch out!
    ///
    /// (sqlite supports that with connection locking, but we want to
    /// be a little more coarse-grained here)
    lockfile: LockFile,
}
81

            
82
/// # Some notes on blob consistency, and the lack thereof.
///
/// We store large documents (currently, consensuses) in separate files,
/// called "blobs",
/// outside of the sqlite database.
/// We do this for performance reasons: for large objects,
/// mmap is far more efficient than sqlite in RAM and CPU.
///
/// In the sqlite database, we keep track of our blobs
/// using the ExtDocs table.
/// This scheme makes it possible for the blobs and the table
/// to get out of sync.
///
/// In summary:
///   - _Vanished_ blobs (ones present only in ExtDocs) are possible;
///     we try to tolerate them.
///   - _Orphaned_ blobs (ones present only on the disk) are possible;
///     we try to tolerate them.
///   - _Corrupted_ blobs (ones with the wrong contents) are possible
///     but (we hope) unlikely;
///     we do not currently try to tolerate them.
///
/// In more detail:
///
/// Here are the practices we use when _writing_ blobs:
///
/// - We always create a blob before updating the ExtDocs table,
///   and remove an entry from the ExtDocs before deleting the blob.
/// - If we decide to roll back the transaction that adds the row to ExtDocs,
///   we delete the blob after doing so.
/// - We use [`CheckedDir::write_and_replace`] to store blobs,
///   so a half-formed blob shouldn't be common.
///   (We assume that "close" and "rename" are serialized by the OS,
///   so that _if_ the rename happens, the file is completely written.)
/// - Blob filenames include a digest of the file contents,
///   so collisions are unlikely.
///
/// Here are the practices we use when _deleting_ blobs:
/// - First, we drop the row from the ExtDocs table.
///   Only then do we delete the file.
///
/// These practices can result in _orphaned_ blobs
/// (ones with no row in the ExtDocs table),
/// or in _half-written_ blob files with tempfile names
/// (which also have no row in the ExtDocs table).
/// This happens if we crash at the wrong moment.
/// Such blobs can be safely removed;
/// we do so in [`SqliteStore::remove_unreferenced_blobs`].
///
/// Despite our efforts, _vanished_ blobs
/// (entries in the ExtDocs table with no corresponding file)
/// are also possible.  They could happen for these reasons:
/// - The filesystem might not serialize or sync things in a way that's
///   consistent with the DB.
/// - An automatic process might remove random cache files.
/// - The user might run around deleting things to free space.
///
/// We try to tolerate vanished blobs.
///
/// _Corrupted_ blobs are also possible.  They can happen on FS corruption,
/// or on somebody messing around with the cache directory manually.
/// We do not attempt to tolerate corrupted blobs.
///
/// ## On trade-offs
///
/// TODO: The practices described above are more likely
/// to create _orphaned_ blobs than _vanished_ blobs.
/// We initially made this trade-off decision on the mistaken theory
/// that we could avoid vanished blobs entirely.
/// We _may_ want to revisit this choice,
/// on the rationale that we can respond to vanished blobs as soon as we notice they're gone,
/// whereas we can only handle orphaned blobs with a periodic cleanup.
/// On the other hand, since we need to handle both cases,
/// it may not matter very much in practice.
#[allow(unused)]
mod blob_consistency {}
158

            
159
/// Specific error returned when a blob will not be read.
///
/// This error is an internal type: it's never returned to the user.
#[derive(Debug)]
enum AbsentBlob {
    /// We did not find a blob file on the disk.
    ///
    /// (The ExtDocs table referenced a blob, but the file itself was gone:
    /// a "vanished" blob in the terminology of [`blob_consistency`].)
    VanishedFile,
    /// We did not even find a blob to read in ExtDocs.
    NothingToRead,
}
169

            
170
impl SqliteStore {
171
    /// Construct or open a new SqliteStore at some location on disk.
172
    /// The provided location must be a directory, or a possible
173
    /// location for a directory: the directory will be created if
174
    /// necessary.
175
    ///
176
    /// If readonly is true, the result will be a read-only store.
177
    /// Otherwise, when readonly is false, the result may be
178
    /// read-only or read-write, depending on whether we can acquire
179
    /// the lock.
180
    ///
181
    /// # Limitations:
182
    ///
183
    /// The file locking that we use to ensure that only one dirmgr is
184
    /// writing to a given storage directory at a time is currently
185
    /// _per process_. Therefore, you might get unexpected results if
186
    /// two SqliteStores are created in the same process with the
187
    /// path.
188
411
    pub(crate) fn from_path_and_mistrust<P: AsRef<Path>>(
189
411
        path: P,
190
411
        mistrust: &fs_mistrust::Mistrust,
191
411
        mut readonly: bool,
192
411
    ) -> Result<Self> {
193
411
        let path = path.as_ref();
194
411
        let sqlpath = path.join("dir.sqlite3");
195
411
        let blobpath = path.join("dir_blobs/");
196
411
        let lockpath = path.join("dir.lock");
197

            
198
411
        let verifier = mistrust.verifier().permit_readable().check_content();
199

            
200
411
        let blob_dir = if readonly {
201
4
            verifier.secure_dir(blobpath)?
202
        } else {
203
407
            verifier.make_secure_dir(blobpath)?
204
        };
205

            
206
        // Check permissions on the sqlite and lock files; don't require them to
207
        // exist.
208
818
        for p in [&lockpath, &sqlpath] {
209
818
            match mistrust
210
818
                .verifier()
211
818
                .permit_readable()
212
818
                .require_file()
213
818
                .check(p)
214
            {
215
818
                Ok(()) | Err(fs_mistrust::Error::NotFound(_)) => {}
216
                Err(e) => return Err(e.into()),
217
            }
218
        }
219

            
220
409
        let lockfile = if !readonly {
221
407
            match LockFileGuard::try_lock(&lockpath).map_err(Error::from_lockfile)? {
222
407
                Some(guard) => LockFile::Locked(guard),
223
                None => {
224
                    // We couldn't get the lock.
225
                    readonly = true;
226
                    LockFile::Unlocked(lockpath)
227
                }
228
            }
229
        } else {
230
2
            LockFile::Unlocked(lockpath)
231
        };
232

            
233
409
        let flags = if readonly {
234
2
            OpenFlags::SQLITE_OPEN_READ_ONLY
235
        } else {
236
407
            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
237
        };
238
409
        let conn = rusqlite::Connection::open_with_flags(&sqlpath, flags)?;
239
409
        let mut store = SqliteStore::from_conn_internal(conn, blob_dir, readonly)?;
240
409
        store.sql_path = Some(sqlpath);
241
409
        store.lockfile = lockfile;
242
409
        Ok(store)
243
411
    }
244

            
245
    /// Construct a new SqliteStore from a database connection and a location
    /// for blob files.
    ///
    /// Used for testing with a memory-backed database.
    ///
    /// Note: `blob_dir` must not be used for anything other than storing the blobs associated with
    /// this database, since we will freely remove unreferenced files from this directory.
    #[cfg(test)]
    fn from_conn(conn: rusqlite::Connection, blob_dir: CheckedDir) -> Result<Self> {
        // Test-only databases are never opened read-only.
        SqliteStore::from_conn_internal(conn, blob_dir, false)
    }
256

            
257
    /// Construct a new SqliteStore from a database connection and a location
258
    /// for blob files.
259
    ///
260
    /// The `readonly` argument specifies whether the database connection should be read-only.
261
453
    fn from_conn_internal(
262
453
        conn: rusqlite::Connection,
263
453
        blob_dir: CheckedDir,
264
453
        readonly: bool,
265
453
    ) -> Result<Self> {
266
        // sqlite (as of Jun 2024) does not enforce foreign keys automatically unless you set this
267
        // pragma on the connection.
268
453
        conn.pragma_update(None, "foreign_keys", "ON")?;
269

            
270
453
        let mut result = SqliteStore {
271
453
            conn,
272
453
            blob_dir,
273
453
            lockfile: LockFile::NotLocking,
274
453
            sql_path: None,
275
453
        };
276

            
277
453
        result.check_schema(readonly)?;
278

            
279
451
        Ok(result)
280
453
    }
281

            
282
    /// Check whether this database has a schema format we can read, and
    /// install or upgrade the schema if necessary.
    ///
    /// On an empty database we install the v0 schema and apply every
    /// migration in `UPDATE_SCHEMA`.  On an existing database we apply any
    /// missing migrations (unless `readonly`), or fail if the schema is
    /// newer than this code can read.
    fn check_schema(&mut self, readonly: bool) -> Result<()> {
        let tx = self.conn.transaction()?;
        // Count user-visible tables (ignoring sqlite's internal ones) to
        // learn whether any schema has been installed at all.
        let db_n_tables: u32 = tx.query_row(
            "SELECT COUNT(name) FROM sqlite_master
             WHERE type='table'
             AND name NOT LIKE 'sqlite_%'",
            [],
            |row| row.get(0),
        )?;
        let db_exists = db_n_tables > 0;

        // Update the schema from current_vsn to the latest (does not commit)
        let update_schema = |tx: &rusqlite::Transaction, current_vsn| {
            // UPDATE_SCHEMA[i] migrates from version i to version i+1;
            // apply every migration not yet reflected in current_vsn.
            for (from_vsn, update) in UPDATE_SCHEMA.iter().enumerate() {
                let from_vsn = u32::try_from(from_vsn).expect("schema version >2^32");
                let new_vsn = from_vsn + 1;
                if current_vsn < new_vsn {
                    tx.execute_batch(update)?;
                    tx.execute(UPDATE_SCHEMA_VERSION, params![new_vsn, new_vsn])?;
                }
            }
            Ok::<_, Error>(())
        };

        if !db_exists {
            if !readonly {
                tx.execute_batch(INSTALL_V0_SCHEMA)?;
                update_schema(&tx, 0)?;
                tx.commit()?;
            } else {
                // The other process should have created the database!
                return Err(Error::ReadOnlyStorage(ReadOnlyStorageError::NoDatabase));
            }
            return Ok(());
        }

        let (version, readable_by): (u32, u32) = tx.query_row(
            "SELECT version, readable_by FROM TorSchemaMeta
             WHERE name = 'TorDirStorage'",
            [],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )?;

        if version < SCHEMA_VERSION {
            // The schema is older than ours: upgrade it if we may write.
            if !readonly {
                update_schema(&tx, version)?;
                tx.commit()?;
            } else {
                return Err(Error::ReadOnlyStorage(
                    ReadOnlyStorageError::IncompatibleSchema {
                        schema: version,
                        supported: SCHEMA_VERSION,
                    },
                ));
            }

            return Ok(());
        } else if readable_by > SCHEMA_VERSION {
            // The schema declares it requires a newer reader than we are.
            return Err(Error::UnrecognizedSchema {
                schema: readable_by,
                supported: SCHEMA_VERSION,
            });
        }

        // rolls back the transaction, but nothing was done.
        Ok(())
    }
351

            
352
    /// Read a blob from disk, mapping it if possible.
353
    ///
354
    /// Return `Ok(Err(.))` if the file for the blob was not found on disk;
355
    /// returns an error in other cases.
356
    ///
357
    /// (See [`blob_consistency`] for information on why the blob might be absent.)
358
22
    fn read_blob(&self, path: &str) -> Result<StdResult<InputString, AbsentBlob>> {
359
22
        let file = match self.blob_dir.open(path, OpenOptions::new().read(true)) {
360
22
            Ok(file) => file,
361
            Err(fs_mistrust::Error::NotFound(_)) => {
362
                warn!(
363
                    "{:?} was listed in the database, but its corresponding file had been deleted",
364
                    path
365
                );
366
                return Ok(Err(AbsentBlob::VanishedFile));
367
            }
368
            Err(e) => return Err(e.into()),
369
        };
370

            
371
22
        InputString::load(file)
372
22
            .map_err(|err| Error::CacheFile {
373
                action: "loading",
374
                fname: PathBuf::from(path),
375
                error: Arc::new(err),
376
            })
377
22
            .map(Ok)
378
22
    }
379

            
380
    /// Write a file to disk as a blob, and record it in the ExtDocs table.
    ///
    /// Return a SavedBlobHandle that describes where the blob is, and which
    /// can be used either to commit the blob or delete it.
    ///
    /// See [`blob_consistency`] for more information on guarantees.
    fn save_blob_internal(
        &mut self,
        contents: &[u8],
        doctype: &str,
        digest_type: &str,
        digest: &[u8],
        expires: OffsetDateTime,
    ) -> Result<blob_handle::SavedBlobHandle<'_>> {
        // Filenames embed "{doctype}_{digest_type}-{hex digest}", so name
        // collisions between distinct documents are unlikely.
        let digest = hex::encode(digest);
        let digeststr = format!("{}-{}", digest_type, digest);
        let fname = format!("{}_{}", doctype, digeststr);

        // NOTE(review): the Unlinker presumably removes the blob file if the
        // handle is dropped without committing — confirm in blob_handle.
        let full_path = self.blob_dir.join(&fname)?;
        let unlinker = blob_handle::Unlinker::new(&full_path);
        self.blob_dir
            .write_and_replace(&fname, contents)
            .map_err(|e| match e {
                // Surface I/O failures with the path we were writing.
                fs_mistrust::Error::Io { err, .. } => Error::CacheFile {
                    action: "saving",
                    fname: full_path,
                    error: err,
                },
                err => err.into(),
            })?;

        // Write the blob file first, *then* insert the ExtDocs row:
        // see [`blob_consistency`] for why this ordering matters.
        let tx = self.conn.unchecked_transaction()?;
        tx.execute(INSERT_EXTDOC, params![digeststr, expires, doctype, fname])?;

        Ok(blob_handle::SavedBlobHandle::new(
            tx, fname, digeststr, unlinker,
        ))
    }
418

            
419
    /// As `latest_consensus`, but do not retry.
420
16
    fn latest_consensus_internal(
421
16
        &self,
422
16
        flavor: ConsensusFlavor,
423
16
        pending: Option<bool>,
424
16
    ) -> Result<StdResult<InputString, AbsentBlob>> {
425
16
        trace!(?flavor, ?pending, "Loading latest consensus from cache");
426
16
        let rv: Option<(OffsetDateTime, OffsetDateTime, String)> = match pending {
427
12
            None => self
428
12
                .conn
429
16
                .query_row(FIND_CONSENSUS, params![flavor.name()], |row| row.try_into())
430
12
                .optional()?,
431
4
            Some(pending_val) => self
432
4
                .conn
433
4
                .query_row(
434
4
                    FIND_CONSENSUS_P,
435
4
                    params![pending_val, flavor.name()],
436
2
                    |row| row.try_into(),
437
                )
438
4
                .optional()?,
439
        };
440

            
441
16
        if let Some((_va, _vu, filename)) = rv {
442
            // TODO blobs: If the cache is inconsistent (because this blob is _vanished_), and the cache has not yet
443
            // been cleaned, this may fail to find the latest consensus that we actually have.
444
10
            self.read_blob(&filename)
445
        } else {
446
6
            Ok(Err(AbsentBlob::NothingToRead))
447
        }
448
16
    }
449

            
450
    /// Save a blob to disk and commit it.
    #[cfg(test)]
    fn save_blob(
        &mut self,
        contents: &[u8],
        doctype: &str,
        digest_type: &str,
        digest: &[u8],
        expires: OffsetDateTime,
    ) -> Result<String> {
        // Write the blob and its ExtDocs row, remember the filename, then
        // commit the handle so the file is kept.
        let handle = self.save_blob_internal(contents, doctype, digest_type, digest, expires)?;
        let saved_name = handle.fname().to_string();
        handle.commit()?;
        Ok(saved_name)
    }
465

            
466
    /// Return the valid-after time for the latest non-pending consensus.
    #[cfg(test)]
    // We should revise the tests to use latest_consensus_meta instead.
    fn latest_consensus_time(&self, flavor: ConsensusFlavor) -> Result<Option<OffsetDateTime>> {
        let meta = self.latest_consensus_meta(flavor)?;
        Ok(meta.map(|m| m.lifetime().valid_after().into()))
    }
474

            
475
    /// Remove the blob with name `fname`, but do not give an error on failure.
476
    ///
477
    /// See [`blob_consistency`]: we should call this only having first ensured
478
    /// that the blob is removed from the ExtDocs table.
479
2
    fn remove_blob_or_warn<P: AsRef<Path>>(&self, fname: P) {
480
2
        let fname = fname.as_ref();
481
2
        if let Err(e) = self.blob_dir.remove_file(fname) {
482
            warn_report!(e, "Unable to remove {}", fname.display_lossy());
483
2
        }
484
2
    }
485

            
486
    /// Delete any blob files that are old enough, and not mentioned in the ExtDocs table.
    ///
    /// There shouldn't typically be any, but we don't want to let our cache grow infinitely
    /// if we have a bug.
    fn remove_unreferenced_blobs(
        &self,
        now: OffsetDateTime,
        expiration: &ExpirationConfig,
    ) -> Result<()> {
        // Now, look for any unreferenced blobs that are a bit old.
        for ent in self.blob_dir.read_directory(".")?.flatten() {
            // Helper: wrap a metadata-related I/O error with the filename.
            let md_error = |io_error| Error::CacheFile {
                action: "getting metadata",
                fname: ent.file_name().into(),
                error: Arc::new(io_error),
            };
            // Skip files modified within the consensus-expiration window.
            if ent
                .metadata()
                .map_err(md_error)?
                .modified()
                .map_err(md_error)?
                + expiration.consensuses
                >= now
            {
                // this file is sufficiently recent that we should not remove it, just to be cautious.
                continue;
            }
            let filename = match ent.file_name().into_string() {
                Ok(s) => s,
                Err(os_str) => {
                    // This filename wasn't utf-8.  We will never create one of these.
                    warn!(
                        "Removing bizarre file '{}' from blob store.",
                        os_str.to_string_lossy()
                    );
                    self.remove_blob_or_warn(ent.file_name());
                    continue;
                }
            };
            // Count ExtDocs rows that reference this filename; zero means
            // the blob is orphaned and safe to delete.
            let found: (u32,) =
                self.conn
                    .query_row(COUNT_EXTDOC_BY_PATH, params![&filename], |row| {
                        row.try_into()
                    })?;
            if found == (0,) {
                warn!("Removing unreferenced file '{}' from blob store", &filename);
                self.remove_blob_or_warn(ent.file_name());
            }
        }

        Ok(())
    }
538

            
539
    /// Remove any entry in the ExtDocs table for which a blob file is vanished.
540
    ///
541
    /// This method is `O(n)` in the size of the ExtDocs table and the size of the directory.
542
    /// It doesn't take self, to avoid problems with the borrow checker.
543
2
    fn remove_entries_for_vanished_blobs<'a>(
544
2
        blob_dir: &CheckedDir,
545
2
        tx: &Transaction<'a>,
546
2
    ) -> Result<usize> {
547
2
        let in_directory: HashSet<PathBuf> = blob_dir
548
2
            .read_directory(".")?
549
2
            .flatten()
550
9
            .map(|dir_entry| PathBuf::from(dir_entry.file_name()))
551
2
            .collect();
552
2
        let in_db: Vec<String> = tx
553
2
            .prepare(FIND_ALL_EXTDOC_FILENAMES)?
554
17
            .query_map([], |row| row.get::<_, String>(0))?
555
2
            .collect::<StdResult<Vec<String>, _>>()?;
556

            
557
2
        let mut n_removed = 0;
558
16
        for fname in in_db {
559
16
            if in_directory.contains(Path::new(&fname)) {
560
                // The blob is present; great!
561
8
                continue;
562
8
            }
563

            
564
8
            n_removed += tx.execute(DELETE_EXTDOC_BY_FILENAME, [fname])?;
565
        }
566

            
567
2
        Ok(n_removed)
568
2
    }
569
}
570

            
571
impl Store for SqliteStore {
572
34
    fn is_readonly(&self) -> bool {
573
34
        match &self.lockfile {
574
24
            LockFile::NotLocking => false, // no locks used; we can always write.
575
2
            LockFile::Unlocked(_) => true, // lock in use but we don't have it; can't write.
576
8
            LockFile::Locked(_) => false,  // we have the lock; we can write.
577
        }
578
34
    }
579

            
580
4
    /// Try to upgrade this store to read-write access.
    ///
    /// Returns `Ok(true)` if we now hold (or already held) the lock,
    /// and `Ok(false)` if another process holds it.
    fn upgrade_to_readwrite(&mut self) -> Result<bool> {
        let Some(sql_path) = self.sql_path.as_ref() else {
            // This is an ephemeral database with no disk representation.
            return Ok(true);
        };

        let lockpath = match &self.lockfile {
            LockFile::NotLocking => {
                // This should be unreachable.
                return Err(
                    internal!("No lockfile open; cannot upgrade to read-write storage").into(),
                );
            }
            LockFile::Locked(_) => return Ok(true),
            LockFile::Unlocked(path) => path,
        };
        // We aren't locked. Try to fix that.
        let Some(guard) = LockFileGuard::try_lock(lockpath).map_err(Error::from_lockfile)? else {
            // Somebody else has the lock.
            return Ok(false);
        };

        // Open a fresh RW sql connection. If it fails, we'll unlock the guard
        // and remain in our old state.
        let new_conn = rusqlite::Connection::open(sql_path)?;
        self.conn = new_conn;
        self.lockfile = LockFile::Locked(guard);
        Ok(true)
    }
609
8
    /// Remove expired rows from every table, then clean up the blob files
    /// those rows referenced (plus any consensus blobs left unreferenced).
    fn expire_all(&mut self, expiration: &ExpirationConfig) -> Result<()> {
        let tx = self.conn.transaction()?;
        // This works around a false positive; see
        //   https://github.com/rust-lang/rust-clippy/issues/8114
        #[allow(clippy::let_and_return)]
        // Record the filenames of about-to-expire blobs *before* dropping
        // their rows, so we can delete the files after the commit.
        let expired_blobs: Vec<String> = {
            let mut stmt = tx.prepare(FIND_EXPIRED_EXTDOCS)?;
            let names: Vec<String> = stmt
                .query_map([], |row| row.get::<_, String>(0))?
                .collect::<StdResult<Vec<String>, _>>()?;
            names
        };

        let now = now_utc();
        tx.execute(DROP_OLD_EXTDOCS, [])?;

        // In theory bad system clocks might generate table rows with times far in the future.
        // However, for data which is cached here which comes from the network consensus,
        // we rely on the fact that no consensus from the future exists, so this can't happen.
        tx.execute(DROP_OLD_MICRODESCS, [now - expiration.microdescs])?;
        tx.execute(DROP_OLD_AUTHCERTS, [now - expiration.authcerts])?;
        tx.execute(DROP_OLD_CONSENSUSES, [now - expiration.consensuses])?;
        tx.execute(DROP_OLD_ROUTERDESCS, [now - expiration.router_descs])?;

        // Bridge descriptors come from bridges and bridges might send crazy times,
        // so we need to discard any that look like they are from the future,
        // since otherwise wrong far-future timestamps might live in our DB indefinitely.
        #[cfg(feature = "bridge-client")]
        tx.execute(DROP_OLD_BRIDGEDESCS, [now, now])?;

        // Find all consensus blobs that are no longer referenced,
        // and delete their entries from extdocs.
        let remove_consensus_blobs = {
            // TODO: This query can be O(n); but that won't matter for clients.
            // For relays, we may want to add an index to speed it up, if we use this code there too.
            let mut stmt = tx.prepare(FIND_UNREFERENCED_CONSENSUS_EXTDOCS)?;
            let filenames: Vec<String> = stmt
                .query_map([], |row| row.get::<_, String>(0))?
                .collect::<StdResult<Vec<String>, _>>()?;
            drop(stmt);
            let mut stmt = tx.prepare(DELETE_EXTDOC_BY_FILENAME)?;
            for fname in filenames.iter() {
                stmt.execute([fname])?;
            }
            filenames
        };

        tx.commit()?;
        // Now that the transaction has been committed, these blobs are
        // unreferenced in the ExtDocs table, and we can remove them from disk.
        let mut remove_blob_files: HashSet<_> = expired_blobs.iter().collect();
        remove_blob_files.extend(remove_consensus_blobs.iter());

        for name in remove_blob_files {
            let fname = self.blob_dir.join(name);
            if let Ok(fname) = fname {
                // Best-effort deletion: a leftover file becomes an orphaned
                // blob, which remove_unreferenced_blobs can reap later.
                if let Err(e) = std::fs::remove_file(&fname) {
                    warn_report!(
                        e,
                        "Couldn't remove orphaned blob file {}",
                        fname.display_lossy()
                    );
                }
            }
        }

        self.remove_unreferenced_blobs(now, expiration)?;

        Ok(())
    }
679

            
680
    // Note: We cannot, and do not, call this function when a transaction already exists.
    /// Return the latest consensus of `flavor`, retrying once after cleaning
    /// up vanished-blob entries if the first lookup hit a missing file.
    fn latest_consensus(
        &self,
        flavor: ConsensusFlavor,
        pending: Option<bool>,
    ) -> Result<Option<InputString>> {
        match self.latest_consensus_internal(flavor, pending)? {
            Ok(s) => return Ok(Some(s)),
            Err(AbsentBlob::NothingToRead) => return Ok(None),
            Err(AbsentBlob::VanishedFile) => {
                // If we get here, the file was vanished.  Clean up the DB and try again.
            }
        }

        // We use unchecked_transaction() here because this API takes a non-mutable `SqliteStore`.
        // `unchecked_transaction()` will give an error if it is used
        // when a transaction already exists.
        // That's fine: We don't call this function from inside this module,
        // when a transaction might exist,
        // and we can't call multiple SqliteStore functions at once: it isn't sync.
        // Here we enforce that:
        static_assertions::assert_not_impl_any!(SqliteStore: Sync);

        // If we decide that this is unacceptable,
        // then since sqlite doesn't really support concurrent use of a connection,
        // we _could_ change the Store::latest_consensus API take &mut self,
        // or we could add a mutex,
        // or we could just not use a transaction object.
        let tx = self.conn.unchecked_transaction()?;
        Self::remove_entries_for_vanished_blobs(&self.blob_dir, &tx)?;
        tx.commit()?;

        // Second (and final) attempt, now that stale rows are gone.
        match self.latest_consensus_internal(flavor, pending)? {
            Ok(s) => Ok(Some(s)),
            Err(AbsentBlob::NothingToRead) => Ok(None),
            Err(AbsentBlob::VanishedFile) => {
                warn!("Somehow remove_entries_for_vanished_blobs didn't resolve a VanishedFile");
                Ok(None)
            }
        }
    }
721

            
722
14
    fn latest_consensus_meta(&self, flavor: ConsensusFlavor) -> Result<Option<ConsensusMeta>> {
723
14
        let mut stmt = self.conn.prepare(FIND_LATEST_CONSENSUS_META)?;
724
14
        let mut rows = stmt.query(params![flavor.name()])?;
725
14
        if let Some(row) = rows.next()? {
726
6
            Ok(Some(cmeta_from_row(row)?))
727
        } else {
728
8
            Ok(None)
729
        }
730
14
    }
731
    #[cfg(test)]
    fn consensus_by_meta(&self, cmeta: &ConsensusMeta) -> Result<InputString> {
        // The metadata came from our own database, so failing to find the
        // matching document indicates cache corruption.
        match self.consensus_by_sha3_digest_of_signed_part(cmeta.sha3_256_of_signed())? {
            Some((text, _meta)) => Ok(text),
            None => Err(Error::CacheCorruption(
                "couldn't find a consensus we thought we had.",
            )),
        }
    }
743
16
    fn consensus_by_sha3_digest_of_signed_part(
744
16
        &self,
745
16
        d: &[u8; 32],
746
16
    ) -> Result<Option<(InputString, ConsensusMeta)>> {
747
16
        let digest = hex::encode(d);
748
16
        let mut stmt = self
749
16
            .conn
750
16
            .prepare(FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED)?;
751
16
        let mut rows = stmt.query(params![digest])?;
752
16
        if let Some(row) = rows.next()? {
753
10
            let meta = cmeta_from_row(row)?;
754
10
            let fname: String = row.get(5)?;
755
10
            if let Ok(text) = self.read_blob(&fname)? {
756
10
                return Ok(Some((text, meta)));
757
            }
758
6
        }
759
6
        Ok(None)
760
16
    }
761
10
    fn store_consensus(
762
10
        &mut self,
763
10
        cmeta: &ConsensusMeta,
764
10
        flavor: ConsensusFlavor,
765
10
        pending: bool,
766
10
        contents: &str,
767
10
    ) -> Result<()> {
768
10
        let lifetime = cmeta.lifetime();
769
10
        let sha3_of_signed = cmeta.sha3_256_of_signed();
770
10
        let sha3_of_whole = cmeta.sha3_256_of_whole();
771
10
        let valid_after: OffsetDateTime = lifetime.valid_after().into();
772
10
        let fresh_until: OffsetDateTime = lifetime.fresh_until().into();
773
10
        let valid_until: OffsetDateTime = lifetime.valid_until().into();
774

            
775
        /// How long to keep a consensus around after it has expired
776
        const CONSENSUS_LIFETIME: time::Duration = time::Duration::days(4);
777

            
778
        // After a few days have passed, a consensus is no good for
779
        // anything at all, not even diffs.
780
10
        let expires = valid_until + CONSENSUS_LIFETIME;
781

            
782
10
        let doctype = format!("con_{}", flavor.name());
783

            
784
10
        let h = self.save_blob_internal(
785
10
            contents.as_bytes(),
786
10
            &doctype,
787
10
            "sha3-256",
788
10
            &sha3_of_whole[..],
789
10
            expires,
790
        )?;
791
10
        h.tx().execute(
792
10
            INSERT_CONSENSUS,
793
10
            params![
794
10
                valid_after,
795
10
                fresh_until,
796
10
                valid_until,
797
10
                flavor.name(),
798
10
                pending,
799
10
                hex::encode(sha3_of_signed),
800
10
                h.digest_string()
801
10
            ],
802
10
        )?;
803
10
        h.commit()?;
804
10
        Ok(())
805
10
    }
806
2
    fn mark_consensus_usable(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
        // Consensus rows are keyed by a "sha3-256-<hex>" digest string.
        let digest = format!("sha3-256-{}", hex::encode(cmeta.sha3_256_of_whole()));

        let tx = self.conn.transaction()?;
        let n_updated = tx.execute(MARK_CONSENSUS_NON_PENDING, params![digest])?;
        trace!("Marked {} consensuses usable", n_updated);
        tx.commit()?;

        Ok(())
    }
817
2
    fn delete_consensus(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
        // Same "sha3-256-<hex>" digest-string format used when inserting.
        let digest = format!("sha3-256-{}", hex::encode(cmeta.sha3_256_of_whole()));

        // TODO: We should probably remove the blob as well, but for now
        // this is enough.
        let tx = self.conn.transaction()?;
        tx.execute(REMOVE_CONSENSUS, params![digest])?;
        tx.commit()?;

        Ok(())
    }
829

            
830
8
    fn authcerts(&self, certs: &[AuthCertKeyIds]) -> Result<HashMap<AuthCertKeyIds, String>> {
831
8
        let mut result = HashMap::new();
832
        // TODO(nickm): Do I need to get a transaction here for performance?
833
8
        let mut stmt = self.conn.prepare(FIND_AUTHCERT)?;
834

            
835
10
        for ids in certs {
836
10
            let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
837
10
            let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
838
10
            if let Some(contents) = stmt
839
13
                .query_row(params![id_digest, sk_digest], |row| row.get::<_, String>(0))
840
10
                .optional()?
841
6
            {
842
6
                result.insert(*ids, contents);
843
6
            }
844
        }
845

            
846
8
        Ok(result)
847
8
    }
848
6
    fn store_authcerts(&mut self, certs: &[(AuthCertMeta, &str)]) -> Result<()> {
        // Insert the whole batch inside a single transaction.
        let tx = self.conn.transaction()?;
        let mut stmt = tx.prepare(INSERT_AUTHCERT)?;
        for (meta, content) in certs {
            let ids = meta.key_ids();
            let published: OffsetDateTime = meta.published().into();
            let expires: OffsetDateTime = meta.expires().into();
            stmt.execute(params![
                hex::encode(ids.id_fingerprint.as_bytes()),
                hex::encode(ids.sk_fingerprint.as_bytes()),
                published,
                expires,
                content
            ])?;
        }
        // Finalize explicitly so any error is reported, then commit.
        stmt.finalize()?;
        tx.commit()?;
        Ok(())
    }
863

            
864
24
    fn microdescs(&self, digests: &[MdDigest]) -> Result<HashMap<MdDigest, String>> {
865
24
        let mut result = HashMap::new();
866
24
        let mut stmt = self.conn.prepare(FIND_MD)?;
867

            
868
        // TODO(nickm): Should I speed this up with a transaction, or
869
        // does it not matter for queries?
870
58
        for md_digest in digests {
871
58
            let h_digest = hex::encode(md_digest);
872
58
            if let Some(contents) = stmt
873
80
                .query_row(params![h_digest], |row| row.get::<_, String>(0))
874
58
                .optional()?
875
44
            {
876
44
                result.insert(*md_digest, contents);
877
44
            }
878
        }
879

            
880
24
        Ok(result)
881
24
    }
882
22
    fn store_microdescs(&mut self, digests: &[(&str, &MdDigest)], when: SystemTime) -> Result<()> {
        // `when` becomes each microdescriptor's initial last-listed time.
        let last_listed: OffsetDateTime = when.into();

        // One transaction for the whole batch.
        let tx = self.conn.transaction()?;
        let mut stmt = tx.prepare(INSERT_MD)?;
        for (content, md_digest) in digests {
            stmt.execute(params![hex::encode(md_digest), last_listed, content])?;
        }
        stmt.finalize()?;
        tx.commit()?;
        Ok(())
    }
896
4
    fn update_microdescs_listed(&mut self, digests: &[MdDigest], when: SystemTime) -> Result<()> {
        let listed_at: OffsetDateTime = when.into();

        let tx = self.conn.transaction()?;
        let mut stmt = tx.prepare(UPDATE_MD_LISTED)?;
        for md_digest in digests {
            // The UPDATE_MD_LISTED query keeps whichever of the stored and
            // new last-listed times is later, so this never moves time backward.
            stmt.execute(params![listed_at, hex::encode(md_digest)])?;
        }
        stmt.finalize()?;
        tx.commit()?;
        Ok(())
    }
910

            
911
    #[cfg(feature = "routerdesc")]
    fn routerdescs(&self, digests: &[RdDigest]) -> Result<HashMap<RdDigest, String>> {
        // TODO(nickm): Should I speed this up with a transaction, or
        // does it not matter for queries?
        let mut stmt = self.conn.prepare(FIND_RD)?;
        let mut found = HashMap::new();

        for rd_digest in digests {
            // Router descriptors are keyed by the hex form of their digest;
            // descriptors we don't have are omitted from the result map.
            let contents = stmt
                .query_row(params![hex::encode(rd_digest)], |row| {
                    row.get::<_, String>(0)
                })
                .optional()?;
            if let Some(contents) = contents {
                found.insert(*rd_digest, contents);
            }
        }

        Ok(found)
    }
930
    #[cfg(feature = "routerdesc")]
    fn store_routerdescs(&mut self, digests: &[(&str, SystemTime, &RdDigest)]) -> Result<()> {
        // One transaction for the whole batch.
        let tx = self.conn.transaction()?;
        let mut stmt = tx.prepare(INSERT_RD)?;
        for (content, when, rd_digest) in digests {
            // Each descriptor carries its own publication time.
            let published: OffsetDateTime = (*when).into();
            stmt.execute(params![hex::encode(rd_digest), published, content])?;
        }
        stmt.finalize()?;
        tx.commit()?;
        Ok(())
    }
944

            
945
    #[cfg(feature = "bridge-client")]
    fn lookup_bridgedesc(&self, bridge: &BridgeConfig) -> Result<Option<CachedBridgeDescriptor>> {
        // Bridge descriptors are keyed by the bridge's configuration line.
        let bridge_line = bridge.to_string();
        let found = self
            .conn
            .query_row(FIND_BRIDGEDESC, params![bridge_line], |row| {
                let (fetched, document): (OffsetDateTime, _) = row.try_into()?;
                Ok(CachedBridgeDescriptor {
                    fetched: fetched.into(),
                    document,
                })
            })
            .optional()?;
        Ok(found)
    }
957

            
958
    #[cfg(feature = "bridge-client")]
    fn store_bridgedesc(
        &mut self,
        bridge: &BridgeConfig,
        entry: CachedBridgeDescriptor,
        until: SystemTime,
    ) -> Result<()> {
        if self.is_readonly() {
            // Hopefully whoever *does* have the lock will update the cache.
            // Otherwise it will contain a stale entry forever
            // (which we'll ignore, but waste effort on).
            return Ok(());
        }
        self.conn.execute(
            INSERT_BRIDGEDESC,
            params![
                bridge.to_string(),
                OffsetDateTime::from(entry.fetched),
                OffsetDateTime::from(until),
                entry.document,
            ],
        )?;
        Ok(())
    }
981

            
982
    #[cfg(feature = "bridge-client")]
    fn delete_bridgedesc(&mut self, bridge: &BridgeConfig) -> Result<()> {
        if self.is_readonly() {
            // This is called when we find corrupted or stale cache entries,
            // to stop us wasting time on them next time.
            // Hopefully whoever *does* have the lock will do this.
            return Ok(());
        }
        self.conn
            .execute(DELETE_BRIDGEDESC, params![bridge.to_string()])?;
        Ok(())
    }
994

            
995
4
    fn update_protocol_recommendations(
996
4
        &mut self,
997
4
        valid_after: SystemTime,
998
4
        protocols: &tor_netdoc::doc::netstatus::ProtoStatuses,
999
4
    ) -> Result<()> {
4
        let json =
4
            serde_json::to_string(&protocols).map_err(into_internal!("Cannot encode protocols"))?;
4
        let params = params![OffsetDateTime::from(valid_after), json];
4
        self.conn.execute(UPDATE_PROTOCOL_STATUS, params)?;
4
        Ok(())
4
    }
405
    fn cached_protocol_recommendations(
405
        &self,
405
    ) -> Result<Option<(SystemTime, tor_netdoc::doc::netstatus::ProtoStatuses)>> {
405
        let opt_row: Option<(OffsetDateTime, String)> = self
405
            .conn
407
            .query_row(FIND_LATEST_PROTOCOL_STATUS, [], |row| {
4
                Ok((row.get(0)?, row.get(1)?))
4
            })
405
            .optional()?;
405
        let (date, json) = match opt_row {
4
            Some(v) => v,
401
            None => return Ok(None),
        };
4
        let date = date.into();
4
        let statuses: tor_netdoc::doc::netstatus::ProtoStatuses =
4
            serde_json::from_str(json.as_str()).map_err(|e| Error::BadJsonInCache(Arc::new(e)))?;
4
        Ok(Some((date, statuses)))
405
    }
}
/// Functionality related to uncommitted blobs.
mod blob_handle {
    use std::path::{Path, PathBuf};
    use crate::Result;
    use rusqlite::Transaction;
    use tor_basic_utils::PathExt as _;
    use tor_error::warn_report;
    /// Handle to a blob that we have saved to disk but
    /// not yet committed to
    /// the database, and the database transaction where we added a reference to it.
    ///
    /// Used to either commit the blob (by calling [`SavedBlobHandle::commit`]),
    /// or roll it back (by dropping the [`SavedBlobHandle`] without committing it.)
    #[must_use]
    pub(super) struct SavedBlobHandle<'a> {
        /// Transaction we're using to add the blob to the ExtDocs table.
        ///
        /// Note that struct fields are dropped in declaration order,
        /// so when we drop an uncommitted SavedBlobHandle,
        /// we roll back the transaction before we delete the file.
        /// (In practice, either order would be fine.)
        tx: Transaction<'a>,
        /// Filename for the file, with respect to the blob directory.
        fname: String,
        /// Declared digest string for this blob. Of the format
        /// "digesttype-hexstr".
        digeststr: String,
        /// An 'unlinker' for the blob file.
        unlinker: Unlinker,
    }
    impl<'a> SavedBlobHandle<'a> {
        /// Construct a SavedBlobHandle from its parts.
34
        pub(super) fn new(
34
            tx: Transaction<'a>,
34
            fname: String,
34
            digeststr: String,
34
            unlinker: Unlinker,
34
        ) -> Self {
34
            Self {
34
                tx,
34
                fname,
34
                digeststr,
34
                unlinker,
34
            }
34
        }
        /// Return a reference to the underlying database transaction.
10
        pub(super) fn tx(&self) -> &Transaction<'a> {
10
            &self.tx
10
        }
        /// Return the digest string of the saved blob.
        /// Other tables use this as a foreign key into ExtDocs.digest
10
        pub(super) fn digest_string(&self) -> &str {
10
            self.digeststr.as_ref()
10
        }
        /// Return the filename of this blob within the blob directory.
        #[allow(unused)] // used for testing.
24
        pub(super) fn fname(&self) -> &str {
24
            self.fname.as_ref()
24
        }
        /// Commit the relevant database transaction.
34
        pub(super) fn commit(self) -> Result<()> {
            // The blob has been written to disk, so it is safe to
            // commit the transaction.
            // If the commit returns an error, self.unlinker will remove the blob.
            // (This could result in a vanished blob if the commit reports an error,
            // but the transaction is still visible in the database.)
34
            self.tx.commit()?;
            // If we reach this point, we don't want to remove the file.
34
            self.unlinker.forget();
34
            Ok(())
34
        }
    }
    /// Handle to a file which we might have to delete.
    ///
    /// When this handle is dropped, the file gets deleted, unless you have
    /// first called [`Unlinker::forget`].
    pub(super) struct Unlinker {
        /// The location of the file to remove, or None if we shouldn't
        /// remove it.
        p: Option<PathBuf>,
    }
    impl Unlinker {
        /// Make a new Unlinker for a given filename.
34
        pub(super) fn new<P: AsRef<Path>>(p: P) -> Self {
34
            Unlinker {
34
                p: Some(p.as_ref().to_path_buf()),
34
            }
34
        }
        /// Forget about this unlinker, so that the corresponding file won't
        /// get dropped.
34
        fn forget(mut self) {
34
            self.p = None;
34
        }
    }
    impl Drop for Unlinker {
34
        fn drop(&mut self) {
34
            if let Some(p) = self.p.take() {
                if let Err(e) = std::fs::remove_file(&p) {
                    warn_report!(
                        e,
                        "Couldn't remove rolled-back blob file {}",
                        p.display_lossy()
                    );
                }
34
            }
34
        }
    }
}
/// Convert a hexadecimal sha3-256 digest from the database into an array.
32
fn digest_from_hex(s: &str) -> Result<[u8; 32]> {
32
    let mut bytes = [0_u8; 32];
32
    hex::decode_to_slice(s, &mut bytes[..]).map_err(Error::BadHexInCache)?;
32
    Ok(bytes)
32
}
/// Convert a hexadecimal sha3-256 "digest string" as used in the
/// digest column from the database into an array.
16
fn digest_from_dstr(s: &str) -> Result<[u8; 32]> {
16
    if let Some(stripped) = s.strip_prefix("sha3-256-") {
16
        digest_from_hex(stripped)
    } else {
        Err(Error::CacheCorruption("Invalid digest in database"))
    }
16
}
/// Create a ConsensusMeta from a `Row` returned by one of
/// `FIND_LATEST_CONSENSUS_META` or `FIND_CONSENSUS_AND_META_BY_DIGEST`.
16
fn cmeta_from_row(row: &rusqlite::Row<'_>) -> Result<ConsensusMeta> {
16
    let va: OffsetDateTime = row.get(0)?;
16
    let fu: OffsetDateTime = row.get(1)?;
16
    let vu: OffsetDateTime = row.get(2)?;
16
    let d_signed: String = row.get(3)?;
16
    let d_all: String = row.get(4)?;
16
    let lifetime = Lifetime::new(va.into(), fu.into(), vu.into())
16
        .map_err(|_| Error::CacheCorruption("inconsistent lifetime in database"))?;
16
    Ok(ConsensusMeta::new(
16
        lifetime,
16
        digest_from_hex(&d_signed)?,
16
        digest_from_dstr(&d_all)?,
    ))
16
}
/// Return `SystemTime::get()` as an OffsetDateTime in UTC.
24
fn now_utc() -> OffsetDateTime {
24
    SystemTime::get().into()
24
}
/// Set up the tables for the arti cache schema in a sqlite database.
///
/// This is the version-0 schema; `UPDATE_SCHEMA` below migrates it forward.
const INSTALL_V0_SCHEMA: &str = "
  -- Helps us version the schema.  The schema here corresponds to a
  -- version number called 'version', and it should be readable by
  -- anybody who is compliant with versions of at least 'readable_by'.
  CREATE TABLE TorSchemaMeta (
     name TEXT NOT NULL PRIMARY KEY,
     version INTEGER NOT NULL,
     readable_by INTEGER NOT NULL
  );
  INSERT INTO TorSchemaMeta (name, version, readable_by) VALUES ( 'TorDirStorage', 0, 0 );
  -- Keeps track of external blobs on disk.
  CREATE TABLE ExtDocs (
    -- Records a digest of the file contents, in the form '<digest_type>-hexstr'
    digest TEXT PRIMARY KEY NOT NULL,
    -- When was this file created?
    created DATE NOT NULL,
    -- After what time will this file definitely be useless?
    expires DATE NOT NULL,
    -- What is the type of this file? Currently supported are 'con_<flavor>'.
    --   (Before tor-dirmgr ~0.28.0, we would erroneously record 'con_flavor' as 'sha3-256';
    --   Nothing depended on this yet, but will be used in the future
    --   as we add more large-document types.)
    type TEXT NOT NULL,
    -- Filename for this file within our blob directory.
    filename TEXT NOT NULL
  );
  -- All the microdescriptors we know about.
  CREATE TABLE Microdescs (
    sha256_digest TEXT PRIMARY KEY NOT NULL,
    last_listed DATE NOT NULL,
    contents BLOB NOT NULL
  );
  -- All the authority certificates we know.
  CREATE TABLE Authcerts (
    id_digest TEXT NOT NULL,
    sk_digest TEXT NOT NULL,
    published DATE NOT NULL,
    expires DATE NOT NULL,
    contents BLOB NOT NULL,
    PRIMARY KEY (id_digest, sk_digest)
  );
  -- All the consensuses we're storing.
  CREATE TABLE Consensuses (
    valid_after DATE NOT NULL,
    fresh_until DATE NOT NULL,
    valid_until DATE NOT NULL,
    flavor TEXT NOT NULL,
    pending BOOLEAN NOT NULL,
    sha3_of_signed_part TEXT NOT NULL,
    digest TEXT NOT NULL,
    FOREIGN KEY (digest) REFERENCES ExtDocs (digest) ON DELETE CASCADE
  );
  CREATE INDEX Consensuses_vu on CONSENSUSES(valid_until);
";
/// Update the database schema, from each version to the next
///
/// Entry `i` migrates the schema from version `i` to version `i + 1`.
const UPDATE_SCHEMA: &[&str] = &["
  -- Update the database schema from version 0 to version 1.
  CREATE TABLE RouterDescs (
    sha1_digest TEXT PRIMARY KEY NOT NULL,
    published DATE NOT NULL,
    contents BLOB NOT NULL
  );
","
  -- Update the database schema from version 1 to version 2.
  -- We create this table even if the bridge-client feature is disabled, but then don't touch it at all.
  CREATE TABLE BridgeDescs (
    bridge_line TEXT PRIMARY KEY NOT NULL,
    fetched DATE NOT NULL,
    until DATE NOT NULL,
    contents BLOB NOT NULL
  );
","
 -- Update the database schema from version 2 to version 3.
 -- Table to hold our latest ProtocolStatuses object, to tell us if we're obsolete.
 -- We hold this independently from our consensus,
 -- since we want to read it very early in our startup process,
 -- even if the consensus is expired.
 CREATE TABLE ProtocolStatus (
    -- Enforce that there is only one row in this table.
    -- (This is a bit kludgy, but I am assured that it is a common practice.)
    zero INTEGER PRIMARY KEY NOT NULL,
    -- valid-after date of the consensus from which we got this status
    date DATE NOT NULL,
    -- ProtoStatuses object, encoded as json
    statuses TEXT NOT NULL
 );
"];
/// Update the database schema version tracking, from each version to the next
const UPDATE_SCHEMA_VERSION: &str = "
  UPDATE TorSchemaMeta SET version=? WHERE version<?;
";
/// Version number used for this version of the arti cache schema.
///
/// Equal to the number of migrations in `UPDATE_SCHEMA`.
const SCHEMA_VERSION: u32 = UPDATE_SCHEMA.len() as u32;
/// Query: find the latest-expiring consensus of a given flavor with a given
/// pending status.
const FIND_CONSENSUS_P: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE pending = ? AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";
/// Query: find the latest-expiring consensus of a given flavor, regardless of
/// pending status.
const FIND_CONSENSUS: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";
/// Query: Find the lifetime and digests (the metadata) for the
/// latest-expiring non-pending consensus of a given flavor.
const FIND_LATEST_CONSENSUS_META: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, digest
  FROM Consensuses
  WHERE pending = 0 AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";
/// Look up a consensus by its digest-of-signed-part string.
const FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, Consensuses.digest, filename
  FROM Consensuses
  INNER JOIN ExtDocs on ExtDocs.digest = Consensuses.digest
  WHERE Consensuses.sha3_of_signed_part = ?
  LIMIT 1;
";
/// Query: Update the consensus whose digest field is 'digest' to call it
/// no longer pending.
const MARK_CONSENSUS_NON_PENDING: &str = "
  UPDATE Consensuses
  SET pending = 0
  WHERE digest = ?;
";
/// Query: Remove the consensus with a given digest field.
#[allow(dead_code)]
const REMOVE_CONSENSUS: &str = "
  DELETE FROM Consensuses
  WHERE digest = ?;
";
/// Query: Find the authority certificate with given key digests.
///
/// (The table is created as 'Authcerts'; SQLite matches table names
/// case-insensitively, so 'AuthCerts' here still works.)
const FIND_AUTHCERT: &str = "
  SELECT contents FROM AuthCerts WHERE id_digest = ? AND sk_digest = ?;
";
/// Query: find the microdescriptor with a given hex-encoded sha256 digest
const FIND_MD: &str = "
  SELECT contents
  FROM Microdescs
  WHERE sha256_digest = ?
";
/// Query: find the router descriptors with a given hex-encoded sha1 digest
#[cfg(feature = "routerdesc")]
const FIND_RD: &str = "
  SELECT contents
  FROM RouterDescs
  WHERE sha1_digest = ?
";
/// Query: find every ExtDocs member that has expired.
const FIND_EXPIRED_EXTDOCS: &str = "
  SELECT filename FROM ExtDocs where expires < datetime('now');
";
/// Query: find whether an ExtDoc is listed.
const COUNT_EXTDOC_BY_PATH: &str = "
  SELECT COUNT(*) FROM ExtDocs WHERE filename = ?;
";
/// Query: Add a new entry to ExtDocs.
const INSERT_EXTDOC: &str = "
  INSERT OR REPLACE INTO ExtDocs ( digest, created, expires, type, filename )
  VALUES ( ?, datetime('now'), ?, ?, ? );
";
/// Query: Add a new consensus.
const INSERT_CONSENSUS: &str = "
  INSERT OR REPLACE INTO Consensuses
    ( valid_after, fresh_until, valid_until, flavor, pending, sha3_of_signed_part, digest )
  VALUES ( ?, ?, ?, ?, ?, ?, ? );
";
/// Query: Add a new AuthCert
const INSERT_AUTHCERT: &str = "
  INSERT OR REPLACE INTO Authcerts
    ( id_digest, sk_digest, published, expires, contents)
  VALUES ( ?, ?, ?, ?, ? );
";
/// Query: Add a new microdescriptor
const INSERT_MD: &str = "
  INSERT OR REPLACE INTO Microdescs ( sha256_digest, last_listed, contents )
  VALUES ( ?, ?, ? );
";
/// Query: Add a new router descriptor
#[allow(unused)]
#[cfg(feature = "routerdesc")]
const INSERT_RD: &str = "
  INSERT OR REPLACE INTO RouterDescs ( sha1_digest, published, contents )
  VALUES ( ?, ?, ? );
";
/// Query: Change the time when a given microdescriptor was last listed.
///
/// Note that `max()` keeps the later of the stored and supplied times,
/// so this can never move a last-listed time backward.
const UPDATE_MD_LISTED: &str = "
  UPDATE Microdescs
  SET last_listed = max(last_listed, ?)
  WHERE sha256_digest = ?;
";
/// Query: Find a cached bridge descriptor
#[cfg(feature = "bridge-client")]
const FIND_BRIDGEDESC: &str = "SELECT fetched, contents FROM BridgeDescs WHERE bridge_line = ?;";
/// Query: Record a cached bridge descriptor
#[cfg(feature = "bridge-client")]
const INSERT_BRIDGEDESC: &str = "
  INSERT OR REPLACE INTO BridgeDescs ( bridge_line, fetched, until, contents )
  VALUES ( ?, ?, ?, ? );
";
/// Query: Remove a cached bridge descriptor
#[cfg(feature = "bridge-client")]
#[allow(dead_code)]
const DELETE_BRIDGEDESC: &str = "DELETE FROM BridgeDescs WHERE bridge_line = ?;";
/// Query: Find all consensus extdocs that are not referenced in the consensus table.
///
/// Note: also matching `type = 'sha3-256'` is a workaround: before
/// tor-dirmgr ~0.28.0 we erroneously recorded consensus blobs with type
/// 'sha3-256' instead of 'con_<flavor>' (see the schema comment above).
const FIND_UNREFERENCED_CONSENSUS_EXTDOCS: &str = "
    SELECT filename FROM ExtDocs WHERE
         (type LIKE 'con_%' OR type = 'sha3-256')
    AND NOT EXISTS
         (SELECT digest FROM Consensuses WHERE Consensuses.digest = ExtDocs.digest);";
/// Query: Discard every expired extdoc.
///
/// External documents aren't exposed through [`Store`].
const DROP_OLD_EXTDOCS: &str = "DELETE FROM ExtDocs WHERE expires < datetime('now');";
/// Query: Discard an extdoc with a given path.
const DELETE_EXTDOC_BY_FILENAME: &str = "DELETE FROM ExtDocs WHERE filename = ?;";
/// Query: List all extdoc filenames.
const FIND_ALL_EXTDOC_FILENAMES: &str = "SELECT filename FROM ExtDocs;";
/// Query: Get the latest protocol status.
const FIND_LATEST_PROTOCOL_STATUS: &str = "SELECT date, statuses FROM ProtocolStatus WHERE zero=0;";
/// Query: Update the latest protocol status.
const UPDATE_PROTOCOL_STATUS: &str = "INSERT OR REPLACE INTO ProtocolStatus VALUES ( 0, ?, ? );";
/// Query: Discard every router descriptor that hasn't been listed for 3
/// months.
// TODO: Choose a more realistic time.
const DROP_OLD_ROUTERDESCS: &str = "DELETE FROM RouterDescs WHERE published < ?;";
/// Query: Discard every microdescriptor that hasn't been listed for 3 months.
// TODO: Choose a more realistic time.
const DROP_OLD_MICRODESCS: &str = "DELETE FROM Microdescs WHERE last_listed < ?;";
/// Query: Discard every expired authority certificate.
const DROP_OLD_AUTHCERTS: &str = "DELETE FROM Authcerts WHERE expires < ?;";
/// Query: Discard every consensus that's been expired for at least
/// two days.
///
/// NOTE(review): the cutoff is supplied by the caller as the `?` parameter,
/// so the "two days" above may be stale — `store_consensus` keeps blobs for
/// 4 days (CONSENSUS_LIFETIME); confirm against the caller of this query.
const DROP_OLD_CONSENSUSES: &str = "DELETE FROM Consensuses WHERE valid_until < ?;";
/// Query: Discard every bridge descriptor that is too old, or from the future.  (Both ?=now.)
#[cfg(feature = "bridge-client")]
const DROP_OLD_BRIDGEDESCS: &str = "DELETE FROM BridgeDescs WHERE ? > until OR fetched > ?;";
#[cfg(test)]
pub(crate) mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::storage::EXPIRATION_DEFAULTS;
    use digest::Digest;
    use hex_literal::hex;
    use tempfile::{TempDir, tempdir};
    use time::ext::NumericalDuration;
    use tor_llcrypto::d::Sha3_256;
    /// Build a fresh, empty `SqliteStore` backed by a temporary directory.
    ///
    /// Returns the `TempDir` along with the store so the caller can keep the
    /// directory alive for as long as the store is in use.
    pub(crate) fn new_empty() -> Result<(TempDir, SqliteStore)> {
        let tmp_dir = tempdir().unwrap();
        let db_path = tmp_dir.path().join("db.sql");
        let blobs_path = tmp_dir.path().join("blobs");
        // For tests we disable all fs-mistrust permission checking.
        let blob_dir = fs_mistrust::Mistrust::builder()
            .dangerously_trust_everyone()
            .build()
            .unwrap()
            .verifier()
            .make_secure_dir(blobs_path)
            .unwrap();
        let conn = rusqlite::Connection::open(db_path)?;
        let store = SqliteStore::from_conn(conn, blob_dir)?;
        Ok((tmp_dir, store))
    }
    /// Check schema setup and version handling when re-opening the database.
    #[test]
    fn init() -> Result<()> {
        let tmp_dir = tempdir().unwrap();
        let blob_dir = fs_mistrust::Mistrust::builder()
            .dangerously_trust_everyone()
            .build()
            .unwrap()
            .verifier()
            .secure_dir(&tmp_dir)
            .unwrap();
        let sql_path = tmp_dir.path().join("db.sql");
        // Initial setup: everything should work.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Second setup: shouldn't need to upgrade.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Third setup: the schema version is newer than ours, but the
        // `readable_by` field is unchanged, so we can still open the store.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET version = 9002;")?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Fourth: this says we can't read it, so we'll get an error.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET readable_by = 9001;")?;
            let val = SqliteStore::from_conn(conn, blob_dir);
            assert!(val.is_err());
        }
        Ok(())
    }
    /// Check that blob filenames which would escape the blob directory are
    /// rejected, while odd-but-contained names are accepted.
    #[test]
    fn bad_blob_fname() -> Result<()> {
        let (_tmp_dir, store) = new_empty()?;
        // Names that stay inside the directory are fine, even when they
        // contain leading dots or embedded "..".
        for acceptable in ["abcd", "abcd..", "..abcd..", ".abcd"] {
            assert!(store.blob_dir.join(acceptable).is_ok());
        }
        // Absolute paths and traversal out of the directory must fail.
        for rejected in ["..", "../abcd", "/abcd"] {
            assert!(store.blob_dir.join(rejected).is_err());
        }
        Ok(())
    }
    /// Store two blobs (one expired, one not), read them back, and check
    /// that `expire_all` removes only the expired one.
    #[test]
    fn blobs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = now_utc();
        let one_week = 1.weeks();
        // A blob that won't expire for another week...
        let fname1 = store.save_blob(
            b"Hello world",
            "greeting",
            "sha1",
            &hex!("7b502c3a1f48c8609ae212cdfb639dee39673f5e"),
            now + one_week,
        )?;
        // ...and one that already expired a week ago.
        let fname2 = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now - one_week,
        )?;
        // Blob filenames are built from the type, digest type, and digest.
        assert_eq!(
            fname1,
            "greeting_sha1-7b502c3a1f48c8609ae212cdfb639dee39673f5e"
        );
        // Both blobs should be present on disk with the stored contents...
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname1)?).unwrap()[..],
            b"Hello world"
        );
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname2)?).unwrap()[..],
            b"Goodbye, dear friends"
        );
        // ...and both should be recorded in the ExtDocs table.
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 2);
        let blob = store.read_blob(&fname2)?.unwrap();
        assert_eq!(blob.as_str().unwrap(), "Goodbye, dear friends");
        // Now expire: the second file should go away.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname1)?).unwrap()[..],
            b"Hello world"
        );
        assert!(std::fs::read(store.blob_dir.join(&fname2)?).is_err());
        // Only the unexpired blob should remain in the table.
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 1);
        Ok(())
    }
    /// Exercise consensus storage: store a consensus, mark it usable, look it
    /// up by metadata and by digest, and finally delete it.
    #[test]
    fn consensus() -> Result<()> {
        use tor_netdoc::doc::netstatus;
        let (_tmp_dir, mut store) = new_empty()?;
        let now = now_utc();
        let one_hour = 1.hours();
        // Empty store: no latest-consensus time yet.
        assert_eq!(
            store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
            None
        );
        // Metadata for a consensus valid from `now` for one hour, with
        // made-up sha3 digests for the signed part and whole document.
        let cmeta = ConsensusMeta::new(
            netstatus::Lifetime::new(
                now.into(),
                (now + one_hour).into(),
                SystemTime::from(now + one_hour * 2),
            )
            .unwrap(),
            [0xAB; 32],
            [0xBC; 32],
        );
        store.store_consensus(
            &cmeta,
            ConsensusFlavor::Microdesc,
            true,
            "Pretend this is a consensus",
        )?;
        {
            // Stored but not yet marked usable: it has no "latest consensus
            // time", and is skipped when we ask for pending=Some(false).
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                None
            );
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            let consensus = store.latest_consensus(ConsensusFlavor::Microdesc, Some(false))?;
            assert!(consensus.is_none());
        }
        store.mark_consensus_usable(&cmeta)?;
        {
            // Once marked usable, the consensus is visible in every lookup.
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                now.into()
            );
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, Some(false))?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
        }
        {
            // Lookups by metadata and by signed-part digest find the stored
            // text; lookups with unknown digests find nothing.
            let consensus_text = store.consensus_by_meta(&cmeta)?;
            assert_eq!(consensus_text.as_str()?, "Pretend this is a consensus");
            let (is, _cmeta2) = store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .unwrap();
            assert_eq!(is.as_str()?, "Pretend this is a consensus");
            let cmeta3 = ConsensusMeta::new(
                netstatus::Lifetime::new(
                    now.into(),
                    (now + one_hour).into(),
                    SystemTime::from(now + one_hour * 2),
                )
                .unwrap(),
                [0x99; 32],
                [0x99; 32],
            );
            assert!(store.consensus_by_meta(&cmeta3).is_err());
            assert!(
                store
                    .consensus_by_sha3_digest_of_signed_part(&[0x99; 32])?
                    .is_none()
            );
        }
        {
            // Deleting by metadata removes the consensus from digest lookup.
            assert!(
                store
                    .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                    .is_some()
            );
            store.delete_consensus(&cmeta)?;
            assert!(
                store
                    .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                    .is_none()
            );
        }
        Ok(())
    }
    /// Store an authority certificate under one key-id pair, then look up
    /// two pairs and check that only the stored one is returned.
    #[test]
    fn authcerts() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = now_utc();
        let one_hour = 1.hours();
        // The key-id pair we will store a cert under...
        let stored_ids = AuthCertKeyIds {
            id_fingerprint: [3; 20].into(),
            sk_fingerprint: [4; 20].into(),
        };
        // ...and a second pair (fingerprints swapped) with no cert.
        let missing_ids = AuthCertKeyIds {
            id_fingerprint: [4; 20].into(),
            sk_fingerprint: [3; 20].into(),
        };
        let meta =
            AuthCertMeta::new(stored_ids, now.into(), SystemTime::from(now + one_hour * 24));
        store.store_authcerts(&[(meta, "Pretend this is a cert")])?;
        // Only the stored pair should yield a certificate.
        let certs = store.authcerts(&[stored_ids, missing_ids])?;
        assert_eq!(certs.len(), 1);
        assert_eq!(certs.get(&stored_ids).unwrap(), "Pretend this is a cert");
        Ok(())
    }
    /// Store microdescriptors, refresh one's last-listed time, and check that
    /// expiration keeps only the recently-listed one.
    #[test]
    fn microdescs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = now_utc();
        let one_day = 1.days();
        let d1 = [5_u8; 32];
        let d2 = [7; 32];
        let d3 = [42; 32];
        let d4 = [99; 32]; // never stored
        let long_ago: OffsetDateTime = now - one_day * 100;
        // Store three microdescs, all last-listed 100 days ago.
        store.store_microdescs(
            &[
                ("Fake micro 1", &d1),
                ("Fake micro 2", &d2),
                ("Fake micro 3", &d3),
            ],
            long_ago.into(),
        )?;
        // Mark d2 as listed just now.
        store.update_microdescs_listed(&[d2], now.into())?;
        // Lookup returns only the requested-and-present digests.
        let mds = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(mds.len(), 2);
        assert_eq!(mds.get(&d1), None); // present, but not requested
        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");
        assert_eq!(mds.get(&d3).unwrap(), "Fake micro 3");
        assert_eq!(mds.get(&d4), None); // requested, but never stored
        // Now we'll expire.  That should drop everything but d2.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        let mds = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(mds.len(), 1);
        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");
        Ok(())
    }
    /// Store router descriptors with old and recent publication times, and
    /// check that expiration keeps only the recently-published one.
    #[test]
    #[cfg(feature = "routerdesc")]
    fn routerdescs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = now_utc();
        let one_day = 1.days();
        let long_ago: OffsetDateTime = now - one_day * 100;
        let recently = now - one_day;
        let d1 = [5_u8; 20];
        let d2 = [7; 20];
        let d3 = [42; 20];
        let d4 = [99; 20]; // never stored
        // d1 and d3 were published long ago; only d2 is recent.
        store.store_routerdescs(&[
            ("Fake routerdesc 1", long_ago.into(), &d1),
            ("Fake routerdesc 2", recently.into(), &d2),
            ("Fake routerdesc 3", long_ago.into(), &d3),
        ])?;
        // Lookup returns only the requested-and-present digests.
        let rds = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(rds.len(), 2);
        assert_eq!(rds.get(&d1), None); // present, but not requested
        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");
        assert_eq!(rds.get(&d3).unwrap(), "Fake routerdesc 3");
        assert_eq!(rds.get(&d4), None); // requested, but never stored
        // Now we'll expire.  That should drop everything but d2.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        let rds = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(rds.len(), 1);
        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");
        Ok(())
    }
    /// Check read-only vs read-write behavior of `from_path_and_mistrust`,
    /// including upgrading a read-only store to read-write.
    #[test]
    fn from_path_rw() -> Result<()> {
        let tmp = tempdir().unwrap();
        let mistrust = fs_mistrust::Mistrust::new_dangerously_trust_everyone();
        // Nothing there: can't open read-only
        let r = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, true);
        assert!(r.is_err());
        assert!(!tmp.path().join("dir_blobs").try_exists().unwrap());
        // Opening it read-write will create the files
        {
            let mut store = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, false)?;
            assert!(tmp.path().join("dir_blobs").is_dir());
            assert!(matches!(&store.lockfile, LockFile::Locked(_)));
            assert!(!store.is_readonly());
            assert!(store.upgrade_to_readwrite()?); // no-op: already read-write.
        }
        // At this point, we can successfully make a read-only connection.
        {
            let mut store2 = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, true)?;
            assert!(store2.is_readonly());
            // Nobody else is locking this, so we can upgrade.
            assert!(store2.upgrade_to_readwrite()?); // actually upgrades this time.
            assert!(!store2.is_readonly());
        }
        Ok(())
    }
    /// Check that `remove_unreferenced_blobs` deletes orphaned blob files,
    /// but only the ones whose mtime is sufficiently old.
    #[test]
    fn orphaned_blobs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        // The blob directory starts out empty.
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 0);
        let now = now_utc();
        let one_week = 1.weeks();
        // A properly-recorded blob, referenced in the ExtDocs table.
        let _fname_good = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now + one_week,
        )?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 1);
        // Now, create two orphaned blobs: one with a recent timestamp, and one with an older
        // timestamp.
        store
            .blob_dir
            .write_and_replace("fairly_new", b"new contents will stay")?;
        store
            .blob_dir
            .write_and_replace("fairly_old", b"old contents will be removed")?;
        // Backdate the "old" orphan's mtime by a week.
        filetime::set_file_mtime(
            store.blob_dir.join("fairly_old")?,
            SystemTime::from(now - one_week).into(),
        )
        .expect("Can't adjust mtime");
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 3);
        // Only the old orphan should be removed; the recent orphan and the
        // referenced blob both remain.
        store.remove_unreferenced_blobs(now, &EXPIRATION_DEFAULTS)?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 2);
        Ok(())
    }
    /// Check that an unexpired consensus-typed blob with no matching row in
    /// the Consensuses table is removed by `expire_all`.
    #[test]
    fn unreferenced_consensus_blob() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = now_utc();
        let one_week = 1.weeks();
        // Make a blob that claims to be a consensus, and which has not yet expired, but which is
        // not listed in the consensus table.  It should get removed.
        let fname = store.save_blob(
            b"pretend this is a consensus",
            "con_fake",
            "sha1",
            &hex!("803e5a45eea7766a62a735e051a25a50ffb9b1cf"),
            now + one_week,
        )?;
        // Before expiration: one file on disk and one row in ExtDocs.
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 1);
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname)?).unwrap()[..],
            b"pretend this is a consensus"
        );
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 1);
        // After expiration: both the file and the row are gone.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 0);
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 0);
        Ok(())
    }
    /// Check that `remove_entries_for_vanished_blobs` drops ExtDocs rows
    /// whose backing files have disappeared from disk.
    #[test]
    fn vanished_blob_cleanup() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = now_utc();
        let one_week = 1.weeks();
        // Make a few blobs.
        let mut fnames = vec![];
        for idx in 0..8 {
            let content = format!("Example {idx}");
            let digest = Sha3_256::digest(content.as_bytes());
            let fname = store.save_blob(
                content.as_bytes(),
                "blob",
                "sha3-256",
                digest.as_slice(),
                now + one_week,
            )?;
            fnames.push(fname);
        }
        // Delete the odd-numbered blobs.
        store.blob_dir.remove_file(&fnames[1])?;
        store.blob_dir.remove_file(&fnames[3])?;
        store.blob_dir.remove_file(&fnames[5])?;
        store.blob_dir.remove_file(&fnames[7])?;
        // Run the cleanup inside a transaction; it should report 4 removals.
        let n_removed = {
            let tx = store.conn.transaction()?;
            let n = SqliteStore::remove_entries_for_vanished_blobs(&store.blob_dir, &tx)?;
            tx.commit()?;
            n
        };
        assert_eq!(n_removed, 4);
        // Make sure that it was the _odd-numbered_ ones that got deleted from the DB.
        let (n_1,): (u32,) =
            store
                .conn
                .query_row(COUNT_EXTDOC_BY_PATH, params![&fnames[1]], |row| {
                    row.try_into()
                })?;
        let (n_2,): (u32,) =
            store
                .conn
                .query_row(COUNT_EXTDOC_BY_PATH, params![&fnames[2]], |row| {
                    row.try_into()
                })?;
        assert_eq!(n_1, 0); // deleted file: row removed
        assert_eq!(n_2, 1); // surviving file: row retained
        Ok(())
    }
    /// Check storing and retrieving cached protocol recommendations:
    /// initially empty, then one stored set, then a replacement set.
    #[test]
    fn protocol_statuses() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        // Bugfix: `SystemTime::get()` is not a `std::time::SystemTime` API;
        // `now()` is the correct way to read the current time.
        let now = SystemTime::now();
        let hour = 1.hours();
        let valid_after = now;
        let protocols = serde_json::from_str(
            r#"{
            "client":{
                "required":"Link=5 LinkAuth=3",
                "recommended":"Link=1-5 LinkAuth=2-5"
            },
            "relay":{
                "required":"Wombat=20-22 Knish=25-27",
                "recommended":"Wombat=20-30 Knish=20-30"
            }
            }"#,
        )
        .unwrap();
        // Nothing cached yet.
        let v = store.cached_protocol_recommendations()?;
        assert!(v.is_none());
        // Store one set of recommendations and read it back.
        store.update_protocol_recommendations(valid_after, &protocols)?;
        let v = store.cached_protocol_recommendations()?.unwrap();
        assert_eq!(v.0, now);
        assert_eq!(
            serde_json::to_string(&protocols).unwrap(),
            serde_json::to_string(&v.1).unwrap()
        );
        // Storing a newer set replaces the previous one.
        let protocols2 = serde_json::from_str(
            r#"{
            "client":{
                "required":"Link=5 ",
                "recommended":"Link=1-5"
            },
            "relay":{
                "required":"Wombat=20",
                "recommended":"Cons=6"
            }
            }"#,
        )
        .unwrap();
        let valid_after_2 = now + hour;
        store.update_protocol_recommendations(valid_after_2, &protocols2)?;
        let v = store.cached_protocol_recommendations()?.unwrap();
        assert_eq!(v.0, now + hour);
        assert_eq!(
            serde_json::to_string(&protocols2).unwrap(),
            serde_json::to_string(&v.1).unwrap()
        );
        Ok(())
    }
}