Skip to main content

saluki_components/transforms/trace_sampler/
mod.rs

1//! Trace sampling transform.
2//!
3//! This transform implements agent-side head sampling for traces, supporting:
4//! - Probabilistic sampling based on trace ID
5//! - User-set priority preservation
6//! - Error-based sampling as a safety net
7//! - OTLP trace ingestion with proper sampling decision handling
8//!
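//! # Missing
//!
//! - Trace metrics (see `datadog-agent/pkg/trace/sampler/metrics.go`).
//! - Missing samplers (priority, no-priority, rare). NOTE(review): `priority_sampler` and
//!   `score_sampler` are already wired into `TraceSampler`; confirm whether only the rare
//!   sampler is still outstanding.
//! - Error tracking standalone mode. NOTE(review): `error_tracking_standalone` currently only
//!   skips the Single Span Sampling / analytics fallbacks for dropped traces.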
14
15use async_trait::async_trait;
16use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
17use saluki_common::collections::FastHashMap;
18use saluki_config::GenericConfiguration;
19use saluki_core::{
20    components::{transforms::*, ComponentContext},
21    data_model::event::{
22        trace::{Span, Trace, TraceSampling},
23        Event,
24    },
25    topology::EventsBuffer,
26};
27use saluki_error::GenericError;
28use stringtheory::MetaString;
29use tracing::debug;
30
31mod catalog;
32mod core_sampler;
33mod errors;
34mod priority_sampler;
35mod probabilistic;
36mod score_sampler;
37mod signature;
38
39use self::probabilistic::PROB_RATE_KEY;
40use crate::common::datadog::{
41    apm::ApmConfig, sample_by_rate, DECISION_MAKER_PROBABILISTIC, OTEL_TRACE_ID_META_KEY, SAMPLING_PRIORITY_METRIC_KEY,
42    TAG_DECISION_MAKER,
43};
44use crate::common::otlp::config::TracesConfig;
45
// Sampling priority values, matching the datadog-agent constants of the same names.
/// Sampling priority: the trace was automatically dropped.
const PRIORITY_AUTO_DROP: i32 = 0;
/// Sampling priority: the trace was automatically kept.
const PRIORITY_AUTO_KEEP: i32 = 1;
/// Sampling priority: the trace is force-kept (user keep).
const PRIORITY_USER_KEEP: i32 = 2;

/// Extra sample rate handed to the error, priority, and no-priority samplers
/// (matches the agent's default `ExtraSampleRate`).
const ERROR_SAMPLE_RATE: f64 = 1.0;

// Span metric keys used by the Single Span Sampling and App Analytics fallbacks.
/// Span metric marking a span selected by Single Span Sampling.
const KEY_SPAN_SAMPLING_MECHANISM: &str = "_dd.span_sampling.mechanism";
/// Span metric marking a span as an analytics event.
const KEY_ANALYZED_SPANS: &str = "_dd.analyzed";
56
// Decision maker values for `_dd.p.dm` (e.g. `DECISION_MAKER_PROBABILISTIC`) are imported from `crate::common::datadog` (matching datadog-agent).
58
/// Normalizes an OTLP probabilistic sampling rate, expressed as a fraction (percentage / 100).
///
/// Rates strictly between `0.0` and `1.0` are returned unchanged. Every other value is treated as
/// invalid or "keep everything" and replaced with `1.0`: zero, negatives, values `>= 1.0`, and NaN.
/// This mirrors datadog-agent, where out-of-range sampling percentages fall back to the 100%
/// default.
fn normalize_sampling_rate(rate: f64) -> f64 {
    // Written as a positive range check: NaN fails every comparison, so it lands on the default
    // instead of leaking through to the sampler (the old `<= 0.0 || >= 1.0` form let NaN pass).
    if rate > 0.0 && rate < 1.0 {
        rate
    } else {
        1.0
    }
}
66
67/// Configuration for the trace sampler transform.
68#[derive(Debug)]
69pub struct TraceSamplerConfiguration {
70    apm_config: ApmConfig,
71    otlp_sampling_rate: f64,
72}
73
74impl TraceSamplerConfiguration {
75    /// Creates a new `TraceSamplerConfiguration` from the given configuration.
76    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
77        let apm_config = ApmConfig::from_configuration(config)?;
78        let otlp_traces: TracesConfig = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
79        let otlp_sampling_rate = normalize_sampling_rate(otlp_traces.probabilistic_sampler.sampling_percentage / 100.0);
80        Ok(Self {
81            apm_config,
82            otlp_sampling_rate,
83        })
84    }
85}
86
87#[async_trait]
88impl SynchronousTransformBuilder for TraceSamplerConfiguration {
89    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
90        let sampler = TraceSampler {
91            sampling_rate: self.apm_config.probabilistic_sampler_sampling_percentage() / 100.0,
92            error_sampling_enabled: self.apm_config.error_sampling_enabled(),
93            error_tracking_standalone: self.apm_config.error_tracking_standalone_enabled(),
94            probabilistic_sampler_enabled: self.apm_config.probabilistic_sampler_enabled(),
95            otlp_sampling_rate: self.otlp_sampling_rate,
96            error_sampler: errors::ErrorsSampler::new(self.apm_config.errors_per_second(), ERROR_SAMPLE_RATE),
97            priority_sampler: priority_sampler::PrioritySampler::new(
98                self.apm_config.default_env().clone(),
99                ERROR_SAMPLE_RATE,
100                self.apm_config.target_traces_per_second(),
101            ),
102            no_priority_sampler: score_sampler::NoPrioritySampler::new(
103                self.apm_config.target_traces_per_second(),
104                ERROR_SAMPLE_RATE,
105            ),
106        };
107
108        Ok(Box::new(sampler))
109    }
110}
111
112impl MemoryBounds for TraceSamplerConfiguration {
113    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
114        builder.minimum().with_single_value::<TraceSampler>("component struct");
115    }
116}
117
118pub struct TraceSampler {
119    sampling_rate: f64,
120    error_tracking_standalone: bool,
121    error_sampling_enabled: bool,
122    probabilistic_sampler_enabled: bool,
123    otlp_sampling_rate: f64,
124    error_sampler: errors::ErrorsSampler,
125    priority_sampler: priority_sampler::PrioritySampler,
126    no_priority_sampler: score_sampler::NoPrioritySampler,
127}
128
129impl TraceSampler {
130    // TODO: merge this with the other duplicate "find root span of trace" functions
131    /// Find the root span index of a trace.
132    fn get_root_span_index(&self, trace: &Trace) -> Option<usize> {
133        // logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/traceutil/trace.go#L36
134        let spans = trace.spans();
135        if spans.is_empty() {
136            return None;
137        }
138        let length = spans.len();
139        // General case: go over all spans and check for one without a matching parent.
140        // This intentionally mirrors `datadog-agent/pkg/trace/traceutil/trace.go:GetRoot`:
141        // - Fast-path: return the last span with `parent_id == 0` (some clients report the root last)
142        // - Otherwise: build a map of `parent_id -> child_span_index`, delete entries whose parent
143        //   exists in the trace, and pick any remaining "orphan" child span.
144        let mut parent_id_to_child: FastHashMap<u64, usize> = FastHashMap::default();
145
146        for i in 0..length {
147            // Common case optimization: check for span with parent_id == 0, starting from the end,
148            // since some clients report the root last.
149            let j = length - 1 - i;
150            if spans[j].parent_id() == 0 {
151                return Some(j);
152            }
153            parent_id_to_child.insert(spans[j].parent_id(), j);
154        }
155
156        for span in spans.iter() {
157            parent_id_to_child.remove(&span.span_id());
158        }
159
160        // Here, if the trace is valid, we should have `len(parent_id_to_child) == 1`.
161        if parent_id_to_child.len() != 1 {
162            debug!(
163                "Didn't reliably find the root span for traceID:{}",
164                &spans[0].trace_id()
165            );
166        }
167
168        // Have a safe behavior if that's not the case.
169        // Pick a random span without its parent.
170        if let Some((_, child_idx)) = parent_id_to_child.iter().next() {
171            return Some(*child_idx);
172        }
173
174        // Gracefully fail with the last span of the trace.
175        Some(length - 1)
176    }
177
178    /// Check for user-set sampling priority in trace
179    fn get_user_priority(&self, trace: &Trace, root_span_idx: usize) -> Option<i32> {
180        // First check trace-level sampling priority (last-seen priority from OTLP ingest)
181        if let Some(sampling) = trace.sampling() {
182            if let Some(priority) = sampling.priority {
183                return Some(priority);
184            }
185        }
186
187        if trace.spans().is_empty() {
188            return None;
189        }
190
191        // Fall back to checking spans (for compatibility with non-OTLP traces)
192        // Prefer the root span (common case), but fall back to scanning all spans to be robust to ordering.
193        if let Some(root) = trace.spans().get(root_span_idx) {
194            if let Some(&p) = root.metrics().get(SAMPLING_PRIORITY_METRIC_KEY) {
195                return Some(p as i32);
196            }
197        }
198        let spans = trace.spans();
199        spans
200            .iter()
201            .find_map(|span| span.metrics().get(SAMPLING_PRIORITY_METRIC_KEY).map(|&p| p as i32))
202    }
203
204    /// Returns `true` if the given trace ID should be probabilistically sampled.
205    fn sample_probabilistic(&self, trace_id: u64) -> bool {
206        probabilistic::ProbabilisticSampler::sample(trace_id, self.sampling_rate)
207    }
208
209    fn is_otlp_trace(&self, trace: &Trace, root_span_idx: usize) -> bool {
210        trace
211            .spans()
212            .get(root_span_idx)
213            .map(|span| {
214                span.meta()
215                    .contains_key(&MetaString::from_static(OTEL_TRACE_ID_META_KEY))
216            })
217            .unwrap_or(false)
218    }
219
220    /// Returns `true` if the trace contains a span with an error.
221    fn trace_contains_error(&self, trace: &Trace, consider_exception_span_events: bool) -> bool {
222        trace.spans().iter().any(|span| {
223            span.error() != 0 || (consider_exception_span_events && self.span_contains_exception_span_event(span))
224        })
225    }
226
227    /// Returns `true` if the span has exception span events.
228    ///
229    /// This checks for the `_dd.span_events.has_exception` meta field set to `"true"`.
230    fn span_contains_exception_span_event(&self, span: &Span) -> bool {
231        if let Some(has_exception) = span.meta().get("_dd.span_events.has_exception") {
232            return has_exception == "true";
233        }
234        false
235    }
236
237    /// Apply analyzed span sampling to the trace.
238    ///
239    /// Returns `true` if the trace was modified.
240    fn analyzed_span_sampling(&self, trace: &mut Trace) -> bool {
241        let retained = trace.retain_spans(|_, span| span.metrics().contains_key(KEY_ANALYZED_SPANS));
242        if retained > 0 {
243            // Mark trace as kept with high priority
244            let sampling = TraceSampling::new(false, Some(PRIORITY_USER_KEEP), None, Some(self.sampling_rate));
245            trace.set_sampling(Some(sampling));
246            true
247        } else {
248            false
249        }
250    }
251
252    /// Returns `true` if the given trace has any analyzed spans.
253    fn has_analyzed_spans(&self, trace: &Trace) -> bool {
254        trace
255            .spans()
256            .iter()
257            .any(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
258    }
259
260    /// Apply Single Span Sampling to the trace
261    /// Returns true if the trace was modified
262    fn single_span_sampling(&self, trace: &mut Trace) -> bool {
263        let retained = trace.retain_spans(|_, span| span.metrics().contains_key(KEY_SPAN_SAMPLING_MECHANISM));
264        if retained > 0 {
265            // Set high priority and mark as kept
266            let sampling = TraceSampling::new(
267                false,
268                Some(PRIORITY_USER_KEEP),
269                None, // No decision maker for SSS
270                Some(self.sampling_rate),
271            );
272            trace.set_sampling(Some(sampling));
273            true
274        } else {
275            false
276        }
277    }
278
279    /// Evaluates the given trace against all configured samplers.
280    ///
281    /// Return a tuple containing whether or not the trace should be kept, the decision maker tag (which sampler is responsible),
282    /// and the index of the root span used for evaluation.
283    fn run_samplers(&mut self, trace: &mut Trace) -> (bool, i32, &'static str, Option<usize>) {
284        // logic taken from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1066
285        let now = std::time::SystemTime::now();
286        // Empty trace check
287        if trace.spans().is_empty() {
288            return (false, PRIORITY_AUTO_DROP, "", None);
289        }
290        let contains_error = self.trace_contains_error(trace, false);
291        let Some(root_span_idx) = self.get_root_span_index(trace) else {
292            return (false, PRIORITY_AUTO_DROP, "", None);
293        };
294
295        // Modern path: ProbabilisticSamplerEnabled = true
296        if self.probabilistic_sampler_enabled {
297            let mut prob_keep = false;
298            let mut decision_maker = "";
299
300            // Run probabilistic sampler - use root span's trace ID
301            let root_trace_id = trace.spans()[root_span_idx].trace_id();
302            if self.sample_probabilistic(root_trace_id) {
303                decision_maker = DECISION_MAKER_PROBABILISTIC; // probabilistic sampling
304                prob_keep = true;
305
306                if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
307                    let metrics = root_span.metrics_mut();
308                    metrics.insert(MetaString::from(PROB_RATE_KEY), self.sampling_rate);
309                }
310            } else if self.error_sampling_enabled && contains_error {
311                prob_keep = self.error_sampler.sample_error(now, trace, root_span_idx);
312            }
313
314            let priority = if prob_keep {
315                PRIORITY_AUTO_KEEP
316            } else {
317                PRIORITY_AUTO_DROP
318            };
319
320            return (prob_keep, priority, decision_maker, Some(root_span_idx));
321        }
322
323        let user_priority = self.get_user_priority(trace, root_span_idx);
324        if let Some(priority) = user_priority {
325            if priority < PRIORITY_AUTO_DROP {
326                // Manual drop: short-circuit and skip other samplers.
327                return (false, priority, "", Some(root_span_idx));
328            }
329
330            if self.priority_sampler.sample(now, trace, root_span_idx, priority, 0.0) {
331                return (true, priority, "", Some(root_span_idx));
332            }
333        } else if self.is_otlp_trace(trace, root_span_idx) {
334            // some sampling happens upstream in the otlp receiver in the agent: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/otlp.go#L572
335            let root_trace_id = trace.spans()[root_span_idx].trace_id();
336            if sample_by_rate(root_trace_id, self.otlp_sampling_rate) {
337                if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
338                    root_span.metrics_mut().remove(PROB_RATE_KEY);
339                }
340                return (
341                    true,
342                    PRIORITY_AUTO_KEEP,
343                    DECISION_MAKER_PROBABILISTIC,
344                    Some(root_span_idx),
345                );
346            }
347        } else if self.no_priority_sampler.sample(now, trace, root_span_idx) {
348            return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
349        }
350
351        if self.error_sampling_enabled && contains_error {
352            let keep = self.error_sampler.sample_error(now, trace, root_span_idx);
353            if keep {
354                return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
355            }
356        }
357
358        // Default: drop the trace
359        (false, PRIORITY_AUTO_DROP, "", Some(root_span_idx))
360    }
361
362    /// Apply sampling metadata to the trace in-place.
363    ///
364    /// The `root_span_id` parameter identifies which span should receive the sampling metadata.
365    /// This avoids recalculating the root span since it was already found in `run_samplers`.
366    fn apply_sampling_metadata(
367        &self, trace: &mut Trace, keep: bool, priority: i32, decision_maker: &str, root_span_idx: usize,
368    ) {
369        let is_otlp = self.is_otlp_trace(trace, root_span_idx);
370        let root_span_value = match trace.spans_mut().get_mut(root_span_idx) {
371            Some(span) => span,
372            None => return,
373        };
374
375        // Add tag for the decision maker
376        let existing_decision_maker = if decision_maker.is_empty() {
377            root_span_value.meta().get(TAG_DECISION_MAKER).cloned()
378        } else {
379            None
380        };
381        let decision_maker_meta = if decision_maker.is_empty() {
382            existing_decision_maker
383        } else {
384            Some(MetaString::from(decision_maker))
385        };
386
387        let meta = root_span_value.meta_mut();
388        if priority > 0 {
389            if let Some(dm) = decision_maker_meta.as_ref() {
390                meta.insert(MetaString::from(TAG_DECISION_MAKER), dm.clone());
391            }
392        }
393
394        // Now we can use trace again to set sampling metadata.
395        let sampling = TraceSampling::new(
396            !keep,
397            Some(priority),
398            if priority > 0 { decision_maker_meta } else { None },
399            Some(if is_otlp {
400                self.otlp_sampling_rate
401            } else {
402                self.sampling_rate
403            }),
404        );
405        trace.set_sampling(Some(sampling));
406    }
407
408    fn process_trace(&mut self, trace: &mut Trace) -> bool {
409        // keep is a boolean that indicates if the trace should be kept or dropped
410        // priority is the sampling priority
411        // decision_maker is the tag that indicates the decision maker (probabilistic, error, etc.)
412        // root_span_idx is the index of the root span of the trace
413        let (keep, priority, decision_maker, root_span_idx) = self.run_samplers(trace);
414        if keep {
415            if let Some(root_idx) = root_span_idx {
416                self.apply_sampling_metadata(trace, keep, priority, decision_maker, root_idx);
417            }
418            return true;
419        }
420
421        if self.error_tracking_standalone {
422            return false;
423        }
424
425        // logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L980-L990
426        // try single span sampling (keeps spans marked for sampling when trace would be dropped)
427        let modified = self.single_span_sampling(trace);
428        if !modified {
429            // Fall back to analytics events if no SSS spans
430            if self.analyzed_span_sampling(trace) {
431                return true;
432            }
433        } else if self.has_analyzed_spans(trace) {
434            // Warn about both SSS and analytics events
435            debug!(
436                "Detected both analytics events AND single span sampling in the same trace. Single span sampling wins because App Analytics is deprecated."
437            );
438            return true;
439        }
440
441        // If we modified the trace with SSS, send it
442        if modified {
443            return true;
444        }
445
446        // Neither SSS nor analytics events found, drop the trace
447        debug!("Dropping trace with priority {}", priority);
448        false
449    }
450}
451
452impl SynchronousTransform for TraceSampler {
453    fn transform_buffer(&mut self, buffer: &mut EventsBuffer) {
454        buffer.remove_if(|event| match event {
455            Event::Trace(trace) => !self.process_trace(trace),
456            _ => false,
457        });
458    }
459}
460
461#[cfg(test)]
462mod tests {
463    use std::collections::HashMap;
464
465    use saluki_context::tags::TagSet;
466    use saluki_core::data_model::event::trace::{Span as DdSpan, Trace};
467    const PRIORITY_USER_DROP: i32 = -1;
468
469    use super::*;
470    fn create_test_sampler() -> TraceSampler {
471        TraceSampler {
472            sampling_rate: 1.0,
473            error_sampling_enabled: true,
474            error_tracking_standalone: false,
475            probabilistic_sampler_enabled: true,
476            otlp_sampling_rate: 1.0,
477            error_sampler: errors::ErrorsSampler::new(10.0, 1.0),
478            priority_sampler: priority_sampler::PrioritySampler::new(MetaString::from("agent-env"), 1.0, 10.0),
479            no_priority_sampler: score_sampler::NoPrioritySampler::new(10.0, 1.0),
480        }
481    }
482
483    fn create_test_span(trace_id: u64, span_id: u64, error: i32) -> DdSpan {
484        DdSpan::new(
485            MetaString::from("test-service"),
486            MetaString::from("test-operation"),
487            MetaString::from("test-resource"),
488            MetaString::from("test-type"),
489            trace_id,
490            span_id,
491            0,    // parent_id
492            0,    // start
493            1000, // duration
494            error,
495        )
496    }
497
498    fn create_test_span_with_metrics(trace_id: u64, span_id: u64, metrics: HashMap<String, f64>) -> DdSpan {
499        let mut metrics_map = saluki_common::collections::FastHashMap::default();
500        for (k, v) in metrics {
501            metrics_map.insert(MetaString::from(k), v);
502        }
503        create_test_span(trace_id, span_id, 0).with_metrics(metrics_map)
504    }
505
506    #[allow(dead_code)]
507    fn create_test_span_with_meta(trace_id: u64, span_id: u64, meta: HashMap<String, String>) -> DdSpan {
508        let mut meta_map = saluki_common::collections::FastHashMap::default();
509        for (k, v) in meta {
510            meta_map.insert(MetaString::from(k), MetaString::from(v));
511        }
512        create_test_span(trace_id, span_id, 0).with_meta(meta_map)
513    }
514
515    fn create_test_trace(spans: Vec<DdSpan>) -> Trace {
516        let tags = TagSet::default();
517        Trace::new(spans, tags)
518    }
519
520    #[test]
521    fn test_user_priority_detection() {
522        let sampler = create_test_sampler();
523
524        // Test trace with user-set priority = 2 (UserKeep)
525        let mut metrics = HashMap::new();
526        metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0);
527        let span = create_test_span_with_metrics(12345, 1, metrics);
528        let trace = create_test_trace(vec![span]);
529        let root_idx = sampler.get_root_span_index(&trace).unwrap();
530
531        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2));
532
533        // Test trace with user-set priority = -1 (UserDrop)
534        let mut metrics = HashMap::new();
535        metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), -1.0);
536        let span = create_test_span_with_metrics(12345, 1, metrics);
537        let trace = create_test_trace(vec![span]);
538        let root_idx = sampler.get_root_span_index(&trace).unwrap();
539
540        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(-1));
541
542        // Test trace without user priority
543        let span = create_test_span(12345, 1, 0);
544        let trace = create_test_trace(vec![span]);
545        let root_idx = sampler.get_root_span_index(&trace).unwrap();
546
547        assert_eq!(sampler.get_user_priority(&trace, root_idx), None);
548    }
549
550    #[test]
551    fn test_trace_level_priority_takes_precedence() {
552        let sampler = create_test_sampler();
553
554        // Test trace-level priority overrides span priorities (last-seen priority)
555        // Create spans with different priorities - root has 0, later span has 2
556        let mut metrics_root = HashMap::new();
557        metrics_root.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 0.0);
558        let root_span = create_test_span_with_metrics(12345, 1, metrics_root);
559
560        let mut metrics_later = HashMap::new();
561        metrics_later.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 1.0);
562        let later_span = create_test_span_with_metrics(12345, 2, metrics_later).with_parent_id(1);
563
564        let mut trace = create_test_trace(vec![root_span, later_span]);
565        let root_idx = sampler.get_root_span_index(&trace).unwrap();
566
567        // Without trace-level priority, should get priority from root (0)
568        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(0));
569
570        // Now set trace-level priority to 2 (simulating last-seen priority from OTLP translator)
571        trace.set_sampling(Some(TraceSampling::new(false, Some(2), None, None)));
572
573        // Trace-level priority should take precedence
574        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2));
575
576        // Test that trace-level priority is used even when no span has priority
577        let span_no_priority = create_test_span(12345, 3, 0);
578        let mut trace_only_trace_level = create_test_trace(vec![span_no_priority]);
579        trace_only_trace_level.set_sampling(Some(TraceSampling::new(false, Some(1), None, None)));
580        let root_idx = sampler.get_root_span_index(&trace_only_trace_level).unwrap();
581
582        assert_eq!(sampler.get_user_priority(&trace_only_trace_level, root_idx), Some(1));
583    }
584
585    #[test]
586    fn test_manual_keep_with_trace_level_priority() {
587        let mut sampler = create_test_sampler();
588        sampler.probabilistic_sampler_enabled = false; // Use legacy path that checks user priority
589
590        // Test that manual keep (priority = 2) works via trace-level priority
591        let span = create_test_span(12345, 1, 0);
592        let mut trace = create_test_trace(vec![span]);
593        trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_USER_KEEP), None, None)));
594
595        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
596        assert!(keep);
597        assert_eq!(priority, PRIORITY_USER_KEEP);
598        assert_eq!(decision_maker, "");
599
600        // Test manual drop (priority = -1) via trace-level priority
601        let span = create_test_span(12345, 1, 0);
602        let mut trace = create_test_trace(vec![span]);
603        trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_USER_DROP), None, None)));
604
605        let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
606        assert!(!keep); // Should not keep when user drops
607        assert_eq!(priority, PRIORITY_USER_DROP);
608
609        // Test that priority = 1 (auto keep) via trace-level is also respected
610        let span = create_test_span(12345, 1, 0);
611        let mut trace = create_test_trace(vec![span]);
612        trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_AUTO_KEEP), None, None)));
613
614        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
615        assert!(keep);
616        assert_eq!(priority, PRIORITY_AUTO_KEEP);
617        assert_eq!(decision_maker, "");
618    }
619
620    #[test]
621    fn test_probabilistic_sampling_determinism() {
622        let sampler = create_test_sampler();
623
624        // Same trace ID should always produce same decision
625        let trace_id = 0x1234567890ABCDEF_u64;
626        let result1 = sampler.sample_probabilistic(trace_id);
627        let result2 = sampler.sample_probabilistic(trace_id);
628        assert_eq!(result1, result2);
629    }
630
631    #[test]
632    fn test_error_detection() {
633        let sampler = create_test_sampler();
634
635        // Test trace with error field set
636        let span_with_error = create_test_span(12345, 1, 1);
637        let trace = create_test_trace(vec![span_with_error]);
638        assert!(sampler.trace_contains_error(&trace, false));
639
640        // Test trace without error
641        let span_without_error = create_test_span(12345, 1, 0);
642        let trace = create_test_trace(vec![span_without_error]);
643        assert!(!sampler.trace_contains_error(&trace, false));
644    }
645
646    #[test]
647    fn test_sampling_priority_order() {
648        // Test modern path: error sampler overrides probabilistic drop
649        let mut sampler = create_test_sampler();
650        sampler.sampling_rate = 0.5; // 50% sampling rate
651        sampler.probabilistic_sampler_enabled = true;
652
653        // Create trace with error that would be dropped by probabilistic
654        // Using a trace ID that we know will be dropped at 50% rate
655        let span_with_error = create_test_span(u64::MAX - 1, 1, 1);
656        let mut trace = create_test_trace(vec![span_with_error]);
657
658        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
659        assert!(keep);
660        assert_eq!(priority, PRIORITY_AUTO_KEEP);
661        assert_eq!(decision_maker, ""); // Error sampler doesn't set decision_maker
662
663        // Test legacy path: user priority is respected
664        let mut sampler = create_test_sampler();
665        sampler.probabilistic_sampler_enabled = false; // Use legacy path
666
667        let mut metrics = HashMap::new();
668        metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0);
669        let span = create_test_span_with_metrics(12345, 1, metrics);
670        let mut trace = create_test_trace(vec![span]);
671
672        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
673        assert!(keep);
674        assert_eq!(priority, 2); // UserKeep
675        assert_eq!(decision_maker, "");
676    }
677
678    #[test]
679    fn test_empty_trace_handling() {
680        let mut sampler = create_test_sampler();
681        let mut trace = create_test_trace(vec![]);
682
683        let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
684        assert!(!keep);
685        assert_eq!(priority, PRIORITY_AUTO_DROP);
686    }
687
688    #[test]
689    fn test_root_span_detection() {
690        let sampler = create_test_sampler();
691
692        // Test 1: Root span with parent_id = 0 (common case)
693        let root_span = DdSpan::new(
694            MetaString::from("service"),
695            MetaString::from("operation"),
696            MetaString::from("resource"),
697            MetaString::from("type"),
698            12345,
699            1,
700            0, // parent_id = 0 indicates root
701            0,
702            1000,
703            0,
704        );
705        let child_span = DdSpan::new(
706            MetaString::from("service"),
707            MetaString::from("child_op"),
708            MetaString::from("resource"),
709            MetaString::from("type"),
710            12345,
711            2,
712            1, // parent_id = 1 (points to root)
713            100,
714            500,
715            0,
716        );
717        // Put root span second to test that we find it even when not first
718        let trace = create_test_trace(vec![child_span.clone(), root_span.clone()]);
719        let root_idx = sampler.get_root_span_index(&trace).unwrap();
720        assert_eq!(trace.spans()[root_idx].span_id(), 1);
721
722        // Test 2: Orphaned span (parent not in trace)
723        let orphan_span = DdSpan::new(
724            MetaString::from("service"),
725            MetaString::from("orphan"),
726            MetaString::from("resource"),
727            MetaString::from("type"),
728            12345,
729            3,
730            999, // parent_id = 999 (doesn't exist in trace)
731            200,
732            300,
733            0,
734        );
735        let trace = create_test_trace(vec![orphan_span]);
736        let root_idx = sampler.get_root_span_index(&trace).unwrap();
737        assert_eq!(trace.spans()[root_idx].span_id(), 3);
738
739        // Test 3: Multiple root candidates: should return the last one found (index 1)
740        let span1 = create_test_span(12345, 1, 0);
741        let span2 = create_test_span(12345, 2, 0);
742        let trace = create_test_trace(vec![span1, span2]);
743        // Both have parent_id = 0, should return the last one found (span_id = 2)
744        let root_idx = sampler.get_root_span_index(&trace).unwrap();
745        assert_eq!(trace.spans()[root_idx].span_id(), 2);
746    }
747
748    #[test]
749    fn test_single_span_sampling() {
750        let mut sampler = create_test_sampler();
751
752        // Test 1: Trace with SSS tags should be kept even when probabilistic would drop it
753        sampler.sampling_rate = 0.0; // 0% sampling rate - should drop everything
754        sampler.probabilistic_sampler_enabled = true;
755
756        // Create span with SSS metric
757        let mut metrics_map = saluki_common::collections::FastHashMap::default();
758        metrics_map.insert(MetaString::from(KEY_SPAN_SAMPLING_MECHANISM), 8.0); // Any value
759        let sss_span = create_test_span(12345, 1, 0).with_metrics(metrics_map.clone());
760
761        // Create regular span without SSS
762        let regular_span = create_test_span(12345, 2, 0);
763
764        let mut trace = create_test_trace(vec![sss_span.clone(), regular_span]);
765
766        // Apply SSS
767        let modified = sampler.single_span_sampling(&mut trace);
768        assert!(modified);
769        assert_eq!(trace.spans().len(), 1); // Only SSS span kept
770        assert_eq!(trace.spans()[0].span_id(), 1); // It's the SSS span
771
772        // Check that trace has been marked as kept with high priority
773        assert!(trace.sampling().is_some());
774        assert_eq!(trace.sampling().as_ref().unwrap().priority, Some(PRIORITY_USER_KEEP));
775
776        // Test 2: Trace without SSS tags should not be modified
777        let trace_without_sss = create_test_trace(vec![create_test_span(12345, 3, 0)]);
778        let mut trace_copy = trace_without_sss.clone();
779        let modified = sampler.single_span_sampling(&mut trace_copy);
780        assert!(!modified);
781        assert_eq!(trace_copy.spans().len(), trace_without_sss.spans().len());
782    }
783
784    #[test]
785    fn test_analytics_events() {
786        let sampler = create_test_sampler();
787
788        // Test 1: Trace with analyzed spans
789        let mut metrics_map = saluki_common::collections::FastHashMap::default();
790        metrics_map.insert(MetaString::from(KEY_ANALYZED_SPANS), 1.0);
791        let analyzed_span = create_test_span(12345, 1, 0).with_metrics(metrics_map.clone());
792        let regular_span = create_test_span(12345, 2, 0);
793
794        let mut trace = create_test_trace(vec![analyzed_span.clone(), regular_span]);
795
796        let analyzed_span_ids: Vec<u64> = trace
797            .spans()
798            .iter()
799            .filter(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
800            .map(|span| span.span_id())
801            .collect();
802        assert_eq!(analyzed_span_ids, vec![1]);
803
804        assert!(sampler.has_analyzed_spans(&trace));
805        let modified = sampler.analyzed_span_sampling(&mut trace);
806        assert!(modified);
807        assert_eq!(trace.spans().len(), 1);
808        assert_eq!(trace.spans()[0].span_id(), 1);
809        assert!(trace.sampling().is_some());
810
811        // Test 2: Trace without analyzed spans
812        let trace_no_analytics = create_test_trace(vec![create_test_span(12345, 3, 0)]);
813        let mut trace_no_analytics_copy = trace_no_analytics.clone();
814        let analyzed_span_ids: Vec<u64> = trace_no_analytics
815            .spans()
816            .iter()
817            .filter(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
818            .map(|span| span.span_id())
819            .collect();
820        assert!(analyzed_span_ids.is_empty());
821        assert!(!sampler.has_analyzed_spans(&trace_no_analytics));
822        let modified = sampler.analyzed_span_sampling(&mut trace_no_analytics_copy);
823        assert!(!modified);
824        assert_eq!(trace_no_analytics_copy.spans().len(), trace_no_analytics.spans().len());
825    }
826
827    #[test]
828    fn test_probabilistic_sampling_with_prob_rate_key() {
829        let mut sampler = create_test_sampler();
830        sampler.sampling_rate = 0.75; // 75% sampling rate
831        sampler.probabilistic_sampler_enabled = true;
832
833        // Use a trace ID that we know will be sampled
834        let trace_id = 12345_u64;
835        let root_span = DdSpan::new(
836            MetaString::from("service"),
837            MetaString::from("operation"),
838            MetaString::from("resource"),
839            MetaString::from("type"),
840            trace_id,
841            1,
842            0, // parent_id = 0 indicates root
843            0,
844            1000,
845            0,
846        );
847        let mut trace = create_test_trace(vec![root_span]);
848
849        let (keep, priority, decision_maker, root_span_idx) = sampler.run_samplers(&mut trace);
850
851        if keep && decision_maker == DECISION_MAKER_PROBABILISTIC {
852            // If sampled probabilistically, check that probRateKey was already added
853            assert_eq!(priority, PRIORITY_AUTO_KEEP);
854            assert_eq!(decision_maker, DECISION_MAKER_PROBABILISTIC); // probabilistic sampling marker
855
856            // Check that the root span already has the probRateKey (it should have been added in run_samplers)
857            let root_idx = root_span_idx.unwrap_or(0);
858            let root_span = &trace.spans()[root_idx];
859            assert!(root_span.metrics().contains_key(PROB_RATE_KEY));
860            assert_eq!(*root_span.metrics().get(PROB_RATE_KEY).unwrap(), 0.75);
861
862            // Test that apply_sampling_metadata still works correctly for other metadata
863            let mut trace_with_metadata = trace.clone();
864            sampler.apply_sampling_metadata(&mut trace_with_metadata, keep, priority, decision_maker, root_idx);
865
866            // Check that decision maker tag was added
867            let modified_root = &trace_with_metadata.spans()[root_idx];
868            assert!(modified_root.meta().contains_key(TAG_DECISION_MAKER));
869            assert_eq!(
870                modified_root.meta().get(TAG_DECISION_MAKER).unwrap(),
871                &MetaString::from(DECISION_MAKER_PROBABILISTIC)
872            );
873        }
874    }
875}