1use std::{
6 sync::Arc,
7 time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use async_trait::async_trait;
11use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
12use opentelemetry_semantic_conventions::resource::{CONTAINER_ID, K8S_POD_UID};
13use saluki_config::GenericConfiguration;
14use saluki_context::{origin::OriginTagCardinality, tags::TagSet};
15use saluki_core::{
16 components::{transforms::*, ComponentContext},
17 data_model::event::{trace::Trace, trace_stats::TraceStats, Event, EventType},
18 topology::{EventsBuffer, OutputDefinition},
19};
20use saluki_env::{
21 host::providers::BoxedHostProvider, workload::EntityId, EnvironmentProvider, HostProvider, WorkloadProvider,
22};
23use saluki_error::GenericError;
24use stringtheory::MetaString;
25use tokio::{select, time::interval};
26use tracing::{debug, error};
27
28use crate::common::datadog::apm::ApmConfig;
29use crate::common::otlp::util::{extract_container_tags_from_resource_tagset, KEY_DATADOG_CONTAINER_ID};
30
31mod aggregation;
32
33use self::aggregation::process_tags_hash;
34mod span_concentrator;
35mod statsraw;
36mod weight;
37
38use self::aggregation::PayloadAggregationKey;
39use self::span_concentrator::{InfraTags, SpanConcentrator};
40use self::weight::weight;
41
/// Time between periodic flushes of the span concentrator (10 seconds).
const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);

