iota_graphql_rpc/types/
epoch.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::{BTreeMap, BTreeSet, HashMap};
6
7use async_graphql::{connection::Connection, dataloader::Loader, *};
8use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
9use fastcrypto::encoding::{Base58, Encoding};
10use iota_indexer::{models::epoch::QueryableEpochInfo, schema::epochs};
11use iota_types::messages_checkpoint::CheckpointCommitment as EpochCommitment;
12
13use crate::{
14    connection::ScanConnection,
15    context_data::db_data_provider::PgManager,
16    data::{DataLoader, Db, DbConnection, QueryExecutor},
17    error::Error,
18    server::watermark_task::Watermark,
19    types::{
20        big_int::BigInt,
21        checkpoint::{self, Checkpoint},
22        cursor::Page,
23        date_time::DateTime,
24        protocol_config::ProtocolConfigs,
25        system_state_summary::{NativeStateValidatorInfo, SystemStateSummary},
26        transaction_block::{self, TransactionBlock, TransactionBlockFilter},
27        uint53::UInt53,
28        validator_set::ValidatorSet,
29    },
30};
31
32#[derive(Clone)]
33pub(crate) struct Epoch {
34    pub stored: QueryableEpochInfo,
35    pub checkpoint_viewed_at: u64,
36}
37
38/// `DataLoader` key for fetching an `Epoch` by its ID, optionally constrained
39/// by a consistency cursor.
40#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
41struct EpochKey {
42    pub epoch_id: u64,
43    pub checkpoint_viewed_at: u64,
44}
45
46/// Operation of the IOTA network is temporally partitioned into non-overlapping
47/// epochs, and the network aims to keep epochs roughly the same duration as
48/// each other. During a particular epoch the following data is fixed:
49///
50/// - the protocol version
51/// - the reference gas price
52/// - the set of participating validators
53#[Object]
54impl Epoch {
55    /// The epoch's id as a sequence number that starts at 0 and is incremented
56    /// by one at every epoch change.
57    async fn epoch_id(&self) -> UInt53 {
58        UInt53::from(self.stored.epoch as u64)
59    }
60
61    /// The minimum gas price that a quorum of validators are guaranteed to sign
62    /// a transaction for.
63    async fn reference_gas_price(&self) -> Option<BigInt> {
64        Some(BigInt::from(self.stored.reference_gas_price as u64))
65    }
66
67    /// Validator related properties, including the active validators.
68    async fn validator_set(&self, ctx: &Context<'_>) -> Result<Option<ValidatorSet>> {
69        let system_state = ctx
70            .data_unchecked::<PgManager>()
71            .fetch_iota_system_state(Some(self.stored.epoch as u64))
72            .await?;
73
74        let validator_set = NativeStateValidatorInfo::from(system_state).into_validator_set(
75            self.stored.total_stake as u64,
76            self.checkpoint_viewed_at,
77            self.stored.epoch as u64,
78        );
79        Ok(Some(validator_set))
80    }
81
82    /// The epoch's starting timestamp.
83    async fn start_timestamp(&self) -> Result<DateTime, Error> {
84        DateTime::from_ms(self.stored.epoch_start_timestamp)
85    }
86
87    /// The epoch's ending timestamp.
88    async fn end_timestamp(&self) -> Result<Option<DateTime>, Error> {
89        self.stored
90            .epoch_end_timestamp
91            .map(DateTime::from_ms)
92            .transpose()
93    }
94
95    /// The total number of checkpoints in this epoch.
96    async fn total_checkpoints(&self, ctx: &Context<'_>) -> Result<Option<UInt53>> {
97        let last = match self.stored.last_checkpoint_id {
98            Some(last) => last as u64,
99            None => {
100                let Watermark { checkpoint, .. } = *ctx.data_unchecked();
101                checkpoint
102            }
103        };
104
105        Ok(Some(UInt53::from(
106            last - self.stored.first_checkpoint_id as u64,
107        )))
108    }
109
110    /// The total number of transaction blocks in this epoch.
111    async fn total_transactions(&self) -> Result<Option<UInt53>> {
112        // TODO: this currently returns None for the current epoch. Fix this.
113        Ok(self
114            .stored
115            .epoch_total_transactions
116            .map(|v| UInt53::from(v as u64)))
117    }
118
119    /// The total amount of gas fees (in NANOS) that were paid in this epoch.
120    async fn total_gas_fees(&self) -> Option<BigInt> {
121        self.stored.total_gas_fees.map(BigInt::from)
122    }
123
124    /// The total NANOS rewarded as stake.
125    async fn total_stake_rewards(&self) -> Option<BigInt> {
126        self.stored
127            .total_stake_rewards_distributed
128            .map(BigInt::from)
129    }
130
131    /// The storage fund available in this epoch.
132    /// This fund is used to redistribute storage fees from past transactions
133    /// to future validators.
134    async fn fund_size(&self) -> Option<BigInt> {
135        Some(BigInt::from(self.stored.storage_fund_balance))
136    }
137
138    /// The difference between the fund inflow and outflow, representing
139    /// the net amount of storage fees accumulated in this epoch.
140    async fn net_inflow(&self) -> Option<BigInt> {
141        if let (Some(fund_inflow), Some(fund_outflow)) =
142            (self.stored.storage_charge, self.stored.storage_rebate)
143        {
144            Some(BigInt::from(fund_inflow - fund_outflow))
145        } else {
146            None
147        }
148    }
149
150    /// The storage fees paid for transactions executed during the epoch.
151    async fn fund_inflow(&self) -> Option<BigInt> {
152        self.stored.storage_charge.map(BigInt::from)
153    }
154
155    /// The storage fee rebates paid to users who deleted the data associated
156    /// with past transactions.
157    async fn fund_outflow(&self) -> Option<BigInt> {
158        self.stored.storage_rebate.map(BigInt::from)
159    }
160
161    /// The epoch's corresponding protocol configuration, including the feature
162    /// flags and the configuration options.
163    async fn protocol_configs(&self, ctx: &Context<'_>) -> Result<ProtocolConfigs> {
164        ProtocolConfigs::query(ctx.data_unchecked(), Some(self.protocol_version()))
165            .await
166            .extend()
167    }
168
169    #[graphql(flatten)]
170    async fn system_state_summary(&self, ctx: &Context<'_>) -> Result<SystemStateSummary> {
171        let state = ctx
172            .data_unchecked::<PgManager>()
173            .fetch_iota_system_state(Some(self.stored.epoch as u64))
174            .await?;
175        Ok(SystemStateSummary { native: state })
176    }
177
178    /// A commitment by the committee at the end of epoch on the contents of the
179    /// live object set at that time. This can be used to verify state
180    /// snapshots.
181    async fn live_object_set_digest(&self) -> Result<Option<String>> {
182        let Some(commitments) = self.stored.epoch_commitments.as_ref() else {
183            return Ok(None);
184        };
185        let commitments: Vec<EpochCommitment> = bcs::from_bytes(commitments).map_err(|e| {
186            Error::Internal(format!("Error deserializing commitments: {e}")).extend()
187        })?;
188
189        let digest = commitments.into_iter().next().map(|commitment| {
190            let EpochCommitment::ECMHLiveObjectSetDigest(digest) = commitment;
191            Base58::encode(digest.digest.into_inner())
192        });
193
194        Ok(digest)
195    }
196
197    /// The epoch's corresponding checkpoints.
198    async fn checkpoints(
199        &self,
200        ctx: &Context<'_>,
201        first: Option<u64>,
202        after: Option<checkpoint::Cursor>,
203        last: Option<u64>,
204        before: Option<checkpoint::Cursor>,
205    ) -> Result<Connection<String, Checkpoint>> {
206        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
207        let epoch = self.stored.epoch as u64;
208        Checkpoint::paginate(
209            ctx.data_unchecked(),
210            page,
211            Some(epoch),
212            self.checkpoint_viewed_at,
213        )
214        .await
215        .extend()
216    }
217
218    /// The epoch's corresponding transaction blocks.
219    ///
220    /// `scanLimit` restricts the number of candidate transactions scanned when
221    /// gathering a page of results. It is required for queries that apply
222    /// more than two complex filters (on function, kind, sender, recipient,
223    /// input object, changed object, or ids), and can be at most
224    /// `serviceConfig.maxScanLimit`.
225    ///
226    /// When the scan limit is reached the page will be returned even if it has
227    /// fewer than `first` results when paginating forward (`last` when
228    /// paginating backwards). If there are more transactions to scan,
229    /// `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
230    /// `true`, and `PageInfo.endCursor` (or `PageInfo.startCursor`) will be set
231    /// to the last transaction that was scanned as opposed to the last (or
232    /// first) transaction in the page.
233    ///
234    /// Requesting the next (or previous) page after this cursor will resume the
235    /// search, scanning the next `scanLimit` many transactions in the
236    /// direction of pagination, and so on until all transactions in the
237    /// scanning range have been visited.
238    ///
239    /// By default, the scanning range consists of all transactions in this
240    /// epoch.
241    async fn transaction_blocks(
242        &self,
243        ctx: &Context<'_>,
244        first: Option<u64>,
245        after: Option<transaction_block::Cursor>,
246        last: Option<u64>,
247        before: Option<transaction_block::Cursor>,
248        filter: Option<TransactionBlockFilter>,
249        scan_limit: Option<u64>,
250    ) -> Result<ScanConnection<String, TransactionBlock>> {
251        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
252
253        let Some(filter) = filter
254            .unwrap_or_default()
255            .intersect(TransactionBlockFilter {
256                // If `first_checkpoint_id` is 0, we include the 0th checkpoint by leaving it None
257                after_checkpoint: (self.stored.first_checkpoint_id > 0)
258                    .then(|| UInt53::from(self.stored.first_checkpoint_id as u64 - 1)),
259                before_checkpoint: self
260                    .stored
261                    .last_checkpoint_id
262                    .map(|id| UInt53::from(id as u64 + 1)),
263                ..Default::default()
264            })
265        else {
266            return Ok(ScanConnection::new(false, false));
267        };
268
269        TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
270            .await
271            .extend()
272    }
273}
274
275impl Epoch {
276    /// The epoch's protocol version.
277    pub(crate) fn protocol_version(&self) -> u64 {
278        self.stored.protocol_version as u64
279    }
280
281    /// Look up an `Epoch` in the database, optionally filtered by its Epoch ID.
282    /// If no ID is supplied, defaults to fetching the latest epoch.
283    pub(crate) async fn query(
284        ctx: &Context<'_>,
285        filter: Option<u64>,
286        checkpoint_viewed_at: u64,
287    ) -> Result<Option<Self>, Error> {
288        if let Some(epoch_id) = filter {
289            let DataLoader(dl) = ctx.data_unchecked();
290            dl.load_one(EpochKey {
291                epoch_id,
292                checkpoint_viewed_at,
293            })
294            .await
295        } else {
296            Self::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await
297        }
298    }
299
300    /// Look up the latest `Epoch` from the database, optionally filtered by a
301    /// consistency cursor (querying for a consistency cursor in the past
302    /// looks for the latest epoch as of that cursor).
303    pub(crate) async fn query_latest_at(
304        db: &Db,
305        checkpoint_viewed_at: u64,
306    ) -> Result<Option<Self>, Error> {
307        use epochs::dsl;
308
309        let stored: Option<QueryableEpochInfo> = db
310            .execute(move |conn| {
311                conn.first(move || {
312                    // Bound the query on `checkpoint_viewed_at` by filtering for the epoch
313                    // whose `first_checkpoint_id <= checkpoint_viewed_at`, selecting the epoch
314                    // with the largest `first_checkpoint_id` among the filtered set.
315                    dsl::epochs
316                        .select(QueryableEpochInfo::as_select())
317                        .filter(dsl::first_checkpoint_id.le(checkpoint_viewed_at as i64))
318                        .order_by(dsl::first_checkpoint_id.desc())
319                })
320                .optional()
321            })
322            .await
323            .map_err(|e| Error::Internal(format!("Failed to fetch epoch: {e}")))?;
324
325        Ok(stored.map(|stored| Epoch {
326            stored,
327            checkpoint_viewed_at,
328        }))
329    }
330}
331
332impl Loader<EpochKey> for Db {
333    type Value = Epoch;
334    type Error = Error;
335
336    async fn load(&self, keys: &[EpochKey]) -> Result<HashMap<EpochKey, Epoch>, Error> {
337        use epochs::dsl;
338
339        let epoch_ids: BTreeSet<_> = keys.iter().map(|key| key.epoch_id as i64).collect();
340        let epochs: Vec<QueryableEpochInfo> = self
341            .execute_repeatable(move |conn| {
342                conn.results(move || {
343                    dsl::epochs
344                        .select(QueryableEpochInfo::as_select())
345                        .filter(dsl::epoch.eq_any(epoch_ids.iter().cloned()))
346                })
347            })
348            .await
349            .map_err(|e| Error::Internal(format!("Failed to fetch epochs: {e}")))?;
350
351        let epoch_id_to_stored: BTreeMap<_, _> = epochs
352            .into_iter()
353            .map(|stored| (stored.epoch as u64, stored))
354            .collect();
355
356        Ok(keys
357            .iter()
358            .filter_map(|key| {
359                let stored = epoch_id_to_stored.get(&key.epoch_id).cloned()?;
360                let epoch = Epoch {
361                    stored,
362                    checkpoint_viewed_at: key.checkpoint_viewed_at,
363                };
364
365                // We filter by checkpoint viewed at in memory because it should be quite rare
366                // that this query actually filters something (only in edge
367                // cases), and not trying to encode it in the SQL query makes
368                // the query much simpler and therefore easier for
369                // the DB to plan.
370                let start = epoch.stored.first_checkpoint_id as u64;
371                (key.checkpoint_viewed_at >= start).then_some((*key, epoch))
372            })
373            .collect())
374    }
375}