iota_single_node_benchmark/
benchmark_context.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap},
7    ops::Deref,
8    sync::Arc,
9};
10
11use futures::{StreamExt, stream::FuturesUnordered};
12use iota_config::node::RunWithRange;
13use iota_test_transaction_builder::PublishData;
14use iota_types::{
15    base_types::{IotaAddress, ObjectID, ObjectRef, SequenceNumber},
16    effects::{TransactionEffects, TransactionEffectsAPI},
17    messages_grpc::HandleTransactionResponse,
18    mock_checkpoint_builder::ValidatorKeypairProvider,
19    transaction::{CertifiedTransaction, SignedTransaction, Transaction, VerifiedTransaction},
20};
21use tracing::info;
22
23use crate::{
24    command::Component,
25    mock_account::{Account, batch_create_account_and_gas},
26    mock_storage::InMemoryObjectStore,
27    single_node::SingleValidator,
28    tx_generator::{RootObjectCreateTxGenerator, SharedObjectCreateTxGenerator, TxGenerator},
29    workload::Workload,
30};
31
32pub struct BenchmarkContext {
33    validator: SingleValidator,
34    user_accounts: BTreeMap<IotaAddress, Account>,
35    admin_account: Account,
36    benchmark_component: Component,
37}
38
39impl BenchmarkContext {
40    pub(crate) async fn new(
41        workload: Workload,
42        benchmark_component: Component,
43        print_sample_tx: bool,
44    ) -> Self {
45        // Reserve 1 account for package publishing.
46        let mut num_accounts = workload.num_accounts() + 1;
47        if print_sample_tx {
48            // Reserver another one to generate a sample transaction.
49            num_accounts += 1;
50        }
51        let gas_object_num_per_account = workload.gas_object_num_per_account();
52        let total = num_accounts * gas_object_num_per_account;
53
54        info!(
55            "Creating {} accounts and {} gas objects",
56            num_accounts, total
57        );
58        let (mut user_accounts, genesis_gas_objects) =
59            batch_create_account_and_gas(num_accounts, gas_object_num_per_account).await;
60        assert_eq!(genesis_gas_objects.len() as u64, total);
61        let (_, admin_account) = user_accounts.pop_last().unwrap();
62
63        info!("Initializing validator");
64        let validator = SingleValidator::new(&genesis_gas_objects[..], benchmark_component).await;
65
66        Self {
67            validator,
68            user_accounts,
69            admin_account,
70            benchmark_component,
71        }
72    }
73
74    pub(crate) fn validator(&self) -> SingleValidator {
75        self.validator.clone()
76    }
77
78    pub(crate) async fn publish_package(&mut self, publish_data: PublishData) -> ObjectRef {
79        let mut gas_objects = self.admin_account.gas_objects.deref().clone();
80        let (package, updated_gas) = self
81            .validator
82            .publish_package(
83                publish_data,
84                self.admin_account.sender,
85                &self.admin_account.keypair,
86                gas_objects[0],
87            )
88            .await;
89        gas_objects[0] = updated_gas;
90        self.admin_account.gas_objects = Arc::new(gas_objects);
91        package
92    }
93
94    /// In order to benchmark transactions that can read dynamic fields, we must
95    /// first create a root object with dynamic fields for each account
96    /// address.
97    pub(crate) async fn preparing_dynamic_fields(
98        &mut self,
99        move_package: ObjectID,
100        num_dynamic_fields: u64,
101    ) -> HashMap<IotaAddress, ObjectRef> {
102        let mut root_objects = HashMap::new();
103
104        if num_dynamic_fields == 0 {
105            return root_objects;
106        }
107
108        info!("Preparing root object with dynamic fields");
109        let root_object_create_transactions = self
110            .generate_transactions(Arc::new(RootObjectCreateTxGenerator::new(
111                move_package,
112                num_dynamic_fields,
113            )))
114            .await;
115        let results = self
116            .execute_raw_transactions(root_object_create_transactions)
117            .await;
118        let mut new_gas_objects = HashMap::new();
119        for effects in results {
120            self.validator()
121                .get_validator()
122                .get_cache_commit()
123                .commit_transaction_outputs(
124                    effects.executed_epoch(),
125                    &[*effects.transaction_digest()],
126                );
127            let (owner, root_object) = effects
128                .created()
129                .into_iter()
130                .filter_map(|(oref, owner)| {
131                    owner
132                        .get_address_owner_address()
133                        .ok()
134                        .map(|owner| (owner, oref))
135                })
136                .next()
137                .unwrap();
138            root_objects.insert(owner, root_object);
139            let gas_object = effects.gas_object().0;
140            new_gas_objects.insert(gas_object.0, gas_object);
141        }
142        self.refresh_gas_objects(new_gas_objects);
143        info!("Finished preparing root object with dynamic fields");
144        root_objects
145    }
146
147    pub(crate) async fn prepare_shared_objects(
148        &mut self,
149        move_package: ObjectID,
150        num_shared_objects: usize,
151    ) -> Vec<(ObjectID, SequenceNumber)> {
152        let mut shared_objects = Vec::new();
153
154        if num_shared_objects == 0 {
155            return shared_objects;
156        }
157        assert!(num_shared_objects <= self.user_accounts.len());
158
159        info!("Preparing shared objects");
160        let generator = SharedObjectCreateTxGenerator::new(move_package);
161        let shared_object_create_transactions: Vec<_> = self
162            .user_accounts
163            .values()
164            .take(num_shared_objects)
165            .map(|account| generator.generate_tx(account.clone()))
166            .collect();
167        let results = self
168            .execute_raw_transactions(shared_object_create_transactions)
169            .await;
170        let mut new_gas_objects = HashMap::new();
171        let epoch_id = self.validator.get_epoch_store().epoch();
172        let cache_commit = self.validator.get_validator().get_cache_commit();
173        for effects in results {
174            let shared_object = effects
175                .created()
176                .into_iter()
177                .filter_map(|(oref, owner)| {
178                    if owner.is_shared() {
179                        Some((oref.0, oref.1))
180                    } else {
181                        None
182                    }
183                })
184                .next()
185                .unwrap();
186            shared_objects.push(shared_object);
187            let gas_object = effects.gas_object().0;
188            new_gas_objects.insert(gas_object.0, gas_object);
189            // Make sure to commit them to DB. This is needed by both the execution-only
190            // mode and the checkpoint-executor mode. For execution-only mode,
191            // we iterate through all live objects to construct the in memory
192            // object store, hence requiring these objects committed to DB.
193            // For checkpoint executor, in order to commit a checkpoint it is required
194            // previous versions of objects are already committed.
195            cache_commit.commit_transaction_outputs(epoch_id, &[*effects.transaction_digest()]);
196        }
197        self.refresh_gas_objects(new_gas_objects);
198        info!("Finished preparing shared objects");
199        shared_objects
200    }
201
202    pub(crate) async fn generate_transactions(
203        &self,
204        tx_generator: Arc<dyn TxGenerator>,
205    ) -> Vec<Transaction> {
206        info!(
207            "{}: Creating {} transactions",
208            tx_generator.name(),
209            self.user_accounts.len()
210        );
211        let tasks: FuturesUnordered<_> = self
212            .user_accounts
213            .values()
214            .map(|account| {
215                let account = account.clone();
216                let tx_generator = tx_generator.clone();
217                tokio::spawn(async move { tx_generator.generate_tx(account) })
218            })
219            .collect();
220        let results: Vec<_> = tasks.collect().await;
221        results.into_iter().map(|r| r.unwrap()).collect()
222    }
223
224    pub(crate) async fn certify_transactions(
225        &self,
226        transactions: Vec<Transaction>,
227        skip_signing: bool,
228    ) -> Vec<CertifiedTransaction> {
229        info!("Creating transaction certificates");
230        let tasks: FuturesUnordered<_> = transactions
231            .into_iter()
232            .map(|tx| {
233                let validator = self.validator();
234                tokio::spawn(async move {
235                    let committee = validator.get_committee();
236                    let validator_state = validator.get_validator();
237                    let sig = if skip_signing {
238                        SignedTransaction::sign(
239                            0,
240                            &tx,
241                            &*validator_state.secret,
242                            validator_state.name,
243                        )
244                    } else {
245                        let verified_tx = VerifiedTransaction::new_unchecked(tx.clone());
246                        validator_state
247                            .handle_transaction(validator.get_epoch_store(), verified_tx)
248                            .await
249                            .unwrap()
250                            .status
251                            .into_signed_for_testing()
252                    };
253                    CertifiedTransaction::new(tx.into_data(), vec![sig], committee).unwrap()
254                })
255            })
256            .collect();
257        let results: Vec<_> = tasks.collect().await;
258        results.into_iter().map(|r| r.unwrap()).collect()
259    }
260
261    pub(crate) async fn benchmark_transaction_execution(
262        &self,
263        transactions: Vec<CertifiedTransaction>,
264        print_sample_tx: bool,
265    ) {
266        if print_sample_tx {
267            // We must use remove(0) in case there are shared objects and the transactions
268            // must be executed in order.
269            self.execute_sample_transaction(transactions[0].clone())
270                .await;
271        }
272
273        let tx_count = transactions.len();
274        let start_time = std::time::Instant::now();
275        info!(
276            "Started executing {} transactions. You can now attach a profiler",
277            transactions.len()
278        );
279
280        let has_shared_object = transactions.iter().any(|tx| tx.contains_shared_object());
281        if has_shared_object {
282            // With shared objects, we must execute each transaction in order.
283            for transaction in transactions {
284                self.validator
285                    .execute_certificate(transaction, self.benchmark_component)
286                    .await;
287            }
288        } else {
289            let tasks: FuturesUnordered<_> = transactions
290                .into_iter()
291                .map(|tx| {
292                    let validator = self.validator();
293                    let component = self.benchmark_component;
294                    tokio::spawn(async move { validator.execute_certificate(tx, component).await })
295                })
296                .collect();
297            let results: Vec<_> = tasks.collect().await;
298            results.into_iter().for_each(|r| {
299                r.unwrap();
300            });
301        }
302
303        let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
304        info!(
305            "Execution finished in {}s, TPS={}",
306            elapsed,
307            tx_count as f64 / elapsed
308        );
309    }
310
311    pub(crate) async fn benchmark_transaction_execution_in_memory(
312        &self,
313        transactions: Vec<CertifiedTransaction>,
314        print_sample_tx: bool,
315    ) {
316        if print_sample_tx {
317            self.execute_sample_transaction(transactions[0].clone())
318                .await;
319        }
320
321        let tx_count = transactions.len();
322        let in_memory_store = self.validator.create_in_memory_store();
323        let start_time = std::time::Instant::now();
324        info!(
325            "Started executing {} transactions. You can now attach a profiler",
326            transactions.len()
327        );
328
329        self.execute_transactions_in_memory(in_memory_store.clone(), transactions)
330            .await;
331
332        let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
333        info!(
334            "Execution finished in {}s, TPS={}, number of DB object reads per transaction: {}",
335            elapsed,
336            tx_count as f64 / elapsed,
337            in_memory_store.get_num_object_reads() as f64 / tx_count as f64
338        );
339    }
340
341    /// Print out a sample transaction and its effects so that we can get a
342    /// rough idea what we are measuring.
343    async fn execute_sample_transaction(&self, sample_transaction: CertifiedTransaction) {
344        info!(
345            "Sample transaction digest={:?}: {:?}",
346            sample_transaction.digest(),
347            sample_transaction.data()
348        );
349        let effects = self
350            .validator()
351            .execute_dry_run(sample_transaction.into_unsigned())
352            .await;
353        info!("Sample effects: {:?}\n\n", effects);
354        assert!(effects.status().is_ok());
355    }
356
357    /// Benchmark parallel signing a vector of transactions and measure the TPS.
358    pub(crate) async fn benchmark_transaction_signing(
359        &self,
360        transactions: Vec<Transaction>,
361        print_sample_tx: bool,
362    ) {
363        if print_sample_tx {
364            let sample_transaction = &transactions[0];
365            info!("Sample transaction: {:?}", sample_transaction.data());
366        }
367
368        let tx_count = transactions.len();
369        let start_time = std::time::Instant::now();
370        self.validator_sign_transactions(transactions).await;
371        let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
372        info!(
373            "Transaction signing finished in {}s, TPS={}.",
374            elapsed,
375            tx_count as f64 / elapsed,
376        );
377    }
378
379    pub(crate) async fn benchmark_checkpoint_executor(
380        &self,
381        transactions: Vec<CertifiedTransaction>,
382        checkpoint_size: usize,
383    ) {
384        self.execute_sample_transaction(transactions[0].clone())
385            .await;
386
387        info!("Executing all transactions to generate effects");
388        let tx_count = transactions.len();
389        let in_memory_store = self.validator.create_in_memory_store();
390        let effects: BTreeMap<_, _> = self
391            .execute_transactions_in_memory(in_memory_store.clone(), transactions.clone())
392            .await
393            .into_iter()
394            .map(|e| (*e.transaction_digest(), e))
395            .collect();
396
397        info!("Building checkpoints");
398        let validator = self.validator();
399        let checkpoints = validator
400            .build_checkpoints(transactions, effects, checkpoint_size)
401            .await;
402        info!("Built {} checkpoints", checkpoints.len());
403        let last_checkpoint_seq = *checkpoints.last().unwrap().0.sequence_number();
404        let checkpoint_executor = validator.create_checkpoint_executor();
405        for (checkpoint, contents) in checkpoints {
406            let state = validator.get_validator();
407            state
408                .get_checkpoint_store()
409                .insert_verified_checkpoint(&checkpoint)
410                .unwrap();
411            state
412                .get_state_sync_store()
413                .multi_insert_transaction_and_effects(contents.transactions());
414            state
415                .get_checkpoint_store()
416                .insert_verified_checkpoint_contents(&checkpoint, contents)
417                .unwrap();
418            state
419                .get_checkpoint_store()
420                .update_highest_synced_checkpoint(&checkpoint)
421                .unwrap();
422        }
423        let start_time = std::time::Instant::now();
424        info!("Starting checkpoint execution. You can now attach a profiler");
425        checkpoint_executor
426            .run_epoch(Some(RunWithRange::Checkpoint(last_checkpoint_seq)))
427            .await;
428        let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
429        info!(
430            "Checkpoint execution finished in {}s, TPS={}.",
431            elapsed,
432            tx_count as f64 / elapsed,
433        );
434    }
435
436    async fn execute_raw_transactions(
437        &self,
438        transactions: Vec<Transaction>,
439    ) -> Vec<TransactionEffects> {
440        let tasks: FuturesUnordered<_> = transactions
441            .into_iter()
442            .map(|tx| {
443                let validator = self.validator();
444                tokio::spawn(async move { validator.execute_raw_transaction(tx).await })
445            })
446            .collect();
447        let results: Vec<_> = tasks.collect().await;
448        results.into_iter().map(|r| r.unwrap()).collect()
449    }
450
451    async fn execute_transactions_in_memory(
452        &self,
453        store: InMemoryObjectStore,
454        transactions: Vec<CertifiedTransaction>,
455    ) -> Vec<TransactionEffects> {
456        let has_shared_object = transactions.iter().any(|tx| tx.contains_shared_object());
457        if has_shared_object {
458            // With shared objects, we must execute each transaction in order.
459            let mut effects = Vec::new();
460            for transaction in transactions {
461                effects.push(
462                    self.validator
463                        .execute_transaction_in_memory(store.clone(), transaction)
464                        .await,
465                );
466            }
467            effects
468        } else {
469            let tasks: FuturesUnordered<_> = transactions
470                .into_iter()
471                .map(|tx| {
472                    let store = store.clone();
473                    let validator = self.validator();
474                    tokio::spawn(
475                        async move { validator.execute_transaction_in_memory(store, tx).await },
476                    )
477                })
478                .collect();
479            let results: Vec<_> = tasks.collect().await;
480            results.into_iter().map(|r| r.unwrap()).collect()
481        }
482    }
483
484    fn refresh_gas_objects(&mut self, mut new_gas_objects: HashMap<ObjectID, ObjectRef>) {
485        info!("Refreshing gas objects");
486        for account in self.user_accounts.values_mut() {
487            let refreshed_gas_objects: Vec<_> = account
488                .gas_objects
489                .iter()
490                .map(|oref| {
491                    if let Some(new_oref) = new_gas_objects.remove(&oref.0) {
492                        new_oref
493                    } else {
494                        *oref
495                    }
496                })
497                .collect();
498            account.gas_objects = Arc::new(refreshed_gas_objects);
499        }
500    }
501    pub(crate) async fn validator_sign_transactions(
502        &self,
503        transactions: Vec<Transaction>,
504    ) -> Vec<HandleTransactionResponse> {
505        info!(
506            "Started signing {} transactions. You can now attach a profiler",
507            transactions.len(),
508        );
509        let tasks: FuturesUnordered<_> = transactions
510            .into_iter()
511            .map(|tx| {
512                let validator = self.validator();
513                tokio::spawn(async move { validator.sign_transaction(tx).await })
514            })
515            .collect();
516        let results: Vec<_> = tasks.collect().await;
517        results.into_iter().map(|r| r.unwrap()).collect()
518    }
519}