iota_bridge_indexer/
iota_transaction_queries.rs1use std::{sync::Arc, time::Duration};
6
7use iota_bridge::{metrics::BridgeMetrics, retry_with_max_elapsed_time};
8use iota_json_rpc_types::{
9 IotaTransactionBlockResponseOptions, IotaTransactionBlockResponseQuery, TransactionFilter,
10};
11use iota_sdk::IotaClient;
12use iota_types::{IOTA_BRIDGE_OBJECT_ID, digests::TransactionDigest};
13use tracing::{error, info};
14
15use crate::types::RetrievedTransaction;
16
17const QUERY_DURATION: Duration = Duration::from_secs(1);
18const SLEEP_DURATION: Duration = Duration::from_secs(5);
19
20pub async fn start_iota_tx_polling_task(
21 iota_client: IotaClient,
22 mut cursor: Option<TransactionDigest>,
23 tx: iota_metrics::metered_channel::Sender<(
24 Vec<RetrievedTransaction>,
25 Option<TransactionDigest>,
26 )>,
27 metrics: Arc<BridgeMetrics>,
28) {
29 info!("Starting IOTA transaction polling task from {:?}", cursor);
30 loop {
31 let Ok(Ok(results)) = retry_with_max_elapsed_time!(
32 iota_client.read_api().query_transaction_blocks(
33 IotaTransactionBlockResponseQuery {
34 filter: Some(TransactionFilter::InputObject(IOTA_BRIDGE_OBJECT_ID)),
35 options: Some(IotaTransactionBlockResponseOptions::full_content()),
36 },
37 cursor,
38 None,
39 false,
40 ),
41 Duration::from_secs(600)
42 ) else {
43 error!("Failed to query bridge transactions after retry");
44 continue;
45 };
46 info!("Retrieved {} bridge transactions", results.data.len());
47 let txes = match results
48 .data
49 .into_iter()
50 .map(RetrievedTransaction::try_from)
51 .collect::<anyhow::Result<Vec<_>>>()
52 {
53 Ok(data) => data,
54 Err(e) => {
55 error!(
58 "Failed to convert retrieved transactions to sanitized format: {}",
59 e
60 );
61 tokio::time::sleep(SLEEP_DURATION).await;
62 continue;
63 }
64 };
65 if txes.is_empty() {
66 tokio::time::sleep(QUERY_DURATION).await;
69 continue;
70 }
71 let ckp = txes.last().unwrap().checkpoint;
73 tx.send((txes, results.next_cursor))
74 .await
75 .expect("Failed to send transaction block to process");
76 metrics.last_synced_iota_checkpoint.set(ckp as i64);
77 cursor = results.next_cursor;
78 }
79}