iota_aws_orchestrator/protocol/
iota.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, path::PathBuf, str::FromStr};
6
7use eyre::Result;
8use iota_swarm_config::genesis_config::GenesisConfig;
9use iota_types::{base_types::IotaAddress, multiaddr::Multiaddr};
10use serde::{Deserialize, Serialize};
11
12use super::{ProtocolCommands, ProtocolMetrics};
13use crate::{
14    ConsensusProtocol,
15    benchmark::{BenchmarkParameters, BenchmarkType},
16    client::Instance,
17    display,
18    settings::{BinaryBuildConfig, Settings, build_cargo_command, join_non_empty_strings},
19};
20
/// Workloads that the orchestrator can run against an IOTA deployment.
///
/// The variant selects which sub-command and flags are passed to the stress
/// client.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum IotaBenchmarkType {
    /// Percentage of shared vs owned objects; 0 means only owned objects and
    /// 100 means only shared objects.
    SharedObjectsRatio(u16),
    /// Benchmark for Abstract Account functionality.
    AbstractAccountBench,
}
29
30impl Default for IotaBenchmarkType {
31    fn default() -> Self {
32        Self::SharedObjectsRatio(0)
33    }
34}
35
36impl std::fmt::Debug for IotaBenchmarkType {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        match self {
39            Self::SharedObjectsRatio(shared_objects_ratio) => write!(f, "{shared_objects_ratio}"),
40            Self::AbstractAccountBench => write!(f, "abstract_account_bench"),
41        }
42    }
43}
44
45impl std::fmt::Display for IotaBenchmarkType {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        match self {
48            Self::SharedObjectsRatio(shared_objects_ratio) => {
49                write!(f, "bench ({}% shared objects)", shared_objects_ratio)
50            }
51            Self::AbstractAccountBench => write!(f, "abstract-account-bench"),
52        }
53    }
54}
55
56impl FromStr for IotaBenchmarkType {
57    type Err = String;
58
59    fn from_str(s: &str) -> Result<Self, Self::Err> {
60        let v = s.trim().to_ascii_lowercase();
61
62        // Backward compatible: numeric => bench(shared_ratio)
63        if let Ok(n) = v.parse::<u16>() {
64            return Ok(Self::SharedObjectsRatio(n.min(100)));
65        }
66
67        match v.as_str() {
68            "abstract-account-bench" => Ok(Self::AbstractAccountBench),
69            _ => Err(format!(
70                "Unknown benchmark type '{s}'. Expected 0..=100 or abstract-account-bench"
71            )),
72        }
73    }
74}
75
// Marker implementation: no items are defined here, the impl only opts
// `IotaBenchmarkType` into the `BenchmarkType` trait.
impl BenchmarkType for IotaBenchmarkType {}
77
/// All configurations information to run an IOTA client or validator.
pub struct IotaProtocol {
    /// Directory holding the IOTA configuration files, genesis and databases
    /// (the settings' working directory joined with the IOTA config dir).
    working_dir: PathBuf,
    /// When set, the stress client is told to execute through a fullnode.
    use_fullnode_for_execution: bool,
    /// When set, commands invoke binaries from `./target/release` instead of
    /// going through `cargo run`.
    use_precompiled_binaries: bool,
    /// Per-binary build configuration (toolchain and features), keyed by
    /// binary name; consulted only when building with cargo.
    build_configs: HashMap<String, BinaryBuildConfig>,
    /// When set, node commands export `TRACE_FLAMEGRAPH=1`.
    enable_flamegraph: bool,
}
86
87impl IotaProtocol {
88    /// Make a new instance of the IOTA protocol commands generator.
89    pub fn new(settings: &Settings) -> Self {
90        Self {
91            working_dir: [&settings.working_dir, &iota_config::IOTA_CONFIG_DIR.into()]
92                .iter()
93                .collect(),
94            use_fullnode_for_execution: settings.use_fullnode_for_execution,
95            use_precompiled_binaries: settings.build_cache_enabled(),
96            build_configs: settings.build_configs.clone(),
97            enable_flamegraph: settings.enable_flamegraph,
98        }
99    }
100
101    /// Build the command to run a binary, either using precompiled binary or
102    /// cargo run. Returns the command string with proper toolchain and
103    /// features configured.
104    fn run_binary_command<S1: AsRef<str>, S2: AsRef<str>>(
105        &self,
106        binary_name: &str,
107        setup_commands: &[S1],
108        additional_args: &[S2],
109    ) -> String {
110        if self.use_precompiled_binaries {
111            // The precompiled binary is located in the working directory
112            let binary_path = format!("./target/release/{binary_name}");
113            let binary_command = join_non_empty_strings(
114                &std::iter::once(binary_path.as_str())
115                    .chain(additional_args.iter().map(|s| s.as_ref()))
116                    .collect::<Vec<_>>(),
117                " ",
118            );
119
120            let all_commands: Vec<String> = setup_commands
121                .iter()
122                .map(|s| s.as_ref().to_string())
123                .chain(std::iter::once(binary_command))
124                .collect();
125
126            join_non_empty_strings(&all_commands, " && ")
127        } else {
128            let build_config = self
129                .build_configs
130                .get(binary_name)
131                .expect("No build config found for binary");
132            build_cargo_command(
133                "run",
134                build_config.toolchain.clone(),
135                build_config.features.clone(),
136                &[binary_name],
137                setup_commands,
138                additional_args,
139            )
140        }
141    }
142
143    fn otel_env(
144        &self,
145        parameters: &BenchmarkParameters<IotaBenchmarkType>,
146        service_name: &str,
147    ) -> Vec<String> {
148        let Some(otel) = &parameters.otel else {
149            return vec![];
150        };
151        // TRACE_FILTER values can be implemented as params as well. For now, we keep
152        // them fixed to trace handle_transaction and process_certificate which are the
153        // most relevant spans for benchmarks.
154        vec![
155            format!("export OTEL_EXPORTER_OTLP_ENDPOINT={}", otel.otlp_endpoint),
156            "export TRACE_FILTER=[handle_transaction]=trace,[process_certificate]=trace"
157                .to_string(),
158            format!(
159                "export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT={}",
160                otel.otlp_endpoint
161            ),
162            format!("export OTEL_EXPORTER_OTLP_PROTOCOL={}", otel.protocol),
163            format!("export OTEL_TRACES_SAMPLER={}", otel.sampler),
164            format!("export OTEL_TRACES_SAMPLER_ARG={}", otel.sampler_arg),
165            format!("export OTEL_SERVICE_NAME={service_name}"),
166            format!("export OTEL_RESOURCE_ATTRIBUTES=service.name={service_name}"),
167        ]
168    }
169}
170
171impl ProtocolCommands<IotaBenchmarkType> for IotaProtocol {
172    fn protocol_dependencies(&self) -> Vec<&'static str> {
173        if !self.use_precompiled_binaries {
174            return vec!["sudo apt-get -y install libudev-dev libpq5 libpq-dev"];
175        }
176
177        vec![]
178    }
179
180    fn db_directories(&self) -> Vec<PathBuf> {
181        let authorities_db = self.working_dir.join(iota_config::AUTHORITIES_DB_NAME);
182        let consensus_db = self.working_dir.join(iota_config::CONSENSUS_DB_NAME);
183        let full_node_db: PathBuf = self.working_dir.join(iota_config::FULL_NODE_DB_PATH);
184        vec![authorities_db, consensus_db, full_node_db]
185    }
186
187    fn genesis_command<'a, I>(
188        &self,
189        instances: I,
190        parameters: &BenchmarkParameters<IotaBenchmarkType>,
191    ) -> String
192    where
193        I: Iterator<Item = &'a Instance>,
194    {
195        let working_dir = self.working_dir.display();
196        let ips = instances
197            .map(|x| {
198                match parameters.use_internal_ip_address {
199                    true => x.private_ip,
200                    false => x.main_ip,
201                }
202                .to_string()
203            })
204            .collect::<Vec<_>>()
205            .join(" ");
206
207        let epoch_duration_flag = parameters
208            .epoch_duration_ms
209            .map(|epoch_duration_ms| format!("--epoch-duration-ms {epoch_duration_ms}"))
210            .unwrap_or_default();
211        let chain_start_timestamp_flag = parameters
212            .chain_start_timestamp_ms
213            .map(|timestamp_ms| format!("--chain-start-timestamp-ms {timestamp_ms}"))
214            .unwrap_or_default();
215        let additional_gas_accounts_flag = format!(
216            "--num-additional-gas-accounts {}",
217            parameters.additional_gas_accounts
218        );
219
220        let iota_command = self.run_binary_command(
221            "iota",
222            &[
223                // Set protocol config override to disable validator subsidies for benchmarks
224                "export IOTA_PROTOCOL_CONFIG_OVERRIDE_ENABLE=1",
225                "export IOTA_PROTOCOL_CONFIG_OVERRIDE_validator_target_reward=0",
226                &format!("mkdir -p {working_dir}")
227            ],
228            &[
229                "genesis",
230                &format!("-f --working-dir {working_dir} --benchmark-ips {ips} --admin-interface-address=localhost:1337"),
231                &epoch_duration_flag,
232                &chain_start_timestamp_flag,
233                &additional_gas_accounts_flag,
234            ],
235        );
236
237        iota_command
238    }
239
240    fn monitor_command<I>(&self, _instances: I) -> Vec<(Instance, String)>
241    where
242        I: IntoIterator<Item = Instance>,
243    {
244        // instances
245        //     .into_iter()
246        //     .map(|i| {
247        //         (
248        //             i,
249        //             "tail -f --pid=$(pidof iota) -f /dev/null; tail -100
250        // node.log".to_string(),         )
251        //     })
252        //     .collect()
253        vec![]
254    }
255
256    fn node_command<I>(
257        &self,
258        instances: I,
259        parameters: &BenchmarkParameters<IotaBenchmarkType>,
260    ) -> Vec<(Instance, String)>
261    where
262        I: IntoIterator<Item = Instance>,
263    {
264        let working_dir = self.working_dir.clone();
265        let network_addresses = Self::resolve_network_addresses(instances, parameters);
266
267        network_addresses
268            .into_iter()
269            .enumerate()
270            .map(|(i, (instance, network_address))| {
271                let validator_config =
272                    iota_config::validator_config_file(network_address.clone(), i);
273                let config_path: PathBuf = working_dir.join(validator_config);
274                let max_pipeline_delay = parameters.max_pipeline_delay;
275
276                let mut setup: Vec<String> = vec![
277                    match parameters.consensus_protocol {
278                        ConsensusProtocol::Starfish => {
279                            "export CONSENSUS_PROTOCOL=starfish".to_string()
280                        }
281                        ConsensusProtocol::Mysticeti => {
282                            "export CONSENSUS_PROTOCOL=mysticeti".to_string()
283                        }
284                        ConsensusProtocol::SwapEachEpoch => {
285                            "export CONSENSUS_PROTOCOL=swap_each_epoch".to_string()
286                        }
287                    },
288                    format!("export MAX_PIPELINE_DELAY={max_pipeline_delay}"),
289                    // Set protocol config override to disable validator subsidies for
290                    // benchmarks
291                    format!("export IOTA_PROTOCOL_CONFIG_OVERRIDE_ENABLE=1"),
292                    format!("export IOTA_PROTOCOL_CONFIG_OVERRIDE_validator_target_reward=0"),
293                ];
294
295                if self.enable_flamegraph {
296                    setup.push("export TRACE_FLAMEGRAPH=1".to_string());
297                }
298
299                setup.extend(self.otel_env(parameters, &format!("iota-validator-{i}")));
300
301                let iota_node_command = self.run_binary_command(
302                    "iota-node",
303                    &setup,
304                    &[format!(
305                        "--config-path {} --listen-address {}",
306                        config_path.display(),
307                        network_address.with_zero_ip()
308                    )],
309                );
310
311                display::action(format!(
312                    "\n Validator-node Command ({i}): {iota_node_command}"
313                ));
314
315                (instance, iota_node_command)
316            })
317            .collect()
318    }
319
320    fn fullnode_command<I>(
321        &self,
322        instances: I,
323        parameters: &BenchmarkParameters<IotaBenchmarkType>,
324    ) -> Vec<(Instance, String)>
325    where
326        I: IntoIterator<Item = Instance>,
327    {
328        let working_dir = self.working_dir.clone();
329
330        instances
331            .into_iter()
332            .enumerate()
333            .map(|(i, instance)| {
334                let config_path: PathBuf = working_dir.join(iota_config::IOTA_FULLNODE_CONFIG);
335                let fullnode_ip = match parameters.use_internal_ip_address {
336                    true => &instance.private_ip,
337                    false => &instance.main_ip,
338                };
339
340                let mut setup = vec![
341                        format!(
342                            "sed -i 's|listen-address: \\\"127.0.0.1:|listen-address: \\\"0.0.0.0:|' {0} && sed -i 's|external-address: /ip4/127.0.0.1/|external-address: /ip4/{1}/|' {0}",
343                            config_path.display(),
344                            fullnode_ip
345                        ),
346                        if self.enable_flamegraph {
347                            "export TRACE_FLAMEGRAPH=1".to_string()
348                        } else {
349                            "".to_string()
350                        },
351                        // Set protocol config override to disable validator subsidies for benchmarks
352                        "export IOTA_PROTOCOL_CONFIG_OVERRIDE_ENABLE=1".to_string(),
353                        "export IOTA_PROTOCOL_CONFIG_OVERRIDE_validator_target_reward=0".to_string(),
354                ];
355                setup.extend(self.otel_env(parameters, &format!("iota-node-{i}")));
356
357                let iota_node_command = self.run_binary_command(
358                    "iota-node",
359                    &setup,
360                    &[&format!("--config-path {}", config_path.display())],
361                );
362
363                display::action(format!("\n Full-node Command ({i}): {iota_node_command}"));
364
365                (instance, iota_node_command)
366            })
367            .collect()
368    }
369
370    fn client_command<I>(
371        &self,
372        instances: I,
373        parameters: &BenchmarkParameters<IotaBenchmarkType>,
374    ) -> Vec<(Instance, String)>
375    where
376        I: IntoIterator<Item = Instance>,
377    {
378        let genesis_path: PathBuf = [
379            &self.working_dir,
380            &iota_config::IOTA_GENESIS_FILENAME.into(),
381        ]
382        .iter()
383        .collect();
384        let keystore_path: PathBuf = [
385            &self.working_dir,
386            &iota_config::IOTA_BENCHMARK_GENESIS_GAS_KEYSTORE_FILENAME.into(),
387        ]
388        .iter()
389        .collect();
390
391        let committee_size = parameters.nodes;
392        let clients: Vec<_> = instances.into_iter().collect();
393        let load_share = parameters.load / clients.len();
394        let metrics_port = Self::CLIENT_METRICS_PORT;
395        // Get gas keys for all validators and clients
396        let gas_keys =
397            GenesisConfig::benchmark_gas_keys(committee_size + parameters.additional_gas_accounts);
398        // Validators use the first `nodes` keys, so clients should start after that
399        let client_key_offset = committee_size;
400
401        clients
402            .into_iter()
403            .enumerate()
404            .map(|(i, instance)| {
405                let genesis = genesis_path.display().to_string();
406                let keystore = keystore_path.display().to_string();
407                // Offset client key index to avoid colliding with validator keys
408                let gas_key = &gas_keys[client_key_offset + i];
409                let gas_address = IotaAddress::from(&gas_key.public());
410
411                let client_threads = parameters.stress_num_client_threads;
412                let server_threads = parameters.stress_num_server_threads;
413
414                let mut stress_args: Vec<String> = vec![
415                    format!("--num-client-threads {client_threads} --num-server-threads {server_threads}"),
416                    "--local false --num-transfer-accounts 2".to_string(),
417                    format!("--genesis-blob-path {genesis} --keystore-path {keystore}"),
418                    format!("--primary-gas-owner-id {gas_address}"),
419                    // Run interval param
420                    parameters.run_interval.as_stress_flag(),
421                    format!("--client-metric-host 0.0.0.0 --client-metric-port {metrics_port}"),
422                    if let Some(stats_path) = &parameters.benchmark_stats_path {
423                        format!("--benchmark-stats-path {stats_path}")
424                    } else {
425                        "".to_string()
426                    },
427                ];
428
429                match parameters.benchmark_type {
430                    IotaBenchmarkType::SharedObjectsRatio(shared_objects_ratio) => {
431                        let transfer_objects = 100 - shared_objects_ratio;
432                        let hotness_factor = parameters.shared_counter_hotness_factor.unwrap_or(50);
433                        stress_args.push("bench".to_string());
434                        stress_args.push(format!("--target-qps {load_share}"));
435                        stress_args.push(format!("--num-workers {}", parameters.stress_num_workers));
436                        stress_args.push(format!("--in-flight-ratio {}", parameters.stress_in_flight_ratio));
437                        stress_args.push(format!("--shared-counter {shared_objects_ratio} --transfer-object {transfer_objects}"));
438                        stress_args.push(format!("--shared-counter-hotness-factor {hotness_factor}"));
439                        if let Some(num_counters) = parameters.num_shared_counters {
440                            stress_args.push(format!("--num-shared-counters {num_counters}"));
441                        }
442                    }
443
444                    IotaBenchmarkType::AbstractAccountBench => {
445                        stress_args.push("abstract-account-bench".to_string());
446                        stress_args.push(format!("--authenticator {}", parameters.aa_authenticator));
447                        stress_args.push(format!("--tx-payload-obj-type {}", parameters.tx_payload_obj_type));
448                        stress_args.push(format!("--target-qps {load_share}"));
449                        stress_args.push(format!("--num-workers {}", parameters.stress_num_workers));
450                        stress_args.push(format!("--in-flight-ratio {}", parameters.stress_in_flight_ratio));
451                        stress_args.push(format!("--split-amount {}", parameters.aa_split_amount));
452                        if parameters.should_fail {
453                            stress_args.push("--should-fail".to_string());
454                        }
455                    }
456                }
457
458                if self.use_fullnode_for_execution {
459                    stress_args.push("--use-fullnode-for-execution true".to_string());
460                    stress_args.push("--fullnode-rpc-addresses http://127.0.0.1:9000".to_string());
461                }
462
463                let mut setup = vec![
464                    "export MOVE_EXAMPLES_DIR=$(pwd)/examples/move".to_string(),
465                    "export RUST_LOG=iota_benchmark=debug".to_string(),
466                ];
467
468                setup.extend(self.otel_env(parameters, &format!("iota-stress-{i}")));
469
470                let stress_command = self.run_binary_command("stress", &setup, &stress_args);
471
472                display::action(format!("\n Stress Command ({i}): {stress_command}"));
473
474                (instance, stress_command)
475            })
476            .collect()
477    }
478}
479
480impl IotaProtocol {
481    const CLIENT_METRICS_PORT: u16 = GenesisConfig::BENCHMARKS_PORT_OFFSET + 2000;
482
483    /// Creates the network addresses in multi address format for the instances.
484    /// It returns the Instance and the corresponding address.
485    pub fn resolve_network_addresses(
486        instances: impl IntoIterator<Item = Instance>,
487        parameters: &BenchmarkParameters<IotaBenchmarkType>,
488    ) -> Vec<(Instance, Multiaddr)> {
489        let instances: Vec<Instance> = instances.into_iter().collect();
490        let ips: Vec<_> = instances
491            .iter()
492            .map(|x| match parameters.use_internal_ip_address {
493                true => x.private_ip.to_string(),
494                false => x.main_ip.to_string(),
495            })
496            .collect();
497
498        // `u64::MAX - 1` is the max total supply value acceptable by
499        // `iota::balance::increase_supply`
500        let genesis_config = GenesisConfig::new_for_benchmarks(
501            &ips,
502            parameters.epoch_duration_ms,
503            parameters.chain_start_timestamp_ms,
504            Some(parameters.additional_gas_accounts),
505            u64::MAX - 1,
506        );
507        let mut addresses = Vec::new();
508        if let Some(validator_configs) = genesis_config.validator_config_info.as_ref() {
509            for (i, validator_info) in validator_configs.iter().enumerate() {
510                let address = &validator_info.network_address;
511                addresses.push((instances[i].clone(), address.clone()));
512            }
513        }
514        addresses
515    }
516}
517
518impl ProtocolMetrics for IotaProtocol {
519    const BENCHMARK_DURATION: &'static str = "benchmark_duration";
520    const TOTAL_TRANSACTIONS: &'static str = "latency_s_count";
521    const LATENCY_BUCKETS: &'static str = "latency_s";
522    const LATENCY_SUM: &'static str = "latency_s_sum";
523    const LATENCY_SQUARED_SUM: &'static str = "latency_squared_s";
524
525    fn nodes_metrics_path<I>(
526        &self,
527        instances: I,
528        use_internal_ip_address: bool,
529    ) -> Vec<(Instance, String)>
530    where
531        I: IntoIterator<Item = Instance>,
532    {
533        let instances = instances.into_iter().collect::<Vec<_>>();
534        let ips = (0..instances.len())
535            .map(|_| "0.0.0.0".to_string())
536            .collect::<Vec<_>>();
537        // From GenesisConfig we only need validators' `metrics_address` port which is
538        // computed from validator's offset in `ips`. The values of (the rest
539        // of) the arguments are irrelevant.
540        GenesisConfig::new_for_benchmarks(&ips, None, None, None, u64::MAX)
541            .validator_config_info
542            .expect("No validator in genesis")
543            .iter()
544            .zip(instances)
545            .map(|(config, instance)| {
546                let path = format!(
547                    "{}:{}{}",
548                    match use_internal_ip_address {
549                        true => instance.private_ip,
550                        false => instance.main_ip,
551                    },
552                    config.metrics_address.port(),
553                    iota_metrics::METRICS_ROUTE
554                );
555                (instance, path)
556            })
557            .collect()
558    }
559
560    fn clients_metrics_path<I>(
561        &self,
562        instances: I,
563        use_internal_ip_address: bool,
564    ) -> Vec<(Instance, String)>
565    where
566        I: IntoIterator<Item = Instance>,
567    {
568        instances
569            .into_iter()
570            .map(|instance| {
571                let path = format!(
572                    "{}:{}{}",
573                    match use_internal_ip_address {
574                        true => instance.private_ip,
575                        false => instance.main_ip,
576                    },
577                    Self::CLIENT_METRICS_PORT,
578                    iota_metrics::METRICS_ROUTE
579                );
580                (instance, path)
581            })
582            .collect()
583    }
584}