1use std::{
6 collections::HashMap,
7 net::SocketAddr,
8 num::NonZeroUsize,
9 ops,
10 path::{Path, PathBuf},
11 time::Duration,
12};
13
14use anyhow::Result;
15use futures::future::try_join_all;
16use iota_config::{
17 ExecutionCacheConfig, ExecutionCacheType, IOTA_GENESIS_FILENAME, NodeConfig,
18 node::{AuthorityOverloadConfig, DBCheckpointConfig, GrpcApiConfig, RunWithRange},
19 p2p::DiscoveryConfig,
20};
21use iota_macros::nondeterministic;
22use iota_names::config::IotaNamesConfig;
23use iota_node::IotaNodeHandle;
24use iota_protocol_config::{Chain, ProtocolVersion};
25use iota_swarm_config::{
26 genesis_config::{AccountConfig, GenesisConfig, ValidatorGenesisConfig},
27 network_config::NetworkConfig,
28 network_config_builder::{
29 CommitteeConfig, ConfigBuilder, ProtocolVersionsConfig, StateAccumulatorV1EnabledConfig,
30 SupportedProtocolVersionsCallback,
31 },
32 node_config_builder::FullnodeConfigBuilder,
33};
34use iota_types::{
35 base_types::AuthorityName,
36 object::Object,
37 supported_protocol_versions::SupportedProtocolVersions,
38 traffic_control::{PolicyConfig, RemoteFirewallConfig},
39};
40use rand::rngs::OsRng;
41use tempfile::TempDir;
42use tracing::info;
43
44use super::Node;
45
46pub struct SwarmBuilder<R = OsRng> {
47 rng: R,
48 dir: Option<PathBuf>,
50 committee: CommitteeConfig,
51 genesis_config: Option<GenesisConfig>,
52 network_config: Option<NetworkConfig>,
53 chain_override: Option<Chain>,
54 additional_objects: Vec<Object>,
55 fullnode_count: usize,
56 fullnode_rpc_port: Option<u16>,
57 fullnode_rpc_addr: Option<SocketAddr>,
58 supported_protocol_versions_config: ProtocolVersionsConfig,
59 fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
61 db_checkpoint_config: DBCheckpointConfig,
62 jwk_fetch_interval: Option<Duration>,
63 num_unpruned_validators: Option<usize>,
64 authority_overload_config: Option<AuthorityOverloadConfig>,
65 execution_cache_type: Option<ExecutionCacheType>,
66 execution_cache_config: Option<ExecutionCacheConfig>,
67 data_ingestion_dir: Option<PathBuf>,
68 fullnode_run_with_range: Option<RunWithRange>,
69 fullnode_policy_config: Option<PolicyConfig>,
70 fullnode_fw_config: Option<RemoteFirewallConfig>,
71 max_submit_position: Option<usize>,
72 submit_delay_step_override_millis: Option<u64>,
73 state_accumulator_config: StateAccumulatorV1EnabledConfig,
74 disable_fullnode_pruning: bool,
75 iota_names_config: Option<IotaNamesConfig>,
76 fullnode_grpc_api_config: Option<GrpcApiConfig>,
77 disable_address_verification_cooldown: bool,
78}
79
80impl SwarmBuilder {
81 #[expect(clippy::new_without_default)]
82 pub fn new() -> Self {
83 Self {
84 rng: OsRng,
85 dir: None,
86 committee: CommitteeConfig::Size(NonZeroUsize::new(1).unwrap()),
87 genesis_config: None,
88 network_config: None,
89 chain_override: None,
90 additional_objects: vec![],
91 fullnode_count: 0,
92 fullnode_rpc_port: None,
93 fullnode_rpc_addr: None,
94 supported_protocol_versions_config: ProtocolVersionsConfig::Default,
95 fullnode_supported_protocol_versions_config: None,
96 db_checkpoint_config: DBCheckpointConfig::default(),
97 jwk_fetch_interval: None,
98 num_unpruned_validators: None,
99 authority_overload_config: None,
100 execution_cache_type: None,
101 execution_cache_config: None,
102 data_ingestion_dir: None,
103 fullnode_run_with_range: None,
104 fullnode_policy_config: None,
105 fullnode_fw_config: None,
106 max_submit_position: None,
107 submit_delay_step_override_millis: None,
108 state_accumulator_config: StateAccumulatorV1EnabledConfig::Global(true),
109 disable_fullnode_pruning: false,
110 iota_names_config: None,
111 fullnode_grpc_api_config: None,
112 disable_address_verification_cooldown: false,
113 }
114 }
115}
116
117impl<R> SwarmBuilder<R> {
118 pub fn rng<N: rand::RngCore + rand::CryptoRng>(self, rng: N) -> SwarmBuilder<N> {
119 SwarmBuilder {
120 rng,
121 dir: self.dir,
122 committee: self.committee,
123 genesis_config: self.genesis_config,
124 network_config: self.network_config,
125 chain_override: self.chain_override,
126 additional_objects: self.additional_objects,
127 fullnode_count: self.fullnode_count,
128 fullnode_rpc_port: self.fullnode_rpc_port,
129 fullnode_rpc_addr: self.fullnode_rpc_addr,
130 supported_protocol_versions_config: self.supported_protocol_versions_config,
131 fullnode_supported_protocol_versions_config: self
132 .fullnode_supported_protocol_versions_config,
133 db_checkpoint_config: self.db_checkpoint_config,
134 jwk_fetch_interval: self.jwk_fetch_interval,
135 num_unpruned_validators: self.num_unpruned_validators,
136 authority_overload_config: self.authority_overload_config,
137 execution_cache_type: self.execution_cache_type,
138 execution_cache_config: self.execution_cache_config,
139 data_ingestion_dir: self.data_ingestion_dir,
140 fullnode_run_with_range: self.fullnode_run_with_range,
141 fullnode_policy_config: self.fullnode_policy_config,
142 fullnode_fw_config: self.fullnode_fw_config,
143 max_submit_position: self.max_submit_position,
144 submit_delay_step_override_millis: self.submit_delay_step_override_millis,
145 state_accumulator_config: self.state_accumulator_config,
146 disable_fullnode_pruning: self.disable_fullnode_pruning,
147 iota_names_config: self.iota_names_config,
148 fullnode_grpc_api_config: self.fullnode_grpc_api_config,
149 disable_address_verification_cooldown: self.disable_address_verification_cooldown,
150 }
151 }
152
153 pub fn dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
161 self.dir = Some(dir.into());
162 self
163 }
164
165 pub fn committee_size(mut self, committee_size: NonZeroUsize) -> Self {
169 self.committee = CommitteeConfig::Size(committee_size);
170 self
171 }
172
173 pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
174 self.committee = CommitteeConfig::Validators(validators);
175 self
176 }
177
178 pub fn with_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
179 assert!(self.network_config.is_none() && self.genesis_config.is_none());
180 self.genesis_config = Some(genesis_config);
181 self
182 }
183
184 pub fn with_chain_override(mut self, chain: Chain) -> Self {
185 assert!(self.chain_override.is_none());
186 self.chain_override = Some(chain);
187 self
188 }
189
190 pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
191 assert!(self.network_config.is_none());
192 self.num_unpruned_validators = Some(n);
193 self
194 }
195
196 pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
197 self.jwk_fetch_interval = Some(i);
198 self
199 }
200
201 pub fn with_network_config(mut self, network_config: NetworkConfig) -> Self {
202 assert!(self.network_config.is_none() && self.genesis_config.is_none());
203 self.network_config = Some(network_config);
204 self
205 }
206
207 pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
208 self.get_or_init_genesis_config().accounts = accounts;
209 self
210 }
211
212 pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
213 self.additional_objects.extend(objects);
214 self
215 }
216
217 pub fn with_fullnode_count(mut self, fullnode_count: usize) -> Self {
218 self.fullnode_count = fullnode_count;
219 self
220 }
221
222 pub fn with_fullnode_rpc_port(mut self, fullnode_rpc_port: u16) -> Self {
223 assert!(self.fullnode_rpc_addr.is_none());
224 self.fullnode_rpc_port = Some(fullnode_rpc_port);
225 self
226 }
227
228 pub fn with_fullnode_rpc_addr(mut self, fullnode_rpc_addr: SocketAddr) -> Self {
229 assert!(self.fullnode_rpc_port.is_none());
230 self.fullnode_rpc_addr = Some(fullnode_rpc_addr);
231 self
232 }
233
234 pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
235 self.get_or_init_genesis_config()
236 .parameters
237 .epoch_duration_ms = epoch_duration_ms;
238 self
239 }
240
241 pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
242 self.get_or_init_genesis_config()
243 .parameters
244 .protocol_version = v;
245 self
246 }
247
248 pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
249 self.supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
250 self
251 }
252
253 pub fn with_supported_protocol_version_callback(
254 mut self,
255 func: SupportedProtocolVersionsCallback,
256 ) -> Self {
257 self.supported_protocol_versions_config = ProtocolVersionsConfig::PerValidator(func);
258 self
259 }
260
261 pub fn with_supported_protocol_versions_config(mut self, c: ProtocolVersionsConfig) -> Self {
262 self.supported_protocol_versions_config = c;
263 self
264 }
265
266 pub fn with_state_accumulator_config(mut self, c: StateAccumulatorV1EnabledConfig) -> Self {
267 self.state_accumulator_config = c;
268 self
269 }
270
271 pub fn with_fullnode_supported_protocol_versions_config(
272 mut self,
273 c: ProtocolVersionsConfig,
274 ) -> Self {
275 self.fullnode_supported_protocol_versions_config = Some(c);
276 self
277 }
278
279 pub fn with_db_checkpoint_config(mut self, db_checkpoint_config: DBCheckpointConfig) -> Self {
280 self.db_checkpoint_config = db_checkpoint_config;
281 self
282 }
283
284 pub fn with_authority_overload_config(
285 mut self,
286 authority_overload_config: AuthorityOverloadConfig,
287 ) -> Self {
288 assert!(self.network_config.is_none());
289 self.authority_overload_config = Some(authority_overload_config);
290 self
291 }
292
293 pub fn with_execution_cache_type(mut self, execution_cache_type: ExecutionCacheType) -> Self {
294 assert!(self.execution_cache_type.is_none());
295 self.execution_cache_type = Some(execution_cache_type);
296 self
297 }
298
299 pub fn with_execution_cache_config(
300 mut self,
301 execution_cache_config: ExecutionCacheConfig,
302 ) -> Self {
303 self.execution_cache_config = Some(execution_cache_config);
304 self
305 }
306
307 pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
308 self.data_ingestion_dir = Some(path);
309 self
310 }
311
312 pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
313 if let Some(run_with_range) = run_with_range {
314 self.fullnode_run_with_range = Some(run_with_range);
315 }
316 self
317 }
318
319 pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
320 self.fullnode_policy_config = config;
321 self
322 }
323
324 pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
325 self.fullnode_fw_config = config;
326 self
327 }
328
329 pub fn with_fullnode_grpc_api_config(mut self, config: GrpcApiConfig) -> Self {
330 self.fullnode_grpc_api_config = Some(config);
331 self
332 }
333
334 fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
335 if self.genesis_config.is_none() {
336 assert!(self.network_config.is_none());
337 self.genesis_config = Some(GenesisConfig::for_local_testing());
338 }
339 self.genesis_config.as_mut().unwrap()
340 }
341
342 pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
343 self.max_submit_position = Some(max_submit_position);
344 self
345 }
346
347 pub fn with_disable_fullnode_pruning(mut self) -> Self {
348 self.disable_fullnode_pruning = true;
349 self
350 }
351
352 pub fn with_submit_delay_step_override_millis(
353 mut self,
354 submit_delay_step_override_millis: u64,
355 ) -> Self {
356 self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
357 self
358 }
359
360 pub fn with_iota_names_config(mut self, iota_names_config: IotaNamesConfig) -> Self {
361 self.iota_names_config = Some(iota_names_config);
362 self
363 }
364
365 pub fn with_disabled_address_verification_cooldown(mut self) -> Self {
369 self.disable_address_verification_cooldown = true;
370 self
371 }
372}
373
374impl<R: rand::RngCore + rand::CryptoRng> SwarmBuilder<R> {
375 pub fn build(self) -> Swarm {
377 let dir = if let Some(dir) = self.dir {
378 SwarmDirectory::Persistent(dir)
379 } else {
380 SwarmDirectory::new_temporary()
381 };
382
383 let ingest_data = self.data_ingestion_dir.clone();
384
385 let mut network_config = self.network_config.unwrap_or_else(|| {
386 let mut config_builder = ConfigBuilder::new(dir.as_ref());
387
388 if let Some(genesis_config) = self.genesis_config {
389 config_builder = config_builder.with_genesis_config(genesis_config);
390 }
391
392 if let Some(chain_override) = self.chain_override {
393 config_builder = config_builder.with_chain_override(chain_override);
394 }
395
396 if let Some(num_unpruned_validators) = self.num_unpruned_validators {
397 config_builder =
398 config_builder.with_num_unpruned_validators(num_unpruned_validators);
399 }
400
401 if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
402 config_builder = config_builder.with_jwk_fetch_interval(jwk_fetch_interval);
403 }
404
405 if let Some(authority_overload_config) = self.authority_overload_config {
406 config_builder =
407 config_builder.with_authority_overload_config(authority_overload_config);
408 }
409
410 if let Some(execution_cache_type) = self.execution_cache_type {
411 config_builder = config_builder.with_execution_cache_type(execution_cache_type);
412 }
413
414 if let Some(execution_cache_config) = self.execution_cache_config {
415 config_builder = config_builder.with_execution_cache_config(execution_cache_config);
416 }
417
418 if let Some(path) = self.data_ingestion_dir {
419 config_builder = config_builder.with_data_ingestion_dir(path);
420 }
421
422 if let Some(max_submit_position) = self.max_submit_position {
423 config_builder = config_builder.with_max_submit_position(max_submit_position);
424 }
425
426 if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis
427 {
428 config_builder = config_builder
429 .with_submit_delay_step_override_millis(submit_delay_step_override_millis);
430 }
431
432 let mut network_config = config_builder
433 .committee(self.committee)
434 .rng(self.rng)
435 .with_objects(self.additional_objects)
436 .with_empty_validator_genesis()
437 .with_supported_protocol_versions_config(
438 self.supported_protocol_versions_config.clone(),
439 )
440 .with_state_accumulator_config(self.state_accumulator_config.clone())
441 .build();
442 let genesis_path = dir.join(IOTA_GENESIS_FILENAME);
444 network_config
445 .genesis
446 .save(&genesis_path)
447 .expect("genesis should be saved successfully");
448 for validator in &mut network_config.validator_configs {
449 validator.genesis = iota_config::node::Genesis::new_from_file(&genesis_path);
450 }
451 network_config
452 });
453
454 if self.disable_address_verification_cooldown {
455 for validator in &mut network_config.validator_configs {
456 if let Some(ref mut discovery_config) = validator.p2p_config.discovery {
457 discovery_config.address_verification_failure_cooldown_sec = Some(0);
458 } else {
459 validator.p2p_config.discovery = Some(DiscoveryConfig {
460 address_verification_failure_cooldown_sec: Some(0),
461 ..Default::default()
462 });
463 }
464 }
465 }
466
467 let mut nodes: HashMap<_, _> = network_config
468 .validator_configs()
469 .iter()
470 .map(|config| {
471 info!(
472 "SwarmBuilder configuring validator with name {}",
473 config.authority_public_key()
474 );
475 (config.authority_public_key(), Node::new(config.to_owned()))
476 })
477 .collect();
478
479 let mut fullnode_config_builder = FullnodeConfigBuilder::new()
480 .with_config_directory(dir.as_ref().into())
481 .with_db_checkpoint_config(self.db_checkpoint_config.clone())
482 .with_run_with_range(self.fullnode_run_with_range)
483 .with_policy_config(self.fullnode_policy_config)
484 .with_data_ingestion_dir(ingest_data)
485 .with_fw_config(self.fullnode_fw_config)
486 .with_disable_pruning(self.disable_fullnode_pruning)
487 .with_iota_names_config(self.iota_names_config);
488
489 if self.disable_address_verification_cooldown {
490 let discovery_config = DiscoveryConfig {
491 address_verification_failure_cooldown_sec: Some(0),
492 ..Default::default()
493 };
494
495 fullnode_config_builder =
496 fullnode_config_builder.with_discovery_config(discovery_config);
497 }
498
499 if let Some(chain) = self.chain_override {
500 fullnode_config_builder = fullnode_config_builder.with_chain_override(chain);
501 }
502
503 if let Some(spvc) = &self.fullnode_supported_protocol_versions_config {
504 let supported_versions = match spvc {
505 ProtocolVersionsConfig::Default => SupportedProtocolVersions::SYSTEM_DEFAULT,
506 ProtocolVersionsConfig::Global(v) => *v,
507 ProtocolVersionsConfig::PerValidator(func) => func(0, None),
508 };
509 fullnode_config_builder =
510 fullnode_config_builder.with_supported_protocol_versions(supported_versions);
511 }
512
513 if let Some(grpc_config) = &self.fullnode_grpc_api_config {
515 fullnode_config_builder =
516 fullnode_config_builder.with_grpc_api_config(grpc_config.clone());
517 }
518
519 if self.fullnode_count > 0 {
520 (0..self.fullnode_count).for_each(|idx| {
521 let mut builder = fullnode_config_builder.clone();
522 if idx == 0 {
523 if let Some(rpc_addr) = self.fullnode_rpc_addr {
526 builder = builder.with_rpc_addr(rpc_addr);
527 }
528 if let Some(rpc_port) = self.fullnode_rpc_port {
529 builder = builder.with_rpc_port(rpc_port);
530 }
531 }
532 let config = builder.build(&mut OsRng, &network_config);
533 info!(
534 "SwarmBuilder configuring full node with name {}",
535 config.authority_public_key()
536 );
537 nodes.insert(config.authority_public_key(), Node::new(config));
538 });
539 }
540 Swarm {
541 dir,
542 network_config,
543 nodes,
544 fullnode_config_builder,
545 }
546 }
547}
548
549#[derive(Debug)]
551pub struct Swarm {
552 dir: SwarmDirectory,
553 network_config: NetworkConfig,
554 nodes: HashMap<AuthorityName, Node>,
555 fullnode_config_builder: FullnodeConfigBuilder,
557}
558
559impl Drop for Swarm {
560 fn drop(&mut self) {
561 self.nodes_iter_mut().for_each(|node| node.stop());
562 }
563}
564
565impl Swarm {
566 fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
567 self.nodes.values_mut()
568 }
569
570 pub fn builder() -> SwarmBuilder {
572 SwarmBuilder::new()
573 }
574
575 pub async fn launch(&mut self) -> Result<()> {
577 try_join_all(self.nodes_iter_mut().map(|node| node.start())).await?;
578 tracing::info!("Successfully launched Swarm");
579 Ok(())
580 }
581
582 pub fn dir(&self) -> &Path {
585 self.dir.as_ref()
586 }
587
588 pub fn config(&self) -> &NetworkConfig {
590 &self.network_config
591 }
592
593 pub fn config_mut(&mut self) -> &mut NetworkConfig {
597 &mut self.network_config
598 }
599
600 pub fn all_nodes(&self) -> impl Iterator<Item = &Node> {
601 self.nodes.values()
602 }
603
604 pub fn node(&self, name: &AuthorityName) -> Option<&Node> {
605 self.nodes.get(name)
606 }
607
608 pub fn node_mut(&mut self, name: &AuthorityName) -> Option<&mut Node> {
609 self.nodes.get_mut(name)
610 }
611
612 pub fn validator_nodes(&self) -> impl Iterator<Item = &Node> {
617 self.nodes
618 .values()
619 .filter(|node| node.config().consensus_config.is_some())
620 }
621
622 pub fn validator_node_handles(&self) -> Vec<IotaNodeHandle> {
623 self.validator_nodes()
624 .map(|node| node.get_node_handle().unwrap())
625 .collect()
626 }
627
628 pub fn active_validators(&self) -> impl Iterator<Item = &Node> {
630 self.validator_nodes().filter(|node| {
631 node.get_node_handle().is_some_and(|handle| {
632 let state = handle.state();
633 state.is_active_validator(&state.epoch_store_for_testing())
634 })
635 })
636 }
637
638 pub fn committee_validators(&self) -> impl Iterator<Item = &Node> {
640 self.validator_nodes().filter(|node| {
641 node.get_node_handle().is_some_and(|handle| {
642 let state = handle.state();
643 state.is_committee_validator(&state.epoch_store_for_testing())
644 })
645 })
646 }
647
648 pub fn fullnodes(&self) -> impl Iterator<Item = &Node> {
650 self.nodes
651 .values()
652 .filter(|node| node.config().consensus_config.is_none())
653 }
654
655 pub async fn spawn_new_node(&mut self, config: NodeConfig) -> IotaNodeHandle {
656 let name = config.authority_public_key();
657 let node = Node::new(config);
658 node.start().await.unwrap();
659 let handle = node.get_node_handle().unwrap();
660 self.nodes.insert(name, node);
661 handle
662 }
663
664 pub fn get_fullnode_config_builder(&self) -> FullnodeConfigBuilder {
665 self.fullnode_config_builder.clone()
666 }
667}
668
669#[derive(Debug)]
670enum SwarmDirectory {
671 Persistent(PathBuf),
672 Temporary(TempDir),
673}
674
675impl SwarmDirectory {
676 fn new_temporary() -> Self {
677 SwarmDirectory::Temporary(nondeterministic!(TempDir::new().unwrap()))
678 }
679}
680
681impl ops::Deref for SwarmDirectory {
682 type Target = Path;
683
684 fn deref(&self) -> &Self::Target {
685 match self {
686 SwarmDirectory::Persistent(dir) => dir.deref(),
687 SwarmDirectory::Temporary(dir) => dir.path(),
688 }
689 }
690}
691
692impl AsRef<Path> for SwarmDirectory {
693 fn as_ref(&self) -> &Path {
694 match self {
695 SwarmDirectory::Persistent(dir) => dir.as_ref(),
696 SwarmDirectory::Temporary(dir) => dir.as_ref(),
697 }
698 }
699}
700
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use super::Swarm;

    /// Builds a swarm with four validators and one fullnode, launches it and
    /// health-checks every node.
    #[tokio::test]
    async fn launch() {
        telemetry_subscribers::init_for_testing();
        let mut swarm = Swarm::builder()
            .committee_size(NonZeroUsize::new(4).unwrap())
            .with_fullnode_count(1)
            .build();

        swarm.launch().await.unwrap();

        for validator in swarm.validator_nodes() {
            validator.health_check(true).await.unwrap();
        }

        for fullnode in swarm.fullnodes() {
            fullnode.health_check(false).await.unwrap();
        }
    }
}