1use std::{num::NonZeroU64, time::Duration};
2
3use async_trait::async_trait;
4use datadog_protos::metrics as proto;
5use ddsketch::DDSketch;
6use facet::Facet;
7use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
8use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
9use protobuf::{rt::WireType, CodedOutputStream, Enum as _};
10use saluki_common::{iter::ReusableDeduplicator, task::HandleExt as _};
11use saluki_config::GenericConfiguration;
12use saluki_context::tags::{SharedTagSet, Tag};
13use saluki_core::{
14 components::{encoders::*, ComponentContext},
15 data_model::{
16 event::{
17 metric::{Metric, MetricOrigin, MetricValues},
18 EventType,
19 },
20 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
21 },
22 observability::ComponentMetricsExt as _,
23 topology::{EventsBuffer, PayloadsBuffer},
24};
25use saluki_error::{generic_error, ErrorContext as _, GenericError};
26use saluki_io::compression::CompressionScheme;
27use saluki_metrics::MetricsBuilder;
28use serde::Deserialize;
29use tokio::{select, sync::mpsc, time::sleep};
30use tracing::{debug, error, warn};
31
32use crate::common::datadog::{
33 io::RB_BUFFER_CHUNK_SIZE,
34 request_builder::{EndpointEncoder, RequestBuilder},
35 telemetry::ComponentTelemetry,
36 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
37};
38
/// Maximum compressed size, in bytes, of a series (v2) payload.
const SERIES_V2_COMPRESSED_SIZE_LIMIT: usize = 512_000;

/// Maximum uncompressed size, in bytes, of a series (v2) payload.
const SERIES_V2_UNCOMPRESSED_SIZE_LIMIT: usize = 5_242_880;

/// Default compression scheme used when serializing payloads.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
43
// Protobuf field numbers used when hand-encoding series/sketch payloads below.
// These should mirror the message definitions in `datadog_protos::metrics`;
// NOTE(review): verify against the upstream .proto schema when updating.

// `Resource` message fields (used within series entries).
const RESOURCES_TYPE_FIELD_NUMBER: u32 = 1;
const RESOURCES_NAME_FIELD_NUMBER: u32 = 2;

// `Metadata` message field holding the nested `Origin` message.
const METADATA_ORIGIN_FIELD_NUMBER: u32 = 1;

// `Origin` message fields.
const ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER: u32 = 4;
const ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER: u32 = 5;
const ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER: u32 = 6;

// `MetricPoint` message fields (series values).
const METRIC_POINT_VALUE_FIELD_NUMBER: u32 = 1;
const METRIC_POINT_TIMESTAMP_FIELD_NUMBER: u32 = 2;

// `Dogsketch` message fields (sketch values).
const DOGSKETCH_TS_FIELD_NUMBER: u32 = 1;
const DOGSKETCH_CNT_FIELD_NUMBER: u32 = 2;
const DOGSKETCH_MIN_FIELD_NUMBER: u32 = 3;
const DOGSKETCH_MAX_FIELD_NUMBER: u32 = 4;
const DOGSKETCH_AVG_FIELD_NUMBER: u32 = 5;
const DOGSKETCH_SUM_FIELD_NUMBER: u32 = 6;
const DOGSKETCH_K_FIELD_NUMBER: u32 = 7;
const DOGSKETCH_N_FIELD_NUMBER: u32 = 8;

// `MetricSeries` message fields.
const SERIES_RESOURCES_FIELD_NUMBER: u32 = 1;
const SERIES_METRIC_FIELD_NUMBER: u32 = 2;
const SERIES_TAGS_FIELD_NUMBER: u32 = 3;
const SERIES_POINTS_FIELD_NUMBER: u32 = 4;
const SERIES_TYPE_FIELD_NUMBER: u32 = 5;
const SERIES_UNIT_FIELD_NUMBER: u32 = 6;
const SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER: u32 = 7;
const SERIES_INTERVAL_FIELD_NUMBER: u32 = 8;
const SERIES_METADATA_FIELD_NUMBER: u32 = 9;

// `Sketch` message fields.
const SKETCH_METRIC_FIELD_NUMBER: u32 = 1;
const SKETCH_HOST_FIELD_NUMBER: u32 = 2;
const SKETCH_TAGS_FIELD_NUMBER: u32 = 4;
const SKETCH_DOGSKETCHES_FIELD_NUMBER: u32 = 7;
const SKETCH_METADATA_FIELD_NUMBER: u32 = 8;

