iota_graphql_rpc/types/
checkpoint.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::{BTreeMap, BTreeSet, HashMap};
6
7use async_graphql::{
8    connection::{Connection, CursorType, Edge},
9    dataloader::Loader,
10    *,
11};
12use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
13use fastcrypto::encoding::{Base58, Encoding};
14use iota_indexer::{models::checkpoints::StoredCheckpoint, schema::checkpoints};
15use iota_types::messages_checkpoint::CheckpointDigest;
16use serde::{Deserialize, Serialize};
17
18use crate::{
19    config::DEFAULT_PAGE_SIZE,
20    connection::ScanConnection,
21    consistency::Checkpointed,
22    data::{self, Conn, DataLoader, Db, DbConnection, QueryExecutor},
23    error::Error,
24    types::{
25        base64::Base64,
26        cursor::{self, Page, Paginated, ScanLimited, Target},
27        date_time::DateTime,
28        digest::Digest,
29        epoch::Epoch,
30        gas::GasCostSummary,
31        transaction_block::{self, TransactionBlock, TransactionBlockFilter},
32        uint53::UInt53,
33    },
34};
35
/// Filter either by the digest, or the sequence number, or neither, to get the
/// latest checkpoint.
#[derive(Default, InputObject)]
pub(crate) struct CheckpointId {
    /// The digest of the checkpoint to fetch.
    pub digest: Option<Digest>,
    /// The sequence number of the checkpoint to fetch.
    pub sequence_number: Option<UInt53>,
}
43
/// `DataLoader` key for fetching a `Checkpoint` by its sequence number,
/// constrained by a consistency cursor.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct SeqNumKey {
    /// The sequence number of the checkpoint to load.
    pub sequence_number: u64,
    /// The digest is not used for fetching, but is used as an additional
    /// filter, to correctly implement a request that sets both a sequence
    /// number and a digest.
    pub digest: Option<Digest>,
    /// Consistency cursor: checkpoints with a sequence number greater than
    /// this are treated as not-found.
    pub checkpoint_viewed_at: u64,
}
55
/// DataLoader key for fetching a `Checkpoint` by its digest, constrained by a
/// consistency cursor (checkpoints newer than `checkpoint_viewed_at` are
/// filtered out by the loader).
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct DigestKey {
    /// The digest of the checkpoint to load.
    pub digest: Digest,
    /// Consistency cursor: upper bound on the loaded checkpoint's sequence
    /// number.
    pub checkpoint_viewed_at: u64,
}
63
#[derive(Clone)]
pub(crate) struct Checkpoint {
    /// Representation of the checkpoint's data as stored in the Indexer's
    /// database.
    pub stored: StoredCheckpoint,
    /// The checkpoint_sequence_number at which this was viewed at.
    pub checkpoint_viewed_at: u64,
}
72
/// JSON-serialized cursor used when paginating `Checkpoint`s.
pub(crate) type Cursor = cursor::JsonCursor<CheckpointCursor>;
/// Shorthand for queries over the `checkpoints` table.
type Query<ST, GB> = data::Query<ST, checkpoints::table, GB>;
75
/// The cursor returned for each `Checkpoint` in a connection's page of results.
/// The `checkpoint_viewed_at` will set the consistent upper bound for
/// subsequent queries made on this cursor.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
pub(crate) struct CheckpointCursor {
    /// The checkpoint sequence number this was viewed at.
    #[serde(rename = "c")]
    pub checkpoint_viewed_at: u64,
    /// The sequence number of the checkpoint this cursor points at.
    #[serde(rename = "s")]
    pub sequence_number: u64,
}
87
/// Checkpoints contain finalized transactions and are used for node
/// synchronization and global transaction ordering.
#[Object]
impl Checkpoint {
    /// A 32-byte hash that uniquely identifies the checkpoint contents, encoded
    /// in Base58. This hash can be used to verify checkpoint contents by
    /// checking signatures against the committee, hashing contents to match
    /// digest, and checking that the previous checkpoint digest matches.
    #[graphql(complexity = 0)]
    async fn digest(&self) -> Result<String> {
        Ok(self.digest_impl().extend()?.base58_encode())
    }

