// saluki_components/transforms/apm_stats/mod.rs

1//! APM Stats transform.
2//!
3//! Aggregates traces into time-bucketed statistics, producing `TraceStats` events.
4
use std::{
    sync::Arc,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use async_trait::async_trait;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use opentelemetry_semantic_conventions::resource::{CONTAINER_ID, K8S_POD_UID};
use saluki_config::GenericConfiguration;
use saluki_context::{origin::OriginTagCardinality, tags::TagSet};
use saluki_core::{
    components::{transforms::*, ComponentContext},
    data_model::event::{
        trace::{Span, Trace},
        trace_stats::{ClientStatsPayload, TraceStats},
        Event, EventType,
    },
    topology::OutputDefinition,
};
use saluki_env::{
    host::providers::BoxedHostProvider, workload::EntityId, EnvironmentProvider, HostProvider, WorkloadProvider,
};
use saluki_error::{ErrorContext as _, GenericError};
use stringtheory::MetaString;
use tokio::{select, time::interval};
use tracing::{debug, error};

use crate::common::{
    datadog::apm::ApmConfig,
    otlp::util::{extract_container_tags_from_resource_tagset, KEY_DATADOG_CONTAINER_ID},
};
36
37mod aggregation;
38use self::aggregation::{process_tags_hash, PayloadAggregationKey};
39
40mod span_concentrator;
41use self::span_concentrator::{InfraTags, SpanConcentrator};
42
43mod statsraw;
44
45mod weight;
46use self::weight::weight;
47
/// Default flush interval for the APM stats transform.
const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(10);

/// Tag key for process tags in span meta.
///
/// Process tags are read from the first span of a trace (see `extract_process_tags`).
const TAG_PROCESS_TAGS: &str = "_dd.tags.process";

/// Maximum number of `ClientGroupedStats` entries per `TraceStats` event.
///
/// Flushed payloads exceeding this threshold are split into multiple events (see `split_into_trace_stats`).
const MAX_STATS_GROUPS_PER_EVENT: usize = 4000;
56
/// APM Stats transform configuration.
///
/// Aggregates incoming `Trace` events into time-bucketed statistics, emitting
/// `TraceStats` events.
pub struct ApmStatsTransformConfiguration {
    // Underlying APM configuration (stats-by-span-kind, peer tags, hostname, default env, ...).
    apm_config: ApmConfig,
    // Fallback hostname applied when the APM configuration has none; populated via
    // `with_environment_provider`.
    default_hostname: Option<String>,
    // Optional workload provider used to enrich container tags during aggregation.
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}
66
67impl ApmStatsTransformConfiguration {
68    /// Creates a new `ApmStatsTransformConfiguration` from the given configuration.
69    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
70        let apm_config = ApmConfig::from_configuration(config)?;
71        Ok(Self {
72            apm_config,
73            default_hostname: None,
74            workload_provider: None,
75        })
76    }
77
78    /// Sets the default hostname using the environment provider.
79    pub async fn with_environment_provider<E>(mut self, env_provider: E) -> Result<Self, GenericError>
80    where
81        E: EnvironmentProvider<Host = BoxedHostProvider>,
82    {
83        let hostname = env_provider.host().get_hostname().await?;
84        self.default_hostname = Some(hostname);
85        Ok(self)
86    }
87
88    /// Sets the workload provider.
89    ///
90    /// Defaults to unset.
91    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
92    where
93        W: WorkloadProvider + Send + Sync + 'static,
94    {
95        self.workload_provider = Some(Arc::new(workload_provider));
96        self
97    }
98}
99
100#[async_trait]
101impl TransformBuilder for ApmStatsTransformConfiguration {
102    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
103        let mut apm_config = self.apm_config.clone();
104
105        if let Some(hostname) = &self.default_hostname {
106            apm_config.set_hostname_if_empty(hostname.as_str());
107        }
108
109        let concentrator = SpanConcentrator::new(
110            apm_config.compute_stats_by_span_kind(),
111            apm_config.peer_tags_aggregation(),
112            apm_config.peer_tags(),
113            now_nanos(),
114        );
115
116        Ok(Box::new(ApmStats {
117            concentrator,
118            flush_interval: DEFAULT_FLUSH_INTERVAL,
119            agent_env: apm_config.default_env().clone(),
120            agent_hostname: apm_config.hostname().clone(),
121            workload_provider: self.workload_provider.clone(),
122        }))
123    }
124
125    fn input_event_type(&self) -> EventType {
126        EventType::Trace
127    }
128
129    fn outputs(&self) -> &[OutputDefinition<EventType>] {
130        static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::TraceStats)];
131        OUTPUTS
132    }
133}
134
impl MemoryBounds for ApmStatsTransformConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Only the fixed-size transform struct is accounted for; per-bucket aggregation state held
        // by the concentrator is not yet bounded.
        builder.minimum().with_single_value::<ApmStats>("component struct");
        // TODO: Think about everything we need to account for here.
    }
}
141
// APM Stats transform, built via `ApmStatsTransformConfiguration`.
struct ApmStats {
    // Time-bucketed span statistics aggregator.
    concentrator: SpanConcentrator,
    // How often aggregated stats are flushed downstream.
    flush_interval: Duration,
    // Fallback environment used when a trace's root span carries no `env` meta value.
    agent_env: MetaString,
    // Fallback hostname used when a trace's root span carries no `_dd.hostname` meta value.
    agent_hostname: MetaString,
    // Optional workload provider used to enrich container tags.
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}
149
150impl ApmStats {
151    fn process_trace(&mut self, trace: &Trace) {
152        let root_span = trace
153            .spans()
154            .iter()
155            .find(|s| s.parent_id() == 0)
156            .or_else(|| trace.spans().first());
157
158        let trace_weight = root_span.map(weight).unwrap_or(1.0);
159
160        let process_tags = extract_process_tags(trace);
161
162        let payload_key = self.build_payload_key(trace, &process_tags);
163        let infra_tags = self.build_infra_tags(trace, &process_tags);
164
165        let origin = trace
166            .spans()
167            .first()
168            .and_then(|s| s.meta().get("_dd.origin"))
169            .map(|s| s.as_ref())
170            .unwrap_or("");
171
172        for span in trace.spans() {
173            if let Some(stat_span) = self.concentrator.new_stat_span_from_span(span) {
174                self.concentrator
175                    .add_span(&stat_span, trace_weight, &payload_key, &infra_tags, origin);
176            }
177        }
178    }
179
180    fn build_infra_tags(&self, trace: &Trace, process_tags: &str) -> InfraTags {
181        let resource_tags = trace.resource_tags();
182        let container_id = resolve_container_id(resource_tags);
183        let mut container_tags = if container_id.is_empty() {
184            TagSet::default()
185        } else {
186            extract_container_tags(resource_tags)
187        };
188
189        // Query the workload provider for additional container tags.
190        if !container_id.is_empty() {
191            if let Some(workload_provider) = &self.workload_provider {
192                let entity_id = EntityId::Container(container_id.clone());
193                if let Some(tags) = workload_provider.get_tags_for_entity(&entity_id, OriginTagCardinality::Low) {
194                    container_tags.merge_shared(&tags);
195                }
196            }
197        }
198
199        InfraTags::new(container_id, container_tags, process_tags)
200    }
201
202    fn build_payload_key(&self, trace: &Trace, process_tags: &str) -> PayloadAggregationKey {
203        let root_span = trace
204            .spans()
205            .iter()
206            .find(|s| s.parent_id() == 0)
207            .or_else(|| trace.spans().first());
208
209        let span_env = root_span.and_then(|s| s.meta().get("env")).filter(|s| !s.is_empty());
210        let env = span_env.cloned().unwrap_or_else(|| self.agent_env.clone());
211
212        let hostname = root_span
213            .and_then(|s| s.meta().get("_dd.hostname"))
214            .filter(|s| !s.is_empty())
215            .cloned()
216            .unwrap_or_else(|| self.agent_hostname.clone());
217
218        let version = root_span
219            .and_then(|s| s.meta().get("version"))
220            .cloned()
221            .unwrap_or_default();
222
223        let container_id = root_span
224            .and_then(|s| s.meta().get("_dd.container_id"))
225            .cloned()
226            .unwrap_or_default();
227
228        let git_commit_sha = root_span
229            .and_then(|s| s.meta().get("_dd.git.commit.sha"))
230            .cloned()
231            .unwrap_or_default();
232
233        let image_tag = root_span
234            .and_then(|s| s.meta().get("_dd.image_tag"))
235            .cloned()
236            .unwrap_or_default();
237
238        let lang = root_span
239            .and_then(|s| s.meta().get("language"))
240            .cloned()
241            .unwrap_or_default();
242
243        PayloadAggregationKey {
244            env,
245            hostname,
246            version,
247            container_id,
248            git_commit_sha,
249            image_tag,
250            lang,
251            process_tags_hash: process_tags_hash(process_tags),
252        }
253    }
254}
255
256/// Splits stats payloads into multiple `TraceStats` events, each containing at most `max_entries_per_event` grouped
257/// entries.
258///
259/// This function will not attempt to collapse/pack payloads to optimize the number of events generated: there will always
260/// be at least as many output client payloads as there are input client payloads.
261fn split_into_trace_stats(client_payloads: Vec<ClientStatsPayload>, max_entries_per_event: usize) -> Vec<TraceStats> {
262    if client_payloads.is_empty() {
263        return Vec::new();
264    }
265
266    // If the total number of grouped stats entries is below the specified threshold, then we don't do any splitting or
267    // collapsing or anything.
268    let total_grouped_entries = client_payloads
269        .iter()
270        .map(|p| p.stats().iter().map(|b| b.stats().len()).sum::<usize>())
271        .sum::<usize>();
272    if total_grouped_entries <= max_entries_per_event {
273        return vec![TraceStats::new(client_payloads)];
274    }
275
276    let mut events = Vec::new();
277    let mut current_client_payloads = Vec::new();
278    let mut current_event_len = 0;
279
280    for mut client_payload in client_payloads {
281        // If the current payload can fit entirely in the current event, then collect it as-is.
282        let client_payload_len = client_payload.stats().iter().map(|b| b.stats().len()).sum::<usize>();
283        if current_event_len + client_payload_len <= max_entries_per_event {
284            current_client_payloads.push(client_payload);
285            current_event_len += client_payload_len;
286            continue;
287        }
288
289        // Consume all the stats buckets from the current client payload, and set ourselves up to go through each
290        // bucket, splitting them as necessary.
291        //
292        // We basically iterate until we find a bucket where adding its entries would cause us to exceed our threshold,
293        // and then we split that bucket, creating a new client payload in the process, and keep repeating that until
294        // we've exhausted all buckets for our starting client payload.
295        let mut current_client_stats_buckets = Vec::new();
296        for mut client_stats_bucket in client_payload.take_stats() {
297            let bucket_len = client_stats_bucket.stats().len();
298            // If this bucket can fit in the current event, then collect it as-is.
299            if current_event_len + bucket_len <= max_entries_per_event {
300                current_client_stats_buckets.push(client_stats_bucket);
301                current_event_len += bucket_len;
302                continue;
303            }
304
305            // We have to split this bucket. We take the grouped entries it has, and we subdivide those into a new
306            // client bucket, or buckets in order to ensure we don't exceed our threshold.
307            let mut bucket_entries = client_stats_bucket.take_stats();
308            while current_event_len + bucket_entries.len() > max_entries_per_event {
309                // We calculate the split point this way because the returned vector from `split_off` is referenced
310                // against the end of the vector, so if we have 100 items, and we only want 20, we need to split at
311                // index 80 (100 - 20 == 80).
312                let split_amount = max_entries_per_event - current_event_len;
313                let split_point = bucket_entries.len() - split_amount;
314                let split_entries = bucket_entries.split_off(split_point);
315
316                // Create a new "split" bucket based on the current bucket (`client_stats_bucket`) but containing only
317                // the split-off entries. This will feed into the current client payload (`client_payload`) which we'll
318                // finalize and add to the current event before starting a new event.
319                let split_bucket = client_stats_bucket.clone().with_stats(split_entries);
320                current_client_stats_buckets.push(split_bucket);
321
322                let split_client_payload = client_payload
323                    .clone()
324                    .with_stats(std::mem::take(&mut current_client_stats_buckets));
325                current_client_payloads.push(split_client_payload);
326
327                events.push(TraceStats::new(std::mem::take(&mut current_client_payloads)));
328                current_event_len = 0;
329            }
330
331            // If we have any leftover entries from the bucket after splitting it up, put them back in the current bucket
332            // and add that bucket to the current client payload.
333            if !bucket_entries.is_empty() {
334                current_event_len += bucket_entries.len();
335                current_client_stats_buckets.push(client_stats_bucket.with_stats(bucket_entries));
336            }
337        }
338
339        // If we have buckets for this client payload (whether we split or not), add them back to the current client payload.
340        if !current_client_stats_buckets.is_empty() {
341            current_client_payloads.push(client_payload.with_stats(current_client_stats_buckets));
342        }
343    }
344
345    // Stick the remaining entries into a final `TraceStats` event, if any.
346    if !current_client_payloads.is_empty() {
347        events.push(TraceStats::new(current_client_payloads));
348    }
349
350    events
351}
352
#[async_trait]
impl Transform for ApmStats {
    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
        let mut health = context.take_health_handle();

