1use std::{
6 sync::Arc,
7 time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use async_trait::async_trait;
11use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
12use opentelemetry_semantic_conventions::resource::{CONTAINER_ID, K8S_POD_UID};
13use saluki_config::GenericConfiguration;
14use saluki_context::{origin::OriginTagCardinality, tags::TagSet};
use saluki_core::{
    components::{transforms::*, ComponentContext},
    data_model::event::{
        trace::{Span, Trace},
        trace_stats::{ClientStatsPayload, TraceStats},
        Event, EventType,
    },
    topology::OutputDefinition,
};
24use saluki_env::{
25 host::providers::BoxedHostProvider, workload::EntityId, EnvironmentProvider, HostProvider, WorkloadProvider,
26};
27use saluki_error::{ErrorContext as _, GenericError};
28use stringtheory::MetaString;
29use tokio::{select, time::interval};
30use tracing::{debug, error};
31
32use crate::common::{
33 datadog::apm::ApmConfig,
34 otlp::util::{extract_container_tags_from_resource_tagset, KEY_DATADOG_CONTAINER_ID},
35};
36
37mod aggregation;
38use self::aggregation::{process_tags_hash, PayloadAggregationKey};
39
40mod span_concentrator;
41use self::span_concentrator::{InfraTags, SpanConcentrator};
42
43mod statsraw;
44
45mod weight;
46use self::weight::weight;
47
/// How often buffered APM stats are flushed downstream.
const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(10);

/// Span meta key carrying the serialized process tags.
const TAG_PROCESS_TAGS: &str = "_dd.tags.process";

/// Maximum number of grouped stats entries packed into a single `TraceStats` event.
const MAX_STATS_GROUPS_PER_EVENT: usize = 4000;
56
/// Configuration for the APM stats transform, which computes client stats
/// payloads from incoming trace events.
pub struct ApmStatsTransformConfiguration {
    // APM settings: stats-by-span-kind flag, peer tags, default env/hostname.
    apm_config: ApmConfig,
    // Fallback hostname resolved from the environment provider, if any.
    default_hostname: Option<String>,
    // Optional workload provider used to enrich container tags on stats.
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}
66
67impl ApmStatsTransformConfiguration {
68 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
70 let apm_config = ApmConfig::from_configuration(config)?;
71 Ok(Self {
72 apm_config,
73 default_hostname: None,
74 workload_provider: None,
75 })
76 }
77
78 pub async fn with_environment_provider<E>(mut self, env_provider: E) -> Result<Self, GenericError>
80 where
81 E: EnvironmentProvider<Host = BoxedHostProvider>,
82 {
83 let hostname = env_provider.host().get_hostname().await?;
84 self.default_hostname = Some(hostname);
85 Ok(self)
86 }
87
88 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
92 where
93 W: WorkloadProvider + Send + Sync + 'static,
94 {
95 self.workload_provider = Some(Arc::new(workload_provider));
96 self
97 }
98}
99
100#[async_trait]
101impl TransformBuilder for ApmStatsTransformConfiguration {
102 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
103 let mut apm_config = self.apm_config.clone();
104
105 if let Some(hostname) = &self.default_hostname {
106 apm_config.set_hostname_if_empty(hostname.as_str());
107 }
108
109 let concentrator = SpanConcentrator::new(
110 apm_config.compute_stats_by_span_kind(),
111 apm_config.peer_tags_aggregation(),
112 apm_config.peer_tags(),
113 now_nanos(),
114 );
115
116 Ok(Box::new(ApmStats {
117 concentrator,
118 flush_interval: DEFAULT_FLUSH_INTERVAL,
119 agent_env: apm_config.default_env().clone(),
120 agent_hostname: apm_config.hostname().clone(),
121 workload_provider: self.workload_provider.clone(),
122 }))
123 }
124
125 fn input_event_type(&self) -> EventType {
126 EventType::Trace
127 }
128
129 fn outputs(&self) -> &[OutputDefinition<EventType>] {
130 static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::TraceStats)];
131 OUTPUTS
132 }
133}
134
impl MemoryBounds for ApmStatsTransformConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Only the fixed size of the component struct is accounted for here;
        // concentrator growth is not bounded by this declaration.
        builder.minimum().with_single_value::<ApmStats>("component struct");
    }
}
141
/// APM stats transform: aggregates span statistics from traces and emits them
/// periodically as `TraceStats` events.
struct ApmStats {
    // Time-bucketed aggregator for span statistics.
    concentrator: SpanConcentrator,
    // How often buffered stats are flushed downstream.
    flush_interval: Duration,
    // Fallback environment applied when a trace's root span carries none.
    agent_env: MetaString,
    // Fallback hostname applied when a trace's root span carries none.
    agent_hostname: MetaString,
    // Optional workload provider used to enrich container tags.
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}
149
150impl ApmStats {
151 fn process_trace(&mut self, trace: &Trace) {
152 let root_span = trace
153 .spans()
154 .iter()
155 .find(|s| s.parent_id() == 0)
156 .or_else(|| trace.spans().first());
157
158 let trace_weight = root_span.map(weight).unwrap_or(1.0);
159
160 let process_tags = extract_process_tags(trace);
161
162 let payload_key = self.build_payload_key(trace, &process_tags);
163 let infra_tags = self.build_infra_tags(trace, &process_tags);
164
165 let origin = trace
166 .spans()
167 .first()
168 .and_then(|s| s.meta().get("_dd.origin"))
169 .map(|s| s.as_ref())
170 .unwrap_or("");
171
172 for span in trace.spans() {
173 if let Some(stat_span) = self.concentrator.new_stat_span_from_span(span) {
174 self.concentrator
175 .add_span(&stat_span, trace_weight, &payload_key, &infra_tags, origin);
176 }
177 }
178 }
179
180 fn build_infra_tags(&self, trace: &Trace, process_tags: &str) -> InfraTags {
181 let resource_tags = trace.resource_tags();
182 let container_id = resolve_container_id(resource_tags);
183 let mut container_tags = if container_id.is_empty() {
184 TagSet::default()
185 } else {
186 extract_container_tags(resource_tags)
187 };
188
189 if !container_id.is_empty() {
191 if let Some(workload_provider) = &self.workload_provider {
192 let entity_id = EntityId::Container(container_id.clone());
193 if let Some(tags) = workload_provider.get_tags_for_entity(&entity_id, OriginTagCardinality::Low) {
194 container_tags.merge_shared(&tags);
195 }
196 }
197 }
198
199 InfraTags::new(container_id, container_tags, process_tags)
200 }
201
202 fn build_payload_key(&self, trace: &Trace, process_tags: &str) -> PayloadAggregationKey {
203 let root_span = trace
204 .spans()
205 .iter()
206 .find(|s| s.parent_id() == 0)
207 .or_else(|| trace.spans().first());
208
209 let span_env = root_span.and_then(|s| s.meta().get("env")).filter(|s| !s.is_empty());
210 let env = span_env.cloned().unwrap_or_else(|| self.agent_env.clone());
211
212 let hostname = root_span
213 .and_then(|s| s.meta().get("_dd.hostname"))
214 .filter(|s| !s.is_empty())
215 .cloned()
216 .unwrap_or_else(|| self.agent_hostname.clone());
217
218 let version = root_span
219 .and_then(|s| s.meta().get("version"))
220 .cloned()
221 .unwrap_or_default();
222
223 let container_id = root_span
224 .and_then(|s| s.meta().get("_dd.container_id"))
225 .cloned()
226 .unwrap_or_default();
227
228 let git_commit_sha = root_span
229 .and_then(|s| s.meta().get("_dd.git.commit.sha"))
230 .cloned()
231 .unwrap_or_default();
232
233 let image_tag = root_span
234 .and_then(|s| s.meta().get("_dd.image_tag"))
235 .cloned()
236 .unwrap_or_default();
237
238 let lang = root_span
239 .and_then(|s| s.meta().get("language"))
240 .cloned()
241 .unwrap_or_default();
242
243 PayloadAggregationKey {
244 env,
245 hostname,
246 version,
247 container_id,
248 git_commit_sha,
249 image_tag,
250 lang,
251 process_tags_hash: process_tags_hash(process_tags),
252 }
253 }
254}
255
256fn split_into_trace_stats(client_payloads: Vec<ClientStatsPayload>, max_entries_per_event: usize) -> Vec<TraceStats> {
262 if client_payloads.is_empty() {
263 return Vec::new();
264 }
265
266 let total_grouped_entries = client_payloads
269 .iter()
270 .map(|p| p.stats().iter().map(|b| b.stats().len()).sum::<usize>())
271 .sum::<usize>();
272 if total_grouped_entries <= max_entries_per_event {
273 return vec![TraceStats::new(client_payloads)];
274 }
275
276 let mut events = Vec::new();
277 let mut current_client_payloads = Vec::new();
278 let mut current_event_len = 0;
279
280 for mut client_payload in client_payloads {
281 let client_payload_len = client_payload.stats().iter().map(|b| b.stats().len()).sum::<usize>();
283 if current_event_len + client_payload_len <= max_entries_per_event {
284 current_client_payloads.push(client_payload);
285 current_event_len += client_payload_len;
286 continue;
287 }
288
289 let mut current_client_stats_buckets = Vec::new();
296 for mut client_stats_bucket in client_payload.take_stats() {
297 let bucket_len = client_stats_bucket.stats().len();
298 if current_event_len + bucket_len <= max_entries_per_event {
300 current_client_stats_buckets.push(client_stats_bucket);
301 current_event_len += bucket_len;
302 continue;
303 }
304
305 let mut bucket_entries = client_stats_bucket.take_stats();
308 while current_event_len + bucket_entries.len() > max_entries_per_event {
309 let split_amount = max_entries_per_event - current_event_len;
313 let split_point = bucket_entries.len() - split_amount;
314 let split_entries = bucket_entries.split_off(split_point);
315
316 let split_bucket = client_stats_bucket.clone().with_stats(split_entries);
320 current_client_stats_buckets.push(split_bucket);
321
322 let split_client_payload = client_payload
323 .clone()
324 .with_stats(std::mem::take(&mut current_client_stats_buckets));
325 current_client_payloads.push(split_client_payload);
326
327 events.push(TraceStats::new(std::mem::take(&mut current_client_payloads)));
328 current_event_len = 0;
329 }
330
331 if !bucket_entries.is_empty() {
334 current_event_len += bucket_entries.len();
335 current_client_stats_buckets.push(client_stats_bucket.with_stats(bucket_entries));
336 }
337 }
338
339 if !current_client_stats_buckets.is_empty() {
341 current_client_payloads.push(client_payload.with_stats(current_client_stats_buckets));
342 }
343 }
344
345 if !current_client_payloads.is_empty() {
347 events.push(TraceStats::new(current_client_payloads));
348 }
349
350 events
351}
352
#[async_trait]
impl Transform for ApmStats {
    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
        let mut health = context.take_health_handle();