// Content type sent with every request: raw protobuf.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
85
/// Default maximum number of metrics encoded into a single payload.
const fn default_max_metrics_per_payload() -> usize {
    10000
}
89
/// Default flush timeout, in seconds.
const fn default_flush_timeout_secs() -> u64 {
    2
}
93
94fn default_serializer_compressor_kind() -> String {
95 DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
96}
97
/// Default zstd compression level.
const fn default_zstd_compressor_level() -> i32 {
    3
}
101
/// Configuration for the Datadog Metrics encoder component.
#[derive(Clone, Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
#[allow(dead_code)]
pub struct DatadogMetricsConfiguration {
    /// Maximum number of metrics encoded into a single payload.
    ///
    /// Defaults to 10,000.
    #[serde(
        rename = "serializer_max_metrics_per_payload",
        default = "default_max_metrics_per_payload"
    )]
    max_metrics_per_payload: usize,

    /// How long to wait after receiving events before flushing any pending
    /// requests, in seconds. A value of 0 is clamped to 10ms at build time.
    ///
    /// Defaults to 2 seconds.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Compression scheme used for request payloads.
    ///
    /// Defaults to "zstd".
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compression level applied when the zstd compressor is selected.
    ///
    /// Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    // Additional tags attached to every encoded metric. Not read from
    // configuration; set programmatically via `with_additional_tags`.
    #[serde(default, skip)]
    #[facet(opaque)]
    additional_tags: Option<SharedTagSet>,
}
155
impl DatadogMetricsConfiguration {
    /// Creates a new `DatadogMetricsConfiguration` from the given configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if the configuration cannot be deserialized into this type.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }

    /// Sets the additional tags to attach to every encoded metric, builder-style.
    pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
        self.additional_tags = Some(additional_tags);
        self
    }
}
169
#[async_trait]
impl EncoderBuilder for DatadogMetricsConfiguration {
    fn input_event_type(&self) -> EventType {
        // This encoder only consumes metric events.
        EventType::Metric
    }

    fn output_payload_type(&self) -> PayloadType {
        // Output payloads are fully-formed HTTP requests.
        PayloadType::Http
    }

    /// Builds the `DatadogMetrics` encoder: one request builder per intake
    /// endpoint (series and sketches), both using the configured compression
    /// scheme, per-payload metric limit, and any additional tags.
    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
        let metrics_builder = MetricsBuilder::from_component_context(&context);
        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);

        let mut series_encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Series);
        let mut sketches_encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);

        if let Some(additional_tags) = self.additional_tags.as_ref() {
            series_encoder = series_encoder.with_additional_tags(additional_tags.clone());
            sketches_encoder = sketches_encoder.with_additional_tags(additional_tags.clone());
        }

        let mut series_rb = RequestBuilder::new(series_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
        series_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);

        let mut sketches_rb = RequestBuilder::new(sketches_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
        sketches_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);

        // A zero timeout would mean flushing immediately on every event, so we
        // clamp it to a small non-zero delay instead.
        let flush_timeout = match self.flush_timeout_secs {
            0 => Duration::from_millis(10),
            secs => Duration::from_secs(secs),
        };

        Ok(Box::new(DatadogMetrics {
            series_rb,
            sketches_rb,
            telemetry,
            flush_timeout,
        }))
    }
}
215
impl MemoryBounds for DatadogMetricsConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum: the component struct itself plus the two fixed-size (8 slot)
        // channels connecting the encoder task and the request builder task.
        builder
            .minimum()
            .with_single_value::<DatadogMetrics>("component struct")
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        // Firm: each request builder may hold up to `max_metrics_per_payload`
        // metrics that need to be re-encoded when a payload is split.
        builder
            .firm()
            .with_array::<Metric>("series metrics split re-encode buffer", self.max_metrics_per_payload)
            .with_array::<Metric>("sketch metrics split re-encode buffer", self.max_metrics_per_payload);
    }
}
239
/// Encoder component that serializes metrics into Datadog intake payloads.
pub struct DatadogMetrics {
    // Request builder for the series (v2) endpoint.
    series_rb: RequestBuilder<MetricsEndpointEncoder>,
    // Request builder for the sketches (beta) endpoint.
    sketches_rb: RequestBuilder<MetricsEndpointEncoder>,
    // Component telemetry (e.g. dropped event counters).
    telemetry: ComponentTelemetry,
    // How long to wait after receiving events before flushing pending requests.
    flush_timeout: Duration,
}
246
#[async_trait]
impl Encoder for DatadogMetrics {
    /// Runs the encoder: spawns the request builder task on the global thread
    /// pool, then shuttles incoming event buffers to it and dispatches any
    /// payloads it produces, until the input stream closes.
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            series_rb,
            sketches_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // Bounded channels decouple event ingestion from request building and
        // payload dispatch, providing backpressure in both directions.
        let (events_tx, events_rx) = mpsc::channel(8);
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
        let request_builder_fut =
            run_request_builder(series_rb, sketches_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool()
            .spawn_traced_named("dd-metrics-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog Metrics encoder started.");

        loop {
            select! {
                // Biased: service liveness probes and drain built payloads
                // before accepting more events.
                biased;

                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    None => break,
                },
            }
        }

