iota_graphql_rpc/server/
watermark_task.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{mem, sync::Arc, time::Duration};
6
7use async_graphql::ServerError;
8use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
9use iota_indexer::schema::checkpoints;
10use tokio::sync::{RwLock, watch};
11use tokio_util::sync::CancellationToken;
12use tracing::{error, info};
13
14use crate::{
15    data::{Db, DbConnection, QueryExecutor},
16    error::Error,
17    metrics::Metrics,
18};
19
20/// Watermark task that periodically updates the current checkpoint, checkpoint
21/// timestamp, and epoch values.
22pub(crate) struct WatermarkTask {
23    /// Thread-safe watermark that avoids writer starvation
24    watermark: WatermarkLock,
25    db: Db,
26    metrics: Metrics,
27    sleep: Duration,
28    cancel: CancellationToken,
29    sender: watch::Sender<u64>,
30    receiver: watch::Receiver<u64>,
31}
32
33pub(crate) type WatermarkLock = Arc<RwLock<Watermark>>;
34
/// Watermark used by GraphQL queries to ensure cross-query consistency and flag
/// epoch-boundary changes.
///
/// The value is a small, plain-data snapshot: it is `Copy`, defaults to all
/// zeroes, and can be compared and debug-printed.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct Watermark {
    /// The checkpoint upper-bound for the query.
    pub checkpoint: u64,
    /// The checkpoint upper-bound timestamp for the query, in milliseconds.
    pub checkpoint_timestamp_ms: u64,
    /// The current epoch.
    pub epoch: u64,
}
46
47/// Starts an infinite loop that periodically updates the `checkpoint_viewed_at`
48/// high watermark.
49impl WatermarkTask {
50    pub(crate) fn new(
51        db: Db,
52        metrics: Metrics,
53        sleep: Duration,
54        cancel: CancellationToken,
55    ) -> Self {
56        let (sender, receiver) = watch::channel(0);
57
58        Self {
59            watermark: Default::default(),
60            db,
61            metrics,
62            sleep,
63            cancel,
64            sender,
65            receiver,
66        }
67    }
68
69    pub(crate) async fn run(&self) {
70        loop {
71            tokio::select! {
72                _ = self.cancel.cancelled() => {
73                    info!("Shutdown signal received, terminating watermark update task");
74                    return;
75                },
76                _ = tokio::time::sleep(self.sleep) => {
77                    let Watermark {checkpoint, epoch, checkpoint_timestamp_ms } = match Watermark::query(&self.db).await {
78                        Ok(Some(watermark)) => watermark,
79                        Ok(None) => continue,
80                        Err(e) => {
81                            error!("{}", e);
82                            self.metrics.inc_errors(&[ServerError::new(e.to_string(), None)]);
83                            continue;
84                        }
85                    };
86
87                    // Write the watermark as follows to limit how long we hold the lock
88                    let prev_epoch = {
89                        let mut w = self.watermark.write().await;
90                        w.checkpoint = checkpoint;
91                        w.checkpoint_timestamp_ms = checkpoint_timestamp_ms;
92                        mem::replace(&mut w.epoch, epoch)
93                    };
94
95                    if epoch > prev_epoch {
96                        self.sender.send(epoch).unwrap();
97                    }
98                }
99            }
100        }
101    }
102
103    pub(crate) fn lock(&self) -> WatermarkLock {
104        self.watermark.clone()
105    }
106
107    /// Receiver for subscribing to epoch changes.
108    pub(crate) fn epoch_receiver(&self) -> watch::Receiver<u64> {
109        self.receiver.clone()
110    }
111}
112
113impl Watermark {
114    pub(crate) async fn new(lock: WatermarkLock) -> Self {
115        let w = lock.read().await;
116        Self {
117            checkpoint: w.checkpoint,
118            checkpoint_timestamp_ms: w.checkpoint_timestamp_ms,
119            epoch: w.epoch,
120        }
121    }
122
123    pub(crate) async fn query(db: &Db) -> Result<Option<Watermark>, Error> {
124        use checkpoints::dsl;
125        let Some((checkpoint, checkpoint_timestamp_ms, epoch)): Option<(i64, i64, i64)> = db
126            .execute(move |conn| {
127                conn.first(move || {
128                    dsl::checkpoints
129                        .select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch))
130                        .order_by(dsl::sequence_number.desc())
131                })
132                .optional()
133            })
134            .await
135            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))?
136        else {
137            return Ok(None);
138        };
139        Ok(Some(Watermark {
140            checkpoint: checkpoint as u64,
141            checkpoint_timestamp_ms: checkpoint_timestamp_ms as u64,
142            epoch: epoch as u64,
143        }))
144    }
145}