        let mut flush_ticker = interval(self.flush_interval);
        // The first tick of `interval` completes immediately; consume it so the
        // first real flush happens a full interval from now.
        flush_ticker.tick().await;

        // Set once the input stream ends; drives one last flush before exit.
        let mut final_flush = false;

        health.mark_ready();
        debug!("APM Stats transform started.");

        loop {
            select! {
                _ = health.live() => continue,

                _ = flush_ticker.tick() => {
                    let stats_payloads = self.concentrator.flush(now_nanos(), final_flush);
                    if !stats_payloads.is_empty() {
                        debug!(stats_payloads = stats_payloads.len(), "Flushing APM stats.");

                        // Split oversized flushes so no single event exceeds
                        // MAX_STATS_GROUPS_PER_EVENT grouped entries.
                        let events = split_into_trace_stats(stats_payloads, MAX_STATS_GROUPS_PER_EVENT);
                        let dispatcher = context.dispatcher().buffered()
                            .error_context("Default output should be available.")?;

                        if let Err(e) = dispatcher.send_all(events.into_iter().map(Event::TraceStats)).await {
                            error!(error = %e, "Failed to dispatch events.");
                        }
                    }

                    if final_flush {
                        debug!("Final APM stats flush complete.");
                        break;
                    }
                },

                // Stop polling for input once the final flush has been scheduled.
                maybe_events = context.events().next(), if !final_flush => {
                    match maybe_events {
                        Some(events) => {
                            for event in events {
                                if let Event::Trace(trace) = event {
                                    self.process_trace(&trace);
                                }
                            }
                        },
                        None => {
                            // Input closed: schedule an immediate, final flush on
                            // the next loop iteration.
                            final_flush = true;
                            flush_ticker.reset_immediately();
                            debug!("APM Stats transform stopping, triggering final flush...");
                        }
                    }
                },
            }
        }