    /// This checkpoint's position in the total order of finalized checkpoints,
    /// agreed upon by consensus.
    #[graphql(complexity = 0)]
    async fn sequence_number(&self) -> UInt53 {
        self.sequence_number_impl().into()
    }

    /// The timestamp at which the checkpoint is agreed to have happened
    /// according to consensus. Transactions that access time in this
    /// checkpoint will observe this timestamp.
    #[graphql(complexity = 0)]
    async fn timestamp(&self) -> Result<DateTime> {
        DateTime::from_ms(self.stored.timestamp_ms).extend()
    }

    /// This is an aggregation of signatures from a quorum of validators for the
    /// checkpoint proposal.
    #[graphql(complexity = 0)]
    async fn validator_signatures(&self) -> Base64 {
        Base64::from(&self.stored.validator_signature)
    }

    /// The digest of the checkpoint at the previous sequence number.
    /// `None` if the stored checkpoint has no recorded predecessor digest
    /// (e.g. the genesis checkpoint).
    #[graphql(complexity = 0)]
    async fn previous_checkpoint_digest(&self) -> Option<String> {
        self.stored
            .previous_checkpoint_digest
            .as_ref()
            .map(Base58::encode)
    }

    /// The total number of transaction blocks in the network by the end of this
    /// checkpoint.
    #[graphql(complexity = 0)]
    async fn network_total_transactions(&self) -> Option<UInt53> {
        Some(self.network_total_transactions_impl().into())
    }

    /// The computation cost, storage cost, storage rebate, and non-refundable
    /// storage fee accumulated during this epoch, up to and including this
    /// checkpoint. These values increase monotonically across checkpoints
    /// in the same epoch, and reset on epoch boundaries.
    #[graphql(complexity = 0)]
    async fn rolling_gas_summary(&self) -> Option<GasCostSummary> {
        Some(GasCostSummary {
            computation_cost: self.stored.computation_cost as u64,
            computation_cost_burned: self.stored.computation_cost_burned(),
            storage_cost: self.stored.storage_cost as u64,
            storage_rebate: self.stored.storage_rebate as u64,
            non_refundable_storage_fee: self.stored.non_refundable_storage_fee as u64,
        })
    }

    /// The epoch this checkpoint is part of.
    async fn epoch(&self, ctx: &Context<'_>) -> Result<Option<Epoch>> {
        Epoch::query(
            ctx,
            Some(self.stored.epoch as u64),
            self.checkpoint_viewed_at,
        )
        .await
        .extend()
    }

