1use std::collections::{BTreeMap, BTreeSet, HashMap};
6
7use async_graphql::{
8 connection::{Connection, CursorType, Edge},
9 dataloader::Loader,
10 *,
11};
12use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
13use fastcrypto::encoding::{Base58, Encoding};
14use iota_indexer::{models::checkpoints::StoredCheckpoint, schema::checkpoints};
15use iota_types::messages_checkpoint::CheckpointDigest;
16use serde::{Deserialize, Serialize};
17
18use crate::{
19 connection::ScanConnection,
20 consistency::Checkpointed,
21 data::{self, Conn, DataLoader, Db, DbConnection, QueryExecutor},
22 error::Error,
23 types::{
24 base64::Base64,
25 cursor::{self, Page, Paginated, ScanLimited, Target},
26 date_time::DateTime,
27 digest::Digest,
28 epoch::Epoch,
29 gas::GasCostSummary,
30 transaction_block::{self, TransactionBlock, TransactionBlockFilter},
31 uint53::UInt53,
32 },
33};
34
35#[derive(Default, InputObject)]
38pub(crate) struct CheckpointId {
39 pub digest: Option<Digest>,
40 pub sequence_number: Option<UInt53>,
41}
42
43#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
46struct SeqNumKey {
47 pub sequence_number: u64,
48 pub digest: Option<Digest>,
52 pub checkpoint_viewed_at: u64,
53}
54
55#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
58struct DigestKey {
59 pub digest: Digest,
60 pub checkpoint_viewed_at: u64,
61}
62
63#[derive(Clone)]
64pub(crate) struct Checkpoint {
65 pub stored: StoredCheckpoint,
68 pub checkpoint_viewed_at: u64,
70}
71
72pub(crate) type Cursor = cursor::JsonCursor<CheckpointCursor>;
73type Query<ST, GB> = data::Query<ST, checkpoints::table, GB>;
74
75#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
79pub(crate) struct CheckpointCursor {
80 #[serde(rename = "c")]
82 pub checkpoint_viewed_at: u64,
83 #[serde(rename = "s")]
84 pub sequence_number: u64,
85}
86
87#[Object]
90impl Checkpoint {
91 async fn digest(&self) -> Result<String> {
96 Ok(self.digest_impl().extend()?.base58_encode())
97 }
98
99 async fn sequence_number(&self) -> UInt53 {
102 self.sequence_number_impl().into()
103 }
104
105 async fn timestamp(&self) -> Result<DateTime> {
109 DateTime::from_ms(self.stored.timestamp_ms).extend()
110 }
111
112 async fn validator_signatures(&self) -> Base64 {
115 Base64::from(&self.stored.validator_signature)
116 }
117
118 async fn previous_checkpoint_digest(&self) -> Option<String> {
120 self.stored
121 .previous_checkpoint_digest
122 .as_ref()
123 .map(Base58::encode)
124 }
125
126 async fn network_total_transactions(&self) -> Option<UInt53> {
129 Some(self.network_total_transactions_impl().into())
130 }
131
132 async fn rolling_gas_summary(&self) -> Option<GasCostSummary> {
137 Some(GasCostSummary {
138 computation_cost: self.stored.computation_cost as u64,
139 computation_cost_burned: self.stored.computation_cost_burned(),
140 storage_cost: self.stored.storage_cost as u64,
141 storage_rebate: self.stored.storage_rebate as u64,
142 non_refundable_storage_fee: self.stored.non_refundable_storage_fee as u64,
143 })
144 }
145
146 async fn epoch(&self, ctx: &Context<'_>) -> Result<Option<Epoch>> {
148 Epoch::query(
149 ctx,
150 Some(self.stored.epoch as u64),
151 self.checkpoint_viewed_at,
152 )
153 .await
154 .extend()
155 }
156
157 async fn transaction_blocks(
181 &self,
182 ctx: &Context<'_>,
183 first: Option<u64>,
184 after: Option<transaction_block::Cursor>,
185 last: Option<u64>,
186 before: Option<transaction_block::Cursor>,
187 filter: Option<TransactionBlockFilter>,
188 scan_limit: Option<u64>,
189 ) -> Result<ScanConnection<String, TransactionBlock>> {
190 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
191
192 let Some(filter) = filter
193 .unwrap_or_default()
194 .intersect(TransactionBlockFilter {
195 at_checkpoint: Some(UInt53::from(self.stored.sequence_number as u64)),
196 ..Default::default()
197 })
198 else {
199 return Ok(ScanConnection::new(false, false));
200 };
201
202 TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
203 .await
204 .extend()
205 }
206}
207
208impl CheckpointId {
209 pub(crate) fn by_seq_num(seq_num: u64) -> Self {
210 CheckpointId {
211 sequence_number: Some(seq_num.into()),
212 digest: None,
213 }
214 }
215}
216
217impl Checkpoint {
218 pub(crate) fn sequence_number_impl(&self) -> u64 {
219 self.stored.sequence_number as u64
220 }
221
222 pub(crate) fn network_total_transactions_impl(&self) -> u64 {
223 self.stored.network_total_transactions as u64
224 }
225
226 pub(crate) fn digest_impl(&self) -> Result<CheckpointDigest, Error> {
227 CheckpointDigest::try_from(self.stored.checkpoint_digest.clone())
228 .map_err(|e| Error::Internal(format!("Failed to deserialize checkpoint digest: {e}")))
229 }
230
231 pub(crate) async fn query(
235 ctx: &Context<'_>,
236 filter: CheckpointId,
237 checkpoint_viewed_at: u64,
238 ) -> Result<Option<Self>, Error> {
239 match filter {
240 CheckpointId {
241 sequence_number: Some(sequence_number),
242 digest,
243 } => {
244 let DataLoader(dl) = ctx.data_unchecked();
245 dl.load_one(SeqNumKey {
246 sequence_number: sequence_number.into(),
247 digest,
248 checkpoint_viewed_at,
249 })
250 .await
251 }
252
253 CheckpointId {
254 sequence_number: None,
255 digest: Some(digest),
256 } => {
257 let DataLoader(dl) = ctx.data_unchecked();
258 dl.load_one(DigestKey {
259 digest,
260 checkpoint_viewed_at,
261 })
262 .await
263 }
264
265 CheckpointId {
266 sequence_number: None,
267 digest: None,
268 } => Checkpoint::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await,
269 }
270 }
271
272 async fn query_latest_at(db: &Db, checkpoint_viewed_at: u64) -> Result<Option<Self>, Error> {
276 use checkpoints::dsl;
277
278 let stored: Option<StoredCheckpoint> = db
279 .execute(move |conn| {
280 conn.first(move || {
281 dsl::checkpoints
282 .filter(dsl::sequence_number.le(checkpoint_viewed_at as i64))
283 .order_by(dsl::sequence_number.desc())
284 })
285 .optional()
286 })
287 .await
288 .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))?;
289
290 Ok(stored.map(|stored| Checkpoint {
291 stored,
292 checkpoint_viewed_at,
293 }))
294 }
295
296 pub(crate) fn query_timestamp(
300 conn: &mut Conn<'_>,
301 seq_num: u64,
302 ) -> Result<u64, diesel::result::Error> {
303 use checkpoints::dsl;
304
305 let stored: i64 = conn.first(|| {
306 dsl::checkpoints
307 .select(dsl::timestamp_ms)
308 .filter(dsl::sequence_number.eq(seq_num as i64))
309 })?;
310
311 Ok(stored as u64)
312 }
313
314 pub(crate) async fn paginate(
329 db: &Db,
330 page: Page<Cursor>,
331 filter: Option<u64>,
332 checkpoint_viewed_at: u64,
333 ) -> Result<Connection<String, Checkpoint>, Error> {
334 use checkpoints::dsl;
335 let cursor_viewed_at = page.validate_cursor_consistency()?;
336 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
337
338 let (prev, next, results) = db
339 .execute(move |conn| {
340 page.paginate_query::<StoredCheckpoint, _, _, _>(
341 conn,
342 checkpoint_viewed_at,
343 move || {
344 let mut query = dsl::checkpoints.into_boxed();
345 query = query.filter(dsl::sequence_number.le(checkpoint_viewed_at as i64));
346 if let Some(epoch) = filter {
347 query = query.filter(dsl::epoch.eq(epoch as i64));
348 }
349 query
350 },
351 )
352 })
353 .await?;
354
355 let mut conn = Connection::new(prev, next);
358 for stored in results {
359 let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
360 conn.edges.push(Edge::new(
361 cursor,
362 Checkpoint {
363 stored,
364 checkpoint_viewed_at,
365 },
366 ));
367 }
368
369 Ok(conn)
370 }
371}
372
373impl Paginated<Cursor> for StoredCheckpoint {
374 type Source = checkpoints::table;
375
376 fn filter_ge<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
377 query.filter(checkpoints::dsl::sequence_number.ge(cursor.sequence_number as i64))
378 }
379
380 fn filter_le<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
381 query.filter(checkpoints::dsl::sequence_number.le(cursor.sequence_number as i64))
382 }
383
384 fn order<ST, GB>(asc: bool, query: Query<ST, GB>) -> Query<ST, GB> {
385 use checkpoints::dsl;
386 if asc {
387 query.order(dsl::sequence_number)
388 } else {
389 query.order(dsl::sequence_number.desc())
390 }
391 }
392}
393
394impl Target<Cursor> for StoredCheckpoint {
395 fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
396 Cursor::new(CheckpointCursor {
397 checkpoint_viewed_at,
398 sequence_number: self.sequence_number as u64,
399 })
400 }
401}
402
403impl Checkpointed for Cursor {
404 fn checkpoint_viewed_at(&self) -> u64 {
405 self.checkpoint_viewed_at
406 }
407}
408
409impl ScanLimited for Cursor {}
410
411impl Loader<SeqNumKey> for Db {
412 type Value = Checkpoint;
413 type Error = Error;
414
415 async fn load(&self, keys: &[SeqNumKey]) -> Result<HashMap<SeqNumKey, Checkpoint>, Error> {
416 use checkpoints::dsl;
417
418 let checkpoint_ids: BTreeSet<_> = keys
419 .iter()
420 .filter_map(|key| {
421 (key.checkpoint_viewed_at >= key.sequence_number)
423 .then_some(key.sequence_number as i64)
424 })
425 .collect();
426
427 let checkpoints: Vec<StoredCheckpoint> = self
428 .execute(move |conn| {
429 conn.results(move || {
430 dsl::checkpoints
431 .filter(dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned()))
432 })
433 })
434 .await
435 .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;
436
437 let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
438 .into_iter()
439 .map(|stored| (stored.sequence_number as u64, stored))
440 .collect();
441
442 Ok(keys
443 .iter()
444 .filter_map(|key| {
445 let stored = checkpoint_id_to_stored.get(&key.sequence_number).cloned()?;
446 let checkpoint = Checkpoint {
447 stored,
448 checkpoint_viewed_at: key.checkpoint_viewed_at,
449 };
450
451 let digest = &checkpoint.stored.checkpoint_digest;
452 if matches!(key.digest, Some(d) if d.as_slice() != digest) {
453 None
454 } else {
455 Some((*key, checkpoint))
456 }
457 })
458 .collect())
459 }
460}
461
462impl Loader<DigestKey> for Db {
463 type Value = Checkpoint;
464 type Error = Error;
465
466 async fn load(&self, keys: &[DigestKey]) -> Result<HashMap<DigestKey, Checkpoint>, Error> {
467 use checkpoints::dsl;
468
469 let digests: BTreeSet<_> = keys.iter().map(|key| key.digest.to_vec()).collect();
470
471 let checkpoints: Vec<StoredCheckpoint> = self
472 .execute(move |conn| {
473 conn.results(move || {
474 dsl::checkpoints.filter(dsl::checkpoint_digest.eq_any(digests.iter().cloned()))
475 })
476 })
477 .await
478 .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;
479
480 let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
481 .into_iter()
482 .map(|stored| (stored.checkpoint_digest.clone(), stored))
483 .collect();
484
485 Ok(keys
486 .iter()
487 .filter_map(|key| {
488 let DigestKey {
489 digest,
490 checkpoint_viewed_at,
491 } = *key;
492
493 let stored = checkpoint_id_to_stored.get(digest.as_slice()).cloned()?;
494
495 let checkpoint = Checkpoint {
496 stored,
497 checkpoint_viewed_at,
498 };
499
500 let seq_num = checkpoint.stored.sequence_number as u64;
504 (checkpoint_viewed_at >= seq_num).then_some((*key, checkpoint))
505 })
506 .collect())
507 }
508}