// saluki_components/encoders/datadog/stats/mod.rs
//! Datadog APM Stats encoder.

use std::time::Duration;

use async_trait::async_trait;
use datadog_protos::traces::{
    ClientGroupedStats as ProtoClientGroupedStats, ClientStatsBucket as ProtoClientStatsBucket,
    ClientStatsPayload as ProtoClientStatsPayload, StatsPayload as ProtoStatsPayload, Trilean,
};
use facet::Facet;
use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_common::task::HandleExt as _;
use saluki_config::GenericConfiguration;
use saluki_core::{
    components::{encoders::*, ComponentContext},
    data_model::{
        event::{
            trace_stats::{ClientGroupedStats, ClientStatsBucket, ClientStatsPayload, TraceStats},
            EventType,
        },
        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
    },
    observability::ComponentMetricsExt as _,
    topology::{EventsBuffer, PayloadsBuffer},
};
use saluki_env::{host::providers::BoxedHostProvider, EnvironmentProvider, HostProvider};
use saluki_error::{generic_error, ErrorContext as _, GenericError};
use saluki_io::compression::CompressionScheme;
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use stringtheory::MetaString;
use tokio::{
    select,
    sync::mpsc::{self, Receiver, Sender},
    time::sleep,
};
use tracing::{debug, error};

use crate::common::datadog::{
    io::RB_BUFFER_CHUNK_SIZE,
    request_builder::{EndpointEncoder, RequestBuilder},
    telemetry::ComponentTelemetry,
    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
};

/// Maximum number of stats inputs packed into a single request payload before a flush is forced.
const MAX_STATS_PER_PAYLOAD: usize = 4000;
/// `Content-Type` header value for MessagePack-encoded request bodies.
static CONTENT_TYPE_MSGPACK: HeaderValue = HeaderValue::from_static("application/msgpack");

/// Default flush timeout, in seconds, for pending (partially-filled) requests.
const fn default_flush_timeout_secs() -> u64 {
    2
}

/// Default value for the `env` tag stamped on emitted stats payloads.
fn default_env() -> String {
    String::from("none")
}

/// Configuration for the Datadog APM Stats encoder.
#[derive(Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct DatadogApmStatsEncoderConfiguration {
    /// Flush timeout for pending requests, in seconds.
    ///
    /// When the encoder has written traces to the in-flight request payload, but it has not yet reached the
    /// payload size limits that would force the payload to be flushed, the encoder will wait for a period of time
    /// before flushing the in-flight request payload.
    ///
    /// Defaults to 2 seconds.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    // Hostname to stamp on payloads; populated via `with_environment_provider`, never from user configuration.
    #[serde(skip)]
    agent_hostname: Option<String>,

    // Version string ("agent-data-plane/<version>") populated by `from_configuration`, never from user
    // configuration.
    #[serde(skip)]
    agent_version: String,

    // Environment tag to stamp on payloads. Defaults to "none".
    #[serde(default = "default_env")]
    env: String,
}

82impl DatadogApmStatsEncoderConfiguration {
83    /// Creates a new `DatadogApmStatsEncoderConfiguration` from the given configuration.
84    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
85        let mut stats_config: Self = config.as_typed()?;
86        let app_details = saluki_metadata::get_app_details();
87        stats_config.agent_version = format!("agent-data-plane/{}", app_details.version().raw());
88
89        Ok(stats_config)
90    }
91
92    /// Sets the agent hostname using the environment provider.
93    pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
94    where
95        E: EnvironmentProvider<Host = BoxedHostProvider>,
96    {
97        let host_provider = environment_provider.host();
98        let hostname = host_provider.get_hostname().await?;
99        self.agent_hostname = Some(hostname);
100        Ok(self)
101    }
102}
103
#[async_trait]
impl EncoderBuilder for DatadogApmStatsEncoderConfiguration {
    fn input_event_type(&self) -> EventType {
        EventType::TraceStats
    }

    fn output_payload_type(&self) -> PayloadType {
        PayloadType::Http
    }

    /// Builds the encoder: wires up telemetry, compression, and the stats request builder.
    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
        let metrics_builder = MetricsBuilder::from_component_context(&context);
        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
        let compression_scheme = CompressionScheme::gzip_default();

