datadog_agent_commons/ipc/client/
streaming.rs1use std::{
2 future::Future,
3 pin::Pin,
4 task::{ready, Context, Poll},
5};
6
7use futures::Stream;
8use pin_project_lite::pin_project;
9use tonic::{Response, Status, Streaming};
10
11pin_project! {
12 #[project = InnerProj]
13 enum Inner<T> {
14 Initial { fut: Pin<Box<dyn Future<Output = Result<Response<Streaming<T>>, Status>> + Send>> },
16
17 Streaming { #[pin] stream: Streaming<T> },
19
20 Terminated,
22 }
23}
24
25pin_project! {
26 pub struct StreamingResponse<T> {
37 #[pin]
38 inner: Inner<T>,
39 }
40}
41
42impl<T> StreamingResponse<T> {
43 pub(super) fn from_response_future<F>(fut: F) -> Self
44 where
45 F: Future<Output = Result<Response<Streaming<T>>, Status>> + Send + 'static,
46 {
47 Self {
48 inner: Inner::Initial { fut: Box::pin(fut) },
49 }
50 }
51}
52
53impl<T> Stream for StreamingResponse<T> {
54 type Item = Result<T, Status>;
55
56 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57 #[allow(clippy::large_enum_variant)]
61 enum Outcome<T> {
62 Advance(Streaming<T>),
63 Yield(Option<Result<T, Status>>),
64 Terminate(Option<Result<T, Status>>),
65 }
66
67 loop {
68 let mut this = self.as_mut().project();
69 let outcome = match this.inner.as_mut().project() {
70 InnerProj::Initial { fut } => match ready!(fut.as_mut().poll(cx)) {
71 Ok(response) => Outcome::Advance(response.into_inner()),
72 Err(status) => Outcome::Terminate(Some(Err(status))),
73 },
74 InnerProj::Streaming { stream } => match ready!(stream.poll_next(cx)) {
75 Some(maybe_item) => Outcome::Yield(Some(maybe_item)),
76 None => Outcome::Terminate(None),
77 },
78 InnerProj::Terminated => Outcome::Yield(None),
79 };
80
81 match outcome {
82 Outcome::Advance(stream) => {
83 this.inner.set(Inner::Streaming { stream });
84 }
85 Outcome::Yield(item) => return Poll::Ready(item),
86 Outcome::Terminate(item) => {
87 this.inner.set(Inner::Terminated);
88 return Poll::Ready(item);
89 }
90 }
91 }
92 }
93}
94
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::{future::pending, StreamExt};
    use tokio::time::timeout;
    use tonic::{Code, Status};

    use super::StreamingResponse;

    /// A failing initial future yields its error exactly once; every later
    /// poll reports the end of the stream.
    #[tokio::test]
    async fn streaming_response_terminates_after_initial_error() {
        let failing = async { Err(Status::unavailable("boom")) };
        let mut stream = StreamingResponse::<()>::from_response_future(failing);

        let first = stream.next().await;
        if let Some(Err(status)) = &first {
            assert_eq!(status.code(), Code::Unavailable);
        } else {
            panic!(
                "expected Some(Err(Unavailable)), got {:?}",
                first.map(|r| r.map(|_| ()))
            );
        }

        // Polling after termination must keep returning `None`.
        for _ in 0..2 {
            assert!(stream.next().await.is_none());
        }
    }

    /// While the initial future never resolves, the stream produces nothing.
    #[tokio::test]
    async fn streaming_response_pending_initial_stays_pending() {
        let never_ready = async { pending::<Result<_, Status>>().await };
        let mut stream = StreamingResponse::<()>::from_response_future(never_ready);

        let outcome = timeout(Duration::from_millis(50), stream.next()).await;
        assert!(
            outcome.is_err(),
            "stream with pending initial future should not produce an item"
        );
    }
}