1use std::{num::NonZeroU64, time::Duration};
2
3use async_trait::async_trait;
4use datadog_protos::metrics as proto;
5use ddsketch_agent::DDSketch;
6use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
7use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
8use protobuf::{rt::WireType, CodedOutputStream, Enum as _};
9use saluki_common::{iter::ReusableDeduplicator, task::HandleExt as _};
10use saluki_config::GenericConfiguration;
11use saluki_context::tags::{SharedTagSet, Tag};
12use saluki_core::{
13 components::{encoders::*, ComponentContext},
14 data_model::{
15 event::{
16 metric::{Metric, MetricOrigin, MetricValues},
17 EventType,
18 },
19 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
20 },
21 observability::ComponentMetricsExt as _,
22 topology::{EventsBuffer, PayloadsBuffer},
23};
24use saluki_error::{generic_error, ErrorContext as _, GenericError};
25use saluki_io::compression::CompressionScheme;
26use saluki_metrics::MetricsBuilder;
27use serde::Deserialize;
28use tokio::{select, sync::mpsc, time::sleep};
29use tracing::{debug, error, warn};
30
31use crate::common::datadog::{
32 io::RB_BUFFER_CHUNK_SIZE,
33 request_builder::{EndpointEncoder, RequestBuilder},
34 telemetry::ComponentTelemetry,
35 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
36};
37
/// Compressed payload size limit used for the series endpoint: 500 KiB (512,000 bytes).
const SERIES_V2_COMPRESSED_SIZE_LIMIT: usize = 500 * 1024;

/// Uncompressed payload size limit used for the series endpoint: 5 MiB (5,242,880 bytes).
const SERIES_V2_UNCOMPRESSED_SIZE_LIMIT: usize = 5 * 1024 * 1024;

/// Compressor kind used when `serializer_compressor_kind` is not set in the configuration.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
42
// Protobuf field numbers used by the hand-rolled encoders in this file.
//
// NOTE(review): these presumably mirror the Datadog metrics protobuf definitions (`datadog_protos::metrics`);
// confirm against the `.proto` files before changing any value.

// Fields of the resource message written by `write_resource`.
const RESOURCES_TYPE_FIELD_NUMBER: u32 = 1;
const RESOURCES_NAME_FIELD_NUMBER: u32 = 2;

// Field of the metadata message that holds the nested origin message (see `write_origin_metadata`).
const METADATA_ORIGIN_FIELD_NUMBER: u32 = 1;

// Fields of the origin message written by `write_origin_metadata`.
const ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER: u32 = 4;
const ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER: u32 = 5;
const ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER: u32 = 6;

// Fields of a single series point written by `write_point`.
const METRIC_POINT_VALUE_FIELD_NUMBER: u32 = 1;
const METRIC_POINT_TIMESTAMP_FIELD_NUMBER: u32 = 2;

// Fields of a single sketch entry written by `write_dogsketch`.
const DOGSKETCH_TS_FIELD_NUMBER: u32 = 1;
const DOGSKETCH_CNT_FIELD_NUMBER: u32 = 2;
const DOGSKETCH_MIN_FIELD_NUMBER: u32 = 3;
const DOGSKETCH_MAX_FIELD_NUMBER: u32 = 4;
const DOGSKETCH_AVG_FIELD_NUMBER: u32 = 5;
const DOGSKETCH_SUM_FIELD_NUMBER: u32 = 6;
const DOGSKETCH_K_FIELD_NUMBER: u32 = 7;
const DOGSKETCH_N_FIELD_NUMBER: u32 = 8;

// Fields of a series message written by `encode_series_metric`.
const SERIES_RESOURCES_FIELD_NUMBER: u32 = 1;
const SERIES_METRIC_FIELD_NUMBER: u32 = 2;
const SERIES_TAGS_FIELD_NUMBER: u32 = 3;
const SERIES_POINTS_FIELD_NUMBER: u32 = 4;
const SERIES_TYPE_FIELD_NUMBER: u32 = 5;
const SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER: u32 = 7;
const SERIES_INTERVAL_FIELD_NUMBER: u32 = 8;
const SERIES_METADATA_FIELD_NUMBER: u32 = 9;

// Fields of a sketch message written by `encode_sketch_metric`.
const SKETCH_METRIC_FIELD_NUMBER: u32 = 1;
const SKETCH_HOST_FIELD_NUMBER: u32 = 2;
const SKETCH_TAGS_FIELD_NUMBER: u32 = 4;
const SKETCH_DOGSKETCHES_FIELD_NUMBER: u32 = 7;
const SKETCH_METADATA_FIELD_NUMBER: u32 = 8;

