iota_grpc_api/
server.rs

1// Copyright (c) 2025 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! Shared gRPC server utilities
5
6use std::{net::SocketAddr, sync::Arc};
7
8use anyhow::Result;
9use tokio::sync::broadcast;
10use tokio_stream::wrappers::TcpListenerStream;
11use tokio_util::sync::CancellationToken;
12use tonic::transport::Server;
13
14use crate::{
15    CheckpointGrpcService, EventGrpcService, GrpcCheckpointDataBroadcaster,
16    GrpcCheckpointSummaryBroadcaster, GrpcReader,
17    checkpoint::checkpoint_service_server::CheckpointServiceServer,
18    events::event_service_server::EventServiceServer,
19};
20
21/// Handle to control a running gRPC server
22pub struct GrpcServerHandle {
23    /// Handle to the server task
24    pub server_handle: tokio::task::JoinHandle<Result<(), tonic::transport::Error>>,
25    /// Shutdown signal sender
26    shutdown_token: CancellationToken,
27    /// Broadcaster for checkpoint summaries
28    pub checkpoint_summary_broadcaster: GrpcCheckpointSummaryBroadcaster,
29    /// Broadcaster for checkpoint data
30    pub checkpoint_data_broadcaster: GrpcCheckpointDataBroadcaster,
31    /// Actual server address (with resolved port)
32    pub address: SocketAddr,
33}
34
35impl GrpcServerHandle {
36    /// Graceful shutdown of the gRPC server
37    pub async fn shutdown(self) -> Result<()> {
38        self.shutdown_token.cancel();
39        match self.server_handle.await {
40            Ok(result) => result.map_err(Into::into),
41            Err(join_error) => Err(anyhow::anyhow!("Server task failed: {join_error}")),
42        }
43    }
44
45    /// Get the server address (actual bound address)
46    pub fn address(&self) -> SocketAddr {
47        self.address
48    }
49
50    /// Get a reference to the checkpoint summary broadcaster
51    pub fn checkpoint_summary_broadcaster(&self) -> &GrpcCheckpointSummaryBroadcaster {
52        &self.checkpoint_summary_broadcaster
53    }
54
55    /// Get a reference to the checkpoint data broadcaster
56    pub fn checkpoint_data_broadcaster(&self) -> &GrpcCheckpointDataBroadcaster {
57        &self.checkpoint_data_broadcaster
58    }
59}
60
61/// Start a gRPC server with checkpoint and event services
62///
63/// This function creates and starts a gRPC server that hosts checkpoint-related
64/// and event streaming services. Currently includes the checkpoint streaming
65/// and event streaming services, but can be extended to host additional
66/// services in the future.
67pub async fn start_grpc_server(
68    grpc_reader: Arc<GrpcReader>,
69    event_subscriber: Arc<dyn crate::EventSubscriber>,
70    config: iota_config::node::GrpcApiConfig,
71    shutdown_token: CancellationToken,
72) -> Result<GrpcServerHandle> {
73    // Create broadcast channels
74    let (checkpoint_summary_tx, _) = broadcast::channel(config.checkpoint_broadcast_buffer_size);
75    let (checkpoint_data_tx, _) = broadcast::channel(config.checkpoint_broadcast_buffer_size);
76
77    // Create broadcasters
78    let checkpoint_summary_broadcaster =
79        GrpcCheckpointSummaryBroadcaster::new(checkpoint_summary_tx);
80    let checkpoint_data_broadcaster = GrpcCheckpointDataBroadcaster::new(checkpoint_data_tx);
81
82    // Create the gRPC services - both get the cancellation token directly from
83    // server level
84    let checkpoint_service = CheckpointGrpcService::new(
85        grpc_reader.clone(),
86        checkpoint_summary_broadcaster.clone(),
87        checkpoint_data_broadcaster.clone(),
88        shutdown_token.clone(),
89    );
90    let event_service = EventGrpcService::new(event_subscriber, shutdown_token.clone());
91
92    // Create the server with proper address binding
93    let server_builder = Server::builder()
94        .add_service(CheckpointServiceServer::new(checkpoint_service))
95        .add_service(EventServiceServer::new(event_service));
96
97    // Bind to the address to get the actual local address (especially important for
98    // port 0)
99    let listener = tokio::net::TcpListener::bind(config.address).await?;
100    let actual_addr = listener.local_addr().unwrap_or(config.address);
101
102    tracing::info!(
103        "Starting gRPC server on {} (bound to {})",
104        config.address,
105        actual_addr
106    );
107
108    // Spawn the server task with graceful shutdown
109    let shutdown_token_for_server = shutdown_token.clone();
110    let server_handle = tokio::spawn(async move {
111        let result = server_builder
112            .serve_with_incoming_shutdown(
113                TcpListenerStream::new(listener),
114                shutdown_token_for_server.cancelled(),
115            )
116            .await;
117
118        tracing::info!("gRPC server shutdown completed");
119        result
120    });
121
122    Ok(GrpcServerHandle {
123        server_handle,
124        shutdown_token,
125        checkpoint_summary_broadcaster,
126        checkpoint_data_broadcaster,
127        address: actual_addr,
128    })
129}