use std::{
    sync::Arc,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use async_trait::async_trait;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use opentelemetry_semantic_conventions::resource::{CONTAINER_ID, K8S_POD_UID};
use saluki_config::GenericConfiguration;
use saluki_context::{
    origin::OriginTagCardinality,
    tags::{SharedTagSet, TagSet},
};
use saluki_core::{
    components::{transforms::*, ComponentContext},
    data_model::event::{
        trace::{Span, Trace},
        trace_stats::{ClientStatsPayload, TraceStats},
        Event, EventType,
    },
    topology::OutputDefinition,
};
use saluki_env::{
    host::providers::BoxedHostProvider, workload::EntityId, EnvironmentProvider, HostProvider, WorkloadProvider,
};
use saluki_error::{ErrorContext as _, GenericError};
use stringtheory::MetaString;
use tokio::{select, time::interval};
use tracing::{debug, error};

use crate::common::{
    datadog::apm::ApmConfig,
    otlp::util::{extract_container_tags_from_resource_tagset, KEY_DATADOG_CONTAINER_ID},
};
39
40mod aggregation;
41use self::aggregation::{process_tags_hash, PayloadAggregationKey};
42
43mod span_concentrator;
44use self::span_concentrator::{InfraTags, SpanConcentrator};
45
46mod statsraw;
47
48mod weight;
49use self::weight::weight;
50
/// How often aggregated APM stats are flushed downstream.
const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(10);

/// Span meta key carrying the serialized process tags for a trace.
const TAG_PROCESS_TAGS: &str = "_dd.tags.process";

/// Maximum number of grouped stats entries packed into a single `TraceStats` event.
const MAX_STATS_GROUPS_PER_EVENT: usize = 4000;
59
/// Configuration for building an APM stats transform.
pub struct ApmStatsTransformConfiguration {
    // APM-specific settings (stats-by-span-kind flag, peer tags, hostname, default env).
    apm_config: ApmConfig,
    // Hostname fallback resolved from the environment provider, if one was supplied.
    default_hostname: Option<String>,
    // Optional workload provider used to enrich container tags at runtime.
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}
69
impl ApmStatsTransformConfiguration {
    /// Creates a new `ApmStatsTransformConfiguration` from the given configuration.
    ///
    /// # Errors
    ///
    /// If the APM-specific settings cannot be read from the configuration, an error is returned.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        let apm_config = ApmConfig::from_configuration(config)?;
        Ok(Self {
            apm_config,
            default_hostname: None,
            workload_provider: None,
        })
    }

    /// Resolves and stores the default hostname from the environment provider's host provider.
    ///
    /// # Errors
    ///
    /// If the hostname cannot be resolved, an error is returned.
    pub async fn with_environment_provider<E>(mut self, env_provider: E) -> Result<Self, GenericError>
    where
        E: EnvironmentProvider<Host = BoxedHostProvider>,
    {
        let hostname = env_provider.host().get_hostname().await?;
        self.default_hostname = Some(hostname);
        Ok(self)
    }

    /// Sets the workload provider used to enrich container tags.
    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
    where
        W: WorkloadProvider + Send + Sync + 'static,
    {
        self.workload_provider = Some(Arc::new(workload_provider));
        self
    }
}
102
103#[async_trait]
104impl TransformBuilder for ApmStatsTransformConfiguration {
105 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
106 let mut apm_config = self.apm_config.clone();
107
108 if let Some(hostname) = &self.default_hostname {
109 apm_config.set_hostname_if_empty(hostname.as_str());
110 }
111
112 let concentrator = SpanConcentrator::new(
113 apm_config.compute_stats_by_span_kind(),
114 apm_config.peer_tags_aggregation(),
115 apm_config.peer_tags(),
116 now_nanos(),
117 );
118
119 Ok(Box::new(ApmStats {
120 concentrator,
121 flush_interval: DEFAULT_FLUSH_INTERVAL,
122 agent_env: apm_config.default_env().clone(),
123 agent_hostname: apm_config.hostname().clone(),
124 workload_provider: self.workload_provider.clone(),
125 }))
126 }
127
128 fn input_event_type(&self) -> EventType {
129 EventType::Trace
130 }
131
132 fn outputs(&self) -> &[OutputDefinition<EventType>] {
133 static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::TraceStats)];
134 OUTPUTS
135 }
136}
137
impl MemoryBounds for ApmStatsTransformConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Only the fixed size of the component struct is accounted for here; the
        // concentrator's per-span state grows at runtime and is not declared.
        builder.minimum().with_single_value::<ApmStats>("component struct");
    }
}
144
/// APM stats transform: aggregates spans from incoming traces into client stats payloads and
/// periodically flushes them downstream as `TraceStats` events.
struct ApmStats {
    // Aggregates spans into time-bucketed stats.
    concentrator: SpanConcentrator,
    // How often aggregated stats are flushed downstream.
    flush_interval: Duration,
    // Fallback env used when the root span has no non-empty "env" meta entry.
    agent_env: MetaString,
    // Fallback hostname used when the root span has no non-empty "_dd.hostname" meta entry.
    agent_hostname: MetaString,
    // Optional provider used to enrich container tags with workload metadata.
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}
152
153impl ApmStats {
154 fn process_trace(&mut self, trace: &Trace) {
155 let root_span = trace
156 .spans()
157 .iter()
158 .find(|s| s.parent_id() == 0)
159 .or_else(|| trace.spans().first());
160
161 let trace_weight = root_span.map(weight).unwrap_or(1.0);
162
163 let process_tags = extract_process_tags(trace);
164
165 let payload_key = self.build_payload_key(trace, &process_tags);
166 let infra_tags = self.build_infra_tags(trace, &process_tags);
167
168 let origin = trace
169 .spans()
170 .first()
171 .and_then(|s| s.meta().get("_dd.origin"))
172 .map(|s| s.as_ref())
173 .unwrap_or("");
174
175 for span in trace.spans() {
176 if let Some(stat_span) = self.concentrator.new_stat_span_from_span(span) {
177 self.concentrator
178 .add_span(&stat_span, trace_weight, &payload_key, &infra_tags, origin);
179 }
180 }
181 }
182
183 fn build_infra_tags(&self, trace: &Trace, process_tags: &str) -> InfraTags {
184 let resource_tags = trace.resource_tags();
185 let container_id = resolve_container_id(resource_tags);
186 let mut container_tags = if container_id.is_empty() {
187 SharedTagSet::default()
188 } else {
189 extract_container_tags(resource_tags)
190 };
191
192 if !container_id.is_empty() {
194 if let Some(workload_provider) = &self.workload_provider {
195 let entity_id = EntityId::Container(container_id.clone());
196 if let Some(tags) = workload_provider.get_tags_for_entity(&entity_id, OriginTagCardinality::Low) {
197 container_tags.extend_from_shared(&tags);
198 }
199 }
200 }
201
202 InfraTags::new(container_id, container_tags, process_tags)
203 }
204
205 fn build_payload_key(&self, trace: &Trace, process_tags: &str) -> PayloadAggregationKey {
206 let root_span = trace
207 .spans()
208 .iter()
209 .find(|s| s.parent_id() == 0)
210 .or_else(|| trace.spans().first());
211
212 let span_env = root_span.and_then(|s| s.meta().get("env")).filter(|s| !s.is_empty());
213 let env = span_env.cloned().unwrap_or_else(|| self.agent_env.clone());
214
215 let hostname = root_span
216 .and_then(|s| s.meta().get("_dd.hostname"))
217 .filter(|s| !s.is_empty())
218 .cloned()
219 .unwrap_or_else(|| self.agent_hostname.clone());
220
221 let version = root_span
222 .and_then(|s| s.meta().get("version"))
223 .cloned()
224 .unwrap_or_default();
225
226 let container_id = root_span
227 .and_then(|s| s.meta().get("_dd.container_id"))
228 .cloned()
229 .unwrap_or_default();
230
231 let git_commit_sha = root_span
232 .and_then(|s| s.meta().get("_dd.git.commit.sha"))
233 .cloned()
234 .unwrap_or_default();
235
236 let image_tag = root_span
237 .and_then(|s| s.meta().get("_dd.image_tag"))
238 .cloned()
239 .unwrap_or_default();
240
241 let lang = root_span
242 .and_then(|s| s.meta().get("language"))
243 .cloned()
244 .unwrap_or_default();
245
246 PayloadAggregationKey {
247 env,
248 hostname,
249 version,
250 container_id,
251 git_commit_sha,
252 image_tag,
253 lang,
254 process_tags_hash: process_tags_hash(process_tags),
255 }
256 }
257}
258
/// Splits a set of client stats payloads into one or more `TraceStats` events, each carrying
/// at most `max_entries_per_event` grouped stats entries in total across all of its
/// payloads and buckets.
fn split_into_trace_stats(client_payloads: Vec<ClientStatsPayload>, max_entries_per_event: usize) -> Vec<TraceStats> {
    if client_payloads.is_empty() {
        return Vec::new();
    }

    // Fast path: everything fits into a single event, so skip the splitting machinery.
    let total_grouped_entries = client_payloads
        .iter()
        .map(|p| p.stats().iter().map(|b| b.stats().len()).sum::<usize>())
        .sum::<usize>();
    if total_grouped_entries <= max_entries_per_event {
        return vec![TraceStats::new(client_payloads)];
    }

    let mut events = Vec::new();
    let mut current_client_payloads = Vec::new();
    let mut current_event_len = 0;

    for mut client_payload in client_payloads {
        // Whole payload fits in the current event: take it as-is.
        let client_payload_len = client_payload.stats().iter().map(|b| b.stats().len()).sum::<usize>();
        if current_event_len + client_payload_len <= max_entries_per_event {
            current_client_payloads.push(client_payload);
            current_event_len += client_payload_len;
            continue;
        }

        // The payload overflows the current event, so walk its buckets individually.
        let mut current_client_stats_buckets = Vec::new();
        for mut client_stats_bucket in client_payload.take_stats() {
            // Whole bucket fits: take it as-is.
            if current_event_len + client_stats_bucket.stats().len() <= max_entries_per_event {
                let bucket_len = client_stats_bucket.stats().len();
                current_client_stats_buckets.push(client_stats_bucket);
                current_event_len += bucket_len;
                continue;
            }

            // The bucket itself overflows: peel entries off its tail to top up the current
            // event, flushing completed events as we go, until the remainder fits.
            let mut bucket_entries = client_stats_bucket.take_stats();
            while current_event_len + bucket_entries.len() > max_entries_per_event {
                // Take exactly enough entries (from the tail, which is cheap with
                // `split_off`) to fill the current event to capacity.
                let split_amount = max_entries_per_event - current_event_len;
                let split_point = bucket_entries.len() - split_amount;
                let split_entries = bucket_entries.split_off(split_point);

                // Clone the bucket/payload so the split-off portion keeps its metadata.
                // NOTE(review): when split_amount is 0 (event already exactly full) this
                // pushes a bucket with no entries — presumably harmless, but confirm
                // downstream tolerates empty buckets.
                let split_bucket = client_stats_bucket.clone().with_stats(split_entries);
                current_client_stats_buckets.push(split_bucket);

                let split_client_payload = client_payload
                    .clone()
                    .with_stats(std::mem::take(&mut current_client_stats_buckets));
                current_client_payloads.push(split_client_payload);

                events.push(TraceStats::new(std::mem::take(&mut current_client_payloads)));
                current_event_len = 0;
            }

            // Whatever is left of the bucket now fits into the (freshly emptied) event.
            if !bucket_entries.is_empty() {
                current_event_len += bucket_entries.len();
                current_client_stats_buckets.push(client_stats_bucket.with_stats(bucket_entries));
            }
        }

        // Reattach any remaining buckets to the (now stats-less) payload.
        if !current_client_stats_buckets.is_empty() {
            current_client_payloads.push(client_payload.with_stats(current_client_stats_buckets));
        }
    }

    // Flush whatever remains as the final event.
    if !current_client_payloads.is_empty() {
        events.push(TraceStats::new(current_client_payloads));
    }

    events
}
355
#[async_trait]
impl Transform for ApmStats {
    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
        let mut health = context.take_health_handle();

