Skip to main content

saluki_components/transforms/apm_stats/
mod.rs

1//! APM Stats transform.
2//!
3//! Aggregates traces into time-bucketed statistics, producing `TraceStats` events.
4
5use std::{
6    sync::Arc,
7    time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use async_trait::async_trait;
11use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
12use saluki_config::GenericConfiguration;
13use saluki_context::{origin::OriginTagCardinality, tags::TagSet};
14use saluki_core::{
15    components::{transforms::*, ComponentContext},
16    data_model::event::{
17        trace::{AttributeValue, Trace},
18        trace_stats::{ClientStatsPayload, TraceStats},
19        Event, EventType,
20    },
21    topology::OutputDefinition,
22};
23use saluki_env::{
24    host::providers::BoxedHostProvider, workload::EntityId, EnvironmentProvider, HostProvider, WorkloadProvider,
25};
26use saluki_error::{ErrorContext as _, GenericError};
27use stringtheory::MetaString;
28use tokio::{select, time::interval};
29use tracing::{debug, error};
30
31use crate::common::{datadog::apm::ApmConfig, otlp::util::extract_container_tags_from_attributes_map};
32
33mod aggregation;
34pub(crate) use self::aggregation::{process_tags_hash, PayloadAggregationKey};
35
36mod span_concentrator;
37pub(crate) use self::span_concentrator::{InfraTags, SpanConcentrator};
38
39mod statsraw;
40
41mod weight;
42use self::weight::weight;
43
/// Default flush interval for the APM stats transform.
///
/// Every transform built by `ApmStatsTransformConfiguration` uses this value as its flush interval.
const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(10);

/// Tag key for process tags in span meta.
///
/// The same key is also looked up in the trace-level attributes when the root span doesn't carry a non-empty value.
const TAG_PROCESS_TAGS: &str = "_dd.tags.process";

/// Maximum number of `ClientGroupedStats` entries per `TraceStats` event.
///
/// Passed to `split_into_trace_stats` on every flush to cap the size of each emitted event.
const MAX_STATS_GROUPS_PER_EVENT: usize = 4000;
52
/// APM Stats transform configuration.
///
/// Aggregates incoming `Trace` events into time-bucketed statistics, emitting
/// `TraceStats` events.
pub struct ApmStatsTransformConfiguration {
    /// APM settings loaded from the generic configuration in `from_configuration`.
    apm_config: ApmConfig,
    /// Hostname handed to `ApmConfig::set_hostname_if_empty` when the transform is built.
    ///
    /// Stays `None` until `with_environment_provider` is called.
    default_hostname: Option<String>,
    /// Optional workload provider shared with the built transform, which queries it for container tags.
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}
62
63impl ApmStatsTransformConfiguration {
64    /// Creates a new `ApmStatsTransformConfiguration` from the given configuration.
65    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
66        let apm_config = ApmConfig::from_configuration(config)?;
67        Ok(Self {
68            apm_config,
69            default_hostname: None,
70            workload_provider: None,
71        })
72    }
73
74    /// Sets the default hostname using the environment provider.
75    pub async fn with_environment_provider<E>(mut self, env_provider: E) -> Result<Self, GenericError>
76    where
77        E: EnvironmentProvider<Host = BoxedHostProvider>,
78    {
79        let hostname = env_provider.host().get_hostname().await?;
80        self.default_hostname = Some(hostname);
81        Ok(self)
82    }
83
84    /// Sets the workload provider.
85    ///
86    /// Defaults to unset.
87    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
88    where
89        W: WorkloadProvider + Send + Sync + 'static,
90    {
91        self.workload_provider = Some(Arc::new(workload_provider));
92        self
93    }
94}
95
96#[async_trait]
97impl TransformBuilder for ApmStatsTransformConfiguration {
98    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
99        let mut apm_config = self.apm_config.clone();
100
101        if let Some(hostname) = &self.default_hostname {
102            apm_config.set_hostname_if_empty(hostname.as_str());
103        }
104
105        let concentrator = SpanConcentrator::new(
106            apm_config.compute_stats_by_span_kind(),
107            apm_config.peer_tags_aggregation(),
108            apm_config.peer_tags(),
109            now_nanos(),
110        );
111
112        Ok(Box::new(ApmStats {
113            concentrator,
114            flush_interval: DEFAULT_FLUSH_INTERVAL,
115            agent_env: apm_config.default_env().clone(),
116            agent_hostname: apm_config.hostname().clone(),
117            workload_provider: self.workload_provider.clone(),
118        }))
119    }
120
121    fn input_event_type(&self) -> EventType {
122        EventType::Trace
123    }
124
125    fn outputs(&self) -> &[OutputDefinition<EventType>] {
126        static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::TraceStats)];
127        OUTPUTS
128    }
129}
130
impl MemoryBounds for ApmStatsTransformConfiguration {
    /// Declares the memory bounds of the transform built from this configuration.
    ///
    /// Only the `ApmStats` component struct itself is registered, as part of the minimum bounds.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Register a single instance of the transform struct under the minimum bounds.
        builder.minimum().with_single_value::<ApmStats>("component struct");
        // TODO: Think about everything we need to account for here.
    }
}
137
/// APM stats transform: feeds incoming traces into a span concentrator and periodically flushes the aggregated
/// statistics as `TraceStats` events.
struct ApmStats {
    /// Aggregates spans and produces the stats payloads on flush.
    concentrator: SpanConcentrator,
    /// Period of the flush ticker driving `concentrator.flush`.
    flush_interval: Duration,
    /// Last-resort env for the payload aggregation key, used when neither the spans nor the trace payload
    /// provide a non-empty one.
    agent_env: MetaString,
    /// Last-resort hostname for the payload aggregation key, used when neither the root span nor the trace
    /// payload provide a non-empty one.
    agent_hostname: MetaString,
    /// Optional workload provider queried for container tags when a trace carries a container ID.
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}
145
146impl ApmStats {
147    fn process_trace(&mut self, trace: &Trace) {
148        let root_span = trace
149            .spans()
150            .iter()
151            .find(|s| s.parent_id() == 0)
152            .or_else(|| trace.spans().first());
153
154        let trace_weight = root_span.map(weight).unwrap_or(1.0);
155
156        let process_tags = extract_process_tags(trace);
157
158        let payload_key = self.build_payload_key(trace, &process_tags);
159        let infra_tags = self.build_infra_tags(trace, &process_tags);
160
161        let origin = trace
162            .spans()
163            .first()
164            .and_then(|s| s.attributes.get("_dd.origin").and_then(AttributeValue::as_string))
165            .map(|s| s.as_ref())
166            .unwrap_or("");
167
168        for span in trace.spans() {
169            if let Some(stat_span) = self.concentrator.new_stat_span_from_span(span) {
170                self.concentrator
171                    .add_span(&stat_span, trace_weight, &payload_key, &infra_tags, origin);
172            }
173        }
174    }
175
176    fn build_infra_tags(&self, trace: &Trace, process_tags: &str) -> InfraTags {
177        let container_id = trace.payload.container_id.clone();
178        let mut container_tags = if container_id.is_empty() {
179            TagSet::default()
180        } else {
181            let mut tags = TagSet::default();
182            extract_container_tags_from_attributes_map(&trace.attributes, &mut tags);
183            tags
184        };
185
186        if !container_id.is_empty() {
187            if let Some(workload_provider) = &self.workload_provider {
188                let entity_id = EntityId::Container(container_id.clone());
189                if let Some(tags) = workload_provider.get_tags_for_entity(&entity_id, OriginTagCardinality::Low) {
190                    container_tags.merge_shared(&tags);
191                }
192            }
193        }
194
195        InfraTags::new(container_id, container_tags, process_tags)
196    }
197
198    fn build_payload_key(&self, trace: &Trace, process_tags: &str) -> PayloadAggregationKey {
199        let root_span = trace
200            .spans()
201            .iter()
202            .find(|s| s.parent_id() == 0)
203            .or_else(|| trace.spans().first());
204
205        // env fallback order mirrors get_trace_env (Go agent pkg/trace/traceutil/trace.go:GetEnv):
206        // root span attrs → all span attrs → trace.payload.env (ADP extension) → agent_env.
207        let env = root_span
208            .and_then(|s| {
209                s.attributes
210                    .get("env")
211                    .and_then(AttributeValue::as_string)
212                    .filter(|s| !s.is_empty())
213            })
214            .cloned()
215            .or_else(|| {
216                trace.spans().iter().find_map(|s| {
217                    s.attributes
218                        .get("env")
219                        .and_then(AttributeValue::as_string)
220                        .filter(|s| !s.is_empty())
221                        .cloned()
222                })
223            })
224            .unwrap_or_else(|| {
225                if !trace.payload.env.is_empty() {
226                    trace.payload.env.clone()
227                } else {
228                    self.agent_env.clone()
229                }
230            });
231
232        let hostname = root_span
233            .and_then(|s| {
234                s.attributes
235                    .get("_dd.hostname")
236                    .and_then(AttributeValue::as_string)
237                    .filter(|s| !s.is_empty())
238            })
239            .cloned()
240            .unwrap_or_else(|| {
241                if !trace.payload.hostname.is_empty() {
242                    trace.payload.hostname.clone()
243                } else {
244                    self.agent_hostname.clone()
245                }
246            });
247
248        // Version resolution mirrors Go agent pkg/trace/version/version.go:GetAppVersionFromTrace:
249        // root span meta["version"] (set by otel_span_to_dd_span with span-over-resource precedence)
250        // takes priority, falling back to the resource-level payload.app_version.
251        let version = root_span
252            .and_then(|s| {
253                s.attributes
254                    .get("version")
255                    .and_then(AttributeValue::as_string)
256                    .filter(|s| !s.is_empty())
257            })
258            .cloned()
259            .unwrap_or_else(|| trace.payload.app_version.clone());
260
261        let container_id = if !trace.payload.container_id.is_empty() {
262            trace.payload.container_id.clone()
263        } else {
264            root_span
265                .and_then(|s| s.attributes.get("_dd.container_id").and_then(AttributeValue::as_string))
266                .cloned()
267                .unwrap_or_default()
268        };
269
270        let git_commit_sha = root_span
271            .and_then(|s| {
272                s.attributes
273                    .get("_dd.git.commit.sha")
274                    .and_then(AttributeValue::as_string)
275                    .filter(|s| !s.is_empty())
276            })
277            .cloned()
278            .unwrap_or_default();
279
280        let image_tag = root_span
281            .and_then(|s| {
282                s.attributes
283                    .get("_dd.image_tag")
284                    .and_then(AttributeValue::as_string)
285                    .filter(|s| !s.is_empty())
286            })
287            .cloned()
288            .unwrap_or_default();
289
290        let lang = if !trace.payload.language_name.is_empty() {
291            trace.payload.language_name.clone()
292        } else {
293            root_span
294                .and_then(|s| s.attributes.get("language").and_then(AttributeValue::as_string))
295                .cloned()
296                .unwrap_or_default()
297        };
298
299        PayloadAggregationKey {
300            env,
301            hostname,
302            version,
303            container_id,
304            git_commit_sha,
305            image_tag,
306            lang,
307            process_tags_hash: process_tags_hash(process_tags),
308        }
309    }
310}
311
312/// Splits stats payloads into multiple `TraceStats` events, each containing at most `max_entries_per_event` grouped
313/// entries.
314///
315/// This function won't attempt to collapse/pack payloads to optimize the number of events generated: there will always
316/// be at least as many output client payloads as there are input client payloads.
317fn split_into_trace_stats(client_payloads: Vec<ClientStatsPayload>, max_entries_per_event: usize) -> Vec<TraceStats> {
318    if client_payloads.is_empty() {
319        return Vec::new();
320    }
321
322    // If the total number of grouped stats entries is below the specified threshold, then we don't do any splitting or
323    // collapsing or anything.
324    let total_grouped_entries = client_payloads
325        .iter()
326        .map(|p| p.stats().iter().map(|b| b.stats().len()).sum::<usize>())
327        .sum::<usize>();
328    if total_grouped_entries <= max_entries_per_event {
329        return vec![TraceStats::new(client_payloads)];
330    }
331
332    let mut events = Vec::new();
333    let mut current_client_payloads = Vec::new();
334    let mut current_event_len = 0;
335
336    for mut client_payload in client_payloads {
337        // If the current payload can fit entirely in the current event, then collect it as-is.
338        let client_payload_len = client_payload.stats().iter().map(|b| b.stats().len()).sum::<usize>();
339        if current_event_len + client_payload_len <= max_entries_per_event {
340            current_client_payloads.push(client_payload);
341            current_event_len += client_payload_len;
342            continue;
343        }
344
345        // Consume all the stats buckets from the current client payload, and set ourselves up to go through each
346        // bucket, splitting them as necessary.
347        //
348        // We basically iterate until we find a bucket where adding its entries would cause us to exceed our threshold,
349        // and then we split that bucket, creating a new client payload in the process, and keep repeating that until
350        // we've exhausted all buckets for our starting client payload.
351        let mut current_client_stats_buckets = Vec::new();
352        for mut client_stats_bucket in client_payload.take_stats() {
353            let bucket_len = client_stats_bucket.stats().len();
354            // If this bucket can fit in the current event, then collect it as-is.
355            if current_event_len + bucket_len <= max_entries_per_event {
356                current_client_stats_buckets.push(client_stats_bucket);
357                current_event_len += bucket_len;
358                continue;
359            }
360
361            // We have to split this bucket. We take the grouped entries it has, and we subdivide those into a new
362            // client bucket, or buckets in order to ensure we don't exceed our threshold.
363            let mut bucket_entries = client_stats_bucket.take_stats();
364            while current_event_len + bucket_entries.len() > max_entries_per_event {
365                // We calculate the split point this way because the returned vector from `split_off` is referenced
366                // against the end of the vector, so if we have 100 items, and we only want 20, we need to split at
367                // index 80 (100 - 20 == 80).
368                let split_amount = max_entries_per_event - current_event_len;
369                let split_point = bucket_entries.len() - split_amount;
370                let split_entries = bucket_entries.split_off(split_point);
371
372                // Create a new "split" bucket based on the current bucket (`client_stats_bucket`) but containing only
373                // the split-off entries. This will feed into the current client payload (`client_payload`) which we'll
374                // finalize and add to the current event before starting a new event.
375                let split_bucket = client_stats_bucket.clone().with_stats(split_entries);
376                current_client_stats_buckets.push(split_bucket);
377
378                let split_client_payload = client_payload
379                    .clone()
380                    .with_stats(std::mem::take(&mut current_client_stats_buckets));
381                current_client_payloads.push(split_client_payload);
382
383                events.push(TraceStats::new(std::mem::take(&mut current_client_payloads)));
384                current_event_len = 0;
385            }
386
387            // If we have any leftover entries from the bucket after splitting it up, put them back in the current bucket
388            // and add that bucket to the current client payload.
389            if !bucket_entries.is_empty() {
390                current_event_len += bucket_entries.len();
391                current_client_stats_buckets.push(client_stats_bucket.with_stats(bucket_entries));
392            }
393        }
394
395        // If we have buckets for this client payload (whether we split or not), add them back to the current client payload.
396        if !current_client_stats_buckets.is_empty() {
397            current_client_payloads.push(client_payload.with_stats(current_client_stats_buckets));
398        }
399    }
400
401    // Stick the remaining entries into a final `TraceStats` event, if any.
402    if !current_client_payloads.is_empty() {
403        events.push(TraceStats::new(current_client_payloads));
404    }
405
406    events
407}
408
409#[async_trait]
410impl Transform for ApmStats {
411    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
412        let mut health = context.take_health_handle();
413
414        let mut flush_ticker = interval(self.flush_interval);
415        flush_ticker.tick().await;
416
417        let mut final_flush = false;
418
419        health.mark_ready();
420        debug!("APM Stats transform started.");
421
422        loop {
423            select! {
424                _ = health.live() => continue,
425
426                _ = flush_ticker.tick() => {
427                    let stats_payloads = self.concentrator.flush(now_nanos(), final_flush);
428                    if !stats_payloads.is_empty() {
429                        debug!(stats_payloads = stats_payloads.len(), "Flushing APM stats.");
430
431                        let events = split_into_trace_stats(stats_payloads, MAX_STATS_GROUPS_PER_EVENT);
432                        let dispatcher = context.dispatcher().buffered()
433                            .error_context("Default output should be available.")?;
434
435                        if let Err(e) = dispatcher.send_all(events.into_iter().map(Event::TraceStats)).await {
436                            error!(error = %e, "Failed to dispatch events.");
437                        }
438                    }
439
440                    if final_flush {
441                        debug!("Final APM stats flush complete.");
442                        break;
443                    }
444                },
445
446                maybe_events = context.events().next(), if !final_flush => {
447                    match maybe_events {
448                        Some(events) => {
449                            for event in events {
450                                if let Event::Trace(trace) = event {
451                                    self.process_trace(&trace);
452                                }
453                            }
454                        },
455                        None => {
456                            // We've reached the end of our input stream, so mark ourselves for a final flush and reset the
457                            // interval so it ticks immediately on the next loop iteration.
458                            final_flush = true;
459                            flush_ticker.reset_immediately();
460                            debug!("APM Stats transform stopping, triggering final flush...");
461                        }
462                    }
463                },
464            }
465        }
466
467        debug!("APM Stats transform stopped.");
468        Ok(())
469    }
470}
471
/// Returns the current time as nanoseconds since Unix epoch.
///
/// Returns 0 if the system clock reports a time before the Unix epoch. If the nanosecond count does not fit in a
/// `u64`, the result saturates at `u64::MAX` rather than silently wrapping.
fn now_nanos() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default();

    // `as_nanos` yields a `u128`; convert with a checked conversion so that an out-of-range value can't be
    // truncated into a bogus timestamp.
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
479
480/// Extracts process tags from trace, checking both span and trace attributes.
481fn extract_process_tags(trace: &Trace) -> MetaString {
482    let root_span = trace
483        .spans()
484        .iter()
485        .find(|s| s.parent_id() == 0)
486        .or_else(|| trace.spans().first());
487    if let Some(span) = root_span {
488        if let Some(tags) = span
489            .attributes
490            .get(TAG_PROCESS_TAGS)
491            .and_then(AttributeValue::as_string)
492            .filter(|s| !s.is_empty())
493        {
494            return tags.clone();
495        }
496    }
497    if let Some(AttributeValue::String(tags)) = trace.attributes.get(TAG_PROCESS_TAGS) {
498        if !tags.is_empty() {
499            return tags.clone();
500        }
501    }
502    MetaString::empty()
503}
504
505#[cfg(test)]
506mod tests {
507    use proptest::prelude::*;
508    use saluki_common::collections::FastHashMap;
509    use saluki_core::data_model::event::trace::{AttributeValue, Span};
510    use saluki_core::data_model::event::trace_stats::ClientGroupedStats;
511    use saluki_core::data_model::event::trace_stats::ClientStatsBucket;
512
513    use super::aggregation::BUCKET_DURATION_NS;
514    use super::span_concentrator::METRIC_PARTIAL_VERSION;
515    use super::*;
516
517    /// Helper to align timestamp to bucket boundary
518    fn align_ts(ts: u64, bsize: u64) -> u64 {
519        ts - ts % bsize
520    }
521
522    /// Creates a test span with the given parameters.
523    #[allow(clippy::too_many_arguments)]
524    fn test_span(
525        aligned_now: u64, span_id: u64, parent_id: u64, duration: u64, bucket_offset: u64, service: &str,
526        resource: &str, error: i32, meta: Option<FastHashMap<MetaString, MetaString>>,
527        metrics: Option<FastHashMap<MetaString, f64>>,
528    ) -> Span {
529        let bucket_start = aligned_now - bucket_offset * BUCKET_DURATION_NS;
530        let start = bucket_start - duration;
531
532        let mut attrs: FastHashMap<MetaString, AttributeValue> = FastHashMap::default();
533        if let Some(m) = meta {
534            attrs.extend(m.into_iter().map(|(k, v)| (k, AttributeValue::String(v))));
535        }
536        if let Some(m) = metrics {
537            attrs.extend(m.into_iter().map(|(k, v)| (k, AttributeValue::Float(v))));
538        }
539        Span::new(
540            service, "query", resource, "db", span_id, parent_id, start, duration, error,
541        )
542        .with_attributes(attrs)
543    }
544
545    /// Creates a simple measured span for basic tests
546    fn make_test_span(service: &str, name: &str, resource: &str) -> Span {
547        let mut attrs = FastHashMap::default();
548        attrs.insert(MetaString::from("_dd.measured"), AttributeValue::Float(1.0));
549        Span::new(service, name, resource, "web", 1, 0, 1000000000, 100000000, 0).with_attributes(attrs)
550    }
551
552    /// Creates a top-level span (`parent_id` = 0, has `_top_level` metric)
553    fn make_top_level_span(
554        aligned_now: u64, span_id: u64, duration: u64, bucket_offset: u64, service: &str, resource: &str, error: i32,
555        meta: Option<FastHashMap<MetaString, MetaString>>,
556    ) -> Span {
557        let mut metrics = FastHashMap::default();
558        metrics.insert(MetaString::from("_top_level"), 1.0);
559        test_span(
560            aligned_now,
561            span_id,
562            0,
563            duration,
564            bucket_offset,
565            service,
566            resource,
567            error,
568            meta,
569            Some(metrics),
570        )
571    }
572
573    #[test]
574    fn test_process_trace_creates_stats() {
575        let now = now_nanos();
576
577        let concentrator = SpanConcentrator::new(true, true, &[], now);
578        let mut transform = ApmStats {
579            concentrator,
580            flush_interval: DEFAULT_FLUSH_INTERVAL,
581            agent_env: MetaString::from("none"),
582            agent_hostname: MetaString::default(),
583            workload_provider: None,
584        };
585
586        let span = make_test_span("test-service", "test-operation", "test-resource");
587        let trace = Trace::new(vec![span]);
588
589        transform.process_trace(&trace);
590
591        // Flush and verify we got stats
592        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
593        assert!(!stats.is_empty(), "Expected stats to be produced");
594    }
595
596    #[test]
597    fn test_weight_applied_to_stats() {
598        let now = now_nanos();
599
600        let concentrator = SpanConcentrator::new(true, true, &[], now);
601        let mut transform = ApmStats {
602            concentrator,
603            flush_interval: DEFAULT_FLUSH_INTERVAL,
604            agent_env: MetaString::from("none"),
605            agent_hostname: MetaString::default(),
606            workload_provider: None,
607        };
608
609        // Create a span with 0.5 sample rate (weight = 2.0)
610        let mut attrs = FastHashMap::default();
611        attrs.insert(MetaString::from("_dd.measured"), AttributeValue::Float(1.0));
612        attrs.insert(MetaString::from("_sample_rate"), AttributeValue::Float(0.5));
613
614        let span = Span::new(
615            "test-service",
616            "test-op",
617            "test-resource",
618            "web",
619            1,
620            0,
621            now,
622            100000000,
623            0,
624        )
625        .with_attributes(attrs);
626
627        let trace = Trace::new(vec![span]);
628        transform.process_trace(&trace);
629
630        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
631        assert!(!stats.is_empty());
632
633        // The hits should be weighted (approximately 2 due to 0.5 sample rate)
634        let bucket = &stats[0].stats()[0];
635        let grouped = &bucket.stats()[0];
636        // With stochastic rounding, hits could be 1 or 2, but with weight 2.0 it should round to 2
637        assert!(grouped.hits() >= 1, "Expected weighted hits");
638    }
639
640    #[test]
641    fn test_force_flush() {
642        let now = now_nanos();
643        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
644
645        let mut concentrator = SpanConcentrator::new(true, true, &[], now);
646
647        // Add a span
648        let span = make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None);
649        let trace = Trace::new(vec![span]);
650
651        let payload_key = PayloadAggregationKey {
652            env: MetaString::from("test"),
653            hostname: MetaString::from("host"),
654            ..Default::default()
655        };
656        let infra_tags = InfraTags::default();
657
658        for span in trace.spans() {
659            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
660                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
661            }
662        }
663
664        // ts=0 so that flush always considers buckets not old enough
665        let ts: u64 = 0;
666
667        // Without force flush, should skip the bucket
668        let stats = concentrator.flush(ts, false);
669        assert!(stats.is_empty(), "Non-force flush should return empty");
670
671        // With force flush, should flush buckets regardless of age
672        let stats = concentrator.flush(ts, true);
673        assert!(!stats.is_empty(), "Force flush should return stats");
674        assert_eq!(stats[0].stats().len(), 1, "Should have 1 bucket");
675    }
676
677    #[test]
678    fn test_ignores_partial_spans() {
679        let now = now_nanos();
680        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
681
682        let mut concentrator = SpanConcentrator::new(true, true, &[], now);
683
684        // Create a partial span (has _dd.partial_version metric)
685        let mut metrics = FastHashMap::default();
686        metrics.insert(MetaString::from("_top_level"), 1.0);
687        metrics.insert(MetaString::from(METRIC_PARTIAL_VERSION), 830604.0);
688
689        let span = test_span(aligned_now, 1, 0, 50, 5, "A1", "resource1", 0, None, Some(metrics));
690        let trace = Trace::new(vec![span]);
691
692        let payload_key = PayloadAggregationKey {
693            env: MetaString::from("test"),
694            hostname: MetaString::from("tracer-hostname"),
695            ..Default::default()
696        };
697        let infra_tags = InfraTags::default();
698
699        for span in trace.spans() {
700            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
701                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
702            }
703        }
704
705        // Partial spans should be ignored
706        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
707        assert!(stats.is_empty(), "Partial spans should be ignored");
708    }
709
710    #[test]
711    fn test_concentrator_stats_totals() {
712        let now = now_nanos();
713        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
714
715        // Set oldestTs to allow old buckets
716        let oldest_ts = aligned_now - 2 * BUCKET_DURATION_NS;
717        let mut concentrator = SpanConcentrator::new(true, true, &[], oldest_ts);
718
719        // Build spans spread over time windows
720        let spans = vec![
721            make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
722            make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
723            make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
724            make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
725            make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
726            make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
727        ];
728
729        let payload_key = PayloadAggregationKey {
730            env: MetaString::from("none"),
731            ..Default::default()
732        };
733        let infra_tags = InfraTags::default();
734
735        for span in &spans {
736            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
737                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
738            }
739        }
740
741        // Flush all and collect totals
742        let all_stats = concentrator.flush(now + BUCKET_DURATION_NS * 10, true);
743
744        let mut total_duration: u64 = 0;
745        let mut total_hits: u64 = 0;
746        let mut total_errors: u64 = 0;
747        let mut total_top_level_hits: u64 = 0;
748
749        for payload in &all_stats {
750            for bucket in payload.stats() {
751                for grouped in bucket.stats() {
752                    total_duration += grouped.duration();
753                    total_hits += grouped.hits();
754                    total_errors += grouped.errors();
755                    total_top_level_hits += grouped.top_level_hits();
756                }
757            }
758        }
759
760        assert_eq!(total_duration, 50 + 40 + 30 + 20 + 10 + 1, "Wrong total duration");
761        assert_eq!(total_hits, 6, "Wrong total hits");
762        assert_eq!(total_top_level_hits, 6, "Wrong total top level hits");
763        assert_eq!(total_errors, 0, "Wrong total errors");
764    }
765
/// Grouped stats must be keyed on whether the span is the trace root, so a root span, a
/// non-root top-level span and a non-root client span land in three distinct groups.
#[test]
fn test_root_tag() {
    let now = now_nanos();
    let bucket_start = align_ts(now, BUCKET_DURATION_NS);

    let mut concentrator = SpanConcentrator::new(true, true, &[], now);

    // Trace root: parent_id = 0 and flagged `_top_level`.
    let mut root_metrics = FastHashMap::default();
    root_metrics.insert(MetaString::from("_top_level"), 1.0);
    let root_span = test_span(bucket_start, 1, 0, 40, 10, "A1", "resource1", 0, None, Some(root_metrics));

    // Top-level but not the root: flagged `_top_level` with parent_id != 0.
    let mut top_level_metrics = FastHashMap::default();
    top_level_metrics.insert(MetaString::from("_top_level"), 1.0);
    let top_level_span = test_span(
        bucket_start,
        4,
        1000,
        10,
        10,
        "A1",
        "resource1",
        0,
        None,
        Some(top_level_metrics),
    );

    // Neither root nor top-level; only eligible because of span.kind = client.
    let mut client_meta = FastHashMap::default();
    client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
    let client_span = test_span(bucket_start, 3, 2, 20, 10, "A1", "resource1", 0, Some(client_meta), None);

    let payload_key = PayloadAggregationKey {
        env: MetaString::from("none"),
        ..Default::default()
    };
    let infra_tags = InfraTags::default();

    let spans = [root_span, top_level_span, client_span];
    for span in spans.iter() {
        let maybe_stat_span = concentrator.new_stat_span_from_span(span);
        if let Some(stat_span) = maybe_stat_span {
            concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
        }
    }

    let stats = concentrator.flush(now + BUCKET_DURATION_NS * 20, true);
    assert!(!stats.is_empty(), "Should have stats");

    // Tally every grouped entry across all payloads and buckets by its root flag.
    let mut total_grouped = 0;
    let mut root_count = 0;
    let mut non_root_count = 0;

    for grouped in stats.iter().flat_map(|p| p.stats()).flat_map(|b| b.stats()) {
        total_grouped += 1;
        match grouped.is_trace_root() {
            Some(true) => root_count += 1,
            Some(false) => non_root_count += 1,
            None => {}
        }
    }

    // Expected groups:
    //   - root span             (is_trace_root = true)
    //   - non-root top-level    (is_trace_root = false)
    //   - client span           (is_trace_root = false, span.kind = client)
    assert_eq!(total_grouped, 3, "Expected 3 grouped stats");
    assert_eq!(root_count, 1, "Expected 1 root span");
    assert_eq!(non_root_count, 2, "Expected 2 non-root spans");
}
853
/// A `span.kind = client` span without `_top_level`/`_dd.measured` only contributes stats
/// when the concentrator is created with `compute_stats_by_span_kind` enabled.
#[test]
fn test_compute_stats_through_span_kind_check() {
    let now = now_nanos();

    // Feeds one top-level span and one plain client span into a fresh concentrator built
    // with the given flag, force-flushes, and returns how many grouped stats came out.
    let grouped_stats_for = |compute_stats_by_span_kind: bool| -> usize {
        let mut concentrator = SpanConcentrator::new(compute_stats_by_span_kind, true, &[], now);

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("test"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        let mut top_level_attrs = FastHashMap::default();
        top_level_attrs.insert(MetaString::from("_top_level"), AttributeValue::Float(1.0));
        let top_level_span =
            Span::new("myservice", "query", "GET /users", "web", 1, 0, now, 500, 0).with_attributes(top_level_attrs);

        // Client span with span.kind=client but no _top_level or _dd.measured.
        let mut client_attrs = FastHashMap::default();
        client_attrs.insert(
            MetaString::from("span.kind"),
            AttributeValue::String(MetaString::from("client")),
        );
        let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 2, 1, now, 75, 0)
            .with_attributes(client_attrs);

        let spans = [top_level_span, client_span];
        for span in spans.iter() {
            let maybe_stat_span = concentrator.new_stat_span_from_span(span);
            if let Some(stat_span) = maybe_stat_span {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
        stats.iter().flat_map(|p| p.stats()).map(|b| b.stats().len()).sum()
    };

    // Disabled: only the top-level span gets stats (client span has no top_level/measured).
    assert_eq!(
        grouped_stats_for(false),
        1,
        "Expected 1 stat when span kind check disabled"
    );

    // Enabled: both spans get stats.
    assert_eq!(
        grouped_stats_for(true),
        2,
        "Expected 2 stats when span kind check enabled"
    );
}
948
/// Peer tags are only attached to grouped stats when the concentrator is created with
/// peer tags aggregation turned on.
#[test]
fn test_peer_tags() {
    let now = now_nanos();

    // Both scenarios use the same measured client span carrying db.instance / db.system.
    let measured_client_span = || {
        let mut attrs = FastHashMap::default();
        attrs.insert(
            MetaString::from("span.kind"),
            AttributeValue::String(MetaString::from("client")),
        );
        attrs.insert(
            MetaString::from("db.instance"),
            AttributeValue::String(MetaString::from("i-1234")),
        );
        attrs.insert(
            MetaString::from("db.system"),
            AttributeValue::String(MetaString::from("postgres")),
        );
        attrs.insert(MetaString::from("_dd.measured"), AttributeValue::Float(1.0));
        Span::new("myservice", "postgres.query", "SELECT ...", "db", 2, 1, now, 75, 0).with_attributes(attrs)
    };

    let payload_key = PayloadAggregationKey {
        env: MetaString::from("test"),
        ..Default::default()
    };
    let infra_tags = InfraTags::default();

    // Scenario 1: peer tags aggregation disabled.
    {
        let mut concentrator = SpanConcentrator::new(true, false, &[], now);
        let client_span = measured_client_span();

        let maybe_stat_span = concentrator.new_stat_span_from_span(&client_span);
        if let Some(stat_span) = maybe_stat_span {
            concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
        }

        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);

        // No grouped entry may carry peer tags.
        for grouped in stats.iter().flat_map(|p| p.stats()).flat_map(|b| b.stats()) {
            assert!(
                grouped.peer_tags().is_empty(),
                "Peer tags should be empty when peer_tags_aggregation is false"
            );
        }
    }

    // Scenario 2: peer tags aggregation enabled.
    {
        // Note: BASE_PEER_TAGS already includes db.instance and db.system
        let mut concentrator = SpanConcentrator::new(true, true, &[], now);
        let client_span = measured_client_span();

        let maybe_stat_span = concentrator.new_stat_span_from_span(&client_span);
        if let Some(stat_span) = maybe_stat_span {
            concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
        }

        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);

        // The client span's group must expose both db.* peer tags.
        let mut found_client_with_peer_tags = false;
        for grouped in stats.iter().flat_map(|p| p.stats()).flat_map(|b| b.stats()) {
            if grouped.resource() != "SELECT ..." {
                continue;
            }

            assert!(!grouped.peer_tags().is_empty(), "Client span should have peer tags");

            let peer_tags: Vec<&str> = grouped.peer_tags().iter().map(|s| s.as_ref()).collect();
            let has_tag_with_prefix = |prefix: &str| peer_tags.iter().any(|t| t.starts_with(prefix));

            assert!(has_tag_with_prefix("db.instance:"), "Should have db.instance peer tag");
            assert!(has_tag_with_prefix("db.system:"), "Should have db.system peer tag");
            found_client_with_peer_tags = true;
        }

        assert!(
            found_client_with_peer_tags,
            "Should have found client span with peer tags"
        );
    }
}
1061
/// "Cold start" scenario: spans that all lie in the past must be folded into the oldest
/// bucket and only come out once the flush buffer has filled.
#[test]
fn test_concentrator_oldest_ts() {
    let now = now_nanos();
    let aligned_now = align_ts(now, BUCKET_DURATION_NS);

    // Concentrator starts at the current time (cold start).
    let mut concentrator = SpanConcentrator::new(true, true, &[], now);

    // Spans spread over many time windows, all in the past.
    let spans = vec![
        make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
        make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
        make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
        make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
        make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
        make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
    ];

    let payload_key = PayloadAggregationKey {
        env: MetaString::from("none"),
        ..Default::default()
    };
    let infra_tags = InfraTags::default();

    for span in spans.iter() {
        let maybe_stat_span = concentrator.new_stat_span_from_span(span);
        if let Some(stat_span) = maybe_stat_span {
            concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
        }
    }

    // Non-forced flushes, one bucket duration apart, must yield nothing while buffering.
    let buffer_len = 2; // DEFAULT_BUFFER_LEN
    for step in 0..buffer_len {
        let stats = concentrator.flush(now + BUCKET_DURATION_NS * step, false);
        assert!(stats.is_empty(), "Should not flush before buffer fills");
    }

    // The flush right after the buffering window releases the aggregated stats.
    let stats = concentrator.flush(now + BUCKET_DURATION_NS * buffer_len, false);
    assert!(!stats.is_empty(), "Should flush after buffer fills");

    // Everything should have been aggregated (oldest bucket absorbs old data).
    let (total_hits, total_duration) = stats
        .iter()
        .flat_map(|p| p.stats())
        .flat_map(|b| b.stats())
        .fold((0u64, 0u64), |(hits, duration), grouped| {
            (hits + grouped.hits(), duration + grouped.duration())
        });

    assert_eq!(total_hits, 6, "All 6 spans should be counted");
    assert_eq!(
        total_duration,
        50 + 40 + 30 + 20 + 10 + 1,
        "Total duration should match"
    );
}
1128
/// `compute_stats_for_span_kind` accepts server/consumer/client/producer in any letter
/// case and rejects everything else, including the empty string.
#[test]
fn test_compute_stats_for_span_kind() {
    use super::span_concentrator::compute_stats_for_span_kind;

    // Lowercase, uppercase and mixed-case spellings of the eligible kinds.
    let eligible = [
        "server", "consumer", "client", "producer", "SERVER", "CONSUMER", "CLIENT", "PRODUCER", "SErVER", "COnSUMER",
        "CLiENT", "PRoDUCER",
    ];
    for kind in eligible.iter() {
        assert!(compute_stats_for_span_kind(kind), "span kind {:?} should be eligible", kind);
    }

    // Kinds that must never trigger stats computation.
    let ineligible = ["internal", "INTERNAL", "INtERNAL", ""];
    for kind in ineligible.iter() {
        assert!(
            !compute_stats_for_span_kind(kind),
            "span kind {:?} should not be eligible",
            kind
        );
    }
}
1157
/// `extract_process_tags` reads `_dd.tags.process` from the trace and yields an empty
/// value when the attribute is missing, empty, or the trace has no spans.
#[test]
fn test_extract_process_tags() {
    // Builds a single-span trace; `Some(value)` sets `_dd.tags.process` on that span.
    let single_span_trace = |process_tags: Option<&str>| -> Trace {
        let span = match process_tags {
            Some(value) => {
                let mut attrs = FastHashMap::default();
                attrs.insert(
                    MetaString::from(TAG_PROCESS_TAGS),
                    AttributeValue::String(MetaString::from(value)),
                );
                Span::default().with_attributes(attrs)
            }
            None => Span::default(),
        };
        Trace::new(vec![span])
    };

    // Attribute absent.
    let trace = single_span_trace(None);
    assert!(
        extract_process_tags(&trace).is_empty(),
        "Should be empty when no _dd.tags.process"
    );

    // Attribute present on the first span.
    let trace = single_span_trace(Some("a:1,b:2,c:3"));
    assert_eq!(extract_process_tags(&trace), "a:1,b:2,c:3");

    // Attribute present but empty.
    let trace = single_span_trace(Some(""));
    assert!(
        extract_process_tags(&trace).is_empty(),
        "Should be empty when _dd.tags.process is empty string"
    );

    // Trace without any spans.
    let trace = Trace::new(vec![]);
    assert!(
        extract_process_tags(&trace).is_empty(),
        "Should be empty when trace has no spans"
    );
}
1204
/// `process_tags_hash` maps the empty string to 0, is stable for equal input, and
/// distinguishes different tag lists.
#[test]
fn test_process_tags_hash_computation() {
    use super::aggregation::process_tags_hash;

    let full_tags = "a:1,b:2,c:3";
    let shorter_tags = "a:1,b:2";

    // No tags hashes to the zero value.
    assert_eq!(process_tags_hash(""), 0);

    // Hashing the same tags twice gives the same result.
    let full_hash = process_tags_hash(full_tags);
    assert_eq!(full_hash, process_tags_hash(full_tags));

    // A different tag list gives a different hash.
    assert_ne!(full_hash, process_tags_hash(shorter_tags));
}
1221
1222    // Helper to create a ClientGroupedStats for testing
1223    fn make_grouped_stats(service: &str, resource: &str) -> ClientGroupedStats {
1224        ClientGroupedStats::new(service, "operation", resource)
1225            .with_hits(1)
1226            .with_duration(100)
1227    }
1228
1229    // Helper to create a ClientStatsBucket with N stats
1230    fn make_bucket_with_stats(n: usize) -> ClientStatsBucket {
1231        let stats: Vec<ClientGroupedStats> = (0..n)
1232            .map(|i| make_grouped_stats("service", &format!("resource-{}", i)))
1233            .collect();
1234        ClientStatsBucket::new(1000, 10_000_000_000, stats)
1235    }
1236
1237    // Helper to create a ClientStatsPayload with specified buckets
1238    fn make_payload_with_buckets(hostname: &str, buckets: Vec<ClientStatsBucket>) -> ClientStatsPayload {
1239        ClientStatsPayload::new(hostname, "test-env", "1.0.0")
1240            .with_stats(buckets)
1241            .with_container_id("container-123")
1242            .with_lang("rust")
1243    }
1244
1245    // Helper to count total ClientGroupedStats across all payloads in a TraceStats
1246    fn count_grouped_stats(trace_stats: &TraceStats) -> usize {
1247        trace_stats
1248            .stats()
1249            .iter()
1250            .flat_map(|p| p.stats())
1251            .map(|b| b.stats().len())
1252            .sum()
1253    }
1254
/// Splitting no payloads yields no events.
#[test]
fn test_split_into_trace_stats_empty_input() {
    assert!(split_into_trace_stats(Vec::new(), 100).is_empty());
}
1260
/// 50 grouped stats with a limit of 100 stay in a single event.
#[test]
fn test_split_into_trace_stats_no_split_needed() {
    let payload = make_payload_with_buckets("host1", vec![make_bucket_with_stats(50)]);

    let result = split_into_trace_stats(vec![payload], 100);

    let per_event: Vec<usize> = result.iter().map(count_grouped_stats).collect();
    assert_eq!(per_event, vec![50]);
}
1272
/// Exactly as many grouped stats as the limit (100) still fit in a single event.
#[test]
fn test_split_into_trace_stats_exact_threshold() {
    let payload = make_payload_with_buckets("host1", vec![make_bucket_with_stats(100)]);

    let result = split_into_trace_stats(vec![payload], 100);

    let per_event: Vec<usize> = result.iter().map(count_grouped_stats).collect();
    assert_eq!(per_event, vec![100]);
}
1284
/// One bucket of 250 grouped stats with a limit of 100 becomes three events
/// (100 + 100 + 50) without losing any entry.
#[test]
fn test_split_into_trace_stats_splits_single_bucket() {
    let payload = make_payload_with_buckets("host1", vec![make_bucket_with_stats(250)]);

    let result = split_into_trace_stats(vec![payload], 100);

    let per_event: Vec<usize> = result.iter().map(count_grouped_stats).collect();
    assert_eq!(per_event, vec![100, 100, 50]);

    // Nothing dropped or duplicated along the way.
    let total: usize = per_event.iter().sum();
    assert_eq!(total, 250);
}
1302
/// Two payloads of 60 grouped stats each with a limit of 100: the first event is filled
/// to the limit (60 + 40) and the remaining 20 spill into a second event.
#[test]
fn test_split_into_trace_stats_splits_across_payloads() {
    let payloads = vec![
        make_payload_with_buckets("host1", vec![make_bucket_with_stats(60)]),
        make_payload_with_buckets("host2", vec![make_bucket_with_stats(60)]),
    ];

    let result = split_into_trace_stats(payloads, 100);

    let per_event: Vec<usize> = result.iter().map(count_grouped_stats).collect();
    assert_eq!(per_event, vec![100, 20]);
}
1317
/// A single payload with two buckets (70 + 80 = 150 grouped stats) and a limit of 100 is
/// split into two events that together still hold all 150 entries.
#[test]
fn test_split_into_trace_stats_splits_single_payload_multiple_buckets() {
    let buckets = vec![make_bucket_with_stats(70), make_bucket_with_stats(80)];
    let payload = make_payload_with_buckets("host1", buckets);

    let result = split_into_trace_stats(vec![payload], 100);
    assert_eq!(result.len(), 2);

    let mut total = 0usize;
    for trace_stats in result.iter() {
        total += count_grouped_stats(trace_stats);
    }
    assert_eq!(total, 150);
}
1331
/// Every payload produced by a split must carry the same payload-level metadata as the
/// payload it was cut from.
///
/// The input (250 grouped stats, limit 100) forces a split. The test also guards against
/// passing vacuously: it requires that a split actually happened and that at least one
/// payload was inspected.
#[test]
fn test_split_into_trace_stats_preserves_metadata() {
    let bucket = make_bucket_with_stats(250);
    let payload = ClientStatsPayload::new("test-host", "prod", "2.0.0")
        .with_stats(vec![bucket])
        .with_container_id("container-abc")
        .with_lang("go")
        .with_git_commit_sha("abc123")
        .with_image_tag("v1.2.3")
        .with_process_tags_hash(12345)
        .with_process_tags("tag1,tag2");

    let result = split_into_trace_stats(vec![payload], 100);

    // Without this, an empty result would skip every assertion below.
    assert!(result.len() > 1, "250 stats with a limit of 100 should be split");

    // All split payloads should have the same metadata
    let mut payloads_checked = 0usize;
    for trace_stats in &result {
        for p in trace_stats.stats() {
            assert_eq!(p.hostname(), "test-host");
            assert_eq!(p.env(), "prod");
            assert_eq!(p.version(), "2.0.0");
            assert_eq!(p.container_id(), "container-abc");
            assert_eq!(p.lang(), "go");
            assert_eq!(p.git_commit_sha(), "abc123");
            assert_eq!(p.image_tag(), "v1.2.3");
            assert_eq!(p.process_tags_hash(), 12345);
            assert_eq!(p.process_tags(), "tag1,tag2");
            payloads_checked += 1;
        }
    }

    assert!(payloads_checked > 0, "no payloads were inspected");
}
1361
/// Every bucket produced by a split must keep the start, duration and agent time shift
/// of the bucket it was cut from.
///
/// The input (150 grouped stats, limit 100) forces a split. The test also guards against
/// passing vacuously: it requires that a split actually happened and that at least one
/// bucket was inspected.
#[test]
fn test_split_into_trace_stats_preserves_bucket_metadata() {
    // Create a bucket with specific start/duration/time_shift
    let stats: Vec<ClientGroupedStats> = (0..150)
        .map(|i| make_grouped_stats("svc", &format!("res-{}", i)))
        .collect();
    let bucket = ClientStatsBucket::new(999_000_000, 10_000_000_000, stats).with_agent_time_shift(42);
    let payload = make_payload_with_buckets("host1", vec![bucket]);

    let result = split_into_trace_stats(vec![payload], 100);

    // Without this, an empty result would skip every assertion below.
    assert!(result.len() > 1, "150 stats with a limit of 100 should be split");

    // All split buckets should have the same start/duration/time_shift
    let mut buckets_checked = 0usize;
    for trace_stats in &result {
        for p in trace_stats.stats() {
            for b in p.stats() {
                assert_eq!(b.start(), 999_000_000);
                assert_eq!(b.duration(), 10_000_000_000);
                assert_eq!(b.agent_time_shift(), 42);
                buckets_checked += 1;
            }
        }
    }

    assert!(buckets_checked > 0, "no buckets were inspected");
}
1384
/// A payload whose only bucket has no grouped stats still yields one event, holding zero
/// grouped stats.
#[test]
fn test_split_into_trace_stats_handles_empty_bucket() {
    let no_stats = ClientStatsBucket::new(1000, 10_000_000_000, vec![]);
    let payload = make_payload_with_buckets("host1", vec![no_stats]);

    let result = split_into_trace_stats(vec![payload], 100);

    let per_event: Vec<usize> = result.iter().map(count_grouped_stats).collect();
    assert_eq!(per_event, vec![0]);
}
1395
/// 10,000 grouped stats with a limit of 4000 are split into three events
/// (4000 + 4000 + 2000).
#[test]
fn test_split_into_trace_stats_large_split() {
    let payload = make_payload_with_buckets("host1", vec![make_bucket_with_stats(10_000)]);

    let result = split_into_trace_stats(vec![payload], 4000);

    let per_event: Vec<usize> = result.iter().map(count_grouped_stats).collect();
    assert_eq!(per_event, vec![4000, 4000, 2000]);
}
1409
1410    // Property test strategies
1411
1412    /// Strategy to generate arbitrary ClientGroupedStats.
1413    fn arb_grouped_stats() -> impl Strategy<Value = ClientGroupedStats> {
1414        (0..100u64, 0..1000u64).prop_map(|(hits, duration)| {
1415            ClientGroupedStats::new("service", "operation", "resource")
1416                .with_hits(hits)
1417                .with_duration(duration)
1418        })
1419    }
1420
1421    /// Strategy to generate a bucket with 0..=max_stats_per_bucket grouped stats.
1422    fn arb_bucket(max_stats_per_bucket: usize) -> impl Strategy<Value = ClientStatsBucket> {
1423        proptest::collection::vec(arb_grouped_stats(), 0..=max_stats_per_bucket)
1424            .prop_map(|stats| ClientStatsBucket::new(1000, 10_000_000_000, stats))
1425    }
1426
1427    /// Strategy to generate a payload with 1..=max_buckets buckets.
1428    fn arb_payload(max_buckets: usize, max_stats_per_bucket: usize) -> impl Strategy<Value = ClientStatsPayload> {
1429        proptest::collection::vec(arb_bucket(max_stats_per_bucket), 1..=max_buckets)
1430            .prop_map(|buckets| ClientStatsPayload::new("host", "env", "1.0.0").with_stats(buckets))
1431    }
1432
1433    /// Strategy to generate test inputs for the split function.
1434    ///
1435    /// Parameters:
1436    /// - `num_payloads`: 1..=10
1437    /// - `num_buckets_per_payload`: 1..=5
1438    /// - `num_stats_per_bucket`: 0..=500
1439    /// - `max_entries_per_event`: 1..=1000 (never zero)
1440    fn arb_split_inputs() -> impl Strategy<Value = (Vec<ClientStatsPayload>, usize)> {
1441        let payloads_strategy = proptest::collection::vec(arb_payload(5, 500), 1..=10);
1442        let max_entries_strategy = 1..=1000usize;
1443
1444        (payloads_strategy, max_entries_strategy)
1445    }
1446
// Mirrors Go agent behavior for OTLP traces: the version found in the root span's
// meta["version"] (populated with span-over-resource precedence by otel_span_to_dd_span)
// must take priority over the resource-level version stored in payload.app_version.
// See: pkg/trace/version/version.go:GetAppVersionFromTrace and pkg/trace/api/otlp.go.
#[test]
fn test_version_span_beats_resource_for_otlp() {
    let now = now_nanos();
    let transform = ApmStats {
        concentrator: SpanConcentrator::new(true, true, &[], now),
        flush_interval: DEFAULT_FLUSH_INTERVAL,
        agent_env: MetaString::default(),
        agent_hostname: MetaString::default(),
        workload_provider: None,
    };

    // The root span has its own `version` attribute (as otel_span_to_dd_span would set it
    // with span-over-resource precedence).
    let mut span_attrs = FastHashMap::default();
    span_attrs.insert(
        MetaString::from("version"),
        AttributeValue::String(MetaString::from("span-v2")),
    );
    let root_span = Span::new("svc", "op", "res", "web", 1, 0, now, 1_000_000, 0).with_attributes(span_attrs);

    // Simulate OTLP resource extraction: payload.app_version comes from resource attrs only.
    let mut trace = Trace::new(vec![root_span]);
    trace.payload.app_version = MetaString::from("resource-v1");

    let key = transform.build_payload_key(&trace, "");

    // The Go agent looks at root_span.meta["version"] first, so "span-v2" must win over
    // the resource-level "resource-v1".
    assert_eq!(
        key.version.as_ref(),
        "span-v2",
        "span-level version must take precedence over resource-level payload.app_version for OTLP traces"
    );
}
1486
1487    #[test_strategy::proptest]
1488    #[cfg_attr(miri, ignore)]
1489    fn property_test_split_respects_max_entries(
1490        #[strategy(arb_split_inputs())] inputs: (Vec<ClientStatsPayload>, usize),
1491    ) {
1492        let (payloads, max_entries_per_event) = inputs;
1493
1494        let input_total: usize = payloads.iter().flat_map(|p| p.stats()).map(|b| b.stats().len()).sum();
1495
1496        let result = split_into_trace_stats(payloads, max_entries_per_event);
1497
1498        // Property 1: No TraceStats should exceed max_entries_per_event
1499        for trace_stats in &result {
1500            let count = count_grouped_stats(trace_stats);
1501            prop_assert!(
1502                count <= max_entries_per_event,
1503                "TraceStats has {} grouped stats, exceeds max of {}",
1504                count,
1505                max_entries_per_event
1506            );
1507        }
1508
1509        // Property 2: Total stats should be preserved
1510        let output_total: usize = result.iter().map(count_grouped_stats).sum();
1511        prop_assert_eq!(input_total, output_total, "Total stats count should be preserved");
1512    }
1513}