// saluki_env/helpers/remote_agent/client.rs
1use std::{
2    future::Future,
3    path::PathBuf,
4    pin::Pin,
5    task::{ready, Context, Poll},
6    time::Duration,
7};
8
9use backon::{BackoffBuilder, ConstantBuilder, Retryable as _};
10use datadog_protos::agent::v1::{RefreshRemoteAgentRequest, RegisterRemoteAgentRequest, RegisterRemoteAgentResponse};
11use datadog_protos::agent::{
12    AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
13    FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, StreamTagsRequest, StreamTagsResponse,
14    TagCardinality, WorkloadmetaEventType, WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource,
15    WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
16};
17use futures::Stream;
18use pin_project_lite::pin_project;
19use saluki_config::GenericConfiguration;
20use saluki_error::{generic_error, ErrorContext as _, GenericError};
21use saluki_io::net::{
22    build_datadog_agent_client_ipc_tls_config, client::http::HttpsCapableConnectorBuilder, get_ipc_cert_file_path,
23};
24use serde::Deserialize;
25use tonic::{
26    service::interceptor::InterceptedService,
27    transport::{Channel, Endpoint, Uri},
28    Code, Request, Response, Status, Streaming,
29};
30use tracing::warn;
31
32use super::session::SessionId;
33use crate::helpers::tonic::BearerAuthInterceptor;
34
/// Default Datadog Agent IPC endpoint (`https://127.0.0.1:5001`).
///
/// `Uri::from_static` validates the URI at construction; the literal here is known-good.
fn default_agent_ipc_endpoint() -> Uri {
    Uri::from_static("https://127.0.0.1:5001")
}
38
/// Default location of the Agent authentication token file.
fn default_agent_auth_token_file_path() -> PathBuf {
    "/etc/datadog-agent/auth_token".into()
}
42
/// Default number of attempts made when initially connecting to the Agent IPC endpoint.
const fn default_connect_retry_attempts() -> usize {
    // Paired with the default 2s backoff, this allows roughly 20 seconds for the Agent to come up.
    10
}
46
/// Default maximum size, in bytes, for decoded gRPC messages (128 MiB).
const fn default_grpc_max_message_size() -> usize {
    const MIB: usize = 1024 * 1024;
    128 * MIB
}
50
/// Default delay between attempts when initially connecting to the Agent IPC endpoint.
const fn default_connect_retry_backoff() -> Duration {
    // Two seconds between attempts.
    Duration::from_millis(2_000)
}
54
55// TODO (dda-unification): This stuff should really move to a separate crate or `agent-data-plane` since it's very very specific to the Datadog Agent.
/// Configuration for establishing an authenticated gRPC connection to the Datadog Agent's IPC
/// endpoint.
///
/// Deserialized from the process-wide configuration in `RemoteAgentClient::from_configuration`.
#[derive(Deserialize)]
struct RemoteAgentClientConfiguration {
    /// Datadog Agent IPC endpoint to connect to.
    ///
    /// This is generally based on the configured `cmd_port` for the Datadog Agent, and must expose the `AgentSecure`
    /// gRPC service.
    ///
    /// Defaults to `https://127.0.0.1:5001`.
    #[serde(
        rename = "agent_ipc_endpoint",
        with = "http_serde_ext::uri",
        default = "default_agent_ipc_endpoint"
    )]
    ipc_endpoint: Uri,

    /// Path to the Agent authentication token file.
    ///
    /// The contents of the file are passed as a bearer token in RPC requests to the IPC endpoint.
    ///
    /// Defaults to `/etc/datadog-agent/auth_token`.
    #[serde(default = "default_agent_auth_token_file_path")]
    auth_token_file_path: PathBuf,

    /// Path to the Agent IPC TLS certificate file.
    ///
    /// The file is expected to be PEM-encoded, containing both a certificate and private key. The certificate will be
    /// used to verify the TLS server certificate presented by the Agent, and the certificate and private key will be
    /// used together to provide client authentication _to_ the Agent.
    ///
    /// Defaults to `ipc_cert.pem` in the same directory as the Agent authentication token file. (for example, if
    /// `auth_token_file_path` is `/etc/datadog-agent/auth_token`, this will be `/etc/datadog-agent/ipc_cert.pem`.)
    #[serde(default)]
    ipc_cert_file_path: Option<PathBuf>,

    /// Number of allowed retry attempts when initially connecting.
    ///
    /// Defaults to `10`.
    #[serde(default = "default_connect_retry_attempts")]
    connect_retry_attempts: usize,

    /// Amount of time to wait between connection attempts when initially connecting.
    ///
    /// Defaults to 2 seconds.
    #[serde(default = "default_connect_retry_backoff")]
    connect_retry_backoff: Duration,

    /// Maximum message size for gRPC messages.
    ///
    /// Defaults to `128 * 1024 * 1024` (128MB).
    #[serde(
        rename = "agent_ipc_grpc_max_message_size",
        default = "default_grpc_max_message_size"
    )]
    grpc_max_message_size: usize,
}
111
// Lets a `&RemoteAgentClientConfiguration` be passed directly as the backoff policy to
// `Retryable::retry`: a constant delay of `connect_retry_backoff` between attempts, capped at
// `connect_retry_attempts` total retries.
impl BackoffBuilder for &RemoteAgentClientConfiguration {
    type Backoff = <ConstantBuilder as BackoffBuilder>::Backoff;

