datadog_agent_commons/ipc/client/
mod.rs1use std::time::Duration;
4
5use backon::Retryable as _;
6use datadog_protos::agent::v1::{RefreshRemoteAgentRequest, RegisterRemoteAgentRequest, RegisterRemoteAgentResponse};
7use datadog_protos::agent::{
8 AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
9 FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, StreamTagsRequest, StreamTagsResponse,
10 TagCardinality, WorkloadmetaEventType, WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource,
11 WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
12};
13use saluki_config::GenericConfiguration;
14use saluki_error::{generic_error, ErrorContext as _, GenericError};
15use saluki_io::net::client::http::HttpsCapableConnectorBuilder;
16use tonic::{
17 service::interceptor::InterceptedService,
18 transport::{Channel, Endpoint},
19 Code, Request, Response,
20};
21use tracing::warn;
22
23use crate::ipc::{config::RemoteAgentClientConfiguration, session::SessionId, tls::build_ipc_client_ipc_tls_config};
24
25mod bearer_auth;
26use self::bearer_auth::BearerAuthInterceptor;
27
28mod streaming;
29pub use self::streaming::StreamingResponse;
30
31#[derive(Clone)]
33pub struct RemoteAgentClient {
34 client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
35 secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
36}
37
38impl RemoteAgentClient {
39 pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
46 let config = RemoteAgentClientConfiguration::from_configuration(config)?;
47
48 let service_builder = || async {
59 let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth().auth_token_file_path()).await?;
60 let ipc_cert_file_path = config.auth().ipc_cert_file_path();
61 let client_tls_config = build_ipc_client_ipc_tls_config(ipc_cert_file_path).await?;
62 let https_connector = HttpsCapableConnectorBuilder::default().build(client_tls_config)?;
63 let endpoint = config.endpoint()?;
64 let channel = Endpoint::from(endpoint.clone())
65 .connect_timeout(Duration::from_secs(2))
66 .connect_with_connector(https_connector)
67 .await
68 .with_error_context(|| format!("Failed to connect to Datadog Agent API at '{}'.", endpoint))?;
69
70 Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
71 };
72
73 let service = service_builder
74 .retry(&config)
75 .notify(|e, delay| {
76 warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
77 })
78 .await
79 .error_context("Failed to create Datadog Agent API client.")?;
80
81 let client = AgentClient::new(service.clone()).max_decoding_message_size(config.grpc_max_message_size());
82 let mut secure_client =
83 AgentSecureClient::new(service).max_decoding_message_size(config.grpc_max_message_size());
84
85 try_query_agent_api(&mut secure_client).await?;
87
88 Ok(Self { client, secure_client })
89 }
90
91 pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
97 let response = self
98 .client
99 .get_hostname(HostnameRequest {})
100 .await
101 .map(|r| r.into_inner())?;
102
103 Ok(response.hostname)
104 }
105
106 pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
111 let mut client = self.secure_client.clone();
112 StreamingResponse::from_response_future(async move {
113 client
114 .tagger_stream_entities(StreamTagsRequest {
115 cardinality: cardinality.into(),
116 ..Default::default()
117 })
118 .await
119 })
120 }
121
122 pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
127 let mut client = self.secure_client.clone();
128 StreamingResponse::from_response_future(async move {
129 client
130 .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
131 filter: Some(WorkloadmetaFilter {
132 kinds: vec![
133 WorkloadmetaKind::Container.into(),
134 WorkloadmetaKind::KubernetesPod.into(),
135 WorkloadmetaKind::EcsTask.into(),
136 ],
137 source: WorkloadmetaSource::All.into(),
138 event_type: WorkloadmetaEventType::EventTypeAll.into(),
139 }),
140 })
141 .await
142 })
143 }
144
145 pub async fn register_remote_agent_request(
151 &mut self, pid: u32, display_name: &str, flavor: &str, api_endpoint: &str, services: Vec<String>,
152 ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
153 let mut client = self.secure_client.clone();
154 let response = client
155 .register_remote_agent(RegisterRemoteAgentRequest {
156 pid: pid.to_string(),
157 flavor: flavor.to_string(),
158 display_name: display_name.to_string(),
159 api_endpoint_uri: api_endpoint.to_string(),
160 services,
161 })
162 .await?;
163 Ok(response)
164 }
165
166 pub async fn refresh_remote_agent_request(&mut self, session_id: &SessionId) -> Result<Response<()>, GenericError> {
172 let mut client = self.secure_client.clone();
173 let response = client
174 .refresh_remote_agent(RefreshRemoteAgentRequest {
175 session_id: session_id.to_string(),
176 })
177 .await?
178 .map(|_| ());
179 Ok(response)
180 }
181
182 pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
188 let mut client = self.secure_client.clone();
189 let response = client.get_host_tags(HostTagRequest {}).await?;
190 Ok(response)
191 }
192
193 pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
198 let mut client = self.secure_client.clone();
199 StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
200 }
201
202 pub fn stream_config_events(&mut self, session_id: &SessionId) -> StreamingResponse<ConfigEvent> {
207 let mut client = self.secure_client.clone();
208 let app_details = saluki_metadata::get_app_details();
209 let formatted_full_name = app_details
210 .full_name()
211 .replace(" ", "-")
212 .replace("_", "-")
213 .to_lowercase();
214
215 let mut request = Request::new(ConfigStreamRequest {
216 name: formatted_full_name,
217 });
218
219 request
220 .metadata_mut()
221 .insert("session_id", session_id.to_grpc_header_value());
222
223 StreamingResponse::from_response_future(async move { client.stream_config_events(request).await })
224 }
225}
226
227async fn try_query_agent_api(
228 client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
229) -> Result<(), GenericError> {
230 let noop_fetch_request = FetchEntityRequest {
231 id: Some(EntityId {
232 prefix: "container_id".to_string(),
233 uid: "nonexistent".to_string(),
234 }),
235 cardinality: TagCardinality::High.into(),
236 };
237 match client.tagger_fetch_entity(noop_fetch_request).await {
238 Ok(_) => Ok(()),
239 Err(e) => match e.code() {
240 Code::Unauthenticated => Err(generic_error!(
241 "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
242 )),
243 _ => Err(e.into()),
244 },
245 }
246}