iota_rpc_loadgen/payload/
validation.rs1use std::{collections::HashSet, fmt::Debug};
6
7use futures::future::join_all;
8use iota_json_rpc_types::{
9 IotaObjectDataOptions, IotaObjectResponse, IotaTransactionBlockEffectsAPI,
10 IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
11};
12use iota_sdk::IotaClient;
13use iota_types::base_types::{ObjectID, TransactionDigest};
14use itertools::Itertools;
15use tracing::{error, log::warn};
16
17const LOADGEN_QUERY_MAX_RESULT_LIMIT: usize = 25;
18
19pub(crate) fn cross_validate_entities<U>(entities: &[Vec<U>], entity_name: &str)
20where
21 U: PartialEq + Debug,
22{
23 if entities.len() < 2 {
24 return;
25 }
26
27 let length = entities[0].len();
28 if let Some((vec_index, v)) = entities.iter().enumerate().find(|(_, v)| v.len() != length) {
29 error!(
30 "Entity: {} lengths do not match at index {}: first vec has length {} vs vec {} has length {}",
31 entity_name,
32 vec_index,
33 length,
34 vec_index,
35 v.len()
36 );
37 return;
38 }
39
40 for i in 0..length {
42 let mut iter = entities.iter().map(|v| &v[i]);
45
46 if let Some(first) = iter.next() {
48 for (j, other) in iter.enumerate() {
49 if first != other {
50 error!(
53 "Entity: {} mismatch at index {}: expected: {:?}, received {}: {:?}",
54 entity_name,
55 i,
56 first,
57 format!("{}[{}]", entity_name, j + 1),
58 other
59 );
60 }
61 }
62 }
63 }
64}
65
66pub(crate) async fn check_transactions(
67 clients: &[IotaClient],
68 digests: &[TransactionDigest],
69 cross_validate: bool,
70 verify_objects: bool,
71) -> Vec<Vec<IotaTransactionBlockResponse>> {
72 let transactions: Vec<Vec<IotaTransactionBlockResponse>> =
73 join_all(clients.iter().map(|client| async move {
74 client
75 .read_api()
76 .multi_get_transactions_with_options(
77 digests.to_vec(),
78 IotaTransactionBlockResponseOptions::full_content(), )
81 .await
82 }))
83 .await
84 .into_iter()
85 .enumerate()
86 .filter_map(|(i, result)| match result {
87 Ok(transactions) => Some(transactions),
88 Err(err) => {
89 warn!(
90 "Failed to fetch transactions for vec {i}: {:?}. Logging digests, {:?}",
91 err, digests
92 );
93 None
94 }
95 })
96 .collect();
97
98 if cross_validate {
99 cross_validate_entities(&transactions, "Transactions");
100 }
101
102 if verify_objects {
103 let object_ids = transactions
104 .iter()
105 .flatten()
106 .flat_map(get_all_object_ids)
107 .collect::<HashSet<_>>()
108 .into_iter()
109 .collect::<Vec<_>>();
110
111 check_objects(clients, &object_ids, cross_validate).await;
112 }
113 transactions
114}
115
116pub(crate) fn get_all_object_ids(response: &IotaTransactionBlockResponse) -> Vec<ObjectID> {
117 let objects = match response.effects.as_ref() {
118 Some(effects) => effects.all_changed_objects(),
120 None => {
121 error!(
122 "Effects for transaction digest {} should not be empty",
123 response.digest
124 );
125 vec![]
126 }
127 };
128 objects
129 .iter()
130 .map(|(owned_object_ref, _)| owned_object_ref.object_id())
131 .collect::<Vec<_>>()
132}
133
134pub(crate) fn chunk_entities<U>(entities: &[U], chunk_size: Option<usize>) -> Vec<Vec<U>>
135where
136 U: Clone + PartialEq + Debug,
137{
138 let chunk_size = chunk_size.unwrap_or(LOADGEN_QUERY_MAX_RESULT_LIMIT);
139 entities
140 .iter()
141 .cloned()
142 .chunks(chunk_size)
143 .into_iter()
144 .map(|chunk| chunk.collect())
145 .collect()
146}
147
148pub(crate) async fn check_objects(
149 clients: &[IotaClient],
150 object_ids: &[ObjectID],
151 cross_validate: bool,
152) {
153 let chunks = chunk_entities(object_ids, None);
154 let results = join_all(chunks.iter().map(|chunk| multi_get_object(clients, chunk))).await;
155
156 if cross_validate {
157 for result in results {
158 cross_validate_entities(&result, "Objects");
159 }
160 }
161}
162
163pub(crate) async fn multi_get_object(
164 clients: &[IotaClient],
165 object_ids: &[ObjectID],
166) -> Vec<Vec<IotaObjectResponse>> {
167 let objects: Vec<Vec<IotaObjectResponse>> = join_all(clients.iter().map(|client| async move {
168 let object_ids = if object_ids.len() > LOADGEN_QUERY_MAX_RESULT_LIMIT {
169 warn!(
170 "The input size for multi_get_object_with_options has exceed the query limit\
171 {LOADGEN_QUERY_MAX_RESULT_LIMIT}: {}, time to implement chunking",
172 object_ids.len()
173 );
174 &object_ids[0..LOADGEN_QUERY_MAX_RESULT_LIMIT]
175 } else {
176 object_ids
177 };
178
179 client
180 .read_api()
181 .multi_get_object_with_options(
182 object_ids.to_vec(),
183 IotaObjectDataOptions::full_content(), )
185 .await
186 }))
187 .await
188 .into_iter()
189 .enumerate()
190 .filter_map(|(i, result)| match result {
191 Ok(obj_vec) => Some(obj_vec),
192 Err(err) => {
193 error!(
194 "Failed to fetch objects for vec {i}: {:?}. Logging objectIDs, {:?}",
195 err, object_ids
196 );
197 None
198 }
199 })
200 .collect();
201 objects
202}