saluki_env/helpers/remote_agent/
client.rs1use std::{
2 future::Future,
3 path::PathBuf,
4 pin::Pin,
5 task::{ready, Context, Poll},
6 time::Duration,
7};
8
9use backon::{BackoffBuilder, ConstantBuilder, Retryable as _};
10use datadog_protos::agent::v1::{RefreshRemoteAgentRequest, RegisterRemoteAgentRequest, RegisterRemoteAgentResponse};
11use datadog_protos::agent::{
12 AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
13 FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, StreamTagsRequest, StreamTagsResponse,
14 TagCardinality, WorkloadmetaEventType, WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource,
15 WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
16};
17use futures::Stream;
18use pin_project_lite::pin_project;
19use saluki_config::GenericConfiguration;
20use saluki_error::{generic_error, ErrorContext as _, GenericError};
21use saluki_io::net::{
22 build_datadog_agent_client_ipc_tls_config, client::http::HttpsCapableConnectorBuilder, get_ipc_cert_file_path,
23};
24use serde::Deserialize;
25use tonic::{
26 service::interceptor::InterceptedService,
27 transport::{Channel, Endpoint, Uri},
28 Code, Request, Response, Status, Streaming,
29};
30use tracing::warn;
31
32use super::session::SessionId;
33use crate::helpers::tonic::BearerAuthInterceptor;
34
35fn default_agent_ipc_endpoint() -> Uri {
36 Uri::from_static("https://127.0.0.1:5001")
37}
38
/// Returns the authentication token file path used when none is configured (or when the configured path is empty).
fn default_agent_auth_token_file_path() -> PathBuf {
    const DEFAULT_AUTH_TOKEN_FILE_PATH: &str = "/etc/datadog-agent/auth_token";

    PathBuf::from(DEFAULT_AUTH_TOKEN_FILE_PATH)
}
42
/// Returns the default number of times connection setup is retried.
const fn default_connect_retry_attempts() -> usize {
    const DEFAULT_ATTEMPTS: usize = 10;

    DEFAULT_ATTEMPTS
}
46
/// Returns the default maximum gRPC message size, in bytes (128 MiB).
const fn default_grpc_max_message_size() -> usize {
    const MIB: usize = 1024 * 1024;

    128 * MIB
}
50
/// Returns the default delay between connection attempts (two seconds).
const fn default_connect_retry_backoff() -> Duration {
    const DEFAULT_BACKOFF: Duration = Duration::from_secs(2);

    DEFAULT_BACKOFF
}
54
55#[derive(Deserialize)]
57struct RemoteAgentClientConfiguration {
58 #[serde(
65 rename = "agent_ipc_endpoint",
66 with = "http_serde_ext::uri",
67 default = "default_agent_ipc_endpoint"
68 )]
69 ipc_endpoint: Uri,
70
71 #[serde(default = "default_agent_auth_token_file_path")]
77 auth_token_file_path: PathBuf,
78
79 #[serde(default)]
88 ipc_cert_file_path: Option<PathBuf>,
89
90 #[serde(default = "default_connect_retry_attempts")]
94 connect_retry_attempts: usize,
95
96 #[serde(default = "default_connect_retry_backoff")]
100 connect_retry_backoff: Duration,
101
102 #[serde(
106 rename = "agent_ipc_grpc_max_message_size",
107 default = "default_grpc_max_message_size"
108 )]
109 grpc_max_message_size: usize,
110}
111
112impl BackoffBuilder for &RemoteAgentClientConfiguration {
113 type Backoff = <ConstantBuilder as BackoffBuilder>::Backoff;
114
115 fn build(self) -> Self::Backoff {
116 ConstantBuilder::default()
117 .with_delay(self.connect_retry_backoff)
118 .with_max_times(self.connect_retry_attempts)
119 .build()
120 }
121}
122
123#[derive(Clone)]
125pub struct RemoteAgentClient {
126 client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
127 secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
128}
129
130impl RemoteAgentClient {
131 pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
138 let mut config = config
139 .as_typed::<RemoteAgentClientConfiguration>()
140 .error_context("Failed to parse configuration for Remote Agent client.")?;
141
142 if config.auth_token_file_path.as_os_str().is_empty() {
144 config.auth_token_file_path = default_agent_auth_token_file_path();
145 }
146 if let Some(ref cert_path) = config.ipc_cert_file_path {
147 if cert_path.as_os_str().is_empty() {
148 config.ipc_cert_file_path = None;
149 }
150 }
151
152 let service_builder = || async {
163 let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth_token_file_path).await?;
164 let ipc_cert_file_path =
165 get_ipc_cert_file_path(config.ipc_cert_file_path.as_ref(), &config.auth_token_file_path);
166 let client_tls_config = build_datadog_agent_client_ipc_tls_config(ipc_cert_file_path).await?;
167 let https_connector = HttpsCapableConnectorBuilder::default().build(client_tls_config)?;
168 let channel = Endpoint::from(config.ipc_endpoint.clone())
169 .connect_timeout(Duration::from_secs(2))
170 .connect_with_connector(https_connector)
171 .await
172 .with_error_context(|| {
173 format!("Failed to connect to Datadog Agent API at '{}'.", config.ipc_endpoint)
174 })?;
175
176 Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
177 };
178
179 let service = service_builder
180 .retry(&config)
181 .notify(|e, delay| {
182 warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
183 })
184 .await
185 .error_context("Failed to create Datadog Agent API client.")?;
186
187 let client = AgentClient::new(service.clone()).max_decoding_message_size(config.grpc_max_message_size);
188 let mut secure_client = AgentSecureClient::new(service).max_decoding_message_size(config.grpc_max_message_size);
189
190 try_query_agent_api(&mut secure_client).await?;
192
193 Ok(Self { client, secure_client })
194 }
195
196 pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
202 let response = self
203 .client
204 .get_hostname(HostnameRequest {})
205 .await
206 .map(|r| r.into_inner())?;
207
208 Ok(response.hostname)
209 }
210
211 pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
216 let mut client = self.secure_client.clone();
217 StreamingResponse::from_response_future(async move {
218 client
219 .tagger_stream_entities(StreamTagsRequest {
220 cardinality: cardinality.into(),
221 ..Default::default()
222 })
223 .await
224 })
225 }
226
227 pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
232 let mut client = self.secure_client.clone();
233 StreamingResponse::from_response_future(async move {
234 client
235 .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
236 filter: Some(WorkloadmetaFilter {
237 kinds: vec![
238 WorkloadmetaKind::Container.into(),
239 WorkloadmetaKind::KubernetesPod.into(),
240 WorkloadmetaKind::EcsTask.into(),
241 ],
242 source: WorkloadmetaSource::All.into(),
243 event_type: WorkloadmetaEventType::EventTypeAll.into(),
244 }),
245 })
246 .await
247 })
248 }
249
250 pub async fn register_remote_agent_request(
256 &mut self, pid: u32, display_name: &str, api_endpoint: &str, services: Vec<String>,
257 ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
258 let mut client = self.secure_client.clone();
259 let response = client
260 .register_remote_agent(RegisterRemoteAgentRequest {
261 pid: pid.to_string(),
262 flavor: "agent-data-plane".to_string(),
263 display_name: display_name.to_string(),
264 api_endpoint_uri: api_endpoint.to_string(),
265 services,
266 })
267 .await?;
268 Ok(response)
269 }
270
271 pub async fn refresh_remote_agent_request(&mut self, session_id: &SessionId) -> Result<Response<()>, GenericError> {
277 let mut client = self.secure_client.clone();
278 let response = client
279 .refresh_remote_agent(RefreshRemoteAgentRequest {
280 session_id: session_id.to_string(),
281 })
282 .await?
283 .map(|_| ());
284 Ok(response)
285 }
286
287 pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
293 let mut client = self.secure_client.clone();
294 let response = client.get_host_tags(HostTagRequest {}).await?;
295 Ok(response)
296 }
297
298 pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
303 let mut client = self.secure_client.clone();
304 StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
305 }
306
307 pub fn stream_config_events(&mut self, session_id: &SessionId) -> StreamingResponse<ConfigEvent> {
312 let mut client = self.secure_client.clone();
313 let app_details = saluki_metadata::get_app_details();
314 let formatted_full_name = app_details
315 .full_name()
316 .replace(" ", "-")
317 .replace("_", "-")
318 .to_lowercase();
319
320 let mut request = Request::new(ConfigStreamRequest {
321 name: formatted_full_name,
322 });
323
324 request
325 .metadata_mut()
326 .insert("session_id", session_id.to_grpc_header_value());
327
328 StreamingResponse::from_response_future(async move { client.stream_config_events(request).await })
329 }
330}
331
332pin_project! {
333 #[project = StreamingResponseProj]
344 pub enum StreamingResponse<T> {
345 Initial { inner: Pin<Box<dyn Future<Output = Result<Response<Streaming<T>>, Status>> + Send>> },
347
348 Streaming { #[pin] stream: Streaming<T> },
350
351 Terminated,
353 }
354}
355
356impl<T> StreamingResponse<T> {
357 fn from_response_future<F>(fut: F) -> Self
358 where
359 F: Future<Output = Result<Response<Streaming<T>>, Status>> + Send + 'static,
360 {
361 Self::Initial { inner: Box::pin(fut) }
362 }
363
364 fn from_response(response: Response<Streaming<T>>) -> Self {
365 Self::Streaming {
366 stream: response.into_inner(),
367 }
368 }
369}
370
371impl<T> Stream for StreamingResponse<T> {
372 type Item = Result<T, Status>;
373
374 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
375 #[allow(clippy::large_enum_variant)]
379 enum Outcome<T> {
380 Advance(StreamingResponse<T>),
381 Yield(Option<Result<T, Status>>),
382 Terminate(Option<Result<T, Status>>),
383 }
384
385 loop {
386 let this = self.as_mut().project();
387 let outcome = match this {
388 StreamingResponseProj::Initial { inner } => match ready!(inner.as_mut().poll(cx)) {
389 Ok(response) => Outcome::Advance(Self::from_response(response)),
390 Err(status) => Outcome::Terminate(Some(Err(status))),
391 },
392 StreamingResponseProj::Streaming { stream } => match ready!(stream.poll_next(cx)) {
393 Some(maybe_item) => Outcome::Yield(Some(maybe_item)),
394 None => Outcome::Terminate(None),
395 },
396 StreamingResponseProj::Terminated => Outcome::Yield(None),
397 };
398
399 match outcome {
400 Outcome::Advance(state) => self.set(state),
401 Outcome::Yield(item) => return Poll::Ready(item),
402 Outcome::Terminate(item) => {
403 self.set(Self::Terminated);
404 return Poll::Ready(item);
405 }
406 }
407 }
408 }
409}
410
411async fn try_query_agent_api(
412 client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
413) -> Result<(), GenericError> {
414 let noop_fetch_request = FetchEntityRequest {
415 id: Some(EntityId {
416 prefix: "container_id".to_string(),
417 uid: "nonexistent".to_string(),
418 }),
419 cardinality: TagCardinality::High.into(),
420 };
421 match client.tagger_fetch_entity(noop_fetch_request).await {
422 Ok(_) => Ok(()),
423 Err(e) => match e.code() {
424 Code::Unauthenticated => Err(generic_error!(
425 "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
426 )),
427 _ => Err(e.into()),
428 },
429 }
430}
431
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::{future::pending, StreamExt};
    use tokio::time::timeout;
    use tonic::{Code, Status};

    use super::StreamingResponse;

    /// A failed initial call is yielded exactly once, after which the stream keeps reporting end-of-stream.
    #[tokio::test]
    async fn streaming_response_terminates_after_initial_error() {
        let mut stream = StreamingResponse::<()>::from_response_future(async { Err(Status::unavailable("boom")) });

        let first_item = stream.next().await;
        match first_item {
            Some(Err(s)) => assert_eq!(s.code(), Code::Unavailable),
            other => panic!(
                "expected Some(Err(Unavailable)), got {:?}",
                other.map(|r| r.map(|_| ()))
            ),
        }

        // Polling a terminated stream must be safe and must keep yielding `None`.
        for _ in 0..2 {
            assert!(stream.next().await.is_none());
        }
    }

    /// While the initial call never completes, the stream never produces an item.
    #[tokio::test]
    async fn streaming_response_pending_initial_stays_pending() {
        let mut stream = StreamingResponse::<()>::from_response_future(async { pending::<Result<_, Status>>().await });

        let outcome = timeout(Duration::from_millis(50), stream.next()).await;
        assert!(
            outcome.is_err(),
            "stream with pending initial future should not produce an item"
        );
    }
}