1use std::{
6 collections::HashMap,
7 fmt::{Debug, Display},
8 path::PathBuf,
9 str::FromStr,
10};
11
12use iota_swarm_config::genesis_config::GenesisConfig;
13use iota_types::{base_types::IotaAddress, multiaddr::Multiaddr};
14use serde::{Deserialize, Serialize};
15
16use super::{ProtocolCommands, ProtocolMetrics};
17use crate::{
18 ConsensusProtocol,
19 benchmark::{BenchmarkParameters, BenchmarkType},
20 client::Instance,
21 display,
22 settings::{BinaryBuildConfig, Settings, build_cargo_command, join_non_empty_strings},
23};
24
/// Benchmark type for IOTA, described by how much of the workload targets
/// shared objects.
#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IotaBenchmarkType {
    /// Percentage of the load that uses shared objects. The stress client
    /// receives this value as `--shared-counter` and `100 - value` as
    /// `--transfer-object`. Parsing through `FromStr` caps it at 100.
    shared_objects_ratio: u16,
}
31
32impl Debug for IotaBenchmarkType {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 write!(f, "{}", self.shared_objects_ratio)
35 }
36}
37
38impl Display for IotaBenchmarkType {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 write!(f, "{}% shared objects", self.shared_objects_ratio)
41 }
42}
43
44impl FromStr for IotaBenchmarkType {
45 type Err = std::num::ParseIntError;
46
47 fn from_str(s: &str) -> Result<Self, Self::Err> {
48 Ok(Self {
49 shared_objects_ratio: s.parse::<u16>()?.min(100),
50 })
51 }
52}
53
// Opts `IotaBenchmarkType` into the `BenchmarkType` trait so it can be used
// as the type parameter of `BenchmarkParameters`; nothing is overridden here.
impl BenchmarkType for IotaBenchmarkType {}
55
/// Builds the shell commands used to set up and run an IOTA benchmark
/// (genesis, validator nodes, full nodes and stress clients).
pub struct IotaProtocol {
    /// Directory holding configs and databases: the settings' working
    /// directory joined with `iota_config::IOTA_CONFIG_DIR`.
    working_dir: PathBuf,
    /// When set, the stress client is told to execute through a full node
    /// (`--use-fullnode-for-execution true`).
    use_fullnode_for_execution: bool,
    /// When set, binaries are launched from `./target/release/` instead of
    /// through a cargo command.
    use_precompiled_binaries: bool,
    /// Per-binary build settings (toolchain, features), keyed by binary name.
    /// Only consulted when precompiled binaries are not used.
    build_configs: HashMap<String, BinaryBuildConfig>,
    /// When set, node commands export `TRACE_FLAMEGRAPH=1`.
    enable_flamegraph: bool,
}
64
65impl IotaProtocol {
66 pub fn new(settings: &Settings) -> Self {
68 Self {
69 working_dir: [&settings.working_dir, &iota_config::IOTA_CONFIG_DIR.into()]
70 .iter()
71 .collect(),
72 use_fullnode_for_execution: settings.use_fullnode_for_execution,
73 use_precompiled_binaries: settings.build_cache_enabled(),
74 build_configs: settings.build_configs.clone(),
75 enable_flamegraph: settings.enable_flamegraph,
76 }
77 }
78
79 fn run_binary_command<S1: AsRef<str>, S2: AsRef<str>>(
83 &self,
84 binary_name: &str,
85 setup_commands: &[S1],
86 additional_args: &[S2],
87 ) -> String {
88 if self.use_precompiled_binaries {
89 let binary_path = format!("./target/release/{binary_name}");
91 let binary_command = join_non_empty_strings(
92 &std::iter::once(binary_path.as_str())
93 .chain(additional_args.iter().map(|s| s.as_ref()))
94 .collect::<Vec<_>>(),
95 " ",
96 );
97
98 let all_commands: Vec<String> = setup_commands
99 .iter()
100 .map(|s| s.as_ref().to_string())
101 .chain(std::iter::once(binary_command))
102 .collect();
103
104 join_non_empty_strings(&all_commands, " && ")
105 } else {
106 let build_config = self
107 .build_configs
108 .get(binary_name)
109 .expect("No build config found for binary");
110 build_cargo_command(
111 "run",
112 build_config.toolchain.clone(),
113 build_config.features.clone(),
114 &[binary_name],
115 setup_commands,
116 additional_args,
117 )
118 }
119 }
120}
121
122impl ProtocolCommands<IotaBenchmarkType> for IotaProtocol {
123 fn protocol_dependencies(&self) -> Vec<&'static str> {
124 if !self.use_precompiled_binaries {
125 return vec!["sudo apt-get -y install libudev-dev libpq5 libpq-dev"];
126 }
127
128 vec![]
129 }
130
131 fn db_directories(&self) -> Vec<PathBuf> {
132 let authorities_db = self.working_dir.join(iota_config::AUTHORITIES_DB_NAME);
133 let consensus_db = self.working_dir.join(iota_config::CONSENSUS_DB_NAME);
134 let full_node_db: PathBuf = self.working_dir.join(iota_config::FULL_NODE_DB_PATH);
135 vec![authorities_db, consensus_db, full_node_db]
136 }
137
138 fn genesis_command<'a, I>(
139 &self,
140 instances: I,
141 parameters: &BenchmarkParameters<IotaBenchmarkType>,
142 ) -> String
143 where
144 I: Iterator<Item = &'a Instance>,
145 {
146 let working_dir = self.working_dir.display();
147 let ips = instances
148 .map(|x| {
149 match parameters.use_internal_ip_address {
150 true => x.private_ip,
151 false => x.main_ip,
152 }
153 .to_string()
154 })
155 .collect::<Vec<_>>()
156 .join(" ");
157
158 let epoch_duration_flag = parameters
159 .epoch_duration_ms
160 .map(|epoch_duration_ms| format!("--epoch-duration-ms {epoch_duration_ms}"))
161 .unwrap_or_default();
162 let chain_start_timestamp_flag = parameters
163 .chain_start_timestamp_ms
164 .map(|timestamp_ms| format!("--chain-start-timestamp-ms {timestamp_ms}"))
165 .unwrap_or_default();
166 let additional_gas_accounts_flag = format!(
167 "--num-additional-gas-accounts {}",
168 parameters.additional_gas_accounts
169 );
170
171 let iota_command = self.run_binary_command(
172 "iota",
173 &[&format!("mkdir -p {working_dir}")],
174 &[
175 "genesis",
176 &format!("-f --working-dir {working_dir} --benchmark-ips {ips} --admin-interface-address=localhost:1337"),
177 &epoch_duration_flag,
178 &chain_start_timestamp_flag,
179 &additional_gas_accounts_flag,
180 ],
181 );
182
183 iota_command
184 }
185
186 fn monitor_command<I>(&self, _instances: I) -> Vec<(Instance, String)>
187 where
188 I: IntoIterator<Item = Instance>,
189 {
190 vec![]
200 }
201
202 fn node_command<I>(
203 &self,
204 instances: I,
205 parameters: &BenchmarkParameters<IotaBenchmarkType>,
206 ) -> Vec<(Instance, String)>
207 where
208 I: IntoIterator<Item = Instance>,
209 {
210 let working_dir = self.working_dir.clone();
211 let network_addresses = Self::resolve_network_addresses(instances, parameters);
212
213 network_addresses
214 .into_iter()
215 .enumerate()
216 .map(|(i, (instance, network_address))| {
217 let validator_config =
218 iota_config::validator_config_file(network_address.clone(), i);
219 let config_path: PathBuf = working_dir.join(validator_config);
220 let max_pipeline_delay = parameters.max_pipeline_delay;
221 let iota_node_command = self.run_binary_command(
222 "iota-node",
223 &[
224 match parameters.consensus_protocol {
225 ConsensusProtocol::Starfish => "export CONSENSUS_PROTOCOL=starfish",
226 ConsensusProtocol::Mysticeti => "export CONSENSUS_PROTOCOL=mysticeti",
227 ConsensusProtocol::SwapEachEpoch => {
228 "export CONSENSUS_PROTOCOL=swap_each_epoch"
229 }
230 },
231 format!("export MAX_PIPELINE_DELAY={max_pipeline_delay}").as_str(),
232 if self.enable_flamegraph {
233 "export TRACE_FLAMEGRAPH=1"
234 } else {
235 ""
236 },
237 ],
238 &[&format!(
239 "--config-path {} --listen-address {}",
240 config_path.display(),
241 network_address.with_zero_ip()
242 )],
243 );
244
245 display::action(format!(
246 "\n Validator-node Command ({i}): {iota_node_command}"
247 ));
248
249 (instance, iota_node_command)
250 })
251 .collect()
252 }
253
254 fn fullnode_command<I>(
255 &self,
256 instances: I,
257 parameters: &BenchmarkParameters<IotaBenchmarkType>,
258 ) -> Vec<(Instance, String)>
259 where
260 I: IntoIterator<Item = Instance>,
261 {
262 let working_dir = self.working_dir.clone();
263
264 instances
265 .into_iter()
266 .enumerate()
267 .map(|(i, instance)| {
268 let config_path: PathBuf = working_dir.join(iota_config::IOTA_FULLNODE_CONFIG);
269 let fullnode_ip = match parameters.use_internal_ip_address {
270 true => &instance.private_ip,
271 false => &instance.main_ip,
272 };
273
274 let iota_node_command = self.run_binary_command(
275 "iota-node",
276 &[
277 format!(
280 "sed -i 's|listen-address: \\\"127.0.0.1:|listen-address: \\\"0.0.0.0:|' {0} && sed -i 's|external-address: /ip4/127.0.0.1/|external-address: /ip4/{1}/|' {0}",
281 config_path.display(),
282 fullnode_ip
283 ),
284 if self.enable_flamegraph {
285 "export TRACE_FLAMEGRAPH=1".to_string()
286 } else {
287 "".to_string()
288 },
289 ],
290 &[&format!("--config-path {}", config_path.display())],
291 );
292
293 display::action(format!("\n Full-node Command ({i}): {iota_node_command}"));
294
295 (instance, iota_node_command)
296 })
297 .collect()
298 }
299
300 fn client_command<I>(
301 &self,
302 instances: I,
303 parameters: &BenchmarkParameters<IotaBenchmarkType>,
304 ) -> Vec<(Instance, String)>
305 where
306 I: IntoIterator<Item = Instance>,
307 {
308 let genesis_path: PathBuf = [
309 &self.working_dir,
310 &iota_config::IOTA_GENESIS_FILENAME.into(),
311 ]
312 .iter()
313 .collect();
314 let keystore_path: PathBuf = [
315 &self.working_dir,
316 &iota_config::IOTA_BENCHMARK_GENESIS_GAS_KEYSTORE_FILENAME.into(),
317 ]
318 .iter()
319 .collect();
320
321 let committee_size = parameters.nodes;
322 let clients: Vec<_> = instances.into_iter().collect();
323 let load_share = parameters.load / clients.len();
324 let shared_counter = parameters.benchmark_type.shared_objects_ratio;
325 let transfer_objects = 100 - shared_counter;
326 let metrics_port = Self::CLIENT_METRICS_PORT;
327 let gas_keys =
329 GenesisConfig::benchmark_gas_keys(committee_size + parameters.additional_gas_accounts);
330 let client_key_offset = committee_size;
332
333 clients
334 .into_iter()
335 .enumerate()
336 .map(|(i, instance)| {
337 let genesis = genesis_path.display().to_string();
338 let keystore = keystore_path.display().to_string();
339 let gas_key = &gas_keys[client_key_offset + i];
341 let gas_address = IotaAddress::from(&gas_key.public());
342
343 let mut stress_args: Vec<String> = vec![
344 "--num-client-threads 24 --num-server-threads 1".to_string(),
345 "--local false --num-transfer-accounts 2".to_string(),
346 format!("--genesis-blob-path {genesis} --keystore-path {keystore}"),
347 format!("--primary-gas-owner-id {gas_address}"),
348 "bench".to_string(),
349 format!("--in-flight-ratio 30 --num-workers 24 --target-qps {load_share}"),
350 format!(
351 "--shared-counter {shared_counter} --transfer-object {transfer_objects}"
352 ),
353 format!("--client-metric-host 0.0.0.0 --client-metric-port {metrics_port}"),
354 ];
355
356 let hotness_factor = parameters.shared_counter_hotness_factor.unwrap_or(50);
358 stress_args.push(format!("--shared-counter-hotness-factor {hotness_factor}"));
359
360 if let Some(num_counters) = parameters.num_shared_counters {
362 stress_args.push(format!("--num-shared-counters {num_counters}"));
363 }
364
365 if self.use_fullnode_for_execution {
366 stress_args.push("--use-fullnode-for-execution true".to_string());
367 stress_args.push("--fullnode-rpc-addresses http://127.0.0.1:9000".to_string());
368 }
369
370 let stress_command = self.run_binary_command(
371 "stress",
372 &[
375 "export MOVE_EXAMPLES_DIR=$(pwd)/examples/move",
376 "export RUST_LOG=iota_benchmark=debug",
377 ],
378 &stress_args,
379 );
380
381 display::action(format!("\n Stress Command ({i}): {stress_command}"));
382
383 (instance, stress_command)
384 })
385 .collect()
386 }
387}
388
389impl IotaProtocol {
390 const CLIENT_METRICS_PORT: u16 = GenesisConfig::BENCHMARKS_PORT_OFFSET + 2000;
391
392 pub fn resolve_network_addresses(
395 instances: impl IntoIterator<Item = Instance>,
396 parameters: &BenchmarkParameters<IotaBenchmarkType>,
397 ) -> Vec<(Instance, Multiaddr)> {
398 let instances: Vec<Instance> = instances.into_iter().collect();
399 let ips: Vec<_> = instances
400 .iter()
401 .map(|x| match parameters.use_internal_ip_address {
402 true => x.private_ip.to_string(),
403 false => x.main_ip.to_string(),
404 })
405 .collect();
406 let genesis_config = GenesisConfig::new_for_benchmarks(
407 &ips,
408 parameters.epoch_duration_ms,
409 parameters.chain_start_timestamp_ms,
410 Some(parameters.additional_gas_accounts),
411 u64::MAX,
412 );
413 let mut addresses = Vec::new();
414 if let Some(validator_configs) = genesis_config.validator_config_info.as_ref() {
415 for (i, validator_info) in validator_configs.iter().enumerate() {
416 let address = &validator_info.network_address;
417 addresses.push((instances[i].clone(), address.clone()));
418 }
419 }
420 addresses
421 }
422}
423
424impl ProtocolMetrics for IotaProtocol {
425 const BENCHMARK_DURATION: &'static str = "benchmark_duration";
426 const TOTAL_TRANSACTIONS: &'static str = "latency_s_count";
427 const LATENCY_BUCKETS: &'static str = "latency_s";
428 const LATENCY_SUM: &'static str = "latency_s_sum";
429 const LATENCY_SQUARED_SUM: &'static str = "latency_squared_s";
430
431 fn nodes_metrics_path<I>(
432 &self,
433 instances: I,
434 use_internal_ip_address: bool,
435 ) -> Vec<(Instance, String)>
436 where
437 I: IntoIterator<Item = Instance>,
438 {
439 let instances = instances.into_iter().collect::<Vec<_>>();
440 let ips = (0..instances.len())
441 .map(|_| "0.0.0.0".to_string())
442 .collect::<Vec<_>>();
443 GenesisConfig::new_for_benchmarks(&ips, None, None, None, u64::MAX)
447 .validator_config_info
448 .expect("No validator in genesis")
449 .iter()
450 .zip(instances)
451 .map(|(config, instance)| {
452 let path = format!(
453 "{}:{}{}",
454 match use_internal_ip_address {
455 true => instance.private_ip,
456 false => instance.main_ip,
457 },
458 config.metrics_address.port(),
459 iota_metrics::METRICS_ROUTE
460 );
461 (instance, path)
462 })
463 .collect()
464 }
465
466 fn clients_metrics_path<I>(
467 &self,
468 instances: I,
469 use_internal_ip_address: bool,
470 ) -> Vec<(Instance, String)>
471 where
472 I: IntoIterator<Item = Instance>,
473 {
474 instances
475 .into_iter()
476 .map(|instance| {
477 let path = format!(
478 "{}:{}{}",
479 match use_internal_ip_address {
480 true => instance.private_ip,
481 false => instance.main_ip,
482 },
483 Self::CLIENT_METRICS_PORT,
484 iota_metrics::METRICS_ROUTE
485 );
486 (instance, path)
487 })
488 .collect()
489 }
490}