iota_graphql_rpc/context_data/
db_data_provider.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::time::Duration;
6
7use iota_indexer::{apis::GovernanceReadApi, db::ConnectionPoolConfig, read::IndexerReader};
8use iota_json_rpc_types::Stake as RpcStakedIota;
9use iota_types::{
10    governance::StakedIota as NativeStakedIota,
11    iota_system_state::iota_system_state_summary::IotaSystemStateSummary as NativeIotaSystemStateSummary,
12};
13
14use crate::{error::Error, types::system_state_summary::SystemStateSummaryView};
15
16pub(crate) struct PgManager {
17    pub inner: IndexerReader,
18}
19
20impl PgManager {
21    pub(crate) fn new(inner: IndexerReader) -> Self {
22        Self { inner }
23    }
24
25    /// Create a new underlying reader, which is used by this type as well as
26    /// other data providers.
27    pub(crate) fn reader_with_config(
28        db_url: impl Into<String>,
29        pool_size: u32,
30        timeout_ms: u64,
31    ) -> Result<IndexerReader, Error> {
32        let mut config = ConnectionPoolConfig::default();
33        config.set_pool_size(pool_size);
34        config.set_statement_timeout(Duration::from_millis(timeout_ms));
35        IndexerReader::new_with_config(db_url, config)
36            .map_err(|e| Error::Internal(format!("Failed to create reader: {e}")))
37    }
38}
39
40/// Implement methods to be used by graphql resolvers
41impl PgManager {
42    /// If no epoch was requested or if the epoch requested is in progress,
43    /// returns the latest iota system state.
44    pub(crate) async fn fetch_iota_system_state(
45        &self,
46        epoch_id: Option<u64>,
47    ) -> Result<NativeIotaSystemStateSummary, Error> {
48        let latest_iota_system_state = self
49            .inner
50            .spawn_blocking(move |this| this.get_latest_iota_system_state())
51            .await?;
52
53        if epoch_id.is_none() || epoch_id.is_some_and(|id| id == latest_iota_system_state.epoch()) {
54            Ok(latest_iota_system_state)
55        } else {
56            Ok(self
57                .inner
58                .spawn_blocking(move |this| this.get_epoch_iota_system_state(epoch_id))
59                .await?)
60        }
61    }
62
63    /// Make a request to the RPC for its representations of the staked iota we
64    /// parsed out of the object.  Used to implement fields that are
65    /// implemented in JSON-RPC but not GraphQL (yet).
66    pub(crate) async fn fetch_rpc_staked_iota(
67        &self,
68        stake: NativeStakedIota,
69    ) -> Result<RpcStakedIota, Error> {
70        let governance_api = GovernanceReadApi::new(self.inner.clone());
71
72        let mut delegated_stakes = governance_api
73            .get_delegated_stakes(vec![stake])
74            .await
75            .map_err(|e| Error::Internal(format!("Error fetching delegated stake. {e}")))?;
76
77        let Some(mut delegated_stake) = delegated_stakes.pop() else {
78            return Err(Error::Internal(
79                "Error fetching delegated stake. No pools returned.".to_string(),
80            ));
81        };
82
83        let Some(stake) = delegated_stake.stakes.pop() else {
84            return Err(Error::Internal(
85                "Error fetching delegated stake. No stake in pool.".to_string(),
86            ));
87        };
88
89        Ok(stake)
90    }
91}