saluki_components/encoders/datadog/stats/
mod.rs1use std::time::Duration;
4
5use async_trait::async_trait;
6use datadog_protos::traces::{
7 ClientGroupedStats as ProtoClientGroupedStats, ClientStatsBucket as ProtoClientStatsBucket,
8 ClientStatsPayload as ProtoClientStatsPayload, StatsPayload as ProtoStatsPayload, Trilean,
9};
10use facet::Facet;
11use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
12use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
13use saluki_common::task::HandleExt as _;
14use saluki_config::GenericConfiguration;
15use saluki_core::{
16 components::{encoders::*, ComponentContext},
17 data_model::{
18 event::{
19 trace_stats::{ClientGroupedStats, ClientStatsBucket, ClientStatsPayload, TraceStats},
20 EventType,
21 },
22 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
23 },
24 observability::ComponentMetricsExt as _,
25 topology::{EventsBuffer, PayloadsBuffer},
26};
27use saluki_env::{host::providers::BoxedHostProvider, EnvironmentProvider, HostProvider};
28use saluki_error::{generic_error, ErrorContext as _, GenericError};
29use saluki_io::compression::CompressionScheme;
30use saluki_metrics::MetricsBuilder;
31use serde::Deserialize;
32use stringtheory::MetaString;
33use tokio::{
34 select,
35 sync::mpsc::{self, Receiver, Sender},
36 time::sleep,
37};
38use tracing::{debug, error};
39
40use crate::common::datadog::{
41 io::RB_BUFFER_CHUNK_SIZE,
42 request_builder::{EndpointEncoder, RequestBuilder},
43 telemetry::ComponentTelemetry,
44 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
45};
46
/// Maximum number of stats inputs packed into a single outgoing payload.
const MAX_STATS_PER_PAYLOAD: usize = 4000;

