1use std::{fmt, num::NonZeroU64, time::Duration};
2
3use async_trait::async_trait;
4use datadog_protos::metrics as proto;
5use ddsketch::DDSketch;
6use facet::Facet;
7use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
8use protobuf::{rt::WireType, CodedOutputStream, Enum as _};
9use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
10use saluki_common::{iter::ReusableDeduplicator, task::HandleExt as _};
11use saluki_config::GenericConfiguration;
12use saluki_context::tags::{SharedTagSet, Tag};
13use saluki_core::{
14 components::{encoders::*, ComponentContext},
15 data_model::{
16 event::{
17 metric::{Metric, MetricOrigin, MetricValues},
18 EventType,
19 },
20 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
21 },
22 observability::ComponentMetricsExt as _,
23 topology::{EventsBuffer, PayloadsBuffer},
24};
25use saluki_error::{generic_error, ErrorContext as _, GenericError};
26use saluki_io::compression::CompressionScheme;
27use saluki_metrics::MetricsBuilder;
28use serde::Deserialize;
29use serde_json::{Map as JsonMap, Number as JsonNumber, Value as JsonValue};
30use tokio::{select, sync::mpsc, time::sleep};
31use tracing::{debug, error, warn};
32
33use crate::common::datadog::{
34 clamp_payload_limits,
35 io::RB_BUFFER_CHUNK_SIZE,
36 request_builder::{EndpointEncoder, RequestBuilder},
37 telemetry::ComponentTelemetry,
38 DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT, DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT, METRICS_SERIES_V1_PATH,
39 METRICS_SERIES_V2_PATH, METRICS_SKETCHES_PATH,
40};
41
42const SERIES_V2_COMPRESSED_SIZE_LIMIT: usize = 512_000; const SERIES_V2_UNCOMPRESSED_SIZE_LIMIT: usize = 5_242_880; const SERIES_V1_COMPRESSED_SIZE_LIMIT: usize = DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT;
47const SERIES_V1_UNCOMPRESSED_SIZE_LIMIT: usize = DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT;
48
/// Compression algorithm used when `serializer_compressor_kind` is not configured.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";

// Protobuf field numbers used when hand-encoding metrics payloads. These presumably mirror the message
// definitions in `datadog_protos::metrics` — keep them in sync with that schema.

// Resource message.
const RESOURCES_TYPE_FIELD_NUMBER: u32 = 1;
const RESOURCES_NAME_FIELD_NUMBER: u32 = 2;

// Metadata message.
const METADATA_ORIGIN_FIELD_NUMBER: u32 = 1;

// Origin message.
const ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER: u32 = 4;
const ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER: u32 = 5;
const ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER: u32 = 6;

// Series point message.
const METRIC_POINT_VALUE_FIELD_NUMBER: u32 = 1;
const METRIC_POINT_TIMESTAMP_FIELD_NUMBER: u32 = 2;

// Dogsketch message.
const DOGSKETCH_TS_FIELD_NUMBER: u32 = 1;
const DOGSKETCH_CNT_FIELD_NUMBER: u32 = 2;
const DOGSKETCH_MIN_FIELD_NUMBER: u32 = 3;
const DOGSKETCH_MAX_FIELD_NUMBER: u32 = 4;
const DOGSKETCH_AVG_FIELD_NUMBER: u32 = 5;
const DOGSKETCH_SUM_FIELD_NUMBER: u32 = 6;
const DOGSKETCH_K_FIELD_NUMBER: u32 = 7;
const DOGSKETCH_N_FIELD_NUMBER: u32 = 8;

// Series message.
const SERIES_RESOURCES_FIELD_NUMBER: u32 = 1;
const SERIES_METRIC_FIELD_NUMBER: u32 = 2;
const SERIES_TAGS_FIELD_NUMBER: u32 = 3;
const SERIES_POINTS_FIELD_NUMBER: u32 = 4;
const SERIES_TYPE_FIELD_NUMBER: u32 = 5;
const SERIES_UNIT_FIELD_NUMBER: u32 = 6;
const SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER: u32 = 7;
const SERIES_INTERVAL_FIELD_NUMBER: u32 = 8;
const SERIES_METADATA_FIELD_NUMBER: u32 = 9;

// Sketch message.
const SKETCH_METRIC_FIELD_NUMBER: u32 = 1;
const SKETCH_HOST_FIELD_NUMBER: u32 = 2;
const SKETCH_TAGS_FIELD_NUMBER: u32 = 4;
const SKETCH_DOGSKETCHES_FIELD_NUMBER: u32 = 7;
const SKETCH_METADATA_FIELD_NUMBER: u32 = 8;
90
/// `Content-Type` header value for the protobuf-encoded endpoints (Series V2 and sketches).
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
/// `Content-Type` header value for the JSON-encoded Series V1 endpoint.
static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");
93
/// Bytes that open every Series V1 (JSON) payload: the start of the top-level `series` array.
const SERIES_V1_PAYLOAD_PREFIX: &[u8] = br#"{"series":["#;
/// Bytes that close every Series V1 payload: the end of the `series` array and of the enclosing object.
const SERIES_V1_PAYLOAD_SUFFIX: &[u8] = b"]}";
/// Separator placed between consecutive JSON-encoded series within a Series V1 payload.
const SERIES_V1_INPUT_SEPARATOR: &[u8] = b",";
98
/// Default value for `serializer_max_metrics_per_payload`: ten thousand metrics.
const fn default_max_metrics_per_payload() -> usize {
    10 * 1_000
}
102
103const fn default_max_payload_size() -> usize {
104 DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
105}
106
107const fn default_max_uncompressed_payload_size() -> usize {
108 DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
109}
110
111const fn default_max_series_payload_size() -> usize {
112 SERIES_V2_COMPRESSED_SIZE_LIMIT
113}
114
115const fn default_max_series_uncompressed_payload_size() -> usize {
116 SERIES_V2_UNCOMPRESSED_SIZE_LIMIT
117}
118
/// Default delay, in seconds, before pending requests are flushed.
const fn default_flush_timeout_secs() -> u64 {
    const DEFAULT_FLUSH_TIMEOUT_SECS: u64 = 2;
    DEFAULT_FLUSH_TIMEOUT_SECS
}
122
123fn default_serializer_compressor_kind() -> String {
124 DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
125}
126
/// Default value for `serializer_zstd_compressor_level`.
const fn default_zstd_compressor_level() -> i32 {
    const DEFAULT_LEVEL: i32 = 3;
    DEFAULT_LEVEL
}

/// By default, series are sent to the V2 (protobuf) endpoint.
const fn default_use_v2_api_series() -> bool {
    true
}

