Skip to main content

saluki_components/transforms/apm_stats/
mod.rs

1//! APM Stats transform.
2//!
3//! Aggregates traces into time-bucketed statistics, producing `TraceStats` events.
4
5use std::{
6    sync::Arc,
7    time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use async_trait::async_trait;
11use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
12use opentelemetry_semantic_conventions::resource::{CONTAINER_ID, K8S_POD_UID};
13use saluki_config::GenericConfiguration;
14use saluki_context::{
15    origin::OriginTagCardinality,
16    tags::{SharedTagSet, TagSet},
17};
18use saluki_core::{
19    components::{transforms::*, ComponentContext},
20    data_model::event::{
21        trace::Trace,
22        trace_stats::{ClientStatsPayload, TraceStats},
23        Event, EventType,
24    },
25    topology::OutputDefinition,
26};
27use saluki_env::{
28    host::providers::BoxedHostProvider, workload::EntityId, EnvironmentProvider, HostProvider, WorkloadProvider,
29};
30use saluki_error::{ErrorContext as _, GenericError};
31use stringtheory::MetaString;
32use tokio::{select, time::interval};
33use tracing::{debug, error};
34
35use crate::common::{
36    datadog::apm::ApmConfig,
37    otlp::util::{extract_container_tags_from_resource_tagset, KEY_DATADOG_CONTAINER_ID},
38};
39
40mod aggregation;
41use self::aggregation::{process_tags_hash, PayloadAggregationKey};
42
43mod span_concentrator;
44use self::span_concentrator::{InfraTags, SpanConcentrator};
45
46mod statsraw;
47
48mod weight;
49use self::weight::weight;
50
/// Default flush interval for the APM stats transform.
const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(10);

/// Tag key for process tags in span meta.
const TAG_PROCESS_TAGS: &str = "_dd.tags.process";

/// Maximum number of `ClientGroupedStats` entries per `TraceStats` event.
///
/// Flushed payload batches exceeding this are split by `split_into_trace_stats`.
/// NOTE(review): 4000 appears to be a downstream payload-size cap — confirm against the consumer's limits.
const MAX_STATS_GROUPS_PER_EVENT: usize = 4000;
59
/// APM Stats transform configuration.
///
/// Aggregates incoming `Trace` events into time-bucketed statistics, emitting
/// `TraceStats` events.
pub struct ApmStatsTransformConfiguration {
    // Shared APM settings (hostname, default env, peer tag configuration, etc.).
    apm_config: ApmConfig,
    // Hostname fallback applied at build time when `apm_config` has no hostname set.
    default_hostname: Option<String>,
    // Optional provider used to enrich container tags during aggregation.
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}
69
70impl ApmStatsTransformConfiguration {
71    /// Creates a new `ApmStatsTransformConfiguration` from the given configuration.
72    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
73        let apm_config = ApmConfig::from_configuration(config)?;
74        Ok(Self {
75            apm_config,
76            default_hostname: None,
77            workload_provider: None,
78        })
79    }
80
81    /// Sets the default hostname using the environment provider.
82    pub async fn with_environment_provider<E>(mut self, env_provider: E) -> Result<Self, GenericError>
83    where
84        E: EnvironmentProvider<Host = BoxedHostProvider>,
85    {
86        let hostname = env_provider.host().get_hostname().await?;
87        self.default_hostname = Some(hostname);
88        Ok(self)
89    }
90
91    /// Sets the workload provider.
92    ///
93    /// Defaults to unset.
94    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
95    where
96        W: WorkloadProvider + Send + Sync + 'static,
97    {
98        self.workload_provider = Some(Arc::new(workload_provider));
99        self
100    }
101}
102
103#[async_trait]
104impl TransformBuilder for ApmStatsTransformConfiguration {
105    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
106        let mut apm_config = self.apm_config.clone();
107
108        if let Some(hostname) = &self.default_hostname {
109            apm_config.set_hostname_if_empty(hostname.as_str());
110        }
111
112        let concentrator = SpanConcentrator::new(
113            apm_config.compute_stats_by_span_kind(),
114            apm_config.peer_tags_aggregation(),
115            apm_config.peer_tags(),
116            now_nanos(),
117        );
118
119        Ok(Box::new(ApmStats {
120            concentrator,
121            flush_interval: DEFAULT_FLUSH_INTERVAL,
122            agent_env: apm_config.default_env().clone(),
123            agent_hostname: apm_config.hostname().clone(),
124            workload_provider: self.workload_provider.clone(),
125        }))
126    }
127
128    fn input_event_type(&self) -> EventType {
129        EventType::Trace
130    }
131
132    fn outputs(&self) -> &[OutputDefinition<EventType>] {
133        static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::TraceStats)];
134        OUTPUTS
135    }
136}
137
impl MemoryBounds for ApmStatsTransformConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Account for the fixed size of the component struct itself.
        builder.minimum().with_single_value::<ApmStats>("component struct");
        // TODO: Think about everything we need to account for here.
        // NOTE(review): the concentrator's per-bucket aggregation state grows with trace
        // volume and is not accounted for here — confirm whether it should be bounded.
    }
}
144
// Runtime state for the APM stats transform.
struct ApmStats {
    // Aggregates span statistics into time buckets.
    concentrator: SpanConcentrator,
    // How often aggregated stats are flushed downstream.
    flush_interval: Duration,
    // Fallback `env` used when the root span carries none.
    agent_env: MetaString,
    // Fallback hostname used when the root span carries none.
    agent_hostname: MetaString,
    // Optional provider used to enrich container tags.
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}
152
153impl ApmStats {
154    fn process_trace(&mut self, trace: &Trace) {
155        let root_span = trace
156            .spans()
157            .iter()
158            .find(|s| s.parent_id() == 0)
159            .or_else(|| trace.spans().first());
160
161        let trace_weight = root_span.map(weight).unwrap_or(1.0);
162
163        let process_tags = extract_process_tags(trace);
164
165        let payload_key = self.build_payload_key(trace, &process_tags);
166        let infra_tags = self.build_infra_tags(trace, &process_tags);
167
168        let origin = trace
169            .spans()
170            .first()
171            .and_then(|s| s.meta().get("_dd.origin"))
172            .map(|s| s.as_ref())
173            .unwrap_or("");
174
175        for span in trace.spans() {
176            if let Some(stat_span) = self.concentrator.new_stat_span_from_span(span) {
177                self.concentrator
178                    .add_span(&stat_span, trace_weight, &payload_key, &infra_tags, origin);
179            }
180        }
181    }
182
183    fn build_infra_tags(&self, trace: &Trace, process_tags: &str) -> InfraTags {
184        let resource_tags = trace.resource_tags();
185        let container_id = resolve_container_id(resource_tags);
186        let mut container_tags = if container_id.is_empty() {
187            SharedTagSet::default()
188        } else {
189            extract_container_tags(resource_tags)
190        };
191
192        // Query the workload provider for additional container tags.
193        if !container_id.is_empty() {
194            if let Some(workload_provider) = &self.workload_provider {
195                let entity_id = EntityId::Container(container_id.clone());
196                if let Some(tags) = workload_provider.get_tags_for_entity(&entity_id, OriginTagCardinality::Low) {
197                    container_tags.extend_from_shared(&tags);
198                }
199            }
200        }
201
202        InfraTags::new(container_id, container_tags, process_tags)
203    }
204
205    fn build_payload_key(&self, trace: &Trace, process_tags: &str) -> PayloadAggregationKey {
206        let root_span = trace
207            .spans()
208            .iter()
209            .find(|s| s.parent_id() == 0)
210            .or_else(|| trace.spans().first());
211
212        let span_env = root_span.and_then(|s| s.meta().get("env")).filter(|s| !s.is_empty());
213        let env = span_env.cloned().unwrap_or_else(|| self.agent_env.clone());
214
215        let hostname = root_span
216            .and_then(|s| s.meta().get("_dd.hostname"))
217            .filter(|s| !s.is_empty())
218            .cloned()
219            .unwrap_or_else(|| self.agent_hostname.clone());
220
221        let version = root_span
222            .and_then(|s| s.meta().get("version"))
223            .cloned()
224            .unwrap_or_default();
225
226        let container_id = root_span
227            .and_then(|s| s.meta().get("_dd.container_id"))
228            .cloned()
229            .unwrap_or_default();
230
231        let git_commit_sha = root_span
232            .and_then(|s| s.meta().get("_dd.git.commit.sha"))
233            .cloned()
234            .unwrap_or_default();
235
236        let image_tag = root_span
237            .and_then(|s| s.meta().get("_dd.image_tag"))
238            .cloned()
239            .unwrap_or_default();
240
241        let lang = root_span
242            .and_then(|s| s.meta().get("language"))
243            .cloned()
244            .unwrap_or_default();
245
246        PayloadAggregationKey {
247            env,
248            hostname,
249            version,
250            container_id,
251            git_commit_sha,
252            image_tag,
253            lang,
254            process_tags_hash: process_tags_hash(process_tags),
255        }
256    }
257}
258
/// Splits stats payloads into multiple `TraceStats` events, each containing at most `max_entries_per_event` grouped
/// entries.
///
/// This function will not attempt to collapse/pack payloads to optimize the number of events generated: there will always
/// be at least as many output client payloads as there are input client payloads.
fn split_into_trace_stats(client_payloads: Vec<ClientStatsPayload>, max_entries_per_event: usize) -> Vec<TraceStats> {
    if client_payloads.is_empty() {
        return Vec::new();
    }

    // If the total number of grouped stats entries is below the specified threshold, then we don't do any splitting or
    // collapsing or anything.
    let total_grouped_entries = client_payloads
        .iter()
        .map(|p| p.stats().iter().map(|b| b.stats().len()).sum::<usize>())
        .sum::<usize>();
    if total_grouped_entries <= max_entries_per_event {
        return vec![TraceStats::new(client_payloads)];
    }

    // Accumulators for the event currently being built: `current_event_len` counts grouped
    // entries across all payloads collected in `current_client_payloads`.
    let mut events = Vec::new();
    let mut current_client_payloads = Vec::new();
    let mut current_event_len = 0;

    for mut client_payload in client_payloads {
        // If the current payload can fit entirely in the current event, then collect it as-is.
        let client_payload_len = client_payload.stats().iter().map(|b| b.stats().len()).sum::<usize>();
        if current_event_len + client_payload_len <= max_entries_per_event {
            current_client_payloads.push(client_payload);
            current_event_len += client_payload_len;
            continue;
        }

        // Consume all the stats buckets from the current client payload, and set ourselves up to go through each
        // bucket, splitting them as necessary.
        //
        // We basically iterate until we find a bucket where adding its entries would cause us to exceed our threshold,
        // and then we split that bucket, creating a new client payload in the process, and keep repeating that until
        // we've exhausted all buckets for our starting client payload.
        let mut current_client_stats_buckets = Vec::new();
        for mut client_stats_bucket in client_payload.take_stats() {
            let bucket_len = client_stats_bucket.stats().len();
            // If this bucket can fit in the current event, then collect it as-is.
            if current_event_len + bucket_len <= max_entries_per_event {
                current_client_stats_buckets.push(client_stats_bucket);
                current_event_len += bucket_len;
                continue;
            }

            // We have to split this bucket. We take the grouped entries it has, and we subdivide those into a new
            // client bucket, or buckets in order to ensure we don't exceed our threshold.
            let mut bucket_entries = client_stats_bucket.take_stats();
            while current_event_len + bucket_entries.len() > max_entries_per_event {
                // We calculate the split point this way because the returned vector from `split_off` is referenced
                // against the end of the vector, so if we have 100 items, and we only want 20, we need to split at
                // index 80 (100 - 20 == 80).
                let split_amount = max_entries_per_event - current_event_len;
                let split_point = bucket_entries.len() - split_amount;
                let split_entries = bucket_entries.split_off(split_point);

                // Create a new "split" bucket based on the current bucket (`client_stats_bucket`) but containing only
                // the split-off entries. This will feed into the current client payload (`client_payload`) which we'll
                // finalize and add to the current event before starting a new event.
                let split_bucket = client_stats_bucket.clone().with_stats(split_entries);
                current_client_stats_buckets.push(split_bucket);

                let split_client_payload = client_payload
                    .clone()
                    .with_stats(std::mem::take(&mut current_client_stats_buckets));
                current_client_payloads.push(split_client_payload);

                // Emit the now-full event and start a fresh one.
                events.push(TraceStats::new(std::mem::take(&mut current_client_payloads)));
                current_event_len = 0;
            }

            // If we have any leftover entries from the bucket after splitting it up, put them back in the current bucket
            // and add that bucket to the current client payload.
            if !bucket_entries.is_empty() {
                current_event_len += bucket_entries.len();
                current_client_stats_buckets.push(client_stats_bucket.with_stats(bucket_entries));
            }
        }

        // If we have buckets for this client payload (whether we split or not), add them back to the current client payload.
        if !current_client_stats_buckets.is_empty() {
            current_client_payloads.push(client_payload.with_stats(current_client_stats_buckets));
        }
    }

    // Stick the remaining entries into a final `TraceStats` event, if any.
    if !current_client_payloads.is_empty() {
        events.push(TraceStats::new(current_client_payloads));
    }

    events
}
355
#[async_trait]
impl Transform for ApmStats {
    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
        let mut health = context.take_health_handle();

