1use std::{
6 collections::HashMap,
7 fmt::{Debug, Display},
8 path::PathBuf,
9 str::FromStr,
10};
11
12use iota_swarm_config::genesis_config::GenesisConfig;
13use iota_types::{base_types::IotaAddress, multiaddr::Multiaddr};
14use serde::{Deserialize, Serialize};
15
16use super::{ProtocolCommands, ProtocolMetrics};
17use crate::{
18 ConsensusProtocol,
19 benchmark::{BenchmarkParameters, BenchmarkType},
20 client::Instance,
21 display,
22 settings::{BinaryBuildConfig, Settings, build_cargo_command, join_non_empty_strings},
23};
24
25#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
26pub struct IotaBenchmarkType {
27 shared_objects_ratio: u16,
30}
31
32impl Debug for IotaBenchmarkType {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 write!(f, "{}", self.shared_objects_ratio)
35 }
36}
37
38impl Display for IotaBenchmarkType {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 write!(f, "{}% shared objects", self.shared_objects_ratio)
41 }
42}
43
44impl FromStr for IotaBenchmarkType {
45 type Err = std::num::ParseIntError;
46
47 fn from_str(s: &str) -> Result<Self, Self::Err> {
48 Ok(Self {
49 shared_objects_ratio: s.parse::<u16>()?.min(100),
50 })
51 }
52}
53
54impl BenchmarkType for IotaBenchmarkType {}
55
56pub struct IotaProtocol {
58 working_dir: PathBuf,
59 use_fullnode_for_execution: bool,
60 use_precompiled_binaries: bool,
61 build_configs: HashMap<String, BinaryBuildConfig>,
62 enable_flamegraph: bool,
63}
64
65impl IotaProtocol {
66 pub fn new(settings: &Settings) -> Self {
68 Self {
69 working_dir: [&settings.working_dir, &iota_config::IOTA_CONFIG_DIR.into()]
70 .iter()
71 .collect(),
72 use_fullnode_for_execution: settings.use_fullnode_for_execution,
73 use_precompiled_binaries: settings.build_cache_enabled(),
74 build_configs: settings.build_configs.clone(),
75 enable_flamegraph: settings.enable_flamegraph,
76 }
77 }
78
79 fn run_binary_command<S1: AsRef<str>, S2: AsRef<str>>(
83 &self,
84 binary_name: &str,
85 setup_commands: &[S1],
86 additional_args: &[S2],
87 ) -> String {
88 if self.use_precompiled_binaries {
89 let binary_path = format!("./target/release/{binary_name}");
91 let binary_command = join_non_empty_strings(
92 &std::iter::once(binary_path.as_str())
93 .chain(additional_args.iter().map(|s| s.as_ref()))
94 .collect::<Vec<_>>(),
95 " ",
96 );
97
98 let all_commands: Vec<String> = setup_commands
99 .iter()
100 .map(|s| s.as_ref().to_string())
101 .chain(std::iter::once(binary_command))
102 .collect();
103
104 join_non_empty_strings(&all_commands, " && ")
105 } else {
106 let build_config = self
107 .build_configs
108 .get(binary_name)
109 .expect("No build config found for binary");
110 build_cargo_command(
111 "run",
112 build_config.toolchain.clone(),
113 build_config.features.clone(),
114 &[binary_name],
115 setup_commands,
116 additional_args,
117 )
118 }
119 }
120}
121
122impl ProtocolCommands<IotaBenchmarkType> for IotaProtocol {
123 fn protocol_dependencies(&self) -> Vec<&'static str> {
124 if !self.use_precompiled_binaries {
125 return vec!["sudo apt-get -y install libudev-dev libpq5 libpq-dev"];
126 }
127
128 vec![]
129 }
130
131 fn db_directories(&self) -> Vec<PathBuf> {
132 let authorities_db = [&self.working_dir, &iota_config::AUTHORITIES_DB_NAME.into()]
133 .iter()
134 .collect();
135 let consensus_db = [&self.working_dir, &iota_config::CONSENSUS_DB_NAME.into()]
136 .iter()
137 .collect();
138 vec![authorities_db, consensus_db]
139 }
140
141 fn genesis_command<'a, I>(
142 &self,
143 instances: I,
144 parameters: &BenchmarkParameters<IotaBenchmarkType>,
145 ) -> String
146 where
147 I: Iterator<Item = &'a Instance>,
148 {
149 let working_dir = self.working_dir.display();
150 let ips = instances
151 .map(|x| {
152 match parameters.use_internal_ip_address {
153 true => x.private_ip,
154 false => x.main_ip,
155 }
156 .to_string()
157 })
158 .collect::<Vec<_>>()
159 .join(" ");
160
161 let epoch_duration_flag = parameters
162 .epoch_duration_ms
163 .map(|epoch_duration_ms| format!("--epoch-duration-ms {epoch_duration_ms}"))
164 .unwrap_or_default();
165 let chain_start_timestamp_flag = parameters
166 .chain_start_timestamp_ms
167 .map(|timestamp_ms| format!("--chain-start-timestamp-ms {timestamp_ms}"))
168 .unwrap_or_default();
169 let additional_gas_accounts_flag = format!(
170 "--num-additional-gas-accounts {}",
171 parameters.additional_gas_accounts
172 );
173
174 let iota_command = self.run_binary_command(
175 "iota",
176 &[&format!("mkdir -p {working_dir}")],
177 &[
178 "genesis",
179 &format!("-f --working-dir {working_dir} --benchmark-ips {ips} --admin-interface-address=localhost:1337"),
180 &epoch_duration_flag,
181 &chain_start_timestamp_flag,
182 &additional_gas_accounts_flag,
183 ],
184 );
185
186 display::action(format!("\n Genesis Command: {iota_command}"));
187
188 iota_command
189 }
190
191 fn monitor_command<I>(&self, _instances: I) -> Vec<(Instance, String)>
192 where
193 I: IntoIterator<Item = Instance>,
194 {
195 vec![]
205 }
206
207 fn node_command<I>(
208 &self,
209 instances: I,
210 parameters: &BenchmarkParameters<IotaBenchmarkType>,
211 ) -> Vec<(Instance, String)>
212 where
213 I: IntoIterator<Item = Instance>,
214 {
215 let working_dir = self.working_dir.clone();
216 let network_addresses = Self::resolve_network_addresses(instances, parameters);
217
218 network_addresses
219 .into_iter()
220 .enumerate()
221 .map(|(i, (instance, network_address))| {
222 let validator_config =
223 iota_config::validator_config_file(network_address.clone(), i);
224 let config_path: PathBuf = working_dir.join(validator_config);
225 let max_pipeline_delay = parameters.max_pipeline_delay;
226 let iota_node_command = self.run_binary_command(
227 "iota-node",
228 &[
229 match parameters.consensus_protocol {
230 ConsensusProtocol::Starfish => "export CONSENSUS_PROTOCOL=starfish",
231 ConsensusProtocol::Mysticeti => "export CONSENSUS_PROTOCOL=mysticeti",
232 ConsensusProtocol::SwapEachEpoch => {
233 "export CONSENSUS_PROTOCOL=swap_each_epoch"
234 }
235 },
236 format!("export MAX_PIPELINE_DELAY={max_pipeline_delay}").as_str(),
237 if self.enable_flamegraph {
238 "export TRACE_FLAMEGRAPH=1"
239 } else {
240 ""
241 },
242 ],
243 &[&format!(
244 "--config-path {} --listen-address {}",
245 config_path.display(),
246 network_address.with_zero_ip()
247 )],
248 );
249
250 display::action(format!(
251 "\n Validator-node Command ({i}): {iota_node_command}"
252 ));
253
254 (instance, iota_node_command)
255 })
256 .collect()
257 }
258
259 fn fullnode_command<I>(
260 &self,
261 instances: I,
262 parameters: &BenchmarkParameters<IotaBenchmarkType>,
263 ) -> Vec<(Instance, String)>
264 where
265 I: IntoIterator<Item = Instance>,
266 {
267 let working_dir = self.working_dir.clone();
268
269 instances
270 .into_iter()
271 .enumerate()
272 .map(|(i, instance)| {
273 let config_path: PathBuf = working_dir.join(iota_config::IOTA_FULLNODE_CONFIG);
274 let fullnode_ip = match parameters.use_internal_ip_address {
275 true => &instance.private_ip,
276 false => &instance.main_ip,
277 };
278
279 let iota_node_command = self.run_binary_command(
280 "iota-node",
281 &[
282 format!(
285 "sed -i 's|listen-address: \\\"127.0.0.1:|listen-address: \\\"0.0.0.0:|' {0} && sed -i 's|external-address: /ip4/127.0.0.1/|external-address: /ip4/{1}/|' {0}",
286 config_path.display(),
287 fullnode_ip
288 ),
289 if self.enable_flamegraph {
290 "export TRACE_FLAMEGRAPH=1".to_string()
291 } else {
292 "".to_string()
293 },
294 ],
295 &[&format!("--config-path {}", config_path.display())],
296 );
297
298 display::action(format!("\n Full-node Command ({i}): {iota_node_command}"));
299
300 (instance, iota_node_command)
301 })
302 .collect()
303 }
304
305 fn client_command<I>(
306 &self,
307 instances: I,
308 parameters: &BenchmarkParameters<IotaBenchmarkType>,
309 ) -> Vec<(Instance, String)>
310 where
311 I: IntoIterator<Item = Instance>,
312 {
313 let genesis_path: PathBuf = [
314 &self.working_dir,
315 &iota_config::IOTA_GENESIS_FILENAME.into(),
316 ]
317 .iter()
318 .collect();
319 let keystore_path: PathBuf = [
320 &self.working_dir,
321 &iota_config::IOTA_BENCHMARK_GENESIS_GAS_KEYSTORE_FILENAME.into(),
322 ]
323 .iter()
324 .collect();
325
326 let committee_size = parameters.nodes;
327 let clients: Vec<_> = instances.into_iter().collect();
328 let load_share = parameters.load / clients.len();
329 let shared_counter = parameters.benchmark_type.shared_objects_ratio;
330 let transfer_objects = 100 - shared_counter;
331 let metrics_port = Self::CLIENT_METRICS_PORT;
332 let gas_keys =
334 GenesisConfig::benchmark_gas_keys(committee_size + parameters.additional_gas_accounts);
335 let client_key_offset = committee_size;
337
338 clients
339 .into_iter()
340 .enumerate()
341 .map(|(i, instance)| {
342 let genesis = genesis_path.display().to_string();
343 let keystore = keystore_path.display().to_string();
344 let gas_key = &gas_keys[client_key_offset + i];
346 let gas_address = IotaAddress::from(&gas_key.public());
347
348 let mut stress_args: Vec<String> = vec![
349 "--num-client-threads 24 --num-server-threads 1".to_string(),
350 "--local false --num-transfer-accounts 2".to_string(),
351 format!("--genesis-blob-path {genesis} --keystore-path {keystore}"),
352 format!("--primary-gas-owner-id {gas_address}"),
353 "bench".to_string(),
354 format!("--in-flight-ratio 30 --num-workers 24 --target-qps {load_share}"),
355 format!(
356 "--shared-counter {shared_counter} --transfer-object {transfer_objects}"
357 ),
358 "--shared-counter-hotness-factor 50".to_string(),
359 format!("--client-metric-host 0.0.0.0 --client-metric-port {metrics_port}"),
360 ];
361
362 if self.use_fullnode_for_execution {
363 stress_args.push("--use-fullnode-for-execution true".to_string());
364 stress_args.push("--fullnode-rpc-addresses http://127.0.0.1:9000".to_string());
365 }
366
367 let stress_command = self.run_binary_command(
368 "stress",
369 &["export MOVE_EXAMPLES_DIR=$(pwd)/examples/move"],
372 &stress_args,
373 );
374
375 display::action(format!("\n Stress Command ({i}): {stress_command}"));
376
377 (instance, stress_command)
378 })
379 .collect()
380 }
381}
382
383impl IotaProtocol {
384 const CLIENT_METRICS_PORT: u16 = GenesisConfig::BENCHMARKS_PORT_OFFSET + 2000;
385
386 pub fn resolve_network_addresses(
389 instances: impl IntoIterator<Item = Instance>,
390 parameters: &BenchmarkParameters<IotaBenchmarkType>,
391 ) -> Vec<(Instance, Multiaddr)> {
392 let instances: Vec<Instance> = instances.into_iter().collect();
393 let ips: Vec<_> = instances
394 .iter()
395 .map(|x| match parameters.use_internal_ip_address {
396 true => x.private_ip.to_string(),
397 false => x.main_ip.to_string(),
398 })
399 .collect();
400 let genesis_config = GenesisConfig::new_for_benchmarks(
401 &ips,
402 parameters.epoch_duration_ms,
403 parameters.chain_start_timestamp_ms,
404 Some(parameters.additional_gas_accounts),
405 );
406 let mut addresses = Vec::new();
407 if let Some(validator_configs) = genesis_config.validator_config_info.as_ref() {
408 for (i, validator_info) in validator_configs.iter().enumerate() {
409 let address = &validator_info.network_address;
410 addresses.push((instances[i].clone(), address.clone()));
411 }
412 }
413 addresses
414 }
415}
416
417impl ProtocolMetrics for IotaProtocol {
418 const BENCHMARK_DURATION: &'static str = "benchmark_duration";
419 const TOTAL_TRANSACTIONS: &'static str = "latency_s_count";
420 const LATENCY_BUCKETS: &'static str = "latency_s";
421 const LATENCY_SUM: &'static str = "latency_s_sum";
422 const LATENCY_SQUARED_SUM: &'static str = "latency_squared_s";
423
424 fn nodes_metrics_path<I, T>(
425 &self,
426 instances: I,
427 parameters: &BenchmarkParameters<T>,
428 ) -> Vec<(Instance, String)>
429 where
430 I: IntoIterator<Item = Instance>,
431 T: BenchmarkType,
432 {
433 let (ips, instances): (Vec<_>, Vec<_>) = instances
434 .into_iter()
435 .map(|x| {
436 (
437 match parameters.use_internal_ip_address {
438 true => x.private_ip,
439 false => x.main_ip,
440 }
441 .to_string(),
442 x,
443 )
444 })
445 .unzip();
446 GenesisConfig::new_for_benchmarks(
447 &ips,
448 parameters.epoch_duration_ms,
449 parameters.chain_start_timestamp_ms,
450 Some(parameters.additional_gas_accounts),
451 )
452 .validator_config_info
453 .expect("No validator in genesis")
454 .iter()
455 .zip(instances)
456 .map(|(config, instance)| {
457 let path = format!(
458 "{}:{}{}",
459 match parameters.use_internal_ip_address {
460 true => instance.private_ip,
461 false => instance.main_ip,
462 },
463 config.metrics_address.port(),
464 iota_metrics::METRICS_ROUTE
465 );
466 (instance, path)
467 })
468 .collect()
469 }
470
471 fn clients_metrics_path<I, T>(
472 &self,
473 instances: I,
474 parameters: &BenchmarkParameters<T>,
475 ) -> Vec<(Instance, String)>
476 where
477 I: IntoIterator<Item = Instance>,
478 T: BenchmarkType,
479 {
480 instances
481 .into_iter()
482 .map(|instance| {
483 let path = format!(
484 "{}:{}{}",
485 match parameters.use_internal_ip_address {
486 true => instance.private_ip,
487 false => instance.main_ip,
488 },
489 Self::CLIENT_METRICS_PORT,
490 iota_metrics::METRICS_ROUTE
491 );
492 (instance, path)
493 })
494 .collect()
495 }
496}