/// `Content-Type` header value for MessagePack-encoded request bodies.
static CONTENT_TYPE_MSGPACK: HeaderValue = HeaderValue::from_static("application/msgpack");
49
/// Default flush timeout, in seconds, used when the configuration omits one.
const fn default_flush_timeout_secs() -> u64 {
    2
}
53
/// Default value for the `env` setting when none is configured.
fn default_env() -> String {
    String::from("none")
}
57
/// Configuration for the Datadog APM stats encoder.
#[derive(Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct DatadogApmStatsEncoderConfiguration {
    /// How long to wait, in seconds, before flushing buffered stats requests.
    ///
    /// A value of `0` is mapped to a very short (10ms) flush interval at build time.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Hostname stamped on outgoing payloads as the agent hostname.
    ///
    /// Not deserialized; populated via `with_environment_provider`.
    #[serde(skip)]
    agent_hostname: Option<String>,

    /// Agent version string stamped on outgoing payloads.
    ///
    /// Not deserialized; populated in `from_configuration`.
    #[serde(skip)]
    agent_version: String,

    /// Environment reported in stats payloads. Defaults to `"none"`.
    #[serde(default = "default_env")]
    env: String,
}
81
82impl DatadogApmStatsEncoderConfiguration {
83 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
85 let mut stats_config: Self = config.as_typed()?;
86 let app_details = saluki_metadata::get_app_details();
87 stats_config.agent_version = format!("agent-data-plane/{}", app_details.version().raw());
88
89 Ok(stats_config)
90 }
91
92 pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
94 where
95 E: EnvironmentProvider<Host = BoxedHostProvider>,
96 {
97 let host_provider = environment_provider.host();
98 let hostname = host_provider.get_hostname().await?;
99 self.agent_hostname = Some(hostname);
100 Ok(self)
101 }
102}
103
104#[async_trait]
105impl EncoderBuilder for DatadogApmStatsEncoderConfiguration {
106 fn input_event_type(&self) -> EventType {
107 EventType::TraceStats
108 }
109
110 fn output_payload_type(&self) -> PayloadType {
111 PayloadType::Http
112 }
113
114 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
115 let metrics_builder = MetricsBuilder::from_component_context(&context);
116 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
117 let compression_scheme = CompressionScheme::gzip_default();
118
119 let agent_hostname = MetaString::from(self.agent_hostname.clone().unwrap_or_default());
120 let agent_version = MetaString::from(self.agent_version.clone());
121 let agent_env = MetaString::from(self.env.clone());
122
123 let mut stats_rb = RequestBuilder::new(
124 StatsEndpointEncoder::new(agent_hostname, agent_version, agent_env),
125 compression_scheme,
126 RB_BUFFER_CHUNK_SIZE,
127 )
128 .await?;
129 stats_rb.with_max_inputs_per_payload(MAX_STATS_PER_PAYLOAD);
130
131 let flush_timeout = match self.flush_timeout_secs {
132 0 => Duration::from_millis(10),
135 secs => Duration::from_secs(secs),
136 };
137
138 Ok(Box::new(DatadogStats {
139 stats_rb,
140 telemetry,
141 flush_timeout,
142 }))
143 }
144}
145
impl MemoryBounds for DatadogApmStatsEncoderConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum bounds: the component struct itself plus the two fixed-size
        // (8-slot) channels between the encoder task and the request builder task.
        builder
            .minimum()
            .with_single_value::<DatadogStats>("component struct")
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        // Firm bounds: worst-case re-encode buffer when a full payload forces a
        // split, bounded by the per-payload stats cap.
        builder
            .firm()
            .with_array::<TraceStats>("stats split re-encode buffer", MAX_STATS_PER_PAYLOAD);
    }
}
159
/// Datadog APM stats encoder component.
pub struct DatadogStats {
    /// Builds compressed HTTP requests for the stats intake endpoint.
    stats_rb: RequestBuilder<StatsEndpointEncoder>,
    /// Component telemetry (e.g. dropped-event counters).
    telemetry: ComponentTelemetry,
    /// How long to wait after receiving events before forcing a flush.
    flush_timeout: Duration,
}
165
#[async_trait]
impl Encoder for DatadogStats {
    /// Runs the encoder: forwards incoming event buffers to a dedicated request
    /// builder task and dispatches the resulting HTTP payloads downstream.
    ///
    /// # Errors
    ///
    /// Returns an error if an event buffer cannot be handed off to the request
    /// builder task.
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            stats_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // Channels between this task and the request builder task; bounded at 8,
        // matching the sizes declared in `specify_bounds`.
        let (events_tx, events_rx) = mpsc::channel(8);
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);

        // Request building (encode/compress/flush) runs on the global thread pool
        // so it doesn't block event intake or payload dispatch here.
        let request_builder_fut = run_request_builder(stats_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool()
            .spawn_traced_named("dd-stats-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog APM Stats encoder started.");

        loop {
            select! {
                // `biased` gives deterministic polling order: liveness first, then
                // built payloads, then new event buffers.
                biased;

                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    None => break,
                },
            }
        }

        // Closing the events channel signals the request builder task to flush any
        // remaining data and shut down.
        drop(events_tx);

        // Drain payloads produced during shutdown before joining the task.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog APM Stats encoder stopped.");

        Ok(())
    }
}
231
232async fn run_request_builder(
233 mut stats_request_builder: RequestBuilder<StatsEndpointEncoder>, telemetry: ComponentTelemetry,
234 mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
235) -> Result<(), GenericError> {
236 let mut pending_flush = false;
237 let pending_flush_timeout = sleep(flush_timeout);
238 tokio::pin!(pending_flush_timeout);
239
240 loop {
241 select! {
242 Some(event_buffer) = events_rx.recv() => {
243 for event in event_buffer {
244 let trace_stats = match event.try_into_trace_stats() {
245 Some(stats) => stats,
246 None => continue,
247 };
248
249 let stats_to_retry = match stats_request_builder.encode(trace_stats).await {
252 Ok(None) => continue,
253 Ok(Some(stats)) => stats,
254 Err(e) => {
255 error!(error = %e, "Failed to encode stats.");
256 telemetry.events_dropped_encoder().increment(1);
257 continue;
258 }
259 };
260
261 let maybe_requests = stats_request_builder.flush().await;
262 if maybe_requests.is_empty() {
263 panic!("builder told us to flush, but gave us nothing");
264 }
265
266 for maybe_request in maybe_requests {
267 match maybe_request {
268 Ok((events, request)) => {
269 let payload_meta = PayloadMetadata::from_event_count(events);
270 let http_payload = HttpPayload::new(payload_meta, request);
271 let payload = Payload::Http(http_payload);
272
273 payloads_tx.send(payload).await
274 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
275 },
276 Err(e) => if e.is_recoverable() {
277 continue;
278 } else {
279 return Err(GenericError::from(e).context("Failed to flush request."));
280 }
281 }
282 }
283
284 if let Err(e) = stats_request_builder.encode(stats_to_retry).await {
285 error!(error = %e, "Failed to encode stats.");
286 telemetry.events_dropped_encoder().increment(1);
287 }
288 }
289
290 debug!("Processed event buffer.");
291
292 if !pending_flush {
294 pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
295 pending_flush = true;
296 }
297 },
298 _ = &mut pending_flush_timeout, if pending_flush => {
299 debug!("Flushing pending request(s).");
300
301 pending_flush = false;
302
303 let maybe_stats_requests = stats_request_builder.flush().await;
304 for maybe_request in maybe_stats_requests {
305 match maybe_request {
306 Ok((events, request)) => {
307 let payload_meta = PayloadMetadata::from_event_count(events);
308 let http_payload = HttpPayload::new(payload_meta, request);
309 let payload = Payload::Http(http_payload);
310
311 payloads_tx.send(payload).await
312 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
313 },
314 Err(e) => if e.is_recoverable() {
315 continue;
316 } else {
317 return Err(GenericError::from(e).context("Failed to flush request."));
318 }
319 }
320 }
321
322 debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
323 },
324
325 else => break,
326 }
327 }
328
329 Ok(())
330}
331
/// `EndpointEncoder` for the Datadog APM stats intake endpoint.
#[derive(Debug)]
struct StatsEndpointEncoder {
    // Agent identity stamped onto every outgoing stats payload.
    agent_hostname: MetaString,
    agent_version: MetaString,
    agent_env: MetaString,
}
338
impl StatsEndpointEncoder {
    /// Creates a new `StatsEndpointEncoder` with the given agent identity.
    fn new(agent_hostname: MetaString, agent_version: MetaString, agent_env: MetaString) -> Self {
        Self {
            agent_hostname,
            agent_version,
            agent_env,
        }
    }

