Skip to main content

datadog_agent_commons/ipc/client/
streaming.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{ready, Context, Poll},
5};
6
7use futures::Stream;
8use pin_project_lite::pin_project;
9use tonic::{Response, Status, Streaming};
10
11pin_project! {
12    #[project = InnerProj]
13    enum Inner<T> {
14        /// Waiting for the server to stream the first message.
15        Initial { fut: Pin<Box<dyn Future<Output = Result<Response<Streaming<T>>, Status>> + Send>> },
16
17        /// Waiting for the server to stream the next message.
18        Streaming { #[pin] stream: Streaming<T> },
19
20        /// Stream has produced an error or reached its end; further polls yield `None`.
21        Terminated,
22    }
23}
24
25pin_project! {
26    /// A streaming gRPC response.
27    ///
28    /// Compared to the normal streaming response type from [`tonic`], `StreamingResponse` handles a special case where
29    /// servers may not send a guaranteed initial message, which is required for `tonic` to create and return the
30    /// `Streaming` object that can then be polled. This leads to an issue where calls can effectively appear to block
31    /// until the first message is sent by the server, which is suboptimal.
32    ///
33    /// `StreamingResponse` exposes a unified [`Stream`] implementation that encompasses both the initial RPC
34    /// establishment and subsequent messages sent by the server, to allow for a more seamless experience when working
35    /// with streaming RPCs.
36    pub struct StreamingResponse<T> {
37        #[pin]
38        inner: Inner<T>,
39    }
40}
41
42impl<T> StreamingResponse<T> {
43    pub(super) fn from_response_future<F>(fut: F) -> Self
44    where
45        F: Future<Output = Result<Response<Streaming<T>>, Status>> + Send + 'static,
46    {
47        Self {
48            inner: Inner::Initial { fut: Box::pin(fut) },
49        }
50    }
51}
52
53impl<T> Stream for StreamingResponse<T> {
54    type Item = Result<T, Status>;
55
56    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57        // Each arm picks one of three outcomes: advance to a new state and loop, yield an item
58        // leaving state untouched, or fuse to `Terminated` while yielding an item. Fusing ensures
59        // no now-finished resource (notably the `Initial` future) is ever polled again.
60        #[allow(clippy::large_enum_variant)]
61        enum Outcome<T> {
62            Advance(Streaming<T>),
63            Yield(Option<Result<T, Status>>),
64            Terminate(Option<Result<T, Status>>),
65        }
66
67        loop {
68            let mut this = self.as_mut().project();
69            let outcome = match this.inner.as_mut().project() {
70                InnerProj::Initial { fut } => match ready!(fut.as_mut().poll(cx)) {
71                    Ok(response) => Outcome::Advance(response.into_inner()),
72                    Err(status) => Outcome::Terminate(Some(Err(status))),
73                },
74                InnerProj::Streaming { stream } => match ready!(stream.poll_next(cx)) {
75                    Some(maybe_item) => Outcome::Yield(Some(maybe_item)),
76                    None => Outcome::Terminate(None),
77                },
78                InnerProj::Terminated => Outcome::Yield(None),
79            };
80
81            match outcome {
82                Outcome::Advance(stream) => {
83                    this.inner.set(Inner::Streaming { stream });
84                }
85                Outcome::Yield(item) => return Poll::Ready(item),
86                Outcome::Terminate(item) => {
87                    this.inner.set(Inner::Terminated);
88                    return Poll::Ready(item);
89                }
90            }
91        }
92    }
93}
94
95#[cfg(test)]
96mod tests {
97    use std::time::Duration;
98
99    use futures::{future::pending, StreamExt};
100    use tokio::time::timeout;
101    use tonic::{Code, Status};
102
103    use super::StreamingResponse;
104
105    #[tokio::test]
106    async fn streaming_response_terminates_after_initial_error() {
107        // Regression test: prior to fusing the `Initial` state on error, the second poll re-entered
108        // the already-completed async block and panicked with "async fn resumed after completion".
109        let mut stream = StreamingResponse::<()>::from_response_future(async { Err(Status::unavailable("boom")) });
110
111        match stream.next().await {
112            Some(Err(s)) => assert_eq!(s.code(), Code::Unavailable),
113            other => panic!(
114                "expected Some(Err(Unavailable)), got {:?}",
115                other.map(|r| r.map(|_| ()))
116            ),
117        }
118
119        // Subsequent polls must yield `None` and must not panic.
120        assert!(stream.next().await.is_none());
121        assert!(stream.next().await.is_none());
122    }
123
124    #[tokio::test]
125    async fn streaming_response_pending_initial_stays_pending() {
126        // Smoke test: a pending inner future leaves `poll_next` Pending without advancing state.
127        let mut stream = StreamingResponse::<()>::from_response_future(async { pending::<Result<_, Status>>().await });
128
129        assert!(
130            timeout(Duration::from_millis(50), stream.next()).await.is_err(),
131            "stream with pending initial future should not produce an item"
132        );
133    }
134}