Skip to main content

saluki_components/encoders/datadog/metrics/
mod.rs

1use std::{fmt, num::NonZeroU64, time::Duration};
2
3use async_trait::async_trait;
4use datadog_protos::metrics as proto;
5use ddsketch::DDSketch;
6use facet::Facet;
7use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
8use protobuf::{rt::WireType, CodedOutputStream, Enum as _};
9use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
10use saluki_common::{iter::ReusableDeduplicator, task::HandleExt as _};
11use saluki_config::GenericConfiguration;
12use saluki_context::tags::{SharedTagSet, Tag};
13use saluki_core::{
14    components::{encoders::*, ComponentContext},
15    data_model::{
16        event::{
17            metric::{Metric, MetricOrigin, MetricValues},
18            EventType,
19        },
20        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
21    },
22    observability::ComponentMetricsExt as _,
23    topology::{EventsBuffer, PayloadsBuffer},
24};
25use saluki_error::{generic_error, ErrorContext as _, GenericError};
26use saluki_io::compression::CompressionScheme;
27use saluki_metrics::MetricsBuilder;
28use serde::Deserialize;
29use serde_json::{Map as JsonMap, Number as JsonNumber, Value as JsonValue};
30use tokio::{pin, select, sync::mpsc, time::sleep};
31use tracing::{debug, error, warn};
32
33use crate::common::datadog::{
34    clamp_payload_limits,
35    io::RB_BUFFER_CHUNK_SIZE,
36    request_builder::{EndpointEncoder, RequestBuilder},
37    telemetry::ComponentTelemetry,
38    DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT, DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT, METRICS_SERIES_V1_PATH,
39    METRICS_SERIES_V2_PATH, METRICS_SKETCHES_PATH,
40};
41
42const SERIES_V2_COMPRESSED_SIZE_LIMIT: usize = 512_000; // 500 KiB
43const SERIES_V2_UNCOMPRESSED_SIZE_LIMIT: usize = 5_242_880; // 5 MiB
44
45// V1 series JSON endpoint limits match the Datadog Agent's generic serializer defaults.
46const SERIES_V1_COMPRESSED_SIZE_LIMIT: usize = DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT;
47const SERIES_V1_UNCOMPRESSED_SIZE_LIMIT: usize = DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT;
48
49const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
50
// Protocol Buffers field numbers used when writing the series and sketch payload messages.
//
// Every value here must agree with the field numbers declared in `lib/datadog-protos/proto/agent_payload.proto`.
// Constants are grouped by message, and each constant's prefix names the message its field belongs to.

// Resources.
const RESOURCES_TYPE_FIELD_NUMBER: u32 = 1;
const RESOURCES_NAME_FIELD_NUMBER: u32 = 2;

// Metadata.
const METADATA_ORIGIN_FIELD_NUMBER: u32 = 1;

// Origin.
const ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER: u32 = 4;
const ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER: u32 = 5;
const ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER: u32 = 6;

// Metric points.
const METRIC_POINT_VALUE_FIELD_NUMBER: u32 = 1;
const METRIC_POINT_TIMESTAMP_FIELD_NUMBER: u32 = 2;

// Dogsketches.
const DOGSKETCH_TS_FIELD_NUMBER: u32 = 1;
const DOGSKETCH_CNT_FIELD_NUMBER: u32 = 2;
const DOGSKETCH_MIN_FIELD_NUMBER: u32 = 3;
const DOGSKETCH_MAX_FIELD_NUMBER: u32 = 4;
const DOGSKETCH_AVG_FIELD_NUMBER: u32 = 5;
const DOGSKETCH_SUM_FIELD_NUMBER: u32 = 6;
const DOGSKETCH_K_FIELD_NUMBER: u32 = 7;
const DOGSKETCH_N_FIELD_NUMBER: u32 = 8;

// Series.
const SERIES_RESOURCES_FIELD_NUMBER: u32 = 1;
const SERIES_METRIC_FIELD_NUMBER: u32 = 2;
const SERIES_TAGS_FIELD_NUMBER: u32 = 3;
const SERIES_POINTS_FIELD_NUMBER: u32 = 4;
const SERIES_TYPE_FIELD_NUMBER: u32 = 5;
const SERIES_UNIT_FIELD_NUMBER: u32 = 6;
const SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER: u32 = 7;
const SERIES_INTERVAL_FIELD_NUMBER: u32 = 8;
const SERIES_METADATA_FIELD_NUMBER: u32 = 9;

