iota_swarm/memory/
node.rs1use std::sync::{Mutex, MutexGuard};
6
7use anyhow::{Result, anyhow};
8use iota_config::NodeConfig;
9use iota_node::IotaNodeHandle;
10use iota_types::{
11 base_types::{AuthorityName, ConciseableName},
12 crypto::KeypairTraits,
13};
14use tap::TapFallible;
15use tracing::{error, info};
16
17use super::container::Container;
18
19#[derive(Debug)]
27pub struct Node {
28 container: Mutex<Option<Container>>,
29 config: Mutex<NodeConfig>,
30 runtime_type: RuntimeType,
31}
32
33impl Node {
34 pub fn new(config: NodeConfig) -> Self {
41 Self {
42 container: Default::default(),
43 config: config.into(),
44 runtime_type: RuntimeType::SingleThreaded,
45 }
46 }
47
48 pub fn name(&self) -> AuthorityName {
50 self.config().authority_public_key()
51 }
52
53 pub fn config(&self) -> MutexGuard<'_, NodeConfig> {
54 self.config.lock().unwrap()
55 }
56
57 pub fn json_rpc_address(&self) -> std::net::SocketAddr {
58 self.config().json_rpc_address
59 }
60
61 pub async fn spawn(&self) -> Result<()> {
63 info!(name =% self.name().concise(), "starting in-memory node");
64 let config = self.config().clone();
65 *self.container.lock().unwrap() = Some(Container::spawn(config, self.runtime_type).await);
66 Ok(())
67 }
68
69 pub async fn start(&self) -> Result<()> {
71 self.spawn().await
72 }
73
74 pub fn stop(&self) {
76 info!(name =% self.name().concise(), "stopping in-memory node");
77 *self.container.lock().unwrap() = None;
78 info!(name =% self.name().concise(), "node stopped");
79 }
80
81 pub fn is_running(&self) -> bool {
83 self.container
84 .lock()
85 .unwrap()
86 .as_ref()
87 .is_some_and(|c| c.is_alive())
88 }
89
90 pub fn get_node_handle(&self) -> Option<IotaNodeHandle> {
91 self.container
92 .lock()
93 .unwrap()
94 .as_ref()
95 .and_then(|c| c.get_node_handle())
96 }
97
98 pub async fn health_check(&self, is_validator: bool) -> Result<(), HealthCheckError> {
102 {
103 let lock = self.container.lock().unwrap();
104 let container = lock.as_ref().ok_or(HealthCheckError::NotRunning)?;
105 if !container.is_alive() {
106 return Err(HealthCheckError::NotRunning);
107 }
108 }
109
110 if is_validator {
111 let network_address = self.config().network_address().clone();
112 let tls_config = iota_tls::create_rustls_client_config(
113 self.config().network_key_pair().public().to_owned(),
114 iota_tls::IOTA_VALIDATOR_SERVER_NAME.to_string(),
115 None,
116 );
117 let channel = iota_network_stack::client::connect(&network_address, Some(tls_config))
118 .await
119 .map_err(|err| anyhow!(err.to_string()))
120 .map_err(HealthCheckError::Failure)
121 .tap_err(|e| error!("error connecting to {}: {e}", self.name().concise()))?;
122
123 let mut client = tonic_health::pb::health_client::HealthClient::new(channel);
124 client
125 .check(tonic_health::pb::HealthCheckRequest::default())
126 .await
127 .map_err(|e| HealthCheckError::Failure(e.into()))
128 .tap_err(|e| {
129 error!(
130 "error performing health check on {}: {e}",
131 self.name().concise()
132 )
133 })?;
134 }
135
136 Ok(())
137 }
138}
139
140#[derive(Debug)]
141pub enum HealthCheckError {
142 NotRunning,
143 Failure(anyhow::Error),
144 Unknown(anyhow::Error),
145}
146
147impl std::fmt::Display for HealthCheckError {
148 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
149 write!(f, "{self:?}")
150 }
151}
152
153impl std::error::Error for HealthCheckError {}
154
155#[derive(Clone, Copy, Debug)]
157pub enum RuntimeType {
158 SingleThreaded,
159 MultiThreaded,
160}
161
162#[cfg(test)]
163mod test {
164 use crate::memory::Swarm;
165
166 #[tokio::test]
167 async fn start_and_stop() {
168 telemetry_subscribers::init_for_testing();
169 let swarm = Swarm::builder().build();
170
171 let validator = swarm.validator_nodes().next().unwrap();
172
173 validator.start().await.unwrap();
174 validator.health_check(true).await.unwrap();
175 validator.stop();
176 validator.health_check(true).await.unwrap_err();
177
178 validator.start().await.unwrap();
179 validator.health_check(true).await.unwrap();
180 }
181}