iota_graphql_rpc/context_data/
db_data_provider.rs1use std::time::Duration;
6
7use iota_indexer::{
8 apis::GovernanceReadApi, db::ConnectionPoolConfig, indexer_reader::IndexerReader,
9};
10use iota_json_rpc_types::Stake as RpcStakedIota;
11use iota_types::{
12 governance::StakedIota as NativeStakedIota,
13 iota_system_state::iota_system_state_summary::IotaSystemStateSummary as NativeIotaSystemStateSummary,
14};
15
16use crate::{error::Error, types::system_state_summary::SystemStateSummaryView};
17
18pub(crate) struct PgManager {
19 pub inner: IndexerReader,
20}
21
22impl PgManager {
23 pub(crate) fn new(inner: IndexerReader) -> Self {
24 Self { inner }
25 }
26
27 pub(crate) fn reader_with_config(
30 db_url: impl Into<String>,
31 pool_size: u32,
32 timeout_ms: u64,
33 ) -> Result<IndexerReader, Error> {
34 let mut config = ConnectionPoolConfig::default();
35 config.set_pool_size(pool_size);
36 config.set_statement_timeout(Duration::from_millis(timeout_ms));
37 IndexerReader::new_with_config(db_url, config)
38 .map_err(|e| Error::Internal(format!("Failed to create reader: {e}")))
39 }
40}
41
42impl PgManager {
44 pub(crate) async fn fetch_iota_system_state(
47 &self,
48 epoch_id: Option<u64>,
49 ) -> Result<NativeIotaSystemStateSummary, Error> {
50 let latest_iota_system_state = self
51 .inner
52 .spawn_blocking(move |this| this.get_latest_iota_system_state())
53 .await?;
54
55 if epoch_id.is_none() || epoch_id.is_some_and(|id| id == latest_iota_system_state.epoch()) {
56 Ok(latest_iota_system_state)
57 } else {
58 Ok(self
59 .inner
60 .spawn_blocking(move |this| this.get_epoch_iota_system_state(epoch_id))
61 .await?)
62 }
63 }
64
65 pub(crate) async fn fetch_rpc_staked_iota(
69 &self,
70 stake: NativeStakedIota,
71 ) -> Result<RpcStakedIota, Error> {
72 let governance_api = GovernanceReadApi::new(self.inner.clone());
73
74 let mut delegated_stakes = governance_api
75 .get_delegated_stakes(vec![stake])
76 .await
77 .map_err(|e| Error::Internal(format!("Error fetching delegated stake. {e}")))?;
78
79 let Some(mut delegated_stake) = delegated_stakes.pop() else {
80 return Err(Error::Internal(
81 "Error fetching delegated stake. No pools returned.".to_string(),
82 ));
83 };
84
85 let Some(stake) = delegated_stake.stakes.pop() else {
86 return Err(Error::Internal(
87 "Error fetching delegated stake. No stake in pool.".to_string(),
88 ));
89 };
90
91 Ok(stake)
92 }
93}