        // Closing the events channel signals the request builder task to finish
        // and drop its payload sender once done.
        drop(events_tx);

        // Drain any payloads produced during shutdown.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog Metrics encoder stopped.");

        Ok(())
    }
}
315
/// Background task that drives both request builders.
///
/// Receives event buffers over `events_rx`, encodes each metric into the
/// request builder for its endpoint, and sends finalized request payloads back
/// over `payloads_tx`. Builders are flushed either when one reports it is full
/// during encoding, or once `flush_timeout` has elapsed after receiving events.
async fn run_request_builder(
    mut series_request_builder: RequestBuilder<MetricsEndpointEncoder>,
    mut sketches_request_builder: RequestBuilder<MetricsEndpointEncoder>, telemetry: ComponentTelemetry,
    mut events_rx: mpsc::Receiver<EventsBuffer>, payloads_tx: mpsc::Sender<PayloadsBuffer>, flush_timeout: Duration,
) -> Result<(), GenericError> {
    // Tracks whether we have unflushed data, and the timer that triggers the
    // next flush once armed.
    let mut pending_flush = false;
    let pending_flush_timeout = sleep(flush_timeout);
    tokio::pin!(pending_flush_timeout);

    loop {
        select! {
            Some(event_buffer) = events_rx.recv() => {
                for event in event_buffer {
                    // Non-metric events are skipped.
                    let metric = match event.try_into_metric() {
                        Some(metric) => metric,
                        None => continue,
                    };

                    // Route the metric to the builder for its intake endpoint.
                    let request_builder = match MetricsEndpoint::from_metric(&metric) {
                        MetricsEndpoint::Series => &mut series_request_builder,
                        MetricsEndpoint::Sketches => &mut sketches_request_builder,
                    };

                    // `Ok(Some(metric))` means the builder could not fit the
                    // metric: flush it, then re-encode the metric afterwards.
                    let metric_to_retry = match request_builder.encode(metric).await {
                        Ok(None) => continue,
                        Ok(Some(metric)) => metric,
                        Err(e) => {
                            error!(error = %e, "Failed to encode metric.");
                            telemetry.events_dropped_encoder().increment(1);
                            continue;
                        }
                    };


                    // Invariant: a builder that handed back a metric for retry
                    // must have at least one request's worth of data to flush.
                    let maybe_requests = request_builder.flush().await;
                    if maybe_requests.is_empty() {
                        panic!("builder told us to flush, but gave us nothing");
                    }

                    for maybe_request in maybe_requests {
                        match maybe_request {
                            Ok((events, request)) => {
                                let payload_meta = PayloadMetadata::from_event_count(events);
                                let http_payload = HttpPayload::new(payload_meta, request);
                                let payload = Payload::Http(http_payload);

                                payloads_tx.send(payload).await
                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                            },

                            // Recoverable flush errors are skipped; anything
                            // else aborts the task.
                            Err(e) => if e.is_recoverable() {
                                continue;
                            } else {
                                return Err(GenericError::from(e).context("Failed to flush request."));
                            }
                        }
                    }

                    // Re-encode the metric that did not fit into the previous payload.
                    if let Err(e) = request_builder.encode(metric_to_retry).await {
                        error!(error = %e, "Failed to encode metric.");
                        telemetry.events_dropped_encoder().increment(1);
                    }
                }

                debug!("Processed event buffer.");

                // Arm the flush timer now that there is encoded data pending;
                // if it's already armed, leave the existing deadline in place.
                if !pending_flush {
                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
                    pending_flush = true;
                }
            },
            _ = &mut pending_flush_timeout, if pending_flush => {
                debug!("Flushing pending request(s).");

                pending_flush = false;

                // Flush both builders and forward every resulting request downstream.
                let maybe_series_requests = series_request_builder.flush().await;
                for maybe_request in maybe_series_requests {
                    match maybe_request {
                        Ok((events, request)) => {
                            let payload_meta = PayloadMetadata::from_event_count(events);
                            let http_payload = HttpPayload::new(payload_meta, request);
                            let payload = Payload::Http(http_payload);

                            payloads_tx.send(payload).await
                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                        },

                        Err(e) => if e.is_recoverable() {
                            continue;
                        } else {
                            return Err(GenericError::from(e).context("Failed to flush request."));
                        }
                    }
                }

                let maybe_sketches_requests = sketches_request_builder.flush().await;
                for maybe_request in maybe_sketches_requests {
                    match maybe_request {
                        Ok((events, request)) => {
                            let payload_meta = PayloadMetadata::from_event_count(events);
                            let http_payload = HttpPayload::new(payload_meta, request);
                            let payload = Payload::Http(http_payload);

                            payloads_tx.send(payload).await
                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                        },

                        Err(e) => if e.is_recoverable() {
                            continue;
                        } else {
                            return Err(GenericError::from(e).context("Failed to flush request."));
                        }
                    }
                }

                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
            },

            // No enabled branches left: events channel closed and no flush pending.
            else => break,
        }
    }

    Ok(())
}
457
/// The Datadog metrics intake endpoint a metric is routed to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MetricsEndpoint {
    /// Series (v2) endpoint: counters, rates, gauges, and sets.
    Series,

    /// Sketches (beta) endpoint: histograms and distributions.
    Sketches,
}
471
472impl MetricsEndpoint {
473 pub fn from_metric(metric: &Metric) -> Self {
475 match metric.values() {
476 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
477 Self::Series
478 }
479 MetricValues::Histogram(..) | MetricValues::Distribution(..) => Self::Sketches,
480 }
481 }
482}
483
/// `EndpointEncoder` implementation that hand-encodes metrics into the Datadog
/// series/sketches protobuf wire format.
#[derive(Debug)]
struct MetricsEndpointEncoder {
    // Which intake endpoint this encoder targets.
    endpoint: MetricsEndpoint,
    // Reusable scratch buffers — outer nested-message framing, inner nested
    // messages, and packed repeated fields — to avoid per-metric allocations.
    primary_scratch_buf: Vec<u8>,
    secondary_scratch_buf: Vec<u8>,
    packed_scratch_buf: Vec<u8>,
    // Tags appended to every encoded metric.
    additional_tags: SharedTagSet,
    // Reusable state for deduplicating the combined tag sources.
    tags_deduplicator: ReusableDeduplicator<Tag>,
}
493
494impl MetricsEndpointEncoder {
495 pub fn from_endpoint(endpoint: MetricsEndpoint) -> Self {
497 Self {
498 endpoint,
499 primary_scratch_buf: Vec::new(),
500 secondary_scratch_buf: Vec::new(),
501 packed_scratch_buf: Vec::new(),
502 additional_tags: SharedTagSet::default(),
503 tags_deduplicator: ReusableDeduplicator::new(),
504 }
505 }
506
507 pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
513 self.additional_tags = additional_tags;
514 self
515 }
516}
517
518impl EndpointEncoder for MetricsEndpointEncoder {
519 type Input = Metric;
520 type EncodeError = protobuf::Error;
521
522 fn encoder_name() -> &'static str {
523 "metrics"
524 }
525
526 fn compressed_size_limit(&self) -> usize {
527 match self.endpoint {
528 MetricsEndpoint::Series => SERIES_V2_COMPRESSED_SIZE_LIMIT,
529 MetricsEndpoint::Sketches => DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
530 }
531 }
532
533 fn uncompressed_size_limit(&self) -> usize {
534 match self.endpoint {
535 MetricsEndpoint::Series => SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
536 MetricsEndpoint::Sketches => DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
537 }
538 }
539
540 fn is_valid_input(&self, input: &Self::Input) -> bool {
541 let input_endpoint = MetricsEndpoint::from_metric(input);
542 input_endpoint == self.endpoint
543 }
544
545 fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
546 encode_single_metric(
566 input,
567 &self.additional_tags,
568 buffer,
569 &mut self.primary_scratch_buf,
570 &mut self.secondary_scratch_buf,
571 &mut self.packed_scratch_buf,
572 &mut self.tags_deduplicator,
573 )?;
574
575 Ok(())
576 }
577
578 fn endpoint_uri(&self) -> Uri {
579 match self.endpoint {
580 MetricsEndpoint::Series => PathAndQuery::from_static("/api/v2/series").into(),
581 MetricsEndpoint::Sketches => PathAndQuery::from_static("/api/beta/sketches").into(),
582 }
583 }
584
585 fn endpoint_method(&self) -> Method {
586 Method::POST
588 }
589
590 fn content_type(&self) -> HeaderValue {
591 CONTENT_TYPE_PROTOBUF.clone()
593 }
594}
595
596fn field_number_for_metric_type(metric: &Metric) -> u32 {
597 match metric.values() {
598 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => 1,
599 MetricValues::Histogram(..) | MetricValues::Distribution(..) => 1,
600 }
601}
602
603fn get_message_size(raw_msg_size: usize) -> Result<u32, protobuf::Error> {
604 const MAX_MESSAGE_SIZE: u64 = i32::MAX as u64;
605
606 if raw_msg_size as u64 > MAX_MESSAGE_SIZE {
608 return Err(std::io::Error::other("message size exceeds limit (2147483648 bytes)").into());
609 }
610
611 Ok(raw_msg_size as u32)
612}
613
/// Validates and returns the length of `buf` as a `u32` protobuf message size.
///
/// See `get_message_size` for the limit that is enforced.
fn get_message_size_from_buffer(buf: &[u8]) -> Result<u32, protobuf::Error> {
    get_message_size(buf.len())
}
617
618fn encode_single_metric(
619 metric: &Metric, additional_tags: &SharedTagSet, output_buf: &mut Vec<u8>, primary_scratch_buf: &mut Vec<u8>,
620 secondary_scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
621 tags_deduplicator: &mut ReusableDeduplicator<Tag>,
622) -> Result<(), protobuf::Error> {
623 let mut output_stream = CodedOutputStream::vec(output_buf);
624 let field_number = field_number_for_metric_type(metric);
625
626 write_nested_message(&mut output_stream, primary_scratch_buf, field_number, |os| {
627 match metric.values() {
629 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
630 encode_series_metric(metric, additional_tags, os, secondary_scratch_buf, tags_deduplicator)
631 }
632 MetricValues::Histogram(..) | MetricValues::Distribution(..) => encode_sketch_metric(
633 metric,
634 additional_tags,
635 os,
636 secondary_scratch_buf,
637 packed_scratch_buf,
638 tags_deduplicator,
639 ),
640 }
641 })
642}
643
/// Encodes a series-typed metric (counter, rate, gauge, set) into the
/// `MetricSeries` wire format on the given output stream.
fn encode_series_metric(
    metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
    scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) -> Result<(), protobuf::Error> {
    output_stream.write_string(SERIES_METRIC_FIELD_NUMBER, metric.context().name())?;

    let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
    write_series_tags(deduplicated_tags, output_stream, scratch_buf)?;

    // The hostname is carried as a "host" resource entry; an empty string is
    // written when no hostname is set.
    write_resource(
        output_stream,
        scratch_buf,
        "host",
        metric.metadata().hostname().unwrap_or_default(),
    )?;

    if let Some(origin) = metric.metadata().origin() {
        match origin {
            MetricOrigin::SourceType(source_type) => {
                output_stream.write_string(SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER, source_type.as_ref())?;
            }
            MetricOrigin::OriginMetadata {
                product,
                subproduct,
                product_detail,
            } => {
                write_origin_metadata(
                    output_stream,
                    scratch_buf,
                    SERIES_METADATA_FIELD_NUMBER,
                    *product,
                    *subproduct,
                    *product_detail,
                )?;
            }
        }
    }

    // Sets are serialized with the GAUGE metric type; only rates carry an interval.
    let (metric_type, points, maybe_interval) = match metric.values() {
        MetricValues::Counter(points) => (proto::MetricType::COUNT, points.into_iter(), None),
        MetricValues::Rate(points, interval) => (proto::MetricType::RATE, points.into_iter(), Some(interval)),
        MetricValues::Gauge(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
        MetricValues::Set(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
        // Sketch-typed values are routed to `encode_sketch_metric` by the caller.
        _ => unreachable!(),
    };

    output_stream.write_enum(SERIES_TYPE_FIELD_NUMBER, metric_type.value())?;

    if let Some(unit) = metric.metadata().unit() {
        output_stream.write_string(SERIES_UNIT_FIELD_NUMBER, unit)?;
    }

    for (timestamp, value) in points {
        // Rate values are normalized to a per-second value over their interval.
        let value = maybe_interval
            .map(|interval| value / interval.as_secs_f64())
            .unwrap_or(value);
        // A missing timestamp is written as 0.
        let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;

        write_point(output_stream, scratch_buf, value, timestamp)?;
    }

    if let Some(interval) = maybe_interval {
        output_stream.write_int64(SERIES_INTERVAL_FIELD_NUMBER, interval.as_secs() as i64)?;
    }

    Ok(())
}
716
/// Encodes a sketch-typed metric (histogram, distribution) into the `Sketch`
/// wire format on the given output stream.
fn encode_sketch_metric(
    metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
    scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) -> Result<(), protobuf::Error> {
    output_stream.write_string(SKETCH_METRIC_FIELD_NUMBER, metric.context().name())?;

    let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
    write_sketch_tags(deduplicated_tags, output_stream, scratch_buf)?;

    // Unlike series, the hostname is a plain string field here; empty when unset.
    output_stream.write_string(
        SKETCH_HOST_FIELD_NUMBER,
        metric.metadata().hostname().unwrap_or_default(),
    )?;

    // Only full origin metadata is representable for sketches; a plain source
    // type origin has no corresponding field and is dropped.
    if let Some(MetricOrigin::OriginMetadata {
        product,
        subproduct,
        product_detail,
    }) = metric.metadata().origin()
    {
        write_origin_metadata(
            output_stream,
            scratch_buf,
            SKETCH_METADATA_FIELD_NUMBER,
            *product,
            *subproduct,
            *product_detail,
        )?;
    }

    match metric.values() {
        MetricValues::Distribution(sketches) => {
            for (timestamp, value) in sketches {
                write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, value)?;
            }
        }
        MetricValues::Histogram(points) => {
            // Histogram samples are folded into a DDSketch so they are emitted
            // in the same wire format as distributions.
            for (timestamp, histogram) in points {
                let mut ddsketch = DDSketch::default();
                for sample in histogram.samples() {
                    ddsketch.insert_n(sample.value.into_inner(), sample.weight);
                }

                write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, &ddsketch)?;
            }
        }
        // Series-typed values are routed to `encode_series_metric` by the caller.
        _ => unreachable!(),
    }

    Ok(())
}
776
777fn write_resource(
778 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, resource_type: &str, resource_name: &str,
779) -> Result<(), protobuf::Error> {
780 write_nested_message(output_stream, scratch_buf, SERIES_RESOURCES_FIELD_NUMBER, |os| {
781 os.write_string(RESOURCES_TYPE_FIELD_NUMBER, resource_type)?;
782 os.write_string(RESOURCES_NAME_FIELD_NUMBER, resource_name)
783 })
784}
785
/// Writes a `Metadata` message (containing a nested `Origin` message) under the
/// given field number.
///
/// Only one scratch buffer is available, so the nesting is framed manually:
/// the `Origin` message body is serialized into the scratch buffer, the
/// `Metadata`-level tag and length for it (the "preamble") are written into a
/// small stack buffer, and then tag + total length + preamble + body are
/// emitted in wire order.
fn write_origin_metadata(
    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, origin_product: u32,
    origin_category: u32, origin_service: u32,
) -> Result<(), protobuf::Error> {
    scratch_buf.clear();

    {
        // Serialize the `Origin` message body into the scratch buffer.
        let mut origin_output_stream = CodedOutputStream::vec(scratch_buf);
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER, origin_product)?;
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER, origin_category)?;
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER, origin_service)?;
        origin_output_stream.flush()?;
    }

    let origin_message_size = get_message_size_from_buffer(scratch_buf)?;

    // Build the `Metadata` preamble (origin field tag + varint length) on the
    // stack; 64 bytes is far more than a tag and a varint can occupy.
    let mut metadata_preamble_buf = [0; 64];
    let metadata_preamble_len = {
        let mut metadata_output_stream = CodedOutputStream::bytes(&mut metadata_preamble_buf[..]);
        metadata_output_stream.write_tag(METADATA_ORIGIN_FIELD_NUMBER, WireType::LengthDelimited)?;
        metadata_output_stream.write_raw_varint32(origin_message_size)?;
        metadata_output_stream.flush()?;
        metadata_output_stream.total_bytes_written() as usize
    };

    // The `Metadata` message is the preamble plus the `Origin` body.
    let metadata_message_size = get_message_size(scratch_buf.len() + metadata_preamble_len)?;

    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
    output_stream.write_raw_varint32(metadata_message_size)?;
    output_stream.write_raw_bytes(&metadata_preamble_buf[..metadata_preamble_len])?;
    output_stream.write_raw_bytes(scratch_buf)
}
824
825fn write_point(
826 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, value: f64, timestamp: i64,
827) -> Result<(), protobuf::Error> {
828 write_nested_message(output_stream, scratch_buf, SERIES_POINTS_FIELD_NUMBER, |os| {
829 os.write_double(METRIC_POINT_VALUE_FIELD_NUMBER, value)?;
830 os.write_int64(METRIC_POINT_TIMESTAMP_FIELD_NUMBER, timestamp)
831 })
832}
833
/// Writes a single `Dogsketch` entry (summary statistics plus packed bin
/// keys/counts) as a length-delimited nested message.
///
/// Empty sketches are skipped with a warning rather than emitted.
fn write_dogsketch(
    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
    timestamp: Option<NonZeroU64>, sketch: &DDSketch,
) -> Result<(), protobuf::Error> {
    if sketch.is_empty() {
        warn!("Attempted to write an empty sketch to sketches payload, skipping.");
        return Ok(());
    }

    write_nested_message(output_stream, scratch_buf, SKETCH_DOGSKETCHES_FIELD_NUMBER, |os| {
        // A missing timestamp is written as 0.
        os.write_int64(DOGSKETCH_TS_FIELD_NUMBER, timestamp.map_or(0, |ts| ts.get() as i64))?;
        os.write_int64(DOGSKETCH_CNT_FIELD_NUMBER, sketch.count() as i64)?;
        // The unwraps are expected to hold for a non-empty sketch, guarded by
        // the `is_empty` check above.
        os.write_double(DOGSKETCH_MIN_FIELD_NUMBER, sketch.min().unwrap())?;
        os.write_double(DOGSKETCH_MAX_FIELD_NUMBER, sketch.max().unwrap())?;
        os.write_double(DOGSKETCH_AVG_FIELD_NUMBER, sketch.avg().unwrap())?;
        os.write_double(DOGSKETCH_SUM_FIELD_NUMBER, sketch.sum().unwrap())?;

        // Bin keys and counts are written as packed repeated fields.
        let bin_keys = sketch.bins().iter().map(|bin| bin.key());
        write_repeated_packed_from_iter(
            os,
            packed_scratch_buf,
            DOGSKETCH_K_FIELD_NUMBER,
            bin_keys,
            |inner_os, value| inner_os.write_sint32_no_tag(value),
        )?;

        let bin_counts = sketch.bins().iter().map(|bin| bin.count());
        write_repeated_packed_from_iter(
            os,
            packed_scratch_buf,
            DOGSKETCH_N_FIELD_NUMBER,
            bin_counts,
            |inner_os, value| inner_os.write_uint32_no_tag(value),
        )
    })
}
871
872fn get_deduplicated_tags<'a>(
873 metric: &'a Metric, additional_tags: &'a SharedTagSet, tags_deduplicator: &'a mut ReusableDeduplicator<Tag>,
874) -> impl Iterator<Item = &'a Tag> {
875 let chained_tags = metric
876 .context()
877 .tags()
878 .into_iter()
879 .chain(additional_tags)
880 .chain(metric.context().origin_tags());
881
882 tags_deduplicator.deduplicated(chained_tags)
883}
884
885fn write_tags<'a, I, F>(
886 tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, tag_encoder: F,
887) -> Result<(), protobuf::Error>
888where
889 I: Iterator<Item = &'a Tag>,
890 F: Fn(&Tag, &mut CodedOutputStream<'_>, &mut Vec<u8>) -> Result<(), protobuf::Error>,
891{
892 for tag in tags {
893 tag_encoder(tag, output_stream, scratch_buf)?;
894 }
895
896 Ok(())
897}
898
899fn write_series_tags<'a, I>(
900 tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
901) -> Result<(), protobuf::Error>
902where
903 I: Iterator<Item = &'a Tag>,
904{
905 write_tags(tags, output_stream, scratch_buf, |tag, os, buf| {
906 if tag.name() == "dd.internal.resource" {
908 if let Some((resource_type, resource_name)) = tag.value().and_then(|s| s.split_once(':')) {
909 write_resource(os, buf, resource_type, resource_name)
910 } else {
911 Ok(())
912 }
913 } else {
914 os.write_string(SERIES_TAGS_FIELD_NUMBER, tag.as_str())
916 }
917 })
918}
919
920fn write_sketch_tags<'a, I>(
921 tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
922) -> Result<(), protobuf::Error>
923where
924 I: Iterator<Item = &'a Tag>,
925{
926 write_tags(tags, output_stream, scratch_buf, |tag, os, _buf| {
927 os.write_string(SKETCH_TAGS_FIELD_NUMBER, tag.as_str())
929 })
930}
931
/// Writes a length-delimited nested message under `field_number`.
///
/// The message body is first serialized into `scratch_buf` via `writer` so its
/// exact size is known, then tag + varint length + body bytes are emitted.
fn write_nested_message<F>(
    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, writer: F,
) -> Result<(), protobuf::Error>
where
    F: FnOnce(&mut CodedOutputStream<'_>) -> Result<(), protobuf::Error>,
{
    scratch_buf.clear();

    {
        let mut nested_output_stream = CodedOutputStream::vec(scratch_buf);
        writer(&mut nested_output_stream)?;
        nested_output_stream.flush()?;
    }

    output_stream.write_tag(field_number, WireType::LengthDelimited)?;

    let nested_message_size = get_message_size_from_buffer(scratch_buf)?;
    output_stream.write_raw_varint32(nested_message_size)?;
    output_stream.write_raw_bytes(scratch_buf)
}
952
/// Writes the given values as a packed repeated field under `field_number`.
///
/// Values are serialized (untagged) into `scratch_buf` via `writer` so the
/// packed payload size is known, then tag + varint length + data are emitted.
/// Note: an empty iterator still emits the tag with a zero length, which is
/// valid wire format (canonical encoders typically omit empty packed fields).
fn write_repeated_packed_from_iter<I, T, F>(
    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, values: I, writer: F,
) -> Result<(), protobuf::Error>
where
    I: Iterator<Item = T>,
    F: Fn(&mut CodedOutputStream<'_>, T) -> Result<(), protobuf::Error>,
{
    scratch_buf.clear();

    {
        let mut packed_output_stream = CodedOutputStream::vec(scratch_buf);
        for value in values {
            writer(&mut packed_output_stream, value)?;
        }
        packed_output_stream.flush()?;
    }

    let data_size = get_message_size_from_buffer(scratch_buf)?;

    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
    output_stream.write_raw_varint32(data_size)?;
    output_stream.write_raw_bytes(scratch_buf)
}
984
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use protobuf::CodedOutputStream;
    use saluki_common::iter::ReusableDeduplicator;
    use saluki_context::{tags::SharedTagSet, Context};
    use saluki_core::data_model::event::metric::{Metric, MetricMetadata, MetricValues};
    use stringtheory::MetaString;

    use super::{encode_series_metric, encode_sketch_metric, MetricsEndpoint, MetricsEndpointEncoder};
    use crate::common::datadog::request_builder::EndpointEncoder as _;

    // A histogram and a distribution built from the same samples should produce
    // byte-identical sketch payloads.
    #[test]
    fn histogram_vs_sketch_identical_payload() {
        let samples = &[1.0, 2.0, 3.0, 4.0, 5.0];
        let histogram = Metric::histogram("simple_samples", samples);
        let distribution = Metric::distribution("simple_samples", samples);
        let host_tags = SharedTagSet::default();

        let mut buf1 = Vec::new();
        let mut buf2 = Vec::new();
        let mut tags_deduplicator = ReusableDeduplicator::new();

        let mut histogram_payload = Vec::new();
        {
            let mut histogram_writer = CodedOutputStream::vec(&mut histogram_payload);
            encode_sketch_metric(
                &histogram,
                &host_tags,
                &mut histogram_writer,
                &mut buf1,
                &mut buf2,
                &mut tags_deduplicator,
            )
            .expect("Failed to encode histogram as sketch");
        }

        let mut distribution_payload = Vec::new();
        {
            let mut distribution_writer = CodedOutputStream::vec(&mut distribution_payload);
            encode_sketch_metric(
                &distribution,
                &host_tags,
                &mut distribution_writer,
                &mut buf1,
                &mut buf2,
                &mut tags_deduplicator,
            )
            .expect("Failed to encode distribution as sketch");
        }

        assert_eq!(histogram_payload, distribution_payload);
    }

    // Each encoder should accept exactly the metric types routed to its endpoint.
    #[test]
    fn input_valid() {
        let counter = Metric::counter("counter", 1.0);
        let rate = Metric::rate("rate", 1.0, Duration::from_secs(1));
        let gauge = Metric::gauge("gauge", 1.0);
        let set = Metric::set("set", "foo");
        let histogram = Metric::histogram("histogram", [1.0, 2.0, 3.0]);
        let distribution = Metric::distribution("distribution", [1.0, 2.0, 3.0]);

        let series_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Series);
        let sketches_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);

        assert!(series_endpoint.is_valid_input(&counter));
        assert!(series_endpoint.is_valid_input(&rate));
        assert!(series_endpoint.is_valid_input(&gauge));
        assert!(series_endpoint.is_valid_input(&set));
        assert!(!series_endpoint.is_valid_input(&histogram));
        assert!(!series_endpoint.is_valid_input(&distribution));

        assert!(!sketches_endpoint.is_valid_input(&counter));
        assert!(!sketches_endpoint.is_valid_input(&rate));
        assert!(!sketches_endpoint.is_valid_input(&gauge));
        assert!(!sketches_endpoint.is_valid_input(&set));
        assert!(sketches_endpoint.is_valid_input(&histogram));
        assert!(sketches_endpoint.is_valid_input(&distribution));
    }

    // A metric with a unit should have the unit string present in its series payload.
    #[test]
    fn series_metric_unit_encoded() {
        let context = Context::from_static_parts("my.timer.avg", &[]);
        let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond"));
        let gauge = Metric::from_parts(context, MetricValues::gauge([1.0_f64]), metadata);

        let host_tags = SharedTagSet::default();
        let mut scratch_buf = Vec::new();
        let mut tags_deduplicator = ReusableDeduplicator::new();

        let mut payload = Vec::new();
        {
            let mut writer = CodedOutputStream::vec(&mut payload);
            encode_series_metric(
                &gauge,
                &host_tags,
                &mut writer,
                &mut scratch_buf,
                &mut tags_deduplicator,
            )
            .expect("Failed to encode gauge as series metric");
            writer.flush().expect("Failed to flush");
        }

        // Look for the raw wire bytes of the unit field: tag for field 6 with
        // wire type 2 (length-delimited), followed by the length and "millisecond".
        let expected_tag: u8 = (6 << 3) | 2; let expected_value = b"millisecond";

        let tag_pos = payload
            .windows(1 + 1 + expected_value.len())
            .position(|w| w[0] == expected_tag && w[1] == expected_value.len() as u8 && &w[2..] == expected_value);

        assert!(
            tag_pos.is_some(),
            "series payload should contain unit field (field 6 = 'millisecond'), got bytes: {:?}",
            payload
        );
    }
}
1119
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogMetricsConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    // Verifies the configuration deserializes cleanly from an empty config
    // (i.e. all fields have working defaults) via the shared smoke-test harness.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DATADOG_METRICS_CONFIGURATION, &[], json!({}), |cfg| {
            cfg.as_typed::<DatadogMetricsConfiguration>()
                .expect("DatadogMetricsConfiguration should deserialize")
        })
        .await
    }
}