        let mut flush_ticker = interval(self.flush_interval);
        // The first tick of a tokio interval completes immediately; consume it so the
        // first real flush happens one full interval from now.
        flush_ticker.tick().await;

        // Set once the input stream ends; triggers one last forced flush before exit.
        let mut final_flush = false;

        health.mark_ready();
        debug!("APM Stats transform started.");

        loop {
            select! {
                _ = health.live() => continue,

                _ = flush_ticker.tick() => {
                    let stats_payloads = self.concentrator.flush(now_nanos(), final_flush);
                    if !stats_payloads.is_empty() {
                        debug!(stats_payloads = stats_payloads.len(), "Flushing APM stats.");

                        // Split oversized batches so each event stays under the grouped-stats cap.
                        let events = split_into_trace_stats(stats_payloads, MAX_STATS_GROUPS_PER_EVENT);
                        let dispatcher = context.dispatcher().buffered()
                            .error_context("Default output should be available.")?;

                        if let Err(e) = dispatcher.send_all(events.into_iter().map(Event::TraceStats)).await {
                            error!(error = %e, "Failed to dispatch events.");
                        }
                    }

                    if final_flush {
                        debug!("Final APM stats flush complete.");
                        break;
                    }
                },

                // Stop polling for new input once a final flush has been requested.
                maybe_events = context.events().next(), if !final_flush => {
                    match maybe_events {
                        Some(events) => {
                            for event in events {
                                if let Event::Trace(trace) = event {
                                    self.process_trace(&trace);
                                }
                            }
                        },
                        None => {
                            // We've reached the end of our input stream, so mark ourselves for a final flush and reset the
                            // interval so it ticks immediately on the next loop iteration.
                            final_flush = true;
                            flush_ticker.reset_immediately();
                            debug!("APM Stats transform stopping, triggering final flush...");
                        }
                    }
                },
            }
        }

