1use std::{fmt, num::NonZeroU64, time::Duration};
2
3use async_trait::async_trait;
4use datadog_protos::metrics as proto;
5use ddsketch::DDSketch;
6use facet::Facet;
7use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
8use protobuf::{rt::WireType, CodedOutputStream, Enum as _};
9use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
10use saluki_common::{iter::ReusableDeduplicator, task::HandleExt as _};
11use saluki_config::GenericConfiguration;
12use saluki_context::tags::{SharedTagSet, Tag};
13use saluki_core::{
14 components::{encoders::*, ComponentContext},
15 data_model::{
16 event::{
17 metric::{Metric, MetricOrigin, MetricValues},
18 EventType,
19 },
20 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
21 },
22 observability::ComponentMetricsExt as _,
23 topology::{EventsBuffer, PayloadsBuffer},
24};
25use saluki_error::{generic_error, ErrorContext as _, GenericError};
26use saluki_io::compression::CompressionScheme;
27use saluki_metrics::MetricsBuilder;
28use serde::Deserialize;
29use serde_json::{Map as JsonMap, Number as JsonNumber, Value as JsonValue};
30use tokio::{pin, select, sync::mpsc, time::sleep};
31use tracing::{debug, error, warn};
32
33use crate::common::datadog::{
34 clamp_payload_limits,
35 io::RB_BUFFER_CHUNK_SIZE,
36 request_builder::{EndpointEncoder, RequestBuilder},
37 telemetry::ComponentTelemetry,
38 DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT, DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT, METRICS_SERIES_V1_PATH,
39 METRICS_SERIES_V2_PATH, METRICS_SKETCHES_PATH,
40};
41
42const SERIES_V2_COMPRESSED_SIZE_LIMIT: usize = 512_000; const SERIES_V2_UNCOMPRESSED_SIZE_LIMIT: usize = 5_242_880; const SERIES_V1_COMPRESSED_SIZE_LIMIT: usize = DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT;
47const SERIES_V1_UNCOMPRESSED_SIZE_LIMIT: usize = DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT;
48
49const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
50
// Protobuf field numbers used when hand-encoding the metrics payloads. Each group below belongs to one message
// type, and the constant prefix names that message.

// `Resource` message: a (type, name) pair attached to a series (e.g. the "host" resource).
const RESOURCES_TYPE_FIELD_NUMBER: u32 = 1;
const RESOURCES_NAME_FIELD_NUMBER: u32 = 2;

// `Metadata` message: wraps the `Origin` message.
const METADATA_ORIGIN_FIELD_NUMBER: u32 = 1;

// `Origin` message: numeric origin product / category / service identifiers.
const ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER: u32 = 4;
const ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER: u32 = 5;
const ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER: u32 = 6;

// `MetricPoint` message: one (value, timestamp) data point of a series.
const METRIC_POINT_VALUE_FIELD_NUMBER: u32 = 1;
const METRIC_POINT_TIMESTAMP_FIELD_NUMBER: u32 = 2;

// `Dogsketch` message: one sketch with its summary statistics plus packed bin keys (`k`) and, presumably,
// bin counts (`n`).
const DOGSKETCH_TS_FIELD_NUMBER: u32 = 1;
const DOGSKETCH_CNT_FIELD_NUMBER: u32 = 2;
const DOGSKETCH_MIN_FIELD_NUMBER: u32 = 3;
const DOGSKETCH_MAX_FIELD_NUMBER: u32 = 4;
const DOGSKETCH_AVG_FIELD_NUMBER: u32 = 5;
const DOGSKETCH_SUM_FIELD_NUMBER: u32 = 6;
const DOGSKETCH_K_FIELD_NUMBER: u32 = 7;
const DOGSKETCH_N_FIELD_NUMBER: u32 = 8;

// `MetricSeries` message: one series in a V2 series payload.
const SERIES_RESOURCES_FIELD_NUMBER: u32 = 1;
const SERIES_METRIC_FIELD_NUMBER: u32 = 2;
const SERIES_TAGS_FIELD_NUMBER: u32 = 3;
const SERIES_POINTS_FIELD_NUMBER: u32 = 4;
const SERIES_TYPE_FIELD_NUMBER: u32 = 5;
const SERIES_UNIT_FIELD_NUMBER: u32 = 6;
const SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER: u32 = 7;
const SERIES_INTERVAL_FIELD_NUMBER: u32 = 8;
const SERIES_METADATA_FIELD_NUMBER: u32 = 9;

