1use async_trait::async_trait;
16use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
17use saluki_common::collections::FastHashMap;
18use saluki_config::GenericConfiguration;
19use saluki_core::{
20 components::{transforms::*, ComponentContext},
21 data_model::event::{
22 trace::{Span, Trace, TraceSampling},
23 Event,
24 },
25 topology::EventsBuffer,
26};
27use saluki_error::GenericError;
28use stringtheory::MetaString;
29use tracing::debug;
30
31mod catalog;
32mod core_sampler;
33mod errors;
34mod priority_sampler;
35mod probabilistic;
36mod score_sampler;
37mod signature;
38
39use self::probabilistic::PROB_RATE_KEY;
40use crate::common::datadog::{
41 apm::ApmConfig, sample_by_rate, DECISION_MAKER_PROBABILISTIC, OTEL_TRACE_ID_META_KEY, SAMPLING_PRIORITY_METRIC_KEY,
42 TAG_DECISION_MAKER,
43};
44use crate::common::otlp::config::TracesConfig;
45
46const PRIORITY_AUTO_DROP: i32 = 0;
48const PRIORITY_AUTO_KEEP: i32 = 1;
49const PRIORITY_USER_KEEP: i32 = 2;
50
51const ERROR_SAMPLE_RATE: f64 = 1.0; const KEY_SPAN_SAMPLING_MECHANISM: &str = "_dd.span_sampling.mechanism";
55const KEY_ANALYZED_SPANS: &str = "_dd.analyzed";
56
57fn normalize_sampling_rate(rate: f64) -> f64 {
60 if rate <= 0.0 || rate >= 1.0 {
61 1.0
62 } else {
63 rate
64 }
65}
66
67#[derive(Debug)]
69pub struct TraceSamplerConfiguration {
70 apm_config: ApmConfig,
71 otlp_sampling_rate: f64,
72}
73
74impl TraceSamplerConfiguration {
75 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
77 let apm_config = ApmConfig::from_configuration(config)?;
78 let otlp_traces: TracesConfig = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
79 let otlp_sampling_rate = normalize_sampling_rate(otlp_traces.probabilistic_sampler.sampling_percentage / 100.0);
80 Ok(Self {
81 apm_config,
82 otlp_sampling_rate,
83 })
84 }
85}
86
87#[async_trait]
88impl SynchronousTransformBuilder for TraceSamplerConfiguration {
89 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
90 let sampler = TraceSampler {
91 sampling_rate: self.apm_config.probabilistic_sampler_sampling_percentage() / 100.0,
92 error_sampling_enabled: self.apm_config.error_sampling_enabled(),
93 error_tracking_standalone: self.apm_config.error_tracking_standalone_enabled(),
94 probabilistic_sampler_enabled: self.apm_config.probabilistic_sampler_enabled(),
95 otlp_sampling_rate: self.otlp_sampling_rate,
96 error_sampler: errors::ErrorsSampler::new(self.apm_config.errors_per_second(), ERROR_SAMPLE_RATE),
97 priority_sampler: priority_sampler::PrioritySampler::new(
98 self.apm_config.default_env().clone(),
99 ERROR_SAMPLE_RATE,
100 self.apm_config.target_traces_per_second(),
101 ),
102 no_priority_sampler: score_sampler::NoPrioritySampler::new(
103 self.apm_config.target_traces_per_second(),
104 ERROR_SAMPLE_RATE,
105 ),
106 };
107
108 Ok(Box::new(sampler))
109 }
110}
111
112impl MemoryBounds for TraceSamplerConfiguration {
113 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
114 builder.minimum().with_single_value::<TraceSampler>("component struct");
115 }
116}
117
118pub struct TraceSampler {
119 sampling_rate: f64,
120 error_tracking_standalone: bool,
121 error_sampling_enabled: bool,
122 probabilistic_sampler_enabled: bool,
123 otlp_sampling_rate: f64,
124 error_sampler: errors::ErrorsSampler,
125 priority_sampler: priority_sampler::PrioritySampler,
126 no_priority_sampler: score_sampler::NoPrioritySampler,
127}
128
129impl TraceSampler {
130 fn get_root_span_index(&self, trace: &Trace) -> Option<usize> {
133 let spans = trace.spans();
135 if spans.is_empty() {
136 return None;
137 }
138 let length = spans.len();
139 let mut parent_id_to_child: FastHashMap<u64, usize> = FastHashMap::default();
145
146 for i in 0..length {
147 let j = length - 1 - i;
150 if spans[j].parent_id() == 0 {
151 return Some(j);
152 }
153 parent_id_to_child.insert(spans[j].parent_id(), j);
154 }
155
156 for span in spans.iter() {
157 parent_id_to_child.remove(&span.span_id());
158 }
159
160 if parent_id_to_child.len() != 1 {
162 debug!(
163 "Didn't reliably find the root span for traceID:{}",
164 &spans[0].trace_id()
165 );
166 }
167
168 if let Some((_, child_idx)) = parent_id_to_child.iter().next() {
171 return Some(*child_idx);
172 }
173
174 Some(length - 1)
176 }
177
178 fn get_user_priority(&self, trace: &Trace, root_span_idx: usize) -> Option<i32> {
180 if let Some(sampling) = trace.sampling() {
182 if let Some(priority) = sampling.priority {
183 return Some(priority);
184 }
185 }
186
187 if trace.spans().is_empty() {
188 return None;
189 }
190
191 if let Some(root) = trace.spans().get(root_span_idx) {
194 if let Some(&p) = root.metrics().get(SAMPLING_PRIORITY_METRIC_KEY) {
195 return Some(p as i32);
196 }
197 }
198 let spans = trace.spans();
199 spans
200 .iter()
201 .find_map(|span| span.metrics().get(SAMPLING_PRIORITY_METRIC_KEY).map(|&p| p as i32))
202 }
203
204 fn sample_probabilistic(&self, trace_id: u64) -> bool {
206 probabilistic::ProbabilisticSampler::sample(trace_id, self.sampling_rate)
207 }
208
209 fn is_otlp_trace(&self, trace: &Trace, root_span_idx: usize) -> bool {
210 trace
211 .spans()
212 .get(root_span_idx)
213 .map(|span| {
214 span.meta()
215 .contains_key(&MetaString::from_static(OTEL_TRACE_ID_META_KEY))
216 })
217 .unwrap_or(false)
218 }
219
220 fn trace_contains_error(&self, trace: &Trace, consider_exception_span_events: bool) -> bool {
222 trace.spans().iter().any(|span| {
223 span.error() != 0 || (consider_exception_span_events && self.span_contains_exception_span_event(span))
224 })
225 }
226
227 fn span_contains_exception_span_event(&self, span: &Span) -> bool {
231 if let Some(has_exception) = span.meta().get("_dd.span_events.has_exception") {
232 return has_exception == "true";
233 }
234 false
235 }
236
237 fn analyzed_span_sampling(&self, trace: &mut Trace) -> bool {
241 let retained = trace.retain_spans(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS));
242 if retained > 0 {
243 let sampling = TraceSampling::new(
245 false,
246 Some(PRIORITY_USER_KEEP),
247 None,
248 Some(MetaString::from(format!("{:.2}", self.sampling_rate))),
249 );
250 trace.set_sampling(Some(sampling));
251 true
252 } else {
253 false
254 }
255 }
256
257 fn has_analyzed_spans(&self, trace: &Trace) -> bool {
259 trace
260 .spans()
261 .iter()
262 .any(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
263 }
264
265 fn single_span_sampling(&self, trace: &mut Trace) -> bool {
268 let retained = trace.retain_spans(|span| span.metrics().contains_key(KEY_SPAN_SAMPLING_MECHANISM));
269 if retained > 0 {
270 let sampling = TraceSampling::new(
272 false,
273 Some(PRIORITY_USER_KEEP),
274 None, Some(MetaString::from(format!("{:.2}", self.sampling_rate))),
276 );
277 trace.set_sampling(Some(sampling));
278 true
279 } else {
280 false
281 }
282 }
283
284 fn run_samplers(&mut self, trace: &mut Trace) -> (bool, i32, &'static str, Option<usize>) {
289 let now = std::time::SystemTime::now();
291 if trace.spans().is_empty() {
293 return (false, PRIORITY_AUTO_DROP, "", None);
294 }
295 let contains_error = self.trace_contains_error(trace, false);
296 let Some(root_span_idx) = self.get_root_span_index(trace) else {
297 return (false, PRIORITY_AUTO_DROP, "", None);
298 };
299
300 if self.probabilistic_sampler_enabled {
302 let mut prob_keep = false;
303 let mut decision_maker = "";
304
305 let root_trace_id = trace.spans()[root_span_idx].trace_id();
307 if self.sample_probabilistic(root_trace_id) {
308 decision_maker = DECISION_MAKER_PROBABILISTIC; prob_keep = true;
310
311 if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
312 let metrics = root_span.metrics_mut();
313 metrics.insert(MetaString::from(PROB_RATE_KEY), self.sampling_rate);
314 }
315 } else if self.error_sampling_enabled && contains_error {
316 prob_keep = self.error_sampler.sample_error(now, trace, root_span_idx);
317 }
318
319 let priority = if prob_keep {
320 PRIORITY_AUTO_KEEP
321 } else {
322 PRIORITY_AUTO_DROP
323 };
324
325 return (prob_keep, priority, decision_maker, Some(root_span_idx));
326 }
327
328 let user_priority = self.get_user_priority(trace, root_span_idx);
329 if let Some(priority) = user_priority {
330 if priority < PRIORITY_AUTO_DROP {
331 return (false, priority, "", Some(root_span_idx));
333 }
334
335 if self.priority_sampler.sample(now, trace, root_span_idx, priority, 0.0) {
336 return (true, priority, "", Some(root_span_idx));
337 }
338 } else if self.is_otlp_trace(trace, root_span_idx) {
339 let root_trace_id = trace.spans()[root_span_idx].trace_id();
341 if sample_by_rate(root_trace_id, self.otlp_sampling_rate) {
342 if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
343 root_span.metrics_mut().remove(PROB_RATE_KEY);
344 }
345 return (
346 true,
347 PRIORITY_AUTO_KEEP,
348 DECISION_MAKER_PROBABILISTIC,
349 Some(root_span_idx),
350 );
351 }
352 } else if self.no_priority_sampler.sample(now, trace, root_span_idx) {
353 return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
354 }
355
356 if self.error_sampling_enabled && contains_error {
357 let keep = self.error_sampler.sample_error(now, trace, root_span_idx);
358 if keep {
359 return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
360 }
361 }
362
363 (false, PRIORITY_AUTO_DROP, "", Some(root_span_idx))
365 }
366
367 fn apply_sampling_metadata(
372 &self, trace: &mut Trace, keep: bool, priority: i32, decision_maker: &str, root_span_idx: usize,
373 ) {
374 let is_otlp = self.is_otlp_trace(trace, root_span_idx);
375 let root_span_value = match trace.spans_mut().get_mut(root_span_idx) {
376 Some(span) => span,
377 None => return,
378 };
379
380 let existing_decision_maker = if decision_maker.is_empty() {
382 root_span_value.meta().get(TAG_DECISION_MAKER).cloned()
383 } else {
384 None
385 };
386 let decision_maker_meta = if decision_maker.is_empty() {
387 existing_decision_maker
388 } else {
389 Some(MetaString::from(decision_maker))
390 };
391
392 let meta = root_span_value.meta_mut();
393 if priority > 0 {
394 if let Some(dm) = decision_maker_meta.as_ref() {
395 meta.insert(MetaString::from(TAG_DECISION_MAKER), dm.clone());
396 }
397 }
398
399 let sampling_rate = if is_otlp {
401 self.otlp_sampling_rate
402 } else {
403 self.sampling_rate
404 };
405 let sampling = TraceSampling::new(
406 !keep,
407 Some(priority),
408 if priority > 0 { decision_maker_meta } else { None },
409 Some(MetaString::from(format!("{:.2}", sampling_rate))),
410 );
411 trace.set_sampling(Some(sampling));
412 }
413
414 fn process_trace(&mut self, trace: &mut Trace) -> bool {
415 let (keep, priority, decision_maker, root_span_idx) = self.run_samplers(trace);
420 if keep {
421 if let Some(root_idx) = root_span_idx {
422 self.apply_sampling_metadata(trace, keep, priority, decision_maker, root_idx);
423 }
424 return true;
425 }
426
427 if self.error_tracking_standalone {
428 return false;
429 }
430
431 let modified = self.single_span_sampling(trace);
434 if !modified {
435 if self.analyzed_span_sampling(trace) {
437 return true;
438 }
439 } else if self.has_analyzed_spans(trace) {
440 debug!(
442 "Detected both analytics events AND single span sampling in the same trace. Single span sampling wins because App Analytics is deprecated."
443 );
444 return true;
445 }
446
447 if modified {
449 return true;
450 }
451
452 debug!("Dropping trace with priority {}", priority);
454 false
455 }
456}
457
458impl SynchronousTransform for TraceSampler {
459 fn transform_buffer(&mut self, buffer: &mut EventsBuffer) {
460 buffer.remove_if(|event| match event {
461 Event::Trace(trace) => !self.process_trace(trace),
462 _ => false,
463 });
464 }
465}
466
467#[cfg(test)]
468mod tests {
469 use std::collections::HashMap;
470
471 use saluki_context::tags::TagSet;
472 use saluki_core::data_model::event::trace::{Span as DdSpan, Trace};
473 const PRIORITY_USER_DROP: i32 = -1;
474
475 use super::*;
476 fn create_test_sampler() -> TraceSampler {
477 TraceSampler {
478 sampling_rate: 1.0,
479 error_sampling_enabled: true,
480 error_tracking_standalone: false,
481 probabilistic_sampler_enabled: true,
482 otlp_sampling_rate: 1.0,
483 error_sampler: errors::ErrorsSampler::new(10.0, 1.0),
484 priority_sampler: priority_sampler::PrioritySampler::new(MetaString::from("agent-env"), 1.0, 10.0),
485 no_priority_sampler: score_sampler::NoPrioritySampler::new(10.0, 1.0),
486 }
487 }
488
489 fn create_test_span(trace_id: u64, span_id: u64, error: i32) -> DdSpan {
490 DdSpan::new(
491 MetaString::from("test-service"),
492 MetaString::from("test-operation"),
493 MetaString::from("test-resource"),
494 MetaString::from("test-type"),
495 trace_id,
496 span_id,
497 0, 0, 1000, error,
501 )
502 }
503
504 fn create_test_span_with_metrics(trace_id: u64, span_id: u64, metrics: HashMap<String, f64>) -> DdSpan {
505 let mut metrics_map = saluki_common::collections::FastHashMap::default();
506 for (k, v) in metrics {
507 metrics_map.insert(MetaString::from(k), v);
508 }
509 create_test_span(trace_id, span_id, 0).with_metrics(metrics_map)
510 }
511
512 #[allow(dead_code)]
513 fn create_test_span_with_meta(trace_id: u64, span_id: u64, meta: HashMap<String, String>) -> DdSpan {
514 let mut meta_map = saluki_common::collections::FastHashMap::default();
515 for (k, v) in meta {
516 meta_map.insert(MetaString::from(k), MetaString::from(v));
517 }
518 create_test_span(trace_id, span_id, 0).with_meta(meta_map)
519 }
520
521 fn create_test_trace(spans: Vec<DdSpan>) -> Trace {
522 let tags = TagSet::default();
523 Trace::new(spans, tags)
524 }
525
526 #[test]
527 fn test_user_priority_detection() {
528 let sampler = create_test_sampler();
529
530 let mut metrics = HashMap::new();
532 metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0);
533 let span = create_test_span_with_metrics(12345, 1, metrics);
534 let trace = create_test_trace(vec![span]);
535 let root_idx = sampler.get_root_span_index(&trace).unwrap();
536
537 assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2));
538
539 let mut metrics = HashMap::new();
541 metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), -1.0);
542 let span = create_test_span_with_metrics(12345, 1, metrics);
543 let trace = create_test_trace(vec![span]);
544 let root_idx = sampler.get_root_span_index(&trace).unwrap();
545
546 assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(-1));
547
548 let span = create_test_span(12345, 1, 0);
550 let trace = create_test_trace(vec![span]);
551 let root_idx = sampler.get_root_span_index(&trace).unwrap();
552
553 assert_eq!(sampler.get_user_priority(&trace, root_idx), None);
554 }
555
556 #[test]
557 fn test_trace_level_priority_takes_precedence() {
558 let sampler = create_test_sampler();
559
560 let mut metrics_root = HashMap::new();
563 metrics_root.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 0.0);
564 let root_span = create_test_span_with_metrics(12345, 1, metrics_root);
565
566 let mut metrics_later = HashMap::new();
567 metrics_later.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 1.0);
568 let later_span = create_test_span_with_metrics(12345, 2, metrics_later).with_parent_id(1);
569
570 let mut trace = create_test_trace(vec![root_span, later_span]);
571 let root_idx = sampler.get_root_span_index(&trace).unwrap();
572
573 assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(0));
575
576 trace.set_sampling(Some(TraceSampling::new(false, Some(2), None, None)));
578
579 assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2));
581
582 let span_no_priority = create_test_span(12345, 3, 0);
584 let mut trace_only_trace_level = create_test_trace(vec![span_no_priority]);
585 trace_only_trace_level.set_sampling(Some(TraceSampling::new(false, Some(1), None, None)));
586 let root_idx = sampler.get_root_span_index(&trace_only_trace_level).unwrap();
587
588 assert_eq!(sampler.get_user_priority(&trace_only_trace_level, root_idx), Some(1));
589 }
590
591 #[test]
592 fn test_manual_keep_with_trace_level_priority() {
593 let mut sampler = create_test_sampler();
594 sampler.probabilistic_sampler_enabled = false; let span = create_test_span(12345, 1, 0);
598 let mut trace = create_test_trace(vec![span]);
599 trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_USER_KEEP), None, None)));
600
601 let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
602 assert!(keep);
603 assert_eq!(priority, PRIORITY_USER_KEEP);
604 assert_eq!(decision_maker, "");
605
606 let span = create_test_span(12345, 1, 0);
608 let mut trace = create_test_trace(vec![span]);
609 trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_USER_DROP), None, None)));
610
611 let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
612 assert!(!keep); assert_eq!(priority, PRIORITY_USER_DROP);
614
615 let span = create_test_span(12345, 1, 0);
617 let mut trace = create_test_trace(vec![span]);
618 trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_AUTO_KEEP), None, None)));
619
620 let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
621 assert!(keep);
622 assert_eq!(priority, PRIORITY_AUTO_KEEP);
623 assert_eq!(decision_maker, "");
624 }
625
626 #[test]
627 fn test_probabilistic_sampling_determinism() {
628 let sampler = create_test_sampler();
629
630 let trace_id = 0x1234567890ABCDEF_u64;
632 let result1 = sampler.sample_probabilistic(trace_id);
633 let result2 = sampler.sample_probabilistic(trace_id);
634 assert_eq!(result1, result2);
635 }
636
637 #[test]
638 fn test_error_detection() {
639 let sampler = create_test_sampler();
640
641 let span_with_error = create_test_span(12345, 1, 1);
643 let trace = create_test_trace(vec![span_with_error]);
644 assert!(sampler.trace_contains_error(&trace, false));
645
646 let span_without_error = create_test_span(12345, 1, 0);
648 let trace = create_test_trace(vec![span_without_error]);
649 assert!(!sampler.trace_contains_error(&trace, false));
650 }
651
652 #[test]
653 fn test_sampling_priority_order() {
654 let mut sampler = create_test_sampler();
656 sampler.sampling_rate = 0.5; sampler.probabilistic_sampler_enabled = true;
658
659 let span_with_error = create_test_span(u64::MAX - 1, 1, 1);
662 let mut trace = create_test_trace(vec![span_with_error]);
663
664 let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
665 assert!(keep);
666 assert_eq!(priority, PRIORITY_AUTO_KEEP);
667 assert_eq!(decision_maker, ""); let mut sampler = create_test_sampler();
671 sampler.probabilistic_sampler_enabled = false; let mut metrics = HashMap::new();
674 metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0);
675 let span = create_test_span_with_metrics(12345, 1, metrics);
676 let mut trace = create_test_trace(vec![span]);
677
678 let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
679 assert!(keep);
680 assert_eq!(priority, 2); assert_eq!(decision_maker, "");
682 }
683
684 #[test]
685 fn test_empty_trace_handling() {
686 let mut sampler = create_test_sampler();
687 let mut trace = create_test_trace(vec![]);
688
689 let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
690 assert!(!keep);
691 assert_eq!(priority, PRIORITY_AUTO_DROP);
692 }
693
694 #[test]
695 fn test_root_span_detection() {
696 let sampler = create_test_sampler();
697
698 let root_span = DdSpan::new(
700 MetaString::from("service"),
701 MetaString::from("operation"),
702 MetaString::from("resource"),
703 MetaString::from("type"),
704 12345,
705 1,
706 0, 0,
708 1000,
709 0,
710 );
711 let child_span = DdSpan::new(
712 MetaString::from("service"),
713 MetaString::from("child_op"),
714 MetaString::from("resource"),
715 MetaString::from("type"),
716 12345,
717 2,
718 1, 100,
720 500,
721 0,
722 );
723 let trace = create_test_trace(vec![child_span.clone(), root_span.clone()]);
725 let root_idx = sampler.get_root_span_index(&trace).unwrap();
726 assert_eq!(trace.spans()[root_idx].span_id(), 1);
727
728 let orphan_span = DdSpan::new(
730 MetaString::from("service"),
731 MetaString::from("orphan"),
732 MetaString::from("resource"),
733 MetaString::from("type"),
734 12345,
735 3,
736 999, 200,
738 300,
739 0,
740 );
741 let trace = create_test_trace(vec![orphan_span]);
742 let root_idx = sampler.get_root_span_index(&trace).unwrap();
743 assert_eq!(trace.spans()[root_idx].span_id(), 3);
744
745 let span1 = create_test_span(12345, 1, 0);
747 let span2 = create_test_span(12345, 2, 0);
748 let trace = create_test_trace(vec![span1, span2]);
749 let root_idx = sampler.get_root_span_index(&trace).unwrap();
751 assert_eq!(trace.spans()[root_idx].span_id(), 2);
752 }
753
754 #[test]
755 fn test_single_span_sampling() {
756 let mut sampler = create_test_sampler();
757
758 sampler.sampling_rate = 0.0; sampler.probabilistic_sampler_enabled = true;
761
762 let mut metrics_map = saluki_common::collections::FastHashMap::default();
764 metrics_map.insert(MetaString::from(KEY_SPAN_SAMPLING_MECHANISM), 8.0); let sss_span = create_test_span(12345, 1, 0).with_metrics(metrics_map.clone());
766
767 let regular_span = create_test_span(12345, 2, 0);
769
770 let mut trace = create_test_trace(vec![sss_span.clone(), regular_span]);
771
772 let modified = sampler.single_span_sampling(&mut trace);
774 assert!(modified);
775 assert_eq!(trace.spans().len(), 1); assert_eq!(trace.spans()[0].span_id(), 1); assert!(trace.sampling().is_some());
780 assert_eq!(trace.sampling().as_ref().unwrap().priority, Some(PRIORITY_USER_KEEP));
781
782 let trace_without_sss = create_test_trace(vec![create_test_span(12345, 3, 0)]);
784 let mut trace_copy = trace_without_sss.clone();
785 let modified = sampler.single_span_sampling(&mut trace_copy);
786 assert!(!modified);
787 assert_eq!(trace_copy.spans().len(), trace_without_sss.spans().len());
788 }
789
790 #[test]
791 fn test_analytics_events() {
792 let sampler = create_test_sampler();
793
794 let mut metrics_map = saluki_common::collections::FastHashMap::default();
796 metrics_map.insert(MetaString::from(KEY_ANALYZED_SPANS), 1.0);
797 let analyzed_span = create_test_span(12345, 1, 0).with_metrics(metrics_map.clone());
798 let regular_span = create_test_span(12345, 2, 0);
799
800 let mut trace = create_test_trace(vec![analyzed_span.clone(), regular_span]);
801
802 let analyzed_span_ids: Vec<u64> = trace
803 .spans()
804 .iter()
805 .filter(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
806 .map(|span| span.span_id())
807 .collect();
808 assert_eq!(analyzed_span_ids, vec![1]);
809
810 assert!(sampler.has_analyzed_spans(&trace));
811 let modified = sampler.analyzed_span_sampling(&mut trace);
812 assert!(modified);
813 assert_eq!(trace.spans().len(), 1);
814 assert_eq!(trace.spans()[0].span_id(), 1);
815 assert!(trace.sampling().is_some());
816
817 let trace_no_analytics = create_test_trace(vec![create_test_span(12345, 3, 0)]);
819 let mut trace_no_analytics_copy = trace_no_analytics.clone();
820 let analyzed_span_ids: Vec<u64> = trace_no_analytics
821 .spans()
822 .iter()
823 .filter(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
824 .map(|span| span.span_id())
825 .collect();
826 assert!(analyzed_span_ids.is_empty());
827 assert!(!sampler.has_analyzed_spans(&trace_no_analytics));
828 let modified = sampler.analyzed_span_sampling(&mut trace_no_analytics_copy);
829 assert!(!modified);
830 assert_eq!(trace_no_analytics_copy.spans().len(), trace_no_analytics.spans().len());
831 }
832
833 #[test]
834 fn test_probabilistic_sampling_with_prob_rate_key() {
835 let mut sampler = create_test_sampler();
836 sampler.sampling_rate = 0.75; sampler.probabilistic_sampler_enabled = true;
838
839 let trace_id = 12345_u64;
841 let root_span = DdSpan::new(
842 MetaString::from("service"),
843 MetaString::from("operation"),
844 MetaString::from("resource"),
845 MetaString::from("type"),
846 trace_id,
847 1,
848 0, 0,
850 1000,
851 0,
852 );
853 let mut trace = create_test_trace(vec![root_span]);
854
855 let (keep, priority, decision_maker, root_span_idx) = sampler.run_samplers(&mut trace);
856
857 if keep && decision_maker == DECISION_MAKER_PROBABILISTIC {
858 assert_eq!(priority, PRIORITY_AUTO_KEEP);
860 assert_eq!(decision_maker, DECISION_MAKER_PROBABILISTIC); let root_idx = root_span_idx.unwrap_or(0);
864 let root_span = &trace.spans()[root_idx];
865 assert!(root_span.metrics().contains_key(PROB_RATE_KEY));
866 assert_eq!(*root_span.metrics().get(PROB_RATE_KEY).unwrap(), 0.75);
867
868 let mut trace_with_metadata = trace.clone();
870 sampler.apply_sampling_metadata(&mut trace_with_metadata, keep, priority, decision_maker, root_idx);
871
872 let modified_root = &trace_with_metadata.spans()[root_idx];
874 assert!(modified_root.meta().contains_key(TAG_DECISION_MAKER));
875 assert_eq!(
876 modified_root.meta().get(TAG_DECISION_MAKER).unwrap(),
877 &MetaString::from(DECISION_MAKER_PROBABILISTIC)
878 );
879 }
880 }
881}