iota_graphql_rpc/server/
watermark_task.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{mem, sync::Arc, time::Duration};
6
7use async_graphql::ServerError;
8use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
9use iota_indexer::schema::checkpoints;
10use tokio::sync::{RwLock, watch};
11use tokio_util::sync::CancellationToken;
12use tracing::{error, info};
13
14use crate::{
15    data::{Db, DbConnection, QueryExecutor},
16    error::Error,
17    metrics::Metrics,
18};
19
20/// Watermark task that periodically updates the current checkpoint, checkpoint
21/// timestamp, and epoch values.
22pub(crate) struct WatermarkTask {
23    /// Thread-safe watermark that avoids writer starvation
24    watermark: WatermarkLock,
25    db: Db,
26    metrics: Metrics,
27    sleep: Duration,
28    cancel: CancellationToken,
29    sender: watch::Sender<u64>,
30    receiver: watch::Receiver<u64>,
31}
32
33pub(crate) type WatermarkLock = Arc<RwLock<Watermark>>;
34
/// Snapshot of indexer progress that GraphQL queries read so their results
/// stay consistent with one another and so that epoch-boundary changes can be
/// detected.
#[derive(Clone, Copy, Default)]
pub(crate) struct Watermark {
    /// Highest checkpoint a query is allowed to observe.
    pub checkpoint: u64,
    /// Timestamp, in milliseconds, of `checkpoint`.
    pub checkpoint_timestamp_ms: u64,
    /// Epoch recorded alongside `checkpoint`.
    pub epoch: u64,
}
46
47/// Starts an infinite loop that periodically updates the `checkpoint_viewed_at`
48/// high watermark.
49impl WatermarkTask {
50    pub(crate) fn new(
51        db: Db,
52        metrics: Metrics,
53        sleep: Duration,
54        cancel: CancellationToken,
55    ) -> Self {
56        let (sender, receiver) = watch::channel(0);
57
58        Self {
59            watermark: Default::default(),
60            db,
61            metrics,
62            sleep,
63            cancel,
64            sender,
65            receiver,
66        }
67    }
68
69    pub(crate) async fn run(&self) {
70        let mut interval = tokio::time::interval(self.sleep);
71        loop {
72            tokio::select! {
73                _ = self.cancel.cancelled() => {
74                    info!("shutdown signal received, terminating watermark update task");
75                    return;
76                },
77                _ = interval.tick() => {
78                    let Watermark {checkpoint, epoch, checkpoint_timestamp_ms } = match Watermark::query(&self.db).await {
79                        Ok(Some(watermark)) => watermark,
80                        Ok(None) => continue,
81                        Err(e) => {
82                            error!("error fetching the watermark: {e}");
83                            self.metrics.inc_errors(&[ServerError::new(e.to_string(), None)]);
84                            continue;
85                        }
86                    };
87
88                    // Write the watermark as follows to limit how long we hold the lock
89                    let prev_epoch = {
90                        let mut w = self.watermark.write().await;
91                        w.checkpoint = checkpoint;
92                        w.checkpoint_timestamp_ms = checkpoint_timestamp_ms;
93                        mem::replace(&mut w.epoch, epoch)
94                    };
95
96                    if epoch > prev_epoch {
97                        self.sender.send(epoch).unwrap();
98                    }
99                }
100            }
101        }
102    }
103
104    /// Returns a clone of the watermark lock.
105    ///
106    /// It clones the underlying `Arc<RwLock<Watermark>>` wrapper, which means
107    /// the returned `WatermarkLock` shares the same inner data with the
108    /// original.
109    pub(crate) fn lock(&self) -> WatermarkLock {
110        self.watermark.clone()
111    }
112
113    /// Receiver for subscribing to epoch changes.
114    pub(crate) fn epoch_receiver(&self) -> watch::Receiver<u64> {
115        self.receiver.clone()
116    }
117}
118
119impl Watermark {
120    pub(crate) async fn new(lock: WatermarkLock) -> Self {
121        let w = lock.read().await;
122        Self {
123            checkpoint: w.checkpoint,
124            checkpoint_timestamp_ms: w.checkpoint_timestamp_ms,
125            epoch: w.epoch,
126        }
127    }
128
129    pub(crate) async fn query(db: &Db) -> Result<Option<Watermark>, Error> {
130        use checkpoints::dsl;
131        let Some((checkpoint, checkpoint_timestamp_ms, epoch)): Option<(i64, i64, i64)> = db
132            .execute(move |conn| {
133                conn.first(move || {
134                    dsl::checkpoints
135                        .select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch))
136                        .order_by(dsl::sequence_number.desc())
137                })
138                .optional()
139            })
140            .await
141            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))?
142        else {
143            return Ok(None);
144        };
145        Ok(Some(Watermark {
146            checkpoint: checkpoint as u64,
147            checkpoint_timestamp_ms: checkpoint_timestamp_ms as u64,
148            epoch: epoch as u64,
149        }))
150    }
151}