iota_swarm/memory/
container.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    sync::{Arc, Weak},
7    thread,
8};
9
10use futures::FutureExt;
11use iota_config::NodeConfig;
12use iota_node::{IotaNode, IotaNodeHandle};
13use iota_types::{
14    base_types::ConciseableName,
15    crypto::{AuthorityPublicKeyBytes, KeypairTraits},
16};
17use telemetry_subscribers::get_global_telemetry_config;
18use tracing::{info, trace};
19
20use super::node::RuntimeType;
21
/// Owns a single node that runs on its own dedicated OS thread.
///
/// Dropping a `Container` signals the node thread to shut down and blocks
/// until that thread has been joined.
#[derive(Debug)]
pub(crate) struct Container {
    /// Handle of the thread hosting the node. Wrapped in an `Option` so the
    /// handle can be moved out and joined when the `Container` is dropped.
    join_handle: Option<thread::JoinHandle<()>>,
    /// Sending half of the oneshot shutdown channel. Wrapped in an `Option`
    /// so it can be moved out and fired when the `Container` is dropped.
    cancel_sender: Option<tokio::sync::oneshot::Sender<()>>,
    /// Non-owning reference to the node started on the thread; the owning
    /// `Arc` lives on the node thread itself.
    node: Weak<IotaNode>,
}
28
29/// When dropped, stop and wait for the node running in this Container to
30/// completely shutdown.
31impl Drop for Container {
32    fn drop(&mut self) {
33        trace!("dropping Container");
34
35        let thread = self.join_handle.take().unwrap();
36
37        let cancel_handle = self.cancel_sender.take().unwrap();
38
39        // Notify the thread to shutdown
40        let _ = cancel_handle.send(());
41
42        // Wait for the thread to join
43        thread.join().unwrap();
44
45        trace!("finished dropping Container");
46    }
47}
48
49impl Container {
50    /// Spawn a new Node.
51    pub async fn spawn(config: NodeConfig, runtime: RuntimeType) -> Self {
52        let (startup_sender, startup_receiver) = tokio::sync::oneshot::channel();
53        let (cancel_sender, cancel_receiver) = tokio::sync::oneshot::channel();
54        let name = AuthorityPublicKeyBytes::from(config.authority_key_pair().public())
55            .concise()
56            .to_string();
57
58        let thread = thread::Builder::new().name(name).spawn(move || {
59            let span = if get_global_telemetry_config()
60                .map(|c| c.enable_otlp_tracing)
61                .unwrap_or(false)
62            {
63                // we cannot have long-lived root spans when exporting trace data to otlp
64                None
65            } else {
66                Some(tracing::span!(
67                    tracing::Level::INFO,
68                    "node",
69                    name =% AuthorityPublicKeyBytes::from(config.authority_key_pair().public()).concise(),
70                ))
71            };
72
73            let _guard = span.as_ref().map(|span| span.enter());
74
75            let mut builder = match runtime {
76                RuntimeType::SingleThreaded => tokio::runtime::Builder::new_current_thread(),
77                RuntimeType::MultiThreaded => {
78                    thread_local! {
79                        static SPAN: std::cell::RefCell<Option<tracing::span::EnteredSpan>> =
80                            const { std::cell::RefCell::new(None) };
81                    }
82                    let mut builder = tokio::runtime::Builder::new_multi_thread();
83                    let span = span.clone();
84                    builder
85                        .on_thread_start(move || {
86                            SPAN.with(|maybe_entered_span| {
87                                if let Some(span) = &span {
88                                    *maybe_entered_span.borrow_mut() = Some(span.clone().entered());
89                                }
90                            });
91                        })
92                        .on_thread_stop(|| {
93                            SPAN.with(|maybe_entered_span| {
94                                maybe_entered_span.borrow_mut().take();
95                            });
96                        });
97
98                    builder
99                }
100            };
101            let runtime = builder.enable_all().build().unwrap();
102
103            runtime.block_on(async move {
104                let registry_service = iota_metrics::start_prometheus_server(config.metrics_address);
105                info!(
106                    "Started Prometheus HTTP endpoint. To query metrics use\n\tcurl -s http://{}/metrics",
107                    config.metrics_address
108                );
109                let server = IotaNode::start(config, registry_service, None).await.unwrap();
110                // Notify that we've successfully started the node
111                let _ = startup_sender.send(Arc::downgrade(&server));
112                // run until canceled
113                cancel_receiver.map(|_| ()).await;
114
115                trace!("cancellation received; shutting down thread");
116            });
117        }).unwrap();
118
119        let node = startup_receiver.await.unwrap();
120
121        Self {
122            join_handle: Some(thread),
123            cancel_sender: Some(cancel_sender),
124            node,
125        }
126    }
127
128    /// Get a IotaNodeHandle to the node owned by the container.
129    pub fn get_node_handle(&self) -> Option<IotaNodeHandle> {
130        Some(IotaNodeHandle::new(self.node.upgrade()?))
131    }
132
133    /// Check to see that the Node is still alive by checking if the receiving
134    /// side of the `cancel_sender` has been dropped.
135    // TODO When we move to rust 1.61 we should also use
136    // https://doc.rust-lang.org/stable/std/thread/struct.JoinHandle.html#method.is_finished
137    // in order to check if the thread has finished.
138    pub fn is_alive(&self) -> bool {
139        if let Some(cancel_sender) = &self.cancel_sender {
140            !cancel_sender.is_closed()
141        } else {
142            false
143        }
144    }
145}