    /// Converts `stats` to its Protocol Buffers `StatsPayload` representation,
    /// stamping in the agent hostname, env, and version.
    fn to_proto_stats_payload(&self, stats: &TraceStats) -> ProtoStatsPayload {
        let mut payload = ProtoStatsPayload::new();
        payload.set_agentHostname(self.agent_hostname.to_string());
        payload.set_agentEnv(self.agent_env.to_string());
        payload.set_agentVersion(self.agent_version.to_string());
        // Always emitted as false for payloads produced by this encoder.
        payload.set_clientComputed(false);
        payload.set_splitPayload(false);
        payload.set_stats(stats.stats().iter().map(convert_client_stats_payload).collect());

        payload
    }
}
360
/// Converts a `ClientStatsPayload` into its Protocol Buffers representation,
/// copying every field one-to-one and converting nested buckets recursively.
fn convert_client_stats_payload(client_payload: &ClientStatsPayload) -> ProtoClientStatsPayload {
    let mut proto_client = ProtoClientStatsPayload::new();
    proto_client.set_hostname(client_payload.hostname().to_string());
    proto_client.set_env(client_payload.env().to_string());
    proto_client.set_version(client_payload.version().to_string());
    proto_client.set_lang(client_payload.lang().to_string());
    proto_client.set_tracerVersion(client_payload.tracer_version().to_string());
    proto_client.set_runtimeID(client_payload.runtime_id().to_string());
    proto_client.set_sequence(client_payload.sequence());
    proto_client.set_agentAggregation(client_payload.agent_aggregation().to_string());
    proto_client.set_service(client_payload.service().to_string());
    proto_client.set_containerID(client_payload.container_id().to_string());
    proto_client.set_tags(client_payload.tags().into_iter().map(|s| s.to_string()).collect());
    proto_client.set_git_commit_sha(client_payload.git_commit_sha().to_string());
    proto_client.set_image_tag(client_payload.image_tag().to_string());
    proto_client.set_process_tags_hash(client_payload.process_tags_hash());
    proto_client.set_process_tags(client_payload.process_tags().to_string());
    proto_client.set_stats(client_payload.stats().iter().map(convert_client_stats_bucket).collect());
    proto_client
}
381
/// Converts a `ClientStatsBucket` into its Protocol Buffers representation,
/// converting nested grouped stats recursively.
fn convert_client_stats_bucket(bucket: &ClientStatsBucket) -> ProtoClientStatsBucket {
    let mut proto_bucket = ProtoClientStatsBucket::new();
    proto_bucket.set_start(bucket.start());
    proto_bucket.set_duration(bucket.duration());
    proto_bucket.set_agentTimeShift(bucket.agent_time_shift());
    proto_bucket.set_stats(bucket.stats().iter().map(convert_client_grouped_stats).collect());
    proto_bucket
}
390
/// Converts a `ClientGroupedStats` into its Protocol Buffers representation,
/// copying every field one-to-one.
fn convert_client_grouped_stats(grouped: &ClientGroupedStats) -> ProtoClientGroupedStats {
    let mut proto_grouped = ProtoClientGroupedStats::new();
    proto_grouped.set_service(grouped.service().to_string());
    proto_grouped.set_name(grouped.name().to_string());
    proto_grouped.set_resource(grouped.resource().to_string());
    proto_grouped.set_HTTP_status_code(grouped.http_status_code());
    proto_grouped.set_type(grouped.span_type().to_string());
    proto_grouped.set_DB_type(grouped.db_type().to_string());
    proto_grouped.set_hits(grouped.hits());
    proto_grouped.set_errors(grouped.errors());
    proto_grouped.set_duration(grouped.duration());
    proto_grouped.set_okSummary(grouped.ok_summary().to_vec());
    proto_grouped.set_errorSummary(grouped.error_summary().to_vec());
    proto_grouped.set_synthetics(grouped.synthetics());
    proto_grouped.set_topLevelHits(grouped.top_level_hits());
    proto_grouped.set_span_kind(grouped.span_kind().to_string());
    proto_grouped.set_peer_tags(grouped.peer_tags().iter().map(|s| s.to_string()).collect());
    // `Option<bool>` maps onto the proto's three-valued `Trilean`.
    proto_grouped.set_is_trace_root(match grouped.is_trace_root() {
        None => Trilean::NOT_SET,
        Some(true) => Trilean::TRUE,
        Some(false) => Trilean::FALSE,
    });
    proto_grouped.set_GRPC_status_code(grouped.grpc_status_code().to_string());
    proto_grouped.set_HTTP_method(grouped.http_method().to_string());
    proto_grouped.set_HTTP_endpoint(grouped.http_endpoint().to_string());
    proto_grouped
}
418
/// Error returned when trace stats fail to serialize as MessagePack.
#[derive(Debug)]
pub struct StatsEncodeError(rmp_serde::encode::Error);
422
423impl std::fmt::Display for StatsEncodeError {
424 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425 write!(f, "failed to encode stats as MessagePack: {}", self.0)
426 }
427}
428
429impl std::error::Error for StatsEncodeError {
430 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
431 Some(&self.0)
432 }
433}
434
435impl EndpointEncoder for StatsEndpointEncoder {
436 type Input = TraceStats;
437 type EncodeError = StatsEncodeError;
438
439 fn encoder_name() -> &'static str {
440 "stats"
441 }
442
443 fn compressed_size_limit(&self) -> usize {
444 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
445 }
446
447 fn uncompressed_size_limit(&self) -> usize {
448 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
449 }
450
451 fn encode(&mut self, stats: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
452 let payload = self.to_proto_stats_payload(stats);
453 rmp_serde::encode::write_named(buffer, &payload).map_err(StatsEncodeError)?;
454 Ok(())
455 }
456
457 fn endpoint_uri(&self) -> Uri {
458 PathAndQuery::from_static("/api/v0.2/stats").into()
459 }
460
461 fn endpoint_method(&self) -> Method {
462 Method::POST
463 }
464
465 fn content_type(&self) -> HeaderValue {
466 CONTENT_TYPE_MSGPACK.clone()
467 }
468}
469
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogApmStatsEncoderConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    // Smoke test: the configuration should deserialize from an empty JSON object,
    // exercising its serde defaults and skipped fields.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(
            structs::DATADOG_APM_STATS_ENCODER_CONFIGURATION,
            &[],
            json!({}),
            |cfg| {
                cfg.as_typed::<DatadogApmStatsEncoderConfiguration>()
                    .expect("DatadogApmStatsEncoderConfiguration should deserialize")
            },
        )
        .await
    }
}