// `Sketch` message: one sketch metric in a sketches payload.
const SKETCH_METRIC_FIELD_NUMBER: u32 = 1;
const SKETCH_HOST_FIELD_NUMBER: u32 = 2;
const SKETCH_TAGS_FIELD_NUMBER: u32 = 4;
const SKETCH_DOGSKETCHES_FIELD_NUMBER: u32 = 7;
const SKETCH_METADATA_FIELD_NUMBER: u32 = 8;
90
/// `Content-Type` sent with series V2 and sketches payloads, which are protobuf-encoded.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
/// `Content-Type` sent with series V1 payloads, which are JSON-encoded.
static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");
93
/// Bytes that open a V1 series JSON payload; the encoded series objects follow as array elements.
const SERIES_V1_PAYLOAD_PREFIX: &[u8] = br#"{"series":["#;
/// Bytes that close a V1 series JSON payload.
const SERIES_V1_PAYLOAD_SUFFIX: &[u8] = br"]}";
/// Separator written between consecutive series objects in a V1 series payload.
const SERIES_V1_INPUT_SEPARATOR: &[u8] = br",";
98
99const fn default_max_metrics_per_payload() -> usize {
100 10_000
101}
102
103const fn default_max_payload_size() -> usize {
104 DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
105}
106
107const fn default_max_uncompressed_payload_size() -> usize {
108 DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
109}
110
111const fn default_max_series_payload_size() -> usize {
112 SERIES_V2_COMPRESSED_SIZE_LIMIT
113}
114
115const fn default_max_series_uncompressed_payload_size() -> usize {
116 SERIES_V2_UNCOMPRESSED_SIZE_LIMIT
117}
118
119const fn default_max_series_points_per_payload() -> usize {
120 10_000
121}
122
123const fn default_flush_timeout_secs() -> u64 {
124 2
125}
126
127fn default_serializer_compressor_kind() -> String {
128 DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
129}
130
131const fn default_zstd_compressor_level() -> i32 {
132 3
133}
134
135const fn default_use_v2_api_series() -> bool {
136 true
137}
138
139const fn default_log_payloads() -> bool {
140 false
141}
142
/// Configuration for the Datadog Metrics encoder.
///
/// Deserialized from the generic configuration (see `from_configuration`). It builds two request builders: one for
/// series metrics and one for sketch metrics.
#[derive(Clone, Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
// NOTE(review): every field appears to be read in this file; confirm whether this allow is still required.
#[allow(dead_code)]
pub struct DatadogMetricsConfiguration {
    /// Maximum number of metrics per payload, applied to both the series and sketches request builders.
    ///
    /// Defaults to 10,000.
    #[serde(
        rename = "serializer_max_metrics_per_payload",
        default = "default_max_metrics_per_payload"
    )]
    max_metrics_per_payload: usize,

    /// Maximum compressed payload size, in bytes, for sketches and for V1 series payloads.
    ///
    /// Passed through `clamp_payload_limits` against `DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT`, which is also the
    /// default.
    #[serde(rename = "serializer_max_payload_size", default = "default_max_payload_size")]
    max_payload_size: usize,

    /// Maximum uncompressed payload size, in bytes, for sketches and for V1 series payloads.
    ///
    /// Passed through `clamp_payload_limits` against `DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT`, which is also the
    /// default.
    #[serde(
        rename = "serializer_max_uncompressed_payload_size",
        default = "default_max_uncompressed_payload_size"
    )]
    max_uncompressed_payload_size: usize,

    /// Maximum compressed payload size, in bytes, for V2 series payloads.
    ///
    /// Passed through `clamp_payload_limits` against `SERIES_V2_COMPRESSED_SIZE_LIMIT` (512,000 bytes), which is also
    /// the default.
    #[serde(
        rename = "serializer_max_series_payload_size",
        default = "default_max_series_payload_size"
    )]
    max_series_payload_size: usize,

    /// Maximum uncompressed payload size, in bytes, for V2 series payloads.
    ///
    /// Passed through `clamp_payload_limits` against `SERIES_V2_UNCOMPRESSED_SIZE_LIMIT` (5,242,880 bytes), which is
    /// also the default.
    #[serde(
        rename = "serializer_max_series_uncompressed_payload_size",
        default = "default_max_series_uncompressed_payload_size"
    )]
    max_series_uncompressed_payload_size: usize,

    /// Maximum number of data points per series payload (series request builder only).
    ///
    /// Defaults to 10,000.
    #[serde(
        rename = "serializer_max_series_points_per_payload",
        default = "default_max_series_points_per_payload"
    )]
    max_series_points_per_payload: usize,

    /// Delay, in seconds, after an event buffer is processed before pending requests are flushed.
    ///
    /// A value of 0 is treated as 10 milliseconds. Defaults to 2 seconds.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Compressor name passed to `CompressionScheme::new`. Defaults to "zstd".
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compression level passed to `CompressionScheme::new` alongside `compressor_kind`. Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Whether series are sent to the V2 (protobuf) endpoint rather than the V1 (JSON) endpoint. Defaults to `true`.
    #[serde(default = "default_use_v2_api_series")]
    use_v2_api_series: bool,

    /// Whether each metric is logged at debug level as the request builder task receives it. Defaults to `false`.
    #[serde(default = "default_log_payloads")]
    log_payloads: bool,

    /// Extra tags handed to both endpoint encoders, which pass them to `get_deduplicated_tags` with each metric's
    /// own tags.
    ///
    /// Never deserialized; set it with `with_additional_tags`.
    #[serde(default, skip)]
    #[facet(opaque)]
    additional_tags: Option<SharedTagSet>,
}
285
286impl DatadogMetricsConfiguration {
287 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
289 Ok(config.as_typed()?)
290 }
291
292 pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
294 self.additional_tags = Some(additional_tags);
296 self
297 }
298}
299
300#[async_trait]
301impl EncoderBuilder for DatadogMetricsConfiguration {
302 fn input_event_type(&self) -> EventType {
303 EventType::Metric
304 }
305
306 fn output_payload_type(&self) -> PayloadType {
307 PayloadType::Http
308 }
309
310 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
311 let metrics_builder = MetricsBuilder::from_component_context(&context);
312 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
313 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
314
315 let series_endpoint = if self.use_v2_api_series {
317 MetricsEndpoint::SeriesV2
318 } else {
319 MetricsEndpoint::SeriesV1
320 };
321 let mut series_encoder = MetricsEndpointEncoder::from_endpoint(series_endpoint);
322 let mut sketches_encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
323
324 if let Some(additional_tags) = self.additional_tags.as_ref() {
325 series_encoder = series_encoder.with_additional_tags(additional_tags.clone());
326 sketches_encoder = sketches_encoder.with_additional_tags(additional_tags.clone());
327 }
328
329 let mut series_rb = RequestBuilder::new(series_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
330 series_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);
331 series_rb.with_max_data_points_per_payload(self.max_series_points_per_payload);
332
333 let generic_payload_limits = clamp_payload_limits(
334 self.max_uncompressed_payload_size,
335 self.max_payload_size,
336 DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
337 DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
338 );
339 let (series_uncompressed_limit, series_compressed_limit) = if series_endpoint == MetricsEndpoint::SeriesV2 {
340 clamp_payload_limits(
341 self.max_series_uncompressed_payload_size,
342 self.max_series_payload_size,
343 SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
344 SERIES_V2_COMPRESSED_SIZE_LIMIT,
345 )
346 } else {
347 generic_payload_limits
348 };
349 series_rb.with_len_limits(series_uncompressed_limit, series_compressed_limit)?;
350
351 let mut sketches_rb = RequestBuilder::new(sketches_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
352 sketches_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);
353 let (sketches_uncompressed_limit, sketches_compressed_limit) = generic_payload_limits;
354 sketches_rb.with_len_limits(sketches_uncompressed_limit, sketches_compressed_limit)?;
355
356 let flush_timeout = match self.flush_timeout_secs {
357 0 => Duration::from_millis(10),
360 secs => Duration::from_secs(secs),
361 };
362
363 Ok(Box::new(DatadogMetrics {
364 series_rb,
365 sketches_rb,
366 telemetry,
367 flush_timeout,
368 log_payloads: self.log_payloads,
369 }))
370 }
371}
372
impl MemoryBounds for DatadogMetricsConfiguration {
    /// Declares this encoder's memory bounds.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum: the encoder struct itself, plus the two bounded channels (capacity 8 each) that `DatadogMetrics::run`
        // creates between the encoder and its request builder task.
        builder
            .minimum()
            .with_single_value::<DatadogMetrics>("component struct")
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        // Firm: one `Metric` array per request builder (series and sketches), sized by `max_metrics_per_payload`.
        // NOTE(review): the "split re-encode buffer" allocations live in `RequestBuilder`, which is not visible here;
        // confirm these sizes match what it actually holds.
        builder
            .firm()
            .with_array::<Metric>("series metrics split re-encode buffer", self.max_metrics_per_payload)
            .with_array::<Metric>("sketch metrics split re-encode buffer", self.max_metrics_per_payload);
    }
}
396
/// Datadog Metrics encoder.
///
/// Routes series metrics (counters, rates, gauges, sets) and sketch metrics (histograms, distributions) to separate
/// request builders and emits the resulting HTTP payloads.
pub struct DatadogMetrics {
    /// Request builder for series metrics.
    series_rb: RequestBuilder<MetricsEndpointEncoder>,
    /// Request builder for sketch metrics.
    sketches_rb: RequestBuilder<MetricsEndpointEncoder>,
    /// Component telemetry; used to count metrics dropped during encoding.
    telemetry: ComponentTelemetry,
    /// Delay after an event buffer is processed before pending requests are flushed.
    flush_timeout: Duration,
    /// Whether each metric is logged at debug level before it is encoded.
    log_payloads: bool,
}
404
405#[async_trait]
406impl Encoder for DatadogMetrics {
407 async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
408 let Self {
409 series_rb,
410 sketches_rb,
411 telemetry,
412 flush_timeout,
413 log_payloads,
414 } = *self;
415
416 let mut health = context.take_health_handle();
417
418 let (events_tx, events_rx) = mpsc::channel(8);
420 let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
421 let request_builder_fut = run_request_builder(
422 series_rb,
423 sketches_rb,
424 telemetry,
425 events_rx,
426 payloads_tx,
427 flush_timeout,
428 log_payloads,
429 );
430 let request_builder_handle = context
431 .topology_context()
432 .global_thread_pool()
433 .spawn_traced_named("dd-metrics-request-builder", request_builder_fut);
434
435 health.mark_ready();
436 debug!("Datadog Metrics encoder started.");
437
438 loop {
439 select! {
440 biased;
441
442 _ = health.live() => continue,
443 maybe_payload = payloads_rx.recv() => match maybe_payload {
444 Some(payload) => {
445 if let Err(e) = context.dispatcher().dispatch(payload).await {
446 error!("Failed to dispatch payload: {}", e);
447 }
448 }
449 None => break,
450 },
451 maybe_event_buffer = context.events().next() => match maybe_event_buffer {
452 Some(event_buffer) => events_tx.send(event_buffer).await
453 .error_context("Failed to send event buffer to request builder task.")?,
454 None => break,
455 },
456 }
457 }
458
459 drop(events_tx);
461
462 while let Some(payload) = payloads_rx.recv().await {
464 if let Err(e) = context.dispatcher().dispatch(payload).await {
465 error!("Failed to dispatch payload: {}", e);
466 }
467 }
468
469 match request_builder_handle.await {
471 Ok(Ok(())) => debug!("Request builder task stopped."),
472 Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
473 Err(e) => error!(error = %e, "Request builder task panicked."),
474 }
475
476 debug!("Datadog Metrics encoder stopped.");
477
478 Ok(())
479 }
480}
481
482async fn run_request_builder(
483 mut series_request_builder: RequestBuilder<MetricsEndpointEncoder>,
484 mut sketches_request_builder: RequestBuilder<MetricsEndpointEncoder>, telemetry: ComponentTelemetry,
485 mut events_rx: mpsc::Receiver<EventsBuffer>, payloads_tx: mpsc::Sender<PayloadsBuffer>, flush_timeout: Duration,
486 log_payloads: bool,
487) -> Result<(), GenericError> {
488 let mut pending_flush = false;
489 let pending_flush_timeout = sleep(flush_timeout);
490 pin!(pending_flush_timeout);
491
492 loop {
493 select! {
494 Some(event_buffer) = events_rx.recv() => {
495 for event in event_buffer {
496 let metric = match event.try_into_metric() {
497 Some(metric) => metric,
498 None => continue,
499 };
500
501 if log_payloads {
502 log_metric_payload(&metric);
503 }
504
505 let request_builder = match metric.values() {
509 MetricValues::Counter(..)
510 | MetricValues::Rate(..)
511 | MetricValues::Gauge(..)
512 | MetricValues::Set(..) => &mut series_request_builder,
513 MetricValues::Histogram(..) | MetricValues::Distribution(..) => &mut sketches_request_builder,
514 };
515
516 let metric_to_retry = match request_builder.encode(metric).await {
520 Ok(None) => continue,
521 Ok(Some(metric)) => metric,
522 Err(e) => {
523 error!(error = %e, "Failed to encode metric.");
524 telemetry.events_dropped_encoder().increment(1);
525 continue;
526 }
527 };
528
529 let maybe_requests = request_builder.flush().await;
530 if maybe_requests.is_empty() {
531 panic!("builder told us to flush, but gave us nothing");
532 }
533
534 for maybe_request in maybe_requests {
535 match maybe_request {
536 Ok((events, data_points, request)) => {
537 let payload_meta = PayloadMetadata::from_event_and_data_point_count(events, data_points);
538 let http_payload = HttpPayload::new(payload_meta, request);
539 let payload = Payload::Http(http_payload);
540
541 payloads_tx.send(payload).await
542 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
543 },
544
545 Err(e) => if e.is_recoverable() {
547 continue;
549 } else {
550 return Err(GenericError::from(e).context("Failed to flush request."));
551 }
552 }
553 }
554
555 if let Err(e) = request_builder.encode(metric_to_retry).await {
559 error!(error = %e, "Failed to encode metric.");
560 telemetry.events_dropped_encoder().increment(1);
561 }
562 }
563
564 debug!("Processed event buffer.");
565
566 if !pending_flush {
568 pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
569 pending_flush = true;
570 }
571 },
572 _ = &mut pending_flush_timeout, if pending_flush => {
573 debug!("Flushing pending request(s).");
574
575 pending_flush = false;
576
577 let maybe_series_requests = series_request_builder.flush().await;
580 for maybe_request in maybe_series_requests {
581 match maybe_request {
582 Ok((events, data_points, request)) => {
583 let payload_meta = PayloadMetadata::from_event_and_data_point_count(events, data_points);
584 let http_payload = HttpPayload::new(payload_meta, request);
585 let payload = Payload::Http(http_payload);
586
587 payloads_tx.send(payload).await
588 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
589 },
590
591 Err(e) => if e.is_recoverable() {
593 continue;
595 } else {
596 return Err(GenericError::from(e).context("Failed to flush request."));
597 }
598 }
599 }
600
601 let maybe_sketches_requests = sketches_request_builder.flush().await;
602 for maybe_request in maybe_sketches_requests {
603 match maybe_request {
604 Ok((events, data_points, request)) => {
605 let payload_meta = PayloadMetadata::from_event_and_data_point_count(events, data_points);
606 let http_payload = HttpPayload::new(payload_meta, request);
607 let payload = Payload::Http(http_payload);
608
609 payloads_tx.send(payload).await
610 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
611 },
612
613 Err(e) => if e.is_recoverable() {
615 continue;
617 } else {
618 return Err(GenericError::from(e).context("Failed to flush request."));
619 }
620 }
621 }
622
623 debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
624 },
625
626 else => break,
628 }
629 }
630
631 Ok(())
632}
633
634fn log_metric_payload(metric: &Metric) {
635 match metric.values() {
636 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
637 debug!(?metric, "Flushing series metric.")
638 }
639 MetricValues::Histogram(..) | MetricValues::Distribution(..) => {
640 debug!(?metric, "Flushing sketch metric.")
641 }
642 }
643}
644
/// Datadog metrics intake endpoint that a `MetricsEndpointEncoder` produces payloads for.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MetricsEndpoint {
    /// Series metrics, JSON-encoded and sent to `METRICS_SERIES_V1_PATH`.
    SeriesV1,

    /// Series metrics, protobuf-encoded and sent to `METRICS_SERIES_V2_PATH`.
    SeriesV2,

    /// Sketch metrics (histograms and distributions), protobuf-encoded and sent to `METRICS_SKETCHES_PATH`.
    Sketches,
}
663
/// Error returned when a metric cannot be encoded for a Datadog metrics endpoint.
#[derive(Debug)]
pub enum MetricsEncodeError {
    /// Protobuf encoding failed (series V2 and sketches endpoints).
    Protobuf(protobuf::Error),

