iota_swarm/memory/
container.rs1use std::{
6 sync::{Arc, Weak},
7 thread,
8};
9
10use futures::FutureExt;
11use iota_config::NodeConfig;
12use iota_node::{IotaNode, IotaNodeHandle};
13use iota_types::{
14 base_types::ConciseableName,
15 crypto::{AuthorityPublicKeyBytes, KeypairTraits},
16};
17use telemetry_subscribers::get_global_telemetry_config;
18use tracing::{info, trace};
19
20use super::node::RuntimeType;
21
22#[derive(Debug)]
23pub(crate) struct Container {
24 join_handle: Option<thread::JoinHandle<()>>,
25 cancel_sender: Option<tokio::sync::oneshot::Sender<()>>,
26 node: Weak<IotaNode>,
27}
28
29impl Drop for Container {
32 fn drop(&mut self) {
33 trace!("dropping Container");
34
35 let thread = self.join_handle.take().unwrap();
36
37 let cancel_handle = self.cancel_sender.take().unwrap();
38
39 let _ = cancel_handle.send(());
41
42 thread.join().unwrap();
44
45 trace!("finished dropping Container");
46 }
47}
48
49impl Container {
50 pub async fn spawn(config: NodeConfig, runtime: RuntimeType) -> Self {
52 let (startup_sender, startup_receiver) = tokio::sync::oneshot::channel();
53 let (cancel_sender, cancel_receiver) = tokio::sync::oneshot::channel();
54 let name = AuthorityPublicKeyBytes::from(config.authority_key_pair().public())
55 .concise()
56 .to_string();
57
58 let thread = thread::Builder::new().name(name).spawn(move || {
59 let span = if get_global_telemetry_config()
60 .map(|c| c.enable_otlp_tracing)
61 .unwrap_or(false)
62 {
63 None
65 } else {
66 Some(tracing::span!(
67 tracing::Level::INFO,
68 "node",
69 name =% AuthorityPublicKeyBytes::from(config.authority_key_pair().public()).concise(),
70 ))
71 };
72
73 let _guard = span.as_ref().map(|span| span.enter());
74
75 let mut builder = match runtime {
76 RuntimeType::SingleThreaded => tokio::runtime::Builder::new_current_thread(),
77 RuntimeType::MultiThreaded => {
78 thread_local! {
79 static SPAN: std::cell::RefCell<Option<tracing::span::EnteredSpan>> =
80 const { std::cell::RefCell::new(None) };
81 }
82 let mut builder = tokio::runtime::Builder::new_multi_thread();
83 let span = span.clone();
84 builder
85 .on_thread_start(move || {
86 SPAN.with(|maybe_entered_span| {
87 if let Some(span) = &span {
88 *maybe_entered_span.borrow_mut() = Some(span.clone().entered());
89 }
90 });
91 })
92 .on_thread_stop(|| {
93 SPAN.with(|maybe_entered_span| {
94 maybe_entered_span.borrow_mut().take();
95 });
96 });
97
98 builder
99 }
100 };
101 let runtime = builder.enable_all().build().unwrap();
102
103 runtime.block_on(async move {
104 let registry_service = iota_metrics::start_prometheus_server(config.metrics_address);
105 info!(
106 "Started Prometheus HTTP endpoint. To query metrics use\n\tcurl -s http://{}/metrics",
107 config.metrics_address
108 );
109 let server = IotaNode::start(config, registry_service, None).await.unwrap();
110 let _ = startup_sender.send(Arc::downgrade(&server));
112 cancel_receiver.map(|_| ()).await;
114
115 trace!("cancellation received; shutting down thread");
116 });
117 }).unwrap();
118
119 let node = startup_receiver.await.unwrap();
120
121 Self {
122 join_handle: Some(thread),
123 cancel_sender: Some(cancel_sender),
124 node,
125 }
126 }
127
128 pub fn get_node_handle(&self) -> Option<IotaNodeHandle> {
130 Some(IotaNodeHandle::new(self.node.upgrade()?))
131 }
132
133 pub fn is_alive(&self) -> bool {
139 if let Some(cancel_sender) = &self.cancel_sender {
140 !cancel_sender.is_closed()
141 } else {
142 false
143 }
144 }
145}