iota_single_node_benchmark/
benchmark_context.rs1use std::{
6 collections::{BTreeMap, HashMap},
7 ops::Deref,
8 sync::Arc,
9};
10
11use futures::{StreamExt, stream::FuturesUnordered};
12use iota_config::node::RunWithRange;
13use iota_test_transaction_builder::PublishData;
14use iota_types::{
15 base_types::{IotaAddress, ObjectID, ObjectRef, SequenceNumber},
16 effects::{TransactionEffects, TransactionEffectsAPI},
17 messages_grpc::HandleTransactionResponse,
18 mock_checkpoint_builder::ValidatorKeypairProvider,
19 transaction::{CertifiedTransaction, SignedTransaction, Transaction, VerifiedTransaction},
20};
21use tracing::info;
22
23use crate::{
24 command::Component,
25 mock_account::{Account, batch_create_account_and_gas},
26 mock_storage::InMemoryObjectStore,
27 single_node::SingleValidator,
28 tx_generator::{RootObjectCreateTxGenerator, SharedObjectCreateTxGenerator, TxGenerator},
29 workload::Workload,
30};
31
32pub struct BenchmarkContext {
33 validator: SingleValidator,
34 user_accounts: BTreeMap<IotaAddress, Account>,
35 admin_account: Account,
36 benchmark_component: Component,
37}
38
39impl BenchmarkContext {
40 pub(crate) async fn new(
41 workload: Workload,
42 benchmark_component: Component,
43 print_sample_tx: bool,
44 ) -> Self {
45 let mut num_accounts = workload.num_accounts() + 1;
47 if print_sample_tx {
48 num_accounts += 1;
50 }
51 let gas_object_num_per_account = workload.gas_object_num_per_account();
52 let total = num_accounts * gas_object_num_per_account;
53
54 info!(
55 "Creating {} accounts and {} gas objects",
56 num_accounts, total
57 );
58 let (mut user_accounts, genesis_gas_objects) =
59 batch_create_account_and_gas(num_accounts, gas_object_num_per_account).await;
60 assert_eq!(genesis_gas_objects.len() as u64, total);
61 let (_, admin_account) = user_accounts.pop_last().unwrap();
62
63 info!("Initializing validator");
64 let validator = SingleValidator::new(&genesis_gas_objects[..], benchmark_component).await;
65
66 Self {
67 validator,
68 user_accounts,
69 admin_account,
70 benchmark_component,
71 }
72 }
73
74 pub(crate) fn validator(&self) -> SingleValidator {
75 self.validator.clone()
76 }
77
78 pub(crate) async fn publish_package(&mut self, publish_data: PublishData) -> ObjectRef {
79 let mut gas_objects = self.admin_account.gas_objects.deref().clone();
80 let (package, updated_gas) = self
81 .validator
82 .publish_package(
83 publish_data,
84 self.admin_account.sender,
85 &self.admin_account.keypair,
86 gas_objects[0],
87 )
88 .await;
89 gas_objects[0] = updated_gas;
90 self.admin_account.gas_objects = Arc::new(gas_objects);
91 package
92 }
93
94 pub(crate) async fn preparing_dynamic_fields(
98 &mut self,
99 move_package: ObjectID,
100 num_dynamic_fields: u64,
101 ) -> HashMap<IotaAddress, ObjectRef> {
102 let mut root_objects = HashMap::new();
103
104 if num_dynamic_fields == 0 {
105 return root_objects;
106 }
107
108 info!("Preparing root object with dynamic fields");
109 let root_object_create_transactions = self
110 .generate_transactions(Arc::new(RootObjectCreateTxGenerator::new(
111 move_package,
112 num_dynamic_fields,
113 )))
114 .await;
115 let results = self
116 .execute_raw_transactions(root_object_create_transactions)
117 .await;
118 let mut new_gas_objects = HashMap::new();
119 let cache_commit = self.validator().get_validator().get_cache_commit().clone();
120 for effects in results {
121 let batch =
122 cache_commit.build_db_batch(effects.epoch(), &[*effects.transaction_digest()]);
123
124 cache_commit.commit_transaction_outputs(
125 effects.epoch(),
126 batch,
127 &[*effects.transaction_digest()],
128 );
129
130 let (owner, root_object) = effects
131 .created()
132 .into_iter()
133 .filter_map(|(oref, owner)| owner.as_address_opt().map(|owner| (*owner, oref)))
134 .next()
135 .unwrap();
136 root_objects.insert(owner, root_object);
137 let gas_object = effects.gas_object().0;
138 new_gas_objects.insert(gas_object.object_id, gas_object);
139 }
140 self.refresh_gas_objects(new_gas_objects);
141 info!("Finished preparing root object with dynamic fields");
142 root_objects
143 }
144
145 pub(crate) async fn prepare_shared_objects(
146 &mut self,
147 move_package: ObjectID,
148 num_shared_objects: usize,
149 ) -> Vec<(ObjectID, SequenceNumber)> {
150 let mut shared_objects = Vec::new();
151
152 if num_shared_objects == 0 {
153 return shared_objects;
154 }
155 assert!(num_shared_objects <= self.user_accounts.len());
156
157 info!("Preparing shared objects");
158 let generator = SharedObjectCreateTxGenerator::new(move_package);
159 let shared_object_create_transactions: Vec<_> = self
160 .user_accounts
161 .values()
162 .take(num_shared_objects)
163 .map(|account| generator.generate_tx(account.clone()))
164 .collect();
165 let results = self
166 .execute_raw_transactions(shared_object_create_transactions)
167 .await;
168 let mut new_gas_objects = HashMap::new();
169 let cache_commit = self.validator.get_validator().get_cache_commit();
170 for effects in results {
171 let shared_object = effects
172 .created()
173 .into_iter()
174 .filter_map(|(oref, owner)| {
175 if owner.is_shared() {
176 Some((oref.object_id, oref.version))
177 } else {
178 None
179 }
180 })
181 .next()
182 .unwrap();
183 shared_objects.push(shared_object);
184 let gas_object = effects.gas_object().0;
185 new_gas_objects.insert(gas_object.object_id, gas_object);
186 let batch =
193 cache_commit.build_db_batch(effects.epoch(), &[*effects.transaction_digest()]);
194 cache_commit.commit_transaction_outputs(
195 effects.epoch(),
196 batch,
197 &[*effects.transaction_digest()],
198 );
199 }
200 self.refresh_gas_objects(new_gas_objects);
201 info!("Finished preparing shared objects");
202 shared_objects
203 }
204
205 pub(crate) async fn generate_transactions(
206 &self,
207 tx_generator: Arc<dyn TxGenerator>,
208 ) -> Vec<Transaction> {
209 info!(
210 "{}: Creating {} transactions",
211 tx_generator.name(),
212 self.user_accounts.len()
213 );
214 let tasks: FuturesUnordered<_> = self
215 .user_accounts
216 .values()
217 .map(|account| {
218 let account = account.clone();
219 let tx_generator = tx_generator.clone();
220 tokio::spawn(async move { tx_generator.generate_tx(account) })
221 })
222 .collect();
223 let results: Vec<_> = tasks.collect().await;
224 results.into_iter().map(|r| r.unwrap()).collect()
225 }
226
227 pub(crate) async fn certify_transactions(
228 &self,
229 transactions: Vec<Transaction>,
230 skip_signing: bool,
231 ) -> Vec<CertifiedTransaction> {
232 info!("Creating transaction certificates");
233 let tasks: FuturesUnordered<_> = transactions
234 .into_iter()
235 .map(|tx| {
236 let validator = self.validator();
237 tokio::spawn(async move {
238 let committee = validator.get_committee();
239 let validator_state = validator.get_validator();
240 let sig = if skip_signing {
241 SignedTransaction::sign(
242 0,
243 &tx,
244 &*validator_state.secret,
245 validator_state.name,
246 )
247 } else {
248 let verified_tx = VerifiedTransaction::new_unchecked(tx.clone());
249 validator_state
250 .handle_transaction(validator.get_epoch_store(), verified_tx)
251 .await
252 .unwrap()
253 .status
254 .into_signed_for_testing()
255 };
256 CertifiedTransaction::new(tx.into_data(), vec![sig], committee).unwrap()
257 })
258 })
259 .collect();
260 let results: Vec<_> = tasks.collect().await;
261 results.into_iter().map(|r| r.unwrap()).collect()
262 }
263
264 pub(crate) async fn benchmark_transaction_execution(
265 &self,
266 transactions: Vec<CertifiedTransaction>,
267 print_sample_tx: bool,
268 ) {
269 if print_sample_tx {
270 self.execute_sample_transaction(transactions[0].clone())
273 .await;
274 }
275
276 let tx_count = transactions.len();
277 let start_time = std::time::Instant::now();
278 info!(
279 "Started executing {} transactions. You can now attach a profiler",
280 transactions.len()
281 );
282
283 let has_shared_object = transactions.iter().any(|tx| tx.contains_shared_object());
284 if has_shared_object {
285 for transaction in transactions {
287 self.validator
288 .execute_certificate(transaction, self.benchmark_component)
289 .await;
290 }
291 } else {
292 let tasks: FuturesUnordered<_> = transactions
293 .into_iter()
294 .map(|tx| {
295 let validator = self.validator();
296 let component = self.benchmark_component;
297 tokio::spawn(async move { validator.execute_certificate(tx, component).await })
298 })
299 .collect();
300 let results: Vec<_> = tasks.collect().await;
301 results.into_iter().for_each(|r| {
302 r.unwrap();
303 });
304 }
305
306 let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
307 info!(
308 "Execution finished in {}s, TPS={}",
309 elapsed,
310 tx_count as f64 / elapsed
311 );
312 }
313
314 pub(crate) async fn benchmark_transaction_execution_in_memory(
315 &self,
316 transactions: Vec<CertifiedTransaction>,
317 print_sample_tx: bool,
318 ) {
319 if print_sample_tx {
320 self.execute_sample_transaction(transactions[0].clone())
321 .await;
322 }
323
324 let tx_count = transactions.len();
325 let in_memory_store = self.validator.create_in_memory_store();
326 let start_time = std::time::Instant::now();
327 info!(
328 "Started executing {} transactions. You can now attach a profiler",
329 transactions.len()
330 );
331
332 self.execute_transactions_in_memory(in_memory_store.clone(), transactions)
333 .await;
334
335 let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
336 info!(
337 "Execution finished in {}s, TPS={}, number of DB object reads per transaction: {}",
338 elapsed,
339 tx_count as f64 / elapsed,
340 in_memory_store.get_num_object_reads() as f64 / tx_count as f64
341 );
342 }
343
344 async fn execute_sample_transaction(&self, sample_transaction: CertifiedTransaction) {
347 info!(
348 "Sample transaction digest={:?}: {:?}",
349 sample_transaction.digest(),
350 sample_transaction.data()
351 );
352 let effects = self
353 .validator()
354 .execute_dry_run(sample_transaction.into_unsigned())
355 .await;
356 info!("Sample effects: {:?}\n\n", effects);
357 assert!(effects.status().is_success());
358 }
359
360 pub(crate) async fn benchmark_transaction_signing(
362 &self,
363 transactions: Vec<Transaction>,
364 print_sample_tx: bool,
365 ) {
366 if print_sample_tx {
367 let sample_transaction = &transactions[0];
368 info!("Sample transaction: {:?}", sample_transaction.data());
369 }
370
371 let tx_count = transactions.len();
372 let start_time = std::time::Instant::now();
373 self.validator_sign_transactions(transactions).await;
374 let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
375 info!(
376 "Transaction signing finished in {}s, TPS={}.",
377 elapsed,
378 tx_count as f64 / elapsed,
379 );
380 }
381
382 pub(crate) async fn benchmark_checkpoint_executor(
383 &self,
384 transactions: Vec<CertifiedTransaction>,
385 checkpoint_size: usize,
386 ) {
387 self.execute_sample_transaction(transactions[0].clone())
388 .await;
389
390 info!("Executing all transactions to generate effects");
391 let tx_count = transactions.len();
392 let in_memory_store = self.validator.create_in_memory_store();
393 let effects: BTreeMap<_, _> = self
394 .execute_transactions_in_memory(in_memory_store.clone(), transactions.clone())
395 .await
396 .into_iter()
397 .map(|e| (*e.transaction_digest(), e))
398 .collect();
399
400 info!("Building checkpoints");
401 let validator = self.validator();
402 let checkpoints = validator
403 .build_checkpoints(transactions, effects, checkpoint_size)
404 .await;
405 info!("Built {} checkpoints", checkpoints.len());
406 let last_checkpoint_seq = *checkpoints.last().unwrap().0.sequence_number();
407 let checkpoint_executor = validator.create_checkpoint_executor();
408 for (checkpoint, contents) in checkpoints {
409 let state = validator.get_validator();
410 state
411 .get_checkpoint_store()
412 .insert_verified_checkpoint(&checkpoint)
413 .unwrap();
414 state
415 .get_state_sync_store()
416 .multi_insert_transaction_and_effects(contents.transactions());
417 state
418 .get_checkpoint_store()
419 .insert_verified_checkpoint_contents(&checkpoint, contents)
420 .unwrap();
421 state
422 .get_checkpoint_store()
423 .update_highest_synced_checkpoint(&checkpoint)
424 .unwrap();
425 }
426 let start_time = std::time::Instant::now();
427 info!("Starting checkpoint execution. You can now attach a profiler");
428 checkpoint_executor
429 .run_epoch(Some(RunWithRange::Checkpoint(last_checkpoint_seq)))
430 .await;
431 let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
432 info!(
433 "Checkpoint execution finished in {}s, TPS={}.",
434 elapsed,
435 tx_count as f64 / elapsed,
436 );
437 }
438
439 async fn execute_raw_transactions(
440 &self,
441 transactions: Vec<Transaction>,
442 ) -> Vec<TransactionEffects> {
443 let tasks: FuturesUnordered<_> = transactions
444 .into_iter()
445 .map(|tx| {
446 let validator = self.validator();
447 tokio::spawn(async move { validator.execute_raw_transaction(tx).await })
448 })
449 .collect();
450 let results: Vec<_> = tasks.collect().await;
451 results.into_iter().map(|r| r.unwrap()).collect()
452 }
453
454 async fn execute_transactions_in_memory(
455 &self,
456 store: InMemoryObjectStore,
457 transactions: Vec<CertifiedTransaction>,
458 ) -> Vec<TransactionEffects> {
459 let has_shared_object = transactions.iter().any(|tx| tx.contains_shared_object());
460 if has_shared_object {
461 let mut effects = Vec::new();
463 for transaction in transactions {
464 effects.push(
465 self.validator
466 .execute_transaction_in_memory(store.clone(), transaction)
467 .await,
468 );
469 }
470 effects
471 } else {
472 let tasks: FuturesUnordered<_> = transactions
473 .into_iter()
474 .map(|tx| {
475 let store = store.clone();
476 let validator = self.validator();
477 tokio::spawn(
478 async move { validator.execute_transaction_in_memory(store, tx).await },
479 )
480 })
481 .collect();
482 let results: Vec<_> = tasks.collect().await;
483 results.into_iter().map(|r| r.unwrap()).collect()
484 }
485 }
486
487 fn refresh_gas_objects(&mut self, mut new_gas_objects: HashMap<ObjectID, ObjectRef>) {
488 info!("Refreshing gas objects");
489 for account in self.user_accounts.values_mut() {
490 let refreshed_gas_objects: Vec<_> = account
491 .gas_objects
492 .iter()
493 .map(|oref| {
494 if let Some(new_oref) = new_gas_objects.remove(&oref.object_id) {
495 new_oref
496 } else {
497 *oref
498 }
499 })
500 .collect();
501 account.gas_objects = Arc::new(refreshed_gas_objects);
502 }
503 }
504 pub(crate) async fn validator_sign_transactions(
505 &self,
506 transactions: Vec<Transaction>,
507 ) -> Vec<HandleTransactionResponse> {
508 info!(
509 "Started signing {} transactions. You can now attach a profiler",
510 transactions.len(),
511 );
512 let tasks: FuturesUnordered<_> = transactions
513 .into_iter()
514 .map(|tx| {
515 let validator = self.validator();
516 tokio::spawn(async move { validator.sign_transaction(tx).await })
517 })
518 .collect();
519 let results: Vec<_> = tasks.collect().await;
520 results.into_iter().map(|r| r.unwrap()).collect()
521 }
522}