1use std::collections::{BTreeMap, BTreeSet, HashMap};
6
7use async_graphql::{connection::Connection, dataloader::Loader, *};
8use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
9use fastcrypto::encoding::{Base58, Encoding};
10use iota_indexer::{models::epoch::QueryableEpochInfo, schema::epochs};
11use iota_types::messages_checkpoint::CheckpointCommitment as EpochCommitment;
12
13use crate::{
14 config::DEFAULT_PAGE_SIZE,
15 connection::ScanConnection,
16 context_data::db_data_provider::PgManager,
17 data::{DataLoader, Db, DbConnection, QueryExecutor},
18 error::Error,
19 server::watermark_task::Watermark,
20 types::{
21 big_int::BigInt,
22 checkpoint::{self, Checkpoint},
23 cursor::Page,
24 date_time::DateTime,
25 protocol_config::ProtocolConfigs,
26 system_state_summary::{NativeStateValidatorInfo, SystemStateSummary},
27 transaction_block::{self, TransactionBlock, TransactionBlockFilter},
28 uint53::UInt53,
29 validator_set::ValidatorSet,
30 },
31};
32
33#[derive(Clone)]
34pub(crate) struct Epoch {
35 pub stored: QueryableEpochInfo,
36 pub checkpoint_viewed_at: u64,
37}
38
39#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
42struct EpochKey {
43 pub epoch_id: u64,
44 pub checkpoint_viewed_at: u64,
45}
46
47#[Object]
55impl Epoch {
56 async fn epoch_id(&self) -> UInt53 {
59 UInt53::from(self.stored.epoch as u64)
60 }
61
62 async fn reference_gas_price(&self) -> Option<BigInt> {
65 Some(BigInt::from(self.stored.reference_gas_price as u64))
66 }
67
68 async fn validator_set(&self, ctx: &Context<'_>) -> Result<Option<ValidatorSet>> {
73 let system_state = ctx
74 .data_unchecked::<PgManager>()
75 .fetch_iota_system_state(Some(self.stored.epoch as u64))
76 .await?;
77
78 let validator_set = NativeStateValidatorInfo::from(system_state).into_validator_set(
79 self.stored.total_stake as u64,
80 self.checkpoint_viewed_at,
81 self.stored.epoch as u64,
82 );
83 Ok(Some(validator_set))
84 }
85
86 async fn start_timestamp(&self) -> Result<DateTime, Error> {
88 DateTime::from_ms(self.stored.epoch_start_timestamp)
89 }
90
91 async fn end_timestamp(&self) -> Result<Option<DateTime>, Error> {
93 self.stored
94 .epoch_end_timestamp
95 .map(DateTime::from_ms)
96 .transpose()
97 }
98
99 async fn total_checkpoints(&self, ctx: &Context<'_>) -> Result<Option<UInt53>> {
101 let last = match self.stored.last_checkpoint_id {
102 Some(last) => last as u64,
103 None => {
104 let Watermark { checkpoint, .. } = *ctx.data_unchecked();
105 checkpoint
106 }
107 };
108
109 Ok(Some(UInt53::from(
110 last - self.stored.first_checkpoint_id as u64 + 1,
111 )))
112 }
113
114 async fn total_transactions(&self) -> Result<Option<UInt53>> {
116 Ok(self
118 .stored
119 .epoch_total_transactions()
120 .map(|v| UInt53::from(v as u64)))
121 }
122
123 async fn total_gas_fees(&self) -> Option<BigInt> {
125 self.stored.total_gas_fees.map(BigInt::from)
126 }
127
128 async fn total_stake_rewards(&self) -> Option<BigInt> {
130 self.stored
131 .total_stake_rewards_distributed
132 .map(BigInt::from)
133 }
134
135 async fn fund_size(&self) -> Option<BigInt> {
139 Some(BigInt::from(self.stored.storage_fund_balance))
140 }
141
142 async fn net_inflow(&self) -> Option<BigInt> {
145 if let (Some(fund_inflow), Some(fund_outflow)) =
146 (self.stored.storage_charge, self.stored.storage_rebate)
147 {
148 Some(BigInt::from(fund_inflow - fund_outflow))
149 } else {
150 None
151 }
152 }
153
154 async fn fund_inflow(&self) -> Option<BigInt> {
156 self.stored.storage_charge.map(BigInt::from)
157 }
158
159 async fn fund_outflow(&self) -> Option<BigInt> {
162 self.stored.storage_rebate.map(BigInt::from)
163 }
164
165 async fn protocol_configs(&self, ctx: &Context<'_>) -> Result<ProtocolConfigs> {
168 ProtocolConfigs::query(ctx.data_unchecked(), Some(self.protocol_version()))
169 .await
170 .extend()
171 }
172
173 #[graphql(flatten)]
174 async fn system_state_summary(&self, ctx: &Context<'_>) -> Result<SystemStateSummary> {
175 let state = ctx
176 .data_unchecked::<PgManager>()
177 .fetch_iota_system_state(Some(self.stored.epoch as u64))
178 .await?;
179 Ok(SystemStateSummary { native: state })
180 }
181
182 async fn live_object_set_digest(&self) -> Result<Option<String>> {
186 let Some(commitments) = self.stored.epoch_commitments.as_ref() else {
187 return Ok(None);
188 };
189 let commitments: Vec<EpochCommitment> = bcs::from_bytes(commitments).map_err(|e| {
190 Error::Internal(format!("Error deserializing commitments: {e}")).extend()
191 })?;
192
193 let digest = commitments.into_iter().next().map(|commitment| {
194 let EpochCommitment::ECMHLiveObjectSetDigest(digest) = commitment;
195 Base58::encode(digest.digest.into_inner())
196 });
197
198 Ok(digest)
199 }
200
201 async fn checkpoints(
203 &self,
204 ctx: &Context<'_>,
205 first: Option<u64>,
206 after: Option<checkpoint::Cursor>,
207 last: Option<u64>,
208 before: Option<checkpoint::Cursor>,
209 ) -> Result<Connection<String, Checkpoint>> {
210 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
211 let epoch = self.stored.epoch as u64;
212 Checkpoint::paginate(
213 ctx.data_unchecked(),
214 page,
215 Some(epoch),
216 self.checkpoint_viewed_at,
217 )
218 .await
219 .extend()
220 }
221
222 #[graphql(
246 complexity = "first.or(last).unwrap_or(DEFAULT_PAGE_SIZE as u64) as usize * child_complexity"
247 )]
248 async fn transaction_blocks(
249 &self,
250 ctx: &Context<'_>,
251 first: Option<u64>,
252 after: Option<transaction_block::Cursor>,
253 last: Option<u64>,
254 before: Option<transaction_block::Cursor>,
255 filter: Option<TransactionBlockFilter>,
256 scan_limit: Option<u64>,
257 ) -> Result<ScanConnection<String, TransactionBlock>> {
258 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
259
260 let Some(filter) = filter
261 .unwrap_or_default()
262 .intersect(TransactionBlockFilter {
263 after_checkpoint: (self.stored.first_checkpoint_id > 0)
265 .then(|| UInt53::from(self.stored.first_checkpoint_id as u64 - 1)),
266 before_checkpoint: self
267 .stored
268 .last_checkpoint_id
269 .map(|id| UInt53::from(id as u64 + 1)),
270 ..Default::default()
271 })
272 else {
273 return Ok(ScanConnection::new(false, false));
274 };
275
276 TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
277 .await
278 .extend()
279 }
280}
281
282impl Epoch {
283 pub(crate) fn protocol_version(&self) -> u64 {
285 self.stored.protocol_version as u64
286 }
287
288 pub(crate) async fn query(
291 ctx: &Context<'_>,
292 filter: Option<u64>,
293 checkpoint_viewed_at: u64,
294 ) -> Result<Option<Self>, Error> {
295 if let Some(epoch_id) = filter {
296 let DataLoader(dl) = ctx.data_unchecked();
297 dl.load_one(EpochKey {
298 epoch_id,
299 checkpoint_viewed_at,
300 })
301 .await
302 } else {
303 Self::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await
304 }
305 }
306
307 pub(crate) async fn query_latest_at(
311 db: &Db,
312 checkpoint_viewed_at: u64,
313 ) -> Result<Option<Self>, Error> {
314 use epochs::dsl;
315
316 let stored: Option<QueryableEpochInfo> = db
317 .execute(move |conn| {
318 conn.first(move || {
319 dsl::epochs
323 .select(QueryableEpochInfo::as_select())
324 .filter(dsl::first_checkpoint_id.le(checkpoint_viewed_at as i64))
325 .order_by(dsl::first_checkpoint_id.desc())
326 })
327 .optional()
328 })
329 .await
330 .map_err(|e| Error::Internal(format!("Failed to fetch epoch: {e}")))?;
331
332 Ok(stored.map(|stored| Epoch {
333 stored,
334 checkpoint_viewed_at,
335 }))
336 }
337}
338
339impl Loader<EpochKey> for Db {
340 type Value = Epoch;
341 type Error = Error;
342
343 async fn load(&self, keys: &[EpochKey]) -> Result<HashMap<EpochKey, Epoch>, Error> {
344 use epochs::dsl;
345
346 let epoch_ids: BTreeSet<_> = keys.iter().map(|key| key.epoch_id as i64).collect();
347 let epochs: Vec<QueryableEpochInfo> = self
348 .execute_repeatable(move |conn| {
349 conn.results(move || {
350 dsl::epochs
351 .select(QueryableEpochInfo::as_select())
352 .filter(dsl::epoch.eq_any(epoch_ids.iter().cloned()))
353 })
354 })
355 .await
356 .map_err(|e| Error::Internal(format!("Failed to fetch epochs: {e}")))?;
357
358 let epoch_id_to_stored: BTreeMap<_, _> = epochs
359 .into_iter()
360 .map(|stored| (stored.epoch as u64, stored))
361 .collect();
362
363 Ok(keys
364 .iter()
365 .filter_map(|key| {
366 let stored = epoch_id_to_stored.get(&key.epoch_id).cloned()?;
367 let epoch = Epoch {
368 stored,
369 checkpoint_viewed_at: key.checkpoint_viewed_at,
370 };
371
372 let start = epoch.stored.first_checkpoint_id as u64;
378 (key.checkpoint_viewed_at >= start).then_some((*key, epoch))
379 })
380 .collect())
381 }
382}