iota_aws_orchestrator/protocol/
iota.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    fmt::{Debug, Display},
8    path::PathBuf,
9    str::FromStr,
10};
11
12use iota_swarm_config::genesis_config::GenesisConfig;
13use iota_types::{base_types::IotaAddress, multiaddr::Multiaddr};
14use serde::{Deserialize, Serialize};
15
16use super::{ProtocolCommands, ProtocolMetrics};
17use crate::{
18    ConsensusProtocol,
19    benchmark::{BenchmarkParameters, BenchmarkType},
20    client::Instance,
21    display,
22    settings::{BinaryBuildConfig, Settings, build_cargo_command, join_non_empty_strings},
23};
24
/// Benchmark type for IOTA: the workload mix between shared and owned
/// objects, expressed as a percentage.
///
/// The field is private; within this file values are produced by the
/// `FromStr` implementation (which caps the ratio at 100), by `Default`
/// (ratio 0) or by deserialization.
#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IotaBenchmarkType {
    /// Percentage of shared vs owned objects; 0 means only owned objects and
    /// 100 means only shared objects.
    shared_objects_ratio: u16,
}
31
32impl Debug for IotaBenchmarkType {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        write!(f, "{}", self.shared_objects_ratio)
35    }
36}
37
38impl Display for IotaBenchmarkType {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        write!(f, "{}% shared objects", self.shared_objects_ratio)
41    }
42}
43
44impl FromStr for IotaBenchmarkType {
45    type Err = std::num::ParseIntError;
46
47    fn from_str(s: &str) -> Result<Self, Self::Err> {
48        Ok(Self {
49            shared_objects_ratio: s.parse::<u16>()?.min(100),
50        })
51    }
52}
53
// Empty impl: `IotaBenchmarkType` opts into the `BenchmarkType` trait without
// defining any items of its own here.
impl BenchmarkType for IotaBenchmarkType {}
55
/// All configurations information to run an IOTA client or validator.
pub struct IotaProtocol {
    /// Directory holding the IOTA configuration files; built in `new` from
    /// the settings' working directory and `iota_config::IOTA_CONFIG_DIR`.
    working_dir: PathBuf,
    /// When set, the stress client is told to execute through a fullnode
    /// (`--use-fullnode-for-execution true`).
    use_fullnode_for_execution: bool,
    /// When set, binaries are run from `./target/release/` instead of being
    /// launched through a cargo command.
    use_precompiled_binaries: bool,
    /// Build configuration (toolchain, features) looked up by binary name
    /// when a binary is launched through cargo.
    build_configs: HashMap<String, BinaryBuildConfig>,
    /// When set, node commands export `TRACE_FLAMEGRAPH=1`.
    enable_flamegraph: bool,
}
64
65impl IotaProtocol {
66    /// Make a new instance of the IOTA protocol commands generator.
67    pub fn new(settings: &Settings) -> Self {
68        Self {
69            working_dir: [&settings.working_dir, &iota_config::IOTA_CONFIG_DIR.into()]
70                .iter()
71                .collect(),
72            use_fullnode_for_execution: settings.use_fullnode_for_execution,
73            use_precompiled_binaries: settings.build_cache_enabled(),
74            build_configs: settings.build_configs.clone(),
75            enable_flamegraph: settings.enable_flamegraph,
76        }
77    }
78
79    /// Build the command to run a binary, either using precompiled binary or
80    /// cargo run. Returns the command string with proper toolchain and
81    /// features configured.
82    fn run_binary_command<S1: AsRef<str>, S2: AsRef<str>>(
83        &self,
84        binary_name: &str,
85        setup_commands: &[S1],
86        additional_args: &[S2],
87    ) -> String {
88        if self.use_precompiled_binaries {
89            // The precompiled binary is located in the working directory
90            let binary_path = format!("./target/release/{binary_name}");
91            let binary_command = join_non_empty_strings(
92                &std::iter::once(binary_path.as_str())
93                    .chain(additional_args.iter().map(|s| s.as_ref()))
94                    .collect::<Vec<_>>(),
95                " ",
96            );
97
98            let all_commands: Vec<String> = setup_commands
99                .iter()
100                .map(|s| s.as_ref().to_string())
101                .chain(std::iter::once(binary_command))
102                .collect();
103
104            join_non_empty_strings(&all_commands, " && ")
105        } else {
106            let build_config = self
107                .build_configs
108                .get(binary_name)
109                .expect("No build config found for binary");
110            build_cargo_command(
111                "run",
112                build_config.toolchain.clone(),
113                build_config.features.clone(),
114                &[binary_name],
115                setup_commands,
116                additional_args,
117            )
118        }
119    }
120}
121
122impl ProtocolCommands<IotaBenchmarkType> for IotaProtocol {
123    fn protocol_dependencies(&self) -> Vec<&'static str> {
124        if !self.use_precompiled_binaries {
125            return vec!["sudo apt-get -y install libudev-dev libpq5 libpq-dev"];
126        }
127
128        vec![]
129    }
130
131    fn db_directories(&self) -> Vec<PathBuf> {
132        let authorities_db = [&self.working_dir, &iota_config::AUTHORITIES_DB_NAME.into()]
133            .iter()
134            .collect();
135        let consensus_db = [&self.working_dir, &iota_config::CONSENSUS_DB_NAME.into()]
136            .iter()
137            .collect();
138        vec![authorities_db, consensus_db]
139    }
140
141    fn genesis_command<'a, I>(
142        &self,
143        instances: I,
144        parameters: &BenchmarkParameters<IotaBenchmarkType>,
145    ) -> String
146    where
147        I: Iterator<Item = &'a Instance>,
148    {
149        let working_dir = self.working_dir.display();
150        let ips = instances
151            .map(|x| {
152                match parameters.use_internal_ip_address {
153                    true => x.private_ip,
154                    false => x.main_ip,
155                }
156                .to_string()
157            })
158            .collect::<Vec<_>>()
159            .join(" ");
160
161        let epoch_duration_flag = parameters
162            .epoch_duration_ms
163            .map(|epoch_duration_ms| format!("--epoch-duration-ms {epoch_duration_ms}"))
164            .unwrap_or_default();
165        let chain_start_timestamp_flag = parameters
166            .chain_start_timestamp_ms
167            .map(|timestamp_ms| format!("--chain-start-timestamp-ms {timestamp_ms}"))
168            .unwrap_or_default();
169        let additional_gas_accounts_flag = format!(
170            "--num-additional-gas-accounts {}",
171            parameters.additional_gas_accounts
172        );
173
174        let iota_command = self.run_binary_command(
175            "iota",
176            &[&format!("mkdir -p {working_dir}")],
177            &[
178                "genesis",
179                &format!("-f --working-dir {working_dir} --benchmark-ips {ips} --admin-interface-address=localhost:1337"),
180                &epoch_duration_flag,
181                &chain_start_timestamp_flag,
182                &additional_gas_accounts_flag,
183            ],
184        );
185
186        display::action(format!("\n Genesis Command: {iota_command}"));
187
188        iota_command
189    }
190
    /// Returns the monitoring commands to run on the instances.
    ///
    /// Monitoring is currently disabled: the input is ignored and no command
    /// is returned.
    fn monitor_command<I>(&self, _instances: I) -> Vec<(Instance, String)>
    where
        I: IntoIterator<Item = Instance>,
    {
        // Disabled implementation, kept for reference:
        //
        // instances
        //     .into_iter()
        //     .map(|i| {
        //         (
        //             i,
        //             "tail -f --pid=$(pidof iota) -f /dev/null; tail -100 node.log".to_string(),
        //         )
        //     })
        //     .collect()
        vec![]
    }
