iota_rpc_loadgen/payload/
get_checkpoints.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use anyhow::Result;
8use async_trait::async_trait;
9use dashmap::DashSet;
10use futures::future::join_all;
11use iota_json_rpc_types::CheckpointId;
12use iota_types::base_types::TransactionDigest;
13use itertools::Itertools;
14use tokio::sync::Mutex;
15use tracing::{debug, error, info, log::warn};
16
17use crate::payload::{
18    GetCheckpoints, ProcessPayload, RpcCommandProcessor, SignerInfo,
19    checkpoint_utils::get_latest_checkpoint_stats, validation::check_transactions,
20};
21
22#[async_trait]
23impl<'a> ProcessPayload<'a, &'a GetCheckpoints> for RpcCommandProcessor {
24    async fn process(
25        &'a self,
26        op: &'a GetCheckpoints,
27        _signer_info: &Option<SignerInfo>,
28    ) -> Result<()> {
29        let clients = self.get_clients().await?;
30
31        let checkpoint_stats = get_latest_checkpoint_stats(&clients, op.end).await;
32        let max_checkpoint = checkpoint_stats.max_latest_checkpoint();
33        debug!("GetCheckpoints({}, {:?})", op.start, max_checkpoint,);
34
35        // TODO(chris): read `cross_validate` from config
36        let cross_validate = true;
37
38        for seq in op.start..=max_checkpoint {
39            let transaction_digests: Arc<Mutex<DashSet<TransactionDigest>>> =
40                Arc::new(Mutex::new(DashSet::new()));
41            let checkpoints = join_all(clients.iter().enumerate().map(|(i, client)| {
42                let transaction_digests = transaction_digests.clone();
43                let end_checkpoint_for_clients = checkpoint_stats.latest_checkpoints.clone();
44                async move {
45                    if end_checkpoint_for_clients[i] < seq {
46                        // TODO(chris) log actual url
47                        warn!(
48                            "The RPC server corresponding to the {i}th url has a outdated checkpoint number {}.\
49                            The latest checkpoint number is {seq}",
50                            end_checkpoint_for_clients[i]
51                        );
52                        return None;
53                    }
54
55                    match client
56                        .read_api()
57                        .get_checkpoint(CheckpointId::SequenceNumber(seq))
58                        .await {
59                        Ok(t) => {
60                            if t.sequence_number != seq {
61                                error!("The RPC server corresponding to the {i}th url has unexpected checkpoint sequence number {}, expected {seq}", t.sequence_number,);
62                            }
63                            for digest in t.transactions.iter() {
64                                transaction_digests.lock().await.insert(*digest);
65                            }
66                            Some(t)
67                        },
68                        Err(err) => {
69                            error!("Failed to fetch checkpoint {seq} on the {i}th url: {err}");
70                            None
71                        }
72                    }
73                }
74            }))
75                .await;
76
77            let transaction_digests = transaction_digests
78                .lock()
79                .await
80                .iter()
81                .map(|digest| *digest)
82                .collect::<Vec<_>>();
83
84            if op.verify_transactions {
85                let transaction_responses = check_transactions(
86                    &clients,
87                    &transaction_digests,
88                    cross_validate,
89                    op.verify_objects,
90                )
91                .await
92                .into_iter()
93                .concat();
94
95                if op.record {
96                    debug!("adding addresses and object ids from response");
97                    self.add_addresses_from_response(&transaction_responses);
98                    self.add_object_ids_from_response(&transaction_responses);
99                };
100            }
101
102            if op.record {
103                debug!("adding transaction digests from response");
104                self.add_transaction_digests(transaction_digests);
105            };
106
107            if cross_validate {
108                let valid_checkpoint = checkpoints.iter().enumerate().find_map(|(i, x)| {
109                    if x.is_some() {
110                        Some((i, x.clone().unwrap()))
111                    } else {
112                        None
113                    }
114                });
115
116                if valid_checkpoint.is_none() {
117                    error!("none of the urls are returning valid checkpoint for seq {seq}");
118                    continue;
119                }
120                // safe to unwrap because we check some above
121                let (valid_checkpoint_idx, valid_checkpoint) = valid_checkpoint.unwrap();
122                for (i, x) in checkpoints.iter().enumerate() {
123                    if i == valid_checkpoint_idx {
124                        continue;
125                    }
126                    // ignore the None value because it's warned above
127                    let eq = x.is_none() || x.as_ref().unwrap() == &valid_checkpoint;
128                    if !eq {
129                        error!(
130                            "getCheckpoint {seq} has a different result between the {valid_checkpoint_idx}th and {i}th URL {:?} {:?}",
131                            x, checkpoints[valid_checkpoint_idx]
132                        )
133                    }
134                }
135            }
136
137            if seq % 10000 == 0 {
138                info!("Finished processing checkpoint {seq}");
139            }
140        }
141
142        Ok(())
143    }
144}