iota_rpc_loadgen/payload/
query_transactions.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use futures::future::join_all;
8use iota_json_rpc_types::{
9    IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
10    IotaTransactionBlockResponseQuery, Page, TransactionBlocksPage, TransactionFilter,
11};
12use iota_sdk::IotaClient;
13use iota_types::base_types::TransactionDigest;
14use tracing::log::warn;
15
16use crate::payload::{
17    AddressQueryType, ProcessPayload, QueryTransactionBlocks, RpcCommandProcessor, SignerInfo,
18    validation::cross_validate_entities,
19};
20
21#[async_trait]
22impl<'a> ProcessPayload<'a, &'a QueryTransactionBlocks> for RpcCommandProcessor {
23    async fn process(
24        &'a self,
25        op: &'a QueryTransactionBlocks,
26        _signer_info: &Option<SignerInfo>,
27    ) -> Result<()> {
28        let clients = self.get_clients().await?;
29        let address_type = &op.address_type;
30        if op.addresses.is_empty() {
31            warn!("No addresses provided, skipping query");
32            return Ok(());
33        }
34        let filters = {
35            let mut from: Vec<Option<TransactionFilter>> = op
36                .addresses
37                .iter()
38                .map(|address| Some(TransactionFilter::FromAddress(*address)))
39                .collect();
40
41            let mut to = op
42                .addresses
43                .iter()
44                .map(|address| Some(TransactionFilter::ToAddress(*address)))
45                .collect();
46
47            match address_type {
48                AddressQueryType::From => from,
49                AddressQueryType::To => to,
50                AddressQueryType::Both => from.drain(..).chain(to.drain(..)).collect(),
51            }
52        };
53
54        let queries: Vec<IotaTransactionBlockResponseQuery> = filters
55            .into_iter()
56            .map(|filter| IotaTransactionBlockResponseQuery {
57                filter,
58                options: Some(IotaTransactionBlockResponseOptions::full_content()),
59            })
60            .collect();
61
62        // todo: can map this
63        for query in queries {
64            let mut results: Vec<TransactionBlocksPage> = Vec::new();
65
66            // Paginate results, if any
67            while results.is_empty() || results.iter().any(|r| r.has_next_page) {
68                let cursor = if results.is_empty() {
69                    None
70                } else {
71                    match (
72                        results.first().unwrap().next_cursor,
73                        results.get(1).unwrap().next_cursor,
74                    ) {
75                        (Some(first_cursor), Some(second_cursor)) => {
76                            if first_cursor != second_cursor {
77                                warn!(
78                                    "Cursors are not the same, received {} vs {}. Selecting the first cursor to continue",
79                                    first_cursor, second_cursor
80                                );
81                            }
82                            Some(first_cursor)
83                        }
84                        (Some(cursor), None) | (None, Some(cursor)) => Some(cursor),
85                        (None, None) => None,
86                    }
87                };
88
89                results = join_all(clients.iter().map(|client| {
90                    let with_query = query.clone();
91                    async move {
92                        query_transaction_blocks(client, with_query, cursor, None)
93                            .await
94                            .unwrap()
95                    }
96                }))
97                .await;
98
99                let transactions: Vec<Vec<IotaTransactionBlockResponse>> =
100                    results.iter().map(|page| page.data.clone()).collect();
101                cross_validate_entities(&transactions, "Transactions");
102            }
103        }
104        Ok(())
105    }
106}
107
108async fn query_transaction_blocks(
109    client: &IotaClient,
110    query: IotaTransactionBlockResponseQuery,
111    cursor: Option<TransactionDigest>,
112    limit: Option<usize>, // TODO: we should probably set a limit and paginate
113) -> Result<Page<IotaTransactionBlockResponse, TransactionDigest>> {
114    let transactions = client
115        .read_api()
116        .query_transaction_blocks(query, cursor, limit, true)
117        .await
118        .unwrap();
119    Ok(transactions)
120}