saluki_env/helpers/remote_agent/
client.rs

1use std::{
2    future::Future,
3    path::PathBuf,
4    pin::Pin,
5    task::{ready, Context, Poll},
6    time::Duration,
7};
8
9use backon::{BackoffBuilder, ConstantBuilder, Retryable as _};
10use datadog_protos::agent::v1::{
11    RefreshRemoteAgentRequest, RefreshRemoteAgentResponse, RegisterRemoteAgentRequest, RegisterRemoteAgentResponse,
12};
13use datadog_protos::agent::{
14    AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
15    FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, StreamTagsRequest, StreamTagsResponse,
16    TagCardinality, WorkloadmetaEventType, WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource,
17    WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
18};
19use futures::Stream;
20use pin_project_lite::pin_project;
21use saluki_config::GenericConfiguration;
22use saluki_error::{generic_error, ErrorContext as _, GenericError};
23use saluki_io::net::{
24    build_datadog_agent_client_ipc_tls_config, client::http::HttpsCapableConnectorBuilder, get_ipc_cert_file_path,
25};
26use serde::Deserialize;
27use tonic::{
28    service::interceptor::InterceptedService,
29    transport::{Channel, Endpoint, Uri},
30    Code, Response, Status, Streaming,
31};
32use tracing::warn;
33
34use crate::helpers::tonic::BearerAuthInterceptor;
35
36fn default_agent_ipc_endpoint() -> Uri {
37    Uri::from_static("https://127.0.0.1:5001")
38}
39
/// Returns the authentication token file path used when none is configured (or when the configured path is empty).
fn default_agent_auth_token_file_path() -> PathBuf {
    const DEFAULT_AUTH_TOKEN_FILE_PATH: &str = "/etc/datadog-agent/auth_token";

    DEFAULT_AUTH_TOKEN_FILE_PATH.into()
}
43
/// Returns the number of connection retry attempts used when `connect_retry_attempts` is not configured.
const fn default_connect_retry_attempts() -> usize {
    const DEFAULT_RETRY_ATTEMPTS: usize = 10;

    DEFAULT_RETRY_ATTEMPTS
}
47
/// Returns the delay between connection attempts used when `connect_retry_backoff` is not configured.
const fn default_connect_retry_backoff() -> Duration {
    Duration::from_millis(2_000)
}
51
/// Settings used to build a [`RemoteAgentClient`].
///
/// Deserialized from the generic configuration in `RemoteAgentClient::from_configuration`. Every field has a
/// default, so all of the settings are optional. A shared reference to this type also acts as the backoff builder
/// for the initial connection retries (see the `BackoffBuilder` implementation).
#[derive(Deserialize)]
struct RemoteAgentClientConfiguration {
    /// Datadog Agent IPC endpoint to connect to.
    ///
    /// This is generally based on the configured `cmd_port` for the Datadog Agent, and must expose the `AgentSecure`
    /// gRPC service.
    ///
    /// Defaults to `https://127.0.0.1:5001`.
    #[serde(
        rename = "agent_ipc_endpoint",
        with = "http_serde_ext::uri",
        default = "default_agent_ipc_endpoint"
    )]
    ipc_endpoint: Uri,

    /// Path to the Agent authentication token file.
    ///
    /// The contents of the file are passed as a bearer token in RPC requests to the IPC endpoint.
    ///
    /// Defaults to `/etc/datadog-agent/auth_token`.
    #[serde(default = "default_agent_auth_token_file_path")]
    auth_token_file_path: PathBuf,

    /// Path to the Agent IPC TLS certificate file.
    ///
    /// The file is expected to be PEM-encoded, containing both a certificate and private key. The certificate will be
    /// used to verify the TLS server certificate presented by the Agent, and the certificate and private key will be
    /// used together to provide client authentication _to_ the Agent.
    ///
    /// Defaults to `ipc_cert.pem` in the same directory as the Agent authentication token file. (e.g., if
    /// `auth_token_file_path` is `/etc/datadog-agent/auth_token`, this will be `/etc/datadog-agent/ipc_cert.pem`.)
    #[serde(default)]
    ipc_cert_file_path: Option<PathBuf>,

    /// Number of allowed retry attempts when initially connecting.
    ///
    /// Defaults to `10`.
    #[serde(default = "default_connect_retry_attempts")]
    connect_retry_attempts: usize,

    /// Amount of time to wait between connection attempts when initially connecting.
    ///
    /// Defaults to 2 seconds.
    #[serde(default = "default_connect_retry_backoff")]
    connect_retry_backoff: Duration,
}
98
99impl BackoffBuilder for &RemoteAgentClientConfiguration {
100    type Backoff = <ConstantBuilder as BackoffBuilder>::Backoff;
101
102    fn build(self) -> Self::Backoff {
103        ConstantBuilder::default()
104            .with_delay(self.connect_retry_backoff)
105            .with_max_times(self.connect_retry_attempts)
106            .build()
107    }
108}
109
/// A client for interacting with the Datadog Agent's internal gRPC-based API.
///
/// Both inner clients are built over the same authenticated channel, which is established in
/// `RemoteAgentClient::from_configuration`.
#[derive(Clone)]
pub struct RemoteAgentClient {
    // Used for the hostname query (`get_hostname`).
    client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
    // Used for every other RPC, including the health check performed at construction time.
    secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
}
116
117impl RemoteAgentClient {
118    /// Creates a new `RemoteAgentClient` from the given configuration.
119    ///
120    /// # Errors
121    ///
122    /// If the Agent gRPC client cannot be created (invalid API endpoint, missing authentication token, etc), or if the
123    /// authentication token is invalid, an error will be returned.
124    pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
125        let mut config = config
126            .as_typed::<RemoteAgentClientConfiguration>()
127            .error_context("Failed to parse configuration for Remote Agent client.")?;
128
129        // The core-agent defaults to the empty string for the auth_token_file_path and ipc_cert_file_path. We need to handle this by resetting to the defaults.
130        if config.auth_token_file_path.as_os_str().is_empty() {
131            config.auth_token_file_path = default_agent_auth_token_file_path();
132        }
133        if let Some(ref cert_path) = config.ipc_cert_file_path {
134            if cert_path.as_os_str().is_empty() {
135                config.ipc_cert_file_path = None;
136            }
137        }
138
139        // TODO: We need to write a Tower middleware service that allows applying a backoff between failed calls,
140        // specifically so that we can throttle reconnection attempts.
141        //
142        // When the remote Agent endpoint is not available -- Agent isn't running, etc -- the gRPC client will
143        // essentially freewheel, trying to reconnect as quickly as possible, which spams the logs, wastes resources, so
144        // on and so forth. We would want to essentially apply a backoff like any other client would for the RPC calls
145        // themselves, but use it with the _connector_ instead.
146        //
147        // We could potentially just use a retry middleware, but Tonic does have its own reconnection logic, so we'd
148        // have to test it out to make sure it behaves sensibly.
149        let service_builder = || async {
150            let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth_token_file_path).await?;
151            let ipc_cert_file_path =
152                get_ipc_cert_file_path(config.ipc_cert_file_path.as_ref(), &config.auth_token_file_path);
153            let client_tls_config = build_datadog_agent_client_ipc_tls_config(ipc_cert_file_path).await?;
154            let https_connector = HttpsCapableConnectorBuilder::default().build(client_tls_config)?;
155            let channel = Endpoint::from(config.ipc_endpoint.clone())
156                .connect_timeout(Duration::from_secs(2))
157                .connect_with_connector(https_connector)
158                .await
159                .with_error_context(|| {
160                    format!("Failed to connect to Datadog Agent API at '{}'.", config.ipc_endpoint)
161                })?;
162
163            Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
164        };
165
166        let service = service_builder
167            .retry(&config)
168            .notify(|e, delay| {
169                warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
170            })
171            .await
172            .error_context("Failed to create Datadog Agent API client.")?;
173
174        let client = AgentClient::new(service.clone());
175        let mut secure_client = AgentSecureClient::new(service);
176
177        // Try and do a basic health check to make sure we can connect and that our authentication token is valid.
178        try_query_agent_api(&mut secure_client).await?;
179
180        Ok(Self { client, secure_client })
181    }
182
183    /// Gets the detected hostname from the Agent.
184    ///
185    /// # Errors
186    ///
187    /// If there is an error querying the Agent API, an error will be returned.
188    pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
189        let response = self
190            .client
191            .get_hostname(HostnameRequest {})
192            .await
193            .map(|r| r.into_inner())?;
194
195        Ok(response.hostname)
196    }
197
198    /// Gets a stream of tagger entities at the given cardinality.
199    ///
200    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
201    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
202    pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
203        let mut client = self.secure_client.clone();
204        StreamingResponse::from_response_future(async move {
205            client
206                .tagger_stream_entities(StreamTagsRequest {
207                    cardinality: cardinality.into(),
208                    ..Default::default()
209                })
210                .await
211        })
212    }
213
214    /// Gets a stream of all workloadmeta entities.
215    ///
216    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
217    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
218    pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
219        let mut client = self.secure_client.clone();
220        StreamingResponse::from_response_future(async move {
221            client
222                .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
223                    filter: Some(WorkloadmetaFilter {
224                        kinds: vec![
225                            WorkloadmetaKind::Container.into(),
226                            WorkloadmetaKind::KubernetesPod.into(),
227                            WorkloadmetaKind::EcsTask.into(),
228                        ],
229                        source: WorkloadmetaSource::All.into(),
230                        event_type: WorkloadmetaEventType::EventTypeAll.into(),
231                    }),
232                })
233                .await
234        })
235    }
236
237    /// Registers a Remote Agent with the Agent.
238    ///
239    /// # Errors
240    ///
241    /// If there is an error sending the request to the Agent API, an error will be returned.
242    pub async fn register_remote_agent_request(
243        &mut self, pid: u32, display_name: &str, api_endpoint: &str, services: Vec<String>,
244    ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
245        let mut client = self.secure_client.clone();
246        let response = client
247            .register_remote_agent(RegisterRemoteAgentRequest {
248                pid: pid.to_string(),
249                flavor: "agent-data-plane".to_string(),
250                display_name: display_name.to_string(),
251                api_endpoint_uri: api_endpoint.to_string(),
252                services,
253            })
254            .await?;
255        Ok(response)
256    }
257
258    /// Refreshes a Remote Agent with the Agent.
259    ///
260    /// If there is an error sending the request to the Agent API, an error will be returned.
261    pub async fn refresh_remote_agent_request(
262        &mut self, session_id: &str,
263    ) -> Result<Response<RefreshRemoteAgentResponse>, GenericError> {
264        let mut client = self.secure_client.clone();
265        let response = client
266            .refresh_remote_agent(RefreshRemoteAgentRequest {
267                session_id: session_id.to_string(),
268            })
269            .await?;
270        Ok(response)
271    }
272
273    /// Gets the host tags from the Agent.
274    ///
275    /// # Errors
276    ///
277    /// If there is an error querying the Agent API, an error will be returned.
278    pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
279        let mut client = self.secure_client.clone();
280        let response = client.get_host_tags(HostTagRequest {}).await?;
281        Ok(response)
282    }
283
284    /// Gets a stream of autodiscovery config updates.
285    ///
286    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
287    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
288    pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
289        let mut client = self.secure_client.clone();
290        StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
291    }
292
293    /// Gets a stream of config events.
294    ///
295    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
296    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
297    pub fn stream_config_events(&mut self) -> StreamingResponse<ConfigEvent> {
298        let mut client = self.secure_client.clone();
299        let app_details = saluki_metadata::get_app_details();
300        let formatted_full_name = app_details
301            .full_name()
302            .replace(" ", "-")
303            .replace("_", "-")
304            .to_lowercase();
305        StreamingResponse::from_response_future(async move {
306            client
307                .stream_config_events(ConfigStreamRequest {
308                    name: formatted_full_name,
309                })
310                .await
311        })
312    }
313}
314
315pin_project! {
316    /// A streaming gRPC response.
317    ///
318    /// Compared to the normal streaming response type from [`tonic`], `StreamingResponse` handles a special case where
319    /// servers may not send an initial message that allows the RPC to "establish", which is required to create the
320    /// `Streaming` object that can then be polled. This leads to an issue where calls can effectively appear to block
321    /// until the first message is sent by the server, which is suboptimal.
322    ///
323    /// `StreamingResponse` exposes a unified [`Stream`] implementation that encompasses both the initial RPC
324    /// establishment and subsequent messages sent by the server, to allow for a more seamless experience when working
325    /// with streaming RPCs.
326    #[project = StreamingResponseProj]
327    pub enum StreamingResponse<T> {
328        /// Waiting for the server to stream the first message.
329        Initial { inner: Pin<Box<dyn Future<Output = Result<Response<Streaming<T>>, Status>> + Send>> },
330
331        /// Waiting for the server to stream the next message.
332        Streaming { #[pin] stream: Streaming<T> },
333    }
334}
335
336impl<T> StreamingResponse<T> {
337    fn from_response_future<F>(fut: F) -> Self
338    where
339        F: Future<Output = Result<Response<Streaming<T>>, Status>> + Send + 'static,
340    {
341        Self::Initial { inner: Box::pin(fut) }
342    }
343}
344
345impl<T> Stream for StreamingResponse<T> {
346    type Item = Result<T, Status>;
347
348    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
349        loop {
350            let this = self.as_mut().project();
351            let new_state = match this {
352                // When we get the initial response, we either get the streaming object or an error.
353                //
354                // The streaming object itself has to be polled to get an actual message, so we have to do a little
355                // dance here to update our state to the `Streaming` variant when that happens, and then loop so we can
356                // poll the streaming object for a message... but if we got an error, we just yield it like a normal
357                // item on the stream.
358                StreamingResponseProj::Initial { inner } => match ready!(inner.as_mut().poll(cx)) {
359                    Ok(response) => {
360                        let stream = response.into_inner();
361                        StreamingResponse::Streaming { stream }
362                    }
363                    Err(status) => return Poll::Ready(Some(Err(status))),
364                },
365                StreamingResponseProj::Streaming { stream } => return stream.poll_next(cx),
366            };
367
368            self.set(new_state);
369        }
370    }
371}
372
373async fn try_query_agent_api(
374    client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
375) -> Result<(), GenericError> {
376    let noop_fetch_request = FetchEntityRequest {
377        id: Some(EntityId {
378            prefix: "container_id".to_string(),
379            uid: "nonexistent".to_string(),
380        }),
381        cardinality: TagCardinality::High.into(),
382    };
383    match client.tagger_fetch_entity(noop_fetch_request).await {
384        Ok(_) => Ok(()),
385        Err(e) => match e.code() {
386            Code::Unauthenticated => Err(generic_error!(
387                "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
388            )),
389            _ => Err(e.into()),
390        },
391    }
392}