iota_single_node_benchmark/
benchmark_context.rs1use std::{
6 collections::{BTreeMap, HashMap},
7 ops::Deref,
8 sync::Arc,
9};
10
11use futures::{StreamExt, stream::FuturesUnordered};
12use iota_config::node::RunWithRange;
13use iota_test_transaction_builder::PublishData;
14use iota_types::{
15 base_types::{IotaAddress, ObjectID, ObjectRef, SequenceNumber},
16 effects::{TransactionEffects, TransactionEffectsAPI},
17 messages_grpc::HandleTransactionResponse,
18 mock_checkpoint_builder::ValidatorKeypairProvider,
19 transaction::{CertifiedTransaction, SignedTransaction, Transaction, VerifiedTransaction},
20};
21use tracing::info;
22
23use crate::{
24 command::Component,
25 mock_account::{Account, batch_create_account_and_gas},
26 mock_storage::InMemoryObjectStore,
27 single_node::SingleValidator,
28 tx_generator::{RootObjectCreateTxGenerator, SharedObjectCreateTxGenerator, TxGenerator},
29 workload::Workload,
30};
31
32pub struct BenchmarkContext {
33 validator: SingleValidator,
34 user_accounts: BTreeMap<IotaAddress, Account>,
35 admin_account: Account,
36 benchmark_component: Component,
37}
38
39impl BenchmarkContext {
40 pub(crate) async fn new(
41 workload: Workload,
42 benchmark_component: Component,
43 print_sample_tx: bool,
44 ) -> Self {
45 let mut num_accounts = workload.num_accounts() + 1;
47 if print_sample_tx {
48 num_accounts += 1;
50 }
51 let gas_object_num_per_account = workload.gas_object_num_per_account();
52 let total = num_accounts * gas_object_num_per_account;
53
54 info!(
55 "Creating {} accounts and {} gas objects",
56 num_accounts, total
57 );
58 let (mut user_accounts, genesis_gas_objects) =
59 batch_create_account_and_gas(num_accounts, gas_object_num_per_account).await;
60 assert_eq!(genesis_gas_objects.len() as u64, total);
61 let (_, admin_account) = user_accounts.pop_last().unwrap();
62
63 info!("Initializing validator");
64 let validator = SingleValidator::new(&genesis_gas_objects[..], benchmark_component).await;
65
66 Self {
67 validator,
68 user_accounts,
69 admin_account,
70 benchmark_component,
71 }
72 }
73
74 pub(crate) fn validator(&self) -> SingleValidator {
75 self.validator.clone()
76 }
77
78 pub(crate) async fn publish_package(&mut self, publish_data: PublishData) -> ObjectRef {
79 let mut gas_objects = self.admin_account.gas_objects.deref().clone();
80 let (package, updated_gas) = self
81 .validator
82 .publish_package(
83 publish_data,
84 self.admin_account.sender,
85 &self.admin_account.keypair,
86 gas_objects[0],
87 )
88 .await;
89 gas_objects[0] = updated_gas;
90 self.admin_account.gas_objects = Arc::new(gas_objects);
91 package
92 }
93
94 pub(crate) async fn preparing_dynamic_fields(
98 &mut self,
99 move_package: ObjectID,
100 num_dynamic_fields: u64,
101 ) -> HashMap<IotaAddress, ObjectRef> {
102 let mut root_objects = HashMap::new();
103
104 if num_dynamic_fields == 0 {
105 return root_objects;
106 }
107
108 info!("Preparing root object with dynamic fields");
109 let root_object_create_transactions = self
110 .generate_transactions(Arc::new(RootObjectCreateTxGenerator::new(
111 move_package,
112 num_dynamic_fields,
113 )))
114 .await;
115 let results = self
116 .execute_raw_transactions(root_object_create_transactions)
117 .await;
118 let mut new_gas_objects = HashMap::new();
119 for effects in results {
120 self.validator()
121 .get_validator()
122 .get_cache_commit()
123 .commit_transaction_outputs(
124 effects.executed_epoch(),
125 &[*effects.transaction_digest()],
126 )
127 .await
128 .unwrap();
129 let (owner, root_object) = effects
130 .created()
131 .into_iter()
132 .filter_map(|(oref, owner)| {
133 owner
134 .get_address_owner_address()
135 .ok()
136 .map(|owner| (owner, oref))
137 })
138 .next()
139 .unwrap();
140 root_objects.insert(owner, root_object);
141 let gas_object = effects.gas_object().0;
142 new_gas_objects.insert(gas_object.0, gas_object);
143 }
144 self.refresh_gas_objects(new_gas_objects);
145 info!("Finished preparing root object with dynamic fields");
146 root_objects
147 }
148
149 pub(crate) async fn prepare_shared_objects(
150 &mut self,
151 move_package: ObjectID,
152 num_shared_objects: usize,
153 ) -> Vec<(ObjectID, SequenceNumber)> {
154 let mut shared_objects = Vec::new();
155
156 if num_shared_objects == 0 {
157 return shared_objects;
158 }
159 assert!(num_shared_objects <= self.user_accounts.len());
160
161 info!("Preparing shared objects");
162 let generator = SharedObjectCreateTxGenerator::new(move_package);
163 let shared_object_create_transactions: Vec<_> = self
164 .user_accounts
165 .values()
166 .take(num_shared_objects)
167 .map(|account| generator.generate_tx(account.clone()))
168 .collect();
169 let results = self
170 .execute_raw_transactions(shared_object_create_transactions)
171 .await;
172 let mut new_gas_objects = HashMap::new();
173 let epoch_id = self.validator.get_epoch_store().epoch();
174 let cache_commit = self.validator.get_validator().get_cache_commit();
175 for effects in results {
176 let shared_object = effects
177 .created()
178 .into_iter()
179 .filter_map(|(oref, owner)| {
180 if owner.is_shared() {
181 Some((oref.0, oref.1))
182 } else {
183 None
184 }
185 })
186 .next()
187 .unwrap();
188 shared_objects.push(shared_object);
189 let gas_object = effects.gas_object().0;
190 new_gas_objects.insert(gas_object.0, gas_object);
191 cache_commit
198 .commit_transaction_outputs(epoch_id, &[*effects.transaction_digest()])
199 .await
200 .unwrap();
201 }
202 self.refresh_gas_objects(new_gas_objects);
203 info!("Finished preparing shared objects");
204 shared_objects
205 }
206
207 pub(crate) async fn generate_transactions(
208 &self,
209 tx_generator: Arc<dyn TxGenerator>,
210 ) -> Vec<Transaction> {
211 info!(
212 "{}: Creating {} transactions",
213 tx_generator.name(),
214 self.user_accounts.len()
215 );
216 let tasks: FuturesUnordered<_> = self
217 .user_accounts
218 .values()
219 .map(|account| {
220 let account = account.clone();
221 let tx_generator = tx_generator.clone();
222 tokio::spawn(async move { tx_generator.generate_tx(account) })
223 })
224 .collect();
225 let results: Vec<_> = tasks.collect().await;
226 results.into_iter().map(|r| r.unwrap()).collect()
227 }
228
229 pub(crate) async fn certify_transactions(
230 &self,
231 transactions: Vec<Transaction>,
232 skip_signing: bool,
233 ) -> Vec<CertifiedTransaction> {
234 info!("Creating transaction certificates");
235 let tasks: FuturesUnordered<_> = transactions
236 .into_iter()
237 .map(|tx| {
238 let validator = self.validator();
239 tokio::spawn(async move {
240 let committee = validator.get_committee();
241 let validator_state = validator.get_validator();
242 let sig = if skip_signing {
243 SignedTransaction::sign(
244 0,
245 &tx,
246 &*validator_state.secret,
247 validator_state.name,
248 )
249 } else {
250 let verified_tx = VerifiedTransaction::new_unchecked(tx.clone());
251 validator_state
252 .handle_transaction(validator.get_epoch_store(), verified_tx)
253 .await
254 .unwrap()
255 .status
256 .into_signed_for_testing()
257 };
258 CertifiedTransaction::new(tx.into_data(), vec![sig], committee).unwrap()
259 })
260 })
261 .collect();
262 let results: Vec<_> = tasks.collect().await;
263 results.into_iter().map(|r| r.unwrap()).collect()
264 }
265
266 pub(crate) async fn benchmark_transaction_execution(
267 &self,
268 transactions: Vec<CertifiedTransaction>,
269 print_sample_tx: bool,
270 ) {
271 if print_sample_tx {
272 self.execute_sample_transaction(transactions[0].clone())
275 .await;
276 }
277
278 let tx_count = transactions.len();
279 let start_time = std::time::Instant::now();
280 info!(
281 "Started executing {} transactions. You can now attach a profiler",
282 transactions.len()
283 );
284
285 let has_shared_object = transactions.iter().any(|tx| tx.contains_shared_object());
286 if has_shared_object {
287 for transaction in transactions {
289 self.validator
290 .execute_certificate(transaction, self.benchmark_component)
291 .await;
292 }
293 } else {
294 let tasks: FuturesUnordered<_> = transactions
295 .into_iter()
296 .map(|tx| {
297 let validator = self.validator();
298 let component = self.benchmark_component;
299 tokio::spawn(async move { validator.execute_certificate(tx, component).await })
300 })
301 .collect();
302 let results: Vec<_> = tasks.collect().await;
303 results.into_iter().for_each(|r| {
304 r.unwrap();
305 });
306 }
307
308 let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
309 info!(
310 "Execution finished in {}s, TPS={}",
311 elapsed,
312 tx_count as f64 / elapsed
313 );
314 }
315
316 pub(crate) async fn benchmark_transaction_execution_in_memory(
317 &self,
318 transactions: Vec<CertifiedTransaction>,
319 print_sample_tx: bool,
320 ) {
321 if print_sample_tx {
322 self.execute_sample_transaction(transactions[0].clone())
323 .await;
324 }
325
326 let tx_count = transactions.len();
327 let in_memory_store = self.validator.create_in_memory_store();
328 let start_time = std::time::Instant::now();
329 info!(
330 "Started executing {} transactions. You can now attach a profiler",
331 transactions.len()
332 );
333
334 self.execute_transactions_in_memory(in_memory_store.clone(), transactions)
335 .await;
336
337 let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
338 info!(
339 "Execution finished in {}s, TPS={}, number of DB object reads per transaction: {}",
340 elapsed,
341 tx_count as f64 / elapsed,
342 in_memory_store.get_num_object_reads() as f64 / tx_count as f64
343 );
344 }
345
346 async fn execute_sample_transaction(&self, sample_transaction: CertifiedTransaction) {
349 info!(
350 "Sample transaction digest={:?}: {:?}",
351 sample_transaction.digest(),
352 sample_transaction.data()
353 );
354 let effects = self
355 .validator()
356 .execute_dry_run(sample_transaction.into_unsigned())
357 .await;
358 info!("Sample effects: {:?}\n\n", effects);
359 assert!(effects.status().is_ok());
360 }
361
362 pub(crate) async fn benchmark_transaction_signing(
364 &self,
365 transactions: Vec<Transaction>,
366 print_sample_tx: bool,
367 ) {
368 if print_sample_tx {
369 let sample_transaction = &transactions[0];
370 info!("Sample transaction: {:?}", sample_transaction.data());
371 }
372
373 let tx_count = transactions.len();
374 let start_time = std::time::Instant::now();
375 self.validator_sign_transactions(transactions).await;
376 let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
377 info!(
378 "Transaction signing finished in {}s, TPS={}.",
379 elapsed,
380 tx_count as f64 / elapsed,
381 );
382 }
383
384 pub(crate) async fn benchmark_checkpoint_executor(
385 &self,
386 transactions: Vec<CertifiedTransaction>,
387 checkpoint_size: usize,
388 ) {
389 self.execute_sample_transaction(transactions[0].clone())
390 .await;
391
392 info!("Executing all transactions to generate effects");
393 let tx_count = transactions.len();
394 let in_memory_store = self.validator.create_in_memory_store();
395 let effects: BTreeMap<_, _> = self
396 .execute_transactions_in_memory(in_memory_store.clone(), transactions.clone())
397 .await
398 .into_iter()
399 .map(|e| (*e.transaction_digest(), e))
400 .collect();
401
402 info!("Building checkpoints");
403 let validator = self.validator();
404 let checkpoints = validator
405 .build_checkpoints(transactions, effects, checkpoint_size)
406 .await;
407 info!("Built {} checkpoints", checkpoints.len());
408 let last_checkpoint_seq = *checkpoints.last().unwrap().0.sequence_number();
409 let (mut checkpoint_executor, checkpoint_sender) = validator.create_checkpoint_executor();
410 for (checkpoint, contents) in checkpoints {
411 let state = validator.get_validator();
412 state
413 .get_checkpoint_store()
414 .insert_verified_checkpoint(&checkpoint)
415 .unwrap();
416 state
417 .get_state_sync_store()
418 .multi_insert_transaction_and_effects(contents.transactions())
419 .unwrap();
420 state
421 .get_checkpoint_store()
422 .insert_verified_checkpoint_contents(&checkpoint, contents)
423 .unwrap();
424 state
425 .get_checkpoint_store()
426 .update_highest_synced_checkpoint(&checkpoint)
427 .unwrap();
428 checkpoint_sender.send(checkpoint).unwrap();
429 }
430 let start_time = std::time::Instant::now();
431 info!("Starting checkpoint execution. You can now attach a profiler");
432 checkpoint_executor
433 .run_epoch(
434 validator.get_epoch_store().clone(),
435 Some(RunWithRange::Checkpoint(last_checkpoint_seq)),
436 )
437 .await;
438 let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
439 info!(
440 "Checkpoint execution finished in {}s, TPS={}.",
441 elapsed,
442 tx_count as f64 / elapsed,
443 );
444 }
445
446 async fn execute_raw_transactions(
447 &self,
448 transactions: Vec<Transaction>,
449 ) -> Vec<TransactionEffects> {
450 let tasks: FuturesUnordered<_> = transactions
451 .into_iter()
452 .map(|tx| {
453 let validator = self.validator();
454 tokio::spawn(async move { validator.execute_raw_transaction(tx).await })
455 })
456 .collect();
457 let results: Vec<_> = tasks.collect().await;
458 results.into_iter().map(|r| r.unwrap()).collect()
459 }
460
461 async fn execute_transactions_in_memory(
462 &self,
463 store: InMemoryObjectStore,
464 transactions: Vec<CertifiedTransaction>,
465 ) -> Vec<TransactionEffects> {
466 let has_shared_object = transactions.iter().any(|tx| tx.contains_shared_object());
467 if has_shared_object {
468 let mut effects = Vec::new();
470 for transaction in transactions {
471 effects.push(
472 self.validator
473 .execute_transaction_in_memory(store.clone(), transaction)
474 .await,
475 );
476 }
477 effects
478 } else {
479 let tasks: FuturesUnordered<_> = transactions
480 .into_iter()
481 .map(|tx| {
482 let store = store.clone();
483 let validator = self.validator();
484 tokio::spawn(
485 async move { validator.execute_transaction_in_memory(store, tx).await },
486 )
487 })
488 .collect();
489 let results: Vec<_> = tasks.collect().await;
490 results.into_iter().map(|r| r.unwrap()).collect()
491 }
492 }
493
494 fn refresh_gas_objects(&mut self, mut new_gas_objects: HashMap<ObjectID, ObjectRef>) {
495 info!("Refreshing gas objects");
496 for account in self.user_accounts.values_mut() {
497 let refreshed_gas_objects: Vec<_> = account
498 .gas_objects
499 .iter()
500 .map(|oref| {
501 if let Some(new_oref) = new_gas_objects.remove(&oref.0) {
502 new_oref
503 } else {
504 *oref
505 }
506 })
507 .collect();
508 account.gas_objects = Arc::new(refreshed_gas_objects);
509 }
510 }
511 pub(crate) async fn validator_sign_transactions(
512 &self,
513 transactions: Vec<Transaction>,
514 ) -> Vec<HandleTransactionResponse> {
515 info!(
516 "Started signing {} transactions. You can now attach a profiler",
517 transactions.len(),
518 );
519 let tasks: FuturesUnordered<_> = transactions
520 .into_iter()
521 .map(|tx| {
522 let validator = self.validator();
523 tokio::spawn(async move { validator.sign_transaction(tx).await })
524 })
525 .collect();
526 let results: Vec<_> = tasks.collect().await;
527 results.into_iter().map(|r| r.unwrap()).collect()
528 }
529}