// Sketches (field numbers are intentionally non-contiguous).
const SKETCH_METRIC_FIELD_NUMBER: u32 = 1;
const SKETCH_HOST_FIELD_NUMBER: u32 = 2;
const SKETCH_TAGS_FIELD_NUMBER: u32 = 4;
const SKETCH_DOGSKETCHES_FIELD_NUMBER: u32 = 7;
const SKETCH_METADATA_FIELD_NUMBER: u32 = 8;
90
91static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
92static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");
93
// A V1 series request body has the shape `{"series":[<serie>,<serie>,...]}`: the encoded `Serie` objects are written
// between this prefix and suffix, with the separator placed between consecutive objects.
const SERIES_V1_PAYLOAD_PREFIX: &[u8] = br#"{"series":["#;
const SERIES_V1_PAYLOAD_SUFFIX: &[u8] = br#"]}"#;
const SERIES_V1_INPUT_SEPARATOR: &[u8] = br#","#;
98
99const fn default_max_metrics_per_payload() -> usize {
100    10_000
101}
102
103const fn default_max_payload_size() -> usize {
104    DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
105}
106
107const fn default_max_uncompressed_payload_size() -> usize {
108    DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
109}
110
111const fn default_max_series_payload_size() -> usize {
112    SERIES_V2_COMPRESSED_SIZE_LIMIT
113}
114
115const fn default_max_series_uncompressed_payload_size() -> usize {
116    SERIES_V2_UNCOMPRESSED_SIZE_LIMIT
117}
118
119const fn default_max_series_points_per_payload() -> usize {
120    10_000
121}
122
123const fn default_flush_timeout_secs() -> u64 {
124    2
125}
126
127fn default_serializer_compressor_kind() -> String {
128    DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
129}
130
131const fn default_zstd_compressor_level() -> i32 {
132    3
133}
134
135const fn default_use_v2_api_series() -> bool {
136    true
137}
138
139const fn default_log_payloads() -> bool {
140    false
141}
142
143/// Datadog Metrics encoder.
144///
145/// Generates Datadog metrics payloads for the Datadog platform.
146#[derive(Clone, Deserialize, Facet)]
147#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
148#[allow(dead_code)]
149pub struct DatadogMetricsConfiguration {
150    /// Maximum number of input metrics to encode into a single request payload.
151    ///
152    /// This applies both to the series and sketches endpoints.
153    ///
154    /// Defaults to 10,000.
155    #[serde(
156        rename = "serializer_max_metrics_per_payload",
157        default = "default_max_metrics_per_payload"
158    )]
159    max_metrics_per_payload: usize,
160
161    /// Maximum compressed size, in bytes, of generic payloads.
162    ///
163    /// This applies to V1 JSON series payloads and sketch payloads, matching the Datadog Agent's generic payload
164    /// builder. V2 series payloads use `serializer_max_series_payload_size` instead. The effective value is clamped to
165    /// the Agent's default intake-safe limit of 2,621,440 bytes, so larger configured values do not allow payloads that
166    /// intake may reject. If set to `0`, every non-empty compressed payload exceeds the limit and is dropped during
167    /// flush.
168    ///
169    /// Defaults to 2,621,440 bytes.
170    #[serde(rename = "serializer_max_payload_size", default = "default_max_payload_size")]
171    max_payload_size: usize,
172
173    /// Maximum uncompressed size, in bytes, of generic payloads.
174    ///
175    /// This applies to V1 JSON series payloads and sketch payloads, matching the Datadog Agent's generic payload
176    /// builder. V2 series payloads use `serializer_max_series_uncompressed_payload_size` instead. The effective value
177    /// is clamped to the Agent's default intake-safe limit of 4,194,304 bytes, so larger configured values do not allow
178    /// payloads that intake may reject. Values smaller than the minimum endpoint framing size prevent the request
179    /// builder from starting.
180    ///
181    /// Defaults to 4,194,304 bytes.
182    #[serde(
183        rename = "serializer_max_uncompressed_payload_size",
184        default = "default_max_uncompressed_payload_size"
185    )]
186    max_uncompressed_payload_size: usize,
187
188    /// Maximum compressed size, in bytes, of a V2 series payload.
189    ///
190    /// This applies only when `use_v2_api.series` is `true`. V1 series and sketches use `serializer_max_payload_size`
191    /// instead. The effective value is clamped to the V2 series API limit of 512,000 bytes, so larger configured values
192    /// do not allow payloads that intake would reject. High-throughput workloads may increase this up to that API limit
193    /// to reduce request count, at the cost of larger individual requests. If set to `0`, every non-empty compressed
194    /// payload exceeds the limit and is dropped during flush.
195    ///
196    /// Defaults to 512,000 bytes.
197    #[serde(
198        rename = "serializer_max_series_payload_size",
199        default = "default_max_series_payload_size"
200    )]
201    max_series_payload_size: usize,
202
203    /// Maximum uncompressed size, in bytes, of a V2 series payload.
204    ///
205    /// This applies only when `use_v2_api.series` is `true`. V1 series and sketches use
206    /// `serializer_max_uncompressed_payload_size` instead. The effective value is clamped to the V2 series API limit of
207    /// 5,242,880 bytes, so larger configured values do not allow payloads that intake would reject. This limit protects
208    /// the encoder before compression, so compressed payload size may still force a separate flush. Values smaller than
209    /// the minimum endpoint framing size prevent the request builder from starting.
210    ///
211    /// Defaults to 5,242,880 bytes.
212    #[serde(
213        rename = "serializer_max_series_uncompressed_payload_size",
214        default = "default_max_series_uncompressed_payload_size"
215    )]
216    max_series_uncompressed_payload_size: usize,
217
218    /// Maximum number of data points, across all series, to encode into a single series request payload.
219    ///
220    /// This applies only to series metrics (counters, gauges, rates, sets) and not to sketch metrics (histograms,
221    /// distributions). A single metric series may contribute multiple data points when it carries more than one
222    /// timestamp/value pair. When encoding an input would cause the running data point total to exceed this limit, the
223    /// current payload is flushed first and the input is placed in the next payload.
224    ///
225    /// Defaults to 10,000.
226    #[serde(
227        rename = "serializer_max_series_points_per_payload",
228        default = "default_max_series_points_per_payload"
229    )]
230    max_series_points_per_payload: usize,
231
232    /// Flush timeout for pending requests, in seconds.
233    ///
234    /// When the destination has written metrics to the in-flight request payload, but it hasn't yet reached the
235    /// payload size limits that would force the payload to be flushed, the destination will wait for a period of time
236    /// before flushing the in-flight request payload. This allows for the possibility of other events to be processed
237    /// and written into the request payload, thereby maximizing the payload size and reducing the number of requests
238    /// generated and sent overall.
239    ///
240    /// Defaults to 2 seconds.
241    #[serde(default = "default_flush_timeout_secs")]
242    flush_timeout_secs: u64,
243
244    /// Compression kind to use for the request payloads.
245    ///
246    /// Defaults to `zstd`.
247    #[serde(
248        rename = "serializer_compressor_kind",
249        default = "default_serializer_compressor_kind"
250    )]
251    compressor_kind: String,
252
253    /// Compressor level to use when the compressor kind is `zstd`.
254    ///
255    /// Defaults to 3.
256    #[serde(
257        rename = "serializer_zstd_compressor_level",
258        default = "default_zstd_compressor_level"
259    )]
260    zstd_compressor_level: i32,
261
262    /// Whether to use the V2 API for series metrics.
263    ///
264    /// When `true` (the default), series metrics are sent to the V2 protobuf endpoint (`/api/v2/series`). When
265    /// `false`, series metrics are sent to the legacy V1 JSON endpoint (`/api/v1/series`). Sketch metrics always use
266    /// the V2 endpoint (`/api/beta/sketches`) regardless of this setting.
267    ///
268    /// Defaults to `true`.
269    #[serde(default = "default_use_v2_api_series")]
270    use_v2_api_series: bool,
271
272    /// Whether to log metric payload contents before encoding.
273    ///
274    /// This logs decoded metric objects, not the encoded JSON/protobuf HTTP body.
275    ///
276    /// Defaults to `false`.
277    #[serde(default = "default_log_payloads")]
278    log_payloads: bool,
279
280    /// Additional tags to apply to all forwarded metrics.
281    #[serde(default, skip)]
282    #[facet(opaque)]
283    additional_tags: Option<SharedTagSet>,
284}
285
286impl DatadogMetricsConfiguration {
287    /// Creates a new `DatadogMetricsConfiguration` from the given configuration.
288    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
289        Ok(config.as_typed()?)
290    }
291
292    /// Sets additional tags to be applied uniformly to all metrics forwarded by this destination.
293    pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
294        // Add the additional tags to the forwarder configuration.
295        self.additional_tags = Some(additional_tags);
296        self
297    }
298}
299
300#[async_trait]
301impl EncoderBuilder for DatadogMetricsConfiguration {
302    fn input_event_type(&self) -> EventType {
303        EventType::Metric
304    }
305
306    fn output_payload_type(&self) -> PayloadType {
307        PayloadType::Http
308    }
309
310    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
311        let metrics_builder = MetricsBuilder::from_component_context(&context);
312        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
313        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
314
315        // Create our request builders.
316        let series_endpoint = if self.use_v2_api_series {
317            MetricsEndpoint::SeriesV2
318        } else {
319            MetricsEndpoint::SeriesV1
320        };
321        let mut series_encoder = MetricsEndpointEncoder::from_endpoint(series_endpoint);
322        let mut sketches_encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
323
324        if let Some(additional_tags) = self.additional_tags.as_ref() {
325            series_encoder = series_encoder.with_additional_tags(additional_tags.clone());
326            sketches_encoder = sketches_encoder.with_additional_tags(additional_tags.clone());
327        }
328
329        let mut series_rb = RequestBuilder::new(series_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
330        series_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);
331        series_rb.with_max_data_points_per_payload(self.max_series_points_per_payload);
332
333        let generic_payload_limits = clamp_payload_limits(
334            self.max_uncompressed_payload_size,
335            self.max_payload_size,
336            DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
337            DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
338        );
339        let (series_uncompressed_limit, series_compressed_limit) = if series_endpoint == MetricsEndpoint::SeriesV2 {
340            clamp_payload_limits(
341                self.max_series_uncompressed_payload_size,
342                self.max_series_payload_size,
343                SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
344                SERIES_V2_COMPRESSED_SIZE_LIMIT,
345            )
346        } else {
347            generic_payload_limits
348        };
349        series_rb.with_len_limits(series_uncompressed_limit, series_compressed_limit)?;
350
351        let mut sketches_rb = RequestBuilder::new(sketches_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
352        sketches_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);
353        let (sketches_uncompressed_limit, sketches_compressed_limit) = generic_payload_limits;
354        sketches_rb.with_len_limits(sketches_uncompressed_limit, sketches_compressed_limit)?;
355
356        let flush_timeout = match self.flush_timeout_secs {
357            // We always give ourselves a minimum flush timeout of 10ms to allow for some very minimal amount of
358            // batching, while still practically flushing things almost immediately.
359            0 => Duration::from_millis(10),
360            secs => Duration::from_secs(secs),
361        };
362
363        Ok(Box::new(DatadogMetrics {
364            series_rb,
365            sketches_rb,
366            telemetry,
367            flush_timeout,
368            log_payloads: self.log_payloads,
369        }))
370    }
371}
372
373impl MemoryBounds for DatadogMetricsConfiguration {
374    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
375        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
376        //
377        // Theoretically, we'll end up being limited by the size of the downstream forwarder's interconnect, and however
378        // many payloads it will buffer internally... so realistically the firm limit boils down to the forwarder itself
379        // but we'll have a hard time in the forwarder knowing the maximum size of any given payload being sent in, which
380        // then makes it hard to calculate a proper firm bound even though we know the rest of the values required to
381        // calculate the firm bound.
382        builder
383            .minimum()
384            .with_single_value::<DatadogMetrics>("component struct")
385            .with_array::<EventsBuffer>("request builder events channel", 8)
386            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);
387
388        builder
389            .firm()
390            // Capture the size of the "split re-encode" buffers in the request builders, which is where we keep owned
391            // versions of metrics that we encode in case we need to actually re-encode them during a split operation.
392            .with_array::<Metric>("series metrics split re-encode buffer", self.max_metrics_per_payload)
393            .with_array::<Metric>("sketch metrics split re-encode buffer", self.max_metrics_per_payload);
394    }
395}
396
397pub struct DatadogMetrics {
398    series_rb: RequestBuilder<MetricsEndpointEncoder>,
399    sketches_rb: RequestBuilder<MetricsEndpointEncoder>,
400    telemetry: ComponentTelemetry,
401    flush_timeout: Duration,
402    log_payloads: bool,
403}
404
405#[async_trait]
406impl Encoder for DatadogMetrics {
407    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
408        let Self {
409            series_rb,
410            sketches_rb,
411            telemetry,
412            flush_timeout,
413            log_payloads,
414        } = *self;
415
416        let mut health = context.take_health_handle();
417
418        // Spawn our request builder task.
419        let (events_tx, events_rx) = mpsc::channel(8);
420        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
421        let request_builder_fut = run_request_builder(
422            series_rb,
423            sketches_rb,
424            telemetry,
425            events_rx,
426            payloads_tx,
427            flush_timeout,
428            log_payloads,
429        );
430        let request_builder_handle = context
431            .topology_context()
432            .global_thread_pool()
433            .spawn_traced_named("dd-metrics-request-builder", request_builder_fut);
434
435        health.mark_ready();
436        debug!("Datadog Metrics encoder started.");
437
438        loop {
439            select! {
440                biased;
441
442                _ = health.live() => continue,
443                maybe_payload = payloads_rx.recv() => match maybe_payload {
444                    Some(payload) => {
445                        if let Err(e) = context.dispatcher().dispatch(payload).await {
446                            error!("Failed to dispatch payload: {}", e);
447                        }
448                    }
449                    None => break,
450                },
451                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
452                    Some(event_buffer) => events_tx.send(event_buffer).await
453                        .error_context("Failed to send event buffer to request builder task.")?,
454                    None => break,
455                },
456            }
457        }
458
459        // Drop the events sender, which signals the request builder task to stop.
460        drop(events_tx);
461
462        // Continue draining the payloads receiver until it is closed.
463        while let Some(payload) = payloads_rx.recv().await {
464            if let Err(e) = context.dispatcher().dispatch(payload).await {
465                error!("Failed to dispatch payload: {}", e);
466            }
467        }
468
469        // Request build task should now be stopped.
470        match request_builder_handle.await {
471            Ok(Ok(())) => debug!("Request builder task stopped."),
472            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
473            Err(e) => error!(error = %e, "Request builder task panicked."),
474        }
475
476        debug!("Datadog Metrics encoder stopped.");
477
478        Ok(())
479    }
480}
481
482async fn run_request_builder(
483    mut series_request_builder: RequestBuilder<MetricsEndpointEncoder>,
484    mut sketches_request_builder: RequestBuilder<MetricsEndpointEncoder>, telemetry: ComponentTelemetry,
485    mut events_rx: mpsc::Receiver<EventsBuffer>, payloads_tx: mpsc::Sender<PayloadsBuffer>, flush_timeout: Duration,
486    log_payloads: bool,
487) -> Result<(), GenericError> {
488    let mut pending_flush = false;
489    let pending_flush_timeout = sleep(flush_timeout);
490    pin!(pending_flush_timeout);
491
492    loop {
493        select! {
494            Some(event_buffer) = events_rx.recv() => {
495                for event in event_buffer {
496                    let metric = match event.try_into_metric() {
497                        Some(metric) => metric,
498                        None => continue,
499                    };
500
501                    if log_payloads {
502                        log_metric_payload(&metric);
503                    }
504
505                    // Series metrics (counters, gauges, rates, sets) and sketch metrics (histograms, distributions)
506                    // route to their respective request builders. Whether the series builder targets the V1 or V2
507                    // intake is decided once at builder time based on `use_v2_api_series`.
508                    let request_builder = match metric.values() {
509                        MetricValues::Counter(..)
510                        | MetricValues::Rate(..)
511                        | MetricValues::Gauge(..)
512                        | MetricValues::Set(..) => &mut series_request_builder,
513                        MetricValues::Histogram(..) | MetricValues::Distribution(..) => &mut sketches_request_builder,
514                    };
515
516                    // Encode the metric. If we get it back, that means the current request is full, and we need to
517                    // flush it before we can try to encode the metric again... so we'll hold on to it in that case
518                    // before flushing and trying to encode it again.
519                    let metric_to_retry = match request_builder.encode(metric).await {
520                        Ok(None) => continue,
521                        Ok(Some(metric)) => metric,
522                        Err(e) => {
523                            error!(error = %e, "Failed to encode metric.");
524                            telemetry.events_dropped_encoder().increment(1);
525                            continue;
526                        }
527                    };
528
529                    let maybe_requests = request_builder.flush().await;
530                    if maybe_requests.is_empty() {
531                        panic!("builder told us to flush, but gave us nothing");
532                    }
533
534                    for maybe_request in maybe_requests {
535                        match maybe_request {
536                            Ok((events, data_points, request)) => {
537                                let payload_meta = PayloadMetadata::from_event_and_data_point_count(events, data_points);
538                                let http_payload = HttpPayload::new(payload_meta, request);
539                                let payload = Payload::Http(http_payload);
540
541                                payloads_tx.send(payload).await
542                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
543                            },
544
545                            // TODO: Increment a counter here that metrics were dropped due to a flush failure.
546                            Err(e) => if e.is_recoverable() {
547                                // If the error is recoverable, we'll hold on to the metric to retry it later.
548                                continue;
549                            } else {
550                                return Err(GenericError::from(e).context("Failed to flush request."));
551                            }
552                        }
553                    }
554
555                    // Now try to encode the metric again. If it fails again, we'll just log it because it shouldn't
556                    // be possible to fail at this point, otherwise we would have already caught that the first
557                    // time.
558                    if let Err(e) = request_builder.encode(metric_to_retry).await {
559                        error!(error = %e, "Failed to encode metric.");
560                        telemetry.events_dropped_encoder().increment(1);
561                    }
562                }
563
564                debug!("Processed event buffer.");
565
566                // If we're not already pending a flush, we'll start the countdown.
567                if !pending_flush {
568                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
569                    pending_flush = true;
570                }
571            },
572            _ = &mut pending_flush_timeout, if pending_flush => {
573                debug!("Flushing pending request(s).");
574
575                pending_flush = false;
576
577                // Once we've encoded and written all metrics, we flush the request builders to generate a request with
578                // anything left over. Again, we'll enqueue those requests to be sent immediately.
579                let maybe_series_requests = series_request_builder.flush().await;
580                for maybe_request in maybe_series_requests {
581                    match maybe_request {
582                        Ok((events, data_points, request)) => {
583                            let payload_meta = PayloadMetadata::from_event_and_data_point_count(events, data_points);
584                            let http_payload = HttpPayload::new(payload_meta, request);
585                            let payload = Payload::Http(http_payload);
586
587                            payloads_tx.send(payload).await
588                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
589                        },
590
591                        // TODO: Increment a counter here that metrics were dropped due to a flush failure.
592                        Err(e) => if e.is_recoverable() {
593                            // If the error is recoverable, we'll hold on to the metric to retry it later.
594                            continue;
595                        } else {
596                            return Err(GenericError::from(e).context("Failed to flush request."));
597                        }
598                    }
599                }
600
601                let maybe_sketches_requests = sketches_request_builder.flush().await;
602                for maybe_request in maybe_sketches_requests {
603                    match maybe_request {
604                        Ok((events, data_points, request)) => {
605                            let payload_meta = PayloadMetadata::from_event_and_data_point_count(events, data_points);
606                            let http_payload = HttpPayload::new(payload_meta, request);
607                            let payload = Payload::Http(http_payload);
608
609                            payloads_tx.send(payload).await
610                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
611                        },
612
613                        // TODO: Increment a counter here that metrics were dropped due to a flush failure.
614                        Err(e) => if e.is_recoverable() {
615                            // If the error is recoverable, we'll hold on to the metric to retry it later.
616                            continue;
617                        } else {
618                            return Err(GenericError::from(e).context("Failed to flush request."));
619                        }
620                    }
621                }
622
623                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
624            },
625
626            // Event buffers channel has been closed, and we have no pending flushing, so we're all done.
627            else => break,
628        }
629    }
630
631    Ok(())
632}
633
634fn log_metric_payload(metric: &Metric) {
635    match metric.values() {
636        MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
637            debug!(?metric, "Flushing series metric.")
638        }
639        MetricValues::Histogram(..) | MetricValues::Distribution(..) => {
640            debug!(?metric, "Flushing sketch metric.")
641        }
642    }
643}
644
/// Datadog metrics intake endpoint that an encoder writes payloads for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MetricsEndpoint {
    /// Legacy series endpoint, `/api/v1/series`, with JSON-encoded bodies.
    ///
    /// Carries counters, gauges, rates, and sets. Chosen when `use_v2_api_series` is `false`.
    SeriesV1,

    /// Series endpoint, `/api/v2/series`, with Protocol Buffers-encoded bodies.
    ///
    /// Carries counters, gauges, rates, and sets. This is the default series target.
    SeriesV2,

    /// Sketches endpoint, `/api/beta/sketches`, with Protocol Buffers-encoded bodies.
    ///
    /// Carries histograms and distributions, independently of `use_v2_api_series`.
    Sketches,
}
663
664/// Error returned when a metric fails to encode for either the V1 JSON or V2 protobuf intake.
665#[derive(Debug)]
666pub enum MetricsEncodeError {
667    /// Protobuf encoding failed.
668    Protobuf(protobuf::Error),
669
670    /// JSON encoding failed.
671    Json(serde_json::Error),
672}
673
674impl fmt::Display for MetricsEncodeError {
675    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
676        match self {
677            Self::Protobuf(e) => write!(f, "protobuf encode error: {}", e),
678            Self::Json(e) => write!(f, "json encode error: {}", e),
679        }
680    }
681}
682
683impl std::error::Error for MetricsEncodeError {
684    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
685        match self {
686            Self::Protobuf(e) => Some(e),
687            Self::Json(e) => Some(e),
688        }
689    }
690}
691
692impl From<protobuf::Error> for MetricsEncodeError {
693    fn from(value: protobuf::Error) -> Self {
694        Self::Protobuf(value)
695    }
696}
697
698impl From<serde_json::Error> for MetricsEncodeError {
699    fn from(value: serde_json::Error) -> Self {
700        Self::Json(value)
701    }
702}
703
704#[derive(Debug)]
705struct MetricsEndpointEncoder {
706    endpoint: MetricsEndpoint,
707    primary_scratch_buf: Vec<u8>,
708    secondary_scratch_buf: Vec<u8>,
709    packed_scratch_buf: Vec<u8>,
710    additional_tags: SharedTagSet,
711    tags_deduplicator: ReusableDeduplicator<Tag>,
712}
713
714impl MetricsEndpointEncoder {
715    /// Creates a new `MetricsEndpointEncoder` for the given endpoint.
716    pub fn from_endpoint(endpoint: MetricsEndpoint) -> Self {
717        Self {
718            endpoint,
719            primary_scratch_buf: Vec::new(),
720            secondary_scratch_buf: Vec::new(),
721            packed_scratch_buf: Vec::new(),
722            additional_tags: SharedTagSet::default(),
723            tags_deduplicator: ReusableDeduplicator::new(),
724        }
725    }
726
727    /// Sets the additional tags to be included with every metric encoded by this encoder.
728    ///
729    /// These tags are added in a deduplicated fashion, the same as instrumented tags and origin tags. This is an
730    /// optimized codepath for tag inclusion in high-volume scenarios, where creating new additional contexts
731    /// through the traditional means (for example, `ContextResolver`) would be too expensive.
732    pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
733        self.additional_tags = additional_tags;
734        self
735    }
736}
737
738impl EndpointEncoder for MetricsEndpointEncoder {
739    type Input = Metric;
740    type EncodeError = MetricsEncodeError;
741
742    fn encoder_name() -> &'static str {
743        "metrics"
744    }
745
746    fn compressed_size_limit(&self) -> usize {
747        match self.endpoint {
748            MetricsEndpoint::SeriesV1 => SERIES_V1_COMPRESSED_SIZE_LIMIT,
749            MetricsEndpoint::SeriesV2 => SERIES_V2_COMPRESSED_SIZE_LIMIT,
750            MetricsEndpoint::Sketches => DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
751        }
752    }
753
754    fn uncompressed_size_limit(&self) -> usize {
755        match self.endpoint {
756            MetricsEndpoint::SeriesV1 => SERIES_V1_UNCOMPRESSED_SIZE_LIMIT,
757            MetricsEndpoint::SeriesV2 => SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
758            MetricsEndpoint::Sketches => DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
759        }
760    }
761
762    fn input_data_point_count(&self, input: &Self::Input) -> usize {
763        input.values().len()
764    }
765
766    fn is_valid_input(&self, input: &Self::Input) -> bool {
767        let is_series_input = matches!(
768            input.values(),
769            MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..)
770        );
771
772        match self.endpoint {
773            MetricsEndpoint::SeriesV1 | MetricsEndpoint::SeriesV2 => is_series_input,
774            MetricsEndpoint::Sketches => !is_series_input,
775        }
776    }
777
778    fn get_payload_prefix(&self) -> Option<&'static [u8]> {
779        match self.endpoint {
780            MetricsEndpoint::SeriesV1 => Some(SERIES_V1_PAYLOAD_PREFIX),
781            _ => None,
782        }
783    }
784
785    fn get_payload_suffix(&self) -> Option<&'static [u8]> {
786        match self.endpoint {
787            MetricsEndpoint::SeriesV1 => Some(SERIES_V1_PAYLOAD_SUFFIX),
788            _ => None,
789        }
790    }
791
792    fn get_input_separator(&self) -> Option<&'static [u8]> {
793        match self.endpoint {
794            MetricsEndpoint::SeriesV1 => Some(SERIES_V1_INPUT_SEPARATOR),
795            _ => None,
796        }
797    }
798
799    fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
800        match self.endpoint {
801            MetricsEndpoint::SeriesV1 => {
802                encode_series_v1_metric(input, &self.additional_tags, buffer, &mut self.tags_deduplicator)?;
803                Ok(())
804            }
805            MetricsEndpoint::SeriesV2 | MetricsEndpoint::Sketches => {
806                // NOTE: We're passing _four_ buffers to `encode_single_metric`, which is a lot, but with good reason.
807                //
808                // The first buffer, `buffer`, is the overall output buffer: the caller expects us to put the full
809                // encoded metric payload into this buffer.
810                //
811                // The second and third buffers, `primary_scratch_buf` and `secondary_scratch_buf`, are used for
812                // roughly the same thing but deal with _nesting_. When writing a "message" in Protocol Buffers, the
813                // message data itself is prefixed with the field number and a length delimiter that specifies how
814                // long the message is. We can't write that length delimiter until we know the full size of the
815                // message, so we write the message to a scratch buffer, calculate its size, and then write the field
816                // number and length delimiter to the output buffer followed by the message data from the scratch
817                // buffer.
818                //
819                // We have _two_ scratch buffers because you need a dedicated buffer for each level of nested message.
820                // We have to be able to nest up to two levels deep in our metrics payload, so we need two scratch
821                // buffers to handle that.
822                //
823                // The fourth buffer, `packed_scratch_buf`, is used for writing out packed repeated fields. This is
824                // similar to the situation describe above, except it's not _exactly_ the same as an additional level
825                // of nesting.. so I just decided to give it a somewhat more descriptive name.
826                encode_single_metric(
827                    input,
828                    &self.additional_tags,
829                    buffer,
830                    &mut self.primary_scratch_buf,
831                    &mut self.secondary_scratch_buf,
832                    &mut self.packed_scratch_buf,
833                    &mut self.tags_deduplicator,
834                )?;
835                Ok(())
836            }
837        }
838    }
839
840    fn endpoint_uri(&self) -> Uri {
841        match self.endpoint {
842            MetricsEndpoint::SeriesV1 => PathAndQuery::from_static(METRICS_SERIES_V1_PATH).into(),
843            MetricsEndpoint::SeriesV2 => PathAndQuery::from_static(METRICS_SERIES_V2_PATH).into(),
844            MetricsEndpoint::Sketches => PathAndQuery::from_static(METRICS_SKETCHES_PATH).into(),
845        }
846    }
847
848    fn endpoint_method(&self) -> Method {
849        // All endpoints use POST.
850        Method::POST
851    }
852
853    fn content_type(&self) -> HeaderValue {
854        match self.endpoint {
855            MetricsEndpoint::SeriesV1 => CONTENT_TYPE_JSON.clone(),
856            MetricsEndpoint::SeriesV2 | MetricsEndpoint::Sketches => CONTENT_TYPE_PROTOBUF.clone(),
857        }
858    }
859}
860
861fn field_number_for_metric_type(metric: &Metric) -> u32 {
862    match metric.values() {
863        MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => 1,
864        MetricValues::Histogram(..) | MetricValues::Distribution(..) => 1,
865    }
866}
867
868fn get_message_size(raw_msg_size: usize) -> Result<u32, protobuf::Error> {
869    const MAX_MESSAGE_SIZE: u64 = i32::MAX as u64;
870
871    // Individual messages cannot be larger than `i32::MAX`, so check that here before proceeding.
872    if raw_msg_size as u64 > MAX_MESSAGE_SIZE {
873        return Err(std::io::Error::other("message size exceeds limit (2147483648 bytes)").into());
874    }
875
876    Ok(raw_msg_size as u32)
877}
878
879fn get_message_size_from_buffer(buf: &[u8]) -> Result<u32, protobuf::Error> {
880    get_message_size(buf.len())
881}
882
883fn encode_single_metric(
884    metric: &Metric, additional_tags: &SharedTagSet, output_buf: &mut Vec<u8>, primary_scratch_buf: &mut Vec<u8>,
885    secondary_scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
886    tags_deduplicator: &mut ReusableDeduplicator<Tag>,
887) -> Result<(), protobuf::Error> {
888    let mut output_stream = CodedOutputStream::vec(output_buf);
889    let field_number = field_number_for_metric_type(metric);
890
891    write_nested_message(&mut output_stream, primary_scratch_buf, field_number, |os| {
892        // Depending on the metric type, we write out the appropriate fields.
893        match metric.values() {
894            MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
895                encode_series_v2_metric(metric, additional_tags, os, secondary_scratch_buf, tags_deduplicator)
896            }
897            MetricValues::Histogram(..) | MetricValues::Distribution(..) => encode_sketch_metric(
898                metric,
899                additional_tags,
900                os,
901                secondary_scratch_buf,
902                packed_scratch_buf,
903                tags_deduplicator,
904            ),
905        }
906    })
907}
908
909fn encode_series_v2_metric(
910    metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
911    scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
912) -> Result<(), protobuf::Error> {
913    // Write the metric name and tags.
914    output_stream.write_string(SERIES_METRIC_FIELD_NUMBER, metric.context().name())?;
915
916    let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
917    write_series_tags(deduplicated_tags, output_stream, scratch_buf)?;
918
919    // Set the host resource.
920    write_resource(
921        output_stream,
922        scratch_buf,
923        "host",
924        metric.metadata().hostname().unwrap_or_default(),
925    )?;
926
927    // Write the origin metadata, if it exists.
928    if let Some(origin) = metric.metadata().origin() {
929        match origin {
930            MetricOrigin::SourceType(source_type) => {
931                output_stream.write_string(SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER, source_type.as_ref())?;
932            }
933            MetricOrigin::OriginMetadata {
934                product,
935                subproduct,
936                product_detail,
937            } => {
938                write_origin_metadata(
939                    output_stream,
940                    scratch_buf,
941                    SERIES_METADATA_FIELD_NUMBER,
942                    *product,
943                    *subproduct,
944                    *product_detail,
945                )?;
946            }
947        }
948    }
949
950    // Now write out our metric type, points, and interval (if applicable).
951    let (metric_type, points, maybe_interval) = match metric.values() {
952        MetricValues::Counter(points) => (proto::MetricType::COUNT, points.into_iter(), None),
953        MetricValues::Rate(points, interval) => (proto::MetricType::RATE, points.into_iter(), Some(interval)),
954        MetricValues::Gauge(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
955        MetricValues::Set(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
956        _ => unreachable!("encode_series_v2_metric called with non-series metric"),
957    };
958
959    output_stream.write_enum(SERIES_TYPE_FIELD_NUMBER, metric_type.value())?;
960
961    if let Some(unit) = metric.metadata().unit() {
962        output_stream.write_string(SERIES_UNIT_FIELD_NUMBER, unit)?;
963    }
964
965    for (timestamp, value) in points {
966        // If this is a rate metric, scale our value by the interval, in seconds.
967        let value = maybe_interval
968            .map(|interval| value / interval.as_secs_f64())
969            .unwrap_or(value);
970        let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;
971
972        write_point(output_stream, scratch_buf, value, timestamp)?;
973    }
974
975    if let Some(interval) = maybe_interval {
976        output_stream.write_int64(SERIES_INTERVAL_FIELD_NUMBER, interval.as_secs() as i64)?;
977    }
978
979    Ok(())
980}
981
982fn encode_series_v1_metric(
983    metric: &Metric, additional_tags: &SharedTagSet, buffer: &mut Vec<u8>,
984    tags_deduplicator: &mut ReusableDeduplicator<Tag>,
985) -> Result<(), serde_json::Error> {
986    let mut obj = JsonMap::new();
987
988    obj.insert("metric".into(), JsonValue::String(metric.context().name().to_string()));
989
990    let (type_str, points_iter, maybe_interval) = match metric.values() {
991        MetricValues::Counter(points) => ("count", points.into_iter(), None),
992        MetricValues::Rate(points, interval) => ("rate", points.into_iter(), Some(*interval)),
993        MetricValues::Gauge(points) => ("gauge", points.into_iter(), None),
994        MetricValues::Set(points) => ("gauge", points.into_iter(), None),
995        _ => unreachable!("encode_series_v1_metric called with non-series metric"),
996    };
997
998    let mut points = Vec::new();
999    for (timestamp, value) in points_iter {
1000        // For rates, value is scaled by interval seconds — same as the V2 encoder.
1001        let value = maybe_interval
1002            .map(|interval| value / interval.as_secs_f64())
1003            .unwrap_or(value);
1004        let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;
1005
1006        // V1 emits each point as a [timestamp, value] tuple — not a nested object.
1007        let value_json = JsonNumber::from_f64(value)
1008            .map(JsonValue::Number)
1009            .unwrap_or_else(|| JsonValue::from(0));
1010        points.push(JsonValue::Array(vec![JsonValue::from(timestamp), value_json]));
1011    }
1012    obj.insert("points".into(), JsonValue::Array(points));
1013
1014    // Walk the deduplicated tag set once, extracting the first `device:<value>` tag into the device JSON field while
1015    // dropping `dd.internal.resource` (which is a V2-protobuf-only concept with no V1 representation).
1016    let deduplicated = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
1017    let mut tags_out = Vec::new();
1018    let mut device: Option<String> = None;
1019    for tag in deduplicated {
1020        if tag.name() == "dd.internal.resource" {
1021            continue;
1022        }
1023        if device.is_none() && tag.name() == "device" {
1024            if let Some(v) = tag.value() {
1025                device = Some(v.to_string());
1026                continue;
1027            }
1028        }
1029        tags_out.push(JsonValue::String(tag.as_str().to_string()));
1030    }
1031    obj.insert("tags".into(), JsonValue::Array(tags_out));
1032
1033    // V1 always emits `host` and `interval`, even when empty/zero — matches the Agent encoder.
1034    obj.insert(
1035        "host".into(),
1036        JsonValue::String(metric.metadata().hostname().unwrap_or_default().to_string()),
1037    );
1038
1039    if let Some(d) = device.filter(|s| !s.is_empty()) {
1040        obj.insert("device".into(), JsonValue::String(d));
1041    }
1042
1043    obj.insert("type".into(), JsonValue::String(type_str.into()));
1044
1045    let interval_secs = maybe_interval.map(|iv| iv.as_secs() as i64).unwrap_or(0);
1046    obj.insert("interval".into(), JsonValue::from(interval_secs));
1047
1048    // V1 only emits `source_type_name` from `MetricOrigin::SourceType`.
1049    if let Some(MetricOrigin::SourceType(s)) = metric.metadata().origin() {
1050        obj.insert("source_type_name".into(), JsonValue::String(s.as_ref().to_string()));
1051    }
1052
1053    if let Some(unit) = metric.metadata().unit() {
1054        if !unit.is_empty() {
1055            obj.insert("unit".into(), JsonValue::String(unit.to_string()));
1056        }
1057    }
1058
1059    serde_json::to_writer(buffer, &JsonValue::Object(obj))
1060}
1061
1062fn encode_sketch_metric(
1063    metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
1064    scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
1065) -> Result<(), protobuf::Error> {
1066    // Write the metric name and tags.
1067    output_stream.write_string(SKETCH_METRIC_FIELD_NUMBER, metric.context().name())?;
1068
1069    let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
1070    write_sketch_tags(deduplicated_tags, output_stream, scratch_buf)?;
1071
1072    // Write the host.
1073    output_stream.write_string(
1074        SKETCH_HOST_FIELD_NUMBER,
1075        metric.metadata().hostname().unwrap_or_default(),
1076    )?;
1077
1078    // Set the origin metadata, if it exists.
1079    if let Some(MetricOrigin::OriginMetadata {
1080        product,
1081        subproduct,
1082        product_detail,
1083    }) = metric.metadata().origin()
1084    {
1085        write_origin_metadata(
1086            output_stream,
1087            scratch_buf,
1088            SKETCH_METADATA_FIELD_NUMBER,
1089            *product,
1090            *subproduct,
1091            *product_detail,
1092        )?;
1093    }
1094
1095    // TODO: emit `metric.metadata().unit()` in the sketch payload once the upstream `agent-payload` proto defines a
1096    // unit field on `SketchPayload.Sketch`.
1097
1098    // Write out our sketches.
1099    match metric.values() {
1100        MetricValues::Distribution(sketches) => {
1101            for (timestamp, value) in sketches {
1102                write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, value)?;
1103            }
1104        }
1105        MetricValues::Histogram(points) => {
1106            for (timestamp, histogram) in points {
1107                // We convert histograms to sketches to be able to write them out in the payload.
1108                let mut ddsketch = DDSketch::default();
1109                for sample in histogram.samples() {
1110                    ddsketch.insert_n(sample.value.into_inner(), sample.weight.0 as u64);
1111                }
1112
1113                write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, &ddsketch)?;
1114            }
1115        }
1116        _ => unreachable!("encode_sketch_metric called with non-sketch metric"),
1117    }
1118
1119    Ok(())
1120}
1121
1122fn write_resource(
1123    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, resource_type: &str, resource_name: &str,
1124) -> Result<(), protobuf::Error> {
1125    write_nested_message(output_stream, scratch_buf, SERIES_RESOURCES_FIELD_NUMBER, |os| {
1126        os.write_string(RESOURCES_TYPE_FIELD_NUMBER, resource_type)?;
1127        os.write_string(RESOURCES_NAME_FIELD_NUMBER, resource_name)
1128    })
1129}
1130
1131fn write_origin_metadata(
1132    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, origin_product: u32,
1133    origin_category: u32, origin_service: u32,
1134) -> Result<(), protobuf::Error> {
1135    // TODO: Figure out how to cleanly use `write_nested_message` here.
1136
1137    scratch_buf.clear();
1138
1139    {
1140        let mut origin_output_stream = CodedOutputStream::vec(scratch_buf);
1141        origin_output_stream.write_uint32(ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER, origin_product)?;
1142        origin_output_stream.write_uint32(ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER, origin_category)?;
1143        origin_output_stream.write_uint32(ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER, origin_service)?;
1144        origin_output_stream.flush()?;
1145    }
1146
1147    // We do a little song and dance here because the `Origin` message is embedded inside of `Metadata`, so we need to
1148    // write out field numbers/length delimiters in order: `Metadata`, and then `Origin`... but we write out origin
1149    // message to the scratch buffer first... so we write out our `Metadata` preamble stuff to get its length, and then
1150    // use that in conjunction with the `Origin` message size to write out the full `Metadata` message.
1151    let origin_message_size = get_message_size_from_buffer(scratch_buf)?;
1152
1153    let mut metadata_preamble_buf = [0; 64];
1154    let metadata_preamble_len = {
1155        let mut metadata_output_stream = CodedOutputStream::bytes(&mut metadata_preamble_buf[..]);
1156        metadata_output_stream.write_tag(METADATA_ORIGIN_FIELD_NUMBER, WireType::LengthDelimited)?;
1157        metadata_output_stream.write_raw_varint32(origin_message_size)?;
1158        metadata_output_stream.flush()?;
1159        metadata_output_stream.total_bytes_written() as usize
1160    };
1161
1162    let metadata_message_size = get_message_size(scratch_buf.len() + metadata_preamble_len)?;
1163
1164    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
1165    output_stream.write_raw_varint32(metadata_message_size)?;
1166    output_stream.write_raw_bytes(&metadata_preamble_buf[..metadata_preamble_len])?;
1167    output_stream.write_raw_bytes(scratch_buf)
1168}
1169
1170fn write_point(
1171    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, value: f64, timestamp: i64,
1172) -> Result<(), protobuf::Error> {
1173    write_nested_message(output_stream, scratch_buf, SERIES_POINTS_FIELD_NUMBER, |os| {
1174        os.write_double(METRIC_POINT_VALUE_FIELD_NUMBER, value)?;
1175        os.write_int64(METRIC_POINT_TIMESTAMP_FIELD_NUMBER, timestamp)
1176    })
1177}
1178
1179fn write_dogsketch(
1180    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
1181    timestamp: Option<NonZeroU64>, sketch: &DDSketch,
1182) -> Result<(), protobuf::Error> {
1183    // If the sketch is empty, we don't write it out.
1184    if sketch.is_empty() {
1185        warn!("Attempted to write an empty sketch to sketches payload, skipping.");
1186        return Ok(());
1187    }
1188
1189    write_nested_message(output_stream, scratch_buf, SKETCH_DOGSKETCHES_FIELD_NUMBER, |os| {
1190        os.write_int64(DOGSKETCH_TS_FIELD_NUMBER, timestamp.map_or(0, |ts| ts.get() as i64))?;
1191        os.write_int64(DOGSKETCH_CNT_FIELD_NUMBER, sketch.count() as i64)?;
1192        os.write_double(DOGSKETCH_MIN_FIELD_NUMBER, sketch.min().unwrap())?;
1193        os.write_double(DOGSKETCH_MAX_FIELD_NUMBER, sketch.max().unwrap())?;
1194        os.write_double(DOGSKETCH_AVG_FIELD_NUMBER, sketch.avg().unwrap())?;
1195        os.write_double(DOGSKETCH_SUM_FIELD_NUMBER, sketch.sum().unwrap())?;
1196
1197        let bin_keys = sketch.bins().iter().map(|bin| bin.key());
1198        write_repeated_packed_from_iter(
1199            os,
1200            packed_scratch_buf,
1201            DOGSKETCH_K_FIELD_NUMBER,
1202            bin_keys,
1203            |inner_os, value| inner_os.write_sint32_no_tag(value),
1204        )?;
1205
1206        let bin_counts = sketch.bins().iter().map(|bin| bin.count());
1207        write_repeated_packed_from_iter(
1208            os,
1209            packed_scratch_buf,
1210            DOGSKETCH_N_FIELD_NUMBER,
1211            bin_counts,
1212            |inner_os, value| inner_os.write_uint32_no_tag(value),
1213        )
1214    })
1215}
1216
1217fn get_deduplicated_tags<'a>(
1218    metric: &'a Metric, additional_tags: &'a SharedTagSet, tags_deduplicator: &'a mut ReusableDeduplicator<Tag>,
1219) -> impl Iterator<Item = &'a Tag> {
1220    let chained_tags = metric
1221        .context()
1222        .tags()
1223        .into_iter()
1224        .chain(additional_tags)
1225        .chain(metric.context().origin_tags());
1226
1227    tags_deduplicator.deduplicated(chained_tags)
1228}
1229
1230fn write_tags<'a, I, F>(
1231    tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, tag_encoder: F,
1232) -> Result<(), protobuf::Error>
1233where
1234    I: Iterator<Item = &'a Tag>,
1235    F: Fn(&Tag, &mut CodedOutputStream<'_>, &mut Vec<u8>) -> Result<(), protobuf::Error>,
1236{
1237    for tag in tags {
1238        tag_encoder(tag, output_stream, scratch_buf)?;
1239    }
1240
1241    Ok(())
1242}
1243
1244fn write_series_tags<'a, I>(
1245    tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
1246) -> Result<(), protobuf::Error>
1247where
1248    I: Iterator<Item = &'a Tag>,
1249{
1250    write_tags(tags, output_stream, scratch_buf, |tag, os, buf| {
1251        // If this is a resource tag, we'll convert it directly to a resource entry.
1252        if tag.name() == "dd.internal.resource" {
1253            if let Some((resource_type, resource_name)) = tag.value().and_then(|s| s.split_once(':')) {
1254                write_resource(os, buf, resource_type, resource_name)
1255            } else {
1256                Ok(())
1257            }
1258        } else {
1259            // We're dealing with a normal tag.
1260            os.write_string(SERIES_TAGS_FIELD_NUMBER, tag.as_str())
1261        }
1262    })
1263}
1264
1265fn write_sketch_tags<'a, I>(
1266    tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
1267) -> Result<(), protobuf::Error>
1268where
1269    I: Iterator<Item = &'a Tag>,
1270{
1271    write_tags(tags, output_stream, scratch_buf, |tag, os, _buf| {
1272        // We always write the tags as-is, without any special handling for resource tags.
1273        os.write_string(SKETCH_TAGS_FIELD_NUMBER, tag.as_str())
1274    })
1275}
1276
1277fn write_nested_message<F>(
1278    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, writer: F,
1279) -> Result<(), protobuf::Error>
1280where
1281    F: FnOnce(&mut CodedOutputStream<'_>) -> Result<(), protobuf::Error>,
1282{
1283    scratch_buf.clear();
1284
1285    {
1286        let mut nested_output_stream = CodedOutputStream::vec(scratch_buf);
1287        writer(&mut nested_output_stream)?;
1288        nested_output_stream.flush()?;
1289    }
1290
1291    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
1292
1293    let nested_message_size = get_message_size_from_buffer(scratch_buf)?;
1294    output_stream.write_raw_varint32(nested_message_size)?;
1295    output_stream.write_raw_bytes(scratch_buf)
1296}
1297
1298fn write_repeated_packed_from_iter<I, T, F>(
1299    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, values: I, writer: F,
1300) -> Result<(), protobuf::Error>
1301where
1302    I: Iterator<Item = T>,
1303    F: Fn(&mut CodedOutputStream<'_>, T) -> Result<(), protobuf::Error>,
1304{
1305    // This is a helper function that lets us write out a packed repeated field from an iterator of values.
1306    // `CodedOutputStream` has similar functions to handle this, but they require a slice of values, which would mean we
1307    // need to either allocate a new vector each time to hold the values, or thread through two additional vectors (one
1308    // for `i32`, one for `u32`) to reuse the allocation... both of which are not great options.
1309    //
1310    // We've simply opted to pass through a _single_ vector that we can reuse, and write the packed values directly to
1311    // that, almost identically to how `CodedOutputStream::write_repeated_packed_*` methods would do it.
1312
1313    scratch_buf.clear();
1314
1315    {
1316        let mut packed_output_stream = CodedOutputStream::vec(scratch_buf);
1317        for value in values {
1318            writer(&mut packed_output_stream, value)?;
1319        }
1320        packed_output_stream.flush()?;
1321    }
1322
1323    let data_size = get_message_size_from_buffer(scratch_buf)?;
1324
1325    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
1326    output_stream.write_raw_varint32(data_size)?;
1327    output_stream.write_raw_bytes(scratch_buf)
1328}
1329
1330#[cfg(test)]
1331mod tests {
1332    use std::{sync::Arc, time::Duration};
1333
1334    use protobuf::CodedOutputStream;
1335    use saluki_common::iter::ReusableDeduplicator;
1336    use saluki_context::{tags::SharedTagSet, Context};
1337    use saluki_core::data_model::event::metric::{Metric, MetricMetadata, MetricOrigin, MetricValues};
1338    use serde_json::Value as JsonValue;
1339    use stringtheory::MetaString;
1340
1341    use super::{
1342        encode_series_v1_metric, encode_series_v2_metric, encode_sketch_metric, MetricsEndpoint,
1343        MetricsEndpointEncoder, SERIES_V1_INPUT_SEPARATOR, SERIES_V1_PAYLOAD_PREFIX, SERIES_V1_PAYLOAD_SUFFIX,
1344    };
1345    use crate::common::datadog::{
1346        request_builder::EndpointEncoder as _, DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
1347        DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
1348    };
1349
1350    fn encode_one_v1(metric: &Metric) -> JsonValue {
1351        let mut buf = Vec::new();
1352        let host_tags = SharedTagSet::default();
1353        let mut tags_deduplicator = ReusableDeduplicator::new();
1354        encode_series_v1_metric(metric, &host_tags, &mut buf, &mut tags_deduplicator)
1355            .expect("encode_series_v1_metric should succeed");
1356        serde_json::from_slice(&buf).expect("encoder produced invalid JSON")
1357    }
1358
1359    #[test]
1360    fn histogram_vs_sketch_identical_payload() {
1361        // For the same exact set of points, we should be able to construct either a histogram or distribution from
1362        // those points, and when encoded as a sketch payload, end up with the same exact payload.
1363        //
1364        // They should be identical because the goal is that we convert histograms into sketches in the same way we
1365        // would have originally constructed a sketch based on the same samples.
1366        let samples = &[1.0, 2.0, 3.0, 4.0, 5.0];
1367        let histogram = Metric::histogram("simple_samples", samples);
1368        let distribution = Metric::distribution("simple_samples", samples);
1369        let host_tags = SharedTagSet::default();
1370
1371        let mut buf1 = Vec::new();
1372        let mut buf2 = Vec::new();
1373        let mut tags_deduplicator = ReusableDeduplicator::new();
1374
1375        let mut histogram_payload = Vec::new();
1376        {
1377            let mut histogram_writer = CodedOutputStream::vec(&mut histogram_payload);
1378            encode_sketch_metric(
1379                &histogram,
1380                &host_tags,
1381                &mut histogram_writer,
1382                &mut buf1,
1383                &mut buf2,
1384                &mut tags_deduplicator,
1385            )
1386            .expect("Failed to encode histogram as sketch");
1387        }
1388
1389        let mut distribution_payload = Vec::new();
1390        {
1391            let mut distribution_writer = CodedOutputStream::vec(&mut distribution_payload);
1392            encode_sketch_metric(
1393                &distribution,
1394                &host_tags,
1395                &mut distribution_writer,
1396                &mut buf1,
1397                &mut buf2,
1398                &mut tags_deduplicator,
1399            )
1400            .expect("Failed to encode distribution as sketch");
1401        }
1402
1403        assert_eq!(histogram_payload, distribution_payload);
1404    }
1405
1406    #[test]
1407    fn input_valid() {
1408        // Our encoder should always consider series metrics valid when set to either series endpoint, and similarly
1409        // for sketch metrics when set to the sketches endpoint.
1410        let counter = Metric::counter("counter", 1.0);
1411        let rate = Metric::rate("rate", 1.0, Duration::from_secs(1));
1412        let gauge = Metric::gauge("gauge", 1.0);
1413        let set = Metric::set("set", "foo");
1414        let histogram = Metric::histogram("histogram", [1.0, 2.0, 3.0]);
1415        let distribution = Metric::distribution("distribution", [1.0, 2.0, 3.0]);
1416
1417        let series_v1 = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV1);
1418        let series_v2 = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV2);
1419        let sketches_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
1420
1421        for series_endpoint in [&series_v1, &series_v2] {
1422            assert!(series_endpoint.is_valid_input(&counter));
1423            assert!(series_endpoint.is_valid_input(&rate));
1424            assert!(series_endpoint.is_valid_input(&gauge));
1425            assert!(series_endpoint.is_valid_input(&set));
1426            assert!(!series_endpoint.is_valid_input(&histogram));
1427            assert!(!series_endpoint.is_valid_input(&distribution));
1428        }
1429
1430        assert!(!sketches_endpoint.is_valid_input(&counter));
1431        assert!(!sketches_endpoint.is_valid_input(&rate));
1432        assert!(!sketches_endpoint.is_valid_input(&gauge));
1433        assert!(!sketches_endpoint.is_valid_input(&set));
1434        assert!(sketches_endpoint.is_valid_input(&histogram));
1435        assert!(sketches_endpoint.is_valid_input(&distribution));
1436    }
1437
1438    #[test]
1439    fn input_data_point_count_tracks_metric_values() {
1440        let counter = Metric::counter("counter", [(123, 1.0), (124, 2.0)]);
1441        let histogram = Metric::histogram("histogram", [1.0, 2.0, 3.0]);
1442
1443        let series_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV2);
1444        let sketches_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
1445
1446        assert_eq!(series_endpoint.input_data_point_count(&counter), 2);
1447        assert_eq!(sketches_endpoint.input_data_point_count(&histogram), 1);
1448    }
1449
1450    #[test]
1451    fn series_metric_unit_encoded() {
1452        // A gauge with a unit in its metadata must produce a series protobuf payload that contains the unit string
1453        // in field 6 (MetricSeries.unit), which the Datadog backend already accepts.
1454        //
1455        // In production this state is reached when histogram aggregation flushes timer (`ms`) statistics as gauges,
1456        // each carrying unit = "millisecond" propagated through MetricMetadata.
1457        let context = Context::from_static_parts("my.timer.avg", &[]);
1458        let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond"));
1459        let gauge = Metric::from_parts(context, MetricValues::gauge([1.0_f64]), metadata);
1460
1461        let host_tags = SharedTagSet::default();
1462        let mut scratch_buf = Vec::new();
1463        let mut tags_deduplicator = ReusableDeduplicator::new();
1464
1465        let mut payload = Vec::new();
1466        {
1467            let mut writer = CodedOutputStream::vec(&mut payload);
1468            encode_series_v2_metric(
1469                &gauge,
1470                &host_tags,
1471                &mut writer,
1472                &mut scratch_buf,
1473                &mut tags_deduplicator,
1474            )
1475            .expect("Failed to encode gauge as series metric");
1476            writer.flush().expect("Failed to flush");
1477        }
1478
1479        // In the protobuf wire format, a string field with field number 6 has tag byte 0x32 ((6 << 3) | 2).
1480        // The tag is followed by a varint length and then the UTF-8 bytes of the string.
1481        let expected_tag: u8 = (6 << 3) | 2; // 0x32
1482        let expected_value = b"millisecond";
1483
1484        let tag_pos = payload
1485            .windows(1 + 1 + expected_value.len())
1486            .position(|w| w[0] == expected_tag && w[1] == expected_value.len() as u8 && &w[2..] == expected_value);
1487
1488        assert!(
1489            tag_pos.is_some(),
1490            "series payload should contain unit field (field 6 = 'millisecond'), got bytes: {:?}",
1491            payload
1492        );
1493    }
1494
1495    #[test]
1496    fn series_v1_basic_payload_shape() {
1497        // Each metric variant maps to the right `type` string, points are emitted as [ts, value] tuples,
1498        // and `interval`/`host` are always present (zero/empty when not set).
1499        let counter = Metric::counter("my.count", 5.0);
1500        let counter_json = encode_one_v1(&counter);
1501        assert_eq!(counter_json["metric"], "my.count");
1502        assert_eq!(counter_json["type"], "count");
1503        assert_eq!(counter_json["interval"], 0);
1504        assert_eq!(counter_json["host"], "");
1505        assert_eq!(counter_json["tags"], JsonValue::Array(vec![]));
1506        let points = counter_json["points"].as_array().expect("points is array");
1507        assert_eq!(points.len(), 1);
1508        assert_eq!(points[0][0], 0);
1509        assert_eq!(points[0][1], 5.0);
1510        // Optional fields must be absent when not set.
1511        assert!(counter_json.get("unit").is_none());
1512        assert!(counter_json.get("source_type_name").is_none());
1513        assert!(counter_json.get("device").is_none());
1514
1515        let rate = Metric::rate("my.rate", 30.0, Duration::from_secs(10));
1516        let rate_json = encode_one_v1(&rate);
1517        assert_eq!(rate_json["type"], "rate");
1518        assert_eq!(rate_json["interval"], 10);
1519        // Rate value scaled by interval seconds: 30 / 10 = 3.
1520        let rate_points = rate_json["points"].as_array().expect("rate points is array");
1521        assert_eq!(rate_points[0][1], 3.0);
1522
1523        let gauge = Metric::gauge("my.gauge", 42.0);
1524        let gauge_json = encode_one_v1(&gauge);
1525        assert_eq!(gauge_json["type"], "gauge");
1526
1527        // Sets are encoded as gauges with the set cardinality as the value (consistent with V2).
1528        let set = Metric::set("my.set", "alpha");
1529        let set_json = encode_one_v1(&set);
1530        assert_eq!(set_json["type"], "gauge");
1531        let set_points = set_json["points"].as_array().expect("set points is array");
1532        assert_eq!(set_points[0][1], 1.0);
1533    }
1534
1535    #[test]
1536    fn series_v1_unit_and_hostname_emitted() {
1537        let context = Context::from_static_parts("my.timer.avg", &[]);
1538        let metadata = MetricMetadata::default()
1539            .with_unit(MetaString::from_static("millisecond"))
1540            .with_hostname(Some(Arc::from("host-1")));
1541        let gauge = Metric::from_parts(context, MetricValues::gauge([1.0_f64]), metadata);
1542
1543        let json = encode_one_v1(&gauge);
1544        assert_eq!(json["unit"], "millisecond");
1545        assert_eq!(json["host"], "host-1");
1546    }
1547
1548    #[test]
1549    fn series_v1_device_tag_extraction() {
1550        // A `device:<value>` tag is extracted into the `device` JSON field and dropped from `tags`.
1551        let context = Context::from_static_parts("my.metric", &["device:eth0", "env:prod"]);
1552        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), MetricMetadata::default());
1553
1554        let json = encode_one_v1(&counter);
1555        assert_eq!(json["device"], "eth0");
1556        let tags = json["tags"].as_array().expect("tags is array");
1557        let tag_strs: Vec<&str> = tags.iter().filter_map(|v| v.as_str()).collect();
1558        assert!(
1559            !tag_strs.iter().any(|t| t.starts_with("device:")),
1560            "device tag must be removed: {:?}",
1561            tag_strs
1562        );
1563        assert!(tag_strs.contains(&"env:prod"));
1564    }
1565
1566    #[test]
1567    fn series_v1_source_type_name_from_source_type_origin() {
1568        let context = Context::from_static_parts("my.metric", &[]);
1569        let metadata = MetricMetadata::default().with_source_type(Some(Arc::from("integration_x")));
1570        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), metadata);
1571
1572        let json = encode_one_v1(&counter);
1573        assert_eq!(json["source_type_name"], "integration_x");
1574    }
1575
1576    #[test]
1577    fn series_v1_origin_metadata_dropped() {
1578        // OriginMetadata is V2-protobuf only; V1 must drop it.
1579        let context = Context::from_static_parts("my.metric", &[]);
1580        let metadata = MetricMetadata::default().with_origin(Some(MetricOrigin::dogstatsd()));
1581        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), metadata);
1582
1583        let json = encode_one_v1(&counter);
1584        assert!(json.get("source_type_name").is_none());
1585    }
1586
1587    #[test]
1588    fn series_v1_dd_internal_resource_dropped() {
1589        // `dd.internal.resource` is V2-protobuf-only; V1 must drop these tags silently.
1590        let context = Context::from_static_parts("my.metric", &["dd.internal.resource:host:foo", "env:prod"]);
1591        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), MetricMetadata::default());
1592
1593        let json = encode_one_v1(&counter);
1594        let tags = json["tags"].as_array().expect("tags is array");
1595        let tag_strs: Vec<&str> = tags.iter().filter_map(|v| v.as_str()).collect();
1596        assert!(
1597            !tag_strs.iter().any(|t| t.starts_with("dd.internal.resource:")),
1598            "dd.internal.resource tag must be dropped: {:?}",
1599            tag_strs
1600        );
1601        assert!(tag_strs.contains(&"env:prod"));
1602    }
1603
1604    #[test]
1605    fn series_v1_endpoint_routing() {
1606        // SeriesV1 advertises the V1 URI, JSON content type, and the {"series":[...]} framing.
1607        let encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV1);
1608        assert_eq!(encoder.endpoint_uri().path(), "/api/v1/series");
1609        assert_eq!(encoder.content_type(), "application/json");
1610        assert_eq!(encoder.get_payload_prefix(), Some(SERIES_V1_PAYLOAD_PREFIX));
1611        assert_eq!(encoder.get_payload_suffix(), Some(SERIES_V1_PAYLOAD_SUFFIX));
1612        assert_eq!(encoder.get_input_separator(), Some(SERIES_V1_INPUT_SEPARATOR));
1613        assert_eq!(
1614            encoder.compressed_size_limit(),
1615            DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
1616        );
1617        assert_eq!(
1618            encoder.uncompressed_size_limit(),
1619            DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
1620        );
1621
1622        // Sketches use the generic serializer payload limits in the Datadog Agent.
1623        let sketches = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
1624        assert_eq!(
1625            sketches.compressed_size_limit(),
1626            DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
1627        );
1628        assert_eq!(
1629            sketches.uncompressed_size_limit(),
1630            DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
1631        );
1632
1633        // V2 series stays on protobuf with no framing.
1634        let v2 = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV2);
1635        assert_eq!(v2.endpoint_uri().path(), "/api/v2/series");
1636        assert_eq!(v2.content_type(), "application/x-protobuf");
1637        assert!(v2.get_payload_prefix().is_none());
1638    }
1639}
1640
#[cfg(test)]
mod config_smoke {
    use datadog_agent_config_testing::{config_registry::structs, run_config_smoke_tests};
    use serde_json::json;

