1use std::collections::{BTreeMap, BTreeSet, HashMap};
6
7use async_graphql::{
8 connection::{Connection, CursorType, Edge},
9 dataloader::Loader,
10 *,
11};
12use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
13use fastcrypto::encoding::{Base58, Encoding};
14use iota_indexer::{models::checkpoints::StoredCheckpoint, schema::checkpoints};
15use iota_types::messages_checkpoint::CheckpointDigest;
16use serde::{Deserialize, Serialize};
17
18use crate::{
19 config::DEFAULT_PAGE_SIZE,
20 connection::ScanConnection,
21 consistency::Checkpointed,
22 data::{self, Conn, DataLoader, Db, DbConnection, QueryExecutor},
23 error::Error,
24 types::{
25 base64::Base64,
26 cursor::{self, Page, Paginated, ScanLimited, Target},
27 date_time::DateTime,
28 digest::Digest,
29 epoch::Epoch,
30 gas::GasCostSummary,
31 transaction_block::{self, TransactionBlock, TransactionBlockFilter},
32 uint53::UInt53,
33 },
34};
35
/// GraphQL input for identifying a checkpoint: by digest, by sequence number,
/// or (when both are `None`) the latest visible checkpoint.
#[derive(Default, InputObject)]
pub(crate) struct CheckpointId {
    // The checkpoint's digest.
    pub digest: Option<Digest>,
    // The checkpoint's position in the chain of checkpoints.
    pub sequence_number: Option<UInt53>,
}
43
/// DataLoader key for fetching a `Checkpoint` by its sequence number.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct SeqNumKey {
    pub sequence_number: u64,
    // If set, the fetched checkpoint's stored digest must match this value or
    // the lookup is treated as a miss (see `Loader<SeqNumKey>::load`).
    pub digest: Option<Digest>,
    // Consistency pin: checkpoints with a sequence number greater than this
    // are not visible to the query.
    pub checkpoint_viewed_at: u64,
}
55
/// DataLoader key for fetching a `Checkpoint` by its digest.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct DigestKey {
    pub digest: Digest,
    // Consistency pin: the loaded checkpoint is discarded if its sequence
    // number exceeds this value (see `Loader<DigestKey>::load`).
    pub checkpoint_viewed_at: u64,
}
63
/// A checkpoint read from the indexer, together with the consistency context
/// it was read under.
#[derive(Clone)]
pub(crate) struct Checkpoint {
    /// The raw indexer row backing this checkpoint.
    pub stored: StoredCheckpoint,
    /// The checkpoint sequence number this value was viewed at; threaded
    /// through to nested resolvers (e.g. `epoch`, `transaction_blocks`) so
    /// they observe a consistent snapshot.
    pub checkpoint_viewed_at: u64,
}
72
/// Pagination cursor for checkpoint connections (JSON-encoded `CheckpointCursor`).
pub(crate) type Cursor = cursor::JsonCursor<CheckpointCursor>;
/// Boxed query over the `checkpoints` table.
type Query<ST, GB> = data::Query<ST, checkpoints::table, GB>;
75
/// Payload serialized into a checkpoint pagination cursor. The short serde
/// names keep the encoded cursor compact.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
pub(crate) struct CheckpointCursor {
    /// The checkpoint this cursor was created at (consistency pin carried
    /// across pages).
    #[serde(rename = "c")]
    pub checkpoint_viewed_at: u64,
    /// The sequence number of the checkpoint this cursor points at.
    #[serde(rename = "s")]
    pub sequence_number: u64,
}
87
/// GraphQL resolvers for `Checkpoint`. Field complexities are zero for plain
/// projections of the stored row; `transaction_blocks` scales with page size.
#[Object]
impl Checkpoint {
    /// The checkpoint's digest, Base58-encoded.
    #[graphql(complexity = 0)]
    async fn digest(&self) -> Result<String> {
        Ok(self.digest_impl().extend()?.base58_encode())
    }

    /// The checkpoint's position in the chain of checkpoints.
    #[graphql(complexity = 0)]
    async fn sequence_number(&self) -> UInt53 {
        self.sequence_number_impl().into()
    }

    /// The checkpoint's stored timestamp (milliseconds) as a `DateTime`.
    #[graphql(complexity = 0)]
    async fn timestamp(&self) -> Result<DateTime> {
        DateTime::from_ms(self.stored.timestamp_ms).extend()
    }

