1
//! Extension trait for more efficient use of [`postage::watch`].
2
use std::ops::{Deref, DerefMut};
3
use void::{ResultVoidExt as _, Void};
4

            
5
/// Extension trait for some `postage::watch::Sender` to provide `maybe_send`
6
///
7
/// Ideally these, or something like them, would be upstream:
8
/// See <https://github.com/austinjones/postage-rs/issues/56>.
9
///
10
/// We provide this as an extension trait became the implementation is a bit fiddly.
11
/// This lets us concentrate on the actual logic, when we use it.
12
pub trait PostageWatchSenderExt<T> {
13
    /// Update, by calling a fallible function, sending only if necessary
14
    ///
15
    /// Calls `update` on the current value in the watch, to obtain a new value.
16
    /// If the new value doesn't compare equal, updates the watch, notifying receivers.
17
    fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
18
    where
19
        T: PartialEq,
20
        F: FnOnce(&T) -> Result<T, E>;
21

            
22
    /// Update, by calling a function, sending only if necessary
23
    ///
24
    /// Calls `update` on the current value in the watch, to obtain a new value.
25
    /// If the new value doesn't compare equal, updates the watch, notifying receivers.
26
194
    fn maybe_send<F>(&mut self, update: F)
27
194
    where
28
194
        T: PartialEq,
29
194
        F: FnOnce(&T) -> T,
30
    {
31
291
        self.try_maybe_send(|t| Ok::<_, Void>(update(t)))
32
194
            .void_unwrap();
33
194
    }
34
}
35

            
36
impl<T> PostageWatchSenderExt<T> for postage::watch::Sender<T> {
    /// Compute a candidate value from the current one, and store it only if it differs
    ///
    /// If `update` fails, its error is returned and the watch is left unmodified.
    fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
    where
        T: PartialEq,
        F: FnOnce(&T) -> Result<T, E>,
    {
        let current = self.borrow();
        let candidate = update(&*current)?;
        let changed = candidate != *current;

        // The read guard must be released before we call borrow_mut, because
        // otherwise borrow_mut will deadlock.
        // Releasing it opens no race: we hold &mut self, so no-one else can get a look in.
        // (postage::watch::Sender is not one of those facilities which is merely a
        // handle, and Clone.)
        drop(current);

        if changed {
            *self.borrow_mut() = candidate;
        }
        Ok(())
    }
}
55

            
56
#[derive(Debug)]
57
/// Wrapper for `postage::watch::Sender` that sends `DropNotifyEof::eof()` when dropped
58
///
59
/// Derefs to the inner `Sender`.
60
///
61
/// Ideally this would be behaviour promised by upstream, or something
62
/// See <https://github.com/austinjones/postage-rs/issues/57>.
63
pub struct DropNotifyWatchSender<T: DropNotifyEofSignallable>(Option<postage::watch::Sender<T>>);
64

            
65
/// Types which have a distinguished value that signals EOF
///
/// Implemented for `Option`, which is usually what you want to use.
pub trait DropNotifyEofSignallable {
    /// Construct the value that signals EOF
    fn eof() -> Self;

