Skip to main content

datadog_agent_commons/ipc/client/
mod.rs

1//! Helpers for interacting with the Datadog Agent.
2
3use std::time::Duration;
4
5use backon::Retryable as _;
6use datadog_protos::agent::v1::{RefreshRemoteAgentRequest, RegisterRemoteAgentRequest, RegisterRemoteAgentResponse};
7use datadog_protos::agent::{
8    AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
9    FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, StreamTagsRequest, StreamTagsResponse,
10    TagCardinality, WorkloadmetaEventType, WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource,
11    WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
12};
13use saluki_config::GenericConfiguration;
14use saluki_error::{generic_error, ErrorContext as _, GenericError};
15use saluki_io::net::client::http::HttpsCapableConnectorBuilder;
16use tonic::{
17    service::interceptor::InterceptedService,
18    transport::{Channel, Endpoint},
19    Code, Request, Response,
20};
21use tracing::warn;
22
23use crate::ipc::{config::RemoteAgentClientConfiguration, session::SessionId, tls::build_ipc_client_ipc_tls_config};
24
25mod bearer_auth;
26use self::bearer_auth::BearerAuthInterceptor;
27
28mod streaming;
29pub use self::streaming::StreamingResponse;
30
31/// A client for interacting with the Datadog Agent's internal gRPC-based API.
32#[derive(Clone)]
33pub struct RemoteAgentClient {
34    client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
35    secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
36}
37
38impl RemoteAgentClient {
39    /// Creates a new `RemoteAgentClient` from the given configuration.
40    ///
41    /// # Errors
42    ///
43    /// If the Agent gRPC client can't be created (invalid API endpoint, missing authentication token, etc), or if the
44    /// authentication token is invalid, an error will be returned.
45    pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
46        let config = RemoteAgentClientConfiguration::from_configuration(config)?;
47
48        // TODO: We need to write a Tower middleware service that allows applying a backoff between failed calls,
49        // specifically so that we can throttle reconnection attempts.
50        //
51        // When the remote Agent endpoint is not available -- Agent isn't running, etc -- the gRPC client will
52        // essentially freewheel, trying to reconnect as quickly as possible, which spams the logs, wastes resources, so
53        // on and so forth. We would want to essentially apply a backoff like any other client would for the RPC calls
54        // themselves, but use it with the _connector_ instead.
55        //
56        // We could potentially just use a retry middleware, but Tonic does have its own reconnection logic, so we'd
57        // have to test it out to make sure it behaves sensibly.
58        let service_builder = || async {
59            let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth().auth_token_file_path()).await?;
60            let ipc_cert_file_path = config.auth().ipc_cert_file_path();
61            let client_tls_config = build_ipc_client_ipc_tls_config(ipc_cert_file_path).await?;
62            let connector_builder = HttpsCapableConnectorBuilder::default();
63            #[cfg(target_os = "linux")]
64            let connector_builder = if let Some(addr) = config.vsock_addr()? {
65                connector_builder.with_vsock_addr(addr)
66            } else {
67                connector_builder
68            };
69            let https_connector = connector_builder.build(client_tls_config)?;
70            let endpoint = config.endpoint()?;
71            let channel = Endpoint::from(endpoint.clone())
72                .connect_timeout(Duration::from_secs(2))
73                .connect_with_connector(https_connector)
74                .await
75                .with_error_context(|| format!("Failed to connect to Datadog Agent API at '{}'.", endpoint))?;
76
77            Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
78        };
79
80        let service = service_builder
81            .retry(&config)
82            .notify(|e, delay| {
83                warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
84            })
85            .await
86            .error_context("Failed to create Datadog Agent API client.")?;
87
88        let client = AgentClient::new(service.clone()).max_decoding_message_size(config.grpc_max_message_size());
89        let mut secure_client =
90            AgentSecureClient::new(service).max_decoding_message_size(config.grpc_max_message_size());
91
92        // Try and do a basic health check to make sure we can connect and that our authentication token is valid.
93        try_query_agent_api(&mut secure_client).await?;
94
95        Ok(Self { client, secure_client })
96    }
97
98    /// Gets the detected hostname from the Agent.
99    ///
100    /// # Errors
101    ///
102    /// If there is an error querying the Agent API, an error will be returned.
103    pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
104        let response = self
105            .client
106            .get_hostname(HostnameRequest {})
107            .await
108            .map(|r| r.into_inner())?;
109
110        Ok(response.hostname)
111    }
112
113    /// Gets a stream of tagger entities at the given cardinality.
114    ///
115    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
116    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
117    pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
118        let mut client = self.secure_client.clone();
119        StreamingResponse::from_response_future(async move {
120            client
121                .tagger_stream_entities(StreamTagsRequest {
122                    cardinality: cardinality.into(),
123                    ..Default::default()
124                })
125                .await
126        })
127    }
128
129    /// Gets a stream of all workloadmeta entities.
130    ///
131    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
132    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
133    pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
134        let mut client = self.secure_client.clone();
135        StreamingResponse::from_response_future(async move {
136            client
137                .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
138                    filter: Some(WorkloadmetaFilter {
139                        kinds: vec![
140                            WorkloadmetaKind::Container.into(),
141                            WorkloadmetaKind::KubernetesPod.into(),
142                            WorkloadmetaKind::EcsTask.into(),
143                        ],
144                        source: WorkloadmetaSource::All.into(),
145                        event_type: WorkloadmetaEventType::EventTypeAll.into(),
146                    }),
147                })
148                .await
149        })
150    }
151
152    /// Registers a Remote Agent with the Agent.
153    ///
154    /// # Errors
155    ///
156    /// If there is an error sending the request to the Agent API, an error will be returned.
157    pub async fn register_remote_agent_request(
158        &mut self, pid: u32, display_name: &str, flavor: &str, api_endpoint: &str, services: Vec<String>,
159    ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
160        let mut client = self.secure_client.clone();
161        let response = client
162            .register_remote_agent(RegisterRemoteAgentRequest {
163                pid: pid.to_string(),
164                flavor: flavor.to_string(),
165                display_name: display_name.to_string(),
166                api_endpoint_uri: api_endpoint.to_string(),
167                services,
168            })
169            .await?;
170        Ok(response)
171    }
172
173    /// Refreshes the given remote agent session with the Agent.
174    ///
175    /// # Errors
176    ///
177    /// If there is an error sending the request to the Agent API, an error will be returned.
178    pub async fn refresh_remote_agent_request(&mut self, session_id: &SessionId) -> Result<Response<()>, GenericError> {
179        let mut client = self.secure_client.clone();
180        let response = client
181            .refresh_remote_agent(RefreshRemoteAgentRequest {
182                session_id: session_id.to_string(),
183            })
184            .await?
185            .map(|_| ());
186        Ok(response)
187    }
188
189    /// Gets the host tags from the Agent.
190    ///
191    /// # Errors
192    ///
193    /// If there is an error querying the Agent API, an error will be returned.
194    pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
195        let mut client = self.secure_client.clone();
196        let response = client.get_host_tags(HostTagRequest {}).await?;
197        Ok(response)
198    }
199
200    /// Gets a stream of autodiscovery config updates.
201    ///
202    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
203    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
204    pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
205        let mut client = self.secure_client.clone();
206        StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
207    }
208
209    /// Gets a stream of config events.
210    ///
211    /// If there is an error with the initial request, or an error occurs while streaming, the next message in the
212    /// stream will be `Some(Err(status))`, where the status indicates the underlying error.
213    pub fn stream_config_events(&mut self, session_id: &SessionId) -> StreamingResponse<ConfigEvent> {
214        let mut client = self.secure_client.clone();
215        let app_details = saluki_metadata::get_app_details();
216        let formatted_full_name = app_details
217            .full_name()
218            .replace(" ", "-")
219            .replace("_", "-")
220            .to_lowercase();
221
222        let mut request = Request::new(ConfigStreamRequest {
223            name: formatted_full_name,
224        });
225
226        request
227            .metadata_mut()
228            .insert("session_id", session_id.to_grpc_header_value());
229
230        StreamingResponse::from_response_future(async move { client.stream_config_events(request).await })
231    }
232}
233
234async fn try_query_agent_api(
235    client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
236) -> Result<(), GenericError> {
237    let noop_fetch_request = FetchEntityRequest {
238        id: Some(EntityId {
239            prefix: "container_id".to_string(),
240            uid: "nonexistent".to_string(),
241        }),
242        cardinality: TagCardinality::High.into(),
243    };
244    match client.tagger_fetch_entity(noop_fetch_request).await {
245        Ok(_) => Ok(()),
246        Err(e) => match e.code() {
247            Code::Unauthenticated => Err(generic_error!(
248                "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
249            )),
250            _ => Err(e.into()),
251        },
252    }
253}