saluki_components/transforms/apm_stats/
mod.rs

1//! APM Stats transform.
2//!
3//! Aggregates traces into time-bucketed statistics, producing `TraceStats` events.
4
5use std::{
6    sync::Arc,
7    time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use async_trait::async_trait;
11use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
12use opentelemetry_semantic_conventions::resource::{CONTAINER_ID, K8S_POD_UID};
13use saluki_config::GenericConfiguration;
14use saluki_context::{origin::OriginTagCardinality, tags::TagSet};
15use saluki_core::{
16    components::{transforms::*, ComponentContext},
17    data_model::event::{trace::Trace, trace_stats::TraceStats, Event, EventType},
18    topology::{EventsBuffer, OutputDefinition},
19};
20use saluki_env::{
21    host::providers::BoxedHostProvider, workload::EntityId, EnvironmentProvider, HostProvider, WorkloadProvider,
22};
23use saluki_error::GenericError;
24use stringtheory::MetaString;
25use tokio::{select, time::interval};
26use tracing::{debug, error};
27
28use crate::common::datadog::apm::ApmConfig;
29use crate::common::otlp::util::{extract_container_tags_from_resource_tagset, KEY_DATADOG_CONTAINER_ID};
30
31mod aggregation;
32
33use self::aggregation::process_tags_hash;
34mod span_concentrator;
35mod statsraw;
36mod weight;
37
38use self::aggregation::PayloadAggregationKey;
39use self::span_concentrator::{InfraTags, SpanConcentrator};
40use self::weight::weight;
41
/// How often the APM stats transform flushes aggregated statistics downstream.
const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);

