iota_bridge_indexer/
iota_transaction_queries.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Duration};
6
7use iota_bridge::{metrics::BridgeMetrics, retry_with_max_elapsed_time};
8use iota_json_rpc_types::{
9    IotaTransactionBlockResponseOptions, IotaTransactionBlockResponseQuery, TransactionFilter,
10};
11use iota_sdk::IotaClient;
12use iota_types::{IOTA_BRIDGE_OBJECT_ID, digests::TransactionDigest};
13use tracing::{error, info};
14
15use crate::types::RetrievedTransaction;
16
17const QUERY_DURATION: Duration = Duration::from_secs(1);
18const SLEEP_DURATION: Duration = Duration::from_secs(5);
19
20pub async fn start_iota_tx_polling_task(
21    iota_client: IotaClient,
22    mut cursor: Option<TransactionDigest>,
23    tx: iota_metrics::metered_channel::Sender<(
24        Vec<RetrievedTransaction>,
25        Option<TransactionDigest>,
26    )>,
27    metrics: Arc<BridgeMetrics>,
28) {
29    info!("Starting IOTA transaction polling task from {:?}", cursor);
30    loop {
31        let Ok(Ok(results)) = retry_with_max_elapsed_time!(
32            iota_client.read_api().query_transaction_blocks(
33                IotaTransactionBlockResponseQuery {
34                    filter: Some(TransactionFilter::InputObject(IOTA_BRIDGE_OBJECT_ID)),
35                    options: Some(IotaTransactionBlockResponseOptions::full_content()),
36                },
37                cursor,
38                None,
39                false,
40            ),
41            Duration::from_secs(600)
42        ) else {
43            error!("Failed to query bridge transactions after retry");
44            continue;
45        };
46        info!("Retrieved {} bridge transactions", results.data.len());
47        let txes = match results
48            .data
49            .into_iter()
50            .map(RetrievedTransaction::try_from)
51            .collect::<anyhow::Result<Vec<_>>>()
52        {
53            Ok(data) => data,
54            Err(e) => {
55                // TODO: Sometimes fullnode does not return checkpoint strangely. We retry
56                // instead of panicking.
57                error!(
58                    "Failed to convert retrieved transactions to sanitized format: {}",
59                    e
60                );
61                tokio::time::sleep(SLEEP_DURATION).await;
62                continue;
63            }
64        };
65        if txes.is_empty() {
66            // When there is no more new data, we are caught up, no need to stress the
67            // fullnode
68            tokio::time::sleep(QUERY_DURATION).await;
69            continue;
70        }
71        // Unwrap: txes is not empty
72        let ckp = txes.last().unwrap().checkpoint;
73        tx.send((txes, results.next_cursor))
74            .await
75            .expect("Failed to send transaction block to process");
76        metrics.last_synced_iota_checkpoint.set(ckp as i64);
77        cursor = results.next_cursor;
78    }
79}