/// Span meta key holding a trace's process tags; it is read from the first span of each trace.
const TAG_PROCESS_TAGS: &str = "_dd.tags.process";
47
48pub struct ApmStatsTransformConfiguration {
53 apm_config: ApmConfig,
54 default_hostname: Option<String>,
55 workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
56}
57
58impl ApmStatsTransformConfiguration {
59 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
61 let apm_config = ApmConfig::from_configuration(config)?;
62 Ok(Self {
63 apm_config,
64 default_hostname: None,
65 workload_provider: None,
66 })
67 }
68
69 pub async fn with_environment_provider<E>(mut self, env_provider: E) -> Result<Self, GenericError>
71 where
72 E: EnvironmentProvider<Host = BoxedHostProvider>,
73 {
74 let hostname = env_provider.host().get_hostname().await?;
75 self.default_hostname = Some(hostname);
76 Ok(self)
77 }
78
79 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
83 where
84 W: WorkloadProvider + Send + Sync + 'static,
85 {
86 self.workload_provider = Some(Arc::new(workload_provider));
87 self
88 }
89}
90
91#[async_trait]
92impl TransformBuilder for ApmStatsTransformConfiguration {
93 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
94 let mut apm_config = self.apm_config.clone();
95
96 if let Some(hostname) = &self.default_hostname {
97 apm_config.set_hostname_if_empty(hostname.as_str());
98 }
99
100 let concentrator = SpanConcentrator::new(
101 apm_config.compute_stats_by_span_kind(),
102 apm_config.peer_tags_aggregation(),
103 apm_config.peer_tags(),
104 now_nanos(),
105 );
106
107 Ok(Box::new(ApmStats {
108 concentrator,
109 flush_interval: DEFAULT_FLUSH_INTERVAL,
110 agent_env: apm_config.default_env().clone(),
111 agent_hostname: apm_config.hostname().clone(),
112 workload_provider: self.workload_provider.clone(),
113 }))
114 }
115
116 fn input_event_type(&self) -> EventType {
117 EventType::Trace
118 }
119
120 fn outputs(&self) -> &[OutputDefinition<EventType>] {
121 static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::TraceStats)];
122 OUTPUTS
123 }
124}
125
impl MemoryBounds for ApmStatsTransformConfiguration {
    /// Declares the transform's memory bounds.
    ///
    /// Only the size of the `ApmStats` struct itself is registered, as a minimum bound.
    // NOTE(review): memory held inside the span concentrator is not accounted for here, and no firm limit is
    // declared. Confirm whether that is intentional.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder.minimum().with_single_value::<ApmStats>("component struct");
    }
}
132
133struct ApmStats {
134 concentrator: SpanConcentrator,
135 flush_interval: Duration,
136 agent_env: MetaString,
137 agent_hostname: MetaString,
138 workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
139}
140
141impl ApmStats {
142 fn process_trace(&mut self, trace: &Trace) {
143 let root_span = trace
144 .spans()
145 .iter()
146 .find(|s| s.parent_id() == 0)
147 .or_else(|| trace.spans().first());
148
149 let trace_weight = root_span.map(weight).unwrap_or(1.0);
150
151 let process_tags = extract_process_tags(trace);
152
153 let payload_key = self.build_payload_key(trace, &process_tags);
154 let infra_tags = self.build_infra_tags(trace, &process_tags);
155
156 let origin = trace
157 .spans()
158 .first()
159 .and_then(|s| s.meta().get("_dd.origin"))
160 .map(|s| s.as_ref())
161 .unwrap_or("");
162
163 for span in trace.spans() {
164 if let Some(stat_span) = self.concentrator.new_stat_span_from_span(span) {
165 self.concentrator
166 .add_span(&stat_span, trace_weight, &payload_key, &infra_tags, origin);
167 }
168 }
169 }
170
171 fn build_infra_tags(&self, trace: &Trace, process_tags: &str) -> InfraTags {
172 let resource_tags = trace.resource_tags();
173 let container_id = resolve_container_id(resource_tags);
174 let mut container_tags = if container_id.is_empty() {
175 vec![]
176 } else {
177 extract_container_tags(resource_tags)
178 };
179
180 if !container_id.is_empty() {
182 if let Some(workload_provider) = &self.workload_provider {
183 let entity_id = EntityId::Container(container_id.clone());
184 if let Some(tags) = workload_provider.get_tags_for_entity(&entity_id, OriginTagCardinality::Low) {
185 container_tags.extend((&tags).into_iter().map(|tag| MetaString::from(tag.as_str())));
186 }
187 }
188 }
189
190 container_tags.sort();
191
192 InfraTags::new(container_id, container_tags, process_tags)
193 }
194
195 fn build_payload_key(&self, trace: &Trace, process_tags: &str) -> PayloadAggregationKey {
196 let root_span = trace
197 .spans()
198 .iter()
199 .find(|s| s.parent_id() == 0)
200 .or_else(|| trace.spans().first());
201
202 let span_env = root_span.and_then(|s| s.meta().get("env")).filter(|s| !s.is_empty());
203 let env = span_env.cloned().unwrap_or_else(|| self.agent_env.clone());
204
205 let hostname = root_span
206 .and_then(|s| s.meta().get("_dd.hostname"))
207 .filter(|s| !s.is_empty())
208 .cloned()
209 .unwrap_or_else(|| self.agent_hostname.clone());
210
211 let version = root_span
212 .and_then(|s| s.meta().get("version"))
213 .cloned()
214 .unwrap_or_default();
215
216 let container_id = root_span
217 .and_then(|s| s.meta().get("_dd.container_id"))
218 .cloned()
219 .unwrap_or_default();
220
221 let git_commit_sha = root_span
222 .and_then(|s| s.meta().get("_dd.git.commit.sha"))
223 .cloned()
224 .unwrap_or_default();
225
226 let image_tag = root_span
227 .and_then(|s| s.meta().get("_dd.image_tag"))
228 .cloned()
229 .unwrap_or_default();
230
231 let lang = root_span
232 .and_then(|s| s.meta().get("language"))
233 .cloned()
234 .unwrap_or_default();
235
236 PayloadAggregationKey {
237 env,
238 hostname,
239 version,
240 container_id,
241 git_commit_sha,
242 image_tag,
243 lang,
244 process_tags_hash: process_tags_hash(process_tags),
245 }
246 }
247}
248
249#[async_trait]
250impl Transform for ApmStats {
251 async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
252 let mut health = context.take_health_handle();
253
254 let mut flush_ticker = interval(self.flush_interval);
255 flush_ticker.tick().await;
256
257 let mut final_flush = false;
258
259 health.mark_ready();
260 debug!("APM Stats transform started.");
261
262 loop {
263 select! {
264 _ = health.live() => continue,
265
266 _ = flush_ticker.tick() => {
267 let stats_payloads = self.concentrator.flush(now_nanos(), final_flush);
268
269 if !stats_payloads.is_empty() {
270 let trace_stats = TraceStats::new(stats_payloads);
271 debug!(buckets = trace_stats.stats().len(), "Flushing APM stats.");
272
273 let mut event_buffer = EventsBuffer::default();
274 if event_buffer.try_push(Event::TraceStats(trace_stats)).is_some() {
275 error!("Failed to push TraceStats event to buffer.");
276 } else if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
277 error!(error = %e, "Failed to dispatch TraceStats event.");
278 }
279 }
280
281 if final_flush {
282 debug!("Final APM stats flush complete.");
283 break;
284 }
285 },
286
287 maybe_events = context.events().next(), if !final_flush => {
288 match maybe_events {
289 Some(events) => {
290 for event in events {
291 if let Event::Trace(trace) = event {
292 self.process_trace(&trace);
293 }
294 }
295 },
296 None => {
297 final_flush = true;
300 flush_ticker.reset_immediately();
301 debug!("APM Stats transform stopping, triggering final flush...");
302 }
303 }
304 },
305 }
306 }
307
308 debug!("APM Stats transform stopped.");
309 Ok(())
310 }
311}
312
/// Returns the current wall-clock time in nanoseconds since the Unix epoch.
///
/// If the system clock is set before the epoch, `0` is returned instead of an error. The `u128` nanosecond count
/// is narrowed with `as`, which only loses information for dates after the year 2554.
fn now_nanos() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_nanos() as u64
}
320
321fn resolve_container_id(resource_tags: &TagSet) -> MetaString {
323 for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
324 if let Some(tag) = resource_tags.get_single_tag(key) {
325 if let Some(value) = tag.value() {
326 if !value.is_empty() {
327 return MetaString::from(value);
328 }
329 }
330 }
331 }
332 MetaString::default()
333}
334
335fn extract_container_tags(resource_tags: &TagSet) -> Vec<MetaString> {
337 let mut container_tags_set = TagSet::default();
338 extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags_set);
339
340 container_tags_set
341 .into_iter()
342 .map(|tag| MetaString::from(tag.as_str()))
343 .collect()
344}
345
346fn extract_process_tags(trace: &Trace) -> String {
348 if let Some(first_span) = trace.spans().first() {
349 if let Some(process_tags) = first_span.meta().get(TAG_PROCESS_TAGS) {
350 let tags = process_tags.as_ref();
351 if !tags.is_empty() {
352 return tags.to_string();
353 }
354 }
355 }
356
357 String::new()
358}
359
#[cfg(test)]
mod tests {
    use saluki_common::collections::FastHashMap;
    use saluki_context::tags::TagSet;
    use saluki_core::data_model::event::trace::Span;