        // Consume the interval's initial immediate tick so the first real flush happens one full
        // interval from now.
        let mut flush_ticker = interval(self.flush_interval);
        flush_ticker.tick().await;

        // Set once the input stream ends; the next flush tick then performs a final forced flush
        // and exits the loop.
        let mut final_flush = false;

        health.mark_ready();
        debug!("APM Stats transform started.");

        loop {
            select! {
                _ = health.live() => continue,

                _ = flush_ticker.tick() => {
                    let stats_payloads = self.concentrator.flush(now_nanos(), final_flush);
                    if !stats_payloads.is_empty() {
                        debug!(stats_payloads = stats_payloads.len(), "Flushing APM stats.");

                        // Cap the number of grouped stats per event so downstream payloads stay bounded.
                        let events = split_into_trace_stats(stats_payloads, MAX_STATS_GROUPS_PER_EVENT);
                        let dispatcher = context.dispatcher().buffered()
                            .error_context("Default output should be available.")?;

                        if let Err(e) = dispatcher.send_all(events.into_iter().map(Event::TraceStats)).await {
                            error!(error = %e, "Failed to dispatch events.");
                        }
                    }

                    if final_flush {
                        debug!("Final APM stats flush complete.");
                        break;
                    }
                },

                // Stop polling for input once we've been marked for a final flush.
                maybe_events = context.events().next(), if !final_flush => {
                    match maybe_events {
                        Some(events) => {
                            for event in events {
                                // Only `Trace` events are aggregated; any other event type is dropped.
                                if let Event::Trace(trace) = event {
                                    self.process_trace(&trace);
                                }
                            }
                        },
                        None => {
                            // We've reached the end of our input stream, so mark ourselves for a final flush and reset the
                            // interval so it ticks immediately on the next loop iteration.
                            final_flush = true;
                            flush_ticker.reset_immediately();
                            debug!("APM Stats transform stopping, triggering final flush...");
                        }
                    }
                },
            }
        }

