saluki_components/encoders/datadog/stats/mod.rs

//! Datadog APM Stats encoder.

use std::time::Duration;

use async_trait::async_trait;
use datadog_protos::traces::{
    ClientGroupedStats as ProtoClientGroupedStats, ClientStatsBucket as ProtoClientStatsBucket,
    ClientStatsPayload as ProtoClientStatsPayload, StatsPayload as ProtoStatsPayload, Trilean,
};
use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_common::task::HandleExt as _;
use saluki_config::GenericConfiguration;
use saluki_core::{
    components::{encoders::*, ComponentContext},
    data_model::{
        event::{
            trace_stats::{ClientGroupedStats, ClientStatsBucket, ClientStatsPayload, TraceStats},
            EventType,
        },
        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
    },
    observability::ComponentMetricsExt as _,
    topology::{EventsBuffer, PayloadsBuffer},
};
use saluki_env::{host::providers::BoxedHostProvider, EnvironmentProvider, HostProvider};
use saluki_error::{generic_error, ErrorContext as _, GenericError};
use saluki_io::compression::CompressionScheme;
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use stringtheory::MetaString;
use tokio::{
    select,
    sync::mpsc::{self, Receiver, Sender},
    time::sleep,
};
use tracing::{debug, error};

use crate::common::datadog::{
    io::RB_BUFFER_CHUNK_SIZE,
    request_builder::{EndpointEncoder, RequestBuilder},
    telemetry::ComponentTelemetry,
    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
};

const MAX_STATS_PER_PAYLOAD: usize = 4000;
static CONTENT_TYPE_MSGPACK: HeaderValue = HeaderValue::from_static("application/msgpack");

const fn default_flush_timeout_secs() -> u64 {
    2
}

fn default_env() -> String {
    "none".to_string()
}

/// Configuration for the Datadog APM Stats encoder.
#[derive(Deserialize)]
pub struct DatadogApmStatsEncoderConfiguration {
    /// Flush timeout for pending requests, in seconds.
    ///
    /// When the encoder has written stats to the in-flight request payload, but the payload has not yet reached the
    /// size limits that would force it to be flushed, the encoder will wait for this period of time before flushing
    /// the in-flight request payload.
    ///
    /// Defaults to 2 seconds.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Agent hostname reported in the stats payload.
    ///
    /// Populated via `with_environment_provider`.
    #[serde(skip)]
    agent_hostname: Option<String>,

    /// Agent version string reported in the stats payload.
    ///
    /// Populated by `from_configuration`.
    #[serde(skip)]
    agent_version: String,

    /// Environment reported in the stats payload.
    ///
    /// Defaults to "none".
    #[serde(default = "default_env")]
    env: String,
}
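
// Minimal deserialization sketch (not part of the original source; assumes `serde_json` is
// available) showing how the serde defaults above behave when fields are omitted:
//
//     let config: DatadogApmStatsEncoderConfiguration = serde_json::from_str("{}").unwrap();
//     assert_eq!(config.flush_timeout_secs, 2);
//     assert_eq!(config.env, "none");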

impl DatadogApmStatsEncoderConfiguration {
    /// Creates a new `DatadogApmStatsEncoderConfiguration` from the given configuration.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        let mut stats_config: Self = config.as_typed()?;
        let app_details = saluki_metadata::get_app_details();
        stats_config.agent_version = format!("agent-data-plane/{}", app_details.version().raw());

        Ok(stats_config)
    }

    /// Sets the agent hostname using the environment provider.
    pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
    where
        E: EnvironmentProvider<Host = BoxedHostProvider>,
    {
        let host_provider = environment_provider.host();
        let hostname = host_provider.get_hostname().await?;
        self.agent_hostname = Some(hostname);
        Ok(self)
    }
}
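
// Illustrative usage sketch (not part of the original source): wiring the configuration together
// during application setup. `config` and `env_provider` are assumed to be the application's
// `GenericConfiguration` and `EnvironmentProvider`, respectively.
//
//     let stats_config = DatadogApmStatsEncoderConfiguration::from_configuration(&config)?
//         .with_environment_provider(env_provider)
//         .await?;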

#[async_trait]
impl EncoderBuilder for DatadogApmStatsEncoderConfiguration {
    fn input_event_type(&self) -> EventType {
        EventType::TraceStats
    }

    fn output_payload_type(&self) -> PayloadType {
        PayloadType::Http
    }

    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
        let metrics_builder = MetricsBuilder::from_component_context(&context);
        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
        let compression_scheme = CompressionScheme::gzip_default();

        let agent_hostname = MetaString::from(self.agent_hostname.clone().unwrap_or_default());
        let agent_version = MetaString::from(self.agent_version.clone());
        let agent_env = MetaString::from(self.env.clone());

        let mut stats_rb = RequestBuilder::new(
            StatsEndpointEncoder::new(agent_hostname, agent_version, agent_env),
            compression_scheme,
            RB_BUFFER_CHUNK_SIZE,
        )
        .await?;
        stats_rb.with_max_inputs_per_payload(MAX_STATS_PER_PAYLOAD);

        let flush_timeout = match self.flush_timeout_secs {
            // We always give ourselves a minimum flush timeout of 10ms to allow for some very minimal amount of
            // batching, while still practically flushing things almost immediately.
            0 => Duration::from_millis(10),
            secs => Duration::from_secs(secs),
        };

        Ok(Box::new(DatadogStats {
            stats_rb,
            telemetry,
            flush_timeout,
        }))
    }
}

impl MemoryBounds for DatadogApmStatsEncoderConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<DatadogStats>("component struct")
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        builder
            .firm()
            .with_array::<TraceStats>("stats split re-encode buffer", MAX_STATS_PER_PAYLOAD);
    }
}

/// Datadog APM Stats encoder.
pub struct DatadogStats {
    stats_rb: RequestBuilder<StatsEndpointEncoder>,
    telemetry: ComponentTelemetry,
    flush_timeout: Duration,
}

#[async_trait]
impl Encoder for DatadogStats {
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            stats_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // Channels for handing event buffers to the request builder task and receiving built payloads back from it.
        let (events_tx, events_rx) = mpsc::channel(8);
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);

        let request_builder_fut = run_request_builder(stats_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool()
            .spawn_traced_named("dd-stats-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog APM Stats encoder started.");

        loop {
            select! {
                biased;

                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    None => break,
                },
            }
        }

        // Closing the events channel signals the request builder task to finish up and stop.
        drop(events_tx);

        // Continue draining the payloads receiver until it is closed.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        // Request builder task should now be stopped.
        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog APM Stats encoder stopped.");

        Ok(())
    }
}

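/// Runs the request builder loop: drains incoming event buffers, encodes trace stats into request
/// payloads, and forwards completed requests over `payloads_tx`, flushing any pending request data
/// once `flush_timeout` has elapsed without the payload size limits being hit.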
async fn run_request_builder(
    mut stats_request_builder: RequestBuilder<StatsEndpointEncoder>, telemetry: ComponentTelemetry,
    mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: Duration,
) -> Result<(), GenericError> {
    let mut pending_flush = false;
    let pending_flush_timeout = sleep(flush_timeout);
    tokio::pin!(pending_flush_timeout);

    loop {
        select! {
            Some(event_buffer) = events_rx.recv() => {
                for event in event_buffer {
                    let trace_stats = match event.try_into_trace_stats() {
                        Some(stats) => stats,
                        None => continue,
                    };

                    // Encode the stats. If we get it back, that means the current request is full, and we need to
                    // flush it before we can try to encode the stats again.
                    let stats_to_retry = match stats_request_builder.encode(trace_stats).await {
                        Ok(None) => continue,
                        Ok(Some(stats)) => stats,
                        Err(e) => {
                            error!(error = %e, "Failed to encode stats.");
                            telemetry.events_dropped_encoder().increment(1);
                            continue;
                        }
                    };

                    let maybe_requests = stats_request_builder.flush().await;
                    if maybe_requests.is_empty() {
                        panic!("builder told us to flush, but gave us nothing");
                    }

                    for maybe_request in maybe_requests {
                        match maybe_request {
                            Ok((events, request)) => {
                                let payload_meta = PayloadMetadata::from_event_count(events);
                                let http_payload = HttpPayload::new(payload_meta, request);
                                let payload = Payload::Http(http_payload);

                                payloads_tx.send(payload).await
                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                            },
                            Err(e) => if e.is_recoverable() {
                                continue;
                            } else {
                                return Err(GenericError::from(e).context("Failed to flush request."));
                            }
                        }
                    }

                    if let Err(e) = stats_request_builder.encode(stats_to_retry).await {
                        error!(error = %e, "Failed to encode stats.");
                        telemetry.events_dropped_encoder().increment(1);
                    }
                }

                debug!("Processed event buffer.");

                // If we're not already pending a flush, we'll start the countdown.
                if !pending_flush {
                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
                    pending_flush = true;
                }
            },
            _ = &mut pending_flush_timeout, if pending_flush => {
                debug!("Flushing pending request(s).");

                pending_flush = false;

                let maybe_stats_requests = stats_request_builder.flush().await;
                for maybe_request in maybe_stats_requests {
                    match maybe_request {
                        Ok((events, request)) => {
                            let payload_meta = PayloadMetadata::from_event_count(events);
                            let http_payload = HttpPayload::new(payload_meta, request);
                            let payload = Payload::Http(http_payload);

                            payloads_tx.send(payload).await
                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                        },
                        Err(e) => if e.is_recoverable() {
                            continue;
                        } else {
                            return Err(GenericError::from(e).context("Failed to flush request."));
                        }
                    }
                }

                debug!("All flushed requests sent for dispatch. Waiting for next event buffer...");
            },

            else => break,
        }
    }

    Ok(())
}

/// [`EndpointEncoder`] for encoding trace stats to the Datadog APM stats endpoint.
#[derive(Debug)]
struct StatsEndpointEncoder {
    agent_hostname: MetaString,
    agent_version: MetaString,
    agent_env: MetaString,
}

impl StatsEndpointEncoder {
    fn new(agent_hostname: MetaString, agent_version: MetaString, agent_env: MetaString) -> Self {
        Self {
            agent_hostname,
            agent_version,
            agent_env,
        }
    }

    fn to_proto_stats_payload(&self, stats: &TraceStats) -> ProtoStatsPayload {
        let mut payload = ProtoStatsPayload::new();
        payload.set_agentHostname(self.agent_hostname.to_string());
        payload.set_agentEnv(self.agent_env.to_string());
        payload.set_agentVersion(self.agent_version.to_string());
        payload.set_clientComputed(false);
        payload.set_splitPayload(false);
        payload.set_stats(stats.stats().iter().map(convert_client_stats_payload).collect());

        payload
    }
}

fn convert_client_stats_payload(client_payload: &ClientStatsPayload) -> ProtoClientStatsPayload {
    let mut proto_client = ProtoClientStatsPayload::new();
    proto_client.set_hostname(client_payload.hostname().to_string());
    proto_client.set_env(client_payload.env().to_string());
    proto_client.set_version(client_payload.version().to_string());
    proto_client.set_lang(client_payload.lang().to_string());
    proto_client.set_tracerVersion(client_payload.tracer_version().to_string());
    proto_client.set_runtimeID(client_payload.runtime_id().to_string());
    proto_client.set_sequence(client_payload.sequence());
    proto_client.set_agentAggregation(client_payload.agent_aggregation().to_string());
    proto_client.set_service(client_payload.service().to_string());
    proto_client.set_containerID(client_payload.container_id().to_string());
    proto_client.set_tags(client_payload.tags().iter().map(|s| s.to_string()).collect());
    proto_client.set_git_commit_sha(client_payload.git_commit_sha().to_string());
    proto_client.set_image_tag(client_payload.image_tag().to_string());
    proto_client.set_process_tags_hash(client_payload.process_tags_hash());
    proto_client.set_process_tags(client_payload.process_tags().to_string());
    proto_client.set_stats(client_payload.stats().iter().map(convert_client_stats_bucket).collect());
    proto_client
}

fn convert_client_stats_bucket(bucket: &ClientStatsBucket) -> ProtoClientStatsBucket {
    let mut proto_bucket = ProtoClientStatsBucket::new();
    proto_bucket.set_start(bucket.start());
    proto_bucket.set_duration(bucket.duration());
    proto_bucket.set_agentTimeShift(bucket.agent_time_shift());
    proto_bucket.set_stats(bucket.stats().iter().map(convert_client_grouped_stats).collect());
    proto_bucket
}

fn convert_client_grouped_stats(grouped: &ClientGroupedStats) -> ProtoClientGroupedStats {
    let mut proto_grouped = ProtoClientGroupedStats::new();
    proto_grouped.set_service(grouped.service().to_string());
    proto_grouped.set_name(grouped.name().to_string());
    proto_grouped.set_resource(grouped.resource().to_string());
    proto_grouped.set_HTTP_status_code(grouped.http_status_code());
    proto_grouped.set_type(grouped.span_type().to_string());
    proto_grouped.set_DB_type(grouped.db_type().to_string());
    proto_grouped.set_hits(grouped.hits());
    proto_grouped.set_errors(grouped.errors());
    proto_grouped.set_duration(grouped.duration());
    proto_grouped.set_okSummary(grouped.ok_summary().to_vec());
    proto_grouped.set_errorSummary(grouped.error_summary().to_vec());
    proto_grouped.set_synthetics(grouped.synthetics());
    proto_grouped.set_topLevelHits(grouped.top_level_hits());
    proto_grouped.set_span_kind(grouped.span_kind().to_string());
    proto_grouped.set_peer_tags(grouped.peer_tags().iter().map(|s| s.to_string()).collect());
    proto_grouped.set_is_trace_root(match grouped.is_trace_root() {
        None => Trilean::NOT_SET,
        Some(true) => Trilean::TRUE,
        Some(false) => Trilean::FALSE,
    });
    proto_grouped.set_GRPC_status_code(grouped.grpc_status_code().to_string());
    proto_grouped.set_HTTP_method(grouped.http_method().to_string());
    proto_grouped.set_HTTP_endpoint(grouped.http_endpoint().to_string());
    proto_grouped
}

/// Error type for stats encoding.
#[derive(Debug)]
pub struct StatsEncodeError(rmp_serde::encode::Error);

impl std::fmt::Display for StatsEncodeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to encode stats as MessagePack: {}", self.0)
    }
}

impl std::error::Error for StatsEncodeError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&self.0)
    }
}

impl EndpointEncoder for StatsEndpointEncoder {
    type Input = TraceStats;
    type EncodeError = StatsEncodeError;

    fn encoder_name() -> &'static str {
        "stats"
    }

    fn compressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
    }

    fn uncompressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
    }

    fn encode(&mut self, stats: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
        let payload = self.to_proto_stats_payload(stats);
        rmp_serde::encode::write_named(buffer, &payload).map_err(StatsEncodeError)?;
        Ok(())
    }

    fn endpoint_uri(&self) -> Uri {
        PathAndQuery::from_static("/api/v0.2/stats").into()
    }

    fn endpoint_method(&self) -> Method {
        Method::POST
    }

    fn content_type(&self) -> HeaderValue {
        CONTENT_TYPE_MSGPACK.clone()
    }
}
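
// Illustrative sketch (not part of the original source): how a single `TraceStats` value is turned
// into a request body by this encoder. `stats` is assumed to be a `TraceStats` event pulled from an
// upstream event buffer, and the agent identity values are placeholder examples.
//
//     let mut encoder = StatsEndpointEncoder::new(
//         MetaString::from("my-agent-host".to_string()),
//         MetaString::from("agent-data-plane/1.0.0".to_string()),
//         MetaString::from("prod".to_string()),
//     );
//     let mut buffer = Vec::new();
//     encoder.encode(&stats, &mut buffer)?;
//     // `buffer` now holds the MessagePack-encoded `StatsPayload`, which the request builder wraps
//     // into a POST to `/api/v0.2/stats` with content type `application/msgpack`.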