1use std::{
6 collections::HashMap,
7 net::SocketAddr,
8 num::NonZeroUsize,
9 ops,
10 path::{Path, PathBuf},
11 time::Duration,
12};
13
14use anyhow::Result;
15use futures::future::try_join_all;
16use iota_config::{
17 IOTA_GENESIS_FILENAME, NodeConfig,
18 node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange},
19};
20use iota_macros::nondeterministic;
21use iota_node::IotaNodeHandle;
22use iota_protocol_config::ProtocolVersion;
23use iota_swarm_config::{
24 genesis_config::{AccountConfig, GenesisConfig, ValidatorGenesisConfig},
25 network_config::NetworkConfig,
26 network_config_builder::{
27 CommitteeConfig, ConfigBuilder, ProtocolVersionsConfig, StateAccumulatorV1EnabledConfig,
28 SupportedProtocolVersionsCallback,
29 },
30 node_config_builder::FullnodeConfigBuilder,
31};
32use iota_types::{
33 base_types::AuthorityName,
34 object::Object,
35 supported_protocol_versions::SupportedProtocolVersions,
36 traffic_control::{PolicyConfig, RemoteFirewallConfig},
37};
38use rand::rngs::OsRng;
39use tempfile::TempDir;
40use tracing::info;
41
42use super::Node;
43
44pub struct SwarmBuilder<R = OsRng> {
45 rng: R,
46 dir: Option<PathBuf>,
48 committee: CommitteeConfig,
49 genesis_config: Option<GenesisConfig>,
50 network_config: Option<NetworkConfig>,
51 additional_objects: Vec<Object>,
52 fullnode_count: usize,
53 fullnode_rpc_port: Option<u16>,
54 fullnode_rpc_addr: Option<SocketAddr>,
55 supported_protocol_versions_config: ProtocolVersionsConfig,
56 fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
58 db_checkpoint_config: DBCheckpointConfig,
59 jwk_fetch_interval: Option<Duration>,
60 num_unpruned_validators: Option<usize>,
61 authority_overload_config: Option<AuthorityOverloadConfig>,
62 data_ingestion_dir: Option<PathBuf>,
63 fullnode_run_with_range: Option<RunWithRange>,
64 fullnode_policy_config: Option<PolicyConfig>,
65 fullnode_fw_config: Option<RemoteFirewallConfig>,
66 max_submit_position: Option<usize>,
67 submit_delay_step_override_millis: Option<u64>,
68 state_accumulator_config: StateAccumulatorV1EnabledConfig,
69 disable_fullnode_pruning: bool,
70}
71
72impl SwarmBuilder {
73 #[expect(clippy::new_without_default)]
74 pub fn new() -> Self {
75 Self {
76 rng: OsRng,
77 dir: None,
78 committee: CommitteeConfig::Size(NonZeroUsize::new(1).unwrap()),
79 genesis_config: None,
80 network_config: None,
81 additional_objects: vec![],
82 fullnode_count: 0,
83 fullnode_rpc_port: None,
84 fullnode_rpc_addr: None,
85 supported_protocol_versions_config: ProtocolVersionsConfig::Default,
86 fullnode_supported_protocol_versions_config: None,
87 db_checkpoint_config: DBCheckpointConfig::default(),
88 jwk_fetch_interval: None,
89 num_unpruned_validators: None,
90 authority_overload_config: None,
91 data_ingestion_dir: None,
92 fullnode_run_with_range: None,
93 fullnode_policy_config: None,
94 fullnode_fw_config: None,
95 max_submit_position: None,
96 submit_delay_step_override_millis: None,
97 state_accumulator_config: StateAccumulatorV1EnabledConfig::Global(true),
98 disable_fullnode_pruning: false,
99 }
100 }
101}
102
103impl<R> SwarmBuilder<R> {
104 pub fn rng<N: rand::RngCore + rand::CryptoRng>(self, rng: N) -> SwarmBuilder<N> {
105 SwarmBuilder {
106 rng,
107 dir: self.dir,
108 committee: self.committee,
109 genesis_config: self.genesis_config,
110 network_config: self.network_config,
111 additional_objects: self.additional_objects,
112 fullnode_count: self.fullnode_count,
113 fullnode_rpc_port: self.fullnode_rpc_port,
114 fullnode_rpc_addr: self.fullnode_rpc_addr,
115 supported_protocol_versions_config: self.supported_protocol_versions_config,
116 fullnode_supported_protocol_versions_config: self
117 .fullnode_supported_protocol_versions_config,
118 db_checkpoint_config: self.db_checkpoint_config,
119 jwk_fetch_interval: self.jwk_fetch_interval,
120 num_unpruned_validators: self.num_unpruned_validators,
121 authority_overload_config: self.authority_overload_config,
122 data_ingestion_dir: self.data_ingestion_dir,
123 fullnode_run_with_range: self.fullnode_run_with_range,
124 fullnode_policy_config: self.fullnode_policy_config,
125 fullnode_fw_config: self.fullnode_fw_config,
126 max_submit_position: self.max_submit_position,
127 submit_delay_step_override_millis: self.submit_delay_step_override_millis,
128 state_accumulator_config: self.state_accumulator_config,
129 disable_fullnode_pruning: self.disable_fullnode_pruning,
130 }
131 }
132
133 pub fn dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
141 self.dir = Some(dir.into());
142 self
143 }
144
145 pub fn committee_size(mut self, committee_size: NonZeroUsize) -> Self {
149 self.committee = CommitteeConfig::Size(committee_size);
150 self
151 }
152
153 pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
154 self.committee = CommitteeConfig::Validators(validators);
155 self
156 }
157
158 pub fn with_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
159 assert!(self.network_config.is_none() && self.genesis_config.is_none());
160 self.genesis_config = Some(genesis_config);
161 self
162 }
163
164 pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
165 assert!(self.network_config.is_none());
166 self.num_unpruned_validators = Some(n);
167 self
168 }
169
170 pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
171 self.jwk_fetch_interval = Some(i);
172 self
173 }
174
175 pub fn with_network_config(mut self, network_config: NetworkConfig) -> Self {
176 assert!(self.network_config.is_none() && self.genesis_config.is_none());
177 self.network_config = Some(network_config);
178 self
179 }
180
181 pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
182 self.get_or_init_genesis_config().accounts = accounts;
183 self
184 }
185
186 pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
187 self.additional_objects.extend(objects);
188 self
189 }
190
191 pub fn with_fullnode_count(mut self, fullnode_count: usize) -> Self {
192 self.fullnode_count = fullnode_count;
193 self
194 }
195
196 pub fn with_fullnode_rpc_port(mut self, fullnode_rpc_port: u16) -> Self {
197 assert!(self.fullnode_rpc_addr.is_none());
198 self.fullnode_rpc_port = Some(fullnode_rpc_port);
199 self
200 }
201
202 pub fn with_fullnode_rpc_addr(mut self, fullnode_rpc_addr: SocketAddr) -> Self {
203 assert!(self.fullnode_rpc_port.is_none());
204 self.fullnode_rpc_addr = Some(fullnode_rpc_addr);
205 self
206 }
207
208 pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
209 self.get_or_init_genesis_config()
210 .parameters
211 .epoch_duration_ms = epoch_duration_ms;
212 self
213 }
214
215 pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
216 self.get_or_init_genesis_config()
217 .parameters
218 .protocol_version = v;
219 self
220 }
221
222 pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
223 self.supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
224 self
225 }
226
227 pub fn with_supported_protocol_version_callback(
228 mut self,
229 func: SupportedProtocolVersionsCallback,
230 ) -> Self {
231 self.supported_protocol_versions_config = ProtocolVersionsConfig::PerValidator(func);
232 self
233 }
234
235 pub fn with_supported_protocol_versions_config(mut self, c: ProtocolVersionsConfig) -> Self {
236 self.supported_protocol_versions_config = c;
237 self
238 }
239
240 pub fn with_state_accumulator_config(mut self, c: StateAccumulatorV1EnabledConfig) -> Self {
241 self.state_accumulator_config = c;
242 self
243 }
244
245 pub fn with_fullnode_supported_protocol_versions_config(
246 mut self,
247 c: ProtocolVersionsConfig,
248 ) -> Self {
249 self.fullnode_supported_protocol_versions_config = Some(c);
250 self
251 }
252
253 pub fn with_db_checkpoint_config(mut self, db_checkpoint_config: DBCheckpointConfig) -> Self {
254 self.db_checkpoint_config = db_checkpoint_config;
255 self
256 }
257
258 pub fn with_authority_overload_config(
259 mut self,
260 authority_overload_config: AuthorityOverloadConfig,
261 ) -> Self {
262 assert!(self.network_config.is_none());
263 self.authority_overload_config = Some(authority_overload_config);
264 self
265 }
266
267 pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
268 self.data_ingestion_dir = Some(path);
269 self
270 }
271
272 pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
273 if let Some(run_with_range) = run_with_range {
274 self.fullnode_run_with_range = Some(run_with_range);
275 }
276 self
277 }
278
279 pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
280 self.fullnode_policy_config = config;
281 self
282 }
283
284 pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
285 self.fullnode_fw_config = config;
286 self
287 }
288
289 fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
290 if self.genesis_config.is_none() {
291 assert!(self.network_config.is_none());
292 self.genesis_config = Some(GenesisConfig::for_local_testing());
293 }
294 self.genesis_config.as_mut().unwrap()
295 }
296
297 pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
298 self.max_submit_position = Some(max_submit_position);
299 self
300 }
301
302 pub fn with_disable_fullnode_pruning(mut self) -> Self {
303 self.disable_fullnode_pruning = true;
304 self
305 }
306
307 pub fn with_submit_delay_step_override_millis(
308 mut self,
309 submit_delay_step_override_millis: u64,
310 ) -> Self {
311 self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
312 self
313 }
314}
315
316impl<R: rand::RngCore + rand::CryptoRng> SwarmBuilder<R> {
317 pub fn build(self) -> Swarm {
319 let dir = if let Some(dir) = self.dir {
320 SwarmDirectory::Persistent(dir)
321 } else {
322 SwarmDirectory::new_temporary()
323 };
324
325 let ingest_data = self.data_ingestion_dir.clone();
326
327 let network_config = self.network_config.unwrap_or_else(|| {
328 let mut config_builder = ConfigBuilder::new(dir.as_ref());
329
330 if let Some(genesis_config) = self.genesis_config {
331 config_builder = config_builder.with_genesis_config(genesis_config);
332 }
333
334 if let Some(num_unpruned_validators) = self.num_unpruned_validators {
335 config_builder =
336 config_builder.with_num_unpruned_validators(num_unpruned_validators);
337 }
338
339 if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
340 config_builder = config_builder.with_jwk_fetch_interval(jwk_fetch_interval);
341 }
342
343 if let Some(authority_overload_config) = self.authority_overload_config {
344 config_builder =
345 config_builder.with_authority_overload_config(authority_overload_config);
346 }
347
348 if let Some(path) = self.data_ingestion_dir {
349 config_builder = config_builder.with_data_ingestion_dir(path);
350 }
351
352 if let Some(max_submit_position) = self.max_submit_position {
353 config_builder = config_builder.with_max_submit_position(max_submit_position);
354 }
355
356 if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis
357 {
358 config_builder = config_builder
359 .with_submit_delay_step_override_millis(submit_delay_step_override_millis);
360 }
361
362 let mut network_config = config_builder
363 .committee(self.committee)
364 .rng(self.rng)
365 .with_objects(self.additional_objects)
366 .with_empty_validator_genesis()
367 .with_supported_protocol_versions_config(
368 self.supported_protocol_versions_config.clone(),
369 )
370 .with_state_accumulator_config(self.state_accumulator_config.clone())
371 .build();
372 let genesis_path = dir.join(IOTA_GENESIS_FILENAME);
374 network_config
375 .genesis
376 .save(&genesis_path)
377 .expect("genesis should be saved successfully");
378 for validator in &mut network_config.validator_configs {
379 validator.genesis = iota_config::node::Genesis::new_from_file(&genesis_path);
380 }
381 network_config
382 });
383
384 let mut nodes: HashMap<_, _> = network_config
385 .validator_configs()
386 .iter()
387 .map(|config| {
388 info!(
389 "SwarmBuilder configuring validator with name {}",
390 config.authority_public_key()
391 );
392 (config.authority_public_key(), Node::new(config.to_owned()))
393 })
394 .collect();
395
396 let mut fullnode_config_builder = FullnodeConfigBuilder::new()
397 .with_config_directory(dir.as_ref().into())
398 .with_db_checkpoint_config(self.db_checkpoint_config.clone())
399 .with_run_with_range(self.fullnode_run_with_range)
400 .with_policy_config(self.fullnode_policy_config)
401 .with_data_ingestion_dir(ingest_data)
402 .with_fw_config(self.fullnode_fw_config)
403 .with_disable_pruning(self.disable_fullnode_pruning);
404
405 if let Some(spvc) = &self.fullnode_supported_protocol_versions_config {
406 let supported_versions = match spvc {
407 ProtocolVersionsConfig::Default => SupportedProtocolVersions::SYSTEM_DEFAULT,
408 ProtocolVersionsConfig::Global(v) => *v,
409 ProtocolVersionsConfig::PerValidator(func) => func(0, None),
410 };
411 fullnode_config_builder =
412 fullnode_config_builder.with_supported_protocol_versions(supported_versions);
413 }
414
415 if self.fullnode_count > 0 {
416 (0..self.fullnode_count).for_each(|idx| {
417 let mut builder = fullnode_config_builder.clone();
418 if idx == 0 {
419 if let Some(rpc_addr) = self.fullnode_rpc_addr {
422 builder = builder.with_rpc_addr(rpc_addr);
423 }
424 if let Some(rpc_port) = self.fullnode_rpc_port {
425 builder = builder.with_rpc_port(rpc_port);
426 }
427 }
428 let config = builder.build(&mut OsRng, &network_config);
429 info!(
430 "SwarmBuilder configuring full node with name {}",
431 config.authority_public_key()
432 );
433 nodes.insert(config.authority_public_key(), Node::new(config));
434 });
435 }
436 Swarm {
437 dir,
438 network_config,
439 nodes,
440 fullnode_config_builder,
441 }
442 }
443}
444
445#[derive(Debug)]
447pub struct Swarm {
448 dir: SwarmDirectory,
449 network_config: NetworkConfig,
450 nodes: HashMap<AuthorityName, Node>,
451 fullnode_config_builder: FullnodeConfigBuilder,
453}
454
455impl Drop for Swarm {
456 fn drop(&mut self) {
457 self.nodes_iter_mut().for_each(|node| node.stop());
458 }
459}
460
461impl Swarm {
462 fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
463 self.nodes.values_mut()
464 }
465
466 pub fn builder() -> SwarmBuilder {
468 SwarmBuilder::new()
469 }
470
471 pub async fn launch(&mut self) -> Result<()> {
473 try_join_all(self.nodes_iter_mut().map(|node| node.start())).await?;
474 tracing::info!("Successfully launched Swarm");
475 Ok(())
476 }
477
478 pub fn dir(&self) -> &Path {
481 self.dir.as_ref()
482 }
483
484 pub fn config(&self) -> &NetworkConfig {
486 &self.network_config
487 }
488
489 pub fn config_mut(&mut self) -> &mut NetworkConfig {
493 &mut self.network_config
494 }
495
496 pub fn all_nodes(&self) -> impl Iterator<Item = &Node> {
497 self.nodes.values()
498 }
499
500 pub fn node(&self, name: &AuthorityName) -> Option<&Node> {
501 self.nodes.get(name)
502 }
503
504 pub fn node_mut(&mut self, name: &AuthorityName) -> Option<&mut Node> {
505 self.nodes.get_mut(name)
506 }
507
508 pub fn validator_nodes(&self) -> impl Iterator<Item = &Node> {
513 self.nodes
514 .values()
515 .filter(|node| node.config().consensus_config.is_some())
516 }
517
518 pub fn validator_node_handles(&self) -> Vec<IotaNodeHandle> {
519 self.validator_nodes()
520 .map(|node| node.get_node_handle().unwrap())
521 .collect()
522 }
523
524 pub fn active_validators(&self) -> impl Iterator<Item = &Node> {
526 self.validator_nodes().filter(|node| {
527 node.get_node_handle().is_some_and(|handle| {
528 let state = handle.state();
529 state.is_validator(&state.epoch_store_for_testing())
530 })
531 })
532 }
533
534 pub fn fullnodes(&self) -> impl Iterator<Item = &Node> {
536 self.nodes
537 .values()
538 .filter(|node| node.config().consensus_config.is_none())
539 }
540
541 pub async fn spawn_new_node(&mut self, config: NodeConfig) -> IotaNodeHandle {
542 let name = config.authority_public_key();
543 let node = Node::new(config);
544 node.start().await.unwrap();
545 let handle = node.get_node_handle().unwrap();
546 self.nodes.insert(name, node);
547 handle
548 }
549
550 pub fn get_fullnode_config_builder(&self) -> FullnodeConfigBuilder {
551 self.fullnode_config_builder.clone()
552 }
553}
554
555#[derive(Debug)]
556enum SwarmDirectory {
557 Persistent(PathBuf),
558 Temporary(TempDir),
559}
560
561impl SwarmDirectory {
562 fn new_temporary() -> Self {
563 SwarmDirectory::Temporary(nondeterministic!(TempDir::new().unwrap()))
564 }
565}
566
567impl ops::Deref for SwarmDirectory {
568 type Target = Path;
569
570 fn deref(&self) -> &Self::Target {
571 match self {
572 SwarmDirectory::Persistent(dir) => dir.deref(),
573 SwarmDirectory::Temporary(dir) => dir.path(),
574 }
575 }
576}
577
578impl AsRef<Path> for SwarmDirectory {
579 fn as_ref(&self) -> &Path {
580 match self {
581 SwarmDirectory::Persistent(dir) => dir.as_ref(),
582 SwarmDirectory::Temporary(dir) => dir.as_ref(),
583 }
584 }
585}
586
587#[cfg(test)]
588mod test {
589 use std::num::NonZeroUsize;
590
591 use super::Swarm;
592
593 #[tokio::test]
594 async fn launch() {
595 telemetry_subscribers::init_for_testing();
596 let mut swarm = Swarm::builder()
597 .committee_size(NonZeroUsize::new(4).unwrap())
598 .with_fullnode_count(1)
599 .build();
600
601 swarm.launch().await.unwrap();
602
603 for validator in swarm.validator_nodes() {
604 validator.health_check(true).await.unwrap();
605 }
606
607 for fullnode in swarm.fullnodes() {
608 fullnode.health_check(false).await.unwrap();
609 }
610
611 println!("hello");
612 }
613}