    /// The stored validator signature bytes, Base64-encoded.
    #[graphql(complexity = 0)]
    async fn validator_signatures(&self) -> Base64 {
        Base64::from(&self.stored.validator_signature)
    }

    /// The digest of the previous checkpoint, Base58-encoded, if one is
    /// recorded (the genesis checkpoint has none).
    #[graphql(complexity = 0)]
    async fn previous_checkpoint_digest(&self) -> Option<String> {
        self.stored
            .previous_checkpoint_digest
            .as_ref()
            .map(Base58::encode)
    }

    /// The running total of transactions in the network as of this
    /// checkpoint, from the stored row.
    #[graphql(complexity = 0)]
    async fn network_total_transactions(&self) -> Option<UInt53> {
        Some(self.network_total_transactions_impl().into())
    }

    /// Gas cost fields from the stored checkpoint row, widened to `u64`.
    // NOTE(review): presumably these are rolling totals accumulated over the
    // epoch up to this checkpoint — confirm against the indexer schema.
    #[graphql(complexity = 0)]
    async fn rolling_gas_summary(&self) -> Option<GasCostSummary> {
        Some(GasCostSummary {
            computation_cost: self.stored.computation_cost as u64,
            computation_cost_burned: self.stored.computation_cost_burned(),
            storage_cost: self.stored.storage_cost as u64,
            storage_rebate: self.stored.storage_rebate as u64,
            non_refundable_storage_fee: self.stored.non_refundable_storage_fee as u64,
        })
    }

    /// The epoch this checkpoint was stored under, resolved at the same
    /// consistency pin as this checkpoint.
    async fn epoch(&self, ctx: &Context<'_>) -> Result<Option<Epoch>> {
        Epoch::query(
            ctx,
            Some(self.stored.epoch as u64),
            self.checkpoint_viewed_at,
        )
        .await
        .extend()
    }

    /// Paginated transaction blocks in this checkpoint. The caller's filter
    /// is intersected with `at_checkpoint = <this checkpoint>`; if the
    /// intersection is contradictory (e.g. the caller asked for a different
    /// checkpoint), an empty connection is returned without querying.
    #[graphql(
        complexity = "first.or(last).unwrap_or(DEFAULT_PAGE_SIZE as u64) as usize * child_complexity"
    )]
    async fn transaction_blocks(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<transaction_block::Cursor>,
        last: Option<u64>,
        before: Option<transaction_block::Cursor>,
        filter: Option<TransactionBlockFilter>,
        scan_limit: Option<u64>,
    ) -> Result<ScanConnection<String, TransactionBlock>> {
        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;

        let Some(filter) = filter
            .unwrap_or_default()
            .intersect(TransactionBlockFilter {
                at_checkpoint: Some(UInt53::from(self.stored.sequence_number as u64)),
                ..Default::default()
            })
        else {
            // Contradictory filter: no transaction block can match.
            return Ok(ScanConnection::new(false, false));
        };

        TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
            .await
            .extend()
    }
}
218
219impl CheckpointId {
220 pub(crate) fn by_seq_num(seq_num: u64) -> Self {
221 CheckpointId {
222 sequence_number: Some(seq_num.into()),
223 digest: None,
224 }
225 }
226}
227
impl Checkpoint {
    /// The checkpoint's sequence number as a plain `u64`.
    pub(crate) fn sequence_number_impl(&self) -> u64 {
        self.stored.sequence_number as u64
    }

    /// The stored network-wide transaction total as a plain `u64`.
    pub(crate) fn network_total_transactions_impl(&self) -> u64 {
        self.stored.network_total_transactions as u64
    }

    /// Deserializes the stored digest bytes into a `CheckpointDigest`,
    /// surfacing failures as internal errors.
    pub(crate) fn digest_impl(&self) -> Result<CheckpointDigest, Error> {
        CheckpointDigest::try_from(self.stored.checkpoint_digest.clone())
            .map_err(|e| Error::Internal(format!("Failed to deserialize checkpoint digest: {e}")))
    }