    /// JSON encoding failed (series V1 endpoint).
    Json(serde_json::Error),
}
673
674impl fmt::Display for MetricsEncodeError {
675 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
676 match self {
677 Self::Protobuf(e) => write!(f, "protobuf encode error: {}", e),
678 Self::Json(e) => write!(f, "json encode error: {}", e),
679 }
680 }
681}
682
683impl std::error::Error for MetricsEncodeError {
684 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
685 match self {
686 Self::Protobuf(e) => Some(e),
687 Self::Json(e) => Some(e),
688 }
689 }
690}
691
692impl From<protobuf::Error> for MetricsEncodeError {
693 fn from(value: protobuf::Error) -> Self {
694 Self::Protobuf(value)
695 }
696}
697
698impl From<serde_json::Error> for MetricsEncodeError {
699 fn from(value: serde_json::Error) -> Self {
700 Self::Json(value)
701 }
702}
703
/// `EndpointEncoder` for Datadog metrics, targeting a single `MetricsEndpoint`.
///
/// The scratch buffers live on the encoder so they can be reused across calls to `encode`.
#[derive(Debug)]
struct MetricsEndpointEncoder {
    /// Endpoint this encoder produces payloads for.
    endpoint: MetricsEndpoint,
    /// Scratch space for the outer per-metric message (protobuf endpoints only).
    primary_scratch_buf: Vec<u8>,
    /// Scratch space for messages nested inside the per-metric message (tags, resources, points, origin metadata,
    /// dogsketches).
    secondary_scratch_buf: Vec<u8>,
    /// Scratch space for packed repeated fields, such as sketch bin keys.
    packed_scratch_buf: Vec<u8>,
    /// Extra tags passed, together with each metric's own tags, to `get_deduplicated_tags`.
    additional_tags: SharedTagSet,
    /// Deduplicator reused across metrics when computing each metric's tags.
    tags_deduplicator: ReusableDeduplicator<Tag>,
}
713
714impl MetricsEndpointEncoder {
715 pub fn from_endpoint(endpoint: MetricsEndpoint) -> Self {
717 Self {
718 endpoint,
719 primary_scratch_buf: Vec::new(),
720 secondary_scratch_buf: Vec::new(),
721 packed_scratch_buf: Vec::new(),
722 additional_tags: SharedTagSet::default(),
723 tags_deduplicator: ReusableDeduplicator::new(),
724 }
725 }
726
727 pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
733 self.additional_tags = additional_tags;
734 self
735 }
736}
737
738impl EndpointEncoder for MetricsEndpointEncoder {
739 type Input = Metric;
740 type EncodeError = MetricsEncodeError;
741
742 fn encoder_name() -> &'static str {
743 "metrics"
744 }
745
746 fn compressed_size_limit(&self) -> usize {
747 match self.endpoint {
748 MetricsEndpoint::SeriesV1 => SERIES_V1_COMPRESSED_SIZE_LIMIT,
749 MetricsEndpoint::SeriesV2 => SERIES_V2_COMPRESSED_SIZE_LIMIT,
750 MetricsEndpoint::Sketches => DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
751 }
752 }
753
754 fn uncompressed_size_limit(&self) -> usize {
755 match self.endpoint {
756 MetricsEndpoint::SeriesV1 => SERIES_V1_UNCOMPRESSED_SIZE_LIMIT,
757 MetricsEndpoint::SeriesV2 => SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
758 MetricsEndpoint::Sketches => DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
759 }
760 }
761
762 fn input_data_point_count(&self, input: &Self::Input) -> usize {
763 input.values().len()
764 }
765
766 fn is_valid_input(&self, input: &Self::Input) -> bool {
767 let is_series_input = matches!(
768 input.values(),
769 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..)
770 );
771
772 match self.endpoint {
773 MetricsEndpoint::SeriesV1 | MetricsEndpoint::SeriesV2 => is_series_input,
774 MetricsEndpoint::Sketches => !is_series_input,
775 }
776 }
777
778 fn get_payload_prefix(&self) -> Option<&'static [u8]> {
779 match self.endpoint {
780 MetricsEndpoint::SeriesV1 => Some(SERIES_V1_PAYLOAD_PREFIX),
781 _ => None,
782 }
783 }
784
785 fn get_payload_suffix(&self) -> Option<&'static [u8]> {
786 match self.endpoint {
787 MetricsEndpoint::SeriesV1 => Some(SERIES_V1_PAYLOAD_SUFFIX),
788 _ => None,
789 }
790 }
791
792 fn get_input_separator(&self) -> Option<&'static [u8]> {
793 match self.endpoint {
794 MetricsEndpoint::SeriesV1 => Some(SERIES_V1_INPUT_SEPARATOR),
795 _ => None,
796 }
797 }
798
799 fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
800 match self.endpoint {
801 MetricsEndpoint::SeriesV1 => {
802 encode_series_v1_metric(input, &self.additional_tags, buffer, &mut self.tags_deduplicator)?;
803 Ok(())
804 }
805 MetricsEndpoint::SeriesV2 | MetricsEndpoint::Sketches => {
806 encode_single_metric(
827 input,
828 &self.additional_tags,
829 buffer,
830 &mut self.primary_scratch_buf,
831 &mut self.secondary_scratch_buf,
832 &mut self.packed_scratch_buf,
833 &mut self.tags_deduplicator,
834 )?;
835 Ok(())
836 }
837 }
838 }
839
840 fn endpoint_uri(&self) -> Uri {
841 match self.endpoint {
842 MetricsEndpoint::SeriesV1 => PathAndQuery::from_static(METRICS_SERIES_V1_PATH).into(),
843 MetricsEndpoint::SeriesV2 => PathAndQuery::from_static(METRICS_SERIES_V2_PATH).into(),
844 MetricsEndpoint::Sketches => PathAndQuery::from_static(METRICS_SKETCHES_PATH).into(),
845 }
846 }
847
848 fn endpoint_method(&self) -> Method {
849 Method::POST
851 }
852
853 fn content_type(&self) -> HeaderValue {
854 match self.endpoint {
855 MetricsEndpoint::SeriesV1 => CONTENT_TYPE_JSON.clone(),
856 MetricsEndpoint::SeriesV2 | MetricsEndpoint::Sketches => CONTENT_TYPE_PROTOBUF.clone(),
857 }
858 }
859}
860
861fn field_number_for_metric_type(metric: &Metric) -> u32 {
862 match metric.values() {
863 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => 1,
864 MetricValues::Histogram(..) | MetricValues::Distribution(..) => 1,
865 }
866}
867
868fn get_message_size(raw_msg_size: usize) -> Result<u32, protobuf::Error> {
869 const MAX_MESSAGE_SIZE: u64 = i32::MAX as u64;
870
871 if raw_msg_size as u64 > MAX_MESSAGE_SIZE {
873 return Err(std::io::Error::other("message size exceeds limit (2147483648 bytes)").into());
874 }
875
876 Ok(raw_msg_size as u32)
877}
878
879fn get_message_size_from_buffer(buf: &[u8]) -> Result<u32, protobuf::Error> {
880 get_message_size(buf.len())
881}
882
883fn encode_single_metric(
884 metric: &Metric, additional_tags: &SharedTagSet, output_buf: &mut Vec<u8>, primary_scratch_buf: &mut Vec<u8>,
885 secondary_scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
886 tags_deduplicator: &mut ReusableDeduplicator<Tag>,
887) -> Result<(), protobuf::Error> {
888 let mut output_stream = CodedOutputStream::vec(output_buf);
889 let field_number = field_number_for_metric_type(metric);
890
891 write_nested_message(&mut output_stream, primary_scratch_buf, field_number, |os| {
892 match metric.values() {
894 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
895 encode_series_v2_metric(metric, additional_tags, os, secondary_scratch_buf, tags_deduplicator)
896 }
897 MetricValues::Histogram(..) | MetricValues::Distribution(..) => encode_sketch_metric(
898 metric,
899 additional_tags,
900 os,
901 secondary_scratch_buf,
902 packed_scratch_buf,
903 tags_deduplicator,
904 ),
905 }
906 })
907}
908
909fn encode_series_v2_metric(
910 metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
911 scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
912) -> Result<(), protobuf::Error> {
913 output_stream.write_string(SERIES_METRIC_FIELD_NUMBER, metric.context().name())?;
915
916 let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
917 write_series_tags(deduplicated_tags, output_stream, scratch_buf)?;
918
919 write_resource(
921 output_stream,
922 scratch_buf,
923 "host",
924 metric.metadata().hostname().unwrap_or_default(),
925 )?;
926
927 if let Some(origin) = metric.metadata().origin() {
929 match origin {
930 MetricOrigin::SourceType(source_type) => {
931 output_stream.write_string(SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER, source_type.as_ref())?;
932 }
933 MetricOrigin::OriginMetadata {
934 product,
935 subproduct,
936 product_detail,
937 } => {
938 write_origin_metadata(
939 output_stream,
940 scratch_buf,
941 SERIES_METADATA_FIELD_NUMBER,
942 *product,
943 *subproduct,
944 *product_detail,
945 )?;
946 }
947 }
948 }
949
950 let (metric_type, points, maybe_interval) = match metric.values() {
952 MetricValues::Counter(points) => (proto::MetricType::COUNT, points.into_iter(), None),
953 MetricValues::Rate(points, interval) => (proto::MetricType::RATE, points.into_iter(), Some(interval)),
954 MetricValues::Gauge(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
955 MetricValues::Set(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
956 _ => unreachable!("encode_series_v2_metric called with non-series metric"),
957 };
958
959 output_stream.write_enum(SERIES_TYPE_FIELD_NUMBER, metric_type.value())?;
960
961 if let Some(unit) = metric.metadata().unit() {
962 output_stream.write_string(SERIES_UNIT_FIELD_NUMBER, unit)?;
963 }
964
965 for (timestamp, value) in points {
966 let value = maybe_interval
968 .map(|interval| value / interval.as_secs_f64())
969 .unwrap_or(value);
970 let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;
971
972 write_point(output_stream, scratch_buf, value, timestamp)?;
973 }
974
975 if let Some(interval) = maybe_interval {
976 output_stream.write_int64(SERIES_INTERVAL_FIELD_NUMBER, interval.as_secs() as i64)?;
977 }
978
979 Ok(())
980}
981
982fn encode_series_v1_metric(
983 metric: &Metric, additional_tags: &SharedTagSet, buffer: &mut Vec<u8>,
984 tags_deduplicator: &mut ReusableDeduplicator<Tag>,
985) -> Result<(), serde_json::Error> {
986 let mut obj = JsonMap::new();
987
988 obj.insert("metric".into(), JsonValue::String(metric.context().name().to_string()));
989
990 let (type_str, points_iter, maybe_interval) = match metric.values() {
991 MetricValues::Counter(points) => ("count", points.into_iter(), None),
992 MetricValues::Rate(points, interval) => ("rate", points.into_iter(), Some(*interval)),
993 MetricValues::Gauge(points) => ("gauge", points.into_iter(), None),
994 MetricValues::Set(points) => ("gauge", points.into_iter(), None),
995 _ => unreachable!("encode_series_v1_metric called with non-series metric"),
996 };
997
998 let mut points = Vec::new();
999 for (timestamp, value) in points_iter {
1000 let value = maybe_interval
1002 .map(|interval| value / interval.as_secs_f64())
1003 .unwrap_or(value);
1004 let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;
1005
1006 let value_json = JsonNumber::from_f64(value)
1008 .map(JsonValue::Number)
1009 .unwrap_or_else(|| JsonValue::from(0));
1010 points.push(JsonValue::Array(vec![JsonValue::from(timestamp), value_json]));
1011 }
1012 obj.insert("points".into(), JsonValue::Array(points));
1013
1014 let deduplicated = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
1017 let mut tags_out = Vec::new();
1018 let mut device: Option<String> = None;
1019 for tag in deduplicated {
1020 if tag.name() == "dd.internal.resource" {
1021 continue;
1022 }
1023 if device.is_none() && tag.name() == "device" {
1024 if let Some(v) = tag.value() {
1025 device = Some(v.to_string());
1026 continue;
1027 }
1028 }
1029 tags_out.push(JsonValue::String(tag.as_str().to_string()));
1030 }
1031 obj.insert("tags".into(), JsonValue::Array(tags_out));
1032
1033 obj.insert(
1035 "host".into(),
1036 JsonValue::String(metric.metadata().hostname().unwrap_or_default().to_string()),
1037 );
1038
1039 if let Some(d) = device.filter(|s| !s.is_empty()) {
1040 obj.insert("device".into(), JsonValue::String(d));
1041 }
1042
1043 obj.insert("type".into(), JsonValue::String(type_str.into()));
1044
1045 let interval_secs = maybe_interval.map(|iv| iv.as_secs() as i64).unwrap_or(0);
1046 obj.insert("interval".into(), JsonValue::from(interval_secs));
1047
1048 if let Some(MetricOrigin::SourceType(s)) = metric.metadata().origin() {
1050 obj.insert("source_type_name".into(), JsonValue::String(s.as_ref().to_string()));
1051 }
1052
1053 if let Some(unit) = metric.metadata().unit() {
1054 if !unit.is_empty() {
1055 obj.insert("unit".into(), JsonValue::String(unit.to_string()));
1056 }
1057 }
1058
1059 serde_json::to_writer(buffer, &JsonValue::Object(obj))
1060}
1061
1062fn encode_sketch_metric(
1063 metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
1064 scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
1065) -> Result<(), protobuf::Error> {
1066 output_stream.write_string(SKETCH_METRIC_FIELD_NUMBER, metric.context().name())?;
1068
1069 let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
1070 write_sketch_tags(deduplicated_tags, output_stream, scratch_buf)?;
1071
1072 output_stream.write_string(
1074 SKETCH_HOST_FIELD_NUMBER,
1075 metric.metadata().hostname().unwrap_or_default(),
1076 )?;
1077
1078 if let Some(MetricOrigin::OriginMetadata {
1080 product,
1081 subproduct,
1082 product_detail,
1083 }) = metric.metadata().origin()
1084 {
1085 write_origin_metadata(
1086 output_stream,
1087 scratch_buf,
1088 SKETCH_METADATA_FIELD_NUMBER,
1089 *product,
1090 *subproduct,
1091 *product_detail,
1092 )?;
1093 }
1094
1095 match metric.values() {
1100 MetricValues::Distribution(sketches) => {
1101 for (timestamp, value) in sketches {
1102 write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, value)?;
1103 }
1104 }
1105 MetricValues::Histogram(points) => {
1106 for (timestamp, histogram) in points {
1107 let mut ddsketch = DDSketch::default();
1109 for sample in histogram.samples() {
1110 ddsketch.insert_n(sample.value.into_inner(), sample.weight.0 as u64);
1111 }
1112
1113 write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, &ddsketch)?;
1114 }
1115 }
1116 _ => unreachable!("encode_sketch_metric called with non-sketch metric"),
1117 }
1118
1119 Ok(())
1120}
1121
1122fn write_resource(
1123 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, resource_type: &str, resource_name: &str,
1124) -> Result<(), protobuf::Error> {
1125 write_nested_message(output_stream, scratch_buf, SERIES_RESOURCES_FIELD_NUMBER, |os| {
1126 os.write_string(RESOURCES_TYPE_FIELD_NUMBER, resource_type)?;
1127 os.write_string(RESOURCES_NAME_FIELD_NUMBER, resource_name)
1128 })
1129}
1130
/// Writes a `Metadata` message, containing a single `Origin` message, as field `field_number` of `output_stream`.
///
/// Layout written to `output_stream`:
/// `tag(field_number, LEN) | varint(metadata_len) | tag(origin, LEN) | varint(origin_len) | origin bytes`.
///
/// The origin message is encoded into `scratch_buf` first so its length is known. The metadata preamble (origin tag
/// plus origin length) goes into a small stack buffer, so the metadata length can be computed without a second
/// heap scratch buffer. `scratch_buf` is cleared on entry and left holding the encoded origin message.
fn write_origin_metadata(
    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, origin_product: u32,
    origin_category: u32, origin_service: u32,
) -> Result<(), protobuf::Error> {
    scratch_buf.clear();

    // Encode the `Origin` message body into the scratch buffer; the explicit flush makes the bytes visible in
    // `scratch_buf` before its length is read below.
    {
        let mut origin_output_stream = CodedOutputStream::vec(scratch_buf);
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER, origin_product)?;
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER, origin_category)?;
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER, origin_service)?;
        origin_output_stream.flush()?;
    }

    let origin_message_size = get_message_size_from_buffer(scratch_buf)?;

    // Preamble of the `Metadata` message: the origin field's tag followed by its length. A one-byte tag plus a varint32
    // (at most 5 bytes) fits comfortably in 64 bytes.
    let mut metadata_preamble_buf = [0; 64];
    let metadata_preamble_len = {
        let mut metadata_output_stream = CodedOutputStream::bytes(&mut metadata_preamble_buf[..]);
        metadata_output_stream.write_tag(METADATA_ORIGIN_FIELD_NUMBER, WireType::LengthDelimited)?;
        metadata_output_stream.write_raw_varint32(origin_message_size)?;
        metadata_output_stream.flush()?;
        metadata_output_stream.total_bytes_written() as usize
    };

    // The `Metadata` message body is the preamble followed by the origin bytes.
    let metadata_message_size = get_message_size(scratch_buf.len() + metadata_preamble_len)?;

    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
    output_stream.write_raw_varint32(metadata_message_size)?;
    output_stream.write_raw_bytes(&metadata_preamble_buf[..metadata_preamble_len])?;
    output_stream.write_raw_bytes(scratch_buf)
}
1169
1170fn write_point(
1171 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, value: f64, timestamp: i64,
1172) -> Result<(), protobuf::Error> {
1173 write_nested_message(output_stream, scratch_buf, SERIES_POINTS_FIELD_NUMBER, |os| {
1174 os.write_double(METRIC_POINT_VALUE_FIELD_NUMBER, value)?;
1175 os.write_int64(METRIC_POINT_TIMESTAMP_FIELD_NUMBER, timestamp)
1176 })
1177}
1178
1179fn write_dogsketch(
1180 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
1181 timestamp: Option<NonZeroU64>, sketch: &DDSketch,
1182) -> Result<(), protobuf::Error> {
1183 if sketch.is_empty() {
1185 warn!("Attempted to write an empty sketch to sketches payload, skipping.");
1186 return Ok(());
1187 }
1188
1189 write_nested_message(output_stream, scratch_buf, SKETCH_DOGSKETCHES_FIELD_NUMBER, |os| {
1190 os.write_int64(DOGSKETCH_TS_FIELD_NUMBER, timestamp.map_or(0, |ts| ts.get() as i64))?;
1191 os.write_int64(DOGSKETCH_CNT_FIELD_NUMBER, sketch.count() as i64)?;
1192 os.write_double(DOGSKETCH_MIN_FIELD_NUMBER, sketch.min().unwrap())?;
1193 os.write_double(DOGSKETCH_MAX_FIELD_NUMBER, sketch.max().unwrap())?;
1194 os.write_double(DOGSKETCH_AVG_FIELD_NUMBER, sketch.avg().unwrap())?;
1195 os.write_double(DOGSKETCH_SUM_FIELD_NUMBER, sketch.sum().unwrap())?;
1196
1197 let bin_keys = sketch.bins().iter().map(|bin| bin.key());
1198 write_repeated_packed_from_iter(
1199 os,
1200 packed_scratch_buf,
1201 DOGSKETCH_K_FIELD_NUMBER,
1202 bin_keys,
1203 |inner_os, value| inner_os.write_sint32_no_tag(value),
1204 )?;
1205
1206 let bin_counts = sketch.bins().iter().map(|bin| bin.count());
1207 write_repeated_packed_from_iter(
1208 os,
1209 packed_scratch_buf,
1210 DOGSKETCH_N_FIELD_NUMBER,
1211 bin_counts,
1212 |inner_os, value| inner_os.write_uint32_no_tag(value),
1213 )
1214 })
1215}
1216
1217fn get_deduplicated_tags<'a>(
1218 metric: &'a Metric, additional_tags: &'a SharedTagSet, tags_deduplicator: &'a mut ReusableDeduplicator<Tag>,
1219) -> impl Iterator<Item = &'a Tag> {
1220 let chained_tags = metric
1221 .context()
1222 .tags()
1223 .into_iter()
1224 .chain(additional_tags)
1225 .chain(metric.context().origin_tags());
1226
1227 tags_deduplicator.deduplicated(chained_tags)
1228}
1229
1230fn write_tags<'a, I, F>(
1231 tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, tag_encoder: F,
1232) -> Result<(), protobuf::Error>
1233where
1234 I: Iterator<Item = &'a Tag>,
1235 F: Fn(&Tag, &mut CodedOutputStream<'_>, &mut Vec<u8>) -> Result<(), protobuf::Error>,
1236{
1237 for tag in tags {
1238 tag_encoder(tag, output_stream, scratch_buf)?;
1239 }
1240
1241 Ok(())
1242}
1243
1244fn write_series_tags<'a, I>(
1245 tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
1246) -> Result<(), protobuf::Error>
1247where
1248 I: Iterator<Item = &'a Tag>,
1249{
1250 write_tags(tags, output_stream, scratch_buf, |tag, os, buf| {
1251 if tag.name() == "dd.internal.resource" {
1253 if let Some((resource_type, resource_name)) = tag.value().and_then(|s| s.split_once(':')) {
1254 write_resource(os, buf, resource_type, resource_name)
1255 } else {
1256 Ok(())
1257 }
1258 } else {
1259 os.write_string(SERIES_TAGS_FIELD_NUMBER, tag.as_str())
1261 }
1262 })
1263}
1264
1265fn write_sketch_tags<'a, I>(
1266 tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
1267) -> Result<(), protobuf::Error>
1268where
1269 I: Iterator<Item = &'a Tag>,
1270{
1271 write_tags(tags, output_stream, scratch_buf, |tag, os, _buf| {
1272 os.write_string(SKETCH_TAGS_FIELD_NUMBER, tag.as_str())
1274 })
1275}
1276
1277fn write_nested_message<F>(
1278 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, writer: F,
1279) -> Result<(), protobuf::Error>
1280where
1281 F: FnOnce(&mut CodedOutputStream<'_>) -> Result<(), protobuf::Error>,
1282{
1283 scratch_buf.clear();
1284
1285 {
1286 let mut nested_output_stream = CodedOutputStream::vec(scratch_buf);
1287 writer(&mut nested_output_stream)?;
1288 nested_output_stream.flush()?;
1289 }
1290
1291 output_stream.write_tag(field_number, WireType::LengthDelimited)?;
1292
1293 let nested_message_size = get_message_size_from_buffer(scratch_buf)?;
1294 output_stream.write_raw_varint32(nested_message_size)?;
1295 output_stream.write_raw_bytes(scratch_buf)
1296}
1297
1298fn write_repeated_packed_from_iter<I, T, F>(
1299 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, values: I, writer: F,
1300) -> Result<(), protobuf::Error>
1301where
1302 I: Iterator<Item = T>,
1303 F: Fn(&mut CodedOutputStream<'_>, T) -> Result<(), protobuf::Error>,
1304{
1305 scratch_buf.clear();
1314
1315 {
1316 let mut packed_output_stream = CodedOutputStream::vec(scratch_buf);
1317 for value in values {
1318 writer(&mut packed_output_stream, value)?;
1319 }
1320 packed_output_stream.flush()?;
1321 }
1322
1323 let data_size = get_message_size_from_buffer(scratch_buf)?;
1324
1325 output_stream.write_tag(field_number, WireType::LengthDelimited)?;
1326 output_stream.write_raw_varint32(data_size)?;
1327 output_stream.write_raw_bytes(scratch_buf)
1328}
1329
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use protobuf::CodedOutputStream;
    use saluki_common::iter::ReusableDeduplicator;
    use saluki_context::{tags::SharedTagSet, Context};
    use saluki_core::data_model::event::metric::{Metric, MetricMetadata, MetricOrigin, MetricValues};
    use serde_json::Value as JsonValue;
    use stringtheory::MetaString;

    use super::{
        encode_series_v1_metric, encode_series_v2_metric, encode_sketch_metric, MetricsEndpoint,
        MetricsEndpointEncoder, SERIES_V1_INPUT_SEPARATOR, SERIES_V1_PAYLOAD_PREFIX, SERIES_V1_PAYLOAD_SUFFIX,
    };
    use crate::common::datadog::{
        request_builder::EndpointEncoder as _, DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
        DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
    };

    /// Encodes a single metric with the Series V1 (JSON) encoder, using no host tags, and parses the
    /// output back into a JSON value.
    fn encode_one_v1(metric: &Metric) -> JsonValue {
        let mut buf = Vec::new();
        let host_tags = SharedTagSet::default();
        let mut tags_deduplicator = ReusableDeduplicator::new();
        encode_series_v1_metric(metric, &host_tags, &mut buf, &mut tags_deduplicator)
            .expect("encode_series_v1_metric should succeed");
        serde_json::from_slice(&buf).expect("encoder produced invalid JSON")
    }

    #[test]
    fn histogram_vs_sketch_identical_payload() {
        // A histogram and a distribution built from the same samples must encode to the same sketch bytes.
        let samples = &[1.0, 2.0, 3.0, 4.0, 5.0];
        let histogram = Metric::histogram("simple_samples", samples);
        let distribution = Metric::distribution("simple_samples", samples);
        let host_tags = SharedTagSet::default();

        let mut buf1 = Vec::new();
        let mut buf2 = Vec::new();
        let mut tags_deduplicator = ReusableDeduplicator::new();

        let mut histogram_payload = Vec::new();
        {
            let mut histogram_writer = CodedOutputStream::vec(&mut histogram_payload);
            encode_sketch_metric(
                &histogram,
                &host_tags,
                &mut histogram_writer,
                &mut buf1,
                &mut buf2,
                &mut tags_deduplicator,
            )
            .expect("Failed to encode histogram as sketch");
            histogram_writer.flush().expect("Failed to flush histogram writer");
        }

        let mut distribution_payload = Vec::new();
        {
            let mut distribution_writer = CodedOutputStream::vec(&mut distribution_payload);
            encode_sketch_metric(
                &distribution,
                &host_tags,
                &mut distribution_writer,
                &mut buf1,
                &mut buf2,
                &mut tags_deduplicator,
            )
            .expect("Failed to encode distribution as sketch");
            distribution_writer.flush().expect("Failed to flush distribution writer");
        }

        // Guard against a vacuous pass: two empty payloads would also compare equal.
        assert!(
            !histogram_payload.is_empty(),
            "sketch encoding should produce a non-empty payload"
        );
        assert_eq!(histogram_payload, distribution_payload);
    }

    #[test]
    fn input_valid() {
        // Series endpoints accept scalar-ish metrics only; the sketches endpoint accepts only histograms
        // and distributions.
        let counter = Metric::counter("counter", 1.0);
        let rate = Metric::rate("rate", 1.0, Duration::from_secs(1));
        let gauge = Metric::gauge("gauge", 1.0);
        let set = Metric::set("set", "foo");
        let histogram = Metric::histogram("histogram", [1.0, 2.0, 3.0]);
        let distribution = Metric::distribution("distribution", [1.0, 2.0, 3.0]);

        let series_v1 = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV1);
        let series_v2 = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV2);
        let sketches_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);

        for series_endpoint in [&series_v1, &series_v2] {
            assert!(series_endpoint.is_valid_input(&counter));
            assert!(series_endpoint.is_valid_input(&rate));
            assert!(series_endpoint.is_valid_input(&gauge));
            assert!(series_endpoint.is_valid_input(&set));
            assert!(!series_endpoint.is_valid_input(&histogram));
            assert!(!series_endpoint.is_valid_input(&distribution));
        }

        assert!(!sketches_endpoint.is_valid_input(&counter));
        assert!(!sketches_endpoint.is_valid_input(&rate));
        assert!(!sketches_endpoint.is_valid_input(&gauge));
        assert!(!sketches_endpoint.is_valid_input(&set));
        assert!(sketches_endpoint.is_valid_input(&histogram));
        assert!(sketches_endpoint.is_valid_input(&distribution));
    }

    #[test]
    fn input_data_point_count_tracks_metric_values() {
        let counter = Metric::counter("counter", [(123, 1.0), (124, 2.0)]);
        let histogram = Metric::histogram("histogram", [1.0, 2.0, 3.0]);

        let series_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV2);
        let sketches_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);

        assert_eq!(series_endpoint.input_data_point_count(&counter), 2);
        assert_eq!(sketches_endpoint.input_data_point_count(&histogram), 1);
    }

    #[test]
    fn series_metric_unit_encoded() {
        let context = Context::from_static_parts("my.timer.avg", &[]);
        let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond"));
        let gauge = Metric::from_parts(context, MetricValues::gauge([1.0_f64]), metadata);

        let host_tags = SharedTagSet::default();
        let mut scratch_buf = Vec::new();
        let mut tags_deduplicator = ReusableDeduplicator::new();

        let mut payload = Vec::new();
        {
            let mut writer = CodedOutputStream::vec(&mut payload);
            encode_series_v2_metric(
                &gauge,
                &host_tags,
                &mut writer,
                &mut scratch_buf,
                &mut tags_deduplicator,
            )
            .expect("Failed to encode gauge as series metric");
            writer.flush().expect("Failed to flush");
        }

        // Field 6 with wire type 2 (length-delimited). The value is short enough that its length
        // prefix is a single-byte varint.
        let expected_tag: u8 = (6 << 3) | 2;
        let expected_value = b"millisecond";

        let tag_pos = payload
            .windows(1 + 1 + expected_value.len())
            .position(|w| w[0] == expected_tag && w[1] == expected_value.len() as u8 && &w[2..] == expected_value);

        assert!(
            tag_pos.is_some(),
            "series payload should contain unit field (field 6 = 'millisecond'), got bytes: {:?}",
            payload
        );
    }

    #[test]
    fn series_v1_basic_payload_shape() {
        let counter = Metric::counter("my.count", 5.0);
        let counter_json = encode_one_v1(&counter);
        assert_eq!(counter_json["metric"], "my.count");
        assert_eq!(counter_json["type"], "count");
        assert_eq!(counter_json["interval"], 0);
        assert_eq!(counter_json["host"], "");
        assert_eq!(counter_json["tags"], JsonValue::Array(vec![]));
        let points = counter_json["points"].as_array().expect("points is array");
        assert_eq!(points.len(), 1);
        assert_eq!(points[0][0], 0);
        assert_eq!(points[0][1], 5.0);
        assert!(counter_json.get("unit").is_none());
        assert!(counter_json.get("source_type_name").is_none());
        assert!(counter_json.get("device").is_none());

        let rate = Metric::rate("my.rate", 30.0, Duration::from_secs(10));
        let rate_json = encode_one_v1(&rate);
        assert_eq!(rate_json["type"], "rate");
        assert_eq!(rate_json["interval"], 10);
        let rate_points = rate_json["points"].as_array().expect("rate points is array");
        // 30.0 over a 10 second interval is emitted as a per-second value.
        assert_eq!(rate_points[0][1], 3.0);

        let gauge = Metric::gauge("my.gauge", 42.0);
        let gauge_json = encode_one_v1(&gauge);
        assert_eq!(gauge_json["type"], "gauge");

        // Sets are emitted as gauges whose value is the set's cardinality.
        let set = Metric::set("my.set", "alpha");
        let set_json = encode_one_v1(&set);
        assert_eq!(set_json["type"], "gauge");
        let set_points = set_json["points"].as_array().expect("set points is array");
        assert_eq!(set_points[0][1], 1.0);
    }

    #[test]
    fn series_v1_unit_and_hostname_emitted() {
        let context = Context::from_static_parts("my.timer.avg", &[]);
        let metadata = MetricMetadata::default()
            .with_unit(MetaString::from_static("millisecond"))
            .with_hostname(Some(Arc::from("host-1")));
        let gauge = Metric::from_parts(context, MetricValues::gauge([1.0_f64]), metadata);

        let json = encode_one_v1(&gauge);
        assert_eq!(json["unit"], "millisecond");
        assert_eq!(json["host"], "host-1");
    }

    #[test]
    fn series_v1_device_tag_extraction() {
        let context = Context::from_static_parts("my.metric", &["device:eth0", "env:prod"]);
        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), MetricMetadata::default());

        let json = encode_one_v1(&counter);
        assert_eq!(json["device"], "eth0");
        let tags = json["tags"].as_array().expect("tags is array");
        let tag_strs: Vec<&str> = tags.iter().filter_map(|v| v.as_str()).collect();
        assert!(
            !tag_strs.iter().any(|t| t.starts_with("device:")),
            "device tag must be removed: {:?}",
            tag_strs
        );
        assert!(tag_strs.contains(&"env:prod"));
    }

    #[test]
    fn series_v1_source_type_name_from_source_type_origin() {
        let context = Context::from_static_parts("my.metric", &[]);
        let metadata = MetricMetadata::default().with_source_type(Some(Arc::from("integration_x")));
        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), metadata);

        let json = encode_one_v1(&counter);
        assert_eq!(json["source_type_name"], "integration_x");
    }

    #[test]
    fn series_v1_origin_metadata_dropped() {
        // Structured origin metadata has no V1 representation and must not surface as a source type name.
        let context = Context::from_static_parts("my.metric", &[]);
        let metadata = MetricMetadata::default().with_origin(Some(MetricOrigin::dogstatsd()));
        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), metadata);

        let json = encode_one_v1(&counter);
        assert!(json.get("source_type_name").is_none());
    }

    #[test]
    fn series_v1_dd_internal_resource_dropped() {
        let context = Context::from_static_parts("my.metric", &["dd.internal.resource:host:foo", "env:prod"]);
        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), MetricMetadata::default());

        let json = encode_one_v1(&counter);
        let tags = json["tags"].as_array().expect("tags is array");
        let tag_strs: Vec<&str> = tags.iter().filter_map(|v| v.as_str()).collect();
        assert!(
            !tag_strs.iter().any(|t| t.starts_with("dd.internal.resource:")),
            "dd.internal.resource tag must be dropped: {:?}",
            tag_strs
        );
        assert!(tag_strs.contains(&"env:prod"));
    }

    #[test]
    fn series_v1_endpoint_routing() {
        let encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV1);
        assert_eq!(encoder.endpoint_uri().path(), "/api/v1/series");
        assert_eq!(encoder.content_type(), "application/json");
        assert_eq!(encoder.get_payload_prefix(), Some(SERIES_V1_PAYLOAD_PREFIX));
        assert_eq!(encoder.get_payload_suffix(), Some(SERIES_V1_PAYLOAD_SUFFIX));
        assert_eq!(encoder.get_input_separator(), Some(SERIES_V1_INPUT_SEPARATOR));
        assert_eq!(
            encoder.compressed_size_limit(),
            DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
        );
        assert_eq!(
            encoder.uncompressed_size_limit(),
            DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
        );

        let sketches = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
        assert_eq!(
            sketches.compressed_size_limit(),
            DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
        );
        assert_eq!(
            sketches.uncompressed_size_limit(),
            DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
        );

        let v2 = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV2);
        assert_eq!(v2.endpoint_uri().path(), "/api/v2/series");
        assert_eq!(v2.content_type(), "application/x-protobuf");
        assert!(v2.get_payload_prefix().is_none());
    }
}
1640
#[cfg(test)]
mod config_smoke {
    use datadog_agent_config_testing::config_registry::structs;
    use datadog_agent_config_testing::run_config_smoke_tests;
    use serde_json::json;

