iota_rpc_loadgen/payload/
validation.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashSet, fmt::Debug};
6
7use futures::future::join_all;
8use iota_json_rpc_types::{
9    IotaObjectDataOptions, IotaObjectResponse, IotaTransactionBlockEffectsAPI,
10    IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
11};
12use iota_sdk::IotaClient;
13use iota_types::base_types::{ObjectID, TransactionDigest};
14use itertools::Itertools;
15use tracing::{error, log::warn};
16
17const LOADGEN_QUERY_MAX_RESULT_LIMIT: usize = 25;
18
19pub(crate) fn cross_validate_entities<U>(entities: &[Vec<U>], entity_name: &str)
20where
21    U: PartialEq + Debug,
22{
23    if entities.len() < 2 {
24        return;
25    }
26
27    let length = entities[0].len();
28    if let Some((vec_index, v)) = entities.iter().enumerate().find(|(_, v)| v.len() != length) {
29        error!(
30            "Entity: {} lengths do not match at index {}: first vec has length {} vs vec {} has length {}",
31            entity_name,
32            vec_index,
33            length,
34            vec_index,
35            v.len()
36        );
37        return;
38    }
39
40    // Iterate through all indices (from 0 to length - 1) of the inner vectors.
41    for i in 0..length {
42        // Create an iterator that produces references to elements at position i in each
43        // inner vector of entities.
44        let mut iter = entities.iter().map(|v| &v[i]);
45
46        // Compare first against rest of the iter (other inner vectors)
47        if let Some(first) = iter.next() {
48            for (j, other) in iter.enumerate() {
49                if first != other {
50                    // Example error: Entity: ExampleEntity mismatch at index 2: expected: 3,
51                    // received ExampleEntity[1]: 4
52                    error!(
53                        "Entity: {} mismatch at index {}: expected: {:?}, received {}: {:?}",
54                        entity_name,
55                        i,
56                        first,
57                        format!("{}[{}]", entity_name, j + 1),
58                        other
59                    );
60                }
61            }
62        }
63    }
64}
65
66pub(crate) async fn check_transactions(
67    clients: &[IotaClient],
68    digests: &[TransactionDigest],
69    cross_validate: bool,
70    verify_objects: bool,
71) -> Vec<Vec<IotaTransactionBlockResponse>> {
72    let transactions: Vec<Vec<IotaTransactionBlockResponse>> =
73        join_all(clients.iter().map(|client| async move {
74            client
75                .read_api()
76                .multi_get_transactions_with_options(
77                    digests.to_vec(),
78                    IotaTransactionBlockResponseOptions::full_content(), /* todo(Will) support
79                                                                          * options for this */
80                )
81                .await
82        }))
83        .await
84        .into_iter()
85        .enumerate()
86        .filter_map(|(i, result)| match result {
87            Ok(transactions) => Some(transactions),
88            Err(err) => {
89                warn!(
90                    "Failed to fetch transactions for vec {i}: {:?}. Logging digests, {:?}",
91                    err, digests
92                );
93                None
94            }
95        })
96        .collect();
97
98    if cross_validate {
99        cross_validate_entities(&transactions, "Transactions");
100    }
101
102    if verify_objects {
103        let object_ids = transactions
104            .iter()
105            .flatten()
106            .flat_map(get_all_object_ids)
107            .collect::<HashSet<_>>()
108            .into_iter()
109            .collect::<Vec<_>>();
110
111        check_objects(clients, &object_ids, cross_validate).await;
112    }
113    transactions
114}
115
116pub(crate) fn get_all_object_ids(response: &IotaTransactionBlockResponse) -> Vec<ObjectID> {
117    let objects = match response.effects.as_ref() {
118        // TODO: handle deleted and wrapped objects
119        Some(effects) => effects.all_changed_objects(),
120        None => {
121            error!(
122                "Effects for transaction digest {} should not be empty",
123                response.digest
124            );
125            vec![]
126        }
127    };
128    objects
129        .iter()
130        .map(|(owned_object_ref, _)| owned_object_ref.object_id())
131        .collect::<Vec<_>>()
132}
133
134pub(crate) fn chunk_entities<U>(entities: &[U], chunk_size: Option<usize>) -> Vec<Vec<U>>
135where
136    U: Clone + PartialEq + Debug,
137{
138    let chunk_size = chunk_size.unwrap_or(LOADGEN_QUERY_MAX_RESULT_LIMIT);
139    entities
140        .iter()
141        .cloned()
142        .chunks(chunk_size)
143        .into_iter()
144        .map(|chunk| chunk.collect())
145        .collect()
146}
147
148pub(crate) async fn check_objects(
149    clients: &[IotaClient],
150    object_ids: &[ObjectID],
151    cross_validate: bool,
152) {
153    let chunks = chunk_entities(object_ids, None);
154    let results = join_all(chunks.iter().map(|chunk| multi_get_object(clients, chunk))).await;
155
156    if cross_validate {
157        for result in results {
158            cross_validate_entities(&result, "Objects");
159        }
160    }
161}
162
163pub(crate) async fn multi_get_object(
164    clients: &[IotaClient],
165    object_ids: &[ObjectID],
166) -> Vec<Vec<IotaObjectResponse>> {
167    let objects: Vec<Vec<IotaObjectResponse>> = join_all(clients.iter().map(|client| async move {
168        let object_ids = if object_ids.len() > LOADGEN_QUERY_MAX_RESULT_LIMIT {
169            warn!(
170                "The input size for multi_get_object_with_options has exceed the query limit\
171         {LOADGEN_QUERY_MAX_RESULT_LIMIT}: {}, time to implement chunking",
172                object_ids.len()
173            );
174            &object_ids[0..LOADGEN_QUERY_MAX_RESULT_LIMIT]
175        } else {
176            object_ids
177        };
178
179        client
180            .read_api()
181            .multi_get_object_with_options(
182                object_ids.to_vec(),
183                IotaObjectDataOptions::full_content(), // todo(Will) support options for this
184            )
185            .await
186    }))
187    .await
188    .into_iter()
189    .enumerate()
190    .filter_map(|(i, result)| match result {
191        Ok(obj_vec) => Some(obj_vec),
192        Err(err) => {
193            error!(
194                "Failed to fetch objects for vec {i}: {:?}. Logging objectIDs, {:?}",
195                err, object_ids
196            );
197            None
198        }
199    })
200    .collect();
201    objects
202}