    use super::aggregation::BUCKET_DURATION_NS;
    use super::span_concentrator::METRIC_PARTIAL_VERSION;
    use super::*;

    /// Rounds `ts` down to the start of the `bsize`-wide bucket that contains it.
    fn align_ts(ts: u64, bsize: u64) -> u64 {
        (ts / bsize) * bsize
    }

    /// Builds a `db`-typed `query` span that ends exactly `bucket_offset` buckets before `aligned_now`.
    #[allow(clippy::too_many_arguments)] // each span field is varied independently by the tests
    fn test_span(
        aligned_now: u64, span_id: u64, parent_id: u64, duration: u64, bucket_offset: u64, service: &str,
        resource: &str, error: i32, meta: Option<FastHashMap<MetaString, MetaString>>,
        metrics: Option<FastHashMap<MetaString, f64>>,
    ) -> Span {
        let end = aligned_now - bucket_offset * BUCKET_DURATION_NS;
        let start = end - duration;

        Span::new(service, "query", resource, "db", 1, span_id, parent_id, start, duration, error)
            .with_meta(meta)
            .with_metrics(metrics)
    }

    /// Builds a measured root `web` span that starts one second after the Unix epoch and lasts 100ms.
    fn make_test_span(service: &str, name: &str, resource: &str) -> Span {
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_dd.measured"), 1.0);

        Span::new(service, name, resource, "web", 1, 1, 0, 1_000_000_000, 100_000_000, 0).with_metrics(metrics)
    }

