iota_aws_orchestrator/protocol/
iota.rs1use std::{
6 fmt::{Debug, Display},
7 path::PathBuf,
8 str::FromStr,
9};
10
11use iota_swarm_config::genesis_config::GenesisConfig;
12use iota_types::{base_types::IotaAddress, multiaddr::Multiaddr};
13use serde::{Deserialize, Serialize};
14
15use super::{ProtocolCommands, ProtocolMetrics};
16use crate::{
17 benchmark::{BenchmarkParameters, BenchmarkType},
18 client::Instance,
19 settings::Settings,
20};
21
/// Benchmark type for IOTA, described by a single shared-objects ratio.
#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IotaBenchmarkType {
    /// Ratio of shared objects, in percent (`Display` renders it as `"<n>% shared objects"`).
    /// The `FromStr` impl caps parsed values at 100; the derived `Deserialize` and `Default`
    /// impls apply no such cap.
    shared_objects_ratio: u16,
}
28
29impl Debug for IotaBenchmarkType {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 write!(f, "{}", self.shared_objects_ratio)
32 }
33}
34
35impl Display for IotaBenchmarkType {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 write!(f, "{}% shared objects", self.shared_objects_ratio)
38 }
39}
40
41impl FromStr for IotaBenchmarkType {
42 type Err = std::num::ParseIntError;
43
44 fn from_str(s: &str) -> Result<Self, Self::Err> {
45 Ok(Self {
46 shared_objects_ratio: s.parse::<u16>()?.min(100),
47 })
48 }
49}
50
// Empty impl: `IotaBenchmarkType` relies entirely on whatever `BenchmarkType` provides or
// requires by default. It is the type plugged into `BenchmarkParameters<IotaBenchmarkType>`.
impl BenchmarkType for IotaBenchmarkType {}
52
/// Generates the shell commands and metrics endpoints used to run IOTA benchmarks.
pub struct IotaProtocol {
    /// Base directory referenced by the generated commands: genesis output, validator
    /// configs, keystore and database directories are all resolved relative to it.
    working_dir: PathBuf,
}
57
58impl ProtocolCommands<IotaBenchmarkType> for IotaProtocol {
59 fn protocol_dependencies(&self) -> Vec<&'static str> {
60 vec![
61 "sudo apt-get -y install curl git-all clang cmake gcc libssl-dev pkg-config libclang-dev",
63 "sudo apt-get -y install libpq-dev",
65 ]
66 }
67
68 fn db_directories(&self) -> Vec<PathBuf> {
69 let authorities_db = [&self.working_dir, &iota_config::AUTHORITIES_DB_NAME.into()]
70 .iter()
71 .collect();
72 let consensus_db = [&self.working_dir, &iota_config::CONSENSUS_DB_NAME.into()]
73 .iter()
74 .collect();
75 vec![authorities_db, consensus_db]
76 }
77
78 fn genesis_command<'a, I>(&self, instances: I) -> String
79 where
80 I: Iterator<Item = &'a Instance>,
81 {
82 let working_dir = self.working_dir.display();
83 let ips = instances
84 .map(|x| x.main_ip.to_string())
85 .collect::<Vec<_>>()
86 .join(" ");
87 let genesis = [
88 "cargo run --release --bin iota --",
89 "genesis",
90 &format!("-f --working-dir {working_dir} --benchmark-ips {ips}"),
91 ]
92 .join(" ");
93
94 [
95 &format!("mkdir -p {working_dir}"),
96 "source $HOME/.cargo/env",
97 &genesis,
98 ]
99 .join(" && ")
100 }
101
102 fn monitor_command<I>(&self, _instances: I) -> Vec<(Instance, String)>
103 where
104 I: IntoIterator<Item = Instance>,
105 {
106 vec![]
116 }
117
118 fn node_command<I>(
119 &self,
120 instances: I,
121 _parameters: &BenchmarkParameters<IotaBenchmarkType>,
122 ) -> Vec<(Instance, String)>
123 where
124 I: IntoIterator<Item = Instance>,
125 {
126 let working_dir = self.working_dir.clone();
127 let network_addresses = Self::resolve_network_addresses(instances);
128
129 network_addresses
130 .into_iter()
131 .enumerate()
132 .map(|(i, (instance, network_address))| {
133 let validator_config =
134 iota_config::validator_config_file(network_address.clone(), i);
135 let config_path: PathBuf = working_dir.join(validator_config);
136
137 let run = [
138 "cargo run --release --bin iota-node --",
139 &format!(
140 "--config-path {} --listen-address {}",
141 config_path.display(),
142 network_address.with_zero_ip()
143 ),
144 ]
145 .join(" ");
146 let command = ["source $HOME/.cargo/env", &run].join(" && ");
147
148 (instance, command)
149 })
150 .collect()
151 }
152
153 fn client_command<I>(
154 &self,
155 instances: I,
156 parameters: &BenchmarkParameters<IotaBenchmarkType>,
157 ) -> Vec<(Instance, String)>
158 where
159 I: IntoIterator<Item = Instance>,
160 {
161 let genesis_path: PathBuf = [
162 &self.working_dir,
163 &iota_config::IOTA_GENESIS_FILENAME.into(),
164 ]
165 .iter()
166 .collect();
167 let keystore_path: PathBuf = [
168 &self.working_dir,
169 &iota_config::IOTA_BENCHMARK_GENESIS_GAS_KEYSTORE_FILENAME.into(),
170 ]
171 .iter()
172 .collect();
173
174 let committee_size = parameters.nodes;
175 let clients: Vec<_> = instances.into_iter().collect();
176 let load_share = parameters.load / clients.len();
177 let shared_counter = parameters.benchmark_type.shared_objects_ratio;
178 let transfer_objects = 100 - shared_counter;
179 let metrics_port = Self::CLIENT_METRICS_PORT;
180 let gas_keys = GenesisConfig::benchmark_gas_keys(committee_size);
181
182 clients
183 .into_iter()
184 .enumerate()
185 .map(|(i, instance)| {
186 let genesis = genesis_path.display();
187 let keystore = keystore_path.display();
188 let gas_key = &gas_keys[i % committee_size];
189 let gas_address = IotaAddress::from(&gas_key.public());
190
191 let run = [
192 "cargo run --release --bin stress --",
193 "--num-client-threads 24 --num-server-threads 1",
194 "--local false --num-transfer-accounts 2",
195 &format!("--genesis-blob-path {genesis} --keystore-path {keystore}",),
196 &format!("--primary-gas-owner-id {gas_address}"),
197 "bench",
198 &format!("--in-flight-ratio 30 --num-workers 24 --target-qps {load_share}"),
199 &format!(
200 "--shared-counter {shared_counter} --transfer-object {transfer_objects}"
201 ),
202 "--shared-counter-hotness-factor 50",
203 &format!("--client-metric-host 0.0.0.0 --client-metric-port {metrics_port}"),
204 ]
205 .join(" ");
206 let command = ["source $HOME/.cargo/env", &run].join(" && ");
207
208 (instance, command)
209 })
210 .collect()
211 }
212}
213
214impl IotaProtocol {
215 const CLIENT_METRICS_PORT: u16 = GenesisConfig::BENCHMARKS_PORT_OFFSET + 2000;
216
217 pub fn new(settings: &Settings) -> Self {
219 Self {
220 working_dir: [&settings.working_dir, &iota_config::IOTA_CONFIG_DIR.into()]
221 .iter()
222 .collect(),
223 }
224 }
225
226 pub fn resolve_network_addresses(
229 instances: impl IntoIterator<Item = Instance>,
230 ) -> Vec<(Instance, Multiaddr)> {
231 let instances: Vec<Instance> = instances.into_iter().collect();
232 let ips: Vec<_> = instances.iter().map(|x| x.main_ip.to_string()).collect();
233 let genesis_config = GenesisConfig::new_for_benchmarks(&ips);
234 let mut addresses = Vec::new();
235 if let Some(validator_configs) = genesis_config.validator_config_info.as_ref() {
236 for (i, validator_info) in validator_configs.iter().enumerate() {
237 let address = &validator_info.network_address;
238 addresses.push((instances[i].clone(), address.clone()));
239 }
240 }
241 addresses
242 }
243}
244
245impl ProtocolMetrics for IotaProtocol {
246 const BENCHMARK_DURATION: &'static str = "benchmark_duration";
247 const TOTAL_TRANSACTIONS: &'static str = "latency_s_count";
248 const LATENCY_BUCKETS: &'static str = "latency_s";
249 const LATENCY_SUM: &'static str = "latency_s_sum";
250 const LATENCY_SQUARED_SUM: &'static str = "latency_squared_s";
251
252 fn nodes_metrics_path<I>(&self, instances: I) -> Vec<(Instance, String)>
253 where
254 I: IntoIterator<Item = Instance>,
255 {
256 let (ips, instances): (Vec<_>, Vec<_>) = instances
257 .into_iter()
258 .map(|x| (x.main_ip.to_string(), x))
259 .unzip();
260
261 GenesisConfig::new_for_benchmarks(&ips)
262 .validator_config_info
263 .expect("No validator in genesis")
264 .iter()
265 .zip(instances)
266 .map(|(config, instance)| {
267 let path = format!(
268 "{}:{}{}",
269 instance.main_ip,
270 config.metrics_address.port(),
271 iota_metrics::METRICS_ROUTE
272 );
273 (instance, path)
274 })
275 .collect()
276 }
277
278 fn clients_metrics_path<I>(&self, instances: I) -> Vec<(Instance, String)>
279 where
280 I: IntoIterator<Item = Instance>,
281 {
282 instances
283 .into_iter()
284 .map(|instance| {
285 let path = format!(
286 "{}:{}{}",
287 instance.main_ip,
288 Self::CLIENT_METRICS_PORT,
289 iota_metrics::METRICS_ROUTE
290 );
291 (instance, path)
292 })
293 .collect()
294 }
295}