// `Content-Type` value returned by `MetricsEndpointEncoder::content_type` for both endpoints.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
83
/// Serde default for `serializer_max_metrics_per_payload`: 10,000 metrics per payload.
const fn default_max_metrics_per_payload() -> usize {
    const DEFAULT_MAX_METRICS_PER_PAYLOAD: usize = 10_000;
    DEFAULT_MAX_METRICS_PER_PAYLOAD
}
87
/// Serde default for `flush_timeout_secs`: two seconds.
const fn default_flush_timeout_secs() -> u64 {
    const DEFAULT_FLUSH_TIMEOUT_SECS: u64 = 2;
    DEFAULT_FLUSH_TIMEOUT_SECS
}
91
92fn default_serializer_compressor_kind() -> String {
93 DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
94}
95
/// Serde default for `serializer_zstd_compressor_level`: level 3.
const fn default_zstd_compressor_level() -> i32 {
    const DEFAULT_ZSTD_COMPRESSOR_LEVEL: i32 = 3;
    DEFAULT_ZSTD_COMPRESSOR_LEVEL
}
99
/// Datadog Metrics encoder configuration.
///
/// Deserialized from a `GenericConfiguration` via `from_configuration`, and used through its `EncoderBuilder`
/// implementation to build a `DatadogMetrics` encoder.
#[derive(Clone, Deserialize)]
#[allow(dead_code)]
pub struct DatadogMetricsConfiguration {
    /// Maximum number of metrics per payload, applied to both request builders when the encoder is built.
    ///
    /// Read from `serializer_max_metrics_per_payload`. Defaults to 10,000.
    #[serde(
        rename = "serializer_max_metrics_per_payload",
        default = "default_max_metrics_per_payload"
    )]
    max_metrics_per_payload: usize,

    /// Flush timeout for pending requests, in seconds.
    ///
    /// A value of `0` is turned into a 10 millisecond timeout when the encoder is built. Defaults to 2 seconds.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Compressor kind handed to `CompressionScheme::new`.
    ///
    /// Read from `serializer_compressor_kind`. Defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compression level handed to `CompressionScheme::new` alongside the compressor kind.
    ///
    /// Read from `serializer_zstd_compressor_level`. Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Additional tags given to both endpoint encoders.
    ///
    /// Never deserialized (`skip`); only set programmatically through `with_additional_tags`.
    #[serde(default, skip)]
    additional_tags: Option<SharedTagSet>,
}
151
152impl DatadogMetricsConfiguration {
153 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
155 Ok(config.as_typed()?)
156 }
157
158 pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
160 self.additional_tags = Some(additional_tags);
162 self
163 }
164}
165
166#[async_trait]
167impl EncoderBuilder for DatadogMetricsConfiguration {
168 fn input_event_type(&self) -> EventType {
169 EventType::Metric
170 }
171
172 fn output_payload_type(&self) -> PayloadType {
173 PayloadType::Http
174 }
175
176 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
177 let metrics_builder = MetricsBuilder::from_component_context(&context);
178 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
179 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
180
181 let mut series_encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Series);
183 let mut sketches_encoder = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);
184
185 if let Some(additional_tags) = self.additional_tags.as_ref() {
186 series_encoder = series_encoder.with_additional_tags(additional_tags.clone());
187 sketches_encoder = sketches_encoder.with_additional_tags(additional_tags.clone());
188 }
189
190 let mut series_rb = RequestBuilder::new(series_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
191 series_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);
192
193 let mut sketches_rb = RequestBuilder::new(sketches_encoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
194 sketches_rb.with_max_inputs_per_payload(self.max_metrics_per_payload);
195
196 let flush_timeout = match self.flush_timeout_secs {
197 0 => Duration::from_millis(10),
200 secs => Duration::from_secs(secs),
201 };
202
203 Ok(Box::new(DatadogMetrics {
204 series_rb,
205 sketches_rb,
206 telemetry,
207 flush_timeout,
208 }))
209 }
210}
211
212impl MemoryBounds for DatadogMetricsConfiguration {
213 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
214 builder
222 .minimum()
223 .with_single_value::<DatadogMetrics>("component struct")
224 .with_array::<EventsBuffer>("request builder events channel", 8)
225 .with_array::<PayloadsBuffer>("request builder payloads channel", 8);
226
227 builder
228 .firm()
229 .with_array::<Metric>("series metrics split re-encode buffer", self.max_metrics_per_payload)
232 .with_array::<Metric>("sketch metrics split re-encode buffer", self.max_metrics_per_payload);
233 }
234}
235
/// Datadog Metrics encoder.
///
/// Holds one request builder per endpoint; built by `DatadogMetricsConfiguration::build`.
pub struct DatadogMetrics {
    /// Request builder for metrics routed to the series endpoint.
    series_rb: RequestBuilder<MetricsEndpointEncoder>,
    /// Request builder for metrics routed to the sketches endpoint.
    sketches_rb: RequestBuilder<MetricsEndpointEncoder>,
    /// Component telemetry, used to count events dropped during encoding.
    telemetry: ComponentTelemetry,
    /// Delay between processing an event buffer and flushing pending requests.
    flush_timeout: Duration,
}
242
#[async_trait]
impl Encoder for DatadogMetrics {
    /// Runs the encoder until the event stream or the payload channel ends.
    ///
    /// The request builders are handed to a separate task (`run_request_builder`) spawned on the global thread
    /// pool. This method forwards incoming event buffers to that task and dispatches the payloads it produces.
    ///
    /// # Errors
    ///
    /// Returns an error if an event buffer cannot be sent to the request builder task. Payload dispatch failures
    /// and request builder task failures are only logged.
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            series_rb,
            sketches_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // Bounded channels connecting this task to the request builder task: event buffers go in, payloads come out.
        let (events_tx, events_rx) = mpsc::channel(8);
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
        let request_builder_fut =
            run_request_builder(series_rb, sketches_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool()
            .spawn_traced_named("dd-metrics-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog Metrics encoder started.");

        loop {
            select! {
                // `biased` makes the branches get polled in declaration order: health first, then finished
                // payloads, then new event buffers.
                biased;

                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        // Dispatch failures are logged and the payload is dropped; the encoder keeps running.
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    // The request builder task dropped its sender, so no more payloads will arrive.
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    // The event stream ended: begin shutting down.
                    None => break,
                },
            }
        }