    /// Builds a root span (parent ID 0) carrying the `_top_level` metric.
    fn make_top_level_span(
        aligned_now: u64, span_id: u64, duration: u64, bucket_offset: u64, service: &str, resource: &str, error: i32,
        meta: Option<FastHashMap<MetaString, MetaString>>,
    ) -> Span {
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_top_level"), 1.0);

        test_span(aligned_now, span_id, 0, duration, bucket_offset, service, resource, error, meta, Some(metrics))
    }

    /// Six top-level `A1`/`resource1` spans with durations 50, 40, 30, 20, 10 and 1. They end 5, 4, 3, 2, 1 and 0
    /// buckets before `aligned_now`, respectively.
    fn make_staggered_top_level_spans(aligned_now: u64) -> Vec<Span> {
        [(1, 50, 5), (2, 40, 4), (3, 30, 3), (4, 20, 2), (5, 10, 1), (6, 1, 0)]
            .iter()
            .map(|&(span_id, duration, bucket_offset)| {
                make_top_level_span(aligned_now, span_id, duration, bucket_offset, "A1", "resource1", 0, None)
            })
            .collect()
    }

    /// Creates an `ApmStats` transform with:
    /// - span-kind stats and peer tag aggregation enabled,
    /// - an agent env of `none`,
    /// - an empty agent hostname,
    /// - no workload provider.
    fn new_transform(now: u64) -> ApmStats {
        ApmStats {
            concentrator: SpanConcentrator::new(true, true, &[], now),
            flush_interval: DEFAULT_FLUSH_INTERVAL,
            agent_env: MetaString::from("none"),
            agent_hostname: MetaString::default(),
            workload_provider: None,
        }
    }

    /// Payload aggregation key with only `env` set.
    fn env_key(env: &str) -> PayloadAggregationKey {
        PayloadAggregationKey {
            env: MetaString::from(env),
            ..Default::default()
        }
    }

    /// Adds every span the concentrator accepts as a stat span, with weight 1.0, empty infra tags and no origin.
    fn add_spans(concentrator: &mut SpanConcentrator, spans: &[Span], payload_key: &PayloadAggregationKey) {
        let infra_tags = InfraTags::default();
        for span in spans {
            if let Some(stat_span) = concentrator.new_stat_span_from_span(span) {
                concentrator.add_span(&stat_span, 1.0, payload_key, &infra_tags, "");
            }
        }
    }

    /// Builds a measured `client` span whose meta carries `db.instance` and `db.system`.
    fn make_peer_client_span(now: u64) -> Span {
        let mut client_meta = FastHashMap::default();
        client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
        client_meta.insert(MetaString::from("db.instance"), MetaString::from("i-1234"));
        client_meta.insert(MetaString::from("db.system"), MetaString::from("postgres"));

        let mut client_metrics = FastHashMap::default();
        client_metrics.insert(MetaString::from("_dd.measured"), 1.0);

        Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
            .with_meta(client_meta)
            .with_metrics(client_metrics)
    }

    #[test]
    fn test_process_trace_creates_stats() {
        let now = now_nanos();
        let mut transform = new_transform(now);

        let trace = Trace::new(
            vec![make_test_span("test-service", "test-operation", "test-resource")],
            TagSet::default(),
        );
        transform.process_trace(&trace);

        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
        assert!(!stats.is_empty(), "Expected stats to be produced");
    }