        let mut flush_ticker = interval(self.flush_interval);
        // `interval` completes its first tick immediately; consume it so the first real
        // flush happens one full interval from now.
        flush_ticker.tick().await;

        // Set once the input stream ends; makes the next tick perform one last flush
        // before the loop exits.
        let mut final_flush = false;

        health.mark_ready();
        debug!("APM Stats transform started.");

        loop {
            select! {
                _ = health.live() => continue,

                _ = flush_ticker.tick() => {
                    let stats_payloads = self.concentrator.flush(now_nanos(), final_flush);
                    if !stats_payloads.is_empty() {
                        debug!(stats_payloads = stats_payloads.len(), "Flushing APM stats.");

                        // Cap how many grouped stats entries end up in a single event.
                        let events = split_into_trace_stats(stats_payloads, MAX_STATS_GROUPS_PER_EVENT);
                        let dispatcher = context.dispatcher().buffered()
                            .error_context("Default output should be available.")?;

                        if let Err(e) = dispatcher.send_all(events.into_iter().map(Event::TraceStats)).await {
                            error!(error = %e, "Failed to dispatch events.");
                        }
                    }

                    if final_flush {
                        debug!("Final APM stats flush complete.");
                        break;
                    }
                },

                // Stop consuming input once a final flush has been scheduled.
                maybe_events = context.events().next(), if !final_flush => {
                    match maybe_events {
                        Some(events) => {
                            for event in events {
                                if let Event::Trace(trace) = event {
                                    self.process_trace(&trace);
                                }
                            }
                        },
                        None => {
                            // Input stream closed: schedule an immediate final flush.
                            final_flush = true;
                            flush_ticker.reset_immediately();
                            debug!("APM Stats transform stopping, triggering final flush...");
                        }
                    }
                },
            }
        }

