saluki_components/encoders/datadog/metrics/mod.rs

1use std::{num::NonZeroU64, time::Duration};
2
3use async_trait::async_trait;
4use datadog_protos::metrics as proto;
5use ddsketch_agent::DDSketch;
6use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
7use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
8use protobuf::{rt::WireType, CodedOutputStream, Enum as _};
9use saluki_common::{iter::ReusableDeduplicator, task::HandleExt as _};
10use saluki_config::GenericConfiguration;
11use saluki_context::tags::{SharedTagSet, Tag};
12use saluki_core::{
13    components::{encoders::*, ComponentContext},
14    data_model::{
15        event::{
16            metric::{Metric, MetricOrigin, MetricValues},
17            EventType,
18        },
19        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
20    },
21    observability::ComponentMetricsExt as _,
22    topology::{EventsBuffer, PayloadsBuffer},
23};
24use saluki_error::{generic_error, ErrorContext as _, GenericError};
25use saluki_io::compression::CompressionScheme;
26use saluki_metrics::MetricsBuilder;
27use serde::Deserialize;
28use tokio::{select, sync::mpsc, time::sleep};
29use tracing::{debug, error, warn};
30
31use crate::common::datadog::{
32    io::RB_BUFFER_CHUNK_SIZE,
33    request_builder::{EndpointEncoder, RequestBuilder},
34    telemetry::ComponentTelemetry,
35    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
36};
37
/// Maximum compressed request size accepted by the series (v2) endpoint: 500 KiB.
const SERIES_V2_COMPRESSED_SIZE_LIMIT: usize = 500 * 1024;
/// Maximum uncompressed request size accepted by the series (v2) endpoint: 5 MiB.
const SERIES_V2_UNCOMPRESSED_SIZE_LIMIT: usize = 5 * 1024 * 1024;

/// Compressor used for request payloads when `serializer_compressor_kind` is not configured.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
42
// Protocol Buffers field numbers used when hand-encoding series and sketch payloads.
//
// Every value below must match the message definitions in `lib/datadog-protos/proto/agent_payload.proto`; they are
// grouped here by the message they belong to.

// Series message fields. Field number 6 is absent because this encoder never writes it.
const SERIES_RESOURCES_FIELD_NUMBER: u32 = 1;
const SERIES_METRIC_FIELD_NUMBER: u32 = 2;
const SERIES_TAGS_FIELD_NUMBER: u32 = 3;
const SERIES_POINTS_FIELD_NUMBER: u32 = 4;
const SERIES_TYPE_FIELD_NUMBER: u32 = 5;
const SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER: u32 = 7;
const SERIES_INTERVAL_FIELD_NUMBER: u32 = 8;
const SERIES_METADATA_FIELD_NUMBER: u32 = 9;

// Resource (type/name pair) nested inside a series.
const RESOURCES_TYPE_FIELD_NUMBER: u32 = 1;
const RESOURCES_NAME_FIELD_NUMBER: u32 = 2;

// Single point nested inside a series.
const METRIC_POINT_VALUE_FIELD_NUMBER: u32 = 1;
const METRIC_POINT_TIMESTAMP_FIELD_NUMBER: u32 = 2;

// Sketch message fields.
const SKETCH_METRIC_FIELD_NUMBER: u32 = 1;
const SKETCH_HOST_FIELD_NUMBER: u32 = 2;
const SKETCH_TAGS_FIELD_NUMBER: u32 = 4;
const SKETCH_DOGSKETCHES_FIELD_NUMBER: u32 = 7;
const SKETCH_METADATA_FIELD_NUMBER: u32 = 8;

// Dogsketch nested inside a sketch.
const DOGSKETCH_TS_FIELD_NUMBER: u32 = 1;
const DOGSKETCH_CNT_FIELD_NUMBER: u32 = 2;
const DOGSKETCH_MIN_FIELD_NUMBER: u32 = 3;
const DOGSKETCH_MAX_FIELD_NUMBER: u32 = 4;
const DOGSKETCH_AVG_FIELD_NUMBER: u32 = 5;
const DOGSKETCH_SUM_FIELD_NUMBER: u32 = 6;
const DOGSKETCH_K_FIELD_NUMBER: u32 = 7;
const DOGSKETCH_N_FIELD_NUMBER: u32 = 8;

