saluki_env/helpers/remote_agent/
client.rs

1use std::{
2    future::Future,
3    path::{Path, PathBuf},
4    pin::Pin,
5    task::{ready, Context, Poll},
6    time::Duration,
7};
8
9use backon::{BackoffBuilder, ConstantBuilder, Retryable as _};
10use datadog_protos::agent::{
11    AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
12    FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, RegisterRemoteAgentRequest,
13    RegisterRemoteAgentResponse, StreamTagsRequest, StreamTagsResponse, TagCardinality, WorkloadmetaEventType,
14    WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource, WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
15};
16use futures::Stream;
17use pin_project_lite::pin_project;
18use saluki_config::GenericConfiguration;
19use saluki_error::{generic_error, ErrorContext as _, GenericError};
20use saluki_io::net::build_datadog_agent_ipc_https_connector;
21use serde::Deserialize;
22use tonic::{
23    service::interceptor::InterceptedService,
24    transport::{Channel, Endpoint, Uri},
25    Code, Response, Status, Streaming,
26};
27use tracing::warn;
28
29use crate::helpers::tonic::BearerAuthInterceptor;
30
31const DEFAULT_DATADOG_AGENT_CONFIG_DIR: &str = "/etc/datadog-agent";
32const DEFAULT_IPC_CERT_FILE_NAME: &str = "ipc_cert.pem";
33
34fn default_agent_ipc_endpoint() -> Uri {
35    Uri::from_static("https://127.0.0.1:5001")
36}
37
/// Returns the Agent authentication token file path used when none is configured.
fn default_agent_auth_token_file_path() -> PathBuf {
    Path::new("/etc/datadog-agent/auth_token").to_path_buf()
}
41
/// Returns the number of retry attempts allowed for the initial connection when none is configured.
const fn default_connect_retry_attempts() -> usize {
    const DEFAULT_ATTEMPTS: usize = 10;

    DEFAULT_ATTEMPTS
}
45
/// Returns the delay between initial connection attempts used when none is configured.
const fn default_connect_retry_backoff() -> Duration {
    const DEFAULT_BACKOFF_SECS: u64 = 2;

    Duration::from_secs(DEFAULT_BACKOFF_SECS)
}
49
/// Settings used to build a [`RemoteAgentClient`].
///
/// Every field has a serde default, so all of them may be omitted from the source configuration.
#[derive(Deserialize)]
struct RemoteAgentClientConfiguration {
    /// Datadog Agent IPC endpoint to connect to.
    ///
    /// This is generally based on the configured `cmd_port` for the Datadog Agent, and must expose the `AgentSecure`
    /// gRPC service.
    ///
    /// Read from the `agent_ipc_endpoint` configuration key.
    ///
    /// Defaults to `https://127.0.0.1:5001`.
    #[serde(
        rename = "agent_ipc_endpoint",
        with = "http_serde_ext::uri",
        default = "default_agent_ipc_endpoint"
    )]
    ipc_endpoint: Uri,

    /// Path to the Agent authentication token file.
    ///
    /// The contents of the file are passed as a bearer token in RPC requests to the IPC endpoint.
    ///
    /// Defaults to `/etc/datadog-agent/auth_token`.
    #[serde(default = "default_agent_auth_token_file_path")]
    auth_token_file_path: PathBuf,

    /// Path to the Agent IPC TLS certificate file.
    ///
    /// The file is expected to be PEM-encoded, containing both a certificate and private key. The certificate will be
    /// used to verify the TLS server certificate presented by the Agent, and the certificate and private key will be
    /// used together to provide client authentication _to_ the Agent.
    ///
    /// Defaults to `ipc_cert.pem` in the same directory as the Agent authentication token file. For example, if
    /// `auth_token_file_path` is `/etc/datadog-agent/auth_token`, this will be `/etc/datadog-agent/ipc_cert.pem`.
    #[serde(default)]
    ipc_cert_file_path: Option<PathBuf>,

    /// Number of allowed retry attempts when initially connecting.
    ///
    /// Defaults to `10`.
    #[serde(default = "default_connect_retry_attempts")]
    connect_retry_attempts: usize,

    /// Amount of time to wait between connection attempts when initially connecting.
    ///
    /// The same delay is used between every attempt.
    ///
    /// Defaults to 2 seconds.
    #[serde(default = "default_connect_retry_backoff")]
    connect_retry_backoff: Duration,
}
96
97impl BackoffBuilder for &RemoteAgentClientConfiguration {
98    type Backoff = <ConstantBuilder as BackoffBuilder>::Backoff;
99
100    fn build(self) -> Self::Backoff {
101        ConstantBuilder::default()
102            .with_delay(self.connect_retry_backoff)
103            .with_max_times(self.connect_retry_attempts)
104            .build()
105    }
106}
107
/// A client for interacting with the Datadog Agent's internal gRPC-based API.
///
/// Both inner clients are built on the same underlying channel and bearer-token interceptor.
#[derive(Clone)]
pub struct RemoteAgentClient {
    // Used for hostname lookups.
    client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
    // Used for every other call made by this type (tagger, workloadmeta, autodiscovery, config events, host tags,
    // and remote agent registration).
    secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
}
114
115impl RemoteAgentClient {
116    /// Creates a new `RemoteAgentClient` from the given configuration.
117    ///
118    /// # Errors
119    ///
120    /// If the Agent gRPC client cannot be created (invalid API endpoint, missing authentication token, etc), or if the
121    /// authentication token is invalid, an error will be returned.
122    pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
123        let mut config = config
124            .as_typed::<RemoteAgentClientConfiguration>()
125            .error_context("Failed to parse configuration for Remote Agent client.")?;
126
127        // The core-agent defaults to the empty string for the auth_token_file_path and ipc_cert_file_path. We need to handle this by resetting to the defaults.
128        if config.auth_token_file_path.as_os_str().is_empty() {
129            config.auth_token_file_path = default_agent_auth_token_file_path();
130        }
131        if let Some(ref cert_path) = config.ipc_cert_file_path {
132            if cert_path.as_os_str().is_empty() {
133                config.ipc_cert_file_path = None;
134            }
135        }
136
137        // TODO: We need to write a Tower middleware service that allows applying a backoff between failed calls,
138        // specifically so that we can throttle reconnection attempts.
139        //
140        // When the remote Agent endpoint is not available -- Agent isn't running, etc -- the gRPC client will
141        // essentially freewheel, trying to reconnect as quickly as possible, which spams the logs, wastes resources, so
142        // on and so forth. We would want to essentially apply a backoff like any other client would for the RPC calls
143        // themselves, but use it with the _connector_ instead.
144        //
145        // We could potentially just use a retry middleware, but Tonic does have its own reconnection logic, so we'd
146        // have to test it out to make sure it behaves sensibly.
147        let service_builder = || async {
148            let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth_token_file_path).await?;
149            let ipc_cert_file_path =
150                get_ipc_cert_file_path(config.ipc_cert_file_path.as_ref(), &config.auth_token_file_path);
151            let https_connector = build_datadog_agent_ipc_https_connector(ipc_cert_file_path).await?;
152            let channel = Endpoint::from(config.ipc_endpoint.clone())
153                .connect_timeout(Duration::from_secs(2))
154                .connect_with_connector(https_connector)
155                .await
156                .with_error_context(|| {
157                    format!("Failed to connect to Datadog Agent API at '{}'.", config.ipc_endpoint)
158                })?;
159
160            Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
161        };
162
163        let service = service_builder
164            .retry(&config)
165            .notify(|e, delay| {
166                warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
167            })
168            .await
169            .error_context("Failed to create Datadog Agent API client.")?;
170
171        let client = AgentClient::new(service.clone());
172        let mut secure_client = AgentSecureClient::new(service);
173
174        // Try and do a basic health check to make sure we can connect and that our authentication token is valid.
175        try_query_agent_api(&mut secure_client).await?;
176
177        Ok(Self { client, secure_client })
178    }
179
180    /// Gets the detected hostname from the Agent.
181    ///
182    /// # Errors
183    ///
184    /// If there is an error querying the Agent API, an error will be returned.
185    pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
186        let response = self
187            .client
188            .get_hostname(HostnameRequest {})
189            .await
190            .map(|r| r.into_inner())?;
191
192        Ok(response.hostname)
193    }
194
195    /// Gets a stream of tagger entities at the given cardinality.
196    ///
197    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
198    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
199    pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
200        let mut client = self.secure_client.clone();
201        StreamingResponse::from_response_future(async move {
202            client
203                .tagger_stream_entities(StreamTagsRequest {
204                    cardinality: cardinality.into(),
205                    ..Default::default()
206                })
207                .await
208        })
209    }
210
211    /// Gets a stream of all workloadmeta entities.
212    ///
213    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
214    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
215    pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
216        let mut client = self.secure_client.clone();
217        StreamingResponse::from_response_future(async move {
218            client
219                .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
220                    filter: Some(WorkloadmetaFilter {
221                        kinds: vec![
222                            WorkloadmetaKind::Container.into(),
223                            WorkloadmetaKind::KubernetesPod.into(),
224                            WorkloadmetaKind::EcsTask.into(),
225                        ],
226                        source: WorkloadmetaSource::All.into(),
227                        event_type: WorkloadmetaEventType::EventTypeAll.into(),
228                    }),
229                })
230                .await
231        })
232    }
233
234    /// Registers a Remote Agent with the Agent.
235    ///
236    /// # Errors
237    ///
238    /// If there is an error sending the request to the Agent API, an error will be returned.
239    pub async fn register_remote_agent_request(
240        &mut self, id: &str, display_name: &str, api_endpoint: &str, auth_token: &str,
241    ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
242        let mut client = self.secure_client.clone();
243        let response = client
244            .register_remote_agent(RegisterRemoteAgentRequest {
245                id: id.to_string(),
246                display_name: display_name.to_string(),
247                api_endpoint: api_endpoint.to_string(),
248                auth_token: auth_token.to_string(),
249            })
250            .await?;
251        Ok(response)
252    }
253
254    /// Gets the host tags from the Agent.
255    ///
256    /// # Errors
257    ///
258    /// If there is an error querying the Agent API, an error will be returned.
259    pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
260        let mut client = self.secure_client.clone();
261        let response = client.get_host_tags(HostTagRequest {}).await?;
262        Ok(response)
263    }
264
265    /// Gets a stream of autodiscovery config updates.
266    ///
267    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
268    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
269    pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
270        let mut client = self.secure_client.clone();
271        StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
272    }
273
274    /// Gets a stream of config events.
275    ///
276    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
277    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
278    pub fn stream_config_events(&mut self) -> StreamingResponse<ConfigEvent> {
279        let mut client = self.secure_client.clone();
280        let app_details = saluki_metadata::get_app_details();
281        let formatted_full_name = app_details
282            .full_name()
283            .replace(" ", "-")
284            .replace("_", "-")
285            .to_lowercase();
286        StreamingResponse::from_response_future(async move {
287            client
288                .stream_config_events(ConfigStreamRequest {
289                    name: formatted_full_name,
290                })
291                .await
292        })
293    }
294}
295
pin_project! {
    /// A streaming gRPC response.
    ///
    /// Compared to the normal streaming response type from [`tonic`], `StreamingResponse` handles a special case where
    /// servers may not send an initial message that allows the RPC to "establish", which is required to create the
    /// `Streaming` object that can then be polled. This leads to an issue where calls can effectively appear to block
    /// until the first message is sent by the server, which is suboptimal.
    ///
    /// `StreamingResponse` exposes a unified [`Stream`] implementation that encompasses both the initial RPC
    /// establishment and subsequent messages sent by the server, to allow for a more seamless experience when working
    /// with streaming RPCs.
    #[project = StreamingResponseProj]
    pub enum StreamingResponse<T> {
        /// The RPC has been issued but is not yet established: `inner` is the pending response future, which resolves
        /// to either the `Streaming` object or an error status.
        Initial { inner: Pin<Box<dyn Future<Output = Result<Response<Streaming<T>>, Status>> + Send>> },

        /// The RPC is established: `stream` is polled for each subsequent message sent by the server.
        Streaming { #[pin] stream: Streaming<T> },
    }
}
316
317impl<T> StreamingResponse<T> {
318    fn from_response_future<F>(fut: F) -> Self
319    where
320        F: Future<Output = Result<Response<Streaming<T>>, Status>> + Send + 'static,
321    {
322        Self::Initial { inner: Box::pin(fut) }
323    }
324}
325
326impl<T> Stream for StreamingResponse<T> {
327    type Item = Result<T, Status>;
328
329    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
330        loop {
331            let this = self.as_mut().project();
332            let new_state = match this {
333                // When we get the initial response, we either get the streaming object or an error.
334                //
335                // The streaming object itself has to be polled to get an actual message, so we have to do a little
336                // dance here to update our state to the `Streaming` variant when that happens, and then loop so we can
337                // poll the streaming object for a message... but if we got an error, we just yield it like a normal
338                // item on the stream.
339                StreamingResponseProj::Initial { inner } => match ready!(inner.as_mut().poll(cx)) {
340                    Ok(response) => {
341                        let stream = response.into_inner();
342                        StreamingResponse::Streaming { stream }
343                    }
344                    Err(status) => return Poll::Ready(Some(Err(status))),
345                },
346                StreamingResponseProj::Streaming { stream } => return stream.poll_next(cx),
347            };
348
349            self.set(new_state);
350        }
351    }
352}
353
354async fn try_query_agent_api(
355    client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
356) -> Result<(), GenericError> {
357    let noop_fetch_request = FetchEntityRequest {
358        id: Some(EntityId {
359            prefix: "container_id".to_string(),
360            uid: "nonexistent".to_string(),
361        }),
362        cardinality: TagCardinality::High.into(),
363    };
364    match client.tagger_fetch_entity(noop_fetch_request).await {
365        Ok(_) => Ok(()),
366        Err(e) => match e.code() {
367            Code::Unauthenticated => Err(generic_error!(
368                "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
369            )),
370            _ => Err(e.into()),
371        },
372    }
373}
374
375fn get_ipc_cert_file_path(ipc_cert_file_path: Option<&PathBuf>, auth_token_file_path: &Path) -> PathBuf {
376    // If the IPC cert file path is set explicitly, we always prefer that.
377    if let Some(path) = ipc_cert_file_path {
378        return path.clone();
379    }
380
381    // Otherwise, we default to the same directory as the auth token file, with the default certificate file name.
382    let mut cert_path = auth_token_file_path
383        .parent()
384        .map(|p| p.to_path_buf())
385        .unwrap_or_else(|| PathBuf::from(DEFAULT_DATADOG_AGENT_CONFIG_DIR));
386
387    cert_path.push(DEFAULT_IPC_CERT_FILE_NAME);
388    cert_path
389}
390
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};

    use super::{
        default_agent_auth_token_file_path, get_ipc_cert_file_path, DEFAULT_DATADOG_AGENT_CONFIG_DIR,
        DEFAULT_IPC_CERT_FILE_NAME,
    };

    /// Asserts that `resolved` is the default certificate file name located directly in `expected_dir`.
    fn assert_default_cert_in(resolved: &Path, expected_dir: Option<&Path>) {
        assert_eq!(resolved.parent(), expected_dir);
        assert_eq!(
            resolved.file_name().and_then(|name| name.to_str()),
            Some(DEFAULT_IPC_CERT_FILE_NAME)
        );
    }

    #[test]
    fn ipc_cert_file_path_defaults() {
        let default_token = default_agent_auth_token_file_path();
        let custom_token = PathBuf::from("/secret/auth_token");
        let parentless_token = PathBuf::from("/");
        let explicit_cert = PathBuf::from("/tmp/custom_ipc_cert.pem");

        // An explicitly set IPC cert file path is used as-is.
        let resolved = get_ipc_cert_file_path(Some(&explicit_cert), &default_token);
        assert_eq!(resolved, explicit_cert);

        // Without an explicit path, the certificate is expected next to the auth token file, using the default
        // certificate file name.
        let resolved = get_ipc_cert_file_path(None, &default_token);
        assert_default_cert_in(&resolved, default_token.as_path().parent());

        // The same holds for a custom auth token file path.
        let resolved = get_ipc_cert_file_path(None, &custom_token);
        assert_default_cert_in(&resolved, custom_token.as_path().parent());

        // If the auth token file path has no parent directory, the default Datadog Agent configuration directory is
        // used instead.
        let resolved = get_ipc_cert_file_path(None, &parentless_token);
        assert_default_cert_in(&resolved, Some(Path::new(DEFAULT_DATADOG_AGENT_CONFIG_DIR)));
    }
}
437}