iota_swarm/memory/node.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::sync::{Mutex, MutexGuard};

use anyhow::{Result, anyhow};
use iota_config::NodeConfig;
use iota_node::IotaNodeHandle;
use iota_types::{
    base_types::{AuthorityName, ConciseableName},
    crypto::KeypairTraits,
};
use tap::TapFallible;
use tracing::{error, info};

use super::container::Container;

/// A handle to an in-memory IOTA Node.
///
/// Each Node runs in isolation from the others by running in its own tokio
/// runtime on a separate thread. This ensures that all asynchronous tasks
/// associated with a Node can be stopped when desired (either when the Node
/// is dropped or explicitly stopped by calling [`Node::stop`]) by simply
/// dropping that Node's runtime.
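///
/// # Example
///
/// A minimal sketch of the lifecycle (assumes an async context and a
/// `NodeConfig` obtained elsewhere, e.g. from a test fixture):
///
/// ```ignore
/// let node = Node::new(config);
/// node.start().await?;
/// assert!(node.is_running());
/// node.stop();
/// assert!(!node.is_running());
/// ```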
#[derive(Debug)]
pub struct Node {
    container: Mutex<Option<Container>>,
    config: Mutex<NodeConfig>,
    runtime_type: RuntimeType,
}

impl Node {
    /// Create a new Node from the provided [`NodeConfig`].
    ///
    /// The Node is returned without being started. See [`Node::spawn`] or
    /// [`Node::start`] for how to start the node.
    ///
    /// [`NodeConfig`]: iota_config::NodeConfig
    pub fn new(config: NodeConfig) -> Self {
        Self {
            container: Default::default(),
            config: config.into(),
            runtime_type: RuntimeType::SingleThreaded,
        }
    }

    /// Return the `name` of this Node.
    pub fn name(&self) -> AuthorityName {
        self.config().authority_public_key()
    }

    /// Return a guard over this Node's `NodeConfig`.
    pub fn config(&self) -> MutexGuard<'_, NodeConfig> {
        self.config.lock().unwrap()
    }

    /// Return the socket address of this Node's JSON-RPC interface.
    pub fn json_rpc_address(&self) -> std::net::SocketAddr {
        self.config().json_rpc_address
    }

    /// Start this Node.
    pub async fn spawn(&self) -> Result<()> {
        info!(name =% self.name().concise(), "starting in-memory node");
        let config = self.config().clone();
        *self.container.lock().unwrap() = Some(Container::spawn(config, self.runtime_type).await);
        Ok(())
    }

    /// Start this Node, waiting until it's completely started up.
    pub async fn start(&self) -> Result<()> {
        self.spawn().await
    }

    /// Stop this Node.
    pub fn stop(&self) {
        info!(name =% self.name().concise(), "stopping in-memory node");
        *self.container.lock().unwrap() = None;
        info!(name =% self.name().concise(), "node stopped");
    }

    /// Return whether this Node is currently running.
    pub fn is_running(&self) -> bool {
        self.container
            .lock()
            .unwrap()
            .as_ref()
            .is_some_and(|c| c.is_alive())
    }

    /// Return a handle to the running `IotaNode`, if any.
    pub fn get_node_handle(&self) -> Option<IotaNodeHandle> {
        self.container
            .lock()
            .unwrap()
            .as_ref()
            .and_then(|c| c.get_node_handle())
    }

    /// Perform a health check on this Node by:
    /// * Checking that the node is running
    /// * Calling the Node's gRPC Health service if it's a validator
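    ///
    /// A minimal sketch of how a caller might branch on the result, given
    /// some `node: &Node`; the recovery actions shown are illustrative, not
    /// part of this crate:
    ///
    /// ```ignore
    /// match node.health_check(true).await {
    ///     Ok(()) => { /* node is healthy */ }
    ///     Err(HealthCheckError::NotRunning) => node.spawn().await?,
    ///     Err(e) => tracing::error!("health check failed: {e}"),
    /// }
    /// ```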
    pub async fn health_check(&self, is_validator: bool) -> Result<(), HealthCheckError> {
        {
            // Check liveness inside a scope so the std mutex guard is dropped
            // before any `.await` below.
            let lock = self.container.lock().unwrap();
            let container = lock.as_ref().ok_or(HealthCheckError::NotRunning)?;
            if !container.is_alive() {
                return Err(HealthCheckError::NotRunning);
            }
        }

        if is_validator {
            let network_address = self.config().network_address().clone();
            let tls_config = iota_tls::create_rustls_client_config(
                self.config().network_key_pair().public().to_owned(),
                iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
                None,
            );
            let channel = iota_network_stack::client::connect(&network_address, Some(tls_config))
                .await
                .map_err(|err| anyhow!(err.to_string()))
                .map_err(HealthCheckError::Failure)
                .tap_err(|e| error!("error connecting to {}: {e}", self.name().concise()))?;

            let mut client = tonic_health::pb::health_client::HealthClient::new(channel);
            client
                .check(tonic_health::pb::HealthCheckRequest::default())
                .await
                .map_err(|e| HealthCheckError::Failure(e.into()))
                .tap_err(|e| {
                    error!(
                        "error performing health check on {}: {e}",
                        self.name().concise()
                    )
                })?;
        }

        Ok(())
    }
}

/// Errors that can occur during [`Node::health_check`].
#[derive(Debug)]
pub enum HealthCheckError {
    NotRunning,
    Failure(anyhow::Error),
    Unknown(anyhow::Error),
}

impl std::fmt::Display for HealthCheckError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{self:?}")
    }
}

impl std::error::Error for HealthCheckError {}

/// The type of tokio runtime that should be used for a particular Node.
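///
/// Conceptually, the variants map to the two tokio runtime flavors roughly as
/// sketched below; the actual construction happens inside the container that
/// spawns the node and may differ:
///
/// ```ignore
/// let runtime = match runtime_type {
///     RuntimeType::SingleThreaded => tokio::runtime::Builder::new_current_thread()
///         .enable_all()
///         .build()?,
///     RuntimeType::MultiThreaded => tokio::runtime::Builder::new_multi_thread()
///         .enable_all()
///         .build()?,
/// };
/// ```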
#[derive(Clone, Copy, Debug)]
pub enum RuntimeType {
    SingleThreaded,
    MultiThreaded,
}

#[cfg(test)]
mod test {
    use crate::memory::Swarm;

    #[tokio::test]
    async fn start_and_stop() {
        telemetry_subscribers::init_for_testing();
        let swarm = Swarm::builder().build();

        let validator = swarm.validator_nodes().next().unwrap();

        validator.start().await.unwrap();
        validator.health_check(true).await.unwrap();
        validator.stop();
        validator.health_check(true).await.unwrap_err();

        validator.start().await.unwrap();
        validator.health_check(true).await.unwrap();
    }
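
    // A minimal additional sketch exercising `is_running` across the same
    // lifecycle; it assumes the same swarm-builder defaults as the test above.
    #[tokio::test]
    async fn is_running_tracks_lifecycle() {
        telemetry_subscribers::init_for_testing();
        let swarm = Swarm::builder().build();

        let validator = swarm.validator_nodes().next().unwrap();
        // A freshly built node has no container yet, so it is not running.
        assert!(!validator.is_running());

        validator.start().await.unwrap();
        assert!(validator.is_running());

        validator.stop();
        assert!(!validator.is_running());
    }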
}