Skip to main content

datadog_agent_commons/ipc/client/
mod.rs

1//! Helpers for interacting with the Datadog Agent.
2
3use std::time::Duration;
4
5use backon::Retryable as _;
6use datadog_protos::agent::v1::{RefreshRemoteAgentRequest, RegisterRemoteAgentRequest, RegisterRemoteAgentResponse};
7use datadog_protos::agent::{
8    AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
9    FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, StreamTagsRequest, StreamTagsResponse,
10    TagCardinality, WorkloadmetaEventType, WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource,
11    WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
12};
13use saluki_config::GenericConfiguration;
14use saluki_error::{generic_error, ErrorContext as _, GenericError};
15use saluki_io::net::client::http::HttpsCapableConnectorBuilder;
16use tonic::{
17    service::interceptor::InterceptedService,
18    transport::{Channel, Endpoint},
19    Code, Request, Response,
20};
21use tracing::warn;
22
23use crate::ipc::{config::RemoteAgentClientConfiguration, session::SessionId, tls::build_ipc_client_ipc_tls_config};
24
25mod bearer_auth;
26use self::bearer_auth::BearerAuthInterceptor;
27
28mod streaming;
29pub use self::streaming::StreamingResponse;
30
31/// A client for interacting with the Datadog Agent's internal gRPC-based API.
32#[derive(Clone)]
33pub struct RemoteAgentClient {
34    client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
35    secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
36}
37
38impl RemoteAgentClient {
39    /// Creates a new `RemoteAgentClient` from the given configuration.
40    ///
41    /// # Errors
42    ///
43    /// If the Agent gRPC client can't be created (invalid API endpoint, missing authentication token, etc), or if the
44    /// authentication token is invalid, an error will be returned.
45    pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
46        let config = RemoteAgentClientConfiguration::from_configuration(config)?;
47
48        // TODO: We need to write a Tower middleware service that allows applying a backoff between failed calls,
49        // specifically so that we can throttle reconnection attempts.
50        //
51        // When the remote Agent endpoint is not available -- Agent isn't running, etc -- the gRPC client will
52        // essentially freewheel, trying to reconnect as quickly as possible, which spams the logs, wastes resources, so
53        // on and so forth. We would want to essentially apply a backoff like any other client would for the RPC calls
54        // themselves, but use it with the _connector_ instead.
55        //
56        // We could potentially just use a retry middleware, but Tonic does have its own reconnection logic, so we'd
57        // have to test it out to make sure it behaves sensibly.
58        let service_builder = || async {
59            let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth().auth_token_file_path()).await?;
60            let ipc_cert_file_path = config.auth().ipc_cert_file_path();
61            let client_tls_config = build_ipc_client_ipc_tls_config(ipc_cert_file_path).await?;
62            let https_connector = HttpsCapableConnectorBuilder::default().build(client_tls_config)?;
63            let endpoint = config.endpoint()?;
64            let channel = Endpoint::from(endpoint.clone())
65                .connect_timeout(Duration::from_secs(2))
66                .connect_with_connector(https_connector)
67                .await
68                .with_error_context(|| format!("Failed to connect to Datadog Agent API at '{}'.", endpoint))?;
69
70            Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
71        };
72
73        let service = service_builder
74            .retry(&config)
75            .notify(|e, delay| {
76                warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
77            })
78            .await
79            .error_context("Failed to create Datadog Agent API client.")?;
80
81        let client = AgentClient::new(service.clone()).max_decoding_message_size(config.grpc_max_message_size());
82        let mut secure_client =
83            AgentSecureClient::new(service).max_decoding_message_size(config.grpc_max_message_size());
84
85        // Try and do a basic health check to make sure we can connect and that our authentication token is valid.
86        try_query_agent_api(&mut secure_client).await?;
87
88        Ok(Self { client, secure_client })
89    }
90
91    /// Gets the detected hostname from the Agent.
92    ///
93    /// # Errors
94    ///
95    /// If there is an error querying the Agent API, an error will be returned.
96    pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
97        let response = self
98            .client
99            .get_hostname(HostnameRequest {})
100            .await
101            .map(|r| r.into_inner())?;
102
103        Ok(response.hostname)
104    }
105
106    /// Gets a stream of tagger entities at the given cardinality.
107    ///
108    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
109    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
110    pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
111        let mut client = self.secure_client.clone();
112        StreamingResponse::from_response_future(async move {
113            client
114                .tagger_stream_entities(StreamTagsRequest {
115                    cardinality: cardinality.into(),
116                    ..Default::default()
117                })
118                .await
119        })
120    }
121
122    /// Gets a stream of all workloadmeta entities.
123    ///
124    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
125    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
126    pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
127        let mut client = self.secure_client.clone();
128        StreamingResponse::from_response_future(async move {
129            client
130                .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
131                    filter: Some(WorkloadmetaFilter {
132                        kinds: vec![
133                            WorkloadmetaKind::Container.into(),
134                            WorkloadmetaKind::KubernetesPod.into(),
135                            WorkloadmetaKind::EcsTask.into(),
136                        ],
137                        source: WorkloadmetaSource::All.into(),
138                        event_type: WorkloadmetaEventType::EventTypeAll.into(),
139                    }),
140                })
141                .await
142        })
143    }
144
145    /// Registers a Remote Agent with the Agent.
146    ///
147    /// # Errors
148    ///
149    /// If there is an error sending the request to the Agent API, an error will be returned.
150    pub async fn register_remote_agent_request(
151        &mut self, pid: u32, display_name: &str, flavor: &str, api_endpoint: &str, services: Vec<String>,
152    ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
153        let mut client = self.secure_client.clone();
154        let response = client
155            .register_remote_agent(RegisterRemoteAgentRequest {
156                pid: pid.to_string(),
157                flavor: flavor.to_string(),
158                display_name: display_name.to_string(),
159                api_endpoint_uri: api_endpoint.to_string(),
160                services,
161            })
162            .await?;
163        Ok(response)
164    }
165
166    /// Refreshes the given remote agent session with the Agent.
167    ///
168    /// # Errors
169    ///
170    /// If there is an error sending the request to the Agent API, an error will be returned.
171    pub async fn refresh_remote_agent_request(&mut self, session_id: &SessionId) -> Result<Response<()>, GenericError> {
172        let mut client = self.secure_client.clone();
173        let response = client
174            .refresh_remote_agent(RefreshRemoteAgentRequest {
175                session_id: session_id.to_string(),
176            })
177            .await?
178            .map(|_| ());
179        Ok(response)
180    }
181
182    /// Gets the host tags from the Agent.
183    ///
184    /// # Errors
185    ///
186    /// If there is an error querying the Agent API, an error will be returned.
187    pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
188        let mut client = self.secure_client.clone();
189        let response = client.get_host_tags(HostTagRequest {}).await?;
190        Ok(response)
191    }
192
193    /// Gets a stream of autodiscovery config updates.
194    ///
195    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
196    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
197    pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
198        let mut client = self.secure_client.clone();
199        StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
200    }
201
202    /// Gets a stream of config events.
203    ///
204    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
205    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
206    pub fn stream_config_events(&mut self, session_id: &SessionId) -> StreamingResponse<ConfigEvent> {
207        let mut client = self.secure_client.clone();
208        let app_details = saluki_metadata::get_app_details();
209        let formatted_full_name = app_details
210            .full_name()
211            .replace(" ", "-")
212            .replace("_", "-")
213            .to_lowercase();
214
215        let mut request = Request::new(ConfigStreamRequest {
216            name: formatted_full_name,
217        });
218
219        request
220            .metadata_mut()
221            .insert("session_id", session_id.to_grpc_header_value());
222
223        StreamingResponse::from_response_future(async move { client.stream_config_events(request).await })
224    }
225}
226
227async fn try_query_agent_api(
228    client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
229) -> Result<(), GenericError> {
230    let noop_fetch_request = FetchEntityRequest {
231        id: Some(EntityId {
232            prefix: "container_id".to_string(),
233            uid: "nonexistent".to_string(),
234        }),
235        cardinality: TagCardinality::High.into(),
236    };
237    match client.tagger_fetch_entity(noop_fetch_request).await {
238        Ok(_) => Ok(()),
239        Err(e) => match e.code() {
240            Code::Unauthenticated => Err(generic_error!(
241                "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
242            )),
243            _ => Err(e.into()),
244        },
245    }
246}