/// By default, metrics are not logged before encoding.
const fn default_log_payloads() -> bool {
    false
}
138
/// Configuration for the Datadog Metrics encoder.
///
/// Deserialized from the generic configuration via [`DatadogMetricsConfiguration::from_configuration`]. Every
/// deserialized field has a default, so an empty configuration is valid.
#[derive(Clone, Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
#[allow(dead_code)]
pub struct DatadogMetricsConfiguration {
    /// Maximum number of metrics per payload, applied to both the series and sketches request builders.
    ///
    /// Read from `serializer_max_metrics_per_payload`. Defaults to 10,000.
    #[serde(
        rename = "serializer_max_metrics_per_payload",
        default = "default_max_metrics_per_payload"
    )]
    max_metrics_per_payload: usize,

    /// Compressed payload size limit, in bytes, for sketch payloads and Series V1 payloads.
    ///
    /// Read from `serializer_max_payload_size`. Passed through `clamp_payload_limits` together with
    /// `DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT`, which is also the default value.
    #[serde(rename = "serializer_max_payload_size", default = "default_max_payload_size")]
    max_payload_size: usize,

    /// Uncompressed payload size limit, in bytes, for sketch payloads and Series V1 payloads.
    ///
    /// Read from `serializer_max_uncompressed_payload_size`. Passed through `clamp_payload_limits` together with
    /// `DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT`, which is also the default value.
    #[serde(
        rename = "serializer_max_uncompressed_payload_size",
        default = "default_max_uncompressed_payload_size"
    )]
    max_uncompressed_payload_size: usize,

    /// Compressed payload size limit, in bytes, for Series V2 payloads. Only used when `use_v2_api_series` is true.
    ///
    /// Read from `serializer_max_series_payload_size`. Clamped against `SERIES_V2_COMPRESSED_SIZE_LIMIT`, which is
    /// also the default value.
    #[serde(
        rename = "serializer_max_series_payload_size",
        default = "default_max_series_payload_size"
    )]
    max_series_payload_size: usize,

    /// Uncompressed payload size limit, in bytes, for Series V2 payloads. Only used when `use_v2_api_series` is true.
    ///
    /// Read from `serializer_max_series_uncompressed_payload_size`. Clamped against
    /// `SERIES_V2_UNCOMPRESSED_SIZE_LIMIT`, which is also the default value.
    #[serde(
        rename = "serializer_max_series_uncompressed_payload_size",
        default = "default_max_series_uncompressed_payload_size"
    )]
    max_series_uncompressed_payload_size: usize,

    /// Delay, in seconds, between the first event buffer received after a flush and the next time-based flush.
    ///
    /// A value of `0` is treated as 10 milliseconds. Defaults to 2.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Compression algorithm name, passed to `CompressionScheme::new`.
    ///
    /// Read from `serializer_compressor_kind`. Defaults to `"zstd"`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compression level, passed to `CompressionScheme::new` alongside `compressor_kind`.
    ///
    /// Read from `serializer_zstd_compressor_level`. Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Whether series are sent to the V2 (protobuf) endpoint instead of the V1 (JSON) endpoint. Defaults to `true`.
    #[serde(default = "default_use_v2_api_series")]
    use_v2_api_series: bool,

    /// Whether each metric is logged at debug level before it is encoded. Defaults to `false`.
    #[serde(default = "default_log_payloads")]
    log_payloads: bool,

    /// Extra tags handed to both endpoint encoders; presumably merged with each metric's own tags — confirm in
    /// `get_deduplicated_tags`.
    ///
    /// Never deserialized (`skip`); set programmatically with
    /// [`DatadogMetricsConfiguration::with_additional_tags`]. Marked opaque for `facet` reflection.
    #[serde(default, skip)]
    #[facet(opaque)]
    additional_tags: Option<SharedTagSet>,
}
267
268impl DatadogMetricsConfiguration {
269 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
271 Ok(config.as_typed()?)
272 }
273
274 pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
276 self.additional_tags = Some(additional_tags);
278 self
279 }
280}
281
282#[async_trait]
283impl EncoderBuilder for DatadogMetricsConfiguration {
284 fn input_event_type(&self) -> EventType {
285 EventType::Metric
286 }
287
288 fn output_payload_type(&self) -> PayloadType {
289 PayloadType::Http
290 }
291
292 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
293 let metrics_builder = MetricsBuilder::from_component_context(&context);
294 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
295 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
296
297 let series_endpoint = if self.use_v2_api_series {
299 MetricsEndpoint::SeriesV2
300 } else {
301 MetricsEndpoint::SeriesV1
302 };
303 let mut series_encoder = MetricsEndpointEncoder::from_endpoint(series_endpoint);
304 let mut sketches_encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
305
306 if let Some(additional_tags) = self.additional_tags.as_ref() {
307 series_encoder = series_encoder.with_additional_tags(additional_tags.clone());
308 sketches_encoder = sketches_encoder.with_additional_tags(additional_tags.clone());
309 }
310
311 let mut series_rb = RequestBuilder::new(series_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
312 series_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);
313
314 let generic_payload_limits = clamp_payload_limits(
315 self.max_uncompressed_payload_size,
316 self.max_payload_size,
317 DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
318 DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
319 );
320 let (series_uncompressed_limit, series_compressed_limit) = if series_endpoint == MetricsEndpoint::SeriesV2 {
321 clamp_payload_limits(
322 self.max_series_uncompressed_payload_size,
323 self.max_series_payload_size,
324 SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
325 SERIES_V2_COMPRESSED_SIZE_LIMIT,
326 )
327 } else {
328 generic_payload_limits
329 };
330 series_rb.with_len_limits(series_uncompressed_limit, series_compressed_limit)?;
331
332 let mut sketches_rb = RequestBuilder::new(sketches_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
333 sketches_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);
334 let (sketches_uncompressed_limit, sketches_compressed_limit) = generic_payload_limits;
335 sketches_rb.with_len_limits(sketches_uncompressed_limit, sketches_compressed_limit)?;
336
337 let flush_timeout = match self.flush_timeout_secs {
338 0 => Duration::from_millis(10),
341 secs => Duration::from_secs(secs),
342 };
343
344 Ok(Box::new(DatadogMetrics {
345 series_rb,
346 sketches_rb,
347 telemetry,
348 flush_timeout,
349 log_payloads: self.log_payloads,
350 }))
351 }
352}
353
impl MemoryBounds for DatadogMetricsConfiguration {
    /// Declares the encoder's memory requirements.
    ///
    /// The minimum bound covers the component struct plus the two 8-slot channels created in
    /// `DatadogMetrics::run` (event buffers in, payloads out). The firm bound reserves room for
    /// `max_metrics_per_payload` metrics per request builder.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<DatadogMetrics>("component struct")
            // Capacity 8 matches the `mpsc::channel(8)` calls in `DatadogMetrics::run`.
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        builder
            .firm()
            // One buffer per request builder (series and sketches), each sized to the per-payload metric cap.
            .with_array::<Metric>("series metrics split re-encode buffer", self.max_metrics_per_payload)
            .with_array::<Metric>("sketch metrics split re-encode buffer", self.max_metrics_per_payload);
    }
}
377
/// Datadog Metrics encoder.
///
/// Holds one request builder for series metrics and one for sketch metrics. [`Encoder::run`] moves both into a
/// separate request builder task and forwards the payloads it produces to the component's dispatcher.
pub struct DatadogMetrics {
    /// Request builder for counters, rates, gauges, and sets (Series V1 or V2).
    series_rb: RequestBuilder<MetricsEndpointEncoder>,
    /// Request builder for histograms and distributions (sketches endpoint).
    sketches_rb: RequestBuilder<MetricsEndpointEncoder>,
    /// Component telemetry; used to count metrics dropped during encoding.
    telemetry: ComponentTelemetry,
    /// Delay before pending requests are flushed after new event buffers arrive.
    flush_timeout: Duration,
    /// Whether each metric is logged at debug level before encoding.
    log_payloads: bool,
}
385
#[async_trait]
impl Encoder for DatadogMetrics {
    /// Runs the encoder until its event stream ends or the request builder task exits.
    ///
    /// Event buffers from the component context are forwarded to a request builder task spawned on the global thread
    /// pool. Payloads coming back from that task are dispatched downstream. On shutdown, the remaining payloads are
    /// drained and the task's outcome is logged.
    ///
    /// # Errors
    ///
    /// Returns an error if an event buffer cannot be handed to the request builder task (its channel is closed).
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            series_rb,
            sketches_rb,
            telemetry,
            flush_timeout,
            log_payloads,
        } = *self;

        let mut health = context.take_health_handle();

        // Bounded channels to and from the request builder task; capacities are mirrored in `specify_bounds`.
        let (events_tx, events_rx) = mpsc::channel(8);
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
        let request_builder_fut = run_request_builder(
            series_rb,
            sketches_rb,
            telemetry,
            events_rx,
            payloads_tx,
            flush_timeout,
            log_payloads,
        );
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool()
            .spawn_traced_named("dd-metrics-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog Metrics encoder started.");

        loop {
            select! {
                // Poll branches in order: health checks first, then draining payloads, then accepting new events.
                biased;

                _ = health.live() => continue,
                // `None` means the request builder task has exited (it owned the only sender).
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    None => break,
                },
                // NOTE(review): while this arm awaits `events_tx.send`, `payloads_rx` is not polled. If the events
                // channel is full and the request builder task is itself blocked sending to a full payloads channel,
                // neither side can make progress. Confirm this cannot happen in practice (e.g. a single event buffer
                // producing more than 8 payloads).
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    None => break,
                },
            }
        }

        // Closing the events channel tells the request builder task to finish its pending flush and exit.
        drop(events_tx);

        // Dispatch everything the request builder still produces until it closes the payloads channel.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog Metrics encoder stopped.");

        Ok(())
    }
}
462
463async fn run_request_builder(
464 mut series_request_builder: RequestBuilder<MetricsEndpointEncoder>,
465 mut sketches_request_builder: RequestBuilder<MetricsEndpointEncoder>, telemetry: ComponentTelemetry,
466 mut events_rx: mpsc::Receiver<EventsBuffer>, payloads_tx: mpsc::Sender<PayloadsBuffer>, flush_timeout: Duration,
467 log_payloads: bool,
468) -> Result<(), GenericError> {
469 let mut pending_flush = false;
470 let pending_flush_timeout = sleep(flush_timeout);
471 tokio::pin!(pending_flush_timeout);
472
473 loop {
474 select! {
475 Some(event_buffer) = events_rx.recv() => {
476 for event in event_buffer {
477 let metric = match event.try_into_metric() {
478 Some(metric) => metric,
479 None => continue,
480 };
481
482 if log_payloads {
483 log_metric_payload(&metric);
484 }
485
486 let request_builder = match metric.values() {
490 MetricValues::Counter(..)
491 | MetricValues::Rate(..)
492 | MetricValues::Gauge(..)
493 | MetricValues::Set(..) => &mut series_request_builder,
494 MetricValues::Histogram(..) | MetricValues::Distribution(..) => &mut sketches_request_builder,
495 };
496
497 let metric_to_retry = match request_builder.encode(metric).await {
501 Ok(None) => continue,
502 Ok(Some(metric)) => metric,
503 Err(e) => {
504 error!(error = %e, "Failed to encode metric.");
505 telemetry.events_dropped_encoder().increment(1);
506 continue;
507 }
508 };
509
510 let maybe_requests = request_builder.flush().await;
511 if maybe_requests.is_empty() {
512 panic!("builder told us to flush, but gave us nothing");
513 }
514
515 for maybe_request in maybe_requests {
516 match maybe_request {
517 Ok((events, data_points, request)) => {
518 let payload_meta = PayloadMetadata::from_event_and_data_point_count(events, data_points);
519 let http_payload = HttpPayload::new(payload_meta, request);
520 let payload = Payload::Http(http_payload);
521
522 payloads_tx.send(payload).await
523 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
524 },
525
526 Err(e) => if e.is_recoverable() {
528 continue;
530 } else {
531 return Err(GenericError::from(e).context("Failed to flush request."));
532 }
533 }
534 }
535
536 if let Err(e) = request_builder.encode(metric_to_retry).await {
540 error!(error = %e, "Failed to encode metric.");
541 telemetry.events_dropped_encoder().increment(1);
542 }
543 }
544
545 debug!("Processed event buffer.");
546
547 if !pending_flush {
549 pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
550 pending_flush = true;
551 }
552 },
553 _ = &mut pending_flush_timeout, if pending_flush => {
554 debug!("Flushing pending request(s).");
555
556 pending_flush = false;
557
558 let maybe_series_requests = series_request_builder.flush().await;
561 for maybe_request in maybe_series_requests {
562 match maybe_request {
563 Ok((events, data_points, request)) => {
564 let payload_meta = PayloadMetadata::from_event_and_data_point_count(events, data_points);
565 let http_payload = HttpPayload::new(payload_meta, request);
566 let payload = Payload::Http(http_payload);
567
568 payloads_tx.send(payload).await
569 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
570 },
571
572 Err(e) => if e.is_recoverable() {
574 continue;
576 } else {
577 return Err(GenericError::from(e).context("Failed to flush request."));
578 }
579 }
580 }
581
582 let maybe_sketches_requests = sketches_request_builder.flush().await;
583 for maybe_request in maybe_sketches_requests {
584 match maybe_request {
585 Ok((events, data_points, request)) => {
586 let payload_meta = PayloadMetadata::from_event_and_data_point_count(events, data_points);
587 let http_payload = HttpPayload::new(payload_meta, request);
588 let payload = Payload::Http(http_payload);
589
590 payloads_tx.send(payload).await
591 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
592 },
593
594 Err(e) => if e.is_recoverable() {
596 continue;
598 } else {
599 return Err(GenericError::from(e).context("Failed to flush request."));
600 }
601 }
602 }
603
604 debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
605 },
606
607 else => break,
609 }
610 }
611
612 Ok(())
613}
614
615fn log_metric_payload(metric: &Metric) {
616 match metric.values() {
617 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
618 debug!(?metric, "Flushing series metric.")
619 }
620 MetricValues::Histogram(..) | MetricValues::Distribution(..) => {
621 debug!(?metric, "Flushing sketch metric.")
622 }
623 }
624}
625
/// Datadog metrics intake endpoint targeted by a [`MetricsEndpointEncoder`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MetricsEndpoint {
    /// Series V1: JSON-encoded series, sent to `METRICS_SERIES_V1_PATH`.
    SeriesV1,

    /// Series V2: protobuf-encoded series, sent to `METRICS_SERIES_V2_PATH`.
    SeriesV2,

    /// Sketches: protobuf-encoded sketches built from histograms and distributions, sent to `METRICS_SKETCHES_PATH`.
    Sketches,
}
644
645#[derive(Debug)]
647pub enum MetricsEncodeError {
648 Protobuf(protobuf::Error),
650
651 Json(serde_json::Error),
653}
654
655impl fmt::Display for MetricsEncodeError {
656 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657 match self {
658 Self::Protobuf(e) => write!(f, "protobuf encode error: {}", e),
659 Self::Json(e) => write!(f, "json encode error: {}", e),
660 }
661 }
662}
663
664impl std::error::Error for MetricsEncodeError {
665 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
666 match self {
667 Self::Protobuf(e) => Some(e),
668 Self::Json(e) => Some(e),
669 }
670 }
671}
672
673impl From<protobuf::Error> for MetricsEncodeError {
674 fn from(value: protobuf::Error) -> Self {
675 Self::Protobuf(value)
676 }
677}
678
679impl From<serde_json::Error> for MetricsEncodeError {
680 fn from(value: serde_json::Error) -> Self {
681 Self::Json(value)
682 }
683}
684
/// [`EndpointEncoder`] that encodes metrics for a single Datadog metrics endpoint.
///
/// The scratch buffers are only used by the protobuf-based endpoints (Series V2 and sketches). They are kept on the
/// encoder, presumably to reuse their allocations across metrics.
#[derive(Debug)]
struct MetricsEndpointEncoder {
    /// Endpoint this encoder targets; selects the wire format, URI, content type, and default size limits.
    endpoint: MetricsEndpoint,
    /// Scratch buffer for the outermost nested message (the series or sketch itself).
    primary_scratch_buf: Vec<u8>,
    /// Scratch buffer for messages nested inside a series or sketch (tags, resources, points, dogsketches, origin).
    secondary_scratch_buf: Vec<u8>,
    /// Scratch buffer for packed repeated fields (dogsketch bin keys and counts).
    packed_scratch_buf: Vec<u8>,
    /// Extra tags passed alongside every metric to `get_deduplicated_tags`.
    additional_tags: SharedTagSet,
    /// Reusable de-duplication state handed to `get_deduplicated_tags` for each metric.
    tags_deduplicator: ReusableDeduplicator<Tag>,
}
694
695impl MetricsEndpointEncoder {
696 pub fn from_endpoint(endpoint: MetricsEndpoint) -> Self {
698 Self {
699 endpoint,
700 primary_scratch_buf: Vec::new(),
701 secondary_scratch_buf: Vec::new(),
702 packed_scratch_buf: Vec::new(),
703 additional_tags: SharedTagSet::default(),
704 tags_deduplicator: ReusableDeduplicator::new(),
705 }
706 }
707
708 pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
714 self.additional_tags = additional_tags;
715 self
716 }
717}
718
719impl EndpointEncoder for MetricsEndpointEncoder {
720 type Input = Metric;
721 type EncodeError = MetricsEncodeError;
722
723 fn encoder_name() -> &'static str {
724 "metrics"
725 }
726
727 fn compressed_size_limit(&self) -> usize {
728 match self.endpoint {
729 MetricsEndpoint::SeriesV1 => SERIES_V1_COMPRESSED_SIZE_LIMIT,
730 MetricsEndpoint::SeriesV2 => SERIES_V2_COMPRESSED_SIZE_LIMIT,
731 MetricsEndpoint::Sketches => DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
732 }
733 }
734
735 fn uncompressed_size_limit(&self) -> usize {
736 match self.endpoint {
737 MetricsEndpoint::SeriesV1 => SERIES_V1_UNCOMPRESSED_SIZE_LIMIT,
738 MetricsEndpoint::SeriesV2 => SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
739 MetricsEndpoint::Sketches => DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
740 }
741 }
742
743 fn input_data_point_count(&self, input: &Self::Input) -> usize {
744 input.values().len()
745 }
746
747 fn is_valid_input(&self, input: &Self::Input) -> bool {
748 let is_series_input = matches!(
749 input.values(),
750 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..)
751 );
752
753 match self.endpoint {
754 MetricsEndpoint::SeriesV1 | MetricsEndpoint::SeriesV2 => is_series_input,
755 MetricsEndpoint::Sketches => !is_series_input,
756 }
757 }
758
759 fn get_payload_prefix(&self) -> Option<&'static [u8]> {
760 match self.endpoint {
761 MetricsEndpoint::SeriesV1 => Some(SERIES_V1_PAYLOAD_PREFIX),
762 _ => None,
763 }
764 }
765
766 fn get_payload_suffix(&self) -> Option<&'static [u8]> {
767 match self.endpoint {
768 MetricsEndpoint::SeriesV1 => Some(SERIES_V1_PAYLOAD_SUFFIX),
769 _ => None,
770 }
771 }
772
773 fn get_input_separator(&self) -> Option<&'static [u8]> {
774 match self.endpoint {
775 MetricsEndpoint::SeriesV1 => Some(SERIES_V1_INPUT_SEPARATOR),
776 _ => None,
777 }
778 }
779
780 fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
781 match self.endpoint {
782 MetricsEndpoint::SeriesV1 => {
783 encode_series_v1_metric(input, &self.additional_tags, buffer, &mut self.tags_deduplicator)?;
784 Ok(())
785 }
786 MetricsEndpoint::SeriesV2 | MetricsEndpoint::Sketches => {
787 encode_single_metric(
808 input,
809 &self.additional_tags,
810 buffer,
811 &mut self.primary_scratch_buf,
812 &mut self.secondary_scratch_buf,
813 &mut self.packed_scratch_buf,
814 &mut self.tags_deduplicator,
815 )?;
816 Ok(())
817 }
818 }
819 }
820
821 fn endpoint_uri(&self) -> Uri {
822 match self.endpoint {
823 MetricsEndpoint::SeriesV1 => PathAndQuery::from_static(METRICS_SERIES_V1_PATH).into(),
824 MetricsEndpoint::SeriesV2 => PathAndQuery::from_static(METRICS_SERIES_V2_PATH).into(),
825 MetricsEndpoint::Sketches => PathAndQuery::from_static(METRICS_SKETCHES_PATH).into(),
826 }
827 }
828
829 fn endpoint_method(&self) -> Method {
830 Method::POST
832 }
833
834 fn content_type(&self) -> HeaderValue {
835 match self.endpoint {
836 MetricsEndpoint::SeriesV1 => CONTENT_TYPE_JSON.clone(),
837 MetricsEndpoint::SeriesV2 | MetricsEndpoint::Sketches => CONTENT_TYPE_PROTOBUF.clone(),
838 }
839 }
840}
841
842fn field_number_for_metric_type(metric: &Metric) -> u32 {
843 match metric.values() {
844 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => 1,
845 MetricValues::Histogram(..) | MetricValues::Distribution(..) => 1,
846 }
847}
848
849fn get_message_size(raw_msg_size: usize) -> Result<u32, protobuf::Error> {
850 const MAX_MESSAGE_SIZE: u64 = i32::MAX as u64;
851
852 if raw_msg_size as u64 > MAX_MESSAGE_SIZE {
854 return Err(std::io::Error::other("message size exceeds limit (2147483648 bytes)").into());
855 }
856
857 Ok(raw_msg_size as u32)
858}
859
860fn get_message_size_from_buffer(buf: &[u8]) -> Result<u32, protobuf::Error> {
861 get_message_size(buf.len())
862}
863
864fn encode_single_metric(
865 metric: &Metric, additional_tags: &SharedTagSet, output_buf: &mut Vec<u8>, primary_scratch_buf: &mut Vec<u8>,
866 secondary_scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
867 tags_deduplicator: &mut ReusableDeduplicator<Tag>,
868) -> Result<(), protobuf::Error> {
869 let mut output_stream = CodedOutputStream::vec(output_buf);
870 let field_number = field_number_for_metric_type(metric);
871
872 write_nested_message(&mut output_stream, primary_scratch_buf, field_number, |os| {
873 match metric.values() {
875 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
876 encode_series_v2_metric(metric, additional_tags, os, secondary_scratch_buf, tags_deduplicator)
877 }
878 MetricValues::Histogram(..) | MetricValues::Distribution(..) => encode_sketch_metric(
879 metric,
880 additional_tags,
881 os,
882 secondary_scratch_buf,
883 packed_scratch_buf,
884 tags_deduplicator,
885 ),
886 }
887 })
888}
889
890fn encode_series_v2_metric(
891 metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
892 scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
893) -> Result<(), protobuf::Error> {
894 output_stream.write_string(SERIES_METRIC_FIELD_NUMBER, metric.context().name())?;
896
897 let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
898 write_series_tags(deduplicated_tags, output_stream, scratch_buf)?;
899
900 write_resource(
902 output_stream,
903 scratch_buf,
904 "host",
905 metric.metadata().hostname().unwrap_or_default(),
906 )?;
907
908 if let Some(origin) = metric.metadata().origin() {
910 match origin {
911 MetricOrigin::SourceType(source_type) => {
912 output_stream.write_string(SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER, source_type.as_ref())?;
913 }
914 MetricOrigin::OriginMetadata {
915 product,
916 subproduct,
917 product_detail,
918 } => {
919 write_origin_metadata(
920 output_stream,
921 scratch_buf,
922 SERIES_METADATA_FIELD_NUMBER,
923 *product,
924 *subproduct,
925 *product_detail,
926 )?;
927 }
928 }
929 }
930
931 let (metric_type, points, maybe_interval) = match metric.values() {
933 MetricValues::Counter(points) => (proto::MetricType::COUNT, points.into_iter(), None),
934 MetricValues::Rate(points, interval) => (proto::MetricType::RATE, points.into_iter(), Some(interval)),
935 MetricValues::Gauge(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
936 MetricValues::Set(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
937 _ => unreachable!("encode_series_v2_metric called with non-series metric"),
938 };
939
940 output_stream.write_enum(SERIES_TYPE_FIELD_NUMBER, metric_type.value())?;
941
942 if let Some(unit) = metric.metadata().unit() {
943 output_stream.write_string(SERIES_UNIT_FIELD_NUMBER, unit)?;
944 }
945
946 for (timestamp, value) in points {
947 let value = maybe_interval
949 .map(|interval| value / interval.as_secs_f64())
950 .unwrap_or(value);
951 let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;
952
953 write_point(output_stream, scratch_buf, value, timestamp)?;
954 }
955
956 if let Some(interval) = maybe_interval {
957 output_stream.write_int64(SERIES_INTERVAL_FIELD_NUMBER, interval.as_secs() as i64)?;
958 }
959
960 Ok(())
961}
962
963fn encode_series_v1_metric(
964 metric: &Metric, additional_tags: &SharedTagSet, buffer: &mut Vec<u8>,
965 tags_deduplicator: &mut ReusableDeduplicator<Tag>,
966) -> Result<(), serde_json::Error> {
967 let mut obj = JsonMap::new();
968
969 obj.insert("metric".into(), JsonValue::String(metric.context().name().to_string()));
970
971 let (type_str, points_iter, maybe_interval) = match metric.values() {
972 MetricValues::Counter(points) => ("count", points.into_iter(), None),
973 MetricValues::Rate(points, interval) => ("rate", points.into_iter(), Some(*interval)),
974 MetricValues::Gauge(points) => ("gauge", points.into_iter(), None),
975 MetricValues::Set(points) => ("gauge", points.into_iter(), None),
976 _ => unreachable!("encode_series_v1_metric called with non-series metric"),
977 };
978
979 let mut points = Vec::new();
980 for (timestamp, value) in points_iter {
981 let value = maybe_interval
983 .map(|interval| value / interval.as_secs_f64())
984 .unwrap_or(value);
985 let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;
986
987 let value_json = JsonNumber::from_f64(value)
989 .map(JsonValue::Number)
990 .unwrap_or_else(|| JsonValue::from(0));
991 points.push(JsonValue::Array(vec![JsonValue::from(timestamp), value_json]));
992 }
993 obj.insert("points".into(), JsonValue::Array(points));
994
995 let deduplicated = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
998 let mut tags_out = Vec::new();
999 let mut device: Option<String> = None;
1000 for tag in deduplicated {
1001 if tag.name() == "dd.internal.resource" {
1002 continue;
1003 }
1004 if device.is_none() && tag.name() == "device" {
1005 if let Some(v) = tag.value() {
1006 device = Some(v.to_string());
1007 continue;
1008 }
1009 }
1010 tags_out.push(JsonValue::String(tag.as_str().to_string()));
1011 }
1012 obj.insert("tags".into(), JsonValue::Array(tags_out));
1013
1014 obj.insert(
1016 "host".into(),
1017 JsonValue::String(metric.metadata().hostname().unwrap_or_default().to_string()),
1018 );
1019
1020 if let Some(d) = device.filter(|s| !s.is_empty()) {
1021 obj.insert("device".into(), JsonValue::String(d));
1022 }
1023
1024 obj.insert("type".into(), JsonValue::String(type_str.into()));
1025
1026 let interval_secs = maybe_interval.map(|iv| iv.as_secs() as i64).unwrap_or(0);
1027 obj.insert("interval".into(), JsonValue::from(interval_secs));
1028
1029 if let Some(MetricOrigin::SourceType(s)) = metric.metadata().origin() {
1031 obj.insert("source_type_name".into(), JsonValue::String(s.as_ref().to_string()));
1032 }
1033
1034 if let Some(unit) = metric.metadata().unit() {
1035 if !unit.is_empty() {
1036 obj.insert("unit".into(), JsonValue::String(unit.to_string()));
1037 }
1038 }
1039
1040 serde_json::to_writer(buffer, &JsonValue::Object(obj))
1041}
1042
1043fn encode_sketch_metric(
1044 metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
1045 scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
1046) -> Result<(), protobuf::Error> {
1047 output_stream.write_string(SKETCH_METRIC_FIELD_NUMBER, metric.context().name())?;
1049
1050 let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
1051 write_sketch_tags(deduplicated_tags, output_stream, scratch_buf)?;
1052
1053 output_stream.write_string(
1055 SKETCH_HOST_FIELD_NUMBER,
1056 metric.metadata().hostname().unwrap_or_default(),
1057 )?;
1058
1059 if let Some(MetricOrigin::OriginMetadata {
1061 product,
1062 subproduct,
1063 product_detail,
1064 }) = metric.metadata().origin()
1065 {
1066 write_origin_metadata(
1067 output_stream,
1068 scratch_buf,
1069 SKETCH_METADATA_FIELD_NUMBER,
1070 *product,
1071 *subproduct,
1072 *product_detail,
1073 )?;
1074 }
1075
1076 match metric.values() {
1081 MetricValues::Distribution(sketches) => {
1082 for (timestamp, value) in sketches {
1083 write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, value)?;
1084 }
1085 }
1086 MetricValues::Histogram(points) => {
1087 for (timestamp, histogram) in points {
1088 let mut ddsketch = DDSketch::default();
1090 for sample in histogram.samples() {
1091 ddsketch.insert_n(sample.value.into_inner(), sample.weight.0 as u64);
1092 }
1093
1094 write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, &ddsketch)?;
1095 }
1096 }
1097 _ => unreachable!("encode_sketch_metric called with non-sketch metric"),
1098 }
1099
1100 Ok(())
1101}
1102
1103fn write_resource(
1104 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, resource_type: &str, resource_name: &str,
1105) -> Result<(), protobuf::Error> {
1106 write_nested_message(output_stream, scratch_buf, SERIES_RESOURCES_FIELD_NUMBER, |os| {
1107 os.write_string(RESOURCES_TYPE_FIELD_NUMBER, resource_type)?;
1108 os.write_string(RESOURCES_NAME_FIELD_NUMBER, resource_name)
1109 })
1110}
1111
/// Writes a length-delimited metadata message under `field_number`. The message holds a single nested origin
/// message carrying product, category, and service.
///
/// The byte layout is:
///
/// ```text
/// tag(field_number, LEN) varint(metadata_len)
///   tag(METADATA_ORIGIN_FIELD_NUMBER, LEN) varint(origin_len)   <- "preamble", built on the stack
///     origin fields (product, category, service)                <- built in `scratch_buf`
/// ```
///
/// The two levels are nested by hand rather than through `write_nested_message`, so that only one scratch buffer is
/// needed. The short metadata header is built in a fixed stack buffer instead.
///
/// # Errors
///
/// Returns an error if a write fails or a message size exceeds the protobuf limit.
fn write_origin_metadata(
    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, origin_product: u32,
    origin_category: u32, origin_service: u32,
) -> Result<(), protobuf::Error> {
    // Encode the innermost origin message into the (cleared) scratch buffer.
    scratch_buf.clear();

    {
        let mut origin_output_stream = CodedOutputStream::vec(scratch_buf);
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER, origin_product)?;
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER, origin_category)?;
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER, origin_service)?;
        // Flush so the bytes are committed to `scratch_buf` before its length is read below.
        origin_output_stream.flush()?;
    }

    let origin_message_size = get_message_size_from_buffer(scratch_buf)?;

    // The preamble is one tag plus one varint32, so 64 bytes is ample space.
    let mut metadata_preamble_buf = [0; 64];
    let metadata_preamble_len = {
        let mut metadata_output_stream = CodedOutputStream::bytes(&mut metadata_preamble_buf[..]);
        metadata_output_stream.write_tag(METADATA_ORIGIN_FIELD_NUMBER, WireType::LengthDelimited)?;
        metadata_output_stream.write_raw_varint32(origin_message_size)?;
        metadata_output_stream.flush()?;
        metadata_output_stream.total_bytes_written() as usize
    };

    // The metadata message body is the preamble followed by the origin message bytes.
    let metadata_message_size = get_message_size(scratch_buf.len() + metadata_preamble_len)?;

    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
    output_stream.write_raw_varint32(metadata_message_size)?;
    output_stream.write_raw_bytes(&metadata_preamble_buf[..metadata_preamble_len])?;
    output_stream.write_raw_bytes(scratch_buf)
}
1150
1151fn write_point(
1152 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, value: f64, timestamp: i64,
1153) -> Result<(), protobuf::Error> {
1154 write_nested_message(output_stream, scratch_buf, SERIES_POINTS_FIELD_NUMBER, |os| {
1155 os.write_double(METRIC_POINT_VALUE_FIELD_NUMBER, value)?;
1156 os.write_int64(METRIC_POINT_TIMESTAMP_FIELD_NUMBER, timestamp)
1157 })
1158}
1159
1160fn write_dogsketch(
1161 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
1162 timestamp: Option<NonZeroU64>, sketch: &DDSketch,
1163) -> Result<(), protobuf::Error> {
1164 if sketch.is_empty() {
1166 warn!("Attempted to write an empty sketch to sketches payload, skipping.");
1167 return Ok(());
1168 }
1169
1170 write_nested_message(output_stream, scratch_buf, SKETCH_DOGSKETCHES_FIELD_NUMBER, |os| {
1171 os.write_int64(DOGSKETCH_TS_FIELD_NUMBER, timestamp.map_or(0, |ts| ts.get() as i64))?;
1172 os.write_int64(DOGSKETCH_CNT_FIELD_NUMBER, sketch.count() as i64)?;
1173 os.write_double(DOGSKETCH_MIN_FIELD_NUMBER, sketch.min().unwrap())?;
1174 os.write_double(DOGSKETCH_MAX_FIELD_NUMBER, sketch.max().unwrap())?;
1175 os.write_double(DOGSKETCH_AVG_FIELD_NUMBER, sketch.avg().unwrap())?;
1176 os.write_double(DOGSKETCH_SUM_FIELD_NUMBER, sketch.sum().unwrap())?;
1177
1178 let bin_keys = sketch.bins().iter().map(|bin| bin.key());
1179 write_repeated_packed_from_iter(
1180 os,
1181 packed_scratch_buf,
1182 DOGSKETCH_K_FIELD_NUMBER,
1183 bin_keys,
1184 |inner_os, value| inner_os.write_sint32_no_tag(value),
1185 )?;
1186
1187 let bin_counts = sketch.bins().iter().map(|bin| bin.count());
1188 write_repeated_packed_from_iter(
1189 os,
1190 packed_scratch_buf,
1191 DOGSKETCH_N_FIELD_NUMBER,
1192 bin_counts,
1193 |inner_os, value| inner_os.write_uint32_no_tag(value),
1194 )
1195 })
1196}
1197
1198fn get_deduplicated_tags<'a>(
1199 metric: &'a Metric, additional_tags: &'a SharedTagSet, tags_deduplicator: &'a mut ReusableDeduplicator<Tag>,
1200) -> impl Iterator<Item = &'a Tag> {
1201 let chained_tags = metric
1202 .context()
1203 .tags()
1204 .into_iter()
1205 .chain(additional_tags)
1206 .chain(metric.context().origin_tags());
1207
1208 tags_deduplicator.deduplicated(chained_tags)
1209}
1210
1211fn write_tags<'a, I, F>(
1212 tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, tag_encoder: F,
1213) -> Result<(), protobuf::Error>
1214where
1215 I: Iterator<Item = &'a Tag>,
1216 F: Fn(&Tag, &mut CodedOutputStream<'_>, &mut Vec<u8>) -> Result<(), protobuf::Error>,
1217{
1218 for tag in tags {
1219 tag_encoder(tag, output_stream, scratch_buf)?;
1220 }
1221
1222 Ok(())
1223}
1224
1225fn write_series_tags<'a, I>(
1226 tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
1227) -> Result<(), protobuf::Error>
1228where
1229 I: Iterator<Item = &'a Tag>,
1230{
1231 write_tags(tags, output_stream, scratch_buf, |tag, os, buf| {
1232 if tag.name() == "dd.internal.resource" {
1234 if let Some((resource_type, resource_name)) = tag.value().and_then(|s| s.split_once(':')) {
1235 write_resource(os, buf, resource_type, resource_name)
1236 } else {
1237 Ok(())
1238 }
1239 } else {
1240 os.write_string(SERIES_TAGS_FIELD_NUMBER, tag.as_str())
1242 }
1243 })
1244}
1245
1246fn write_sketch_tags<'a, I>(
1247 tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
1248) -> Result<(), protobuf::Error>
1249where
1250 I: Iterator<Item = &'a Tag>,
1251{
1252 write_tags(tags, output_stream, scratch_buf, |tag, os, _buf| {
1253 os.write_string(SKETCH_TAGS_FIELD_NUMBER, tag.as_str())
1255 })
1256}
1257
1258fn write_nested_message<F>(
1259 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, writer: F,
1260) -> Result<(), protobuf::Error>
1261where
1262 F: FnOnce(&mut CodedOutputStream<'_>) -> Result<(), protobuf::Error>,
1263{
1264 scratch_buf.clear();
1265
1266 {
1267 let mut nested_output_stream = CodedOutputStream::vec(scratch_buf);
1268 writer(&mut nested_output_stream)?;
1269 nested_output_stream.flush()?;
1270 }
1271
1272 output_stream.write_tag(field_number, WireType::LengthDelimited)?;
1273
1274 let nested_message_size = get_message_size_from_buffer(scratch_buf)?;
1275 output_stream.write_raw_varint32(nested_message_size)?;
1276 output_stream.write_raw_bytes(scratch_buf)
1277}
1278
1279fn write_repeated_packed_from_iter<I, T, F>(
1280 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, values: I, writer: F,
1281) -> Result<(), protobuf::Error>
1282where
1283 I: Iterator<Item = T>,
1284 F: Fn(&mut CodedOutputStream<'_>, T) -> Result<(), protobuf::Error>,
1285{
1286 scratch_buf.clear();
1295
1296 {
1297 let mut packed_output_stream = CodedOutputStream::vec(scratch_buf);
1298 for value in values {
1299 writer(&mut packed_output_stream, value)?;
1300 }
1301 packed_output_stream.flush()?;
1302 }
1303
1304 let data_size = get_message_size_from_buffer(scratch_buf)?;
1305
1306 output_stream.write_tag(field_number, WireType::LengthDelimited)?;
1307 output_stream.write_raw_varint32(data_size)?;
1308 output_stream.write_raw_bytes(scratch_buf)
1309}
1310
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use protobuf::CodedOutputStream;
    use saluki_common::iter::ReusableDeduplicator;
    use saluki_context::{tags::SharedTagSet, Context};
    use saluki_core::data_model::event::metric::{Metric, MetricMetadata, MetricOrigin, MetricValues};
    use serde_json::Value as JsonValue;
    use stringtheory::MetaString;

    use super::{
        encode_series_v1_metric, encode_series_v2_metric, encode_sketch_metric, MetricsEndpoint,
        MetricsEndpointEncoder, SERIES_V1_INPUT_SEPARATOR, SERIES_V1_PAYLOAD_PREFIX, SERIES_V1_PAYLOAD_SUFFIX,
    };
    use crate::common::datadog::{
        request_builder::EndpointEncoder as _, DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
        DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
    };

    /// Encodes `metric` on its own with the Series V1 (JSON) encoder, using no additional host tags, and parses the
    /// resulting bytes as JSON.
    fn encode_one_v1(metric: &Metric) -> JsonValue {
        let mut buf = Vec::new();
        let host_tags = SharedTagSet::default();
        let mut tags_deduplicator = ReusableDeduplicator::new();
        encode_series_v1_metric(metric, &host_tags, &mut buf, &mut tags_deduplicator)
            .expect("encode_series_v1_metric should succeed");
        serde_json::from_slice(&buf).expect("encoder produced invalid JSON")
    }

    #[test]
    fn histogram_vs_sketch_identical_payload() {
        // A histogram and a distribution built from the same samples must encode to byte-identical sketch payloads.
        let samples = &[1.0, 2.0, 3.0, 4.0, 5.0];
        let histogram = Metric::histogram("simple_samples", samples);
        let distribution = Metric::distribution("simple_samples", samples);
        let host_tags = SharedTagSet::default();

        let mut buf1 = Vec::new();
        let mut buf2 = Vec::new();
        let mut tags_deduplicator = ReusableDeduplicator::new();

        let mut histogram_payload = Vec::new();
        {
            let mut histogram_writer = CodedOutputStream::vec(&mut histogram_payload);
            encode_sketch_metric(
                &histogram,
                &host_tags,
                &mut histogram_writer,
                &mut buf1,
                &mut buf2,
                &mut tags_deduplicator,
            )
            .expect("Failed to encode histogram as sketch");
            // Flush explicitly, as `series_metric_unit_encoded` does, so a flush error fails the test instead of
            // depending on what the stream does when it is dropped.
            histogram_writer.flush().expect("Failed to flush histogram payload");
        }

        let mut distribution_payload = Vec::new();
        {
            let mut distribution_writer = CodedOutputStream::vec(&mut distribution_payload);
            encode_sketch_metric(
                &distribution,
                &host_tags,
                &mut distribution_writer,
                &mut buf1,
                &mut buf2,
                &mut tags_deduplicator,
            )
            .expect("Failed to encode distribution as sketch");
            distribution_writer
                .flush()
                .expect("Failed to flush distribution payload");
        }

        // Guard against a vacuous pass: two empty payloads would also compare equal.
        assert!(
            !histogram_payload.is_empty(),
            "histogram should encode to a non-empty sketch payload"
        );
        assert_eq!(histogram_payload, distribution_payload);
    }

    #[test]
    fn input_valid() {
        // Series endpoints accept every non-sketch metric; the sketches endpoint accepts only histograms and
        // distributions.
        let counter = Metric::counter("counter", 1.0);
        let rate = Metric::rate("rate", 1.0, Duration::from_secs(1));
        let gauge = Metric::gauge("gauge", 1.0);
        let set = Metric::set("set", "foo");
        let histogram = Metric::histogram("histogram", [1.0, 2.0, 3.0]);
        let distribution = Metric::distribution("distribution", [1.0, 2.0, 3.0]);

        let series_v1 = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV1);
        let series_v2 = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV2);
        let sketches_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);

        for series_endpoint in [&series_v1, &series_v2] {
            assert!(series_endpoint.is_valid_input(&counter));
            assert!(series_endpoint.is_valid_input(&rate));
            assert!(series_endpoint.is_valid_input(&gauge));
            assert!(series_endpoint.is_valid_input(&set));
            assert!(!series_endpoint.is_valid_input(&histogram));
            assert!(!series_endpoint.is_valid_input(&distribution));
        }

        assert!(!sketches_endpoint.is_valid_input(&counter));
        assert!(!sketches_endpoint.is_valid_input(&rate));
        assert!(!sketches_endpoint.is_valid_input(&gauge));
        assert!(!sketches_endpoint.is_valid_input(&set));
        assert!(sketches_endpoint.is_valid_input(&histogram));
        assert!(sketches_endpoint.is_valid_input(&distribution));
    }

    #[test]
    fn input_data_point_count_tracks_metric_values() {
        // Series count one data point per (timestamp, value) pair; a histogram counts as a single sketch point.
        let counter = Metric::counter("counter", [(123, 1.0), (124, 2.0)]);
        let histogram = Metric::histogram("histogram", [1.0, 2.0, 3.0]);

        let series_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV2);
        let sketches_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);

        assert_eq!(series_endpoint.input_data_point_count(&counter), 2);
        assert_eq!(sketches_endpoint.input_data_point_count(&histogram), 1);
    }

    #[test]
    fn series_metric_unit_encoded() {
        let context = Context::from_static_parts("my.timer.avg", &[]);
        let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond"));
        let gauge = Metric::from_parts(context, MetricValues::gauge([1.0_f64]), metadata);

        let host_tags = SharedTagSet::default();
        let mut scratch_buf = Vec::new();
        let mut tags_deduplicator = ReusableDeduplicator::new();

        let mut payload = Vec::new();
        {
            let mut writer = CodedOutputStream::vec(&mut payload);
            encode_series_v2_metric(
                &gauge,
                &host_tags,
                &mut writer,
                &mut scratch_buf,
                &mut tags_deduplicator,
            )
            .expect("Failed to encode gauge as series metric");
            writer.flush().expect("Failed to flush");
        }

        // Field 6 with the length-delimited wire type (2): the tag byte is (6 << 3) | 2.
        let expected_tag: u8 = (6 << 3) | 2;
        let expected_value = b"millisecond";

        // Look for: tag byte, one-byte length, then the unit string.
        let tag_pos = payload
            .windows(1 + 1 + expected_value.len())
            .position(|w| w[0] == expected_tag && w[1] == expected_value.len() as u8 && &w[2..] == expected_value);

        assert!(
            tag_pos.is_some(),
            "series payload should contain unit field (field 6 = 'millisecond'), got bytes: {:?}",
            payload
        );
    }

    #[test]
    fn series_v1_basic_payload_shape() {
        let counter = Metric::counter("my.count", 5.0);
        let counter_json = encode_one_v1(&counter);
        assert_eq!(counter_json["metric"], "my.count");
        assert_eq!(counter_json["type"], "count");
        assert_eq!(counter_json["interval"], 0);
        assert_eq!(counter_json["host"], "");
        assert_eq!(counter_json["tags"], JsonValue::Array(vec![]));
        let points = counter_json["points"].as_array().expect("points is array");
        assert_eq!(points.len(), 1);
        assert_eq!(points[0][0], 0);
        assert_eq!(points[0][1], 5.0);
        // Optional fields are omitted entirely when not set.
        assert!(counter_json.get("unit").is_none());
        assert!(counter_json.get("source_type_name").is_none());
        assert!(counter_json.get("device").is_none());

        // A rate of 30.0 over a 10s interval is emitted as 3.0 with `interval` set to 10.
        let rate = Metric::rate("my.rate", 30.0, Duration::from_secs(10));
        let rate_json = encode_one_v1(&rate);
        assert_eq!(rate_json["type"], "rate");
        assert_eq!(rate_json["interval"], 10);
        let rate_points = rate_json["points"].as_array().expect("rate points is array");
        assert_eq!(rate_points[0][1], 3.0);

        let gauge = Metric::gauge("my.gauge", 42.0);
        let gauge_json = encode_one_v1(&gauge);
        assert_eq!(gauge_json["type"], "gauge");

        // A single-element set is reported as a gauge with value 1.0.
        let set = Metric::set("my.set", "alpha");
        let set_json = encode_one_v1(&set);
        assert_eq!(set_json["type"], "gauge");
        let set_points = set_json["points"].as_array().expect("set points is array");
        assert_eq!(set_points[0][1], 1.0);
    }

    #[test]
    fn series_v1_unit_and_hostname_emitted() {
        let context = Context::from_static_parts("my.timer.avg", &[]);
        let metadata = MetricMetadata::default()
            .with_unit(MetaString::from_static("millisecond"))
            .with_hostname(Some(Arc::from("host-1")));
        let gauge = Metric::from_parts(context, MetricValues::gauge([1.0_f64]), metadata);

        let json = encode_one_v1(&gauge);
        assert_eq!(json["unit"], "millisecond");
        assert_eq!(json["host"], "host-1");
    }

    #[test]
    fn series_v1_device_tag_extraction() {
        // A `device:` tag is lifted into the top-level `device` field and removed from `tags`.
        let context = Context::from_static_parts("my.metric", &["device:eth0", "env:prod"]);
        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), MetricMetadata::default());

        let json = encode_one_v1(&counter);
        assert_eq!(json["device"], "eth0");
        let tags = json["tags"].as_array().expect("tags is array");
        let tag_strs: Vec<&str> = tags.iter().filter_map(|v| v.as_str()).collect();
        assert!(
            !tag_strs.iter().any(|t| t.starts_with("device:")),
            "device tag must be removed: {:?}",
            tag_strs
        );
        assert!(tag_strs.contains(&"env:prod"));
    }

    #[test]
    fn series_v1_source_type_name_from_source_type_origin() {
        let context = Context::from_static_parts("my.metric", &[]);
        let metadata = MetricMetadata::default().with_source_type(Some(Arc::from("integration_x")));
        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), metadata);

        let json = encode_one_v1(&counter);
        assert_eq!(json["source_type_name"], "integration_x");
    }

    #[test]
    fn series_v1_origin_metadata_dropped() {
        // Product/category/service origin metadata has no V1 representation and must not leak into
        // `source_type_name`.
        let context = Context::from_static_parts("my.metric", &[]);
        let metadata = MetricMetadata::default().with_origin(Some(MetricOrigin::dogstatsd()));
        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), metadata);

        let json = encode_one_v1(&counter);
        assert!(json.get("source_type_name").is_none());
    }

    #[test]
    fn series_v1_dd_internal_resource_dropped() {
        let context = Context::from_static_parts("my.metric", &["dd.internal.resource:host:foo", "env:prod"]);
        let counter = Metric::from_parts(context, MetricValues::counter([1.0_f64]), MetricMetadata::default());

        let json = encode_one_v1(&counter);
        let tags = json["tags"].as_array().expect("tags is array");
        let tag_strs: Vec<&str> = tags.iter().filter_map(|v| v.as_str()).collect();
        assert!(
            !tag_strs.iter().any(|t| t.starts_with("dd.internal.resource:")),
            "dd.internal.resource tag must be dropped: {:?}",
            tag_strs
        );
        assert!(tag_strs.contains(&"env:prod"));
    }

    #[test]
    fn series_v1_endpoint_routing() {
        let encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV1);
        assert_eq!(encoder.endpoint_uri().path(), "/api/v1/series");
        assert_eq!(encoder.content_type(), "application/json");
        assert_eq!(encoder.get_payload_prefix(), Some(SERIES_V1_PAYLOAD_PREFIX));
        assert_eq!(encoder.get_payload_suffix(), Some(SERIES_V1_PAYLOAD_SUFFIX));
        assert_eq!(encoder.get_input_separator(), Some(SERIES_V1_INPUT_SEPARATOR));
        assert_eq!(encoder.compressed_size_limit(), DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT);
        assert_eq!(encoder.uncompressed_size_limit(), DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT);

        let sketches = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
        assert_eq!(sketches.compressed_size_limit(), DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT);
        assert_eq!(sketches.uncompressed_size_limit(), DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT);

        let v2 = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::SeriesV2);
        assert_eq!(v2.endpoint_uri().path(), "/api/v2/series");
        assert_eq!(v2.content_type(), "application/x-protobuf");
        assert!(v2.get_payload_prefix().is_none());
    }
}
1621
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogMetricsConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Runs the shared configuration smoke tests for the Datadog metrics encoder configuration, starting from an
    /// empty JSON object and checking that it deserializes into `DatadogMetricsConfiguration`.
    #[tokio::test]
    async fn smoke_test() {
        let base_values = json!({});
        run_config_smoke_tests(structs::DATADOG_METRICS_CONFIGURATION, &[], base_values, |cfg| {
            cfg.as_typed::<DatadogMetricsConfiguration>()
                .expect("DatadogMetricsConfiguration should deserialize")
        })
        .await
    }
}
1639
#[cfg(test)]
mod use_v2_api_series_default {
    use saluki_config::ConfigurationLoader;
    use serde_json::json;