    /// Looks up a checkpoint according to `filter`:
    /// - sequence number present: load by sequence number, cross-checking the
    ///   digest if one was also given;
    /// - digest only: load by digest;
    /// - neither: the latest checkpoint visible at `checkpoint_viewed_at`.
    ///
    /// The first two cases go through data loaders so that concurrent
    /// resolvers batch their point lookups.
    pub(crate) async fn query(
        ctx: &Context<'_>,
        filter: CheckpointId,
        checkpoint_viewed_at: u64,
    ) -> Result<Option<Self>, Error> {
        match filter {
            CheckpointId {
                sequence_number: Some(sequence_number),
                digest,
            } => {
                let DataLoader(dl) = ctx.data_unchecked();
                dl.load_one(SeqNumKey {
                    sequence_number: sequence_number.into(),
                    digest,
                    checkpoint_viewed_at,
                })
                .await
            }

            CheckpointId {
                sequence_number: None,
                digest: Some(digest),
            } => {
                let DataLoader(dl) = ctx.data_unchecked();
                dl.load_one(DigestKey {
                    digest,
                    checkpoint_viewed_at,
                })
                .await
            }

            CheckpointId {
                sequence_number: None,
                digest: None,
            } => Checkpoint::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await,
        }
    }

    /// Fetches the checkpoint with the highest sequence number that is still
    /// `<= checkpoint_viewed_at`; `None` if no such row exists.
    async fn query_latest_at(db: &Db, checkpoint_viewed_at: u64) -> Result<Option<Self>, Error> {
        use checkpoints::dsl;

        let stored: Option<StoredCheckpoint> = db
            .execute(move |conn| {
                conn.first(move || {
                    dsl::checkpoints
                        .filter(dsl::sequence_number.le(checkpoint_viewed_at as i64))
                        .order_by(dsl::sequence_number.desc())
                })
                .optional()
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))?;

        Ok(stored.map(|stored| Checkpoint {
            stored,
            checkpoint_viewed_at,
        }))
    }

    /// Reads the timestamp (in milliseconds) of the checkpoint with the given
    /// sequence number, on an already-open connection. Errors if the row does
    /// not exist.
    pub(crate) fn query_timestamp(
        conn: &mut Conn<'_>,
        seq_num: u64,
    ) -> Result<u64, diesel::result::Error> {
        use checkpoints::dsl;

        let stored: i64 = conn.first(|| {
            dsl::checkpoints
                .select(dsl::timestamp_ms)
                .filter(dsl::sequence_number.eq(seq_num as i64))
        })?;

        Ok(stored as u64)
    }

    /// Paginates checkpoints, optionally restricted to one epoch (`filter`).
    ///
    /// If the page's cursors carry their own consistency pin, that pin
    /// overrides `checkpoint_viewed_at` so subsequent pages keep viewing the
    /// same snapshot; results are always capped at the effective pin.
    pub(crate) async fn paginate(
        db: &Db,
        page: Page<Cursor>,
        filter: Option<u64>,
        checkpoint_viewed_at: u64,
    ) -> Result<Connection<String, Checkpoint>, Error> {
        use checkpoints::dsl;
        let cursor_viewed_at = page.validate_cursor_consistency()?;
        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);

        let (prev, next, results) = db
            .execute(move |conn| {
                page.paginate_query::<StoredCheckpoint, _, _, _>(
                    conn,
                    checkpoint_viewed_at,
                    move || {
                        let mut query = dsl::checkpoints.into_boxed();
                        query = query.filter(dsl::sequence_number.le(checkpoint_viewed_at as i64));
                        if let Some(epoch) = filter {
                            query = query.filter(dsl::epoch.eq(epoch as i64));
                        }
                        query
                    },
                )
            })
            .await?;

        let mut conn = Connection::new(prev, next);
        for stored in results {
            // Each edge's cursor embeds the effective pin so later pages stay
            // consistent with this one.
            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
            conn.edges.push(Edge::new(
                cursor,
                Checkpoint {
                    stored,
                    checkpoint_viewed_at,
                },
            ));
        }