        debug!("APM Stats transform stopped.");
        Ok(())
    }
}
418
/// Returns the current UNIX time in nanoseconds, saturating to zero if the system clock
/// reads as before the epoch.
fn now_nanos() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}
426
427fn resolve_container_id(resource_tags: &SharedTagSet) -> MetaString {
429 for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
430 if let Some(tag) = resource_tags.get_single_tag(key) {
431 if let Some(value) = tag.value() {
432 if !value.is_empty() {
433 return MetaString::from(value);
434 }
435 }
436 }
437 }
438
439 MetaString::empty()
440}
441
442fn extract_container_tags(resource_tags: &SharedTagSet) -> SharedTagSet {
444 let mut container_tags_set = TagSet::default();
445 extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags_set);
446
447 container_tags_set.into_shared()
448}
449
450fn extract_process_tags(trace: &Trace) -> MetaString {
452 if let Some(first_span) = trace.spans().first() {
453 if let Some(process_tags) = first_span.meta().get(TAG_PROCESS_TAGS) {
454 return process_tags.clone();
455 }
456 }
457
458 MetaString::empty()
459}
460
461#[cfg(test)]
462mod tests {
463 use proptest::prelude::*;
464 use saluki_common::collections::FastHashMap;
465 use saluki_context::tags::TagSet;
466 use saluki_core::data_model::event::trace_stats::ClientStatsBucket;
467 use saluki_core::data_model::event::{trace::Span, trace_stats::ClientGroupedStats};
468
469 use super::aggregation::BUCKET_DURATION_NS;
470 use super::span_concentrator::METRIC_PARTIAL_VERSION;
471 use super::*;
472
473 fn align_ts(ts: u64, bsize: u64) -> u64 {
475 ts - ts % bsize
476 }
477
    /// Builds a span whose end time lands exactly at the start of the bucket `bucket_offset`
    /// buckets before `aligned_now` (start = bucket start - duration).
    #[allow(clippy::too_many_arguments)]
    fn test_span(
        aligned_now: u64, span_id: u64, parent_id: u64, duration: u64, bucket_offset: u64, service: &str,
        resource: &str, error: i32, meta: Option<FastHashMap<MetaString, MetaString>>,
        metrics: Option<FastHashMap<MetaString, f64>>,
    ) -> Span {
        let bucket_start = aligned_now - bucket_offset * BUCKET_DURATION_NS;
        let start = bucket_start - duration;

        Span::new(
            service, "query", resource, "db", 1, span_id, parent_id, start, duration, error,
        )
        .with_meta(meta)
        .with_metrics(metrics)
    }
497
    /// Builds a simple measured span (`_dd.measured` = 1.0) with fixed start/duration.
    fn make_test_span(service: &str, name: &str, resource: &str) -> Span {
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_dd.measured"), 1.0);

        Span::new(service, name, resource, "web", 1, 1, 0, 1000000000, 100000000, 0).with_metrics(metrics)
    }
505
    /// Builds a parentless span tagged as top-level (`_top_level` = 1.0) via `test_span`.
    fn make_top_level_span(
        aligned_now: u64, span_id: u64, duration: u64, bucket_offset: u64, service: &str, resource: &str, error: i32,
        meta: Option<FastHashMap<MetaString, MetaString>>,
    ) -> Span {
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_top_level"), 1.0);
        test_span(
            aligned_now,
            span_id,
            0,
            duration,
            bucket_offset,
            service,
            resource,
            error,
            meta,
            Some(metrics),
        )
    }
526
    /// End-to-end: processing a trace through the transform produces stats on a forced flush.
    #[test]
    fn test_process_trace_creates_stats() {
        let now = now_nanos();

        let concentrator = SpanConcentrator::new(true, true, &[], now);
        let mut transform = ApmStats {
            concentrator,
            flush_interval: DEFAULT_FLUSH_INTERVAL,
            agent_env: MetaString::from("none"),
            agent_hostname: MetaString::default(),
            workload_provider: None,
        };

        let span = make_test_span("test-service", "test-operation", "test-resource");
        let trace = Trace::new(vec![span], TagSet::default());

        transform.process_trace(&trace);

        // Flush well past the span's bucket to force everything out.
        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
        assert!(!stats.is_empty(), "Expected stats to be produced");
    }
549
    /// A span carrying a `_sample_rate` metric still contributes at least one hit once its
    /// weight is applied.
    #[test]
    fn test_weight_applied_to_stats() {
        let now = now_nanos();

        let concentrator = SpanConcentrator::new(true, true, &[], now);
        let mut transform = ApmStats {
            concentrator,
            flush_interval: DEFAULT_FLUSH_INTERVAL,
            agent_env: MetaString::from("none"),
            agent_hostname: MetaString::default(),
            workload_provider: None,
        };

        // _sample_rate of 0.5 — presumably scales the trace weight via `weight()`;
        // the exact weighting logic lives in the `weight` module.
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_dd.measured"), 1.0);
        metrics.insert(MetaString::from("_sample_rate"), 0.5);

        let span = Span::new(
            "test-service",
            "test-op",
            "test-resource",
            "web",
            1,
            1,
            0,
            now,
            100000000,
            0,
        )
        .with_metrics(metrics);

        let trace = Trace::new(vec![span], TagSet::default());
        transform.process_trace(&trace);

        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
        assert!(!stats.is_empty());

        let bucket = &stats[0].stats()[0];
        let grouped = &bucket.stats()[0];
        assert!(grouped.hits() >= 1, "Expected weighted hits");
    }
594
    /// A non-forced flush at ts=0 returns nothing; a forced flush at the same ts drains stats.
    #[test]
    fn test_force_flush() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        let span = make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None);
        let trace = Trace::new(vec![span], TagSet::default());

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("test"),
            hostname: MetaString::from("host"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in trace.spans() {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        // With a flush timestamp of zero, no bucket should be considered old enough.
        let ts: u64 = 0;

        let stats = concentrator.flush(ts, false);
        assert!(stats.is_empty(), "Non-force flush should return empty");

        let stats = concentrator.flush(ts, true);
        assert!(!stats.is_empty(), "Force flush should return stats");
        assert_eq!(stats[0].stats().len(), 1, "Should have 1 bucket");
    }
631
    /// Spans carrying the partial-version metric must not contribute to stats, even when
    /// marked top-level.
    #[test]
    fn test_ignores_partial_spans() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        // Top-level, but flagged as a partial snapshot.
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_top_level"), 1.0);
        metrics.insert(MetaString::from(METRIC_PARTIAL_VERSION), 830604.0);

        let span = test_span(aligned_now, 1, 0, 50, 5, "A1", "resource1", 0, None, Some(metrics));
        let trace = Trace::new(vec![span], TagSet::default());

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("test"),
            hostname: MetaString::from("tracer-hostname"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in trace.spans() {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
        assert!(stats.is_empty(), "Partial spans should be ignored");
    }
664
    /// Totals (duration/hits/errors/top-level hits) across all flushed buckets must match
    /// the spans fed in, regardless of which bucket each span landed in.
    #[test]
    fn test_concentrator_stats_totals() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        // Start the concentrator two buckets in the past so older spans are still accepted.
        let oldest_ts = aligned_now - 2 * BUCKET_DURATION_NS;
        let mut concentrator = SpanConcentrator::new(true, true, &[], oldest_ts);

        // Six top-level spans spread across six different bucket offsets.
        let spans = vec![
            make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
        ];

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("none"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in &spans {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        // Flush far enough in the future to drain every bucket.
        let all_stats = concentrator.flush(now + BUCKET_DURATION_NS * 10, true);

        let mut total_duration: u64 = 0;
        let mut total_hits: u64 = 0;
        let mut total_errors: u64 = 0;
        let mut total_top_level_hits: u64 = 0;

        for payload in &all_stats {
            for bucket in payload.stats() {
                for grouped in bucket.stats() {
                    total_duration += grouped.duration();
                    total_hits += grouped.hits();
                    total_errors += grouped.errors();
                    total_top_level_hits += grouped.top_level_hits();
                }
            }
        }

        assert_eq!(total_duration, 50 + 40 + 30 + 20 + 10 + 1, "Wrong total duration");
        assert_eq!(total_hits, 6, "Wrong total hits");
        assert_eq!(total_top_level_hits, 6, "Wrong total top level hits");
        assert_eq!(total_errors, 0, "Wrong total errors");
    }
720
    /// Exactly one grouped stat should be marked as the trace root; top-level and client
    /// spans in the same trace must be counted as non-root.
    #[test]
    fn test_root_tag() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        // Parentless span: the trace root.
        let mut root_metrics = FastHashMap::default();
        root_metrics.insert(MetaString::from("_top_level"), 1.0);
        let root_span = test_span(
            aligned_now,
            1,
            0,
            40,
            10,
            "A1",
            "resource1",
            0,
            None,
            Some(root_metrics),
        );

        // Top-level but with a (dangling) parent: not the root.
        let mut top_level_metrics = FastHashMap::default();
        top_level_metrics.insert(MetaString::from("_top_level"), 1.0);
        let top_level_span = test_span(
            aligned_now,
            4,
            1000,
            10,
            10,
            "A1",
            "resource1",
            0,
            None,
            Some(top_level_metrics),
        );

        // Client-kind child span: eligible for stats via span kind, but not the root.
        let mut client_meta = FastHashMap::default();
        client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
        let client_span = test_span(aligned_now, 3, 2, 20, 10, "A1", "resource1", 0, Some(client_meta), None);

        let spans = vec![root_span, top_level_span, client_span];

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("none"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in &spans {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 20, true);
        assert!(!stats.is_empty(), "Should have stats");

        // Tally root vs. non-root classification across all grouped stats.
        let mut total_grouped = 0;
        let mut root_count = 0;
        let mut non_root_count = 0;

        for payload in &stats {
            for bucket in payload.stats() {
                for grouped in bucket.stats() {
                    total_grouped += 1;
                    match grouped.is_trace_root() {
                        Some(true) => root_count += 1,
                        Some(false) => non_root_count += 1,
                        None => {}
                    }
                }
            }
        }

        assert_eq!(total_grouped, 3, "Expected 3 grouped stats");
        assert_eq!(root_count, 1, "Expected 1 root span");
        assert_eq!(non_root_count, 2, "Expected 2 non-root spans");
    }
808
    /// With span-kind stats disabled only top-level spans produce stats; with it enabled,
    /// client-kind spans produce stats too.
    #[test]
    fn test_compute_stats_through_span_kind_check() {
        let now = now_nanos();

        // Case 1: compute_stats_by_span_kind = false — the client span is excluded.
        {
            let mut concentrator = SpanConcentrator::new(false, true, &[], now);

            let mut metrics = FastHashMap::default();
            metrics.insert(MetaString::from("_top_level"), 1.0);
            let span = Span::new("myservice", "query", "GET /users", "web", 1, 1, 0, now, 500, 0).with_metrics(metrics);

            let payload_key = PayloadAggregationKey {
                env: MetaString::from("test"),
                ..Default::default()
            };
            let infra_tags = InfraTags::default();

            if let Some(stat_span) = concentrator.new_stat_span_from_span(&span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }

            // Client-kind span without top-level/measured markers.
            let mut client_meta = FastHashMap::default();
            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
                .with_meta(client_meta);

            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }

            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);

            let mut count = 0;
            for payload in &stats {
                for bucket in payload.stats() {
                    count += bucket.stats().len();
                }
            }

            assert_eq!(count, 1, "Expected 1 stat when span kind check disabled");
        }

        // Case 2: compute_stats_by_span_kind = true — the client span is included.
        {
            let mut concentrator = SpanConcentrator::new(true, true, &[], now);

            let mut metrics = FastHashMap::default();
            metrics.insert(MetaString::from("_top_level"), 1.0);
            let span = Span::new("myservice", "query", "GET /users", "web", 1, 1, 0, now, 500, 0).with_metrics(metrics);

            let payload_key = PayloadAggregationKey {
                env: MetaString::from("test"),
                ..Default::default()
            };
            let infra_tags = InfraTags::default();

            if let Some(stat_span) = concentrator.new_stat_span_from_span(&span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }

            let mut client_meta = FastHashMap::default();
            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
                .with_meta(client_meta);

            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }

            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);

            let mut count = 0;
            for payload in &stats {
                for bucket in payload.stats() {
                    count += bucket.stats().len();
                }
            }

            assert_eq!(count, 2, "Expected 2 stats when span kind check enabled");
        }
    }