        // `agent_hostname` is only populated via `with_environment_provider`; fall back to an empty hostname when
        // it was never set.
        let agent_hostname = MetaString::from(self.agent_hostname.clone().unwrap_or_default());
        let agent_version = MetaString::from(self.agent_version.clone());
        let agent_env = MetaString::from(self.env.clone());

        let mut stats_rb = RequestBuilder::new(
            StatsEndpointEncoder::new(agent_hostname, agent_version, agent_env),
            compression_scheme,
            RB_BUFFER_CHUNK_SIZE,
        )
        .await?;
        stats_rb.with_max_inputs_per_payload(MAX_STATS_PER_PAYLOAD);

        let flush_timeout = match self.flush_timeout_secs {
            // We always give ourselves a minimum flush timeout of 10ms to allow for some very minimal amount of
            // batching, while still practically flushing things almost immediately.
            0 => Duration::from_millis(10),
            secs => Duration::from_secs(secs),
        };

        Ok(Box::new(DatadogStats {
            stats_rb,
            telemetry,
            flush_timeout,
        }))
    }
}

impl MemoryBounds for DatadogApmStatsEncoderConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum: the component struct itself plus the two fixed-size (8-slot) channels between the encoder task
        // and the request builder task.
        builder
            .minimum()
            .with_single_value::<DatadogStats>("component struct")
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        // Firm: worst-case buffer used when stats have to be re-encoded after a payload split.
        builder
            .firm()
            .with_array::<TraceStats>("stats split re-encode buffer", MAX_STATS_PER_PAYLOAD);
    }
}

/// Datadog APM Stats encoder component.
///
/// Batches incoming trace stats events into MessagePack HTTP request payloads for the Datadog stats intake.
pub struct DatadogStats {
    // Request builder that encodes stats and assembles compressed HTTP requests.
    stats_rb: RequestBuilder<StatsEndpointEncoder>,
    // Component telemetry (e.g. dropped-event counters).
    telemetry: ComponentTelemetry,
    // How long to wait before flushing a partially-filled request payload.
    flush_timeout: Duration,
}

#[async_trait]
impl Encoder for DatadogStats {
    /// Runs the encoder: forwards event buffers to a background request builder task and dispatches the payloads
    /// it produces, until the input stream closes.
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            stats_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // Channels between this task and the request builder task: event buffers flow in, finished payloads flow
        // back out.
        let (events_tx, events_rx) = mpsc::channel(8);
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);

        // Run the request builder on the global thread pool so encoding/compression doesn't block this task.
        let request_builder_fut = run_request_builder(stats_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool()
            .spawn_traced_named("dd-stats-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog APM Stats encoder started.");

        loop {
            select! {
                // `biased` so liveness probes are always serviced before traffic.
                biased;

                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    None => break,
                },
            }
        }

        // Dropping our sender lets the request builder task observe end-of-input and close the payloads channel.
        drop(events_tx);

        // Continue draining the payloads receiver until it is closed.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        // Request build task should now be stopped.
        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog APM Stats encoder stopped.");

        Ok(())
    }
}

