iota_rpc_loadgen/payload/
query_transactions.rs1use anyhow::Result;
6use async_trait::async_trait;
7use futures::future::join_all;
8use iota_json_rpc_types::{
9 IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
10 IotaTransactionBlockResponseQuery, Page, TransactionBlocksPage, TransactionFilter,
11};
12use iota_sdk::IotaClient;
13use iota_types::base_types::TransactionDigest;
14use tracing::log::warn;
15
16use crate::payload::{
17 AddressQueryType, ProcessPayload, QueryTransactionBlocks, RpcCommandProcessor, SignerInfo,
18 validation::cross_validate_entities,
19};
20
21#[async_trait]
22impl<'a> ProcessPayload<'a, &'a QueryTransactionBlocks> for RpcCommandProcessor {
23 async fn process(
24 &'a self,
25 op: &'a QueryTransactionBlocks,
26 _signer_info: &Option<SignerInfo>,
27 ) -> Result<()> {
28 let clients = self.get_clients().await?;
29 let address_type = &op.address_type;
30 if op.addresses.is_empty() {
31 warn!("No addresses provided, skipping query");
32 return Ok(());
33 }
34 let filters = {
35 let mut from: Vec<Option<TransactionFilter>> = op
36 .addresses
37 .iter()
38 .map(|address| Some(TransactionFilter::FromAddress(*address)))
39 .collect();
40
41 let mut to = op
42 .addresses
43 .iter()
44 .map(|address| Some(TransactionFilter::ToAddress(*address)))
45 .collect();
46
47 match address_type {
48 AddressQueryType::From => from,
49 AddressQueryType::To => to,
50 AddressQueryType::Both => from.drain(..).chain(to.drain(..)).collect(),
51 }
52 };
53
54 let queries: Vec<IotaTransactionBlockResponseQuery> = filters
55 .into_iter()
56 .map(|filter| IotaTransactionBlockResponseQuery {
57 filter,
58 options: Some(IotaTransactionBlockResponseOptions::full_content()),
59 })
60 .collect();
61
62 for query in queries {
64 let mut results: Vec<TransactionBlocksPage> = Vec::new();
65
66 while results.is_empty() || results.iter().any(|r| r.has_next_page) {
68 let cursor = if results.is_empty() {
69 None
70 } else {
71 match (
72 results.first().unwrap().next_cursor,
73 results.get(1).unwrap().next_cursor,
74 ) {
75 (Some(first_cursor), Some(second_cursor)) => {
76 if first_cursor != second_cursor {
77 warn!(
78 "Cursors are not the same, received {} vs {}. Selecting the first cursor to continue",
79 first_cursor, second_cursor
80 );
81 }
82 Some(first_cursor)
83 }
84 (Some(cursor), None) | (None, Some(cursor)) => Some(cursor),
85 (None, None) => None,
86 }
87 };
88
89 results = join_all(clients.iter().map(|client| {
90 let with_query = query.clone();
91 async move {
92 query_transaction_blocks(client, with_query, cursor, None)
93 .await
94 .unwrap()
95 }
96 }))
97 .await;
98
99 let transactions: Vec<Vec<IotaTransactionBlockResponse>> =
100 results.iter().map(|page| page.data.clone()).collect();
101 cross_validate_entities(&transactions, "Transactions");
102 }
103 }
104 Ok(())
105 }
106}
107
108async fn query_transaction_blocks(
109 client: &IotaClient,
110 query: IotaTransactionBlockResponseQuery,
111 cursor: Option<TransactionDigest>,
112 limit: Option<usize>, ) -> Result<Page<IotaTransactionBlockResponse, TransactionDigest>> {
114 let transactions = client
115 .read_api()
116 .query_transaction_blocks(query, cursor, limit, true)
117 .await
118 .unwrap();
119 Ok(transactions)
120}