    fn build(self) -> Self::Backoff {
        ConstantBuilder::default()
            .with_delay(self.connect_retry_backoff)
            .with_max_times(self.connect_retry_attempts)
            .build()
    }
}
122
/// A client for interacting with the Datadog Agent's internal gRPC-based API.
#[derive(Clone)]
pub struct RemoteAgentClient {
    // Client for the `Agent` service (used for `get_hostname`).
    client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
    // Client for the `AgentSecure` service (tagger/workloadmeta streams, remote agent
    // registration, host tags, config events). Both clients carry the bearer-auth interceptor.
    secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
}
129
impl RemoteAgentClient {
    /// Creates a new `RemoteAgentClient` from the given configuration.
    ///
    /// # Errors
    ///
    /// If the Agent gRPC client cannot be created (invalid API endpoint, missing authentication token, etc), or if the
    /// authentication token is invalid, an error will be returned.
    pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        let mut config = config
            .as_typed::<RemoteAgentClientConfiguration>()
            .error_context("Failed to parse configuration for Remote Agent client.")?;

        // The core-agent defaults to the empty string for the auth_token_file_path and ipc_cert_file_path. We need to handle this by resetting to the defaults.
        if config.auth_token_file_path.as_os_str().is_empty() {
            config.auth_token_file_path = default_agent_auth_token_file_path();
        }
        if let Some(ref cert_path) = config.ipc_cert_file_path {
            if cert_path.as_os_str().is_empty() {
                // Clearing the path lets `get_ipc_cert_file_path` derive it from the auth token
                // file path below.
                config.ipc_cert_file_path = None;
            }
        }

