Skip to main content

saluki_components/encoders/datadog/metrics/
mod.rs

1use std::{fmt, num::NonZeroU64, time::Duration};
2
3use async_trait::async_trait;
4use datadog_protos::metrics as proto;
5use ddsketch::DDSketch;
6use facet::Facet;
7use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
8use protobuf::{rt::WireType, CodedOutputStream, Enum as _};
9use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
10use saluki_common::{iter::ReusableDeduplicator, task::HandleExt as _};
11use saluki_config::GenericConfiguration;
12use saluki_context::tags::{SharedTagSet, Tag};
13use saluki_core::{
14    components::{encoders::*, ComponentContext},
15    data_model::{
16        event::{
17            metric::{Metric, MetricOrigin, MetricValues},
18            EventType,
19        },
20        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
21    },
22    observability::ComponentMetricsExt as _,
23    topology::{EventsBuffer, PayloadsBuffer},
24};
25use saluki_error::{generic_error, ErrorContext as _, GenericError};
26use saluki_io::compression::CompressionScheme;
27use saluki_metrics::MetricsBuilder;
28use serde::Deserialize;
29use serde_json::{Map as JsonMap, Number as JsonNumber, Value as JsonValue};
30use tokio::{select, sync::mpsc, time::sleep};
31use tracing::{debug, error, warn};
32
33use crate::common::datadog::{
34    clamp_payload_limits,
35    io::RB_BUFFER_CHUNK_SIZE,
36    request_builder::{EndpointEncoder, RequestBuilder},
37    telemetry::ComponentTelemetry,
38    DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT, DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT, METRICS_SERIES_V1_PATH,
39    METRICS_SERIES_V2_PATH, METRICS_SKETCHES_PATH,
40};
41
// Payload size limits for the V2 series endpoint. These are the defaults for the `serializer_max_series_*` settings
// and the ceilings that configured values are clamped to when the series request builder is created.
const SERIES_V2_COMPRESSED_SIZE_LIMIT: usize = 512_000; // 500 KiB
const SERIES_V2_UNCOMPRESSED_SIZE_LIMIT: usize = 5_242_880; // 5 MiB

// V1 series JSON endpoint limits match the Datadog Agent's generic serializer defaults.
const SERIES_V1_COMPRESSED_SIZE_LIMIT: usize = DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT;
const SERIES_V1_UNCOMPRESSED_SIZE_LIMIT: usize = DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT;

// Default value of the `serializer_compressor_kind` setting.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
50
// Protocol Buffers field numbers for series and sketch payload messages.
//
// These field numbers come from the Protocol Buffers definitions in `lib/datadog-protos/proto/agent_payload.proto`.
//
// Constants are grouped by the message named in their prefix. Only some fields have a constant here, so gaps in the
// numbering within a group (for example 3, 5 and 6 in the `SKETCH_*` group) are expected.

// `RESOURCES_*` group.
const RESOURCES_TYPE_FIELD_NUMBER: u32 = 1;
const RESOURCES_NAME_FIELD_NUMBER: u32 = 2;

// `METADATA_*` group.
const METADATA_ORIGIN_FIELD_NUMBER: u32 = 1;

// `ORIGIN_*` group.
const ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER: u32 = 4;
const ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER: u32 = 5;
const ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER: u32 = 6;

// `METRIC_POINT_*` group.
const METRIC_POINT_VALUE_FIELD_NUMBER: u32 = 1;
const METRIC_POINT_TIMESTAMP_FIELD_NUMBER: u32 = 2;

// `DOGSKETCH_*` group.
const DOGSKETCH_TS_FIELD_NUMBER: u32 = 1;
const DOGSKETCH_CNT_FIELD_NUMBER: u32 = 2;
const DOGSKETCH_MIN_FIELD_NUMBER: u32 = 3;
const DOGSKETCH_MAX_FIELD_NUMBER: u32 = 4;
const DOGSKETCH_AVG_FIELD_NUMBER: u32 = 5;
const DOGSKETCH_SUM_FIELD_NUMBER: u32 = 6;
const DOGSKETCH_K_FIELD_NUMBER: u32 = 7;
const DOGSKETCH_N_FIELD_NUMBER: u32 = 8;

// `SERIES_*` group.
const SERIES_RESOURCES_FIELD_NUMBER: u32 = 1;
const SERIES_METRIC_FIELD_NUMBER: u32 = 2;
const SERIES_TAGS_FIELD_NUMBER: u32 = 3;
const SERIES_POINTS_FIELD_NUMBER: u32 = 4;
const SERIES_TYPE_FIELD_NUMBER: u32 = 5;
const SERIES_UNIT_FIELD_NUMBER: u32 = 6;
const SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER: u32 = 7;
const SERIES_INTERVAL_FIELD_NUMBER: u32 = 8;
const SERIES_METADATA_FIELD_NUMBER: u32 = 9;

// `SKETCH_*` group.
const SKETCH_METRIC_FIELD_NUMBER: u32 = 1;
const SKETCH_HOST_FIELD_NUMBER: u32 = 2;
const SKETCH_TAGS_FIELD_NUMBER: u32 = 4;
const SKETCH_DOGSKETCHES_FIELD_NUMBER: u32 = 7;
const SKETCH_METADATA_FIELD_NUMBER: u32 = 8;
90
// Header values for the two payload encodings (protobuf and JSON).
// NOTE(review): presumably used as the `Content-Type` of outgoing requests; the use site is not in this part of the
// file.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");