899
    /// Peer tags (e.g. db.instance, db.system) are only attached to client spans when
    /// peer tags aggregation is enabled on the concentrator.
    #[test]
    fn test_peer_tags() {
        let now = now_nanos();

        // Case 1: peer_tags_aggregation = false — no peer tags on any grouped stats.
        {
            let mut concentrator = SpanConcentrator::new(true, false, &[], now);

            let mut client_meta = FastHashMap::default();
            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
            client_meta.insert(MetaString::from("db.instance"), MetaString::from("i-1234"));
            client_meta.insert(MetaString::from("db.system"), MetaString::from("postgres"));
            let mut client_metrics = FastHashMap::default();
            client_metrics.insert(MetaString::from("_dd.measured"), 1.0);
            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
                .with_meta(client_meta)
                .with_metrics(client_metrics);

            let payload_key = PayloadAggregationKey {
                env: MetaString::from("test"),
                ..Default::default()
            };
            let infra_tags = InfraTags::default();

            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }

            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);

            for payload in &stats {
                for bucket in payload.stats() {
                    for grouped in bucket.stats() {
                        assert!(
                            grouped.peer_tags().is_empty(),
                            "Peer tags should be empty when peer_tags_aggregation is false"
                        );
                    }
                }
            }
        }

        // Case 2: peer_tags_aggregation = true — db.* meta entries surface as peer tags.
        {
            let mut concentrator = SpanConcentrator::new(true, true, &[], now);

            let mut client_meta = FastHashMap::default();
            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
            client_meta.insert(MetaString::from("db.instance"), MetaString::from("i-1234"));
            client_meta.insert(MetaString::from("db.system"), MetaString::from("postgres"));
            let mut client_metrics = FastHashMap::default();
            client_metrics.insert(MetaString::from("_dd.measured"), 1.0);
            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
                .with_meta(client_meta)
                .with_metrics(client_metrics);

            let payload_key = PayloadAggregationKey {
                env: MetaString::from("test"),
                ..Default::default()
            };
            let infra_tags = InfraTags::default();

            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }

            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);

            // Locate the client span's grouped stats and verify its peer tags.
            let mut found_client_with_peer_tags = false;
            for payload in &stats {
                for bucket in payload.stats() {
                    for grouped in bucket.stats() {
                        if grouped.resource() == "SELECT ..." {
                            assert!(!grouped.peer_tags().is_empty(), "Client span should have peer tags");
                            let peer_tags: Vec<&str> = grouped.peer_tags().iter().map(|s| s.as_ref()).collect();
                            assert!(
                                peer_tags.iter().any(|t| t.starts_with("db.instance:")),
                                "Should have db.instance peer tag"
                            );
                            assert!(
                                peer_tags.iter().any(|t| t.starts_with("db.system:")),
                                "Should have db.system peer tag"
                            );
                            found_client_with_peer_tags = true;
                        }
                    }
                }
            }
            assert!(
                found_client_with_peer_tags,
                "Should have found client span with peer tags"
            );
        }
    }
