iota_http/
connection_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2025 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{pin::pin, time::Duration};
6
7use http::{Request, Response};
8use tracing::{debug, trace};
9
10use crate::{ActiveConnections, BoxError, ConnectionId, fuse::Fuse};
11
12// This is moved to its own function as a way to get around
13// https://github.com/rust-lang/rust/issues/102211
14pub async fn serve_connection<IO, S, B, C>(
15    hyper_io: IO,
16    hyper_svc: S,
17    builder: hyper_util::server::conn::auto::Builder<hyper_util::rt::TokioExecutor>,
18    graceful_shutdown_token: tokio_util::sync::CancellationToken,
19    max_connection_age: Option<Duration>,
20    on_connection_close: C,
21) where
22    B: http_body::Body + Send + 'static,
23    B::Data: Send,
24    B::Error: Into<BoxError>,
25    IO: hyper::rt::Read + hyper::rt::Write + Send + Unpin + 'static,
26    S: hyper::service::Service<Request<hyper::body::Incoming>, Response = Response<B>> + 'static,
27    S::Future: Send + 'static,
28    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
29{
30    let mut sig = pin!(Fuse::new(graceful_shutdown_token.cancelled_owned()));
31
32    let mut conn = pin!(builder.serve_connection_with_upgrades(hyper_io, hyper_svc));
33
34    let sleep = sleep_or_pending(max_connection_age);
35    tokio::pin!(sleep);
36
37    loop {
38        tokio::select! {
39            _ = &mut sig => {
40                conn.as_mut().graceful_shutdown();
41            }
42            rv = &mut conn => {
43                if let Err(err) = rv {
44                    debug!("failed serving connection: {:#}", err);
45                }
46                break;
47            },
48            _ = &mut sleep  => {
49                conn.as_mut().graceful_shutdown();
50                sleep.set(sleep_or_pending(None));
51            },
52        }
53    }
54
55    trace!("connection closed");
56    drop(on_connection_close);
57}
58
59async fn sleep_or_pending(wait_for: Option<Duration>) {
60    match wait_for {
61        Some(wait) => tokio::time::sleep(wait).await,
62        None => std::future::pending().await,
63    };
64}
65
66pub(crate) struct OnConnectionClose<A> {
67    id: ConnectionId,
68    active_connections: ActiveConnections<A>,
69}
70
71impl<A> OnConnectionClose<A> {
72    pub(crate) fn new(id: ConnectionId, active_connections: ActiveConnections<A>) -> Self {
73        Self {
74            id,
75            active_connections,
76        }
77    }
78}
79
80impl<A> Drop for OnConnectionClose<A> {
81    fn drop(&mut self) {
82        self.active_connections.write().unwrap().remove(&self.id);
83    }
84}