// JSON framing for the V1 series payload, which wraps the array of `Serie` objects in a top-level object.
//
// With the separator placed between inputs, a complete payload has the shape `{"series":[<input>,<input>]}`.
const SERIES_V1_PAYLOAD_PREFIX: &[u8] = b"{\"series\":[";
const SERIES_V1_PAYLOAD_SUFFIX: &[u8] = b"]}";
const SERIES_V1_INPUT_SEPARATOR: &[u8] = b",";
98
/// Serde default for `serializer_max_metrics_per_payload`: 10,000 input metrics per payload.
const fn default_max_metrics_per_payload() -> usize {
    const MAX_METRICS_PER_PAYLOAD: usize = 10_000;
    MAX_METRICS_PER_PAYLOAD
}
102
103const fn default_max_payload_size() -> usize {
104    DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
105}
106
107const fn default_max_uncompressed_payload_size() -> usize {
108    DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
109}
110
111const fn default_max_series_payload_size() -> usize {
112    SERIES_V2_COMPRESSED_SIZE_LIMIT
113}
114
115const fn default_max_series_uncompressed_payload_size() -> usize {
116    SERIES_V2_UNCOMPRESSED_SIZE_LIMIT
117}
118
/// Serde default for `flush_timeout_secs`: two seconds.
const fn default_flush_timeout_secs() -> u64 {
    const FLUSH_TIMEOUT_SECS: u64 = 2;
    FLUSH_TIMEOUT_SECS
}
122
123fn default_serializer_compressor_kind() -> String {
124    DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
125}
126
/// Serde default for `serializer_zstd_compressor_level`: level 3.
const fn default_zstd_compressor_level() -> i32 {
    const ZSTD_LEVEL: i32 = 3;
    ZSTD_LEVEL
}
130
/// Serde default for `use_v2_api_series`: the V2 series endpoint is enabled unless configured otherwise.
const fn default_use_v2_api_series() -> bool {
    const USE_V2_SERIES: bool = true;
    USE_V2_SERIES
}
134
/// Serde default for `log_payloads`: payload logging is off unless configured otherwise.
const fn default_log_payloads() -> bool {
    const LOG_PAYLOADS: bool = false;
    LOG_PAYLOADS
}
138
/// Datadog Metrics encoder.
///
/// Generates Datadog metrics payloads for the Datadog platform.
///
/// This type is the configuration half of the encoder: it is deserialized from configuration and, through its
/// `EncoderBuilder` implementation, builds the running `DatadogMetrics` encoder.
#[derive(Clone, Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
#[allow(dead_code)]
pub struct DatadogMetricsConfiguration {
    /// Maximum number of input metrics to encode into a single request payload.
    ///
    /// This applies both to the series and sketches endpoints.
    ///
    /// Defaults to 10,000.
    #[serde(
        rename = "serializer_max_metrics_per_payload",
        default = "default_max_metrics_per_payload"
    )]
    max_metrics_per_payload: usize,

    /// Maximum compressed size, in bytes, of generic payloads.
    ///
    /// This applies to V1 JSON series payloads and sketch payloads, matching the Datadog Agent's generic payload
    /// builder. V2 series payloads use `serializer_max_series_payload_size` instead. The effective value is clamped to
    /// the Agent's default intake-safe limit of 2,621,440 bytes, so larger configured values do not allow payloads that
    /// intake may reject. If set to `0`, every non-empty compressed payload exceeds the limit and is dropped during
    /// flush.
    ///
    /// Defaults to 2,621,440 bytes.
    #[serde(rename = "serializer_max_payload_size", default = "default_max_payload_size")]
    max_payload_size: usize,

    /// Maximum uncompressed size, in bytes, of generic payloads.
    ///
    /// This applies to V1 JSON series payloads and sketch payloads, matching the Datadog Agent's generic payload
    /// builder. V2 series payloads use `serializer_max_series_uncompressed_payload_size` instead. The effective value
    /// is clamped to the Agent's default intake-safe limit of 4,194,304 bytes, so larger configured values do not allow
    /// payloads that intake may reject. Values smaller than the minimum endpoint framing size prevent the request
    /// builder from starting.
    ///
    /// Defaults to 4,194,304 bytes.
    #[serde(
        rename = "serializer_max_uncompressed_payload_size",
        default = "default_max_uncompressed_payload_size"
    )]
    max_uncompressed_payload_size: usize,

    /// Maximum compressed size, in bytes, of a V2 series payload.
    ///
    /// This applies only when `use_v2_api.series` is `true`. V1 series and sketches use `serializer_max_payload_size`
    /// instead. The effective value is clamped to the V2 series API limit of 512,000 bytes, so larger configured values
    /// do not allow payloads that intake would reject. High-throughput workloads may increase this up to that API limit
    /// to reduce request count, at the cost of larger individual requests. If set to `0`, every non-empty compressed
    /// payload exceeds the limit and is dropped during flush.
    ///
    /// Defaults to 512,000 bytes.
    #[serde(
        rename = "serializer_max_series_payload_size",
        default = "default_max_series_payload_size"
    )]
    max_series_payload_size: usize,

    /// Maximum uncompressed size, in bytes, of a V2 series payload.
    ///
    /// This applies only when `use_v2_api.series` is `true`. V1 series and sketches use
    /// `serializer_max_uncompressed_payload_size` instead. The effective value is clamped to the V2 series API limit of
    /// 5,242,880 bytes, so larger configured values do not allow payloads that intake would reject. This limit protects
    /// the encoder before compression, so compressed payload size may still force a separate flush. Values smaller than
    /// the minimum endpoint framing size prevent the request builder from starting.
    ///
    /// Defaults to 5,242,880 bytes.
    #[serde(
        rename = "serializer_max_series_uncompressed_payload_size",
        default = "default_max_series_uncompressed_payload_size"
    )]
    max_series_uncompressed_payload_size: usize,

    /// Flush timeout for pending requests, in seconds.
    ///
    /// When the encoder has written metrics to the in-flight request payload, but it hasn't yet reached the payload
    /// size limits that would force the payload to be flushed, the encoder will wait for a period of time before
    /// flushing the in-flight request payload. This allows for the possibility of other events to be processed and
    /// written into the request payload, thereby maximizing the payload size and reducing the number of requests
    /// generated and sent overall.
    ///
    /// A value of `0` does not disable the timeout: it is treated as a 10 millisecond timeout when the encoder is
    /// built.
    ///
    /// Defaults to 2 seconds.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Compression kind to use for the request payloads.
    ///
    /// Defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compressor level to use when the compressor kind is `zstd`.
    ///
    /// Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Whether to use the V2 API for series metrics.
    ///
    /// When `true` (the default), series metrics are sent to the V2 protobuf endpoint (`/api/v2/series`). When
    /// `false`, series metrics are sent to the legacy V1 JSON endpoint (`/api/v1/series`). Sketch metrics always use
    /// the V2 endpoint (`/api/beta/sketches`) regardless of this setting.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_use_v2_api_series")]
    use_v2_api_series: bool,

    /// Whether to log metric payload contents before encoding.
    ///
    /// This logs decoded metric objects, not the encoded JSON/protobuf HTTP body.
    ///
    /// Defaults to `false`.
    #[serde(default = "default_log_payloads")]
    log_payloads: bool,

    /// Additional tags to apply to all forwarded metrics.
    ///
    /// This field is skipped during (de)serialization and starts out as `None`; it is only populated through
    /// `with_additional_tags`.
    #[serde(default, skip)]
    #[facet(opaque)]
    additional_tags: Option<SharedTagSet>,
}
267
268impl DatadogMetricsConfiguration {
269    /// Creates a new `DatadogMetricsConfiguration` from the given configuration.
270    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
271        Ok(config.as_typed()?)
272    }
273
274    /// Sets additional tags to be applied uniformly to all metrics forwarded by this destination.
275    pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
276        // Add the additional tags to the forwarder configuration.
277        self.additional_tags = Some(additional_tags);
278        self
279    }
280}
281
282#[async_trait]
283impl EncoderBuilder for DatadogMetricsConfiguration {
284    fn input_event_type(&self) -> EventType {
285        EventType::Metric
286    }
287
288    fn output_payload_type(&self) -> PayloadType {
289        PayloadType::Http
290    }
291
292    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
293        let metrics_builder = MetricsBuilder::from_component_context(&context);
294        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
295        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
296
297        // Create our request builders.
298        let series_endpoint = if self.use_v2_api_series {
299            MetricsEndpoint::SeriesV2
300        } else {
301            MetricsEndpoint::SeriesV1
302        };
303        let mut series_encoder = MetricsEndpointEncoder::from_endpoint(series_endpoint);
304        let mut sketches_encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
305
306        if let Some(additional_tags) = self.additional_tags.as_ref() {
307            series_encoder = series_encoder.with_additional_tags(additional_tags.clone());
308            sketches_encoder = sketches_encoder.with_additional_tags(additional_tags.clone());
309        }
310
311        let mut series_rb = RequestBuilder::new(series_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
312        series_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);
313
314        let generic_payload_limits = clamp_payload_limits(
315            self.max_uncompressed_payload_size,
316            self.max_payload_size,
317            DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
318            DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
319        );
320        let (series_uncompressed_limit, series_compressed_limit) = if series_endpoint == MetricsEndpoint::SeriesV2 {
321            clamp_payload_limits(
322                self.max_series_uncompressed_payload_size,
323                self.max_series_payload_size,
324                SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
325                SERIES_V2_COMPRESSED_SIZE_LIMIT,
326            )
327        } else {
328            generic_payload_limits
329        };
330        series_rb.with_len_limits(series_uncompressed_limit, series_compressed_limit)?;
331
332        let mut sketches_rb = RequestBuilder::new(sketches_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
333        sketches_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);
334        let (sketches_uncompressed_limit, sketches_compressed_limit) = generic_payload_limits;
335        sketches_rb.with_len_limits(sketches_uncompressed_limit, sketches_compressed_limit)?;
336
337        let flush_timeout = match self.flush_timeout_secs {
338            // We always give ourselves a minimum flush timeout of 10ms to allow for some very minimal amount of
339            // batching, while still practically flushing things almost immediately.
340            0 => Duration::from_millis(10),
341            secs => Duration::from_secs(secs),
342        };
343
344        Ok(Box::new(DatadogMetrics {
345            series_rb,
346            sketches_rb,
347            telemetry,
348            flush_timeout,
349            log_payloads: self.log_payloads,
350        }))
351    }
352}
353
354impl MemoryBounds for DatadogMetricsConfiguration {
355    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
356        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
357        //
358        // Theoretically, we'll end up being limited by the size of the downstream forwarder's interconnect, and however
359        // many payloads it will buffer internally... so realistically the firm limit boils down to the forwarder itself
360        // but we'll have a hard time in the forwarder knowing the maximum size of any given payload being sent in, which
361        // then makes it hard to calculate a proper firm bound even though we know the rest of the values required to
362        // calculate the firm bound.
363        builder
364            .minimum()
365            .with_single_value::<DatadogMetrics>("component struct")
366            .with_array::<EventsBuffer>("request builder events channel", 8)
367            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);
368
369        builder
370            .firm()
371            // Capture the size of the "split re-encode" buffers in the request builders, which is where we keep owned
372            // versions of metrics that we encode in case we need to actually re-encode them during a split operation.
373            .with_array::<Metric>("series metrics split re-encode buffer", self.max_metrics_per_payload)
374            .with_array::<Metric>("sketch metrics split re-encode buffer", self.max_metrics_per_payload);
375    }
376}
377
/// Running Datadog Metrics encoder.
///
/// Created by `DatadogMetricsConfiguration::build`; its `Encoder::run` implementation hands all fields to the request
/// builder task.
pub struct DatadogMetrics {
    // Request builder for series metrics (V1 or V2 endpoint, chosen at build time).
    series_rb: RequestBuilder<MetricsEndpointEncoder>,
    // Request builder for sketch metrics.
    sketches_rb: RequestBuilder<MetricsEndpointEncoder>,
    // Component telemetry, used to count metrics dropped during encoding.
    telemetry: ComponentTelemetry,
    // How long to wait after processing an event buffer before flushing partially filled payloads.
    flush_timeout: Duration,
    // Whether each metric is logged (at debug level) before being encoded.
    log_payloads: bool,
}
385
#[async_trait]
impl Encoder for DatadogMetrics {
    /// Runs the encoder until the input event stream or the request builder task ends.
    ///
    /// Encoding happens on a separate task (`run_request_builder`): this method forwards incoming event buffers to
    /// that task and dispatches the payloads it produces.
    ///
    /// # Errors
    ///
    /// Returns an error if an event buffer cannot be sent to the request builder task. Failures to dispatch a payload,
    /// and a failed or panicked request builder task, are only logged.
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            series_rb,
            sketches_rb,
            telemetry,
            flush_timeout,
            log_payloads,
        } = *self;

        let mut health = context.take_health_handle();

        // Spawn our request builder task.
        //
        // Both channels are bounded (capacity 8): events flow from this task to the request builder, and payloads
        // flow back.
        let (events_tx, events_rx) = mpsc::channel(8);
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
        let request_builder_fut = run_request_builder(
            series_rb,
            sketches_rb,
            telemetry,
            events_rx,
            payloads_tx,
            flush_timeout,
            log_payloads,
        );
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool()
            .spawn_traced_named("dd-metrics-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog Metrics encoder started.");

        loop {
            select! {
                // Branches are polled in declaration order: health first, then finished payloads, then new events.
                biased;

                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    // The request builder task dropped its sender, so no more payloads will arrive.
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    // The input event stream has ended.
                    None => break,
                },
            }
        }

        // Drop the events sender, which signals the request builder task to stop.
        drop(events_tx);

        // Continue draining the payloads receiver until it is closed.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        // Request build task should now be stopped.
        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog Metrics encoder stopped.");

        Ok(())
    }
}
462
463async fn run_request_builder(
464    mut series_request_builder: RequestBuilder<MetricsEndpointEncoder>,
465    mut sketches_request_builder: RequestBuilder<MetricsEndpointEncoder>, telemetry: ComponentTelemetry,
466    mut events_rx: mpsc::Receiver<EventsBuffer>, payloads_tx: mpsc::Sender<PayloadsBuffer>, flush_timeout: Duration,
467    log_payloads: bool,
468) -> Result<(), GenericError> {
469    let mut pending_flush = false;
470    let pending_flush_timeout = sleep(flush_timeout);
471    tokio::pin!(pending_flush_timeout);
472
473    loop {
474        select! {
475            Some(event_buffer) = events_rx.recv() => {
476                for event in event_buffer {
477                    let metric = match event.try_into_metric() {
478                        Some(metric) => metric,
479                        None => continue,
480                    };
481
482                    if log_payloads {
483                        log_metric_payload(&metric);
484                    }
485
486                    // Series metrics (counters, gauges, rates, sets) and sketch metrics (histograms, distributions)
487                    // route to their respective request builders. Whether the series builder targets the V1 or V2
488                    // intake is decided once at builder time based on `use_v2_api_series`.
489                    let request_builder = match metric.values() {
490                        MetricValues::Counter(..)
491                        | MetricValues::Rate(..)
492                        | MetricValues::Gauge(..)
493                        | MetricValues::Set(..) => &mut series_request_builder,
494                        MetricValues::Histogram(..) | MetricValues::Distribution(..) => &mut sketches_request_builder,
495                    };
496
497                    // Encode the metric. If we get it back, that means the current request is full, and we need to
498                    // flush it before we can try to encode the metric again... so we'll hold on to it in that case
499                    // before flushing and trying to encode it again.
500                    let metric_to_retry = match request_builder.encode(metric).await {
501                        Ok(None) => continue,
502                        Ok(Some(metric)) => metric,
503                        Err(e) => {
504                            error!(error = %e, "Failed to encode metric.");
505                            telemetry.events_dropped_encoder().increment(1);
506                            continue;
507                        }
508                    };
509
510                    let maybe_requests = request_builder.flush().await;
511                    if maybe_requests.is_empty() {
512                        panic!("builder told us to flush, but gave us nothing");
513                    }
514
515                    for maybe_request in maybe_requests {
516                        match maybe_request {
517                            Ok((events, data_points, request)) => {
518                                let payload_meta = PayloadMetadata::from_event_and_data_point_count(events, data_points);
519                                let http_payload = HttpPayload::new(payload_meta, request);
520                                let payload = Payload::Http(http_payload);
521
522                                payloads_tx.send(payload).await
523                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
524                            },
525
526                            // TODO: Increment a counter here that metrics were dropped due to a flush failure.
527                            Err(e) => if e.is_recoverable() {
528                                // If the error is recoverable, we'll hold on to the metric to retry it later.
529                                continue;
530                            } else {
531                                return Err(GenericError::from(e).context("Failed to flush request."));
532                            }
533                        }
534                    }
535
536                    // Now try to encode the metric again. If it fails again, we'll just log it because it shouldn't
537                    // be possible to fail at this point, otherwise we would have already caught that the first
538                    // time.
539                    if let Err(e) = request_builder.encode(metric_to_retry).await {
540                        error!(error = %e, "Failed to encode metric.");
541                        telemetry.events_dropped_encoder().increment(1);
542                    }
543                }
544
545                debug!("Processed event buffer.");
546
547                // If we're not already pending a flush, we'll start the countdown.
548                if !pending_flush {
549                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
550                    pending_flush = true;
551                }
552            },
553            _ = &mut pending_flush_timeout, if pending_flush => {
554                debug!("Flushing pending request(s).");
555
556                pending_flush = false;
557
558                // Once we've encoded and written all metrics, we flush the request builders to generate a request with
559                // anything left over. Again, we'll enqueue those requests to be sent immediately.
560                let maybe_series_requests = series_request_builder.flush().await;
561                for maybe_request in maybe_series_requests {
562                    match maybe_request {
563                        Ok((events, data_points, request)) => {
564                            let payload_meta = PayloadMetadata::from_event_and_data_point_count(events, data_points);
565                            let http_payload = HttpPayload::new(payload_meta, request);
566                            let payload = Payload::Http(http_payload);
567
568                            payloads_tx.send(payload).await
569                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
570                        },
571
572                        // TODO: Increment a counter here that metrics were dropped due to a flush failure.
573                        Err(e) => if e.is_recoverable() {
574                            // If the error is recoverable, we'll hold on to the metric to retry it later.
575                            continue;
576                        } else {
577                            return Err(GenericError::from(e).context("Failed to flush request."));
578                        }
579                    }
580                }
581
582                let maybe_sketches_requests = sketches_request_builder.flush().await;
583                for maybe_request in maybe_sketches_requests {
584                    match maybe_request {
585                        Ok((events, data_points, request)) => {
586                            let payload_meta = PayloadMetadata::from_event_and_data_point_count(events, data_points);
587                            let http_payload = HttpPayload::new(payload_meta, request);
588                            let payload = Payload::Http(http_payload);
589
590                            payloads_tx.send(payload).await
591                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
592                        },
593
594                        // TODO: Increment a counter here that metrics were dropped due to a flush failure.
595                        Err(e) => if e.is_recoverable() {
596                            // If the error is recoverable, we'll hold on to the metric to retry it later.
597                            continue;
598                        } else {
599                            return Err(GenericError::from(e).context("Failed to flush request."));
600                        }
601                    }
602                }
603
604                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
605            },
606
607            // Event buffers channel has been closed, and we have no pending flushing, so we're all done.
608            else => break,
609        }
610    }
611
612    Ok(())
613}
614
615fn log_metric_payload(metric: &Metric) {
616    match metric.values() {
617        MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
618            debug!(?metric, "Flushing series metric.")
619        }
620        MetricValues::Histogram(..) | MetricValues::Distribution(..) => {
621            debug!(?metric, "Flushing sketch metric.")
622        }
623    }
624}
625
/// Metrics intake endpoint.
///
/// Determines which metric kinds a `MetricsEndpointEncoder` accepts and which built-in payload size limits it
/// reports.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MetricsEndpoint {
    /// V1 series metrics, encoded as JSON and sent to `/api/v1/series`.
    ///
    /// Includes counters, gauges, rates, and sets. Selected when `use_v2_api.series` is `false`.
    SeriesV1,

    /// V2 series metrics, encoded as Protocol Buffers and sent to `/api/v2/series`.
    ///
    /// Includes counters, gauges, rates, and sets. The default series encoding.
    SeriesV2,

    /// Sketch metrics, encoded as Protocol Buffers and sent to `/api/beta/sketches`.
    ///
    /// Includes histograms and distributions. Always uses the V2 endpoint regardless of `use_v2_api.series`.
    Sketches,
}
644
/// Error returned when a metric fails to encode for either the V1 JSON or V2 protobuf intake.
///
/// Each variant wraps the underlying library error, which is also exposed through `std::error::Error::source`.
#[derive(Debug)]
pub enum MetricsEncodeError {
    /// Protobuf encoding failed.
    Protobuf(protobuf::Error),

