iota_graphql_rpc/types/
epoch.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::{BTreeMap, BTreeSet, HashMap};
6
7use async_graphql::{connection::Connection, dataloader::Loader, *};
8use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
9use fastcrypto::encoding::{Base58, Encoding};
10use iota_indexer::{models::epoch::QueryableEpochInfo, schema::epochs};
11use iota_types::messages_checkpoint::CheckpointCommitment as EpochCommitment;
12
13use crate::{
14    connection::ScanConnection,
15    context_data::db_data_provider::PgManager,
16    data::{DataLoader, Db, DbConnection, QueryExecutor},
17    error::Error,
18    server::watermark_task::Watermark,
19    types::{
20        big_int::BigInt,
21        checkpoint::{self, Checkpoint},
22        cursor::Page,
23        date_time::DateTime,
24        protocol_config::ProtocolConfigs,
25        system_state_summary::{NativeStateValidatorInfo, SystemStateSummary},
26        transaction_block::{self, TransactionBlock, TransactionBlockFilter},
27        uint53::UInt53,
28        validator_set::ValidatorSet,
29    },
30};
31
32#[derive(Clone)]
33pub(crate) struct Epoch {
34    pub stored: QueryableEpochInfo,
35    pub checkpoint_viewed_at: u64,
36}
37
/// `DataLoader` key identifying an `Epoch` by its ID together with the
/// checkpoint the lookup is viewed at. Two keys are only equal (and only share
/// a cache entry) when both components match.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct EpochKey {
    /// ID of the epoch to load.
    pub epoch_id: u64,
    /// Checkpoint sequence number the epoch must already have started by for
    /// the load to yield a result.
    pub checkpoint_viewed_at: u64,
}
45
46/// Operation of the IOTA network is temporally partitioned into non-overlapping
47/// epochs, and the network aims to keep epochs roughly the same duration as
48/// each other. During a particular epoch the following data is fixed:
49///
50/// - the protocol version
51/// - the reference gas price
52/// - the set of participating validators
53#[Object]
54impl Epoch {
55    /// The epoch's id as a sequence number that starts at 0 and is incremented
56    /// by one at every epoch change.
57    async fn epoch_id(&self) -> UInt53 {
58        UInt53::from(self.stored.epoch as u64)
59    }
60
61    /// The minimum gas price that a quorum of validators are guaranteed to sign
62    /// a transaction for.
63    async fn reference_gas_price(&self) -> Option<BigInt> {
64        Some(BigInt::from(self.stored.reference_gas_price as u64))
65    }
66
67    /// Validator related properties, including the active validators.
68    ///
69    /// For epochs other than the current the data provided refer to the start
70    /// of the epoch.
71    async fn validator_set(&self, ctx: &Context<'_>) -> Result<Option<ValidatorSet>> {
72        let system_state = ctx
73            .data_unchecked::<PgManager>()
74            .fetch_iota_system_state(Some(self.stored.epoch as u64))
75            .await?;
76
77        let validator_set = NativeStateValidatorInfo::from(system_state).into_validator_set(
78            self.stored.total_stake as u64,
79            self.checkpoint_viewed_at,
80            self.stored.epoch as u64,
81        );
82        Ok(Some(validator_set))
83    }
84
85    /// The epoch's starting timestamp.
86    async fn start_timestamp(&self) -> Result<DateTime, Error> {
87        DateTime::from_ms(self.stored.epoch_start_timestamp)
88    }
89
90    /// The epoch's ending timestamp.
91    async fn end_timestamp(&self) -> Result<Option<DateTime>, Error> {
92        self.stored
93            .epoch_end_timestamp
94            .map(DateTime::from_ms)
95            .transpose()
96    }
97
98    /// The total number of checkpoints in this epoch.
99    async fn total_checkpoints(&self, ctx: &Context<'_>) -> Result<Option<UInt53>> {
100        let last = match self.stored.last_checkpoint_id {
101            Some(last) => last as u64,
102            None => {
103                let Watermark { checkpoint, .. } = *ctx.data_unchecked();
104                checkpoint
105            }
106        };
107
108        Ok(Some(UInt53::from(
109            last - self.stored.first_checkpoint_id as u64 + 1,
110        )))
111    }
112
113    /// The total number of transaction blocks in this epoch.
114    async fn total_transactions(&self) -> Result<Option<UInt53>> {
115        // TODO: this currently returns None for the current epoch. Fix this.
116        Ok(self
117            .stored
118            .epoch_total_transactions()
119            .map(|v| UInt53::from(v as u64)))
120    }
121
122    /// The total amount of gas fees (in NANOS) that were paid in this epoch.
123    async fn total_gas_fees(&self) -> Option<BigInt> {
124        self.stored.total_gas_fees.map(BigInt::from)
125    }
126
127    /// The total NANOS rewarded as stake.
128    async fn total_stake_rewards(&self) -> Option<BigInt> {
129        self.stored
130            .total_stake_rewards_distributed
131            .map(BigInt::from)
132    }
133
134    /// The storage fund available in this epoch.
135    /// This fund is used to redistribute storage fees from past transactions
136    /// to future validators.
137    async fn fund_size(&self) -> Option<BigInt> {
138        Some(BigInt::from(self.stored.storage_fund_balance))
139    }
140
141    /// The difference between the fund inflow and outflow, representing
142    /// the net amount of storage fees accumulated in this epoch.
143    async fn net_inflow(&self) -> Option<BigInt> {
144        if let (Some(fund_inflow), Some(fund_outflow)) =
145            (self.stored.storage_charge, self.stored.storage_rebate)
146        {
147            Some(BigInt::from(fund_inflow - fund_outflow))
148        } else {
149            None
150        }
151    }
152
153    /// The storage fees paid for transactions executed during the epoch.
154    async fn fund_inflow(&self) -> Option<BigInt> {
155        self.stored.storage_charge.map(BigInt::from)
156    }
157
158    /// The storage fee rebates paid to users who deleted the data associated
159    /// with past transactions.
160    async fn fund_outflow(&self) -> Option<BigInt> {
161        self.stored.storage_rebate.map(BigInt::from)
162    }
163
164    /// The epoch's corresponding protocol configuration, including the feature
165    /// flags and the configuration options.
166    async fn protocol_configs(&self, ctx: &Context<'_>) -> Result<ProtocolConfigs> {
167        ProtocolConfigs::query(ctx.data_unchecked(), Some(self.protocol_version()))
168            .await
169            .extend()
170    }
171
172    #[graphql(flatten)]
173    async fn system_state_summary(&self, ctx: &Context<'_>) -> Result<SystemStateSummary> {
174        let state = ctx
175            .data_unchecked::<PgManager>()
176            .fetch_iota_system_state(Some(self.stored.epoch as u64))
177            .await?;
178        Ok(SystemStateSummary { native: state })
179    }
180
181    /// A commitment by the committee at the end of epoch on the contents of the
182    /// live object set at that time. This can be used to verify state
183    /// snapshots.
184    async fn live_object_set_digest(&self) -> Result<Option<String>> {
185        let Some(commitments) = self.stored.epoch_commitments.as_ref() else {
186            return Ok(None);
187        };
188        let commitments: Vec<EpochCommitment> = bcs::from_bytes(commitments).map_err(|e| {
189            Error::Internal(format!("Error deserializing commitments: {e}")).extend()
190        })?;
191
192        let digest = commitments.into_iter().next().map(|commitment| {
193            let EpochCommitment::ECMHLiveObjectSetDigest(digest) = commitment;
194            Base58::encode(digest.digest.into_inner())
195        });
196
197        Ok(digest)
198    }
199
200    /// The epoch's corresponding checkpoints.
201    async fn checkpoints(
202        &self,
203        ctx: &Context<'_>,
204        first: Option<u64>,
205        after: Option<checkpoint::Cursor>,
206        last: Option<u64>,
207        before: Option<checkpoint::Cursor>,
208    ) -> Result<Connection<String, Checkpoint>> {
209        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
210        let epoch = self.stored.epoch as u64;
211        Checkpoint::paginate(
212            ctx.data_unchecked(),
213            page,
214            Some(epoch),
215            self.checkpoint_viewed_at,
216        )
217        .await
218        .extend()
219    }
220
221    /// The epoch's corresponding transaction blocks.
222    ///
223    /// `scanLimit` restricts the number of candidate transactions scanned when
224    /// gathering a page of results. It is required for queries that apply
225    /// more than two complex filters (on function, kind, sender, recipient,
226    /// input object, changed object, or ids), and can be at most
227    /// `serviceConfig.maxScanLimit`.
228    ///
229    /// When the scan limit is reached the page will be returned even if it has
230    /// fewer than `first` results when paginating forward (`last` when
231    /// paginating backwards). If there are more transactions to scan,
232    /// `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
233    /// `true`, and `PageInfo.endCursor` (or `PageInfo.startCursor`) will be set
234    /// to the last transaction that was scanned as opposed to the last (or
235    /// first) transaction in the page.
236    ///
237    /// Requesting the next (or previous) page after this cursor will resume the
238    /// search, scanning the next `scanLimit` many transactions in the
239    /// direction of pagination, and so on until all transactions in the
240    /// scanning range have been visited.
241    ///
242    /// By default, the scanning range consists of all transactions in this
243    /// epoch.
244    async fn transaction_blocks(
245        &self,
246        ctx: &Context<'_>,
247        first: Option<u64>,
248        after: Option<transaction_block::Cursor>,
249        last: Option<u64>,
250        before: Option<transaction_block::Cursor>,
251        filter: Option<TransactionBlockFilter>,
252        scan_limit: Option<u64>,
253    ) -> Result<ScanConnection<String, TransactionBlock>> {
254        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
255
256        let Some(filter) = filter
257            .unwrap_or_default()
258            .intersect(TransactionBlockFilter {
259                // If `first_checkpoint_id` is 0, we include the 0th checkpoint by leaving it None
260                after_checkpoint: (self.stored.first_checkpoint_id > 0)
261                    .then(|| UInt53::from(self.stored.first_checkpoint_id as u64 - 1)),
262                before_checkpoint: self
263                    .stored
264                    .last_checkpoint_id
265                    .map(|id| UInt53::from(id as u64 + 1)),
266                ..Default::default()
267            })
268        else {
269            return Ok(ScanConnection::new(false, false));
270        };
271
272        TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
273            .await
274            .extend()
275    }
276}
277
278impl Epoch {
279    /// The epoch's protocol version.
280    pub(crate) fn protocol_version(&self) -> u64 {
281        self.stored.protocol_version as u64
282    }
283
284    /// Look up an `Epoch` in the database, optionally filtered by its Epoch ID.
285    /// If no ID is supplied, defaults to fetching the latest epoch.
286    pub(crate) async fn query(
287        ctx: &Context<'_>,
288        filter: Option<u64>,
289        checkpoint_viewed_at: u64,
290    ) -> Result<Option<Self>, Error> {
291        if let Some(epoch_id) = filter {
292            let DataLoader(dl) = ctx.data_unchecked();
293            dl.load_one(EpochKey {
294                epoch_id,
295                checkpoint_viewed_at,
296            })
297            .await
298        } else {
299            Self::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await
300        }
301    }
302
303    /// Look up the latest `Epoch` from the database, optionally filtered by a
304    /// consistency cursor (querying for a consistency cursor in the past
305    /// looks for the latest epoch as of that cursor).
306    pub(crate) async fn query_latest_at(
307        db: &Db,
308        checkpoint_viewed_at: u64,
309    ) -> Result<Option<Self>, Error> {
310        use epochs::dsl;
311
312        let stored: Option<QueryableEpochInfo> = db
313            .execute(move |conn| {
314                conn.first(move || {
315                    // Bound the query on `checkpoint_viewed_at` by filtering for the epoch
316                    // whose `first_checkpoint_id <= checkpoint_viewed_at`, selecting the epoch
317                    // with the largest `first_checkpoint_id` among the filtered set.
318                    dsl::epochs
319                        .select(QueryableEpochInfo::as_select())
320                        .filter(dsl::first_checkpoint_id.le(checkpoint_viewed_at as i64))
321                        .order_by(dsl::first_checkpoint_id.desc())
322                })
323                .optional()
324            })
325            .await
326            .map_err(|e| Error::Internal(format!("Failed to fetch epoch: {e}")))?;
327
328        Ok(stored.map(|stored| Epoch {
329            stored,
330            checkpoint_viewed_at,
331        }))
332    }
333}
334
335impl Loader<EpochKey> for Db {
336    type Value = Epoch;
337    type Error = Error;
338
339    async fn load(&self, keys: &[EpochKey]) -> Result<HashMap<EpochKey, Epoch>, Error> {
340        use epochs::dsl;
341
342        let epoch_ids: BTreeSet<_> = keys.iter().map(|key| key.epoch_id as i64).collect();
343        let epochs: Vec<QueryableEpochInfo> = self
344            .execute_repeatable(move |conn| {
345                conn.results(move || {
346                    dsl::epochs
347                        .select(QueryableEpochInfo::as_select())
348                        .filter(dsl::epoch.eq_any(epoch_ids.iter().cloned()))
349                })
350            })
351            .await
352            .map_err(|e| Error::Internal(format!("Failed to fetch epochs: {e}")))?;
353
354        let epoch_id_to_stored: BTreeMap<_, _> = epochs
355            .into_iter()
356            .map(|stored| (stored.epoch as u64, stored))
357            .collect();
358
359        Ok(keys
360            .iter()
361            .filter_map(|key| {
362                let stored = epoch_id_to_stored.get(&key.epoch_id).cloned()?;
363                let epoch = Epoch {
364                    stored,
365                    checkpoint_viewed_at: key.checkpoint_viewed_at,
366                };
367
368                // We filter by checkpoint viewed at in memory because it should be quite rare
369                // that this query actually filters something (only in edge
370                // cases), and not trying to encode it in the SQL query makes
371                // the query much simpler and therefore easier for
372                // the DB to plan.
373                let start = epoch.stored.first_checkpoint_id as u64;
374                (key.checkpoint_viewed_at >= start).then_some((*key, epoch))
375            })
376            .collect())
377    }
378}