/// Span meta key that carries process tags.
const TAG_PROCESS_TAGS: &str = "_dd.tags.process";
47
48/// APM Stats transform configuration.
49///
50/// Aggregates incoming `Trace` events into time-bucketed statistics, emitting
51/// `TraceStats` events.
52pub struct ApmStatsTransformConfiguration {
53    apm_config: ApmConfig,
54    default_hostname: Option<String>,
55    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
56}
57
58impl ApmStatsTransformConfiguration {
59    /// Creates a new `ApmStatsTransformConfiguration` from the given configuration.
60    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
61        let apm_config = ApmConfig::from_configuration(config)?;
62        Ok(Self {
63            apm_config,
64            default_hostname: None,
65            workload_provider: None,
66        })
67    }
68
69    /// Sets the default hostname using the environment provider.
70    pub async fn with_environment_provider<E>(mut self, env_provider: E) -> Result<Self, GenericError>
71    where
72        E: EnvironmentProvider<Host = BoxedHostProvider>,
73    {
74        let hostname = env_provider.host().get_hostname().await?;
75        self.default_hostname = Some(hostname);
76        Ok(self)
77    }
78
79    /// Sets the workload provider.
80    ///
81    /// Defaults to unset.
82    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
83    where
84        W: WorkloadProvider + Send + Sync + 'static,
85    {
86        self.workload_provider = Some(Arc::new(workload_provider));
87        self
88    }
89}
90
91#[async_trait]
92impl TransformBuilder for ApmStatsTransformConfiguration {
93    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
94        let mut apm_config = self.apm_config.clone();
95
96        if let Some(hostname) = &self.default_hostname {
97            apm_config.set_hostname_if_empty(hostname.as_str());
98        }
99
100        let concentrator = SpanConcentrator::new(
101            apm_config.compute_stats_by_span_kind(),
102            apm_config.peer_tags_aggregation(),
103            apm_config.peer_tags(),
104            now_nanos(),
105        );
106
107        Ok(Box::new(ApmStats {
108            concentrator,
109            flush_interval: DEFAULT_FLUSH_INTERVAL,
110            agent_env: apm_config.default_env().clone(),
111            agent_hostname: apm_config.hostname().clone(),
112            workload_provider: self.workload_provider.clone(),
113        }))
114    }
115
116    fn input_event_type(&self) -> EventType {
117        EventType::Trace
118    }
119
120    fn outputs(&self) -> &[OutputDefinition<EventType>] {
121        static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::TraceStats)];
122        OUTPUTS
123    }
124}
125
126impl MemoryBounds for ApmStatsTransformConfiguration {
127    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
128        builder.minimum().with_single_value::<ApmStats>("component struct");
129        // TODO: Think about everything we need to account for here.
130    }
131}
132
133struct ApmStats {
134    concentrator: SpanConcentrator,
135    flush_interval: Duration,
136    agent_env: MetaString,
137    agent_hostname: MetaString,
138    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
139}
140
141impl ApmStats {
142    fn process_trace(&mut self, trace: &Trace) {
143        let root_span = trace
144            .spans()
145            .iter()
146            .find(|s| s.parent_id() == 0)
147            .or_else(|| trace.spans().first());
148
149        let trace_weight = root_span.map(weight).unwrap_or(1.0);
150
151        let process_tags = extract_process_tags(trace);
152
153        let payload_key = self.build_payload_key(trace, &process_tags);
154        let infra_tags = self.build_infra_tags(trace, &process_tags);
155
156        let origin = trace
157            .spans()
158            .first()
159            .and_then(|s| s.meta().get("_dd.origin"))
160            .map(|s| s.as_ref())
161            .unwrap_or("");
162
163        for span in trace.spans() {
164            if let Some(stat_span) = self.concentrator.new_stat_span_from_span(span) {
165                self.concentrator
166                    .add_span(&stat_span, trace_weight, &payload_key, &infra_tags, origin);
167            }
168        }
169    }
170
171    fn build_infra_tags(&self, trace: &Trace, process_tags: &str) -> InfraTags {
172        let resource_tags = trace.resource_tags();
173        let container_id = resolve_container_id(resource_tags);
174        let mut container_tags = if container_id.is_empty() {
175            vec![]
176        } else {
177            extract_container_tags(resource_tags)
178        };
179
180        // Query the workload provider for additional container tags.
181        if !container_id.is_empty() {
182            if let Some(workload_provider) = &self.workload_provider {
183                let entity_id = EntityId::Container(container_id.clone());
184                if let Some(tags) = workload_provider.get_tags_for_entity(&entity_id, OriginTagCardinality::Low) {
185                    container_tags.extend((&tags).into_iter().map(|tag| MetaString::from(tag.as_str())));
186                }
187            }
188        }
189
190        container_tags.sort();
191
192        InfraTags::new(container_id, container_tags, process_tags)
193    }
194
195    fn build_payload_key(&self, trace: &Trace, process_tags: &str) -> PayloadAggregationKey {
196        let root_span = trace
197            .spans()
198            .iter()
199            .find(|s| s.parent_id() == 0)
200            .or_else(|| trace.spans().first());
201
202        let span_env = root_span.and_then(|s| s.meta().get("env")).filter(|s| !s.is_empty());
203        let env = span_env.cloned().unwrap_or_else(|| self.agent_env.clone());
204
205        let hostname = root_span
206            .and_then(|s| s.meta().get("_dd.hostname"))
207            .filter(|s| !s.is_empty())
208            .cloned()
209            .unwrap_or_else(|| self.agent_hostname.clone());
210
211        let version = root_span
212            .and_then(|s| s.meta().get("version"))
213            .cloned()
214            .unwrap_or_default();
215
216        let container_id = root_span
217            .and_then(|s| s.meta().get("_dd.container_id"))
218            .cloned()
219            .unwrap_or_default();
220
221        let git_commit_sha = root_span
222            .and_then(|s| s.meta().get("_dd.git.commit.sha"))
223            .cloned()
224            .unwrap_or_default();
225
226        let image_tag = root_span
227            .and_then(|s| s.meta().get("_dd.image_tag"))
228            .cloned()
229            .unwrap_or_default();
230
231        let lang = root_span
232            .and_then(|s| s.meta().get("language"))
233            .cloned()
234            .unwrap_or_default();
235
236        PayloadAggregationKey {
237            env,
238            hostname,
239            version,
240            container_id,
241            git_commit_sha,
242            image_tag,
243            lang,
244            process_tags_hash: process_tags_hash(process_tags),
245        }
246    }
247}
248
249#[async_trait]
250impl Transform for ApmStats {
251    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
252        let mut health = context.take_health_handle();
253
254        let mut flush_ticker = interval(self.flush_interval);
255        flush_ticker.tick().await;
256
257        let mut final_flush = false;
258
259        health.mark_ready();
260        debug!("APM Stats transform started.");
261
262        loop {
263            select! {
264                _ = health.live() => continue,
265
266                _ = flush_ticker.tick() => {
267                    let stats_payloads = self.concentrator.flush(now_nanos(), final_flush);
268
269                    if !stats_payloads.is_empty() {
270                        let trace_stats = TraceStats::new(stats_payloads);
271                        debug!(buckets = trace_stats.stats().len(), "Flushing APM stats.");
272
273                        let mut event_buffer = EventsBuffer::default();
274                        if event_buffer.try_push(Event::TraceStats(trace_stats)).is_some() {
275                            error!("Failed to push TraceStats event to buffer.");
276                        } else if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
277                            error!(error = %e, "Failed to dispatch TraceStats event.");
278                        }
279                    }
280
281                    if final_flush {
282                        debug!("Final APM stats flush complete.");
283                        break;
284                    }
285                },
286
287                maybe_events = context.events().next(), if !final_flush => {
288                    match maybe_events {
289                        Some(events) => {
290                            for event in events {
291                                if let Event::Trace(trace) = event {
292                                    self.process_trace(&trace);
293                                }
294                            }
295                        },
296                        None => {
297                            // We've reached the end of our input stream, so mark ourselves for a final flush and reset the
298                            // interval so it ticks immediately on the next loop iteration.
299                            final_flush = true;
300                            flush_ticker.reset_immediately();
301                            debug!("APM Stats transform stopping, triggering final flush...");
302                        }
303                    }
304                },
305            }
306        }
307
308        debug!("APM Stats transform stopped.");
309        Ok(())
310    }
311}
312
313/// Returns the current time as nanoseconds since Unix epoch.
314fn now_nanos() -> u64 {
315    SystemTime::now()
316        .duration_since(UNIX_EPOCH)
317        .unwrap_or_default()
318        .as_nanos() as u64
319}
320
321/// Resolves container ID from OTLP resource tags.
322fn resolve_container_id(resource_tags: &TagSet) -> MetaString {
323    for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
324        if let Some(tag) = resource_tags.get_single_tag(key) {
325            if let Some(value) = tag.value() {
326                if !value.is_empty() {
327                    return MetaString::from(value);
328                }
329            }
330        }
331    }
332    MetaString::default()
333}
334
335/// Extracts container tags from OTLP resource tags.
336fn extract_container_tags(resource_tags: &TagSet) -> Vec<MetaString> {
337    let mut container_tags_set = TagSet::default();
338    extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags_set);
339
340    container_tags_set
341        .into_iter()
342        .map(|tag| MetaString::from(tag.as_str()))
343        .collect()
344}
345
346/// Extracts process tags from trace.
347fn extract_process_tags(trace: &Trace) -> String {
348    if let Some(first_span) = trace.spans().first() {
349        if let Some(process_tags) = first_span.meta().get(TAG_PROCESS_TAGS) {
350            let tags = process_tags.as_ref();
351            if !tags.is_empty() {
352                return tags.to_string();
353            }
354        }
355    }
356
357    String::new()
358}
359
360#[cfg(test)]
361mod tests {
362    use saluki_common::collections::FastHashMap;
363    use saluki_context::tags::TagSet;
364    use saluki_core::data_model::event::trace::Span;
365
366    use super::aggregation::BUCKET_DURATION_NS;
367    use super::span_concentrator::METRIC_PARTIAL_VERSION;
368    use super::*;
369
370    /// Helper to align timestamp to bucket boundary
371    fn align_ts(ts: u64, bsize: u64) -> u64 {
372        ts - ts % bsize
373    }
374
375    /// Creates a test span with the given parameters.
376    #[allow(clippy::too_many_arguments)]
377    fn test_span(
378        aligned_now: u64, span_id: u64, parent_id: u64, duration: u64, bucket_offset: u64, service: &str,
379        resource: &str, error: i32, meta: Option<FastHashMap<MetaString, MetaString>>,
380        metrics: Option<FastHashMap<MetaString, f64>>,
381    ) -> Span {
382        // Calculate start time so that span ends in the correct bucket
383        // End time = start + duration, and we want end time to be in bucket (aligned_now - offset * bsize)
384        // Use BUCKET_DURATION_NS as the bucket size (matches the concentrator)
385        let bucket_start = aligned_now - bucket_offset * BUCKET_DURATION_NS;
386        let start = bucket_start - duration;
387
388        Span::new(
389            service, "query", resource, "db", 1, span_id, parent_id, start, duration, error,
390        )
391        .with_meta(meta)
392        .with_metrics(metrics)
393    }
394
395    /// Creates a simple measured span for basic tests
396    fn make_test_span(service: &str, name: &str, resource: &str) -> Span {
397        let mut metrics = FastHashMap::default();
398        metrics.insert(MetaString::from("_dd.measured"), 1.0);
399
400        Span::new(service, name, resource, "web", 1, 1, 0, 1000000000, 100000000, 0).with_metrics(metrics)
401    }
402
403    /// Creates a top-level span (parent_id = 0, has _top_level metric)
404    fn make_top_level_span(
405        aligned_now: u64, span_id: u64, duration: u64, bucket_offset: u64, service: &str, resource: &str, error: i32,
406        meta: Option<FastHashMap<MetaString, MetaString>>,
407    ) -> Span {
408        let mut metrics = FastHashMap::default();
409        metrics.insert(MetaString::from("_top_level"), 1.0);
410        test_span(
411            aligned_now,
412            span_id,
413            0,
414            duration,
415            bucket_offset,
416            service,
417            resource,
418            error,
419            meta,
420            Some(metrics),
421        )
422    }
423
424    #[test]
425    fn test_process_trace_creates_stats() {
426        let now = now_nanos();
427
428        let concentrator = SpanConcentrator::new(true, true, &[], now);
429        let mut transform = ApmStats {
430            concentrator,
431            flush_interval: DEFAULT_FLUSH_INTERVAL,
432            agent_env: MetaString::from("none"),
433            agent_hostname: MetaString::default(),
434            workload_provider: None,
435        };
436
437        let span = make_test_span("test-service", "test-operation", "test-resource");
438        let trace = Trace::new(vec![span], TagSet::default());
439
440        transform.process_trace(&trace);
441
442        // Flush and verify we got stats
443        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
444        assert!(!stats.is_empty(), "Expected stats to be produced");
445    }
446
447    #[test]
448    fn test_weight_applied_to_stats() {
449        let now = now_nanos();
450
451        let concentrator = SpanConcentrator::new(true, true, &[], now);
452        let mut transform = ApmStats {
453            concentrator,
454            flush_interval: DEFAULT_FLUSH_INTERVAL,
455            agent_env: MetaString::from("none"),
456            agent_hostname: MetaString::default(),
457            workload_provider: None,
458        };
459
460        // Create a span with 0.5 sample rate (weight = 2.0)
461        let mut metrics = FastHashMap::default();
462        metrics.insert(MetaString::from("_dd.measured"), 1.0);
463        metrics.insert(MetaString::from("_sample_rate"), 0.5);
464
465        let span = Span::new(
466            "test-service",
467            "test-op",
468            "test-resource",
469            "web",
470            1,
471            1,
472            0,
473            now,
474            100000000,
475            0,
476        )
477        .with_metrics(metrics);
478
479        let trace = Trace::new(vec![span], TagSet::default());
480        transform.process_trace(&trace);
481
482        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
483        assert!(!stats.is_empty());
484
485        // The hits should be weighted (approximately 2 due to 0.5 sample rate)
486        let bucket = &stats[0].stats()[0];
487        let grouped = &bucket.stats()[0];
488        // With stochastic rounding, hits could be 1 or 2, but with weight 2.0 it should round to 2
489        assert!(grouped.hits() >= 1, "Expected weighted hits");
490    }
491
492    #[test]
493    fn test_force_flush() {
494        let now = now_nanos();
495        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
496
497        let mut concentrator = SpanConcentrator::new(true, true, &[], now);
498
499        // Add a span
500        let span = make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None);
501        let trace = Trace::new(vec![span], TagSet::default());
502
503        let payload_key = PayloadAggregationKey {
504            env: MetaString::from("test"),
505            hostname: MetaString::from("host"),
506            ..Default::default()
507        };
508        let infra_tags = InfraTags::default();
509
510        for span in trace.spans() {
511            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
512                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
513            }
514        }
515
516        // ts=0 so that flush always considers buckets not old enough
517        let ts: u64 = 0;
518
519        // Without force flush, should skip the bucket
520        let stats = concentrator.flush(ts, false);
521        assert!(stats.is_empty(), "Non-force flush should return empty");
522
523        // With force flush, should flush buckets regardless of age
524        let stats = concentrator.flush(ts, true);
525        assert!(!stats.is_empty(), "Force flush should return stats");
526        assert_eq!(stats[0].stats().len(), 1, "Should have 1 bucket");
527    }
528
529    #[test]
530    fn test_ignores_partial_spans() {
531        let now = now_nanos();
532        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
533
534        let mut concentrator = SpanConcentrator::new(true, true, &[], now);
535
536        // Create a partial span (has _dd.partial_version metric)
537        let mut metrics = FastHashMap::default();
538        metrics.insert(MetaString::from("_top_level"), 1.0);
539        metrics.insert(MetaString::from(METRIC_PARTIAL_VERSION), 830604.0);
540
541        let span = test_span(aligned_now, 1, 0, 50, 5, "A1", "resource1", 0, None, Some(metrics));
542        let trace = Trace::new(vec![span], TagSet::default());
543
544        let payload_key = PayloadAggregationKey {
545            env: MetaString::from("test"),
546            hostname: MetaString::from("tracer-hostname"),
547            ..Default::default()
548        };
549        let infra_tags = InfraTags::default();
550
551        for span in trace.spans() {
552            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
553                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
554            }
555        }
556
557        // Partial spans should be ignored
558        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
559        assert!(stats.is_empty(), "Partial spans should be ignored");
560    }
561
562    #[test]
563    fn test_concentrator_stats_totals() {
564        let now = now_nanos();
565        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
566
567        // Set oldestTs to allow old buckets
568        let oldest_ts = aligned_now - 2 * BUCKET_DURATION_NS;
569        let mut concentrator = SpanConcentrator::new(true, true, &[], oldest_ts);
570
571        // Build spans spread over time windows
572        let spans = vec![
573            make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
574            make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
575            make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
576            make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
577            make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
578            make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
579        ];
580
581        let payload_key = PayloadAggregationKey {
582            env: MetaString::from("none"),
583            ..Default::default()
584        };
585        let infra_tags = InfraTags::default();
586
587        for span in &spans {
588            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
589                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
590            }
591        }
592
593        // Flush all and collect totals
594        let all_stats = concentrator.flush(now + BUCKET_DURATION_NS * 10, true);
595
596        let mut total_duration: u64 = 0;
597        let mut total_hits: u64 = 0;
598        let mut total_errors: u64 = 0;
599        let mut total_top_level_hits: u64 = 0;
600
601        for payload in &all_stats {
602            for bucket in payload.stats() {
603                for grouped in bucket.stats() {
604                    total_duration += grouped.duration();
605                    total_hits += grouped.hits();
606                    total_errors += grouped.errors();
607                    total_top_level_hits += grouped.top_level_hits();
608                }
609            }
610        }
611
612        assert_eq!(total_duration, 50 + 40 + 30 + 20 + 10 + 1, "Wrong total duration");
613        assert_eq!(total_hits, 6, "Wrong total hits");
614        assert_eq!(total_top_level_hits, 6, "Wrong total top level hits");
615        assert_eq!(total_errors, 0, "Wrong total errors");
616    }
617
618    #[test]
619    fn test_root_tag() {
620        let now = now_nanos();
621        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
622
623        let mut concentrator = SpanConcentrator::new(true, true, &[], now);
624
625        // Root span (parent_id = 0, top_level)
626        let mut root_metrics = FastHashMap::default();
627        root_metrics.insert(MetaString::from("_top_level"), 1.0);
628        let root_span = test_span(
629            aligned_now,
630            1,
631            0,
632            40,
633            10,
634            "A1",
635            "resource1",
636            0,
637            None,
638            Some(root_metrics),
639        );
640
641        // Non-root but top level span (has _top_level but parent_id != 0)
642        let mut top_level_metrics = FastHashMap::default();
643        top_level_metrics.insert(MetaString::from("_top_level"), 1.0);
644        let top_level_span = test_span(
645            aligned_now,
646            4,
647            1000,
648            10,
649            10,
650            "A1",
651            "resource1",
652            0,
653            None,
654            Some(top_level_metrics),
655        );
656
657        // Client span (non-root, non-top level, but has span.kind = client)
658        let mut client_meta = FastHashMap::default();
659        client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
660        let client_span = test_span(aligned_now, 3, 2, 20, 10, "A1", "resource1", 0, Some(client_meta), None);
661
662        let spans = vec![root_span, top_level_span, client_span];
663
664        let payload_key = PayloadAggregationKey {
665            env: MetaString::from("none"),
666            ..Default::default()
667        };
668        let infra_tags = InfraTags::default();
669
670        for span in &spans {
671            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
672                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
673            }
674        }
675
676        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 20, true);
677        assert!(!stats.is_empty(), "Should have stats");
678
679        // Count grouped stats - should be split by IsTraceRoot
680        let mut total_grouped = 0;
681        let mut root_count = 0;
682        let mut non_root_count = 0;
683
684        for payload in &stats {
685            for bucket in payload.stats() {
686                for grouped in bucket.stats() {
687                    total_grouped += 1;
688                    match grouped.is_trace_root() {
689                        Some(true) => root_count += 1,
690                        Some(false) => non_root_count += 1,
691                        None => {}
692                    }
693                }
694            }
695        }
696
697        // We expect 3 grouped stats:
698        // 1. Root span (is_trace_root = true)
699        // 2. Non-root top-level span (is_trace_root = false)
700        // 3. Client span (is_trace_root = false, span.kind = client)
701        assert_eq!(total_grouped, 3, "Expected 3 grouped stats");
702        assert_eq!(root_count, 1, "Expected 1 root span");
703        assert_eq!(non_root_count, 2, "Expected 2 non-root spans");
704    }
705
706    #[test]
707    fn test_compute_stats_through_span_kind_check() {
708        let now = now_nanos();
709
710        // Test with compute_stats_by_span_kind DISABLED
711        {
712            let mut concentrator = SpanConcentrator::new(false, true, &[], now);
713
714            // Create a simple top-level span using the same pattern as make_test_span (which works)
715            let mut metrics = FastHashMap::default();
716            metrics.insert(MetaString::from("_top_level"), 1.0);
717            let span = Span::new("myservice", "query", "GET /users", "web", 1, 1, 0, now, 500, 0).with_metrics(metrics);
718
719            let payload_key = PayloadAggregationKey {
720                env: MetaString::from("test"),
721                ..Default::default()
722            };
723            let infra_tags = InfraTags::default();
724
725            if let Some(stat_span) = concentrator.new_stat_span_from_span(&span) {
726                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
727            }
728
729            // Client span with span.kind=client but no _top_level or _dd.measured
730            // Should NOT produce stats when compute_stats_by_span_kind is disabled
731            let mut client_meta = FastHashMap::default();
732            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
733            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
734                .with_meta(client_meta);
735
736            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
737                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
738            }
739
740            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
741
742            let mut count = 0;
743            for payload in &stats {
744                for bucket in payload.stats() {
745                    count += bucket.stats().len();
746                }
747            }
748
749            // When disabled, only top_level span gets stats (client span has no top_level/measured)
750            assert_eq!(count, 1, "Expected 1 stat when span kind check disabled");
751        }
752
753        // Test with compute_stats_by_span_kind ENABLED
754        {
755            let mut concentrator = SpanConcentrator::new(true, true, &[], now);
756
757            // Create a simple top-level span
758            let mut metrics = FastHashMap::default();
759            metrics.insert(MetaString::from("_top_level"), 1.0);
760            let span = Span::new("myservice", "query", "GET /users", "web", 1, 1, 0, now, 500, 0).with_metrics(metrics);
761
762            let payload_key = PayloadAggregationKey {
763                env: MetaString::from("test"),
764                ..Default::default()
765            };
766            let infra_tags = InfraTags::default();
767
768            if let Some(stat_span) = concentrator.new_stat_span_from_span(&span) {
769                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
770            }
771
772            // Client span with span.kind=client
773            // SHOULD produce stats when compute_stats_by_span_kind is enabled
774            let mut client_meta = FastHashMap::default();
775            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
776            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
777                .with_meta(client_meta);
778
779            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
780                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
781            }
782
783            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
784
785            let mut count = 0;
786            for payload in &stats {
787                for bucket in payload.stats() {
788                    count += bucket.stats().len();
789                }
790            }
791
792            // When enabled, both spans get stats
793            assert_eq!(count, 2, "Expected 2 stats when span kind check enabled");
794        }
795    }
796
797    #[test]
798    fn test_peer_tags() {
799        let now = now_nanos();
800
801        // Test without peer tags aggregation enabled
802        {
803            let mut concentrator = SpanConcentrator::new(true, false, &[], now);
804
805            // Client span with db tags and _dd.measured
806            let mut client_meta = FastHashMap::default();
807            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
808            client_meta.insert(MetaString::from("db.instance"), MetaString::from("i-1234"));
809            client_meta.insert(MetaString::from("db.system"), MetaString::from("postgres"));
810            let mut client_metrics = FastHashMap::default();
811            client_metrics.insert(MetaString::from("_dd.measured"), 1.0);
812            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
813                .with_meta(client_meta)
814                .with_metrics(client_metrics);
815
816            let payload_key = PayloadAggregationKey {
817                env: MetaString::from("test"),
818                ..Default::default()
819            };
820            let infra_tags = InfraTags::default();
821
822            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
823                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
824            }
825
826            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
827
828            // Without peer tags aggregation, peer_tags should be empty
829            for payload in &stats {
830                for bucket in payload.stats() {
831                    for grouped in bucket.stats() {
832                        assert!(
833                            grouped.peer_tags().is_empty(),
834                            "Peer tags should be empty when peer_tags_aggregation is false"
835                        );
836                    }
837                }
838            }
839        }
840
841        // Test with peer tags aggregation enabled
842        {
843            // Note: BASE_PEER_TAGS already includes db.instance and db.system
844            let mut concentrator = SpanConcentrator::new(true, true, &[], now);
845
846            // Client span with db tags and _dd.measured
847            let mut client_meta = FastHashMap::default();
848            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
849            client_meta.insert(MetaString::from("db.instance"), MetaString::from("i-1234"));
850            client_meta.insert(MetaString::from("db.system"), MetaString::from("postgres"));
851            let mut client_metrics = FastHashMap::default();
852            client_metrics.insert(MetaString::from("_dd.measured"), 1.0);
853            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
854                .with_meta(client_meta)
855                .with_metrics(client_metrics);
856
857            let payload_key = PayloadAggregationKey {
858                env: MetaString::from("test"),
859                ..Default::default()
860            };
861            let infra_tags = InfraTags::default();
862
863            if let Some(stat_span) = concentrator.new_stat_span_from_span(&client_span) {
864                concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
865            }
866
867            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
868
869            // With peer tags aggregation, client span should have peer_tags
870            let mut found_client_with_peer_tags = false;
871            for payload in &stats {
872                for bucket in payload.stats() {
873                    for grouped in bucket.stats() {
874                        if grouped.resource() == "SELECT ..." {
875                            assert!(!grouped.peer_tags().is_empty(), "Client span should have peer tags");
876                            // Check that peer tags contain db.instance and db.system
877                            let peer_tags: Vec<&str> = grouped.peer_tags().iter().map(|s| s.as_ref()).collect();
878                            assert!(
879                                peer_tags.iter().any(|t| t.starts_with("db.instance:")),
880                                "Should have db.instance peer tag"
881                            );
882                            assert!(
883                                peer_tags.iter().any(|t| t.starts_with("db.system:")),
884                                "Should have db.system peer tag"
885                            );
886                            found_client_with_peer_tags = true;
887                        }
888                    }
889                }
890            }
891            assert!(
892                found_client_with_peer_tags,
893                "Should have found client span with peer tags"
894            );
895        }
896    }
897
898    #[test]
899    fn test_concentrator_oldest_ts() {
900        let now = now_nanos();
901        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
902
903        // Test "cold" scenario - all spans in the past should end up in current bucket
904        {
905            // Start concentrator at current time (cold start)
906            let mut concentrator = SpanConcentrator::new(true, true, &[], now);
907
908            // Build spans spread over many time windows (all in the past)
909            let spans = vec![
910                make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None),
911                make_top_level_span(aligned_now, 2, 40, 4, "A1", "resource1", 0, None),
912                make_top_level_span(aligned_now, 3, 30, 3, "A1", "resource1", 0, None),
913                make_top_level_span(aligned_now, 4, 20, 2, "A1", "resource1", 0, None),
914                make_top_level_span(aligned_now, 5, 10, 1, "A1", "resource1", 0, None),
915                make_top_level_span(aligned_now, 6, 1, 0, "A1", "resource1", 0, None),
916            ];
917
918            let payload_key = PayloadAggregationKey {
919                env: MetaString::from("none"),
920                ..Default::default()
921            };
922            let infra_tags = InfraTags::default();
923
924            for span in &spans {
925                if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
926                    concentrator.add_span(&stat_span, 1.0, &payload_key, &infra_tags, "");
927                }
928            }
929
930            // Flush multiple times without force
931            let mut flush_time = now;
932            let buffer_len = 2; // DEFAULT_BUFFER_LEN
933
934            for _ in 0..buffer_len {
935                let stats = concentrator.flush(flush_time, false);
936                assert!(stats.is_empty(), "Should not flush before buffer fills");
937                flush_time += BUCKET_DURATION_NS;
938            }
939
940            // After buffer_len flushes, should get aggregated stats
941            let stats = concentrator.flush(flush_time, false);
942            assert!(!stats.is_empty(), "Should flush after buffer fills");
943
944            // All spans should be aggregated into one bucket (oldest bucket aggregates old data)
945            let mut total_hits: u64 = 0;
946            let mut total_duration: u64 = 0;
947            for payload in &stats {
948                for bucket in payload.stats() {
949                    for grouped in bucket.stats() {
950                        total_hits += grouped.hits();
951                        total_duration += grouped.duration();
952                    }
953                }
954            }
955
956            assert_eq!(total_hits, 6, "All 6 spans should be counted");
957            assert_eq!(
958                total_duration,
959                50 + 40 + 30 + 20 + 10 + 1,
960                "Total duration should match"
961            );
962        }
963    }
964
965    #[test]
966    fn test_compute_stats_for_span_kind() {
967        use super::span_concentrator::compute_stats_for_span_kind;
968
969        // Valid span kinds (case insensitive)
970        assert!(compute_stats_for_span_kind("server"));
971        assert!(compute_stats_for_span_kind("consumer"));
972        assert!(compute_stats_for_span_kind("client"));
973        assert!(compute_stats_for_span_kind("producer"));
974
975        // Uppercase
976        assert!(compute_stats_for_span_kind("SERVER"));
977        assert!(compute_stats_for_span_kind("CONSUMER"));
978        assert!(compute_stats_for_span_kind("CLIENT"));
979        assert!(compute_stats_for_span_kind("PRODUCER"));
980
981        // Mixed case
982        assert!(compute_stats_for_span_kind("SErVER"));
983        assert!(compute_stats_for_span_kind("COnSUMER"));
984        assert!(compute_stats_for_span_kind("CLiENT"));
985        assert!(compute_stats_for_span_kind("PRoDUCER"));
986
987        // Invalid span kinds
988        assert!(!compute_stats_for_span_kind("internal"));
989        assert!(!compute_stats_for_span_kind("INTERNAL"));
990        assert!(!compute_stats_for_span_kind("INtERNAL"));
991        assert!(!compute_stats_for_span_kind(""));
992    }
993
994    #[test]
995    fn test_extract_process_tags() {
996        // Test with no process tags
997        {
998            let span = Span::default();
999            let trace = Trace::new(vec![span], TagSet::default());
1000            let process_tags = extract_process_tags(&trace);
1001            assert!(process_tags.is_empty(), "Should be empty when no _dd.tags.process");
1002        }
1003
1004        // Test with process tags in first span meta
1005        {
1006            let mut meta = FastHashMap::default();
1007            meta.insert(MetaString::from(TAG_PROCESS_TAGS), MetaString::from("a:1,b:2,c:3"));
1008            let span = Span::default().with_meta(meta);
1009            let trace = Trace::new(vec![span], TagSet::default());
1010            let process_tags = extract_process_tags(&trace);
1011            assert_eq!(process_tags, "a:1,b:2,c:3");
1012        }
1013
1014        // Test with empty process tags
1015        {
1016            let mut meta = FastHashMap::default();
1017            meta.insert(MetaString::from(TAG_PROCESS_TAGS), MetaString::from(""));
1018            let span = Span::default().with_meta(meta);
1019            let trace = Trace::new(vec![span], TagSet::default());
1020            let process_tags = extract_process_tags(&trace);
1021            assert!(
1022                process_tags.is_empty(),
1023                "Should be empty when _dd.tags.process is empty string"
1024            );
1025        }
1026
1027        // Test with empty trace
1028        {
1029            let trace = Trace::new(vec![], TagSet::default());
1030            let process_tags = extract_process_tags(&trace);
1031            assert!(process_tags.is_empty(), "Should be empty when trace has no spans");
1032        }
1033    }
1034
1035    #[test]
1036    fn test_process_tags_hash_computation() {
1037        use super::aggregation::process_tags_hash;
1038
1039        // Empty string should return 0
1040        assert_eq!(process_tags_hash(""), 0);
1041
1042        // Same tags should produce same hash
1043        let hash1 = process_tags_hash("a:1,b:2,c:3");
1044        let hash2 = process_tags_hash("a:1,b:2,c:3");
1045        assert_eq!(hash1, hash2);
1046
1047        // Different tags should produce different hash
1048        let hash3 = process_tags_hash("a:1,b:2");
1049        assert_ne!(hash1, hash3);
1050    }
1051}