    /// JSON encoding failed.
    Json(serde_json::Error),
}
654
655impl fmt::Display for MetricsEncodeError {
656    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657        match self {
658            Self::Protobuf(e) => write!(f, "protobuf encode error: {}", e),
659            Self::Json(e) => write!(f, "json encode error: {}", e),
660        }
661    }
662}
663
664impl std::error::Error for MetricsEncodeError {
665    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
666        match self {
667            Self::Protobuf(e) => Some(e),
668            Self::Json(e) => Some(e),
669        }
670    }
671}
672
673impl From<protobuf::Error> for MetricsEncodeError {
674    fn from(value: protobuf::Error) -> Self {
675        Self::Protobuf(value)
676    }
677}
678
679impl From<serde_json::Error> for MetricsEncodeError {
680    fn from(value: serde_json::Error) -> Self {
681        Self::Json(value)
682    }
683}
684
/// Endpoint encoder for Datadog metrics, bound to a single `MetricsEndpoint`.
///
/// Implements `EndpointEncoder` with `Metric` as its input type.
#[derive(Debug)]
struct MetricsEndpointEncoder {
    // Endpoint this encoder targets; selects the accepted metric kinds and size limits.
    endpoint: MetricsEndpoint,
    // Reusable byte buffers, all created empty.
    // NOTE(review): presumably scratch space reused across encode calls to avoid reallocating; the encode path is not
    // in this part of the file -- confirm there.
    primary_scratch_buf: Vec<u8>,
    secondary_scratch_buf: Vec<u8>,
    packed_scratch_buf: Vec<u8>,
    // Tags included with every metric encoded by this encoder; empty unless set via `with_additional_tags`.
    additional_tags: SharedTagSet,
    // Reusable deduplicator for tags.
    tags_deduplicator: ReusableDeduplicator<Tag>,
}
694
695impl MetricsEndpointEncoder {
696    /// Creates a new `MetricsEndpointEncoder` for the given endpoint.
697    pub fn from_endpoint(endpoint: MetricsEndpoint) -> Self {
698        Self {
699            endpoint,
700            primary_scratch_buf: Vec::new(),
701            secondary_scratch_buf: Vec::new(),
702            packed_scratch_buf: Vec::new(),
703            additional_tags: SharedTagSet::default(),
704            tags_deduplicator: ReusableDeduplicator::new(),
705        }
706    }
707
708    /// Sets the additional tags to be included with every metric encoded by this encoder.
709    ///
710    /// These tags are added in a deduplicated fashion, the same as instrumented tags and origin tags. This is an
711    /// optimized codepath for tag inclusion in high-volume scenarios, where creating new additional contexts
712    /// through the traditional means (for example, `ContextResolver`) would be too expensive.
713    pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
714        self.additional_tags = additional_tags;
715        self
716    }
717}
718
719impl EndpointEncoder for MetricsEndpointEncoder {
720    type Input = Metric;
721    type EncodeError = MetricsEncodeError;
722
723    fn encoder_name() -> &'static str {
724        "metrics"
725    }
726
727    fn compressed_size_limit(&self) -> usize {
728        match self.endpoint {
729            MetricsEndpoint::SeriesV1 => SERIES_V1_COMPRESSED_SIZE_LIMIT,
730            MetricsEndpoint::SeriesV2 => SERIES_V2_COMPRESSED_SIZE_LIMIT,
731            MetricsEndpoint::Sketches => DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
732        }
733    }
734
735    fn uncompressed_size_limit(&self) -> usize {
736        match self.endpoint {
737            MetricsEndpoint::SeriesV1 => SERIES_V1_UNCOMPRESSED_SIZE_LIMIT,
738            MetricsEndpoint::SeriesV2 => SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
739            MetricsEndpoint::Sketches => DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
740        }
741    }
742
743    fn input_data_point_count(&self, input: &Self::Input) -> usize {
744        input.values().len()
745    }
746
747    fn is_valid_input(&self, input: &Self::Input) -> bool {
748        let is_series_input = matches!(
749            input.values(),
750            MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..)
751        );
752
753        match self.endpoint {
754            MetricsEndpoint::SeriesV1 | MetricsEndpoint::SeriesV2 => is_series_input,
755            MetricsEndpoint::Sketches => !is_series_input,
756        }
757    }
758
759    fn get_payload_prefix(&self) -> Option<&'static [u8]> {
760        match self.endpoint {
761            MetricsEndpoint::SeriesV1 => Some(SERIES_V1_PAYLOAD_PREFIX),
762            _ => None,
763        }
764    }
765
766    fn get_payload_suffix(&self) -> Option<&'static [u8]> {
767        match self.endpoint {
768            MetricsEndpoint::SeriesV1 => Some(SERIES_V1_PAYLOAD_SUFFIX),
769            _ => None,
770        }
771    }
772
773    fn get_input_separator(&self) -> Option<&'static [u8]> {
774        match self.endpoint {
775            MetricsEndpoint::SeriesV1 => Some(SERIES_V1_INPUT_SEPARATOR),
776            _ => None,
777        }
778    }
779
780    fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
781        match self.endpoint {
782            MetricsEndpoint::SeriesV1 => {
783                encode_series_v1_metric(input, &self.additional_tags, buffer, &mut self.tags_deduplicator)?;
784                Ok(())
785            }
786            MetricsEndpoint::SeriesV2 | MetricsEndpoint::Sketches => {
787                // NOTE: We're passing _four_ buffers to `encode_single_metric`, which is a lot, but with good reason.
788                //
789                // The first buffer, `buffer`, is the overall output buffer: the caller expects us to put the full
790                // encoded metric payload into this buffer.
791                //
792                // The second and third buffers, `primary_scratch_buf` and `secondary_scratch_buf`, are used for
793                // roughly the same thing but deal with _nesting_. When writing a "message" in Protocol Buffers, the
794                // message data itself is prefixed with the field number and a length delimiter that specifies how
795                // long the message is. We can't write that length delimiter until we know the full size of the
796                // message, so we write the message to a scratch buffer, calculate its size, and then write the field
797                // number and length delimiter to the output buffer followed by the message data from the scratch
798                // buffer.
799                //
800                // We have _two_ scratch buffers because you need a dedicated buffer for each level of nested message.
801                // We have to be able to nest up to two levels deep in our metrics payload, so we need two scratch
802                // buffers to handle that.
803                //
804                // The fourth buffer, `packed_scratch_buf`, is used for writing out packed repeated fields. This is
805                // similar to the situation describe above, except it's not _exactly_ the same as an additional level
806                // of nesting.. so I just decided to give it a somewhat more descriptive name.
807                encode_single_metric(
808                    input,
809                    &self.additional_tags,
810                    buffer,
811                    &mut self.primary_scratch_buf,
812                    &mut self.secondary_scratch_buf,
813                    &mut self.packed_scratch_buf,
814                    &mut self.tags_deduplicator,
815                )?;
816                Ok(())
817            }
818        }
819    }
820
821    fn endpoint_uri(&self) -> Uri {
822        match self.endpoint {
823            MetricsEndpoint::SeriesV1 => PathAndQuery::from_static(METRICS_SERIES_V1_PATH).into(),
824            MetricsEndpoint::SeriesV2 => PathAndQuery::from_static(METRICS_SERIES_V2_PATH).into(),
825            MetricsEndpoint::Sketches => PathAndQuery::from_static(METRICS_SKETCHES_PATH).into(),
826        }
827    }
828
829    fn endpoint_method(&self) -> Method {
830        // All endpoints use POST.
831        Method::POST
832    }
833
834    fn content_type(&self) -> HeaderValue {
835        match self.endpoint {
836            MetricsEndpoint::SeriesV1 => CONTENT_TYPE_JSON.clone(),
837            MetricsEndpoint::SeriesV2 | MetricsEndpoint::Sketches => CONTENT_TYPE_PROTOBUF.clone(),
838        }
839    }
840}
841
842fn field_number_for_metric_type(metric: &Metric) -> u32 {
843    match metric.values() {
844        MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => 1,
845        MetricValues::Histogram(..) | MetricValues::Distribution(..) => 1,
846    }
847}
848
849fn get_message_size(raw_msg_size: usize) -> Result<u32, protobuf::Error> {
850    const MAX_MESSAGE_SIZE: u64 = i32::MAX as u64;
851
852    // Individual messages cannot be larger than `i32::MAX`, so check that here before proceeding.
853    if raw_msg_size as u64 > MAX_MESSAGE_SIZE {
854        return Err(std::io::Error::other("message size exceeds limit (2147483648 bytes)").into());
855    }
856
857    Ok(raw_msg_size as u32)
858}
859
860fn get_message_size_from_buffer(buf: &[u8]) -> Result<u32, protobuf::Error> {
861    get_message_size(buf.len())
862}
863
864fn encode_single_metric(
865    metric: &Metric, additional_tags: &SharedTagSet, output_buf: &mut Vec<u8>, primary_scratch_buf: &mut Vec<u8>,
866    secondary_scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
867    tags_deduplicator: &mut ReusableDeduplicator<Tag>,
868) -> Result<(), protobuf::Error> {
869    let mut output_stream = CodedOutputStream::vec(output_buf);
870    let field_number = field_number_for_metric_type(metric);
871
872    write_nested_message(&mut output_stream, primary_scratch_buf, field_number, |os| {
873        // Depending on the metric type, we write out the appropriate fields.
874        match metric.values() {
875            MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
876                encode_series_v2_metric(metric, additional_tags, os, secondary_scratch_buf, tags_deduplicator)
877            }
878            MetricValues::Histogram(..) | MetricValues::Distribution(..) => encode_sketch_metric(
879                metric,
880                additional_tags,
881                os,
882                secondary_scratch_buf,
883                packed_scratch_buf,
884                tags_deduplicator,
885            ),
886        }
887    })
888}
889
890fn encode_series_v2_metric(
891    metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
892    scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
893) -> Result<(), protobuf::Error> {
894    // Write the metric name and tags.
895    output_stream.write_string(SERIES_METRIC_FIELD_NUMBER, metric.context().name())?;
896
897    let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
898    write_series_tags(deduplicated_tags, output_stream, scratch_buf)?;
899
900    // Set the host resource.
901    write_resource(
902        output_stream,
903        scratch_buf,
904        "host",
905        metric.metadata().hostname().unwrap_or_default(),
906    )?;
907
908    // Write the origin metadata, if it exists.
909    if let Some(origin) = metric.metadata().origin() {
910        match origin {
911            MetricOrigin::SourceType(source_type) => {
912                output_stream.write_string(SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER, source_type.as_ref())?;
913            }
914            MetricOrigin::OriginMetadata {
915                product,
916                subproduct,
917                product_detail,
918            } => {
919                write_origin_metadata(
920                    output_stream,
921                    scratch_buf,
922                    SERIES_METADATA_FIELD_NUMBER,
923                    *product,
924                    *subproduct,
925                    *product_detail,
926                )?;
927            }
928        }
929    }
930
931    // Now write out our metric type, points, and interval (if applicable).
932    let (metric_type, points, maybe_interval) = match metric.values() {
933        MetricValues::Counter(points) => (proto::MetricType::COUNT, points.into_iter(), None),
934        MetricValues::Rate(points, interval) => (proto::MetricType::RATE, points.into_iter(), Some(interval)),
935        MetricValues::Gauge(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
936        MetricValues::Set(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
937        _ => unreachable!("encode_series_v2_metric called with non-series metric"),
938    };
939
940    output_stream.write_enum(SERIES_TYPE_FIELD_NUMBER, metric_type.value())?;
941
942    if let Some(unit) = metric.metadata().unit() {
943        output_stream.write_string(SERIES_UNIT_FIELD_NUMBER, unit)?;
944    }
945
946    for (timestamp, value) in points {
947        // If this is a rate metric, scale our value by the interval, in seconds.
948        let value = maybe_interval
949            .map(|interval| value / interval.as_secs_f64())
950            .unwrap_or(value);
951        let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;
952
953        write_point(output_stream, scratch_buf, value, timestamp)?;
954    }
955
956    if let Some(interval) = maybe_interval {
957        output_stream.write_int64(SERIES_INTERVAL_FIELD_NUMBER, interval.as_secs() as i64)?;
958    }
959
960    Ok(())
961}
962
963fn encode_series_v1_metric(
964    metric: &Metric, additional_tags: &SharedTagSet, buffer: &mut Vec<u8>,
965    tags_deduplicator: &mut ReusableDeduplicator<Tag>,
966) -> Result<(), serde_json::Error> {
967    let mut obj = JsonMap::new();
968
969    obj.insert("metric".into(), JsonValue::String(metric.context().name().to_string()));
970
971    let (type_str, points_iter, maybe_interval) = match metric.values() {
972        MetricValues::Counter(points) => ("count", points.into_iter(), None),
973        MetricValues::Rate(points, interval) => ("rate", points.into_iter(), Some(*interval)),
974        MetricValues::Gauge(points) => ("gauge", points.into_iter(), None),
975        MetricValues::Set(points) => ("gauge", points.into_iter(), None),
976        _ => unreachable!("encode_series_v1_metric called with non-series metric"),
977    };
978
979    let mut points = Vec::new();
980    for (timestamp, value) in points_iter {
981        // For rates, value is scaled by interval seconds — same as the V2 encoder.
982        let value = maybe_interval
983            .map(|interval| value / interval.as_secs_f64())
984            .unwrap_or(value);
985        let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;
986
987        // V1 emits each point as a [timestamp, value] tuple — not a nested object.
988        let value_json = JsonNumber::from_f64(value)
989            .map(JsonValue::Number)
990            .unwrap_or_else(|| JsonValue::from(0));
991        points.push(JsonValue::Array(vec![JsonValue::from(timestamp), value_json]));
992    }
993    obj.insert("points".into(), JsonValue::Array(points));
994
995    // Walk the deduplicated tag set once, extracting the first `device:<value>` tag into the device JSON field while
996    // dropping `dd.internal.resource` (which is a V2-protobuf-only concept with no V1 representation).
997    let deduplicated = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
998    let mut tags_out = Vec::new();
999    let mut device: Option<String> = None;
1000    for tag in deduplicated {
1001        if tag.name() == "dd.internal.resource" {
1002            continue;
1003        }
1004        if device.is_none() && tag.name() == "device" {
1005            if let Some(v) = tag.value() {
1006                device = Some(v.to_string());
1007                continue;
1008            }
1009        }
1010        tags_out.push(JsonValue::String(tag.as_str().to_string()));
1011    }
1012    obj.insert("tags".into(), JsonValue::Array(tags_out));
1013
1014    // V1 always emits `host` and `interval`, even when empty/zero — matches the Agent encoder.
1015    obj.insert(
1016        "host".into(),
1017        JsonValue::String(metric.metadata().hostname().unwrap_or_default().to_string()),
1018    );
1019
1020    if let Some(d) = device.filter(|s| !s.is_empty()) {
1021        obj.insert("device".into(), JsonValue::String(d));
1022    }
1023
1024    obj.insert("type".into(), JsonValue::String(type_str.into()));
1025
1026    let interval_secs = maybe_interval.map(|iv| iv.as_secs() as i64).unwrap_or(0);
1027    obj.insert("interval".into(), JsonValue::from(interval_secs));
1028
1029    // V1 only emits `source_type_name` from `MetricOrigin::SourceType`.
1030    if let Some(MetricOrigin::SourceType(s)) = metric.metadata().origin() {
1031        obj.insert("source_type_name".into(), JsonValue::String(s.as_ref().to_string()));
1032    }
1033
1034    if let Some(unit) = metric.metadata().unit() {
1035        if !unit.is_empty() {
1036            obj.insert("unit".into(), JsonValue::String(unit.to_string()));
1037        }
1038    }
1039
1040    serde_json::to_writer(buffer, &JsonValue::Object(obj))
1041}
1042
1043fn encode_sketch_metric(
1044    metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
1045    scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
1046) -> Result<(), protobuf::Error> {
1047    // Write the metric name and tags.
1048    output_stream.write_string(SKETCH_METRIC_FIELD_NUMBER, metric.context().name())?;
1049
1050    let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
1051    write_sketch_tags(deduplicated_tags, output_stream, scratch_buf)?;
1052
1053    // Write the host.
1054    output_stream.write_string(
1055        SKETCH_HOST_FIELD_NUMBER,
1056        metric.metadata().hostname().unwrap_or_default(),
1057    )?;
1058
1059    // Set the origin metadata, if it exists.
1060    if let Some(MetricOrigin::OriginMetadata {
1061        product,
1062        subproduct,
1063        product_detail,
1064    }) = metric.metadata().origin()
1065    {
1066        write_origin_metadata(
1067            output_stream,
1068            scratch_buf,
1069            SKETCH_METADATA_FIELD_NUMBER,
1070            *product,
1071            *subproduct,
1072            *product_detail,
1073        )?;
1074    }
1075
1076    // TODO: emit `metric.metadata().unit()` in the sketch payload once the upstream `agent-payload` proto defines a
1077    // unit field on `SketchPayload.Sketch`.
1078
1079    // Write out our sketches.
1080    match metric.values() {
1081        MetricValues::Distribution(sketches) => {
1082            for (timestamp, value) in sketches {
1083                write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, value)?;
1084            }
1085        }
1086        MetricValues::Histogram(points) => {
1087            for (timestamp, histogram) in points {
1088                // We convert histograms to sketches to be able to write them out in the payload.
1089                let mut ddsketch = DDSketch::default();
1090                for sample in histogram.samples() {
1091                    ddsketch.insert_n(sample.value.into_inner(), sample.weight.0 as u64);
1092                }
1093
1094                write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, &ddsketch)?;
1095            }
1096        }
1097        _ => unreachable!("encode_sketch_metric called with non-sketch metric"),
1098    }
1099
1100    Ok(())
1101}
1102
1103fn write_resource(
1104    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, resource_type: &str, resource_name: &str,
1105) -> Result<(), protobuf::Error> {
1106    write_nested_message(output_stream, scratch_buf, SERIES_RESOURCES_FIELD_NUMBER, |os| {
1107        os.write_string(RESOURCES_TYPE_FIELD_NUMBER, resource_type)?;
1108        os.write_string(RESOURCES_NAME_FIELD_NUMBER, resource_name)
1109    })
1110}
1111
1112fn write_origin_metadata(
1113    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, origin_product: u32,
1114    origin_category: u32, origin_service: u32,
1115) -> Result<(), protobuf::Error> {
1116    // TODO: Figure out how to cleanly use `write_nested_message` here.
1117
1118    scratch_buf.clear();
1119
1120    {
1121        let mut origin_output_stream = CodedOutputStream::vec(scratch_buf);
1122        origin_output_stream.write_uint32(ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER, origin_product)?;
1123        origin_output_stream.write_uint32(ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER, origin_category)?;
1124        origin_output_stream.write_uint32(ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER, origin_service)?;
1125        origin_output_stream.flush()?;
1126    }
1127
1128    // We do a little song and dance here because the `Origin` message is embedded inside of `Metadata`, so we need to
1129    // write out field numbers/length delimiters in order: `Metadata`, and then `Origin`... but we write out origin
1130    // message to the scratch buffer first... so we write out our `Metadata` preamble stuff to get its length, and then
1131    // use that in conjunction with the `Origin` message size to write out the full `Metadata` message.
1132    let origin_message_size = get_message_size_from_buffer(scratch_buf)?;
1133
1134    let mut metadata_preamble_buf = [0; 64];
1135    let metadata_preamble_len = {
1136        let mut metadata_output_stream = CodedOutputStream::bytes(&mut metadata_preamble_buf[..]);
1137        metadata_output_stream.write_tag(METADATA_ORIGIN_FIELD_NUMBER, WireType::LengthDelimited)?;
1138        metadata_output_stream.write_raw_varint32(origin_message_size)?;
1139        metadata_output_stream.flush()?;
1140        metadata_output_stream.total_bytes_written() as usize
1141    };
1142
1143    let metadata_message_size = get_message_size(scratch_buf.len() + metadata_preamble_len)?;
1144
1145    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
1146    output_stream.write_raw_varint32(metadata_message_size)?;
1147    output_stream.write_raw_bytes(&metadata_preamble_buf[..metadata_preamble_len])?;
1148    output_stream.write_raw_bytes(scratch_buf)
1149}
1150
1151fn write_point(
1152    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, value: f64, timestamp: i64,
1153) -> Result<(), protobuf::Error> {
1154    write_nested_message(output_stream, scratch_buf, SERIES_POINTS_FIELD_NUMBER, |os| {
1155        os.write_double(METRIC_POINT_VALUE_FIELD_NUMBER, value)?;
1156        os.write_int64(METRIC_POINT_TIMESTAMP_FIELD_NUMBER, timestamp)
1157    })
1158}
1159
1160fn write_dogsketch(
1161    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
1162    timestamp: Option<NonZeroU64>, sketch: &DDSketch,
1163) -> Result<(), protobuf::Error> {
1164    // If the sketch is empty, we don't write it out.
1165    if sketch.is_empty() {
1166        warn!("Attempted to write an empty sketch to sketches payload, skipping.");
1167        return Ok(());
1168    }
1169
1170    write_nested_message(output_stream, scratch_buf, SKETCH_DOGSKETCHES_FIELD_NUMBER, |os| {
1171        os.write_int64(DOGSKETCH_TS_FIELD_NUMBER, timestamp.map_or(0, |ts| ts.get() as i64))?;
1172        os.write_int64(DOGSKETCH_CNT_FIELD_NUMBER, sketch.count() as i64)?;
1173        os.write_double(DOGSKETCH_MIN_FIELD_NUMBER, sketch.min().unwrap())?;
1174        os.write_double(DOGSKETCH_MAX_FIELD_NUMBER, sketch.max().unwrap())?;
1175        os.write_double(DOGSKETCH_AVG_FIELD_NUMBER, sketch.avg().unwrap())?;
1176        os.write_double(DOGSKETCH_SUM_FIELD_NUMBER, sketch.sum().unwrap())?;
1177
1178        let bin_keys = sketch.bins().iter().map(|bin| bin.key());
1179        write_repeated_packed_from_iter(
1180            os,
1181            packed_scratch_buf,
1182            DOGSKETCH_K_FIELD_NUMBER,
1183            bin_keys,
1184            |inner_os, value| inner_os.write_sint32_no_tag(value),
1185        )?;
1186
1187        let bin_counts = sketch.bins().iter().map(|bin| bin.count());
1188        write_repeated_packed_from_iter(
1189            os,
1190            packed_scratch_buf,
1191            DOGSKETCH_N_FIELD_NUMBER,
1192            bin_counts,
1193            |inner_os, value| inner_os.write_uint32_no_tag(value),
1194        )
1195    })
1196}
1197
1198fn get_deduplicated_tags<'a>(
1199    metric: &'a Metric, additional_tags: &'a SharedTagSet, tags_deduplicator: &'a mut ReusableDeduplicator<Tag>,
1200) -> impl Iterator<Item = &'a Tag> {
1201    let chained_tags = metric
1202        .context()
1203        .tags()
1204        .into_iter()
1205        .chain(additional_tags)
1206        .chain(metric.context().origin_tags());
1207
1208    tags_deduplicator.deduplicated(chained_tags)
1209}
1210
1211fn write_tags<'a, I, F>(
1212    tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, tag_encoder: F,
1213) -> Result<(), protobuf::Error>
1214where
1215    I: Iterator<Item = &'a Tag>,
1216    F: Fn(&Tag, &mut CodedOutputStream<'_>, &mut Vec<u8>) -> Result<(), protobuf::Error>,
1217{
1218    for tag in tags {
1219        tag_encoder(tag, output_stream, scratch_buf)?;
1220    }
1221
1222    Ok(())
1223}
1224
1225fn write_series_tags<'a, I>(
1226    tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
1227) -> Result<(), protobuf::Error>
1228where
1229    I: Iterator<Item = &'a Tag>,
1230{
1231    write_tags(tags, output_stream, scratch_buf, |tag, os, buf| {
1232        // If this is a resource tag, we'll convert it directly to a resource entry.
1233        if tag.name() == "dd.internal.resource" {
1234            if let Some((resource_type, resource_name)) = tag.value().and_then(|s| s.split_once(':')) {
1235                write_resource(os, buf, resource_type, resource_name)
1236            } else {
1237                Ok(())
1238            }
1239        } else {
1240            // We're dealing with a normal tag.
1241            os.write_string(SERIES_TAGS_FIELD_NUMBER, tag.as_str())
1242        }
1243    })
1244}
1245
1246fn write_sketch_tags<'a, I>(
1247    tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
1248) -> Result<(), protobuf::Error>
1249where
1250    I: Iterator<Item = &'a Tag>,
1251{
1252    write_tags(tags, output_stream, scratch_buf, |tag, os, _buf| {
1253        // We always write the tags as-is, without any special handling for resource tags.
1254        os.write_string(SKETCH_TAGS_FIELD_NUMBER, tag.as_str())
1255    })
1256}
1257
1258fn write_nested_message<F>(
1259    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, writer: F,
1260) -> Result<(), protobuf::Error>
1261where
1262    F: FnOnce(&mut CodedOutputStream<'_>) -> Result<(), protobuf::Error>,
1263{
1264    scratch_buf.clear();
1265
1266    {
1267        let mut nested_output_stream = CodedOutputStream::vec(scratch_buf);
1268        writer(&mut nested_output_stream)?;
1269        nested_output_stream.flush()?;
1270    }
1271
1272    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
1273
1274    let nested_message_size = get_message_size_from_buffer(scratch_buf)?;
1275    output_stream.write_raw_varint32(nested_message_size)?;
1276    output_stream.write_raw_bytes(scratch_buf)
1277}
1278
1279fn write_repeated_packed_from_iter<I, T, F>(
1280    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, values: I, writer: F,
1281) -> Result<(), protobuf::Error>
1282where
1283    I: Iterator<Item = T>,
1284    F: Fn(&mut CodedOutputStream<'_>, T) -> Result<(), protobuf::Error>,
1285{
1286    // This is a helper function that lets us write out a packed repeated field from an iterator of values.
1287    // `CodedOutputStream` has similar functions to handle this, but they require a slice of values, which would mean we
1288    // need to either allocate a new vector each time to hold the values, or thread through two additional vectors (one
1289    // for `i32`, one for `u32`) to reuse the allocation... both of which are not great options.
1290    //
1291    // We've simply opted to pass through a _single_ vector that we can reuse, and write the packed values directly to
1292    // that, almost identically to how `CodedOutputStream::write_repeated_packed_*` methods would do it.
1293
1294    scratch_buf.clear();
1295
1296    {
1297        let mut packed_output_stream = CodedOutputStream::vec(scratch_buf);
1298        for value in values {
1299            writer(&mut packed_output_stream, value)?;
1300        }
1301        packed_output_stream.flush()?;
1302    }
1303
1304    let data_size = get_message_size_from_buffer(scratch_buf)?;
1305
1306    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
1307    output_stream.write_raw_varint32(data_size)?;
1308    output_stream.write_raw_bytes(scratch_buf)
1309}
1310
1311#[cfg(test)]
1312mod tests {
1313    use std::{sync::Arc, time::Duration};
1314
1315    use protobuf::CodedOutputStream;
1316    use saluki_common::iter::ReusableDeduplicator;
1317    use saluki_context::{tags::SharedTagSet, Context};
1318    use saluki_core::data_model::event::metric::{Metric, MetricMetadata, MetricOrigin, MetricValues};
1319    use serde_json::Value as JsonValue;
1320    use stringtheory::MetaString;
1321
1322    use super::{
1323        encode_series_v1_metric, encode_series_v2_metric, encode_sketch_metric, MetricsEndpoint,
1324        MetricsEndpointEncoder, SERIES_V1_INPUT_SEPARATOR, SERIES_V1_PAYLOAD_PREFIX, SERIES_V1_PAYLOAD_SUFFIX,
1325    };
1326    use crate::common::datadog::{
1327        request_builder::EndpointEncoder as _, DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
1328        DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
1329    };
1330
1331    fn encode_one_v1(metric: &Metric) -> JsonValue {
1332        let mut buf = Vec::new();
1333        let host_tags = SharedTagSet::default();
1334        let mut tags_deduplicator = ReusableDeduplicator::new();
1335        encode_series_v1_metric(metric, &host_tags, &mut buf, &mut tags_deduplicator)
1336            .expect("encode_series_v1_metric should succeed");
1337        serde_json::from_slice(&buf).expect("encoder produced invalid JSON")
1338    }
1339
1340    #[test]
1341    fn histogram_vs_sketch_identical_payload() {
1342        // For the same exact set of points, we should be able to construct either a histogram or distribution from
1343        // those points, and when encoded as a sketch payload, end up with the same exact payload.
1344        //
1345        // They should be identical because the goal is that we convert histograms into sketches in the same way we
1346        // would have originally constructed a sketch based on the same samples.
1347        let samples = &[1.0, 2.0, 3.0, 4.0, 5.0];
1348        let histogram = Metric::histogram("simple_samples", samples);
1349        let distribution = Metric::distribution("simple_samples", samples);
1350        let host_tags = SharedTagSet::default();
1351
1352        let mut buf1 = Vec::new();
1353        let mut buf2 = Vec::new();
1354        let mut tags_deduplicator = ReusableDeduplicator::new();
1355
1356        let mut histogram_payload = Vec::new();
1357        {
1358            let mut histogram_writer = CodedOutputStream::vec(&mut histogram_payload);
1359            encode_sketch_metric(
1360                &histogram,
1361                &host_tags,
1362                &mut histogram_writer,
1363                &mut buf1,
1364                &mut buf2,
1365                &mut tags_deduplicator,
1366            )
1367            .expect("Failed to encode histogram as sketch");
1368        }
1369
1370        let mut distribution_payload = Vec::new();
1371        {
1372            let mut distribution_writer = CodedOutputStream::vec(&mut distribution_payload);
1373            encode_sketch_metric(
1374                &distribution,
1375                &host_tags,
1376                &mut distribution_writer,
1377                &mut buf1,
1378                &mut buf2,
1379                &mut tags_deduplicator,
1380            )
1381            .expect("Failed to encode distribution as sketch");
1382        }
1383
1384        assert_eq!(histogram_payload, distribution_payload);
1385    }
1386
1387    #[test]
1388    fn input_valid() {
1389        // Our encoder should always consider series metrics valid when set to either series endpoint, and similarly
1390        // for sketch metrics when set to the sketches endpoint.
1391        let counter = Metric::counter("counter", 1.0);
1392        let rate = Metric::rate("rate", 1.0, Duration::from_secs(1));
1393        let gauge = Metric::gauge("gauge", 1.0);
1394        let set = Metric::set("set", "foo");
1395        let histogram = Metric::histogram("histogram", [1.0, 2.0, 3.0]);
1396        let distribution = Metric::distribution("distribution", [1.0, 2.0, 3.0]);
1397
1398        let series_v1 = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV1);
1399        let series_v2 = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV2);
1400        let sketches_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
1401
1402        for series_endpoint in [&series_v1, &series_v2] {
1403            assert!(series_endpoint.is_valid_input(&counter));
1404            assert!(series_endpoint.is_valid_input(&rate));
1405            assert!(series_endpoint.is_valid_input(&gauge));
1406            assert!(series_endpoint.is_valid_input(&set));
1407            assert!(!series_endpoint.is_valid_input(&histogram));
1408            assert!(!series_endpoint.is_valid_input(&distribution));
1409        }
1410
1411        assert!(!sketches_endpoint.is_valid_input(&counter));
1412        assert!(!sketches_endpoint.is_valid_input(&rate));
1413        assert!(!sketches_endpoint.is_valid_input(&gauge));
1414        assert!(!sketches_endpoint.is_valid_input(&set));
1415        assert!(sketches_endpoint.is_valid_input(&histogram));
1416        assert!(sketches_endpoint.is_valid_input(&distribution));
1417    }
1418
1419    #[test]
1420    fn input_data_point_count_tracks_metric_values() {
1421        let counter = Metric::counter("counter", [(123, 1.0), (124, 2.0)]);
1422        let histogram = Metric::histogram("histogram", [1.0, 2.0, 3.0]);
1423
1424        let series_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV2);
1425        let sketches_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
1426
1427        assert_eq!(series_endpoint.input_data_point_count(&counter), 2);
1428        assert_eq!(sketches_endpoint.input_data_point_count(&histogram), 1);
1429    }
1430
1431    #[test]
1432    fn series_metric_unit_encoded() {
1433        // A gauge with a unit in its metadata must produce a series protobuf payload that contains the unit string
1434        // in field 6 (MetricSeries.unit), which the Datadog backend already accepts.
1435        //
1436        // In production this state is reached when histogram aggregation flushes timer (`ms`) statistics as gauges,
1437        // each carrying unit = "millisecond" propagated through MetricMetadata.
1438        let context = Context::from_static_parts("my.timer.avg", &[]);
1439        let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond"));
1440        let gauge = Metric::from_parts(context, MetricValues::gauge([1.0_f64]), metadata);
1441
1442        let host_tags = SharedTagSet::default();
1443        let mut scratch_buf = Vec::new();
1444        let mut tags_deduplicator = ReusableDeduplicator::new();
1445
1446        let mut payload = Vec::new();
1447        {
1448            let mut writer = CodedOutputStream::vec(&mut payload);
1449            encode_series_v2_metric(
1450                &gauge,
1451                &host_tags,
1452                &mut writer,
1453                &mut scratch_buf,
1454                &mut tags_deduplicator,
1455            )
1456            .expect("Failed to encode gauge as series metric");
1457            writer.flush().expect("Failed to flush");
1458        }
1459
1460        // In the protobuf wire format, a string field with field number 6 has tag byte 0x32 ((6 << 3) | 2).
1461        // The tag is followed by a varint length and then the UTF-8 bytes of the string.
1462        let expected_tag: u8 = (6 << 3) | 2; // 0x32
1463        let expected_value = b"millisecond";
1464
1465        let tag_pos = payload
1466            .windows(1 + 1 + expected_value.len())
1467            .position(|w| w[0] == expected_tag && w[1] == expected_value.len() as u8 && &w[2..] == expected_value);
1468
1469        assert!(
1470            tag_pos.is_some(),
1471            "series payload should contain unit field (field 6 = 'millisecond'), got bytes: {:?}",
1472            payload
1473        );
1474    }
1475
1476    #[test]
1477    fn series_v1_basic_payload_shape() {
1478        // Each metric variant maps to the right `type` string, points are emitted as [ts, value] tuples,
1479        // and `interval`/`host` are always present (zero/empty when not set).
1480        let counter = Metric::counter("my.count", 5.0);
1481        let counter_json = encode_one_v1(&counter);
1482        assert_eq!(counter_json["metric"], "my.count");
1483        assert_eq!(counter_json["type"], "count");
1484        assert_eq!(counter_json["interval"], 0);
1485        assert_eq!(counter_json["host"], "");
1486        assert_eq!(counter_json["tags"], JsonValue::Array(vec![]));
1487        let points = counter_json["points"].as_array().expect("points is array");
1488        assert_eq!(points.len(), 1);
1489        assert_eq!(points[0][0], 0);
1490        assert_eq!(points[0][1], 5.0);
1491        // Optional fields must be absent when not set.
1492        assert!(counter_json.get("unit").is_none());
1493        assert!(counter_json.get("source_type_name").is_none());
1494        assert!(counter_json.get("device").is_none());
1495
1496        let rate = Metric::rate("my.rate", 30.0, Duration::from_secs(10));
1497        let rate_json = encode_one_v1(&rate);
1498        assert_eq!(rate_json["type"], "rate");
1499        assert_eq!(rate_json["interval"], 10);
1500        // Rate value scaled by interval seconds: 30 / 10 = 3.
1501        let rate_points = rate_json["points"].as_array().expect("rate points is array");
1502        assert_eq!(rate_points[0][1], 3.0);
1503
1504        let gauge = Metric::gauge("my.gauge", 42.0);
1505        let gauge_json = encode_one_v1(&gauge);
1506        assert_eq!(gauge_json["type"], "gauge");
1507
1508        // Sets are encoded as gauges with the set cardinality as the value (consistent with V2).
1509        let set = Metric::set("my.set", "alpha");
1510        let set_json = encode_one_v1(&set);
1511        assert_eq!(set_json["type"], "gauge");
1512        let set_points = set_json["points"].as_array().expect("set points is array");
1513        assert_eq!(set_points[0][1], 1.0);
1514    }
1515
1516    #[test]
1517    fn series_v1_unit_and_hostname_emitted() {
1518        let context = Context::from_static_parts("my.timer.avg", &[]);
1519        let metadata = MetricMetadata::default()
1520            .with_unit(MetaString::from_static("millisecond"))
1521            .with_hostname(Some(Arc::from("host-1")));
1522        let gauge = Metric::from_parts(context, MetricValues::gauge([1.0_f64]), metadata);
1523
1524        let json = encode_one_v1(&gauge);
1525        assert_eq!(json["unit"], "millisecond");
1526        assert_eq!(json["host"], "host-1");
1527    }
1528
1529    #[test]
1530    fn series_v1_device_tag_extraction() {
1531        // A `device:<value>` tag is extracted into the `device` JSON field and dropped from `tags`.
1532        let context = Context::from_static_parts("my.metric", &["device:eth0", "env:prod"]);
1533        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), MetricMetadata::default());
1534
1535        let json = encode_one_v1(&counter);
1536        assert_eq!(json["device"], "eth0");
1537        let tags = json["tags"].as_array().expect("tags is array");
1538        let tag_strs: Vec<&str> = tags.iter().filter_map(|v| v.as_str()).collect();
1539        assert!(
1540            !tag_strs.iter().any(|t| t.starts_with("device:")),
1541            "device tag must be removed: {:?}",
1542            tag_strs
1543        );
1544        assert!(tag_strs.contains(&"env:prod"));
1545    }
1546
1547    #[test]
1548    fn series_v1_source_type_name_from_source_type_origin() {
1549        let context = Context::from_static_parts("my.metric", &[]);
1550        let metadata = MetricMetadata::default().with_source_type(Some(Arc::from("integration_x")));
1551        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), metadata);
1552
1553        let json = encode_one_v1(&counter);
1554        assert_eq!(json["source_type_name"], "integration_x");
1555    }
1556
1557    #[test]
1558    fn series_v1_origin_metadata_dropped() {
1559        // OriginMetadata is V2-protobuf only; V1 must drop it.
1560        let context = Context::from_static_parts("my.metric", &[]);
1561        let metadata = MetricMetadata::default().with_origin(Some(MetricOrigin::dogstatsd()));
1562        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), metadata);
1563
1564        let json = encode_one_v1(&counter);
1565        assert!(json.get("source_type_name").is_none());
1566    }
1567
1568    #[test]
1569    fn series_v1_dd_internal_resource_dropped() {
1570        // `dd.internal.resource` is V2-protobuf-only; V1 must drop these tags silently.
1571        let context = Context::from_static_parts("my.metric", &["dd.internal.resource:host:foo", "env:prod"]);
1572        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), MetricMetadata::default());
1573
1574        let json = encode_one_v1(&counter);
1575        let tags = json["tags"].as_array().expect("tags is array");
1576        let tag_strs: Vec<&str> = tags.iter().filter_map(|v| v.as_str()).collect();
1577        assert!(
1578            !tag_strs.iter().any(|t| t.starts_with("dd.internal.resource:")),
1579            "dd.internal.resource tag must be dropped: {:?}",
1580            tag_strs
1581        );
1582        assert!(tag_strs.contains(&"env:prod"));
1583    }
1584
1585    #[test]
1586    fn series_v1_endpoint_routing() {
1587        // SeriesV1 advertises the V1 URI, JSON content type, and the {"series":[...]} framing.
1588        let encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV1);
1589        assert_eq!(encoder.endpoint_uri().path(), "/api/v1/series");
1590        assert_eq!(encoder.content_type(), "application/json");
1591        assert_eq!(encoder.get_payload_prefix(), Some(SERIES_V1_PAYLOAD_PREFIX));
1592        assert_eq!(encoder.get_payload_suffix(), Some(SERIES_V1_PAYLOAD_SUFFIX));
1593        assert_eq!(encoder.get_input_separator(), Some(SERIES_V1_INPUT_SEPARATOR));
1594        assert_eq!(
1595            encoder.compressed_size_limit(),
1596            DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
1597        );
1598        assert_eq!(
1599            encoder.uncompressed_size_limit(),
1600            DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
1601        );
1602
1603        // Sketches use the generic serializer payload limits in the Datadog Agent.
1604        let sketches = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
1605        assert_eq!(
1606            sketches.compressed_size_limit(),
1607            DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
1608        );
1609        assert_eq!(
1610            sketches.uncompressed_size_limit(),
1611            DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
1612        );
1613
1614        // V2 series stays on protobuf with no framing.
1615        let v2 = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV2);
1616        assert_eq!(v2.endpoint_uri().path(), "/api/v2/series");
1617        assert_eq!(v2.content_type(), "application/x-protobuf");
1618        assert!(v2.get_payload_prefix().is_none());
1619    }
1620}
1621
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogMetricsConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Runs the shared configuration smoke-test runner for the `DATADOG_METRICS_CONFIGURATION` registry entry.
    ///
    /// The callback handed to the runner deserializes the supplied configuration into
    /// [`DatadogMetricsConfiguration`] and panics if that fails.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(
            structs::DATADOG_METRICS_CONFIGURATION,
            &[],
            json!({}),
            |generic_config| {
                generic_config
                    .as_typed::<DatadogMetricsConfiguration>()
                    .expect("DatadogMetricsConfiguration should deserialize")
            },
        )
        .await;
    }
}
1639
#[cfg(test)]
mod use_v2_api_series_default {
    use saluki_config::ConfigurationLoader;
    use serde_json::json;