        debug!("APM Stats transform stopped.");
        Ok(())
    }
}
415
/// Returns the current Unix time in nanoseconds, or zero if the system clock
/// reads earlier than the Unix epoch.
fn now_nanos() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}
423
424fn resolve_container_id(resource_tags: &TagSet) -> MetaString {
426 for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
427 if let Some(tag) = resource_tags.get_single_tag(key) {
428 if let Some(value) = tag.value() {
429 if !value.is_empty() {
430 return MetaString::from(value);
431 }
432 }
433 }
434 }
435
436 MetaString::empty()
437}
438
439fn extract_container_tags(resource_tags: &TagSet) -> TagSet {
441 let mut container_tags_set = TagSet::default();
442 extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags_set);
443
444 container_tags_set
445}
446
447fn extract_process_tags(trace: &Trace) -> MetaString {
449 if let Some(first_span) = trace.spans().first() {
450 if let Some(process_tags) = first_span.meta().get(TAG_PROCESS_TAGS) {
451 return process_tags.clone();
452 }
453 }
454
455 MetaString::empty()
456}
457
458#[cfg(test)]
459mod tests {
460 use proptest::prelude::*;
461 use saluki_common::collections::FastHashMap;
462 use saluki_context::tags::TagSet;
463 use saluki_core::data_model::event::trace_stats::ClientStatsBucket;
464 use saluki_core::data_model::event::{trace::Span, trace_stats::ClientGroupedStats};
465
466 use super::aggregation::BUCKET_DURATION_NS;
467 use super::span_concentrator::METRIC_PARTIAL_VERSION;
468 use super::*;
469
470 fn align_ts(ts: u64, bsize: u64) -> u64 {
472 ts - ts % bsize
473 }
474
    /// Builds a DB-style test span whose end time falls `bucket_offset` buckets
    /// before `aligned_now`.
    #[allow(clippy::too_many_arguments)]
    fn test_span(
        aligned_now: u64, span_id: u64, parent_id: u64, duration: u64, bucket_offset: u64, service: &str,
        resource: &str, error: i32, meta: Option<FastHashMap<MetaString, MetaString>>,
        metrics: Option<FastHashMap<MetaString, f64>>,
    ) -> Span {
        // Position the span so it ends exactly at the target bucket's start.
        let bucket_start = aligned_now - bucket_offset * BUCKET_DURATION_NS;
        let start = bucket_start - duration;

        Span::new(
            service, "query", resource, "db", 1, span_id, parent_id, start, duration, error,
        )
        .with_meta(meta)
        .with_metrics(metrics)
    }
494
495 fn make_test_span(service: &str, name: &str, resource: &str) -> Span {
497 let mut metrics = FastHashMap::default();
498 metrics.insert(MetaString::from("_dd.measured"), 1.0);
499
500 Span::new(service, name, resource, "web", 1, 1, 0, 1000000000, 100000000, 0).with_metrics(metrics)
501 }
502
503 fn make_top_level_span(
505 aligned_now: u64, span_id: u64, duration: u64, bucket_offset: u64, service: &str, resource: &str, error: i32,
506 meta: Option<FastHashMap<MetaString, MetaString>>,
507 ) -> Span {
508 let mut metrics = FastHashMap::default();
509 metrics.insert(MetaString::from("_top_level"), 1.0);
510 test_span(
511 aligned_now,
512 span_id,
513 0,
514 duration,
515 bucket_offset,
516 service,
517 resource,
518 error,
519 meta,
520 Some(metrics),
521 )
522 }
523
    /// Processing a trace with a measured span should produce stats on flush.
    #[test]
    fn test_process_trace_creates_stats() {
        let now = now_nanos();

        let concentrator = SpanConcentrator::new(true, true, &[], now);
        let mut transform = ApmStats {
            concentrator,
            flush_interval: DEFAULT_FLUSH_INTERVAL,
            agent_env: MetaString::from("none"),
            agent_hostname: MetaString::default(),
            workload_provider: None,
        };

        let span = make_test_span("test-service", "test-operation", "test-resource");
        let trace = Trace::new(vec![span], TagSet::default());

        transform.process_trace(&trace);

        // Flush well past the span's bucket so it is eligible for flushing.
        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
        assert!(!stats.is_empty(), "Expected stats to be produced");
    }
546
    /// A span with a `_sample_rate` metric should still register hits; the
    /// trace weight (presumably derived from the sample rate — see `weight()`)
    /// scales the aggregated counts.
    #[test]
    fn test_weight_applied_to_stats() {
        let now = now_nanos();

        let concentrator = SpanConcentrator::new(true, true, &[], now);
        let mut transform = ApmStats {
            concentrator,
            flush_interval: DEFAULT_FLUSH_INTERVAL,
            agent_env: MetaString::from("none"),
            agent_hostname: MetaString::default(),
            workload_provider: None,
        };

        // Measured span with a 0.5 sample rate.
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_dd.measured"), 1.0);
        metrics.insert(MetaString::from("_sample_rate"), 0.5);

        let span = Span::new(
            "test-service",
            "test-op",
            "test-resource",
            "web",
            1,
            1,
            0,
            now,
            100000000,
            0,
        )
        .with_metrics(metrics);

        let trace = Trace::new(vec![span], TagSet::default());
        transform.process_trace(&trace);

        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
        assert!(!stats.is_empty());

        let bucket = &stats[0].stats()[0];
        let grouped = &bucket.stats()[0];
        assert!(grouped.hits() >= 1, "Expected weighted hits");
    }
591
    /// Flushing with `force = true` should return buckets that a normal flush
    /// would still hold back.
    #[test]
    fn test_force_flush() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        let span = make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None);
        let trace = Trace::new(vec![span], TagSet::default());

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("test"),
            hostname: MetaString::from("host"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in trace.spans() {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        // A flush timestamp of 0 means no bucket is old enough to flush normally.
        let ts: u64 = 0;

        let stats = concentrator.flush(ts, false);
        assert!(stats.is_empty(), "Non-force flush should return empty");

        let stats = concentrator.flush(ts, true);
        assert!(!stats.is_empty(), "Force flush should return stats");
        assert_eq!(stats[0].stats().len(), 1, "Should have 1 bucket");
    }
628
    /// Spans carrying the partial-version metric must be excluded from stats.
    #[test]
    fn test_ignores_partial_spans() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        // Mark the span as a partial snapshot via the partial-version metric.
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_top_level"), 1.0);
        metrics.insert(MetaString::from(METRIC_PARTIAL_VERSION), 830604.0);

        let span = test_span(aligned_now, 1, 0, 50, 5, "A1", "resource1", 0, None, Some(metrics));
        let trace = Trace::new(vec![span], TagSet::default());

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("test"),
            hostname: MetaString::from("tracer-hostname"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in trace.spans() {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
        assert!(stats.is_empty(), "Partial spans should be ignored");
    }
661
    /// Totals summed across all flushed buckets must match the input spans.
    #[test]
    fn test_concentrator_stats_totals() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        // Start the concentrator two buckets in the past so all spans land in
        // valid buckets.
        let oldest_ts = aligned_now - 2 * BUCKET_DURATION_NS;
        let mut concentrator = SpanConcentrator::new(true, true, &[], oldest_ts);

        // Six top-level spans spread over six different buckets.
        let spans = vec![
            make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
        ];

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("none"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in &spans {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        // Flush far in the future so every bucket is returned.
        let all_stats = concentrator.flush(now + BUCKET_DURATION_NS * 10, true);

        let mut total_duration: u64 = 0;
        let mut total_hits: u64 = 0;
        let mut total_errors: u64 = 0;
        let mut total_top_level_hits: u64 = 0;

        for payload in &all_stats {
            for bucket in payload.stats() {
                for grouped in bucket.stats() {
                    total_duration += grouped.duration();
                    total_hits += grouped.hits();
                    total_errors += grouped.errors();
                    total_top_level_hits += grouped.top_level_hits();
                }
            }
        }

        assert_eq!(total_duration, 50 + 40 + 30 + 20 + 10 + 1, "Wrong total duration");
        assert_eq!(total_hits, 6, "Wrong total hits");
        assert_eq!(total_top_level_hits, 6, "Wrong total top level hits");
        assert_eq!(total_errors, 0, "Wrong total errors");
    }
717
    /// Grouped stats must record whether their originating span was the trace
    /// root: exactly one root expected among root/top-level/client spans.
    #[test]
    fn test_root_tag() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        // The root span: no parent, marked top-level.
        let mut root_metrics = FastHashMap::default();
        root_metrics.insert(MetaString::from("_top_level"), 1.0);
        let root_span = test_span(
            aligned_now,
            1,
            0,
            40,
            10,
            "A1",
            "resource1",
            0,
            None,
            Some(root_metrics),
        );

        // A top-level span that is not the root (it has a parent).
        let mut top_level_metrics = FastHashMap::default();
        top_level_metrics.insert(MetaString::from("_top_level"), 1.0);
        let top_level_span = test_span(
            aligned_now,
            4,
            1000,
            10,
            10,
            "A1",
            "resource1",
            0,
            None,
            Some(top_level_metrics),
        );

        // A client span, stats-eligible through its span.kind.
        let mut client_meta = FastHashMap::default();
        client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
        let client_span = test_span(aligned_now, 3, 2, 20, 10, "A1", "resource1", 0, Some(client_meta), None);

        let spans = vec![root_span, top_level_span, client_span];

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("none"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in &spans {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 20, true);
        assert!(!stats.is_empty(), "Should have stats");

        let mut total_grouped = 0;
        let mut root_count = 0;
        let mut non_root_count = 0;

        for payload in &stats {
            for bucket in payload.stats() {
                for grouped in bucket.stats() {
                    total_grouped += 1;
                    match grouped.is_trace_root() {
                        Some(true) => root_count += 1,
                        Some(false) => non_root_count += 1,
                        None => {}
                    }
                }
            }
        }

        assert_eq!(total_grouped, 3, "Expected 3 grouped stats");
        assert_eq!(root_count, 1, "Expected 1 root span");
        assert_eq!(non_root_count, 2, "Expected 2 non-root spans");
    }
805
    /// A client span only yields stats when stats-by-span-kind is enabled on
    /// the concentrator.
    #[test]
    fn test_compute_stats_through_span_kind_check() {
        let now = now_nanos();

        // Case 1: span-kind stats disabled — only the top-level span counts.
        {
            let mut concentrator = SpanConcentrator::new(false, true, &[], now);

            let mut metrics = FastHashMap::default();
            metrics.insert(MetaString::from("_top_level"), 1.0);
            let span = Span::new("myservice", "query", "GET /users", "web", 1, 1, 0, now, 500, 0).with_metrics(metrics);

            let payload_key = PayloadAggregationKey {
                env: MetaString::from("test"),
                ..Default::default()
            };
            let infra_tags = InfraTags::default();

            if let Some(stat_span) = concentrator.new_stat_span_from_span(&span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }

            // Client span: not top-level, eligible only via span.kind.
            let mut client_meta = FastHashMap::default();
            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
                .with_meta(client_meta);

            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }

            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);

            let mut count = 0;
            for payload in &stats {
                for bucket in payload.stats() {
                    count += bucket.stats().len();
                }
            }

            assert_eq!(count, 1, "Expected 1 stat when span kind check disabled");
        }

        // Case 2: span-kind stats enabled — the client span counts too.
        {
            let mut concentrator = SpanConcentrator::new(true, true, &[], now);

            let mut metrics = FastHashMap::default();
            metrics.insert(MetaString::from("_top_level"), 1.0);
            let span = Span::new("myservice", "query", "GET /users", "web", 1, 1, 0, now, 500, 0).with_metrics(metrics);

            let payload_key = PayloadAggregationKey {
                env: MetaString::from("test"),
                ..Default::default()
            };
            let infra_tags = InfraTags::default();

            if let Some(stat_span) = concentrator.new_stat_span_from_span(&span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }

            let mut client_meta = FastHashMap::default();
            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
                .with_meta(client_meta);

            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }

            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);

            let mut count = 0;
            for payload in &stats {
                for bucket in payload.stats() {
                    count += bucket.stats().len();
                }
            }

            assert_eq!(count, 2, "Expected 2 stats when span kind check enabled");
        }
    }
