1use std::collections::{BTreeMap, BTreeSet, HashMap};
6
7use async_graphql::{connection::Connection, dataloader::Loader, *};
8use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
9use fastcrypto::encoding::{Base58, Encoding};
10use iota_indexer::{models::epoch::QueryableEpochInfo, schema::epochs};
11use iota_types::messages_checkpoint::CheckpointCommitment as EpochCommitment;
12
13use crate::{
14 connection::ScanConnection,
15 context_data::db_data_provider::PgManager,
16 data::{DataLoader, Db, DbConnection, QueryExecutor},
17 error::Error,
18 server::watermark_task::Watermark,
19 types::{
20 big_int::BigInt,
21 checkpoint::{self, Checkpoint},
22 cursor::Page,
23 date_time::DateTime,
24 protocol_config::ProtocolConfigs,
25 system_state_summary::{NativeStateValidatorInfo, SystemStateSummary},
26 transaction_block::{self, TransactionBlock, TransactionBlockFilter},
27 uint53::UInt53,
28 validator_set::ValidatorSet,
29 },
30};
31
32#[derive(Clone)]
33pub(crate) struct Epoch {
34 pub stored: QueryableEpochInfo,
35 pub checkpoint_viewed_at: u64,
36}
37
38#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
41struct EpochKey {
42 pub epoch_id: u64,
43 pub checkpoint_viewed_at: u64,
44}
45
46#[Object]
54impl Epoch {
55 async fn epoch_id(&self) -> UInt53 {
58 UInt53::from(self.stored.epoch as u64)
59 }
60
61 async fn reference_gas_price(&self) -> Option<BigInt> {
64 Some(BigInt::from(self.stored.reference_gas_price as u64))
65 }
66
67 async fn validator_set(&self, ctx: &Context<'_>) -> Result<Option<ValidatorSet>> {
72 let system_state = ctx
73 .data_unchecked::<PgManager>()
74 .fetch_iota_system_state(Some(self.stored.epoch as u64))
75 .await?;
76
77 let validator_set = NativeStateValidatorInfo::from(system_state).into_validator_set(
78 self.stored.total_stake as u64,
79 self.checkpoint_viewed_at,
80 self.stored.epoch as u64,
81 );
82 Ok(Some(validator_set))
83 }
84
85 async fn start_timestamp(&self) -> Result<DateTime, Error> {
87 DateTime::from_ms(self.stored.epoch_start_timestamp)
88 }
89
90 async fn end_timestamp(&self) -> Result<Option<DateTime>, Error> {
92 self.stored
93 .epoch_end_timestamp
94 .map(DateTime::from_ms)
95 .transpose()
96 }
97
98 async fn total_checkpoints(&self, ctx: &Context<'_>) -> Result<Option<UInt53>> {
100 let last = match self.stored.last_checkpoint_id {
101 Some(last) => last as u64,
102 None => {
103 let Watermark { checkpoint, .. } = *ctx.data_unchecked();
104 checkpoint
105 }
106 };
107
108 Ok(Some(UInt53::from(
109 last - self.stored.first_checkpoint_id as u64 + 1,
110 )))
111 }
112
113 async fn total_transactions(&self) -> Result<Option<UInt53>> {
115 Ok(self
117 .stored
118 .epoch_total_transactions()
119 .map(|v| UInt53::from(v as u64)))
120 }
121
122 async fn total_gas_fees(&self) -> Option<BigInt> {
124 self.stored.total_gas_fees.map(BigInt::from)
125 }
126
127 async fn total_stake_rewards(&self) -> Option<BigInt> {
129 self.stored
130 .total_stake_rewards_distributed
131 .map(BigInt::from)
132 }
133
134 async fn fund_size(&self) -> Option<BigInt> {
138 Some(BigInt::from(self.stored.storage_fund_balance))
139 }
140
141 async fn net_inflow(&self) -> Option<BigInt> {
144 if let (Some(fund_inflow), Some(fund_outflow)) =
145 (self.stored.storage_charge, self.stored.storage_rebate)
146 {
147 Some(BigInt::from(fund_inflow - fund_outflow))
148 } else {
149 None
150 }
151 }
152
153 async fn fund_inflow(&self) -> Option<BigInt> {
155 self.stored.storage_charge.map(BigInt::from)
156 }
157
158 async fn fund_outflow(&self) -> Option<BigInt> {
161 self.stored.storage_rebate.map(BigInt::from)
162 }
163
164 async fn protocol_configs(&self, ctx: &Context<'_>) -> Result<ProtocolConfigs> {
167 ProtocolConfigs::query(ctx.data_unchecked(), Some(self.protocol_version()))
168 .await
169 .extend()
170 }
171
172 #[graphql(flatten)]
173 async fn system_state_summary(&self, ctx: &Context<'_>) -> Result<SystemStateSummary> {
174 let state = ctx
175 .data_unchecked::<PgManager>()
176 .fetch_iota_system_state(Some(self.stored.epoch as u64))
177 .await?;
178 Ok(SystemStateSummary { native: state })
179 }
180
181 async fn live_object_set_digest(&self) -> Result<Option<String>> {
185 let Some(commitments) = self.stored.epoch_commitments.as_ref() else {
186 return Ok(None);
187 };
188 let commitments: Vec<EpochCommitment> = bcs::from_bytes(commitments).map_err(|e| {
189 Error::Internal(format!("Error deserializing commitments: {e}")).extend()
190 })?;
191
192 let digest = commitments.into_iter().next().map(|commitment| {
193 let EpochCommitment::ECMHLiveObjectSetDigest(digest) = commitment;
194 Base58::encode(digest.digest.into_inner())
195 });
196
197 Ok(digest)
198 }
199
200 async fn checkpoints(
202 &self,
203 ctx: &Context<'_>,
204 first: Option<u64>,
205 after: Option<checkpoint::Cursor>,
206 last: Option<u64>,
207 before: Option<checkpoint::Cursor>,
208 ) -> Result<Connection<String, Checkpoint>> {
209 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
210 let epoch = self.stored.epoch as u64;
211 Checkpoint::paginate(
212 ctx.data_unchecked(),
213 page,
214 Some(epoch),
215 self.checkpoint_viewed_at,
216 )
217 .await
218 .extend()
219 }
220
221 async fn transaction_blocks(
245 &self,
246 ctx: &Context<'_>,
247 first: Option<u64>,
248 after: Option<transaction_block::Cursor>,
249 last: Option<u64>,
250 before: Option<transaction_block::Cursor>,
251 filter: Option<TransactionBlockFilter>,
252 scan_limit: Option<u64>,
253 ) -> Result<ScanConnection<String, TransactionBlock>> {
254 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
255
256 let Some(filter) = filter
257 .unwrap_or_default()
258 .intersect(TransactionBlockFilter {
259 after_checkpoint: (self.stored.first_checkpoint_id > 0)
261 .then(|| UInt53::from(self.stored.first_checkpoint_id as u64 - 1)),
262 before_checkpoint: self
263 .stored
264 .last_checkpoint_id
265 .map(|id| UInt53::from(id as u64 + 1)),
266 ..Default::default()
267 })
268 else {
269 return Ok(ScanConnection::new(false, false));
270 };
271
272 TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
273 .await
274 .extend()
275 }
276}
277
278impl Epoch {
279 pub(crate) fn protocol_version(&self) -> u64 {
281 self.stored.protocol_version as u64
282 }
283
284 pub(crate) async fn query(
287 ctx: &Context<'_>,
288 filter: Option<u64>,
289 checkpoint_viewed_at: u64,
290 ) -> Result<Option<Self>, Error> {
291 if let Some(epoch_id) = filter {
292 let DataLoader(dl) = ctx.data_unchecked();
293 dl.load_one(EpochKey {
294 epoch_id,
295 checkpoint_viewed_at,
296 })
297 .await
298 } else {
299 Self::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await
300 }
301 }
302
303 pub(crate) async fn query_latest_at(
307 db: &Db,
308 checkpoint_viewed_at: u64,
309 ) -> Result<Option<Self>, Error> {
310 use epochs::dsl;
311
312 let stored: Option<QueryableEpochInfo> = db
313 .execute(move |conn| {
314 conn.first(move || {
315 dsl::epochs
319 .select(QueryableEpochInfo::as_select())
320 .filter(dsl::first_checkpoint_id.le(checkpoint_viewed_at as i64))
321 .order_by(dsl::first_checkpoint_id.desc())
322 })
323 .optional()
324 })
325 .await
326 .map_err(|e| Error::Internal(format!("Failed to fetch epoch: {e}")))?;
327
328 Ok(stored.map(|stored| Epoch {
329 stored,
330 checkpoint_viewed_at,
331 }))
332 }
333}
334
335impl Loader<EpochKey> for Db {
336 type Value = Epoch;
337 type Error = Error;
338
339 async fn load(&self, keys: &[EpochKey]) -> Result<HashMap<EpochKey, Epoch>, Error> {
340 use epochs::dsl;
341
342 let epoch_ids: BTreeSet<_> = keys.iter().map(|key| key.epoch_id as i64).collect();
343 let epochs: Vec<QueryableEpochInfo> = self
344 .execute_repeatable(move |conn| {
345 conn.results(move || {
346 dsl::epochs
347 .select(QueryableEpochInfo::as_select())
348 .filter(dsl::epoch.eq_any(epoch_ids.iter().cloned()))
349 })
350 })
351 .await
352 .map_err(|e| Error::Internal(format!("Failed to fetch epochs: {e}")))?;
353
354 let epoch_id_to_stored: BTreeMap<_, _> = epochs
355 .into_iter()
356 .map(|stored| (stored.epoch as u64, stored))
357 .collect();
358
359 Ok(keys
360 .iter()
361 .filter_map(|key| {
362 let stored = epoch_id_to_stored.get(&key.epoch_id).cloned()?;
363 let epoch = Epoch {
364 stored,
365 checkpoint_viewed_at: key.checkpoint_viewed_at,
366 };
367
368 let start = epoch.stored.first_checkpoint_id as u64;
374 (key.checkpoint_viewed_at >= start).then_some((*key, epoch))
375 })
376 .collect())
377 }
378}