iota_swarm/memory/node.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::sync::{Mutex, MutexGuard};

use anyhow::{Result, anyhow};
use iota_config::NodeConfig;
use iota_node::IotaNodeHandle;
use iota_types::base_types::{AuthorityName, ConciseableName};
use tap::TapFallible;
use tracing::{error, info};

use super::container::Container;

/// A handle to an in-memory IOTA Node.
///
/// Each Node is run in isolation from the others by giving it its own tokio
/// runtime on a separate thread. This ensures that all asynchronous tasks
/// associated with a Node can be stopped when desired (either when a Node is
/// dropped or when it is explicitly stopped by calling [`Node::stop`]) by
/// simply dropping that Node's runtime.
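///
/// A minimal usage sketch (ignored doctest; `config` is assumed to be a
/// [`NodeConfig`] for the node under test):
///
/// ```ignore
/// let node = Node::new(config);
/// node.start().await?;
/// assert!(node.is_running());
/// node.stop();
/// ```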
#[derive(Debug)]
pub struct Node {
    container: Mutex<Option<Container>>,
    config: Mutex<NodeConfig>,
    runtime_type: RuntimeType,
}

impl Node {
    /// Create a new Node from the provided [`NodeConfig`].
    ///
    /// The Node is returned without being started. See [`Node::spawn`] or
    /// [`Node::start`] for how to start the node.
    ///
    /// [`NodeConfig`]: iota_config::NodeConfig
    pub fn new(config: NodeConfig) -> Self {
        Self {
            container: Default::default(),
            config: config.into(),
            runtime_type: RuntimeType::SingleThreaded,
        }
    }

    /// Return the `name` of this Node
    pub fn name(&self) -> AuthorityName {
        self.config().authority_public_key()
    }

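    /// Lock and return a guard over this Node's `NodeConfig`.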
    pub fn config(&self) -> MutexGuard<'_, NodeConfig> {
        self.config.lock().unwrap()
    }

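    /// Return the configured JSON-RPC address of this Node.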
    pub fn json_rpc_address(&self) -> std::net::SocketAddr {
        self.config().json_rpc_address
    }

    /// Start this Node
    pub async fn spawn(&self) -> Result<()> {
        info!(name =% self.name().concise(), "starting in-memory node");
        let config = self.config().clone();
        *self.container.lock().unwrap() = Some(Container::spawn(config, self.runtime_type).await);
        Ok(())
    }

    /// Start this Node, waiting until it's completely started up.
    pub async fn start(&self) -> Result<()> {
        self.spawn().await
    }

    /// Stop this Node
    pub fn stop(&self) {
        info!(name =% self.name().concise(), "stopping in-memory node");
        *self.container.lock().unwrap() = None;
        info!(name =% self.name().concise(), "node stopped");
    }

    /// Whether this Node is currently running
    pub fn is_running(&self) -> bool {
        self.container
            .lock()
            .unwrap()
            .as_ref()
            .is_some_and(|c| c.is_alive())
    }

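    /// Return an [`IotaNodeHandle`] to the underlying node, if one is
    /// available (i.e. the Node is running).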
    pub fn get_node_handle(&self) -> Option<IotaNodeHandle> {
        self.container
            .lock()
            .unwrap()
            .as_ref()
            .and_then(|c| c.get_node_handle())
    }

    /// Perform a health check on this Node by:
    /// * Checking that the node is running
    /// * Calling the Node's gRPC Health service if it's a validator.
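    ///
    /// For fullnodes (`is_validator == false`), only the running check is
    /// performed.
    ///
    /// A usage sketch (ignored doctest; assumes `node` is a started
    /// validator Node):
    ///
    /// ```ignore
    /// node.health_check(true).await?;
    /// ```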
    pub async fn health_check(&self, is_validator: bool) -> Result<(), HealthCheckError> {
        {
            let lock = self.container.lock().unwrap();
            let container = lock.as_ref().ok_or(HealthCheckError::NotRunning)?;
            if !container.is_alive() {
                return Err(HealthCheckError::NotRunning);
            }
        }

        if is_validator {
            let network_address = self.config().network_address().clone();
            let channel = iota_network_stack::client::connect(&network_address)
                .await
                .map_err(|err| anyhow!(err.to_string()))
                .map_err(HealthCheckError::Failure)
                .tap_err(|e| error!("error connecting to {}: {e}", self.name().concise()))?;

            let mut client = tonic_health::pb::health_client::HealthClient::new(channel);
            client
                .check(tonic_health::pb::HealthCheckRequest::default())
                .await
                .map_err(|e| HealthCheckError::Failure(e.into()))
                .tap_err(|e| {
                    error!(
                        "error performing health check on {}: {e}",
                        self.name().concise()
                    )
                })?;
        }

        Ok(())
    }
}

#[derive(Debug)]
pub enum HealthCheckError {
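    /// The node is not currently running.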
    NotRunning,
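    /// The health check failed with the given error.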
    Failure(anyhow::Error),
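    /// The health check failed for an unspecified reason.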
    Unknown(anyhow::Error),
}

impl std::fmt::Display for HealthCheckError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl std::error::Error for HealthCheckError {}

/// The type of tokio runtime that should be used for a particular Node
#[derive(Clone, Copy, Debug)]
pub enum RuntimeType {
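    /// Run the Node on a single-threaded (current-thread) tokio runtime.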
    SingleThreaded,
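    /// Run the Node on a multi-threaded tokio runtime.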
    MultiThreaded,
}

#[cfg(test)]
mod test {
    use crate::memory::Swarm;

    #[tokio::test]
    async fn start_and_stop() {
        telemetry_subscribers::init_for_testing();
        let swarm = Swarm::builder().build();

        let validator = swarm.validator_nodes().next().unwrap();

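        // A running node passes a health check; a stopped node fails it.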
        validator.start().await.unwrap();
        validator.health_check(true).await.unwrap();
        validator.stop();
        validator.health_check(true).await.unwrap_err();

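        // A stopped node can be started again.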
        validator.start().await.unwrap();
        validator.health_check(true).await.unwrap();
    }
}