saluki_env/helpers/remote_agent/
client.rs1use std::{
2 future::Future,
3 path::PathBuf,
4 pin::Pin,
5 task::{ready, Context, Poll},
6 time::Duration,
7};
8
9use backon::{BackoffBuilder, ConstantBuilder, Retryable as _};
10use datadog_protos::agent::v1::{RefreshRemoteAgentRequest, RegisterRemoteAgentRequest, RegisterRemoteAgentResponse};
11use datadog_protos::agent::{
12 AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
13 FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, StreamTagsRequest, StreamTagsResponse,
14 TagCardinality, WorkloadmetaEventType, WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource,
15 WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
16};
17use futures::Stream;
18use pin_project_lite::pin_project;
19use saluki_config::GenericConfiguration;
20use saluki_error::{generic_error, ErrorContext as _, GenericError};
21use saluki_io::net::{
22 build_datadog_agent_client_ipc_tls_config, client::http::HttpsCapableConnectorBuilder, get_ipc_cert_file_path,
23};
24use serde::Deserialize;
25use tonic::{
26 service::interceptor::InterceptedService,
27 transport::{Channel, Endpoint, Uri},
28 Code, Request, Response, Status, Streaming,
29};
30use tracing::warn;
31
32use super::session::SessionId;
33use crate::helpers::tonic::BearerAuthInterceptor;
34
35fn default_agent_ipc_endpoint() -> Uri {
36 Uri::from_static("https://127.0.0.1:5001")
37}
38
/// Default location of the file holding the Datadog Agent authentication token.
fn default_agent_auth_token_file_path() -> PathBuf {
    "/etc/datadog-agent/auth_token".into()
}
42
/// Default number of retries used when connecting to the Datadog Agent API.
const fn default_connect_retry_attempts() -> usize {
    const RETRY_ATTEMPTS: usize = 10;
    RETRY_ATTEMPTS
}
46
/// Default delay between connection retries: two seconds.
const fn default_connect_retry_backoff() -> Duration {
    Duration::from_millis(2_000)
}
50
51#[derive(Deserialize)]
52struct RemoteAgentClientConfiguration {
53 #[serde(
60 rename = "agent_ipc_endpoint",
61 with = "http_serde_ext::uri",
62 default = "default_agent_ipc_endpoint"
63 )]
64 ipc_endpoint: Uri,
65
66 #[serde(default = "default_agent_auth_token_file_path")]
72 auth_token_file_path: PathBuf,
73
74 #[serde(default)]
83 ipc_cert_file_path: Option<PathBuf>,
84
85 #[serde(default = "default_connect_retry_attempts")]
89 connect_retry_attempts: usize,
90
91 #[serde(default = "default_connect_retry_backoff")]
95 connect_retry_backoff: Duration,
96}
97
98impl BackoffBuilder for &RemoteAgentClientConfiguration {
99 type Backoff = <ConstantBuilder as BackoffBuilder>::Backoff;
100
101 fn build(self) -> Self::Backoff {
102 ConstantBuilder::default()
103 .with_delay(self.connect_retry_backoff)
104 .with_max_times(self.connect_retry_attempts)
105 .build()
106 }
107}
108
109#[derive(Clone)]
111pub struct RemoteAgentClient {
112 client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
113 secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
114}
115
116impl RemoteAgentClient {
117 pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
124 let mut config = config
125 .as_typed::<RemoteAgentClientConfiguration>()
126 .error_context("Failed to parse configuration for Remote Agent client.")?;
127
128 if config.auth_token_file_path.as_os_str().is_empty() {
130 config.auth_token_file_path = default_agent_auth_token_file_path();
131 }
132 if let Some(ref cert_path) = config.ipc_cert_file_path {
133 if cert_path.as_os_str().is_empty() {
134 config.ipc_cert_file_path = None;
135 }
136 }
137
138 let service_builder = || async {
149 let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth_token_file_path).await?;
150 let ipc_cert_file_path =
151 get_ipc_cert_file_path(config.ipc_cert_file_path.as_ref(), &config.auth_token_file_path);
152 let client_tls_config = build_datadog_agent_client_ipc_tls_config(ipc_cert_file_path).await?;
153 let https_connector = HttpsCapableConnectorBuilder::default().build(client_tls_config)?;
154 let channel = Endpoint::from(config.ipc_endpoint.clone())
155 .connect_timeout(Duration::from_secs(2))
156 .connect_with_connector(https_connector)
157 .await
158 .with_error_context(|| {
159 format!("Failed to connect to Datadog Agent API at '{}'.", config.ipc_endpoint)
160 })?;
161
162 Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
163 };
164
165 let service = service_builder
166 .retry(&config)
167 .notify(|e, delay| {
168 warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
169 })
170 .await
171 .error_context("Failed to create Datadog Agent API client.")?;
172
173 let client = AgentClient::new(service.clone());
174 let mut secure_client = AgentSecureClient::new(service);
175
176 try_query_agent_api(&mut secure_client).await?;
178
179 Ok(Self { client, secure_client })
180 }
181
182 pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
188 let response = self
189 .client
190 .get_hostname(HostnameRequest {})
191 .await
192 .map(|r| r.into_inner())?;
193
194 Ok(response.hostname)
195 }
196
197 pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
202 let mut client = self.secure_client.clone();
203 StreamingResponse::from_response_future(async move {
204 client
205 .tagger_stream_entities(StreamTagsRequest {
206 cardinality: cardinality.into(),
207 ..Default::default()
208 })
209 .await
210 })
211 }
212
213 pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
218 let mut client = self.secure_client.clone();
219 StreamingResponse::from_response_future(async move {
220 client
221 .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
222 filter: Some(WorkloadmetaFilter {
223 kinds: vec![
224 WorkloadmetaKind::Container.into(),
225 WorkloadmetaKind::KubernetesPod.into(),
226 WorkloadmetaKind::EcsTask.into(),
227 ],
228 source: WorkloadmetaSource::All.into(),
229 event_type: WorkloadmetaEventType::EventTypeAll.into(),
230 }),
231 })
232 .await
233 })
234 }
235
236 pub async fn register_remote_agent_request(
242 &mut self, pid: u32, display_name: &str, api_endpoint: &str, services: Vec<String>,
243 ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
244 let mut client = self.secure_client.clone();
245 let response = client
246 .register_remote_agent(RegisterRemoteAgentRequest {
247 pid: pid.to_string(),
248 flavor: "agent-data-plane".to_string(),
249 display_name: display_name.to_string(),
250 api_endpoint_uri: api_endpoint.to_string(),
251 services,
252 })
253 .await?;
254 Ok(response)
255 }
256
257 pub async fn refresh_remote_agent_request(&mut self, session_id: &SessionId) -> Result<Response<()>, GenericError> {
263 let mut client = self.secure_client.clone();
264 let response = client
265 .refresh_remote_agent(RefreshRemoteAgentRequest {
266 session_id: session_id.to_string(),
267 })
268 .await?
269 .map(|_| ());
270 Ok(response)
271 }
272
273 pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
279 let mut client = self.secure_client.clone();
280 let response = client.get_host_tags(HostTagRequest {}).await?;
281 Ok(response)
282 }
283
284 pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
289 let mut client = self.secure_client.clone();
290 StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
291 }
292
293 pub fn stream_config_events(&mut self, session_id: &SessionId) -> StreamingResponse<ConfigEvent> {
298 let mut client = self.secure_client.clone();
299 let app_details = saluki_metadata::get_app_details();
300 let formatted_full_name = app_details
301 .full_name()
302 .replace(" ", "-")
303 .replace("_", "-")
304 .to_lowercase();
305
306 let mut request = Request::new(ConfigStreamRequest {
307 name: formatted_full_name,
308 });
309
310 request
311 .metadata_mut()
312 .insert("session_id", session_id.to_grpc_header_value());
313
314 StreamingResponse::from_response_future(async move { client.stream_config_events(request).await })
315 }
316}
317
318pin_project! {
319 #[project = StreamingResponseProj]
330 pub enum StreamingResponse<T> {
331 Initial { inner: Pin<Box<dyn Future<Output = Result<Response<Streaming<T>>, Status>> + Send>> },
333
334 Streaming { #[pin] stream: Streaming<T> },
336 }
337}
338
339impl<T> StreamingResponse<T> {
340 fn from_response_future<F>(fut: F) -> Self
341 where
342 F: Future<Output = Result<Response<Streaming<T>>, Status>> + Send + 'static,
343 {
344 Self::Initial { inner: Box::pin(fut) }
345 }
346}
347
348impl<T> Stream for StreamingResponse<T> {
349 type Item = Result<T, Status>;
350
351 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
352 loop {
353 let this = self.as_mut().project();
354 let new_state = match this {
355 StreamingResponseProj::Initial { inner } => match ready!(inner.as_mut().poll(cx)) {
362 Ok(response) => {
363 let stream = response.into_inner();
364 StreamingResponse::Streaming { stream }
365 }
366 Err(status) => return Poll::Ready(Some(Err(status))),
367 },
368 StreamingResponseProj::Streaming { stream } => return stream.poll_next(cx),
369 };
370
371 self.set(new_state);
372 }
373 }
374}
375
376async fn try_query_agent_api(
377 client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
378) -> Result<(), GenericError> {
379 let noop_fetch_request = FetchEntityRequest {
380 id: Some(EntityId {
381 prefix: "container_id".to_string(),
382 uid: "nonexistent".to_string(),
383 }),
384 cardinality: TagCardinality::High.into(),
385 };
386 match client.tagger_fetch_entity(noop_fetch_request).await {
387 Ok(_) => Ok(()),
388 Err(e) => match e.code() {
389 Code::Unauthenticated => Err(generic_error!(
390 "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
391 )),
392 _ => Err(e.into()),
393 },
394 }
395}