iota_swarm/memory/
swarm.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    net::SocketAddr,
8    num::NonZeroUsize,
9    ops,
10    path::{Path, PathBuf},
11    time::Duration,
12};
13
14use anyhow::Result;
15use futures::future::try_join_all;
16use iota_config::{
17    IOTA_GENESIS_FILENAME, NodeConfig,
18    node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange},
19};
20use iota_macros::nondeterministic;
21use iota_node::IotaNodeHandle;
22use iota_protocol_config::ProtocolVersion;
23use iota_swarm_config::{
24    genesis_config::{AccountConfig, GenesisConfig, ValidatorGenesisConfig},
25    network_config::NetworkConfig,
26    network_config_builder::{
27        CommitteeConfig, ConfigBuilder, ProtocolVersionsConfig, StateAccumulatorV1EnabledConfig,
28        SupportedProtocolVersionsCallback,
29    },
30    node_config_builder::FullnodeConfigBuilder,
31};
32use iota_types::{
33    base_types::AuthorityName,
34    object::Object,
35    supported_protocol_versions::SupportedProtocolVersions,
36    traffic_control::{PolicyConfig, RemoteFirewallConfig},
37};
38use rand::rngs::OsRng;
39use tempfile::TempDir;
40use tracing::info;
41
42use super::Node;
43
/// Builder that collects the settings for an in-memory [`Swarm`] and assembles
/// it in `build`.
///
/// The type parameter `R` is the random number generator handed to the
/// network `ConfigBuilder` when a network config has to be generated.
pub struct SwarmBuilder<R = OsRng> {
    /// Random number generator passed to the network `ConfigBuilder` in
    /// `build`. Fullnode configs are built with `OsRng` regardless.
    rng: R,
    /// Directory for on-disk data. `None` makes `build` create a temporary
    /// directory instead.
    dir: Option<PathBuf>,
    /// Validator committee description. `new` defaults it to a committee of
    /// size one.
    committee: CommitteeConfig,
    /// Genesis config forwarded to the network `ConfigBuilder`, if any.
    genesis_config: Option<GenesisConfig>,
    /// Pre-built network config. When set, `build` uses it as-is and skips
    /// generating one.
    network_config: Option<NetworkConfig>,
    /// Extra objects forwarded to the network `ConfigBuilder`.
    additional_objects: Vec<Object>,
    /// Number of fullnodes `build` creates in addition to the validators.
    fullnode_count: usize,
    /// RPC port applied to the first fullnode only. The setters assert that
    /// it is not combined with `fullnode_rpc_addr`.
    fullnode_rpc_port: Option<u16>,
    /// RPC address applied to the first fullnode only. The setters assert that
    /// it is not combined with `fullnode_rpc_port`.
    fullnode_rpc_addr: Option<SocketAddr>,
    /// Supported protocol versions forwarded to the network `ConfigBuilder`.
    supported_protocol_versions_config: ProtocolVersionsConfig,
    /// Supported protocol versions for fullnodes. When `None`, `build` does not
    /// set supported versions on the fullnode config builder at all.
    // NOTE(review): an earlier comment said this defaults to
    // `supported_protocol_versions_config`; nothing in this file implements
    // that fallback - confirm against `FullnodeConfigBuilder`.
    fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
    /// DB checkpoint settings forwarded to the fullnode config builder.
    db_checkpoint_config: DBCheckpointConfig,
    /// Forwarded to the network `ConfigBuilder` when a network config is
    /// generated.
    jwk_fetch_interval: Option<Duration>,
    /// Forwarded to the network `ConfigBuilder` when a network config is
    /// generated.
    num_unpruned_validators: Option<usize>,
    /// Forwarded to the network `ConfigBuilder` when a network config is
    /// generated.
    authority_overload_config: Option<AuthorityOverloadConfig>,
    /// Data ingestion directory. Forwarded to the fullnode config builder and,
    /// when a network config is generated, to the network `ConfigBuilder`.
    data_ingestion_dir: Option<PathBuf>,
    /// Forwarded to the fullnode config builder.
    fullnode_run_with_range: Option<RunWithRange>,
    /// Forwarded to the fullnode config builder.
    fullnode_policy_config: Option<PolicyConfig>,
    /// Forwarded to the fullnode config builder.
    fullnode_fw_config: Option<RemoteFirewallConfig>,
    /// Forwarded to the network `ConfigBuilder` when a network config is
    /// generated.
    max_submit_position: Option<usize>,
    /// Forwarded to the network `ConfigBuilder` when a network config is
    /// generated.
    submit_delay_step_override_millis: Option<u64>,
    /// Forwarded to the network `ConfigBuilder`. `new` defaults it to
    /// `StateAccumulatorV1EnabledConfig::Global(true)`.
    state_accumulator_config: StateAccumulatorV1EnabledConfig,
}
70
71impl SwarmBuilder {
72    #[expect(clippy::new_without_default)]
73    pub fn new() -> Self {
74        Self {
75            rng: OsRng,
76            dir: None,
77            committee: CommitteeConfig::Size(NonZeroUsize::new(1).unwrap()),
78            genesis_config: None,
79            network_config: None,
80            additional_objects: vec![],
81            fullnode_count: 0,
82            fullnode_rpc_port: None,
83            fullnode_rpc_addr: None,
84            supported_protocol_versions_config: ProtocolVersionsConfig::Default,
85            fullnode_supported_protocol_versions_config: None,
86            db_checkpoint_config: DBCheckpointConfig::default(),
87            jwk_fetch_interval: None,
88            num_unpruned_validators: None,
89            authority_overload_config: None,
90            data_ingestion_dir: None,
91            fullnode_run_with_range: None,
92            fullnode_policy_config: None,
93            fullnode_fw_config: None,
94            max_submit_position: None,
95            submit_delay_step_override_millis: None,
96            state_accumulator_config: StateAccumulatorV1EnabledConfig::Global(true),
97        }
98    }
99}
100
101impl<R> SwarmBuilder<R> {
102    pub fn rng<N: rand::RngCore + rand::CryptoRng>(self, rng: N) -> SwarmBuilder<N> {
103        SwarmBuilder {
104            rng,
105            dir: self.dir,
106            committee: self.committee,
107            genesis_config: self.genesis_config,
108            network_config: self.network_config,
109            additional_objects: self.additional_objects,
110            fullnode_count: self.fullnode_count,
111            fullnode_rpc_port: self.fullnode_rpc_port,
112            fullnode_rpc_addr: self.fullnode_rpc_addr,
113            supported_protocol_versions_config: self.supported_protocol_versions_config,
114            fullnode_supported_protocol_versions_config: self
115                .fullnode_supported_protocol_versions_config,
116            db_checkpoint_config: self.db_checkpoint_config,
117            jwk_fetch_interval: self.jwk_fetch_interval,
118            num_unpruned_validators: self.num_unpruned_validators,
119            authority_overload_config: self.authority_overload_config,
120            data_ingestion_dir: self.data_ingestion_dir,
121            fullnode_run_with_range: self.fullnode_run_with_range,
122            fullnode_policy_config: self.fullnode_policy_config,
123            fullnode_fw_config: self.fullnode_fw_config,
124            max_submit_position: self.max_submit_position,
125            submit_delay_step_override_millis: self.submit_delay_step_override_millis,
126            state_accumulator_config: self.state_accumulator_config,
127        }
128    }
129
130    /// Set the directory that should be used by the Swarm for any on-disk data.
131    ///
132    /// If a directory is provided, it will not be cleaned up when the Swarm is
133    /// dropped.
134    ///
135    /// Defaults to using a temporary directory that will be cleaned up when the
136    /// Swarm is dropped.
137    pub fn dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
138        self.dir = Some(dir.into());
139        self
140    }
141
142    /// Set the committee size (the number of validators in the validator set).
143    ///
144    /// Defaults to 1.
145    pub fn committee_size(mut self, committee_size: NonZeroUsize) -> Self {
146        self.committee = CommitteeConfig::Size(committee_size);
147        self
148    }
149
150    pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
151        self.committee = CommitteeConfig::Validators(validators);
152        self
153    }
154
155    pub fn with_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
156        assert!(self.network_config.is_none() && self.genesis_config.is_none());
157        self.genesis_config = Some(genesis_config);
158        self
159    }
160
161    pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
162        assert!(self.network_config.is_none());
163        self.num_unpruned_validators = Some(n);
164        self
165    }
166
167    pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
168        self.jwk_fetch_interval = Some(i);
169        self
170    }
171
172    pub fn with_network_config(mut self, network_config: NetworkConfig) -> Self {
173        assert!(self.network_config.is_none() && self.genesis_config.is_none());
174        self.network_config = Some(network_config);
175        self
176    }
177
178    pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
179        self.get_or_init_genesis_config().accounts = accounts;
180        self
181    }
182
183    pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
184        self.additional_objects.extend(objects);
185        self
186    }
187
188    pub fn with_fullnode_count(mut self, fullnode_count: usize) -> Self {
189        self.fullnode_count = fullnode_count;
190        self
191    }
192
193    pub fn with_fullnode_rpc_port(mut self, fullnode_rpc_port: u16) -> Self {
194        assert!(self.fullnode_rpc_addr.is_none());
195        self.fullnode_rpc_port = Some(fullnode_rpc_port);
196        self
197    }
198
199    pub fn with_fullnode_rpc_addr(mut self, fullnode_rpc_addr: SocketAddr) -> Self {
200        assert!(self.fullnode_rpc_port.is_none());
201        self.fullnode_rpc_addr = Some(fullnode_rpc_addr);
202        self
203    }
204
205    pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
206        self.get_or_init_genesis_config()
207            .parameters
208            .epoch_duration_ms = epoch_duration_ms;
209        self
210    }
211
212    pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
213        self.get_or_init_genesis_config()
214            .parameters
215            .protocol_version = v;
216        self
217    }
218
219    pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
220        self.supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
221        self
222    }
223
224    pub fn with_supported_protocol_version_callback(
225        mut self,
226        func: SupportedProtocolVersionsCallback,
227    ) -> Self {
228        self.supported_protocol_versions_config = ProtocolVersionsConfig::PerValidator(func);
229        self
230    }
231
232    pub fn with_supported_protocol_versions_config(mut self, c: ProtocolVersionsConfig) -> Self {
233        self.supported_protocol_versions_config = c;
234        self
235    }
236
237    pub fn with_state_accumulator_config(mut self, c: StateAccumulatorV1EnabledConfig) -> Self {
238        self.state_accumulator_config = c;
239        self
240    }
241
242    pub fn with_fullnode_supported_protocol_versions_config(
243        mut self,
244        c: ProtocolVersionsConfig,
245    ) -> Self {
246        self.fullnode_supported_protocol_versions_config = Some(c);
247        self
248    }
249
250    pub fn with_db_checkpoint_config(mut self, db_checkpoint_config: DBCheckpointConfig) -> Self {
251        self.db_checkpoint_config = db_checkpoint_config;
252        self
253    }
254
255    pub fn with_authority_overload_config(
256        mut self,
257        authority_overload_config: AuthorityOverloadConfig,
258    ) -> Self {
259        assert!(self.network_config.is_none());
260        self.authority_overload_config = Some(authority_overload_config);
261        self
262    }
263
264    pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
265        self.data_ingestion_dir = Some(path);
266        self
267    }
268
269    pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
270        if let Some(run_with_range) = run_with_range {
271            self.fullnode_run_with_range = Some(run_with_range);
272        }
273        self
274    }
275
276    pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
277        self.fullnode_policy_config = config;
278        self
279    }
280
281    pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
282        self.fullnode_fw_config = config;
283        self
284    }
285
286    fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
287        if self.genesis_config.is_none() {
288            assert!(self.network_config.is_none());
289            self.genesis_config = Some(GenesisConfig::for_local_testing());
290        }
291        self.genesis_config.as_mut().unwrap()
292    }
293
294    pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
295        self.max_submit_position = Some(max_submit_position);
296        self
297    }
298
299    pub fn with_submit_delay_step_override_millis(
300        mut self,
301        submit_delay_step_override_millis: u64,
302    ) -> Self {
303        self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
304        self
305    }
306}
307
308impl<R: rand::RngCore + rand::CryptoRng> SwarmBuilder<R> {
309    /// Create the configured Swarm.
310    pub fn build(self) -> Swarm {
311        let dir = if let Some(dir) = self.dir {
312            SwarmDirectory::Persistent(dir)
313        } else {
314            SwarmDirectory::new_temporary()
315        };
316
317        let ingest_data = self.data_ingestion_dir.clone();
318
319        let network_config = self.network_config.unwrap_or_else(|| {
320            let mut config_builder = ConfigBuilder::new(dir.as_ref());
321
322            if let Some(genesis_config) = self.genesis_config {
323                config_builder = config_builder.with_genesis_config(genesis_config);
324            }
325
326            if let Some(num_unpruned_validators) = self.num_unpruned_validators {
327                config_builder =
328                    config_builder.with_num_unpruned_validators(num_unpruned_validators);
329            }
330
331            if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
332                config_builder = config_builder.with_jwk_fetch_interval(jwk_fetch_interval);
333            }
334
335            if let Some(authority_overload_config) = self.authority_overload_config {
336                config_builder =
337                    config_builder.with_authority_overload_config(authority_overload_config);
338            }
339
340            if let Some(path) = self.data_ingestion_dir {
341                config_builder = config_builder.with_data_ingestion_dir(path);
342            }
343
344            if let Some(max_submit_position) = self.max_submit_position {
345                config_builder = config_builder.with_max_submit_position(max_submit_position);
346            }
347
348            if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis
349            {
350                config_builder = config_builder
351                    .with_submit_delay_step_override_millis(submit_delay_step_override_millis);
352            }
353
354            let mut network_config = config_builder
355                .committee(self.committee)
356                .rng(self.rng)
357                .with_objects(self.additional_objects)
358                .with_empty_validator_genesis()
359                .with_supported_protocol_versions_config(
360                    self.supported_protocol_versions_config.clone(),
361                )
362                .with_state_accumulator_config(self.state_accumulator_config.clone())
363                .build();
364            // Populate validator genesis by pointing to the blob
365            let genesis_path = dir.join(IOTA_GENESIS_FILENAME);
366            network_config
367                .genesis
368                .save(&genesis_path)
369                .expect("genesis should be saved successfully");
370            for validator in &mut network_config.validator_configs {
371                validator.genesis = iota_config::node::Genesis::new_from_file(&genesis_path);
372            }
373            network_config
374        });
375
376        let mut nodes: HashMap<_, _> = network_config
377            .validator_configs()
378            .iter()
379            .map(|config| {
380                info!(
381                    "SwarmBuilder configuring validator with name {}",
382                    config.authority_public_key()
383                );
384                (config.authority_public_key(), Node::new(config.to_owned()))
385            })
386            .collect();
387
388        let mut fullnode_config_builder = FullnodeConfigBuilder::new()
389            .with_config_directory(dir.as_ref().into())
390            .with_db_checkpoint_config(self.db_checkpoint_config.clone())
391            .with_run_with_range(self.fullnode_run_with_range)
392            .with_policy_config(self.fullnode_policy_config)
393            .with_data_ingestion_dir(ingest_data)
394            .with_fw_config(self.fullnode_fw_config);
395
396        if let Some(spvc) = &self.fullnode_supported_protocol_versions_config {
397            let supported_versions = match spvc {
398                ProtocolVersionsConfig::Default => SupportedProtocolVersions::SYSTEM_DEFAULT,
399                ProtocolVersionsConfig::Global(v) => *v,
400                ProtocolVersionsConfig::PerValidator(func) => func(0, None),
401            };
402            fullnode_config_builder =
403                fullnode_config_builder.with_supported_protocol_versions(supported_versions);
404        }
405
406        if self.fullnode_count > 0 {
407            (0..self.fullnode_count).for_each(|idx| {
408                let mut builder = fullnode_config_builder.clone();
409                if idx == 0 {
410                    // Only the first fullnode is used as the rpc fullnode, we can only use the
411                    // same address once.
412                    if let Some(rpc_addr) = self.fullnode_rpc_addr {
413                        builder = builder.with_rpc_addr(rpc_addr);
414                    }
415                    if let Some(rpc_port) = self.fullnode_rpc_port {
416                        builder = builder.with_rpc_port(rpc_port);
417                    }
418                }
419                let config = builder.build(&mut OsRng, &network_config);
420                info!(
421                    "SwarmBuilder configuring full node with name {}",
422                    config.authority_public_key()
423                );
424                nodes.insert(config.authority_public_key(), Node::new(config));
425            });
426        }
427        Swarm {
428            dir,
429            network_config,
430            nodes,
431            fullnode_config_builder,
432        }
433    }
434}
435
/// A handle to an in-memory IOTA Network.
///
/// Holds the on-disk directory, the network config and every node (validators
/// and fullnodes alike). All nodes are stopped when the handle is dropped.
#[derive(Debug)]
pub struct Swarm {
    /// Directory holding this Swarm's on-disk data; either caller-provided or
    /// temporary.
    dir: SwarmDirectory,
    /// Network config the validator nodes were created from.
    network_config: NetworkConfig,
    /// All nodes of the swarm, keyed by the authority public key of their
    /// config.
    nodes: HashMap<AuthorityName, Node>,
    // Save a copy of the fullnode config builder to build future fullnodes.
    fullnode_config_builder: FullnodeConfigBuilder,
}
445
446impl Drop for Swarm {
447    fn drop(&mut self) {
448        self.nodes_iter_mut().for_each(|node| node.stop());
449    }
450}
451
452impl Swarm {
453    fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
454        self.nodes.values_mut()
455    }
456
457    /// Return a new Builder
458    pub fn builder() -> SwarmBuilder {
459        SwarmBuilder::new()
460    }
461
462    /// Start all nodes associated with this Swarm
463    pub async fn launch(&mut self) -> Result<()> {
464        try_join_all(self.nodes_iter_mut().map(|node| node.start())).await?;
465        tracing::info!("Successfully launched Swarm");
466        Ok(())
467    }
468
469    /// Return the path to the directory where this Swarm's on-disk data is
470    /// kept.
471    pub fn dir(&self) -> &Path {
472        self.dir.as_ref()
473    }
474
475    /// Return a reference to this Swarm's `NetworkConfig`.
476    pub fn config(&self) -> &NetworkConfig {
477        &self.network_config
478    }
479
480    /// Return a mutable reference to this Swarm's `NetworkConfig`.
481    // TODO: It's not ideal to mutate network config. We should consider removing
482    // this.
483    pub fn config_mut(&mut self) -> &mut NetworkConfig {
484        &mut self.network_config
485    }
486
487    pub fn all_nodes(&self) -> impl Iterator<Item = &Node> {
488        self.nodes.values()
489    }
490
491    pub fn node(&self, name: &AuthorityName) -> Option<&Node> {
492        self.nodes.get(name)
493    }
494
495    pub fn node_mut(&mut self, name: &AuthorityName) -> Option<&mut Node> {
496        self.nodes.get_mut(name)
497    }
498
499    /// Return an iterator over shared references of all nodes that are set up
500    /// as validators. This means that they have a consensus config. This
501    /// however doesn't mean this validator is currently active (i.e. it's
502    /// not necessarily in the validator set at the moment).
503    pub fn validator_nodes(&self) -> impl Iterator<Item = &Node> {
504        self.nodes
505            .values()
506            .filter(|node| node.config().consensus_config.is_some())
507    }
508
509    pub fn validator_node_handles(&self) -> Vec<IotaNodeHandle> {
510        self.validator_nodes()
511            .map(|node| node.get_node_handle().unwrap())
512            .collect()
513    }
514
515    /// Returns an iterator over all currently active validators.
516    pub fn active_validators(&self) -> impl Iterator<Item = &Node> {
517        self.validator_nodes().filter(|node| {
518            node.get_node_handle().is_some_and(|handle| {
519                let state = handle.state();
520                state.is_validator(&state.epoch_store_for_testing())
521            })
522        })
523    }
524
525    /// Return an iterator over shared references of all Fullnodes.
526    pub fn fullnodes(&self) -> impl Iterator<Item = &Node> {
527        self.nodes
528            .values()
529            .filter(|node| node.config().consensus_config.is_none())
530    }
531
532    pub async fn spawn_new_node(&mut self, config: NodeConfig) -> IotaNodeHandle {
533        let name = config.authority_public_key();
534        let node = Node::new(config);
535        node.start().await.unwrap();
536        let handle = node.get_node_handle().unwrap();
537        self.nodes.insert(name, node);
538        handle
539    }
540
541    pub fn get_fullnode_config_builder(&self) -> FullnodeConfigBuilder {
542        self.fullnode_config_builder.clone()
543    }
544}
545
/// Location of a Swarm's on-disk data.
#[derive(Debug)]
enum SwarmDirectory {
    /// A caller-provided directory path.
    Persistent(PathBuf),
    /// A temporary directory owned by the Swarm through `tempfile::TempDir`.
    Temporary(TempDir),
}
551
552impl SwarmDirectory {
553    fn new_temporary() -> Self {
554        SwarmDirectory::Temporary(nondeterministic!(TempDir::new().unwrap()))
555    }
556}
557
558impl ops::Deref for SwarmDirectory {
559    type Target = Path;
560
561    fn deref(&self) -> &Self::Target {
562        match self {
563            SwarmDirectory::Persistent(dir) => dir.deref(),
564            SwarmDirectory::Temporary(dir) => dir.path(),
565        }
566    }
567}
568
569impl AsRef<Path> for SwarmDirectory {
570    fn as_ref(&self) -> &Path {
571        match self {
572            SwarmDirectory::Persistent(dir) => dir.as_ref(),
573            SwarmDirectory::Temporary(dir) => dir.as_ref(),
574        }
575    }
576}
577
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use super::Swarm;

    /// Smoke test: a swarm with four validators and one fullnode launches and
    /// every node passes its health check.
    #[tokio::test]
    async fn launch() {
        telemetry_subscribers::init_for_testing();
        let mut swarm = Swarm::builder()
            .committee_size(NonZeroUsize::new(4).unwrap())
            .with_fullnode_count(1)
            .build();

        swarm.launch().await.unwrap();

        for validator in swarm.validator_nodes() {
            validator.health_check(true).await.unwrap();
        }

        for fullnode in swarm.fullnodes() {
            fullnode.health_check(false).await.unwrap();
        }
    }
}