iota_graphql_rpc/context_data/
db_data_provider.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::time::Duration;
6
7use iota_indexer::{
8    apis::GovernanceReadApi, db::ConnectionPoolConfig, indexer_reader::IndexerReader,
9};
10use iota_json_rpc_types::Stake as RpcStakedIota;
11use iota_types::{
12    governance::StakedIota as NativeStakedIota,
13    iota_system_state::iota_system_state_summary::IotaSystemStateSummary as NativeIotaSystemStateSummary,
14};
15
16use crate::{error::Error, types::system_state_summary::SystemStateSummaryView};
17
18pub(crate) struct PgManager {
19    pub inner: IndexerReader,
20}
21
22impl PgManager {
23    pub(crate) fn new(inner: IndexerReader) -> Self {
24        Self { inner }
25    }
26
27    /// Create a new underlying reader, which is used by this type as well as
28    /// other data providers.
29    pub(crate) fn reader_with_config(
30        db_url: impl Into<String>,
31        pool_size: u32,
32        timeout_ms: u64,
33    ) -> Result<IndexerReader, Error> {
34        let mut config = ConnectionPoolConfig::default();
35        config.set_pool_size(pool_size);
36        config.set_statement_timeout(Duration::from_millis(timeout_ms));
37        IndexerReader::new_with_config(db_url, config)
38            .map_err(|e| Error::Internal(format!("Failed to create reader: {e}")))
39    }
40}
41
42/// Implement methods to be used by graphql resolvers
43impl PgManager {
44    /// If no epoch was requested or if the epoch requested is in progress,
45    /// returns the latest iota system state.
46    pub(crate) async fn fetch_iota_system_state(
47        &self,
48        epoch_id: Option<u64>,
49    ) -> Result<NativeIotaSystemStateSummary, Error> {
50        let latest_iota_system_state = self
51            .inner
52            .spawn_blocking(move |this| this.get_latest_iota_system_state())
53            .await?;
54
55        if epoch_id.is_none() || epoch_id.is_some_and(|id| id == latest_iota_system_state.epoch()) {
56            Ok(latest_iota_system_state)
57        } else {
58            Ok(self
59                .inner
60                .spawn_blocking(move |this| this.get_epoch_iota_system_state(epoch_id))
61                .await?)
62        }
63    }
64
65    /// Make a request to the RPC for its representations of the staked iota we
66    /// parsed out of the object.  Used to implement fields that are
67    /// implemented in JSON-RPC but not GraphQL (yet).
68    pub(crate) async fn fetch_rpc_staked_iota(
69        &self,
70        stake: NativeStakedIota,
71    ) -> Result<RpcStakedIota, Error> {
72        let governance_api = GovernanceReadApi::new(self.inner.clone());
73
74        let mut delegated_stakes = governance_api
75            .get_delegated_stakes(vec![stake])
76            .await
77            .map_err(|e| Error::Internal(format!("Error fetching delegated stake. {e}")))?;
78
79        let Some(mut delegated_stake) = delegated_stakes.pop() else {
80            return Err(Error::Internal(
81                "Error fetching delegated stake. No pools returned.".to_string(),
82            ));
83        };
84
85        let Some(stake) = delegated_stake.stakes.pop() else {
86            return Err(Error::Internal(
87                "Error fetching delegated stake. No stake in pool.".to_string(),
88            ));
89        };
90
91        Ok(stake)
92    }
93}