iota_network_stack/callback/
body.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2025 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    fmt,
7    pin::Pin,
8    task::{Context, Poll, ready},
9};
10
11use http_body::{Body, Frame};
12use pin_project_lite::pin_project;
13
14use super::ResponseHandler;
15
16pin_project! {
17    /// Response body for [`Callback`].
18    ///
19    /// [`Callback`]: super::Callback
20    pub struct ResponseBody<B, ResponseHandler> {
21        #[pin]
22        pub(crate) inner: B,
23        pub(crate) handler: ResponseHandler,
24    }
25}
26
27impl<B, ResponseHandlerT> Body for ResponseBody<B, ResponseHandlerT>
28where
29    B: Body,
30    B::Error: fmt::Display + 'static,
31    ResponseHandlerT: ResponseHandler,
32{
33    type Data = B::Data;
34    type Error = B::Error;
35
36    fn poll_frame(
37        self: Pin<&mut Self>,
38        cx: &mut Context<'_>,
39    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
40        let this = self.project();
41        let result = ready!(this.inner.poll_frame(cx));
42
43        match result {
44            Some(Ok(frame)) => {
45                let frame = match frame.into_data() {
46                    Ok(chunk) => {
47                        this.handler.on_body_chunk(&chunk);
48                        Frame::data(chunk)
49                    }
50                    Err(frame) => frame,
51                };
52
53                let frame = match frame.into_trailers() {
54                    Ok(trailers) => {
55                        this.handler.on_end_of_stream(Some(&trailers));
56                        Frame::trailers(trailers)
57                    }
58                    Err(frame) => frame,
59                };
60
61                Poll::Ready(Some(Ok(frame)))
62            }
63            Some(Err(err)) => {
64                this.handler.on_error(&err);
65
66                Poll::Ready(Some(Err(err)))
67            }
68            None => {
69                this.handler.on_end_of_stream(None);
70
71                Poll::Ready(None)
72            }
73        }
74    }
75
76    fn is_end_stream(&self) -> bool {
77        self.inner.is_end_stream()
78    }
79
80    fn size_hint(&self) -> http_body::SizeHint {
81        self.inner.size_hint()
82    }
83}