saluki_env/helpers/remote_agent/
client.rs1use std::{
2 future::Future,
3 path::PathBuf,
4 pin::Pin,
5 task::{ready, Context, Poll},
6 time::Duration,
7};
8
9use backon::{BackoffBuilder, ConstantBuilder, Retryable as _};
10use datadog_protos::agent::v1::{
11 RefreshRemoteAgentRequest, RefreshRemoteAgentResponse, RegisterRemoteAgentRequest, RegisterRemoteAgentResponse,
12};
13use datadog_protos::agent::{
14 AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
15 FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, StreamTagsRequest, StreamTagsResponse,
16 TagCardinality, WorkloadmetaEventType, WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource,
17 WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
18};
19use futures::Stream;
20use pin_project_lite::pin_project;
21use saluki_config::GenericConfiguration;
22use saluki_error::{generic_error, ErrorContext as _, GenericError};
23use saluki_io::net::{
24 build_datadog_agent_client_ipc_tls_config, client::http::HttpsCapableConnectorBuilder, get_ipc_cert_file_path,
25};
26use serde::Deserialize;
27use tonic::{
28 service::interceptor::InterceptedService,
29 transport::{Channel, Endpoint, Uri},
30 Code, Response, Status, Streaming,
31};
32use tracing::warn;
33
34use crate::helpers::tonic::BearerAuthInterceptor;
35
36fn default_agent_ipc_endpoint() -> Uri {
37 Uri::from_static("https://127.0.0.1:5001")
38}
39
/// Returns the path of the Datadog Agent authentication token file used when none is configured.
fn default_agent_auth_token_file_path() -> PathBuf {
    const DEFAULT_AUTH_TOKEN_PATH: &str = "/etc/datadog-agent/auth_token";

    DEFAULT_AUTH_TOKEN_PATH.into()
}
43
/// Returns the value used for `connect_retry_attempts` when it is not configured.
const fn default_connect_retry_attempts() -> usize {
    const DEFAULT_ATTEMPTS: usize = 10;

    DEFAULT_ATTEMPTS
}
47
/// Returns the value used for `connect_retry_backoff` when it is not configured.
const fn default_connect_retry_backoff() -> Duration {
    const DEFAULT_BACKOFF: Duration = Duration::from_secs(2);

    DEFAULT_BACKOFF
}
51
52#[derive(Deserialize)]
53struct RemoteAgentClientConfiguration {
54 #[serde(
61 rename = "agent_ipc_endpoint",
62 with = "http_serde_ext::uri",
63 default = "default_agent_ipc_endpoint"
64 )]
65 ipc_endpoint: Uri,
66
67 #[serde(default = "default_agent_auth_token_file_path")]
73 auth_token_file_path: PathBuf,
74
75 #[serde(default)]
84 ipc_cert_file_path: Option<PathBuf>,
85
86 #[serde(default = "default_connect_retry_attempts")]
90 connect_retry_attempts: usize,
91
92 #[serde(default = "default_connect_retry_backoff")]
96 connect_retry_backoff: Duration,
97}
98
99impl BackoffBuilder for &RemoteAgentClientConfiguration {
100 type Backoff = <ConstantBuilder as BackoffBuilder>::Backoff;
101
102 fn build(self) -> Self::Backoff {
103 ConstantBuilder::default()
104 .with_delay(self.connect_retry_backoff)
105 .with_max_times(self.connect_retry_attempts)
106 .build()
107 }
108}
109
110#[derive(Clone)]
112pub struct RemoteAgentClient {
113 client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
114 secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
115}
116
117impl RemoteAgentClient {
118 pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
125 let mut config = config
126 .as_typed::<RemoteAgentClientConfiguration>()
127 .error_context("Failed to parse configuration for Remote Agent client.")?;
128
129 if config.auth_token_file_path.as_os_str().is_empty() {
131 config.auth_token_file_path = default_agent_auth_token_file_path();
132 }
133 if let Some(ref cert_path) = config.ipc_cert_file_path {
134 if cert_path.as_os_str().is_empty() {
135 config.ipc_cert_file_path = None;
136 }
137 }
138
139 let service_builder = || async {
150 let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth_token_file_path).await?;
151 let ipc_cert_file_path =
152 get_ipc_cert_file_path(config.ipc_cert_file_path.as_ref(), &config.auth_token_file_path);
153 let client_tls_config = build_datadog_agent_client_ipc_tls_config(ipc_cert_file_path).await?;
154 let https_connector = HttpsCapableConnectorBuilder::default().build(client_tls_config)?;
155 let channel = Endpoint::from(config.ipc_endpoint.clone())
156 .connect_timeout(Duration::from_secs(2))
157 .connect_with_connector(https_connector)
158 .await
159 .with_error_context(|| {
160 format!("Failed to connect to Datadog Agent API at '{}'.", config.ipc_endpoint)
161 })?;
162
163 Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
164 };
165
166 let service = service_builder
167 .retry(&config)
168 .notify(|e, delay| {
169 warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
170 })
171 .await
172 .error_context("Failed to create Datadog Agent API client.")?;
173
174 let client = AgentClient::new(service.clone());
175 let mut secure_client = AgentSecureClient::new(service);
176
177 try_query_agent_api(&mut secure_client).await?;
179
180 Ok(Self { client, secure_client })
181 }
182
183 pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
189 let response = self
190 .client
191 .get_hostname(HostnameRequest {})
192 .await
193 .map(|r| r.into_inner())?;
194
195 Ok(response.hostname)
196 }
197
198 pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
203 let mut client = self.secure_client.clone();
204 StreamingResponse::from_response_future(async move {
205 client
206 .tagger_stream_entities(StreamTagsRequest {
207 cardinality: cardinality.into(),
208 ..Default::default()
209 })
210 .await
211 })
212 }
213
214 pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
219 let mut client = self.secure_client.clone();
220 StreamingResponse::from_response_future(async move {
221 client
222 .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
223 filter: Some(WorkloadmetaFilter {
224 kinds: vec![
225 WorkloadmetaKind::Container.into(),
226 WorkloadmetaKind::KubernetesPod.into(),
227 WorkloadmetaKind::EcsTask.into(),
228 ],
229 source: WorkloadmetaSource::All.into(),
230 event_type: WorkloadmetaEventType::EventTypeAll.into(),
231 }),
232 })
233 .await
234 })
235 }
236
237 pub async fn register_remote_agent_request(
243 &mut self, pid: u32, display_name: &str, api_endpoint: &str, services: Vec<String>,
244 ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
245 let mut client = self.secure_client.clone();
246 let response = client
247 .register_remote_agent(RegisterRemoteAgentRequest {
248 pid: pid.to_string(),
249 flavor: "agent-data-plane".to_string(),
250 display_name: display_name.to_string(),
251 api_endpoint_uri: api_endpoint.to_string(),
252 services,
253 })
254 .await?;
255 Ok(response)
256 }
257
258 pub async fn refresh_remote_agent_request(
262 &mut self, session_id: &str,
263 ) -> Result<Response<RefreshRemoteAgentResponse>, GenericError> {
264 let mut client = self.secure_client.clone();
265 let response = client
266 .refresh_remote_agent(RefreshRemoteAgentRequest {
267 session_id: session_id.to_string(),
268 })
269 .await?;
270 Ok(response)
271 }
272
273 pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
279 let mut client = self.secure_client.clone();
280 let response = client.get_host_tags(HostTagRequest {}).await?;
281 Ok(response)
282 }
283
284 pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
289 let mut client = self.secure_client.clone();
290 StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
291 }
292
293 pub fn stream_config_events(&mut self) -> StreamingResponse<ConfigEvent> {
298 let mut client = self.secure_client.clone();
299 let app_details = saluki_metadata::get_app_details();
300 let formatted_full_name = app_details
301 .full_name()
302 .replace(" ", "-")
303 .replace("_", "-")
304 .to_lowercase();
305 StreamingResponse::from_response_future(async move {
306 client
307 .stream_config_events(ConfigStreamRequest {
308 name: formatted_full_name,
309 })
310 .await
311 })
312 }
313}
314
315pin_project! {
316 #[project = StreamingResponseProj]
327 pub enum StreamingResponse<T> {
328 Initial { inner: Pin<Box<dyn Future<Output = Result<Response<Streaming<T>>, Status>> + Send>> },
330
331 Streaming { #[pin] stream: Streaming<T> },
333 }
334}
335
336impl<T> StreamingResponse<T> {
337 fn from_response_future<F>(fut: F) -> Self
338 where
339 F: Future<Output = Result<Response<Streaming<T>>, Status>> + Send + 'static,
340 {
341 Self::Initial { inner: Box::pin(fut) }
342 }
343}
344
345impl<T> Stream for StreamingResponse<T> {
346 type Item = Result<T, Status>;
347
348 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
349 loop {
350 let this = self.as_mut().project();
351 let new_state = match this {
352 StreamingResponseProj::Initial { inner } => match ready!(inner.as_mut().poll(cx)) {
359 Ok(response) => {
360 let stream = response.into_inner();
361 StreamingResponse::Streaming { stream }
362 }
363 Err(status) => return Poll::Ready(Some(Err(status))),
364 },
365 StreamingResponseProj::Streaming { stream } => return stream.poll_next(cx),
366 };
367
368 self.set(new_state);
369 }
370 }
371}
372
373async fn try_query_agent_api(
374 client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
375) -> Result<(), GenericError> {
376 let noop_fetch_request = FetchEntityRequest {
377 id: Some(EntityId {
378 prefix: "container_id".to_string(),
379 uid: "nonexistent".to_string(),
380 }),
381 cardinality: TagCardinality::High.into(),
382 };
383 match client.tagger_fetch_entity(noop_fetch_request).await {
384 Ok(_) => Ok(()),
385 Err(e) => match e.code() {
386 Code::Unauthenticated => Err(generic_error!(
387 "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
388 )),
389 _ => Err(e.into()),
390 },
391 }
392}