saluki_components/encoders/datadog/metrics/mod.rs

use std::{num::NonZeroU64, time::Duration};

use async_trait::async_trait;
use datadog_protos::metrics as proto;
use ddsketch::DDSketch;
use facet::Facet;
use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use protobuf::{rt::WireType, CodedOutputStream, Enum as _};
use saluki_common::{iter::ReusableDeduplicator, task::HandleExt as _};
use saluki_config::GenericConfiguration;
use saluki_context::tags::{SharedTagSet, Tag};
use saluki_core::{
    components::{encoders::*, ComponentContext},
    data_model::{
        event::{
            metric::{Metric, MetricOrigin, MetricValues},
            EventType,
        },
        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
    },
    observability::ComponentMetricsExt as _,
    topology::{EventsBuffer, PayloadsBuffer},
};
use saluki_error::{generic_error, ErrorContext as _, GenericError};
use saluki_io::compression::CompressionScheme;
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use tokio::{select, sync::mpsc, time::sleep};
use tracing::{debug, error, warn};

use crate::common::datadog::{
    io::RB_BUFFER_CHUNK_SIZE,
    request_builder::{EndpointEncoder, RequestBuilder},
    telemetry::ComponentTelemetry,
    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
};

const SERIES_V2_COMPRESSED_SIZE_LIMIT: usize = 512_000; // 500 KiB
const SERIES_V2_UNCOMPRESSED_SIZE_LIMIT: usize = 5_242_880; // 5 MiB

const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";

// Protocol Buffers field numbers for series and sketch payload messages.
//
// These field numbers come from the Protocol Buffers definitions in `lib/datadog-protos/proto/agent_payload.proto`.
const RESOURCES_TYPE_FIELD_NUMBER: u32 = 1;
const RESOURCES_NAME_FIELD_NUMBER: u32 = 2;

const METADATA_ORIGIN_FIELD_NUMBER: u32 = 1;

const ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER: u32 = 4;
const ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER: u32 = 5;
const ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER: u32 = 6;

const METRIC_POINT_VALUE_FIELD_NUMBER: u32 = 1;
const METRIC_POINT_TIMESTAMP_FIELD_NUMBER: u32 = 2;

const DOGSKETCH_TS_FIELD_NUMBER: u32 = 1;
const DOGSKETCH_CNT_FIELD_NUMBER: u32 = 2;
const DOGSKETCH_MIN_FIELD_NUMBER: u32 = 3;
const DOGSKETCH_MAX_FIELD_NUMBER: u32 = 4;
const DOGSKETCH_AVG_FIELD_NUMBER: u32 = 5;
const DOGSKETCH_SUM_FIELD_NUMBER: u32 = 6;
const DOGSKETCH_K_FIELD_NUMBER: u32 = 7;
const DOGSKETCH_N_FIELD_NUMBER: u32 = 8;

const SERIES_RESOURCES_FIELD_NUMBER: u32 = 1;
const SERIES_METRIC_FIELD_NUMBER: u32 = 2;
const SERIES_TAGS_FIELD_NUMBER: u32 = 3;
const SERIES_POINTS_FIELD_NUMBER: u32 = 4;
const SERIES_TYPE_FIELD_NUMBER: u32 = 5;
const SERIES_UNIT_FIELD_NUMBER: u32 = 6;
const SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER: u32 = 7;
const SERIES_INTERVAL_FIELD_NUMBER: u32 = 8;
const SERIES_METADATA_FIELD_NUMBER: u32 = 9;

const SKETCH_METRIC_FIELD_NUMBER: u32 = 1;
const SKETCH_HOST_FIELD_NUMBER: u32 = 2;
const SKETCH_TAGS_FIELD_NUMBER: u32 = 4;
const SKETCH_DOGSKETCHES_FIELD_NUMBER: u32 = 7;
const SKETCH_METADATA_FIELD_NUMBER: u32 = 8;

static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");

const fn default_max_metrics_per_payload() -> usize {
    10_000
}

const fn default_flush_timeout_secs() -> u64 {
    2
}

fn default_serializer_compressor_kind() -> String {
    DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
}

const fn default_zstd_compressor_level() -> i32 {
    3
}
/// Datadog Metrics encoder.
///
/// Generates Datadog metrics payloads for the Datadog platform.
#[derive(Clone, Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
#[allow(dead_code)]
pub struct DatadogMetricsConfiguration {
    /// Maximum number of input metrics to encode into a single request payload.
    ///
    /// This applies both to the series and sketches endpoints.
    ///
    /// Defaults to 10,000.
    #[serde(
        rename = "serializer_max_metrics_per_payload",
        default = "default_max_metrics_per_payload"
    )]
    max_metrics_per_payload: usize,

    /// Flush timeout for pending requests, in seconds.
    ///
    /// When the encoder has written metrics to the in-flight request payload, but the payload has not yet reached the
    /// size limits that would force it to be flushed, the encoder will wait for a period of time before flushing the
    /// in-flight request payload. This allows other events to be processed and written into the request payload,
    /// thereby maximizing the payload size and reducing the number of requests generated and sent overall.
    ///
    /// Defaults to 2 seconds.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Compression kind to use for the request payloads.
    ///
    /// Defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compressor level to use when the compressor kind is `zstd`.
    ///
    /// Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Additional tags to apply to all forwarded metrics.
    #[serde(default, skip)]
    #[facet(opaque)]
    additional_tags: Option<SharedTagSet>,
}

impl DatadogMetricsConfiguration {
    /// Creates a new `DatadogMetricsConfiguration` from the given configuration.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }

    /// Sets additional tags to be applied uniformly to all metrics forwarded by this encoder.
    pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
        self.additional_tags = Some(additional_tags);
        self
    }
}

#[async_trait]
impl EncoderBuilder for DatadogMetricsConfiguration {
    fn input_event_type(&self) -> EventType {
        EventType::Metric
    }

    fn output_payload_type(&self) -> PayloadType {
        PayloadType::Http
    }

    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
        let metrics_builder = MetricsBuilder::from_component_context(&context);
        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);

        // Create our request builders.
        let mut series_encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Series);
        let mut sketches_encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);

        if let Some(additional_tags) = self.additional_tags.as_ref() {
            series_encoder = series_encoder.with_additional_tags(additional_tags.clone());
            sketches_encoder = sketches_encoder.with_additional_tags(additional_tags.clone());
        }

        let mut series_rb = RequestBuilder::new(series_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
        series_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);

        let mut sketches_rb = RequestBuilder::new(sketches_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
        sketches_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);

        let flush_timeout = match self.flush_timeout_secs {
            // We always give ourselves a minimum flush timeout of 10ms to allow for some very minimal amount of
            // batching, while still practically flushing things almost immediately.
            0 => Duration::from_millis(10),
            secs => Duration::from_secs(secs),
        };

        Ok(Box::new(DatadogMetrics {
            series_rb,
            sketches_rb,
            telemetry,
            flush_timeout,
        }))
    }
}

impl MemoryBounds for DatadogMetricsConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
        //
        // Theoretically, we'll end up being limited by the size of the downstream forwarder's interconnect and however
        // many payloads it buffers internally, so the firm limit realistically boils down to the forwarder itself. The
        // catch is that the forwarder has a hard time knowing the maximum size of any given payload sent to it, which
        // makes it hard to calculate a proper firm bound even though we know the rest of the values required to
        // calculate one.
        builder
            .minimum()
            .with_single_value::<DatadogMetrics>("component struct")
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        builder
            .firm()
            // Capture the size of the "split re-encode" buffers in the request builders, which is where we keep owned
            // versions of metrics that we encode in case we need to actually re-encode them during a split operation.
            .with_array::<Metric>("series metrics split re-encode buffer", self.max_metrics_per_payload)
            .with_array::<Metric>("sketch metrics split re-encode buffer", self.max_metrics_per_payload);
    }
}

pub struct DatadogMetrics {
    series_rb: RequestBuilder<MetricsEndpointEncoder>,
    sketches_rb: RequestBuilder<MetricsEndpointEncoder>,
    telemetry: ComponentTelemetry,
    flush_timeout: Duration,
}

#[async_trait]
impl Encoder for DatadogMetrics {
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            series_rb,
            sketches_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // Spawn our request builder task.
        let (events_tx, events_rx) = mpsc::channel(8);
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
        let request_builder_fut =
            run_request_builder(series_rb, sketches_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool()
            .spawn_traced_named("dd-metrics-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog Metrics encoder started.");

        loop {
            select! {
                biased;

                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    None => break,
                },
            }
        }

        // Drop the events sender, which signals the request builder task to stop.
        drop(events_tx);

        // Continue draining the payloads receiver until it is closed.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        // The request builder task should now be stopped.
        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog Metrics encoder stopped.");

        Ok(())
    }
}

async fn run_request_builder(
    mut series_request_builder: RequestBuilder<MetricsEndpointEncoder>,
    mut sketches_request_builder: RequestBuilder<MetricsEndpointEncoder>, telemetry: ComponentTelemetry,
    mut events_rx: mpsc::Receiver<EventsBuffer>, payloads_tx: mpsc::Sender<PayloadsBuffer>, flush_timeout: Duration,
) -> Result<(), GenericError> {
    let mut pending_flush = false;
    let pending_flush_timeout = sleep(flush_timeout);
    tokio::pin!(pending_flush_timeout);

    loop {
        select! {
            Some(event_buffer) = events_rx.recv() => {
                for event in event_buffer {
                    let metric = match event.try_into_metric() {
                        Some(metric) => metric,
                        None => continue,
                    };

                    let request_builder = match MetricsEndpoint::from_metric(&metric) {
                        MetricsEndpoint::Series => &mut series_request_builder,
                        MetricsEndpoint::Sketches => &mut sketches_request_builder,
                    };

                    // Encode the metric. If we get it back, that means the current request is full, and we need to
                    // flush it before we can try to encode the metric again... so we'll hold on to it in that case
                    // before flushing and trying to encode it again.
                    let metric_to_retry = match request_builder.encode(metric).await {
                        Ok(None) => continue,
                        Ok(Some(metric)) => metric,
                        Err(e) => {
                            error!(error = %e, "Failed to encode metric.");
                            telemetry.events_dropped_encoder().increment(1);
                            continue;
                        }
                    };

                    let maybe_requests = request_builder.flush().await;
                    if maybe_requests.is_empty() {
                        panic!("builder told us to flush, but gave us nothing");
                    }

                    for maybe_request in maybe_requests {
                        match maybe_request {
                            Ok((events, request)) => {
                                let payload_meta = PayloadMetadata::from_event_count(events);
                                let http_payload = HttpPayload::new(payload_meta, request);
                                let payload = Payload::Http(http_payload);

                                payloads_tx.send(payload).await
                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                            },

                            // TODO: Increment a counter here that metrics were dropped due to a flush failure.
                            Err(e) => if e.is_recoverable() {
                                // If the error is recoverable, we simply continue on with the remaining requests.
                                continue;
                            } else {
                                return Err(GenericError::from(e).context("Failed to flush request."));
                            }
                        }
                    }

                    // Now try to encode the metric again. If it fails again, we'll just log it, because it shouldn't
                    // be possible to fail at this point: otherwise we would have already caught that the first time.
                    if let Err(e) = request_builder.encode(metric_to_retry).await {
                        error!(error = %e, "Failed to encode metric.");
                        telemetry.events_dropped_encoder().increment(1);
                    }
                }

                debug!("Processed event buffer.");

                // If we're not already pending a flush, we'll start the countdown.
                if !pending_flush {
                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
                    pending_flush = true;
                }
            },
            _ = &mut pending_flush_timeout, if pending_flush => {
                debug!("Flushing pending request(s).");

                pending_flush = false;

                // Once we've encoded and written all metrics, we flush the request builders to generate a request with
                // anything left over. Again, we'll enqueue those requests to be sent immediately.
                let maybe_series_requests = series_request_builder.flush().await;
                for maybe_request in maybe_series_requests {
                    match maybe_request {
                        Ok((events, request)) => {
                            let payload_meta = PayloadMetadata::from_event_count(events);
                            let http_payload = HttpPayload::new(payload_meta, request);
                            let payload = Payload::Http(http_payload);

                            payloads_tx.send(payload).await
                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                        },

                        // TODO: Increment a counter here that metrics were dropped due to a flush failure.
                        Err(e) => if e.is_recoverable() {
                            // If the error is recoverable, we simply continue on with the remaining requests.
                            continue;
                        } else {
                            return Err(GenericError::from(e).context("Failed to flush request."));
                        }
                    }
                }

                let maybe_sketches_requests = sketches_request_builder.flush().await;
                for maybe_request in maybe_sketches_requests {
                    match maybe_request {
                        Ok((events, request)) => {
                            let payload_meta = PayloadMetadata::from_event_count(events);
                            let http_payload = HttpPayload::new(payload_meta, request);
                            let payload = Payload::Http(http_payload);

                            payloads_tx.send(payload).await
                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                        },

                        // TODO: Increment a counter here that metrics were dropped due to a flush failure.
                        Err(e) => if e.is_recoverable() {
                            // If the error is recoverable, we simply continue on with the remaining requests.
                            continue;
                        } else {
                            return Err(GenericError::from(e).context("Failed to flush request."));
                        }
                    }
                }

                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
            },

            // The event buffers channel has been closed and we have no pending flush, so we're all done.
            else => break,
        }
    }

    Ok(())
}

/// Metrics intake endpoint.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MetricsEndpoint {
    /// Series metrics.
    ///
    /// Includes counters, gauges, rates, and sets.
    Series,

    /// Sketch metrics.
    ///
    /// Includes histograms and distributions.
    Sketches,
}

impl MetricsEndpoint {
    /// Creates a new `MetricsEndpoint` from the given metric.
    pub fn from_metric(metric: &Metric) -> Self {
        match metric.values() {
            MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
                Self::Series
            }
            MetricValues::Histogram(..) | MetricValues::Distribution(..) => Self::Sketches,
        }
    }
}

#[derive(Debug)]
struct MetricsEndpointEncoder {
    endpoint: MetricsEndpoint,
    primary_scratch_buf: Vec<u8>,
    secondary_scratch_buf: Vec<u8>,
    packed_scratch_buf: Vec<u8>,
    additional_tags: SharedTagSet,
    tags_deduplicator: ReusableDeduplicator<Tag>,
}

impl MetricsEndpointEncoder {
    /// Creates a new `MetricsEndpointEncoder` for the given endpoint.
    pub fn from_endpoint(endpoint: MetricsEndpoint) -> Self {
        Self {
            endpoint,
            primary_scratch_buf: Vec::new(),
            secondary_scratch_buf: Vec::new(),
            packed_scratch_buf: Vec::new(),
            additional_tags: SharedTagSet::default(),
            tags_deduplicator: ReusableDeduplicator::new(),
        }
    }

    /// Sets the additional tags to be included with every metric encoded by this encoder.
    ///
    /// These tags are added in a deduplicated fashion, the same as instrumented tags and origin tags. This is an
    /// optimized codepath for tag inclusion in high-volume scenarios, where creating new additional contexts
    /// through the traditional means (for example, `ContextResolver`) would be too expensive.
    pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
        self.additional_tags = additional_tags;
        self
    }
}

impl EndpointEncoder for MetricsEndpointEncoder {
    type Input = Metric;
    type EncodeError = protobuf::Error;

    fn encoder_name() -> &'static str {
        "metrics"
    }

    fn compressed_size_limit(&self) -> usize {
        match self.endpoint {
            MetricsEndpoint::Series => SERIES_V2_COMPRESSED_SIZE_LIMIT,
            MetricsEndpoint::Sketches => DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
        }
    }

    fn uncompressed_size_limit(&self) -> usize {
        match self.endpoint {
            MetricsEndpoint::Series => SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
            MetricsEndpoint::Sketches => DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
        }
    }

    fn is_valid_input(&self, input: &Self::Input) -> bool {
        let input_endpoint = MetricsEndpoint::from_metric(input);
        input_endpoint == self.endpoint
    }

    fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
        // NOTE: We're passing _four_ buffers to `encode_single_metric`, which is a lot, but with good reason.
        //
        // The first buffer, `buffer`, is the overall output buffer: the caller expects us to put the full encoded
        // metric payload into this buffer.
        //
        // The second and third buffers, `primary_scratch_buf` and `secondary_scratch_buf`, are used for roughly the
        // same thing but deal with _nesting_. When writing a "message" in Protocol Buffers, the message data itself is
        // prefixed with the field number and a length delimiter that specifies how long the message is. We can't write
        // that length delimiter until we know the full size of the message, so we write the message to a scratch
        // buffer, calculate its size, and then write the field number and length delimiter to the output buffer
        // followed by the message data from the scratch buffer.
        //
        // We have _two_ scratch buffers because you need a dedicated buffer for each level of nested message. We have
        // to be able to nest up to two levels deep in our metrics payload, so we need two scratch buffers to handle
        // that.
        //
        // The fourth buffer, `packed_scratch_buf`, is used for writing out packed repeated fields. This is similar to
        // the situation described above, except it's not _exactly_ the same as an additional level of nesting... so I
        // just decided to give it a somewhat more descriptive name.
        encode_single_metric(
            input,
            &self.additional_tags,
            buffer,
            &mut self.primary_scratch_buf,
            &mut self.secondary_scratch_buf,
            &mut self.packed_scratch_buf,
            &mut self.tags_deduplicator,
        )?;

        Ok(())
    }

    fn endpoint_uri(&self) -> Uri {
        match self.endpoint {
            MetricsEndpoint::Series => PathAndQuery::from_static("/api/v2/series").into(),
            MetricsEndpoint::Sketches => PathAndQuery::from_static("/api/beta/sketches").into(),
        }
    }

    fn endpoint_method(&self) -> Method {
        // Both endpoints use POST.
        Method::POST
    }

    fn content_type(&self) -> HeaderValue {
        // Both endpoints encode via Protocol Buffers.
        CONTENT_TYPE_PROTOBUF.clone()
    }
}

fn field_number_for_metric_type(metric: &Metric) -> u32 {
    // Both arms intentionally map to 1: series metrics are written as the `series` field (field number 1) of the
    // top-level `MetricPayload` message, while sketch metrics are written as the `sketches` field (also field number
    // 1) of the top-level `SketchPayload` message.
    match metric.values() {
        MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => 1,
        MetricValues::Histogram(..) | MetricValues::Distribution(..) => 1,
    }
}

fn get_message_size(raw_msg_size: usize) -> Result<u32, protobuf::Error> {
    const MAX_MESSAGE_SIZE: u64 = i32::MAX as u64;

    // Individual messages cannot be larger than `i32::MAX`, so check that here before proceeding.
    if raw_msg_size as u64 > MAX_MESSAGE_SIZE {
        return Err(std::io::Error::other("message size exceeds limit (2147483647 bytes)").into());
    }

    Ok(raw_msg_size as u32)
}

fn get_message_size_from_buffer(buf: &[u8]) -> Result<u32, protobuf::Error> {
    get_message_size(buf.len())
}

fn encode_single_metric(
    metric: &Metric, additional_tags: &SharedTagSet, output_buf: &mut Vec<u8>, primary_scratch_buf: &mut Vec<u8>,
    secondary_scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
    tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) -> Result<(), protobuf::Error> {
    let mut output_stream = CodedOutputStream::vec(output_buf);
    let field_number = field_number_for_metric_type(metric);

    write_nested_message(&mut output_stream, primary_scratch_buf, field_number, |os| {
        // Depending on the metric type, we write out the appropriate fields.
        match metric.values() {
            MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
                encode_series_metric(metric, additional_tags, os, secondary_scratch_buf, tags_deduplicator)
            }
            MetricValues::Histogram(..) | MetricValues::Distribution(..) => encode_sketch_metric(
                metric,
                additional_tags,
                os,
                secondary_scratch_buf,
                packed_scratch_buf,
                tags_deduplicator,
            ),
        }
    })
}

fn encode_series_metric(
    metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
    scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) -> Result<(), protobuf::Error> {
    // Write the metric name and tags.
    output_stream.write_string(SERIES_METRIC_FIELD_NUMBER, metric.context().name())?;

    let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
    write_series_tags(deduplicated_tags, output_stream, scratch_buf)?;

    // Set the host resource.
    write_resource(
        output_stream,
        scratch_buf,
        "host",
        metric.metadata().hostname().unwrap_or_default(),
    )?;

    // Write the origin metadata, if it exists.
    if let Some(origin) = metric.metadata().origin() {
        match origin {
            MetricOrigin::SourceType(source_type) => {
                output_stream.write_string(SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER, source_type.as_ref())?;
            }
            MetricOrigin::OriginMetadata {
                product,
                subproduct,
                product_detail,
            } => {
                write_origin_metadata(
                    output_stream,
                    scratch_buf,
                    SERIES_METADATA_FIELD_NUMBER,
                    *product,
                    *subproduct,
                    *product_detail,
                )?;
            }
        }
    }

    // Now write out our metric type, points, and interval (if applicable). Sets are emitted with the gauge type,
    // since the series payload has no dedicated set type.
    let (metric_type, points, maybe_interval) = match metric.values() {
        MetricValues::Counter(points) => (proto::MetricType::COUNT, points.into_iter(), None),
        MetricValues::Rate(points, interval) => (proto::MetricType::RATE, points.into_iter(), Some(interval)),
        MetricValues::Gauge(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
        MetricValues::Set(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
        _ => unreachable!(),
    };

    output_stream.write_enum(SERIES_TYPE_FIELD_NUMBER, metric_type.value())?;

    if let Some(unit) = metric.metadata().unit() {
        output_stream.write_string(SERIES_UNIT_FIELD_NUMBER, unit)?;
    }

    for (timestamp, value) in points {
        // If this is a rate metric, scale our value by the interval to get a per-second value: for example, a rate of
        // 10 over a 2-second interval is emitted as 5.
        let value = maybe_interval
            .map(|interval| value / interval.as_secs_f64())
            .unwrap_or(value);
        let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;

        write_point(output_stream, scratch_buf, value, timestamp)?;
    }

    if let Some(interval) = maybe_interval {
        output_stream.write_int64(SERIES_INTERVAL_FIELD_NUMBER, interval.as_secs() as i64)?;
    }

    Ok(())
}

fn encode_sketch_metric(
    metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
    scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) -> Result<(), protobuf::Error> {
    // Write the metric name and tags.
    output_stream.write_string(SKETCH_METRIC_FIELD_NUMBER, metric.context().name())?;

    let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
    write_sketch_tags(deduplicated_tags, output_stream, scratch_buf)?;

    // Write the host.
    output_stream.write_string(
        SKETCH_HOST_FIELD_NUMBER,
        metric.metadata().hostname().unwrap_or_default(),
    )?;

    // Set the origin metadata, if it exists.
    if let Some(MetricOrigin::OriginMetadata {
        product,
        subproduct,
        product_detail,
    }) = metric.metadata().origin()
    {
        write_origin_metadata(
            output_stream,
            scratch_buf,
            SKETCH_METADATA_FIELD_NUMBER,
            *product,
            *subproduct,
            *product_detail,
        )?;
    }

    // TODO: Emit `metric.metadata().unit()` in the sketch payload once the upstream `agent-payload` proto defines a
    // unit field on `SketchPayload.Sketch`.

    // Write out our sketches.
    match metric.values() {
        MetricValues::Distribution(sketches) => {
            for (timestamp, value) in sketches {
                write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, value)?;
            }
        }
        MetricValues::Histogram(points) => {
            for (timestamp, histogram) in points {
                // We convert histograms to sketches to be able to write them out in the payload.
                let mut ddsketch = DDSketch::default();
                for sample in histogram.samples() {
                    ddsketch.insert_n(sample.value.into_inner(), sample.weight);
                }

                write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, &ddsketch)?;
            }
        }
        _ => unreachable!(),
    }

    Ok(())
}

fn write_resource(
    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, resource_type: &str, resource_name: &str,
) -> Result<(), protobuf::Error> {
    write_nested_message(output_stream, scratch_buf, SERIES_RESOURCES_FIELD_NUMBER, |os| {
        os.write_string(RESOURCES_TYPE_FIELD_NUMBER, resource_type)?;
        os.write_string(RESOURCES_NAME_FIELD_NUMBER, resource_name)
    })
}

fn write_origin_metadata(
    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, origin_product: u32,
    origin_category: u32, origin_service: u32,
) -> Result<(), protobuf::Error> {
    // TODO: Figure out how to cleanly use `write_nested_message` here.

    scratch_buf.clear();

    {
        let mut origin_output_stream = CodedOutputStream::vec(scratch_buf);
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER, origin_product)?;
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER, origin_category)?;
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER, origin_service)?;
        origin_output_stream.flush()?;
    }

    // We do a little song and dance here because the `Origin` message is embedded inside of `Metadata`, so we need to
    // write out the field numbers/length delimiters in order: `Metadata`, and then `Origin`... but we've written the
    // `Origin` message to the scratch buffer first. So we write our `Metadata` preamble to a small stack buffer to get
    // its length, and then use that in conjunction with the `Origin` message size to write out the full `Metadata`
    // message.
    let origin_message_size = get_message_size_from_buffer(scratch_buf)?;

    let mut metadata_preamble_buf = [0; 64];
    let metadata_preamble_len = {
        let mut metadata_output_stream = CodedOutputStream::bytes(&mut metadata_preamble_buf[..]);
        metadata_output_stream.write_tag(METADATA_ORIGIN_FIELD_NUMBER, WireType::LengthDelimited)?;
        metadata_output_stream.write_raw_varint32(origin_message_size)?;
        metadata_output_stream.flush()?;
        metadata_output_stream.total_bytes_written() as usize
    };

    let metadata_message_size = get_message_size(scratch_buf.len() + metadata_preamble_len)?;

    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
    output_stream.write_raw_varint32(metadata_message_size)?;
    output_stream.write_raw_bytes(&metadata_preamble_buf[..metadata_preamble_len])?;
    output_stream.write_raw_bytes(scratch_buf)
}

fn write_point(
    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, value: f64, timestamp: i64,
) -> Result<(), protobuf::Error> {
    write_nested_message(output_stream, scratch_buf, SERIES_POINTS_FIELD_NUMBER, |os| {
        os.write_double(METRIC_POINT_VALUE_FIELD_NUMBER, value)?;
        os.write_int64(METRIC_POINT_TIMESTAMP_FIELD_NUMBER, timestamp)
    })
}

fn write_dogsketch(
    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
    timestamp: Option<NonZeroU64>, sketch: &DDSketch,
) -> Result<(), protobuf::Error> {
    // If the sketch is empty, we don't write it out.
    if sketch.is_empty() {
        warn!("Attempted to write an empty sketch to sketches payload, skipping.");
        return Ok(());
    }

    write_nested_message(output_stream, scratch_buf, SKETCH_DOGSKETCHES_FIELD_NUMBER, |os| {
        os.write_int64(DOGSKETCH_TS_FIELD_NUMBER, timestamp.map_or(0, |ts| ts.get() as i64))?;
        os.write_int64(DOGSKETCH_CNT_FIELD_NUMBER, sketch.count() as i64)?;

        // These unwraps are safe because we returned early above if the sketch was empty.
        os.write_double(DOGSKETCH_MIN_FIELD_NUMBER, sketch.min().unwrap())?;
        os.write_double(DOGSKETCH_MAX_FIELD_NUMBER, sketch.max().unwrap())?;
        os.write_double(DOGSKETCH_AVG_FIELD_NUMBER, sketch.avg().unwrap())?;
        os.write_double(DOGSKETCH_SUM_FIELD_NUMBER, sketch.sum().unwrap())?;

        let bin_keys = sketch.bins().iter().map(|bin| bin.key());
        write_repeated_packed_from_iter(
            os,
            packed_scratch_buf,
            DOGSKETCH_K_FIELD_NUMBER,
            bin_keys,
            |inner_os, value| inner_os.write_sint32_no_tag(value),
        )?;

        let bin_counts = sketch.bins().iter().map(|bin| bin.count());
        write_repeated_packed_from_iter(
            os,
            packed_scratch_buf,
            DOGSKETCH_N_FIELD_NUMBER,
            bin_counts,
            |inner_os, value| inner_os.write_uint32_no_tag(value),
        )
    })
}

fn get_deduplicated_tags<'a>(
    metric: &'a Metric, additional_tags: &'a SharedTagSet, tags_deduplicator: &'a mut ReusableDeduplicator<Tag>,
) -> impl Iterator<Item = &'a Tag> {
    let chained_tags = metric
        .context()
        .tags()
        .into_iter()
        .chain(additional_tags)
        .chain(metric.context().origin_tags());

    tags_deduplicator.deduplicated(chained_tags)
}

fn write_tags<'a, I, F>(
    tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, tag_encoder: F,
) -> Result<(), protobuf::Error>
where
    I: Iterator<Item = &'a Tag>,
    F: Fn(&Tag, &mut CodedOutputStream<'_>, &mut Vec<u8>) -> Result<(), protobuf::Error>,
{
    for tag in tags {
        tag_encoder(tag, output_stream, scratch_buf)?;
    }

    Ok(())
}

fn write_series_tags<'a, I>(
    tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
) -> Result<(), protobuf::Error>
where
    I: Iterator<Item = &'a Tag>,
{
    write_tags(tags, output_stream, scratch_buf, |tag, os, buf| {
        // If this is a resource tag, we'll convert it directly to a resource entry.
        if tag.name() == "dd.internal.resource" {
            if let Some((resource_type, resource_name)) = tag.value().and_then(|s| s.split_once(':')) {
                write_resource(os, buf, resource_type, resource_name)
            } else {
                Ok(())
            }
        } else {
            // We're dealing with a normal tag.
            os.write_string(SERIES_TAGS_FIELD_NUMBER, tag.as_str())
        }
    })
}

fn write_sketch_tags<'a, I>(
    tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
) -> Result<(), protobuf::Error>
where
    I: Iterator<Item = &'a Tag>,
{
    write_tags(tags, output_stream, scratch_buf, |tag, os, _buf| {
        // We always write the tags as-is, without any special handling for resource tags.
        os.write_string(SKETCH_TAGS_FIELD_NUMBER, tag.as_str())
    })
}

fn write_nested_message<F>(
    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, writer: F,
) -> Result<(), protobuf::Error>
where
    F: FnOnce(&mut CodedOutputStream<'_>) -> Result<(), protobuf::Error>,
{
    scratch_buf.clear();

    {
        let mut nested_output_stream = CodedOutputStream::vec(scratch_buf);
        writer(&mut nested_output_stream)?;
        nested_output_stream.flush()?;
    }

    output_stream.write_tag(field_number, WireType::LengthDelimited)?;

    let nested_message_size = get_message_size_from_buffer(scratch_buf)?;
    output_stream.write_raw_varint32(nested_message_size)?;
    output_stream.write_raw_bytes(scratch_buf)
}

fn write_repeated_packed_from_iter<I, T, F>(
    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, values: I, writer: F,
) -> Result<(), protobuf::Error>
where
    I: Iterator<Item = T>,
    F: Fn(&mut CodedOutputStream<'_>, T) -> Result<(), protobuf::Error>,
{
    // This is a helper function that lets us write out a packed repeated field from an iterator of values.
    // `CodedOutputStream` has similar functions to handle this, but they require a slice of values, which would mean
    // we'd need to either allocate a new vector each time to hold the values, or thread through two additional vectors
    // (one for `i32`, one for `u32`) to reuse the allocation... neither of which is a great option.
    //
    // We've simply opted to pass through a _single_ vector that we can reuse, and write the packed values directly to
    // it, almost identically to how the `CodedOutputStream::write_repeated_packed_*` methods would do it.

    scratch_buf.clear();

    {
        let mut packed_output_stream = CodedOutputStream::vec(scratch_buf);
        for value in values {
            writer(&mut packed_output_stream, value)?;
        }
        packed_output_stream.flush()?;
    }

    let data_size = get_message_size_from_buffer(scratch_buf)?;

    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
    output_stream.write_raw_varint32(data_size)?;
    output_stream.write_raw_bytes(scratch_buf)
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use protobuf::CodedOutputStream;
    use saluki_common::iter::ReusableDeduplicator;
    use saluki_context::{tags::SharedTagSet, Context};
    use saluki_core::data_model::event::metric::{Metric, MetricMetadata, MetricValues};
    use stringtheory::MetaString;

    use super::{encode_series_metric, encode_sketch_metric, MetricsEndpoint, MetricsEndpointEncoder};
    use crate::common::datadog::request_builder::EndpointEncoder as _;

    #[test]
    fn histogram_vs_sketch_identical_payload() {
        // For the same exact set of points, we should be able to construct either a histogram or distribution from
        // those points, and when encoded as a sketch payload, end up with the same exact payload.
        //
        // They should be identical because the goal is that we convert histograms into sketches in the same way we
        // would have originally constructed a sketch based on the same samples.
        let samples = &[1.0, 2.0, 3.0, 4.0, 5.0];
        let histogram = Metric::histogram("simple_samples", samples);
        let distribution = Metric::distribution("simple_samples", samples);
        let host_tags = SharedTagSet::default();

        let mut buf1 = Vec::new();
        let mut buf2 = Vec::new();
        let mut tags_deduplicator = ReusableDeduplicator::new();

        let mut histogram_payload = Vec::new();
        {
            let mut histogram_writer = CodedOutputStream::vec(&mut histogram_payload);
            encode_sketch_metric(
                &histogram,
                &host_tags,
                &mut histogram_writer,
                &mut buf1,
                &mut buf2,
                &mut tags_deduplicator,
            )
            .expect("Failed to encode histogram as sketch");
        }

        let mut distribution_payload = Vec::new();
        {
            let mut distribution_writer = CodedOutputStream::vec(&mut distribution_payload);
            encode_sketch_metric(
                &distribution,
                &host_tags,
                &mut distribution_writer,
                &mut buf1,
                &mut buf2,
                &mut tags_deduplicator,
            )
            .expect("Failed to encode distribution as sketch");
        }

        assert_eq!(histogram_payload, distribution_payload);
    }
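
    // A minimal sketch of the length-delimited framing that `write_nested_message` produces, to make the scratch
    // buffer dance described in `MetricsEndpointEncoder::encode` concrete: the outer stream receives a tag byte, a
    // varint length, and then the nested message bytes. The field numbers here (1 for the outer message, 2 for the
    // inner string) are arbitrary values chosen for this test, not field numbers from `agent_payload.proto`.
    #[test]
    fn nested_message_framing() {
        use super::write_nested_message;

        let mut payload = Vec::new();
        let mut scratch_buf = Vec::new();
        {
            let mut writer = CodedOutputStream::vec(&mut payload);
            write_nested_message(&mut writer, &mut scratch_buf, 1, |os| os.write_string(2, "host"))
                .expect("Failed to write nested message");
            writer.flush().expect("Failed to flush");
        }

        // Outer framing: tag byte for field 1, wire type 2 (length-delimited), then the inner message length.
        assert_eq!(payload[0], (1 << 3) | 2);
        assert_eq!(payload[1], 6);

        // Inner message: tag byte for field 2 string (0x12), varint length 4, then the UTF-8 bytes of "host".
        assert_eq!(&payload[2..], &[0x12, 4, b'h', b'o', b's', b't']);
    }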

    #[test]
    fn input_valid() {
        // Our encoder should always consider series metrics valid when set to the series endpoint, and similarly for
        // sketch metrics when set to the sketches endpoint.
        let counter = Metric::counter("counter", 1.0);
        let rate = Metric::rate("rate", 1.0, Duration::from_secs(1));
        let gauge = Metric::gauge("gauge", 1.0);
        let set = Metric::set("set", "foo");
        let histogram = Metric::histogram("histogram", [1.0, 2.0, 3.0]);
        let distribution = Metric::distribution("distribution", [1.0, 2.0, 3.0]);

        let series_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Series);
        let sketches_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);

        assert!(series_endpoint.is_valid_input(&counter));
        assert!(series_endpoint.is_valid_input(&rate));
        assert!(series_endpoint.is_valid_input(&gauge));
        assert!(series_endpoint.is_valid_input(&set));
        assert!(!series_endpoint.is_valid_input(&histogram));
        assert!(!series_endpoint.is_valid_input(&distribution));

        assert!(!sketches_endpoint.is_valid_input(&counter));
        assert!(!sketches_endpoint.is_valid_input(&rate));
        assert!(!sketches_endpoint.is_valid_input(&gauge));
        assert!(!sketches_endpoint.is_valid_input(&set));
        assert!(sketches_endpoint.is_valid_input(&histogram));
        assert!(sketches_endpoint.is_valid_input(&distribution));
    }
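
    // A minimal sketch of the packed `repeated` encoding performed by `write_repeated_packed_from_iter`: a single
    // tag byte and varint length, followed by the back-to-back varint values. The field number (8) and the values
    // are arbitrary choices for this test rather than anything taken from the real payload definitions.
    #[test]
    fn packed_repeated_field_framing() {
        use super::write_repeated_packed_from_iter;

        let mut payload = Vec::new();
        let mut scratch_buf = Vec::new();
        {
            let mut writer = CodedOutputStream::vec(&mut payload);
            write_repeated_packed_from_iter(&mut writer, &mut scratch_buf, 8, [1u32, 2, 300].into_iter(), |os, value| {
                os.write_uint32_no_tag(value)
            })
            .expect("Failed to write packed repeated field");
            writer.flush().expect("Failed to flush");
        }

        // Tag byte for field 8, wire type 2 (length-delimited), then a 4-byte packed payload: the varints for 1 and
        // 2 are single bytes, while 300 encodes as the two-byte varint [0xAC, 0x02].
        assert_eq!(payload, [(8 << 3) | 2, 4, 1, 2, 0xAC, 0x02]);
    }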

    #[test]
    fn series_metric_unit_encoded() {
        // A gauge with a unit in its metadata must produce a series protobuf payload that contains the unit string
        // in field 6 (MetricSeries.unit), which the Datadog backend already accepts.
        //
        // In production this state is reached when histogram aggregation flushes timer (`ms`) statistics as gauges,
        // each carrying unit = "millisecond" propagated through MetricMetadata.
        let context = Context::from_static_parts("my.timer.avg", &[]);
        let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond"));
        let gauge = Metric::from_parts(context, MetricValues::gauge([1.0_f64]), metadata);

        let host_tags = SharedTagSet::default();
        let mut scratch_buf = Vec::new();
        let mut tags_deduplicator = ReusableDeduplicator::new();

        let mut payload = Vec::new();
        {
            let mut writer = CodedOutputStream::vec(&mut payload);
            encode_series_metric(
                &gauge,
                &host_tags,
                &mut writer,
                &mut scratch_buf,
                &mut tags_deduplicator,
            )
            .expect("Failed to encode gauge as series metric");
            writer.flush().expect("Failed to flush");
        }

        // In the protobuf wire format, a string field with field number 6 has tag byte 0x32 ((6 << 3) | 2).
        // The tag is followed by a varint length and then the UTF-8 bytes of the string.
        let expected_tag: u8 = (6 << 3) | 2; // 0x32
        let expected_value = b"millisecond";

        let tag_pos = payload
            .windows(1 + 1 + expected_value.len())
            .position(|w| w[0] == expected_tag && w[1] == expected_value.len() as u8 && &w[2..] == expected_value);

        assert!(
            tag_pos.is_some(),
            "series payload should contain unit field (field 6 = 'millisecond'), got bytes: {:?}",
            payload
        );
    }
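
    // A hedged check of the rate-scaling behavior described in `encode_series_metric`: a rate of 10 over a 2-second
    // interval should be written as a per-second point value of 5.0. We locate the point by scanning for the double
    // field's tag byte (field 1, wire type 1 => 0x09) followed by the little-endian bytes of 5.0, which assumes that
    // this exact byte sequence does not occur elsewhere in such a small payload.
    #[test]
    fn rate_point_scaled_by_interval() {
        let rate = Metric::rate("my.rate", 10.0, Duration::from_secs(2));

        let host_tags = SharedTagSet::default();
        let mut scratch_buf = Vec::new();
        let mut tags_deduplicator = ReusableDeduplicator::new();

        let mut payload = Vec::new();
        {
            let mut writer = CodedOutputStream::vec(&mut payload);
            encode_series_metric(&rate, &host_tags, &mut writer, &mut scratch_buf, &mut tags_deduplicator)
                .expect("Failed to encode rate as series metric");
            writer.flush().expect("Failed to flush");
        }

        let expected_value = 5.0f64.to_le_bytes();
        let found = payload
            .windows(1 + expected_value.len())
            .any(|w| w[0] == 0x09 && w[1..] == expected_value);

        assert!(
            found,
            "series payload should contain scaled point value 5.0, got bytes: {:?}",
            payload
        );
    }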
}

#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogMetricsConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DATADOG_METRICS_CONFIGURATION, &[], json!({}), |cfg| {
            cfg.as_typed::<DatadogMetricsConfiguration>()
                .expect("DatadogMetricsConfiguration should deserialize")
        })
        .await
    }
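
    // A minimal sketch of how the `serializer_*` renames on `DatadogMetricsConfiguration` map from raw configuration
    // keys, using plain `serde_json` deserialization. The specific values here are arbitrary test inputs.
    #[test]
    fn deserializes_renamed_fields() {
        let cfg: DatadogMetricsConfiguration = serde_json::from_value(json!({
            "serializer_max_metrics_per_payload": 5000,
            "flush_timeout_secs": 1,
            "serializer_compressor_kind": "zstd",
            "serializer_zstd_compressor_level": 5,
        }))
        .expect("DatadogMetricsConfiguration should deserialize");

        assert_eq!(cfg.max_metrics_per_payload, 5000);
        assert_eq!(cfg.flush_timeout_secs, 1);
        assert_eq!(cfg.compressor_kind, "zstd");
        assert_eq!(cfg.zstd_compressor_level, 5);
    }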
}