iota_graphql_rpc/server/
exchange_rates_task.rs1use iota_indexer::apis::GovernanceReadApi;
6use tokio::sync::watch;
7use tokio_util::sync::CancellationToken;
8use tracing::{error, info};
9
10use crate::data::pg::PgExecutor;
11
12pub(crate) struct TriggerExchangeRatesTask {
16 cancel: CancellationToken,
17 db: PgExecutor,
18 epoch_rx: watch::Receiver<u64>,
19}
20
21impl TriggerExchangeRatesTask {
22 pub(crate) fn new(
23 db: PgExecutor,
24 epoch_rx: watch::Receiver<u64>,
25 cancel: CancellationToken,
26 ) -> Self {
27 Self {
28 db,
29 epoch_rx,
30 cancel,
31 }
32 }
33
34 pub(crate) async fn run(&mut self) {
35 loop {
36 tokio::select! {
37 _ = self.cancel.cancelled() => {
38 info!("Shutdown signal received, terminating trigger exchange rates task");
39 return;
40 }
41
42 _ = self.epoch_rx.changed() => {
43 info!("Detected epoch boundary, triggering call to exchange rates");
44 let latest_iota_system_state = self.db.inner.spawn_blocking(move |this|
45 this.get_latest_iota_system_state()
46 ).await.map_err(|_| error!("Failed to fetch latest IOTA system state"));
47
48 if let Ok(latest_iota_system_state) = latest_iota_system_state {
49 let db = self.db.clone();
50 let governance_api = GovernanceReadApi::new(db.inner) ;
51 governance_api.exchange_rates( &latest_iota_system_state)
52 .await
53 .map_err(|e| error!("Failed to fetch exchange rates: {:?}", e))
54 .ok();
55 info!("Finished fetching exchange rates");
56 }
57 }
58 }
59 }
60 }
61}