1
//! `TotalQtyNotifier`
2
//!
3
//! This newtype assures that we wake up the reclamation task when nceessary
4

            
5
use super::*;
6

            
7
/// Wrapper for `TotalQty`
8
#[derive(Deref, Debug)]
9
pub(super) struct TotalQtyNotifier {
10
    /// Total memory usage
11
    ///
12
    /// Invariant: equal to
13
    /// ```text
14
    ///    Σ        Σ         PRecord.used
15
    ///     ARecord  PRecord
16
    /// ```
17
    #[deref]
18
    total_used: TotalQty,
19

            
20
    /// Condvar to wake up the reclamation task
21
    ///
22
    /// The reclamation task has another clone of this
23
    reclamation_task_wakeup: mpsc::Sender<()>,
24
}
25

            
26
impl TotalQtyNotifier {
27
    /// Make a new `TotalQtyNotifier`, which will notify a specified condvar
28
671
    pub(super) fn new_zero(reclamation_task_wakeup: mpsc::Sender<()>) -> Self {
29
671
        TotalQtyNotifier {
30
671
            total_used: TotalQty::ZERO,
31
671
            reclamation_task_wakeup,
32
671
        }
33
671
    }
34

            
35
    /// Record that some memory has been (or will be) allocated by a participant
36
    ///
37
    /// Signals the wakeup task if we need to.
38
23656
    pub(super) fn claim(
39
23656
        &mut self,
40
23656
        precord: &mut PRecord,
41
23656
        want: Qty,
42
23656
        config: &ConfigInner,
43
23656
    ) -> crate::Result<ClaimedQty> {
44
23656
        let got = self
45
23656
            .total_used
46
23656
            .claim(&mut precord.used, want)
47
23656
            .ok_or_else(|| internal!("integer overflow attempting to add claim {}", want))?;
48
23656
        self.maybe_wakeup(config);
49
23656
        Ok(got)
50
23656
    }
51

            
52
    /// Check to see if we need to wake up the reclamation task, and if so, do so
53
23770
    pub(super) fn maybe_wakeup(&mut self, config: &ConfigInner) {
54
23770
        if self.total_used > config.max {
55
32
            match self.reclamation_task_wakeup.try_send(()) {
56
32
                Ok(()) => {}
57
                Err(e) if e.is_full() => {}
58
                // reactor shutting down, having dropped reclamation task?
59
                Err(e) => debug!("could not notify reclamation task: {e}"),
60
            };
61
23738
        }
62
23770
    }
63

            
64
    /// Declare this poisoned, and prevent further claims
65
    pub(super) fn set_poisoned(&mut self) {
66
        self.total_used.set_poisoned();
67
    }
68

            
69
    /// Record that some memory has been (or will be) freed by a participant
70
19924
    pub(super) fn release(&mut self, precord: &mut PRecord, have: ClaimedQty) // infallible
71
    {
72
        // TODO if the participant's usage underflows, tell it to reclaim
73
        // (and log some kind of internal error)
74
19924
        self.total_used.release(&mut precord.used, have);
75
19924
    }
76
}