saluki_env/helpers/remote_agent/client.rs

1use std::{
2    future::Future,
3    path::PathBuf,
4    pin::Pin,
5    task::{ready, Context, Poll},
6    time::Duration,
7};
8
9use backon::{BackoffBuilder, ConstantBuilder, Retryable as _};
10use datadog_protos::agent::v1::{
11    RefreshRemoteAgentRequest, RefreshRemoteAgentResponse, RegisterRemoteAgentRequest, RegisterRemoteAgentResponse,
12};
13use datadog_protos::agent::{
14    AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
15    FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, StreamTagsRequest, StreamTagsResponse,
16    TagCardinality, WorkloadmetaEventType, WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource,
17    WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
18};
19use futures::Stream;
20use pin_project_lite::pin_project;
21use saluki_config::GenericConfiguration;
22use saluki_error::{generic_error, ErrorContext as _, GenericError};
23use saluki_io::net::{build_datadog_agent_ipc_https_connector, get_ipc_cert_file_path};
24use serde::Deserialize;
25use tonic::{
26    service::interceptor::InterceptedService,
27    transport::{Channel, Endpoint, Uri},
28    Code, Response, Status, Streaming,
29};
30use tracing::warn;
31
32use crate::helpers::tonic::BearerAuthInterceptor;
33
34fn default_agent_ipc_endpoint() -> Uri {
35    Uri::from_static("https://127.0.0.1:5001")
36}
37
/// Default location of the Agent authentication token file, used when no path (or an empty path) is configured.
fn default_agent_auth_token_file_path() -> PathBuf {
    "/etc/datadog-agent/auth_token".into()
}
41
/// Default number of retry attempts allowed when initially connecting to the Agent.
const fn default_connect_retry_attempts() -> usize {
    const RETRY_ATTEMPTS: usize = 10;
    RETRY_ATTEMPTS
}
45
/// Default delay between connection attempts when initially connecting to the Agent (two seconds).
const fn default_connect_retry_backoff() -> Duration {
    Duration::from_millis(2_000)
}
49
50#[derive(Deserialize)]
51struct RemoteAgentClientConfiguration {
52    /// Datadog Agent IPC endpoint to connect to.
53    ///
54    /// This is generally based on the configured `cmd_port` for the Datadog Agent, and must expose the `AgentSecure`
55    /// gRPC service.
56    ///
57    /// Defaults to `https://127.0.0.1:5001`.
58    #[serde(
59        rename = "agent_ipc_endpoint",
60        with = "http_serde_ext::uri",
61        default = "default_agent_ipc_endpoint"
62    )]
63    ipc_endpoint: Uri,
64
65    /// Path to the Agent authentication token file.
66    ///
67    /// The contents of the file are passed as a bearer token in RPC requests to the IPC endpoint.
68    ///
69    /// Defaults to `/etc/datadog-agent/auth_token`.
70    #[serde(default = "default_agent_auth_token_file_path")]
71    auth_token_file_path: PathBuf,
72
73    /// Path to the Agent IPC TLS certificate file.
74    ///
75    /// The file is expected to be PEM-encoded, containing both a certificate and private key. The certificate will be
76    /// used to verify the TLS server certificate presented by the Agent, and the certificate and private key will be
77    /// used together to provide client authentication _to_ the Agent.
78    ///
79    /// Defaults to `ipc_cert.pem` in the same directory as the Agent authentication token file. (e.g., if
80    /// `auth_token_file_path` is `/etc/datadog-agent/auth_token`, this will be `/etc/datadog-agent/ipc_cert.pem`.)
81    #[serde(default)]
82    ipc_cert_file_path: Option<PathBuf>,
83
84    /// Number of allowed retry attempts when initially connecting.
85    ///
86    /// Defaults to `10`.
87    #[serde(default = "default_connect_retry_attempts")]
88    connect_retry_attempts: usize,
89
90    /// Amount of time to wait between connection attempts when initially connecting.
91    ///
92    /// Defaults to 2 seconds.
93    #[serde(default = "default_connect_retry_backoff")]
94    connect_retry_backoff: Duration,
95}
96
97impl BackoffBuilder for &RemoteAgentClientConfiguration {
98    type Backoff = <ConstantBuilder as BackoffBuilder>::Backoff;
99
100    fn build(self) -> Self::Backoff {
101        ConstantBuilder::default()
102            .with_delay(self.connect_retry_backoff)
103            .with_max_times(self.connect_retry_attempts)
104            .build()
105    }
106}
107
108/// A client for interacting with the Datadog Agent's internal gRPC-based API.
109#[derive(Clone)]
110pub struct RemoteAgentClient {
111    client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
112    secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
113}
114
115impl RemoteAgentClient {
116    /// Creates a new `RemoteAgentClient` from the given configuration.
117    ///
118    /// # Errors
119    ///
120    /// If the Agent gRPC client cannot be created (invalid API endpoint, missing authentication token, etc), or if the
121    /// authentication token is invalid, an error will be returned.
122    pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
123        let mut config = config
124            .as_typed::<RemoteAgentClientConfiguration>()
125            .error_context("Failed to parse configuration for Remote Agent client.")?;
126
127        // The core-agent defaults to the empty string for the auth_token_file_path and ipc_cert_file_path. We need to handle this by resetting to the defaults.
128        if config.auth_token_file_path.as_os_str().is_empty() {
129            config.auth_token_file_path = default_agent_auth_token_file_path();
130        }
131        if let Some(ref cert_path) = config.ipc_cert_file_path {
132            if cert_path.as_os_str().is_empty() {
133                config.ipc_cert_file_path = None;
134            }
135        }
136
137        // TODO: We need to write a Tower middleware service that allows applying a backoff between failed calls,
138        // specifically so that we can throttle reconnection attempts.
139        //
140        // When the remote Agent endpoint is not available -- Agent isn't running, etc -- the gRPC client will
141        // essentially freewheel, trying to reconnect as quickly as possible, which spams the logs, wastes resources, so
142        // on and so forth. We would want to essentially apply a backoff like any other client would for the RPC calls
143        // themselves, but use it with the _connector_ instead.
144        //
145        // We could potentially just use a retry middleware, but Tonic does have its own reconnection logic, so we'd
146        // have to test it out to make sure it behaves sensibly.
147        let service_builder = || async {
148            let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth_token_file_path).await?;
149            let ipc_cert_file_path =
150                get_ipc_cert_file_path(config.ipc_cert_file_path.as_ref(), &config.auth_token_file_path);
151            let https_connector = build_datadog_agent_ipc_https_connector(ipc_cert_file_path).await?;
152            let channel = Endpoint::from(config.ipc_endpoint.clone())
153                .connect_timeout(Duration::from_secs(2))
154                .connect_with_connector(https_connector)
155                .await
156                .with_error_context(|| {
157                    format!("Failed to connect to Datadog Agent API at '{}'.", config.ipc_endpoint)
158                })?;
159
160            Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
161        };
162
163        let service = service_builder
164            .retry(&config)
165            .notify(|e, delay| {
166                warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
167            })
168            .await
169            .error_context("Failed to create Datadog Agent API client.")?;
170
171        let client = AgentClient::new(service.clone());
172        let mut secure_client = AgentSecureClient::new(service);
173
174        // Try and do a basic health check to make sure we can connect and that our authentication token is valid.
175        try_query_agent_api(&mut secure_client).await?;
176
177        Ok(Self { client, secure_client })
178    }
179
180    /// Gets the detected hostname from the Agent.
181    ///
182    /// # Errors
183    ///
184    /// If there is an error querying the Agent API, an error will be returned.
185    pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
186        let response = self
187            .client
188            .get_hostname(HostnameRequest {})
189            .await
190            .map(|r| r.into_inner())?;
191
192        Ok(response.hostname)
193    }
194
195    /// Gets a stream of tagger entities at the given cardinality.
196    ///
197    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
198    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
199    pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
200        let mut client = self.secure_client.clone();
201        StreamingResponse::from_response_future(async move {
202            client
203                .tagger_stream_entities(StreamTagsRequest {
204                    cardinality: cardinality.into(),
205                    ..Default::default()
206                })
207                .await
208        })
209    }
210
211    /// Gets a stream of all workloadmeta entities.
212    ///
213    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
214    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
215    pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
216        let mut client = self.secure_client.clone();
217        StreamingResponse::from_response_future(async move {
218            client
219                .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
220                    filter: Some(WorkloadmetaFilter {
221                        kinds: vec![
222                            WorkloadmetaKind::Container.into(),
223                            WorkloadmetaKind::KubernetesPod.into(),
224                            WorkloadmetaKind::EcsTask.into(),
225                        ],
226                        source: WorkloadmetaSource::All.into(),
227                        event_type: WorkloadmetaEventType::EventTypeAll.into(),
228                    }),
229                })
230                .await
231        })
232    }
233
234    /// Registers a Remote Agent with the Agent.
235    ///
236    /// # Errors
237    ///
238    /// If there is an error sending the request to the Agent API, an error will be returned.
239    pub async fn register_remote_agent_request(
240        &mut self, pid: u32, display_name: &str, api_endpoint: &str, services: Vec<String>,
241    ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
242        let mut client = self.secure_client.clone();
243        let response = client
244            .register_remote_agent(RegisterRemoteAgentRequest {
245                pid: pid.to_string(),
246                flavor: "agent-data-plane".to_string(),
247                display_name: display_name.to_string(),
248                api_endpoint_uri: api_endpoint.to_string(),
249                services,
250            })
251            .await?;
252        Ok(response)
253    }
254
255    /// Refreshes a Remote Agent with the Agent.
256    ///
257    /// If there is an error sending the request to the Agent API, an error will be returned.
258    pub async fn refresh_remote_agent_request(
259        &mut self, session_id: &str,
260    ) -> Result<Response<RefreshRemoteAgentResponse>, GenericError> {
261        let mut client = self.secure_client.clone();
262        let response = client
263            .refresh_remote_agent(RefreshRemoteAgentRequest {
264                session_id: session_id.to_string(),
265            })
266            .await?;
267        Ok(response)
268    }
269
270    /// Gets the host tags from the Agent.
271    ///
272    /// # Errors
273    ///
274    /// If there is an error querying the Agent API, an error will be returned.
275    pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
276        let mut client = self.secure_client.clone();
277        let response = client.get_host_tags(HostTagRequest {}).await?;
278        Ok(response)
279    }
280
281    /// Gets a stream of autodiscovery config updates.
282    ///
283    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
284    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
285    pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
286        let mut client = self.secure_client.clone();
287        StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
288    }
289
290    /// Gets a stream of config events.
291    ///
292    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
293    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
294    pub fn stream_config_events(&mut self) -> StreamingResponse<ConfigEvent> {
295        let mut client = self.secure_client.clone();
296        let app_details = saluki_metadata::get_app_details();
297        let formatted_full_name = app_details
298            .full_name()
299            .replace(" ", "-")
300            .replace("_", "-")
301            .to_lowercase();
302        StreamingResponse::from_response_future(async move {
303            client
304                .stream_config_events(ConfigStreamRequest {
305                    name: formatted_full_name,
306                })
307                .await
308        })
309    }
310}
311
312pin_project! {
313    /// A streaming gRPC response.
314    ///
315    /// Compared to the normal streaming response type from [`tonic`], `StreamingResponse` handles a special case where
316    /// servers may not send an initial message that allows the RPC to "establish", which is required to create the
317    /// `Streaming` object that can then be polled. This leads to an issue where calls can effectively appear to block
318    /// until the first message is sent by the server, which is suboptimal.
319    ///
320    /// `StreamingResponse` exposes a unified [`Stream`] implementation that encompasses both the initial RPC
321    /// establishment and subsequent messages sent by the server, to allow for a more seamless experience when working
322    /// with streaming RPCs.
323    #[project = StreamingResponseProj]
324    pub enum StreamingResponse<T> {
325        /// Waiting for the server to stream the first message.
326        Initial { inner: Pin<Box<dyn Future<Output = Result<Response<Streaming<T>>, Status>> + Send>> },
327
328        /// Waiting for the server to stream the next message.
329        Streaming { #[pin] stream: Streaming<T> },
330    }
331}
332
333impl<T> StreamingResponse<T> {
334    fn from_response_future<F>(fut: F) -> Self
335    where
336        F: Future<Output = Result<Response<Streaming<T>>, Status>> + Send + 'static,
337    {
338        Self::Initial { inner: Box::pin(fut) }
339    }
340}
341
342impl<T> Stream for StreamingResponse<T> {
343    type Item = Result<T, Status>;
344
345    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
346        loop {
347            let this = self.as_mut().project();
348            let new_state = match this {
349                // When we get the initial response, we either get the streaming object or an error.
350                //
351                // The streaming object itself has to be polled to get an actual message, so we have to do a little
352                // dance here to update our state to the `Streaming` variant when that happens, and then loop so we can
353                // poll the streaming object for a message... but if we got an error, we just yield it like a normal
354                // item on the stream.
355                StreamingResponseProj::Initial { inner } => match ready!(inner.as_mut().poll(cx)) {
356                    Ok(response) => {
357                        let stream = response.into_inner();
358                        StreamingResponse::Streaming { stream }
359                    }
360                    Err(status) => return Poll::Ready(Some(Err(status))),
361                },
362                StreamingResponseProj::Streaming { stream } => return stream.poll_next(cx),
363            };
364
365            self.set(new_state);
366        }
367    }
368}
369
370async fn try_query_agent_api(
371    client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
372) -> Result<(), GenericError> {
373    let noop_fetch_request = FetchEntityRequest {
374        id: Some(EntityId {
375            prefix: "container_id".to_string(),
376            uid: "nonexistent".to_string(),
377        }),
378        cardinality: TagCardinality::High.into(),
379    };
380    match client.tagger_fetch_entity(noop_fetch_request).await {
381        Ok(_) => Ok(()),
382        Err(e) => match e.code() {
383            Code::Unauthenticated => Err(generic_error!(
384                "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
385            )),
386            _ => Err(e.into()),
387        },
388    }
389}