1000
    /// Spans older than the concentrator's start time are shifted into flushable buckets and
    /// are eventually all flushed without data loss, even without forcing.
    #[test]
    fn test_concentrator_oldest_ts() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        {
            let mut concentrator = SpanConcentrator::new(true, true, &[], now);

            // Spans ranging from 5 buckets old down to the current bucket.
            let spans = vec![
                make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
                make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
                make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
                make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
                make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
                make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
            ];

            let payload_key = PayloadAggregationKey {
                env: MetaString::from("none"),
                ..Default::default()
            };
            let infra_tags = InfraTags::default();

            for span in &spans {
                if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                    concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
                }
            }

            let mut flush_time = now;
            // Non-forced flushes yield nothing until the concentrator's bucket buffer
            // (assumed to be 2 buckets here) has filled.
            let buffer_len = 2;
            for _ in 0..buffer_len {
                let stats = concentrator.flush(flush_time, false);
                assert!(stats.is_empty(), "Should not flush before buffer fills");
                flush_time += BUCKET_DURATION_NS;
            }

            let stats = concentrator.flush(flush_time, false);
            assert!(!stats.is_empty(), "Should flush after buffer fills");

            // Every span must be accounted for in the flushed totals.
            let mut total_hits: u64 = 0;
            let mut total_duration: u64 = 0;
            for payload in &stats {
                for bucket in payload.stats() {
                    for grouped in bucket.stats() {
                        total_hits += grouped.hits();
                        total_duration += grouped.duration();
                    }
                }
            }

            assert_eq!(total_hits, 6, "All 6 spans should be counted");
            assert_eq!(
                total_duration,
                50 + 40 + 30 + 20 + 10 + 1,
                "Total duration should match"
            );
        }
    }
