iota_graphql_rpc/context_data/
db_data_provider.rs1use std::time::Duration;
6
7use iota_indexer::{apis::GovernanceReadApi, db::ConnectionPoolConfig, read::IndexerReader};
8use iota_json_rpc_types::Stake as RpcStakedIota;
9use iota_types::{
10 governance::StakedIota as NativeStakedIota,
11 iota_system_state::iota_system_state_summary::IotaSystemStateSummary as NativeIotaSystemStateSummary,
12};
13
14use crate::{error::Error, types::system_state_summary::SystemStateSummaryView};
15
16pub(crate) struct PgManager {
17 pub inner: IndexerReader,
18}
19
20impl PgManager {
21 pub(crate) fn new(inner: IndexerReader) -> Self {
22 Self { inner }
23 }
24
25 pub(crate) fn reader_with_config(
28 db_url: impl Into<String>,
29 pool_size: u32,
30 timeout_ms: u64,
31 ) -> Result<IndexerReader, Error> {
32 let mut config = ConnectionPoolConfig::default();
33 config.set_pool_size(pool_size);
34 config.set_statement_timeout(Duration::from_millis(timeout_ms));
35 IndexerReader::new_with_config(db_url, config)
36 .map_err(|e| Error::Internal(format!("Failed to create reader: {e}")))
37 }
38}
39
40impl PgManager {
42 pub(crate) async fn fetch_iota_system_state(
45 &self,
46 epoch_id: Option<u64>,
47 ) -> Result<NativeIotaSystemStateSummary, Error> {
48 let latest_iota_system_state = self
49 .inner
50 .spawn_blocking(move |this| this.get_latest_iota_system_state())
51 .await?;
52
53 if epoch_id.is_none() || epoch_id.is_some_and(|id| id == latest_iota_system_state.epoch()) {
54 Ok(latest_iota_system_state)
55 } else {
56 Ok(self
57 .inner
58 .spawn_blocking(move |this| this.get_epoch_iota_system_state(epoch_id))
59 .await?)
60 }
61 }
62
63 pub(crate) async fn fetch_rpc_staked_iota(
67 &self,
68 stake: NativeStakedIota,
69 ) -> Result<RpcStakedIota, Error> {
70 let governance_api = GovernanceReadApi::new(self.inner.clone());
71
72 let mut delegated_stakes = governance_api
73 .get_delegated_stakes(vec![stake])
74 .await
75 .map_err(|e| Error::Internal(format!("Error fetching delegated stake. {e}")))?;
76
77 let Some(mut delegated_stake) = delegated_stakes.pop() else {
78 return Err(Error::Internal(
79 "Error fetching delegated stake. No pools returned.".to_string(),
80 ));
81 };
82
83 let Some(stake) = delegated_stake.stakes.pop() else {
84 return Err(Error::Internal(
85 "Error fetching delegated stake. No stake in pool.".to_string(),
86 ));
87 };
88
89 Ok(stake)
90 }
91}