Skip to main content

saluki_env/helpers/remote_agent/
client.rs

1use std::{
2    future::Future,
3    path::PathBuf,
4    pin::Pin,
5    task::{ready, Context, Poll},
6    time::Duration,
7};
8
9use backon::{BackoffBuilder, ConstantBuilder, Retryable as _};
10use datadog_protos::agent::v1::{RefreshRemoteAgentRequest, RegisterRemoteAgentRequest, RegisterRemoteAgentResponse};
11use datadog_protos::agent::{
12    AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
13    FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, StreamTagsRequest, StreamTagsResponse,
14    TagCardinality, WorkloadmetaEventType, WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource,
15    WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
16};
17use futures::Stream;
18use pin_project_lite::pin_project;
19use saluki_config::GenericConfiguration;
20use saluki_error::{generic_error, ErrorContext as _, GenericError};
21use saluki_io::net::{
22    build_datadog_agent_client_ipc_tls_config, client::http::HttpsCapableConnectorBuilder, get_ipc_cert_file_path,
23};
24use serde::Deserialize;
25use tonic::{
26    service::interceptor::InterceptedService,
27    transport::{Channel, Endpoint, Uri},
28    Code, Request, Response, Status, Streaming,
29};
30use tracing::warn;
31
32use super::session::SessionId;
33use crate::helpers::tonic::BearerAuthInterceptor;
34
35fn default_agent_ipc_endpoint() -> Uri {
36    Uri::from_static("https://127.0.0.1:5001")
37}
38
/// Returns the default location of the Agent authentication token file,
/// `/etc/datadog-agent/auth_token`.
///
/// Used both as the serde default and as the fallback when the configured path is empty.
fn default_agent_auth_token_file_path() -> PathBuf {
    "/etc/datadog-agent/auth_token".into()
}
42
/// Returns the default number of retry attempts allowed when initially connecting (`10`).
const fn default_connect_retry_attempts() -> usize {
    const DEFAULT_RETRY_ATTEMPTS: usize = 10;

    DEFAULT_RETRY_ATTEMPTS
}
46
/// Returns the default delay between initial connection attempts (two seconds).
const fn default_connect_retry_backoff() -> Duration {
    Duration::from_millis(2_000)
}
50
51#[derive(Deserialize)]
52struct RemoteAgentClientConfiguration {
53    /// Datadog Agent IPC endpoint to connect to.
54    ///
55    /// This is generally based on the configured `cmd_port` for the Datadog Agent, and must expose the `AgentSecure`
56    /// gRPC service.
57    ///
58    /// Defaults to `https://127.0.0.1:5001`.
59    #[serde(
60        rename = "agent_ipc_endpoint",
61        with = "http_serde_ext::uri",
62        default = "default_agent_ipc_endpoint"
63    )]
64    ipc_endpoint: Uri,
65
66    /// Path to the Agent authentication token file.
67    ///
68    /// The contents of the file are passed as a bearer token in RPC requests to the IPC endpoint.
69    ///
70    /// Defaults to `/etc/datadog-agent/auth_token`.
71    #[serde(default = "default_agent_auth_token_file_path")]
72    auth_token_file_path: PathBuf,
73
74    /// Path to the Agent IPC TLS certificate file.
75    ///
76    /// The file is expected to be PEM-encoded, containing both a certificate and private key. The certificate will be
77    /// used to verify the TLS server certificate presented by the Agent, and the certificate and private key will be
78    /// used together to provide client authentication _to_ the Agent.
79    ///
80    /// Defaults to `ipc_cert.pem` in the same directory as the Agent authentication token file. (e.g., if
81    /// `auth_token_file_path` is `/etc/datadog-agent/auth_token`, this will be `/etc/datadog-agent/ipc_cert.pem`.)
82    #[serde(default)]
83    ipc_cert_file_path: Option<PathBuf>,
84
85    /// Number of allowed retry attempts when initially connecting.
86    ///
87    /// Defaults to `10`.
88    #[serde(default = "default_connect_retry_attempts")]
89    connect_retry_attempts: usize,
90
91    /// Amount of time to wait between connection attempts when initially connecting.
92    ///
93    /// Defaults to 2 seconds.
94    #[serde(default = "default_connect_retry_backoff")]
95    connect_retry_backoff: Duration,
96}
97
98impl BackoffBuilder for &RemoteAgentClientConfiguration {
99    type Backoff = <ConstantBuilder as BackoffBuilder>::Backoff;
100
101    fn build(self) -> Self::Backoff {
102        ConstantBuilder::default()
103            .with_delay(self.connect_retry_backoff)
104            .with_max_times(self.connect_retry_attempts)
105            .build()
106    }
107}
108
/// A client for interacting with the Datadog Agent's internal gRPC-based API.
///
/// Both inner clients are built over the same channel and bearer-token interceptor (see `from_configuration`), and
/// the type is `Clone`.
#[derive(Clone)]
pub struct RemoteAgentClient {
    // Client for the `Agent` service; used for hostname lookups.
    client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
    // Client for the `AgentSecure` service; used for the tagger, workloadmeta, autodiscovery and config streams, as
    // well as remote agent registration/refresh and host tag queries.
    secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
}
115
116impl RemoteAgentClient {
117    /// Creates a new `RemoteAgentClient` from the given configuration.
118    ///
119    /// # Errors
120    ///
121    /// If the Agent gRPC client cannot be created (invalid API endpoint, missing authentication token, etc), or if the
122    /// authentication token is invalid, an error will be returned.
123    pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
124        let mut config = config
125            .as_typed::<RemoteAgentClientConfiguration>()
126            .error_context("Failed to parse configuration for Remote Agent client.")?;
127
128        // The core-agent defaults to the empty string for the auth_token_file_path and ipc_cert_file_path. We need to handle this by resetting to the defaults.
129        if config.auth_token_file_path.as_os_str().is_empty() {
130            config.auth_token_file_path = default_agent_auth_token_file_path();
131        }
132        if let Some(ref cert_path) = config.ipc_cert_file_path {
133            if cert_path.as_os_str().is_empty() {
134                config.ipc_cert_file_path = None;
135            }
136        }
137
138        // TODO: We need to write a Tower middleware service that allows applying a backoff between failed calls,
139        // specifically so that we can throttle reconnection attempts.
140        //
141        // When the remote Agent endpoint is not available -- Agent isn't running, etc -- the gRPC client will
142        // essentially freewheel, trying to reconnect as quickly as possible, which spams the logs, wastes resources, so
143        // on and so forth. We would want to essentially apply a backoff like any other client would for the RPC calls
144        // themselves, but use it with the _connector_ instead.
145        //
146        // We could potentially just use a retry middleware, but Tonic does have its own reconnection logic, so we'd
147        // have to test it out to make sure it behaves sensibly.
148        let service_builder = || async {
149            let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth_token_file_path).await?;
150            let ipc_cert_file_path =
151                get_ipc_cert_file_path(config.ipc_cert_file_path.as_ref(), &config.auth_token_file_path);
152            let client_tls_config = build_datadog_agent_client_ipc_tls_config(ipc_cert_file_path).await?;
153            let https_connector = HttpsCapableConnectorBuilder::default().build(client_tls_config)?;
154            let channel = Endpoint::from(config.ipc_endpoint.clone())
155                .connect_timeout(Duration::from_secs(2))
156                .connect_with_connector(https_connector)
157                .await
158                .with_error_context(|| {
159                    format!("Failed to connect to Datadog Agent API at '{}'.", config.ipc_endpoint)
160                })?;
161
162            Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
163        };
164
165        let service = service_builder
166            .retry(&config)
167            .notify(|e, delay| {
168                warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
169            })
170            .await
171            .error_context("Failed to create Datadog Agent API client.")?;
172
173        let client = AgentClient::new(service.clone());
174        let mut secure_client = AgentSecureClient::new(service);
175
176        // Try and do a basic health check to make sure we can connect and that our authentication token is valid.
177        try_query_agent_api(&mut secure_client).await?;
178
179        Ok(Self { client, secure_client })
180    }
181
182    /// Gets the detected hostname from the Agent.
183    ///
184    /// # Errors
185    ///
186    /// If there is an error querying the Agent API, an error will be returned.
187    pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
188        let response = self
189            .client
190            .get_hostname(HostnameRequest {})
191            .await
192            .map(|r| r.into_inner())?;
193
194        Ok(response.hostname)
195    }
196
197    /// Gets a stream of tagger entities at the given cardinality.
198    ///
199    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
200    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
201    pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
202        let mut client = self.secure_client.clone();
203        StreamingResponse::from_response_future(async move {
204            client
205                .tagger_stream_entities(StreamTagsRequest {
206                    cardinality: cardinality.into(),
207                    ..Default::default()
208                })
209                .await
210        })
211    }
212
213    /// Gets a stream of all workloadmeta entities.
214    ///
215    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
216    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
217    pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
218        let mut client = self.secure_client.clone();
219        StreamingResponse::from_response_future(async move {
220            client
221                .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
222                    filter: Some(WorkloadmetaFilter {
223                        kinds: vec![
224                            WorkloadmetaKind::Container.into(),
225                            WorkloadmetaKind::KubernetesPod.into(),
226                            WorkloadmetaKind::EcsTask.into(),
227                        ],
228                        source: WorkloadmetaSource::All.into(),
229                        event_type: WorkloadmetaEventType::EventTypeAll.into(),
230                    }),
231                })
232                .await
233        })
234    }
235
236    /// Registers a Remote Agent with the Agent.
237    ///
238    /// # Errors
239    ///
240    /// If there is an error sending the request to the Agent API, an error will be returned.
241    pub async fn register_remote_agent_request(
242        &mut self, pid: u32, display_name: &str, api_endpoint: &str, services: Vec<String>,
243    ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
244        let mut client = self.secure_client.clone();
245        let response = client
246            .register_remote_agent(RegisterRemoteAgentRequest {
247                pid: pid.to_string(),
248                flavor: "agent-data-plane".to_string(),
249                display_name: display_name.to_string(),
250                api_endpoint_uri: api_endpoint.to_string(),
251                services,
252            })
253            .await?;
254        Ok(response)
255    }
256
257    /// Refreshes the given remote agent session with the Agent.
258    ///
259    /// # Errors
260    ///
261    /// If there is an error sending the request to the Agent API, an error will be returned.
262    pub async fn refresh_remote_agent_request(&mut self, session_id: &SessionId) -> Result<Response<()>, GenericError> {
263        let mut client = self.secure_client.clone();
264        let response = client
265            .refresh_remote_agent(RefreshRemoteAgentRequest {
266                session_id: session_id.to_string(),
267            })
268            .await?
269            .map(|_| ());
270        Ok(response)
271    }
272
273    /// Gets the host tags from the Agent.
274    ///
275    /// # Errors
276    ///
277    /// If there is an error querying the Agent API, an error will be returned.
278    pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
279        let mut client = self.secure_client.clone();
280        let response = client.get_host_tags(HostTagRequest {}).await?;
281        Ok(response)
282    }
283
284    /// Gets a stream of autodiscovery config updates.
285    ///
286    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
287    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
288    pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
289        let mut client = self.secure_client.clone();
290        StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
291    }
292
293    /// Gets a stream of config events.
294    ///
295    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
296    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
297    pub fn stream_config_events(&mut self, session_id: &SessionId) -> StreamingResponse<ConfigEvent> {
298        let mut client = self.secure_client.clone();
299        let app_details = saluki_metadata::get_app_details();
300        let formatted_full_name = app_details
301            .full_name()
302            .replace(" ", "-")
303            .replace("_", "-")
304            .to_lowercase();
305
306        let mut request = Request::new(ConfigStreamRequest {
307            name: formatted_full_name,
308        });
309
310        request
311            .metadata_mut()
312            .insert("session_id", session_id.to_grpc_header_value());
313
314        StreamingResponse::from_response_future(async move { client.stream_config_events(request).await })
315    }
316}
317
318pin_project! {
319    /// A streaming gRPC response.
320    ///
321    /// Compared to the normal streaming response type from [`tonic`], `StreamingResponse` handles a special case where
322    /// servers may not send an initial message that allows the RPC to "establish", which is required to create the
323    /// `Streaming` object that can then be polled. This leads to an issue where calls can effectively appear to block
324    /// until the first message is sent by the server, which is suboptimal.
325    ///
326    /// `StreamingResponse` exposes a unified [`Stream`] implementation that encompasses both the initial RPC
327    /// establishment and subsequent messages sent by the server, to allow for a more seamless experience when working
328    /// with streaming RPCs.
329    #[project = StreamingResponseProj]
330    pub enum StreamingResponse<T> {
331        /// Waiting for the server to stream the first message.
332        Initial { inner: Pin<Box<dyn Future<Output = Result<Response<Streaming<T>>, Status>> + Send>> },
333
334        /// Waiting for the server to stream the next message.
335        Streaming { #[pin] stream: Streaming<T> },
336    }
337}
338
339impl<T> StreamingResponse<T> {
340    fn from_response_future<F>(fut: F) -> Self
341    where
342        F: Future<Output = Result<Response<Streaming<T>>, Status>> + Send + 'static,
343    {
344        Self::Initial { inner: Box::pin(fut) }
345    }
346}
347
348impl<T> Stream for StreamingResponse<T> {
349    type Item = Result<T, Status>;
350
351    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
352        loop {
353            let this = self.as_mut().project();
354            let new_state = match this {
355                // When we get the initial response, we either get the streaming object or an error.
356                //
357                // The streaming object itself has to be polled to get an actual message, so we have to do a little
358                // dance here to update our state to the `Streaming` variant when that happens, and then loop so we can
359                // poll the streaming object for a message... but if we got an error, we just yield it like a normal
360                // item on the stream.
361                StreamingResponseProj::Initial { inner } => match ready!(inner.as_mut().poll(cx)) {
362                    Ok(response) => {
363                        let stream = response.into_inner();
364                        StreamingResponse::Streaming { stream }
365                    }
366                    Err(status) => return Poll::Ready(Some(Err(status))),
367                },
368                StreamingResponseProj::Streaming { stream } => return stream.poll_next(cx),
369            };
370
371            self.set(new_state);
372        }
373    }
374}
375
376async fn try_query_agent_api(
377    client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
378) -> Result<(), GenericError> {
379    let noop_fetch_request = FetchEntityRequest {
380        id: Some(EntityId {
381            prefix: "container_id".to_string(),
382            uid: "nonexistent".to_string(),
383        }),
384        cardinality: TagCardinality::High.into(),
385    };
386    match client.tagger_fetch_entity(noop_fetch_request).await {
387        Ok(_) => Ok(()),
388        Err(e) => match e.code() {
389            Code::Unauthenticated => Err(generic_error!(
390                "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
391            )),
392            _ => Err(e.into()),
393        },
394    }
395}