    use super::{DatadogMetricsConfiguration, SERIES_V2_COMPRESSED_SIZE_LIMIT, SERIES_V2_UNCOMPRESSED_SIZE_LIMIT};
    use crate::{common::datadog::clamp_payload_limits, config::KEY_ALIASES};

    /// Loads `values` as the sole configuration provider (with `KEY_ALIASES` applied) and deserializes the
    /// result into a [`DatadogMetricsConfiguration`], panicking if either step fails.
    async fn load_metrics_config(values: serde_json::Value) -> DatadogMetricsConfiguration {
        let generic = ConfigurationLoader::default()
            .with_key_aliases(KEY_ALIASES)
            .add_providers([figment::providers::Serialized::defaults(values)])
            .into_generic()
            .await
            .expect("config should load");
        generic.as_typed().expect("should deserialize")
    }

    /// With no keys set, `use_v2_api_series` must come out `true`, so V2 protobuf behavior is preserved when
    /// the flag is absent. The nested-form (`use_v2_api.series`) and env-var (`DD_USE_V2_API_SERIES`) paths to
    /// the flat key are exercised end-to-end by the `config_smoke::smoke_test` runner via `KEY_ALIASES`.
    #[tokio::test]
    async fn defaults_to_true_when_absent() {
        let config = load_metrics_config(json!({})).await;
        assert!(config.use_v2_api_series);
    }