232async fn run_request_builder(
233    mut stats_request_builder: RequestBuilder<StatsEndpointEncoder>, telemetry: ComponentTelemetry,
234    mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
235) -> Result<(), GenericError> {
236    let mut pending_flush = false;
237    let pending_flush_timeout = sleep(flush_timeout);
238    tokio::pin!(pending_flush_timeout);
239
240    loop {
241        select! {
242            Some(event_buffer) = events_rx.recv() => {
243                for event in event_buffer {
244                    let trace_stats = match event.try_into_trace_stats() {
245                        Some(stats) => stats,
246                        None => continue,
247                    };
248
249                    // Encode the stats. If we get it back, that means the current request is full, and we need to
250                    // flush it before we can try to encode the stats again.
251                    let stats_to_retry = match stats_request_builder.encode(trace_stats).await {
252                        Ok(None) => continue,
253                        Ok(Some(stats)) => stats,
254                        Err(e) => {
255                            error!(error = %e, "Failed to encode stats.");
256                            telemetry.events_dropped_encoder().increment(1);
257                            continue;
258                        }
259                    };
260
261                    let maybe_requests = stats_request_builder.flush().await;
262                    if maybe_requests.is_empty() {
263                        panic!("builder told us to flush, but gave us nothing");
264                    }
265
266                    for maybe_request in maybe_requests {
267                        match maybe_request {
268                            Ok((events, request)) => {
269                                let payload_meta = PayloadMetadata::from_event_count(events);
270                                let http_payload = HttpPayload::new(payload_meta, request);
271                                let payload = Payload::Http(http_payload);
272
273                                payloads_tx.send(payload).await
274                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
275                            },
276                            Err(e) => if e.is_recoverable() {
277                                continue;
278                            } else {
279                                return Err(GenericError::from(e).context("Failed to flush request."));
280                            }
281                        }
282                    }
283
284                    if let Err(e) = stats_request_builder.encode(stats_to_retry).await {
285                        error!(error = %e, "Failed to encode stats.");
286                        telemetry.events_dropped_encoder().increment(1);
287                    }
288                }
289
290                debug!("Processed event buffer.");
291
292                // If we're not already pending a flush, we'll start the countdown.
293                if !pending_flush {
294                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
295                    pending_flush = true;
296                }
297            },
298            _ = &mut pending_flush_timeout, if pending_flush => {
299                debug!("Flushing pending request(s).");
300
301                pending_flush = false;
302
303                let maybe_stats_requests = stats_request_builder.flush().await;
304                for maybe_request in maybe_stats_requests {
305                    match maybe_request {
306                        Ok((events, request)) => {
307                            let payload_meta = PayloadMetadata::from_event_count(events);
308                            let http_payload = HttpPayload::new(payload_meta, request);
309                            let payload = Payload::Http(http_payload);
310
311                            payloads_tx.send(payload).await
312                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
313                        },
314                        Err(e) => if e.is_recoverable() {
315                            continue;
316                        } else {
317                            return Err(GenericError::from(e).context("Failed to flush request."));
318                        }
319                    }
320                }
321
322                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
323            },
324
325            else => break,
326        }
327    }
328
329    Ok(())
330}
331
/// `EndpointEncoder` for the Datadog stats intake (`/api/v0.2/stats`).
///
/// Carries the agent-level metadata that gets stamped onto every emitted `StatsPayload`.
#[derive(Debug)]
struct StatsEndpointEncoder {
    agent_hostname: MetaString,
    agent_version: MetaString,
    agent_env: MetaString,
}