    use super::DatadogMetricsConfiguration;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Runs the shared configuration smoke-test runner over the `DATADOG_METRICS_CONFIGURATION` registry entry.
    ///
    /// The runner uses the crate's `KEY_ALIASES` and `DatadogRemapper`. Every configuration it passes to the closure
    /// must deserialize into `DatadogMetricsConfiguration`; the closure panics otherwise.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(
            structs::DATADOG_METRICS_CONFIGURATION,
            &[],
            json!({}),
            |config| {
                config
                    .as_typed::<DatadogMetricsConfiguration>()
                    .expect("DatadogMetricsConfiguration should deserialize")
            },
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await
    }
}
1666
#[cfg(test)]
mod use_v2_api_series_default {
    use saluki_config::ConfigurationLoader;
    use serde_json::json;

    use super::{DatadogMetricsConfiguration, SERIES_V2_COMPRESSED_SIZE_LIMIT, SERIES_V2_UNCOMPRESSED_SIZE_LIMIT};
    use crate::{common::datadog::clamp_payload_limits, config::KEY_ALIASES};

    /// Loads `values` through a figment `Serialized::defaults` provider, with the crate's key aliases applied, and
    /// deserializes the result into a `DatadogMetricsConfiguration`. Panics if loading or deserialization fails.
    async fn load_metrics_config(values: serde_json::Value) -> DatadogMetricsConfiguration {
        let config = ConfigurationLoader::default()
            .with_key_aliases(KEY_ALIASES)
            .add_providers([figment::providers::Serialized::defaults(values)])
            .into_generic()
            .await
            .expect("config should load");
        config.as_typed().expect("should deserialize")
    }

