iota_swarm/memory/
swarm.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    net::SocketAddr,
8    num::NonZeroUsize,
9    ops,
10    path::{Path, PathBuf},
11    time::Duration,
12};
13
14use anyhow::Result;
15use futures::future::try_join_all;
16use iota_config::{
17    ExecutionCacheConfig, ExecutionCacheType, IOTA_GENESIS_FILENAME, NodeConfig,
18    node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange},
19};
20use iota_macros::nondeterministic;
21use iota_names::config::IotaNamesConfig;
22use iota_node::IotaNodeHandle;
23use iota_protocol_config::ProtocolVersion;
24use iota_swarm_config::{
25    genesis_config::{AccountConfig, GenesisConfig, ValidatorGenesisConfig},
26    network_config::NetworkConfig,
27    network_config_builder::{
28        CommitteeConfig, ConfigBuilder, ProtocolVersionsConfig, StateAccumulatorV1EnabledConfig,
29        SupportedProtocolVersionsCallback,
30    },
31    node_config_builder::FullnodeConfigBuilder,
32};
33use iota_types::{
34    base_types::AuthorityName,
35    object::Object,
36    supported_protocol_versions::SupportedProtocolVersions,
37    traffic_control::{PolicyConfig, RemoteFirewallConfig},
38};
39use rand::rngs::OsRng;
40use tempfile::TempDir;
41use tracing::info;
42
43use super::Node;
44
45pub struct SwarmBuilder<R = OsRng> {
46    rng: R,
47    // template: NodeConfig,
48    dir: Option<PathBuf>,
49    committee: CommitteeConfig,
50    genesis_config: Option<GenesisConfig>,
51    network_config: Option<NetworkConfig>,
52    additional_objects: Vec<Object>,
53    fullnode_count: usize,
54    fullnode_rpc_port: Option<u16>,
55    fullnode_rpc_addr: Option<SocketAddr>,
56    supported_protocol_versions_config: ProtocolVersionsConfig,
57    // Default to supported_protocol_versions_config, but can be overridden.
58    fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
59    db_checkpoint_config: DBCheckpointConfig,
60    jwk_fetch_interval: Option<Duration>,
61    num_unpruned_validators: Option<usize>,
62    authority_overload_config: Option<AuthorityOverloadConfig>,
63    execution_cache_type: Option<ExecutionCacheType>,
64    execution_cache_config: Option<ExecutionCacheConfig>,
65    data_ingestion_dir: Option<PathBuf>,
66    fullnode_run_with_range: Option<RunWithRange>,
67    fullnode_policy_config: Option<PolicyConfig>,
68    fullnode_fw_config: Option<RemoteFirewallConfig>,
69    max_submit_position: Option<usize>,
70    submit_delay_step_override_millis: Option<u64>,
71    state_accumulator_config: StateAccumulatorV1EnabledConfig,
72    disable_fullnode_pruning: bool,
73    iota_names_config: Option<IotaNamesConfig>,
74}
75
76impl SwarmBuilder {
77    #[expect(clippy::new_without_default)]
78    pub fn new() -> Self {
79        Self {
80            rng: OsRng,
81            dir: None,
82            committee: CommitteeConfig::Size(NonZeroUsize::new(1).unwrap()),
83            genesis_config: None,
84            network_config: None,
85            additional_objects: vec![],
86            fullnode_count: 0,
87            fullnode_rpc_port: None,
88            fullnode_rpc_addr: None,
89            supported_protocol_versions_config: ProtocolVersionsConfig::Default,
90            fullnode_supported_protocol_versions_config: None,
91            db_checkpoint_config: DBCheckpointConfig::default(),
92            jwk_fetch_interval: None,
93            num_unpruned_validators: None,
94            authority_overload_config: None,
95            execution_cache_type: None,
96            execution_cache_config: None,
97            data_ingestion_dir: None,
98            fullnode_run_with_range: None,
99            fullnode_policy_config: None,
100            fullnode_fw_config: None,
101            max_submit_position: None,
102            submit_delay_step_override_millis: None,
103            state_accumulator_config: StateAccumulatorV1EnabledConfig::Global(true),
104            disable_fullnode_pruning: false,
105            iota_names_config: None,
106        }
107    }
108}
109
110impl<R> SwarmBuilder<R> {
111    pub fn rng<N: rand::RngCore + rand::CryptoRng>(self, rng: N) -> SwarmBuilder<N> {
112        SwarmBuilder {
113            rng,
114            dir: self.dir,
115            committee: self.committee,
116            genesis_config: self.genesis_config,
117            network_config: self.network_config,
118            additional_objects: self.additional_objects,
119            fullnode_count: self.fullnode_count,
120            fullnode_rpc_port: self.fullnode_rpc_port,
121            fullnode_rpc_addr: self.fullnode_rpc_addr,
122            supported_protocol_versions_config: self.supported_protocol_versions_config,
123            fullnode_supported_protocol_versions_config: self
124                .fullnode_supported_protocol_versions_config,
125            db_checkpoint_config: self.db_checkpoint_config,
126            jwk_fetch_interval: self.jwk_fetch_interval,
127            num_unpruned_validators: self.num_unpruned_validators,
128            authority_overload_config: self.authority_overload_config,
129            execution_cache_type: self.execution_cache_type,
130            execution_cache_config: self.execution_cache_config,
131            data_ingestion_dir: self.data_ingestion_dir,
132            fullnode_run_with_range: self.fullnode_run_with_range,
133            fullnode_policy_config: self.fullnode_policy_config,
134            fullnode_fw_config: self.fullnode_fw_config,
135            max_submit_position: self.max_submit_position,
136            submit_delay_step_override_millis: self.submit_delay_step_override_millis,
137            state_accumulator_config: self.state_accumulator_config,
138            disable_fullnode_pruning: self.disable_fullnode_pruning,
139            iota_names_config: self.iota_names_config,
140        }
141    }
142
143    /// Set the directory that should be used by the Swarm for any on-disk data.
144    ///
145    /// If a directory is provided, it will not be cleaned up when the Swarm is
146    /// dropped.
147    ///
148    /// Defaults to using a temporary directory that will be cleaned up when the
149    /// Swarm is dropped.
150    pub fn dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
151        self.dir = Some(dir.into());
152        self
153    }
154
155    /// Set the committee size (the number of validators in the validator set).
156    ///
157    /// Defaults to 1.
158    pub fn committee_size(mut self, committee_size: NonZeroUsize) -> Self {
159        self.committee = CommitteeConfig::Size(committee_size);
160        self
161    }
162
163    pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
164        self.committee = CommitteeConfig::Validators(validators);
165        self
166    }
167
168    pub fn with_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
169        assert!(self.network_config.is_none() && self.genesis_config.is_none());
170        self.genesis_config = Some(genesis_config);
171        self
172    }
173
174    pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
175        assert!(self.network_config.is_none());
176        self.num_unpruned_validators = Some(n);
177        self
178    }
179
180    pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
181        self.jwk_fetch_interval = Some(i);
182        self
183    }
184
185    pub fn with_network_config(mut self, network_config: NetworkConfig) -> Self {
186        assert!(self.network_config.is_none() && self.genesis_config.is_none());
187        self.network_config = Some(network_config);
188        self
189    }
190
191    pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
192        self.get_or_init_genesis_config().accounts = accounts;
193        self
194    }
195
196    pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
197        self.additional_objects.extend(objects);
198        self
199    }
200
201    pub fn with_fullnode_count(mut self, fullnode_count: usize) -> Self {
202        self.fullnode_count = fullnode_count;
203        self
204    }
205
206    pub fn with_fullnode_rpc_port(mut self, fullnode_rpc_port: u16) -> Self {
207        assert!(self.fullnode_rpc_addr.is_none());
208        self.fullnode_rpc_port = Some(fullnode_rpc_port);
209        self
210    }
211
212    pub fn with_fullnode_rpc_addr(mut self, fullnode_rpc_addr: SocketAddr) -> Self {
213        assert!(self.fullnode_rpc_port.is_none());
214        self.fullnode_rpc_addr = Some(fullnode_rpc_addr);
215        self
216    }
217
218    pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
219        self.get_or_init_genesis_config()
220            .parameters
221            .epoch_duration_ms = epoch_duration_ms;
222        self
223    }
224
225    pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
226        self.get_or_init_genesis_config()
227            .parameters
228            .protocol_version = v;
229        self
230    }
231
232    pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
233        self.supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
234        self
235    }
236
237    pub fn with_supported_protocol_version_callback(
238        mut self,
239        func: SupportedProtocolVersionsCallback,
240    ) -> Self {
241        self.supported_protocol_versions_config = ProtocolVersionsConfig::PerValidator(func);
242        self
243    }
244
245    pub fn with_supported_protocol_versions_config(mut self, c: ProtocolVersionsConfig) -> Self {
246        self.supported_protocol_versions_config = c;
247        self
248    }
249
250    pub fn with_state_accumulator_config(mut self, c: StateAccumulatorV1EnabledConfig) -> Self {
251        self.state_accumulator_config = c;
252        self
253    }
254
255    pub fn with_fullnode_supported_protocol_versions_config(
256        mut self,
257        c: ProtocolVersionsConfig,
258    ) -> Self {
259        self.fullnode_supported_protocol_versions_config = Some(c);
260        self
261    }
262
263    pub fn with_db_checkpoint_config(mut self, db_checkpoint_config: DBCheckpointConfig) -> Self {
264        self.db_checkpoint_config = db_checkpoint_config;
265        self
266    }
267
268    pub fn with_authority_overload_config(
269        mut self,
270        authority_overload_config: AuthorityOverloadConfig,
271    ) -> Self {
272        assert!(self.network_config.is_none());
273        self.authority_overload_config = Some(authority_overload_config);
274        self
275    }
276
277    pub fn with_execution_cache_type(mut self, execution_cache_type: ExecutionCacheType) -> Self {
278        assert!(self.execution_cache_type.is_none());
279        self.execution_cache_type = Some(execution_cache_type);
280        self
281    }
282
283    pub fn with_execution_cache_config(
284        mut self,
285        execution_cache_config: ExecutionCacheConfig,
286    ) -> Self {
287        self.execution_cache_config = Some(execution_cache_config);
288        self
289    }
290
291    pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
292        self.data_ingestion_dir = Some(path);
293        self
294    }
295
296    pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
297        if let Some(run_with_range) = run_with_range {
298            self.fullnode_run_with_range = Some(run_with_range);
299        }
300        self
301    }
302
303    pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
304        self.fullnode_policy_config = config;
305        self
306    }
307
308    pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
309        self.fullnode_fw_config = config;
310        self
311    }
312
313    fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
314        if self.genesis_config.is_none() {
315            assert!(self.network_config.is_none());
316            self.genesis_config = Some(GenesisConfig::for_local_testing());
317        }
318        self.genesis_config.as_mut().unwrap()
319    }
320
321    pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
322        self.max_submit_position = Some(max_submit_position);
323        self
324    }
325
326    pub fn with_disable_fullnode_pruning(mut self) -> Self {
327        self.disable_fullnode_pruning = true;
328        self
329    }
330
331    pub fn with_submit_delay_step_override_millis(
332        mut self,
333        submit_delay_step_override_millis: u64,
334    ) -> Self {
335        self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
336        self
337    }
338
339    pub fn with_iota_names_config(mut self, iota_names_config: IotaNamesConfig) -> Self {
340        self.iota_names_config = Some(iota_names_config);
341        self
342    }
343}
344
345impl<R: rand::RngCore + rand::CryptoRng> SwarmBuilder<R> {
346    /// Create the configured Swarm.
347    pub fn build(self) -> Swarm {
348        let dir = if let Some(dir) = self.dir {
349            SwarmDirectory::Persistent(dir)
350        } else {
351            SwarmDirectory::new_temporary()
352        };
353
354        let ingest_data = self.data_ingestion_dir.clone();
355
356        let network_config = self.network_config.unwrap_or_else(|| {
357            let mut config_builder = ConfigBuilder::new(dir.as_ref());
358
359            if let Some(genesis_config) = self.genesis_config {
360                config_builder = config_builder.with_genesis_config(genesis_config);
361            }
362
363            if let Some(num_unpruned_validators) = self.num_unpruned_validators {
364                config_builder =
365                    config_builder.with_num_unpruned_validators(num_unpruned_validators);
366            }
367
368            if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
369                config_builder = config_builder.with_jwk_fetch_interval(jwk_fetch_interval);
370            }
371
372            if let Some(authority_overload_config) = self.authority_overload_config {
373                config_builder =
374                    config_builder.with_authority_overload_config(authority_overload_config);
375            }
376
377            if let Some(execution_cache_type) = self.execution_cache_type {
378                config_builder = config_builder.with_execution_cache_type(execution_cache_type);
379            }
380
381            if let Some(execution_cache_config) = self.execution_cache_config {
382                config_builder = config_builder.with_execution_cache_config(execution_cache_config);
383            }
384
385            if let Some(path) = self.data_ingestion_dir {
386                config_builder = config_builder.with_data_ingestion_dir(path);
387            }
388
389            if let Some(max_submit_position) = self.max_submit_position {
390                config_builder = config_builder.with_max_submit_position(max_submit_position);
391            }
392
393            if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis
394            {
395                config_builder = config_builder
396                    .with_submit_delay_step_override_millis(submit_delay_step_override_millis);
397            }
398
399            let mut network_config = config_builder
400                .committee(self.committee)
401                .rng(self.rng)
402                .with_objects(self.additional_objects)
403                .with_empty_validator_genesis()
404                .with_supported_protocol_versions_config(
405                    self.supported_protocol_versions_config.clone(),
406                )
407                .with_state_accumulator_config(self.state_accumulator_config.clone())
408                .build();
409            // Populate validator genesis by pointing to the blob
410            let genesis_path = dir.join(IOTA_GENESIS_FILENAME);
411            network_config
412                .genesis
413                .save(&genesis_path)
414                .expect("genesis should be saved successfully");
415            for validator in &mut network_config.validator_configs {
416                validator.genesis = iota_config::node::Genesis::new_from_file(&genesis_path);
417            }
418            network_config
419        });
420
421        let mut nodes: HashMap<_, _> = network_config
422            .validator_configs()
423            .iter()
424            .map(|config| {
425                info!(
426                    "SwarmBuilder configuring validator with name {}",
427                    config.authority_public_key()
428                );
429                (config.authority_public_key(), Node::new(config.to_owned()))
430            })
431            .collect();
432
433        let mut fullnode_config_builder = FullnodeConfigBuilder::new()
434            .with_config_directory(dir.as_ref().into())
435            .with_db_checkpoint_config(self.db_checkpoint_config.clone())
436            .with_run_with_range(self.fullnode_run_with_range)
437            .with_policy_config(self.fullnode_policy_config)
438            .with_data_ingestion_dir(ingest_data)
439            .with_fw_config(self.fullnode_fw_config)
440            .with_disable_pruning(self.disable_fullnode_pruning)
441            .with_iota_names_config(self.iota_names_config);
442
443        if let Some(spvc) = &self.fullnode_supported_protocol_versions_config {
444            let supported_versions = match spvc {
445                ProtocolVersionsConfig::Default => SupportedProtocolVersions::SYSTEM_DEFAULT,
446                ProtocolVersionsConfig::Global(v) => *v,
447                ProtocolVersionsConfig::PerValidator(func) => func(0, None),
448            };
449            fullnode_config_builder =
450                fullnode_config_builder.with_supported_protocol_versions(supported_versions);
451        }
452
453        if self.fullnode_count > 0 {
454            (0..self.fullnode_count).for_each(|idx| {
455                let mut builder = fullnode_config_builder.clone();
456                if idx == 0 {
457                    // Only the first fullnode is used as the rpc fullnode, we can only use the
458                    // same address once.
459                    if let Some(rpc_addr) = self.fullnode_rpc_addr {
460                        builder = builder.with_rpc_addr(rpc_addr);
461                    }
462                    if let Some(rpc_port) = self.fullnode_rpc_port {
463                        builder = builder.with_rpc_port(rpc_port);
464                    }
465                }
466                let config = builder.build(&mut OsRng, &network_config);
467                info!(
468                    "SwarmBuilder configuring full node with name {}",
469                    config.authority_public_key()
470                );
471                nodes.insert(config.authority_public_key(), Node::new(config));
472            });
473        }
474        Swarm {
475            dir,
476            network_config,
477            nodes,
478            fullnode_config_builder,
479        }
480    }
481}
482
483/// A handle to an in-memory IOTA Network.
484#[derive(Debug)]
485pub struct Swarm {
486    dir: SwarmDirectory,
487    network_config: NetworkConfig,
488    nodes: HashMap<AuthorityName, Node>,
489    // Save a copy of the fullnode config builder to build future fullnodes.
490    fullnode_config_builder: FullnodeConfigBuilder,
491}
492
493impl Drop for Swarm {
494    fn drop(&mut self) {
495        self.nodes_iter_mut().for_each(|node| node.stop());
496    }
497}
498
499impl Swarm {
500    fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
501        self.nodes.values_mut()
502    }
503
504    /// Return a new Builder
505    pub fn builder() -> SwarmBuilder {
506        SwarmBuilder::new()
507    }
508
509    /// Start all nodes associated with this Swarm
510    pub async fn launch(&mut self) -> Result<()> {
511        try_join_all(self.nodes_iter_mut().map(|node| node.start())).await?;
512        tracing::info!("Successfully launched Swarm");
513        Ok(())
514    }
515
516    /// Return the path to the directory where this Swarm's on-disk data is
517    /// kept.
518    pub fn dir(&self) -> &Path {
519        self.dir.as_ref()
520    }
521
522    /// Return a reference to this Swarm's `NetworkConfig`.
523    pub fn config(&self) -> &NetworkConfig {
524        &self.network_config
525    }
526
527    /// Return a mutable reference to this Swarm's `NetworkConfig`.
528    // TODO: It's not ideal to mutate network config. We should consider removing
529    // this.
530    pub fn config_mut(&mut self) -> &mut NetworkConfig {
531        &mut self.network_config
532    }
533
534    pub fn all_nodes(&self) -> impl Iterator<Item = &Node> {
535        self.nodes.values()
536    }
537
538    pub fn node(&self, name: &AuthorityName) -> Option<&Node> {
539        self.nodes.get(name)
540    }
541
542    pub fn node_mut(&mut self, name: &AuthorityName) -> Option<&mut Node> {
543        self.nodes.get_mut(name)
544    }
545
546    /// Return an iterator over shared references of all nodes that are set up
547    /// as validators. This means that they have a consensus config. This
548    /// however doesn't mean this validator is currently active (i.e. it's
549    /// not necessarily in the validator set at the moment).
550    pub fn validator_nodes(&self) -> impl Iterator<Item = &Node> {
551        self.nodes
552            .values()
553            .filter(|node| node.config().consensus_config.is_some())
554    }
555
556    pub fn validator_node_handles(&self) -> Vec<IotaNodeHandle> {
557        self.validator_nodes()
558            .map(|node| node.get_node_handle().unwrap())
559            .collect()
560    }
561
562    /// Returns an iterator over all currently active validators.
563    pub fn active_validators(&self) -> impl Iterator<Item = &Node> {
564        self.validator_nodes().filter(|node| {
565            node.get_node_handle().is_some_and(|handle| {
566                let state = handle.state();
567                state.is_validator(&state.epoch_store_for_testing())
568            })
569        })
570    }
571
572    /// Return an iterator over shared references of all Fullnodes.
573    pub fn fullnodes(&self) -> impl Iterator<Item = &Node> {
574        self.nodes
575            .values()
576            .filter(|node| node.config().consensus_config.is_none())
577    }
578
579    pub async fn spawn_new_node(&mut self, config: NodeConfig) -> IotaNodeHandle {
580        let name = config.authority_public_key();
581        let node = Node::new(config);
582        node.start().await.unwrap();
583        let handle = node.get_node_handle().unwrap();
584        self.nodes.insert(name, node);
585        handle
586    }
587
588    pub fn get_fullnode_config_builder(&self) -> FullnodeConfigBuilder {
589        self.fullnode_config_builder.clone()
590    }
591}
592
593#[derive(Debug)]
594enum SwarmDirectory {
595    Persistent(PathBuf),
596    Temporary(TempDir),
597}
598
599impl SwarmDirectory {
600    fn new_temporary() -> Self {
601        SwarmDirectory::Temporary(nondeterministic!(TempDir::new().unwrap()))
602    }
603}
604
605impl ops::Deref for SwarmDirectory {
606    type Target = Path;
607
608    fn deref(&self) -> &Self::Target {
609        match self {
610            SwarmDirectory::Persistent(dir) => dir.deref(),
611            SwarmDirectory::Temporary(dir) => dir.path(),
612        }
613    }
614}
615
616impl AsRef<Path> for SwarmDirectory {
617    fn as_ref(&self) -> &Path {
618        match self {
619            SwarmDirectory::Persistent(dir) => dir.as_ref(),
620            SwarmDirectory::Temporary(dir) => dir.as_ref(),
621        }
622    }
623}
624
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use super::Swarm;

    /// Smoke test: a swarm with four validators and one fullnode launches and
    /// every node passes its health check.
    #[tokio::test]
    async fn launch() {
        telemetry_subscribers::init_for_testing();
        let mut swarm = Swarm::builder()
            .committee_size(NonZeroUsize::new(4).unwrap())
            .with_fullnode_count(1)
            .build();

        swarm.launch().await.unwrap();

        for validator in swarm.validator_nodes() {
            validator.health_check(true).await.unwrap();
        }

        for fullnode in swarm.fullnodes() {
            fullnode.health_check(false).await.unwrap();
        }
    }
}