    use super::{DatadogMetricsConfiguration, SERIES_V2_COMPRESSED_SIZE_LIMIT, SERIES_V2_UNCOMPRESSED_SIZE_LIMIT};
    use crate::{common::datadog::clamp_payload_limits, config::KEY_ALIASES};

    /// Loads `values` as the only (default) configuration source, with the crate's key aliases applied, and
    /// deserializes the result into a `DatadogMetricsConfiguration`.
    async fn load_typed(values: serde_json::Value) -> DatadogMetricsConfiguration {
        let cfg = ConfigurationLoader::default()
            .with_key_aliases(KEY_ALIASES)
            .add_providers([figment::providers::Serialized::defaults(values)])
            .into_generic()
            .await
            .expect("config should load");
        cfg.as_typed().expect("should deserialize")
    }

    #[tokio::test]
    async fn defaults_to_true_when_absent() {
        let parsed = load_typed(json!({})).await;
        assert!(parsed.use_v2_api_series);
    }

    #[tokio::test]
    async fn deserializes_payload_limit_keys() {
        let parsed = load_typed(json!({
            "serializer_max_payload_size": 4321,
            "serializer_max_uncompressed_payload_size": 8765,
            "serializer_max_series_payload_size": 1234,
            "serializer_max_series_uncompressed_payload_size": 5678,
        }))
        .await;

        assert_eq!(parsed.max_payload_size, 4321);
        assert_eq!(parsed.max_uncompressed_payload_size, 8765);
        assert_eq!(parsed.max_series_payload_size, 1234);
        assert_eq!(parsed.max_series_uncompressed_payload_size, 5678);
    }

    #[test]
    fn clamps_series_payload_limit_keys_to_api_limits() {
        // Each case: (configured uncompressed, configured compressed) -> expected (uncompressed, compressed).
        // Values above the Series V2 API limits are clamped down; values below them pass through unchanged.
        let cases = [
            (
                (SERIES_V2_UNCOMPRESSED_SIZE_LIMIT + 1, SERIES_V2_COMPRESSED_SIZE_LIMIT + 1),
                (SERIES_V2_UNCOMPRESSED_SIZE_LIMIT, SERIES_V2_COMPRESSED_SIZE_LIMIT),
            ),
            ((5678, 1234), (5678, 1234)),
        ];

        for ((configured_uncompressed, configured_compressed), (expected_uncompressed, expected_compressed)) in cases {
            let (uncompressed_limit, compressed_limit) = clamp_payload_limits(
                configured_uncompressed,
                configured_compressed,
                SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
                SERIES_V2_COMPRESSED_SIZE_LIMIT,
            );
            assert_eq!(uncompressed_limit, expected_uncompressed);
            assert_eq!(compressed_limit, expected_compressed);
        }
    }
}