datadog_agent_commons/ipc/client/
mod.rs1use std::time::Duration;
4
5use backon::Retryable as _;
6use datadog_protos::agent::v1::{RefreshRemoteAgentRequest, RegisterRemoteAgentRequest, RegisterRemoteAgentResponse};
7use datadog_protos::agent::{
8 AgentClient, AgentSecureClient, AutodiscoveryStreamResponse, ConfigEvent, ConfigStreamRequest, EntityId,
9 FetchEntityRequest, HostTagReply, HostTagRequest, HostnameRequest, StreamTagsRequest, StreamTagsResponse,
10 TagCardinality, WorkloadmetaEventType, WorkloadmetaFilter, WorkloadmetaKind, WorkloadmetaSource,
11 WorkloadmetaStreamRequest, WorkloadmetaStreamResponse,
12};
13use saluki_config::GenericConfiguration;
14use saluki_error::{generic_error, ErrorContext as _, GenericError};
15use saluki_io::net::client::http::HttpsCapableConnectorBuilder;
16use tonic::{
17 service::interceptor::InterceptedService,
18 transport::{Channel, Endpoint},
19 Code, Request, Response,
20};
21use tracing::warn;
22
23use crate::ipc::{config::RemoteAgentClientConfiguration, session::SessionId, tls::build_ipc_client_ipc_tls_config};
24
25mod bearer_auth;
26use self::bearer_auth::BearerAuthInterceptor;
27
28mod streaming;
29pub use self::streaming::StreamingResponse;
30
31#[derive(Clone)]
33pub struct RemoteAgentClient {
34 client: AgentClient<InterceptedService<Channel, BearerAuthInterceptor>>,
35 secure_client: AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
36}
37
38impl RemoteAgentClient {
39 pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
46 let config = RemoteAgentClientConfiguration::from_configuration(config)?;
47
48 let service_builder = || async {
59 let auth_interceptor = BearerAuthInterceptor::from_file(&config.auth().auth_token_file_path()).await?;
60 let ipc_cert_file_path = config.auth().ipc_cert_file_path();
61 let client_tls_config = build_ipc_client_ipc_tls_config(ipc_cert_file_path).await?;
62 let connector_builder = HttpsCapableConnectorBuilder::default();
63 #[cfg(target_os = "linux")]
64 let connector_builder = if let Some(addr) = config.vsock_addr()? {
65 connector_builder.with_vsock_addr(addr)
66 } else {
67 connector_builder
68 };
69 let https_connector = connector_builder.build(client_tls_config)?;
70 let endpoint = config.endpoint()?;
71 let channel = Endpoint::from(endpoint.clone())
72 .connect_timeout(Duration::from_secs(2))
73 .connect_with_connector(https_connector)
74 .await
75 .with_error_context(|| format!("Failed to connect to Datadog Agent API at '{}'.", endpoint))?;
76
77 Ok::<_, GenericError>(InterceptedService::new(channel, auth_interceptor))
78 };
79
80 let service = service_builder
81 .retry(&config)
82 .notify(|e, delay| {
83 warn!(error = %e, "Failed to create Datadog Agent API client. Retrying in {:?}...", delay);
84 })
85 .await
86 .error_context("Failed to create Datadog Agent API client.")?;
87
88 let client = AgentClient::new(service.clone()).max_decoding_message_size(config.grpc_max_message_size());
89 let mut secure_client =
90 AgentSecureClient::new(service).max_decoding_message_size(config.grpc_max_message_size());
91
92 try_query_agent_api(&mut secure_client).await?;
94
95 Ok(Self { client, secure_client })
96 }
97
98 pub async fn get_hostname(&mut self) -> Result<String, GenericError> {
104 let response = self
105 .client
106 .get_hostname(HostnameRequest {})
107 .await
108 .map(|r| r.into_inner())?;
109
110 Ok(response.hostname)
111 }
112
113 pub fn get_tagger_stream(&mut self, cardinality: TagCardinality) -> StreamingResponse<StreamTagsResponse> {
118 let mut client = self.secure_client.clone();
119 StreamingResponse::from_response_future(async move {
120 client
121 .tagger_stream_entities(StreamTagsRequest {
122 cardinality: cardinality.into(),
123 ..Default::default()
124 })
125 .await
126 })
127 }
128
129 pub fn get_workloadmeta_stream(&mut self) -> StreamingResponse<WorkloadmetaStreamResponse> {
134 let mut client = self.secure_client.clone();
135 StreamingResponse::from_response_future(async move {
136 client
137 .workloadmeta_stream_entities(WorkloadmetaStreamRequest {
138 filter: Some(WorkloadmetaFilter {
139 kinds: vec![
140 WorkloadmetaKind::Container.into(),
141 WorkloadmetaKind::KubernetesPod.into(),
142 WorkloadmetaKind::EcsTask.into(),
143 ],
144 source: WorkloadmetaSource::All.into(),
145 event_type: WorkloadmetaEventType::EventTypeAll.into(),
146 }),
147 })
148 .await
149 })
150 }
151
152 pub async fn register_remote_agent_request(
158 &mut self, pid: u32, display_name: &str, flavor: &str, api_endpoint: &str, services: Vec<String>,
159 ) -> Result<Response<RegisterRemoteAgentResponse>, GenericError> {
160 let mut client = self.secure_client.clone();
161 let response = client
162 .register_remote_agent(RegisterRemoteAgentRequest {
163 pid: pid.to_string(),
164 flavor: flavor.to_string(),
165 display_name: display_name.to_string(),
166 api_endpoint_uri: api_endpoint.to_string(),
167 services,
168 })
169 .await?;
170 Ok(response)
171 }
172
173 pub async fn refresh_remote_agent_request(&mut self, session_id: &SessionId) -> Result<Response<()>, GenericError> {
179 let mut client = self.secure_client.clone();
180 let response = client
181 .refresh_remote_agent(RefreshRemoteAgentRequest {
182 session_id: session_id.to_string(),
183 })
184 .await?
185 .map(|_| ());
186 Ok(response)
187 }
188
189 pub async fn get_host_tags(&self) -> Result<Response<HostTagReply>, GenericError> {
195 let mut client = self.secure_client.clone();
196 let response = client.get_host_tags(HostTagRequest {}).await?;
197 Ok(response)
198 }
199
200 pub fn get_autodiscovery_stream(&mut self) -> StreamingResponse<AutodiscoveryStreamResponse> {
205 let mut client = self.secure_client.clone();
206 StreamingResponse::from_response_future(async move { client.autodiscovery_stream_config(()).await })
207 }
208
209 pub fn stream_config_events(&mut self, session_id: &SessionId) -> StreamingResponse<ConfigEvent> {
214 let mut client = self.secure_client.clone();
215 let app_details = saluki_metadata::get_app_details();
216 let formatted_full_name = app_details
217 .full_name()
218 .replace(" ", "-")
219 .replace("_", "-")
220 .to_lowercase();
221
222 let mut request = Request::new(ConfigStreamRequest {
223 name: formatted_full_name,
224 });
225
226 request
227 .metadata_mut()
228 .insert("session_id", session_id.to_grpc_header_value());
229
230 StreamingResponse::from_response_future(async move { client.stream_config_events(request).await })
231 }
232}
233
234async fn try_query_agent_api(
235 client: &mut AgentSecureClient<InterceptedService<Channel, BearerAuthInterceptor>>,
236) -> Result<(), GenericError> {
237 let noop_fetch_request = FetchEntityRequest {
238 id: Some(EntityId {
239 prefix: "container_id".to_string(),
240 uid: "nonexistent".to_string(),
241 }),
242 cardinality: TagCardinality::High.into(),
243 };
244 match client.tagger_fetch_entity(noop_fetch_request).await {
245 Ok(_) => Ok(()),
246 Err(e) => match e.code() {
247 Code::Unauthenticated => Err(generic_error!(
248 "Failed to authenticate to Datadog Agent API. Check that the configured authentication token is correct."
249 )),
250 _ => Err(e.into()),
251 },
252 }
253}