    #[test]
    fn test_weight_applied_to_stats() {
        let now = now_nanos();
        let mut transform = new_transform(now);

        // A 50% sample rate on the only (root) span should scale its contribution.
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_dd.measured"), 1.0);
        metrics.insert(MetaString::from("_sample_rate"), 0.5);
        let span = Span::new("test-service", "test-op", "test-resource", "web", 1, 1, 0, now, 100_000_000, 0)
            .with_metrics(metrics);

        transform.process_trace(&Trace::new(vec![span], TagSet::default()));

        let stats = transform.concentrator.flush(now + BUCKET_DURATION_NS * 2, true);
        assert!(!stats.is_empty());

        let first_bucket = &stats[0].stats()[0];
        let first_grouped = &first_bucket.stats()[0];
        assert!(first_grouped.hits() >= 1, "Expected weighted hits");
    }

    #[test]
    fn test_force_flush() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        let trace = Trace::new(
            vec![make_top_level_span(aligned_now, 1, 50, 5, "A1", "resource1", 0, None)],
            TagSet::default(),
        );
        let payload_key = PayloadAggregationKey {
            env: MetaString::from("test"),
            hostname: MetaString::from("host"),
            ..Default::default()
        };
        add_spans(&mut concentrator, trace.spans(), &payload_key);

        // Flushing at time zero without forcing must not emit anything...
        let stats = concentrator.flush(0, false);
        assert!(stats.is_empty(), "Non-force flush should return empty");

