iota_graphql_rpc/server/
watermark_task.rs1use std::{mem, sync::Arc, time::Duration};
6
7use async_graphql::ServerError;
8use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
9use iota_indexer::schema::checkpoints;
10use tokio::sync::{RwLock, watch};
11use tokio_util::sync::CancellationToken;
12use tracing::{error, info};
13
14use crate::{
15 data::{Db, DbConnection, QueryExecutor},
16 error::Error,
17 metrics::Metrics,
18};
19
20pub(crate) struct WatermarkTask {
23 watermark: WatermarkLock,
25 db: Db,
26 metrics: Metrics,
27 sleep: Duration,
28 cancel: CancellationToken,
29 sender: watch::Sender<u64>,
30 receiver: watch::Receiver<u64>,
31}
32
33pub(crate) type WatermarkLock = Arc<RwLock<Watermark>>;
34
/// Snapshot of the most recent checkpoint read from the `checkpoints` table.
///
/// The type is `Copy`, so a snapshot can be passed around by value; its
/// `Default` has every field set to zero.
#[derive(Clone, Copy, Default)]
pub(crate) struct Watermark {
    /// Sequence number of the latest checkpoint.
    pub checkpoint: u64,
    /// Value of the `timestamp_ms` column for that checkpoint.
    pub checkpoint_timestamp_ms: u64,
    /// Value of the `epoch` column for that checkpoint.
    pub epoch: u64,
}
46
47impl WatermarkTask {
50 pub(crate) fn new(
51 db: Db,
52 metrics: Metrics,
53 sleep: Duration,
54 cancel: CancellationToken,
55 ) -> Self {
56 let (sender, receiver) = watch::channel(0);
57
58 Self {
59 watermark: Default::default(),
60 db,
61 metrics,
62 sleep,
63 cancel,
64 sender,
65 receiver,
66 }
67 }
68
69 pub(crate) async fn run(&self) {
70 loop {
71 tokio::select! {
72 _ = self.cancel.cancelled() => {
73 info!("Shutdown signal received, terminating watermark update task");
74 return;
75 },
76 _ = tokio::time::sleep(self.sleep) => {
77 let Watermark {checkpoint, epoch, checkpoint_timestamp_ms } = match Watermark::query(&self.db).await {
78 Ok(Some(watermark)) => watermark,
79 Ok(None) => continue,
80 Err(e) => {
81 error!("{}", e);
82 self.metrics.inc_errors(&[ServerError::new(e.to_string(), None)]);
83 continue;
84 }
85 };
86
87 let prev_epoch = {
89 let mut w = self.watermark.write().await;
90 w.checkpoint = checkpoint;
91 w.checkpoint_timestamp_ms = checkpoint_timestamp_ms;
92 mem::replace(&mut w.epoch, epoch)
93 };
94
95 if epoch > prev_epoch {
96 self.sender.send(epoch).unwrap();
97 }
98 }
99 }
100 }
101 }
102
103 pub(crate) fn lock(&self) -> WatermarkLock {
104 self.watermark.clone()
105 }
106
107 pub(crate) fn epoch_receiver(&self) -> watch::Receiver<u64> {
109 self.receiver.clone()
110 }
111}
112
113impl Watermark {
114 pub(crate) async fn new(lock: WatermarkLock) -> Self {
115 let w = lock.read().await;
116 Self {
117 checkpoint: w.checkpoint,
118 checkpoint_timestamp_ms: w.checkpoint_timestamp_ms,
119 epoch: w.epoch,
120 }
121 }
122
123 pub(crate) async fn query(db: &Db) -> Result<Option<Watermark>, Error> {
124 use checkpoints::dsl;
125 let Some((checkpoint, checkpoint_timestamp_ms, epoch)): Option<(i64, i64, i64)> = db
126 .execute(move |conn| {
127 conn.first(move || {
128 dsl::checkpoints
129 .select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch))
130 .order_by(dsl::sequence_number.desc())
131 })
132 .optional()
133 })
134 .await
135 .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))?
136 else {
137 return Ok(None);
138 };
139 Ok(Some(Watermark {
140 checkpoint: checkpoint as u64,
141 checkpoint_timestamp_ms: checkpoint_timestamp_ms as u64,
142 epoch: epoch as u64,
143 }))
144 }
145}