339impl StatsEndpointEncoder {
340    fn new(agent_hostname: MetaString, agent_version: MetaString, agent_env: MetaString) -> Self {
341        Self {
342            agent_hostname,
343            agent_version,
344            agent_env,
345        }
346    }
347
348    fn to_proto_stats_payload(&self, stats: &TraceStats) -> ProtoStatsPayload {
349        let mut payload = ProtoStatsPayload::new();
350        payload.set_agentHostname(self.agent_hostname.to_string());
351        payload.set_agentEnv(self.agent_env.to_string());
352        payload.set_agentVersion(self.agent_version.to_string());
353        payload.set_clientComputed(false);
354        payload.set_splitPayload(false);
355        payload.set_stats(stats.stats().iter().map(convert_client_stats_payload).collect());
356
357        payload
358    }
359}
360
361fn convert_client_stats_payload(client_payload: &ClientStatsPayload) -> ProtoClientStatsPayload {
362    let mut proto_client = ProtoClientStatsPayload::new();
363    proto_client.set_hostname(client_payload.hostname().to_string());
364    proto_client.set_env(client_payload.env().to_string());
365    proto_client.set_version(client_payload.version().to_string());
366    proto_client.set_lang(client_payload.lang().to_string());
367    proto_client.set_tracerVersion(client_payload.tracer_version().to_string());
368    proto_client.set_runtimeID(client_payload.runtime_id().to_string());
369    proto_client.set_sequence(client_payload.sequence());
370    proto_client.set_agentAggregation(client_payload.agent_aggregation().to_string());
371    proto_client.set_service(client_payload.service().to_string());
372    proto_client.set_containerID(client_payload.container_id().to_string());
373    proto_client.set_tags(client_payload.tags().into_iter().map(|s| s.to_string()).collect());
374    proto_client.set_git_commit_sha(client_payload.git_commit_sha().to_string());
375    proto_client.set_image_tag(client_payload.image_tag().to_string());
376    proto_client.set_process_tags_hash(client_payload.process_tags_hash());
377    proto_client.set_process_tags(client_payload.process_tags().to_string());
378    proto_client.set_stats(client_payload.stats().iter().map(convert_client_stats_bucket).collect());
379    proto_client
380}
381
382fn convert_client_stats_bucket(bucket: &ClientStatsBucket) -> ProtoClientStatsBucket {
383    let mut proto_bucket = ProtoClientStatsBucket::new();
384    proto_bucket.set_start(bucket.start());
385    proto_bucket.set_duration(bucket.duration());
386    proto_bucket.set_agentTimeShift(bucket.agent_time_shift());
387    proto_bucket.set_stats(bucket.stats().iter().map(convert_client_grouped_stats).collect());
388    proto_bucket
389}
390
391fn convert_client_grouped_stats(grouped: &ClientGroupedStats) -> ProtoClientGroupedStats {
392    let mut proto_grouped = ProtoClientGroupedStats::new();
393    proto_grouped.set_service(grouped.service().to_string());
394    proto_grouped.set_name(grouped.name().to_string());
395    proto_grouped.set_resource(grouped.resource().to_string());
396    proto_grouped.set_HTTP_status_code(grouped.http_status_code());
397    proto_grouped.set_type(grouped.span_type().to_string());
398    proto_grouped.set_DB_type(grouped.db_type().to_string());
399    proto_grouped.set_hits(grouped.hits());
400    proto_grouped.set_errors(grouped.errors());
401    proto_grouped.set_duration(grouped.duration());
402    proto_grouped.set_okSummary(grouped.ok_summary().to_vec());
403    proto_grouped.set_errorSummary(grouped.error_summary().to_vec());
404    proto_grouped.set_synthetics(grouped.synthetics());
405    proto_grouped.set_topLevelHits(grouped.top_level_hits());
406    proto_grouped.set_span_kind(grouped.span_kind().to_string());
407    proto_grouped.set_peer_tags(grouped.peer_tags().iter().map(|s| s.to_string()).collect());
408    proto_grouped.set_is_trace_root(match grouped.is_trace_root() {
409        None => Trilean::NOT_SET,
410        Some(true) => Trilean::TRUE,
411        Some(false) => Trilean::FALSE,
412    });
413    proto_grouped.set_GRPC_status_code(grouped.grpc_status_code().to_string());
414    proto_grouped.set_HTTP_method(grouped.http_method().to_string());
415    proto_grouped.set_HTTP_endpoint(grouped.http_endpoint().to_string());
416    proto_grouped
417}
418
419/// Error type for stats encoding.
420#[derive(Debug)]
421pub struct StatsEncodeError(rmp_serde::encode::Error);
422
423impl std::fmt::Display for StatsEncodeError {
424    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425        write!(f, "failed to encode stats as MessagePack: {}", self.0)
426    }
427}
428
429impl std::error::Error for StatsEncodeError {
430    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
431        Some(&self.0)
432    }
433}
434
435impl EndpointEncoder for StatsEndpointEncoder {
436    type Input = TraceStats;
437    type EncodeError = StatsEncodeError;
438
439    fn encoder_name() -> &'static str {
440        "stats"
441    }
442
443    fn compressed_size_limit(&self) -> usize {
444        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
445    }
446
447    fn uncompressed_size_limit(&self) -> usize {
448        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
449    }
450
451    fn encode(&mut self, stats: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
452        let payload = self.to_proto_stats_payload(stats);
453        rmp_serde::encode::write_named(buffer, &payload).map_err(StatsEncodeError)?;
454        Ok(())
455    }
456
457    fn endpoint_uri(&self) -> Uri {
458        PathAndQuery::from_static("/api/v0.2/stats").into()
459    }
460
461    fn endpoint_method(&self) -> Method {
462        Method::POST
463    }
464
465    fn content_type(&self) -> HeaderValue {
466        CONTENT_TYPE_MSGPACK.clone()
467    }
468}
469
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogApmStatsEncoderConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    /// Verifies that an empty JSON configuration deserializes into `DatadogApmStatsEncoderConfiguration`, i.e.
    /// that every user-facing field has a working default.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(
            structs::DATADOG_APM_STATS_ENCODER_CONFIGURATION,
            &[],
            json!({}),
            |cfg| {
                cfg.as_typed::<DatadogApmStatsEncoderConfiguration>()
                    .expect("DatadogApmStatsEncoderConfiguration should deserialize")
            },
        )
        .await
    }
}