iota_graphql_rpc/server/
watermark_task.rs1use std::{mem, sync::Arc, time::Duration};
6
7use async_graphql::ServerError;
8use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
9use iota_indexer::schema::checkpoints;
10use tokio::sync::{RwLock, watch};
11use tokio_util::sync::CancellationToken;
12use tracing::{error, info};
13
14use crate::{
15 data::{Db, DbConnection, QueryExecutor},
16 error::Error,
17 metrics::Metrics,
18};
19
20pub(crate) struct WatermarkTask {
23 watermark: WatermarkLock,
25 db: Db,
26 metrics: Metrics,
27 sleep: Duration,
28 cancel: CancellationToken,
29 sender: watch::Sender<u64>,
30 receiver: watch::Receiver<u64>,
31}
32
33pub(crate) type WatermarkLock = Arc<RwLock<Watermark>>;
34
35#[derive(Clone, Copy, Default)]
38pub(crate) struct Watermark {
39 pub checkpoint: u64,
41 pub checkpoint_timestamp_ms: u64,
43 pub epoch: u64,
45}
46
47impl WatermarkTask {
50 pub(crate) fn new(
51 db: Db,
52 metrics: Metrics,
53 sleep: Duration,
54 cancel: CancellationToken,
55 ) -> Self {
56 let (sender, receiver) = watch::channel(0);
57
58 Self {
59 watermark: Default::default(),
60 db,
61 metrics,
62 sleep,
63 cancel,
64 sender,
65 receiver,
66 }
67 }
68
69 pub(crate) async fn run(&self) {
70 let mut interval = tokio::time::interval(self.sleep);
71 loop {
72 tokio::select! {
73 _ = self.cancel.cancelled() => {
74 info!("shutdown signal received, terminating watermark update task");
75 return;
76 },
77 _ = interval.tick() => {
78 let Watermark {checkpoint, epoch, checkpoint_timestamp_ms } = match Watermark::query(&self.db).await {
79 Ok(Some(watermark)) => watermark,
80 Ok(None) => continue,
81 Err(e) => {
82 error!("error fetching the watermark: {e}");
83 self.metrics.inc_errors(&[ServerError::new(e.to_string(), None)]);
84 continue;
85 }
86 };
87
88 let prev_epoch = {
90 let mut w = self.watermark.write().await;
91 w.checkpoint = checkpoint;
92 w.checkpoint_timestamp_ms = checkpoint_timestamp_ms;
93 mem::replace(&mut w.epoch, epoch)
94 };
95
96 if epoch > prev_epoch {
97 self.sender.send(epoch).unwrap();
98 }
99 }
100 }
101 }
102 }
103
104 pub(crate) fn lock(&self) -> WatermarkLock {
110 self.watermark.clone()
111 }
112
113 pub(crate) fn epoch_receiver(&self) -> watch::Receiver<u64> {
115 self.receiver.clone()
116 }
117}
118
119impl Watermark {
120 pub(crate) async fn new(lock: WatermarkLock) -> Self {
121 let w = lock.read().await;
122 Self {
123 checkpoint: w.checkpoint,
124 checkpoint_timestamp_ms: w.checkpoint_timestamp_ms,
125 epoch: w.epoch,
126 }
127 }
128
129 pub(crate) async fn query(db: &Db) -> Result<Option<Watermark>, Error> {
130 use checkpoints::dsl;
131 let Some((checkpoint, checkpoint_timestamp_ms, epoch)): Option<(i64, i64, i64)> = db
132 .execute(move |conn| {
133 conn.first(move || {
134 dsl::checkpoints
135 .select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch))
136 .order_by(dsl::sequence_number.desc())
137 })
138 .optional()
139 })
140 .await
141 .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))?
142 else {
143 return Ok(None);
144 };
145 Ok(Some(Watermark {
146 checkpoint: checkpoint as u64,
147 checkpoint_timestamp_ms: checkpoint_timestamp_ms as u64,
148 epoch: epoch as u64,
149 }))
150 }
151}