iota_graphql_rpc/server/
exchange_rates_task.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use iota_indexer::apis::GovernanceReadApi;
6use tokio::sync::watch;
7use tokio_util::sync::CancellationToken;
8use tracing::{error, info};
9
10use crate::data::pg::PgExecutor;
11
12/// Background task for kicking on epoch change the exchange rates function on
13/// the indexer, which caches the ValidatorExchangeRates that are needed for
14/// computing APYs.
15pub(crate) struct TriggerExchangeRatesTask {
16    cancel: CancellationToken,
17    db: PgExecutor,
18    epoch_rx: watch::Receiver<u64>,
19}
20
21impl TriggerExchangeRatesTask {
22    pub(crate) fn new(
23        db: PgExecutor,
24        epoch_rx: watch::Receiver<u64>,
25        cancel: CancellationToken,
26    ) -> Self {
27        Self {
28            db,
29            epoch_rx,
30            cancel,
31        }
32    }
33
34    pub(crate) async fn run(&mut self) {
35        loop {
36            tokio::select! {
37                _ = self.cancel.cancelled() => {
38                    info!("Shutdown signal received, terminating trigger exchange rates task");
39                    return;
40                }
41
42                _ = self.epoch_rx.changed() => {
43                    info!("Detected epoch boundary, triggering call to exchange rates");
44                    let latest_iota_system_state = self.db.inner.spawn_blocking(move |this|
45                        this.get_latest_iota_system_state()
46                    ).await.map_err(|_| error!("Failed to fetch latest IOTA system state"));
47
48                    if let Ok(latest_iota_system_state) = latest_iota_system_state {
49                        let db = self.db.clone();
50                        let governance_api = GovernanceReadApi::new(db.inner) ;
51                        governance_api.exchange_rates( &latest_iota_system_state)
52                            .await
53                            .map_err(|e| error!("Failed to fetch exchange rates: {:?}", e))
54                            .ok();
55                        info!("Finished fetching exchange rates");
56                    }
57                }
58            }
59        }
60    }
61}