iota_common/sync/
async_once_cell.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use parking_lot::Mutex;
8use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
9
10/// This structure contains a cell for a single value.
11/// The cell can be written only once, and can be read many times.
12/// Readers are provided with async API, that waits for write to happen.
13/// This is similar to tokio::sync::watch, except one difference:
14/// * tokio::sync::watch requires existing receiver to work. If no subscriber is
15///   registered, and the value is sent to channel, the value is dropped
16/// * Unlike with tokio::sync::watch, it is possible to write to AsyncOnceCell
17///   when no readers are registered, and value will be available later when
18///   AsyncOnceCell::get is called
19pub struct AsyncOnceCell<T> {
20    value: Arc<RwLock<Option<T>>>,
21    writer: Mutex<Option<OwnedRwLockWriteGuard<Option<T>>>>,
22}
23
24impl<T: Send + Clone> AsyncOnceCell<T> {
25    pub fn new() -> Self {
26        let value = Arc::new(RwLock::new(None));
27        let writer = value
28            .clone()
29            .try_write_owned()
30            .expect("Write lock can not fail here");
31        let writer = Mutex::new(Some(writer));
32        Self { value, writer }
33    }
34
35    pub async fn get(&self) -> T {
36        self.value
37            .read()
38            .await
39            .as_ref()
40            .cloned()
41            .expect("Value is available when writer is dropped")
42    }
43
44    /// Sets the value and notifies waiters. Return error if called twice
45    #[expect(clippy::result_unit_err)]
46    pub fn set(&self, value: T) -> Result<(), ()> {
47        let mut writer = self.writer.lock();
48        match writer.take() {
49            None => Err(()),
50            Some(mut writer) => {
51                *writer = Some(value);
52                Ok(())
53            }
54        }
55    }
56}
57
58impl<T: Send + Clone> Default for AsyncOnceCell<T> {
59    fn default() -> Self {
60        Self::new()
61    }
62}
63
64#[cfg(test)]
65mod tests {
66    use super::*;
67
68    #[tokio::test]
69    async fn async_once_cell_test() {
70        let cell = Arc::new(AsyncOnceCell::<u64>::new());
71        let cell2 = cell.clone();
72        let wait = tokio::spawn(async move { cell2.get().await });
73        tokio::task::yield_now().await;
74        cell.set(15).unwrap();
75        assert!(cell.set(16).is_err());
76        assert_eq!(15, cell.get().await);
77        assert_eq!(15, wait.await.unwrap());
78    }
79}