1067
1068 #[test]
1069 fn test_compute_stats_for_span_kind() {
1070 use super::span_concentrator::compute_stats_for_span_kind;
1071
1072 assert!(compute_stats_for_span_kind("server"));
1074 assert!(compute_stats_for_span_kind("consumer"));
1075 assert!(compute_stats_for_span_kind("client"));
1076 assert!(compute_stats_for_span_kind("producer"));
1077
1078 assert!(compute_stats_for_span_kind("SERVER"));
1080 assert!(compute_stats_for_span_kind("CONSUMER"));
1081 assert!(compute_stats_for_span_kind("CLIENT"));
1082 assert!(compute_stats_for_span_kind("PRODUCER"));
1083
1084 assert!(compute_stats_for_span_kind("SErVER"));
1086 assert!(compute_stats_for_span_kind("COnSUMER"));
1087 assert!(compute_stats_for_span_kind("CLiENT"));
1088 assert!(compute_stats_for_span_kind("PRoDUCER"));
1089
1090 assert!(!compute_stats_for_span_kind("internal"));
1092 assert!(!compute_stats_for_span_kind("INTERNAL"));
1093 assert!(!compute_stats_for_span_kind("INtERNAL"));
1094 assert!(!compute_stats_for_span_kind(""));
1095 }
1096
    /// `extract_process_tags` reads `_dd.tags.process` from the first span only, returning
    /// an empty string for missing entries, empty traces, or an empty stored value.
    #[test]
    fn test_extract_process_tags() {
        // No meta entry at all.
        {
            let span = Span::default();
            let trace = Trace::new(vec![span], TagSet::default());
            let process_tags = extract_process_tags(&trace);
            assert!(process_tags.is_empty(), "Should be empty when no _dd.tags.process");
        }

        // Entry present: returned verbatim.
        {
            let mut meta = FastHashMap::default();
            meta.insert(MetaString::from(TAG_PROCESS_TAGS), MetaString::from("a:1,b:2,c:3"));
            let span = Span::default().with_meta(meta);
            let trace = Trace::new(vec![span], TagSet::default());
            let process_tags = extract_process_tags(&trace);
            assert_eq!(process_tags, "a:1,b:2,c:3");
        }

        // Entry present but empty: still empty (no special-casing).
        {
            let mut meta = FastHashMap::default();
            meta.insert(MetaString::from(TAG_PROCESS_TAGS), MetaString::from(""));
            let span = Span::default().with_meta(meta);
            let trace = Trace::new(vec![span], TagSet::default());
            let process_tags = extract_process_tags(&trace);
            assert!(
                process_tags.is_empty(),
                "Should be empty when _dd.tags.process is empty string"
            );
        }

        // Trace with no spans at all.
        {
            let trace = Trace::new(vec![], TagSet::default());
            let process_tags = extract_process_tags(&trace);
            assert!(process_tags.is_empty(), "Should be empty when trace has no spans");
        }
    }