        Ok(conn)
    }
}
383
384impl Paginated<Cursor> for StoredCheckpoint {
385 type Source = checkpoints::table;
386
387 fn filter_ge<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
388 query.filter(checkpoints::dsl::sequence_number.ge(cursor.sequence_number as i64))
389 }
390
391 fn filter_le<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
392 query.filter(checkpoints::dsl::sequence_number.le(cursor.sequence_number as i64))
393 }
394
395 fn order<ST, GB>(asc: bool, query: Query<ST, GB>) -> Query<ST, GB> {
396 use checkpoints::dsl;
397 if asc {
398 query.order(dsl::sequence_number)
399 } else {
400 query.order(dsl::sequence_number.desc())
401 }
402 }
403}
404
405impl Target<Cursor> for StoredCheckpoint {
406 fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
407 Cursor::new(CheckpointCursor {
408 checkpoint_viewed_at,
409 sequence_number: self.sequence_number as u64,
410 })
411 }
412}
413
impl Checkpointed for Cursor {
    /// The consistency pin recorded in the cursor when it was created.
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}
419
420impl ScanLimited for Cursor {}
421
impl Loader<SeqNumKey> for Db {
    type Value = Checkpoint;
    type Error = Error;

    /// Batch-loads checkpoints by sequence number.
    ///
    /// Keys whose sequence number exceeds their own consistency pin are
    /// excluded before the query (they cannot be visible). After fetching,
    /// keys that carry an expected digest are dropped from the result when
    /// the stored digest does not match.
    async fn load(&self, keys: &[SeqNumKey]) -> Result<HashMap<SeqNumKey, Checkpoint>, Error> {
        use checkpoints::dsl;

        // Deduplicated set of visible sequence numbers to fetch.
        let checkpoint_ids: BTreeSet<_> = keys
            .iter()
            .filter_map(|key| {
                (key.checkpoint_viewed_at >= key.sequence_number)
                    .then_some(key.sequence_number as i64)
            })
            .collect();

        let checkpoints: Vec<StoredCheckpoint> = self
            .execute(move |conn| {
                conn.results(move || {
                    dsl::checkpoints
                        .filter(dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned()))
                })
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;

        // Index fetched rows by sequence number for per-key assembly.
        let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
            .into_iter()
            .map(|stored| (stored.sequence_number as u64, stored))
            .collect();

        Ok(keys
            .iter()
            .filter_map(|key| {
                let stored = checkpoint_id_to_stored.get(&key.sequence_number).cloned()?;
                let checkpoint = Checkpoint {
                    stored,
                    checkpoint_viewed_at: key.checkpoint_viewed_at,
                };

                // If the key pins a digest, the stored digest must agree;
                // otherwise treat the lookup as a miss.
                let digest = &checkpoint.stored.checkpoint_digest;
                if matches!(key.digest, Some(d) if d.as_slice() != digest) {
                    None
                } else {
                    Some((*key, checkpoint))
                }
            })
            .collect())
    }
}
472
impl Loader<DigestKey> for Db {
    type Value = Checkpoint;
    type Error = Error;

    /// Batch-loads checkpoints by digest.
    ///
    /// Unlike `Loader<SeqNumKey>`, the visibility check runs after the fetch:
    /// a digest alone does not reveal the checkpoint's sequence number, so
    /// rows are filtered against each key's consistency pin once loaded.
    async fn load(&self, keys: &[DigestKey]) -> Result<HashMap<DigestKey, Checkpoint>, Error> {
        use checkpoints::dsl;

        // Deduplicated set of digest byte-vectors to fetch.
        let digests: BTreeSet<_> = keys.iter().map(|key| key.digest.to_vec()).collect();

        let checkpoints: Vec<StoredCheckpoint> = self
            .execute(move |conn| {
                conn.results(move || {
                    dsl::checkpoints.filter(dsl::checkpoint_digest.eq_any(digests.iter().cloned()))
                })
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;

        // Index fetched rows by their stored digest bytes.
        let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
            .into_iter()
            .map(|stored| (stored.checkpoint_digest.clone(), stored))
            .collect();

        Ok(keys
            .iter()
            .filter_map(|key| {
                let DigestKey {
                    digest,
                    checkpoint_viewed_at,
                } = *key;

                let stored = checkpoint_id_to_stored.get(digest.as_slice()).cloned()?;

                let checkpoint = Checkpoint {
                    stored,
                    checkpoint_viewed_at,
                };

                // Drop checkpoints newer than the key's consistency pin.
                let seq_num = checkpoint.stored.sequence_number as u64;
                (checkpoint_viewed_at >= seq_num).then_some((*key, checkpoint))
            })
            .collect())
    }
}