saluki_env/helpers/remote_agent/
client.rs1use std::{
2 future::Future,
3 path::{Path, PathBuf},
4 pin::Pin,
5 task::{ready, Context, Poll},
6 time::Duration,
7};
8
9use backon::{BackoffBuilder, ConstantBuilder, Retryable as _};
10use datadog_protos::agent::{
11 AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
12 FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, RegisterRemoteAgentRequest,
13 RegisterRemoteAgentResponse, StreamTagsRequest, StreamTagsResponse, TagCardinality, WorkloadmetaEventType,
14 WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource, WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
15};
16use futures::Stream;
17use pin_project_lite::pin_project;
18use saluki_config::GenericConfiguration;
19use saluki_error::{generic_error, ErrorContext as _, GenericError};
20use saluki_io::net::build_datadog_agent_ipc_https_connector;
21use serde::Deserialize;
22use tonic::{
23 service::interceptor::InterceptedService,
24 transport::{Channel, Endpoint, Uri},
25 Code, Response, Status, Streaming,
26};
27use tracing::warn;
28
29use crate::helpers::tonic::BearerAuthInterceptor;
30
/// Directory used as the fallback location for the IPC certificate when the
/// auth token path has no parent directory to derive one from.
const DEFAULT_DATADOG_AGENT_CONFIG_DIR: &str = "/etc/datadog-agent";