    /// Each `serializer_max_*` key must land in its corresponding payload-limit field.
    #[tokio::test]
    async fn deserializes_payload_limit_keys() {
        let config = load_metrics_config(json!({
            "serializer_max_payload_size": 4321,
            "serializer_max_uncompressed_payload_size": 8765,
            "serializer_max_series_payload_size": 1234,
            "serializer_max_series_uncompressed_payload_size": 5678,
        }))
        .await;

        assert_eq!(config.max_payload_size, 4321);
        assert_eq!(config.max_uncompressed_payload_size, 8765);
        assert_eq!(config.max_series_payload_size, 1234);
        assert_eq!(config.max_series_uncompressed_payload_size, 5678);
    }

    /// Requested limits above the V2 series API limits are capped to them; smaller requests pass through.
    #[test]
    fn clamps_series_payload_limit_keys_to_api_limits() {
        // One byte over each API limit: both values must be pulled back down to the limit.
        let (capped_uncompressed, capped_compressed) = clamp_payload_limits(
            SERIES_V2_UNCOMPRESSED_SIZE_LIMIT + 1,
            SERIES_V2_COMPRESSED_SIZE_LIMIT + 1,
            SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
            SERIES_V2_COMPRESSED_SIZE_LIMIT,
        );
        assert_eq!(capped_uncompressed, SERIES_V2_UNCOMPRESSED_SIZE_LIMIT);
        assert_eq!(capped_compressed, SERIES_V2_COMPRESSED_SIZE_LIMIT);

        // Comfortably under each API limit: both values must be returned unchanged.
        let (kept_uncompressed, kept_compressed) = clamp_payload_limits(
            5678,
            1234,
            SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
            SERIES_V2_COMPRESSED_SIZE_LIMIT,
        );
        assert_eq!(kept_uncompressed, 5678);
        assert_eq!(kept_compressed, 1234);
    }
}