896
    /// Peer tags (db.instance, db.system, ...) are only attached to grouped
    /// stats when peer-tag aggregation is enabled on the concentrator.
    #[test]
    fn test_peer_tags() {
        let now = now_nanos();

        // Case 1: peer-tag aggregation disabled — no peer tags on output.
        {
            let mut concentrator = SpanConcentrator::new(true, false, &[], now);

            let mut client_meta = FastHashMap::default();
            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
            client_meta.insert(MetaString::from("db.instance"), MetaString::from("i-1234"));
            client_meta.insert(MetaString::from("db.system"), MetaString::from("postgres"));
            let mut client_metrics = FastHashMap::default();
            client_metrics.insert(MetaString::from("_dd.measured"), 1.0);
            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
                .with_meta(client_meta)
                .with_metrics(client_metrics);

            let payload_key = PayloadAggregationKey {
                env: MetaString::from("test"),
                ..Default::default()
            };
            let infra_tags = InfraTags::default();

            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }

            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);

            for payload in &stats {
                for bucket in payload.stats() {
                    for grouped in bucket.stats() {
                        assert!(
                            grouped.peer_tags().is_empty(),
                            "Peer tags should be empty when peer_tags_aggregation is false"
                        );
                    }
                }
            }
        }

        // Case 2: peer-tag aggregation enabled — db.* peer tags are attached.
        {
            let mut concentrator = SpanConcentrator::new(true, true, &[], now);

            let mut client_meta = FastHashMap::default();
            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
            client_meta.insert(MetaString::from("db.instance"), MetaString::from("i-1234"));
            client_meta.insert(MetaString::from("db.system"), MetaString::from("postgres"));
            let mut client_metrics = FastHashMap::default();
            client_metrics.insert(MetaString::from("_dd.measured"), 1.0);
            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
                .with_meta(client_meta)
                .with_metrics(client_metrics);

            let payload_key = PayloadAggregationKey {
                env: MetaString::from("test"),
                ..Default::default()
            };
            let infra_tags = InfraTags::default();

            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }

            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);

            let mut found_client_with_peer_tags = false;
            for payload in &stats {
                for bucket in payload.stats() {
                    for grouped in bucket.stats() {
                        if grouped.resource() == "SELECT ..." {
                            assert!(!grouped.peer_tags().is_empty(), "Client span should have peer tags");
                            let peer_tags: Vec<&str> = grouped.peer_tags().iter().map(|s| s.as_ref()).collect();
                            assert!(
                                peer_tags.iter().any(|t| t.starts_with("db.instance:")),
                                "Should have db.instance peer tag"
                            );
                            assert!(
                                peer_tags.iter().any(|t| t.starts_with("db.system:")),
                                "Should have db.system peer tag"
                            );
                            found_client_with_peer_tags = true;
                        }
                    }
                }
            }
            assert!(
                found_client_with_peer_tags,
                "Should have found client span with peer tags"
            );
        }
    }