    use super::DatadogMetricsConfiguration;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Runs the shared config smoke-test harness for `DATADOG_METRICS_CONFIGURATION`.
    ///
    /// The harness starts from an empty base JSON object and checks that every produced configuration
    /// deserializes into `DatadogMetricsConfiguration`.
    #[tokio::test]
    async fn smoke_test() {
        let empty_base = json!({});

        run_config_smoke_tests(
            structs::DATADOG_METRICS_CONFIGURATION,
            &[],
            empty_base,
            |cfg| {
                cfg.as_typed::<DatadogMetricsConfiguration>()
                    .expect("DatadogMetricsConfiguration should deserialize")
            },
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await
    }
}
1666
#[cfg(test)]
mod use_v2_api_series_default {
    use saluki_config::ConfigurationLoader;
    use serde_json::json;

    use super::{DatadogMetricsConfiguration, SERIES_V2_COMPRESSED_SIZE_LIMIT, SERIES_V2_UNCOMPRESSED_SIZE_LIMIT};
    use crate::{common::datadog::clamp_payload_limits, config::KEY_ALIASES};

    /// Loads `values` as the default provider layer, with the crate's key aliases applied, and deserializes
    /// the result into a `DatadogMetricsConfiguration`. Panics if loading or deserialization fails.
    async fn load_metrics_config(values: serde_json::Value) -> DatadogMetricsConfiguration {
        let generic_config = ConfigurationLoader::default()
            .with_key_aliases(KEY_ALIASES)
            .add_providers([figment::providers::Serialized::defaults(values)])
            .into_generic()
            .await
            .expect("config should load");
        generic_config.as_typed().expect("should deserialize")
    }

