saluki_env/helpers/remote_agent/
client.rs1use std::{
2 future::Future,
3 path::PathBuf,
4 pin::Pin,
5 task::{ready, Context, Poll},
6 time::Duration,
7};
8
9use backon::{BackoffBuilder, ConstantBuilder, Retryable as _};
10use datadog_protos::agent::v1::{
11 RefreshRemoteAgentRequest, RefreshRemoteAgentResponse, RegisterRemoteAgentRequest, RegisterRemoteAgentResponse,
12};
13use datadog_protos::agent::{
14 AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
15 FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, StreamTagsRequest, StreamTagsResponse,
16 TagCardinality, WorkloadmetaEventType, WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource,
17 WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
18};
19use futures::Stream;
20use pin_project_lite::pin_project;
21use saluki_config::GenericConfiguration;
22use saluki_error::{generic_error, ErrorContext as _, GenericError};
23use saluki_io::net::{build_datadog_agent_ipc_https_connector, get_ipc_cert_file_path};
24use serde::Deserialize;
25use tonic::{
26 service::interceptor::InterceptedService,
27 transport::{Channel, Endpoint, Uri},
28 Code, Response, Status, Streaming,
29};
30use tracing::warn;
31
32use crate::helpers::tonic::BearerAuthInterceptor;
33
34fn default_agent_ipc_endpoint() -> Uri {
35 Uri::from_static("https://127.0.0.1:5001")
36}
37
/// Default location of the Datadog Agent authentication token file.
fn default_agent_auth_token_file_path() -> PathBuf {
    "/etc/datadog-agent/auth_token".into()
}
41
/// Default number of retry attempts used when initially connecting to the Datadog Agent.
const fn default_connect_retry_attempts() -> usize {
    const ATTEMPTS: usize = 10;
    ATTEMPTS
}
45
/// Default delay between connection attempts when initially connecting to the Datadog Agent (two seconds).
const fn default_connect_retry_backoff() -> Duration {
    Duration::from_millis(2_000)
}
49
50#[derive(Deserialize)]
51struct RemoteAgentClientConfiguration {
52 #[serde(
59 rename = "agent_ipc_endpoint",
60 with = "http_serde_ext::uri",
61 default = "default_agent_ipc_endpoint"
62 )]
63 ipc_endpoint: Uri,
64
65 #[serde(default = "default_agent_auth_token_file_path")]
71 auth_token_file_path: PathBuf,
72
73 #[serde(default)]
82 ipc_cert_file_path: Option<PathBuf>,
83
84 #[serde(default = "default_connect_retry_attempts")]
88 connect_retry_attempts: usize,
89
90 #[serde(default = "default_connect_retry_backoff")]
94 connect_retry_backoff: Duration,
95}
96
97impl BackoffBuilder for &RemoteAgentClientConfiguration {
98 type Backoff = <ConstantBuilder as BackoffBuilder>::Backoff;
99
100 fn build(self) -> Self::Backoff {
101 ConstantBuilder::default()
102 .with_delay(self.connect_retry_backoff)
103 .with_max_times(self.connect_retry_attempts)
104 .build()
105 }
106}
107
108#[derive(Clone)]
110pub struct RemoteAgentClient {
111 client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
112 secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
113}
114
115impl RemoteAgentClient {
116 pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
123 let mut config = config
124 .as_typed::<RemoteAgentClientConfiguration>()
125 .error_context("Failed to parse configuration for Remote Agent client.")?;
126
127 if config.auth_token_file_path.as_os_str().is_empty() {
129 config.auth_token_file_path = default_agent_auth_token_file_path();
130 }
131 if let Some(ref cert_path) = config.ipc_cert_file_path {
132 if cert_path.as_os_str().is_empty() {
133 config.ipc_cert_file_path = None;
134 }
135 }
136
137 let service_builder = || async {
148 let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth_token_file_path).await?;
149 let ipc_cert_file_path =
150 get_ipc_cert_file_path(config.ipc_cert_file_path.as_ref(), &config.auth_token_file_path);
151 let https_connector = build_datadog_agent_ipc_https_connector(ipc_cert_file_path).await?;
152 let channel = Endpoint::from(config.ipc_endpoint.clone())
153 .connect_timeout(Duration::from_secs(2))
154 .connect_with_connector(https_connector)
155 .await
156 .with_error_context(|| {
157 format!("Failed to connect to Datadog Agent API at '{}'.", config.ipc_endpoint)
158 })?;
159
160 Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
161 };
162
163 let service = service_builder
164 .retry(&config)
165 .notify(|e, delay| {
166 warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
167 })
168 .await
169 .error_context("Failed to create Datadog Agent API client.")?;
170
171 let client = AgentClient::new(service.clone());
172 let mut secure_client = AgentSecureClient::new(service);
173
174 try_query_agent_api(&mut secure_client).await?;
176
177 Ok(Self { client, secure_client })
178 }
179
180 pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
186 let response = self
187 .client
188 .get_hostname(HostnameRequest {})
189 .await
190 .map(|r| r.into_inner())?;
191
192 Ok(response.hostname)
193 }
194
195 pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
200 let mut client = self.secure_client.clone();
201 StreamingResponse::from_response_future(async move {
202 client
203 .tagger_stream_entities(StreamTagsRequest {
204 cardinality: cardinality.into(),
205 ..Default::default()
206 })
207 .await
208 })
209 }
210
211 pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
216 let mut client = self.secure_client.clone();
217 StreamingResponse::from_response_future(async move {
218 client
219 .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
220 filter: Some(WorkloadmetaFilter {
221 kinds: vec![
222 WorkloadmetaKind::Container.into(),
223 WorkloadmetaKind::KubernetesPod.into(),
224 WorkloadmetaKind::EcsTask.into(),
225 ],
226 source: WorkloadmetaSource::All.into(),
227 event_type: WorkloadmetaEventType::EventTypeAll.into(),
228 }),
229 })
230 .await
231 })
232 }
233
234 pub async fn register_remote_agent_request(
240 &mut self, pid: u32, display_name: &str, api_endpoint: &str, services: Vec<String>,
241 ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
242 let mut client = self.secure_client.clone();
243 let response = client
244 .register_remote_agent(RegisterRemoteAgentRequest {
245 pid: pid.to_string(),
246 flavor: "agent-data-plane".to_string(),
247 display_name: display_name.to_string(),
248 api_endpoint_uri: api_endpoint.to_string(),
249 services,
250 })
251 .await?;
252 Ok(response)
253 }
254
255 pub async fn refresh_remote_agent_request(
259 &mut self, session_id: &str,
260 ) -> Result<Response<RefreshRemoteAgentResponse>, GenericError> {
261 let mut client = self.secure_client.clone();
262 let response = client
263 .refresh_remote_agent(RefreshRemoteAgentRequest {
264 session_id: session_id.to_string(),
265 })
266 .await?;
267 Ok(response)
268 }
269
270 pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
276 let mut client = self.secure_client.clone();
277 let response = client.get_host_tags(HostTagRequest {}).await?;
278 Ok(response)
279 }
280
281 pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
286 let mut client = self.secure_client.clone();
287 StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
288 }
289
290 pub fn stream_config_events(&mut self) -> StreamingResponse<ConfigEvent> {
295 let mut client = self.secure_client.clone();
296 let app_details = saluki_metadata::get_app_details();
297 let formatted_full_name = app_details
298 .full_name()
299 .replace(" ", "-")
300 .replace("_", "-")
301 .to_lowercase();
302 StreamingResponse::from_response_future(async move {
303 client
304 .stream_config_events(ConfigStreamRequest {
305 name: formatted_full_name,
306 })
307 .await
308 })
309 }
310}
311
312pin_project! {
313 #[project = StreamingResponseProj]
324 pub enum StreamingResponse<T> {
325 Initial { inner: Pin<Box<dyn Future<Output = Result<Response<Streaming<T>>, Status>> + Send>> },
327
328 Streaming { #[pin] stream: Streaming<T> },
330 }
331}
332
333impl<T> StreamingResponse<T> {
334 fn from_response_future<F>(fut: F) -> Self
335 where
336 F: Future<Output = Result<Response<Streaming<T>>, Status>> + Send + 'static,
337 {
338 Self::Initial { inner: Box::pin(fut) }
339 }
340}
341
342impl<T> Stream for StreamingResponse<T> {
343 type Item = Result<T, Status>;
344
345 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
346 loop {
347 let this = self.as_mut().project();
348 let new_state = match this {
349 StreamingResponseProj::Initial { inner } => match ready!(inner.as_mut().poll(cx)) {
356 Ok(response) => {
357 let stream = response.into_inner();
358 StreamingResponse::Streaming { stream }
359 }
360 Err(status) => return Poll::Ready(Some(Err(status))),
361 },
362 StreamingResponseProj::Streaming { stream } => return stream.poll_next(cx),
363 };
364
365 self.set(new_state);
366 }
367 }
368}
369
370async fn try_query_agent_api(
371 client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
372) -> Result<(), GenericError> {
373 let noop_fetch_request = FetchEntityRequest {
374 id: Some(EntityId {
375 prefix: "container_id".to_string(),
376 uid: "nonexistent".to_string(),
377 }),
378 cardinality: TagCardinality::High.into(),
379 };
380 match client.tagger_fetch_entity(noop_fetch_request).await {
381 Ok(_) => Ok(()),
382 Err(e) => match e.code() {
383 Code::Unauthenticated => Err(generic_error!(
384 "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
385 )),
386 _ => Err(e.into()),
387 },
388 }
389}