/// File name of the IPC certificate looked up when no explicit certificate
/// path is configured.
const DEFAULT_IPC_CERT_FILE_NAME: &str = "ipc_cert.pem";
33
34fn default_agent_ipc_endpoint() -> Uri {
35 Uri::from_static("https://127.0.0.1:5001")
36}
37
/// Returns the auth token file path used when none is configured.
fn default_agent_auth_token_file_path() -> PathBuf {
    Path::new("/etc/datadog-agent/auth_token").to_path_buf()
}
41
/// Returns the number of connection retry attempts used when none is configured.
const fn default_connect_retry_attempts() -> usize {
    const ATTEMPTS: usize = 10;

    ATTEMPTS
}
45
/// Returns the delay between connection attempts used when none is configured.
const fn default_connect_retry_backoff() -> Duration {
    Duration::from_millis(2_000)
}
49
50#[derive(Deserialize)]
51struct RemoteAgentClientConfiguration {
52 #[serde(
59 rename = "agent_ipc_endpoint",
60 with = "http_serde_ext::uri",
61 default = "default_agent_ipc_endpoint"
62 )]
63 ipc_endpoint: Uri,
64
65 #[serde(default = "default_agent_auth_token_file_path")]
71 auth_token_file_path: PathBuf,
72
73 #[serde(default)]
82 ipc_cert_file_path: Option<PathBuf>,
83
84 #[serde(default = "default_connect_retry_attempts")]
88 connect_retry_attempts: usize,
89
90 #[serde(default = "default_connect_retry_backoff")]
94 connect_retry_backoff: Duration,
95}
96
97impl BackoffBuilder for &RemoteAgentClientConfiguration {
98 type Backoff = <ConstantBuilder as BackoffBuilder>::Backoff;
99
100 fn build(self) -> Self::Backoff {
101 ConstantBuilder::default()
102 .with_delay(self.connect_retry_backoff)
103 .with_max_times(self.connect_retry_attempts)
104 .build()
105 }
106}
107
108#[derive(Clone)]
110pub struct RemoteAgentClient {
111 client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
112 secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
113}
114
115impl RemoteAgentClient {
116 pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
123 let mut config = config
124 .as_typed::<RemoteAgentClientConfiguration>()
125 .error_context("Failed to parse configuration for Remote Agent client.")?;
126
127 if config.auth_token_file_path.as_os_str().is_empty() {
129 config.auth_token_file_path = default_agent_auth_token_file_path();
130 }
131 if let Some(ref cert_path) = config.ipc_cert_file_path {
132 if cert_path.as_os_str().is_empty() {
133 config.ipc_cert_file_path = None;
134 }
135 }
136
137 let service_builder = || async {
148 let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth_token_file_path).await?;
149 let ipc_cert_file_path =
150 get_ipc_cert_file_path(config.ipc_cert_file_path.as_ref(), &config.auth_token_file_path);
151 let https_connector = build_datadog_agent_ipc_https_connector(ipc_cert_file_path).await?;
152 let channel = Endpoint::from(config.ipc_endpoint.clone())
153 .connect_timeout(Duration::from_secs(2))
154 .connect_with_connector(https_connector)
155 .await
156 .with_error_context(|| {
157 format!("Failed to connect to Datadog Agent API at '{}'.", config.ipc_endpoint)
158 })?;
159
160 Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
161 };
162
163 let service = service_builder
164 .retry(&config)
165 .notify(|e, delay| {
166 warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
167 })
168 .await
169 .error_context("Failed to create Datadog Agent API client.")?;
170
171 let client = AgentClient::new(service.clone());
172 let mut secure_client = AgentSecureClient::new(service);
173
174 try_query_agent_api(&mut secure_client).await?;
176
177 Ok(Self { client, secure_client })
178 }
179
180 pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
186 let response = self
187 .client
188 .get_hostname(HostnameRequest {})
189 .await
190 .map(|r| r.into_inner())?;
191
192 Ok(response.hostname)
193 }
194
195 pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
200 let mut client = self.secure_client.clone();
201 StreamingResponse::from_response_future(async move {
202 client
203 .tagger_stream_entities(StreamTagsRequest {
204 cardinality: cardinality.into(),
205 ..Default::default()
206 })
207 .await
208 })
209 }
210
211 pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
216 let mut client = self.secure_client.clone();
217 StreamingResponse::from_response_future(async move {
218 client
219 .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
220 filter: Some(WorkloadmetaFilter {
221 kinds: vec![
222 WorkloadmetaKind::Container.into(),
223 WorkloadmetaKind::KubernetesPod.into(),
224 WorkloadmetaKind::EcsTask.into(),
225 ],
226 source: WorkloadmetaSource::All.into(),
227 event_type: WorkloadmetaEventType::EventTypeAll.into(),
228 }),
229 })
230 .await
231 })
232 }
233
234 pub async fn register_remote_agent_request(
240 &mut self, id: &str, display_name: &str, api_endpoint: &str, auth_token: &str,
241 ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
242 let mut client = self.secure_client.clone();
243 let response = client
244 .register_remote_agent(RegisterRemoteAgentRequest {
245 id: id.to_string(),
246 display_name: display_name.to_string(),
247 api_endpoint: api_endpoint.to_string(),
248 auth_token: auth_token.to_string(),
249 })
250 .await?;
251 Ok(response)
252 }
253
254 pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
260 let mut client = self.secure_client.clone();
261 let response = client.get_host_tags(HostTagRequest {}).await?;
262 Ok(response)
263 }
264
265 pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
270 let mut client = self.secure_client.clone();
271 StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
272 }
273
274 pub fn stream_config_events(&mut self) -> StreamingResponse<ConfigEvent> {
279 let mut client = self.secure_client.clone();
280 let app_details = saluki_metadata::get_app_details();
281 let formatted_full_name = app_details
282 .full_name()
283 .replace(" ", "-")
284 .replace("_", "-")
285 .to_lowercase();
286 StreamingResponse::from_response_future(async move {
287 client
288 .stream_config_events(ConfigStreamRequest {
289 name: formatted_full_name,
290 })
291 .await
292 })
293 }
294}
295
296pin_project! {
297 #[project = StreamingResponseProj]
308 pub enum StreamingResponse<T> {
309 Initial { inner: Pin<Box<dyn Future<Output = Result<Response<Streaming<T>>, Status>> + Send>> },
311
312 Streaming { #[pin] stream: Streaming<T> },
314 }
315}
316
317impl<T> StreamingResponse<T> {
318 fn from_response_future<F>(fut: F) -> Self
319 where
320 F: Future<Output = Result<Response<Streaming<T>>, Status>> + Send + 'static,
321 {
322 Self::Initial { inner: Box::pin(fut) }
323 }
324}
325
326impl<T> Stream for StreamingResponse<T> {
327 type Item = Result<T, Status>;
328
329 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
330 loop {
331 let this = self.as_mut().project();
332 let new_state = match this {
333 StreamingResponseProj::Initial { inner } => match ready!(inner.as_mut().poll(cx)) {
340 Ok(response) => {
341 let stream = response.into_inner();
342 StreamingResponse::Streaming { stream }
343 }
344 Err(status) => return Poll::Ready(Some(Err(status))),
345 },
346 StreamingResponseProj::Streaming { stream } => return stream.poll_next(cx),
347 };
348
349 self.set(new_state);
350 }
351 }
352}
353
354async fn try_query_agent_api(
355 client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
356) -> Result<(), GenericError> {
357 let noop_fetch_request = FetchEntityRequest {
358 id: Some(EntityId {
359 prefix: "container_id".to_string(),
360 uid: "nonexistent".to_string(),
361 }),
362 cardinality: TagCardinality::High.into(),
363 };
364 match client.tagger_fetch_entity(noop_fetch_request).await {
365 Ok(_) => Ok(()),
366 Err(e) => match e.code() {
367 Code::Unauthenticated => Err(generic_error!(
368 "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
369 )),
370 _ => Err(e.into()),
371 },
372 }
373}
374
375fn get_ipc_cert_file_path(ipc_cert_file_path: Option<&PathBuf>, auth_token_file_path: &Path) -> PathBuf {
376 if let Some(path) = ipc_cert_file_path {
378 return path.clone();
379 }
380
381 let mut cert_path = auth_token_file_path
383 .parent()
384 .map(|p| p.to_path_buf())
385 .unwrap_or_else(|| PathBuf::from(DEFAULT_DATADOG_AGENT_CONFIG_DIR));
386
387 cert_path.push(DEFAULT_IPC_CERT_FILE_NAME);
388 cert_path
389}
390
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};

    use super::{
        default_agent_auth_token_file_path, get_ipc_cert_file_path, DEFAULT_DATADOG_AGENT_CONFIG_DIR,
        DEFAULT_IPC_CERT_FILE_NAME,
    };

    /// Asserts that `resolved` is the default certificate file name located
    /// directly inside `expected_dir`.
    fn assert_default_cert_in(resolved: &Path, expected_dir: Option<&Path>) {
        assert_eq!(resolved.parent(), expected_dir);
        assert_eq!(
            resolved.file_name().and_then(|s| s.to_str()),
            Some(DEFAULT_IPC_CERT_FILE_NAME)
        );
    }

    #[test]
    fn ipc_cert_file_path_defaults() {
        let default_auth_token_path = default_agent_auth_token_file_path();
        let custom_auth_token_path = PathBuf::from("/secret/auth_token");
        let invalid_auth_token_path = PathBuf::from("/");
        let custom_ipc_cert_path = PathBuf::from("/tmp/custom_ipc_cert.pem");

        // An explicitly configured certificate path always wins.
        assert_eq!(
            get_ipc_cert_file_path(Some(&custom_ipc_cert_path), &default_auth_token_path),
            custom_ipc_cert_path
        );

        // Without an explicit path, the certificate sits next to the auth
        // token, wherever that is.
        for auth_token_path in [&default_auth_token_path, &custom_auth_token_path] {
            let resolved = get_ipc_cert_file_path(None, auth_token_path);
            assert_default_cert_in(&resolved, auth_token_path.parent());
        }

        // An auth token path without a parent falls back to the default
        // configuration directory.
        let resolved = get_ipc_cert_file_path(None, &invalid_auth_token_path);
        assert_default_cert_in(&resolved, Some(Path::new(DEFAULT_DATADOG_AGENT_CONFIG_DIR)));
    }
}