iota_swarm/memory/
swarm.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    net::SocketAddr,
8    num::NonZeroUsize,
9    ops,
10    path::{Path, PathBuf},
11    time::Duration,
12};
13
14use anyhow::Result;
15use futures::future::try_join_all;
16use iota_config::{
17    ExecutionCacheConfig, ExecutionCacheType, IOTA_GENESIS_FILENAME, NodeConfig,
18    node::{AuthorityOverloadConfig, DBCheckpointConfig, GrpcApiConfig, RunWithRange},
19    p2p::DiscoveryConfig,
20};
21use iota_macros::nondeterministic;
22use iota_names::config::IotaNamesConfig;
23use iota_node::IotaNodeHandle;
24use iota_protocol_config::{Chain, ProtocolVersion};
25use iota_swarm_config::{
26    genesis_config::{AccountConfig, GenesisConfig, ValidatorGenesisConfig},
27    network_config::NetworkConfig,
28    network_config_builder::{
29        CommitteeConfig, ConfigBuilder, ProtocolVersionsConfig, StateAccumulatorV1EnabledConfig,
30        SupportedProtocolVersionsCallback,
31    },
32    node_config_builder::FullnodeConfigBuilder,
33};
34use iota_types::{
35    base_types::AuthorityName,
36    object::Object,
37    supported_protocol_versions::SupportedProtocolVersions,
38    traffic_control::{PolicyConfig, RemoteFirewallConfig},
39};
40use rand::rngs::OsRng;
41use tempfile::TempDir;
42use tracing::info;
43
44use super::Node;
45
46pub struct SwarmBuilder<R = OsRng> {
47    rng: R,
48    // template: NodeConfig,
49    dir: Option<PathBuf>,
50    committee: CommitteeConfig,
51    genesis_config: Option<GenesisConfig>,
52    network_config: Option<NetworkConfig>,
53    chain_override: Option<Chain>,
54    additional_objects: Vec<Object>,
55    fullnode_count: usize,
56    fullnode_db_path: Option<PathBuf>,
57    fullnode_rpc_port: Option<u16>,
58    fullnode_rpc_addr: Option<SocketAddr>,
59    supported_protocol_versions_config: ProtocolVersionsConfig,
60    // Default to supported_protocol_versions_config, but can be overridden.
61    fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
62    db_checkpoint_config: DBCheckpointConfig,
63    jwk_fetch_interval: Option<Duration>,
64    num_unpruned_validators: Option<usize>,
65    authority_overload_config: Option<AuthorityOverloadConfig>,
66    execution_cache_type: Option<ExecutionCacheType>,
67    execution_cache_config: Option<ExecutionCacheConfig>,
68    data_ingestion_dir: Option<PathBuf>,
69    fullnode_run_with_range: Option<RunWithRange>,
70    fullnode_policy_config: Option<PolicyConfig>,
71    fullnode_fw_config: Option<RemoteFirewallConfig>,
72    max_submit_position: Option<usize>,
73    submit_delay_step_override_millis: Option<u64>,
74    state_accumulator_config: StateAccumulatorV1EnabledConfig,
75    disable_fullnode_pruning: bool,
76    iota_names_config: Option<IotaNamesConfig>,
77    fullnode_enable_grpc_api: bool,
78    fullnode_grpc_api_config: Option<GrpcApiConfig>,
79    disable_address_verification_cooldown: bool,
80}
81
82impl SwarmBuilder {
83    #[expect(clippy::new_without_default)]
84    pub fn new() -> Self {
85        Self {
86            rng: OsRng,
87            dir: None,
88            committee: CommitteeConfig::Size(NonZeroUsize::new(1).unwrap()),
89            genesis_config: None,
90            network_config: None,
91            chain_override: None,
92            additional_objects: vec![],
93            fullnode_count: 0,
94            fullnode_db_path: None,
95            fullnode_rpc_port: None,
96            fullnode_rpc_addr: None,
97            supported_protocol_versions_config: ProtocolVersionsConfig::Default,
98            fullnode_supported_protocol_versions_config: None,
99            db_checkpoint_config: DBCheckpointConfig::default(),
100            jwk_fetch_interval: None,
101            num_unpruned_validators: None,
102            authority_overload_config: None,
103            execution_cache_type: None,
104            execution_cache_config: None,
105            data_ingestion_dir: None,
106            fullnode_run_with_range: None,
107            fullnode_policy_config: None,
108            fullnode_fw_config: None,
109            max_submit_position: None,
110            submit_delay_step_override_millis: None,
111            state_accumulator_config: StateAccumulatorV1EnabledConfig::Global(true),
112            disable_fullnode_pruning: false,
113            iota_names_config: None,
114            fullnode_enable_grpc_api: false,
115            fullnode_grpc_api_config: None,
116            disable_address_verification_cooldown: false,
117        }
118    }
119}
120
121impl<R> SwarmBuilder<R> {
122    pub fn rng<N: rand::RngCore + rand::CryptoRng>(self, rng: N) -> SwarmBuilder<N> {
123        SwarmBuilder {
124            rng,
125            dir: self.dir,
126            committee: self.committee,
127            genesis_config: self.genesis_config,
128            network_config: self.network_config,
129            chain_override: self.chain_override,
130            additional_objects: self.additional_objects,
131            fullnode_count: self.fullnode_count,
132            fullnode_db_path: self.fullnode_db_path,
133            fullnode_rpc_port: self.fullnode_rpc_port,
134            fullnode_rpc_addr: self.fullnode_rpc_addr,
135            supported_protocol_versions_config: self.supported_protocol_versions_config,
136            fullnode_supported_protocol_versions_config: self
137                .fullnode_supported_protocol_versions_config,
138            db_checkpoint_config: self.db_checkpoint_config,
139            jwk_fetch_interval: self.jwk_fetch_interval,
140            num_unpruned_validators: self.num_unpruned_validators,
141            authority_overload_config: self.authority_overload_config,
142            execution_cache_type: self.execution_cache_type,
143            execution_cache_config: self.execution_cache_config,
144            data_ingestion_dir: self.data_ingestion_dir,
145            fullnode_run_with_range: self.fullnode_run_with_range,
146            fullnode_policy_config: self.fullnode_policy_config,
147            fullnode_fw_config: self.fullnode_fw_config,
148            max_submit_position: self.max_submit_position,
149            submit_delay_step_override_millis: self.submit_delay_step_override_millis,
150            state_accumulator_config: self.state_accumulator_config,
151            disable_fullnode_pruning: self.disable_fullnode_pruning,
152            iota_names_config: self.iota_names_config,
153            fullnode_enable_grpc_api: self.fullnode_enable_grpc_api,
154            fullnode_grpc_api_config: self.fullnode_grpc_api_config,
155            disable_address_verification_cooldown: self.disable_address_verification_cooldown,
156        }
157    }
158
159    /// Set the directory that should be used by the Swarm for any on-disk data.
160    ///
161    /// If a directory is provided, it will not be cleaned up when the Swarm is
162    /// dropped.
163    ///
164    /// Defaults to using a temporary directory that will be cleaned up when the
165    /// Swarm is dropped.
166    pub fn dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
167        self.dir = Some(dir.into());
168        self
169    }
170
171    /// Set the committee size (the number of validators in the validator set).
172    ///
173    /// Defaults to 1.
174    pub fn committee_size(mut self, committee_size: NonZeroUsize) -> Self {
175        self.committee = CommitteeConfig::Size(committee_size);
176        self
177    }
178
179    pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
180        self.committee = CommitteeConfig::Validators(validators);
181        self
182    }
183
184    pub fn with_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
185        assert!(self.network_config.is_none() && self.genesis_config.is_none());
186        self.genesis_config = Some(genesis_config);
187        self
188    }
189
190    pub fn with_chain_override(mut self, chain: Chain) -> Self {
191        assert!(self.chain_override.is_none());
192        self.chain_override = Some(chain);
193        self
194    }
195
196    pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
197        assert!(self.network_config.is_none());
198        self.num_unpruned_validators = Some(n);
199        self
200    }
201
202    pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
203        self.jwk_fetch_interval = Some(i);
204        self
205    }
206
207    pub fn with_network_config(mut self, network_config: NetworkConfig) -> Self {
208        assert!(self.network_config.is_none() && self.genesis_config.is_none());
209        self.network_config = Some(network_config);
210        self
211    }
212
213    pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
214        self.get_or_init_genesis_config().accounts = accounts;
215        self
216    }
217
218    pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
219        self.additional_objects.extend(objects);
220        self
221    }
222
223    pub fn with_fullnode_count(mut self, fullnode_count: usize) -> Self {
224        self.fullnode_count = fullnode_count;
225        self
226    }
227
228    pub fn with_fullnode_db_path(mut self, fullnode_db_path: PathBuf) -> Self {
229        self.fullnode_db_path = Some(fullnode_db_path);
230        self
231    }
232
233    pub fn with_fullnode_rpc_port(mut self, fullnode_rpc_port: u16) -> Self {
234        assert!(self.fullnode_rpc_addr.is_none());
235        self.fullnode_rpc_port = Some(fullnode_rpc_port);
236        self
237    }
238
239    pub fn with_fullnode_rpc_addr(mut self, fullnode_rpc_addr: SocketAddr) -> Self {
240        assert!(self.fullnode_rpc_port.is_none());
241        self.fullnode_rpc_addr = Some(fullnode_rpc_addr);
242        self
243    }
244
245    pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
246        self.get_or_init_genesis_config()
247            .parameters
248            .epoch_duration_ms = epoch_duration_ms;
249        self
250    }
251
252    pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
253        self.get_or_init_genesis_config()
254            .parameters
255            .protocol_version = v;
256        self
257    }
258
259    pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
260        self.supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
261        self
262    }
263
264    pub fn with_supported_protocol_version_callback(
265        mut self,
266        func: SupportedProtocolVersionsCallback,
267    ) -> Self {
268        self.supported_protocol_versions_config = ProtocolVersionsConfig::PerValidator(func);
269        self
270    }
271
272    pub fn with_supported_protocol_versions_config(mut self, c: ProtocolVersionsConfig) -> Self {
273        self.supported_protocol_versions_config = c;
274        self
275    }
276
277    pub fn with_state_accumulator_config(mut self, c: StateAccumulatorV1EnabledConfig) -> Self {
278        self.state_accumulator_config = c;
279        self
280    }
281
282    pub fn with_fullnode_supported_protocol_versions_config(
283        mut self,
284        c: ProtocolVersionsConfig,
285    ) -> Self {
286        self.fullnode_supported_protocol_versions_config = Some(c);
287        self
288    }
289
290    pub fn with_db_checkpoint_config(mut self, db_checkpoint_config: DBCheckpointConfig) -> Self {
291        self.db_checkpoint_config = db_checkpoint_config;
292        self
293    }
294
295    pub fn with_authority_overload_config(
296        mut self,
297        authority_overload_config: AuthorityOverloadConfig,
298    ) -> Self {
299        assert!(self.network_config.is_none());
300        self.authority_overload_config = Some(authority_overload_config);
301        self
302    }
303
304    pub fn with_execution_cache_type(mut self, execution_cache_type: ExecutionCacheType) -> Self {
305        assert!(self.execution_cache_type.is_none());
306        self.execution_cache_type = Some(execution_cache_type);
307        self
308    }
309
310    pub fn with_execution_cache_config(
311        mut self,
312        execution_cache_config: ExecutionCacheConfig,
313    ) -> Self {
314        self.execution_cache_config = Some(execution_cache_config);
315        self
316    }
317
318    pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
319        self.data_ingestion_dir = Some(path);
320        self
321    }
322
323    pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
324        if let Some(run_with_range) = run_with_range {
325            self.fullnode_run_with_range = Some(run_with_range);
326        }
327        self
328    }
329
330    pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
331        self.fullnode_policy_config = config;
332        self
333    }
334
335    pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
336        self.fullnode_fw_config = config;
337        self
338    }
339
340    pub fn with_fullnode_enable_grpc_api(mut self, enable: bool) -> Self {
341        self.fullnode_enable_grpc_api = enable;
342        self
343    }
344
345    pub fn with_fullnode_grpc_api_config(mut self, config: GrpcApiConfig) -> Self {
346        self.fullnode_grpc_api_config = Some(config);
347        self
348    }
349
350    fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
351        if self.genesis_config.is_none() {
352            assert!(self.network_config.is_none());
353            self.genesis_config = Some(GenesisConfig::for_local_testing());
354        }
355        self.genesis_config.as_mut().unwrap()
356    }
357
358    pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
359        self.max_submit_position = Some(max_submit_position);
360        self
361    }
362
363    pub fn with_disable_fullnode_pruning(mut self) -> Self {
364        self.disable_fullnode_pruning = true;
365        self
366    }
367
368    pub fn with_submit_delay_step_override_millis(
369        mut self,
370        submit_delay_step_override_millis: u64,
371    ) -> Self {
372        self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
373        self
374    }
375
376    pub fn with_iota_names_config(mut self, iota_names_config: IotaNamesConfig) -> Self {
377        self.iota_names_config = Some(iota_names_config);
378        self
379    }
380
381    /// Disable address verification cooldown for test environments where nodes
382    /// frequently restart. This prevents nodes from being blocked from
383    /// reconnecting after crashes/restarts.
384    pub fn with_disabled_address_verification_cooldown(mut self) -> Self {
385        self.disable_address_verification_cooldown = true;
386        self
387    }
388}
389
390impl<R: rand::RngCore + rand::CryptoRng> SwarmBuilder<R> {
391    /// Create the configured Swarm.
392    pub fn build(self) -> Swarm {
393        let dir = if let Some(dir) = self.dir {
394            SwarmDirectory::Persistent(dir)
395        } else {
396            SwarmDirectory::new_temporary()
397        };
398
399        let ingest_data = self.data_ingestion_dir.clone();
400
401        let mut network_config = self.network_config.unwrap_or_else(|| {
402            let mut config_builder = ConfigBuilder::new(dir.as_ref());
403
404            if let Some(genesis_config) = self.genesis_config {
405                config_builder = config_builder.with_genesis_config(genesis_config);
406            }
407
408            if let Some(chain_override) = self.chain_override {
409                config_builder = config_builder.with_chain_override(chain_override);
410            }
411
412            if let Some(num_unpruned_validators) = self.num_unpruned_validators {
413                config_builder =
414                    config_builder.with_num_unpruned_validators(num_unpruned_validators);
415            }
416
417            if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
418                config_builder = config_builder.with_jwk_fetch_interval(jwk_fetch_interval);
419            }
420
421            if let Some(authority_overload_config) = self.authority_overload_config {
422                config_builder =
423                    config_builder.with_authority_overload_config(authority_overload_config);
424            }
425
426            if let Some(execution_cache_type) = self.execution_cache_type {
427                config_builder = config_builder.with_execution_cache_type(execution_cache_type);
428            }
429
430            if let Some(execution_cache_config) = self.execution_cache_config {
431                config_builder = config_builder.with_execution_cache_config(execution_cache_config);
432            }
433
434            if let Some(path) = self.data_ingestion_dir {
435                config_builder = config_builder.with_data_ingestion_dir(path);
436            }
437
438            if let Some(max_submit_position) = self.max_submit_position {
439                config_builder = config_builder.with_max_submit_position(max_submit_position);
440            }
441
442            if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis
443            {
444                config_builder = config_builder
445                    .with_submit_delay_step_override_millis(submit_delay_step_override_millis);
446            }
447
448            let mut network_config = config_builder
449                .committee(self.committee)
450                .rng(self.rng)
451                .with_objects(self.additional_objects)
452                .with_empty_validator_genesis()
453                .with_supported_protocol_versions_config(
454                    self.supported_protocol_versions_config.clone(),
455                )
456                .with_state_accumulator_config(self.state_accumulator_config.clone())
457                .build();
458            // Populate validator genesis by pointing to the blob
459            let genesis_path = dir.join(IOTA_GENESIS_FILENAME);
460            network_config
461                .genesis
462                .save(&genesis_path)
463                .expect("genesis should be saved successfully");
464            for validator in &mut network_config.validator_configs {
465                validator.genesis = iota_config::node::Genesis::new_from_file(&genesis_path);
466            }
467            network_config
468        });
469
470        if self.disable_address_verification_cooldown {
471            for validator in &mut network_config.validator_configs {
472                if let Some(ref mut discovery_config) = validator.p2p_config.discovery {
473                    discovery_config.address_verification_failure_cooldown_sec = Some(0);
474                } else {
475                    validator.p2p_config.discovery = Some(DiscoveryConfig {
476                        address_verification_failure_cooldown_sec: Some(0),
477                        ..Default::default()
478                    });
479                }
480            }
481        }
482
483        let mut nodes: HashMap<_, _> = network_config
484            .validator_configs()
485            .iter()
486            .map(|config| {
487                info!(
488                    "SwarmBuilder configuring validator with name {}",
489                    config.authority_public_key()
490                );
491                (config.authority_public_key(), Node::new(config.to_owned()))
492            })
493            .collect();
494
495        let mut fullnode_config_builder = FullnodeConfigBuilder::new()
496            .with_config_directory(dir.as_ref().into())
497            .with_db_checkpoint_config(self.db_checkpoint_config.clone())
498            .with_run_with_range(self.fullnode_run_with_range)
499            .with_policy_config(self.fullnode_policy_config)
500            .with_data_ingestion_dir(ingest_data)
501            .with_fw_config(self.fullnode_fw_config)
502            .with_disable_pruning(self.disable_fullnode_pruning)
503            .with_iota_names_config(self.iota_names_config);
504        if let Some(fullnode_db_path) = self.fullnode_db_path {
505            fullnode_config_builder = fullnode_config_builder.with_db_path(fullnode_db_path);
506        }
507
508        if self.disable_address_verification_cooldown {
509            let discovery_config = DiscoveryConfig {
510                address_verification_failure_cooldown_sec: Some(0),
511                ..Default::default()
512            };
513
514            fullnode_config_builder =
515                fullnode_config_builder.with_discovery_config(discovery_config);
516        }
517
518        if let Some(chain) = self.chain_override {
519            fullnode_config_builder = fullnode_config_builder.with_chain_override(chain);
520        }
521
522        if let Some(spvc) = &self.fullnode_supported_protocol_versions_config {
523            let supported_versions = match spvc {
524                ProtocolVersionsConfig::Default => SupportedProtocolVersions::SYSTEM_DEFAULT,
525                ProtocolVersionsConfig::Global(v) => *v,
526                ProtocolVersionsConfig::PerValidator(func) => func(0, None),
527            };
528            fullnode_config_builder =
529                fullnode_config_builder.with_supported_protocol_versions(supported_versions);
530        }
531
532        // Add gRPC config wiring
533        fullnode_config_builder =
534            fullnode_config_builder.with_enable_grpc_api(self.fullnode_enable_grpc_api);
535        if let Some(grpc_config) = &self.fullnode_grpc_api_config {
536            fullnode_config_builder =
537                fullnode_config_builder.with_grpc_api_config(grpc_config.clone());
538        }
539
540        if self.fullnode_count > 0 {
541            (0..self.fullnode_count).for_each(|idx| {
542                let mut builder = fullnode_config_builder.clone();
543                if idx == 0 {
544                    // Only the first fullnode is used as the rpc fullnode, we can only use the
545                    // same address once.
546                    if let Some(rpc_addr) = self.fullnode_rpc_addr {
547                        builder = builder.with_rpc_addr(rpc_addr);
548                    }
549                    if let Some(rpc_port) = self.fullnode_rpc_port {
550                        builder = builder.with_rpc_port(rpc_port);
551                    }
552                }
553                let config = builder.build(&mut OsRng, &network_config);
554                info!(
555                    "SwarmBuilder configuring full node with name {}",
556                    config.authority_public_key()
557                );
558                nodes.insert(config.authority_public_key(), Node::new(config));
559            });
560        }
561        Swarm {
562            dir,
563            network_config,
564            nodes,
565            fullnode_config_builder,
566        }
567    }
568}
569
570/// A handle to an in-memory IOTA Network.
571#[derive(Debug)]
572pub struct Swarm {
573    dir: SwarmDirectory,
574    network_config: NetworkConfig,
575    nodes: HashMap<AuthorityName, Node>,
576    // Save a copy of the fullnode config builder to build future fullnodes.
577    fullnode_config_builder: FullnodeConfigBuilder,
578}
579
580impl Drop for Swarm {
581    fn drop(&mut self) {
582        self.nodes_iter_mut().for_each(|node| node.stop());
583    }
584}
585
586impl Swarm {
587    fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
588        self.nodes.values_mut()
589    }
590
591    /// Return a new Builder
592    pub fn builder() -> SwarmBuilder {
593        SwarmBuilder::new()
594    }
595
596    /// Start all nodes associated with this Swarm
597    pub async fn launch(&mut self) -> Result<()> {
598        try_join_all(self.nodes_iter_mut().map(|node| node.start())).await?;
599        tracing::info!("Successfully launched Swarm");
600        Ok(())
601    }
602
603    /// Return the path to the directory where this Swarm's on-disk data is
604    /// kept.
605    pub fn dir(&self) -> &Path {
606        self.dir.as_ref()
607    }
608
609    /// Return a reference to this Swarm's `NetworkConfig`.
610    pub fn config(&self) -> &NetworkConfig {
611        &self.network_config
612    }
613
614    /// Return a mutable reference to this Swarm's `NetworkConfig`.
615    // TODO: It's not ideal to mutate network config. We should consider removing
616    // this.
617    pub fn config_mut(&mut self) -> &mut NetworkConfig {
618        &mut self.network_config
619    }
620
621    pub fn all_nodes(&self) -> impl Iterator<Item = &Node> {
622        self.nodes.values()
623    }
624
625    pub fn node(&self, name: &AuthorityName) -> Option<&Node> {
626        self.nodes.get(name)
627    }
628
629    pub fn node_mut(&mut self, name: &AuthorityName) -> Option<&mut Node> {
630        self.nodes.get_mut(name)
631    }
632
633    /// Return an iterator over shared references of all nodes that are set up
634    /// as validators. This means that they have a consensus config. This
635    /// however doesn't mean this validator is currently active (i.e. it's
636    /// not necessarily in the validator set at the moment).
637    pub fn validator_nodes(&self) -> impl Iterator<Item = &Node> {
638        self.nodes
639            .values()
640            .filter(|node| node.config().consensus_config.is_some())
641    }
642
643    pub fn validator_node_handles(&self) -> Vec<IotaNodeHandle> {
644        self.validator_nodes()
645            .map(|node| node.get_node_handle().unwrap())
646            .collect()
647    }
648
649    /// Returns an iterator over all current active validators.
650    pub fn active_validators(&self) -> impl Iterator<Item = &Node> {
651        self.validator_nodes().filter(|node| {
652            node.get_node_handle().is_some_and(|handle| {
653                let state = handle.state();
654                state.is_active_validator(&state.epoch_store_for_testing())
655            })
656        })
657    }
658
659    /// Returns an iterator over all current active validators.
660    pub fn committee_validators(&self) -> impl Iterator<Item = &Node> {
661        self.validator_nodes().filter(|node| {
662            node.get_node_handle().is_some_and(|handle| {
663                let state = handle.state();
664                state.is_committee_validator(&state.epoch_store_for_testing())
665            })
666        })
667    }
668
669    /// Return an iterator over shared references of all Fullnodes.
670    pub fn fullnodes(&self) -> impl Iterator<Item = &Node> {
671        self.nodes
672            .values()
673            .filter(|node| node.config().consensus_config.is_none())
674    }
675
676    pub async fn spawn_new_node(&mut self, config: NodeConfig) -> IotaNodeHandle {
677        let name = config.authority_public_key();
678        let node = Node::new(config);
679        node.start().await.unwrap();
680        let handle = node.get_node_handle().unwrap();
681        self.nodes.insert(name, node);
682        handle
683    }
684
685    pub fn get_fullnode_config_builder(&self) -> FullnodeConfigBuilder {
686        self.fullnode_config_builder.clone()
687    }
688}
689
690#[derive(Debug)]
691enum SwarmDirectory {
692    Persistent(PathBuf),
693    Temporary(TempDir),
694}
695
696impl SwarmDirectory {
697    fn new_temporary() -> Self {
698        SwarmDirectory::Temporary(nondeterministic!(TempDir::new().unwrap()))
699    }
700}
701
702impl ops::Deref for SwarmDirectory {
703    type Target = Path;
704
705    fn deref(&self) -> &Self::Target {
706        match self {
707            SwarmDirectory::Persistent(dir) => dir.deref(),
708            SwarmDirectory::Temporary(dir) => dir.path(),
709        }
710    }
711}
712
713impl AsRef<Path> for SwarmDirectory {
714    fn as_ref(&self) -> &Path {
715        match self {
716            SwarmDirectory::Persistent(dir) => dir.as_ref(),
717            SwarmDirectory::Temporary(dir) => dir.as_ref(),
718        }
719    }
720}
721
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use super::Swarm;

    /// Builds a Swarm with four validators and one fullnode, launches it and
    /// runs a health check on every node.
    #[tokio::test]
    async fn launch() {
        telemetry_subscribers::init_for_testing();

        let committee_size = NonZeroUsize::new(4).unwrap();
        let mut swarm = Swarm::builder()
            .committee_size(committee_size)
            .with_fullnode_count(1)
            .build();

        swarm.launch().await.unwrap();

        // Validators are health-checked with `true`, fullnodes with `false`.
        for node in swarm.validator_nodes() {
            node.health_check(true).await.unwrap();
        }
        for node in swarm.fullnodes() {
            node.health_check(false).await.unwrap();
        }

        println!("hello");
    }
}