    /// Transactions in this checkpoint.
    ///
    /// `scanLimit` restricts the number of candidate transactions scanned when
    /// gathering a page of results. It is required for queries that apply
    /// more than two complex filters (on function, kind, sender, recipient,
    /// input object, changed object, or ids), and can be at most
    /// `serviceConfig.maxScanLimit`.
    ///
    /// When the scan limit is reached the page will be returned even if it has
    /// fewer than `first` results when paginating forward (`last` when
    /// paginating backwards). If there are more transactions to scan,
    /// `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
    /// `true`, and `PageInfo.endCursor` (or `PageInfo.startCursor`) will be set
    /// to the last transaction that was scanned as opposed to the last (or
    /// first) transaction in the page.
    ///
    /// Requesting the next (or previous) page after this cursor will resume the
    /// search, scanning the next `scanLimit` many transactions in the
    /// direction of pagination, and so on until all transactions in the
    /// scanning range have been visited.
    ///
    /// By default, the scanning range consists of all transactions in this
    /// checkpoint.
    #[graphql(
        complexity = "first.or(last).unwrap_or(DEFAULT_PAGE_SIZE as u64) as usize * child_complexity"
    )]
    async fn transaction_blocks(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<transaction_block::Cursor>,
        last: Option<u64>,
        before: Option<transaction_block::Cursor>,
        filter: Option<TransactionBlockFilter>,
        scan_limit: Option<u64>,
    ) -> Result<ScanConnection<String, TransactionBlock>> {
        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;

        // Force the filter to select only transactions from this checkpoint.
        // If the caller-supplied filter cannot be reconciled with that
        // constraint (the intersection is `None`), no transaction can match,
        // so return an empty connection without hitting the database.
        let Some(filter) = filter
            .unwrap_or_default()
            .intersect(TransactionBlockFilter {
                at_checkpoint: Some(UInt53::from(self.stored.sequence_number as u64)),
                ..Default::default()
            })
        else {
            return Ok(ScanConnection::new(false, false));
        };

        TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
            .await
            .extend()
    }
}
218
219impl CheckpointId {
220    pub(crate) fn by_seq_num(seq_num: u64) -> Self {
221        CheckpointId {
222            sequence_number: Some(seq_num.into()),
223            digest: None,
224        }
225    }
226}
227
impl Checkpoint {
    /// The checkpoint's sequence number, widened from the stored `i64`.
    pub(crate) fn sequence_number_impl(&self) -> u64 {
        self.stored.sequence_number as u64
    }

    /// Cumulative count of transactions in the network as of this checkpoint,
    /// widened from the stored `i64`.
    pub(crate) fn network_total_transactions_impl(&self) -> u64 {
        self.stored.network_total_transactions as u64
    }

    /// Decode the stored digest bytes into a `CheckpointDigest`, surfacing a
    /// deserialization failure as an internal error.
    pub(crate) fn digest_impl(&self) -> Result<CheckpointDigest, Error> {
        CheckpointDigest::try_from(self.stored.checkpoint_digest.clone())
            .map_err(|e| Error::Internal(format!("Failed to deserialize checkpoint digest: {e}")))
    }

    /// Look up a `Checkpoint` in the database, filtered by either sequence
    /// number or digest. If both filters are supplied they will both be
    /// applied. If none are supplied, the latest checkpoint is fetched.
    pub(crate) async fn query(
        ctx: &Context<'_>,
        filter: CheckpointId,
        checkpoint_viewed_at: u64,
    ) -> Result<Option<Self>, Error> {
        match filter {
            // A sequence number was provided: load via the sequence-number
            // dataloader. Any digest supplied alongside it is passed through
            // as an extra consistency check, not used for the fetch itself.
            CheckpointId {
                sequence_number: Some(sequence_number),
                digest,
            } => {
                let DataLoader(dl) = ctx.data_unchecked();
                dl.load_one(SeqNumKey {
                    sequence_number: sequence_number.into(),
                    digest,
                    checkpoint_viewed_at,
                })
                .await
            }

            // Only a digest was provided: load via the digest dataloader.
            CheckpointId {
                sequence_number: None,
                digest: Some(digest),
            } => {
                let DataLoader(dl) = ctx.data_unchecked();
                dl.load_one(DigestKey {
                    digest,
                    checkpoint_viewed_at,
                })
                .await
            }

            // Neither filter was provided: fetch the latest checkpoint as of
            // the consistency cursor.
            CheckpointId {
                sequence_number: None,
                digest: None,
            } => Checkpoint::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await,
        }
    }

    /// Look up the latest `Checkpoint` from the database, optionally filtered
    /// by a consistency cursor (querying for a consistency cursor in the
    /// past looks for the latest checkpoint as of that cursor).
    async fn query_latest_at(db: &Db, checkpoint_viewed_at: u64) -> Result<Option<Self>, Error> {
        use checkpoints::dsl;

        // Highest sequence number at or below the consistency cursor.
        let stored: Option<StoredCheckpoint> = db
            .execute(move |conn| {
                conn.first(move || {
                    dsl::checkpoints
                        .filter(dsl::sequence_number.le(checkpoint_viewed_at as i64))
                        .order_by(dsl::sequence_number.desc())
                })
                .optional()
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))?;

        Ok(stored.map(|stored| Checkpoint {
            stored,
            checkpoint_viewed_at,
        }))
    }