        debug!("APM Stats transform stopped.");
        Ok(())
    }
}
418
/// Returns the current time as nanoseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_nanos() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // Truncating cast is fine: u64 nanoseconds covers dates until ~year 2554.
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}
426
427/// Resolves container ID from OTLP resource tags.
428fn resolve_container_id(resource_tags: &SharedTagSet) -> MetaString {
429    for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
430        if let Some(tag) = resource_tags.get_single_tag(key) {
431            if let Some(value) = tag.value() {
432                if !value.is_empty() {
433                    return MetaString::from(value);
434                }
435            }
436        }
437    }
438
439    MetaString::empty()
440}
441
442/// Extracts container tags from OTLP resource tags.
443fn extract_container_tags(resource_tags: &SharedTagSet) -> SharedTagSet {
444    let mut container_tags_set = TagSet::default();
445    extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags_set);
446
447    container_tags_set.into_shared()
448}
449
450/// Extracts process tags from trace.
451fn extract_process_tags(trace: &Trace) -> MetaString {
452    if let Some(first_span) = trace.spans().first() {
453        if let Some(process_tags) = first_span.meta().get(TAG_PROCESS_TAGS) {
454            return process_tags.clone();
455        }
456    }
457
458    MetaString::empty()
459}
460
461#[cfg(test)]
462mod tests {
463    use proptest::prelude::*;
464    use saluki_common::collections::FastHashMap;
465    use saluki_context::tags::TagSet;
466    use saluki_core::data_model::event::trace_stats::ClientStatsBucket;
467    use saluki_core::data_model::event::{trace::Span, trace_stats::ClientGroupedStats};
468
469    use super::aggregation::BUCKET_DURATION_NS;
470    use super::span_concentrator::METRIC_PARTIAL_VERSION;
471    use super::*;
472
473    /// Helper to align timestamp to bucket boundary
474    fn align_ts(ts: u64, bsize: u64) -> u64 {
475        ts - ts % bsize
476    }
477
478    /// Creates a test span with the given parameters.
479    #[allow(clippy::too_many_arguments)]
480    fn test_span(
481        aligned_now: u64, span_id: u64, parent_id: u64, duration: u64, bucket_offset: u64, service: &str,
482        resource: &str, error: i32, meta: Option<FastHashMap<MetaString, MetaString>>,
483        metrics: Option<FastHashMap<MetaString, f64>>,
484    ) -> Span {
485        // Calculate start time so that span ends in the correct bucket
486        // End time = start + duration, and we want end time to be in bucket (aligned_now - offset * bsize)
487        // Use BUCKET_DURATION_NS as the bucket size (matches the concentrator)
488        let bucket_start = aligned_now - bucket_offset * BUCKET_DURATION_NS;
489        let start = bucket_start - duration;
490
491        Span::new(
492            service, "query", resource, "db", 1, span_id, parent_id, start, duration, error,
493        )
494        .with_meta(meta)
495        .with_metrics(metrics)
496    }
497
498    /// Creates a simple measured span for basic tests
499    fn make_test_span(service: &str, name: &str, resource: &str) -> Span {
500        let mut metrics = FastHashMap::default();
501        metrics.insert(MetaString::from("_dd.measured"), 1.0);
502
503        Span::new(service, name, resource, "web", 1, 1, 0, 1000000000, 100000000, 0).with_metrics(metrics)
504    }
505
506    /// Creates a top-level span (parent_id = 0, has _top_level metric)
507    fn make_top_level_span(
508        aligned_now: u64, span_id: u64, duration: u64, bucket_offset: u64, service: &str, resource: &str, error: i32,
509        meta: Option<FastHashMap<MetaString, MetaString>>,
510    ) -> Span {
511        let mut metrics = FastHashMap::default();
512        metrics.insert(MetaString::from("_top_level"), 1.0);
513        test_span(
514            aligned_now,
515            span_id,
516            0,
517            duration,
518            bucket_offset,
519            service,
520            resource,
521            error,
522            meta,
523            Some(metrics),
524        )
525    }
526
    /// Verifies that processing a trace through the transform produces flushed stats.
    #[test]
    fn test_process_trace_creates_stats() {
        let now = now_nanos();

        let concentrator = SpanConcentrator::new(true, true, &[], now);
        let mut transform = ApmStats {
            concentrator,
            flush_interval: DEFAULT_FLUSH_INTERVAL,
            agent_env: MetaString::from("none"),
            agent_hostname: MetaString::default(),
            workload_provider: None,
        };

        let span = make_test_span("test-service", "test-operation", "test-resource");
        let trace = Trace::new(vec![span], TagSet::default());

        transform.process_trace(&trace);

        // Flush and verify we got stats (force flush, well past the span's bucket).
        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
        assert!(!stats.is_empty(), "Expected stats to be produced");
    }
549
    /// Verifies that a span's sample rate is applied as a weight to aggregated hits.
    #[test]
    fn test_weight_applied_to_stats() {
        let now = now_nanos();

        let concentrator = SpanConcentrator::new(true, true, &[], now);
        let mut transform = ApmStats {
            concentrator,
            flush_interval: DEFAULT_FLUSH_INTERVAL,
            agent_env: MetaString::from("none"),
            agent_hostname: MetaString::default(),
            workload_provider: None,
        };

        // Create a span with 0.5 sample rate (weight = 2.0)
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_dd.measured"), 1.0);
        metrics.insert(MetaString::from("_sample_rate"), 0.5);

        let span = Span::new(
            "test-service",
            "test-op",
            "test-resource",
            "web",
            1,
            1,
            0,
            now,
            100000000,
            0,
        )
        .with_metrics(metrics);

        let trace = Trace::new(vec![span], TagSet::default());
        transform.process_trace(&trace);

        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
        assert!(!stats.is_empty());

        // The hits should be weighted (approximately 2 due to 0.5 sample rate)
        let bucket = &stats[0].stats()[0];
        let grouped = &bucket.stats()[0];
        // With stochastic rounding, hits could be 1 or 2, but with weight 2.0 it should round to 2
        assert!(grouped.hits() >= 1, "Expected weighted hits");
    }
594
    /// Verifies force flush returns buckets regardless of age, while a normal flush skips
    /// buckets that are not yet old enough.
    #[test]
    fn test_force_flush() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        // Add a span
        let span = make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None);
        let trace = Trace::new(vec![span], TagSet::default());

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("test"),
            hostname: MetaString::from("host"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in trace.spans() {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        // ts=0 so that flush always considers buckets not old enough
        let ts: u64 = 0;

        // Without force flush, should skip the bucket
        let stats = concentrator.flush(ts, false);
        assert!(stats.is_empty(), "Non-force flush should return empty");

        // With force flush, should flush buckets regardless of age
        let stats = concentrator.flush(ts, true);
        assert!(!stats.is_empty(), "Force flush should return stats");
        assert_eq!(stats[0].stats().len(), 1, "Should have 1 bucket");
    }
631
    /// Verifies that spans marked with `_dd.partial_version` are excluded from stats.
    #[test]
    fn test_ignores_partial_spans() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        // Create a partial span (has _dd.partial_version metric)
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_top_level"), 1.0);
        metrics.insert(MetaString::from(METRIC_PARTIAL_VERSION), 830604.0);

        let span = test_span(aligned_now, 1, 0, 50, 5, "A1", "resource1", 0, None, Some(metrics));
        let trace = Trace::new(vec![span], TagSet::default());

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("test"),
            hostname: MetaString::from("tracer-hostname"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in trace.spans() {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        // Partial spans should be ignored
        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
        assert!(stats.is_empty(), "Partial spans should be ignored");
    }
664
    /// Verifies that totals (duration, hits, errors, top-level hits) across all flushed
    /// buckets match the spans fed into the concentrator.
    #[test]
    fn test_concentrator_stats_totals() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        // Set oldestTs to allow old buckets
        let oldest_ts = aligned_now - 2 * BUCKET_DURATION_NS;
        let mut concentrator = SpanConcentrator::new(true, true, &[], oldest_ts);

        // Build spans spread over time windows
        let spans = vec![
            make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
            make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
        ];

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("none"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in &spans {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        // Flush all and collect totals
        let all_stats = concentrator.flush(now + BUCKET_DURATION_NS * 10, true);

        let mut total_duration: u64 = 0;
        let mut total_hits: u64 = 0;
        let mut total_errors: u64 = 0;
        let mut total_top_level_hits: u64 = 0;

        for payload in &all_stats {
            for bucket in payload.stats() {
                for grouped in bucket.stats() {
                    total_duration += grouped.duration();
                    total_hits += grouped.hits();
                    total_errors += grouped.errors();
                    total_top_level_hits += grouped.top_level_hits();
                }
            }
        }

        // Each span contributes its full duration and exactly one (top-level) hit.
        assert_eq!(total_duration, 50 + 40 + 30 + 20 + 10 + 1, "Wrong total duration");
        assert_eq!(total_hits, 6, "Wrong total hits");
        assert_eq!(total_top_level_hits, 6, "Wrong total top level hits");
        assert_eq!(total_errors, 0, "Wrong total errors");
    }
720
    /// Verifies that grouped stats are split by the trace-root flag: root spans,
    /// non-root top-level spans, and client-kind spans each get their own group.
    #[test]
    fn test_root_tag() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        // Root span (parent_id = 0, top_level)
        let mut root_metrics = FastHashMap::default();
        root_metrics.insert(MetaString::from("_top_level"), 1.0);
        let root_span = test_span(
            aligned_now,
            1,
            0,
            40,
            10,
            "A1",
            "resource1",
            0,
            None,
            Some(root_metrics),
        );

        // Non-root but top level span (has _top_level but parent_id != 0)
        let mut top_level_metrics = FastHashMap::default();
        top_level_metrics.insert(MetaString::from("_top_level"), 1.0);
        let top_level_span = test_span(
            aligned_now,
            4,
            1000,
            10,
            10,
            "A1",
            "resource1",
            0,
            None,
            Some(top_level_metrics),
        );

        // Client span (non-root, non-top level, but has span.kind = client)
        let mut client_meta = FastHashMap::default();
        client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
        let client_span = test_span(aligned_now, 3, 2, 20, 10, "A1", "resource1", 0, Some(client_meta), None);

        let spans = vec![root_span, top_level_span, client_span];

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("none"),
            ..Default::default()
        };
        let infra_tags = InfraTags::default();

        for span in &spans {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
            }
        }

        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 20, true);
        assert!(!stats.is_empty(), "Should have stats");

        // Count grouped stats - should be split by IsTraceRoot
        let mut total_grouped = 0;
        let mut root_count = 0;
        let mut non_root_count = 0;

        for payload in &stats {
            for bucket in payload.stats() {
                for grouped in bucket.stats() {
                    total_grouped += 1;
                    match grouped.is_trace_root() {
                        Some(true) => root_count += 1,
                        Some(false) => non_root_count += 1,
                        None => {}
                    }
                }
            }
        }

        // We expect 3 grouped stats:
        // 1. Root span (is_trace_root = true)
        // 2. Non-root top-level span (is_trace_root = false)
        // 3. Client span (is_trace_root = false, span.kind = client)
        assert_eq!(total_grouped, 3, "Expected 3 grouped stats");
        assert_eq!(root_count, 1, "Expected 1 root span");
        assert_eq!(non_root_count, 2, "Expected 2 non-root spans");
    }
