1use std::{
6 collections::HashMap,
7 net::SocketAddr,
8 num::NonZeroUsize,
9 ops,
10 path::{Path, PathBuf},
11 time::Duration,
12};
13
14use anyhow::Result;
15use futures::future::try_join_all;
16use iota_config::{
17 ExecutionCacheConfig, ExecutionCacheType, IOTA_GENESIS_FILENAME, NodeConfig,
18 node::{AuthorityOverloadConfig, DBCheckpointConfig, GrpcApiConfig, RunWithRange},
19 p2p::DiscoveryConfig,
20};
21use iota_macros::nondeterministic;
22use iota_names::config::IotaNamesConfig;
23use iota_node::IotaNodeHandle;
24use iota_protocol_config::ProtocolVersion;
25use iota_swarm_config::{
26 genesis_config::{AccountConfig, GenesisConfig, ValidatorGenesisConfig},
27 network_config::NetworkConfig,
28 network_config_builder::{
29 CommitteeConfig, ConfigBuilder, ProtocolVersionsConfig, StateAccumulatorV1EnabledConfig,
30 SupportedProtocolVersionsCallback,
31 },
32 node_config_builder::FullnodeConfigBuilder,
33};
34use iota_types::{
35 base_types::AuthorityName,
36 object::Object,
37 supported_protocol_versions::SupportedProtocolVersions,
38 traffic_control::{PolicyConfig, RemoteFirewallConfig},
39};
40use rand::rngs::OsRng;
41use tempfile::TempDir;
42use tracing::info;
43
44use super::Node;
45
/// Builder collecting every knob needed to assemble a [`Swarm`] of validator
/// and fullnode [`Node`]s.
///
/// Several options are only consumed while a fresh `NetworkConfig` is being
/// generated inside `build`; they are not read when a pre-built network
/// config was supplied through `with_network_config`.
pub struct SwarmBuilder<R = OsRng> {
    /// Random number generator handed to the network `ConfigBuilder`.
    rng: R,
    /// Directory for on-disk data; when `None`, `build` creates a temporary
    /// directory instead.
    dir: Option<PathBuf>,
    /// Committee description (a size or an explicit validator list) forwarded
    /// to the network `ConfigBuilder`.
    committee: CommitteeConfig,
    /// Genesis configuration used when generating the network config.
    genesis_config: Option<GenesisConfig>,
    /// Pre-built network configuration; when present it is used as-is instead
    /// of generating one.
    network_config: Option<NetworkConfig>,
    /// Extra objects passed to the network `ConfigBuilder` via `with_objects`.
    additional_objects: Vec<Object>,
    /// Number of fullnodes to configure in addition to the validators.
    fullnode_count: usize,
    /// RPC port applied to the first fullnode only; the setters assert that
    /// it is not combined with `fullnode_rpc_addr`.
    fullnode_rpc_port: Option<u16>,
    /// RPC address applied to the first fullnode only; the setters assert
    /// that it is not combined with `fullnode_rpc_port`.
    fullnode_rpc_addr: Option<SocketAddr>,
    /// Supported protocol versions configuration forwarded to the network
    /// `ConfigBuilder`.
    supported_protocol_versions_config: ProtocolVersionsConfig,
    /// Optional supported protocol versions override applied to the fullnode
    /// config builder.
    fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
    /// DB checkpoint configuration passed to the fullnode config builder.
    db_checkpoint_config: DBCheckpointConfig,
    /// Forwarded to `ConfigBuilder::with_jwk_fetch_interval` when set.
    jwk_fetch_interval: Option<Duration>,
    /// Forwarded to `ConfigBuilder::with_num_unpruned_validators` when set.
    num_unpruned_validators: Option<usize>,
    /// Forwarded to `ConfigBuilder::with_authority_overload_config` when set.
    authority_overload_config: Option<AuthorityOverloadConfig>,
    /// Forwarded to `ConfigBuilder::with_execution_cache_type` when set.
    execution_cache_type: Option<ExecutionCacheType>,
    /// Forwarded to `ConfigBuilder::with_execution_cache_config` when set.
    execution_cache_config: Option<ExecutionCacheConfig>,
    /// Data ingestion directory; given to the network `ConfigBuilder` (when a
    /// config is generated) and to the fullnode config builder.
    data_ingestion_dir: Option<PathBuf>,
    /// Passed to the fullnode config builder via `with_run_with_range`.
    fullnode_run_with_range: Option<RunWithRange>,
    /// Traffic-control policy passed to the fullnode config builder.
    fullnode_policy_config: Option<PolicyConfig>,
    /// Remote firewall configuration passed to the fullnode config builder.
    fullnode_fw_config: Option<RemoteFirewallConfig>,
    /// Forwarded to `ConfigBuilder::with_max_submit_position` when set.
    max_submit_position: Option<usize>,
    /// Forwarded to `ConfigBuilder::with_submit_delay_step_override_millis`
    /// when set.
    submit_delay_step_override_millis: Option<u64>,
    /// Forwarded to `ConfigBuilder::with_state_accumulator_config`.
    state_accumulator_config: StateAccumulatorV1EnabledConfig,
    /// Passed to the fullnode config builder via `with_disable_pruning`.
    disable_fullnode_pruning: bool,
    /// IOTA-Names configuration passed to the fullnode config builder.
    iota_names_config: Option<IotaNamesConfig>,
    /// gRPC API configuration passed to the fullnode config builder when set.
    fullnode_grpc_api_config: Option<GrpcApiConfig>,
    /// When `true`, `build` sets `address_verification_failure_cooldown_sec`
    /// to `Some(0)` in the discovery config of validators and fullnodes.
    disable_address_verification_cooldown: bool,
}
78
79impl SwarmBuilder {
80 #[expect(clippy::new_without_default)]
81 pub fn new() -> Self {
82 Self {
83 rng: OsRng,
84 dir: None,
85 committee: CommitteeConfig::Size(NonZeroUsize::new(1).unwrap()),
86 genesis_config: None,
87 network_config: None,
88 additional_objects: vec![],
89 fullnode_count: 0,
90 fullnode_rpc_port: None,
91 fullnode_rpc_addr: None,
92 supported_protocol_versions_config: ProtocolVersionsConfig::Default,
93 fullnode_supported_protocol_versions_config: None,
94 db_checkpoint_config: DBCheckpointConfig::default(),
95 jwk_fetch_interval: None,
96 num_unpruned_validators: None,
97 authority_overload_config: None,
98 execution_cache_type: None,
99 execution_cache_config: None,
100 data_ingestion_dir: None,
101 fullnode_run_with_range: None,
102 fullnode_policy_config: None,
103 fullnode_fw_config: None,
104 max_submit_position: None,
105 submit_delay_step_override_millis: None,
106 state_accumulator_config: StateAccumulatorV1EnabledConfig::Global(true),
107 disable_fullnode_pruning: false,
108 iota_names_config: None,
109 fullnode_grpc_api_config: None,
110 disable_address_verification_cooldown: false,
111 }
112 }
113}
114
115impl<R> SwarmBuilder<R> {
116 pub fn rng<N: rand::RngCore + rand::CryptoRng>(self, rng: N) -> SwarmBuilder<N> {
117 SwarmBuilder {
118 rng,
119 dir: self.dir,
120 committee: self.committee,
121 genesis_config: self.genesis_config,
122 network_config: self.network_config,
123 additional_objects: self.additional_objects,
124 fullnode_count: self.fullnode_count,
125 fullnode_rpc_port: self.fullnode_rpc_port,
126 fullnode_rpc_addr: self.fullnode_rpc_addr,
127 supported_protocol_versions_config: self.supported_protocol_versions_config,
128 fullnode_supported_protocol_versions_config: self
129 .fullnode_supported_protocol_versions_config,
130 db_checkpoint_config: self.db_checkpoint_config,
131 jwk_fetch_interval: self.jwk_fetch_interval,
132 num_unpruned_validators: self.num_unpruned_validators,
133 authority_overload_config: self.authority_overload_config,
134 execution_cache_type: self.execution_cache_type,
135 execution_cache_config: self.execution_cache_config,
136 data_ingestion_dir: self.data_ingestion_dir,
137 fullnode_run_with_range: self.fullnode_run_with_range,
138 fullnode_policy_config: self.fullnode_policy_config,
139 fullnode_fw_config: self.fullnode_fw_config,
140 max_submit_position: self.max_submit_position,
141 submit_delay_step_override_millis: self.submit_delay_step_override_millis,
142 state_accumulator_config: self.state_accumulator_config,
143 disable_fullnode_pruning: self.disable_fullnode_pruning,
144 iota_names_config: self.iota_names_config,
145 fullnode_grpc_api_config: self.fullnode_grpc_api_config,
146 disable_address_verification_cooldown: self.disable_address_verification_cooldown,
147 }
148 }
149
150 pub fn dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
158 self.dir = Some(dir.into());
159 self
160 }
161
162 pub fn committee_size(mut self, committee_size: NonZeroUsize) -> Self {
166 self.committee = CommitteeConfig::Size(committee_size);
167 self
168 }
169
170 pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
171 self.committee = CommitteeConfig::Validators(validators);
172 self
173 }
174
175 pub fn with_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
176 assert!(self.network_config.is_none() && self.genesis_config.is_none());
177 self.genesis_config = Some(genesis_config);
178 self
179 }
180
181 pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
182 assert!(self.network_config.is_none());
183 self.num_unpruned_validators = Some(n);
184 self
185 }
186
187 pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
188 self.jwk_fetch_interval = Some(i);
189 self
190 }
191
192 pub fn with_network_config(mut self, network_config: NetworkConfig) -> Self {
193 assert!(self.network_config.is_none() && self.genesis_config.is_none());
194 self.network_config = Some(network_config);
195 self
196 }
197
198 pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
199 self.get_or_init_genesis_config().accounts = accounts;
200 self
201 }
202
203 pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
204 self.additional_objects.extend(objects);
205 self
206 }
207
208 pub fn with_fullnode_count(mut self, fullnode_count: usize) -> Self {
209 self.fullnode_count = fullnode_count;
210 self
211 }
212
213 pub fn with_fullnode_rpc_port(mut self, fullnode_rpc_port: u16) -> Self {
214 assert!(self.fullnode_rpc_addr.is_none());
215 self.fullnode_rpc_port = Some(fullnode_rpc_port);
216 self
217 }
218
219 pub fn with_fullnode_rpc_addr(mut self, fullnode_rpc_addr: SocketAddr) -> Self {
220 assert!(self.fullnode_rpc_port.is_none());
221 self.fullnode_rpc_addr = Some(fullnode_rpc_addr);
222 self
223 }
224
225 pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
226 self.get_or_init_genesis_config()
227 .parameters
228 .epoch_duration_ms = epoch_duration_ms;
229 self
230 }
231
232 pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
233 self.get_or_init_genesis_config()
234 .parameters
235 .protocol_version = v;
236 self
237 }
238
239 pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
240 self.supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
241 self
242 }
243
244 pub fn with_supported_protocol_version_callback(
245 mut self,
246 func: SupportedProtocolVersionsCallback,
247 ) -> Self {
248 self.supported_protocol_versions_config = ProtocolVersionsConfig::PerValidator(func);
249 self
250 }
251
252 pub fn with_supported_protocol_versions_config(mut self, c: ProtocolVersionsConfig) -> Self {
253 self.supported_protocol_versions_config = c;
254 self
255 }
256
257 pub fn with_state_accumulator_config(mut self, c: StateAccumulatorV1EnabledConfig) -> Self {
258 self.state_accumulator_config = c;
259 self
260 }
261
262 pub fn with_fullnode_supported_protocol_versions_config(
263 mut self,
264 c: ProtocolVersionsConfig,
265 ) -> Self {
266 self.fullnode_supported_protocol_versions_config = Some(c);
267 self
268 }
269
270 pub fn with_db_checkpoint_config(mut self, db_checkpoint_config: DBCheckpointConfig) -> Self {
271 self.db_checkpoint_config = db_checkpoint_config;
272 self
273 }
274
275 pub fn with_authority_overload_config(
276 mut self,
277 authority_overload_config: AuthorityOverloadConfig,
278 ) -> Self {
279 assert!(self.network_config.is_none());
280 self.authority_overload_config = Some(authority_overload_config);
281 self
282 }
283
284 pub fn with_execution_cache_type(mut self, execution_cache_type: ExecutionCacheType) -> Self {
285 assert!(self.execution_cache_type.is_none());
286 self.execution_cache_type = Some(execution_cache_type);
287 self
288 }
289
290 pub fn with_execution_cache_config(
291 mut self,
292 execution_cache_config: ExecutionCacheConfig,
293 ) -> Self {
294 self.execution_cache_config = Some(execution_cache_config);
295 self
296 }
297
298 pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
299 self.data_ingestion_dir = Some(path);
300 self
301 }
302
303 pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
304 if let Some(run_with_range) = run_with_range {
305 self.fullnode_run_with_range = Some(run_with_range);
306 }
307 self
308 }
309
310 pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
311 self.fullnode_policy_config = config;
312 self
313 }
314
315 pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
316 self.fullnode_fw_config = config;
317 self
318 }
319
320 pub fn with_fullnode_grpc_api_config(mut self, config: GrpcApiConfig) -> Self {
321 self.fullnode_grpc_api_config = Some(config);
322 self
323 }
324
325 fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
326 if self.genesis_config.is_none() {
327 assert!(self.network_config.is_none());
328 self.genesis_config = Some(GenesisConfig::for_local_testing());
329 }
330 self.genesis_config.as_mut().unwrap()
331 }
332
333 pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
334 self.max_submit_position = Some(max_submit_position);
335 self
336 }
337
338 pub fn with_disable_fullnode_pruning(mut self) -> Self {
339 self.disable_fullnode_pruning = true;
340 self
341 }
342
343 pub fn with_submit_delay_step_override_millis(
344 mut self,
345 submit_delay_step_override_millis: u64,
346 ) -> Self {
347 self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
348 self
349 }
350
351 pub fn with_iota_names_config(mut self, iota_names_config: IotaNamesConfig) -> Self {
352 self.iota_names_config = Some(iota_names_config);
353 self
354 }
355
356 pub fn with_disabled_address_verification_cooldown(mut self) -> Self {
360 self.disable_address_verification_cooldown = true;
361 self
362 }
363}
364
365impl<R: rand::RngCore + rand::CryptoRng> SwarmBuilder<R> {
366 pub fn build(self) -> Swarm {
368 let dir = if let Some(dir) = self.dir {
369 SwarmDirectory::Persistent(dir)
370 } else {
371 SwarmDirectory::new_temporary()
372 };
373
374 let ingest_data = self.data_ingestion_dir.clone();
375
376 let mut network_config = self.network_config.unwrap_or_else(|| {
377 let mut config_builder = ConfigBuilder::new(dir.as_ref());
378
379 if let Some(genesis_config) = self.genesis_config {
380 config_builder = config_builder.with_genesis_config(genesis_config);
381 }
382
383 if let Some(num_unpruned_validators) = self.num_unpruned_validators {
384 config_builder =
385 config_builder.with_num_unpruned_validators(num_unpruned_validators);
386 }
387
388 if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
389 config_builder = config_builder.with_jwk_fetch_interval(jwk_fetch_interval);
390 }
391
392 if let Some(authority_overload_config) = self.authority_overload_config {
393 config_builder =
394 config_builder.with_authority_overload_config(authority_overload_config);
395 }
396
397 if let Some(execution_cache_type) = self.execution_cache_type {
398 config_builder = config_builder.with_execution_cache_type(execution_cache_type);
399 }
400
401 if let Some(execution_cache_config) = self.execution_cache_config {
402 config_builder = config_builder.with_execution_cache_config(execution_cache_config);
403 }
404
405 if let Some(path) = self.data_ingestion_dir {
406 config_builder = config_builder.with_data_ingestion_dir(path);
407 }
408
409 if let Some(max_submit_position) = self.max_submit_position {
410 config_builder = config_builder.with_max_submit_position(max_submit_position);
411 }
412
413 if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis
414 {
415 config_builder = config_builder
416 .with_submit_delay_step_override_millis(submit_delay_step_override_millis);
417 }
418
419 let mut network_config = config_builder
420 .committee(self.committee)
421 .rng(self.rng)
422 .with_objects(self.additional_objects)
423 .with_empty_validator_genesis()
424 .with_supported_protocol_versions_config(
425 self.supported_protocol_versions_config.clone(),
426 )
427 .with_state_accumulator_config(self.state_accumulator_config.clone())
428 .build();
429 let genesis_path = dir.join(IOTA_GENESIS_FILENAME);
431 network_config
432 .genesis
433 .save(&genesis_path)
434 .expect("genesis should be saved successfully");
435 for validator in &mut network_config.validator_configs {
436 validator.genesis = iota_config::node::Genesis::new_from_file(&genesis_path);
437 }
438 network_config
439 });
440
441 if self.disable_address_verification_cooldown {
442 for validator in &mut network_config.validator_configs {
443 if let Some(ref mut discovery_config) = validator.p2p_config.discovery {
444 discovery_config.address_verification_failure_cooldown_sec = Some(0);
445 } else {
446 validator.p2p_config.discovery = Some(DiscoveryConfig {
447 address_verification_failure_cooldown_sec: Some(0),
448 ..Default::default()
449 });
450 }
451 }
452 }
453
454 let mut nodes: HashMap<_, _> = network_config
455 .validator_configs()
456 .iter()
457 .map(|config| {
458 info!(
459 "SwarmBuilder configuring validator with name {}",
460 config.authority_public_key()
461 );
462 (config.authority_public_key(), Node::new(config.to_owned()))
463 })
464 .collect();
465
466 let mut fullnode_config_builder = FullnodeConfigBuilder::new()
467 .with_config_directory(dir.as_ref().into())
468 .with_db_checkpoint_config(self.db_checkpoint_config.clone())
469 .with_run_with_range(self.fullnode_run_with_range)
470 .with_policy_config(self.fullnode_policy_config)
471 .with_data_ingestion_dir(ingest_data)
472 .with_fw_config(self.fullnode_fw_config)
473 .with_disable_pruning(self.disable_fullnode_pruning)
474 .with_iota_names_config(self.iota_names_config);
475
476 if self.disable_address_verification_cooldown {
477 let discovery_config = DiscoveryConfig {
478 address_verification_failure_cooldown_sec: Some(0),
479 ..Default::default()
480 };
481
482 fullnode_config_builder =
483 fullnode_config_builder.with_discovery_config(discovery_config);
484 }
485
486 if let Some(spvc) = &self.fullnode_supported_protocol_versions_config {
487 let supported_versions = match spvc {
488 ProtocolVersionsConfig::Default => SupportedProtocolVersions::SYSTEM_DEFAULT,
489 ProtocolVersionsConfig::Global(v) => *v,
490 ProtocolVersionsConfig::PerValidator(func) => func(0, None),
491 };
492 fullnode_config_builder =
493 fullnode_config_builder.with_supported_protocol_versions(supported_versions);
494 }
495
496 if let Some(grpc_config) = &self.fullnode_grpc_api_config {
498 fullnode_config_builder =
499 fullnode_config_builder.with_grpc_api_config(grpc_config.clone());
500 }
501
502 if self.fullnode_count > 0 {
503 (0..self.fullnode_count).for_each(|idx| {
504 let mut builder = fullnode_config_builder.clone();
505 if idx == 0 {
506 if let Some(rpc_addr) = self.fullnode_rpc_addr {
509 builder = builder.with_rpc_addr(rpc_addr);
510 }
511 if let Some(rpc_port) = self.fullnode_rpc_port {
512 builder = builder.with_rpc_port(rpc_port);
513 }
514 }
515 let config = builder.build(&mut OsRng, &network_config);
516 info!(
517 "SwarmBuilder configuring full node with name {}",
518 config.authority_public_key()
519 );
520 nodes.insert(config.authority_public_key(), Node::new(config));
521 });
522 }
523 Swarm {
524 dir,
525 network_config,
526 nodes,
527 fullnode_config_builder,
528 }
529 }
530}
531
/// A set of validator and fullnode [`Node`]s sharing one `NetworkConfig`,
/// together with the directory that holds their on-disk data.
///
/// All nodes are stopped when the `Swarm` is dropped.
#[derive(Debug)]
pub struct Swarm {
    /// Directory holding the swarm's on-disk data (persistent or temporary).
    dir: SwarmDirectory,
    /// Network configuration the validator nodes were created from.
    network_config: NetworkConfig,
    /// Every node of the swarm (validators and fullnodes), keyed by the
    /// authority public key of its config.
    nodes: HashMap<AuthorityName, Node>,
    /// The fullnode config builder prepared while building the swarm; clones
    /// are handed out by `get_fullnode_config_builder`.
    fullnode_config_builder: FullnodeConfigBuilder,
}
541
542impl Drop for Swarm {
543 fn drop(&mut self) {
544 self.nodes_iter_mut().for_each(|node| node.stop());
545 }
546}
547
548impl Swarm {
549 fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
550 self.nodes.values_mut()
551 }
552
553 pub fn builder() -> SwarmBuilder {
555 SwarmBuilder::new()
556 }
557
558 pub async fn launch(&mut self) -> Result<()> {
560 try_join_all(self.nodes_iter_mut().map(|node| node.start())).await?;
561 tracing::info!("Successfully launched Swarm");
562 Ok(())
563 }
564
565 pub fn dir(&self) -> &Path {
568 self.dir.as_ref()
569 }
570
571 pub fn config(&self) -> &NetworkConfig {
573 &self.network_config
574 }
575
576 pub fn config_mut(&mut self) -> &mut NetworkConfig {
580 &mut self.network_config
581 }
582
583 pub fn all_nodes(&self) -> impl Iterator<Item = &Node> {
584 self.nodes.values()
585 }
586
587 pub fn node(&self, name: &AuthorityName) -> Option<&Node> {
588 self.nodes.get(name)
589 }
590
591 pub fn node_mut(&mut self, name: &AuthorityName) -> Option<&mut Node> {
592 self.nodes.get_mut(name)
593 }
594
595 pub fn validator_nodes(&self) -> impl Iterator<Item = &Node> {
600 self.nodes
601 .values()
602 .filter(|node| node.config().consensus_config.is_some())
603 }
604
605 pub fn validator_node_handles(&self) -> Vec<IotaNodeHandle> {
606 self.validator_nodes()
607 .map(|node| node.get_node_handle().unwrap())
608 .collect()
609 }
610
611 pub fn active_validators(&self) -> impl Iterator<Item = &Node> {
613 self.validator_nodes().filter(|node| {
614 node.get_node_handle().is_some_and(|handle| {
615 let state = handle.state();
616 state.is_active_validator(&state.epoch_store_for_testing())
617 })
618 })
619 }
620
621 pub fn committee_validators(&self) -> impl Iterator<Item = &Node> {
623 self.validator_nodes().filter(|node| {
624 node.get_node_handle().is_some_and(|handle| {
625 let state = handle.state();
626 state.is_committee_validator(&state.epoch_store_for_testing())
627 })
628 })
629 }
630
631 pub fn fullnodes(&self) -> impl Iterator<Item = &Node> {
633 self.nodes
634 .values()
635 .filter(|node| node.config().consensus_config.is_none())
636 }
637
638 pub async fn spawn_new_node(&mut self, config: NodeConfig) -> IotaNodeHandle {
639 let name = config.authority_public_key();
640 let node = Node::new(config);
641 node.start().await.unwrap();
642 let handle = node.get_node_handle().unwrap();
643 self.nodes.insert(name, node);
644 handle
645 }
646
647 pub fn get_fullnode_config_builder(&self) -> FullnodeConfigBuilder {
648 self.fullnode_config_builder.clone()
649 }
650}
651
/// Location of a swarm's on-disk data.
#[derive(Debug)]
enum SwarmDirectory {
    /// A caller-provided directory path.
    Persistent(PathBuf),
    /// A temporary directory owned by the swarm through its `TempDir` handle.
    Temporary(TempDir),
}
657
658impl SwarmDirectory {
659 fn new_temporary() -> Self {
660 SwarmDirectory::Temporary(nondeterministic!(TempDir::new().unwrap()))
661 }
662}
663
664impl ops::Deref for SwarmDirectory {
665 type Target = Path;
666
667 fn deref(&self) -> &Self::Target {
668 match self {
669 SwarmDirectory::Persistent(dir) => dir.deref(),
670 SwarmDirectory::Temporary(dir) => dir.path(),
671 }
672 }
673}
674
675impl AsRef<Path> for SwarmDirectory {
676 fn as_ref(&self) -> &Path {
677 match self {
678 SwarmDirectory::Persistent(dir) => dir.as_ref(),
679 SwarmDirectory::Temporary(dir) => dir.as_ref(),
680 }
681 }
682}
683
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use super::Swarm;

    /// Smoke test: builds a swarm of four validators plus one fullnode,
    /// launches it and runs a health check against every node.
    #[tokio::test]
    async fn launch() {
        telemetry_subscribers::init_for_testing();

        let committee_size = NonZeroUsize::new(4).unwrap();
        let builder = Swarm::builder()
            .committee_size(committee_size)
            .with_fullnode_count(1);
        let mut swarm = builder.build();

        swarm.launch().await.unwrap();

        // NOTE(review): the boolean passed to `health_check` presumably
        // marks whether the node is a validator — confirm against `Node`.
        for node in swarm.validator_nodes() {
            node.health_check(true).await.unwrap();
        }

        for node in swarm.fullnodes() {
            node.health_check(false).await.unwrap();
        }

        println!("hello");
    }
}