        // ...whereas a forced flush drains the pending bucket.
        let stats = concentrator.flush(0, true);
        assert!(!stats.is_empty(), "Force flush should return stats");
        assert_eq!(stats[0].stats().len(), 1, "Should have 1 bucket");
    }

    #[test]
    fn test_ignores_partial_spans() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        // A top-level span that also carries the partial-version metric must not produce stats.
        let mut metrics = FastHashMap::default();
        metrics.insert(MetaString::from("_top_level"), 1.0);
        metrics.insert(MetaString::from(METRIC_PARTIAL_VERSION), 830604.0);
        let span = test_span(aligned_now, 1, 0, 50, 5, "A1", "resource1", 0, None, Some(metrics));
        let trace = Trace::new(vec![span], TagSet::default());

        let payload_key = PayloadAggregationKey {
            env: MetaString::from("test"),
            hostname: MetaString::from("tracer-hostname"),
            ..Default::default()
        };
        add_spans(&mut concentrator, trace.spans(), &payload_key);

        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
        assert!(stats.is_empty(), "Partial spans should be ignored");
    }

    #[test]
    fn test_concentrator_stats_totals() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);

        let oldest_ts = aligned_now - 2 * BUCKET_DURATION_NS;
        let mut concentrator = SpanConcentrator::new(true, true, &[], oldest_ts);

        let spans = make_staggered_top_level_spans(aligned_now);
        add_spans(&mut concentrator, &spans, &env_key("none"));

        let all_stats = concentrator.flush(now + BUCKET_DURATION_NS * 10, true);

        let (mut total_duration, mut total_hits, mut total_errors, mut total_top_level_hits) = (0u64, 0u64, 0u64, 0u64);
        for grouped in all_stats
            .iter()
            .flat_map(|payload| payload.stats())
            .flat_map(|bucket| bucket.stats())
        {
            total_duration += grouped.duration();
            total_hits += grouped.hits();
            total_errors += grouped.errors();
            total_top_level_hits += grouped.top_level_hits();
        }

        assert_eq!(total_duration, 50 + 40 + 30 + 20 + 10 + 1, "Wrong total duration");
        assert_eq!(total_hits, 6, "Wrong total hits");
        assert_eq!(total_top_level_hits, 6, "Wrong total top level hits");
        assert_eq!(total_errors, 0, "Wrong total errors");
    }

    #[test]
    fn test_root_tag() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        // A true root span (parent 0).
        let mut root_metrics = FastHashMap::default();
        root_metrics.insert(MetaString::from("_top_level"), 1.0);
        let root_span = test_span(aligned_now, 1, 0, 40, 10, "A1", "resource1", 0, None, Some(root_metrics));

        // A top-level span whose parent lives elsewhere (parent 1000).
        let mut top_level_metrics = FastHashMap::default();
        top_level_metrics.insert(MetaString::from("_top_level"), 1.0);
        let top_level_span =
            test_span(aligned_now, 4, 1000, 10, 10, "A1", "resource1", 0, None, Some(top_level_metrics));

        // A client span that is neither root nor top-level.
        let mut client_meta = FastHashMap::default();
        client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
        let client_span = test_span(aligned_now, 3, 2, 20, 10, "A1", "resource1", 0, Some(client_meta), None);

        add_spans(&mut concentrator, &[root_span, top_level_span, client_span], &env_key("none"));

        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 20, true);
        assert!(!stats.is_empty(), "Should have stats");

        let (mut total_grouped, mut root_count, mut non_root_count) = (0, 0, 0);
        for grouped in stats.iter().flat_map(|payload| payload.stats()).flat_map(|bucket| bucket.stats()) {
            total_grouped += 1;
            match grouped.is_trace_root() {
                Some(true) => root_count += 1,
                Some(false) => non_root_count += 1,
                None => {}
            }
        }

        assert_eq!(total_grouped, 3, "Expected 3 grouped stats");
        assert_eq!(root_count, 1, "Expected 1 root span");
        assert_eq!(non_root_count, 2, "Expected 2 non-root spans");
    }

    #[test]
    fn test_compute_stats_through_span_kind_check() {
        let now = now_nanos();

        // Runs a top-level web span and a plain client child span through a concentrator. Returns how many
        // grouped stats come out.
        let grouped_count = |compute_stats_by_span_kind: bool| -> usize {
            let mut concentrator = SpanConcentrator::new(compute_stats_by_span_kind, true, &[], now);

            let mut metrics = FastHashMap::default();
            metrics.insert(MetaString::from("_top_level"), 1.0);
            let web_span =
                Span::new("myservice", "query", "GET /users", "web", 1, 1, 0, now, 500, 0).with_metrics(metrics);

            let mut client_meta = FastHashMap::default();
            client_meta.insert(MetaString::from("span.kind"), MetaString::from("client"));
            let client_span = Span::new("myservice", "postgres.query", "SELECT ...", "db", 1, 2, 1, now, 75, 0)
                .with_meta(client_meta);

            add_spans(&mut concentrator, &[web_span, client_span], &env_key("test"));

            let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
            stats
                .iter()
                .flat_map(|payload| payload.stats())
                .map(|bucket| bucket.stats().len())
                .sum()
        };

        assert_eq!(grouped_count(false), 1, "Expected 1 stat when span kind check disabled");
        assert_eq!(grouped_count(true), 2, "Expected 2 stats when span kind check enabled");
    }

    #[test]
    fn test_peer_tags() {
        let now = now_nanos();

        // Peer tag aggregation disabled: no grouped stat may carry peer tags.
        let mut concentrator = SpanConcentrator::new(true, false, &[], now);
        add_spans(&mut concentrator, &[make_peer_client_span(now)], &env_key("test"));
        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);
        for grouped in stats.iter().flat_map(|payload| payload.stats()).flat_map(|bucket| bucket.stats()) {
            assert!(
                grouped.peer_tags().is_empty(),
                "Peer tags should be empty when peer_tags_aggregation is false"
            );
        }

        // Peer tag aggregation enabled: the client span's grouped stat carries its db.* peer tags.
        let mut concentrator = SpanConcentrator::new(true, true, &[], now);
        add_spans(&mut concentrator, &[make_peer_client_span(now)], &env_key("test"));
        let stats = concentrator.flush(now + BUCKET_DURATION_NS * 3, true);

        let mut found_client_with_peer_tags = false;
        for grouped in stats.iter().flat_map(|payload| payload.stats()).flat_map(|bucket| bucket.stats()) {
            if grouped.resource() != "SELECT ..." {
                continue;
            }

            assert!(!grouped.peer_tags().is_empty(), "Client span should have peer tags");
            let peer_tags: Vec<&str> = grouped.peer_tags().iter().map(|s| s.as_ref()).collect();
            assert!(
                peer_tags.iter().any(|t| t.starts_with("db.instance:")),
                "Should have db.instance peer tag"
            );
            assert!(
                peer_tags.iter().any(|t| t.starts_with("db.system:")),
                "Should have db.system peer tag"
            );
            found_client_with_peer_tags = true;
        }
        assert!(
            found_client_with_peer_tags,
            "Should have found client span with peer tags"
        );
    }

    #[test]
    fn test_concentrator_oldest_ts() {
        let now = now_nanos();
        let aligned_now = align_ts(now, BUCKET_DURATION_NS);
        let mut concentrator = SpanConcentrator::new(true, true, &[], now);

        let spans = make_staggered_top_level_spans(aligned_now);
        add_spans(&mut concentrator, &spans, &env_key("none"));

        // The first `buffer_len` non-forced flushes are expected to emit nothing.
        let buffer_len = 2;
        let mut flush_time = now;
        for _ in 0..buffer_len {
            let stats = concentrator.flush(flush_time, false);
            assert!(stats.is_empty(), "Should not flush before buffer fills");
            flush_time += BUCKET_DURATION_NS;
        }

        let stats = concentrator.flush(flush_time, false);
        assert!(!stats.is_empty(), "Should flush after buffer fills");

        let (total_hits, total_duration) = stats
            .iter()
            .flat_map(|payload| payload.stats())
            .flat_map(|bucket| bucket.stats())
            .fold((0u64, 0u64), |(hits, duration), grouped| {
                (hits + grouped.hits(), duration + grouped.duration())
            });

        assert_eq!(total_hits, 6, "All 6 spans should be counted");
        assert_eq!(
            total_duration,
            50 + 40 + 30 + 20 + 10 + 1,
            "Total duration should match"
        );
    }

    #[test]
    fn test_compute_stats_for_span_kind() {
        use super::span_concentrator::compute_stats_for_span_kind;

        // Server/consumer/client/producer qualify regardless of letter case.
        let eligible = [
            "server", "consumer", "client", "producer", "SERVER", "CONSUMER", "CLIENT", "PRODUCER", "SErVER",
            "COnSUMER", "CLiENT", "PRoDUCER",
        ];
        for kind in eligible {
            assert!(compute_stats_for_span_kind(kind), "expected stats for span kind {:?}", kind);
        }

        // Anything else, including the empty string, does not.
        for kind in ["internal", "INTERNAL", "INtERNAL", ""] {
            assert!(!compute_stats_for_span_kind(kind), "expected no stats for span kind {:?}", kind);
        }
    }

    #[test]
    fn test_extract_process_tags() {
        // Builds a one-span trace whose span optionally carries a `_dd.tags.process` meta entry.
        fn single_span_trace(process_tags: Option<&str>) -> Trace {
            let span = match process_tags {
                Some(value) => {
                    let mut meta = FastHashMap::default();
                    meta.insert(MetaString::from(TAG_PROCESS_TAGS), MetaString::from(value));
                    Span::default().with_meta(meta)
                }
                None => Span::default(),
            };
            Trace::new(vec![span], TagSet::default())
        }

        let process_tags = extract_process_tags(&single_span_trace(None));
        assert!(process_tags.is_empty(), "Should be empty when no _dd.tags.process");

        let process_tags = extract_process_tags(&single_span_trace(Some("a:1,b:2,c:3")));
        assert_eq!(process_tags, "a:1,b:2,c:3");

        let process_tags = extract_process_tags(&single_span_trace(Some("")));
        assert!(
            process_tags.is_empty(),
            "Should be empty when _dd.tags.process is empty string"
        );

        let process_tags = extract_process_tags(&Trace::new(vec![], TagSet::default()));
        assert!(process_tags.is_empty(), "Should be empty when trace has no spans");
    }

    #[test]
    fn test_process_tags_hash_computation() {
        use super::aggregation::process_tags_hash;

        // Empty process tags hash to zero.
        assert_eq!(process_tags_hash(""), 0);

        // Hashing is deterministic...
        let full = process_tags_hash("a:1,b:2,c:3");
        assert_eq!(full, process_tags_hash("a:1,b:2,c:3"));

        // ...and distinguishes different tag strings.
        assert_ne!(full, process_tags_hash("a:1,b:2"));
    }
}