1use async_trait::async_trait;
16use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
17use saluki_common::collections::FastHashMap;
18use saluki_config::GenericConfiguration;
19use saluki_core::{
20 components::{transforms::*, ComponentContext},
21 data_model::event::{
22 trace::{Span, Trace, TraceSampling},
23 Event,
24 },
25 topology::EventsBuffer,
26};
27use saluki_error::GenericError;
28use stringtheory::MetaString;
29use tracing::debug;
30
31mod catalog;
32mod core_sampler;
33mod errors;
34mod priority_sampler;
35mod probabilistic;
36mod score_sampler;
37mod signature;
38
39use self::probabilistic::PROB_RATE_KEY;
40use crate::common::datadog::{
41 apm::ApmConfig, sample_by_rate, DECISION_MAKER_PROBABILISTIC, OTEL_TRACE_ID_META_KEY, SAMPLING_PRIORITY_METRIC_KEY,
42 TAG_DECISION_MAKER,
43};
44use crate::common::otlp::config::TracesConfig;
45
46const PRIORITY_AUTO_DROP: i32 = 0;
48const PRIORITY_AUTO_KEEP: i32 = 1;
49const PRIORITY_USER_KEEP: i32 = 2;
50
51const ERROR_SAMPLE_RATE: f64 = 1.0; const KEY_SPAN_SAMPLING_MECHANISM: &str = "_dd.span_sampling.mechanism";
55const KEY_ANALYZED_SPANS: &str = "_dd.analyzed";
56
57fn normalize_sampling_rate(rate: f64) -> f64 {
60 if rate <= 0.0 || rate >= 1.0 {
61 1.0
62 } else {
63 rate
64 }
65}
66
67#[derive(Debug)]
69pub struct TraceSamplerConfiguration {
70 apm_config: ApmConfig,
71 otlp_sampling_rate: f64,
72}
73
74impl TraceSamplerConfiguration {
75 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
77 let apm_config = ApmConfig::from_configuration(config)?;
78 let otlp_traces: TracesConfig = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
79 let otlp_sampling_rate = normalize_sampling_rate(otlp_traces.probabilistic_sampler.sampling_percentage / 100.0);
80 Ok(Self {
81 apm_config,
82 otlp_sampling_rate,
83 })
84 }
85}
86
87#[async_trait]
88impl SynchronousTransformBuilder for TraceSamplerConfiguration {
89 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
90 let sampler = TraceSampler {
91 sampling_rate: self.apm_config.probabilistic_sampler_sampling_percentage() / 100.0,
92 error_sampling_enabled: self.apm_config.error_sampling_enabled(),
93 error_tracking_standalone: self.apm_config.error_tracking_standalone_enabled(),
94 probabilistic_sampler_enabled: self.apm_config.probabilistic_sampler_enabled(),
95 otlp_sampling_rate: self.otlp_sampling_rate,
96 error_sampler: errors::ErrorsSampler::new(self.apm_config.errors_per_second(), ERROR_SAMPLE_RATE),
97 priority_sampler: priority_sampler::PrioritySampler::new(
98 self.apm_config.default_env().clone(),
99 ERROR_SAMPLE_RATE,
100 self.apm_config.target_traces_per_second(),
101 ),
102 no_priority_sampler: score_sampler::NoPrioritySampler::new(
103 self.apm_config.target_traces_per_second(),
104 ERROR_SAMPLE_RATE,
105 ),
106 };
107
108 Ok(Box::new(sampler))
109 }
110}
111
112impl MemoryBounds for TraceSamplerConfiguration {
113 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
114 builder.minimum().with_single_value::<TraceSampler>("component struct");
115 }
116}
117
118pub struct TraceSampler {
119 sampling_rate: f64,
120 error_tracking_standalone: bool,
121 error_sampling_enabled: bool,
122 probabilistic_sampler_enabled: bool,
123 otlp_sampling_rate: f64,
124 error_sampler: errors::ErrorsSampler,
125 priority_sampler: priority_sampler::PrioritySampler,
126 no_priority_sampler: score_sampler::NoPrioritySampler,
127}
128
129impl TraceSampler {
130 fn get_root_span_index(&self, trace: &Trace) -> Option<usize> {
133 let spans = trace.spans();
135 if spans.is_empty() {
136 return None;
137 }
138 let length = spans.len();
139 let mut parent_id_to_child: FastHashMap<u64, usize> = FastHashMap::default();
145
146 for i in 0..length {
147 let j = length - 1 - i;
150 if spans[j].parent_id() == 0 {
151 return Some(j);
152 }
153 parent_id_to_child.insert(spans[j].parent_id(), j);
154 }
155
156 for span in spans.iter() {
157 parent_id_to_child.remove(&span.span_id());
158 }
159
160 if parent_id_to_child.len() != 1 {
162 debug!(
163 "Didn't reliably find the root span for traceID:{}",
164 &spans[0].trace_id()
165 );
166 }
167
168 if let Some((_, child_idx)) = parent_id_to_child.iter().next() {
171 return Some(*child_idx);
172 }
173
174 Some(length - 1)
176 }
177
178 fn get_user_priority(&self, trace: &Trace, root_span_idx: usize) -> Option<i32> {
180 if let Some(sampling) = trace.sampling() {
182 if let Some(priority) = sampling.priority {
183 return Some(priority);
184 }
185 }
186
187 if trace.spans().is_empty() {
188 return None;
189 }
190
191 if let Some(root) = trace.spans().get(root_span_idx) {
194 if let Some(&p) = root.metrics().get(SAMPLING_PRIORITY_METRIC_KEY) {
195 return Some(p as i32);
196 }
197 }
198 let spans = trace.spans();
199 spans
200 .iter()
201 .find_map(|span| span.metrics().get(SAMPLING_PRIORITY_METRIC_KEY).map(|&p| p as i32))
202 }
203
204 fn sample_probabilistic(&self, trace_id: u64) -> bool {
206 probabilistic::ProbabilisticSampler::sample(trace_id, self.sampling_rate)
207 }
208
209 fn is_otlp_trace(&self, trace: &Trace, root_span_idx: usize) -> bool {
210 trace
211 .spans()
212 .get(root_span_idx)
213 .map(|span| {
214 span.meta()
215 .contains_key(&MetaString::from_static(OTEL_TRACE_ID_META_KEY))
216 })
217 .unwrap_or(false)
218 }
219
220 fn trace_contains_error(&self, trace: &Trace, consider_exception_span_events: bool) -> bool {
222 trace.spans().iter().any(|span| {
223 span.error() != 0 || (consider_exception_span_events && self.span_contains_exception_span_event(span))
224 })
225 }
226
227 fn span_contains_exception_span_event(&self, span: &Span) -> bool {
231 if let Some(has_exception) = span.meta().get("_dd.span_events.has_exception") {
232 return has_exception == "true";
233 }
234 false
235 }
236
237 fn analyzed_span_sampling(&self, trace: &mut Trace) -> bool {
241 let retained = trace.retain_spans(|_, span| span.metrics().contains_key(KEY_ANALYZED_SPANS));
242 if retained > 0 {
243 let sampling = TraceSampling::new(false, Some(PRIORITY_USER_KEEP), None, Some(self.sampling_rate));
245 trace.set_sampling(Some(sampling));
246 true
247 } else {
248 false
249 }
250 }
251
252 fn has_analyzed_spans(&self, trace: &Trace) -> bool {
254 trace
255 .spans()
256 .iter()
257 .any(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
258 }
259
260 fn single_span_sampling(&self, trace: &mut Trace) -> bool {
263 let retained = trace.retain_spans(|_, span| span.metrics().contains_key(KEY_SPAN_SAMPLING_MECHANISM));
264 if retained > 0 {
265 let sampling = TraceSampling::new(
267 false,
268 Some(PRIORITY_USER_KEEP),
269 None, Some(self.sampling_rate),
271 );
272 trace.set_sampling(Some(sampling));
273 true
274 } else {
275 false
276 }
277 }
278
279 fn run_samplers(&mut self, trace: &mut Trace) -> (bool, i32, &'static str, Option<usize>) {
284 let now = std::time::SystemTime::now();
286 if trace.spans().is_empty() {
288 return (false, PRIORITY_AUTO_DROP, "", None);
289 }
290 let contains_error = self.trace_contains_error(trace, false);
291 let Some(root_span_idx) = self.get_root_span_index(trace) else {
292 return (false, PRIORITY_AUTO_DROP, "", None);
293 };
294
295 if self.probabilistic_sampler_enabled {
297 let mut prob_keep = false;
298 let mut decision_maker = "";
299
300 let root_trace_id = trace.spans()[root_span_idx].trace_id();
302 if self.sample_probabilistic(root_trace_id) {
303 decision_maker = DECISION_MAKER_PROBABILISTIC; prob_keep = true;
305
306 if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
307 let metrics = root_span.metrics_mut();
308 metrics.insert(MetaString::from(PROB_RATE_KEY), self.sampling_rate);
309 }
310 } else if self.error_sampling_enabled && contains_error {
311 prob_keep = self.error_sampler.sample_error(now, trace, root_span_idx);
312 }
313
314 let priority = if prob_keep {
315 PRIORITY_AUTO_KEEP
316 } else {
317 PRIORITY_AUTO_DROP
318 };
319
320 return (prob_keep, priority, decision_maker, Some(root_span_idx));
321 }
322
323 let user_priority = self.get_user_priority(trace, root_span_idx);
324 if let Some(priority) = user_priority {
325 if priority < PRIORITY_AUTO_DROP {
326 return (false, priority, "", Some(root_span_idx));
328 }
329
330 if self.priority_sampler.sample(now, trace, root_span_idx, priority, 0.0) {
331 return (true, priority, "", Some(root_span_idx));
332 }
333 } else if self.is_otlp_trace(trace, root_span_idx) {
334 let root_trace_id = trace.spans()[root_span_idx].trace_id();
336 if sample_by_rate(root_trace_id, self.otlp_sampling_rate) {
337 if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
338 root_span.metrics_mut().remove(PROB_RATE_KEY);
339 }
340 return (
341 true,
342 PRIORITY_AUTO_KEEP,
343 DECISION_MAKER_PROBABILISTIC,
344 Some(root_span_idx),
345 );
346 }
347 } else if self.no_priority_sampler.sample(now, trace, root_span_idx) {
348 return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
349 }
350
351 if self.error_sampling_enabled && contains_error {
352 let keep = self.error_sampler.sample_error(now, trace, root_span_idx);
353 if keep {
354 return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
355 }
356 }
357
358 (false, PRIORITY_AUTO_DROP, "", Some(root_span_idx))
360 }
361
362 fn apply_sampling_metadata(
367 &self, trace: &mut Trace, keep: bool, priority: i32, decision_maker: &str, root_span_idx: usize,
368 ) {
369 let is_otlp = self.is_otlp_trace(trace, root_span_idx);
370 let root_span_value = match trace.spans_mut().get_mut(root_span_idx) {
371 Some(span) => span,
372 None => return,
373 };
374
375 let existing_decision_maker = if decision_maker.is_empty() {
377 root_span_value.meta().get(TAG_DECISION_MAKER).cloned()
378 } else {
379 None
380 };
381 let decision_maker_meta = if decision_maker.is_empty() {
382 existing_decision_maker
383 } else {
384 Some(MetaString::from(decision_maker))
385 };
386
387 let meta = root_span_value.meta_mut();
388 if priority > 0 {
389 if let Some(dm) = decision_maker_meta.as_ref() {
390 meta.insert(MetaString::from(TAG_DECISION_MAKER), dm.clone());
391 }
392 }
393
394 let sampling = TraceSampling::new(
396 !keep,
397 Some(priority),
398 if priority > 0 { decision_maker_meta } else { None },
399 Some(if is_otlp {
400 self.otlp_sampling_rate
401 } else {
402 self.sampling_rate
403 }),
404 );
405 trace.set_sampling(Some(sampling));
406 }
407
408 fn process_trace(&mut self, trace: &mut Trace) -> bool {
409 let (keep, priority, decision_maker, root_span_idx) = self.run_samplers(trace);
414 if keep {
415 if let Some(root_idx) = root_span_idx {
416 self.apply_sampling_metadata(trace, keep, priority, decision_maker, root_idx);
417 }
418 return true;
419 }
420
421 if self.error_tracking_standalone {
422 return false;
423 }
424
425 let modified = self.single_span_sampling(trace);
428 if !modified {
429 if self.analyzed_span_sampling(trace) {
431 return true;
432 }
433 } else if self.has_analyzed_spans(trace) {
434 debug!(
436 "Detected both analytics events AND single span sampling in the same trace. Single span sampling wins because App Analytics is deprecated."
437 );
438 return true;
439 }
440
441 if modified {
443 return true;
444 }
445
446 debug!("Dropping trace with priority {}", priority);
448 false
449 }
450}
451
452impl SynchronousTransform for TraceSampler {
453 fn transform_buffer(&mut self, buffer: &mut EventsBuffer) {
454 buffer.remove_if(|event| match event {
455 Event::Trace(trace) => !self.process_trace(trace),
456 _ => false,
457 });
458 }
459}
460
461#[cfg(test)]
462mod tests {
463 use std::collections::HashMap;
464
465 use saluki_context::tags::TagSet;
466 use saluki_core::data_model::event::trace::{Span as DdSpan, Trace};
467 const PRIORITY_USER_DROP: i32 = -1;
468
469 use super::*;
470 fn create_test_sampler() -> TraceSampler {
471 TraceSampler {
472 sampling_rate: 1.0,
473 error_sampling_enabled: true,
474 error_tracking_standalone: false,
475 probabilistic_sampler_enabled: true,
476 otlp_sampling_rate: 1.0,
477 error_sampler: errors::ErrorsSampler::new(10.0, 1.0),
478 priority_sampler: priority_sampler::PrioritySampler::new(MetaString::from("agent-env"), 1.0, 10.0),
479 no_priority_sampler: score_sampler::NoPrioritySampler::new(10.0, 1.0),
480 }
481 }
482
483 fn create_test_span(trace_id: u64, span_id: u64, error: i32) -> DdSpan {
484 DdSpan::new(
485 MetaString::from("test-service"),
486 MetaString::from("test-operation"),
487 MetaString::from("test-resource"),
488 MetaString::from("test-type"),
489 trace_id,
490 span_id,
491 0, 0, 1000, error,
495 )
496 }
497
498 fn create_test_span_with_metrics(trace_id: u64, span_id: u64, metrics: HashMap<String, f64>) -> DdSpan {
499 let mut metrics_map = saluki_common::collections::FastHashMap::default();
500 for (k, v) in metrics {
501 metrics_map.insert(MetaString::from(k), v);
502 }
503 create_test_span(trace_id, span_id, 0).with_metrics(metrics_map)
504 }
505
506 #[allow(dead_code)]
507 fn create_test_span_with_meta(trace_id: u64, span_id: u64, meta: HashMap<String, String>) -> DdSpan {
508 let mut meta_map = saluki_common::collections::FastHashMap::default();
509 for (k, v) in meta {
510 meta_map.insert(MetaString::from(k), MetaString::from(v));
511 }
512 create_test_span(trace_id, span_id, 0).with_meta(meta_map)
513 }
514
515 fn create_test_trace(spans: Vec<DdSpan>) -> Trace {
516 let tags = TagSet::default();
517 Trace::new(spans, tags)
518 }
519
520 #[test]
521 fn test_user_priority_detection() {
522 let sampler = create_test_sampler();
523
524 let mut metrics = HashMap::new();
526 metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0);
527 let span = create_test_span_with_metrics(12345, 1, metrics);
528 let trace = create_test_trace(vec![span]);
529 let root_idx = sampler.get_root_span_index(&trace).unwrap();
530
531 assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2));
532
533 let mut metrics = HashMap::new();
535 metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), -1.0);
536 let span = create_test_span_with_metrics(12345, 1, metrics);
537 let trace = create_test_trace(vec![span]);
538 let root_idx = sampler.get_root_span_index(&trace).unwrap();
539
540 assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(-1));
541
542 let span = create_test_span(12345, 1, 0);
544 let trace = create_test_trace(vec![span]);
545 let root_idx = sampler.get_root_span_index(&trace).unwrap();
546
547 assert_eq!(sampler.get_user_priority(&trace, root_idx), None);
548 }
549
550 #[test]
551 fn test_trace_level_priority_takes_precedence() {
552 let sampler = create_test_sampler();
553
554 let mut metrics_root = HashMap::new();
557 metrics_root.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 0.0);
558 let root_span = create_test_span_with_metrics(12345, 1, metrics_root);
559
560 let mut metrics_later = HashMap::new();
561 metrics_later.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 1.0);
562 let later_span = create_test_span_with_metrics(12345, 2, metrics_later).with_parent_id(1);
563
564 let mut trace = create_test_trace(vec![root_span, later_span]);
565 let root_idx = sampler.get_root_span_index(&trace).unwrap();
566
567 assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(0));
569
570 trace.set_sampling(Some(TraceSampling::new(false, Some(2), None, None)));
572
573 assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2));
575
576 let span_no_priority = create_test_span(12345, 3, 0);
578 let mut trace_only_trace_level = create_test_trace(vec![span_no_priority]);
579 trace_only_trace_level.set_sampling(Some(TraceSampling::new(false, Some(1), None, None)));
580 let root_idx = sampler.get_root_span_index(&trace_only_trace_level).unwrap();
581
582 assert_eq!(sampler.get_user_priority(&trace_only_trace_level, root_idx), Some(1));
583 }
584
585 #[test]
586 fn test_manual_keep_with_trace_level_priority() {
587 let mut sampler = create_test_sampler();
588 sampler.probabilistic_sampler_enabled = false; let span = create_test_span(12345, 1, 0);
592 let mut trace = create_test_trace(vec![span]);
593 trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_USER_KEEP), None, None)));
594
595 let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
596 assert!(keep);
597 assert_eq!(priority, PRIORITY_USER_KEEP);
598 assert_eq!(decision_maker, "");
599
600 let span = create_test_span(12345, 1, 0);
602 let mut trace = create_test_trace(vec![span]);
603 trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_USER_DROP), None, None)));
604
605 let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
606 assert!(!keep); assert_eq!(priority, PRIORITY_USER_DROP);
608
609 let span = create_test_span(12345, 1, 0);
611 let mut trace = create_test_trace(vec![span]);
612 trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_AUTO_KEEP), None, None)));
613
614 let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
615 assert!(keep);
616 assert_eq!(priority, PRIORITY_AUTO_KEEP);
617 assert_eq!(decision_maker, "");
618 }
619
620 #[test]
621 fn test_probabilistic_sampling_determinism() {
622 let sampler = create_test_sampler();
623
624 let trace_id = 0x1234567890ABCDEF_u64;
626 let result1 = sampler.sample_probabilistic(trace_id);
627 let result2 = sampler.sample_probabilistic(trace_id);
628 assert_eq!(result1, result2);
629 }
630
631 #[test]
632 fn test_error_detection() {
633 let sampler = create_test_sampler();
634
635 let span_with_error = create_test_span(12345, 1, 1);
637 let trace = create_test_trace(vec![span_with_error]);
638 assert!(sampler.trace_contains_error(&trace, false));
639
640 let span_without_error = create_test_span(12345, 1, 0);
642 let trace = create_test_trace(vec![span_without_error]);
643 assert!(!sampler.trace_contains_error(&trace, false));
644 }
645
646 #[test]
647 fn test_sampling_priority_order() {
648 let mut sampler = create_test_sampler();
650 sampler.sampling_rate = 0.5; sampler.probabilistic_sampler_enabled = true;
652
653 let span_with_error = create_test_span(u64::MAX - 1, 1, 1);
656 let mut trace = create_test_trace(vec![span_with_error]);
657
658 let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
659 assert!(keep);
660 assert_eq!(priority, PRIORITY_AUTO_KEEP);
661 assert_eq!(decision_maker, ""); let mut sampler = create_test_sampler();
665 sampler.probabilistic_sampler_enabled = false; let mut metrics = HashMap::new();
668 metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0);
669 let span = create_test_span_with_metrics(12345, 1, metrics);
670 let mut trace = create_test_trace(vec![span]);
671
672 let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
673 assert!(keep);
674 assert_eq!(priority, 2); assert_eq!(decision_maker, "");
676 }
677
678 #[test]
679 fn test_empty_trace_handling() {
680 let mut sampler = create_test_sampler();
681 let mut trace = create_test_trace(vec![]);
682
683 let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
684 assert!(!keep);
685 assert_eq!(priority, PRIORITY_AUTO_DROP);
686 }
687
688 #[test]
689 fn test_root_span_detection() {
690 let sampler = create_test_sampler();
691
692 let root_span = DdSpan::new(
694 MetaString::from("service"),
695 MetaString::from("operation"),
696 MetaString::from("resource"),
697 MetaString::from("type"),
698 12345,
699 1,
700 0, 0,
702 1000,
703 0,
704 );
705 let child_span = DdSpan::new(
706 MetaString::from("service"),
707 MetaString::from("child_op"),
708 MetaString::from("resource"),
709 MetaString::from("type"),
710 12345,
711 2,
712 1, 100,
714 500,
715 0,
716 );
717 let trace = create_test_trace(vec![child_span.clone(), root_span.clone()]);
719 let root_idx = sampler.get_root_span_index(&trace).unwrap();
720 assert_eq!(trace.spans()[root_idx].span_id(), 1);
721
722 let orphan_span = DdSpan::new(
724 MetaString::from("service"),
725 MetaString::from("orphan"),
726 MetaString::from("resource"),
727 MetaString::from("type"),
728 12345,
729 3,
730 999, 200,
732 300,
733 0,
734 );
735 let trace = create_test_trace(vec![orphan_span]);
736 let root_idx = sampler.get_root_span_index(&trace).unwrap();
737 assert_eq!(trace.spans()[root_idx].span_id(), 3);
738
739 let span1 = create_test_span(12345, 1, 0);
741 let span2 = create_test_span(12345, 2, 0);
742 let trace = create_test_trace(vec![span1, span2]);
743 let root_idx = sampler.get_root_span_index(&trace).unwrap();
745 assert_eq!(trace.spans()[root_idx].span_id(), 2);
746 }
747
748 #[test]
749 fn test_single_span_sampling() {
750 let mut sampler = create_test_sampler();
751
752 sampler.sampling_rate = 0.0; sampler.probabilistic_sampler_enabled = true;
755
756 let mut metrics_map = saluki_common::collections::FastHashMap::default();
758 metrics_map.insert(MetaString::from(KEY_SPAN_SAMPLING_MECHANISM), 8.0); let sss_span = create_test_span(12345, 1, 0).with_metrics(metrics_map.clone());
760
761 let regular_span = create_test_span(12345, 2, 0);
763
764 let mut trace = create_test_trace(vec![sss_span.clone(), regular_span]);
765
766 let modified = sampler.single_span_sampling(&mut trace);
768 assert!(modified);
769 assert_eq!(trace.spans().len(), 1); assert_eq!(trace.spans()[0].span_id(), 1); assert!(trace.sampling().is_some());
774 assert_eq!(trace.sampling().as_ref().unwrap().priority, Some(PRIORITY_USER_KEEP));
775
776 let trace_without_sss = create_test_trace(vec![create_test_span(12345, 3, 0)]);
778 let mut trace_copy = trace_without_sss.clone();
779 let modified = sampler.single_span_sampling(&mut trace_copy);
780 assert!(!modified);
781 assert_eq!(trace_copy.spans().len(), trace_without_sss.spans().len());
782 }
783
784 #[test]
785 fn test_analytics_events() {
786 let sampler = create_test_sampler();
787
788 let mut metrics_map = saluki_common::collections::FastHashMap::default();
790 metrics_map.insert(MetaString::from(KEY_ANALYZED_SPANS), 1.0);
791 let analyzed_span = create_test_span(12345, 1, 0).with_metrics(metrics_map.clone());
792 let regular_span = create_test_span(12345, 2, 0);
793
794 let mut trace = create_test_trace(vec![analyzed_span.clone(), regular_span]);
795
796 let analyzed_span_ids: Vec<u64> = trace
797 .spans()
798 .iter()
799 .filter(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
800 .map(|span| span.span_id())
801 .collect();
802 assert_eq!(analyzed_span_ids, vec![1]);
803
804 assert!(sampler.has_analyzed_spans(&trace));
805 let modified = sampler.analyzed_span_sampling(&mut trace);
806 assert!(modified);
807 assert_eq!(trace.spans().len(), 1);
808 assert_eq!(trace.spans()[0].span_id(), 1);
809 assert!(trace.sampling().is_some());
810
811 let trace_no_analytics = create_test_trace(vec![create_test_span(12345, 3, 0)]);
813 let mut trace_no_analytics_copy = trace_no_analytics.clone();
814 let analyzed_span_ids: Vec<u64> = trace_no_analytics
815 .spans()
816 .iter()
817 .filter(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
818 .map(|span| span.span_id())
819 .collect();
820 assert!(analyzed_span_ids.is_empty());
821 assert!(!sampler.has_analyzed_spans(&trace_no_analytics));
822 let modified = sampler.analyzed_span_sampling(&mut trace_no_analytics_copy);
823 assert!(!modified);
824 assert_eq!(trace_no_analytics_copy.spans().len(), trace_no_analytics.spans().len());
825 }
826
827 #[test]
828 fn test_probabilistic_sampling_with_prob_rate_key() {
829 let mut sampler = create_test_sampler();
830 sampler.sampling_rate = 0.75; sampler.probabilistic_sampler_enabled = true;
832
833 let trace_id = 12345_u64;
835 let root_span = DdSpan::new(
836 MetaString::from("service"),
837 MetaString::from("operation"),
838 MetaString::from("resource"),
839 MetaString::from("type"),
840 trace_id,
841 1,
842 0, 0,
844 1000,
845 0,
846 );
847 let mut trace = create_test_trace(vec![root_span]);
848
849 let (keep, priority, decision_maker, root_span_idx) = sampler.run_samplers(&mut trace);
850
851 if keep && decision_maker == DECISION_MAKER_PROBABILISTIC {
852 assert_eq!(priority, PRIORITY_AUTO_KEEP);
854 assert_eq!(decision_maker, DECISION_MAKER_PROBABILISTIC); let root_idx = root_span_idx.unwrap_or(0);
858 let root_span = &trace.spans()[root_idx];
859 assert!(root_span.metrics().contains_key(PROB_RATE_KEY));
860 assert_eq!(*root_span.metrics().get(PROB_RATE_KEY).unwrap(), 0.75);
861
862 let mut trace_with_metadata = trace.clone();
864 sampler.apply_sampling_metadata(&mut trace_with_metadata, keep, priority, decision_maker, root_idx);
865
866 let modified_root = &trace_with_metadata.spans()[root_idx];
868 assert!(modified_root.meta().contains_key(TAG_DECISION_MAKER));
869 assert_eq!(
870 modified_root.meta().get(TAG_DECISION_MAKER).unwrap(),
871 &MetaString::from(DECISION_MAKER_PROBABILISTIC)
872 );
873 }
874 }
875}