iota_graphql_rpc/types/
epoch.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::{BTreeMap, BTreeSet, HashMap};
6
7use async_graphql::{connection::Connection, dataloader::Loader, *};
8use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
9use fastcrypto::encoding::{Base58, Encoding};
10use iota_indexer::{models::epoch::QueryableEpochInfo, schema::epochs};
11use iota_types::messages_checkpoint::CheckpointCommitment as EpochCommitment;
12
13use crate::{
14    config::DEFAULT_PAGE_SIZE,
15    connection::ScanConnection,
16    context_data::db_data_provider::PgManager,
17    data::{DataLoader, Db, DbConnection, QueryExecutor},
18    error::Error,
19    server::watermark_task::Watermark,
20    types::{
21        big_int::BigInt,
22        checkpoint::{self, Checkpoint},
23        cursor::Page,
24        date_time::DateTime,
25        protocol_config::ProtocolConfigs,
26        system_state_summary::{NativeStateValidatorInfo, SystemStateSummary},
27        transaction_block::{self, TransactionBlock, TransactionBlockFilter},
28        uint53::UInt53,
29        validator_set::ValidatorSet,
30    },
31};
32
33#[derive(Clone)]
34pub(crate) struct Epoch {
35    pub stored: QueryableEpochInfo,
36    pub checkpoint_viewed_at: u64,
37}
38
39/// `DataLoader` key for fetching an `Epoch` by its ID, optionally constrained
40/// by a consistency cursor.
41#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
42struct EpochKey {
43    pub epoch_id: u64,
44    pub checkpoint_viewed_at: u64,
45}
46
47/// Operation of the IOTA network is temporally partitioned into non-overlapping
48/// epochs, and the network aims to keep epochs roughly the same duration as
49/// each other. During a particular epoch the following data is fixed:
50///
51/// - the protocol version
52/// - the reference gas price
53/// - the set of participating validators
54#[Object]
55impl Epoch {
56    /// The epoch's id as a sequence number that starts at 0 and is incremented
57    /// by one at every epoch change.
58    async fn epoch_id(&self) -> UInt53 {
59        UInt53::from(self.stored.epoch as u64)
60    }
61
62    /// The minimum gas price that a quorum of validators are guaranteed to sign
63    /// a transaction for.
64    async fn reference_gas_price(&self) -> Option<BigInt> {
65        Some(BigInt::from(self.stored.reference_gas_price as u64))
66    }
67
68    /// Validator related properties, including the active validators.
69    ///
70    /// For epochs other than the current the data provided refer to the start
71    /// of the epoch.
72    async fn validator_set(&self, ctx: &Context<'_>) -> Result<Option<ValidatorSet>> {
73        let system_state = ctx
74            .data_unchecked::<PgManager>()
75            .fetch_iota_system_state(Some(self.stored.epoch as u64))
76            .await?;
77
78        let validator_set = NativeStateValidatorInfo::from(system_state).into_validator_set(
79            self.stored.total_stake as u64,
80            self.checkpoint_viewed_at,
81            self.stored.epoch as u64,
82        );
83        Ok(Some(validator_set))
84    }
85
86    /// The epoch's starting timestamp.
87    async fn start_timestamp(&self) -> Result<DateTime, Error> {
88        DateTime::from_ms(self.stored.epoch_start_timestamp)
89    }
90
91    /// The epoch's ending timestamp.
92    async fn end_timestamp(&self) -> Result<Option<DateTime>, Error> {
93        self.stored
94            .epoch_end_timestamp
95            .map(DateTime::from_ms)
96            .transpose()
97    }
98
99    /// The total number of checkpoints in this epoch.
100    async fn total_checkpoints(&self, ctx: &Context<'_>) -> Result<Option<UInt53>> {
101        let last = match self.stored.last_checkpoint_id {
102            Some(last) => last as u64,
103            None => {
104                let Watermark { checkpoint, .. } = *ctx.data_unchecked();
105                checkpoint
106            }
107        };
108
109        Ok(Some(UInt53::from(
110            last - self.stored.first_checkpoint_id as u64 + 1,
111        )))
112    }
113
114    /// The total number of transaction blocks in this epoch.
115    async fn total_transactions(&self) -> Result<Option<UInt53>> {
116        // TODO: this currently returns None for the current epoch. Fix this.
117        Ok(self
118            .stored
119            .epoch_total_transactions()
120            .map(|v| UInt53::from(v as u64)))
121    }
122
123    /// The total amount of gas fees (in NANOS) that were paid in this epoch.
124    async fn total_gas_fees(&self) -> Option<BigInt> {
125        self.stored.total_gas_fees.map(BigInt::from)
126    }
127
128    /// The total NANOS rewarded as stake.
129    async fn total_stake_rewards(&self) -> Option<BigInt> {
130        self.stored
131            .total_stake_rewards_distributed
132            .map(BigInt::from)
133    }
134
135    /// The storage fund available in this epoch.
136    /// This fund is used to redistribute storage fees from past transactions
137    /// to future validators.
138    async fn fund_size(&self) -> Option<BigInt> {
139        Some(BigInt::from(self.stored.storage_fund_balance))
140    }
141
142    /// The difference between the fund inflow and outflow, representing
143    /// the net amount of storage fees accumulated in this epoch.
144    async fn net_inflow(&self) -> Option<BigInt> {
145        if let (Some(fund_inflow), Some(fund_outflow)) =
146            (self.stored.storage_charge, self.stored.storage_rebate)
147        {
148            Some(BigInt::from(fund_inflow - fund_outflow))
149        } else {
150            None
151        }
152    }
153
154    /// The storage fees paid for transactions executed during the epoch.
155    async fn fund_inflow(&self) -> Option<BigInt> {
156        self.stored.storage_charge.map(BigInt::from)
157    }
158
159    /// The storage fee rebates paid to users who deleted the data associated
160    /// with past transactions.
161    async fn fund_outflow(&self) -> Option<BigInt> {
162        self.stored.storage_rebate.map(BigInt::from)
163    }
164
165    /// The epoch's corresponding protocol configuration, including the feature
166    /// flags and the configuration options.
167    async fn protocol_configs(&self, ctx: &Context<'_>) -> Result<ProtocolConfigs> {
168        ProtocolConfigs::query(ctx.data_unchecked(), Some(self.protocol_version()))
169            .await
170            .extend()
171    }
172
173    #[graphql(flatten)]
174    async fn system_state_summary(&self, ctx: &Context<'_>) -> Result<SystemStateSummary> {
175        let state = ctx
176            .data_unchecked::<PgManager>()
177            .fetch_iota_system_state(Some(self.stored.epoch as u64))
178            .await?;
179        Ok(SystemStateSummary { native: state })
180    }
181
182    /// A commitment by the committee at the end of epoch on the contents of the
183    /// live object set at that time. This can be used to verify state
184    /// snapshots.
185    async fn live_object_set_digest(&self) -> Result<Option<String>> {
186        let Some(commitments) = self.stored.epoch_commitments.as_ref() else {
187            return Ok(None);
188        };
189        let commitments: Vec<EpochCommitment> = bcs::from_bytes(commitments).map_err(|e| {
190            Error::Internal(format!("Error deserializing commitments: {e}")).extend()
191        })?;
192
193        let digest = commitments.into_iter().next().map(|commitment| {
194            let EpochCommitment::ECMHLiveObjectSetDigest(digest) = commitment;
195            Base58::encode(digest.digest.into_inner())
196        });
197
198        Ok(digest)
199    }
200
201    /// The epoch's corresponding checkpoints.
202    async fn checkpoints(
203        &self,
204        ctx: &Context<'_>,
205        first: Option<u64>,
206        after: Option<checkpoint::Cursor>,
207        last: Option<u64>,
208        before: Option<checkpoint::Cursor>,
209    ) -> Result<Connection<String, Checkpoint>> {
210        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
211        let epoch = self.stored.epoch as u64;
212        Checkpoint::paginate(
213            ctx.data_unchecked(),
214            page,
215            Some(epoch),
216            self.checkpoint_viewed_at,
217        )
218        .await
219        .extend()
220    }
221
222    /// The epoch's corresponding transaction blocks.
223    ///
224    /// `scanLimit` restricts the number of candidate transactions scanned when
225    /// gathering a page of results. It is required for queries that apply
226    /// more than two complex filters (on function, kind, sender, recipient,
227    /// input object, changed object, or ids), and can be at most
228    /// `serviceConfig.maxScanLimit`.
229    ///
230    /// When the scan limit is reached the page will be returned even if it has
231    /// fewer than `first` results when paginating forward (`last` when
232    /// paginating backwards). If there are more transactions to scan,
233    /// `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
234    /// `true`, and `PageInfo.endCursor` (or `PageInfo.startCursor`) will be set
235    /// to the last transaction that was scanned as opposed to the last (or
236    /// first) transaction in the page.
237    ///
238    /// Requesting the next (or previous) page after this cursor will resume the
239    /// search, scanning the next `scanLimit` many transactions in the
240    /// direction of pagination, and so on until all transactions in the
241    /// scanning range have been visited.
242    ///
243    /// By default, the scanning range consists of all transactions in this
244    /// epoch.
245    #[graphql(
246        complexity = "first.or(last).unwrap_or(DEFAULT_PAGE_SIZE as u64) as usize * child_complexity"
247    )]
248    async fn transaction_blocks(
249        &self,
250        ctx: &Context<'_>,
251        first: Option<u64>,
252        after: Option<transaction_block::Cursor>,
253        last: Option<u64>,
254        before: Option<transaction_block::Cursor>,
255        filter: Option<TransactionBlockFilter>,
256        scan_limit: Option<u64>,
257    ) -> Result<ScanConnection<String, TransactionBlock>> {
258        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
259
260        let Some(filter) = filter
261            .unwrap_or_default()
262            .intersect(TransactionBlockFilter {
263                // If `first_checkpoint_id` is 0, we include the 0th checkpoint by leaving it None
264                after_checkpoint: (self.stored.first_checkpoint_id > 0)
265                    .then(|| UInt53::from(self.stored.first_checkpoint_id as u64 - 1)),
266                before_checkpoint: self
267                    .stored
268                    .last_checkpoint_id
269                    .map(|id| UInt53::from(id as u64 + 1)),
270                ..Default::default()
271            })
272        else {
273            return Ok(ScanConnection::new(false, false));
274        };
275
276        TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
277            .await
278            .extend()
279    }
280}
281
282impl Epoch {
283    /// The epoch's protocol version.
284    pub(crate) fn protocol_version(&self) -> u64 {
285        self.stored.protocol_version as u64
286    }
287
288    /// Look up an `Epoch` in the database, optionally filtered by its Epoch ID.
289    /// If no ID is supplied, defaults to fetching the latest epoch.
290    pub(crate) async fn query(
291        ctx: &Context<'_>,
292        filter: Option<u64>,
293        checkpoint_viewed_at: u64,
294    ) -> Result<Option<Self>, Error> {
295        if let Some(epoch_id) = filter {
296            let DataLoader(dl) = ctx.data_unchecked();
297            dl.load_one(EpochKey {
298                epoch_id,
299                checkpoint_viewed_at,
300            })
301            .await
302        } else {
303            Self::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await
304        }
305    }
306
307    /// Look up the latest `Epoch` from the database, optionally filtered by a
308    /// consistency cursor (querying for a consistency cursor in the past
309    /// looks for the latest epoch as of that cursor).
310    pub(crate) async fn query_latest_at(
311        db: &Db,
312        checkpoint_viewed_at: u64,
313    ) -> Result<Option<Self>, Error> {
314        use epochs::dsl;
315
316        let stored: Option<QueryableEpochInfo> = db
317            .execute(move |conn| {
318                conn.first(move || {
319                    // Bound the query on `checkpoint_viewed_at` by filtering for the epoch
320                    // whose `first_checkpoint_id <= checkpoint_viewed_at`, selecting the epoch
321                    // with the largest `first_checkpoint_id` among the filtered set.
322                    dsl::epochs
323                        .select(QueryableEpochInfo::as_select())
324                        .filter(dsl::first_checkpoint_id.le(checkpoint_viewed_at as i64))
325                        .order_by(dsl::first_checkpoint_id.desc())
326                })
327                .optional()
328            })
329            .await
330            .map_err(|e| Error::Internal(format!("Failed to fetch epoch: {e}")))?;
331
332        Ok(stored.map(|stored| Epoch {
333            stored,
334            checkpoint_viewed_at,
335        }))
336    }
337}
338
339impl Loader<EpochKey> for Db {
340    type Value = Epoch;
341    type Error = Error;
342
343    async fn load(&self, keys: &[EpochKey]) -> Result<HashMap<EpochKey, Epoch>, Error> {
344        use epochs::dsl;
345
346        let epoch_ids: BTreeSet<_> = keys.iter().map(|key| key.epoch_id as i64).collect();
347        let epochs: Vec<QueryableEpochInfo> = self
348            .execute_repeatable(move |conn| {
349                conn.results(move || {
350                    dsl::epochs
351                        .select(QueryableEpochInfo::as_select())
352                        .filter(dsl::epoch.eq_any(epoch_ids.iter().cloned()))
353                })
354            })
355            .await
356            .map_err(|e| Error::Internal(format!("Failed to fetch epochs: {e}")))?;
357
358        let epoch_id_to_stored: BTreeMap<_, _> = epochs
359            .into_iter()
360            .map(|stored| (stored.epoch as u64, stored))
361            .collect();
362
363        Ok(keys
364            .iter()
365            .filter_map(|key| {
366                let stored = epoch_id_to_stored.get(&key.epoch_id).cloned()?;
367                let epoch = Epoch {
368                    stored,
369                    checkpoint_viewed_at: key.checkpoint_viewed_at,
370                };
371
372                // We filter by checkpoint viewed at in memory because it should be quite rare
373                // that this query actually filters something (only in edge
374                // cases), and not trying to encode it in the SQL query makes
375                // the query much simpler and therefore easier for
376                // the DB to plan.
377                let start = epoch.stored.first_checkpoint_id as u64;
378                (key.checkpoint_viewed_at >= start).then_some((*key, epoch))
379            })
380            .collect())
381    }
382}