1use std::{
6 collections::HashMap,
7 net::SocketAddr,
8 num::NonZeroUsize,
9 ops,
10 path::{Path, PathBuf},
11 time::Duration,
12};
13
14use anyhow::Result;
15use futures::future::try_join_all;
16use iota_config::{
17 ExecutionCacheConfig, ExecutionCacheType, IOTA_GENESIS_FILENAME, NodeConfig,
18 node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange},
19};
20use iota_macros::nondeterministic;
21use iota_names::config::IotaNamesConfig;
22use iota_node::IotaNodeHandle;
23use iota_protocol_config::ProtocolVersion;
24use iota_swarm_config::{
25 genesis_config::{AccountConfig, GenesisConfig, ValidatorGenesisConfig},
26 network_config::NetworkConfig,
27 network_config_builder::{
28 CommitteeConfig, ConfigBuilder, ProtocolVersionsConfig, StateAccumulatorV1EnabledConfig,
29 SupportedProtocolVersionsCallback,
30 },
31 node_config_builder::FullnodeConfigBuilder,
32};
33use iota_types::{
34 base_types::AuthorityName,
35 object::Object,
36 supported_protocol_versions::SupportedProtocolVersions,
37 traffic_control::{PolicyConfig, RemoteFirewallConfig},
38};
39use rand::rngs::OsRng;
40use tempfile::TempDir;
41use tracing::info;
42
43use super::Node;
44
45pub struct SwarmBuilder<R = OsRng> {
46 rng: R,
47 dir: Option<PathBuf>,
49 committee: CommitteeConfig,
50 genesis_config: Option<GenesisConfig>,
51 network_config: Option<NetworkConfig>,
52 additional_objects: Vec<Object>,
53 fullnode_count: usize,
54 fullnode_rpc_port: Option<u16>,
55 fullnode_rpc_addr: Option<SocketAddr>,
56 supported_protocol_versions_config: ProtocolVersionsConfig,
57 fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
59 db_checkpoint_config: DBCheckpointConfig,
60 jwk_fetch_interval: Option<Duration>,
61 num_unpruned_validators: Option<usize>,
62 authority_overload_config: Option<AuthorityOverloadConfig>,
63 execution_cache_type: Option<ExecutionCacheType>,
64 execution_cache_config: Option<ExecutionCacheConfig>,
65 data_ingestion_dir: Option<PathBuf>,
66 fullnode_run_with_range: Option<RunWithRange>,
67 fullnode_policy_config: Option<PolicyConfig>,
68 fullnode_fw_config: Option<RemoteFirewallConfig>,
69 max_submit_position: Option<usize>,
70 submit_delay_step_override_millis: Option<u64>,
71 state_accumulator_config: StateAccumulatorV1EnabledConfig,
72 disable_fullnode_pruning: bool,
73 iota_names_config: Option<IotaNamesConfig>,
74}
75
76impl SwarmBuilder {
77 #[expect(clippy::new_without_default)]
78 pub fn new() -> Self {
79 Self {
80 rng: OsRng,
81 dir: None,
82 committee: CommitteeConfig::Size(NonZeroUsize::new(1).unwrap()),
83 genesis_config: None,
84 network_config: None,
85 additional_objects: vec![],
86 fullnode_count: 0,
87 fullnode_rpc_port: None,
88 fullnode_rpc_addr: None,
89 supported_protocol_versions_config: ProtocolVersionsConfig::Default,
90 fullnode_supported_protocol_versions_config: None,
91 db_checkpoint_config: DBCheckpointConfig::default(),
92 jwk_fetch_interval: None,
93 num_unpruned_validators: None,
94 authority_overload_config: None,
95 execution_cache_type: None,
96 execution_cache_config: None,
97 data_ingestion_dir: None,
98 fullnode_run_with_range: None,
99 fullnode_policy_config: None,
100 fullnode_fw_config: None,
101 max_submit_position: None,
102 submit_delay_step_override_millis: None,
103 state_accumulator_config: StateAccumulatorV1EnabledConfig::Global(true),
104 disable_fullnode_pruning: false,
105 iota_names_config: None,
106 }
107 }
108}
109
110impl<R> SwarmBuilder<R> {
111 pub fn rng<N: rand::RngCore + rand::CryptoRng>(self, rng: N) -> SwarmBuilder<N> {
112 SwarmBuilder {
113 rng,
114 dir: self.dir,
115 committee: self.committee,
116 genesis_config: self.genesis_config,
117 network_config: self.network_config,
118 additional_objects: self.additional_objects,
119 fullnode_count: self.fullnode_count,
120 fullnode_rpc_port: self.fullnode_rpc_port,
121 fullnode_rpc_addr: self.fullnode_rpc_addr,
122 supported_protocol_versions_config: self.supported_protocol_versions_config,
123 fullnode_supported_protocol_versions_config: self
124 .fullnode_supported_protocol_versions_config,
125 db_checkpoint_config: self.db_checkpoint_config,
126 jwk_fetch_interval: self.jwk_fetch_interval,
127 num_unpruned_validators: self.num_unpruned_validators,
128 authority_overload_config: self.authority_overload_config,
129 execution_cache_type: self.execution_cache_type,
130 execution_cache_config: self.execution_cache_config,
131 data_ingestion_dir: self.data_ingestion_dir,
132 fullnode_run_with_range: self.fullnode_run_with_range,
133 fullnode_policy_config: self.fullnode_policy_config,
134 fullnode_fw_config: self.fullnode_fw_config,
135 max_submit_position: self.max_submit_position,
136 submit_delay_step_override_millis: self.submit_delay_step_override_millis,
137 state_accumulator_config: self.state_accumulator_config,
138 disable_fullnode_pruning: self.disable_fullnode_pruning,
139 iota_names_config: self.iota_names_config,
140 }
141 }
142
143 pub fn dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
151 self.dir = Some(dir.into());
152 self
153 }
154
155 pub fn committee_size(mut self, committee_size: NonZeroUsize) -> Self {
159 self.committee = CommitteeConfig::Size(committee_size);
160 self
161 }
162
163 pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
164 self.committee = CommitteeConfig::Validators(validators);
165 self
166 }
167
168 pub fn with_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
169 assert!(self.network_config.is_none() && self.genesis_config.is_none());
170 self.genesis_config = Some(genesis_config);
171 self
172 }
173
174 pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
175 assert!(self.network_config.is_none());
176 self.num_unpruned_validators = Some(n);
177 self
178 }
179
180 pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
181 self.jwk_fetch_interval = Some(i);
182 self
183 }
184
185 pub fn with_network_config(mut self, network_config: NetworkConfig) -> Self {
186 assert!(self.network_config.is_none() && self.genesis_config.is_none());
187 self.network_config = Some(network_config);
188 self
189 }
190
191 pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
192 self.get_or_init_genesis_config().accounts = accounts;
193 self
194 }
195
196 pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
197 self.additional_objects.extend(objects);
198 self
199 }
200
201 pub fn with_fullnode_count(mut self, fullnode_count: usize) -> Self {
202 self.fullnode_count = fullnode_count;
203 self
204 }
205
206 pub fn with_fullnode_rpc_port(mut self, fullnode_rpc_port: u16) -> Self {
207 assert!(self.fullnode_rpc_addr.is_none());
208 self.fullnode_rpc_port = Some(fullnode_rpc_port);
209 self
210 }
211
212 pub fn with_fullnode_rpc_addr(mut self, fullnode_rpc_addr: SocketAddr) -> Self {
213 assert!(self.fullnode_rpc_port.is_none());
214 self.fullnode_rpc_addr = Some(fullnode_rpc_addr);
215 self
216 }
217
218 pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
219 self.get_or_init_genesis_config()
220 .parameters
221 .epoch_duration_ms = epoch_duration_ms;
222 self
223 }
224
225 pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
226 self.get_or_init_genesis_config()
227 .parameters
228 .protocol_version = v;
229 self
230 }
231
232 pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
233 self.supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
234 self
235 }
236
237 pub fn with_supported_protocol_version_callback(
238 mut self,
239 func: SupportedProtocolVersionsCallback,
240 ) -> Self {
241 self.supported_protocol_versions_config = ProtocolVersionsConfig::PerValidator(func);
242 self
243 }
244
245 pub fn with_supported_protocol_versions_config(mut self, c: ProtocolVersionsConfig) -> Self {
246 self.supported_protocol_versions_config = c;
247 self
248 }
249
250 pub fn with_state_accumulator_config(mut self, c: StateAccumulatorV1EnabledConfig) -> Self {
251 self.state_accumulator_config = c;
252 self
253 }
254
255 pub fn with_fullnode_supported_protocol_versions_config(
256 mut self,
257 c: ProtocolVersionsConfig,
258 ) -> Self {
259 self.fullnode_supported_protocol_versions_config = Some(c);
260 self
261 }
262
263 pub fn with_db_checkpoint_config(mut self, db_checkpoint_config: DBCheckpointConfig) -> Self {
264 self.db_checkpoint_config = db_checkpoint_config;
265 self
266 }
267
268 pub fn with_authority_overload_config(
269 mut self,
270 authority_overload_config: AuthorityOverloadConfig,
271 ) -> Self {
272 assert!(self.network_config.is_none());
273 self.authority_overload_config = Some(authority_overload_config);
274 self
275 }
276
277 pub fn with_execution_cache_type(mut self, execution_cache_type: ExecutionCacheType) -> Self {
278 assert!(self.execution_cache_type.is_none());
279 self.execution_cache_type = Some(execution_cache_type);
280 self
281 }
282
283 pub fn with_execution_cache_config(
284 mut self,
285 execution_cache_config: ExecutionCacheConfig,
286 ) -> Self {
287 self.execution_cache_config = Some(execution_cache_config);
288 self
289 }
290
291 pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
292 self.data_ingestion_dir = Some(path);
293 self
294 }
295
296 pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
297 if let Some(run_with_range) = run_with_range {
298 self.fullnode_run_with_range = Some(run_with_range);
299 }
300 self
301 }
302
303 pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
304 self.fullnode_policy_config = config;
305 self
306 }
307
308 pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
309 self.fullnode_fw_config = config;
310 self
311 }
312
313 fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
314 if self.genesis_config.is_none() {
315 assert!(self.network_config.is_none());
316 self.genesis_config = Some(GenesisConfig::for_local_testing());
317 }
318 self.genesis_config.as_mut().unwrap()
319 }
320
321 pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
322 self.max_submit_position = Some(max_submit_position);
323 self
324 }
325
326 pub fn with_disable_fullnode_pruning(mut self) -> Self {
327 self.disable_fullnode_pruning = true;
328 self
329 }
330
331 pub fn with_submit_delay_step_override_millis(
332 mut self,
333 submit_delay_step_override_millis: u64,
334 ) -> Self {
335 self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
336 self
337 }
338
339 pub fn with_iota_names_config(mut self, iota_names_config: IotaNamesConfig) -> Self {
340 self.iota_names_config = Some(iota_names_config);
341 self
342 }
343}
344
345impl<R: rand::RngCore + rand::CryptoRng> SwarmBuilder<R> {
346 pub fn build(self) -> Swarm {
348 let dir = if let Some(dir) = self.dir {
349 SwarmDirectory::Persistent(dir)
350 } else {
351 SwarmDirectory::new_temporary()
352 };
353
354 let ingest_data = self.data_ingestion_dir.clone();
355
356 let network_config = self.network_config.unwrap_or_else(|| {
357 let mut config_builder = ConfigBuilder::new(dir.as_ref());
358
359 if let Some(genesis_config) = self.genesis_config {
360 config_builder = config_builder.with_genesis_config(genesis_config);
361 }
362
363 if let Some(num_unpruned_validators) = self.num_unpruned_validators {
364 config_builder =
365 config_builder.with_num_unpruned_validators(num_unpruned_validators);
366 }
367
368 if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
369 config_builder = config_builder.with_jwk_fetch_interval(jwk_fetch_interval);
370 }
371
372 if let Some(authority_overload_config) = self.authority_overload_config {
373 config_builder =
374 config_builder.with_authority_overload_config(authority_overload_config);
375 }
376
377 if let Some(execution_cache_type) = self.execution_cache_type {
378 config_builder = config_builder.with_execution_cache_type(execution_cache_type);
379 }
380
381 if let Some(execution_cache_config) = self.execution_cache_config {
382 config_builder = config_builder.with_execution_cache_config(execution_cache_config);
383 }
384
385 if let Some(path) = self.data_ingestion_dir {
386 config_builder = config_builder.with_data_ingestion_dir(path);
387 }
388
389 if let Some(max_submit_position) = self.max_submit_position {
390 config_builder = config_builder.with_max_submit_position(max_submit_position);
391 }
392
393 if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis
394 {
395 config_builder = config_builder
396 .with_submit_delay_step_override_millis(submit_delay_step_override_millis);
397 }
398
399 let mut network_config = config_builder
400 .committee(self.committee)
401 .rng(self.rng)
402 .with_objects(self.additional_objects)
403 .with_empty_validator_genesis()
404 .with_supported_protocol_versions_config(
405 self.supported_protocol_versions_config.clone(),
406 )
407 .with_state_accumulator_config(self.state_accumulator_config.clone())
408 .build();
409 let genesis_path = dir.join(IOTA_GENESIS_FILENAME);
411 network_config
412 .genesis
413 .save(&genesis_path)
414 .expect("genesis should be saved successfully");
415 for validator in &mut network_config.validator_configs {
416 validator.genesis = iota_config::node::Genesis::new_from_file(&genesis_path);
417 }
418 network_config
419 });
420
421 let mut nodes: HashMap<_, _> = network_config
422 .validator_configs()
423 .iter()
424 .map(|config| {
425 info!(
426 "SwarmBuilder configuring validator with name {}",
427 config.authority_public_key()
428 );
429 (config.authority_public_key(), Node::new(config.to_owned()))
430 })
431 .collect();
432
433 let mut fullnode_config_builder = FullnodeConfigBuilder::new()
434 .with_config_directory(dir.as_ref().into())
435 .with_db_checkpoint_config(self.db_checkpoint_config.clone())
436 .with_run_with_range(self.fullnode_run_with_range)
437 .with_policy_config(self.fullnode_policy_config)
438 .with_data_ingestion_dir(ingest_data)
439 .with_fw_config(self.fullnode_fw_config)
440 .with_disable_pruning(self.disable_fullnode_pruning)
441 .with_iota_names_config(self.iota_names_config);
442
443 if let Some(spvc) = &self.fullnode_supported_protocol_versions_config {
444 let supported_versions = match spvc {
445 ProtocolVersionsConfig::Default => SupportedProtocolVersions::SYSTEM_DEFAULT,
446 ProtocolVersionsConfig::Global(v) => *v,
447 ProtocolVersionsConfig::PerValidator(func) => func(0, None),
448 };
449 fullnode_config_builder =
450 fullnode_config_builder.with_supported_protocol_versions(supported_versions);
451 }
452
453 if self.fullnode_count > 0 {
454 (0..self.fullnode_count).for_each(|idx| {
455 let mut builder = fullnode_config_builder.clone();
456 if idx == 0 {
457 if let Some(rpc_addr) = self.fullnode_rpc_addr {
460 builder = builder.with_rpc_addr(rpc_addr);
461 }
462 if let Some(rpc_port) = self.fullnode_rpc_port {
463 builder = builder.with_rpc_port(rpc_port);
464 }
465 }
466 let config = builder.build(&mut OsRng, &network_config);
467 info!(
468 "SwarmBuilder configuring full node with name {}",
469 config.authority_public_key()
470 );
471 nodes.insert(config.authority_public_key(), Node::new(config));
472 });
473 }
474 Swarm {
475 dir,
476 network_config,
477 nodes,
478 fullnode_config_builder,
479 }
480 }
481}
482
483#[derive(Debug)]
485pub struct Swarm {
486 dir: SwarmDirectory,
487 network_config: NetworkConfig,
488 nodes: HashMap<AuthorityName, Node>,
489 fullnode_config_builder: FullnodeConfigBuilder,
491}
492
493impl Drop for Swarm {
494 fn drop(&mut self) {
495 self.nodes_iter_mut().for_each(|node| node.stop());
496 }
497}
498
499impl Swarm {
500 fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
501 self.nodes.values_mut()
502 }
503
504 pub fn builder() -> SwarmBuilder {
506 SwarmBuilder::new()
507 }
508
509 pub async fn launch(&mut self) -> Result<()> {
511 try_join_all(self.nodes_iter_mut().map(|node| node.start())).await?;
512 tracing::info!("Successfully launched Swarm");
513 Ok(())
514 }
515
516 pub fn dir(&self) -> &Path {
519 self.dir.as_ref()
520 }
521
522 pub fn config(&self) -> &NetworkConfig {
524 &self.network_config
525 }
526
527 pub fn config_mut(&mut self) -> &mut NetworkConfig {
531 &mut self.network_config
532 }
533
534 pub fn all_nodes(&self) -> impl Iterator<Item = &Node> {
535 self.nodes.values()
536 }
537
538 pub fn node(&self, name: &AuthorityName) -> Option<&Node> {
539 self.nodes.get(name)
540 }
541
542 pub fn node_mut(&mut self, name: &AuthorityName) -> Option<&mut Node> {
543 self.nodes.get_mut(name)
544 }
545
546 pub fn validator_nodes(&self) -> impl Iterator<Item = &Node> {
551 self.nodes
552 .values()
553 .filter(|node| node.config().consensus_config.is_some())
554 }
555
556 pub fn validator_node_handles(&self) -> Vec<IotaNodeHandle> {
557 self.validator_nodes()
558 .map(|node| node.get_node_handle().unwrap())
559 .collect()
560 }
561
562 pub fn active_validators(&self) -> impl Iterator<Item = &Node> {
564 self.validator_nodes().filter(|node| {
565 node.get_node_handle().is_some_and(|handle| {
566 let state = handle.state();
567 state.is_validator(&state.epoch_store_for_testing())
568 })
569 })
570 }
571
572 pub fn fullnodes(&self) -> impl Iterator<Item = &Node> {
574 self.nodes
575 .values()
576 .filter(|node| node.config().consensus_config.is_none())
577 }
578
579 pub async fn spawn_new_node(&mut self, config: NodeConfig) -> IotaNodeHandle {
580 let name = config.authority_public_key();
581 let node = Node::new(config);
582 node.start().await.unwrap();
583 let handle = node.get_node_handle().unwrap();
584 self.nodes.insert(name, node);
585 handle
586 }
587
588 pub fn get_fullnode_config_builder(&self) -> FullnodeConfigBuilder {
589 self.fullnode_config_builder.clone()
590 }
591}
592
593#[derive(Debug)]
594enum SwarmDirectory {
595 Persistent(PathBuf),
596 Temporary(TempDir),
597}
598
599impl SwarmDirectory {
600 fn new_temporary() -> Self {
601 SwarmDirectory::Temporary(nondeterministic!(TempDir::new().unwrap()))
602 }
603}
604
605impl ops::Deref for SwarmDirectory {
606 type Target = Path;
607
608 fn deref(&self) -> &Self::Target {
609 match self {
610 SwarmDirectory::Persistent(dir) => dir.deref(),
611 SwarmDirectory::Temporary(dir) => dir.path(),
612 }
613 }
614}
615
616impl AsRef<Path> for SwarmDirectory {
617 fn as_ref(&self) -> &Path {
618 match self {
619 SwarmDirectory::Persistent(dir) => dir.as_ref(),
620 SwarmDirectory::Temporary(dir) => dir.as_ref(),
621 }
622 }
623}
624
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use super::Swarm;

    /// Smoke test: builds a swarm with four validators and one fullnode,
    /// launches it and runs a health check against every node.
    #[tokio::test]
    async fn launch() {
        telemetry_subscribers::init_for_testing();

        let committee_size = NonZeroUsize::new(4).unwrap();
        let builder = Swarm::builder()
            .committee_size(committee_size)
            .with_fullnode_count(1);
        let mut swarm = builder.build();

        swarm.launch().await.unwrap();

        // Validators are checked with the flag set, fullnodes with it cleared.
        for node in swarm.validator_nodes() {
            node.health_check(true).await.unwrap();
        }
        for node in swarm.fullnodes() {
            node.health_check(false).await.unwrap();
        }

        println!("hello");
    }
}