1use std::collections::{BTreeMap, BTreeSet, HashMap};
6
7use async_graphql::{connection::Connection, dataloader::Loader, *};
8use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
9use fastcrypto::encoding::{Base58, Encoding};
10use iota_indexer::{models::epoch::QueryableEpochInfo, schema::epochs};
11use iota_types::messages_checkpoint::CheckpointCommitment as EpochCommitment;
12
13use crate::{
14 connection::ScanConnection,
15 context_data::db_data_provider::PgManager,
16 data::{DataLoader, Db, DbConnection, QueryExecutor},
17 error::Error,
18 server::watermark_task::Watermark,
19 types::{
20 big_int::BigInt,
21 checkpoint::{self, Checkpoint},
22 cursor::Page,
23 date_time::DateTime,
24 protocol_config::ProtocolConfigs,
25 system_state_summary::{NativeStateValidatorInfo, SystemStateSummary},
26 transaction_block::{self, TransactionBlock, TransactionBlockFilter},
27 uint53::UInt53,
28 validator_set::ValidatorSet,
29 },
30};
31
32#[derive(Clone)]
33pub(crate) struct Epoch {
34 pub stored: QueryableEpochInfo,
35 pub checkpoint_viewed_at: u64,
36}
37
38#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
41struct EpochKey {
42 pub epoch_id: u64,
43 pub checkpoint_viewed_at: u64,
44}
45
46#[Object]
54impl Epoch {
55 async fn epoch_id(&self) -> UInt53 {
58 UInt53::from(self.stored.epoch as u64)
59 }
60
61 async fn reference_gas_price(&self) -> Option<BigInt> {
64 Some(BigInt::from(self.stored.reference_gas_price as u64))
65 }
66
67 async fn validator_set(&self, ctx: &Context<'_>) -> Result<Option<ValidatorSet>> {
69 let system_state = ctx
70 .data_unchecked::<PgManager>()
71 .fetch_iota_system_state(Some(self.stored.epoch as u64))
72 .await?;
73
74 let validator_set = NativeStateValidatorInfo::from(system_state).into_validator_set(
75 self.stored.total_stake as u64,
76 self.checkpoint_viewed_at,
77 self.stored.epoch as u64,
78 );
79 Ok(Some(validator_set))
80 }
81
82 async fn start_timestamp(&self) -> Result<DateTime, Error> {
84 DateTime::from_ms(self.stored.epoch_start_timestamp)
85 }
86
87 async fn end_timestamp(&self) -> Result<Option<DateTime>, Error> {
89 self.stored
90 .epoch_end_timestamp
91 .map(DateTime::from_ms)
92 .transpose()
93 }
94
95 async fn total_checkpoints(&self, ctx: &Context<'_>) -> Result<Option<UInt53>> {
97 let last = match self.stored.last_checkpoint_id {
98 Some(last) => last as u64,
99 None => {
100 let Watermark { checkpoint, .. } = *ctx.data_unchecked();
101 checkpoint
102 }
103 };
104
105 Ok(Some(UInt53::from(
106 last - self.stored.first_checkpoint_id as u64,
107 )))
108 }
109
110 async fn total_transactions(&self) -> Result<Option<UInt53>> {
112 Ok(self
114 .stored
115 .epoch_total_transactions
116 .map(|v| UInt53::from(v as u64)))
117 }
118
119 async fn total_gas_fees(&self) -> Option<BigInt> {
121 self.stored.total_gas_fees.map(BigInt::from)
122 }
123
124 async fn total_stake_rewards(&self) -> Option<BigInt> {
126 self.stored
127 .total_stake_rewards_distributed
128 .map(BigInt::from)
129 }
130
131 async fn fund_size(&self) -> Option<BigInt> {
135 Some(BigInt::from(self.stored.storage_fund_balance))
136 }
137
138 async fn net_inflow(&self) -> Option<BigInt> {
141 if let (Some(fund_inflow), Some(fund_outflow)) =
142 (self.stored.storage_charge, self.stored.storage_rebate)
143 {
144 Some(BigInt::from(fund_inflow - fund_outflow))
145 } else {
146 None
147 }
148 }
149
150 async fn fund_inflow(&self) -> Option<BigInt> {
152 self.stored.storage_charge.map(BigInt::from)
153 }
154
155 async fn fund_outflow(&self) -> Option<BigInt> {
158 self.stored.storage_rebate.map(BigInt::from)
159 }
160
161 async fn protocol_configs(&self, ctx: &Context<'_>) -> Result<ProtocolConfigs> {
164 ProtocolConfigs::query(ctx.data_unchecked(), Some(self.protocol_version()))
165 .await
166 .extend()
167 }
168
169 #[graphql(flatten)]
170 async fn system_state_summary(&self, ctx: &Context<'_>) -> Result<SystemStateSummary> {
171 let state = ctx
172 .data_unchecked::<PgManager>()
173 .fetch_iota_system_state(Some(self.stored.epoch as u64))
174 .await?;
175 Ok(SystemStateSummary { native: state })
176 }
177
178 async fn live_object_set_digest(&self) -> Result<Option<String>> {
182 let Some(commitments) = self.stored.epoch_commitments.as_ref() else {
183 return Ok(None);
184 };
185 let commitments: Vec<EpochCommitment> = bcs::from_bytes(commitments).map_err(|e| {
186 Error::Internal(format!("Error deserializing commitments: {e}")).extend()
187 })?;
188
189 let digest = commitments.into_iter().next().map(|commitment| {
190 let EpochCommitment::ECMHLiveObjectSetDigest(digest) = commitment;
191 Base58::encode(digest.digest.into_inner())
192 });
193
194 Ok(digest)
195 }
196
197 async fn checkpoints(
199 &self,
200 ctx: &Context<'_>,
201 first: Option<u64>,
202 after: Option<checkpoint::Cursor>,
203 last: Option<u64>,
204 before: Option<checkpoint::Cursor>,
205 ) -> Result<Connection<String, Checkpoint>> {
206 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
207 let epoch = self.stored.epoch as u64;
208 Checkpoint::paginate(
209 ctx.data_unchecked(),
210 page,
211 Some(epoch),
212 self.checkpoint_viewed_at,
213 )
214 .await
215 .extend()
216 }
217
218 async fn transaction_blocks(
242 &self,
243 ctx: &Context<'_>,
244 first: Option<u64>,
245 after: Option<transaction_block::Cursor>,
246 last: Option<u64>,
247 before: Option<transaction_block::Cursor>,
248 filter: Option<TransactionBlockFilter>,
249 scan_limit: Option<u64>,
250 ) -> Result<ScanConnection<String, TransactionBlock>> {
251 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
252
253 let Some(filter) = filter
254 .unwrap_or_default()
255 .intersect(TransactionBlockFilter {
256 after_checkpoint: (self.stored.first_checkpoint_id > 0)
258 .then(|| UInt53::from(self.stored.first_checkpoint_id as u64 - 1)),
259 before_checkpoint: self
260 .stored
261 .last_checkpoint_id
262 .map(|id| UInt53::from(id as u64 + 1)),
263 ..Default::default()
264 })
265 else {
266 return Ok(ScanConnection::new(false, false));
267 };
268
269 TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
270 .await
271 .extend()
272 }
273}
274
275impl Epoch {
276 pub(crate) fn protocol_version(&self) -> u64 {
278 self.stored.protocol_version as u64
279 }
280
281 pub(crate) async fn query(
284 ctx: &Context<'_>,
285 filter: Option<u64>,
286 checkpoint_viewed_at: u64,
287 ) -> Result<Option<Self>, Error> {
288 if let Some(epoch_id) = filter {
289 let DataLoader(dl) = ctx.data_unchecked();
290 dl.load_one(EpochKey {
291 epoch_id,
292 checkpoint_viewed_at,
293 })
294 .await
295 } else {
296 Self::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await
297 }
298 }
299
300 pub(crate) async fn query_latest_at(
304 db: &Db,
305 checkpoint_viewed_at: u64,
306 ) -> Result<Option<Self>, Error> {
307 use epochs::dsl;
308
309 let stored: Option<QueryableEpochInfo> = db
310 .execute(move |conn| {
311 conn.first(move || {
312 dsl::epochs
316 .select(QueryableEpochInfo::as_select())
317 .filter(dsl::first_checkpoint_id.le(checkpoint_viewed_at as i64))
318 .order_by(dsl::first_checkpoint_id.desc())
319 })
320 .optional()
321 })
322 .await
323 .map_err(|e| Error::Internal(format!("Failed to fetch epoch: {e}")))?;
324
325 Ok(stored.map(|stored| Epoch {
326 stored,
327 checkpoint_viewed_at,
328 }))
329 }
330}
331
332impl Loader<EpochKey> for Db {
333 type Value = Epoch;
334 type Error = Error;
335
336 async fn load(&self, keys: &[EpochKey]) -> Result<HashMap<EpochKey, Epoch>, Error> {
337 use epochs::dsl;
338
339 let epoch_ids: BTreeSet<_> = keys.iter().map(|key| key.epoch_id as i64).collect();
340 let epochs: Vec<QueryableEpochInfo> = self
341 .execute_repeatable(move |conn| {
342 conn.results(move || {
343 dsl::epochs
344 .select(QueryableEpochInfo::as_select())
345 .filter(dsl::epoch.eq_any(epoch_ids.iter().cloned()))
346 })
347 })
348 .await
349 .map_err(|e| Error::Internal(format!("Failed to fetch epochs: {e}")))?;
350
351 let epoch_id_to_stored: BTreeMap<_, _> = epochs
352 .into_iter()
353 .map(|stored| (stored.epoch as u64, stored))
354 .collect();
355
356 Ok(keys
357 .iter()
358 .filter_map(|key| {
359 let stored = epoch_id_to_stored.get(&key.epoch_id).cloned()?;
360 let epoch = Epoch {
361 stored,
362 checkpoint_viewed_at: key.checkpoint_viewed_at,
363 };
364
365 let start = epoch.stored.first_checkpoint_id as u64;
371 (key.checkpoint_viewed_at >= start).then_some((*key, epoch))
372 })
373 .collect())
374 }
375}