iota_single_node_benchmark/
single_node.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap, HashSet},
7    sync::Arc,
8};
9
10use iota_core::{
11    authority::{
12        AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
13        authority_store_tables::LiveObject, test_authority_builder::TestAuthorityBuilder,
14    },
15    authority_server::{ValidatorService, ValidatorServiceMetrics},
16    checkpoints::checkpoint_executor::CheckpointExecutor,
17    consensus_adapter::{
18        ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics,
19    },
20    mock_consensus::{ConsensusMode, MockConsensusClient},
21    state_accumulator::StateAccumulator,
22};
23use iota_test_transaction_builder::{PublishData, TestTransactionBuilder};
24use iota_types::{
25    base_types::{AuthorityName, IotaAddress, ObjectRef, TransactionDigest},
26    committee::Committee,
27    crypto::{AccountKeyPair, AuthoritySignature, Signer},
28    effects::{TransactionEffects, TransactionEffectsAPI},
29    executable_transaction::VerifiedExecutableTransaction,
30    messages_checkpoint::{VerifiedCheckpoint, VerifiedCheckpointContents},
31    messages_grpc::HandleTransactionResponse,
32    mock_checkpoint_builder::{MockCheckpointBuilder, ValidatorKeypairProvider},
33    object::Object,
34    transaction::{
35        CertifiedTransaction, DEFAULT_VALIDATOR_GAS_PRICE, Transaction, TransactionDataAPI,
36        VerifiedCertificate, VerifiedTransaction,
37    },
38};
39use tokio::sync::broadcast;
40
41use crate::{command::Component, mock_storage::InMemoryObjectStore};
42
43#[derive(Clone)]
44pub struct SingleValidator {
45    validator_service: Arc<ValidatorService>,
46    epoch_store: Arc<AuthorityPerEpochStore>,
47}
48
49impl SingleValidator {
50    pub(crate) async fn new(genesis_objects: &[Object], component: Component) -> Self {
51        let validator = TestAuthorityBuilder::new()
52            .disable_indexer()
53            .with_starting_objects(genesis_objects)
54            // This is needed to properly run checkpoint executor.
55            .insert_genesis_checkpoint()
56            .build()
57            .await;
58        let epoch_store = validator.epoch_store_for_testing().clone();
59        let consensus_mode = match component {
60            Component::ValidatorWithFakeConsensus => ConsensusMode::DirectSequencing,
61            _ => ConsensusMode::Noop,
62        };
63        let consensus_adapter = Arc::new(ConsensusAdapter::new(
64            Arc::new(MockConsensusClient::new(
65                Arc::downgrade(&validator),
66                consensus_mode,
67            )),
68            validator.name,
69            Arc::new(ConnectionMonitorStatusForTests {}),
70            100_000,
71            100_000,
72            None,
73            None,
74            ConsensusAdapterMetrics::new_test(),
75        ));
76        // TODO: for validator benchmarking purposes, we should allow for traffic
77        // control to be configurable and introduce traffic control benchmarks
78        // to test against different policies
79        let validator_service = Arc::new(ValidatorService::new_for_tests(
80            validator,
81            consensus_adapter,
82            Arc::new(ValidatorServiceMetrics::new_for_tests()),
83        ));
84        Self {
85            validator_service,
86            epoch_store,
87        }
88    }
89
90    pub fn get_validator(&self) -> &Arc<AuthorityState> {
91        self.validator_service.validator_state()
92    }
93
94    pub fn get_epoch_store(&self) -> &Arc<AuthorityPerEpochStore> {
95        &self.epoch_store
96    }
97
98    /// Publish a package, returns the package object and the updated gas
99    /// object.
100    pub async fn publish_package(
101        &self,
102        publish_data: PublishData,
103        sender: IotaAddress,
104        keypair: &AccountKeyPair,
105        gas: ObjectRef,
106    ) -> (ObjectRef, ObjectRef) {
107        let tx_builder = TestTransactionBuilder::new(sender, gas, DEFAULT_VALIDATOR_GAS_PRICE)
108            .publish_with_data(publish_data);
109        let transaction = tx_builder.build_and_sign(keypair);
110        let effects = self.execute_raw_transaction(transaction).await;
111        let package = effects
112            .all_changed_objects()
113            .into_iter()
114            .filter_map(|(oref, owner, _)| owner.is_immutable().then_some(oref))
115            .next()
116            .unwrap();
117        let updated_gas = effects.gas_object().0;
118        (package, updated_gas)
119    }
120
121    pub async fn execute_raw_transaction(&self, transaction: Transaction) -> TransactionEffects {
122        let executable = VerifiedExecutableTransaction::new_from_quorum_execution(
123            VerifiedTransaction::new_unchecked(transaction),
124            0,
125        );
126        let effects = self
127            .get_validator()
128            .try_execute_immediately(&executable, None, &self.epoch_store)
129            .await
130            .unwrap()
131            .0;
132        assert!(effects.status().is_ok());
133        effects
134    }
135
136    pub async fn execute_dry_run(&self, transaction: Transaction) -> TransactionEffects {
137        let effects = self
138            .get_validator()
139            .dry_exec_transaction_for_benchmark(
140                transaction.data().intent_message().value.clone(),
141                *transaction.digest(),
142            )
143            .await
144            .unwrap()
145            .2;
146        assert!(effects.status().is_ok());
147        effects
148    }
149
150    pub async fn execute_certificate(
151        &self,
152        cert: CertifiedTransaction,
153        component: Component,
154    ) -> TransactionEffects {
155        let effects = match component {
156            Component::Baseline => {
157                let cert = VerifiedExecutableTransaction::new_from_certificate(
158                    VerifiedCertificate::new_unchecked(cert),
159                );
160                self.get_validator()
161                    .try_execute_immediately(&cert, None, &self.epoch_store)
162                    .await
163                    .unwrap()
164                    .0
165            }
166            Component::WithTxManager => {
167                let cert = VerifiedCertificate::new_unchecked(cert);
168                if cert.contains_shared_object() {
169                    // For shared objects transactions, `execute_certificate` won't enqueue it
170                    // because it expects consensus to do so. However we don't
171                    // have consensus, hence the manual enqueue.
172                    self.get_validator()
173                        .enqueue_certificates_for_execution(vec![cert.clone()], &self.epoch_store);
174                }
175                self.get_validator()
176                    .execute_certificate(&cert, &self.epoch_store)
177                    .await
178                    .unwrap()
179            }
180            Component::ValidatorWithoutConsensus | Component::ValidatorWithFakeConsensus => {
181                let response = self
182                    .validator_service
183                    .execute_certificate_for_testing(cert)
184                    .await
185                    .unwrap()
186                    .into_inner();
187                response.signed_effects.into_data()
188            }
189            Component::TxnSigning | Component::CheckpointExecutor | Component::ExecutionOnly => {
190                unreachable!()
191            }
192        };
193        assert!(effects.status().is_ok());
194        effects
195    }
196
197    pub(crate) async fn execute_transaction_in_memory(
198        &self,
199        store: InMemoryObjectStore,
200        transaction: CertifiedTransaction,
201    ) -> TransactionEffects {
202        let input_objects = transaction.transaction_data().input_objects().unwrap();
203        let objects = store
204            .read_objects_for_execution(&*self.epoch_store, &transaction.key(), &input_objects)
205            .unwrap();
206
207        let executable = VerifiedExecutableTransaction::new_from_certificate(
208            VerifiedCertificate::new_unchecked(transaction),
209        );
210        let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
211            &executable,
212            objects,
213            self.epoch_store.protocol_config(),
214            self.epoch_store.reference_gas_price(),
215        )
216        .unwrap();
217        let (kind, signer, gas) = executable.transaction_data().execution_parts();
218        let (inner_temp_store, _, effects, _) =
219            self.epoch_store.executor().execute_transaction_to_effects(
220                &store,
221                self.epoch_store.protocol_config(),
222                self.get_validator().metrics.limits_metrics.clone(),
223                false,
224                &HashSet::new(),
225                &self.epoch_store.epoch(),
226                0,
227                input_objects,
228                gas,
229                gas_status,
230                kind,
231                signer,
232                *executable.digest(),
233                &mut None,
234            );
235        assert!(effects.status().is_ok());
236        store.commit_objects(inner_temp_store);
237        effects
238    }
239
240    pub async fn sign_transaction(&self, transaction: Transaction) -> HandleTransactionResponse {
241        self.validator_service
242            .handle_transaction_for_benchmarking(transaction)
243            .await
244            .unwrap()
245            .into_inner()
246    }
247
248    pub(crate) async fn build_checkpoints(
249        &self,
250        transactions: Vec<CertifiedTransaction>,
251        mut all_effects: BTreeMap<TransactionDigest, TransactionEffects>,
252        checkpoint_size: usize,
253    ) -> Vec<(VerifiedCheckpoint, VerifiedCheckpointContents)> {
254        let mut builder = MockCheckpointBuilder::new(
255            self.get_validator()
256                .get_checkpoint_store()
257                .get_latest_certified_checkpoint()
258                .unwrap(),
259        );
260        let mut checkpoints = vec![];
261        for transaction in transactions {
262            let effects = all_effects.remove(transaction.digest()).unwrap();
263            builder.push_transaction(
264                VerifiedTransaction::new_unchecked(transaction.into_unsigned()),
265                effects,
266            );
267            if builder.size() == checkpoint_size {
268                let (checkpoint, _, full_contents) = builder.build(self, 0);
269                checkpoints.push((checkpoint, full_contents));
270            }
271        }
272        if builder.size() > 0 {
273            let (checkpoint, _, full_contents) = builder.build(self, 0);
274            checkpoints.push((checkpoint, full_contents));
275        }
276        checkpoints
277    }
278
279    pub fn create_checkpoint_executor(
280        &self,
281    ) -> (CheckpointExecutor, broadcast::Sender<VerifiedCheckpoint>) {
282        let validator = self.get_validator();
283        let (ckpt_sender, ckpt_receiver) = broadcast::channel(1000000);
284        let checkpoint_executor = CheckpointExecutor::new_for_tests(
285            ckpt_receiver,
286            validator.get_checkpoint_store().clone(),
287            validator.clone(),
288            Arc::new(StateAccumulator::new_for_tests(
289                validator.get_accumulator_store().clone(),
290            )),
291        );
292        (checkpoint_executor, ckpt_sender)
293    }
294
295    pub(crate) fn create_in_memory_store(&self) -> InMemoryObjectStore {
296        let objects: HashMap<_, _> = self
297            .get_validator()
298            .get_accumulator_store()
299            .iter_cached_live_object_set_for_testing()
300            .map(|o| match o {
301                LiveObject::Normal(object) => (object.id(), object),
302                LiveObject::Wrapped(_) => unreachable!(),
303            })
304            .collect();
305        InMemoryObjectStore::new(objects)
306    }
307
308    pub(crate) async fn assigned_shared_object_versions(
309        &self,
310        transactions: &[CertifiedTransaction],
311    ) {
312        let transactions: Vec<_> = transactions
313            .iter()
314            .map(|tx| {
315                VerifiedExecutableTransaction::new_from_certificate(
316                    VerifiedCertificate::new_unchecked(tx.clone()),
317                )
318            })
319            .collect();
320        self.epoch_store
321            .assign_shared_object_versions_idempotent(
322                self.get_validator().get_object_cache_reader().as_ref(),
323                &transactions,
324            )
325            .await
326            .unwrap();
327    }
328}
329
330impl ValidatorKeypairProvider for SingleValidator {
331    fn get_validator_key(&self, name: &AuthorityName) -> &dyn Signer<AuthoritySignature> {
332        assert_eq!(name, &self.get_validator().name);
333        &*self.get_validator().secret
334    }
335
336    fn get_committee(&self) -> &Committee {
337        self.epoch_store.committee().as_ref()
338    }
339}