1137
1138 #[test]
1139 fn test_process_tags_hash_computation() {
1140 use super::aggregation::process_tags_hash;
1141
1142 assert_eq!(process_tags_hash(""), 0);
1144
1145 let hash1 = process_tags_hash("a:1,b:2,c:3");
1147 let hash2 = process_tags_hash("a:1,b:2,c:3");
1148 assert_eq!(hash1, hash2);
1149
1150 let hash3 = process_tags_hash("a:1,b:2");
1152 assert_ne!(hash1, hash3);
1153 }
1154
1155 fn make_grouped_stats(service: &str, resource: &str) -> ClientGroupedStats {
1157 ClientGroupedStats::new(service, "operation", resource)
1158 .with_hits(1)
1159 .with_duration(100)
1160 }
1161
1162 fn make_bucket_with_stats(n: usize) -> ClientStatsBucket {
1164 let stats: Vec<ClientGroupedStats> = (0..n)
1165 .map(|i| make_grouped_stats("service", &format!("resource-{}", i)))
1166 .collect();
1167 ClientStatsBucket::new(1000, 10_000_000_000, stats)
1168 }
1169
1170 fn make_payload_with_buckets(hostname: &str, buckets: Vec<ClientStatsBucket>) -> ClientStatsPayload {
1172 ClientStatsPayload::new(hostname, "test-env", "1.0.0")
1173 .with_stats(buckets)
1174 .with_container_id("container-123")
1175 .with_lang("rust")
1176 }
1177
1178 fn count_grouped_stats(trace_stats: &TraceStats) -> usize {
1180 trace_stats
1181 .stats()
1182 .iter()
1183 .flat_map(|p| p.stats())
1184 .map(|b| b.stats().len())
1185 .sum()
1186 }
1187
1188 #[test]
1189 fn test_split_into_trace_stats_empty_input() {
1190 let result = split_into_trace_stats(vec![], 100);
1191 assert!(result.is_empty());
1192 }
1193
1194 #[test]
1195 fn test_split_into_trace_stats_no_split_needed() {
1196 let bucket = make_bucket_with_stats(50);
1198 let payload = make_payload_with_buckets("host1", vec![bucket]);
1199
1200 let result = split_into_trace_stats(vec![payload], 100);
1201
1202 assert_eq!(result.len(), 1);
1203 assert_eq!(count_grouped_stats(&result[0]), 50);
1204 }
1205
1206 #[test]
1207 fn test_split_into_trace_stats_exact_threshold() {
1208 let bucket = make_bucket_with_stats(100);
1210 let payload = make_payload_with_buckets("host1", vec![bucket]);
1211
1212 let result = split_into_trace_stats(vec![payload], 100);
1213
1214 assert_eq!(result.len(), 1);
1215 assert_eq!(count_grouped_stats(&result[0]), 100);
1216 }
1217
1218 #[test]
1219 fn test_split_into_trace_stats_splits_single_bucket() {
1220 let bucket = make_bucket_with_stats(250);
1222 let payload = make_payload_with_buckets("host1", vec![bucket]);
1223
1224 let result = split_into_trace_stats(vec![payload], 100);
1225
1226 assert_eq!(result.len(), 3);
1227 assert_eq!(count_grouped_stats(&result[0]), 100);
1228 assert_eq!(count_grouped_stats(&result[1]), 100);
1229 assert_eq!(count_grouped_stats(&result[2]), 50);
1230
1231 let total: usize = result.iter().map(count_grouped_stats).sum();
1233 assert_eq!(total, 250);
1234 }
1235
1236 #[test]
1237 fn test_split_into_trace_stats_splits_across_payloads() {
1238 let payload1 = make_payload_with_buckets("host1", vec![make_bucket_with_stats(60)]);
1240 let payload2 = make_payload_with_buckets("host2", vec![make_bucket_with_stats(60)]);
1241
1242 let result = split_into_trace_stats(vec![payload1, payload2], 100);
1243
1244 assert_eq!(result.len(), 2);
1245 assert_eq!(count_grouped_stats(&result[0]), 100);
1247 assert_eq!(count_grouped_stats(&result[1]), 20);
1249 }
1250
1251 #[test]
1252 fn test_split_into_trace_stats_splits_single_payload_multiple_buckets() {
1253 let bucket1 = make_bucket_with_stats(70);
1255 let bucket2 = make_bucket_with_stats(80);
1256 let payload = make_payload_with_buckets("host1", vec![bucket1, bucket2]);
1257
1258 let result = split_into_trace_stats(vec![payload], 100);
1259
1260 assert_eq!(result.len(), 2);
1261 let total: usize = result.iter().map(count_grouped_stats).sum();
1262 assert_eq!(total, 150);
1263 }
1264
1265 #[test]
1266 fn test_split_into_trace_stats_preserves_metadata() {
1267 let bucket = make_bucket_with_stats(250);
1268 let payload = ClientStatsPayload::new("test-host", "prod", "2.0.0")
1269 .with_stats(vec![bucket])
1270 .with_container_id("container-abc")
1271 .with_lang("go")
1272 .with_git_commit_sha("abc123")
1273 .with_image_tag("v1.2.3")
1274 .with_process_tags_hash(12345)
1275 .with_process_tags("tag1,tag2");
1276
1277 let result = split_into_trace_stats(vec![payload], 100);
1278
1279 for trace_stats in &result {
1281 for p in trace_stats.stats() {
1282 assert_eq!(p.hostname(), "test-host");
1283 assert_eq!(p.env(), "prod");
1284 assert_eq!(p.version(), "2.0.0");
1285 assert_eq!(p.container_id(), "container-abc");
1286 assert_eq!(p.lang(), "go");
1287 assert_eq!(p.git_commit_sha(), "abc123");
1288 assert_eq!(p.image_tag(), "v1.2.3");
1289 assert_eq!(p.process_tags_hash(), 12345);
1290 assert_eq!(p.process_tags(), "tag1,tag2");
1291 }
1292 }
1293 }
1294
1295 #[test]
1296 fn test_split_into_trace_stats_preserves_bucket_metadata() {
1297 let stats: Vec<ClientGroupedStats> = (0..150)
1299 .map(|i| make_grouped_stats("svc", &format!("res-{}", i)))
1300 .collect();
1301 let bucket = ClientStatsBucket::new(999_000_000, 10_000_000_000, stats).with_agent_time_shift(42);
1302 let payload = make_payload_with_buckets("host1", vec![bucket]);
1303
1304 let result = split_into_trace_stats(vec![payload], 100);
1305
1306 for trace_stats in &result {
1308 for p in trace_stats.stats() {
1309 for b in p.stats() {
1310 assert_eq!(b.start(), 999_000_000);
1311 assert_eq!(b.duration(), 10_000_000_000);
1312 assert_eq!(b.agent_time_shift(), 42);
1313 }
1314 }
1315 }
1316 }
1317
1318 #[test]
1319 fn test_split_into_trace_stats_handles_empty_bucket() {
1320 let empty_bucket = ClientStatsBucket::new(1000, 10_000_000_000, vec![]);
1321 let payload = make_payload_with_buckets("host1", vec![empty_bucket]);
1322
1323 let result = split_into_trace_stats(vec![payload], 100);
1324
1325 assert_eq!(result.len(), 1);
1326 assert_eq!(count_grouped_stats(&result[0]), 0);
1327 }
1328
1329 #[test]
1330 fn test_split_into_trace_stats_large_split() {
1331 let bucket = make_bucket_with_stats(10_000);
1333 let payload = make_payload_with_buckets("host1", vec![bucket]);
1334
1335 let result = split_into_trace_stats(vec![payload], 4000);
1336
1337 assert_eq!(result.len(), 3);
1338 assert_eq!(count_grouped_stats(&result[0]), 4000);
1339 assert_eq!(count_grouped_stats(&result[1]), 4000);
1340 assert_eq!(count_grouped_stats(&result[2]), 2000);
1341 }
1342
1343 fn arb_grouped_stats() -> impl Strategy<Value = ClientGroupedStats> {
1347 (0..100u64, 0..1000u64).prop_map(|(hits, duration)| {
1348 ClientGroupedStats::new("service", "operation", "resource")
1349 .with_hits(hits)
1350 .with_duration(duration)
1351 })
1352 }
1353
1354 fn arb_bucket(max_stats_per_bucket: usize) -> impl Strategy<Value = ClientStatsBucket> {
1356 proptest::collection::vec(arb_grouped_stats(), 0..=max_stats_per_bucket)
1357 .prop_map(|stats| ClientStatsBucket::new(1000, 10_000_000_000, stats))
1358 }
1359
1360 fn arb_payload(max_buckets: usize, max_stats_per_bucket: usize) -> impl Strategy<Value = ClientStatsPayload> {
1362 proptest::collection::vec(arb_bucket(max_stats_per_bucket), 1..=max_buckets)
1363 .prop_map(|buckets| ClientStatsPayload::new("host", "env", "1.0.0").with_stats(buckets))
1364 }
1365
1366 fn arb_split_inputs() -> impl Strategy<Value = (Vec<ClientStatsPayload>, usize)> {
1374 let payloads_strategy = proptest::collection::vec(arb_payload(5, 500), 1..=10);
1375 let max_entries_strategy = 1..=1000usize;
1376
1377 (payloads_strategy, max_entries_strategy)
1378 }
1379
    // Property test: for arbitrary payload batches and limits, splitting must (1) never
    // emit an event over the per-event cap and (2) preserve the total entry count.
    #[test_strategy::proptest]
    #[cfg_attr(miri, ignore)]
    fn property_test_split_respects_max_entries(
        #[strategy(arb_split_inputs())] inputs: (Vec<ClientStatsPayload>, usize),
    ) {
        let (payloads, max_entries_per_event) = inputs;

        // Total grouped-stats entries across all input payloads/buckets; compared against
        // the output total below to confirm nothing is dropped or duplicated.
        let input_total: usize = payloads.iter().flat_map(|p| p.stats()).map(|b| b.stats().len()).sum();

        let result = split_into_trace_stats(payloads, max_entries_per_event);

        // Property 1: no output event exceeds the per-event cap.
        for trace_stats in &result {
            let count = count_grouped_stats(trace_stats);
            prop_assert!(
                count <= max_entries_per_event,
                "TraceStats has {} grouped stats, exceeds max of {}",
                count,
                max_entries_per_event
            );
        }

        // Property 2: the split is lossless.
        let output_total: usize = result.iter().map(count_grouped_stats).sum();
        prop_assert_eq!(input_total, output_total, "Total stats count should be preserved");
    }
1406}