1use std::{
6 sync::Arc,
7 time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use async_trait::async_trait;
11use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
12use saluki_config::GenericConfiguration;
13use saluki_context::{origin::OriginTagCardinality, tags::TagSet};
14use saluki_core::{
15 components::{transforms::*, ComponentContext},
16 data_model::event::{
17 trace::{AttributeValue, Trace},
18 trace_stats::{ClientStatsPayload, TraceStats},
19 Event, EventType,
20 },
21 topology::OutputDefinition,
22};
23use saluki_env::{
24 host::providers::BoxedHostProvider, workload::EntityId, EnvironmentProvider, HostProvider, WorkloadProvider,
25};
26use saluki_error::{ErrorContext as _, GenericError};
27use stringtheory::MetaString;
28use tokio::{select, time::interval};
29use tracing::{debug, error};
30
31use crate::common::{datadog::apm::ApmConfig, otlp::util::extract_container_tags_from_attributes_map};
32
33mod aggregation;
34pub(crate) use self::aggregation::{process_tags_hash, PayloadAggregationKey};
35
36mod span_concentrator;
37pub(crate) use self::span_concentrator::{InfraTags, SpanConcentrator};
38
39mod statsraw;
40
41mod weight;
42use self::weight::weight;
43
/// How often the concentrator is flushed and the resulting stats are emitted (10 seconds).
const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);

/// Span/trace attribute key under which tracers report process-level tags.
const TAG_PROCESS_TAGS: &str = "_dd.tags.process";