997
    /// Repeated non-force flushes eventually release buffered buckets as the
    /// flush timestamp advances past the concentrator's buffer window; all
    /// recorded spans must still be accounted for.
    #[test]
    fn test_concentrator_oldest_ts() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        {
            let mut concentrator = SpanConcentrator::new(true, true, &[], now);

            let spans = vec![
                make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
                make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
                make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
                make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
                make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
                make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
            ];

            let payload_key = PayloadAggregationKey {
                env: MetaString::from("none"),
                ..Default::default()
            };
            let infra_tags = InfraTags::default();

            for span in &spans {
                if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                    concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
                }
            }

            let mut flush_time = now;
            // The first `buffer_len` flushes should produce nothing: buckets
            // are held until they fall out of the buffered window.
            let buffer_len = 2;
            for _ in 0..buffer_len {
                let stats = concentrator.flush(flush_time, false);
                assert!(stats.is_empty(), "Should not flush before buffer fills");
                flush_time += BUCKET_DURATION_NS;
            }

            let stats = concentrator.flush(flush_time, false);
            assert!(!stats.is_empty(), "Should flush after buffer fills");

            let mut total_hits: u64 = 0;
            let mut total_duration: u64 = 0;
            for payload in &stats {
                for bucket in payload.stats() {
                    for grouped in bucket.stats() {
                        total_hits += grouped.hits();
                        total_duration += grouped.duration();
                    }
                }
            }

            assert_eq!(total_hits, 6, "All 6 spans should be counted");
            assert_eq!(
                total_duration,
                50 + 40 + 30 + 20 + 10 + 1,
                "Total duration should match"
            );
        }
    }
