1use std::{collections::HashMap, path::PathBuf, str::FromStr};
6
7use eyre::Result;
8use iota_swarm_config::genesis_config::GenesisConfig;
9use iota_types::{base_types::IotaAddress, multiaddr::Multiaddr};
10use serde::{Deserialize, Serialize};
11
12use super::{ProtocolCommands, ProtocolMetrics};
13use crate::{
14 ConsensusProtocol,
15 benchmark::{BenchmarkParameters, BenchmarkType},
16 client::Instance,
17 display,
18 settings::{BinaryBuildConfig, Settings, build_cargo_command, join_non_empty_strings},
19};
20
21#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
22pub enum IotaBenchmarkType {
23 SharedObjectsRatio(u16),
26 AbstractAccountBench,
28}
29
30impl Default for IotaBenchmarkType {
31 fn default() -> Self {
32 Self::SharedObjectsRatio(0)
33 }
34}
35
36impl std::fmt::Debug for IotaBenchmarkType {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 match self {
39 Self::SharedObjectsRatio(shared_objects_ratio) => write!(f, "{shared_objects_ratio}"),
40 Self::AbstractAccountBench => write!(f, "abstract_account_bench"),
41 }
42 }
43}
44
45impl std::fmt::Display for IotaBenchmarkType {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 match self {
48 Self::SharedObjectsRatio(shared_objects_ratio) => {
49 write!(f, "bench ({}% shared objects)", shared_objects_ratio)
50 }
51 Self::AbstractAccountBench => write!(f, "abstract-account-bench"),
52 }
53 }
54}
55
56impl FromStr for IotaBenchmarkType {
57 type Err = String;
58
59 fn from_str(s: &str) -> Result<Self, Self::Err> {
60 let v = s.trim().to_ascii_lowercase();
61
62 if let Ok(n) = v.parse::<u16>() {
64 return Ok(Self::SharedObjectsRatio(n.min(100)));
65 }
66
67 match v.as_str() {
68 "abstract-account-bench" => Ok(Self::AbstractAccountBench),
69 _ => Err(format!(
70 "Unknown benchmark type '{s}'. Expected 0..=100 or abstract-account-bench"
71 )),
72 }
73 }
74}
75
76impl BenchmarkType for IotaBenchmarkType {}
77
78pub struct IotaProtocol {
80 working_dir: PathBuf,
81 use_fullnode_for_execution: bool,
82 use_precompiled_binaries: bool,
83 build_configs: HashMap<String, BinaryBuildConfig>,
84 enable_flamegraph: bool,
85}
86
87impl IotaProtocol {
88 pub fn new(settings: &Settings) -> Self {
90 Self {
91 working_dir: [&settings.working_dir, &iota_config::IOTA_CONFIG_DIR.into()]
92 .iter()
93 .collect(),
94 use_fullnode_for_execution: settings.use_fullnode_for_execution,
95 use_precompiled_binaries: settings.build_cache_enabled(),
96 build_configs: settings.build_configs.clone(),
97 enable_flamegraph: settings.enable_flamegraph,
98 }
99 }
100
101 fn run_binary_command<S1: AsRef<str>, S2: AsRef<str>>(
105 &self,
106 binary_name: &str,
107 setup_commands: &[S1],
108 additional_args: &[S2],
109 ) -> String {
110 if self.use_precompiled_binaries {
111 let binary_path = format!("./target/release/{binary_name}");
113 let binary_command = join_non_empty_strings(
114 &std::iter::once(binary_path.as_str())
115 .chain(additional_args.iter().map(|s| s.as_ref()))
116 .collect::<Vec<_>>(),
117 " ",
118 );
119
120 let all_commands: Vec<String> = setup_commands
121 .iter()
122 .map(|s| s.as_ref().to_string())
123 .chain(std::iter::once(binary_command))
124 .collect();
125
126 join_non_empty_strings(&all_commands, " && ")
127 } else {
128 let build_config = self
129 .build_configs
130 .get(binary_name)
131 .expect("No build config found for binary");
132 build_cargo_command(
133 "run",
134 build_config.toolchain.clone(),
135 build_config.features.clone(),
136 &[binary_name],
137 setup_commands,
138 additional_args,
139 )
140 }
141 }
142
143 fn otel_env(
144 &self,
145 parameters: &BenchmarkParameters<IotaBenchmarkType>,
146 service_name: &str,
147 ) -> Vec<String> {
148 let Some(otel) = ¶meters.otel else {
149 return vec![];
150 };
151 vec![
155 format!("export OTEL_EXPORTER_OTLP_ENDPOINT={}", otel.otlp_endpoint),
156 "export TRACE_FILTER=[handle_transaction]=trace,[process_certificate]=trace"
157 .to_string(),
158 format!(
159 "export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT={}",
160 otel.otlp_endpoint
161 ),
162 format!("export OTEL_EXPORTER_OTLP_PROTOCOL={}", otel.protocol),
163 format!("export OTEL_TRACES_SAMPLER={}", otel.sampler),
164 format!("export OTEL_TRACES_SAMPLER_ARG={}", otel.sampler_arg),
165 format!("export OTEL_SERVICE_NAME={service_name}"),
166 format!("export OTEL_RESOURCE_ATTRIBUTES=service.name={service_name}"),
167 ]
168 }
169}
170
171impl ProtocolCommands<IotaBenchmarkType> for IotaProtocol {
172 fn protocol_dependencies(&self) -> Vec<&'static str> {
173 if !self.use_precompiled_binaries {
174 return vec!["sudo apt-get -y install libudev-dev libpq5 libpq-dev"];
175 }
176
177 vec![]
178 }
179
180 fn db_directories(&self) -> Vec<PathBuf> {
181 let authorities_db = self.working_dir.join(iota_config::AUTHORITIES_DB_NAME);
182 let consensus_db = self.working_dir.join(iota_config::CONSENSUS_DB_NAME);
183 let full_node_db: PathBuf = self.working_dir.join(iota_config::FULL_NODE_DB_PATH);
184 vec![authorities_db, consensus_db, full_node_db]
185 }
186
187 fn genesis_command<'a, I>(
188 &self,
189 instances: I,
190 parameters: &BenchmarkParameters<IotaBenchmarkType>,
191 ) -> String
192 where
193 I: Iterator<Item = &'a Instance>,
194 {
195 let working_dir = self.working_dir.display();
196 let ips = instances
197 .map(|x| {
198 match parameters.use_internal_ip_address {
199 true => x.private_ip,
200 false => x.main_ip,
201 }
202 .to_string()
203 })
204 .collect::<Vec<_>>()
205 .join(" ");
206
207 let epoch_duration_flag = parameters
208 .epoch_duration_ms
209 .map(|epoch_duration_ms| format!("--epoch-duration-ms {epoch_duration_ms}"))
210 .unwrap_or_default();
211 let chain_start_timestamp_flag = parameters
212 .chain_start_timestamp_ms
213 .map(|timestamp_ms| format!("--chain-start-timestamp-ms {timestamp_ms}"))
214 .unwrap_or_default();
215 let additional_gas_accounts_flag = format!(
216 "--num-additional-gas-accounts {}",
217 parameters.additional_gas_accounts
218 );
219
220 let iota_command = self.run_binary_command(
221 "iota",
222 &[
223 "export IOTA_PROTOCOL_CONFIG_OVERRIDE_ENABLE=1",
225 "export IOTA_PROTOCOL_CONFIG_OVERRIDE_validator_target_reward=0",
226 &format!("mkdir -p {working_dir}")
227 ],
228 &[
229 "genesis",
230 &format!("-f --working-dir {working_dir} --benchmark-ips {ips} --admin-interface-address=localhost:1337"),
231 &epoch_duration_flag,
232 &chain_start_timestamp_flag,
233 &additional_gas_accounts_flag,
234 ],
235 );
236
237 iota_command
238 }
239
240 fn monitor_command<I>(&self, _instances: I) -> Vec<(Instance, String)>
241 where
242 I: IntoIterator<Item = Instance>,
243 {
244 vec![]
254 }
255
256 fn node_command<I>(
257 &self,
258 instances: I,
259 parameters: &BenchmarkParameters<IotaBenchmarkType>,
260 ) -> Vec<(Instance, String)>
261 where
262 I: IntoIterator<Item = Instance>,
263 {
264 let working_dir = self.working_dir.clone();
265 let network_addresses = Self::resolve_network_addresses(instances, parameters);
266
267 network_addresses
268 .into_iter()
269 .enumerate()
270 .map(|(i, (instance, network_address))| {
271 let validator_config =
272 iota_config::validator_config_file(network_address.clone(), i);
273 let config_path: PathBuf = working_dir.join(validator_config);
274 let max_pipeline_delay = parameters.max_pipeline_delay;
275
276 let mut setup: Vec<String> = vec![
277 match parameters.consensus_protocol {
278 ConsensusProtocol::Starfish => {
279 "export CONSENSUS_PROTOCOL=starfish".to_string()
280 }
281 ConsensusProtocol::Mysticeti => {
282 "export CONSENSUS_PROTOCOL=mysticeti".to_string()
283 }
284 ConsensusProtocol::SwapEachEpoch => {
285 "export CONSENSUS_PROTOCOL=swap_each_epoch".to_string()
286 }
287 },
288 format!("export MAX_PIPELINE_DELAY={max_pipeline_delay}"),
289 format!("export IOTA_PROTOCOL_CONFIG_OVERRIDE_ENABLE=1"),
292 format!("export IOTA_PROTOCOL_CONFIG_OVERRIDE_validator_target_reward=0"),
293 ];
294
295 if self.enable_flamegraph {
296 setup.push("export TRACE_FLAMEGRAPH=1".to_string());
297 }
298
299 setup.extend(self.otel_env(parameters, &format!("iota-validator-{i}")));
300
301 let iota_node_command = self.run_binary_command(
302 "iota-node",
303 &setup,
304 &[format!(
305 "--config-path {} --listen-address {}",
306 config_path.display(),
307 network_address.with_zero_ip()
308 )],
309 );
310
311 display::action(format!(
312 "\n Validator-node Command ({i}): {iota_node_command}"
313 ));
314
315 (instance, iota_node_command)
316 })
317 .collect()
318 }
319
320 fn fullnode_command<I>(
321 &self,
322 instances: I,
323 parameters: &BenchmarkParameters<IotaBenchmarkType>,
324 ) -> Vec<(Instance, String)>
325 where
326 I: IntoIterator<Item = Instance>,
327 {
328 let working_dir = self.working_dir.clone();
329
330 instances
331 .into_iter()
332 .enumerate()
333 .map(|(i, instance)| {
334 let config_path: PathBuf = working_dir.join(iota_config::IOTA_FULLNODE_CONFIG);
335 let fullnode_ip = match parameters.use_internal_ip_address {
336 true => &instance.private_ip,
337 false => &instance.main_ip,
338 };
339
340 let mut setup = vec![
341 format!(
342 "sed -i 's|listen-address: \\\"127.0.0.1:|listen-address: \\\"0.0.0.0:|' {0} && sed -i 's|external-address: /ip4/127.0.0.1/|external-address: /ip4/{1}/|' {0}",
343 config_path.display(),
344 fullnode_ip
345 ),
346 if self.enable_flamegraph {
347 "export TRACE_FLAMEGRAPH=1".to_string()
348 } else {
349 "".to_string()
350 },
351 "export IOTA_PROTOCOL_CONFIG_OVERRIDE_ENABLE=1".to_string(),
353 "export IOTA_PROTOCOL_CONFIG_OVERRIDE_validator_target_reward=0".to_string(),
354 ];
355 setup.extend(self.otel_env(parameters, &format!("iota-node-{i}")));
356
357 let iota_node_command = self.run_binary_command(
358 "iota-node",
359 &setup,
360 &[&format!("--config-path {}", config_path.display())],
361 );
362
363 display::action(format!("\n Full-node Command ({i}): {iota_node_command}"));
364
365 (instance, iota_node_command)
366 })
367 .collect()
368 }
369
370 fn client_command<I>(
371 &self,
372 instances: I,
373 parameters: &BenchmarkParameters<IotaBenchmarkType>,
374 ) -> Vec<(Instance, String)>
375 where
376 I: IntoIterator<Item = Instance>,
377 {
378 let genesis_path: PathBuf = [
379 &self.working_dir,
380 &iota_config::IOTA_GENESIS_FILENAME.into(),
381 ]
382 .iter()
383 .collect();
384 let keystore_path: PathBuf = [
385 &self.working_dir,
386 &iota_config::IOTA_BENCHMARK_GENESIS_GAS_KEYSTORE_FILENAME.into(),
387 ]
388 .iter()
389 .collect();
390
391 let committee_size = parameters.nodes;
392 let clients: Vec<_> = instances.into_iter().collect();
393 let load_share = parameters.load / clients.len();
394 let metrics_port = Self::CLIENT_METRICS_PORT;
395 let gas_keys =
397 GenesisConfig::benchmark_gas_keys(committee_size + parameters.additional_gas_accounts);
398 let client_key_offset = committee_size;
400
401 clients
402 .into_iter()
403 .enumerate()
404 .map(|(i, instance)| {
405 let genesis = genesis_path.display().to_string();
406 let keystore = keystore_path.display().to_string();
407 let gas_key = &gas_keys[client_key_offset + i];
409 let gas_address = IotaAddress::from(&gas_key.public());
410
411 let client_threads = parameters.stress_num_client_threads;
412 let server_threads = parameters.stress_num_server_threads;
413
414 let mut stress_args: Vec<String> = vec![
415 format!("--num-client-threads {client_threads} --num-server-threads {server_threads}"),
416 "--local false --num-transfer-accounts 2".to_string(),
417 format!("--genesis-blob-path {genesis} --keystore-path {keystore}"),
418 format!("--primary-gas-owner-id {gas_address}"),
419 parameters.run_interval.as_stress_flag(),
421 format!("--client-metric-host 0.0.0.0 --client-metric-port {metrics_port}"),
422 if let Some(stats_path) = ¶meters.benchmark_stats_path {
423 format!("--benchmark-stats-path {stats_path}")
424 } else {
425 "".to_string()
426 },
427 ];
428
429 match parameters.benchmark_type {
430 IotaBenchmarkType::SharedObjectsRatio(shared_objects_ratio) => {
431 let transfer_objects = 100 - shared_objects_ratio;
432 let hotness_factor = parameters.shared_counter_hotness_factor.unwrap_or(50);
433 stress_args.push("bench".to_string());
434 stress_args.push(format!("--target-qps {load_share}"));
435 stress_args.push(format!("--num-workers {}", parameters.stress_num_workers));
436 stress_args.push(format!("--in-flight-ratio {}", parameters.stress_in_flight_ratio));
437 stress_args.push(format!("--shared-counter {shared_objects_ratio} --transfer-object {transfer_objects}"));
438 stress_args.push(format!("--shared-counter-hotness-factor {hotness_factor}"));
439 if let Some(num_counters) = parameters.num_shared_counters {
440 stress_args.push(format!("--num-shared-counters {num_counters}"));
441 }
442 }
443
444 IotaBenchmarkType::AbstractAccountBench => {
445 stress_args.push("abstract-account-bench".to_string());
446 stress_args.push(format!("--authenticator {}", parameters.aa_authenticator));
447 stress_args.push(format!("--tx-payload-obj-type {}", parameters.tx_payload_obj_type));
448 stress_args.push(format!("--target-qps {load_share}"));
449 stress_args.push(format!("--num-workers {}", parameters.stress_num_workers));
450 stress_args.push(format!("--in-flight-ratio {}", parameters.stress_in_flight_ratio));
451 stress_args.push(format!("--split-amount {}", parameters.aa_split_amount));
452 if parameters.should_fail {
453 stress_args.push("--should-fail".to_string());
454 }
455 }
456 }
457
458 if self.use_fullnode_for_execution {
459 stress_args.push("--use-fullnode-for-execution true".to_string());
460 stress_args.push("--fullnode-rpc-addresses http://127.0.0.1:9000".to_string());
461 }
462
463 let mut setup = vec![
464 "export MOVE_EXAMPLES_DIR=$(pwd)/examples/move".to_string(),
465 "export RUST_LOG=iota_benchmark=debug".to_string(),
466 ];
467
468 setup.extend(self.otel_env(parameters, &format!("iota-stress-{i}")));
469
470 let stress_command = self.run_binary_command("stress", &setup, &stress_args);
471
472 display::action(format!("\n Stress Command ({i}): {stress_command}"));
473
474 (instance, stress_command)
475 })
476 .collect()
477 }
478}
479
480impl IotaProtocol {
481 const CLIENT_METRICS_PORT: u16 = GenesisConfig::BENCHMARKS_PORT_OFFSET + 2000;
482
483 pub fn resolve_network_addresses(
486 instances: impl IntoIterator<Item = Instance>,
487 parameters: &BenchmarkParameters<IotaBenchmarkType>,
488 ) -> Vec<(Instance, Multiaddr)> {
489 let instances: Vec<Instance> = instances.into_iter().collect();
490 let ips: Vec<_> = instances
491 .iter()
492 .map(|x| match parameters.use_internal_ip_address {
493 true => x.private_ip.to_string(),
494 false => x.main_ip.to_string(),
495 })
496 .collect();
497
498 let genesis_config = GenesisConfig::new_for_benchmarks(
501 &ips,
502 parameters.epoch_duration_ms,
503 parameters.chain_start_timestamp_ms,
504 Some(parameters.additional_gas_accounts),
505 u64::MAX - 1,
506 );
507 let mut addresses = Vec::new();
508 if let Some(validator_configs) = genesis_config.validator_config_info.as_ref() {
509 for (i, validator_info) in validator_configs.iter().enumerate() {
510 let address = &validator_info.network_address;
511 addresses.push((instances[i].clone(), address.clone()));
512 }
513 }
514 addresses
515 }
516}
517
518impl ProtocolMetrics for IotaProtocol {
519 const BENCHMARK_DURATION: &'static str = "benchmark_duration";
520 const TOTAL_TRANSACTIONS: &'static str = "latency_s_count";
521 const LATENCY_BUCKETS: &'static str = "latency_s";
522 const LATENCY_SUM: &'static str = "latency_s_sum";
523 const LATENCY_SQUARED_SUM: &'static str = "latency_squared_s";
524
525 fn nodes_metrics_path<I>(
526 &self,
527 instances: I,
528 use_internal_ip_address: bool,
529 ) -> Vec<(Instance, String)>
530 where
531 I: IntoIterator<Item = Instance>,
532 {
533 let instances = instances.into_iter().collect::<Vec<_>>();
534 let ips = (0..instances.len())
535 .map(|_| "0.0.0.0".to_string())
536 .collect::<Vec<_>>();
537 GenesisConfig::new_for_benchmarks(&ips, None, None, None, u64::MAX)
541 .validator_config_info
542 .expect("No validator in genesis")
543 .iter()
544 .zip(instances)
545 .map(|(config, instance)| {
546 let path = format!(
547 "{}:{}{}",
548 match use_internal_ip_address {
549 true => instance.private_ip,
550 false => instance.main_ip,
551 },
552 config.metrics_address.port(),
553 iota_metrics::METRICS_ROUTE
554 );
555 (instance, path)
556 })
557 .collect()
558 }
559
560 fn clients_metrics_path<I>(
561 &self,
562 instances: I,
563 use_internal_ip_address: bool,
564 ) -> Vec<(Instance, String)>
565 where
566 I: IntoIterator<Item = Instance>,
567 {
568 instances
569 .into_iter()
570 .map(|instance| {
571 let path = format!(
572 "{}:{}{}",
573 match use_internal_ip_address {
574 true => instance.private_ip,
575 false => instance.main_ip,
576 },
577 Self::CLIENT_METRICS_PORT,
578 iota_metrics::METRICS_ROUTE
579 );
580 (instance, path)
581 })
582 .collect()
583 }
584}