    /// Report whether this value indicates EOF
    ///
    /// ### Deprecated
    ///
    /// This method is deprecated.
    /// New programs should neither call it nor define it.
    /// [`DropNotifyWatchSender`] does not require it.
    /// Unless overridden, the provided implementation always returns `false`.
    #[deprecated]
    fn is_eof(&self) -> bool {
        false
    }
}
85

            
86
impl<T> DropNotifyEofSignallable for Option<T> {
87
22
    fn eof() -> Self {
88
22
        None
89
22
    }
90

            
91
    fn is_eof(&self) -> bool {
92
        self.is_none()
93
    }
94
}
95

            
96
impl<T: DropNotifyEofSignallable> DropNotifyWatchSender<T> {
97
    /// Arrange to send `T::Default` when `inner` is dropped
98
30
    pub fn new(inner: postage::watch::Sender<T>) -> Self {
99
30
        DropNotifyWatchSender(Some(inner))
100
30
    }
101

            
102
    /// Unwrap the inner sender, defusing the drop notification
103
2
    pub fn into_inner(mut self) -> postage::watch::Sender<T> {
104
2
        self.0.take().expect("inner was None")
105
2
    }
106
}
107

            
108
impl<T: DropNotifyEofSignallable> Deref for DropNotifyWatchSender<T> {
109
    type Target = postage::watch::Sender<T>;
110
    fn deref(&self) -> &Self::Target {
111
        self.0.as_ref().expect("inner was None")
112
    }
113
}
114

            
115
impl<T: DropNotifyEofSignallable> DerefMut for DropNotifyWatchSender<T> {
    /// Mutably borrow the wrapped sender
    fn deref_mut(&mut self) -> &mut Self::Target {
        // The slot is only emptied by `into_inner` (which consumes the wrapper)
        // and by `drop`, so finding `None` here would be a bug.
        let inner: Option<&mut Self::Target> = self.0.as_mut();
        inner.expect("inner was None")
    }
}
120

            
121
impl<T: DropNotifyEofSignallable> Drop for DropNotifyWatchSender<T> {
    /// Overwrite the watched value with `T::eof()`, unless the sender was taken out
    fn drop(&mut self) {
        match self.0.take() {
            // None means into_inner() was called
            None => {}
            Some(mut sender) => {
                *sender.borrow_mut() = T::eof();
            }
        }
    }
}
129

            
130
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;
    use futures::select_biased;
    use futures_await_test::async_test;

    /// `maybe_send` and `try_maybe_send` wake the receiver only when the value changes
    #[async_test]
    async fn postage_sender_ext() {
        use futures::FutureExt;
        use futures::stream::StreamExt;

        let (mut tx, mut rx) = postage::watch::channel_with(20);

        // The receiver of a fresh watch wakes once, but let's not rely on this:
        // accept either the initial value or nothing at all.
        select_biased! {
            got = rx.next().fuse() => assert_eq!(got, Some(20)),
            _ = futures::future::ready(()) => {}, // tolerate nothing
        };
        // Whichever happened above, the receiver is not ready now.
        select_biased! {
            _ = rx.next().fuse() => panic!(),
            _ = futures::future::ready(()) => {},
        };

        // Sending an equal value must not notify: still not ready.
        tx.maybe_send(|cur| *cur);
        select_biased! {
            _ = rx.next().fuse() => panic!(),
            _ = futures::future::ready(()) => {},
        };

        // Sending a different value must notify: ready, with 21.
        tx.maybe_send(|cur| *cur + 1);
        select_biased! {
            got = rx.next().fuse() => assert_eq!(got, Some(21)),
            _ = futures::future::ready(()) => panic!(),
        };

        // A failing update is reported to the caller and must not notify: not ready.
        let () = tx.try_maybe_send(|_cur| Err(())).unwrap_err();
        select_biased! {
            _ = rx.next().fuse() => panic!(),
            _ = futures::future::ready(()) => {},
        };
    }

    /// Dropping the wrapper sends EOF; dropping the unwrapped sender does not
    #[test]
    fn postage_drop() {
        #[derive(Clone, Copy, Debug, Eq, PartialEq)]
        struct Val(i32);

        impl DropNotifyEofSignallable for Val {
            fn eof() -> Self {
                Val(0)
            }
            fn is_eof(&self) -> bool {
                self.0 == 0
            }
        }

        // Dropping the wrapper replaces the watched value with the EOF value.
        {
            let (tx, rx) = postage::watch::channel_with(Val(20));
            let tx = DropNotifyWatchSender::new(tx);

            assert_eq!(*rx.borrow(), Val(20));
            drop(tx);
            assert_eq!(*rx.borrow(), Val(0));
        }

        // After `into_inner`, dropping the plain sender leaves the value alone.
        {
            let (tx, rx) = postage::watch::channel_with(Val(44));
            let tx = DropNotifyWatchSender::new(tx);

            assert_eq!(*rx.borrow(), Val(44));
            drop(tx.into_inner());
            assert_eq!(*rx.borrow(), Val(44));
        }
    }
}