iota_graphql_rpc/context_data/
db_data_provider.rs1use std::time::Duration;
6
7use iota_indexer::{
8 apis::GovernanceReadApi,
9 db::ConnectionPoolConfig,
10 metrics::IndexerMetrics,
11 pruning::watermark_task::{WatermarkCache, WatermarkTask},
12 read::IndexerReader,
13 store::PgIndexerStore,
14};
15use iota_json_rpc_types::Stake as RpcStakedIota;
16use iota_types::{
17 governance::StakedIota as NativeStakedIota,
18 iota_system_state::iota_system_state_summary::IotaSystemStateSummary as NativeIotaSystemStateSummary,
19};
20use tokio_util::sync::CancellationToken;
21
22use crate::error::Error;
23
24pub(crate) struct PgManager {
25 pub inner: IndexerReader,
26}
27
28impl PgManager {
29 pub(crate) fn new(inner: IndexerReader) -> Self {
30 Self { inner }
31 }
32
33 pub(crate) fn reader_with_config(
36 db_url: impl Into<String>,
37 pool_size: u32,
38 timeout_ms: u64,
39 indexer_metrics: IndexerMetrics,
40 cancellation_token: CancellationToken,
41 ) -> Result<IndexerReader, Error> {
42 let mut config = ConnectionPoolConfig::default();
43 config.set_pool_size(pool_size);
44 config.set_statement_timeout(Duration::from_millis(timeout_ms));
45
46 let connection_pool = iota_indexer::db::new_connection_pool(&db_url.into(), &config)
48 .map_err(|e| Error::Internal(format!("Failed to create connection pool: {e}")))?;
49
50 let store = PgIndexerStore::new(connection_pool.clone(), indexer_metrics);
52 let watermark_cache = WatermarkCache::new();
53
54 let watermark_task = WatermarkTask::new(store, watermark_cache.clone());
56 watermark_task.start(cancellation_token);
57
58 Ok(IndexerReader::new(connection_pool, watermark_cache))
60 }
61}
62
63impl PgManager {
65 pub(crate) async fn fetch_iota_system_state(
68 &self,
69 epoch_id: Option<u64>,
70 ) -> Result<NativeIotaSystemStateSummary, Error> {
71 let latest_iota_system_state = self
72 .inner
73 .spawn_blocking(move |this| this.get_latest_iota_system_state())
74 .await?;
75
76 if epoch_id.is_none() || epoch_id.is_some_and(|id| id == latest_iota_system_state.epoch()) {
77 Ok(latest_iota_system_state)
78 } else {
79 Ok(self
80 .inner
81 .spawn_blocking(move |this| this.get_epoch_iota_system_state(epoch_id))
82 .await?)
83 }
84 }
85
86 pub(crate) async fn fetch_rpc_staked_iota(
90 &self,
91 stake: NativeStakedIota,
92 ) -> Result<RpcStakedIota, Error> {
93 let governance_api = GovernanceReadApi::new(self.inner.clone());
94
95 let mut delegated_stakes = governance_api
96 .get_delegated_stakes(vec![stake])
97 .await
98 .map_err(|e| Error::Internal(format!("Error fetching delegated stake. {e}")))?;
99
100 let Some(mut delegated_stake) = delegated_stakes.pop() else {
101 return Err(Error::Internal(
102 "Error fetching delegated stake. No pools returned.".to_string(),
103 ));
104 };
105
106 let Some(stake) = delegated_stake.stakes.pop() else {
107 return Err(Error::Internal(
108 "Error fetching delegated stake. No stake in pool.".to_string(),
109 ));
110 };
111
112 Ok(stake)
113 }
114}