/// Upper bound on the number of grouped stats entries carried by a single `TraceStats` event.
const MAX_STATS_GROUPS_PER_EVENT: usize = 4_000;
52
53pub struct ApmStatsTransformConfiguration {
58 apm_config: ApmConfig,
59 default_hostname: Option<String>,
60 workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
61}
62
63impl ApmStatsTransformConfiguration {
64 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
66 let apm_config = ApmConfig::from_configuration(config)?;
67 Ok(Self {
68 apm_config,
69 default_hostname: None,
70 workload_provider: None,
71 })
72 }
73
74 pub async fn with_environment_provider<E>(mut self, env_provider: E) -> Result<Self, GenericError>
76 where
77 E: EnvironmentProvider<Host = BoxedHostProvider>,
78 {
79 let hostname = env_provider.host().get_hostname().await?;
80 self.default_hostname = Some(hostname);
81 Ok(self)
82 }
83
84 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
88 where
89 W: WorkloadProvider + Send + Sync + 'static,
90 {
91 self.workload_provider = Some(Arc::new(workload_provider));
92 self
93 }
94}
95
96#[async_trait]
97impl TransformBuilder for ApmStatsTransformConfiguration {
98 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
99 let mut apm_config = self.apm_config.clone();
100
101 if let Some(hostname) = &self.default_hostname {
102 apm_config.set_hostname_if_empty(hostname.as_str());
103 }
104
105 let concentrator = SpanConcentrator::new(
106 apm_config.compute_stats_by_span_kind(),
107 apm_config.peer_tags_aggregation(),
108 apm_config.peer_tags(),
109 now_nanos(),
110 );
111
112 Ok(Box::new(ApmStats {
113 concentrator,
114 flush_interval: DEFAULT_FLUSH_INTERVAL,
115 agent_env: apm_config.default_env().clone(),
116 agent_hostname: apm_config.hostname().clone(),
117 workload_provider: self.workload_provider.clone(),
118 }))
119 }
120
121 fn input_event_type(&self) -> EventType {
122 EventType::Trace
123 }
124
125 fn outputs(&self) -> &[OutputDefinition<EventType>] {
126 static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::TraceStats)];
127 OUTPUTS
128 }
129}
130
131impl MemoryBounds for ApmStatsTransformConfiguration {
132 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
133 builder.minimum().with_single_value::<ApmStats>("component struct");
134 }
136}
137
138struct ApmStats {
139 concentrator: SpanConcentrator,
140 flush_interval: Duration,
141 agent_env: MetaString,
142 agent_hostname: MetaString,
143 workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
144}
145
146impl ApmStats {
147 fn process_trace(&mut self, trace: &Trace) {
148 let root_span = trace
149 .spans()
150 .iter()
151 .find(|s| s.parent_id() == 0)
152 .or_else(|| trace.spans().first());
153
154 let trace_weight = root_span.map(weight).unwrap_or(1.0);
155
156 let process_tags = extract_process_tags(trace);
157
158 let payload_key = self.build_payload_key(trace, &process_tags);
159 let infra_tags = self.build_infra_tags(trace, &process_tags);
160
161 let origin = trace
162 .spans()
163 .first()
164 .and_then(|s| s.attributes.get("_dd.origin").and_then(AttributeValue::as_string))
165 .map(|s| s.as_ref())
166 .unwrap_or("");
167
168 for span in trace.spans() {
169 if let Some(stat_span) = self.concentrator.new_stat_span_from_span(span) {
170 self.concentrator
171 .add_span(&stat_span, trace_weight, &payload_key, &infra_tags, origin);
172 }
173 }
174 }
175
176 fn build_infra_tags(&self, trace: &Trace, process_tags: &str) -> InfraTags {
177 let container_id = trace.payload.container_id.clone();
178 let mut container_tags = if container_id.is_empty() {
179 TagSet::default()
180 } else {
181 let mut tags = TagSet::default();
182 extract_container_tags_from_attributes_map(&trace.attributes, &mut tags);
183 tags
184 };
185
186 if !container_id.is_empty() {
187 if let Some(workload_provider) = &self.workload_provider {
188 let entity_id = EntityId::Container(container_id.clone());
189 if let Some(tags) = workload_provider.get_tags_for_entity(&entity_id, OriginTagCardinality::Low) {
190 container_tags.merge_shared(&tags);
191 }
192 }
193 }
194
195 InfraTags::new(container_id, container_tags, process_tags)
196 }
197
198 fn build_payload_key(&self, trace: &Trace, process_tags: &str) -> PayloadAggregationKey {
199 let root_span = trace
200 .spans()
201 .iter()
202 .find(|s| s.parent_id() == 0)
203 .or_else(|| trace.spans().first());
204
205 let env = root_span
208 .and_then(|s| {
209 s.attributes
210 .get("env")
211 .and_then(AttributeValue::as_string)
212 .filter(|s| !s.is_empty())
213 })
214 .cloned()
215 .or_else(|| {
216 trace.spans().iter().find_map(|s| {
217 s.attributes
218 .get("env")
219 .and_then(AttributeValue::as_string)
220 .filter(|s| !s.is_empty())
221 .cloned()
222 })
223 })
224 .unwrap_or_else(|| {
225 if !trace.payload.env.is_empty() {
226 trace.payload.env.clone()
227 } else {
228 self.agent_env.clone()
229 }
230 });
231
232 let hostname = root_span
233 .and_then(|s| {
234 s.attributes
235 .get("_dd.hostname")
236 .and_then(AttributeValue::as_string)
237 .filter(|s| !s.is_empty())
238 })
239 .cloned()
240 .unwrap_or_else(|| {
241 if !trace.payload.hostname.is_empty() {
242 trace.payload.hostname.clone()
243 } else {
244 self.agent_hostname.clone()
245 }
246 });
247
248 let version = root_span
252 .and_then(|s| {
253 s.attributes
254 .get("version")
255 .and_then(AttributeValue::as_string)
256 .filter(|s| !s.is_empty())
257 })
258 .cloned()
259 .unwrap_or_else(|| trace.payload.app_version.clone());
260
261 let container_id = if !trace.payload.container_id.is_empty() {
262 trace.payload.container_id.clone()
263 } else {
264 root_span
265 .and_then(|s| s.attributes.get("_dd.container_id").and_then(AttributeValue::as_string))
266 .cloned()
267 .unwrap_or_default()
268 };
269
270 let git_commit_sha = root_span
271 .and_then(|s| {
272 s.attributes
273 .get("_dd.git.commit.sha")
274 .and_then(AttributeValue::as_string)
275 .filter(|s| !s.is_empty())
276 })
277 .cloned()
278 .unwrap_or_default();
279
280 let image_tag = root_span
281 .and_then(|s| {
282 s.attributes
283 .get("_dd.image_tag")
284 .and_then(AttributeValue::as_string)
285 .filter(|s| !s.is_empty())
286 })
287 .cloned()
288 .unwrap_or_default();
289
290 let lang = if !trace.payload.language_name.is_empty() {
291 trace.payload.language_name.clone()
292 } else {
293 root_span
294 .and_then(|s| s.attributes.get("language").and_then(AttributeValue::as_string))
295 .cloned()
296 .unwrap_or_default()
297 };
298
299 PayloadAggregationKey {
300 env,
301 hostname,
302 version,
303 container_id,
304 git_commit_sha,
305 image_tag,
306 lang,
307 process_tags_hash: process_tags_hash(process_tags),
308 }
309 }
310}
311
312fn split_into_trace_stats(client_payloads: Vec<ClientStatsPayload>, max_entries_per_event: usize) -> Vec<TraceStats> {
318 if client_payloads.is_empty() {
319 return Vec::new();
320 }
321
322 let total_grouped_entries = client_payloads
325 .iter()
326 .map(|p| p.stats().iter().map(|b| b.stats().len()).sum::<usize>())
327 .sum::<usize>();
328 if total_grouped_entries <= max_entries_per_event {
329 return vec![TraceStats::new(client_payloads)];
330 }
331
332 let mut events = Vec::new();
333 let mut current_client_payloads = Vec::new();
334 let mut current_event_len = 0;
335
336 for mut client_payload in client_payloads {
337 let client_payload_len = client_payload.stats().iter().map(|b| b.stats().len()).sum::<usize>();
339 if current_event_len + client_payload_len <= max_entries_per_event {
340 current_client_payloads.push(client_payload);
341 current_event_len += client_payload_len;
342 continue;
343 }
344
345 let mut current_client_stats_buckets = Vec::new();
352 for mut client_stats_bucket in client_payload.take_stats() {
353 let bucket_len = client_stats_bucket.stats().len();
354 if current_event_len + bucket_len <= max_entries_per_event {
356 current_client_stats_buckets.push(client_stats_bucket);
357 current_event_len += bucket_len;
358 continue;
359 }
360
361 let mut bucket_entries = client_stats_bucket.take_stats();
364 while current_event_len + bucket_entries.len() > max_entries_per_event {
365 let split_amount = max_entries_per_event - current_event_len;
369 let split_point = bucket_entries.len() - split_amount;
370 let split_entries = bucket_entries.split_off(split_point);
371
372 let split_bucket = client_stats_bucket.clone().with_stats(split_entries);
376 current_client_stats_buckets.push(split_bucket);
377
378 let split_client_payload = client_payload
379 .clone()
380 .with_stats(std::mem::take(&mut current_client_stats_buckets));
381 current_client_payloads.push(split_client_payload);
382
383 events.push(TraceStats::new(std::mem::take(&mut current_client_payloads)));
384 current_event_len = 0;
385 }
386
387 if !bucket_entries.is_empty() {
390 current_event_len += bucket_entries.len();
391 current_client_stats_buckets.push(client_stats_bucket.with_stats(bucket_entries));
392 }
393 }
394
395 if !current_client_stats_buckets.is_empty() {
397 current_client_payloads.push(client_payload.with_stats(current_client_stats_buckets));
398 }
399 }
400
401 if !current_client_payloads.is_empty() {
403 events.push(TraceStats::new(current_client_payloads));
404 }
405
406 events
407}
408
409#[async_trait]
410impl Transform for ApmStats {
411 async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
412 let mut health = context.take_health_handle();
413
414 let mut flush_ticker = interval(self.flush_interval);
415 flush_ticker.tick().await;
416
417 let mut final_flush = false;
418
419 health.mark_ready();
420 debug!("APM Stats transform started.");
421
422 loop {
423 select! {
424 _ = health.live() => continue,
425
426 _ = flush_ticker.tick() => {
427 let stats_payloads = self.concentrator.flush(now_nanos(), final_flush);
428 if !stats_payloads.is_empty() {
429 debug!(stats_payloads = stats_payloads.len(), "Flushing APM stats.");
430
431 let events = split_into_trace_stats(stats_payloads, MAX_STATS_GROUPS_PER_EVENT);
432 let dispatcher = context.dispatcher().buffered()
433 .error_context("Default output should be available.")?;
434
435 if let Err(e) = dispatcher.send_all(events.into_iter().map(Event::TraceStats)).await {
436 error!(error = %e, "Failed to dispatch events.");
437 }
438 }
439
440 if final_flush {
441 debug!("Final APM stats flush complete.");
442 break;
443 }
444 },
445
446 maybe_events = context.events().next(), if !final_flush => {
447 match maybe_events {
448 Some(events) => {
449 for event in events {
450 if let Event::Trace(trace) = event {
451 self.process_trace(&trace);
452 }
453 }
454 },
455 None => {
456 final_flush = true;
459 flush_ticker.reset_immediately();
460 debug!("APM Stats transform stopping, triggering final flush...");
461 }
462 }
463 },
464 }
465 }
466
467 debug!("APM Stats transform stopped.");
468 Ok(())
469 }
470}
471
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. If the nanosecond count ever exceeds the range
/// of `u64`, it saturates at `u64::MAX` rather than silently truncating (as a plain `as u64` cast of the `u128`
/// would).
fn now_nanos() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default();
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
479
480fn extract_process_tags(trace: &Trace) -> MetaString {
482 let root_span = trace
483 .spans()
484 .iter()
485 .find(|s| s.parent_id() == 0)
486 .or_else(|| trace.spans().first());
487 if let Some(span) = root_span {
488 if let Some(tags) = span
489 .attributes
490 .get(TAG_PROCESS_TAGS)
491 .and_then(AttributeValue::as_string)
492 .filter(|s| !s.is_empty())
493 {
494 return tags.clone();
495 }
496 }
497 if let Some(AttributeValue::String(tags)) = trace.attributes.get(TAG_PROCESS_TAGS) {
498 if !tags.is_empty() {
499 return tags.clone();
500 }
501 }
502 MetaString::empty()
503}
504
505#[cfg(test)]
506mod tests {
507 use proptest::prelude::*;
508 use saluki_common::collections::FastHashMap;
509 use saluki_core::data_model::event::trace::{AttributeValue, Span};
510 use saluki_core::data_model::event::trace_stats::ClientGroupedStats;
511 use saluki_core::data_model::event::trace_stats::ClientStatsBucket;
512
513 use super::aggregation::BUCKET_DURATION_NS;
514 use super::span_concentrator::METRIC_PARTIAL_VERSION;
515 use super::*;
516
517 fn align_ts(ts: u64, bsize: u64) -> u64 {
519 ts - ts % bsize
520 }
521
522 #[allow(clippy::too_many_arguments)]
524 fn test_span(
525 aligned_now: u64, span_id: u64, parent_id: u64, duration: u64, bucket_offset: u64, service: &str,
526 resource: &str, error: i32, meta: Option<FastHashMap<MetaString, MetaString>>,
527 metrics: Option<FastHashMap<MetaString, f64>>,
528 ) -> Span {
529 let bucket_start = aligned_now - bucket_offset * BUCKET_DURATION_NS;
530 let start = bucket_start - duration;
531
532 let mut attrs: FastHashMap<MetaString, AttributeValue> = FastHashMap::default();
533 if let Some(m) = meta {
534 attrs.extend(m.into_iter().map(|(k, v)| (k, AttributeValue::String(v))));
535 }
536 if let Some(m) = metrics {
537 attrs.extend(m.into_iter().map(|(k, v)| (k, AttributeValue::Float(v))));
538 }
539 Span::new(
540 service, "query", resource, "db", span_id, parent_id, start, duration, error,
541 )
542 .with_attributes(attrs)
543 }
544
545 fn make_test_span(service: &str, name: &str, resource: &str) -> Span {
547 let mut attrs = FastHashMap::default();
548 attrs.insert(MetaString::from("_dd.measured"), AttributeValue::Float(1.0));
549 Span::new(service, name, resource, "web", 1, 0, 1000000000, 100000000, 0).with_attributes(attrs)
550 }
551
552 fn make_top_level_span(
554 aligned_now: u64, span_id: u64, duration: u64, bucket_offset: u64, service: &str, resource: &str, error: i32,
555 meta: Option<FastHashMap<MetaString, MetaString>>,
556 ) -> Span {
557 let mut metrics = FastHashMap::default();
558 metrics.insert(MetaString::from("_top_level"), 1.0);
559 test_span(
560 aligned_now,
561 span_id,
562 0,
563 duration,
564 bucket_offset,
565 service,
566 resource,
567 error,
568 meta,
569 Some(metrics),
570 )
571 }
572
573 #[test]
574 fn test_process_trace_creates_stats() {
575 let now = now_nanos();
576
577 let concentrator = SpanConcentrator::new(true, true, &[], now);
578 let mut transform = ApmStats {
579 concentrator,
580 flush_interval: DEFAULT_FLUSH_INTERVAL,
581 agent_env: MetaString::from("none"),
582 agent_hostname: MetaString::default(),
583 workload_provider: None,
584 };
585
586 let span = make_test_span("test-service", "test-operation", "test-resource");
587 let trace = Trace::new(vec![span]);
588
589 transform.process_trace(&trace);
590
591 let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
593 assert!(!stats.is_empty(), "Expected stats to be produced");
594 }
595
596 #[test]
597 fn test_weight_applied_to_stats() {
598 let now = now_nanos();
599
600 let concentrator = SpanConcentrator::new(true, true, &[], now);
601 let mut transform = ApmStats {
602 concentrator,
603 flush_interval: DEFAULT_FLUSH_INTERVAL,
604 agent_env: MetaString::from("none"),
605 agent_hostname: MetaString::default(),
606 workload_provider: None,
607 };
608
609 let mut attrs = FastHashMap::default();
611 attrs.insert(MetaString::from("_dd.measured"), AttributeValue::Float(1.0));
612 attrs.insert(MetaString::from("_sample_rate"), AttributeValue::Float(0.5));
613
614 let span = Span::new(
615 "test-service",
616 "test-op",
617 "test-resource",
618 "web",
619 1,
620 0,
621 now,
622 100000000,
623 0,
624 )
625 .with_attributes(attrs);
626
627 let trace = Trace::new(vec![span]);
628 transform.process_trace(&trace);
629
630 let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
631 assert!(!stats.is_empty());
632
633 let bucket = &stats[0].stats()[0];
635 let grouped = &bucket.stats()[0];
636 assert!(grouped.hits() >= 1, "Expected weighted hits");
638 }
639
640 #[test]
641 fn test_force_flush() {
642 let now = now_nanos();
643 let aligned_now = align_ts(now, BUCKET_DURATION_NS);
644
645 let mut concentrator = SpanConcentrator::new(true, true, &[], now);
646
647 let span = make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None);
649 let trace = Trace::new(vec![span]);
650
651 let payload_key = PayloadAggregationKey {
652 env: MetaString::from("test"),
653 hostname: MetaString::from("host"),
654 ..Default::default()
655 };
656 let infra_tags = InfraTags::default();
657
658 for span in trace.spans() {
659 if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
660 concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
661 }
662 }
663
664 let ts: u64 = 0;
666
667 let stats = concentrator.flush(ts, false);
669 assert!(stats.is_empty(), "Non-force flush should return empty");
670
671 let stats = concentrator.flush(ts, true);
673 assert!(!stats.is_empty(), "Force flush should return stats");
674 assert_eq!(stats[0].stats().len(), 1, "Should have 1 bucket");
675 }
676
677 #[test]
678 fn test_ignores_partial_spans() {
679 let now = now_nanos();
680 let aligned_now = align_ts(now, BUCKET_DURATION_NS);
681
682 let mut concentrator = SpanConcentrator::new(true, true, &[], now);
683
684 let mut metrics = FastHashMap::default();
686 metrics.insert(MetaString::from("_top_level"), 1.0);
687 metrics.insert(MetaString::from(METRIC_PARTIAL_VERSION), 830604.0);
688
689 let span = test_span(aligned_now, 1, 0, 50, 5, "A1", "resource1", 0, None, Some(metrics));
690 let trace = Trace::new(vec![span]);
691
692 let payload_key = PayloadAggregationKey {
693 env: MetaString::from("test"),
694 hostname: MetaString::from("tracer-hostname"),
695 ..Default::default()
696 };
697 let infra_tags = InfraTags::default();
698
699 for span in trace.spans() {
700 if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
701 concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
702 }
703 }
704
705 let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
707 assert!(stats.is_empty(), "Partial spans should be ignored");
708 }
709
710 #[test]
711 fn test_concentrator_stats_totals() {
712 let now = now_nanos();
713 let aligned_now = align_ts(now, BUCKET_DURATION_NS);
714
715 let oldest_ts = aligned_now - 2 * BUCKET_DURATION_NS;
717 let mut concentrator = SpanConcentrator::new(true, true, &[], oldest_ts);
718
719 let spans = vec![
721 make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
722 make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
723 make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
724 make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
725 make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
726 make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
727 ];
728
729 let payload_key = PayloadAggregationKey {
730 env: MetaString::from("none"),
731 ..Default::default()
732 };
733 let infra_tags = InfraTags::default();
734
735 for span in &spans {
736 if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
737 concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
738 }
739 }
740
741 let all_stats = concentrator.flush(now + BUCKET_DURATION_NS * 10, true);
743
744 let mut total_duration: u64 = 0;
745 let mut total_hits: u64 = 0;
746 let mut total_errors: u64 = 0;
747 let mut total_top_level_hits: u64 = 0;
748
749 for payload in &all_stats {
750 for bucket in payload.stats() {
751 for grouped in bucket.stats() {
752 total_duration += grouped.duration();
753 total_hits += grouped.hits();
754 total_errors += grouped.errors();
755 total_top_level_hits += grouped.top_level_hits();
756 }
757 }
758 }
759
760 assert_eq!(total_duration, 50 + 40 + 30 + 20 + 10 + 1, "Wrong total duration");
761 assert_eq!(total_hits, 6, "Wrong total hits");
762 assert_eq!(total_top_level_hits, 6, "Wrong total top level hits");
763 assert_eq!(total_errors, 0, "Wrong total errors");
764 }
765
766 #[test]
767 fn test_root_tag() {
768 let now = now_nanos();
769 let aligned_now = align_ts(now, BUCKET_DURATION_NS);
770
771 let mut concentrator = SpanConcentrator::new(true, true, &[], now);
772
773 let mut root_metrics = FastHashMap::default();
775 root_metrics.insert(MetaString::from("_top_level"), 1.0);
776 let root_span = test_span(
777 aligned_now,
778 1,
779 0,
780 40,
781 10,
782 "A1",
783 "resource1",
784 0,
785 None,
786 Some(root_metrics),
787 );
788
789 let mut top_level_metrics = FastHashMap::default();
791 top_level_metrics.insert(MetaString::from("_top_level"), 1.0);
792 let top_level_span = test_span(
793 aligned_now,
794 4,
795 1000,
796 10,
797 10,
798 "A1",
799 "resource1",
800 0,
801 None,
802 Some(top_level_metrics),
803 );
804
805 let mut client_meta = FastHashMap::default();
807 client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
808 let client_span = test_span(aligned_now, 3, 2, 20, 10, "A1", "resource1", 0, Some(client_meta), None);
809
810 let spans = vec![root_span, top_level_span, client_span];
811
812 let payload_key = PayloadAggregationKey {
813 env: MetaString::from("none"),
814 ..Default::default()
815 };
816 let infra_tags = InfraTags::default();
817
818 for span in &spans {
819 if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
820 concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
821 }
822 }
823
824 let stats = concentrator.flush(now + BUCKET_DURATION_NS * 20, true);
825 assert!(!stats.is_empty(), "Should have stats");
826
827 let mut total_grouped = 0;
829 let mut root_count = 0;
830 let mut non_root_count = 0;
831
832 for payload in &stats {
833 for bucket in payload.stats() {
834 for grouped in bucket.stats() {
835 total_grouped += 1;
836 match grouped.is_trace_root() {
837 Some(true) => root_count += 1,
838 Some(false) => non_root_count += 1,
839 None => {}
840 }
841 }
842 }
843 }
844
845 assert_eq!(total_grouped, 3, "Expected 3 grouped stats");
850 assert_eq!(root_count, 1, "Expected 1 root span");
851 assert_eq!(non_root_count, 2, "Expected 2 non-root spans");
852 }
853
854 #[test]
855 fn test_compute_stats_through_span_kind_check() {
856 let now = now_nanos();
857
858 {
860 let mut concentrator = SpanConcentrator::new(false, true, &[], now);
861
862 let mut attrs = FastHashMap::default();
863 attrs.insert(MetaString::from("_top_level"), AttributeValue::Float(1.0));
864 let span = Span::new("myservice", "query", "GET /users", "web", 1, 0, now, 500, 0).with_attributes(attrs);
865
866 let payload_key = PayloadAggregationKey {
867 env: MetaString::from("test"),
868 ..Default::default()
869 };
870 let infra_tags = InfraTags::default();
871
872 if let Some(stat_span) = concentrator.new_stat_span_from_span(&span) {
873 concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
874 }
875
876 let mut client_attrs = FastHashMap::default();
879 client_attrs.insert(
880 MetaString::from("span.kind"),
881 AttributeValue::String(MetaString::from("client")),
882 );
883 let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 2, 1, now, 75, 0)
884 .with_attributes(client_attrs);
885
886 if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
887 concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
888 }
889
890 let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
891
892 let mut count = 0;
893 for payload in &stats {
894 for bucket in payload.stats() {
895 count += bucket.stats().len();
896 }
897 }
898
899 assert_eq!(count, 1, "Expected 1 stat when span kind check disabled");
901 }
902
903 {
905 let mut concentrator = SpanConcentrator::new(true, true, &[], now);
906
907 let mut attrs = FastHashMap::default();
908 attrs.insert(MetaString::from("_top_level"), AttributeValue::Float(1.0));
909 let span = Span::new("myservice", "query", "GET /users", "web", 1, 0, now, 500, 0).with_attributes(attrs);
910
911 let payload_key = PayloadAggregationKey {
912 env: MetaString::from("test"),
913 ..Default::default()
914 };
915 let infra_tags = InfraTags::default();
916
917 if let Some(stat_span) = concentrator.new_stat_span_from_span(&span) {
918 concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
919 }
920
921 let mut client_attrs = FastHashMap::default();
924 client_attrs.insert(
925 MetaString::from("span.kind"),
926 AttributeValue::String(MetaString::from("client")),
927 );
928 let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 2, 1, now, 75, 0)
929 .with_attributes(client_attrs);
930
931 if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
932 concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
933 }
934
935 let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
936
937 let mut count = 0;
938 for payload in &stats {
939 for bucket in payload.stats() {
940 count += bucket.stats().len();
941 }
942 }
943
944 assert_eq!(count, 2, "Expected 2 stats when span kind check enabled");
946 }
947 }
948
949 #[test]
950 fn test_peer_tags() {
951 let now = now_nanos();
952
953 {
955 let mut concentrator = SpanConcentrator::new(true, false, &[], now);
956
957 let mut attrs = FastHashMap::default();
958 attrs.insert(
959 MetaString::from("span.kind"),
960 AttributeValue::String(MetaString::from("client")),
961 );
962 attrs.insert(
963 MetaString::from("db.instance"),
964 AttributeValue::String(MetaString::from("i-1234")),
965 );
966 attrs.insert(
967 MetaString::from("db.system"),
968 AttributeValue::String(MetaString::from("postgres")),
969 );
970 attrs.insert(MetaString::from("_dd.measured"), AttributeValue::Float(1.0));
971 let client_span =
972 Span::new("myservice", "postgres.query", "SELECT ...", "db", 2, 1, now, 75, 0).with_attributes(attrs);
973
974 let payload_key = PayloadAggregationKey {
975 env: MetaString::from("test"),
976 ..Default::default()
977 };
978 let infra_tags = InfraTags::default();
979
980 if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
981 concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
982 }
983
984 let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
985
986 for payload in &stats {
988 for bucket in payload.stats() {
989 for grouped in bucket.stats() {
990 assert!(
991 grouped.peer_tags().is_empty(),
992 "Peer tags should be empty when peer_tags_aggregation is false"
993 );
994 }
995 }
996 }
997 }
998
999 {
1001 let mut concentrator = SpanConcentrator::new(true, true, &[], now);
1003
1004 let mut attrs = FastHashMap::default();
1005 attrs.insert(
1006 MetaString::from("span.kind"),
1007 AttributeValue::String(MetaString::from("client")),
1008 );
1009 attrs.insert(
1010 MetaString::from("db.instance"),
1011 AttributeValue::String(MetaString::from("i-1234")),
1012 );
1013 attrs.insert(
1014 MetaString::from("db.system"),
1015 AttributeValue::String(MetaString::from("postgres")),
1016 );
1017 attrs.insert(MetaString::from("_dd.measured"), AttributeValue::Float(1.0));
1018 let client_span =
1019 Span::new("myservice", "postgres.query", "SELECT ...", "db", 2, 1, now, 75, 0).with_attributes(attrs);
1020
1021 let payload_key = PayloadAggregationKey {
1022 env: MetaString::from("test"),
1023 ..Default::default()
1024 };
1025 let infra_tags = InfraTags::default();
1026
1027 if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
1028 concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
1029 }
1030
1031 let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
1032
1033 let mut found_client_with_peer_tags = false;
1035 for payload in &stats {
1036 for bucket in payload.stats() {
1037 for grouped in bucket.stats() {
1038 if grouped.resource() == "SELECT ..." {
1039 assert!(!grouped.peer_tags().is_empty(), "Client span should have peer tags");
1040 let peer_tags: Vec<&str> = grouped.peer_tags().iter().map(|s| s.as_ref()).collect();
1042 assert!(
1043 peer_tags.iter().any(|t| t.starts_with("db.instance:")),
1044 "Should have db.instance peer tag"
1045 );
1046 assert!(
1047 peer_tags.iter().any(|t| t.starts_with("db.system:")),
1048 "Should have db.system peer tag"
1049 );
1050 found_client_with_peer_tags = true;
1051 }
1052 }
1053 }
1054 }
1055 assert!(
1056 found_client_with_peer_tags,
1057 "Should have found client span with peer tags"
1058 );
1059 }
1060 }
1061
1062 #[test]
1063 fn test_concentrator_oldest_ts() {
1064 let now = now_nanos();
1065 let aligned_now = align_ts(now, BUCKET_DURATION_NS);
1066
1067 {
1069 let mut concentrator = SpanConcentrator::new(true, true, &[], now);
1071
1072 let spans = vec![
1074 make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
1075 make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
1076 make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
1077 make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
1078 make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
1079 make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
1080 ];
1081
1082 let payload_key = PayloadAggregationKey {
1083 env: MetaString::from("none"),
1084 ..Default::default()
1085 };
1086 let infra_tags = InfraTags::default();
1087
1088 for span in &spans {
1089 if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
1090 concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
1091 }
1092 }
1093
1094 let mut flush_time = now;
1096 let buffer_len = 2; for _ in 0..buffer_len {
1099 let stats = concentrator.flush(flush_time, false);
1100 assert!(stats.is_empty(), "Should not flush before buffer fills");
1101 flush_time += BUCKET_DURATION_NS;
1102 }
1103
1104 let stats = concentrator.flush(flush_time, false);
1106 assert!(!stats.is_empty(), "Should flush after buffer fills");
1107
1108 let mut total_hits: u64 = 0;
1110 let mut total_duration: u64 = 0;
1111 for payload in &stats {
1112 for bucket in payload.stats() {
1113 for grouped in bucket.stats() {
1114 total_hits += grouped.hits();
1115 total_duration += grouped.duration();
1116 }
1117 }
1118 }
1119
1120 assert_eq!(total_hits, 6, "All 6 spans should be counted");
1121 assert_eq!(
1122 total_duration,
1123 50 + 40 + 30 + 20 + 10 + 1,
1124 "Total duration should match"
1125 );
1126 }
1127 }
1128
/// Verifies which span kinds are eligible for stats computation.
///
/// The eligible kinds are checked in lower, upper, and mixed case, so the test pins
/// case-insensitive matching. Every assertion names the offending input, so a failure
/// points at the exact kind that regressed.
#[test]
fn test_compute_stats_for_span_kind() {
    use super::span_concentrator::compute_stats_for_span_kind;

    const ELIGIBLE: &[&str] = &[
        "server", "consumer", "client", "producer", // lower case
        "SERVER", "CONSUMER", "CLIENT", "PRODUCER", // upper case
        "SErVER", "COnSUMER", "CLiENT", "PRoDUCER", // mixed case
    ];
    // `internal` in any casing, and the empty string, must not compute stats.
    const INELIGIBLE: &[&str] = &["internal", "INTERNAL", "INtERNAL", ""];

    for &kind in ELIGIBLE {
        assert!(
            compute_stats_for_span_kind(kind),
            "span kind {:?} should compute stats",
            kind
        );
    }
    for &kind in INELIGIBLE {
        assert!(
            !compute_stats_for_span_kind(kind),
            "span kind {:?} should not compute stats",
            kind
        );
    }
}
1157
/// Checks `extract_process_tags` in four cases: no `_dd.tags.process` attribute, a
/// populated attribute, an empty-string attribute, and a trace with no spans.
#[test]
fn test_extract_process_tags() {
    // Builds a one-span trace. When `value` is `Some`, the span carries it under
    // `TAG_PROCESS_TAGS`.
    fn single_span_trace(value: Option<&'static str>) -> Trace {
        let span = match value {
            None => Span::default(),
            Some(tags) => {
                let mut attributes = FastHashMap::default();
                attributes.insert(
                    MetaString::from(TAG_PROCESS_TAGS),
                    AttributeValue::String(MetaString::from(tags)),
                );
                Span::default().with_attributes(attributes)
            }
        };
        Trace::new(vec![span])
    }

    let untagged = single_span_trace(None);
    let extracted = extract_process_tags(&untagged);
    assert!(extracted.is_empty(), "Should be empty when no _dd.tags.process");

    let tagged = single_span_trace(Some("a:1,b:2,c:3"));
    let extracted = extract_process_tags(&tagged);
    assert_eq!(extracted, "a:1,b:2,c:3");

    let blank = single_span_trace(Some(""));
    let extracted = extract_process_tags(&blank);
    assert!(
        extracted.is_empty(),
        "Should be empty when _dd.tags.process is empty string"
    );

    let spanless = Trace::new(vec![]);
    let extracted = extract_process_tags(&spanless);
    assert!(extracted.is_empty(), "Should be empty when trace has no spans");
}
1204
/// Checks that `process_tags_hash` maps the empty string to 0, is deterministic for
/// equal inputs, and distinguishes different tag strings.
#[test]
fn test_process_tags_hash_computation() {
    use super::aggregation::process_tags_hash;

    let empty_hash = process_tags_hash("");
    assert_eq!(empty_hash, 0);

    let full_tags = "a:1,b:2,c:3";
    let first = process_tags_hash(full_tags);
    let repeated = process_tags_hash(full_tags);
    assert_eq!(first, repeated);

    let truncated = process_tags_hash("a:1,b:2");
    assert_ne!(first, truncated);
}
1221
1222 fn make_grouped_stats(service: &str, resource: &str) -> ClientGroupedStats {
1224 ClientGroupedStats::new(service, "operation", resource)
1225 .with_hits(1)
1226 .with_duration(100)
1227 }
1228
1229 fn make_bucket_with_stats(n: usize) -> ClientStatsBucket {
1231 let stats: Vec<ClientGroupedStats> = (0..n)
1232 .map(|i| make_grouped_stats("service", &format!("resource-{}", i)))
1233 .collect();
1234 ClientStatsBucket::new(1000, 10_000_000_000, stats)
1235 }
1236
1237 fn make_payload_with_buckets(hostname: &str, buckets: Vec<ClientStatsBucket>) -> ClientStatsPayload {
1239 ClientStatsPayload::new(hostname, "test-env", "1.0.0")
1240 .with_stats(buckets)
1241 .with_container_id("container-123")
1242 .with_lang("rust")
1243 }
1244
1245 fn count_grouped_stats(trace_stats: &TraceStats) -> usize {
1247 trace_stats
1248 .stats()
1249 .iter()
1250 .flat_map(|p| p.stats())
1251 .map(|b| b.stats().len())
1252 .sum()
1253 }
1254
/// Splitting no payloads yields no events.
#[test]
fn test_split_into_trace_stats_empty_input() {
    let no_payloads: Vec<ClientStatsPayload> = Vec::new();
    let events = split_into_trace_stats(no_payloads, 100);
    assert!(events.is_empty());
}
1260
/// A payload below the per-event cap is emitted as a single, unsplit event.
#[test]
fn test_split_into_trace_stats_no_split_needed() {
    let payload = make_payload_with_buckets("host1", vec![make_bucket_with_stats(50)]);

    let events = split_into_trace_stats(vec![payload], 100);

    assert_eq!(events.len(), 1);
    assert_eq!(count_grouped_stats(&events[0]), 50);
}
1272
/// A payload holding exactly the per-event cap still fits in one event.
#[test]
fn test_split_into_trace_stats_exact_threshold() {
    let payload = make_payload_with_buckets("host1", vec![make_bucket_with_stats(100)]);

    let events = split_into_trace_stats(vec![payload], 100);

    assert_eq!(events.len(), 1);
    assert_eq!(count_grouped_stats(&events[0]), 100);
}
1284
/// A single 250-entry bucket with a cap of 100 splits into events of 100, 100, and 50,
/// with no entries lost.
#[test]
fn test_split_into_trace_stats_splits_single_bucket() {
    let payload = make_payload_with_buckets("host1", vec![make_bucket_with_stats(250)]);

    let events = split_into_trace_stats(vec![payload], 100);

    let per_event: Vec<usize> = events.iter().map(count_grouped_stats).collect();
    assert_eq!(per_event, [100, 100, 50]);
    assert_eq!(per_event.iter().sum::<usize>(), 250);
}
1302
/// Two 60-entry payloads with a cap of 100 fill the first event to 100 and carry the
/// remaining 20 into a second event.
#[test]
fn test_split_into_trace_stats_splits_across_payloads() {
    let inputs = vec![
        make_payload_with_buckets("host1", vec![make_bucket_with_stats(60)]),
        make_payload_with_buckets("host2", vec![make_bucket_with_stats(60)]),
    ];

    let events = split_into_trace_stats(inputs, 100);

    let per_event: Vec<usize> = events.iter().map(count_grouped_stats).collect();
    assert_eq!(per_event, [100, 20]);
}
1317
/// A single payload with two buckets (70 + 80 = 150 entries) exceeds the cap of 100.
/// It must split into two events, each within the cap, with no entries lost.
#[test]
fn test_split_into_trace_stats_splits_single_payload_multiple_buckets() {
    const MAX_ENTRIES: usize = 100;

    let bucket1 = make_bucket_with_stats(70);
    let bucket2 = make_bucket_with_stats(80);
    let payload = make_payload_with_buckets("host1", vec![bucket1, bucket2]);

    let result = split_into_trace_stats(vec![payload], MAX_ENTRIES);

    assert_eq!(result.len(), 2);
    // The cap is the property under test. Check it per event, whichever way the split
    // falls relative to the bucket boundary.
    for trace_stats in &result {
        let count = count_grouped_stats(trace_stats);
        assert!(
            count <= MAX_ENTRIES,
            "event has {} grouped stats, exceeds max of {}",
            count,
            MAX_ENTRIES
        );
    }
    let total: usize = result.iter().map(count_grouped_stats).sum();
    assert_eq!(total, 150);
}
1331
/// Every payload produced by splitting must carry the original payload-level metadata:
/// hostname, env, version, container ID, language, git SHA, image tag, and process tags.
#[test]
fn test_split_into_trace_stats_preserves_metadata() {
    let bucket = make_bucket_with_stats(250);
    let payload = ClientStatsPayload::new("test-host", "prod", "2.0.0")
        .with_stats(vec![bucket])
        .with_container_id("container-abc")
        .with_lang("go")
        .with_git_commit_sha("abc123")
        .with_image_tag("v1.2.3")
        .with_process_tags_hash(12345)
        .with_process_tags("tag1,tag2");

    let result = split_into_trace_stats(vec![payload], 100);

    // 250 entries at 100 per event gives three events. Without these guards, an empty
    // result (or events with no payloads) would make the metadata checks below pass
    // vacuously.
    assert_eq!(result.len(), 3);
    for trace_stats in &result {
        assert!(
            !trace_stats.stats().is_empty(),
            "every split event should carry at least one payload"
        );
        for p in trace_stats.stats() {
            assert_eq!(p.hostname(), "test-host");
            assert_eq!(p.env(), "prod");
            assert_eq!(p.version(), "2.0.0");
            assert_eq!(p.container_id(), "container-abc");
            assert_eq!(p.lang(), "go");
            assert_eq!(p.git_commit_sha(), "abc123");
            assert_eq!(p.image_tag(), "v1.2.3");
            assert_eq!(p.process_tags_hash(), 12345);
            assert_eq!(p.process_tags(), "tag1,tag2");
        }
    }
}
1361
/// Every bucket fragment produced by splitting must keep the original bucket's start,
/// duration, and agent time shift.
#[test]
fn test_split_into_trace_stats_preserves_bucket_metadata() {
    let stats: Vec<ClientGroupedStats> = (0..150)
        .map(|i| make_grouped_stats("svc", &format!("res-{}", i)))
        .collect();
    let bucket = ClientStatsBucket::new(999_000_000, 10_000_000_000, stats).with_agent_time_shift(42);
    let payload = make_payload_with_buckets("host1", vec![bucket]);

    let result = split_into_trace_stats(vec![payload], 100);

    // 150 entries at 100 per event gives two events. Counting the inspected buckets
    // keeps the per-bucket checks from passing vacuously on an empty split.
    assert_eq!(result.len(), 2);
    let mut buckets_checked = 0usize;
    for trace_stats in &result {
        for p in trace_stats.stats() {
            for b in p.stats() {
                assert_eq!(b.start(), 999_000_000);
                assert_eq!(b.duration(), 10_000_000_000);
                assert_eq!(b.agent_time_shift(), 42);
                buckets_checked += 1;
            }
        }
    }
    assert!(
        buckets_checked >= result.len(),
        "expected at least one bucket per split event, inspected {}",
        buckets_checked
    );
}
1384
/// A payload whose only bucket is empty still produces one event containing zero
/// grouped stats.
#[test]
fn test_split_into_trace_stats_handles_empty_bucket() {
    let payload = make_payload_with_buckets("host1", vec![ClientStatsBucket::new(1000, 10_000_000_000, vec![])]);

    let events = split_into_trace_stats(vec![payload], 100);

    assert_eq!(events.len(), 1);
    assert_eq!(count_grouped_stats(&events[0]), 0);
}
1395
/// 10,000 entries with a cap of 4,000 split into events of 4,000, 4,000, and 2,000.
#[test]
fn test_split_into_trace_stats_large_split() {
    let payload = make_payload_with_buckets("host1", vec![make_bucket_with_stats(10_000)]);

    let events = split_into_trace_stats(vec![payload], 4000);

    let per_event: Vec<usize> = events.iter().map(count_grouped_stats).collect();
    assert_eq!(per_event, [4000, 4000, 2000]);
}
1409
1410 fn arb_grouped_stats() -> impl Strategy<Value = ClientGroupedStats> {
1414 (0..100u64, 0..1000u64).prop_map(|(hits, duration)| {
1415 ClientGroupedStats::new("service", "operation", "resource")
1416 .with_hits(hits)
1417 .with_duration(duration)
1418 })
1419 }
1420
1421 fn arb_bucket(max_stats_per_bucket: usize) -> impl Strategy<Value = ClientStatsBucket> {
1423 proptest::collection::vec(arb_grouped_stats(), 0..=max_stats_per_bucket)
1424 .prop_map(|stats| ClientStatsBucket::new(1000, 10_000_000_000, stats))
1425 }
1426
1427 fn arb_payload(max_buckets: usize, max_stats_per_bucket: usize) -> impl Strategy<Value = ClientStatsPayload> {
1429 proptest::collection::vec(arb_bucket(max_stats_per_bucket), 1..=max_buckets)
1430 .prop_map(|buckets| ClientStatsPayload::new("host", "env", "1.0.0").with_stats(buckets))
1431 }
1432
1433 fn arb_split_inputs() -> impl Strategy<Value = (Vec<ClientStatsPayload>, usize)> {
1441 let payloads_strategy = proptest::collection::vec(arb_payload(5, 500), 1..=10);
1442 let max_entries_strategy = 1..=1000usize;
1443
1444 (payloads_strategy, max_entries_strategy)
1445 }
1446
/// When the root span has a `version` attribute and the trace payload also carries an
/// `app_version`, the payload aggregation key must use the span-level value.
#[test]
fn test_version_span_beats_resource_for_otlp() {
    let now = now_nanos();
    let transform = ApmStats {
        concentrator: SpanConcentrator::new(true, true, &[], now),
        flush_interval: DEFAULT_FLUSH_INTERVAL,
        agent_env: MetaString::default(),
        agent_hostname: MetaString::default(),
        workload_provider: None,
    };

    let mut span_attrs = FastHashMap::default();
    span_attrs.insert(
        MetaString::from("version"),
        AttributeValue::String(MetaString::from("span-v2")),
    );
    let root = Span::new("svc", "op", "res", "web", 1, 0, now, 1_000_000, 0).with_attributes(span_attrs);

    let mut trace = Trace::new(vec![root]);
    trace.payload.app_version = MetaString::from("resource-v1");

    let payload_key = transform.build_payload_key(&trace, "");

    assert_eq!(
        payload_key.version.as_ref(),
        "span-v2",
        "span-level version must take precedence over resource-level payload.app_version for OTLP traces"
    );
}
1486
1487 #[test_strategy::proptest]
1488 #[cfg_attr(miri, ignore)]
1489 fn property_test_split_respects_max_entries(
1490 #[strategy(arb_split_inputs())] inputs: (Vec<ClientStatsPayload>, usize),
1491 ) {
1492 let (payloads, max_entries_per_event) = inputs;
1493
1494 let input_total: usize = payloads.iter().flat_map(|p| p.stats()).map(|b| b.stats().len()).sum();
1495
1496 let result = split_into_trace_stats(payloads, max_entries_per_event);
1497
1498 for trace_stats in &result {
1500 let count = count_grouped_stats(trace_stats);
1501 prop_assert!(
1502 count <= max_entries_per_event,
1503 "TraceStats has {} grouped stats, exceeds max of {}",
1504 count,
1505 max_entries_per_event
1506 );
1507 }
1508
1509 let output_total: usize = result.iter().map(count_grouped_stats).sum();
1511 prop_assert_eq!(input_total, output_total, "Total stats count should be preserved");
1512 }
1513}