1064
    /// span.kind eligibility must be case-insensitive: server/consumer/client/
    /// producer qualify; internal and empty strings do not.
    #[test]
    fn test_compute_stats_for_span_kind() {
        use super::span_concentrator::compute_stats_for_span_kind;

        // Lowercase.
        assert!(compute_stats_for_span_kind("server"));
        assert!(compute_stats_for_span_kind("consumer"));
        assert!(compute_stats_for_span_kind("client"));
        assert!(compute_stats_for_span_kind("producer"));

        // Uppercase.
        assert!(compute_stats_for_span_kind("SERVER"));
        assert!(compute_stats_for_span_kind("CONSUMER"));
        assert!(compute_stats_for_span_kind("CLIENT"));
        assert!(compute_stats_for_span_kind("PRODUCER"));

        // Mixed case.
        assert!(compute_stats_for_span_kind("SErVER"));
        assert!(compute_stats_for_span_kind("COnSUMER"));
        assert!(compute_stats_for_span_kind("CLiENT"));
        assert!(compute_stats_for_span_kind("PRoDUCER"));

        // Ineligible kinds.
        assert!(!compute_stats_for_span_kind("internal"));
        assert!(!compute_stats_for_span_kind("INTERNAL"));
        assert!(!compute_stats_for_span_kind("INtERNAL"));
        assert!(!compute_stats_for_span_kind(""));
    }
1093
    /// `extract_process_tags` reads `_dd.tags.process` from the first span and
    /// returns an empty string in all other cases.
    #[test]
    fn test_extract_process_tags() {
        // No process tags in meta.
        {
            let span = Span::default();
            let trace = Trace::new(vec![span], TagSet::default());
            let process_tags = extract_process_tags(&trace);
            assert!(process_tags.is_empty(), "Should be empty when no _dd.tags.process");
        }

        // Process tags present: returned verbatim.
        {
            let mut meta = FastHashMap::default();
            meta.insert(MetaString::from(TAG_PROCESS_TAGS), MetaString::from("a:1,b:2,c:3"));
            let span = Span::default().with_meta(meta);
            let trace = Trace::new(vec![span], TagSet::default());
            let process_tags = extract_process_tags(&trace);
            assert_eq!(process_tags, "a:1,b:2,c:3");
        }

        // Present but empty.
        {
            let mut meta = FastHashMap::default();
            meta.insert(MetaString::from(TAG_PROCESS_TAGS), MetaString::from(""));
            let span = Span::default().with_meta(meta);
            let trace = Trace::new(vec![span], TagSet::default());
            let process_tags = extract_process_tags(&trace);
            assert!(
                process_tags.is_empty(),
                "Should be empty when _dd.tags.process is empty string"
            );
        }

        // Trace without spans.
        {
            let trace = Trace::new(vec![], TagSet::default());
            let process_tags = extract_process_tags(&trace);
            assert!(process_tags.is_empty(), "Should be empty when trace has no spans");
        }
    }