        debug!("APM Stats transform stopped.");
        Ok(())
    }
}
415
/// Returns the current time as nanoseconds since the Unix epoch.
///
/// Falls back to zero if the system clock reports a time before the epoch.
fn now_nanos() -> u64 {
    UNIX_EPOCH
        .elapsed()
        .map(|elapsed| elapsed.as_nanos() as u64)
        .unwrap_or(0)
}
423
424/// Resolves container ID from OTLP resource tags.
425fn resolve_container_id(resource_tags: &TagSet) -> MetaString {
426    for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
427        if let Some(tag) = resource_tags.get_single_tag(key) {
428            if let Some(value) = tag.value() {
429                if !value.is_empty() {
430                    return MetaString::from(value);
431                }
432            }
433        }
434    }
435
436    MetaString::empty()
437}
438
439/// Extracts container tags from OTLP resource tags.
440fn extract_container_tags(resource_tags: &TagSet) -> TagSet {
441    let mut container_tags_set = TagSet::default();
442    extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags_set);
443
444    container_tags_set
445}
446
447/// Extracts process tags from trace.
448fn extract_process_tags(trace: &Trace) -> MetaString {
449    if let Some(first_span) = trace.spans().first() {
450        if let Some(process_tags) = first_span.meta().get(TAG_PROCESS_TAGS) {
451            return process_tags.clone();
452        }
453    }
454
455    MetaString::empty()
456}
457
458#[cfg(test)]
459mod tests {
460    use proptest::prelude::*;
461    use saluki_common::collections::FastHashMap;
462    use saluki_context::tags::TagSet;
463    use saluki_core::data_model::event::trace_stats::ClientStatsBucket;
464    use saluki_core::data_model::event::{trace::Span, trace_stats::ClientGroupedStats};
465
466    use super::aggregation::BUCKET_DURATION_NS;
467    use super::span_concentrator::METRIC_PARTIAL_VERSION;
468    use super::*;
469
470    /// Helper to align timestamp to bucket boundary
471    fn align_ts(ts: u64, bsize: u64) -> u64 {
472        ts - ts % bsize
473    }
474
    /// Creates a test span with the given parameters.
    ///
    /// The start time is back-computed so the span's end (start + duration) lands in the bucket
    /// `bucket_offset` buckets before `aligned_now`.
    #[allow(clippy::too_many_arguments)]
    fn test_span(
        aligned_now: u64, span_id: u64, parent_id: u64, duration: u64, bucket_offset: u64, service: &str,
        resource: &str, error: i32, meta: Option<FastHashMap<MetaString, MetaString>>,
        metrics: Option<FastHashMap<MetaString, f64>>,
    ) -> Span {
        // Calculate start time so that span ends in the correct bucket
        // End time = start + duration, and we want end time to be in bucket (aligned_now - offset * bsize)
        // Use BUCKET_DURATION_NS as the bucket size (matches the concentrator)
        let bucket_start = aligned_now - bucket_offset * BUCKET_DURATION_NS;
        let start = bucket_start - duration;

        // NOTE(review): positional args presumably map to (service, name, resource, type, trace_id,
        // span_id, parent_id, start, duration, error) — confirm against `Span::new`.
        Span::new(
            service, "query", resource, "db", 1, span_id, parent_id, start, duration, error,
        )
        .with_meta(meta)
        .with_metrics(metrics)
    }
494
    /// Creates a simple measured span for basic tests
    ///
    /// The `_dd.measured` metric marks the span as eligible for stats (see the span-kind test below,
    /// which relies on its absence).
    fn make_test_span(service: &str, name: &str, resource: &str) -> Span {
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_dd.measured"), 1.0);

        Span::new(service, name, resource, "web", 1, 1, 0, 1000000000, 100000000, 0).with_metrics(metrics)
    }
502
    /// Creates a top-level span (parent_id = 0, has _top_level metric)
    ///
    /// Thin wrapper over `test_span` that forces `parent_id` to zero and injects the `_top_level`
    /// metric marker.
    fn make_top_level_span(
        aligned_now: u64, span_id: u64, duration: u64, bucket_offset: u64, service: &str, resource: &str, error: i32,
        meta: Option<FastHashMap<MetaString, MetaString>>,
    ) -> Span {
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_top_level"), 1.0);
        test_span(
            aligned_now,
            span_id,
            0,
            duration,
            bucket_offset,
            service,
            resource,
            error,
            meta,
            Some(metrics),
        )
    }
523
    /// Verifies that processing a trace through the transform produces stats on flush.
    #[test]
    fn test_process_trace_creates_stats() {
        let now = now_nanos();

        let concentrator = SpanConcentrator::new(true, true, &[], now);
        let mut transform = ApmStats {
            concentrator,
            flush_interval: DEFAULT_FLUSH_INTERVAL,
            agent_env: MetaString::from("none"),
            agent_hostname: MetaString::default(),
            workload_provider: None,
        };

        let span = make_test_span("test-service", "test-operation", "test-resource");
        let trace = Trace::new(vec![span], TagSet::default());

        transform.process_trace(&trace);

        // Flush (forced, well past the span's bucket) and verify we got stats
        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
        assert!(!stats.is_empty(), "Expected stats to be produced");
    }
546
    /// Verifies that a span's sample rate is translated into a weight applied to its stats.
    #[test]
    fn test_weight_applied_to_stats() {
        let now = now_nanos();

        let concentrator = SpanConcentrator::new(true, true, &[], now);
        let mut transform = ApmStats {
            concentrator,
            flush_interval: DEFAULT_FLUSH_INTERVAL,
            agent_env: MetaString::from("none"),
            agent_hostname: MetaString::default(),
            workload_provider: None,
        };

        // Create a span with 0.5 sample rate (weight = 2.0)
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_dd.measured"), 1.0);
        metrics.insert(MetaString::from("_sample_rate"), 0.5);

        let span = Span::new(
            "test-service",
            "test-op",
            "test-resource",
            "web",
            1,
            1,
            0,
            now,
            100000000,
            0,
        )
        .with_metrics(metrics);

        let trace = Trace::new(vec![span], TagSet::default());
        transform.process_trace(&trace);

        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
        assert!(!stats.is_empty());

        // The hits should be weighted (approximately 2 due to 0.5 sample rate)
        let bucket = &stats[0].stats()[0];
        let grouped = &bucket.stats()[0];
        // With stochastic rounding, hits could be 1 or 2, but with weight 2.0 it should round to 2
        assert!(grouped.hits() >= 1, "Expected weighted hits");
    }
591
    /// Verifies that a forced flush returns stats for buckets that a normal flush would consider
    /// too recent to emit.
    #[test]
    fn test_force_flush() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        // Add a span
        let span = make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None);
        let trace = Trace::new(vec![span], TagSet::default());

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("test"),
            hostname: MetaString::from("host"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in trace.spans() {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        // ts=0 so that flush always considers buckets not old enough
        let ts: u64 = 0;

        // Without force flush, should skip the bucket
        let stats = concentrator.flush(ts, false);
        assert!(stats.is_empty(), "Non-force flush should return empty");

        // With force flush, should flush buckets regardless of age
        let stats = concentrator.flush(ts, true);
        assert!(!stats.is_empty(), "Force flush should return stats");
        assert_eq!(stats[0].stats().len(), 1, "Should have 1 bucket");
    }
628
    /// Verifies that spans carrying the partial-version metric are excluded from stats.
    #[test]
    fn test_ignores_partial_spans() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        // Create a partial span (has _dd.partial_version metric)
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_top_level"), 1.0);
        metrics.insert(MetaString::from(METRIC_PARTIAL_VERSION), 830604.0);

        let span = test_span(aligned_now, 1, 0, 50, 5, "A1", "resource1", 0, None, Some(metrics));
        let trace = Trace::new(vec![span], TagSet::default());

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("test"),
            hostname: MetaString::from("tracer-hostname"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in trace.spans() {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        // Partial spans should be ignored
        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
        assert!(stats.is_empty(), "Partial spans should be ignored");
    }
661
    /// Verifies that stats totals (duration, hits, errors, top-level hits) across all flushed
    /// buckets match the spans fed into the concentrator.
    #[test]
    fn test_concentrator_stats_totals() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        // Set oldestTs to allow old buckets
        let oldest_ts = aligned_now - 2 * BUCKET_DURATION_NS;
        let mut concentrator = SpanConcentrator::new(true, true, &[], oldest_ts);

        // Build spans spread over time windows (one per bucket offset, varying durations)
        let spans = vec![
            make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
        ];

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("none"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in &spans {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        // Flush all and collect totals
        let all_stats = concentrator.flush(now + BUCKET_DURATION_NS * 10, true);

        let mut total_duration: u64 = 0;
        let mut total_hits: u64 = 0;
        let mut total_errors: u64 = 0;
        let mut total_top_level_hits: u64 = 0;

        for payload in &all_stats {
            for bucket in payload.stats() {
                for grouped in bucket.stats() {
                    total_duration += grouped.duration();
                    total_hits += grouped.hits();
                    total_errors += grouped.errors();
                    total_top_level_hits += grouped.top_level_hits();
                }
            }
        }

        assert_eq!(total_duration, 50 + 40 + 30 + 20 + 10 + 1, "Wrong total duration");
        assert_eq!(total_hits, 6, "Wrong total hits");
        assert_eq!(total_top_level_hits, 6, "Wrong total top level hits");
        assert_eq!(total_errors, 0, "Wrong total errors");
    }
717
    /// Verifies that grouped stats are split by trace-root status: the root span is flagged as
    /// root, while non-root top-level and client-kind spans are flagged as non-root.
    #[test]
    fn test_root_tag() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        // Root span (parent_id = 0, top_level)
        let mut root_metrics = FastHashMap::default();
        root_metrics.insert(MetaString::from("_top_level"), 1.0);
        let root_span = test_span(
            aligned_now,
            1,
            0,
            40,
            10,
            "A1",
            "resource1",
            0,
            None,
            Some(root_metrics),
        );

        // Non-root but top level span (has _top_level but parent_id != 0)
        let mut top_level_metrics = FastHashMap::default();
        top_level_metrics.insert(MetaString::from("_top_level"), 1.0);
        let top_level_span = test_span(
            aligned_now,
            4,
            1000,
            10,
            10,
            "A1",
            "resource1",
            0,
            None,
            Some(top_level_metrics),
        );

        // Client span (non-root, non-top level, but has span.kind = client)
        let mut client_meta = FastHashMap::default();
        client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
        let client_span = test_span(aligned_now, 3, 2, 20, 10, "A1", "resource1", 0, Some(client_meta), None);

        let spans = vec![root_span, top_level_span, client_span];

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("none"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in &spans {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 20, true);
        assert!(!stats.is_empty(), "Should have stats");

        // Count grouped stats - should be split by IsTraceRoot
        let mut total_grouped = 0;
        let mut root_count = 0;
        let mut non_root_count = 0;

        for payload in &stats {
            for bucket in payload.stats() {
                for grouped in bucket.stats() {
                    total_grouped += 1;
                    match grouped.is_trace_root() {
                        Some(true) => root_count += 1,
                        Some(false) => non_root_count += 1,
                        None => {}
                    }
                }
            }
        }

        // We expect 3 grouped stats:
        // 1. Root span (is_trace_root = true)
        // 2. Non-root top-level span (is_trace_root = false)
        // 3. Client span (is_trace_root = false, span.kind = client)
        assert_eq!(total_grouped, 3, "Expected 3 grouped stats");
        assert_eq!(root_count, 1, "Expected 1 root span");
        assert_eq!(non_root_count, 2, "Expected 2 non-root spans");
    }
805
806    #[test]
807    fn test_compute_stats_through_span_kind_check() {
808        let now = now_nanos();
809
810        // Test with compute_stats_by_span_kind DISABLED
811        {
812            let mut concentrator = SpanConcentrator::new(false, true, &[], now);
813
814            // Create a simple top-level span using the same pattern as make_test_span (which works)
815            let mut metrics = FastHashMap::default();
816            metrics.insert(MetaString::from("_top_level"), 1.0);
817            let span = Span::new("myservice", "query", "GET /users", "web", 1, 1, 0, now, 500, 0).with_metrics(metrics);
818
819            let payload_key = PayloadAggregationKey {
820                env: MetaString::from("test"),
821                ..Default::default()
822            };
823            let infra_tags = InfraTags::default();
824
825            if let Some(stat_span) = concentrator.new_stat_span_from_span(&span) {
826                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
827            }
828
829            // Client span with span.kind=client but no _top_level or _dd.measured
830            // Should NOT produce stats when compute_stats_by_span_kind is disabled
831            let mut client_meta = FastHashMap::default();
832            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
833            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
834                .with_meta(client_meta);
835
836            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
837                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
838            }
839
840            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
841
842            let mut count = 0;
843            for payload in &stats {
844                for bucket in payload.stats() {
845                    count += bucket.stats().len();
846                }
847            }
848
849            // When disabled, only top_level span gets stats (client span has no top_level/measured)
850            assert_eq!(count, 1, "Expected 1 stat when span kind check disabled");
851        }
852
853        // Test with compute_stats_by_span_kind ENABLED
854        {
855            let mut concentrator = SpanConcentrator::new(true, true, &[], now);
856
857            // Create a simple top-level span
858            let mut metrics = FastHashMap::default();
859            metrics.insert(MetaString::from("_top_level"), 1.0);
860            let span = Span::new("myservice", "query", "GET /users", "web", 1, 1, 0, now, 500, 0).with_metrics(metrics);
861
862            let payload_key = PayloadAggregationKey {
863                env: MetaString::from("test"),
864                ..Default::default()
865            };
866            let infra_tags = InfraTags::default();
867
868            if let Some(stat_span) = concentrator.new_stat_span_from_span(&span) {
869                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
870            }
871
872            // Client span with span.kind=client
873            // SHOULD produce stats when compute_stats_by_span_kind is enabled
874            let mut client_meta = FastHashMap::default();
875            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
876            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
877                .with_meta(client_meta);
878
879            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
880                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
881            }
882
883            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
884
885            let mut count = 0;
886            for payload in &stats {
887                for bucket in payload.stats() {
888                    count += bucket.stats().len();
889                }
890            }
891
892            // When enabled, both spans get stats
893            assert_eq!(count, 2, "Expected 2 stats when span kind check enabled");
894        }
895    }
896
897    #[test]
898    fn test_peer_tags() {
899        let now = now_nanos();
900
901        // Test without peer tags aggregation enabled
902        {
903            let mut concentrator = SpanConcentrator::new(true, false, &[], now);
904
905            // Client span with db tags and _dd.measured
906            let mut client_meta = FastHashMap::default();
907            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
908            client_meta.insert(MetaString::from("db.instance"), MetaString::from("i-1234"));
909            client_meta.insert(MetaString::from("db.system"), MetaString::from("postgres"));
910            let mut client_metrics = FastHashMap::default();
911            client_metrics.insert(MetaString::from("_dd.measured"), 1.0);
912            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
913                .with_meta(client_meta)
914                .with_metrics(client_metrics);
915
916            let payload_key = PayloadAggregationKey {
917                env: MetaString::from("test"),
918                ..Default::default()
919            };
920            let infra_tags = InfraTags::default();
921
922            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
923                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
924            }
925
926            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
927
928            // Without peer tags aggregation, peer_tags should be empty
929            for payload in &stats {
930                for bucket in payload.stats() {
931                    for grouped in bucket.stats() {
932                        assert!(
933                            grouped.peer_tags().is_empty(),
934                            "Peer tags should be empty when peer_tags_aggregation is false"
935                        );
936                    }
937                }
938            }
939        }
940
941        // Test with peer tags aggregation enabled
942        {
943            // Note: BASE_PEER_TAGS already includes db.instance and db.system
944            let mut concentrator = SpanConcentrator::new(true, true, &[], now);
945
946            // Client span with db tags and _dd.measured
947            let mut client_meta = FastHashMap::default();
948            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
949            client_meta.insert(MetaString::from("db.instance"), MetaString::from("i-1234"));
950            client_meta.insert(MetaString::from("db.system"), MetaString::from("postgres"));
951            let mut client_metrics = FastHashMap::default();
952            client_metrics.insert(MetaString::from("_dd.measured"), 1.0);
953            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
954                .with_meta(client_meta)
955                .with_metrics(client_metrics);
956
957            let payload_key = PayloadAggregationKey {
958                env: MetaString::from("test"),
959                ..Default::default()
960            };
961            let infra_tags = InfraTags::default();
962
963            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
964                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
965            }
966
967            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
968
969            // With peer tags aggregation, client span should have peer_tags
970            let mut found_client_with_peer_tags = false;
971            for payload in &stats {
972                for bucket in payload.stats() {
973                    for grouped in bucket.stats() {
974                        if grouped.resource() == "SELECT ..." {
975                            assert!(!grouped.peer_tags().is_empty(), "Client span should have peer tags");
976                            // Check that peer tags contain db.instance and db.system
977                            let peer_tags: Vec<&str> = grouped.peer_tags().iter().map(|s| s.as_ref()).collect();
978                            assert!(
979                                peer_tags.iter().any(|t| t.starts_with("db.instance:")),
980                                "Should have db.instance peer tag"
981                            );
982                            assert!(
983                                peer_tags.iter().any(|t| t.starts_with("db.system:")),
984                                "Should have db.system peer tag"
985                            );
986                            found_client_with_peer_tags = true;
987                        }
988                    }
989                }
990            }
991            assert!(
992                found_client_with_peer_tags,
993                "Should have found client span with peer tags"
994            );
995        }
996    }
997
998    #[test]
999    fn test_concentrator_oldest_ts() {
1000        let now = now_nanos();
1001        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
1002
1003        // Test "cold" scenario - all spans in the past should end up in current bucket
1004        {
1005            // Start concentrator at current time (cold start)
1006            let mut concentrator = SpanConcentrator::new(true, true, &[], now);
1007
1008            // Build spans spread over many time windows (all in the past)
1009            let spans = vec![
1010                make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
1011                make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
1012                make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
1013                make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
1014                make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
1015                make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
1016            ];
1017
1018            let payload_key = PayloadAggregationKey {
1019                env: MetaString::from("none"),
1020                ..Default::default()
1021            };
1022            let infra_tags = InfraTags::default();
1023
1024            for span in &spans {
1025                if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
1026                    concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
1027                }
1028            }
1029
1030            // Flush multiple times without force
1031            let mut flush_time = now;
1032            let buffer_len = 2; // DEFAULT_BUFFER_LEN
1033
1034            for _ in 0..buffer_len {
1035                let stats = concentrator.flush(flush_time, false);
1036                assert!(stats.is_empty(), "Should not flush before buffer fills");
1037                flush_time += BUCKET_DURATION_NS;
1038            }
1039
1040            // After buffer_len flushes, should get aggregated stats
1041            let stats = concentrator.flush(flush_time, false);
1042            assert!(!stats.is_empty(), "Should flush after buffer fills");
1043
1044            // All spans should be aggregated into one bucket (oldest bucket aggregates old data)
1045            let mut total_hits: u64 = 0;
1046            let mut total_duration: u64 = 0;
1047            for payload in &stats {
1048                for bucket in payload.stats() {
1049                    for grouped in bucket.stats() {
1050                        total_hits += grouped.hits();
1051                        total_duration += grouped.duration();
1052                    }
1053                }
1054            }
1055
1056            assert_eq!(total_hits, 6, "All 6 spans should be counted");
1057            assert_eq!(
1058                total_duration,
1059                50 + 40 + 30 + 20 + 10 + 1,
1060                "Total duration should match"
1061            );
1062        }
1063    }
1064
1065    #[test]
1066    fn test_compute_stats_for_span_kind() {
1067        use super::span_concentrator::compute_stats_for_span_kind;
1068
1069        // Valid span kinds (case insensitive)
1070        assert!(compute_stats_for_span_kind("server"));
1071        assert!(compute_stats_for_span_kind("consumer"));
1072        assert!(compute_stats_for_span_kind("client"));
1073        assert!(compute_stats_for_span_kind("producer"));
1074
1075        // Uppercase
1076        assert!(compute_stats_for_span_kind("SERVER"));
1077        assert!(compute_stats_for_span_kind("CONSUMER"));
1078        assert!(compute_stats_for_span_kind("CLIENT"));
1079        assert!(compute_stats_for_span_kind("PRODUCER"));
1080
1081        // Mixed case
1082        assert!(compute_stats_for_span_kind("SErVER"));
1083        assert!(compute_stats_for_span_kind("COnSUMER"));
1084        assert!(compute_stats_for_span_kind("CLiENT"));
1085        assert!(compute_stats_for_span_kind("PRoDUCER"));
1086
1087        // Invalid span kinds
1088        assert!(!compute_stats_for_span_kind("internal"));
1089        assert!(!compute_stats_for_span_kind("INTERNAL"));
1090        assert!(!compute_stats_for_span_kind("INtERNAL"));
1091        assert!(!compute_stats_for_span_kind(""));
1092    }
1093
1094    #[test]
1095    fn test_extract_process_tags() {
1096        // Test with no process tags
1097        {
1098            let span = Span::default();
1099            let trace = Trace::new(vec![span], TagSet::default());
1100            let process_tags = extract_process_tags(&trace);
1101            assert!(process_tags.is_empty(), "Should be empty when no _dd.tags.process");
1102        }
1103
1104        // Test with process tags in first span meta
1105        {
1106            let mut meta = FastHashMap::default();
1107            meta.insert(MetaString::from(TAG_PROCESS_TAGS), MetaString::from("a:1,b:2,c:3"));
1108            let span = Span::default().with_meta(meta);
1109            let trace = Trace::new(vec![span], TagSet::default());
1110            let process_tags = extract_process_tags(&trace);
1111            assert_eq!(process_tags, "a:1,b:2,c:3");
1112        }
1113
1114        // Test with empty process tags
1115        {
1116            let mut meta = FastHashMap::default();
1117            meta.insert(MetaString::from(TAG_PROCESS_TAGS), MetaString::from(""));
1118            let span = Span::default().with_meta(meta);
1119            let trace = Trace::new(vec![span], TagSet::default());
1120            let process_tags = extract_process_tags(&trace);
1121            assert!(
1122                process_tags.is_empty(),
1123                "Should be empty when _dd.tags.process is empty string"
1124            );
1125        }
1126
1127        // Test with empty trace
1128        {
1129            let trace = Trace::new(vec![], TagSet::default());
1130            let process_tags = extract_process_tags(&trace);
1131            assert!(process_tags.is_empty(), "Should be empty when trace has no spans");
1132        }
1133    }
1134
1135    #[test]
1136    fn test_process_tags_hash_computation() {
1137        use super::aggregation::process_tags_hash;
1138
1139        // Empty string should return 0
1140        assert_eq!(process_tags_hash(""), 0);
1141
1142        // Same tags should produce same hash
1143        let hash1 = process_tags_hash("a:1,b:2,c:3");
1144        let hash2 = process_tags_hash("a:1,b:2,c:3");
1145        assert_eq!(hash1, hash2);
1146
1147        // Different tags should produce different hash
1148        let hash3 = process_tags_hash("a:1,b:2");
1149        assert_ne!(hash1, hash3);
1150    }
1151
1152    // Helper to create a ClientGroupedStats for testing
1153    fn make_grouped_stats(service: &str, resource: &str) -> ClientGroupedStats {
1154        ClientGroupedStats::new(service, "operation", resource)
1155            .with_hits(1)
1156            .with_duration(100)
1157    }
1158
1159    // Helper to create a ClientStatsBucket with N stats
1160    fn make_bucket_with_stats(n: usize) -> ClientStatsBucket {
1161        let stats: Vec<ClientGroupedStats> = (0..n)
1162            .map(|i| make_grouped_stats("service", &format!("resource-{}", i)))
1163            .collect();
1164        ClientStatsBucket::new(1000, 10_000_000_000, stats)
1165    }
1166
1167    // Helper to create a ClientStatsPayload with specified buckets
1168    fn make_payload_with_buckets(hostname: &str, buckets: Vec<ClientStatsBucket>) -> ClientStatsPayload {
1169        ClientStatsPayload::new(hostname, "test-env", "1.0.0")
1170            .with_stats(buckets)
1171            .with_container_id("container-123")
1172            .with_lang("rust")
1173    }
1174
1175    // Helper to count total ClientGroupedStats across all payloads in a TraceStats
1176    fn count_grouped_stats(trace_stats: &TraceStats) -> usize {
1177        trace_stats
1178            .stats()
1179            .iter()
1180            .flat_map(|p| p.stats())
1181            .map(|b| b.stats().len())
1182            .sum()
1183    }
1184
1185    #[test]
1186    fn test_split_into_trace_stats_empty_input() {
1187        let result = split_into_trace_stats(vec![], 100);
1188        assert!(result.is_empty());
1189    }
1190
1191    #[test]
1192    fn test_split_into_trace_stats_no_split_needed() {
1193        // 50 stats with max 100 - no split needed
1194        let bucket = make_bucket_with_stats(50);
1195        let payload = make_payload_with_buckets("host1", vec![bucket]);
1196
1197        let result = split_into_trace_stats(vec![payload], 100);
1198
1199        assert_eq!(result.len(), 1);
1200        assert_eq!(count_grouped_stats(&result[0]), 50);
1201    }
1202
1203    #[test]
1204    fn test_split_into_trace_stats_exact_threshold() {
1205        // Exactly 100 stats with max 100 - no split needed
1206        let bucket = make_bucket_with_stats(100);
1207        let payload = make_payload_with_buckets("host1", vec![bucket]);
1208
1209        let result = split_into_trace_stats(vec![payload], 100);
1210
1211        assert_eq!(result.len(), 1);
1212        assert_eq!(count_grouped_stats(&result[0]), 100);
1213    }
1214
1215    #[test]
1216    fn test_split_into_trace_stats_splits_single_bucket() {
1217        // 250 stats in one bucket with max 100 - should split into 3 TraceStats
1218        let bucket = make_bucket_with_stats(250);
1219        let payload = make_payload_with_buckets("host1", vec![bucket]);
1220
1221        let result = split_into_trace_stats(vec![payload], 100);
1222
1223        assert_eq!(result.len(), 3);
1224        assert_eq!(count_grouped_stats(&result[0]), 100);
1225        assert_eq!(count_grouped_stats(&result[1]), 100);
1226        assert_eq!(count_grouped_stats(&result[2]), 50);
1227
1228        // Total should be preserved
1229        let total: usize = result.iter().map(count_grouped_stats).sum();
1230        assert_eq!(total, 250);
1231    }
1232
1233    #[test]
1234    fn test_split_into_trace_stats_splits_across_payloads() {
1235        // Two payloads with 60 stats each, max 100 - should combine into 2 TraceStats
1236        let payload1 = make_payload_with_buckets("host1", vec![make_bucket_with_stats(60)]);
1237        let payload2 = make_payload_with_buckets("host2", vec![make_bucket_with_stats(60)]);
1238
1239        let result = split_into_trace_stats(vec![payload1, payload2], 100);
1240
1241        assert_eq!(result.len(), 2);
1242        // First TraceStats: 60 from payload1 + 40 from payload2 = 100
1243        assert_eq!(count_grouped_stats(&result[0]), 100);
1244        // Second TraceStats: remaining 20 from payload2
1245        assert_eq!(count_grouped_stats(&result[1]), 20);
1246    }
1247
1248    #[test]
1249    fn test_split_into_trace_stats_splits_single_payload_multiple_buckets() {
1250        // One payload with two buckets (70 + 80 = 150 stats), max 100
1251        let bucket1 = make_bucket_with_stats(70);
1252        let bucket2 = make_bucket_with_stats(80);
1253        let payload = make_payload_with_buckets("host1", vec![bucket1, bucket2]);
1254
1255        let result = split_into_trace_stats(vec![payload], 100);
1256
1257        assert_eq!(result.len(), 2);
1258        let total: usize = result.iter().map(count_grouped_stats).sum();
1259        assert_eq!(total, 150);
1260    }
1261
1262    #[test]
1263    fn test_split_into_trace_stats_preserves_metadata() {
1264        let bucket = make_bucket_with_stats(250);
1265        let payload = ClientStatsPayload::new("test-host", "prod", "2.0.0")
1266            .with_stats(vec![bucket])
1267            .with_container_id("container-abc")
1268            .with_lang("go")
1269            .with_git_commit_sha("abc123")
1270            .with_image_tag("v1.2.3")
1271            .with_process_tags_hash(12345)
1272            .with_process_tags("tag1,tag2");
1273
1274        let result = split_into_trace_stats(vec![payload], 100);
1275
1276        // All split payloads should have the same metadata
1277        for trace_stats in &result {
1278            for p in trace_stats.stats() {
1279                assert_eq!(p.hostname(), "test-host");
1280                assert_eq!(p.env(), "prod");
1281                assert_eq!(p.version(), "2.0.0");
1282                assert_eq!(p.container_id(), "container-abc");
1283                assert_eq!(p.lang(), "go");
1284                assert_eq!(p.git_commit_sha(), "abc123");
1285                assert_eq!(p.image_tag(), "v1.2.3");
1286                assert_eq!(p.process_tags_hash(), 12345);
1287                assert_eq!(p.process_tags(), "tag1,tag2");
1288            }
1289        }
1290    }
1291
1292    #[test]
1293    fn test_split_into_trace_stats_preserves_bucket_metadata() {
1294        // Create a bucket with specific start/duration/time_shift
1295        let stats: Vec<ClientGroupedStats> = (0..150)
1296            .map(|i| make_grouped_stats("svc", &format!("res-{}", i)))
1297            .collect();
1298        let bucket = ClientStatsBucket::new(999_000_000, 10_000_000_000, stats).with_agent_time_shift(42);
1299        let payload = make_payload_with_buckets("host1", vec![bucket]);
1300
1301        let result = split_into_trace_stats(vec![payload], 100);
1302
1303        // All split buckets should have the same start/duration/time_shift
1304        for trace_stats in &result {
1305            for p in trace_stats.stats() {
1306                for b in p.stats() {
1307                    assert_eq!(b.start(), 999_000_000);
1308                    assert_eq!(b.duration(), 10_000_000_000);
1309                    assert_eq!(b.agent_time_shift(), 42);
1310                }
1311            }
1312        }
1313    }
1314
1315    #[test]
1316    fn test_split_into_trace_stats_handles_empty_bucket() {
1317        let empty_bucket = ClientStatsBucket::new(1000, 10_000_000_000, vec![]);
1318        let payload = make_payload_with_buckets("host1", vec![empty_bucket]);
1319
1320        let result = split_into_trace_stats(vec![payload], 100);
1321
1322        assert_eq!(result.len(), 1);
1323        assert_eq!(count_grouped_stats(&result[0]), 0);
1324    }
1325
1326    #[test]
1327    fn test_split_into_trace_stats_large_split() {
1328        // 10,000 stats with max 4000 - should produce 3 TraceStats
1329        let bucket = make_bucket_with_stats(10_000);
1330        let payload = make_payload_with_buckets("host1", vec![bucket]);
1331
1332        let result = split_into_trace_stats(vec![payload], 4000);
1333
1334        assert_eq!(result.len(), 3);
1335        assert_eq!(count_grouped_stats(&result[0]), 4000);
1336        assert_eq!(count_grouped_stats(&result[1]), 4000);
1337        assert_eq!(count_grouped_stats(&result[2]), 2000);
1338    }
1339
1340    // Property test strategies
1341
1342    /// Strategy to generate arbitrary ClientGroupedStats.
1343    fn arb_grouped_stats() -> impl Strategy<Value = ClientGroupedStats> {
1344        (0..100u64, 0..1000u64).prop_map(|(hits, duration)| {
1345            ClientGroupedStats::new("service", "operation", "resource")
1346                .with_hits(hits)
1347                .with_duration(duration)
1348        })
1349    }
1350
1351    /// Strategy to generate a bucket with 0..=max_stats_per_bucket grouped stats.
1352    fn arb_bucket(max_stats_per_bucket: usize) -> impl Strategy<Value = ClientStatsBucket> {
1353        proptest::collection::vec(arb_grouped_stats(), 0..=max_stats_per_bucket)
1354            .prop_map(|stats| ClientStatsBucket::new(1000, 10_000_000_000, stats))
1355    }
1356
1357    /// Strategy to generate a payload with 1..=max_buckets buckets.
1358    fn arb_payload(max_buckets: usize, max_stats_per_bucket: usize) -> impl Strategy<Value = ClientStatsPayload> {
1359        proptest::collection::vec(arb_bucket(max_stats_per_bucket), 1..=max_buckets)
1360            .prop_map(|buckets| ClientStatsPayload::new("host", "env", "1.0.0").with_stats(buckets))
1361    }
1362
1363    /// Strategy to generate test inputs for the split function.
1364    ///
1365    /// Parameters:
1366    /// - num_payloads: 1..=10
1367    /// - num_buckets_per_payload: 1..=5
1368    /// - num_stats_per_bucket: 0..=500
1369    /// - max_entries_per_event: 1..=1000 (never zero)
1370    fn arb_split_inputs() -> impl Strategy<Value = (Vec<ClientStatsPayload>, usize)> {
1371        let payloads_strategy = proptest::collection::vec(arb_payload(5, 500), 1..=10);
1372        let max_entries_strategy = 1..=1000usize;
1373
1374        (payloads_strategy, max_entries_strategy)
1375    }
1376
1377    #[test_strategy::proptest]
1378    #[cfg_attr(miri, ignore)]
1379    fn property_test_split_respects_max_entries(
1380        #[strategy(arb_split_inputs())] inputs: (Vec<ClientStatsPayload>, usize),
1381    ) {
1382        let (payloads, max_entries_per_event) = inputs;
1383
1384        let input_total: usize = payloads.iter().flat_map(|p| p.stats()).map(|b| b.stats().len()).sum();
1385
1386        let result = split_into_trace_stats(payloads, max_entries_per_event);
1387
1388        // Property 1: No TraceStats should exceed max_entries_per_event
1389        for trace_stats in &result {
1390            let count = count_grouped_stats(trace_stats);
1391            prop_assert!(
1392                count <= max_entries_per_event,
1393                "TraceStats has {} grouped stats, exceeds max of {}",
1394                count,
1395                max_entries_per_event
1396            );
1397        }
1398
1399        // Property 2: Total stats should be preserved
1400        let output_total: usize = result.iter().map(count_grouped_stats).sum();
1401        prop_assert_eq!(input_total, output_total, "Total stats count should be preserved");
1402    }
1403}