iota_swarm/memory/
node.rs1use std::sync::{Mutex, MutexGuard};
6
7use anyhow::{Result, anyhow};
8use iota_config::NodeConfig;
9use iota_node::IotaNodeHandle;
10use iota_types::base_types::{AuthorityName, ConciseableName};
11use tap::TapFallible;
12use tracing::{error, info};
13
14use super::container::Container;
15
16#[derive(Debug)]
24pub struct Node {
25 container: Mutex<Option<Container>>,
26 config: Mutex<NodeConfig>,
27 runtime_type: RuntimeType,
28}
29
30impl Node {
31 pub fn new(config: NodeConfig) -> Self {
38 Self {
39 container: Default::default(),
40 config: config.into(),
41 runtime_type: RuntimeType::SingleThreaded,
42 }
43 }
44
45 pub fn name(&self) -> AuthorityName {
47 self.config().authority_public_key()
48 }
49
50 pub fn config(&self) -> MutexGuard<'_, NodeConfig> {
51 self.config.lock().unwrap()
52 }
53
54 pub fn json_rpc_address(&self) -> std::net::SocketAddr {
55 self.config().json_rpc_address
56 }
57
58 pub async fn spawn(&self) -> Result<()> {
60 info!(name =% self.name().concise(), "starting in-memory node");
61 let config = self.config().clone();
62 *self.container.lock().unwrap() = Some(Container::spawn(config, self.runtime_type).await);
63 Ok(())
64 }
65
66 pub async fn start(&self) -> Result<()> {
68 self.spawn().await
69 }
70
71 pub fn stop(&self) {
73 info!(name =% self.name().concise(), "stopping in-memory node");
74 *self.container.lock().unwrap() = None;
75 info!(name =% self.name().concise(), "node stopped");
76 }
77
78 pub fn is_running(&self) -> bool {
80 self.container
81 .lock()
82 .unwrap()
83 .as_ref()
84 .is_some_and(|c| c.is_alive())
85 }
86
87 pub fn get_node_handle(&self) -> Option<IotaNodeHandle> {
88 self.container
89 .lock()
90 .unwrap()
91 .as_ref()
92 .and_then(|c| c.get_node_handle())
93 }
94
95 pub async fn health_check(&self, is_validator: bool) -> Result<(), HealthCheckError> {
99 {
100 let lock = self.container.lock().unwrap();
101 let container = lock.as_ref().ok_or(HealthCheckError::NotRunning)?;
102 if !container.is_alive() {
103 return Err(HealthCheckError::NotRunning);
104 }
105 }
106
107 if is_validator {
108 let network_address = self.config().network_address().clone();
109 let channel = iota_network_stack::client::connect(&network_address)
110 .await
111 .map_err(|err| anyhow!(err.to_string()))
112 .map_err(HealthCheckError::Failure)
113 .tap_err(|e| error!("error connecting to {}: {e}", self.name().concise()))?;
114
115 let mut client = tonic_health::pb::health_client::HealthClient::new(channel);
116 client
117 .check(tonic_health::pb::HealthCheckRequest::default())
118 .await
119 .map_err(|e| HealthCheckError::Failure(e.into()))
120 .tap_err(|e| {
121 error!(
122 "error performing health check on {}: {e}",
123 self.name().concise()
124 )
125 })?;
126 }
127
128 Ok(())
129 }
130}
131
132#[derive(Debug)]
133pub enum HealthCheckError {
134 NotRunning,
135 Failure(anyhow::Error),
136 Unknown(anyhow::Error),
137}
138
139impl std::fmt::Display for HealthCheckError {
140 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
141 write!(f, "{:?}", self)
142 }
143}
144
145impl std::error::Error for HealthCheckError {}
146
/// Runtime flavour requested when a node's container is spawned.
///
/// NOTE(review): judging by the variant names this selects a single- or
/// multi-threaded runtime for the container; confirm in the container module.
#[derive(Debug, Clone, Copy)]
pub enum RuntimeType {
    /// Single-threaded flavour.
    SingleThreaded,
    /// Multi-threaded flavour.
    MultiThreaded,
}
153
#[cfg(test)]
mod test {
    use crate::memory::Swarm;

    /// Runs one validator through a start -> stop -> restart cycle and checks
    /// that the health check follows the node's state at every step.
    #[tokio::test]
    async fn start_and_stop() {
        telemetry_subscribers::init_for_testing();
        let swarm = Swarm::builder().build();

        let node = swarm.validator_nodes().next().unwrap();

        // Freshly started: the health check must pass.
        node.start().await.unwrap();
        node.health_check(true).await.unwrap();

        // Stopped: the health check must report an error.
        node.stop();
        node.health_check(true).await.unwrap_err();

        // Restarted: healthy again.
        node.start().await.unwrap();
        node.health_check(true).await.unwrap();
    }
}