808
809    #[test]
810    fn test_compute_stats_through_span_kind_check() {
811        let now = now_nanos();
812
813        // Test with compute_stats_by_span_kind DISABLED
814        {
815            let mut concentrator = SpanConcentrator::new(false, true, &[], now);
816
817            // Create a simple top-level span using the same pattern as make_test_span (which works)
818            let mut metrics = FastHashMap::default();
819            metrics.insert(MetaString::from("_top_level"), 1.0);
820            let span = Span::new("myservice", "query", "GET /users", "web", 1, 1, 0, now, 500, 0).with_metrics(metrics);
821
822            let payload_key = PayloadAggregationKey {
823                env: MetaString::from("test"),
824                ..Default::default()
825            };
826            let infra_tags = InfraTags::default();
827
828            if let Some(stat_span) = concentrator.new_stat_span_from_span(&span) {
829                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
830            }
831
832            // Client span with span.kind=client but no _top_level or _dd.measured
833            // Should NOT produce stats when compute_stats_by_span_kind is disabled
834            let mut client_meta = FastHashMap::default();
835            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
836            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
837                .with_meta(client_meta);
838
839            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
840                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
841            }
842
843            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
844
845            let mut count = 0;
846            for payload in &stats {
847                for bucket in payload.stats() {
848                    count += bucket.stats().len();
849                }
850            }
851
852            // When disabled, only top_level span gets stats (client span has no top_level/measured)
853            assert_eq!(count, 1, "Expected 1 stat when span kind check disabled");
854        }
855
856        // Test with compute_stats_by_span_kind ENABLED
857        {
858            let mut concentrator = SpanConcentrator::new(true, true, &[], now);
859
860            // Create a simple top-level span
861            let mut metrics = FastHashMap::default();
862            metrics.insert(MetaString::from("_top_level"), 1.0);
863            let span = Span::new("myservice", "query", "GET /users", "web", 1, 1, 0, now, 500, 0).with_metrics(metrics);
864
865            let payload_key = PayloadAggregationKey {
866                env: MetaString::from("test"),
867                ..Default::default()
868            };
869            let infra_tags = InfraTags::default();
870
871            if let Some(stat_span) = concentrator.new_stat_span_from_span(&span) {
872                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
873            }
874
875            // Client span with span.kind=client
876            // SHOULD produce stats when compute_stats_by_span_kind is enabled
877            let mut client_meta = FastHashMap::default();
878            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
879            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
880                .with_meta(client_meta);
881
882            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
883                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
884            }
885
886            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
887
888            let mut count = 0;
889            for payload in &stats {
890                for bucket in payload.stats() {
891                    count += bucket.stats().len();
892                }
893            }
894
895            // When enabled, both spans get stats
896            assert_eq!(count, 2, "Expected 2 stats when span kind check enabled");
897        }
898    }
899
900    #[test]
901    fn test_peer_tags() {
902        let now = now_nanos();
903
904        // Test without peer tags aggregation enabled
905        {
906            let mut concentrator = SpanConcentrator::new(true, false, &[], now);
907
908            // Client span with db tags and _dd.measured
909            let mut client_meta = FastHashMap::default();
910            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
911            client_meta.insert(MetaString::from("db.instance"), MetaString::from("i-1234"));
912            client_meta.insert(MetaString::from("db.system"), MetaString::from("postgres"));
913            let mut client_metrics = FastHashMap::default();
914            client_metrics.insert(MetaString::from("_dd.measured"), 1.0);
915            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
916                .with_meta(client_meta)
917                .with_metrics(client_metrics);
918
919            let payload_key = PayloadAggregationKey {
920                env: MetaString::from("test"),
921                ..Default::default()
922            };
923            let infra_tags = InfraTags::default();
924
925            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
926                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
927            }
928
929            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
930
931            // Without peer tags aggregation, peer_tags should be empty
932            for payload in &stats {
933                for bucket in payload.stats() {
934                    for grouped in bucket.stats() {
935                        assert!(
936                            grouped.peer_tags().is_empty(),
937                            "Peer tags should be empty when peer_tags_aggregation is false"
938                        );
939                    }
940                }
941            }
942        }
943
944        // Test with peer tags aggregation enabled
945        {
946            // Note: BASE_PEER_TAGS already includes db.instance and db.system
947            let mut concentrator = SpanConcentrator::new(true, true, &[], now);
948
949            // Client span with db tags and _dd.measured
950            let mut client_meta = FastHashMap::default();
951            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
952            client_meta.insert(MetaString::from("db.instance"), MetaString::from("i-1234"));
953            client_meta.insert(MetaString::from("db.system"), MetaString::from("postgres"));
954            let mut client_metrics = FastHashMap::default();
955            client_metrics.insert(MetaString::from("_dd.measured"), 1.0);
956            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
957                .with_meta(client_meta)
958                .with_metrics(client_metrics);
959
960            let payload_key = PayloadAggregationKey {
961                env: MetaString::from("test"),
962                ..Default::default()
963            };
964            let infra_tags = InfraTags::default();
965
966            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
967                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
968            }
969
970            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
971
972            // With peer tags aggregation, client span should have peer_tags
973            let mut found_client_with_peer_tags = false;
974            for payload in &stats {
975                for bucket in payload.stats() {
976                    for grouped in bucket.stats() {
977                        if grouped.resource() == "SELECT ..." {
978                            assert!(!grouped.peer_tags().is_empty(), "Client span should have peer tags");
979                            // Check that peer tags contain db.instance and db.system
980                            let peer_tags: Vec<&str> = grouped.peer_tags().iter().map(|s| s.as_ref()).collect();
981                            assert!(
982                                peer_tags.iter().any(|t| t.starts_with("db.instance:")),
983                                "Should have db.instance peer tag"
984                            );
985                            assert!(
986                                peer_tags.iter().any(|t| t.starts_with("db.system:")),
987                                "Should have db.system peer tag"
988                            );
989                            found_client_with_peer_tags = true;
990                        }
991                    }
992                }
993            }
994            assert!(
995                found_client_with_peer_tags,
996                "Should have found client span with peer tags"
997            );
998        }
999    }
1000
1001    #[test]
1002    fn test_concentrator_oldest_ts() {
1003        let now = now_nanos();
1004        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
1005
1006        // Test "cold" scenario - all spans in the past should end up in current bucket
1007        {
1008            // Start concentrator at current time (cold start)
1009            let mut concentrator = SpanConcentrator::new(true, true, &[], now);
1010
1011            // Build spans spread over many time windows (all in the past)
1012            let spans = vec![
1013                make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
1014                make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
1015                make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
1016                make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
1017                make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
1018                make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
1019            ];
1020
1021            let payload_key = PayloadAggregationKey {
1022                env: MetaString::from("none"),
1023                ..Default::default()
1024            };
1025            let infra_tags = InfraTags::default();
1026
1027            for span in &spans {
1028                if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
1029                    concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
1030                }
1031            }
1032
1033            // Flush multiple times without force
1034            let mut flush_time = now;
1035            let buffer_len = 2; // DEFAULT_BUFFER_LEN
1036
1037            for _ in 0..buffer_len {
1038                let stats = concentrator.flush(flush_time, false);
1039                assert!(stats.is_empty(), "Should not flush before buffer fills");
1040                flush_time += BUCKET_DURATION_NS;
1041            }
1042
1043            // After buffer_len flushes, should get aggregated stats
1044            let stats = concentrator.flush(flush_time, false);
1045            assert!(!stats.is_empty(), "Should flush after buffer fills");
1046
1047            // All spans should be aggregated into one bucket (oldest bucket aggregates old data)
1048            let mut total_hits: u64 = 0;
1049            let mut total_duration: u64 = 0;
1050            for payload in &stats {
1051                for bucket in payload.stats() {
1052                    for grouped in bucket.stats() {
1053                        total_hits += grouped.hits();
1054                        total_duration += grouped.duration();
1055                    }
1056                }
1057            }
1058
1059            assert_eq!(total_hits, 6, "All 6 spans should be counted");
1060            assert_eq!(
1061                total_duration,
1062                50 + 40 + 30 + 20 + 10 + 1,
1063                "Total duration should match"
1064            );
1065        }
1066    }
1067
1068    #[test]
1069    fn test_compute_stats_for_span_kind() {
1070        use super::span_concentrator::compute_stats_for_span_kind;
1071
1072        // Valid span kinds (case insensitive)
1073        assert!(compute_stats_for_span_kind("server"));
1074        assert!(compute_stats_for_span_kind("consumer"));
1075        assert!(compute_stats_for_span_kind("client"));
1076        assert!(compute_stats_for_span_kind("producer"));
1077
1078        // Uppercase
1079        assert!(compute_stats_for_span_kind("SERVER"));
1080        assert!(compute_stats_for_span_kind("CONSUMER"));
1081        assert!(compute_stats_for_span_kind("CLIENT"));
1082        assert!(compute_stats_for_span_kind("PRODUCER"));
1083
1084        // Mixed case
1085        assert!(compute_stats_for_span_kind("SErVER"));
1086        assert!(compute_stats_for_span_kind("COnSUMER"));
1087        assert!(compute_stats_for_span_kind("CLiENT"));
1088        assert!(compute_stats_for_span_kind("PRoDUCER"));
1089
1090        // Invalid span kinds
1091        assert!(!compute_stats_for_span_kind("internal"));
1092        assert!(!compute_stats_for_span_kind("INTERNAL"));
1093        assert!(!compute_stats_for_span_kind("INtERNAL"));
1094        assert!(!compute_stats_for_span_kind(""));
1095    }
1096
1097    #[test]
1098    fn test_extract_process_tags() {
1099        // Test with no process tags
1100        {
1101            let span = Span::default();
1102            let trace = Trace::new(vec![span], TagSet::default());
1103            let process_tags = extract_process_tags(&trace);
1104            assert!(process_tags.is_empty(), "Should be empty when no _dd.tags.process");
1105        }
1106
1107        // Test with process tags in first span meta
1108        {
1109            let mut meta = FastHashMap::default();
1110            meta.insert(MetaString::from(TAG_PROCESS_TAGS), MetaString::from("a:1,b:2,c:3"));
1111            let span = Span::default().with_meta(meta);
1112            let trace = Trace::new(vec![span], TagSet::default());
1113            let process_tags = extract_process_tags(&trace);
1114            assert_eq!(process_tags, "a:1,b:2,c:3");
1115        }
1116
1117        // Test with empty process tags
1118        {
1119            let mut meta = FastHashMap::default();
1120            meta.insert(MetaString::from(TAG_PROCESS_TAGS), MetaString::from(""));
1121            let span = Span::default().with_meta(meta);
1122            let trace = Trace::new(vec![span], TagSet::default());
1123            let process_tags = extract_process_tags(&trace);
1124            assert!(
1125                process_tags.is_empty(),
1126                "Should be empty when _dd.tags.process is empty string"
1127            );
1128        }
1129
1130        // Test with empty trace
1131        {
1132            let trace = Trace::new(vec![], TagSet::default());
1133            let process_tags = extract_process_tags(&trace);
1134            assert!(process_tags.is_empty(), "Should be empty when trace has no spans");
1135        }
1136    }
1137
1138    #[test]
1139    fn test_process_tags_hash_computation() {
1140        use super::aggregation::process_tags_hash;
1141
1142        // Empty string should return 0
1143        assert_eq!(process_tags_hash(""), 0);
1144
1145        // Same tags should produce same hash
1146        let hash1 = process_tags_hash("a:1,b:2,c:3");
1147        let hash2 = process_tags_hash("a:1,b:2,c:3");
1148        assert_eq!(hash1, hash2);
1149
1150        // Different tags should produce different hash
1151        let hash3 = process_tags_hash("a:1,b:2");
1152        assert_ne!(hash1, hash3);
1153    }
1154
1155    // Helper to create a ClientGroupedStats for testing
1156    fn make_grouped_stats(service: &str, resource: &str) -> ClientGroupedStats {
1157        ClientGroupedStats::new(service, "operation", resource)
1158            .with_hits(1)
1159            .with_duration(100)
1160    }
1161
1162    // Helper to create a ClientStatsBucket with N stats
1163    fn make_bucket_with_stats(n: usize) -> ClientStatsBucket {
1164        let stats: Vec<ClientGroupedStats> = (0..n)
1165            .map(|i| make_grouped_stats("service", &format!("resource-{}", i)))
1166            .collect();
1167        ClientStatsBucket::new(1000, 10_000_000_000, stats)
1168    }
1169
1170    // Helper to create a ClientStatsPayload with specified buckets
1171    fn make_payload_with_buckets(hostname: &str, buckets: Vec<ClientStatsBucket>) -> ClientStatsPayload {
1172        ClientStatsPayload::new(hostname, "test-env", "1.0.0")
1173            .with_stats(buckets)
1174            .with_container_id("container-123")
1175            .with_lang("rust")
1176    }
1177
1178    // Helper to count total ClientGroupedStats across all payloads in a TraceStats
1179    fn count_grouped_stats(trace_stats: &TraceStats) -> usize {
1180        trace_stats
1181            .stats()
1182            .iter()
1183            .flat_map(|p| p.stats())
1184            .map(|b| b.stats().len())
1185            .sum()
1186    }
1187
1188    #[test]
1189    fn test_split_into_trace_stats_empty_input() {
1190        let result = split_into_trace_stats(vec![], 100);
1191        assert!(result.is_empty());
1192    }
1193
1194    #[test]
1195    fn test_split_into_trace_stats_no_split_needed() {
1196        // 50 stats with max 100 - no split needed
1197        let bucket = make_bucket_with_stats(50);
1198        let payload = make_payload_with_buckets("host1", vec![bucket]);
1199
1200        let result = split_into_trace_stats(vec![payload], 100);
1201
1202        assert_eq!(result.len(), 1);
1203        assert_eq!(count_grouped_stats(&result[0]), 50);
1204    }
1205
1206    #[test]
1207    fn test_split_into_trace_stats_exact_threshold() {
1208        // Exactly 100 stats with max 100 - no split needed
1209        let bucket = make_bucket_with_stats(100);
1210        let payload = make_payload_with_buckets("host1", vec![bucket]);
1211
1212        let result = split_into_trace_stats(vec![payload], 100);
1213
1214        assert_eq!(result.len(), 1);
1215        assert_eq!(count_grouped_stats(&result[0]), 100);
1216    }
1217
1218    #[test]
1219    fn test_split_into_trace_stats_splits_single_bucket() {
1220        // 250 stats in one bucket with max 100 - should split into 3 TraceStats
1221        let bucket = make_bucket_with_stats(250);
1222        let payload = make_payload_with_buckets("host1", vec![bucket]);
1223
1224        let result = split_into_trace_stats(vec![payload], 100);
1225
1226        assert_eq!(result.len(), 3);
1227        assert_eq!(count_grouped_stats(&result[0]), 100);
1228        assert_eq!(count_grouped_stats(&result[1]), 100);
1229        assert_eq!(count_grouped_stats(&result[2]), 50);
1230
1231        // Total should be preserved
1232        let total: usize = result.iter().map(count_grouped_stats).sum();
1233        assert_eq!(total, 250);
1234    }
1235
1236    #[test]
1237    fn test_split_into_trace_stats_splits_across_payloads() {
1238        // Two payloads with 60 stats each, max 100 - should combine into 2 TraceStats
1239        let payload1 = make_payload_with_buckets("host1", vec![make_bucket_with_stats(60)]);
1240        let payload2 = make_payload_with_buckets("host2", vec![make_bucket_with_stats(60)]);
1241
1242        let result = split_into_trace_stats(vec![payload1, payload2], 100);
1243
1244        assert_eq!(result.len(), 2);
1245        // First TraceStats: 60 from payload1 + 40 from payload2 = 100
1246        assert_eq!(count_grouped_stats(&result[0]), 100);
1247        // Second TraceStats: remaining 20 from payload2
1248        assert_eq!(count_grouped_stats(&result[1]), 20);
1249    }
1250
1251    #[test]
1252    fn test_split_into_trace_stats_splits_single_payload_multiple_buckets() {
1253        // One payload with two buckets (70 + 80 = 150 stats), max 100
1254        let bucket1 = make_bucket_with_stats(70);
1255        let bucket2 = make_bucket_with_stats(80);
1256        let payload = make_payload_with_buckets("host1", vec![bucket1, bucket2]);
1257
1258        let result = split_into_trace_stats(vec![payload], 100);
1259
1260        assert_eq!(result.len(), 2);
1261        let total: usize = result.iter().map(count_grouped_stats).sum();
1262        assert_eq!(total, 150);
1263    }
1264
1265    #[test]
1266    fn test_split_into_trace_stats_preserves_metadata() {
1267        let bucket = make_bucket_with_stats(250);
1268        let payload = ClientStatsPayload::new("test-host", "prod", "2.0.0")
1269            .with_stats(vec![bucket])
1270            .with_container_id("container-abc")
1271            .with_lang("go")
1272            .with_git_commit_sha("abc123")
1273            .with_image_tag("v1.2.3")
1274            .with_process_tags_hash(12345)
1275            .with_process_tags("tag1,tag2");
1276
1277        let result = split_into_trace_stats(vec![payload], 100);
1278
1279        // All split payloads should have the same metadata
1280        for trace_stats in &result {
1281            for p in trace_stats.stats() {
1282                assert_eq!(p.hostname(), "test-host");
1283                assert_eq!(p.env(), "prod");
1284                assert_eq!(p.version(), "2.0.0");
1285                assert_eq!(p.container_id(), "container-abc");
1286                assert_eq!(p.lang(), "go");
1287                assert_eq!(p.git_commit_sha(), "abc123");
1288                assert_eq!(p.image_tag(), "v1.2.3");
1289                assert_eq!(p.process_tags_hash(), 12345);
1290                assert_eq!(p.process_tags(), "tag1,tag2");
1291            }
1292        }
1293    }
1294
1295    #[test]
1296    fn test_split_into_trace_stats_preserves_bucket_metadata() {
1297        // Create a bucket with specific start/duration/time_shift
1298        let stats: Vec<ClientGroupedStats> = (0..150)
1299            .map(|i| make_grouped_stats("svc", &format!("res-{}", i)))
1300            .collect();
1301        let bucket = ClientStatsBucket::new(999_000_000, 10_000_000_000, stats).with_agent_time_shift(42);
1302        let payload = make_payload_with_buckets("host1", vec![bucket]);
1303
1304        let result = split_into_trace_stats(vec![payload], 100);
1305
1306        // All split buckets should have the same start/duration/time_shift
1307        for trace_stats in &result {
1308            for p in trace_stats.stats() {
1309                for b in p.stats() {
1310                    assert_eq!(b.start(), 999_000_000);
1311                    assert_eq!(b.duration(), 10_000_000_000);
1312                    assert_eq!(b.agent_time_shift(), 42);
1313                }
1314            }
1315        }
1316    }
1317
1318    #[test]
1319    fn test_split_into_trace_stats_handles_empty_bucket() {
1320        let empty_bucket = ClientStatsBucket::new(1000, 10_000_000_000, vec![]);
1321        let payload = make_payload_with_buckets("host1", vec![empty_bucket]);
1322
1323        let result = split_into_trace_stats(vec![payload], 100);
1324
1325        assert_eq!(result.len(), 1);
1326        assert_eq!(count_grouped_stats(&result[0]), 0);
1327    }
1328
1329    #[test]
1330    fn test_split_into_trace_stats_large_split() {
1331        // 10,000 stats with max 4000 - should produce 3 TraceStats
1332        let bucket = make_bucket_with_stats(10_000);
1333        let payload = make_payload_with_buckets("host1", vec![bucket]);
1334
1335        let result = split_into_trace_stats(vec![payload], 4000);
1336
1337        assert_eq!(result.len(), 3);
1338        assert_eq!(count_grouped_stats(&result[0]), 4000);
1339        assert_eq!(count_grouped_stats(&result[1]), 4000);
1340        assert_eq!(count_grouped_stats(&result[2]), 2000);
1341    }
1342
1343    // Property test strategies
1344
1345    /// Strategy to generate arbitrary ClientGroupedStats.
1346    fn arb_grouped_stats() -> impl Strategy<Value = ClientGroupedStats> {
1347        (0..100u64, 0..1000u64).prop_map(|(hits, duration)| {
1348            ClientGroupedStats::new("service", "operation", "resource")
1349                .with_hits(hits)
1350                .with_duration(duration)
1351        })
1352    }
1353
1354    /// Strategy to generate a bucket with 0..=max_stats_per_bucket grouped stats.
1355    fn arb_bucket(max_stats_per_bucket: usize) -> impl Strategy<Value = ClientStatsBucket> {
1356        proptest::collection::vec(arb_grouped_stats(), 0..=max_stats_per_bucket)
1357            .prop_map(|stats| ClientStatsBucket::new(1000, 10_000_000_000, stats))
1358    }
1359
1360    /// Strategy to generate a payload with 1..=max_buckets buckets.
1361    fn arb_payload(max_buckets: usize, max_stats_per_bucket: usize) -> impl Strategy<Value = ClientStatsPayload> {
1362        proptest::collection::vec(arb_bucket(max_stats_per_bucket), 1..=max_buckets)
1363            .prop_map(|buckets| ClientStatsPayload::new("host", "env", "1.0.0").with_stats(buckets))
1364    }
1365
1366    /// Strategy to generate test inputs for the split function.
1367    ///
1368    /// Parameters:
1369    /// - num_payloads: 1..=10
1370    /// - num_buckets_per_payload: 1..=5
1371    /// - num_stats_per_bucket: 0..=500
1372    /// - max_entries_per_event: 1..=1000 (never zero)
1373    fn arb_split_inputs() -> impl Strategy<Value = (Vec<ClientStatsPayload>, usize)> {
1374        let payloads_strategy = proptest::collection::vec(arb_payload(5, 500), 1..=10);
1375        let max_entries_strategy = 1..=1000usize;
1376
1377        (payloads_strategy, max_entries_strategy)
1378    }
1379
1380    #[test_strategy::proptest]
1381    #[cfg_attr(miri, ignore)]
1382    fn property_test_split_respects_max_entries(
1383        #[strategy(arb_split_inputs())] inputs: (Vec<ClientStatsPayload>, usize),
1384    ) {
1385        let (payloads, max_entries_per_event) = inputs;
1386
1387        let input_total: usize = payloads.iter().flat_map(|p| p.stats()).map(|b| b.stats().len()).sum();
1388
1389        let result = split_into_trace_stats(payloads, max_entries_per_event);
1390
1391        // Property 1: No TraceStats should exceed max_entries_per_event
1392        for trace_stats in &result {
1393            let count = count_grouped_stats(trace_stats);
1394            prop_assert!(
1395                count <= max_entries_per_event,
1396                "TraceStats has {} grouped stats, exceeds max of {}",
1397                count,
1398                max_entries_per_event
1399            );
1400        }
1401
1402        // Property 2: Total stats should be preserved
1403        let output_total: usize = result.iter().map(count_grouped_stats).sum();
1404        prop_assert_eq!(input_total, output_total, "Total stats count should be preserved");
1405    }
1406}