206
207    fn node_command<I>(
208        &self,
209        instances: I,
210        parameters: &BenchmarkParameters<IotaBenchmarkType>,
211    ) -> Vec<(Instance, String)>
212    where
213        I: IntoIterator<Item = Instance>,
214    {
215        let working_dir = self.working_dir.clone();
216        let network_addresses = Self::resolve_network_addresses(instances, parameters);
217
218        network_addresses
219            .into_iter()
220            .enumerate()
221            .map(|(i, (instance, network_address))| {
222                let validator_config =
223                    iota_config::validator_config_file(network_address.clone(), i);
224                let config_path: PathBuf = working_dir.join(validator_config);
225                let max_pipeline_delay = parameters.max_pipeline_delay;
226                let iota_node_command = self.run_binary_command(
227                    "iota-node",
228                    &[
229                        match parameters.consensus_protocol {
230                            ConsensusProtocol::Starfish => "export CONSENSUS_PROTOCOL=starfish",
231                            ConsensusProtocol::Mysticeti => "export CONSENSUS_PROTOCOL=mysticeti",
232                            ConsensusProtocol::SwapEachEpoch => {
233                                "export CONSENSUS_PROTOCOL=swap_each_epoch"
234                            }
235                        },
236                        format!("export MAX_PIPELINE_DELAY={max_pipeline_delay}").as_str(),
237                        if self.enable_flamegraph {
238                            "export TRACE_FLAMEGRAPH=1"
239                        } else {
240                            ""
241                        },
242                    ],
243                    &[&format!(
244                        "--config-path {} --listen-address {}",
245                        config_path.display(),
246                        network_address.with_zero_ip()
247                    )],
248                );
249
250                display::action(format!(
251                    "\n Validator-node Command ({i}): {iota_node_command}"
252                ));
253
254                (instance, iota_node_command)
255            })
256            .collect()
257    }
258
259    fn fullnode_command<I>(
260        &self,
261        instances: I,
262        parameters: &BenchmarkParameters<IotaBenchmarkType>,
263    ) -> Vec<(Instance, String)>
264    where
265        I: IntoIterator<Item = Instance>,
266    {
267        let working_dir = self.working_dir.clone();
268
269        instances
270            .into_iter()
271            .enumerate()
272            .map(|(i, instance)| {
273                let config_path: PathBuf = working_dir.join(iota_config::IOTA_FULLNODE_CONFIG);
274                let fullnode_ip = match parameters.use_internal_ip_address {
275                    true => &instance.private_ip,
276                    false => &instance.main_ip,
277                };
278
279                let iota_node_command = self.run_binary_command(
280                    "iota-node",
281                    &[
282                        // Overwrite listen address and external address with 0.0.0.0 and actual fullnode IP.
283                        // Escape quotes for proper handling inside tmux wrapper
284                        format!(
285                            "sed -i 's|listen-address: \\\"127.0.0.1:|listen-address: \\\"0.0.0.0:|' {0} && sed -i 's|external-address: /ip4/127.0.0.1/|external-address: /ip4/{1}/|' {0}",
286                            config_path.display(),
287                            fullnode_ip
288                        ),
289                        if self.enable_flamegraph {
290                            "export TRACE_FLAMEGRAPH=1".to_string()
291                        } else {
292                            "".to_string()
293                        },
294                    ],
295                    &[&format!("--config-path {}", config_path.display())],
296                );
297
298                display::action(format!("\n Full-node Command ({i}): {iota_node_command}"));
299
300                (instance, iota_node_command)
301            })
302            .collect()
303    }
304
305    fn client_command<I>(
306        &self,
307        instances: I,
308        parameters: &BenchmarkParameters<IotaBenchmarkType>,
309    ) -> Vec<(Instance, String)>
310    where
311        I: IntoIterator<Item = Instance>,
312    {
313        let genesis_path: PathBuf = [
314            &self.working_dir,
315            &iota_config::IOTA_GENESIS_FILENAME.into(),
316        ]
317        .iter()
318        .collect();
319        let keystore_path: PathBuf = [
320            &self.working_dir,
321            &iota_config::IOTA_BENCHMARK_GENESIS_GAS_KEYSTORE_FILENAME.into(),
322        ]
323        .iter()
324        .collect();
325
326        let committee_size = parameters.nodes;
327        let clients: Vec<_> = instances.into_iter().collect();
328        let load_share = parameters.load / clients.len();
329        let shared_counter = parameters.benchmark_type.shared_objects_ratio;
330        let transfer_objects = 100 - shared_counter;
331        let metrics_port = Self::CLIENT_METRICS_PORT;
332        // Get gas keys for all validators and clients
333        let gas_keys =
334            GenesisConfig::benchmark_gas_keys(committee_size + parameters.additional_gas_accounts);
335        // Validators use the first `nodes` keys, so clients should start after that
336        let client_key_offset = committee_size;
337
338        clients
339            .into_iter()
340            .enumerate()
341            .map(|(i, instance)| {
342                let genesis = genesis_path.display().to_string();
343                let keystore = keystore_path.display().to_string();
344                // Offset client key index to avoid colliding with validator keys
345                let gas_key = &gas_keys[client_key_offset + i];
346                let gas_address = IotaAddress::from(&gas_key.public());
347
348                let mut stress_args: Vec<String> = vec![
349                    "--num-client-threads 24 --num-server-threads 1".to_string(),
350                    "--local false --num-transfer-accounts 2".to_string(),
351                    format!("--genesis-blob-path {genesis} --keystore-path {keystore}"),
352                    format!("--primary-gas-owner-id {gas_address}"),
353                    "bench".to_string(),
354                    format!("--in-flight-ratio 30 --num-workers 24 --target-qps {load_share}"),
355                    format!(
356                        "--shared-counter {shared_counter} --transfer-object {transfer_objects}"
357                    ),
358                    "--shared-counter-hotness-factor 50".to_string(),
359                    format!("--client-metric-host 0.0.0.0 --client-metric-port {metrics_port}"),
360                ];
361
362                if self.use_fullnode_for_execution {
363                    stress_args.push("--use-fullnode-for-execution true".to_string());
364                    stress_args.push("--fullnode-rpc-addresses http://127.0.0.1:9000".to_string());
365                }
366
367                let stress_command = self.run_binary_command(
368                    "stress",
369                    // required for stress binary, otherwise it will use the CARGO_MANIFEST_DIR,
370                    // which is set during compilation time
371                    &["export MOVE_EXAMPLES_DIR=$(pwd)/examples/move"],
372                    &stress_args,
373                );
374
375                display::action(format!("\n Stress Command ({i}): {stress_command}"));
376
377                (instance, stress_command)
378            })
379            .collect()
380    }
381}
382
383impl IotaProtocol {
    /// Port of the client metrics endpoint: passed to the stress client as
    /// `--client-metric-port` and used to build the clients' metrics paths.
    /// Defined as `GenesisConfig::BENCHMARKS_PORT_OFFSET + 2000`.
    const CLIENT_METRICS_PORT: u16 = GenesisConfig::BENCHMARKS_PORT_OFFSET + 2000;
385
386    /// Creates the network addresses in multi address format for the instances.
387    /// It returns the Instance and the corresponding address.
388    pub fn resolve_network_addresses(
389        instances: impl IntoIterator<Item = Instance>,
390        parameters: &BenchmarkParameters<IotaBenchmarkType>,
391    ) -> Vec<(Instance, Multiaddr)> {
392        let instances: Vec<Instance> = instances.into_iter().collect();
393        let ips: Vec<_> = instances
394            .iter()
395            .map(|x| match parameters.use_internal_ip_address {
396                true => x.private_ip.to_string(),
397                false => x.main_ip.to_string(),
398            })
399            .collect();
400        let genesis_config = GenesisConfig::new_for_benchmarks(
401            &ips,
402            parameters.epoch_duration_ms,
403            parameters.chain_start_timestamp_ms,
404            Some(parameters.additional_gas_accounts),
405        );
406        let mut addresses = Vec::new();
407        if let Some(validator_configs) = genesis_config.validator_config_info.as_ref() {
408            for (i, validator_info) in validator_configs.iter().enumerate() {
409                let address = &validator_info.network_address;
410                addresses.push((instances[i].clone(), address.clone()));
411            }
412        }
413        addresses
414    }
415}
416
417impl ProtocolMetrics for IotaProtocol {
    // String identifiers required by the `ProtocolMetrics` trait.
    // NOTE(review): presumably these are the names of the metrics exposed by
    // the stress client and looked up by the orchestrator when collecting
    // results — confirm against the trait definition.
    const BENCHMARK_DURATION: &'static str = "benchmark_duration";
    const TOTAL_TRANSACTIONS: &'static str = "latency_s_count";
    const LATENCY_BUCKETS: &'static str = "latency_s";
    const LATENCY_SUM: &'static str = "latency_s_sum";
    const LATENCY_SQUARED_SUM: &'static str = "latency_squared_s";
423
424    fn nodes_metrics_path<I, T>(
425        &self,
426        instances: I,
427        parameters: &BenchmarkParameters<T>,
428    ) -> Vec<(Instance, String)>
429    where
430        I: IntoIterator<Item = Instance>,
431        T: BenchmarkType,
432    {
433        let (ips, instances): (Vec<_>, Vec<_>) = instances
434            .into_iter()
435            .map(|x| {
436                (
437                    match parameters.use_internal_ip_address {
438                        true => x.private_ip,
439                        false => x.main_ip,
440                    }
441                    .to_string(),
442                    x,
443                )
444            })
445            .unzip();
446        GenesisConfig::new_for_benchmarks(
447            &ips,
448            parameters.epoch_duration_ms,
449            parameters.chain_start_timestamp_ms,
450            Some(parameters.additional_gas_accounts),
451        )
452        .validator_config_info
453        .expect("No validator in genesis")
454        .iter()
455        .zip(instances)
456        .map(|(config, instance)| {
457            let path = format!(
458                "{}:{}{}",
459                match parameters.use_internal_ip_address {
460                    true => instance.private_ip,
461                    false => instance.main_ip,
462                },
463                config.metrics_address.port(),
464                iota_metrics::METRICS_ROUTE
465            );
466            (instance, path)
467        })
468        .collect()
469    }
470
471    fn clients_metrics_path<I, T>(
472        &self,
473        instances: I,
474        parameters: &BenchmarkParameters<T>,
475    ) -> Vec<(Instance, String)>
476    where
477        I: IntoIterator<Item = Instance>,
478        T: BenchmarkType,
479    {
480        instances
481            .into_iter()
482            .map(|instance| {
483                let path = format!(
484                    "{}:{}{}",
485                    match parameters.use_internal_ip_address {
486                        true => instance.private_ip,
487                        false => instance.main_ip,
488                    },
489                    Self::CLIENT_METRICS_PORT,
490                    iota_metrics::METRICS_ROUTE
491                );
492                (instance, path)
493            })
494            .collect()
495    }
496}