1use std::{
6 collections::HashMap,
7 net::SocketAddr,
8 num::NonZeroUsize,
9 ops,
10 path::{Path, PathBuf},
11 time::Duration,
12};
13
14use anyhow::Result;
15use futures::future::try_join_all;
16use iota_config::{
17 IOTA_GENESIS_FILENAME, NodeConfig,
18 node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange},
19};
20use iota_macros::nondeterministic;
21use iota_node::IotaNodeHandle;
22use iota_protocol_config::ProtocolVersion;
23use iota_swarm_config::{
24 genesis_config::{AccountConfig, GenesisConfig, ValidatorGenesisConfig},
25 network_config::NetworkConfig,
26 network_config_builder::{
27 CommitteeConfig, ConfigBuilder, ProtocolVersionsConfig, StateAccumulatorV1EnabledConfig,
28 SupportedProtocolVersionsCallback,
29 },
30 node_config_builder::FullnodeConfigBuilder,
31};
32use iota_types::{
33 base_types::AuthorityName,
34 object::Object,
35 supported_protocol_versions::SupportedProtocolVersions,
36 traffic_control::{PolicyConfig, RemoteFirewallConfig},
37};
38use rand::rngs::OsRng;
39use tempfile::TempDir;
40use tracing::info;
41
42use super::Node;
43
44pub struct SwarmBuilder<R = OsRng> {
45 rng: R,
46 dir: Option<PathBuf>,
48 committee: CommitteeConfig,
49 genesis_config: Option<GenesisConfig>,
50 network_config: Option<NetworkConfig>,
51 additional_objects: Vec<Object>,
52 fullnode_count: usize,
53 fullnode_rpc_port: Option<u16>,
54 fullnode_rpc_addr: Option<SocketAddr>,
55 supported_protocol_versions_config: ProtocolVersionsConfig,
56 fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
58 db_checkpoint_config: DBCheckpointConfig,
59 jwk_fetch_interval: Option<Duration>,
60 num_unpruned_validators: Option<usize>,
61 authority_overload_config: Option<AuthorityOverloadConfig>,
62 data_ingestion_dir: Option<PathBuf>,
63 fullnode_run_with_range: Option<RunWithRange>,
64 fullnode_policy_config: Option<PolicyConfig>,
65 fullnode_fw_config: Option<RemoteFirewallConfig>,
66 max_submit_position: Option<usize>,
67 submit_delay_step_override_millis: Option<u64>,
68 state_accumulator_config: StateAccumulatorV1EnabledConfig,
69}
70
71impl SwarmBuilder {
72 #[expect(clippy::new_without_default)]
73 pub fn new() -> Self {
74 Self {
75 rng: OsRng,
76 dir: None,
77 committee: CommitteeConfig::Size(NonZeroUsize::new(1).unwrap()),
78 genesis_config: None,
79 network_config: None,
80 additional_objects: vec![],
81 fullnode_count: 0,
82 fullnode_rpc_port: None,
83 fullnode_rpc_addr: None,
84 supported_protocol_versions_config: ProtocolVersionsConfig::Default,
85 fullnode_supported_protocol_versions_config: None,
86 db_checkpoint_config: DBCheckpointConfig::default(),
87 jwk_fetch_interval: None,
88 num_unpruned_validators: None,
89 authority_overload_config: None,
90 data_ingestion_dir: None,
91 fullnode_run_with_range: None,
92 fullnode_policy_config: None,
93 fullnode_fw_config: None,
94 max_submit_position: None,
95 submit_delay_step_override_millis: None,
96 state_accumulator_config: StateAccumulatorV1EnabledConfig::Global(true),
97 }
98 }
99}
100
101impl<R> SwarmBuilder<R> {
102 pub fn rng<N: rand::RngCore + rand::CryptoRng>(self, rng: N) -> SwarmBuilder<N> {
103 SwarmBuilder {
104 rng,
105 dir: self.dir,
106 committee: self.committee,
107 genesis_config: self.genesis_config,
108 network_config: self.network_config,
109 additional_objects: self.additional_objects,
110 fullnode_count: self.fullnode_count,
111 fullnode_rpc_port: self.fullnode_rpc_port,
112 fullnode_rpc_addr: self.fullnode_rpc_addr,
113 supported_protocol_versions_config: self.supported_protocol_versions_config,
114 fullnode_supported_protocol_versions_config: self
115 .fullnode_supported_protocol_versions_config,
116 db_checkpoint_config: self.db_checkpoint_config,
117 jwk_fetch_interval: self.jwk_fetch_interval,
118 num_unpruned_validators: self.num_unpruned_validators,
119 authority_overload_config: self.authority_overload_config,
120 data_ingestion_dir: self.data_ingestion_dir,
121 fullnode_run_with_range: self.fullnode_run_with_range,
122 fullnode_policy_config: self.fullnode_policy_config,
123 fullnode_fw_config: self.fullnode_fw_config,
124 max_submit_position: self.max_submit_position,
125 submit_delay_step_override_millis: self.submit_delay_step_override_millis,
126 state_accumulator_config: self.state_accumulator_config,
127 }
128 }
129
130 pub fn dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
138 self.dir = Some(dir.into());
139 self
140 }
141
142 pub fn committee_size(mut self, committee_size: NonZeroUsize) -> Self {
146 self.committee = CommitteeConfig::Size(committee_size);
147 self
148 }
149
150 pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
151 self.committee = CommitteeConfig::Validators(validators);
152 self
153 }
154
155 pub fn with_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
156 assert!(self.network_config.is_none() && self.genesis_config.is_none());
157 self.genesis_config = Some(genesis_config);
158 self
159 }
160
161 pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
162 assert!(self.network_config.is_none());
163 self.num_unpruned_validators = Some(n);
164 self
165 }
166
167 pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
168 self.jwk_fetch_interval = Some(i);
169 self
170 }
171
172 pub fn with_network_config(mut self, network_config: NetworkConfig) -> Self {
173 assert!(self.network_config.is_none() && self.genesis_config.is_none());
174 self.network_config = Some(network_config);
175 self
176 }
177
178 pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
179 self.get_or_init_genesis_config().accounts = accounts;
180 self
181 }
182
183 pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
184 self.additional_objects.extend(objects);
185 self
186 }
187
188 pub fn with_fullnode_count(mut self, fullnode_count: usize) -> Self {
189 self.fullnode_count = fullnode_count;
190 self
191 }
192
193 pub fn with_fullnode_rpc_port(mut self, fullnode_rpc_port: u16) -> Self {
194 assert!(self.fullnode_rpc_addr.is_none());
195 self.fullnode_rpc_port = Some(fullnode_rpc_port);
196 self
197 }
198
199 pub fn with_fullnode_rpc_addr(mut self, fullnode_rpc_addr: SocketAddr) -> Self {
200 assert!(self.fullnode_rpc_port.is_none());
201 self.fullnode_rpc_addr = Some(fullnode_rpc_addr);
202 self
203 }
204
205 pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
206 self.get_or_init_genesis_config()
207 .parameters
208 .epoch_duration_ms = epoch_duration_ms;
209 self
210 }
211
212 pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
213 self.get_or_init_genesis_config()
214 .parameters
215 .protocol_version = v;
216 self
217 }
218
219 pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
220 self.supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
221 self
222 }
223
224 pub fn with_supported_protocol_version_callback(
225 mut self,
226 func: SupportedProtocolVersionsCallback,
227 ) -> Self {
228 self.supported_protocol_versions_config = ProtocolVersionsConfig::PerValidator(func);
229 self
230 }
231
232 pub fn with_supported_protocol_versions_config(mut self, c: ProtocolVersionsConfig) -> Self {
233 self.supported_protocol_versions_config = c;
234 self
235 }
236
237 pub fn with_state_accumulator_config(mut self, c: StateAccumulatorV1EnabledConfig) -> Self {
238 self.state_accumulator_config = c;
239 self
240 }
241
242 pub fn with_fullnode_supported_protocol_versions_config(
243 mut self,
244 c: ProtocolVersionsConfig,
245 ) -> Self {
246 self.fullnode_supported_protocol_versions_config = Some(c);
247 self
248 }
249
250 pub fn with_db_checkpoint_config(mut self, db_checkpoint_config: DBCheckpointConfig) -> Self {
251 self.db_checkpoint_config = db_checkpoint_config;
252 self
253 }
254
255 pub fn with_authority_overload_config(
256 mut self,
257 authority_overload_config: AuthorityOverloadConfig,
258 ) -> Self {
259 assert!(self.network_config.is_none());
260 self.authority_overload_config = Some(authority_overload_config);
261 self
262 }
263
264 pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
265 self.data_ingestion_dir = Some(path);
266 self
267 }
268
269 pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
270 if let Some(run_with_range) = run_with_range {
271 self.fullnode_run_with_range = Some(run_with_range);
272 }
273 self
274 }
275
276 pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
277 self.fullnode_policy_config = config;
278 self
279 }
280
281 pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
282 self.fullnode_fw_config = config;
283 self
284 }
285
286 fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
287 if self.genesis_config.is_none() {
288 assert!(self.network_config.is_none());
289 self.genesis_config = Some(GenesisConfig::for_local_testing());
290 }
291 self.genesis_config.as_mut().unwrap()
292 }
293
294 pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
295 self.max_submit_position = Some(max_submit_position);
296 self
297 }
298
299 pub fn with_submit_delay_step_override_millis(
300 mut self,
301 submit_delay_step_override_millis: u64,
302 ) -> Self {
303 self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
304 self
305 }
306}
307
308impl<R: rand::RngCore + rand::CryptoRng> SwarmBuilder<R> {
309 pub fn build(self) -> Swarm {
311 let dir = if let Some(dir) = self.dir {
312 SwarmDirectory::Persistent(dir)
313 } else {
314 SwarmDirectory::new_temporary()
315 };
316
317 let ingest_data = self.data_ingestion_dir.clone();
318
319 let network_config = self.network_config.unwrap_or_else(|| {
320 let mut config_builder = ConfigBuilder::new(dir.as_ref());
321
322 if let Some(genesis_config) = self.genesis_config {
323 config_builder = config_builder.with_genesis_config(genesis_config);
324 }
325
326 if let Some(num_unpruned_validators) = self.num_unpruned_validators {
327 config_builder =
328 config_builder.with_num_unpruned_validators(num_unpruned_validators);
329 }
330
331 if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
332 config_builder = config_builder.with_jwk_fetch_interval(jwk_fetch_interval);
333 }
334
335 if let Some(authority_overload_config) = self.authority_overload_config {
336 config_builder =
337 config_builder.with_authority_overload_config(authority_overload_config);
338 }
339
340 if let Some(path) = self.data_ingestion_dir {
341 config_builder = config_builder.with_data_ingestion_dir(path);
342 }
343
344 if let Some(max_submit_position) = self.max_submit_position {
345 config_builder = config_builder.with_max_submit_position(max_submit_position);
346 }
347
348 if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis
349 {
350 config_builder = config_builder
351 .with_submit_delay_step_override_millis(submit_delay_step_override_millis);
352 }
353
354 let mut network_config = config_builder
355 .committee(self.committee)
356 .rng(self.rng)
357 .with_objects(self.additional_objects)
358 .with_empty_validator_genesis()
359 .with_supported_protocol_versions_config(
360 self.supported_protocol_versions_config.clone(),
361 )
362 .with_state_accumulator_config(self.state_accumulator_config.clone())
363 .build();
364 let genesis_path = dir.join(IOTA_GENESIS_FILENAME);
366 network_config
367 .genesis
368 .save(&genesis_path)
369 .expect("genesis should be saved successfully");
370 for validator in &mut network_config.validator_configs {
371 validator.genesis = iota_config::node::Genesis::new_from_file(&genesis_path);
372 }
373 network_config
374 });
375
376 let mut nodes: HashMap<_, _> = network_config
377 .validator_configs()
378 .iter()
379 .map(|config| {
380 info!(
381 "SwarmBuilder configuring validator with name {}",
382 config.authority_public_key()
383 );
384 (config.authority_public_key(), Node::new(config.to_owned()))
385 })
386 .collect();
387
388 let mut fullnode_config_builder = FullnodeConfigBuilder::new()
389 .with_config_directory(dir.as_ref().into())
390 .with_db_checkpoint_config(self.db_checkpoint_config.clone())
391 .with_run_with_range(self.fullnode_run_with_range)
392 .with_policy_config(self.fullnode_policy_config)
393 .with_data_ingestion_dir(ingest_data)
394 .with_fw_config(self.fullnode_fw_config);
395
396 if let Some(spvc) = &self.fullnode_supported_protocol_versions_config {
397 let supported_versions = match spvc {
398 ProtocolVersionsConfig::Default => SupportedProtocolVersions::SYSTEM_DEFAULT,
399 ProtocolVersionsConfig::Global(v) => *v,
400 ProtocolVersionsConfig::PerValidator(func) => func(0, None),
401 };
402 fullnode_config_builder =
403 fullnode_config_builder.with_supported_protocol_versions(supported_versions);
404 }
405
406 if self.fullnode_count > 0 {
407 (0..self.fullnode_count).for_each(|idx| {
408 let mut builder = fullnode_config_builder.clone();
409 if idx == 0 {
410 if let Some(rpc_addr) = self.fullnode_rpc_addr {
413 builder = builder.with_rpc_addr(rpc_addr);
414 }
415 if let Some(rpc_port) = self.fullnode_rpc_port {
416 builder = builder.with_rpc_port(rpc_port);
417 }
418 }
419 let config = builder.build(&mut OsRng, &network_config);
420 info!(
421 "SwarmBuilder configuring full node with name {}",
422 config.authority_public_key()
423 );
424 nodes.insert(config.authority_public_key(), Node::new(config));
425 });
426 }
427 Swarm {
428 dir,
429 network_config,
430 nodes,
431 fullnode_config_builder,
432 }
433 }
434}
435
436#[derive(Debug)]
438pub struct Swarm {
439 dir: SwarmDirectory,
440 network_config: NetworkConfig,
441 nodes: HashMap<AuthorityName, Node>,
442 fullnode_config_builder: FullnodeConfigBuilder,
444}
445
446impl Drop for Swarm {
447 fn drop(&mut self) {
448 self.nodes_iter_mut().for_each(|node| node.stop());
449 }
450}
451
452impl Swarm {
453 fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
454 self.nodes.values_mut()
455 }
456
457 pub fn builder() -> SwarmBuilder {
459 SwarmBuilder::new()
460 }
461
462 pub async fn launch(&mut self) -> Result<()> {
464 try_join_all(self.nodes_iter_mut().map(|node| node.start())).await?;
465 tracing::info!("Successfully launched Swarm");
466 Ok(())
467 }
468
469 pub fn dir(&self) -> &Path {
472 self.dir.as_ref()
473 }
474
475 pub fn config(&self) -> &NetworkConfig {
477 &self.network_config
478 }
479
480 pub fn config_mut(&mut self) -> &mut NetworkConfig {
484 &mut self.network_config
485 }
486
487 pub fn all_nodes(&self) -> impl Iterator<Item = &Node> {
488 self.nodes.values()
489 }
490
491 pub fn node(&self, name: &AuthorityName) -> Option<&Node> {
492 self.nodes.get(name)
493 }
494
495 pub fn node_mut(&mut self, name: &AuthorityName) -> Option<&mut Node> {
496 self.nodes.get_mut(name)
497 }
498
499 pub fn validator_nodes(&self) -> impl Iterator<Item = &Node> {
504 self.nodes
505 .values()
506 .filter(|node| node.config().consensus_config.is_some())
507 }
508
509 pub fn validator_node_handles(&self) -> Vec<IotaNodeHandle> {
510 self.validator_nodes()
511 .map(|node| node.get_node_handle().unwrap())
512 .collect()
513 }
514
515 pub fn active_validators(&self) -> impl Iterator<Item = &Node> {
517 self.validator_nodes().filter(|node| {
518 node.get_node_handle().is_some_and(|handle| {
519 let state = handle.state();
520 state.is_validator(&state.epoch_store_for_testing())
521 })
522 })
523 }
524
525 pub fn fullnodes(&self) -> impl Iterator<Item = &Node> {
527 self.nodes
528 .values()
529 .filter(|node| node.config().consensus_config.is_none())
530 }
531
532 pub async fn spawn_new_node(&mut self, config: NodeConfig) -> IotaNodeHandle {
533 let name = config.authority_public_key();
534 let node = Node::new(config);
535 node.start().await.unwrap();
536 let handle = node.get_node_handle().unwrap();
537 self.nodes.insert(name, node);
538 handle
539 }
540
541 pub fn get_fullnode_config_builder(&self) -> FullnodeConfigBuilder {
542 self.fullnode_config_builder.clone()
543 }
544}
545
546#[derive(Debug)]
547enum SwarmDirectory {
548 Persistent(PathBuf),
549 Temporary(TempDir),
550}
551
552impl SwarmDirectory {
553 fn new_temporary() -> Self {
554 SwarmDirectory::Temporary(nondeterministic!(TempDir::new().unwrap()))
555 }
556}
557
558impl ops::Deref for SwarmDirectory {
559 type Target = Path;
560
561 fn deref(&self) -> &Self::Target {
562 match self {
563 SwarmDirectory::Persistent(dir) => dir.deref(),
564 SwarmDirectory::Temporary(dir) => dir.path(),
565 }
566 }
567}
568
569impl AsRef<Path> for SwarmDirectory {
570 fn as_ref(&self) -> &Path {
571 match self {
572 SwarmDirectory::Persistent(dir) => dir.as_ref(),
573 SwarmDirectory::Temporary(dir) => dir.as_ref(),
574 }
575 }
576}
577
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use super::Swarm;

    /// Builds a swarm with a committee of four and one fullnode, launches it, and runs the
    /// health check of every validator (with `true`) and every fullnode (with `false`).
    #[tokio::test]
    async fn launch() {
        telemetry_subscribers::init_for_testing();
        let mut swarm = Swarm::builder()
            .committee_size(NonZeroUsize::new(4).unwrap())
            .with_fullnode_count(1)
            .build();

        swarm.launch().await.unwrap();

        for validator in swarm.validator_nodes() {
            validator.health_check(true).await.unwrap();
        }

        for fullnode in swarm.fullnodes() {
            fullnode.health_check(false).await.unwrap();
        }
    }
}