iota_network_stack/callback/
body.rs1use std::{
6 fmt,
7 pin::Pin,
8 task::{Context, Poll, ready},
9};
10
11use http_body::{Body, Frame};
12use pin_project_lite::pin_project;
13
14use super::ResponseHandler;
15
16pin_project! {
17 pub struct ResponseBody<B, ResponseHandler> {
21 #[pin]
22 pub(crate) inner: B,
23 pub(crate) handler: ResponseHandler,
24 }
25}
26
27impl<B, ResponseHandlerT> Body for ResponseBody<B, ResponseHandlerT>
28where
29 B: Body,
30 B::Error: fmt::Display + 'static,
31 ResponseHandlerT: ResponseHandler,
32{
33 type Data = B::Data;
34 type Error = B::Error;
35
36 fn poll_frame(
37 self: Pin<&mut Self>,
38 cx: &mut Context<'_>,
39 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
40 let this = self.project();
41 let result = ready!(this.inner.poll_frame(cx));
42
43 match result {
44 Some(Ok(frame)) => {
45 let frame = match frame.into_data() {
46 Ok(chunk) => {
47 this.handler.on_body_chunk(&chunk);
48 Frame::data(chunk)
49 }
50 Err(frame) => frame,
51 };
52
53 let frame = match frame.into_trailers() {
54 Ok(trailers) => {
55 this.handler.on_end_of_stream(Some(&trailers));
56 Frame::trailers(trailers)
57 }
58 Err(frame) => frame,
59 };
60
61 Poll::Ready(Some(Ok(frame)))
62 }
63 Some(Err(err)) => {
64 this.handler.on_error(&err);
65
66 Poll::Ready(Some(Err(err)))
67 }
68 None => {
69 this.handler.on_end_of_stream(None);
70
71 Poll::Ready(None)
72 }
73 }
74 }
75
76 fn is_end_stream(&self) -> bool {
77 self.inner.is_end_stream()
78 }
79
80 fn size_hint(&self) -> http_body::SizeHint {
81 self.inner.size_hint()
82 }
83}