iota_graphql_rpc/context_data/
db_data_provider.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::time::Duration;
6
7use iota_indexer::{
8    apis::GovernanceReadApi,
9    db::ConnectionPoolConfig,
10    metrics::IndexerMetrics,
11    pruning::watermark_task::{WatermarkCache, WatermarkTask},
12    read::IndexerReader,
13    store::PgIndexerStore,
14};
15use iota_json_rpc_types::Stake as RpcStakedIota;
16use iota_types::{
17    governance::StakedIota as NativeStakedIota,
18    iota_system_state::iota_system_state_summary::IotaSystemStateSummary as NativeIotaSystemStateSummary,
19};
20use tokio_util::sync::CancellationToken;
21
22use crate::error::Error;
23
24pub(crate) struct PgManager {
25    pub inner: IndexerReader,
26}
27
28impl PgManager {
29    pub(crate) fn new(inner: IndexerReader) -> Self {
30        Self { inner }
31    }
32
33    /// Create a new underlying reader, which is used by this type as well as
34    /// other data providers.
35    pub(crate) fn reader_with_config(
36        db_url: impl Into<String>,
37        pool_size: u32,
38        timeout_ms: u64,
39        indexer_metrics: IndexerMetrics,
40        cancellation_token: CancellationToken,
41    ) -> Result<IndexerReader, Error> {
42        let mut config = ConnectionPoolConfig::default();
43        config.set_pool_size(pool_size);
44        config.set_statement_timeout(Duration::from_millis(timeout_ms));
45
46        // Create connection pool
47        let connection_pool = iota_indexer::db::new_connection_pool(&db_url.into(), &config)
48            .map_err(|e| Error::Internal(format!("Failed to create connection pool: {e}")))?;
49
50        // Create store and watermark cache for pruning support
51        let store = PgIndexerStore::new(connection_pool.clone(), indexer_metrics);
52        let watermark_cache = WatermarkCache::new();
53
54        // Start watermark task with cancellation token
55        let watermark_task = WatermarkTask::new(store, watermark_cache.clone());
56        watermark_task.start(cancellation_token);
57
58        // Create reader with watermark cache
59        Ok(IndexerReader::new(connection_pool, watermark_cache))
60    }
61}
62
63/// Implement methods to be used by graphql resolvers
64impl PgManager {
65    /// If no epoch was requested or if the epoch requested is in progress,
66    /// returns the latest iota system state.
67    pub(crate) async fn fetch_iota_system_state(
68        &self,
69        epoch_id: Option<u64>,
70    ) -> Result<NativeIotaSystemStateSummary, Error> {
71        let latest_iota_system_state = self
72            .inner
73            .spawn_blocking(move |this| this.get_latest_iota_system_state())
74            .await?;
75
76        if epoch_id.is_none() || epoch_id.is_some_and(|id| id == latest_iota_system_state.epoch()) {
77            Ok(latest_iota_system_state)
78        } else {
79            Ok(self
80                .inner
81                .spawn_blocking(move |this| this.get_epoch_iota_system_state(epoch_id))
82                .await?)
83        }
84    }
85
86    /// Make a request to the RPC for its representations of the staked iota we
87    /// parsed out of the object.  Used to implement fields that are
88    /// implemented in JSON-RPC but not GraphQL (yet).
89    pub(crate) async fn fetch_rpc_staked_iota(
90        &self,
91        stake: NativeStakedIota,
92    ) -> Result<RpcStakedIota, Error> {
93        let governance_api = GovernanceReadApi::new(self.inner.clone());
94
95        let mut delegated_stakes = governance_api
96            .get_delegated_stakes(vec![stake])
97            .await
98            .map_err(|e| Error::Internal(format!("Error fetching delegated stake. {e}")))?;
99
100        let Some(mut delegated_stake) = delegated_stakes.pop() else {
101            return Err(Error::Internal(
102                "Error fetching delegated stake. No pools returned.".to_string(),
103            ));
104        };
105
106        let Some(stake) = delegated_stake.stakes.pop() else {
107            return Err(Error::Internal(
108                "Error fetching delegated stake. No stake in pool.".to_string(),
109            ));
110        };
111
112        Ok(stake)
113    }
114}