1use std::{net::SocketAddr, sync::Arc};
7
8use anyhow::Result;
9use tokio::sync::broadcast;
10use tokio_stream::wrappers::TcpListenerStream;
11use tokio_util::sync::CancellationToken;
12use tonic::transport::Server;
13
14use crate::{
15 CheckpointGrpcService, EventGrpcService, GrpcCheckpointDataBroadcaster,
16 GrpcCheckpointSummaryBroadcaster, GrpcReader,
17 checkpoint::checkpoint_service_server::CheckpointServiceServer,
18 events::event_service_server::EventServiceServer,
19};
20
/// Handle to a running gRPC server, as returned by `start_grpc_server`.
///
/// Bundles the spawned server task, the token used to stop it, the checkpoint
/// broadcasters that were handed to the checkpoint service, and the address
/// recorded for the listener.
pub struct GrpcServerHandle {
    /// Join handle of the Tokio task driving the tonic server; it resolves to
    /// the transport-level result of serving.
    pub server_handle: tokio::task::JoinHandle<Result<(), tonic::transport::Error>>,
    /// Token cancelled by `shutdown` to make the server task stop.
    shutdown_token: CancellationToken,
    /// Broadcaster for checkpoint summaries; a clone of it is held by the
    /// checkpoint service.
    pub checkpoint_summary_broadcaster: GrpcCheckpointSummaryBroadcaster,
    /// Broadcaster for checkpoint data; a clone of it is held by the
    /// checkpoint service.
    pub checkpoint_data_broadcaster: GrpcCheckpointDataBroadcaster,
    /// Address reported by the bound listener (the configured address is used
    /// when the listener cannot report one).
    pub address: SocketAddr,
}
34
35impl GrpcServerHandle {
36 pub async fn shutdown(self) -> Result<()> {
38 self.shutdown_token.cancel();
39 match self.server_handle.await {
40 Ok(result) => result.map_err(Into::into),
41 Err(join_error) => Err(anyhow::anyhow!("Server task failed: {join_error}")),
42 }
43 }
44
45 pub fn address(&self) -> SocketAddr {
47 self.address
48 }
49
50 pub fn checkpoint_summary_broadcaster(&self) -> &GrpcCheckpointSummaryBroadcaster {
52 &self.checkpoint_summary_broadcaster
53 }
54
55 pub fn checkpoint_data_broadcaster(&self) -> &GrpcCheckpointDataBroadcaster {
57 &self.checkpoint_data_broadcaster
58 }
59}
60
61pub async fn start_grpc_server(
68 grpc_reader: Arc<GrpcReader>,
69 event_subscriber: Arc<dyn crate::EventSubscriber>,
70 config: iota_config::node::GrpcApiConfig,
71 shutdown_token: CancellationToken,
72) -> Result<GrpcServerHandle> {
73 let (checkpoint_summary_tx, _) = broadcast::channel(config.checkpoint_broadcast_buffer_size);
75 let (checkpoint_data_tx, _) = broadcast::channel(config.checkpoint_broadcast_buffer_size);
76
77 let checkpoint_summary_broadcaster =
79 GrpcCheckpointSummaryBroadcaster::new(checkpoint_summary_tx);
80 let checkpoint_data_broadcaster = GrpcCheckpointDataBroadcaster::new(checkpoint_data_tx);
81
82 let checkpoint_service = CheckpointGrpcService::new(
85 grpc_reader.clone(),
86 checkpoint_summary_broadcaster.clone(),
87 checkpoint_data_broadcaster.clone(),
88 shutdown_token.clone(),
89 );
90 let event_service = EventGrpcService::new(event_subscriber, shutdown_token.clone());
91
92 let server_builder = Server::builder()
94 .add_service(CheckpointServiceServer::new(checkpoint_service))
95 .add_service(EventServiceServer::new(event_service));
96
97 let listener = tokio::net::TcpListener::bind(config.address).await?;
100 let actual_addr = listener.local_addr().unwrap_or(config.address);
101
102 tracing::info!(
103 "Starting gRPC server on {} (bound to {})",
104 config.address,
105 actual_addr
106 );
107
108 let shutdown_token_for_server = shutdown_token.clone();
110 let server_handle = tokio::spawn(async move {
111 let result = server_builder
112 .serve_with_incoming_shutdown(
113 TcpListenerStream::new(listener),
114 shutdown_token_for_server.cancelled(),
115 )
116 .await;
117
118 tracing::info!("gRPC server shutdown completed");
119 result
120 });
121
122 Ok(GrpcServerHandle {
123 server_handle,
124 shutdown_token,
125 checkpoint_summary_broadcaster,
126 checkpoint_data_broadcaster,
127 address: actual_addr,
128 })
129}