iota_aws_orchestrator/protocol/
iota.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    fmt::{Debug, Display},
8    path::PathBuf,
9    str::FromStr,
10};
11
12use iota_swarm_config::genesis_config::GenesisConfig;
13use iota_types::{base_types::IotaAddress, multiaddr::Multiaddr};
14use serde::{Deserialize, Serialize};
15
16use super::{ProtocolCommands, ProtocolMetrics};
17use crate::{
18    ConsensusProtocol,
19    benchmark::{BenchmarkParameters, BenchmarkType},
20    client::Instance,
21    display,
22    settings::{BinaryBuildConfig, Settings, build_cargo_command, join_non_empty_strings},
23};
24
/// Benchmark workload description for IOTA: the mix of shared-object and
/// owned-object transactions, expressed as a percentage.
///
/// NOTE(review): only the `FromStr` implementation clamps the value to 100;
/// the derived `Deserialize` accepts any `u16` — confirm whether callers rely
/// on the 0..=100 range.
#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IotaBenchmarkType {
    /// Percentage of shared vs owned objects; 0 means only owned objects and
    /// 100 means only shared objects.
    shared_objects_ratio: u16,
}
31
32impl Debug for IotaBenchmarkType {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        write!(f, "{}", self.shared_objects_ratio)
35    }
36}
37
38impl Display for IotaBenchmarkType {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        write!(f, "{}% shared objects", self.shared_objects_ratio)
41    }
42}
43
44impl FromStr for IotaBenchmarkType {
45    type Err = std::num::ParseIntError;
46
47    fn from_str(s: &str) -> Result<Self, Self::Err> {
48        Ok(Self {
49            shared_objects_ratio: s.parse::<u16>()?.min(100),
50        })
51    }
52}
53
// Opts `IotaBenchmarkType` into the `BenchmarkType` trait; the empty body
// defines no items of its own. NOTE(review): presumably this is what allows
// it to be used as `BenchmarkParameters<IotaBenchmarkType>` — confirm against
// the trait bound on `BenchmarkParameters`.
impl BenchmarkType for IotaBenchmarkType {}
55
/// All configurations information to run an IOTA client or validator.
pub struct IotaProtocol {
    /// Base directory for IOTA configuration and data: the settings' working
    /// directory joined with `iota_config::IOTA_CONFIG_DIR`. Databases, node
    /// configs, the genesis blob and the keystore are resolved relative to it.
    working_dir: PathBuf,
    /// When set, the stress client is started with
    /// `--use-fullnode-for-execution true` and a local fullnode RPC address.
    use_fullnode_for_execution: bool,
    /// When set, binaries are run from `./target/release/` instead of through
    /// `cargo run`, and no build dependencies are installed.
    use_precompiled_binaries: bool,
    /// Per-binary build configuration (toolchain and features), keyed by
    /// binary name; consulted only when binaries are run through cargo.
    build_configs: HashMap<String, BinaryBuildConfig>,
    /// When set, `TRACE_FLAMEGRAPH=1` is exported before starting a node.
    enable_flamegraph: bool,
}
64
65impl IotaProtocol {
66    /// Make a new instance of the IOTA protocol commands generator.
67    pub fn new(settings: &Settings) -> Self {
68        Self {
69            working_dir: [&settings.working_dir, &iota_config::IOTA_CONFIG_DIR.into()]
70                .iter()
71                .collect(),
72            use_fullnode_for_execution: settings.use_fullnode_for_execution,
73            use_precompiled_binaries: settings.build_cache_enabled(),
74            build_configs: settings.build_configs.clone(),
75            enable_flamegraph: settings.enable_flamegraph,
76        }
77    }
78
79    /// Build the command to run a binary, either using precompiled binary or
80    /// cargo run. Returns the command string with proper toolchain and
81    /// features configured.
82    fn run_binary_command<S1: AsRef<str>, S2: AsRef<str>>(
83        &self,
84        binary_name: &str,
85        setup_commands: &[S1],
86        additional_args: &[S2],
87    ) -> String {
88        if self.use_precompiled_binaries {
89            // The precompiled binary is located in the working directory
90            let binary_path = format!("./target/release/{binary_name}");
91            let binary_command = join_non_empty_strings(
92                &std::iter::once(binary_path.as_str())
93                    .chain(additional_args.iter().map(|s| s.as_ref()))
94                    .collect::<Vec<_>>(),
95                " ",
96            );
97
98            let all_commands: Vec<String> = setup_commands
99                .iter()
100                .map(|s| s.as_ref().to_string())
101                .chain(std::iter::once(binary_command))
102                .collect();
103
104            join_non_empty_strings(&all_commands, " && ")
105        } else {
106            let build_config = self
107                .build_configs
108                .get(binary_name)
109                .expect("No build config found for binary");
110            build_cargo_command(
111                "run",
112                build_config.toolchain.clone(),
113                build_config.features.clone(),
114                &[binary_name],
115                setup_commands,
116                additional_args,
117            )
118        }
119    }
120}
121
122impl ProtocolCommands<IotaBenchmarkType> for IotaProtocol {
123    fn protocol_dependencies(&self) -> Vec<&'static str> {
124        if !self.use_precompiled_binaries {
125            return vec!["sudo apt-get -y install libudev-dev libpq5 libpq-dev"];
126        }
127
128        vec![]
129    }
130
131    fn db_directories(&self) -> Vec<PathBuf> {
132        let authorities_db = self.working_dir.join(iota_config::AUTHORITIES_DB_NAME);
133        let consensus_db = self.working_dir.join(iota_config::CONSENSUS_DB_NAME);
134        let full_node_db: PathBuf = self.working_dir.join(iota_config::FULL_NODE_DB_PATH);
135        vec![authorities_db, consensus_db, full_node_db]
136    }
137
138    fn genesis_command<'a, I>(
139        &self,
140        instances: I,
141        parameters: &BenchmarkParameters<IotaBenchmarkType>,
142    ) -> String
143    where
144        I: Iterator<Item = &'a Instance>,
145    {
146        let working_dir = self.working_dir.display();
147        let ips = instances
148            .map(|x| {
149                match parameters.use_internal_ip_address {
150                    true => x.private_ip,
151                    false => x.main_ip,
152                }
153                .to_string()
154            })
155            .collect::<Vec<_>>()
156            .join(" ");
157
158        let epoch_duration_flag = parameters
159            .epoch_duration_ms
160            .map(|epoch_duration_ms| format!("--epoch-duration-ms {epoch_duration_ms}"))
161            .unwrap_or_default();
162        let chain_start_timestamp_flag = parameters
163            .chain_start_timestamp_ms
164            .map(|timestamp_ms| format!("--chain-start-timestamp-ms {timestamp_ms}"))
165            .unwrap_or_default();
166        let additional_gas_accounts_flag = format!(
167            "--num-additional-gas-accounts {}",
168            parameters.additional_gas_accounts
169        );
170
171        let iota_command = self.run_binary_command(
172            "iota",
173            &[&format!("mkdir -p {working_dir}")],
174            &[
175                "genesis",
176                &format!("-f --working-dir {working_dir} --benchmark-ips {ips} --admin-interface-address=localhost:1337"),
177                &epoch_duration_flag,
178                &chain_start_timestamp_flag,
179                &additional_gas_accounts_flag,
180            ],
181        );
182
183        iota_command
184    }
185
186    fn monitor_command<I>(&self, _instances: I) -> Vec<(Instance, String)>
187    where
188        I: IntoIterator<Item = Instance>,
189    {
190        // instances
191        //     .into_iter()
192        //     .map(|i| {
193        //         (
194        //             i,
195        //             "tail -f --pid=$(pidof iota) -f /dev/null; tail -100
196        // node.log".to_string(),         )
197        //     })
198        //     .collect()
199        vec![]
200    }
201
202    fn node_command<I>(
203        &self,
204        instances: I,
205        parameters: &BenchmarkParameters<IotaBenchmarkType>,
206    ) -> Vec<(Instance, String)>
207    where
208        I: IntoIterator<Item = Instance>,
209    {
210        let working_dir = self.working_dir.clone();
211        let network_addresses = Self::resolve_network_addresses(instances, parameters);
212
213        network_addresses
214            .into_iter()
215            .enumerate()
216            .map(|(i, (instance, network_address))| {
217                let validator_config =
218                    iota_config::validator_config_file(network_address.clone(), i);
219                let config_path: PathBuf = working_dir.join(validator_config);
220                let max_pipeline_delay = parameters.max_pipeline_delay;
221                let iota_node_command = self.run_binary_command(
222                    "iota-node",
223                    &[
224                        match parameters.consensus_protocol {
225                            ConsensusProtocol::Starfish => "export CONSENSUS_PROTOCOL=starfish",
226                            ConsensusProtocol::Mysticeti => "export CONSENSUS_PROTOCOL=mysticeti",
227                            ConsensusProtocol::SwapEachEpoch => {
228                                "export CONSENSUS_PROTOCOL=swap_each_epoch"
229                            }
230                        },
231                        format!("export MAX_PIPELINE_DELAY={max_pipeline_delay}").as_str(),
232                        if self.enable_flamegraph {
233                            "export TRACE_FLAMEGRAPH=1"
234                        } else {
235                            ""
236                        },
237                    ],
238                    &[&format!(
239                        "--config-path {} --listen-address {}",
240                        config_path.display(),
241                        network_address.with_zero_ip()
242                    )],
243                );
244
245                display::action(format!(
246                    "\n Validator-node Command ({i}): {iota_node_command}"
247                ));
248
249                (instance, iota_node_command)
250            })
251            .collect()
252    }
253
254    fn fullnode_command<I>(
255        &self,
256        instances: I,
257        parameters: &BenchmarkParameters<IotaBenchmarkType>,
258    ) -> Vec<(Instance, String)>
259    where
260        I: IntoIterator<Item = Instance>,
261    {
262        let working_dir = self.working_dir.clone();
263
264        instances
265            .into_iter()
266            .enumerate()
267            .map(|(i, instance)| {
268                let config_path: PathBuf = working_dir.join(iota_config::IOTA_FULLNODE_CONFIG);
269                let fullnode_ip = match parameters.use_internal_ip_address {
270                    true => &instance.private_ip,
271                    false => &instance.main_ip,
272                };
273
274                let iota_node_command = self.run_binary_command(
275                    "iota-node",
276                    &[
277                        // Overwrite listen address and external address with 0.0.0.0 and actual fullnode IP.
278                        // Escape quotes for proper handling inside tmux wrapper
279                        format!(
280                            "sed -i 's|listen-address: \\\"127.0.0.1:|listen-address: \\\"0.0.0.0:|' {0} && sed -i 's|external-address: /ip4/127.0.0.1/|external-address: /ip4/{1}/|' {0}",
281                            config_path.display(),
282                            fullnode_ip
283                        ),
284                        if self.enable_flamegraph {
285                            "export TRACE_FLAMEGRAPH=1".to_string()
286                        } else {
287                            "".to_string()
288                        },
289                    ],
290                    &[&format!("--config-path {}", config_path.display())],
291                );
292
293                display::action(format!("\n Full-node Command ({i}): {iota_node_command}"));
294
295                (instance, iota_node_command)
296            })
297            .collect()
298    }
299
300    fn client_command<I>(
301        &self,
302        instances: I,
303        parameters: &BenchmarkParameters<IotaBenchmarkType>,
304    ) -> Vec<(Instance, String)>
305    where
306        I: IntoIterator<Item = Instance>,
307    {
308        let genesis_path: PathBuf = [
309            &self.working_dir,
310            &iota_config::IOTA_GENESIS_FILENAME.into(),
311        ]
312        .iter()
313        .collect();
314        let keystore_path: PathBuf = [
315            &self.working_dir,
316            &iota_config::IOTA_BENCHMARK_GENESIS_GAS_KEYSTORE_FILENAME.into(),
317        ]
318        .iter()
319        .collect();
320
321        let committee_size = parameters.nodes;
322        let clients: Vec<_> = instances.into_iter().collect();
323        let load_share = parameters.load / clients.len();
324        let shared_counter = parameters.benchmark_type.shared_objects_ratio;
325        let transfer_objects = 100 - shared_counter;
326        let metrics_port = Self::CLIENT_METRICS_PORT;
327        // Get gas keys for all validators and clients
328        let gas_keys =
329            GenesisConfig::benchmark_gas_keys(committee_size + parameters.additional_gas_accounts);
330        // Validators use the first `nodes` keys, so clients should start after that
331        let client_key_offset = committee_size;
332
333        clients
334            .into_iter()
335            .enumerate()
336            .map(|(i, instance)| {
337                let genesis = genesis_path.display().to_string();
338                let keystore = keystore_path.display().to_string();
339                // Offset client key index to avoid colliding with validator keys
340                let gas_key = &gas_keys[client_key_offset + i];
341                let gas_address = IotaAddress::from(&gas_key.public());
342
343                let mut stress_args: Vec<String> = vec![
344                    "--num-client-threads 24 --num-server-threads 1".to_string(),
345                    "--local false --num-transfer-accounts 2".to_string(),
346                    format!("--genesis-blob-path {genesis} --keystore-path {keystore}"),
347                    format!("--primary-gas-owner-id {gas_address}"),
348                    "bench".to_string(),
349                    format!("--in-flight-ratio 30 --num-workers 24 --target-qps {load_share}"),
350                    format!(
351                        "--shared-counter {shared_counter} --transfer-object {transfer_objects}"
352                    ),
353                    format!("--client-metric-host 0.0.0.0 --client-metric-port {metrics_port}"),
354                ];
355
356                // Add optional shared counter hotness factor if specified
357                let hotness_factor = parameters.shared_counter_hotness_factor.unwrap_or(50);
358                stress_args.push(format!("--shared-counter-hotness-factor {hotness_factor}"));
359
360                // Add optional num shared counters if specified
361                if let Some(num_counters) = parameters.num_shared_counters {
362                    stress_args.push(format!("--num-shared-counters {num_counters}"));
363                }
364
365                if self.use_fullnode_for_execution {
366                    stress_args.push("--use-fullnode-for-execution true".to_string());
367                    stress_args.push("--fullnode-rpc-addresses http://127.0.0.1:9000".to_string());
368                }
369
370                let stress_command = self.run_binary_command(
371                    "stress",
372                    // required for stress binary, otherwise it will use the CARGO_MANIFEST_DIR,
373                    // which is set during compilation time
374                    &[
375                        "export MOVE_EXAMPLES_DIR=$(pwd)/examples/move",
376                        "export RUST_LOG=iota_benchmark=debug",
377                    ],
378                    &stress_args,
379                );
380
381                display::action(format!("\n Stress Command ({i}): {stress_command}"));
382
383                (instance, stress_command)
384            })
385            .collect()
386    }
387}
388
389impl IotaProtocol {
    /// Port on which the stress client serves its metrics: passed to the
    /// client as `--client-metric-port` and used to build the client metrics
    /// paths. Defined as `GenesisConfig::BENCHMARKS_PORT_OFFSET + 2000`.
    const CLIENT_METRICS_PORT: u16 = GenesisConfig::BENCHMARKS_PORT_OFFSET + 2000;
391
392    /// Creates the network addresses in multi address format for the instances.
393    /// It returns the Instance and the corresponding address.
394    pub fn resolve_network_addresses(
395        instances: impl IntoIterator<Item = Instance>,
396        parameters: &BenchmarkParameters<IotaBenchmarkType>,
397    ) -> Vec<(Instance, Multiaddr)> {
398        let instances: Vec<Instance> = instances.into_iter().collect();
399        let ips: Vec<_> = instances
400            .iter()
401            .map(|x| match parameters.use_internal_ip_address {
402                true => x.private_ip.to_string(),
403                false => x.main_ip.to_string(),
404            })
405            .collect();
406        let genesis_config = GenesisConfig::new_for_benchmarks(
407            &ips,
408            parameters.epoch_duration_ms,
409            parameters.chain_start_timestamp_ms,
410            Some(parameters.additional_gas_accounts),
411            u64::MAX,
412        );
413        let mut addresses = Vec::new();
414        if let Some(validator_configs) = genesis_config.validator_config_info.as_ref() {
415            for (i, validator_info) in validator_configs.iter().enumerate() {
416                let address = &validator_info.network_address;
417                addresses.push((instances[i].clone(), address.clone()));
418            }
419        }
420        addresses
421    }
422}
423
424impl ProtocolMetrics for IotaProtocol {
    // Metric names required by `ProtocolMetrics`.
    // NOTE(review): presumably these match the metric names exported by the
    // stress client on its metrics endpoint — confirm against the client.
    const BENCHMARK_DURATION: &'static str = "benchmark_duration";
    const TOTAL_TRANSACTIONS: &'static str = "latency_s_count";
    const LATENCY_BUCKETS: &'static str = "latency_s";
    const LATENCY_SUM: &'static str = "latency_s_sum";
    const LATENCY_SQUARED_SUM: &'static str = "latency_squared_s";
430
431    fn nodes_metrics_path<I>(
432        &self,
433        instances: I,
434        use_internal_ip_address: bool,
435    ) -> Vec<(Instance, String)>
436    where
437        I: IntoIterator<Item = Instance>,
438    {
439        let instances = instances.into_iter().collect::<Vec<_>>();
440        let ips = (0..instances.len())
441            .map(|_| "0.0.0.0".to_string())
442            .collect::<Vec<_>>();
443        // From GenesisConfig we only need validators' `metrics_address` port which is
444        // computed from validator's offset in `ips`. The values of (the rest
445        // of) the arguments are irrelevant.
446        GenesisConfig::new_for_benchmarks(&ips, None, None, None, u64::MAX)
447            .validator_config_info
448            .expect("No validator in genesis")
449            .iter()
450            .zip(instances)
451            .map(|(config, instance)| {
452                let path = format!(
453                    "{}:{}{}",
454                    match use_internal_ip_address {
455                        true => instance.private_ip,
456                        false => instance.main_ip,
457                    },
458                    config.metrics_address.port(),
459                    iota_metrics::METRICS_ROUTE
460                );
461                (instance, path)
462            })
463            .collect()
464    }
465
466    fn clients_metrics_path<I>(
467        &self,
468        instances: I,
469        use_internal_ip_address: bool,
470    ) -> Vec<(Instance, String)>
471    where
472        I: IntoIterator<Item = Instance>,
473    {
474        instances
475            .into_iter()
476            .map(|instance| {
477                let path = format!(
478                    "{}:{}{}",
479                    match use_internal_ip_address {
480                        true => instance.private_ip,
481                        false => instance.main_ip,
482                    },
483                    Self::CLIENT_METRICS_PORT,
484                    iota_metrics::METRICS_ROUTE
485                );
486                (instance, path)
487            })
488            .collect()
489    }
490}