Skip to main content

iota_single_node_benchmark/
benchmark_context.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap},
7    ops::Deref,
8    sync::Arc,
9};
10
11use futures::{StreamExt, stream::FuturesUnordered};
12use iota_config::node::RunWithRange;
13use iota_test_transaction_builder::PublishData;
14use iota_types::{
15    base_types::{IotaAddress, ObjectID, ObjectRef, SequenceNumber},
16    effects::{TransactionEffects, TransactionEffectsAPI},
17    messages_grpc::HandleTransactionResponse,
18    mock_checkpoint_builder::ValidatorKeypairProvider,
19    transaction::{CertifiedTransaction, SignedTransaction, Transaction, VerifiedTransaction},
20};
21use tracing::info;
22
23use crate::{
24    command::Component,
25    mock_account::{Account, batch_create_account_and_gas},
26    mock_storage::InMemoryObjectStore,
27    single_node::SingleValidator,
28    tx_generator::{RootObjectCreateTxGenerator, SharedObjectCreateTxGenerator, TxGenerator},
29    workload::Workload,
30};
31
32pub struct BenchmarkContext {
33    validator: SingleValidator,
34    user_accounts: BTreeMap<IotaAddress, Account>,
35    admin_account: Account,
36    benchmark_component: Component,
37}
38
39impl BenchmarkContext {
40    pub(crate) async fn new(
41        workload: Workload,
42        benchmark_component: Component,
43        print_sample_tx: bool,
44    ) -> Self {
45        // Reserve 1 account for package publishing.
46        let mut num_accounts = workload.num_accounts() + 1;
47        if print_sample_tx {
48            // Reserver another one to generate a sample transaction.
49            num_accounts += 1;
50        }
51        let gas_object_num_per_account = workload.gas_object_num_per_account();
52        let total = num_accounts * gas_object_num_per_account;
53
54        info!(
55            "Creating {} accounts and {} gas objects",
56            num_accounts, total
57        );
58        let (mut user_accounts, genesis_gas_objects) =
59            batch_create_account_and_gas(num_accounts, gas_object_num_per_account).await;
60        assert_eq!(genesis_gas_objects.len() as u64, total);
61        let (_, admin_account) = user_accounts.pop_last().unwrap();
62
63        info!("Initializing validator");
64        let validator = SingleValidator::new(&genesis_gas_objects[..], benchmark_component).await;
65
66        Self {
67            validator,
68            user_accounts,
69            admin_account,
70            benchmark_component,
71        }
72    }
73
74    pub(crate) fn validator(&self) -> SingleValidator {
75        self.validator.clone()
76    }
77
78    pub(crate) async fn publish_package(&mut self, publish_data: PublishData) -> ObjectRef {
79        let mut gas_objects = self.admin_account.gas_objects.deref().clone();
80        let (package, updated_gas) = self
81            .validator
82            .publish_package(
83                publish_data,
84                self.admin_account.sender,
85                &self.admin_account.keypair,
86                gas_objects[0],
87            )
88            .await;
89        gas_objects[0] = updated_gas;
90        self.admin_account.gas_objects = Arc::new(gas_objects);
91        package
92    }
93
94    /// In order to benchmark transactions that can read dynamic fields, we must
95    /// first create a root object with dynamic fields for each account
96    /// address.
97    pub(crate) async fn preparing_dynamic_fields(
98        &mut self,
99        move_package: ObjectID,
100        num_dynamic_fields: u64,
101    ) -> HashMap<IotaAddress, ObjectRef> {
102        let mut root_objects = HashMap::new();
103
104        if num_dynamic_fields == 0 {
105            return root_objects;
106        }
107
108        info!("Preparing root object with dynamic fields");
109        let root_object_create_transactions = self
110            .generate_transactions(Arc::new(RootObjectCreateTxGenerator::new(
111                move_package,
112                num_dynamic_fields,
113            )))
114            .await;
115        let results = self
116            .execute_raw_transactions(root_object_create_transactions)
117            .await;
118        let mut new_gas_objects = HashMap::new();
119        let cache_commit = self.validator().get_validator().get_cache_commit().clone();
120        for effects in results {
121            let batch =
122                cache_commit.build_db_batch(effects.epoch(), &[*effects.transaction_digest()]);
123
124            cache_commit.commit_transaction_outputs(
125                effects.epoch(),
126                batch,
127                &[*effects.transaction_digest()],
128            );
129
130            let (owner, root_object) = effects
131                .created()
132                .into_iter()
133                .filter_map(|(oref, owner)| owner.as_address_opt().map(|owner| (*owner, oref)))
134                .next()
135                .unwrap();
136            root_objects.insert(owner, root_object);
137            let gas_object = effects.gas_object().0;
138            new_gas_objects.insert(gas_object.object_id, gas_object);
139        }
140        self.refresh_gas_objects(new_gas_objects);
141        info!("Finished preparing root object with dynamic fields");
142        root_objects
143    }
144
145    pub(crate) async fn prepare_shared_objects(
146        &mut self,
147        move_package: ObjectID,
148        num_shared_objects: usize,
149    ) -> Vec<(ObjectID, SequenceNumber)> {
150        let mut shared_objects = Vec::new();
151
152        if num_shared_objects == 0 {
153            return shared_objects;
154        }
155        assert!(num_shared_objects <= self.user_accounts.len());
156
157        info!("Preparing shared objects");
158        let generator = SharedObjectCreateTxGenerator::new(move_package);
159        let shared_object_create_transactions: Vec<_> = self
160            .user_accounts
161            .values()
162            .take(num_shared_objects)
163            .map(|account| generator.generate_tx(account.clone()))
164            .collect();
165        let results = self
166            .execute_raw_transactions(shared_object_create_transactions)
167            .await;
168        let mut new_gas_objects = HashMap::new();
169        let cache_commit = self.validator.get_validator().get_cache_commit();
170        for effects in results {
171            let shared_object = effects
172                .created()
173                .into_iter()
174                .filter_map(|(oref, owner)| {
175                    if owner.is_shared() {
176                        Some((oref.object_id, oref.version))
177                    } else {
178                        None
179                    }
180                })
181                .next()
182                .unwrap();
183            shared_objects.push(shared_object);
184            let gas_object = effects.gas_object().0;
185            new_gas_objects.insert(gas_object.object_id, gas_object);
186            // Make sure to commit them to DB. This is needed by both the execution-only
187            // mode and the checkpoint-executor mode. For execution-only mode,
188            // we iterate through all live objects to construct the in memory
189            // object store, hence requiring these objects committed to DB.
190            // For checkpoint executor, in order to commit a checkpoint it is required
191            // previous versions of objects are already committed.
192            let batch =
193                cache_commit.build_db_batch(effects.epoch(), &[*effects.transaction_digest()]);
194            cache_commit.commit_transaction_outputs(
195                effects.epoch(),
196                batch,
197                &[*effects.transaction_digest()],
198            );
199        }
200        self.refresh_gas_objects(new_gas_objects);
201        info!("Finished preparing shared objects");
202        shared_objects
203    }
204
205    pub(crate) async fn generate_transactions(
206        &self,
207        tx_generator: Arc<dyn TxGenerator>,
208    ) -> Vec<Transaction> {
209        info!(
210            "{}: Creating {} transactions",
211            tx_generator.name(),
212            self.user_accounts.len()
213        );
214        let tasks: FuturesUnordered<_> = self
215            .user_accounts
216            .values()
217            .map(|account| {
218                let account = account.clone();
219                let tx_generator = tx_generator.clone();
220                tokio::spawn(async move { tx_generator.generate_tx(account) })
221            })
222            .collect();
223        let results: Vec<_> = tasks.collect().await;
224        results.into_iter().map(|r| r.unwrap()).collect()
225    }
226
227    pub(crate) async fn certify_transactions(
228        &self,
229        transactions: Vec<Transaction>,
230        skip_signing: bool,
231    ) -> Vec<CertifiedTransaction> {
232        info!("Creating transaction certificates");
233        let tasks: FuturesUnordered<_> = transactions
234            .into_iter()
235            .map(|tx| {
236                let validator = self.validator();
237                tokio::spawn(async move {
238                    let committee = validator.get_committee();
239                    let validator_state = validator.get_validator();
240                    let sig = if skip_signing {
241                        SignedTransaction::sign(
242                            0,
243                            &tx,
244                            &*validator_state.secret,
245                            validator_state.name,
246                        )
247                    } else {
248                        let verified_tx = VerifiedTransaction::new_unchecked(tx.clone());
249                        validator_state
250                            .handle_transaction(validator.get_epoch_store(), verified_tx)
251                            .await
252                            .unwrap()
253                            .status
254                            .into_signed_for_testing()
255                    };
256                    CertifiedTransaction::new(tx.into_data(), vec![sig], committee).unwrap()
257                })
258            })
259            .collect();
260        let results: Vec<_> = tasks.collect().await;
261        results.into_iter().map(|r| r.unwrap()).collect()
262    }
263
264    pub(crate) async fn benchmark_transaction_execution(
265        &self,
266        transactions: Vec<CertifiedTransaction>,
267        print_sample_tx: bool,
268    ) {
269        if print_sample_tx {
270            // We must use remove(0) in case there are shared objects and the transactions
271            // must be executed in order.
272            self.execute_sample_transaction(transactions[0].clone())
273                .await;
274        }
275
276        let tx_count = transactions.len();
277        let start_time = std::time::Instant::now();
278        info!(
279            "Started executing {} transactions. You can now attach a profiler",
280            transactions.len()
281        );
282
283        let has_shared_object = transactions.iter().any(|tx| tx.contains_shared_object());
284        if has_shared_object {
285            // With shared objects, we must execute each transaction in order.
286            for transaction in transactions {
287                self.validator
288                    .execute_certificate(transaction, self.benchmark_component)
289                    .await;
290            }
291        } else {
292            let tasks: FuturesUnordered<_> = transactions
293                .into_iter()
294                .map(|tx| {
295                    let validator = self.validator();
296                    let component = self.benchmark_component;
297                    tokio::spawn(async move { validator.execute_certificate(tx, component).await })
298                })
299                .collect();
300            let results: Vec<_> = tasks.collect().await;
301            results.into_iter().for_each(|r| {
302                r.unwrap();
303            });
304        }
305
306        let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
307        info!(
308            "Execution finished in {}s, TPS={}",
309            elapsed,
310            tx_count as f64 / elapsed
311        );
312    }
313
314    pub(crate) async fn benchmark_transaction_execution_in_memory(
315        &self,
316        transactions: Vec<CertifiedTransaction>,
317        print_sample_tx: bool,
318    ) {
319        if print_sample_tx {
320            self.execute_sample_transaction(transactions[0].clone())
321                .await;
322        }
323
324        let tx_count = transactions.len();
325        let in_memory_store = self.validator.create_in_memory_store();
326        let start_time = std::time::Instant::now();
327        info!(
328            "Started executing {} transactions. You can now attach a profiler",
329            transactions.len()
330        );
331
332        self.execute_transactions_in_memory(in_memory_store.clone(), transactions)
333            .await;
334
335        let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
336        info!(
337            "Execution finished in {}s, TPS={}, number of DB object reads per transaction: {}",
338            elapsed,
339            tx_count as f64 / elapsed,
340            in_memory_store.get_num_object_reads() as f64 / tx_count as f64
341        );
342    }
343
344    /// Print out a sample transaction and its effects so that we can get a
345    /// rough idea what we are measuring.
346    async fn execute_sample_transaction(&self, sample_transaction: CertifiedTransaction) {
347        info!(
348            "Sample transaction digest={:?}: {:?}",
349            sample_transaction.digest(),
350            sample_transaction.data()
351        );
352        let effects = self
353            .validator()
354            .execute_dry_run(sample_transaction.into_unsigned())
355            .await;
356        info!("Sample effects: {:?}\n\n", effects);
357        assert!(effects.status().is_success());
358    }
359
360    /// Benchmark parallel signing a vector of transactions and measure the TPS.
361    pub(crate) async fn benchmark_transaction_signing(
362        &self,
363        transactions: Vec<Transaction>,
364        print_sample_tx: bool,
365    ) {
366        if print_sample_tx {
367            let sample_transaction = &transactions[0];
368            info!("Sample transaction: {:?}", sample_transaction.data());
369        }
370
371        let tx_count = transactions.len();
372        let start_time = std::time::Instant::now();
373        self.validator_sign_transactions(transactions).await;
374        let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
375        info!(
376            "Transaction signing finished in {}s, TPS={}.",
377            elapsed,
378            tx_count as f64 / elapsed,
379        );
380    }
381
382    pub(crate) async fn benchmark_checkpoint_executor(
383        &self,
384        transactions: Vec<CertifiedTransaction>,
385        checkpoint_size: usize,
386    ) {
387        self.execute_sample_transaction(transactions[0].clone())
388            .await;
389
390        info!("Executing all transactions to generate effects");
391        let tx_count = transactions.len();
392        let in_memory_store = self.validator.create_in_memory_store();
393        let effects: BTreeMap<_, _> = self
394            .execute_transactions_in_memory(in_memory_store.clone(), transactions.clone())
395            .await
396            .into_iter()
397            .map(|e| (*e.transaction_digest(), e))
398            .collect();
399
400        info!("Building checkpoints");
401        let validator = self.validator();
402        let checkpoints = validator
403            .build_checkpoints(transactions, effects, checkpoint_size)
404            .await;
405        info!("Built {} checkpoints", checkpoints.len());
406        let last_checkpoint_seq = *checkpoints.last().unwrap().0.sequence_number();
407        let checkpoint_executor = validator.create_checkpoint_executor();
408        for (checkpoint, contents) in checkpoints {
409            let state = validator.get_validator();
410            state
411                .get_checkpoint_store()
412                .insert_verified_checkpoint(&checkpoint)
413                .unwrap();
414            state
415                .get_state_sync_store()
416                .multi_insert_transaction_and_effects(contents.transactions());
417            state
418                .get_checkpoint_store()
419                .insert_verified_checkpoint_contents(&checkpoint, contents)
420                .unwrap();
421            state
422                .get_checkpoint_store()
423                .update_highest_synced_checkpoint(&checkpoint)
424                .unwrap();
425        }
426        let start_time = std::time::Instant::now();
427        info!("Starting checkpoint execution. You can now attach a profiler");
428        checkpoint_executor
429            .run_epoch(Some(RunWithRange::Checkpoint(last_checkpoint_seq)))
430            .await;
431        let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
432        info!(
433            "Checkpoint execution finished in {}s, TPS={}.",
434            elapsed,
435            tx_count as f64 / elapsed,
436        );
437    }
438
439    async fn execute_raw_transactions(
440        &self,
441        transactions: Vec<Transaction>,
442    ) -> Vec<TransactionEffects> {
443        let tasks: FuturesUnordered<_> = transactions
444            .into_iter()
445            .map(|tx| {
446                let validator = self.validator();
447                tokio::spawn(async move { validator.execute_raw_transaction(tx).await })
448            })
449            .collect();
450        let results: Vec<_> = tasks.collect().await;
451        results.into_iter().map(|r| r.unwrap()).collect()
452    }
453
454    async fn execute_transactions_in_memory(
455        &self,
456        store: InMemoryObjectStore,
457        transactions: Vec<CertifiedTransaction>,
458    ) -> Vec<TransactionEffects> {
459        let has_shared_object = transactions.iter().any(|tx| tx.contains_shared_object());
460        if has_shared_object {
461            // With shared objects, we must execute each transaction in order.
462            let mut effects = Vec::new();
463            for transaction in transactions {
464                effects.push(
465                    self.validator
466                        .execute_transaction_in_memory(store.clone(), transaction)
467                        .await,
468                );
469            }
470            effects
471        } else {
472            let tasks: FuturesUnordered<_> = transactions
473                .into_iter()
474                .map(|tx| {
475                    let store = store.clone();
476                    let validator = self.validator();
477                    tokio::spawn(
478                        async move { validator.execute_transaction_in_memory(store, tx).await },
479                    )
480                })
481                .collect();
482            let results: Vec<_> = tasks.collect().await;
483            results.into_iter().map(|r| r.unwrap()).collect()
484        }
485    }
486
487    fn refresh_gas_objects(&mut self, mut new_gas_objects: HashMap<ObjectID, ObjectRef>) {
488        info!("Refreshing gas objects");
489        for account in self.user_accounts.values_mut() {
490            let refreshed_gas_objects: Vec<_> = account
491                .gas_objects
492                .iter()
493                .map(|oref| {
494                    if let Some(new_oref) = new_gas_objects.remove(&oref.object_id) {
495                        new_oref
496                    } else {
497                        *oref
498                    }
499                })
500                .collect();
501            account.gas_objects = Arc::new(refreshed_gas_objects);
502        }
503    }
504    pub(crate) async fn validator_sign_transactions(
505        &self,
506        transactions: Vec<Transaction>,
507    ) -> Vec<HandleTransactionResponse> {
508        info!(
509            "Started signing {} transactions. You can now attach a profiler",
510            transactions.len(),
511        );
512        let tasks: FuturesUnordered<_> = transactions
513            .into_iter()
514            .map(|tx| {
515                let validator = self.validator();
516                tokio::spawn(async move { validator.sign_transaction(tx).await })
517            })
518            .collect();
519        let results: Vec<_> = tasks.collect().await;
520        results.into_iter().map(|r| r.unwrap()).collect()
521    }
522}