iota_common/sync/
notify_once.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use parking_lot::Mutex;
8use tokio::sync::{Notify, futures::Notified};
9
10/// Notify once allows waiter to register for certain conditions and unblocks
11/// waiter when condition is signalled with `notify` method.
12///
13/// The functionality is somewhat similar to a tokio watch channel with
14/// subscribe method, however it is much less error prone to use NotifyOnce
15/// rather then tokio watch.
16///
17/// Specifically with tokio watch you may miss notification,
18/// if you subscribe to it after the value was changed
19/// (Note that this is not a bug in tokio watch, but rather a misuse of it).
20///
21/// NotifyOnce guarantees that wait() will return once notify() is called,
22/// regardless of whether wait() was called before or after notify().
23#[derive(Debug)]
24pub struct NotifyOnce {
25    notify: Mutex<Option<Arc<Notify>>>,
26}
27
28impl NotifyOnce {
29    pub fn new() -> Self {
30        Self::default()
31    }
32
33    /// Notify all waiters, present and future about event
34    ///
35    /// After this method all pending and future calls to .wait() will return
36    ///
37    /// This method returns errors if called more then once
38    #[expect(clippy::result_unit_err)]
39    pub fn notify(&self) -> Result<(), ()> {
40        let Some(notify) = self.notify.lock().take() else {
41            return Err(());
42        };
43        // At this point all `register` either registered with current notify,
44        // or will be returning immediately
45        notify.notify_waiters();
46        Ok(())
47    }
48
49    /// Awaits for `notify` method to be called.
50    ///
51    /// This future is cancellation safe.
52    pub async fn wait(&self) {
53        // Note that we only hold lock briefly when registering for notification
54        // There is a bit of a trickery here with lock - we take a lock and if it is not
55        // empty, we register .notified() first and then release lock
56        //
57        // This is to make sure no notification is lost because Notify::notify_waiters
58        // do not notify waiters that register **after** notify_waiters was
59        // called
60        let mut notify = None;
61        let notified = self.make_notified(&mut notify);
62
63        if let Some(notified) = notified {
64            notified.await;
65        }
66    }
67
68    // This made into separate function as it is only way to make compiler
69    // not to hold `lock` in a generated async future.
70    fn make_notified<'a>(&self, notify: &'a mut Option<Arc<Notify>>) -> Option<Notified<'a>> {
71        let lock = self.notify.lock();
72        *notify = lock.as_ref().cloned();
73        notify.as_ref().map(|n| n.notified())
74    }
75}
76
77impl Default for NotifyOnce {
78    fn default() -> Self {
79        let notify = Arc::new(Notify::new());
80        let notify = Mutex::new(Some(notify));
81        Self { notify }
82    }
83}
84
85#[cfg(test)]
86mod test {
87    use super::*;
88
89    #[tokio::test]
90    async fn notify_once_test() {
91        let notify_once = NotifyOnce::new();
92        // Before notify() is called .wait() is not ready
93        assert!(
94            futures::future::poll_immediate(notify_once.wait())
95                .await
96                .is_none()
97        );
98        let wait = notify_once.wait();
99        notify_once.notify().unwrap();
100        // Pending wait() call is ready now
101        assert!(futures::future::poll_immediate(wait).await.is_some());
102        // Take wait future and don't resolve it.
103        // This makes sure lock is dropped properly and wait futures resolve
104        // independently of each other
105        let _dangle_wait = notify_once.wait();
106        // Any new wait() is immediately ready
107        assert!(
108            futures::future::poll_immediate(notify_once.wait())
109                .await
110                .is_some()
111        );
112    }
113}