iota_single_node_benchmark/
benchmark_context.rs1use std::{
6 collections::{BTreeMap, HashMap},
7 ops::Deref,
8 sync::Arc,
9};
10
11use futures::{StreamExt, stream::FuturesUnordered};
12use iota_config::node::RunWithRange;
13use iota_test_transaction_builder::PublishData;
14use iota_types::{
15 base_types::{IotaAddress, ObjectID, ObjectRef, SequenceNumber},
16 effects::{TransactionEffects, TransactionEffectsAPI},
17 messages_grpc::HandleTransactionResponse,
18 mock_checkpoint_builder::ValidatorKeypairProvider,
19 transaction::{CertifiedTransaction, SignedTransaction, Transaction, VerifiedTransaction},
20};
21use tracing::info;
22
23use crate::{
24 command::Component,
25 mock_account::{Account, batch_create_account_and_gas},
26 mock_storage::InMemoryObjectStore,
27 single_node::SingleValidator,
28 tx_generator::{RootObjectCreateTxGenerator, SharedObjectCreateTxGenerator, TxGenerator},
29 workload::Workload,
30};
31
32pub struct BenchmarkContext {
33 validator: SingleValidator,
34 user_accounts: BTreeMap<IotaAddress, Account>,
35 admin_account: Account,
36 benchmark_component: Component,
37}
38
39impl BenchmarkContext {
40 pub(crate) async fn new(
41 workload: Workload,
42 benchmark_component: Component,
43 print_sample_tx: bool,
44 ) -> Self {
45 let mut num_accounts = workload.num_accounts() + 1;
47 if print_sample_tx {
48 num_accounts += 1;
50 }
51 let gas_object_num_per_account = workload.gas_object_num_per_account();
52 let total = num_accounts * gas_object_num_per_account;
53
54 info!(
55 "Creating {} accounts and {} gas objects",
56 num_accounts, total
57 );
58 let (mut user_accounts, genesis_gas_objects) =
59 batch_create_account_and_gas(num_accounts, gas_object_num_per_account).await;
60 assert_eq!(genesis_gas_objects.len() as u64, total);
61 let (_, admin_account) = user_accounts.pop_last().unwrap();
62
63 info!("Initializing validator");
64 let validator = SingleValidator::new(&genesis_gas_objects[..], benchmark_component).await;
65
66 Self {
67 validator,
68 user_accounts,
69 admin_account,
70 benchmark_component,
71 }
72 }
73
74 pub(crate) fn validator(&self) -> SingleValidator {
75 self.validator.clone()
76 }
77
78 pub(crate) async fn publish_package(&mut self, publish_data: PublishData) -> ObjectRef {
79 let mut gas_objects = self.admin_account.gas_objects.deref().clone();
80 let (package, updated_gas) = self
81 .validator
82 .publish_package(
83 publish_data,
84 self.admin_account.sender,
85 &self.admin_account.keypair,
86 gas_objects[0],
87 )
88 .await;
89 gas_objects[0] = updated_gas;
90 self.admin_account.gas_objects = Arc::new(gas_objects);
91 package
92 }
93
94 pub(crate) async fn preparing_dynamic_fields(
98 &mut self,
99 move_package: ObjectID,
100 num_dynamic_fields: u64,
101 ) -> HashMap<IotaAddress, ObjectRef> {
102 let mut root_objects = HashMap::new();
103
104 if num_dynamic_fields == 0 {
105 return root_objects;
106 }
107
108 info!("Preparing root object with dynamic fields");
109 let root_object_create_transactions = self
110 .generate_transactions(Arc::new(RootObjectCreateTxGenerator::new(
111 move_package,
112 num_dynamic_fields,
113 )))
114 .await;
115 let results = self
116 .execute_raw_transactions(root_object_create_transactions)
117 .await;
118 let mut new_gas_objects = HashMap::new();
119 for effects in results {
120 self.validator()
121 .get_validator()
122 .get_cache_commit()
123 .commit_transaction_outputs(
124 effects.executed_epoch(),
125 &[*effects.transaction_digest()],
126 );
127 let (owner, root_object) = effects
128 .created()
129 .into_iter()
130 .filter_map(|(oref, owner)| {
131 owner
132 .get_address_owner_address()
133 .ok()
134 .map(|owner| (owner, oref))
135 })
136 .next()
137 .unwrap();
138 root_objects.insert(owner, root_object);
139 let gas_object = effects.gas_object().0;
140 new_gas_objects.insert(gas_object.0, gas_object);
141 }
142 self.refresh_gas_objects(new_gas_objects);
143 info!("Finished preparing root object with dynamic fields");
144 root_objects
145 }
146
147 pub(crate) async fn prepare_shared_objects(
148 &mut self,
149 move_package: ObjectID,
150 num_shared_objects: usize,
151 ) -> Vec<(ObjectID, SequenceNumber)> {
152 let mut shared_objects = Vec::new();
153
154 if num_shared_objects == 0 {
155 return shared_objects;
156 }
157 assert!(num_shared_objects <= self.user_accounts.len());
158
159 info!("Preparing shared objects");
160 let generator = SharedObjectCreateTxGenerator::new(move_package);
161 let shared_object_create_transactions: Vec<_> = self
162 .user_accounts
163 .values()
164 .take(num_shared_objects)
165 .map(|account| generator.generate_tx(account.clone()))
166 .collect();
167 let results = self
168 .execute_raw_transactions(shared_object_create_transactions)
169 .await;
170 let mut new_gas_objects = HashMap::new();
171 let epoch_id = self.validator.get_epoch_store().epoch();
172 let cache_commit = self.validator.get_validator().get_cache_commit();
173 for effects in results {
174 let shared_object = effects
175 .created()
176 .into_iter()
177 .filter_map(|(oref, owner)| {
178 if owner.is_shared() {
179 Some((oref.0, oref.1))
180 } else {
181 None
182 }
183 })
184 .next()
185 .unwrap();
186 shared_objects.push(shared_object);
187 let gas_object = effects.gas_object().0;
188 new_gas_objects.insert(gas_object.0, gas_object);
189 cache_commit.commit_transaction_outputs(epoch_id, &[*effects.transaction_digest()]);
196 }
197 self.refresh_gas_objects(new_gas_objects);
198 info!("Finished preparing shared objects");
199 shared_objects
200 }
201
202 pub(crate) async fn generate_transactions(
203 &self,
204 tx_generator: Arc<dyn TxGenerator>,
205 ) -> Vec<Transaction> {
206 info!(
207 "{}: Creating {} transactions",
208 tx_generator.name(),
209 self.user_accounts.len()
210 );
211 let tasks: FuturesUnordered<_> = self
212 .user_accounts
213 .values()
214 .map(|account| {
215 let account = account.clone();
216 let tx_generator = tx_generator.clone();
217 tokio::spawn(async move { tx_generator.generate_tx(account) })
218 })
219 .collect();
220 let results: Vec<_> = tasks.collect().await;
221 results.into_iter().map(|r| r.unwrap()).collect()
222 }
223
224 pub(crate) async fn certify_transactions(
225 &self,
226 transactions: Vec<Transaction>,
227 skip_signing: bool,
228 ) -> Vec<CertifiedTransaction> {
229 info!("Creating transaction certificates");
230 let tasks: FuturesUnordered<_> = transactions
231 .into_iter()
232 .map(|tx| {
233 let validator = self.validator();
234 tokio::spawn(async move {
235 let committee = validator.get_committee();
236 let validator_state = validator.get_validator();
237 let sig = if skip_signing {
238 SignedTransaction::sign(
239 0,
240 &tx,
241 &*validator_state.secret,
242 validator_state.name,
243 )
244 } else {
245 let verified_tx = VerifiedTransaction::new_unchecked(tx.clone());
246 validator_state
247 .handle_transaction(validator.get_epoch_store(), verified_tx)
248 .await
249 .unwrap()
250 .status
251 .into_signed_for_testing()
252 };
253 CertifiedTransaction::new(tx.into_data(), vec![sig], committee).unwrap()
254 })
255 })
256 .collect();
257 let results: Vec<_> = tasks.collect().await;
258 results.into_iter().map(|r| r.unwrap()).collect()
259 }
260
261 pub(crate) async fn benchmark_transaction_execution(
262 &self,
263 transactions: Vec<CertifiedTransaction>,
264 print_sample_tx: bool,
265 ) {
266 if print_sample_tx {
267 self.execute_sample_transaction(transactions[0].clone())
270 .await;
271 }
272
273 let tx_count = transactions.len();
274 let start_time = std::time::Instant::now();
275 info!(
276 "Started executing {} transactions. You can now attach a profiler",
277 transactions.len()
278 );
279
280 let has_shared_object = transactions.iter().any(|tx| tx.contains_shared_object());
281 if has_shared_object {
282 for transaction in transactions {
284 self.validator
285 .execute_certificate(transaction, self.benchmark_component)
286 .await;
287 }
288 } else {
289 let tasks: FuturesUnordered<_> = transactions
290 .into_iter()
291 .map(|tx| {
292 let validator = self.validator();
293 let component = self.benchmark_component;
294 tokio::spawn(async move { validator.execute_certificate(tx, component).await })
295 })
296 .collect();
297 let results: Vec<_> = tasks.collect().await;
298 results.into_iter().for_each(|r| {
299 r.unwrap();
300 });
301 }
302
303 let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
304 info!(
305 "Execution finished in {}s, TPS={}",
306 elapsed,
307 tx_count as f64 / elapsed
308 );
309 }
310
311 pub(crate) async fn benchmark_transaction_execution_in_memory(
312 &self,
313 transactions: Vec<CertifiedTransaction>,
314 print_sample_tx: bool,
315 ) {
316 if print_sample_tx {
317 self.execute_sample_transaction(transactions[0].clone())
318 .await;
319 }
320
321 let tx_count = transactions.len();
322 let in_memory_store = self.validator.create_in_memory_store();
323 let start_time = std::time::Instant::now();
324 info!(
325 "Started executing {} transactions. You can now attach a profiler",
326 transactions.len()
327 );
328
329 self.execute_transactions_in_memory(in_memory_store.clone(), transactions)
330 .await;
331
332 let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
333 info!(
334 "Execution finished in {}s, TPS={}, number of DB object reads per transaction: {}",
335 elapsed,
336 tx_count as f64 / elapsed,
337 in_memory_store.get_num_object_reads() as f64 / tx_count as f64
338 );
339 }
340
341 async fn execute_sample_transaction(&self, sample_transaction: CertifiedTransaction) {
344 info!(
345 "Sample transaction digest={:?}: {:?}",
346 sample_transaction.digest(),
347 sample_transaction.data()
348 );
349 let effects = self
350 .validator()
351 .execute_dry_run(sample_transaction.into_unsigned())
352 .await;
353 info!("Sample effects: {:?}\n\n", effects);
354 assert!(effects.status().is_ok());
355 }
356
357 pub(crate) async fn benchmark_transaction_signing(
359 &self,
360 transactions: Vec<Transaction>,
361 print_sample_tx: bool,
362 ) {
363 if print_sample_tx {
364 let sample_transaction = &transactions[0];
365 info!("Sample transaction: {:?}", sample_transaction.data());
366 }
367
368 let tx_count = transactions.len();
369 let start_time = std::time::Instant::now();
370 self.validator_sign_transactions(transactions).await;
371 let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
372 info!(
373 "Transaction signing finished in {}s, TPS={}.",
374 elapsed,
375 tx_count as f64 / elapsed,
376 );
377 }
378
379 pub(crate) async fn benchmark_checkpoint_executor(
380 &self,
381 transactions: Vec<CertifiedTransaction>,
382 checkpoint_size: usize,
383 ) {
384 self.execute_sample_transaction(transactions[0].clone())
385 .await;
386
387 info!("Executing all transactions to generate effects");
388 let tx_count = transactions.len();
389 let in_memory_store = self.validator.create_in_memory_store();
390 let effects: BTreeMap<_, _> = self
391 .execute_transactions_in_memory(in_memory_store.clone(), transactions.clone())
392 .await
393 .into_iter()
394 .map(|e| (*e.transaction_digest(), e))
395 .collect();
396
397 info!("Building checkpoints");
398 let validator = self.validator();
399 let checkpoints = validator
400 .build_checkpoints(transactions, effects, checkpoint_size)
401 .await;
402 info!("Built {} checkpoints", checkpoints.len());
403 let last_checkpoint_seq = *checkpoints.last().unwrap().0.sequence_number();
404 let checkpoint_executor = validator.create_checkpoint_executor();
405 for (checkpoint, contents) in checkpoints {
406 let state = validator.get_validator();
407 state
408 .get_checkpoint_store()
409 .insert_verified_checkpoint(&checkpoint)
410 .unwrap();
411 state
412 .get_state_sync_store()
413 .multi_insert_transaction_and_effects(contents.transactions());
414 state
415 .get_checkpoint_store()
416 .insert_verified_checkpoint_contents(&checkpoint, contents)
417 .unwrap();
418 state
419 .get_checkpoint_store()
420 .update_highest_synced_checkpoint(&checkpoint)
421 .unwrap();
422 }
423 let start_time = std::time::Instant::now();
424 info!("Starting checkpoint execution. You can now attach a profiler");
425 checkpoint_executor
426 .run_epoch(Some(RunWithRange::Checkpoint(last_checkpoint_seq)))
427 .await;
428 let elapsed = start_time.elapsed().as_millis() as f64 / 1000f64;
429 info!(
430 "Checkpoint execution finished in {}s, TPS={}.",
431 elapsed,
432 tx_count as f64 / elapsed,
433 );
434 }
435
436 async fn execute_raw_transactions(
437 &self,
438 transactions: Vec<Transaction>,
439 ) -> Vec<TransactionEffects> {
440 let tasks: FuturesUnordered<_> = transactions
441 .into_iter()
442 .map(|tx| {
443 let validator = self.validator();
444 tokio::spawn(async move { validator.execute_raw_transaction(tx).await })
445 })
446 .collect();
447 let results: Vec<_> = tasks.collect().await;
448 results.into_iter().map(|r| r.unwrap()).collect()
449 }
450
451 async fn execute_transactions_in_memory(
452 &self,
453 store: InMemoryObjectStore,
454 transactions: Vec<CertifiedTransaction>,
455 ) -> Vec<TransactionEffects> {
456 let has_shared_object = transactions.iter().any(|tx| tx.contains_shared_object());
457 if has_shared_object {
458 let mut effects = Vec::new();
460 for transaction in transactions {
461 effects.push(
462 self.validator
463 .execute_transaction_in_memory(store.clone(), transaction)
464 .await,
465 );
466 }
467 effects
468 } else {
469 let tasks: FuturesUnordered<_> = transactions
470 .into_iter()
471 .map(|tx| {
472 let store = store.clone();
473 let validator = self.validator();
474 tokio::spawn(
475 async move { validator.execute_transaction_in_memory(store, tx).await },
476 )
477 })
478 .collect();
479 let results: Vec<_> = tasks.collect().await;
480 results.into_iter().map(|r| r.unwrap()).collect()
481 }
482 }
483
484 fn refresh_gas_objects(&mut self, mut new_gas_objects: HashMap<ObjectID, ObjectRef>) {
485 info!("Refreshing gas objects");
486 for account in self.user_accounts.values_mut() {
487 let refreshed_gas_objects: Vec<_> = account
488 .gas_objects
489 .iter()
490 .map(|oref| {
491 if let Some(new_oref) = new_gas_objects.remove(&oref.0) {
492 new_oref
493 } else {
494 *oref
495 }
496 })
497 .collect();
498 account.gas_objects = Arc::new(refreshed_gas_objects);
499 }
500 }
501 pub(crate) async fn validator_sign_transactions(
502 &self,
503 transactions: Vec<Transaction>,
504 ) -> Vec<HandleTransactionResponse> {
505 info!(
506 "Started signing {} transactions. You can now attach a profiler",
507 transactions.len(),
508 );
509 let tasks: FuturesUnordered<_> = transactions
510 .into_iter()
511 .map(|tx| {
512 let validator = self.validator();
513 tokio::spawn(async move { validator.sign_transaction(tx).await })
514 })
515 .collect();
516 let results: Vec<_> = tasks.collect().await;
517 results.into_iter().map(|r| r.unwrap()).collect()
518 }
519}