1
//! Module for providing cache implementations for the [`http`] module.
2
//!
3
//! Each cache is outlined comprehensively in its own section.
4

            
5
use std::sync::{Arc, Mutex, MutexGuard, Weak};
6

            
7
use rusqlite::{params, Transaction};
8
use weak_table::WeakValueHashMap;
9

            
10
use crate::{
11
    database::{sql, DocumentId},
12
    err::DatabaseError,
13
};
14

            
15
/// Representation of the store cache.
16
///
17
/// The cache serves the purpose to not store the same document multiple times
18
/// in memory, when multiple clients request it simultanously.
19
///
20
/// It *DOES NOT* serve the purpose to reduce the amount of read system calls.
21
/// We believe that SQLite and the operating system itself do a good job at
22
/// buffering reads for us here.
23
///
24
/// The cache itself is wrapped in a [`Mutex`] to ensure secure access across
25
/// thread boundaries.  Keep in mind, that it is a **synchronous** mutex.
26
///
27
/// All hash lookups in the `store` table should be performed through this
28
/// interface, because it will automatically select them from the database in
29
/// case they are missing.
30
#[derive(Debug)]
31
pub(super) struct StoreCache {
32
    /// The actual data of the cache.
33
    ///
34
    /// We use a [`Mutex`] instead of an [`RwLock`](std::sync::RwLock), because
35
    /// we want to assure that a concurrent cache miss does not lead into two
36
    /// simultanous database reads and copies into memory.
37
    data: Mutex<WeakValueHashMap<DocumentId, Weak<[u8]>>>,
38
}
39

            
40
impl StoreCache {
41
    /// Creates a new empty [`StoreCache`].
42
4
    pub(super) fn new() -> Self {
43
4
        Self {
44
4
            data: Mutex::new(WeakValueHashMap::new()),
45
4
        }
46
4
    }
47

            
48
    /// Removes all mappings whose values have expired.
49
    ///
50
    /// Takes O(n) time.
51
6
    pub(super) fn gc(&self) {
52
6
        self.lock().remove_expired();
53
6
    }
54

            
55
    /// Looks up a [`DocumentId`] in the cache or the database.
56
    ///
57
    /// If we got a cache miss, this function automatically queries the database
58
    /// and inserts the result into the cache, before returning it.
59
    ///
60
    /// We do not keep a lock throughout the entire method.  This risks storing
61
    /// the same document in memory for a very short amount of time, based upon
62
    /// the number of worker threads we are using.  However, this is fine,
63
    /// given that reading from the store table is a large performance bottleneck.
64
    /// Also, the number of simultanous copies that might be risked by that is
65
    /// limited to the amount of worker threads, which is usually very low
66
    /// compared to the number of async tasks, which might be in the thousands.
67
8
    pub(super) fn get(
68
8
        &self,
69
8
        tx: &Transaction,
70
8
        docid: DocumentId,
71
8
    ) -> Result<Arc<[u8]>, DatabaseError> {
72
        // Query the cache for the relevant document.
73
8
        if let Some(document) = self.lock().get(&docid) {
74
2
            return Ok(document);
75
6
        }
76

            
77
        // Cache miss, let us query the database.
78
6
        let document = Self::get_db(tx, docid)?;
79

            
80
        // Insert it into the cache.
81
        //
82
        // We obtain the lock and check again if it has been added in the
83
        // meantime.  The idea is to only return one copy of it, not two
84
        // simultanous ones.
85
6
        Ok(self.lock().entry(docid).or_insert(document))
86
8
    }
87

            
88
    /// Obtains a [`DocumentId`] from the database without consulting the cache first.
89
6
    fn get_db(tx: &Transaction, docid: DocumentId) -> Result<Arc<[u8]>, DatabaseError> {
90
6
        let mut stmt = tx.prepare_cached(sql!("SELECT content FROM store WHERE docid = ?1"))?;
91
9
        let document: Vec<u8> = stmt.query_one(params![docid], |row| row.get(0))?;
92
6
        Ok(Arc::from(document))
93
6
    }
94

            
95
    /// Obtains a lock of the [`WeakValueHashMap`] even if the lock is poisoned.
96
    ///
97
    /// Ignoring the [`PoisonError`](std::sync::PoisonError) is fine here because
98
    /// the only "danger" in the general case of this might be, that certain
99
    /// invariants no longer hold true.  However, unless the poison has been
100
    /// triggered by the implementation of [`WeakValueHashMap`], in which case we
101
    /// would have much larger problems, doing this is fine.
102
    ///
103
    /// TODO DIRMIRROR: Consider cleaning the poison, although that would probably
104
    /// involve much more complexity and wrapping around the parent type.
105
20
    fn lock(&self) -> MutexGuard<'_, WeakValueHashMap<DocumentId, Weak<[u8]>>> {
106
20
        self.data.lock().unwrap_or_else(|e| e.into_inner())
107
20
    }
108
}
109

            
110
// The tests are all performed in the parent module, due to existing test
111
// vectors there.
112

            
113
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use crate::database;

    use super::super::test::*;
    use super::*;

    #[test]
    fn store_cache() {
        let pool = create_test_db_pool();
        let cache = StoreCache::new();

        database::read_tx(&pool, |tx| {
            // Fetch the lipsum document for the first time.
            let first = cache.get(tx, *IDENTITY_DOCID).unwrap();
            assert_eq!(first.as_ref(), IDENTITY.as_bytes());
            assert_eq!(Arc::strong_count(&first), 1);

            // A second fetch must hand out the very same allocation,
            // not a fresh in-memory copy.
            let second = cache.get(tx, *IDENTITY_DOCID).unwrap();
            assert_eq!(Arc::strong_count(&first), 2);
            assert!(Arc::ptr_eq(&first, &second));
            assert_eq!(first, second);

            // A garbage collection must not evict a live entry.
            assert!(cache.data.lock().unwrap().contains_key(&IDENTITY_DOCID));
            cache.gc();
            assert!(cache.data.lock().unwrap().contains_key(&IDENTITY_DOCID));

            // Drop both strong handles, keeping only a weak one around.
            let weak = Arc::downgrade(&first);
            assert_eq!(weak.strong_count(), 2);
            drop(first);
            drop(second);
            assert_eq!(weak.strong_count(), 0);

            // A strong count of zero already makes the entry unreachable ...
            assert!(!cache.data.lock().unwrap().contains_key(&IDENTITY_DOCID));
            // ... yet the stale mapping still occupies a slot in the map ...
            assert_eq!(cache.data.lock().unwrap().len(), 1);
            cache.gc();
            // ... until the garbage collection actually frees it.
            assert_eq!(cache.data.lock().unwrap().len(), 0);
        })
        .unwrap();
    }
}