        // Dropping the sender closes the events channel, which lets the request builder task finish its loop.
        drop(events_tx);

        // Drain and dispatch whatever payloads the request builder task still produces before it exits.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        // Wait for the request builder task and report how it ended. The outer `Err` is the join failure
        // reported by the task handle, which is logged as a panic.
        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog Metrics encoder stopped.");

        Ok(())
    }
}
311
312async fn run_request_builder(
313 mut series_request_builder: RequestBuilder<MetricsEndpointEncoder>,
314 mut sketches_request_builder: RequestBuilder<MetricsEndpointEncoder>, telemetry: ComponentTelemetry,
315 mut events_rx: mpsc::Receiver<EventsBuffer>, payloads_tx: mpsc::Sender<PayloadsBuffer>, flush_timeout: Duration,
316) -> Result<(), GenericError> {
317 let mut pending_flush = false;
318 let pending_flush_timeout = sleep(flush_timeout);
319 tokio::pin!(pending_flush_timeout);
320
321 loop {
322 select! {
323 Some(event_buffer) = events_rx.recv() => {
324 for event in event_buffer {
325 let metric = match event.try_into_metric() {
326 Some(metric) => metric,
327 None => continue,
328 };
329
330 let request_builder = match MetricsEndpoint::from_metric(&metric) {
331 MetricsEndpoint::Series => &mut series_request_builder,
332 MetricsEndpoint::Sketches => &mut sketches_request_builder,
333 };
334
335 let metric_to_retry = match request_builder.encode(metric).await {
339 Ok(None) => continue,
340 Ok(Some(metric)) => metric,
341 Err(e) => {
342 error!(error = %e, "Failed to encode metric.");
343 telemetry.events_dropped_encoder().increment(1);
344 continue;
345 }
346 };
347
348
349 let maybe_requests = request_builder.flush().await;
350 if maybe_requests.is_empty() {
351 panic!("builder told us to flush, but gave us nothing");
352 }
353
354 for maybe_request in maybe_requests {
355 match maybe_request {
356 Ok((events, request)) => {
357 let payload_meta = PayloadMetadata::from_event_count(events);
358 let http_payload = HttpPayload::new(payload_meta, request);
359 let payload = Payload::Http(http_payload);
360
361 payloads_tx.send(payload).await
362 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
363 },
364
365 Err(e) => if e.is_recoverable() {
367 continue;
369 } else {
370 return Err(GenericError::from(e).context("Failed to flush request."));
371 }
372 }
373 }
374
375 if let Err(e) = request_builder.encode(metric_to_retry).await {
379 error!(error = %e, "Failed to encode metric.");
380 telemetry.events_dropped_encoder().increment(1);
381 }
382 }
383
384 debug!("Processed event buffer.");
385
386 if !pending_flush {
388 pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
389 pending_flush = true;
390 }
391 },
392 _ = &mut pending_flush_timeout, if pending_flush => {
393 debug!("Flushing pending request(s).");
394
395 pending_flush = false;
396
397 let maybe_series_requests = series_request_builder.flush().await;
400 for maybe_request in maybe_series_requests {
401 match maybe_request {
402 Ok((events, request)) => {
403 let payload_meta = PayloadMetadata::from_event_count(events);
404 let http_payload = HttpPayload::new(payload_meta, request);
405 let payload = Payload::Http(http_payload);
406
407 payloads_tx.send(payload).await
408 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
409 },
410
411 Err(e) => if e.is_recoverable() {
413 continue;
415 } else {
416 return Err(GenericError::from(e).context("Failed to flush request."));
417 }
418 }
419 }
420
421 let maybe_sketches_requests = sketches_request_builder.flush().await;
422 for maybe_request in maybe_sketches_requests {
423 match maybe_request {
424 Ok((events, request)) => {
425 let payload_meta = PayloadMetadata::from_event_count(events);
426 let http_payload = HttpPayload::new(payload_meta, request);
427 let payload = Payload::Http(http_payload);
428
429 payloads_tx.send(payload).await
430 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
431 },
432
433 Err(e) => if e.is_recoverable() {
435 continue;
437 } else {
438 return Err(GenericError::from(e).context("Failed to flush request."));
439 }
440 }
441 }
442
443 debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
444 },
445
446 else => break,
448 }
449 }
450
451 Ok(())
452}
453
/// Metrics intake endpoint that a metric is sent to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MetricsEndpoint {
    /// Series endpoint (`/api/v2/series`): counters, rates, gauges, and sets.
    Series,

    /// Sketches endpoint (`/api/beta/sketches`): histograms and distributions.
    Sketches,
}
467
468impl MetricsEndpoint {
469 pub fn from_metric(metric: &Metric) -> Self {
471 match metric.values() {
472 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
473 Self::Series
474 }
475 MetricValues::Histogram(..) | MetricValues::Distribution(..) => Self::Sketches,
476 }
477 }
478}
479
/// `EndpointEncoder` implementation for a single metrics endpoint (series or sketches).
#[derive(Debug)]
struct MetricsEndpointEncoder {
    /// Endpoint this encoder targets; decides size limits, URI, and which metrics are valid input.
    endpoint: MetricsEndpoint,
    /// Scratch buffer for the top-level nested message of each encoded metric.
    primary_scratch_buf: Vec<u8>,
    /// Scratch buffer for messages nested inside the metric (resources, points, sketches, origin metadata).
    secondary_scratch_buf: Vec<u8>,
    /// Scratch buffer for packed repeated fields (sketch bin keys and counts).
    packed_scratch_buf: Vec<u8>,
    /// Tags chained onto every metric's own tags before deduplication.
    additional_tags: SharedTagSet,
    /// Reusable deduplicator applied to the chained tags of each metric.
    tags_deduplicator: ReusableDeduplicator<Tag>,
}
489
490impl MetricsEndpointEncoder {
491 pub fn from_endpoint(endpoint: MetricsEndpoint) -> Self {
493 Self {
494 endpoint,
495 primary_scratch_buf: Vec::new(),
496 secondary_scratch_buf: Vec::new(),
497 packed_scratch_buf: Vec::new(),
498 additional_tags: SharedTagSet::default(),
499 tags_deduplicator: ReusableDeduplicator::new(),
500 }
501 }
502
503 pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self {
509 self.additional_tags = additional_tags;
510 self
511 }
512}
513
514impl EndpointEncoder for MetricsEndpointEncoder {
515 type Input = Metric;
516 type EncodeError = protobuf::Error;
517
518 fn encoder_name() -> &'static str {
519 "metrics"
520 }
521
522 fn compressed_size_limit(&self) -> usize {
523 match self.endpoint {
524 MetricsEndpoint::Series => SERIES_V2_COMPRESSED_SIZE_LIMIT,
525 MetricsEndpoint::Sketches => DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
526 }
527 }
528
529 fn uncompressed_size_limit(&self) -> usize {
530 match self.endpoint {
531 MetricsEndpoint::Series => SERIES_V2_UNCOMPRESSED_SIZE_LIMIT,
532 MetricsEndpoint::Sketches => DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
533 }
534 }
535
536 fn is_valid_input(&self, input: &Self::Input) -> bool {
537 let input_endpoint = MetricsEndpoint::from_metric(input);
538 input_endpoint == self.endpoint
539 }
540
541 fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
542 encode_single_metric(
562 input,
563 &self.additional_tags,
564 buffer,
565 &mut self.primary_scratch_buf,
566 &mut self.secondary_scratch_buf,
567 &mut self.packed_scratch_buf,
568 &mut self.tags_deduplicator,
569 )?;
570
571 Ok(())
572 }
573
574 fn endpoint_uri(&self) -> Uri {
575 match self.endpoint {
576 MetricsEndpoint::Series => PathAndQuery::from_static("/api/v2/series").into(),
577 MetricsEndpoint::Sketches => PathAndQuery::from_static("/api/beta/sketches").into(),
578 }
579 }
580
581 fn endpoint_method(&self) -> Method {
582 Method::POST
584 }
585
586 fn content_type(&self) -> HeaderValue {
587 CONTENT_TYPE_PROTOBUF.clone()
589 }
590}
591
592fn field_number_for_metric_type(metric: &Metric) -> u32 {
593 match metric.values() {
594 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => 1,
595 MetricValues::Histogram(..) | MetricValues::Distribution(..) => 1,
596 }
597}
598
599fn get_message_size(raw_msg_size: usize) -> Result<u32, protobuf::Error> {
600 const MAX_MESSAGE_SIZE: u64 = i32::MAX as u64;
601
602 if raw_msg_size as u64 > MAX_MESSAGE_SIZE {
604 return Err(std::io::Error::other("message size exceeds limit (2147483648 bytes)").into());
605 }
606
607 Ok(raw_msg_size as u32)
608}
609
610fn get_message_size_from_buffer(buf: &[u8]) -> Result<u32, protobuf::Error> {
611 get_message_size(buf.len())
612}
613
614fn encode_single_metric(
615 metric: &Metric, additional_tags: &SharedTagSet, output_buf: &mut Vec<u8>, primary_scratch_buf: &mut Vec<u8>,
616 secondary_scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
617 tags_deduplicator: &mut ReusableDeduplicator<Tag>,
618) -> Result<(), protobuf::Error> {
619 let mut output_stream = CodedOutputStream::vec(output_buf);
620 let field_number = field_number_for_metric_type(metric);
621
622 write_nested_message(&mut output_stream, primary_scratch_buf, field_number, |os| {
623 match metric.values() {
625 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
626 encode_series_metric(metric, additional_tags, os, secondary_scratch_buf, tags_deduplicator)
627 }
628 MetricValues::Histogram(..) | MetricValues::Distribution(..) => encode_sketch_metric(
629 metric,
630 additional_tags,
631 os,
632 secondary_scratch_buf,
633 packed_scratch_buf,
634 tags_deduplicator,
635 ),
636 }
637 })
638}
639
640fn encode_series_metric(
641 metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
642 scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
643) -> Result<(), protobuf::Error> {
644 output_stream.write_string(SERIES_METRIC_FIELD_NUMBER, metric.context().name())?;
646
647 let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
648 write_series_tags(deduplicated_tags, output_stream, scratch_buf)?;
649
650 write_resource(
652 output_stream,
653 scratch_buf,
654 "host",
655 metric.metadata().hostname().unwrap_or_default(),
656 )?;
657
658 if let Some(origin) = metric.metadata().origin() {
660 match origin {
661 MetricOrigin::SourceType(source_type) => {
662 output_stream.write_string(SERIES_SOURCE_TYPE_NAME_FIELD_NUMBER, source_type.as_ref())?;
663 }
664 MetricOrigin::OriginMetadata {
665 product,
666 subproduct,
667 product_detail,
668 } => {
669 write_origin_metadata(
670 output_stream,
671 scratch_buf,
672 SERIES_METADATA_FIELD_NUMBER,
673 *product,
674 *subproduct,
675 *product_detail,
676 )?;
677 }
678 }
679 }
680
681 let (metric_type, points, maybe_interval) = match metric.values() {
683 MetricValues::Counter(points) => (proto::MetricType::COUNT, points.into_iter(), None),
684 MetricValues::Rate(points, interval) => (proto::MetricType::RATE, points.into_iter(), Some(interval)),
685 MetricValues::Gauge(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
686 MetricValues::Set(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
687 _ => unreachable!(),
688 };
689
690 output_stream.write_enum(SERIES_TYPE_FIELD_NUMBER, metric_type.value())?;
691
692 for (timestamp, value) in points {
693 let value = maybe_interval
695 .map(|interval| value / interval.as_secs_f64())
696 .unwrap_or(value);
697 let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;
698
699 write_point(output_stream, scratch_buf, value, timestamp)?;
700 }
701
702 if let Some(interval) = maybe_interval {
703 output_stream.write_int64(SERIES_INTERVAL_FIELD_NUMBER, interval.as_secs() as i64)?;
704 }
705
706 Ok(())
707}
708
709fn encode_sketch_metric(
710 metric: &Metric, additional_tags: &SharedTagSet, output_stream: &mut CodedOutputStream<'_>,
711 scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
712) -> Result<(), protobuf::Error> {
713 output_stream.write_string(SKETCH_METRIC_FIELD_NUMBER, metric.context().name())?;
715
716 let deduplicated_tags = get_deduplicated_tags(metric, additional_tags, tags_deduplicator);
717 write_sketch_tags(deduplicated_tags, output_stream, scratch_buf)?;
718
719 output_stream.write_string(
721 SKETCH_HOST_FIELD_NUMBER,
722 metric.metadata().hostname().unwrap_or_default(),
723 )?;
724
725 if let Some(MetricOrigin::OriginMetadata {
727 product,
728 subproduct,
729 product_detail,
730 }) = metric.metadata().origin()
731 {
732 write_origin_metadata(
733 output_stream,
734 scratch_buf,
735 SKETCH_METADATA_FIELD_NUMBER,
736 *product,
737 *subproduct,
738 *product_detail,
739 )?;
740 }
741
742 match metric.values() {
744 MetricValues::Distribution(sketches) => {
745 for (timestamp, value) in sketches {
746 write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, value)?;
747 }
748 }
749 MetricValues::Histogram(points) => {
750 for (timestamp, histogram) in points {
751 let mut ddsketch = DDSketch::default();
753 for sample in histogram.samples() {
754 ddsketch.insert_n(sample.value.into_inner(), sample.weight as u32);
755 }
756
757 write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, &ddsketch)?;
758 }
759 }
760 _ => unreachable!(),
761 }
762
763 Ok(())
764}
765
766fn write_resource(
767 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, resource_type: &str, resource_name: &str,
768) -> Result<(), protobuf::Error> {
769 write_nested_message(output_stream, scratch_buf, SERIES_RESOURCES_FIELD_NUMBER, |os| {
770 os.write_string(RESOURCES_TYPE_FIELD_NUMBER, resource_type)?;
771 os.write_string(RESOURCES_NAME_FIELD_NUMBER, resource_name)
772 })
773}
774
/// Writes origin metadata as a metadata message containing a nested origin message, under `field_number`.
///
/// The two levels of nesting are framed by hand so that only one scratch buffer is needed: the origin message
/// body goes into `scratch_buf`, while the tag and length that introduce it inside the metadata message are
/// built in a small stack buffer.
///
/// # Errors
///
/// Returns an error if any write fails or if a message size exceeds the maximum message size.
fn write_origin_metadata(
    output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, origin_product: u32,
    origin_category: u32, origin_service: u32,
) -> Result<(), protobuf::Error> {
    scratch_buf.clear();

    // Innermost level: the origin message body (product, category, service).
    {
        let mut origin_output_stream = CodedOutputStream::vec(scratch_buf);
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_PRODUCT_FIELD_NUMBER, origin_product)?;
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_CATEGORY_FIELD_NUMBER, origin_category)?;
        origin_output_stream.write_uint32(ORIGIN_ORIGIN_SERVICE_FIELD_NUMBER, origin_service)?;
        origin_output_stream.flush()?;
    }

    let origin_message_size = get_message_size_from_buffer(scratch_buf)?;

    // Middle level: the tag and length prefix that introduce the origin message inside the metadata message.
    // Built separately so its byte length can be added to the metadata message size below.
    let mut metadata_preamble_buf = [0; 64];
    let metadata_preamble_len = {
        let mut metadata_output_stream = CodedOutputStream::bytes(&mut metadata_preamble_buf[..]);
        metadata_output_stream.write_tag(METADATA_ORIGIN_FIELD_NUMBER, WireType::LengthDelimited)?;
        metadata_output_stream.write_raw_varint32(origin_message_size)?;
        metadata_output_stream.flush()?;
        metadata_output_stream.total_bytes_written() as usize
    };

    // The metadata message is the preamble followed by the origin message body.
    let metadata_message_size = get_message_size(scratch_buf.len() + metadata_preamble_len)?;

    // Outer level: tag and length of the metadata message, then its contents.
    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
    output_stream.write_raw_varint32(metadata_message_size)?;
    output_stream.write_raw_bytes(&metadata_preamble_buf[..metadata_preamble_len])?;
    output_stream.write_raw_bytes(scratch_buf)
}
813
814fn write_point(
815 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, value: f64, timestamp: i64,
816) -> Result<(), protobuf::Error> {
817 write_nested_message(output_stream, scratch_buf, SERIES_POINTS_FIELD_NUMBER, |os| {
818 os.write_double(METRIC_POINT_VALUE_FIELD_NUMBER, value)?;
819 os.write_int64(METRIC_POINT_TIMESTAMP_FIELD_NUMBER, timestamp)
820 })
821}
822
823fn write_dogsketch(
824 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, packed_scratch_buf: &mut Vec<u8>,
825 timestamp: Option<NonZeroU64>, sketch: &DDSketch,
826) -> Result<(), protobuf::Error> {
827 if sketch.is_empty() {
829 warn!("Attempted to write an empty sketch to sketches payload, skipping.");
830 return Ok(());
831 }
832
833 write_nested_message(output_stream, scratch_buf, SKETCH_DOGSKETCHES_FIELD_NUMBER, |os| {
834 os.write_int64(DOGSKETCH_TS_FIELD_NUMBER, timestamp.map_or(0, |ts| ts.get() as i64))?;
835 os.write_int64(DOGSKETCH_CNT_FIELD_NUMBER, sketch.count() as i64)?;
836 os.write_double(DOGSKETCH_MIN_FIELD_NUMBER, sketch.min().unwrap())?;
837 os.write_double(DOGSKETCH_MAX_FIELD_NUMBER, sketch.max().unwrap())?;
838 os.write_double(DOGSKETCH_AVG_FIELD_NUMBER, sketch.avg().unwrap())?;
839 os.write_double(DOGSKETCH_SUM_FIELD_NUMBER, sketch.sum().unwrap())?;
840
841 let bin_keys = sketch.bins().iter().map(|bin| bin.key());
842 write_repeated_packed_from_iter(
843 os,
844 packed_scratch_buf,
845 DOGSKETCH_K_FIELD_NUMBER,
846 bin_keys,
847 |inner_os, value| inner_os.write_sint32_no_tag(value),
848 )?;
849
850 let bin_counts = sketch.bins().iter().map(|bin| bin.count());
851 write_repeated_packed_from_iter(
852 os,
853 packed_scratch_buf,
854 DOGSKETCH_N_FIELD_NUMBER,
855 bin_counts,
856 |inner_os, value| inner_os.write_uint32_no_tag(value),
857 )
858 })
859}
860
861fn get_deduplicated_tags<'a>(
862 metric: &'a Metric, additional_tags: &'a SharedTagSet, tags_deduplicator: &'a mut ReusableDeduplicator<Tag>,
863) -> impl Iterator<Item = &'a Tag> {
864 let chained_tags = metric
865 .context()
866 .tags()
867 .into_iter()
868 .chain(additional_tags)
869 .chain(metric.context().origin_tags());
870
871 tags_deduplicator.deduplicated(chained_tags)
872}
873
874fn write_tags<'a, I, F>(
875 tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, tag_encoder: F,
876) -> Result<(), protobuf::Error>
877where
878 I: Iterator<Item = &'a Tag>,
879 F: Fn(&Tag, &mut CodedOutputStream<'_>, &mut Vec<u8>) -> Result<(), protobuf::Error>,
880{
881 for tag in tags {
882 tag_encoder(tag, output_stream, scratch_buf)?;
883 }
884
885 Ok(())
886}
887
888fn write_series_tags<'a, I>(
889 tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
890) -> Result<(), protobuf::Error>
891where
892 I: Iterator<Item = &'a Tag>,
893{
894 write_tags(tags, output_stream, scratch_buf, |tag, os, buf| {
895 if tag.name() == "dd.internal.resource" {
897 if let Some((resource_type, resource_name)) = tag.value().and_then(|s| s.split_once(':')) {
898 write_resource(os, buf, resource_type, resource_name)
899 } else {
900 Ok(())
901 }
902 } else {
903 os.write_string(SERIES_TAGS_FIELD_NUMBER, tag.as_str())
905 }
906 })
907}
908
909fn write_sketch_tags<'a, I>(
910 tags: I, output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>,
911) -> Result<(), protobuf::Error>
912where
913 I: Iterator<Item = &'a Tag>,
914{
915 write_tags(tags, output_stream, scratch_buf, |tag, os, _buf| {
916 os.write_string(SKETCH_TAGS_FIELD_NUMBER, tag.as_str())
918 })
919}
920
921fn write_nested_message<F>(
922 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, writer: F,
923) -> Result<(), protobuf::Error>
924where
925 F: FnOnce(&mut CodedOutputStream<'_>) -> Result<(), protobuf::Error>,
926{
927 scratch_buf.clear();
928
929 {
930 let mut nested_output_stream = CodedOutputStream::vec(scratch_buf);
931 writer(&mut nested_output_stream)?;
932 nested_output_stream.flush()?;
933 }
934
935 output_stream.write_tag(field_number, WireType::LengthDelimited)?;
936
937 let nested_message_size = get_message_size_from_buffer(scratch_buf)?;
938 output_stream.write_raw_varint32(nested_message_size)?;
939 output_stream.write_raw_bytes(scratch_buf)
940}
941
942fn write_repeated_packed_from_iter<I, T, F>(
943 output_stream: &mut CodedOutputStream<'_>, scratch_buf: &mut Vec<u8>, field_number: u32, values: I, writer: F,
944) -> Result<(), protobuf::Error>
945where
946 I: Iterator<Item = T>,
947 F: Fn(&mut CodedOutputStream<'_>, T) -> Result<(), protobuf::Error>,
948{
949 scratch_buf.clear();
958
959 {
960 let mut packed_output_stream = CodedOutputStream::vec(scratch_buf);
961 for value in values {
962 writer(&mut packed_output_stream, value)?;
963 }
964 packed_output_stream.flush()?;
965 }
966
967 let data_size = get_message_size_from_buffer(scratch_buf)?;
968
969 output_stream.write_tag(field_number, WireType::LengthDelimited)?;
970 output_stream.write_raw_varint32(data_size)?;
971 output_stream.write_raw_bytes(scratch_buf)
972}
973
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use protobuf::CodedOutputStream;
    use saluki_common::iter::ReusableDeduplicator;
    use saluki_context::tags::SharedTagSet;
    use saluki_core::data_model::event::metric::Metric;

    use super::{encode_sketch_metric, MetricsEndpoint, MetricsEndpointEncoder};
    use crate::common::datadog::request_builder::EndpointEncoder as _;

    /// A histogram and a distribution built from the same samples must encode to the same sketch payload.
    #[test]
    fn histogram_vs_sketch_identical_payload() {
        let samples = &[1.0, 2.0, 3.0, 4.0, 5.0];
        let histogram = Metric::histogram("simple_samples", samples);
        let distribution = Metric::distribution("simple_samples", samples);
        let host_tags = SharedTagSet::default();

        let mut scratch = Vec::new();
        let mut packed_scratch = Vec::new();
        let mut tags_deduplicator = ReusableDeduplicator::new();

        // Encodes a metric as a sketch into a fresh payload buffer, sharing the scratch state across calls.
        let mut encode_payload = |metric: &Metric, failure_msg: &str| -> Vec<u8> {
            let mut payload = Vec::new();
            {
                let mut writer = CodedOutputStream::vec(&mut payload);
                encode_sketch_metric(
                    metric,
                    &host_tags,
                    &mut writer,
                    &mut scratch,
                    &mut packed_scratch,
                    &mut tags_deduplicator,
                )
                .expect(failure_msg);
            }
            payload
        };

        let histogram_payload = encode_payload(&histogram, "Failed to encode histogram as sketch");
        let distribution_payload = encode_payload(&distribution, "Failed to encode distribution as sketch");

        assert_eq!(histogram_payload, distribution_payload);
    }

    /// Each endpoint encoder accepts exactly the metric types that map to its endpoint.
    #[test]
    fn input_valid() {
        let counter = Metric::counter("counter", 1.0);
        let rate = Metric::rate("rate", 1.0, Duration::from_secs(1));
        let gauge = Metric::gauge("gauge", 1.0);
        let set = Metric::set("set", "foo");
        let histogram = Metric::histogram("histogram", [1.0, 2.0, 3.0]);
        let distribution = Metric::distribution("distribution", [1.0, 2.0, 3.0]);

        let series_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Series);
        let sketches_endpoint = MetricsEndpointEncoder::from_endpoint(MetricsEndpoint::Sketches);

        // Series-type metrics are valid for the series endpoint only.
        for metric in [&counter, &rate, &gauge, &set] {
            assert!(series_endpoint.is_valid_input(metric));
            assert!(!sketches_endpoint.is_valid_input(metric));
        }

        // Sketch-type metrics are valid for the sketches endpoint only.
        for metric in [&histogram, &distribution] {
            assert!(!series_endpoint.is_valid_input(metric));
            assert!(sketches_endpoint.is_valid_input(metric));
        }
    }
}