iota_swarm/memory/
swarm.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    net::SocketAddr,
8    num::NonZeroUsize,
9    ops,
10    path::{Path, PathBuf},
11    time::Duration,
12};
13
14use anyhow::Result;
15use futures::future::try_join_all;
16use iota_config::{
17    IOTA_GENESIS_FILENAME, NodeConfig,
18    node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange},
19};
20use iota_macros::nondeterministic;
21use iota_node::IotaNodeHandle;
22use iota_protocol_config::ProtocolVersion;
23use iota_swarm_config::{
24    genesis_config::{AccountConfig, GenesisConfig, ValidatorGenesisConfig},
25    network_config::NetworkConfig,
26    network_config_builder::{
27        CommitteeConfig, ConfigBuilder, ProtocolVersionsConfig, StateAccumulatorV1EnabledConfig,
28        SupportedProtocolVersionsCallback,
29    },
30    node_config_builder::FullnodeConfigBuilder,
31};
32use iota_types::{
33    base_types::AuthorityName,
34    object::Object,
35    supported_protocol_versions::SupportedProtocolVersions,
36    traffic_control::{PolicyConfig, RemoteFirewallConfig},
37};
38use rand::rngs::OsRng;
39use tempfile::TempDir;
40use tracing::info;
41
42use super::Node;
43
44pub struct SwarmBuilder<R = OsRng> {
45    rng: R,
46    // template: NodeConfig,
47    dir: Option<PathBuf>,
48    committee: CommitteeConfig,
49    genesis_config: Option<GenesisConfig>,
50    network_config: Option<NetworkConfig>,
51    additional_objects: Vec<Object>,
52    fullnode_count: usize,
53    fullnode_rpc_port: Option<u16>,
54    fullnode_rpc_addr: Option<SocketAddr>,
55    supported_protocol_versions_config: ProtocolVersionsConfig,
56    // Default to supported_protocol_versions_config, but can be overridden.
57    fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
58    db_checkpoint_config: DBCheckpointConfig,
59    jwk_fetch_interval: Option<Duration>,
60    num_unpruned_validators: Option<usize>,
61    authority_overload_config: Option<AuthorityOverloadConfig>,
62    data_ingestion_dir: Option<PathBuf>,
63    fullnode_run_with_range: Option<RunWithRange>,
64    fullnode_policy_config: Option<PolicyConfig>,
65    fullnode_fw_config: Option<RemoteFirewallConfig>,
66    max_submit_position: Option<usize>,
67    submit_delay_step_override_millis: Option<u64>,
68    state_accumulator_config: StateAccumulatorV1EnabledConfig,
69    disable_fullnode_pruning: bool,
70}
71
72impl SwarmBuilder {
73    #[expect(clippy::new_without_default)]
74    pub fn new() -> Self {
75        Self {
76            rng: OsRng,
77            dir: None,
78            committee: CommitteeConfig::Size(NonZeroUsize::new(1).unwrap()),
79            genesis_config: None,
80            network_config: None,
81            additional_objects: vec![],
82            fullnode_count: 0,
83            fullnode_rpc_port: None,
84            fullnode_rpc_addr: None,
85            supported_protocol_versions_config: ProtocolVersionsConfig::Default,
86            fullnode_supported_protocol_versions_config: None,
87            db_checkpoint_config: DBCheckpointConfig::default(),
88            jwk_fetch_interval: None,
89            num_unpruned_validators: None,
90            authority_overload_config: None,
91            data_ingestion_dir: None,
92            fullnode_run_with_range: None,
93            fullnode_policy_config: None,
94            fullnode_fw_config: None,
95            max_submit_position: None,
96            submit_delay_step_override_millis: None,
97            state_accumulator_config: StateAccumulatorV1EnabledConfig::Global(true),
98            disable_fullnode_pruning: false,
99        }
100    }
101}
102
103impl<R> SwarmBuilder<R> {
104    pub fn rng<N: rand::RngCore + rand::CryptoRng>(self, rng: N) -> SwarmBuilder<N> {
105        SwarmBuilder {
106            rng,
107            dir: self.dir,
108            committee: self.committee,
109            genesis_config: self.genesis_config,
110            network_config: self.network_config,
111            additional_objects: self.additional_objects,
112            fullnode_count: self.fullnode_count,
113            fullnode_rpc_port: self.fullnode_rpc_port,
114            fullnode_rpc_addr: self.fullnode_rpc_addr,
115            supported_protocol_versions_config: self.supported_protocol_versions_config,
116            fullnode_supported_protocol_versions_config: self
117                .fullnode_supported_protocol_versions_config,
118            db_checkpoint_config: self.db_checkpoint_config,
119            jwk_fetch_interval: self.jwk_fetch_interval,
120            num_unpruned_validators: self.num_unpruned_validators,
121            authority_overload_config: self.authority_overload_config,
122            data_ingestion_dir: self.data_ingestion_dir,
123            fullnode_run_with_range: self.fullnode_run_with_range,
124            fullnode_policy_config: self.fullnode_policy_config,
125            fullnode_fw_config: self.fullnode_fw_config,
126            max_submit_position: self.max_submit_position,
127            submit_delay_step_override_millis: self.submit_delay_step_override_millis,
128            state_accumulator_config: self.state_accumulator_config,
129            disable_fullnode_pruning: self.disable_fullnode_pruning,
130        }
131    }
132
133    /// Set the directory that should be used by the Swarm for any on-disk data.
134    ///
135    /// If a directory is provided, it will not be cleaned up when the Swarm is
136    /// dropped.
137    ///
138    /// Defaults to using a temporary directory that will be cleaned up when the
139    /// Swarm is dropped.
140    pub fn dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
141        self.dir = Some(dir.into());
142        self
143    }
144
145    /// Set the committee size (the number of validators in the validator set).
146    ///
147    /// Defaults to 1.
148    pub fn committee_size(mut self, committee_size: NonZeroUsize) -> Self {
149        self.committee = CommitteeConfig::Size(committee_size);
150        self
151    }
152
153    pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
154        self.committee = CommitteeConfig::Validators(validators);
155        self
156    }
157
158    pub fn with_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
159        assert!(self.network_config.is_none() && self.genesis_config.is_none());
160        self.genesis_config = Some(genesis_config);
161        self
162    }
163
164    pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
165        assert!(self.network_config.is_none());
166        self.num_unpruned_validators = Some(n);
167        self
168    }
169
170    pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
171        self.jwk_fetch_interval = Some(i);
172        self
173    }
174
175    pub fn with_network_config(mut self, network_config: NetworkConfig) -> Self {
176        assert!(self.network_config.is_none() && self.genesis_config.is_none());
177        self.network_config = Some(network_config);
178        self
179    }
180
181    pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
182        self.get_or_init_genesis_config().accounts = accounts;
183        self
184    }
185
186    pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
187        self.additional_objects.extend(objects);
188        self
189    }
190
191    pub fn with_fullnode_count(mut self, fullnode_count: usize) -> Self {
192        self.fullnode_count = fullnode_count;
193        self
194    }
195
196    pub fn with_fullnode_rpc_port(mut self, fullnode_rpc_port: u16) -> Self {
197        assert!(self.fullnode_rpc_addr.is_none());
198        self.fullnode_rpc_port = Some(fullnode_rpc_port);
199        self
200    }
201
202    pub fn with_fullnode_rpc_addr(mut self, fullnode_rpc_addr: SocketAddr) -> Self {
203        assert!(self.fullnode_rpc_port.is_none());
204        self.fullnode_rpc_addr = Some(fullnode_rpc_addr);
205        self
206    }
207
208    pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
209        self.get_or_init_genesis_config()
210            .parameters
211            .epoch_duration_ms = epoch_duration_ms;
212        self
213    }
214
215    pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
216        self.get_or_init_genesis_config()
217            .parameters
218            .protocol_version = v;
219        self
220    }
221
222    pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
223        self.supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
224        self
225    }
226
227    pub fn with_supported_protocol_version_callback(
228        mut self,
229        func: SupportedProtocolVersionsCallback,
230    ) -> Self {
231        self.supported_protocol_versions_config = ProtocolVersionsConfig::PerValidator(func);
232        self
233    }
234
235    pub fn with_supported_protocol_versions_config(mut self, c: ProtocolVersionsConfig) -> Self {
236        self.supported_protocol_versions_config = c;
237        self
238    }
239
240    pub fn with_state_accumulator_config(mut self, c: StateAccumulatorV1EnabledConfig) -> Self {
241        self.state_accumulator_config = c;
242        self
243    }
244
245    pub fn with_fullnode_supported_protocol_versions_config(
246        mut self,
247        c: ProtocolVersionsConfig,
248    ) -> Self {
249        self.fullnode_supported_protocol_versions_config = Some(c);
250        self
251    }
252
253    pub fn with_db_checkpoint_config(mut self, db_checkpoint_config: DBCheckpointConfig) -> Self {
254        self.db_checkpoint_config = db_checkpoint_config;
255        self
256    }
257
258    pub fn with_authority_overload_config(
259        mut self,
260        authority_overload_config: AuthorityOverloadConfig,
261    ) -> Self {
262        assert!(self.network_config.is_none());
263        self.authority_overload_config = Some(authority_overload_config);
264        self
265    }
266
267    pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
268        self.data_ingestion_dir = Some(path);
269        self
270    }
271
272    pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
273        if let Some(run_with_range) = run_with_range {
274            self.fullnode_run_with_range = Some(run_with_range);
275        }
276        self
277    }
278
279    pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
280        self.fullnode_policy_config = config;
281        self
282    }
283
284    pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
285        self.fullnode_fw_config = config;
286        self
287    }
288
289    fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
290        if self.genesis_config.is_none() {
291            assert!(self.network_config.is_none());
292            self.genesis_config = Some(GenesisConfig::for_local_testing());
293        }
294        self.genesis_config.as_mut().unwrap()
295    }
296
297    pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
298        self.max_submit_position = Some(max_submit_position);
299        self
300    }
301
302    pub fn with_disable_fullnode_pruning(mut self) -> Self {
303        self.disable_fullnode_pruning = true;
304        self
305    }
306
307    pub fn with_submit_delay_step_override_millis(
308        mut self,
309        submit_delay_step_override_millis: u64,
310    ) -> Self {
311        self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
312        self
313    }
314}
315
316impl<R: rand::RngCore + rand::CryptoRng> SwarmBuilder<R> {
317    /// Create the configured Swarm.
318    pub fn build(self) -> Swarm {
319        let dir = if let Some(dir) = self.dir {
320            SwarmDirectory::Persistent(dir)
321        } else {
322            SwarmDirectory::new_temporary()
323        };
324
325        let ingest_data = self.data_ingestion_dir.clone();
326
327        let network_config = self.network_config.unwrap_or_else(|| {
328            let mut config_builder = ConfigBuilder::new(dir.as_ref());
329
330            if let Some(genesis_config) = self.genesis_config {
331                config_builder = config_builder.with_genesis_config(genesis_config);
332            }
333
334            if let Some(num_unpruned_validators) = self.num_unpruned_validators {
335                config_builder =
336                    config_builder.with_num_unpruned_validators(num_unpruned_validators);
337            }
338
339            if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
340                config_builder = config_builder.with_jwk_fetch_interval(jwk_fetch_interval);
341            }
342
343            if let Some(authority_overload_config) = self.authority_overload_config {
344                config_builder =
345                    config_builder.with_authority_overload_config(authority_overload_config);
346            }
347
348            if let Some(path) = self.data_ingestion_dir {
349                config_builder = config_builder.with_data_ingestion_dir(path);
350            }
351
352            if let Some(max_submit_position) = self.max_submit_position {
353                config_builder = config_builder.with_max_submit_position(max_submit_position);
354            }
355
356            if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis
357            {
358                config_builder = config_builder
359                    .with_submit_delay_step_override_millis(submit_delay_step_override_millis);
360            }
361
362            let mut network_config = config_builder
363                .committee(self.committee)
364                .rng(self.rng)
365                .with_objects(self.additional_objects)
366                .with_empty_validator_genesis()
367                .with_supported_protocol_versions_config(
368                    self.supported_protocol_versions_config.clone(),
369                )
370                .with_state_accumulator_config(self.state_accumulator_config.clone())
371                .build();
372            // Populate validator genesis by pointing to the blob
373            let genesis_path = dir.join(IOTA_GENESIS_FILENAME);
374            network_config
375                .genesis
376                .save(&genesis_path)
377                .expect("genesis should be saved successfully");
378            for validator in &mut network_config.validator_configs {
379                validator.genesis = iota_config::node::Genesis::new_from_file(&genesis_path);
380            }
381            network_config
382        });
383
384        let mut nodes: HashMap<_, _> = network_config
385            .validator_configs()
386            .iter()
387            .map(|config| {
388                info!(
389                    "SwarmBuilder configuring validator with name {}",
390                    config.authority_public_key()
391                );
392                (config.authority_public_key(), Node::new(config.to_owned()))
393            })
394            .collect();
395
396        let mut fullnode_config_builder = FullnodeConfigBuilder::new()
397            .with_config_directory(dir.as_ref().into())
398            .with_db_checkpoint_config(self.db_checkpoint_config.clone())
399            .with_run_with_range(self.fullnode_run_with_range)
400            .with_policy_config(self.fullnode_policy_config)
401            .with_data_ingestion_dir(ingest_data)
402            .with_fw_config(self.fullnode_fw_config)
403            .with_disable_pruning(self.disable_fullnode_pruning);
404
405        if let Some(spvc) = &self.fullnode_supported_protocol_versions_config {
406            let supported_versions = match spvc {
407                ProtocolVersionsConfig::Default => SupportedProtocolVersions::SYSTEM_DEFAULT,
408                ProtocolVersionsConfig::Global(v) => *v,
409                ProtocolVersionsConfig::PerValidator(func) => func(0, None),
410            };
411            fullnode_config_builder =
412                fullnode_config_builder.with_supported_protocol_versions(supported_versions);
413        }
414
415        if self.fullnode_count > 0 {
416            (0..self.fullnode_count).for_each(|idx| {
417                let mut builder = fullnode_config_builder.clone();
418                if idx == 0 {
419                    // Only the first fullnode is used as the rpc fullnode, we can only use the
420                    // same address once.
421                    if let Some(rpc_addr) = self.fullnode_rpc_addr {
422                        builder = builder.with_rpc_addr(rpc_addr);
423                    }
424                    if let Some(rpc_port) = self.fullnode_rpc_port {
425                        builder = builder.with_rpc_port(rpc_port);
426                    }
427                }
428                let config = builder.build(&mut OsRng, &network_config);
429                info!(
430                    "SwarmBuilder configuring full node with name {}",
431                    config.authority_public_key()
432                );
433                nodes.insert(config.authority_public_key(), Node::new(config));
434            });
435        }
436        Swarm {
437            dir,
438            network_config,
439            nodes,
440            fullnode_config_builder,
441        }
442    }
443}
444
445/// A handle to an in-memory IOTA Network.
446#[derive(Debug)]
447pub struct Swarm {
448    dir: SwarmDirectory,
449    network_config: NetworkConfig,
450    nodes: HashMap<AuthorityName, Node>,
451    // Save a copy of the fullnode config builder to build future fullnodes.
452    fullnode_config_builder: FullnodeConfigBuilder,
453}
454
455impl Drop for Swarm {
456    fn drop(&mut self) {
457        self.nodes_iter_mut().for_each(|node| node.stop());
458    }
459}
460
461impl Swarm {
462    fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
463        self.nodes.values_mut()
464    }
465
466    /// Return a new Builder
467    pub fn builder() -> SwarmBuilder {
468        SwarmBuilder::new()
469    }
470
471    /// Start all nodes associated with this Swarm
472    pub async fn launch(&mut self) -> Result<()> {
473        try_join_all(self.nodes_iter_mut().map(|node| node.start())).await?;
474        tracing::info!("Successfully launched Swarm");
475        Ok(())
476    }
477
478    /// Return the path to the directory where this Swarm's on-disk data is
479    /// kept.
480    pub fn dir(&self) -> &Path {
481        self.dir.as_ref()
482    }
483
484    /// Return a reference to this Swarm's `NetworkConfig`.
485    pub fn config(&self) -> &NetworkConfig {
486        &self.network_config
487    }
488
489    /// Return a mutable reference to this Swarm's `NetworkConfig`.
490    // TODO: It's not ideal to mutate network config. We should consider removing
491    // this.
492    pub fn config_mut(&mut self) -> &mut NetworkConfig {
493        &mut self.network_config
494    }
495
496    pub fn all_nodes(&self) -> impl Iterator<Item = &Node> {
497        self.nodes.values()
498    }
499
500    pub fn node(&self, name: &AuthorityName) -> Option<&Node> {
501        self.nodes.get(name)
502    }
503
504    pub fn node_mut(&mut self, name: &AuthorityName) -> Option<&mut Node> {
505        self.nodes.get_mut(name)
506    }
507
508    /// Return an iterator over shared references of all nodes that are set up
509    /// as validators. This means that they have a consensus config. This
510    /// however doesn't mean this validator is currently active (i.e. it's
511    /// not necessarily in the validator set at the moment).
512    pub fn validator_nodes(&self) -> impl Iterator<Item = &Node> {
513        self.nodes
514            .values()
515            .filter(|node| node.config().consensus_config.is_some())
516    }
517
518    pub fn validator_node_handles(&self) -> Vec<IotaNodeHandle> {
519        self.validator_nodes()
520            .map(|node| node.get_node_handle().unwrap())
521            .collect()
522    }
523
524    /// Returns an iterator over all currently active validators.
525    pub fn active_validators(&self) -> impl Iterator<Item = &Node> {
526        self.validator_nodes().filter(|node| {
527            node.get_node_handle().is_some_and(|handle| {
528                let state = handle.state();
529                state.is_validator(&state.epoch_store_for_testing())
530            })
531        })
532    }
533
534    /// Return an iterator over shared references of all Fullnodes.
535    pub fn fullnodes(&self) -> impl Iterator<Item = &Node> {
536        self.nodes
537            .values()
538            .filter(|node| node.config().consensus_config.is_none())
539    }
540
541    pub async fn spawn_new_node(&mut self, config: NodeConfig) -> IotaNodeHandle {
542        let name = config.authority_public_key();
543        let node = Node::new(config);
544        node.start().await.unwrap();
545        let handle = node.get_node_handle().unwrap();
546        self.nodes.insert(name, node);
547        handle
548    }
549
550    pub fn get_fullnode_config_builder(&self) -> FullnodeConfigBuilder {
551        self.fullnode_config_builder.clone()
552    }
553}
554
555#[derive(Debug)]
556enum SwarmDirectory {
557    Persistent(PathBuf),
558    Temporary(TempDir),
559}
560
561impl SwarmDirectory {
562    fn new_temporary() -> Self {
563        SwarmDirectory::Temporary(nondeterministic!(TempDir::new().unwrap()))
564    }
565}
566
567impl ops::Deref for SwarmDirectory {
568    type Target = Path;
569
570    fn deref(&self) -> &Self::Target {
571        match self {
572            SwarmDirectory::Persistent(dir) => dir.deref(),
573            SwarmDirectory::Temporary(dir) => dir.path(),
574        }
575    }
576}
577
578impl AsRef<Path> for SwarmDirectory {
579    fn as_ref(&self) -> &Path {
580        match self {
581            SwarmDirectory::Persistent(dir) => dir.as_ref(),
582            SwarmDirectory::Temporary(dir) => dir.as_ref(),
583        }
584    }
585}
586
587#[cfg(test)]
588mod test {
589    use std::num::NonZeroUsize;
590
591    use super::Swarm;
592
593    #[tokio::test]
594    async fn launch() {
595        telemetry_subscribers::init_for_testing();
596        let mut swarm = Swarm::builder()
597            .committee_size(NonZeroUsize::new(4).unwrap())
598            .with_fullnode_count(1)
599            .build();
600
601        swarm.launch().await.unwrap();
602
603        for validator in swarm.validator_nodes() {
604            validator.health_check(true).await.unwrap();
605        }
606
607        for fullnode in swarm.fullnodes() {
608            fullnode.health_check(false).await.unwrap();
609        }
610
611        println!("hello");
612    }
613}