iota_graphql_rpc/types/checkpoint.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::collections::{BTreeMap, BTreeSet, HashMap};

use async_graphql::{
    connection::{Connection, CursorType, Edge},
    dataloader::Loader,
    *,
};
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
use fastcrypto::encoding::{Base58, Encoding};
use iota_indexer::{models::checkpoints::StoredCheckpoint, schema::checkpoints};
use iota_types::messages_checkpoint::CheckpointDigest;
use serde::{Deserialize, Serialize};

use crate::{
    connection::ScanConnection,
    consistency::Checkpointed,
    data::{self, Conn, DataLoader, Db, DbConnection, QueryExecutor},
    error::Error,
    types::{
        base64::Base64,
        cursor::{self, Page, Paginated, ScanLimited, Target},
        date_time::DateTime,
        digest::Digest,
        epoch::Epoch,
        gas::GasCostSummary,
        transaction_block::{self, TransactionBlock, TransactionBlockFilter},
        uint53::UInt53,
    },
};

/// Filter either by the digest, or the sequence number, or neither, to get the
/// latest checkpoint.
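/// For example, `CheckpointId::default()` (both fields `None`) addresses the
/// latest checkpoint, and `CheckpointId::by_seq_num(42)` addresses checkpoint
/// 42. A lookup by digest is a sketch along these lines, assuming a `Digest`
/// value `digest` is already in hand:
///
/// ```ignore
/// let id = CheckpointId { digest: Some(digest), sequence_number: None };
/// ```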
#[derive(Default, InputObject)]
pub(crate) struct CheckpointId {
    pub digest: Option<Digest>,
    pub sequence_number: Option<UInt53>,
}

/// `DataLoader` key for fetching a `Checkpoint` by its sequence number,
/// constrained by a consistency cursor.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct SeqNumKey {
    pub sequence_number: u64,
    /// The digest is not used for fetching, but is used as an additional
    /// filter, to correctly implement a request that sets both a sequence
    /// number and a digest.
    pub digest: Option<Digest>,
    pub checkpoint_viewed_at: u64,
}

/// `DataLoader` key for fetching a `Checkpoint` by its digest, optionally
/// constrained by a consistency cursor.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct DigestKey {
    pub digest: Digest,
    pub checkpoint_viewed_at: u64,
}

#[derive(Clone)]
pub(crate) struct Checkpoint {
    /// Representation of checkpoint data in the Indexer's Store.
    pub stored: StoredCheckpoint,
    /// The checkpoint sequence number at which this `Checkpoint` was viewed.
    pub checkpoint_viewed_at: u64,
}

pub(crate) type Cursor = cursor::JsonCursor<CheckpointCursor>;
type Query<ST, GB> = data::Query<ST, checkpoints::table, GB>;

/// The cursor returned for each `Checkpoint` in a connection's page of results.
/// The `checkpoint_viewed_at` sets the consistent upper bound for
/// subsequent queries made on this cursor.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
pub(crate) struct CheckpointCursor {
    /// The checkpoint sequence number this was viewed at.
    #[serde(rename = "c")]
    pub checkpoint_viewed_at: u64,
    #[serde(rename = "s")]
    pub sequence_number: u64,
}

/// Checkpoints contain finalized transactions and are used for node
/// synchronization and global transaction ordering.
#[Object]
impl Checkpoint {
    /// A 32-byte hash that uniquely identifies the checkpoint contents, encoded
    /// in Base58. This hash can be used to verify checkpoint contents by
    /// checking signatures against the committee, hashing the contents to match
    /// the digest, and checking that the previous checkpoint digest matches.
    async fn digest(&self) -> Result<String> {
        Ok(self.digest_impl().extend()?.base58_encode())
    }

    /// This checkpoint's position in the total order of finalized checkpoints,
    /// agreed upon by consensus.
    async fn sequence_number(&self) -> UInt53 {
        self.sequence_number_impl().into()
    }

    /// The timestamp at which the checkpoint is agreed to have happened
    /// according to consensus. Transactions that access time in this
    /// checkpoint will observe this timestamp.
    async fn timestamp(&self) -> Result<DateTime> {
        DateTime::from_ms(self.stored.timestamp_ms).extend()
    }

    /// This is an aggregation of signatures from a quorum of validators for the
    /// checkpoint proposal.
    async fn validator_signatures(&self) -> Base64 {
        Base64::from(&self.stored.validator_signature)
    }

    /// The digest of the checkpoint at the previous sequence number.
    async fn previous_checkpoint_digest(&self) -> Option<String> {
        self.stored
            .previous_checkpoint_digest
            .as_ref()
            .map(Base58::encode)
    }

    /// The total number of transaction blocks in the network by the end of this
    /// checkpoint.
    async fn network_total_transactions(&self) -> Option<UInt53> {
        Some(self.network_total_transactions_impl().into())
    }

    /// The computation cost, storage cost, storage rebate, and non-refundable
    /// storage fee accumulated during this epoch, up to and including this
    /// checkpoint. These values increase monotonically across checkpoints
    /// in the same epoch, and reset on epoch boundaries.
    async fn rolling_gas_summary(&self) -> Option<GasCostSummary> {
        Some(GasCostSummary {
            computation_cost: self.stored.computation_cost as u64,
            computation_cost_burned: self.stored.computation_cost_burned(),
            storage_cost: self.stored.storage_cost as u64,
            storage_rebate: self.stored.storage_rebate as u64,
            non_refundable_storage_fee: self.stored.non_refundable_storage_fee as u64,
        })
    }

    /// The epoch this checkpoint is part of.
    async fn epoch(&self, ctx: &Context<'_>) -> Result<Option<Epoch>> {
        Epoch::query(
            ctx,
            Some(self.stored.epoch as u64),
            self.checkpoint_viewed_at,
        )
        .await
        .extend()
    }

    /// Transactions in this checkpoint.
    ///
    /// `scanLimit` restricts the number of candidate transactions scanned when
    /// gathering a page of results. It is required for queries that apply
    /// more than two complex filters (on function, kind, sender, recipient,
    /// input object, changed object, or ids), and can be at most
    /// `serviceConfig.maxScanLimit`.
    ///
    /// When the scan limit is reached, the page will be returned even if it has
    /// fewer than `first` results when paginating forward (`last` when
    /// paginating backwards). If there are more transactions to scan,
    /// `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
    /// `true`, and `pageInfo.endCursor` (or `pageInfo.startCursor`) will be set
    /// to the last transaction that was scanned, as opposed to the last (or
    /// first) transaction in the page.
    ///
    /// Requesting the next (or previous) page after this cursor will resume the
    /// search, scanning the next `scanLimit` transactions in the direction of
    /// pagination, and so on until all transactions in the scanning range have
    /// been visited.
    ///
    /// By default, the scanning range consists of all transactions in this
    /// checkpoint.
    async fn transaction_blocks(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<transaction_block::Cursor>,
        last: Option<u64>,
        before: Option<transaction_block::Cursor>,
        filter: Option<TransactionBlockFilter>,
        scan_limit: Option<u64>,
    ) -> Result<ScanConnection<String, TransactionBlock>> {
        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;

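        // Intersect the caller's filter with a constraint pinning results to this
        // checkpoint. If the two filters are contradictory (e.g. the caller asked for
        // a different checkpoint), the intersection is `None` and an empty connection
        // is returned without touching the database.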
        let Some(filter) = filter
            .unwrap_or_default()
            .intersect(TransactionBlockFilter {
                at_checkpoint: Some(UInt53::from(self.stored.sequence_number as u64)),
                ..Default::default()
            })
        else {
            return Ok(ScanConnection::new(false, false));
        };

        TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
            .await
            .extend()
    }
}

impl CheckpointId {
    pub(crate) fn by_seq_num(seq_num: u64) -> Self {
        CheckpointId {
            sequence_number: Some(seq_num.into()),
            digest: None,
        }
    }
}

impl Checkpoint {
    pub(crate) fn sequence_number_impl(&self) -> u64 {
        self.stored.sequence_number as u64
    }

    pub(crate) fn network_total_transactions_impl(&self) -> u64 {
        self.stored.network_total_transactions as u64
    }

    pub(crate) fn digest_impl(&self) -> Result<CheckpointDigest, Error> {
        CheckpointDigest::try_from(self.stored.checkpoint_digest.clone())
            .map_err(|e| Error::Internal(format!("Failed to deserialize checkpoint digest: {e}")))
    }

    /// Look up a `Checkpoint` in the database, filtered by either sequence
    /// number or digest. If both filters are supplied they will both be
    /// applied. If none are supplied, the latest checkpoint is fetched.
    pub(crate) async fn query(
        ctx: &Context<'_>,
        filter: CheckpointId,
        checkpoint_viewed_at: u64,
    ) -> Result<Option<Self>, Error> {
        match filter {
            CheckpointId {
                sequence_number: Some(sequence_number),
                digest,
            } => {
                let DataLoader(dl) = ctx.data_unchecked();
                dl.load_one(SeqNumKey {
                    sequence_number: sequence_number.into(),
                    digest,
                    checkpoint_viewed_at,
                })
                .await
            }

            CheckpointId {
                sequence_number: None,
                digest: Some(digest),
            } => {
                let DataLoader(dl) = ctx.data_unchecked();
                dl.load_one(DigestKey {
                    digest,
                    checkpoint_viewed_at,
                })
                .await
            }

            CheckpointId {
                sequence_number: None,
                digest: None,
            } => Checkpoint::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await,
        }
    }

    /// Look up the latest `Checkpoint` from the database, optionally filtered
    /// by a consistency cursor (querying for a consistency cursor in the
    /// past looks for the latest checkpoint as of that cursor).
    async fn query_latest_at(db: &Db, checkpoint_viewed_at: u64) -> Result<Option<Self>, Error> {
        use checkpoints::dsl;

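        // The latest visible checkpoint is the one with the highest sequence number
        // that is at or below the consistency cursor (`checkpoint_viewed_at`).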
        let stored: Option<StoredCheckpoint> = db
            .execute(move |conn| {
                conn.first(move || {
                    dsl::checkpoints
                        .filter(dsl::sequence_number.le(checkpoint_viewed_at as i64))
                        .order_by(dsl::sequence_number.desc())
                })
                .optional()
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))?;

        Ok(stored.map(|stored| Checkpoint {
            stored,
            checkpoint_viewed_at,
        }))
    }

    /// Look up a `Checkpoint` in the database and retrieve its `timestamp_ms`
    /// field. This method takes a connection, so that it can be used within
    /// a transaction.
    pub(crate) fn query_timestamp(
        conn: &mut Conn<'_>,
        seq_num: u64,
    ) -> Result<u64, diesel::result::Error> {
        use checkpoints::dsl;

        let stored: i64 = conn.first(|| {
            dsl::checkpoints
                .select(dsl::timestamp_ms)
                .filter(dsl::sequence_number.eq(seq_num as i64))
        })?;

        Ok(stored as u64)
    }

    /// Query the database for a `page` of checkpoints. The page's cursor combines
    /// the sequence number of the stored checkpoint with the checkpoint at which
    /// it was viewed, and results can optionally be further `filter`-ed by an
    /// epoch number (to only return checkpoints within that epoch).
    ///
    /// The `checkpoint_viewed_at` parameter represents the checkpoint sequence
    /// number at which this page was queried. Each entity returned in the
    /// connection inherits this checkpoint, so that the entity's state is always
    /// viewed as of this `checkpoint_viewed_at`.
    ///
    /// If cursors are provided on the `Page<Cursor>`, this function defers to the
    /// `checkpoint_viewed_at` they carry, provided they are consistent.
    pub(crate) async fn paginate(
        db: &Db,
        page: Page<Cursor>,
        filter: Option<u64>,
        checkpoint_viewed_at: u64,
    ) -> Result<Connection<String, Checkpoint>, Error> {
        use checkpoints::dsl;
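        // A consistency cursor carried by the page's cursors takes precedence over the
        // caller-supplied `checkpoint_viewed_at`, provided the cursors agree with each
        // other.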
        let cursor_viewed_at = page.validate_cursor_consistency()?;
        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);

        let (prev, next, results) = db
            .execute(move |conn| {
                page.paginate_query::<StoredCheckpoint, _, _, _>(
                    conn,
                    checkpoint_viewed_at,
                    move || {
                        let mut query = dsl::checkpoints.into_boxed();
                        query = query.filter(dsl::sequence_number.le(checkpoint_viewed_at as i64));
                        if let Some(epoch) = filter {
                            query = query.filter(dsl::epoch.eq(epoch as i64));
                        }
                        query
                    },
                )
            })
            .await?;

        // The "checkpoint viewed at" sets a consistent upper bound for the nested
        // queries.
        let mut conn = Connection::new(prev, next);
        for stored in results {
            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
            conn.edges.push(Edge::new(
                cursor,
                Checkpoint {
                    stored,
                    checkpoint_viewed_at,
                },
            ));
        }

        Ok(conn)
    }
}

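// The `Paginated` and `Target` impls below tell the cursor-based pagination machinery
// how a `CheckpointCursor` maps onto SQL: cursor bounds become inclusive filters on
// `sequence_number`, results are ordered by `sequence_number`, and each stored row is
// stamped with the checkpoint it was viewed at when turned back into a cursor.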
impl Paginated<Cursor> for StoredCheckpoint {
    type Source = checkpoints::table;

    fn filter_ge<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
        query.filter(checkpoints::dsl::sequence_number.ge(cursor.sequence_number as i64))
    }

    fn filter_le<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
        query.filter(checkpoints::dsl::sequence_number.le(cursor.sequence_number as i64))
    }

    fn order<ST, GB>(asc: bool, query: Query<ST, GB>) -> Query<ST, GB> {
        use checkpoints::dsl;
        if asc {
            query.order(dsl::sequence_number)
        } else {
            query.order(dsl::sequence_number.desc())
        }
    }
}

impl Target<Cursor> for StoredCheckpoint {
    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
        Cursor::new(CheckpointCursor {
            checkpoint_viewed_at,
            sequence_number: self.sequence_number as u64,
        })
    }
}

impl Checkpointed for Cursor {
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}

impl ScanLimited for Cursor {}

impl Loader<SeqNumKey> for Db {
    type Value = Checkpoint;
    type Error = Error;

    async fn load(&self, keys: &[SeqNumKey]) -> Result<HashMap<SeqNumKey, Checkpoint>, Error> {
        use checkpoints::dsl;

        let checkpoint_ids: BTreeSet<_> = keys
            .iter()
            .filter_map(|key| {
                // Filter out keys querying for checkpoints after their own consistency cursor.
                (key.checkpoint_viewed_at >= key.sequence_number)
                    .then_some(key.sequence_number as i64)
            })
            .collect();

        let checkpoints: Vec<StoredCheckpoint> = self
            .execute(move |conn| {
                conn.results(move || {
                    dsl::checkpoints
                        .filter(dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned()))
                })
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;

        let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
            .into_iter()
            .map(|stored| (stored.sequence_number as u64, stored))
            .collect();

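        // Join the fetched checkpoints back onto the requested keys. A key that also
        // carries a digest is only satisfied if that digest matches the stored one, so
        // a request that sets both a sequence number and a mismatched digest yields no
        // result.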
        Ok(keys
            .iter()
            .filter_map(|key| {
                let stored = checkpoint_id_to_stored.get(&key.sequence_number).cloned()?;
                let checkpoint = Checkpoint {
                    stored,
                    checkpoint_viewed_at: key.checkpoint_viewed_at,
                };

                let digest = &checkpoint.stored.checkpoint_digest;
                if matches!(key.digest, Some(d) if d.as_slice() != digest) {
                    None
                } else {
                    Some((*key, checkpoint))
                }
            })
            .collect())
    }
}

impl Loader<DigestKey> for Db {
    type Value = Checkpoint;
    type Error = Error;

    async fn load(&self, keys: &[DigestKey]) -> Result<HashMap<DigestKey, Checkpoint>, Error> {
        use checkpoints::dsl;

        let digests: BTreeSet<_> = keys.iter().map(|key| key.digest.to_vec()).collect();

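        // Fetch every checkpoint whose digest matches one of the requested keys, in a
        // single batched query.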
        let checkpoints: Vec<StoredCheckpoint> = self
            .execute(move |conn| {
                conn.results(move || {
                    dsl::checkpoints.filter(dsl::checkpoint_digest.eq_any(digests.iter().cloned()))
                })
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;

        let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
            .into_iter()
            .map(|stored| (stored.checkpoint_digest.clone(), stored))
            .collect();

        Ok(keys
            .iter()
            .filter_map(|key| {
                let DigestKey {
                    digest,
                    checkpoint_viewed_at,
                } = *key;

                let stored = checkpoint_id_to_stored.get(digest.as_slice()).cloned()?;

                let checkpoint = Checkpoint {
                    stored,
                    checkpoint_viewed_at,
                };

                // Filter by the key's `checkpoint_viewed_at` here. This is done in
                // memory because it should be quite rare that this filter actually
                // excludes anything, and encoding it in SQL is complicated.
                let seq_num = checkpoint.stored.sequence_number as u64;
                (checkpoint_viewed_at >= seq_num).then_some((*key, checkpoint))
            })
            .collect())
    }
}