    /// With no `use_v2_api_series` key present, the flag defaults to `true`, which keeps the V2 protobuf behavior.
    /// The nested (`use_v2_api.series`) and environment-variable (`DD_USE_V2_API_SERIES`) spellings of the flat key
    /// are covered end-to-end by `config_smoke::smoke_test` through `KEY_ALIASES`.
    #[tokio::test]
    async fn defaults_to_true_when_absent() {
        let config = load_metrics_config(json!({})).await;
        assert!(config.use_v2_api_series);
    }

    #[tokio::test]
    async fn deserializes_payload_limit_keys() {
        // Each serializer payload-limit key lands in its corresponding configuration field.
        let config = load_metrics_config(json!({
            "serializer_max_payload_size": 4321,
            "serializer_max_uncompressed_payload_size": 8765,
            "serializer_max_series_payload_size": 1234,
            "serializer_max_series_uncompressed_payload_size": 5678,
        }))
        .await;

        assert_eq!(config.max_payload_size, 4321);
        assert_eq!(config.max_uncompressed_payload_size, 8765);
        assert_eq!(config.max_series_payload_size, 1234);
        assert_eq!(config.max_series_uncompressed_payload_size, 5678);
    }

    #[tokio::test]
    async fn deserializes_max_series_points_per_payload() {
        // Without an explicit setting, the limit falls back to 10,000 points per payload...
        let defaulted = load_metrics_config(json!({})).await;
        assert_eq!(defaulted.max_series_points_per_payload, 10_000);

        // ...and an explicit setting is carried through unchanged.
        let explicit = load_metrics_config(json!({
            "serializer_max_series_points_per_payload": 500,
        }))
        .await;
        assert_eq!(explicit.max_series_points_per_payload, 500);
    }

    #[test]
    fn clamps_series_payload_limit_keys_to_api_limits() {
        // Each case is ((configured uncompressed, configured compressed), (expected uncompressed, expected compressed)).
        // Values above the V2 series API limits are clamped down to them; values already within the limits are kept.
        let cases = [
            (
                (SERIES_V2_UNCOMPRESSED_SIZE_LIMIT + 1, SERIES_V2_COMPRESSED_SIZE_LIMIT + 1),
                (SERIES_V2_UNCOMPRESSED_SIZE_LIMIT, SERIES_V2_COMPRESSED_SIZE_LIMIT),
            ),
            ((5678, 1234), (5678, 1234)),
        ];

        for ((configured_uncompressed, configured_compressed), expected) in cases {
            let clamped = clamp_payload_limits(
                configured_uncompressed,
                configured_compressed,
                SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
                SERIES_V2_COMPRESSED_SIZE_LIMIT,
            );
            assert_eq!(clamped, expected);
        }
    }
}