        // TODO: We need to write a Tower middleware service that allows applying a backoff between failed calls,
        // specifically so that we can throttle reconnection attempts.
        //
        // When the remote Agent endpoint is not available -- Agent isn't running, etc -- the gRPC client will
        // essentially freewheel, trying to reconnect as quickly as possible, which spams the logs, wastes resources, so
        // on and so forth. We would want to essentially apply a backoff like any other client would for the RPC calls
        // themselves, but use it with the _connector_ instead.
        //
        // We could potentially just use a retry middleware, but Tonic does have its own reconnection logic, so we'd
        // have to test it out to make sure it behaves sensibly.
        //
        // NOTE: This closure is re-run on every retry attempt, so the auth token and TLS certificate
        // are re-read from disk each time, tolerating files that only appear once the Agent is up.
        let service_builder = || async {
            let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth_token_file_path).await?;
            let ipc_cert_file_path =
                get_ipc_cert_file_path(config.ipc_cert_file_path.as_ref(), &config.auth_token_file_path);
            let client_tls_config = build_datadog_agent_client_ipc_tls_config(ipc_cert_file_path).await?;
            let https_connector = HttpsCapableConnectorBuilder::default().build(client_tls_config)?;
            let channel = Endpoint::from(config.ipc_endpoint.clone())
                .connect_timeout(Duration::from_secs(2))
                .connect_with_connector(https_connector)
                .await
                .with_error_context(|| {
                    format!("Failed to connect to Datadog Agent API at '{}'.", config.ipc_endpoint)
                })?;

            Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
        };

        // Retry connecting using the constant backoff policy defined by the configuration itself
        // (see the `BackoffBuilder` impl for `&RemoteAgentClientConfiguration`).
        let service = service_builder
            .retry(&config)
            .notify(|e, delay| {
                warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
            })
            .await
            .error_context("Failed to create Datadog Agent API client.")?;

        // Both clients are built over the same intercepted channel.
        let client = AgentClient::new(service.clone()).max_decoding_message_size(config.grpc_max_message_size);
        let mut secure_client = AgentSecureClient::new(service).max_decoding_message_size(config.grpc_max_message_size);

        // Try and do a basic health check to make sure we can connect and that our authentication token is valid.
        try_query_agent_api(&mut secure_client).await?;

        Ok(Self { client, secure_client })
    }

    /// Gets the detected hostname from the Agent.
    ///
    /// # Errors
    ///
    /// If there is an error querying the Agent API, an error will be returned.
    pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
        let response = self
            .client
            .get_hostname(HostnameRequest {})
            .await
            .map(|r| r.into_inner())?;

        Ok(response.hostname)
    }

    /// Gets a stream of tagger entities at the given cardinality.
    ///
    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
    pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
        // Clone the client so the async block can own it for the lifetime of the stream.
        let mut client = self.secure_client.clone();
        StreamingResponse::from_response_future(async move {
            client
                .tagger_stream_entities(StreamTagsRequest {
                    cardinality: cardinality.into(),
                    ..Default::default()
                })
                .await
        })
    }

    /// Gets a stream of all workloadmeta entities.
    ///
    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
    pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
        let mut client = self.secure_client.clone();
        StreamingResponse::from_response_future(async move {
            client
                .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
                    // Subscribe to all sources and event types, for containers, Kubernetes pods,
                    // and ECS tasks.
                    filter: Some(WorkloadmetaFilter {
                        kinds: vec![
                            WorkloadmetaKind::Container.into(),
                            WorkloadmetaKind::KubernetesPod.into(),
                            WorkloadmetaKind::EcsTask.into(),
                        ],
                        source: WorkloadmetaSource::All.into(),
                        event_type: WorkloadmetaEventType::EventTypeAll.into(),
                    }),
                })
                .await
        })
    }

    /// Registers a Remote Agent with the Agent.
    ///
    /// # Errors
    ///
    /// If there is an error sending the request to the Agent API, an error will be returned.
    pub async fn register_remote_agent_request(
        &mut self, pid: u32, display_name: &str, api_endpoint: &str, services: Vec<String>,
    ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
        let mut client = self.secure_client.clone();
        let response = client
            .register_remote_agent(RegisterRemoteAgentRequest {
                pid: pid.to_string(),
                // Identifies this process to the Agent as Agent Data Plane.
                flavor: "agent-data-plane".to_string(),
                display_name: display_name.to_string(),
                api_endpoint_uri: api_endpoint.to_string(),
                services,
            })
            .await?;
        Ok(response)
    }

    /// Refreshes the given remote agent session with the Agent.
    ///
    /// # Errors
    ///
    /// If there is an error sending the request to the Agent API, an error will be returned.
    pub async fn refresh_remote_agent_request(&mut self, session_id: &SessionId) -> Result<Response<()>, GenericError> {
        let mut client = self.secure_client.clone();
        let response = client
            .refresh_remote_agent(RefreshRemoteAgentRequest {
                session_id: session_id.to_string(),
            })
            .await?
            // Discard the response body, keeping only the response metadata.
            .map(|_| ());
        Ok(response)
    }

    /// Gets the host tags from the Agent.
    ///
    /// # Errors
    ///
    /// If there is an error querying the Agent API, an error will be returned.
    pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
        let mut client = self.secure_client.clone();
        let response = client.get_host_tags(HostTagRequest {}).await?;
        Ok(response)
    }

    /// Gets a stream of autodiscovery config updates.
    ///
    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
    pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
        let mut client = self.secure_client.clone();
        StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
    }

    /// Gets a stream of config events.
    ///
    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
    pub fn stream_config_events(&mut self, session_id: &SessionId) -> StreamingResponse<ConfigEvent> {
        let mut client = self.secure_client.clone();
        // Normalize the application's full name (spaces/underscores to hyphens, lowercased) for use
        // as the config stream subscriber name.
        let app_details = saluki_metadata::get_app_details();
        let formatted_full_name = app_details
            .full_name()
            .replace(" ", "-")
            .replace("_", "-")
            .to_lowercase();

        let mut request = Request::new(ConfigStreamRequest {
            name: formatted_full_name,
        });

        // Attach the session ID as gRPC request metadata.
        request
            .metadata_mut()
            .insert("session_id", session_id.to_grpc_header_value());

        StreamingResponse::from_response_future(async move { client.stream_config_events(request).await })
    }
}
331
pin_project! {
    /// A streaming gRPC response.
    ///
    /// Compared to the normal streaming response type from [`tonic`], `StreamingResponse` handles a special case where
    /// servers may not send an initial message that allows the RPC to "establish", which is required to create the
    /// `Streaming` object that can then be polled. This leads to an issue where calls can effectively appear to block
    /// until the first message is sent by the server, which is suboptimal.
    ///
    /// `StreamingResponse` exposes a unified [`Stream`] implementation that encompasses both the initial RPC
    /// establishment and subsequent messages sent by the server, to allow for a more seamless experience when working
    /// with streaming RPCs.
    #[project = StreamingResponseProj]
    pub enum StreamingResponse<T> {
        /// Waiting for the server to stream the first message.
        ///
        /// The establishment future is boxed because each call site passes a distinct `async` block type to
        /// `from_response_future`.
        Initial { inner: Pin<Box<dyn Future<Output = Result<Response<Streaming<T>>, Status>> + Send>> },

        /// Waiting for the server to stream the next message.
        Streaming { #[pin] stream: Streaming<T> },

        /// Stream has produced an error or reached its end; further polls yield `None`.
        Terminated,
    }
}
355
356impl<T> StreamingResponse<T> {
357    fn from_response_future<F>(fut: F) -> Self
358    where
359        F: Future<Output = Result<Response<Streaming<T>>, Status>> + Send + 'static,
360    {
361        Self::Initial { inner: Box::pin(fut) }
362    }
363
364    fn from_response(response: Response<Streaming<T>>) -> Self {
365        Self::Streaming {
366            stream: response.into_inner(),
367        }
368    }
369}
370
impl<T> Stream for StreamingResponse<T> {
    type Item = Result<T, Status>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Each arm picks one of three outcomes: advance to a new state and loop, yield an item
        // leaving state untouched, or fuse to `Terminated` while yielding an item. Fusing ensures
        // no now-finished resource (notably the `Initial` future) is ever polled again.
        #[allow(clippy::large_enum_variant)]
        enum Outcome<T> {
            Advance(StreamingResponse<T>),
            Yield(Option<Result<T, Status>>),
            Terminate(Option<Result<T, Status>>),
        }

        loop {
            let this = self.as_mut().project();
            let outcome = match this {
                // RPC not yet established: drive the establishment future. Success moves us to
                // `Streaming`; failure surfaces the status as the final item.
                StreamingResponseProj::Initial { inner } => match ready!(inner.as_mut().poll(cx)) {
                    Ok(response) => Outcome::Advance(Self::from_response(response)),
                    Err(status) => Outcome::Terminate(Some(Err(status))),
                },
                StreamingResponseProj::Streaming { stream } => match ready!(stream.poll_next(cx)) {
                    Some(maybe_item) => Outcome::Yield(Some(maybe_item)),
                    None => Outcome::Terminate(None),
                },
                // Already fused: keep reporting end-of-stream without touching dead resources.
                StreamingResponseProj::Terminated => Outcome::Yield(None),
            };

            match outcome {
                Outcome::Advance(state) => self.set(state),
                Outcome::Yield(item) => return Poll::Ready(item),
                Outcome::Terminate(item) => {
                    self.set(Self::Terminated);
                    return Poll::Ready(item);
                }
            }
        }
    }
}
410
411async fn try_query_agent_api(
412    client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
413) -> Result<(), GenericError> {
414    let noop_fetch_request = FetchEntityRequest {
415        id: Some(EntityId {
416            prefix: "container_id".to_string(),
417            uid: "nonexistent".to_string(),
418        }),
419        cardinality: TagCardinality::High.into(),
420    };
421    match client.tagger_fetch_entity(noop_fetch_request).await {
422        Ok(_) => Ok(()),
423        Err(e) => match e.code() {
424            Code::Unauthenticated => Err(generic_error!(
425                "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
426            )),
427            _ => Err(e.into()),
428        },
429    }
430}
431
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::{future::pending, StreamExt};
    use tokio::time::timeout;
    use tonic::{Code, Status};

    use super::StreamingResponse;

    #[tokio::test]
    async fn streaming_response_terminates_after_initial_error() {
        // Regression test: before the `Initial` state was fused on error, a second poll re-entered
        // the already-completed async block and panicked with "async fn resumed after completion".
        let mut stream = StreamingResponse::<()>::from_response_future(async { Err(Status::unavailable("boom")) });

        // The first item surfaces the establishment error.
        let first = stream.next().await;
        let code = first
            .as_ref()
            .and_then(|item| item.as_ref().err())
            .map(|status| status.code());
        assert_eq!(
            code,
            Some(Code::Unavailable),
            "expected Some(Err(Unavailable)), got {:?}",
            first
        );

        // Every poll after that must report end-of-stream instead of panicking.
        assert!(stream.next().await.is_none());
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn streaming_response_pending_initial_stays_pending() {
        // Smoke test: a never-resolving establishment future keeps `poll_next` Pending and never
        // advances the state machine.
        let mut stream = StreamingResponse::<()>::from_response_future(async { pending::<Result<_, Status>>().await });

        let outcome = timeout(Duration::from_millis(50), stream.next()).await;
        assert!(
            outcome.is_err(),
            "stream with pending initial future should not produce an item"
        );
    }
}