1
//! Module for providing cache implementations for the [`http`] module.
2
//!
3
//! Each cache is outlined comprehensively in its own section.
4

            
5
use std::sync::{Arc, Mutex, MutexGuard, Weak};
6

            
7
use rusqlite::{params, Transaction};
8
use weak_table::WeakValueHashMap;
9

            
10
use crate::{
11
    database::{sql, DocumentId},
12
    err::DatabaseError,
13
};
14

            
15
/// Alias to force use of RandomState, regardless of features enabled in `weak_tables`.
16
///
17
/// See <https://github.com/tov/weak-table-rs/issues/23> for discussion.
18
type WeakValueHash<K, V> = weak_table::WeakValueHashMap<K, V, std::hash::RandomState>;
19

            
20
/// Representation of the store cache.
21
///
22
/// The cache serves the purpose to not store the same document multiple times
23
/// in memory, when multiple clients request it simultaneously.
24
///
25
/// It *DOES NOT* serve the purpose to reduce the amount of read system calls.
26
/// We believe that SQLite and the operating system itself do a good job at
27
/// buffering reads for us here.
28
///
29
/// The cache itself is wrapped in a [`Mutex`] to ensure secure access across
30
/// thread boundaries.  Keep in mind, that it is a **synchronous** mutex.
31
///
32
/// All hash lookups in the `store` table should be performed through this
33
/// interface, because it will automatically select them from the database in
34
/// case they are missing.
35
#[derive(Debug)]
36
pub(super) struct StoreCache {
37
    /// The actual data of the cache.
38
    ///
39
    /// We use a [`Mutex`] instead of an [`RwLock`](std::sync::RwLock), because
40
    /// we want to assure that a concurrent cache miss does not lead into two
41
    /// simultaneous database reads and copies into memory.
42
    data: Mutex<WeakValueHashMap<DocumentId, Weak<[u8]>>>,
43
}
44

            
45
impl StoreCache {
46
    /// Creates a new empty [`StoreCache`].
47
4
    pub(super) fn new() -> Self {
48
4
        Self {
49
4
            data: Mutex::new(WeakValueHashMap::new()),
50
4
        }
51
4
    }
52

            
53
    /// Removes all mappings whose values have expired.
54
    ///
55
    /// Takes O(n) time.
56
6
    pub(super) fn gc(&self) {
57
6
        self.lock().remove_expired();
58
6
    }
59

            
60
    /// Looks up a [`DocumentId`] in the cache or the database.
61
    ///
62
    /// If we got a cache miss, this function automatically queries the database
63
    /// and inserts the result into the cache, before returning it.
64
    ///
65
    /// We do not keep a lock throughout the entire method.  This risks storing
66
    /// the same document in memory for a very short amount of time, based upon
67
    /// the number of worker threads we are using.  However, this is fine,
68
    /// given that reading from the store table is a large performance bottleneck.
69
    /// Also, the number of simultaneous copies that might be risked by that is
70
    /// limited to the amount of worker threads, which is usually very low
71
    /// compared to the number of async tasks, which might be in the thousands.
72
8
    pub(super) fn get(
73
8
        &self,
74
8
        tx: &Transaction,
75
8
        docid: DocumentId,
76
8
    ) -> Result<Arc<[u8]>, DatabaseError> {
77
        // Query the cache for the relevant document.
78
8
        if let Some(document) = self.lock().get(&docid) {
79
2
            return Ok(document);
80
6
        }
81

            
82
        // Cache miss, let us query the database.
83
6
        let document = Self::get_db(tx, docid)?;
84

            
85
        // Insert it into the cache.
86
        //
87
        // We obtain the lock and check again if it has been added in the
88
        // meantime.  The idea is to only return one copy of it, not two
89
        // simultaneous ones.
90
6
        Ok(self.lock().entry(docid).or_insert(document))
91
8
    }
92

            
93
    /// Obtains a [`DocumentId`] from the database without consulting the cache first.
94
6
    fn get_db(tx: &Transaction, docid: DocumentId) -> Result<Arc<[u8]>, DatabaseError> {
95
6
        let mut stmt = tx.prepare_cached(sql!("SELECT content FROM store WHERE docid = ?1"))?;
96
9
        let document: Vec<u8> = stmt.query_one(params![docid], |row| row.get(0))?;
97
6
        Ok(Arc::from(document))
98
6
    }
99

            
100
    /// Obtains a lock of the [`WeakValueHashMap`] even if the lock is poisoned.
101
    ///
102
    /// Ignoring the [`PoisonError`](std::sync::PoisonError) is fine here because
103
    /// the only "danger" in the general case of this might be, that certain
104
    /// invariants no longer hold true.  However, unless the poison has been
105
    /// triggered by the implementation of [`WeakValueHashMap`], in which case we
106
    /// would have much larger problems, doing this is fine.
107
    ///
108
    /// TODO DIRMIRROR: Consider cleaning the poison, although that would probably
109
    /// involve much more complexity and wrapping around the parent type.
110
20
    fn lock(&self) -> MutexGuard<'_, WeakValueHashMap<DocumentId, Weak<[u8]>>> {
111
20
        self.data.lock().unwrap_or_else(|e| e.into_inner())
112
20
    }
113
}
114

            
115
// The tests are all performed in the parent module, due to existing test
116
// vectors there.
117

            
118
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use crate::database;

    use super::super::test::*;
    use super::*;

    /// End-to-end check of [`StoreCache`]: cache miss, cache hit (no memory
    /// copy), and garbage collection of expired entries.
    ///
    /// Fixtures (`create_test_db_pool`, `IDENTITY`, `IDENTITY_DOCID`) come
    /// from the parent module's test helpers.
    #[test]
    fn store_cache() {
        let pool = create_test_db_pool();
        let cache = StoreCache::new();

        database::read_tx(&pool, |tx| {
            // Obtain the lipsum entry.
            //
            // The cache itself only holds a weak reference, so the Arc we got
            // back must be the sole strong one.
            let entry = cache.get(tx, *IDENTITY_DOCID).unwrap();
            assert_eq!(entry.as_ref(), IDENTITY.as_bytes());
            assert_eq!(Arc::strong_count(&entry), 1);

            // Obtain the lipsum entry again but ensure it is not copied in memory.
            // Identical Arc pointers prove both calls share one allocation.
            let entry2 = cache.get(tx, *IDENTITY_DOCID).unwrap();
            assert_eq!(Arc::strong_count(&entry), 2);
            assert_eq!(Arc::as_ptr(&entry), Arc::as_ptr(&entry2));
            assert_eq!(entry, entry2);

            // Perform a garbage collection and ensure that entry is not removed.
            // (Strong references still exist, so the mapping must survive.)
            assert!(cache.data.lock().unwrap().contains_key(&IDENTITY_DOCID));
            cache.gc();
            assert!(cache.data.lock().unwrap().contains_key(&IDENTITY_DOCID));

            // Now drop entry and entry2 and perform the gc again.
            let weak_entry = Arc::downgrade(&entry);
            assert_eq!(weak_entry.strong_count(), 2);
            drop(entry);
            drop(entry2);
            assert_eq!(weak_entry.strong_count(), 0);

            // The strong count zero should already make it impossible to access the element ...
            assert!(!cache.data.lock().unwrap().contains_key(&IDENTITY_DOCID));
            // ... but it should not reduce the total size of the hash map ...
            assert_eq!(cache.data.lock().unwrap().len(), 1);
            cache.gc();
            // ... however, the garbage collection should actually do.
            assert_eq!(cache.data.lock().unwrap().len(), 0);
        })
        .unwrap();
    }
}