    #[tokio::test]
    async fn defaults_to_true_when_absent() {
        // With no explicit setting, the V2 series API is enabled.
        let parsed = load_metrics_config(json!({})).await;
        assert!(parsed.use_v2_api_series);
    }

    #[tokio::test]
    async fn deserializes_payload_limit_keys() {
        let parsed = load_metrics_config(json!({
            "serializer_max_payload_size": 4321,
            "serializer_max_uncompressed_payload_size": 8765,
            "serializer_max_series_payload_size": 1234,
            "serializer_max_series_uncompressed_payload_size": 5678,
        }))
        .await;

        assert_eq!(parsed.max_payload_size, 4321);
        assert_eq!(parsed.max_uncompressed_payload_size, 8765);
        assert_eq!(parsed.max_series_payload_size, 1234);
        assert_eq!(parsed.max_series_uncompressed_payload_size, 5678);
    }

    #[tokio::test]
    async fn deserializes_max_series_points_per_payload() {
        // Without the key, the default applies.
        let defaulted = load_metrics_config(json!({})).await;
        assert_eq!(defaulted.max_series_points_per_payload, 10_000);

        // An explicit value overrides the default.
        let overridden = load_metrics_config(json!({
            "serializer_max_series_points_per_payload": 500,
        }))
        .await;
        assert_eq!(overridden.max_series_points_per_payload, 500);
    }

    #[test]
    fn clamps_series_payload_limit_keys_to_api_limits() {
        // Each case is: (configured uncompressed, configured compressed, expected uncompressed, expected compressed).
        let cases = [
            // Values above the API limits are clamped down to them.
            (
                SERIES_V2_UNCOMPRESSED_SIZE_LIMIT + 1,
                SERIES_V2_COMPRESSED_SIZE_LIMIT + 1,
                SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
                SERIES_V2_COMPRESSED_SIZE_LIMIT,
            ),
            // Values already within the API limits pass through unchanged.
            (5678, 1234, 5678, 1234),
        ];

        for (configured_uncompressed, configured_compressed, want_uncompressed, want_compressed) in cases {
            let (got_uncompressed, got_compressed) = clamp_payload_limits(
                configured_uncompressed,
                configured_compressed,
                SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
                SERIES_V2_COMPRESSED_SIZE_LIMIT,
            );
            assert_eq!(got_uncompressed, want_uncompressed);
            assert_eq!(got_compressed, want_compressed);
        }
    }
}