// Metadata shared by series and sketches, and the origin message nested inside it.
const METADATA_ORIGIN_FIELD_NUMBER: u32 = 1;
const ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER: u32 = 4;
const ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER: u32 = 5;
const ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER: u32 = 6;
81
/// `Content-Type` header value returned for every request: both the series and sketches endpoints are sent
/// Protocol Buffers-encoded bodies.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
83
/// Serde default for `serializer_max_metrics_per_payload`: ten thousand metrics per request payload.
const fn default_max_metrics_per_payload() -> usize {
    10 * 1_000
}
87
/// Serde default for `flush_timeout_secs`: wait two seconds before flushing a partially-filled payload.
const fn default_flush_timeout_secs() -> u64 {
    2_u64
}
91
92fn default_serializer_compressor_kind() -> String {
93    DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
94}
95
/// Serde default for `serializer_zstd_compressor_level`.
const fn default_zstd_compressor_level() -> i32 {
    3_i32
}
99
/// Datadog Metrics encoder.
///
/// Generates Datadog metrics payloads for the Datadog platform.
///
/// Each field's serde attributes name the configuration key it is read from and the function supplying its default.
#[derive(Clone, Deserialize)]
// NOTE(review): every field appears to be read in `build` or `specify_bounds`, so this `allow` may be stale —
// confirm before removing it.
#[allow(dead_code)]
pub struct DatadogMetricsConfiguration {
    /// Maximum number of input metrics to encode into a single request payload.
    ///
    /// This applies both to the series and sketches endpoints.
    ///
    /// Defaults to 10,000.
    #[serde(
        rename = "serializer_max_metrics_per_payload",
        default = "default_max_metrics_per_payload"
    )]
    max_metrics_per_payload: usize,

    /// Flush timeout for pending requests, in seconds.
    ///
    /// When the encoder has written metrics to the in-flight request payload, but it has not yet reached the
    /// payload size limits that would force the payload to be flushed, the encoder will wait for a period of time
    /// before flushing the in-flight request payload. This allows for the possibility of other events to be processed
    /// and written into the request payload, thereby maximizing the payload size and reducing the number of requests
    /// generated and sent overall.
    ///
    /// Defaults to 2 seconds. A value of 0 is treated as a 10 millisecond timeout.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Compression kind to use for the request payloads.
    ///
    /// Defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compressor level to use when the compressor kind is `zstd`.
    ///
    /// Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Additional tags to apply to all forwarded metrics.
    ///
    /// Never read from configuration (`skip`); set programmatically via `with_additional_tags`.
    #[serde(default, skip)]
    additional_tags: Option<SharedTagSet>,
}
151
152impl DatadogMetricsConfiguration {
153    /// Creates a new `DatadogMetricsConfiguration` from the given configuration.
154    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
155        Ok(config.as_typed()?)
156    }
157
158    /// Sets additional tags to be applied uniformly to all metrics forwarded by this destination.
159    pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
160        // Add the additional tags to the forwarder configuration.
161        self.additional_tags = Some(additional_tags);
162        self
163    }
164}
165
166#[async_trait]
167impl EncoderBuilder for DatadogMetricsConfiguration {
168    fn input_event_type(&self) -> EventType {
169        EventType::Metric
170    }
171
172    fn output_payload_type(&self) -> PayloadType {
173        PayloadType::Http
174    }
175
176    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
177        let metrics_builder = MetricsBuilder::from_component_context(&context);
178        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
179        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
180
181        // Create our request builders.
182        let mut series_encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Series);
183        let mut sketches_encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
184
185        if let Some(additional_tags) = self.additional_tags.as_ref() {
186            series_encoder = series_encoder.with_additional_tags(additional_tags.clone());
187            sketches_encoder = sketches_encoder.with_additional_tags(additional_tags.clone());
188        }
189
190        let mut series_rb = RequestBuilder::new(series_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
191        series_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);
192
193        let mut sketches_rb = RequestBuilder::new(sketches_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
194        sketches_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);
195
196        let flush_timeout = match self.flush_timeout_secs {
197            // We always give ourselves a minimum flush timeout of 10ms to allow for some very minimal amount of
198            // batching, while still practically flushing things almost immediately.
199            0 => Duration::from_millis(10),
200            secs => Duration::from_secs(secs),
201        };
202
203        Ok(Box::new(DatadogMetrics {
204            series_rb,
205            sketches_rb,
206            telemetry,
207            flush_timeout,
208        }))
209    }
210}
211
212impl MemoryBounds for DatadogMetricsConfiguration {
213    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
214        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
215        //
216        // Theoretically, we'll end up being limited by the size of the downstream forwarder's interconnect, and however
217        // many payloads it will buffer internally... so realistically the firm limit boils down to the forwarder itself
218        // but we'll have a hard time in the forwarder knowing the maximum size of any given payload being sent in, which
219        // then makes it hard to calculate a proper firm bound even though we know the rest of the values required to
220        // calculate the firm bound.
221        builder
222            .minimum()
223            .with_single_value::<DatadogMetrics>("component struct")
224            .with_array::<EventsBuffer>("request builder events channel", 8)
225            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);
226
227        builder
228            .firm()
229            // Capture the size of the "split re-encode" buffers in the request builders, which is where we keep owned
230            // versions of metrics that we encode in case we need to actually re-encode them during a split operation.
231            .with_array::<Metric>("series metrics split re-encode buffer", self.max_metrics_per_payload)
232            .with_array::<Metric>("sketch metrics split re-encode buffer", self.max_metrics_per_payload);
233    }
234}
235
236pub struct DatadogMetrics {
237    series_rb: RequestBuilder<MetricsEndpointEncoder>,
238    sketches_rb: RequestBuilder<MetricsEndpointEncoder>,
239    telemetry: ComponentTelemetry,
240    flush_timeout: Duration,
241}
242
243#[async_trait]
244impl Encoder for DatadogMetrics {
245    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
246        let Self {
247            series_rb,
248            sketches_rb,
249            telemetry,
250            flush_timeout,
251        } = *self;
252
253        let mut health = context.take_health_handle();
254
255        // Spawn our request builder task.
256        let (events_tx, events_rx) = mpsc::channel(8);
257        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
258        let request_builder_fut =
259            run_request_builder(series_rb, sketches_rb, telemetry, events_rx, payloads_tx, flush_timeout);
260        let request_builder_handle = context
261            .topology_context()
262            .global_thread_pool()
263            .spawn_traced_named("dd-metrics-request-builder", request_builder_fut);
264
265        health.mark_ready();
266        debug!("Datadog Metrics encoder started.");
267
268        loop {
269            select! {
270                biased;
271
272                _ = health.live() => continue,
273                maybe_payload = payloads_rx.recv() => match maybe_payload {
274                    Some(payload) => {
275                        if let Err(e) = context.dispatcher().dispatch(payload).await {
276                            error!("Failed to dispatch payload: {}", e);
277                        }
278                    }
279                    None => break,
280                },
281                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
282                    Some(event_buffer) => events_tx.send(event_buffer).await
283                        .error_context("Failed to send event buffer to request builder task.")?,
284                    None => break,
285                },
286            }
287        }
288
289        // Drop the events sender, which signals the request builder task to stop.
290        drop(events_tx);
291
292        // Continue draining the payloads receiver until it is closed.
293        while let Some(payload) = payloads_rx.recv().await {
294            if let Err(e) = context.dispatcher().dispatch(payload).await {
295                error!("Failed to dispatch payload: {}", e);
296            }
297        }
298
299        // Request build task should now be stopped.
300        match request_builder_handle.await {
301            Ok(Ok(())) => debug!("Request builder task stopped."),
302            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
303            Err(e) => error!(error = %e, "Request builder task panicked."),
304        }
305
306        debug!("Datadog Metrics encoder stopped.");
307
308        Ok(())
309    }
310}
311
312async fn run_request_builder(
313    mut series_request_builder: RequestBuilder<MetricsEndpointEncoder>,
314    mut sketches_request_builder: RequestBuilder<MetricsEndpointEncoder>, telemetry: ComponentTelemetry,
315    mut events_rx: mpsc::Receiver<EventsBuffer>, payloads_tx: mpsc::Sender<PayloadsBuffer>, flush_timeout: Duration,
316) -> Result<(), GenericError> {
317    let mut pending_flush = false;
318    let pending_flush_timeout = sleep(flush_timeout);
319    tokio::pin!(pending_flush_timeout);
320
321    loop {
322        select! {
323            Some(event_buffer) = events_rx.recv() => {
324                for event in event_buffer {
325                    let metric = match event.try_into_metric() {
326                        Some(metric) => metric,
327                        None => continue,
328                    };
329
330                    let request_builder = match MetricsEndpoint::from_metric(&metric) {
331                        MetricsEndpoint::Series => &mut series_request_builder,
332                        MetricsEndpoint::Sketches => &mut sketches_request_builder,
333                    };
334
335                    // Encode the metric. If we get it back, that means the current request is full, and we need to
336                    // flush it before we can try to encode the metric again... so we'll hold on to it in that case
337                    // before flushing and trying to encode it again.
338                    let metric_to_retry = match request_builder.encode(metric).await {
339                        Ok(None) => continue,
340                        Ok(Some(metric)) => metric,
341                        Err(e) => {
342                            error!(error = %e, "Failed to encode metric.");
343                            telemetry.events_dropped_encoder().increment(1);
344                            continue;
345                        }
346                    };
347
348
349                    let maybe_requests = request_builder.flush().await;
350                    if maybe_requests.is_empty() {
351                        panic!("builder told us to flush, but gave us nothing");
352                    }
353
354                    for maybe_request in maybe_requests {
355                        match maybe_request {
356                            Ok((events, request)) => {
357                                let payload_meta = PayloadMetadata::from_event_count(events);
358                                let http_payload = HttpPayload::new(payload_meta, request);
359                                let payload = Payload::Http(http_payload);
360
361                                payloads_tx.send(payload).await
362                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
363                            },
364
365                            // TODO: Increment a counter here that metrics were dropped due to a flush failure.
366                            Err(e) => if e.is_recoverable() {
367                                // If the error is recoverable, we'll hold on to the metric to retry it later.
368                                continue;
369                            } else {
370                                return Err(GenericError::from(e).context("Failed to flush request."));
371                            }
372                        }
373                    }
374
375                    // Now try to encode the metric again. If it fails again, we'll just log it because it shouldn't
376                    // be possible to fail at this point, otherwise we would have already caught that the first
377                    // time.
378                    if let Err(e) = request_builder.encode(metric_to_retry).await {
379                        error!(error = %e, "Failed to encode metric.");
380                        telemetry.events_dropped_encoder().increment(1);
381                    }
382                }
383
384                debug!("Processed event buffer.");
385
386                // If we're not already pending a flush, we'll start the countdown.
387                if !pending_flush {
388                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
389                    pending_flush = true;
390                }
391            },
392            _ = &mut pending_flush_timeout, if pending_flush => {
393                debug!("Flushing pending request(s).");
394
395                pending_flush = false;
396
397                // Once we've encoded and written all metrics, we flush the request builders to generate a request with
398                // anything left over. Again, we'll enqueue those requests to be sent immediately.
399                let maybe_series_requests = series_request_builder.flush().await;
400                for maybe_request in maybe_series_requests {
401                    match maybe_request {
402                        Ok((events, request)) => {
403                            let payload_meta = PayloadMetadata::from_event_count(events);
404                            let http_payload = HttpPayload::new(payload_meta, request);
405                            let payload = Payload::Http(http_payload);
406
407                            payloads_tx.send(payload).await
408                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
409                        },
410
411                        // TODO: Increment a counter here that metrics were dropped due to a flush failure.
412                        Err(e) => if e.is_recoverable() {
413                            // If the error is recoverable, we'll hold on to the metric to retry it later.
414                            continue;
415                        } else {
416                            return Err(GenericError::from(e).context("Failed to flush request."));
417                        }
418                    }
419                }
420
421                let maybe_sketches_requests = sketches_request_builder.flush().await;
422                for maybe_request in maybe_sketches_requests {
423                    match maybe_request {
424                        Ok((events, request)) => {
425                            let payload_meta = PayloadMetadata::from_event_count(events);
426                            let http_payload = HttpPayload::new(payload_meta, request);
427                            let payload = Payload::Http(http_payload);
428
429                            payloads_tx.send(payload).await
430                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
431                        },
432
433                        // TODO: Increment a counter here that metrics were dropped due to a flush failure.
434                        Err(e) => if e.is_recoverable() {
435                            // If the error is recoverable, we'll hold on to the metric to retry it later.
436                            continue;
437                        } else {
438                            return Err(GenericError::from(e).context("Failed to flush request."));
439                        }
440                    }
441                }
442
443                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
444            },
445
446            // Event buffers channel has been closed, and we have no pending flushing, so we're all done.
447            else => break,
448        }
449    }
450
451    Ok(())
452}
453
454/// Metrics intake endpoint.
455#[derive(Clone, Copy, Debug, Eq, PartialEq)]
456enum MetricsEndpoint {
457    /// Series metrics.
458    ///
459    /// Includes counters, gauges, rates, and sets.
460    Series,
461
462    /// Sketch metrics.
463    ///
464    /// Includes histograms and distributions.
465    Sketches,
466}
467
468impl MetricsEndpoint {
469    /// Creates a new `MetricsEndpoint` from the given metric.
470    pub fn from_metric(metric: &Metric) -> Self {
471        match metric.values() {
472            MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
473                Self::Series
474            }
475            MetricValues::Histogram(..) | MetricValues::Distribution(..) => Self::Sketches,
476        }
477    }
478}
479
480#[derive(Debug)]
481struct MetricsEndpointEncoder {
482    endpoint: MetricsEndpoint,
483    primary_scratch_buf: Vec<u8>,
484    secondary_scratch_buf: Vec<u8>,
485    packed_scratch_buf: Vec<u8>,
486    additional_tags: SharedTagSet,
487    tags_deduplicator: ReusableDeduplicator<Tag>,
488}
489
490impl MetricsEndpointEncoder {
491    /// Creates a new `MetricsEndpointEncoder` for the given endpoint.
492    pub fn from_endpoint(endpoint: MetricsEndpoint) -> Self {
493        Self {
494            endpoint,
495            primary_scratch_buf: Vec::new(),
496            secondary_scratch_buf: Vec::new(),
497            packed_scratch_buf: Vec::new(),
498            additional_tags: SharedTagSet::default(),
499            tags_deduplicator: ReusableDeduplicator::new(),
500        }
501    }
502
503    /// Sets the additional tags to be included with every metric encoded by this encoder.
504    ///
505    /// These tags are added in a deduplicated fashion, the same as instrumented tags and origin tags. This is an
506    /// optimized codepath for tag inclusion in high-volume scenarios, where creating new additional contexts
507    /// through the traditional means (e.g., `ContextResolver`) would be too expensive.
508    pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
509        self.additional_tags = additional_tags;
510        self
511    }
512}
513
514impl EndpointEncoder for MetricsEndpointEncoder {
515    type Input = Metric;
516    type EncodeError = protobuf::Error;
517
518    fn encoder_name() -> &'static str {
519        "metrics"
520    }
521
522    fn compressed_size_limit(&self) -> usize {
523        match self.endpoint {
524            MetricsEndpoint::Series => SERIES_V2_COMPRESSED_SIZE_LIMIT,
525            MetricsEndpoint::Sketches => DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
526        }
527    }
528
529    fn uncompressed_size_limit(&self) -> usize {
530        match self.endpoint {
531            MetricsEndpoint::Series => SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
532            MetricsEndpoint::Sketches => DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
533        }
534    }
535
536    fn is_valid_input(&self, input: &Self::Input) -> bool {
537        let input_endpoint = MetricsEndpoint::from_metric(input);
538        input_endpoint == self.endpoint
539    }
540
541    fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
542        // NOTE: We're passing _four_ buffers to `encode_single_metric`, which is a lot, but with good reason.
543        //
544        // The first buffer, `buffer`, is the overall output buffer: the caller expects us to put the full encoded
545        // metric payload into this buffer.
546        //
547        // The second and third buffers, `primary_scratch_buf` and `secondary_scratch_buf`, are used for roughly the
548        // same thing but deal with _nesting_. When writing a "message" in Protocol Buffers, the message data itself is
549        // prefixed with the field number and a length delimiter that specifies how long the message is. We can't write
550        // that length delimiter until we know the full size of the message, so we write the message to a scratch
551        // buffer, calculate its size, and then write the field number and length delimiter to the output buffer
552        // followed by the message data from the scratch buffer.
553        //
554        // We have _two_ scratch buffers because you need a dedicated buffer for each level of nested message. We have
555        // to be able to nest up to two levels deep in our metrics payload, so we need two scratch buffers to handle
556        // that.
557        //
558        // The fourth buffer, `packed_scratch_buf`, is used for writing out packed repeated fields. This is similar to
559        // the situation describe above, except it's not _exactly_ the same as an additional level of nesting.. so I
560        // just decided to give it a somewhat more descriptive name.
561        encode_single_metric(
562            input,
563            &self.additional_tags,
564            buffer,
565            &mut self.primary_scratch_buf,
566            &mut self.secondary_scratch_buf,
567            &mut self.packed_scratch_buf,
568            &mut self.tags_deduplicator,
569        )?;
570
571        Ok(())
572    }
573
574    fn endpoint_uri(&self) -> Uri {
575        match self.endpoint {
576            MetricsEndpoint::Series => PathAndQuery::from_static("/api/v2/series").into(),
577            MetricsEndpoint::Sketches => PathAndQuery::from_static("/api/beta/sketches").into(),
578        }
579    }
580
581    fn endpoint_method(&self) -> Method {
582        // Both endpoints use POST.
583        Method::POST
584    }
585
586    fn content_type(&self) -> HeaderValue {
587        // Both endpoints encode via Protocol Buffers.
588        CONTENT_TYPE_PROTOBUF.clone()
589    }
590}
591
592fn field_number_for_metric_type(metric: &Metric) -> u32 {
593    match metric.values() {
594        MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => 1,
595        MetricValues::Histogram(..) | MetricValues::Distribution(..) => 1,
596    }
597}
598
599fn get_message_size(raw_msg_size: usize) -> Result<u32, protobuf::Error> {
600    const MAX_MESSAGE_SIZE: u64 = i32::MAX as u64;
601
602    // Individual messages cannot be larger than `i32::MAX`, so check that here before proceeding.
603    if raw_msg_size as u64 > MAX_MESSAGE_SIZE {
604        return Err(std::io::Error::other("message size exceeds limit (2147483648 bytes)").into());
605    }
606
607    Ok(raw_msg_size as u32)
608}
609
610fn get_message_size_from_buffer(buf: &[u8]) -> Result<u32, protobuf::Error> {
611    get_message_size(buf.len())
612}
613
614fn encode_single_metric(
615    metric: &Metric, additional_tags: &SharedTagSet, output_buf: &mut Vec<u8>, primary_scratch_buf: &mut Vec<u8>,
616    secondary_scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
617    tags_deduplicator: &mut ReusableDeduplicator<Tag>,
618) -> Result<(), protobuf::Error> {
619    let mut output_stream = CodedOutputStream::vec(output_buf);
620    let field_number = field_number_for_metric_type(metric);
621
622    write_nested_message(&mut output_stream, primary_scratch_buf, field_number, |os| {
623        // Depending on the metric type, we write out the appropriate fields.
624        match metric.values() {
625            MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
626                encode_series_metric(metric, additional_tags, os, secondary_scratch_buf, tags_deduplicator)
627            }
628            MetricValues::Histogram(..) | MetricValues::Distribution(..) => encode_sketch_metric(
629                metric,
630                additional_tags,
631                os,
632                secondary_scratch_buf,
633                packed_scratch_buf,
634                tags_deduplicator,
635            ),
636        }
637    })
638}
639
640fn encode_series_metric(
641    metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
642    scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
643) -> Result<(), protobuf::Error> {
644    // Write the metric name and tags.
645    output_stream.write_string(SERIES_METRIC_FIELD_NUMBER, metric.context().name())?;
646
647    let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
648    write_series_tags(deduplicated_tags, output_stream, scratch_buf)?;
649
650    // Set the host resource.
651    write_resource(
652        output_stream,
653        scratch_buf,
654        "host",
655        metric.metadata().hostname().unwrap_or_default(),
656    )?;
657
658    // Write the origin metadata, if it exists.
659    if let Some(origin) = metric.metadata().origin() {
660        match origin {
661            MetricOrigin::SourceType(source_type) => {
662                output_stream.write_string(SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER, source_type.as_ref())?;
663            }
664            MetricOrigin::OriginMetadata {
665                product,
666                subproduct,
667                product_detail,
668            } => {
669                write_origin_metadata(
670                    output_stream,
671                    scratch_buf,
672                    SERIES_METADATA_FIELD_NUMBER,
673                    *product,
674                    *subproduct,
675                    *product_detail,
676                )?;
677            }
678        }
679    }
680
681    // Now write out our metric type, points, and interval (if applicable).
682    let (metric_type, points, maybe_interval) = match metric.values() {
683        MetricValues::Counter(points) => (proto::MetricType::COUNT, points.into_iter(), None),
684        MetricValues::Rate(points, interval) => (proto::MetricType::RATE, points.into_iter(), Some(interval)),
685        MetricValues::Gauge(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
686        MetricValues::Set(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
687        _ => unreachable!(),
688    };
689
690    output_stream.write_enum(SERIES_TYPE_FIELD_NUMBER, metric_type.value())?;
691
692    for (timestamp, value) in points {
693        // If this is a rate metric, scale our value by the interval, in seconds.
694        let value = maybe_interval
695            .map(|interval| value / interval.as_secs_f64())
696            .unwrap_or(value);
697        let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;
698
699        write_point(output_stream, scratch_buf, value, timestamp)?;
700    }
701
702    if let Some(interval) = maybe_interval {
703        output_stream.write_int64(SERIES_INTERVAL_FIELD_NUMBER, interval.as_secs() as i64)?;
704    }
705
706    Ok(())
707}
708
709fn encode_sketch_metric(
710    metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
711    scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
712) -> Result<(), protobuf::Error> {
713    // Write the metric name and tags.
714    output_stream.write_string(SKETCH_METRIC_FIELD_NUMBER, metric.context().name())?;
715
716    let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
717    write_sketch_tags(deduplicated_tags, output_stream, scratch_buf)?;
718
719    // Write the host.
720    output_stream.write_string(
721        SKETCH_HOST_FIELD_NUMBER,
722        metric.metadata().hostname().unwrap_or_default(),
723    )?;
724
725    // Set the origin metadata, if it exists.
726    if let Some(MetricOrigin::OriginMetadata {
727        product,
728        subproduct,
729        product_detail,
730    }) = metric.metadata().origin()
731    {
732        write_origin_metadata(
733            output_stream,
734            scratch_buf,
735            SKETCH_METADATA_FIELD_NUMBER,
736            *product,
737            *subproduct,
738            *product_detail,
739        )?;
740    }
741
742    // Write out our sketches.
743    match metric.values() {
744        MetricValues::Distribution(sketches) => {
745            for (timestamp, value) in sketches {
746                write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, value)?;
747            }
748        }
749        MetricValues::Histogram(points) => {
750            for (timestamp, histogram) in points {
751                // We convert histograms to sketches to be able to write them out in the payload.
752                let mut ddsketch = DDSketch::default();
753                for sample in histogram.samples() {
754                    ddsketch.insert_n(sample.value.into_inner(), sample.weight as u32);
755                }
756
757                write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, &ddsketch)?;
758            }
759        }
760        _ => unreachable!(),
761    }
762
763    Ok(())
764}
765
766fn write_resource(
767    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, resource_type: &str, resource_name: &str,
768) -> Result<(), protobuf::Error> {
769    write_nested_message(output_stream, scratch_buf, SERIES_RESOURCES_FIELD_NUMBER, |os| {
770        os.write_string(RESOURCES_TYPE_FIELD_NUMBER, resource_type)?;
771        os.write_string(RESOURCES_NAME_FIELD_NUMBER, resource_name)
772    })
773}
774
775fn write_origin_metadata(
776    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, origin_product: u32,
777    origin_category: u32, origin_service: u32,
778) -> Result<(), protobuf::Error> {
779    // TODO: Figure out how to cleanly use `write_nested_message` here.
780
781    scratch_buf.clear();
782
783    {
784        let mut origin_output_stream = CodedOutputStream::vec(scratch_buf);
785        origin_output_stream.write_uint32(ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER, origin_product)?;
786        origin_output_stream.write_uint32(ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER, origin_category)?;
787        origin_output_stream.write_uint32(ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER, origin_service)?;
788        origin_output_stream.flush()?;
789    }
790
791    // We do a little song and dance here because the `Origin` message is embedded inside of `Metadata`, so we need to
792    // write out field numbers/length delimiters in order: `Metadata`, and then `Origin`... but we write out origin
793    // message to the scratch buffer first... so we write out our `Metadata` preamble stuff to get its length, and then
794    // use that in conjunction with the `Origin` message size to write out the full `Metadata` message.
795    let origin_message_size = get_message_size_from_buffer(scratch_buf)?;
796
797    let mut metadata_preamble_buf = [0; 64];
798    let metadata_preamble_len = {
799        let mut metadata_output_stream = CodedOutputStream::bytes(&mut metadata_preamble_buf[..]);
800        metadata_output_stream.write_tag(METADATA_ORIGIN_FIELD_NUMBER, WireType::LengthDelimited)?;
801        metadata_output_stream.write_raw_varint32(origin_message_size)?;
802        metadata_output_stream.flush()?;
803        metadata_output_stream.total_bytes_written() as usize
804    };
805
806    let metadata_message_size = get_message_size(scratch_buf.len() + metadata_preamble_len)?;
807
808    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
809    output_stream.write_raw_varint32(metadata_message_size)?;
810    output_stream.write_raw_bytes(&metadata_preamble_buf[..metadata_preamble_len])?;
811    output_stream.write_raw_bytes(scratch_buf)
812}
813
814fn write_point(
815    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, value: f64, timestamp: i64,
816) -> Result<(), protobuf::Error> {
817    write_nested_message(output_stream, scratch_buf, SERIES_POINTS_FIELD_NUMBER, |os| {
818        os.write_double(METRIC_POINT_VALUE_FIELD_NUMBER, value)?;
819        os.write_int64(METRIC_POINT_TIMESTAMP_FIELD_NUMBER, timestamp)
820    })
821}
822
823fn write_dogsketch(
824    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
825    timestamp: Option<NonZeroU64>, sketch: &DDSketch,
826) -> Result<(), protobuf::Error> {
827    // If the sketch is empty, we don't write it out.
828    if sketch.is_empty() {
829        warn!("Attempted to write an empty sketch to sketches payload, skipping.");
830        return Ok(());
831    }
832
833    write_nested_message(output_stream, scratch_buf, SKETCH_DOGSKETCHES_FIELD_NUMBER, |os| {
834        os.write_int64(DOGSKETCH_TS_FIELD_NUMBER, timestamp.map_or(0, |ts| ts.get() as i64))?;
835        os.write_int64(DOGSKETCH_CNT_FIELD_NUMBER, sketch.count() as i64)?;
836        os.write_double(DOGSKETCH_MIN_FIELD_NUMBER, sketch.min().unwrap())?;
837        os.write_double(DOGSKETCH_MAX_FIELD_NUMBER, sketch.max().unwrap())?;
838        os.write_double(DOGSKETCH_AVG_FIELD_NUMBER, sketch.avg().unwrap())?;
839        os.write_double(DOGSKETCH_SUM_FIELD_NUMBER, sketch.sum().unwrap())?;
840
841        let bin_keys = sketch.bins().iter().map(|bin| bin.key());
842        write_repeated_packed_from_iter(
843            os,
844            packed_scratch_buf,
845            DOGSKETCH_K_FIELD_NUMBER,
846            bin_keys,
847            |inner_os, value| inner_os.write_sint32_no_tag(value),
848        )?;
849
850        let bin_counts = sketch.bins().iter().map(|bin| bin.count());
851        write_repeated_packed_from_iter(
852            os,
853            packed_scratch_buf,
854            DOGSKETCH_N_FIELD_NUMBER,
855            bin_counts,
856            |inner_os, value| inner_os.write_uint32_no_tag(value),
857        )
858    })
859}
860
861fn get_deduplicated_tags<'a>(
862    metric: &'a Metric, additional_tags: &'a SharedTagSet, tags_deduplicator: &'a mut ReusableDeduplicator<Tag>,
863) -> impl Iterator<Item = &'a Tag> {
864    let chained_tags = metric
865        .context()
866        .tags()
867        .into_iter()
868        .chain(additional_tags)
869        .chain(metric.context().origin_tags());
870
871    tags_deduplicator.deduplicated(chained_tags)
872}
873
874fn write_tags<'a, I, F>(
875    tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, tag_encoder: F,
876) -> Result<(), protobuf::Error>
877where
878    I: Iterator<Item = &'a Tag>,
879    F: Fn(&Tag, &mut CodedOutputStream<'_>, &mut Vec<u8>) -> Result<(), protobuf::Error>,
880{
881    for tag in tags {
882        tag_encoder(tag, output_stream, scratch_buf)?;
883    }
884
885    Ok(())
886}
887
888fn write_series_tags<'a, I>(
889    tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
890) -> Result<(), protobuf::Error>
891where
892    I: Iterator<Item = &'a Tag>,
893{
894    write_tags(tags, output_stream, scratch_buf, |tag, os, buf| {
895        // If this is a resource tag, we'll convert it directly to a resource entry.
896        if tag.name() == "dd.internal.resource" {
897            if let Some((resource_type, resource_name)) = tag.value().and_then(|s| s.split_once(':')) {
898                write_resource(os, buf, resource_type, resource_name)
899            } else {
900                Ok(())
901            }
902        } else {
903            // We're dealing with a normal tag.
904            os.write_string(SERIES_TAGS_FIELD_NUMBER, tag.as_str())
905        }
906    })
907}
908
909fn write_sketch_tags<'a, I>(
910    tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
911) -> Result<(), protobuf::Error>
912where
913    I: Iterator<Item = &'a Tag>,
914{
915    write_tags(tags, output_stream, scratch_buf, |tag, os, _buf| {
916        // We always write the tags as-is, without any special handling for resource tags.
917        os.write_string(SKETCH_TAGS_FIELD_NUMBER, tag.as_str())
918    })
919}
920
921fn write_nested_message<F>(
922    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, writer: F,
923) -> Result<(), protobuf::Error>
924where
925    F: FnOnce(&mut CodedOutputStream<'_>) -> Result<(), protobuf::Error>,
926{
927    scratch_buf.clear();
928
929    {
930        let mut nested_output_stream = CodedOutputStream::vec(scratch_buf);
931        writer(&mut nested_output_stream)?;
932        nested_output_stream.flush()?;
933    }
934
935    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
936
937    let nested_message_size = get_message_size_from_buffer(scratch_buf)?;
938    output_stream.write_raw_varint32(nested_message_size)?;
939    output_stream.write_raw_bytes(scratch_buf)
940}
941
942fn write_repeated_packed_from_iter<I, T, F>(
943    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, values: I, writer: F,
944) -> Result<(), protobuf::Error>
945where
946    I: Iterator<Item = T>,
947    F: Fn(&mut CodedOutputStream<'_>, T) -> Result<(), protobuf::Error>,
948{
949    // This is a helper function that lets us write out a packed repeated field from an iterator of values.
950    // `CodedOutputStream` has similar functions to handle this, but they require a slice of values, which would mean we
951    // need to either allocate a new vector each time to hold the values, or thread through two additional vectors (one
952    // for `i32`, one for `u32`) to reuse the allocation... both of which are not great options.
953    //
954    // We've simply opted to pass through a _single_ vector that we can reuse, and write the packed values directly to
955    // that, almost identically to how `CodedOutputStream::write_repeated_packed_*` methods would do it.
956
957    scratch_buf.clear();
958
959    {
960        let mut packed_output_stream = CodedOutputStream::vec(scratch_buf);
961        for value in values {
962            writer(&mut packed_output_stream, value)?;
963        }
964        packed_output_stream.flush()?;
965    }
966
967    let data_size = get_message_size_from_buffer(scratch_buf)?;
968
969    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
970    output_stream.write_raw_varint32(data_size)?;
971    output_stream.write_raw_bytes(scratch_buf)
972}
973
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use protobuf::CodedOutputStream;
    use saluki_common::iter::ReusableDeduplicator;
    use saluki_context::tags::{SharedTagSet, Tag};
    use saluki_core::data_model::event::metric::Metric;

    use super::{encode_sketch_metric, MetricsEndpoint, MetricsEndpointEncoder};
    use crate::common::datadog::request_builder::EndpointEncoder as _;

    /// Encodes `metric` as a sketch into a new buffer, reusing the caller's scratch buffers and deduplicator.
    fn encode_as_sketch(
        metric: &Metric, host_tags: &SharedTagSet, scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
        tags_deduplicator: &mut ReusableDeduplicator<Tag>, failure_msg: &str,
    ) -> Vec<u8> {
        let mut payload = Vec::new();
        {
            // The writer flushes into `payload` when it is dropped at the end of this scope.
            let mut writer = CodedOutputStream::vec(&mut payload);
            encode_sketch_metric(
                metric,
                host_tags,
                &mut writer,
                scratch_buf,
                packed_scratch_buf,
                tags_deduplicator,
            )
            .expect(failure_msg);
        }
        payload
    }

    #[test]
    fn histogram_vs_sketch_identical_payload() {
        // A histogram and a distribution built from the same samples must encode to the same sketch payload. The
        // histogram-to-sketch conversion should reproduce exactly the sketch we would have built from those samples
        // directly.
        let samples = &[1.0, 2.0, 3.0, 4.0, 5.0];
        let histogram = Metric::histogram("simple_samples", samples);
        let distribution = Metric::distribution("simple_samples", samples);
        let host_tags = SharedTagSet::default();

        let mut scratch_buf = Vec::new();
        let mut packed_scratch_buf = Vec::new();
        let mut tags_deduplicator = ReusableDeduplicator::new();

        let histogram_payload = encode_as_sketch(
            &histogram,
            &host_tags,
            &mut scratch_buf,
            &mut packed_scratch_buf,
            &mut tags_deduplicator,
            "Failed to encode histogram as sketch",
        );
        let distribution_payload = encode_as_sketch(
            &distribution,
            &host_tags,
            &mut scratch_buf,
            &mut packed_scratch_buf,
            &mut tags_deduplicator,
            "Failed to encode distribution as sketch",
        );

        assert_eq!(histogram_payload, distribution_payload);
    }

    #[test]
    fn input_valid() {
        // Series metrics must be accepted only by the series endpoint, and sketch metrics only by the sketches
        // endpoint.
        let series_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Series);
        let sketches_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);

        // (label, metric, valid for series endpoint, valid for sketches endpoint)
        let cases = [
            ("counter", Metric::counter("counter", 1.0), true, false),
            ("rate", Metric::rate("rate", 1.0, Duration::from_secs(1)), true, false),
            ("gauge", Metric::gauge("gauge", 1.0), true, false),
            ("set", Metric::set("set", "foo"), true, false),
            ("histogram", Metric::histogram("histogram", [1.0, 2.0, 3.0]), false, true),
            ("distribution", Metric::distribution("distribution", [1.0, 2.0, 3.0]), false, true),
        ];

        for (label, metric, series_ok, sketches_ok) in &cases {
            assert_eq!(
                series_endpoint.is_valid_input(metric),
                *series_ok,
                "series endpoint validity for {label}"
            );
            assert_eq!(
                sketches_endpoint.is_valid_input(metric),
                *sketches_ok,
                "sketches endpoint validity for {label}"
            );
        }
    }
}