iota_single_node_benchmark/
benchmark_context.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap},
7    ops::Deref,
8    sync::Arc,
9};
10
11use futures::{StreamExt, stream::FuturesUnordered};
12use iota_config::node::RunWithRange;
13use iota_test_transaction_builder::PublishData;
14use iota_types::{
15    base_types::{IotaAddress, ObjectID, ObjectRef, SequenceNumber},
16    effects::{TransactionEffects, TransactionEffectsAPI},
17    messages_grpc::HandleTransactionResponse,
18    mock_checkpoint_builder::ValidatorKeypairProvider,
19    transaction::{CertifiedTransaction, SignedTransaction, Transaction, VerifiedTransaction},
20};
21use tracing::info;
22
23use crate::{
24    command::Component,
25    mock_account::{Account, batch_create_account_and_gas},
26    mock_storage::InMemoryObjectStore,
27    single_node::SingleValidator,
28    tx_generator::{RootObjectCreateTxGenerator, SharedObjectCreateTxGenerator, TxGenerator},
29    workload::Workload,
30};
31
32pub struct BenchmarkContext {
33    validator: SingleValidator,
34    user_accounts: BTreeMap<IotaAddress, Account>,
35    admin_account: Account,
36    benchmark_component: Component,
37}
38
39impl BenchmarkContext {
40    pub(crate) async fn new(
41        workload: Workload,
42        benchmark_component: Component,
43        print_sample_tx: bool,
44    ) -> Self {
45        // Reserve 1 account for package publishing.
46        let mut num_accounts = workload.num_accounts() + 1;
47        if print_sample_tx {
48            // Reserver another one to generate a sample transaction.
49            num_accounts += 1;
50        }
51        let gas_object_num_per_account = workload.gas_object_num_per_account();
52        let total = num_accounts * gas_object_num_per_account;
53
54        info!(
55            "Creating {} accounts and {} gas objects",
56            num_accounts, total
57        );
58        let (mut user_accounts, genesis_gas_objects) =
59            batch_create_account_and_gas(num_accounts, gas_object_num_per_account).await;
60        assert_eq!(genesis_gas_objects.len() as u64, total);
61        let (_, admin_account) = user_accounts.pop_last().unwrap();
62
63        info!("Initializing validator");
64        let validator = SingleValidator::new(&genesis_gas_objects[..], benchmark_component).await;
65
66        Self {
67            validator,
68            user_accounts,
69            admin_account,
70            benchmark_component,
71        }
72    }
73
74    pub(crate) fn validator(&self) -> SingleValidator {
75        self.validator.clone()
76    }
77
78    pub(crate) async fn publish_package(&mut self, publish_data: PublishData) -> ObjectRef {
79        let mut gas_objects = self.admin_account.gas_objects.deref().clone();
80        let (package, updated_gas) = self
81            .validator
82            .publish_package(
83                publish_data,
84                self.admin_account.sender,
85                &self.admin_account.keypair,
86                gas_objects[0],
87            )
88            .await;
89        gas_objects[0] = updated_gas;
90        self.admin_account.gas_objects = Arc::new(gas_objects);
91        package
92    }
93
94    /// In order to benchmark transactions that can read dynamic fields, we must
95    /// first create a root object with dynamic fields for each account
96    /// address.
97    pub(crate) async fn preparing_dynamic_fields(
98        &mut self,
99        move_package: ObjectID,
100        num_dynamic_fields: u64,
101    ) -> HashMap<IotaAddress, ObjectRef> {
102        let mut root_objects = HashMap::new();
103
104        if num_dynamic_fields == 0 {
105            return root_objects;
106        }
107
108        info!("Preparing root object with dynamic fields");
109        let root_object_create_transactions = self
110            .generate_transactions(Arc::new(RootObjectCreateTxGenerator::new(
111                move_package,
112                num_dynamic_fields,
113            )))
114            .await;
115        let results = self
116            .execute_raw_transactions(root_object_create_transactions)
117            .await;
118        let mut new_gas_objects = HashMap::new();
119        for effects in results {
120            self.validator()
121                .get_validator()
122                .get_cache_commit()
123                .commit_transaction_outputs(
124                    effects.executed_epoch(),
125                    &[*effects.transaction_digest()],
126                )
127                .await
128                .unwrap();
129            let (owner, root_object) = effects
130                .created()
131                .into_iter()
132                .filter_map(|(oref, owner)| {
133                    owner
134                        .get_address_owner_address()
135                        .ok()
136                        .map(|owner| (owner, oref))
137                })
138                .next()
139                .unwrap();
140            root_objects.insert(owner, root_object);
141            let gas_object = effects.gas_object().0;
142            new_gas_objects.insert(gas_object.0, gas_object);
143        }
144        self.refresh_gas_objects(new_gas_objects);
145        info!("Finished preparing root object with dynamic fields");
146        root_objects
147    }
148
149    pub(crate) async fn prepare_shared_objects(
150        &mut self,
151        move_package: ObjectID,
152        num_shared_objects: usize,
153    ) -> Vec<(ObjectID, SequenceNumber)> {
154        let mut shared_objects = Vec::new();
155
156        if num_shared_objects == 0 {
157            return shared_objects;
158        }
159        assert!(num_shared_objects <= self.user_accounts.len());
160
161        info!("Preparing shared objects");
162        let generator = SharedObjectCreateTxGenerator::new(move_package);
163        let shared_object_create_transactions: Vec<_> = self
164            .user_accounts
165            .values()
166            .take(num_shared_objects)
167            .map(|account| generator.generate_tx(account.clone()))
168            .collect();
169        let results = self
170            .execute_raw_transactions(shared_object_create_transactions)
171            .await;
172        let mut new_gas_objects = HashMap::new();
173        let epoch_id = self.validator.get_epoch_store().epoch();
174        let cache_commit = self.validator.get_validator().get_cache_commit();
175        for effects in results {
176            let shared_object = effects
177                .created()
178                .into_iter()
179                .filter_map(|(oref, owner)| {
180                    if owner.is_shared() {
181                        Some((oref.0, oref.1))
182                    } else {
183                        None
184                    }
185                })
186                .next()
187                .unwrap();
188            shared_objects.push(shared_object);
189            let gas_object = effects.gas_object().0;
190            new_gas_objects.insert(gas_object.0, gas_object);
191            // Make sure to commit them to DB. This is needed by both the execution-only
192            // mode and the checkpoint-executor mode. For execution-only mode,
193            // we iterate through all live objects to construct the in memory
194            // object store, hence requiring these objects committed to DB.
195            // For checkpoint executor, in order to commit a checkpoint it is required
196            // previous versions of objects are already committed.
197            cache_commit
198                .commit_transaction_outputs(epoch_id, &[*effects.transaction_digest()])
199                .await
200                .unwrap();
201        }
202        self.refresh_gas_objects(new_gas_objects);
203        info!("Finished preparing shared objects");
204        shared_objects
205    }
206
207    pub(crate) async fn generate_transactions(
208        &self,
209        tx_generator: Arc<dyn TxGenerator>,
210    ) -> Vec<Transaction> {
211        info!(
212            "{}: Creating {} transactions",
213            tx_generator.name(),
214            self.user_accounts.len()
215        );
216        let tasks: FuturesUnordered<_> = self
217            .user_accounts
218            .values()
219            .map(|account| {
220                let account = account.clone();
221                let tx_generator = tx_generator.clone();
222                tokio::spawn(async move { tx_generator.generate_tx(account) })
223            })
224            .collect();
225        let results: Vec<_> = tasks.collect().await;
226        results.into_iter().map(|r| r.unwrap()).collect()
227    }
228
229    pub(crate) async fn certify_transactions(
230        &self,
231        transactions: Vec<Transaction>,
232        skip_signing: bool,
233    ) -> Vec<CertifiedTransaction> {
234        info!("Creating transaction certificates");
235        let tasks: FuturesUnordered<_> = transactions
236            .into_iter()
237            .map(|tx| {
238                let validator = self.validator();
239                tokio::spawn(async move {
240                    let committee = validator.get_committee();
241                    let validator_state = validator.get_validator();
242                    let sig = if skip_signing {
243                        SignedTransaction::sign(
244                            0,
245                            &tx,
246                            &*validator_state.secret,
247                            validator_state.name,
248                        )
249                    } else {
250                        let verified_tx = VerifiedTransaction::new_unchecked(tx.clone());
251                        validator_state
252                            .handle_transaction(validator.get_epoch_store(), verified_tx)
253                            .await
254                            .unwrap()
255                            .status
256                            .into_signed_for_testing()
257                    };
258                    CertifiedTransaction::new(tx.into_data(), vec![sig], committee).unwrap()
259                })
260            })
261            .collect();
262        let results: Vec<_> = tasks.collect().await;
263        results.into_iter().map(|r| r.unwrap()).collect()
264    }
265
266    pub(crate) async fn benchmark_transaction_execution(
267        &self,
268        transactions: Vec<CertifiedTransaction>,
269        print_sample_tx: bool,
270    ) {
271        if print_sample_tx {
272            // We must use remove(0) in case there are shared objects and the transactions
273            // must be executed in order.
274            self.execute_sample_transaction(transactions[0].clone())
275                .await;
276        }
277
278        let tx_count = transactions.len();
279        let start_time = std::time::Instant::now();
280        info!(
281            "Started executing {} transactions. You can now attach a profiler",
282            transactions.len()
283        );
284
285        let has_shared_object = transactions.iter().any(|tx| tx.contains_shared_object());
286        if has_shared_object {
287            // With shared objects, we must execute each transaction in order.
288            for transaction in transactions {
289                self.validator
290                    .execute_certificate(transaction, self.benchmark_component)
291                    .await;
292            }
293        } else {
294            let tasks: FuturesUnordered<_> = transactions
295                .into_iter()
296                .map(|tx| {
297                    let validator = self.validator();
298                    let component = self.benchmark_component;
299                    tokio::spawn(async move { validator.execute_certificate(tx, component).await })
300                })
301                .collect();
302            let results: Vec<_> = tasks.collect().await;
303            results.into_iter().for_each(|r| {
304                r.unwrap();
305            });
306        }
307
308        let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
309        info!(
310            "Execution finished in {}s, TPS={}",
311            elapsed,
312            tx_count as f64 / elapsed
313        );
314    }
315
316    pub(crate) async fn benchmark_transaction_execution_in_memory(
317        &self,
318        transactions: Vec<CertifiedTransaction>,
319        print_sample_tx: bool,
320    ) {
321        if print_sample_tx {
322            self.execute_sample_transaction(transactions[0].clone())
323                .await;
324        }
325
326        let tx_count = transactions.len();
327        let in_memory_store = self.validator.create_in_memory_store();
328        let start_time = std::time::Instant::now();
329        info!(
330            "Started executing {} transactions. You can now attach a profiler",
331            transactions.len()
332        );
333
334        self.execute_transactions_in_memory(in_memory_store.clone(), transactions)
335            .await;
336
337        let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
338        info!(
339            "Execution finished in {}s, TPS={}, number of DB object reads per transaction: {}",
340            elapsed,
341            tx_count as f64 / elapsed,
342            in_memory_store.get_num_object_reads() as f64 / tx_count as f64
343        );
344    }
345
346    /// Print out a sample transaction and its effects so that we can get a
347    /// rough idea what we are measuring.
348    async fn execute_sample_transaction(&self, sample_transaction: CertifiedTransaction) {
349        info!(
350            "Sample transaction digest={:?}: {:?}",
351            sample_transaction.digest(),
352            sample_transaction.data()
353        );
354        let effects = self
355            .validator()
356            .execute_dry_run(sample_transaction.into_unsigned())
357            .await;
358        info!("Sample effects: {:?}\n\n", effects);
359        assert!(effects.status().is_ok());
360    }
361
362    /// Benchmark parallel signing a vector of transactions and measure the TPS.
363    pub(crate) async fn benchmark_transaction_signing(
364        &self,
365        transactions: Vec<Transaction>,
366        print_sample_tx: bool,
367    ) {
368        if print_sample_tx {
369            let sample_transaction = &transactions[0];
370            info!("Sample transaction: {:?}", sample_transaction.data());
371        }
372
373        let tx_count = transactions.len();
374        let start_time = std::time::Instant::now();
375        self.validator_sign_transactions(transactions).await;
376        let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
377        info!(
378            "Transaction signing finished in {}s, TPS={}.",
379            elapsed,
380            tx_count as f64 / elapsed,
381        );
382    }
383
384    pub(crate) async fn benchmark_checkpoint_executor(
385        &self,
386        transactions: Vec<CertifiedTransaction>,
387        checkpoint_size: usize,
388    ) {
389        self.execute_sample_transaction(transactions[0].clone())
390            .await;
391
392        info!("Executing all transactions to generate effects");
393        let tx_count = transactions.len();
394        let in_memory_store = self.validator.create_in_memory_store();
395        let effects: BTreeMap<_, _> = self
396            .execute_transactions_in_memory(in_memory_store.clone(), transactions.clone())
397            .await
398            .into_iter()
399            .map(|e| (*e.transaction_digest(), e))
400            .collect();
401
402        info!("Building checkpoints");
403        let validator = self.validator();
404        let checkpoints = validator
405            .build_checkpoints(transactions, effects, checkpoint_size)
406            .await;
407        info!("Built {} checkpoints", checkpoints.len());
408        let last_checkpoint_seq = *checkpoints.last().unwrap().0.sequence_number();
409        let (mut checkpoint_executor, checkpoint_sender) = validator.create_checkpoint_executor();
410        for (checkpoint, contents) in checkpoints {
411            let state = validator.get_validator();
412            state
413                .get_checkpoint_store()
414                .insert_verified_checkpoint(&checkpoint)
415                .unwrap();
416            state
417                .get_state_sync_store()
418                .multi_insert_transaction_and_effects(contents.transactions())
419                .unwrap();
420            state
421                .get_checkpoint_store()
422                .insert_verified_checkpoint_contents(&checkpoint, contents)
423                .unwrap();
424            state
425                .get_checkpoint_store()
426                .update_highest_synced_checkpoint(&checkpoint)
427                .unwrap();
428            checkpoint_sender.send(checkpoint).unwrap();
429        }
430        let start_time = std::time::Instant::now();
431        info!("Starting checkpoint execution. You can now attach a profiler");
432        checkpoint_executor
433            .run_epoch(
434                validator.get_epoch_store().clone(),
435                Some(RunWithRange::Checkpoint(last_checkpoint_seq)),
436            )
437            .await;
438        let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
439        info!(
440            "Checkpoint execution finished in {}s, TPS={}.",
441            elapsed,
442            tx_count as f64 / elapsed,
443        );
444    }
445
446    async fn execute_raw_transactions(
447        &self,
448        transactions: Vec<Transaction>,
449    ) -> Vec<TransactionEffects> {
450        let tasks: FuturesUnordered<_> = transactions
451            .into_iter()
452            .map(|tx| {
453                let validator = self.validator();
454                tokio::spawn(async move { validator.execute_raw_transaction(tx).await })
455            })
456            .collect();
457        let results: Vec<_> = tasks.collect().await;
458        results.into_iter().map(|r| r.unwrap()).collect()
459    }
460
461    async fn execute_transactions_in_memory(
462        &self,
463        store: InMemoryObjectStore,
464        transactions: Vec<CertifiedTransaction>,
465    ) -> Vec<TransactionEffects> {
466        let has_shared_object = transactions.iter().any(|tx| tx.contains_shared_object());
467        if has_shared_object {
468            // With shared objects, we must execute each transaction in order.
469            let mut effects = Vec::new();
470            for transaction in transactions {
471                effects.push(
472                    self.validator
473                        .execute_transaction_in_memory(store.clone(), transaction)
474                        .await,
475                );
476            }
477            effects
478        } else {
479            let tasks: FuturesUnordered<_> = transactions
480                .into_iter()
481                .map(|tx| {
482                    let store = store.clone();
483                    let validator = self.validator();
484                    tokio::spawn(
485                        async move { validator.execute_transaction_in_memory(store, tx).await },
486                    )
487                })
488                .collect();
489            let results: Vec<_> = tasks.collect().await;
490            results.into_iter().map(|r| r.unwrap()).collect()
491        }
492    }
493
494    fn refresh_gas_objects(&mut self, mut new_gas_objects: HashMap<ObjectID, ObjectRef>) {
495        info!("Refreshing gas objects");
496        for account in self.user_accounts.values_mut() {
497            let refreshed_gas_objects: Vec<_> = account
498                .gas_objects
499                .iter()
500                .map(|oref| {
501                    if let Some(new_oref) = new_gas_objects.remove(&oref.0) {
502                        new_oref
503                    } else {
504                        *oref
505                    }
506                })
507                .collect();
508            account.gas_objects = Arc::new(refreshed_gas_objects);
509        }
510    }
511    pub(crate) async fn validator_sign_transactions(
512        &self,
513        transactions: Vec<Transaction>,
514    ) -> Vec<HandleTransactionResponse> {
515        info!(
516            "Started signing {} transactions. You can now attach a profiler",
517            transactions.len(),
518        );
519        let tasks: FuturesUnordered<_> = transactions
520            .into_iter()
521            .map(|tx| {
522                let validator = self.validator();
523                tokio::spawn(async move { validator.sign_transaction(tx).await })
524            })
525            .collect();
526        let results: Vec<_> = tasks.collect().await;
527        results.into_iter().map(|r| r.unwrap()).collect()
528    }
529}