1134
1135 #[test]
1136 fn test_process_tags_hash_computation() {
1137 use super::aggregation::process_tags_hash;
1138
1139 assert_eq!(process_tags_hash(""), 0);
1141
1142 let hash1 = process_tags_hash("a:1,b:2,c:3");
1144 let hash2 = process_tags_hash("a:1,b:2,c:3");
1145 assert_eq!(hash1, hash2);
1146
1147 let hash3 = process_tags_hash("a:1,b:2");
1149 assert_ne!(hash1, hash3);
1150 }
1151
1152 fn make_grouped_stats(service: &str, resource: &str) -> ClientGroupedStats {
1154 ClientGroupedStats::new(service, "operation", resource)
1155 .with_hits(1)
1156 .with_duration(100)
1157 }
1158
1159 fn make_bucket_with_stats(n: usize) -> ClientStatsBucket {
1161 let stats: Vec<ClientGroupedStats> = (0..n)
1162 .map(|i| make_grouped_stats("service", &format!("resource-{}", i)))
1163 .collect();
1164 ClientStatsBucket::new(1000, 10_000_000_000, stats)
1165 }
1166
1167 fn make_payload_with_buckets(hostname: &str, buckets: Vec<ClientStatsBucket>) -> ClientStatsPayload {
1169 ClientStatsPayload::new(hostname, "test-env", "1.0.0")
1170 .with_stats(buckets)
1171 .with_container_id("container-123")
1172 .with_lang("rust")
1173 }
1174
1175 fn count_grouped_stats(trace_stats: &TraceStats) -> usize {
1177 trace_stats
1178 .stats()
1179 .iter()
1180 .flat_map(|p| p.stats())
1181 .map(|b| b.stats().len())
1182 .sum()
1183 }
1184
1185 #[test]
1186 fn test_split_into_trace_stats_empty_input() {
1187 let result = split_into_trace_stats(vec![], 100);
1188 assert!(result.is_empty());
1189 }
1190
1191 #[test]
1192 fn test_split_into_trace_stats_no_split_needed() {
1193 let bucket = make_bucket_with_stats(50);
1195 let payload = make_payload_with_buckets("host1", vec![bucket]);
1196
1197 let result = split_into_trace_stats(vec![payload], 100);
1198
1199 assert_eq!(result.len(), 1);
1200 assert_eq!(count_grouped_stats(&result[0]), 50);
1201 }
1202
1203 #[test]
1204 fn test_split_into_trace_stats_exact_threshold() {
1205 let bucket = make_bucket_with_stats(100);
1207 let payload = make_payload_with_buckets("host1", vec![bucket]);
1208
1209 let result = split_into_trace_stats(vec![payload], 100);
1210
1211 assert_eq!(result.len(), 1);
1212 assert_eq!(count_grouped_stats(&result[0]), 100);
1213 }
1214
1215 #[test]
1216 fn test_split_into_trace_stats_splits_single_bucket() {
1217 let bucket = make_bucket_with_stats(250);
1219 let payload = make_payload_with_buckets("host1", vec![bucket]);
1220
1221 let result = split_into_trace_stats(vec![payload], 100);
1222
1223 assert_eq!(result.len(), 3);
1224 assert_eq!(count_grouped_stats(&result[0]), 100);
1225 assert_eq!(count_grouped_stats(&result[1]), 100);
1226 assert_eq!(count_grouped_stats(&result[2]), 50);
1227
1228 let total: usize = result.iter().map(count_grouped_stats).sum();
1230 assert_eq!(total, 250);
1231 }
1232
1233 #[test]
1234 fn test_split_into_trace_stats_splits_across_payloads() {
1235 let payload1 = make_payload_with_buckets("host1", vec![make_bucket_with_stats(60)]);
1237 let payload2 = make_payload_with_buckets("host2", vec![make_bucket_with_stats(60)]);
1238
1239 let result = split_into_trace_stats(vec![payload1, payload2], 100);
1240
1241 assert_eq!(result.len(), 2);
1242 assert_eq!(count_grouped_stats(&result[0]), 100);
1244 assert_eq!(count_grouped_stats(&result[1]), 20);
1246 }
1247
1248 #[test]
1249 fn test_split_into_trace_stats_splits_single_payload_multiple_buckets() {
1250 let bucket1 = make_bucket_with_stats(70);
1252 let bucket2 = make_bucket_with_stats(80);
1253 let payload = make_payload_with_buckets("host1", vec![bucket1, bucket2]);
1254
1255 let result = split_into_trace_stats(vec![payload], 100);
1256
1257 assert_eq!(result.len(), 2);
1258 let total: usize = result.iter().map(count_grouped_stats).sum();
1259 assert_eq!(total, 150);
1260 }
1261
1262 #[test]
1263 fn test_split_into_trace_stats_preserves_metadata() {
1264 let bucket = make_bucket_with_stats(250);
1265 let payload = ClientStatsPayload::new("test-host", "prod", "2.0.0")
1266 .with_stats(vec![bucket])
1267 .with_container_id("container-abc")
1268 .with_lang("go")
1269 .with_git_commit_sha("abc123")
1270 .with_image_tag("v1.2.3")
1271 .with_process_tags_hash(12345)
1272 .with_process_tags("tag1,tag2");
1273
1274 let result = split_into_trace_stats(vec![payload], 100);
1275
1276 for trace_stats in &result {
1278 for p in trace_stats.stats() {
1279 assert_eq!(p.hostname(), "test-host");
1280 assert_eq!(p.env(), "prod");
1281 assert_eq!(p.version(), "2.0.0");
1282 assert_eq!(p.container_id(), "container-abc");
1283 assert_eq!(p.lang(), "go");
1284 assert_eq!(p.git_commit_sha(), "abc123");
1285 assert_eq!(p.image_tag(), "v1.2.3");
1286 assert_eq!(p.process_tags_hash(), 12345);
1287 assert_eq!(p.process_tags(), "tag1,tag2");
1288 }
1289 }
1290 }
1291
1292 #[test]
1293 fn test_split_into_trace_stats_preserves_bucket_metadata() {
1294 let stats: Vec<ClientGroupedStats> = (0..150)
1296 .map(|i| make_grouped_stats("svc", &format!("res-{}", i)))
1297 .collect();
1298 let bucket = ClientStatsBucket::new(999_000_000, 10_000_000_000, stats).with_agent_time_shift(42);
1299 let payload = make_payload_with_buckets("host1", vec![bucket]);
1300
1301 let result = split_into_trace_stats(vec![payload], 100);
1302
1303 for trace_stats in &result {
1305 for p in trace_stats.stats() {
1306 for b in p.stats() {
1307 assert_eq!(b.start(), 999_000_000);
1308 assert_eq!(b.duration(), 10_000_000_000);
1309 assert_eq!(b.agent_time_shift(), 42);
1310 }
1311 }
1312 }
1313 }
1314
1315 #[test]
1316 fn test_split_into_trace_stats_handles_empty_bucket() {
1317 let empty_bucket = ClientStatsBucket::new(1000, 10_000_000_000, vec![]);
1318 let payload = make_payload_with_buckets("host1", vec![empty_bucket]);
1319
1320 let result = split_into_trace_stats(vec![payload], 100);
1321
1322 assert_eq!(result.len(), 1);
1323 assert_eq!(count_grouped_stats(&result[0]), 0);
1324 }
1325
1326 #[test]
1327 fn test_split_into_trace_stats_large_split() {
1328 let bucket = make_bucket_with_stats(10_000);
1330 let payload = make_payload_with_buckets("host1", vec![bucket]);
1331
1332 let result = split_into_trace_stats(vec![payload], 4000);
1333
1334 assert_eq!(result.len(), 3);
1335 assert_eq!(count_grouped_stats(&result[0]), 4000);
1336 assert_eq!(count_grouped_stats(&result[1]), 4000);
1337 assert_eq!(count_grouped_stats(&result[2]), 2000);
1338 }
1339
1340 fn arb_grouped_stats() -> impl Strategy<Value = ClientGroupedStats> {
1344 (0..100u64, 0..1000u64).prop_map(|(hits, duration)| {
1345 ClientGroupedStats::new("service", "operation", "resource")
1346 .with_hits(hits)
1347 .with_duration(duration)
1348 })
1349 }
1350
1351 fn arb_bucket(max_stats_per_bucket: usize) -> impl Strategy<Value = ClientStatsBucket> {
1353 proptest::collection::vec(arb_grouped_stats(), 0..=max_stats_per_bucket)
1354 .prop_map(|stats| ClientStatsBucket::new(1000, 10_000_000_000, stats))
1355 }
1356
1357 fn arb_payload(max_buckets: usize, max_stats_per_bucket: usize) -> impl Strategy<Value = ClientStatsPayload> {
1359 proptest::collection::vec(arb_bucket(max_stats_per_bucket), 1..=max_buckets)
1360 .prop_map(|buckets| ClientStatsPayload::new("host", "env", "1.0.0").with_stats(buckets))
1361 }
1362
1363 fn arb_split_inputs() -> impl Strategy<Value = (Vec<ClientStatsPayload>, usize)> {
1371 let payloads_strategy = proptest::collection::vec(arb_payload(5, 500), 1..=10);
1372 let max_entries_strategy = 1..=1000usize;
1373
1374 (payloads_strategy, max_entries_strategy)
1375 }
1376
1377 #[test_strategy::proptest]
1378 #[cfg_attr(miri, ignore)]
1379 fn property_test_split_respects_max_entries(
1380 #[strategy(arb_split_inputs())] inputs: (Vec<ClientStatsPayload>, usize),
1381 ) {
1382 let (payloads, max_entries_per_event) = inputs;
1383
1384 let input_total: usize = payloads.iter().flat_map(|p| p.stats()).map(|b| b.stats().len()).sum();
1385
1386 let result = split_into_trace_stats(payloads, max_entries_per_event);
1387
1388 for trace_stats in &result {
1390 let count = count_grouped_stats(trace_stats);
1391 prop_assert!(
1392 count <= max_entries_per_event,
1393 "TraceStats has {} grouped stats, exceeds max of {}",
1394 count,
1395 max_entries_per_event
1396 );
1397 }
1398
1399 let output_total: usize = result.iter().map(count_grouped_stats).sum();
1401 prop_assert_eq!(input_total, output_total, "Total stats count should be preserved");
1402 }
1403}