    /// Look up a `Checkpoint` in the database and retrieve its `timestamp_ms`
    /// field. This method takes a connection, so that it can be used within
    /// a transaction.
    ///
    /// Returns `diesel::result::Error::NotFound` (via `conn.first`) if no
    /// checkpoint with `seq_num` exists — NOTE(review): presumed from the
    /// non-`optional()` query; confirm against `Conn::first`'s semantics.
    pub(crate) fn query_timestamp(
        conn: &mut Conn<'_>,
        seq_num: u64,
    ) -> Result<u64, diesel::result::Error> {
        use checkpoints::dsl;

        let stored: i64 = conn.first(|| {
            dsl::checkpoints
                .select(dsl::timestamp_ms)
                .filter(dsl::sequence_number.eq(seq_num as i64))
        })?;

        Ok(stored as u64)
    }

    /// Query the database for a `page` of checkpoints. The Page uses the
    /// checkpoint sequence number of the stored checkpoint and the
    /// checkpoint at which this was viewed at as the cursor, and
    /// can optionally be further `filter`-ed by an epoch number (to only return
    /// checkpoints within that epoch).
    ///
    /// The `checkpoint_viewed_at` parameter represents the checkpoint sequence
    /// number at which this page was queried for. Each entity returned in
    /// the connection will inherit this checkpoint, so that when viewing
    /// that entity's state, it will be from the reference of this
    /// checkpoint_viewed_at parameter.
    ///
    /// If the `Page<Cursor>` is set, then this function will defer to the
    /// `checkpoint_viewed_at` in the cursor if they are consistent.
    pub(crate) async fn paginate(
        db: &Db,
        page: Page<Cursor>,
        filter: Option<u64>,
        checkpoint_viewed_at: u64,
    ) -> Result<Connection<String, Checkpoint>, Error> {
        use checkpoints::dsl;
        // Prefer the consistency cursor embedded in the page's cursors (if
        // present and internally consistent) over the caller-supplied bound.
        let cursor_viewed_at = page.validate_cursor_consistency()?;
        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);

        let (prev, next, results) = db
            .execute(move |conn| {
                page.paginate_query::<StoredCheckpoint, _, _, _>(
                    conn,
                    checkpoint_viewed_at,
                    move || {
                        let mut query = dsl::checkpoints.into_boxed();
                        // Never return checkpoints newer than the view bound.
                        query = query.filter(dsl::sequence_number.le(checkpoint_viewed_at as i64));
                        if let Some(epoch) = filter {
                            query = query.filter(dsl::epoch.eq(epoch as i64));
                        }
                        query
                    },
                )
            })
            .await?;

        // The "checkpoint viewed at" sets a consistent upper bound for the nested
        // queries.
        let mut conn = Connection::new(prev, next);
        for stored in results {
            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
            conn.edges.push(Edge::new(
                cursor,
                Checkpoint {
                    stored,
                    checkpoint_viewed_at,
                },
            ));
        }

        Ok(conn)
    }
}
383
384impl Paginated<Cursor> for StoredCheckpoint {
385    type Source = checkpoints::table;
386
387    fn filter_ge<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
388        query.filter(checkpoints::dsl::sequence_number.ge(cursor.sequence_number as i64))
389    }
390
391    fn filter_le<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
392        query.filter(checkpoints::dsl::sequence_number.le(cursor.sequence_number as i64))
393    }
394
395    fn order<ST, GB>(asc: bool, query: Query<ST, GB>) -> Query<ST, GB> {
396        use checkpoints::dsl;
397        if asc {
398            query.order(dsl::sequence_number)
399        } else {
400            query.order(dsl::sequence_number.desc())
401        }
402    }
403}
404
405impl Target<Cursor> for StoredCheckpoint {
406    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
407        Cursor::new(CheckpointCursor {
408            checkpoint_viewed_at,
409            sequence_number: self.sequence_number as u64,
410        })
411    }
412}
413
impl Checkpointed for Cursor {
    /// The consistency upper bound recorded in this cursor.
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}
419
420impl ScanLimited for Cursor {}
421
impl Loader<SeqNumKey> for Db {
    type Value = Checkpoint;
    type Error = Error;

