iota_rpc_loadgen/payload/
get_checkpoints.rs1use std::sync::Arc;
6
7use anyhow::Result;
8use async_trait::async_trait;
9use dashmap::DashSet;
10use futures::future::join_all;
11use iota_json_rpc_types::CheckpointId;
12use iota_types::base_types::TransactionDigest;
13use itertools::Itertools;
14use tokio::sync::Mutex;
15use tracing::{debug, error, info, log::warn};
16
17use crate::payload::{
18 GetCheckpoints, ProcessPayload, RpcCommandProcessor, SignerInfo,
19 checkpoint_utils::get_latest_checkpoint_stats, validation::check_transactions,
20};
21
22#[async_trait]
23impl<'a> ProcessPayload<'a, &'a GetCheckpoints> for RpcCommandProcessor {
24 async fn process(
25 &'a self,
26 op: &'a GetCheckpoints,
27 _signer_info: &Option<SignerInfo>,
28 ) -> Result<()> {
29 let clients = self.get_clients().await?;
30
31 let checkpoint_stats = get_latest_checkpoint_stats(&clients, op.end).await;
32 let max_checkpoint = checkpoint_stats.max_latest_checkpoint();
33 debug!("GetCheckpoints({}, {:?})", op.start, max_checkpoint,);
34
35 let cross_validate = true;
37
38 for seq in op.start..=max_checkpoint {
39 let transaction_digests: Arc<Mutex<DashSet<TransactionDigest>>> =
40 Arc::new(Mutex::new(DashSet::new()));
41 let checkpoints = join_all(clients.iter().enumerate().map(|(i, client)| {
42 let transaction_digests = transaction_digests.clone();
43 let end_checkpoint_for_clients = checkpoint_stats.latest_checkpoints.clone();
44 async move {
45 if end_checkpoint_for_clients[i] < seq {
46 warn!(
48 "The RPC server corresponding to the {i}th url has a outdated checkpoint number {}.\
49 The latest checkpoint number is {seq}",
50 end_checkpoint_for_clients[i]
51 );
52 return None;
53 }
54
55 match client
56 .read_api()
57 .get_checkpoint(CheckpointId::SequenceNumber(seq))
58 .await {
59 Ok(t) => {
60 if t.sequence_number != seq {
61 error!("The RPC server corresponding to the {i}th url has unexpected checkpoint sequence number {}, expected {seq}", t.sequence_number,);
62 }
63 for digest in t.transactions.iter() {
64 transaction_digests.lock().await.insert(*digest);
65 }
66 Some(t)
67 },
68 Err(err) => {
69 error!("Failed to fetch checkpoint {seq} on the {i}th url: {err}");
70 None
71 }
72 }
73 }
74 }))
75 .await;
76
77 let transaction_digests = transaction_digests
78 .lock()
79 .await
80 .iter()
81 .map(|digest| *digest)
82 .collect::<Vec<_>>();
83
84 if op.verify_transactions {
85 let transaction_responses = check_transactions(
86 &clients,
87 &transaction_digests,
88 cross_validate,
89 op.verify_objects,
90 )
91 .await
92 .into_iter()
93 .concat();
94
95 if op.record {
96 debug!("adding addresses and object ids from response");
97 self.add_addresses_from_response(&transaction_responses);
98 self.add_object_ids_from_response(&transaction_responses);
99 };
100 }
101
102 if op.record {
103 debug!("adding transaction digests from response");
104 self.add_transaction_digests(transaction_digests);
105 };
106
107 if cross_validate {
108 let valid_checkpoint = checkpoints.iter().enumerate().find_map(|(i, x)| {
109 if x.is_some() {
110 Some((i, x.clone().unwrap()))
111 } else {
112 None
113 }
114 });
115
116 if valid_checkpoint.is_none() {
117 error!("none of the urls are returning valid checkpoint for seq {seq}");
118 continue;
119 }
120 let (valid_checkpoint_idx, valid_checkpoint) = valid_checkpoint.unwrap();
122 for (i, x) in checkpoints.iter().enumerate() {
123 if i == valid_checkpoint_idx {
124 continue;
125 }
126 let eq = x.is_none() || x.as_ref().unwrap() == &valid_checkpoint;
128 if !eq {
129 error!(
130 "getCheckpoint {seq} has a different result between the {valid_checkpoint_idx}th and {i}th URL {:?} {:?}",
131 x, checkpoints[valid_checkpoint_idx]
132 )
133 }
134 }
135 }
136
137 if seq % 10000 == 0 {
138 info!("Finished processing checkpoint {seq}");
139 }
140 }
141
142 Ok(())
143 }
144}