    /// Batched load of checkpoints by sequence number. Keys whose consistency
    /// cursor is older than the requested sequence number resolve to no value,
    /// as do keys whose optional digest does not match the stored checkpoint.
    async fn load(&self, keys: &[SeqNumKey]) -> Result<HashMap<SeqNumKey, Checkpoint>, Error> {
        use checkpoints::dsl;

        let checkpoint_ids: BTreeSet<_> = keys
            .iter()
            .filter_map(|key| {
                // Filter out keys querying for checkpoints after their own consistency cursor.
                (key.checkpoint_viewed_at >= key.sequence_number)
                    .then_some(key.sequence_number as i64)
            })
            .collect();

        // Fetch all surviving sequence numbers in a single query.
        let checkpoints: Vec<StoredCheckpoint> = self
            .execute(move |conn| {
                conn.results(move || {
                    dsl::checkpoints
                        .filter(dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned()))
                })
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;

        // Index results by sequence number so each key can be resolved below.
        let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
            .into_iter()
            .map(|stored| (stored.sequence_number as u64, stored))
            .collect();

        Ok(keys
            .iter()
            .filter_map(|key| {
                let stored = checkpoint_id_to_stored.get(&key.sequence_number).cloned()?;
                let checkpoint = Checkpoint {
                    stored,
                    checkpoint_viewed_at: key.checkpoint_viewed_at,
                };

                // If the key also supplied a digest, it must match the stored
                // digest for the lookup to count as a hit.
                let digest = &checkpoint.stored.checkpoint_digest;
                if matches!(key.digest, Some(d) if d.as_slice() != digest) {
                    None
                } else {
                    Some((*key, checkpoint))
                }
            })
            .collect())
    }
}
472
impl Loader<DigestKey> for Db {
    type Value = Checkpoint;
    type Error = Error;

    /// Batched load of checkpoints by digest. Results whose sequence number
    /// exceeds a key's consistency cursor are filtered out in memory.
    async fn load(&self, keys: &[DigestKey]) -> Result<HashMap<DigestKey, Checkpoint>, Error> {
        use checkpoints::dsl;

        // Deduplicate digests across keys for a single batched query.
        let digests: BTreeSet<_> = keys.iter().map(|key| key.digest.to_vec()).collect();

        let checkpoints: Vec<StoredCheckpoint> = self
            .execute(move |conn| {
                conn.results(move || {
                    dsl::checkpoints.filter(dsl::checkpoint_digest.eq_any(digests.iter().cloned()))
                })
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;

        // Index results by digest bytes so each key can be resolved below.
        let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
            .into_iter()
            .map(|stored| (stored.checkpoint_digest.clone(), stored))
            .collect();

        Ok(keys
            .iter()
            .filter_map(|key| {
                let DigestKey {
                    digest,
                    checkpoint_viewed_at,
                } = *key;

                let stored = checkpoint_id_to_stored.get(digest.as_slice()).cloned()?;

                let checkpoint = Checkpoint {
                    stored,
                    checkpoint_viewed_at,
                };

                // Filter by key's checkpoint viewed at here. Doing this in memory because it
                // should be quite rare that this query actually filters
                // something, but encoding it in SQL is complicated.
                let seq_num = checkpoint.stored.sequence_number as u64;
                (checkpoint_viewed_at >= seq_num).then_some((*key, checkpoint))
            })
            .collect())
    }
}