Skip to main content

saluki_components/transforms/trace_sampler/
mod.rs

1//! Trace sampling transform.
2//!
3//! This transform implements agent-side head sampling for traces, supporting:
4//! - Probabilistic sampling based on trace ID
5//! - User-set priority preservation
6//! - Error-based sampling as a safety net
7//! - OTLP trace ingestion with proper sampling decision handling
8//!
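//! # Missing
//!
//! - Trace sampler metrics (see `datadog-agent/pkg/trace/sampler/metrics.go`).
//! - The rare sampler (the priority and no-priority samplers are already wired up).
//! - Full error tracking standalone mode (currently it only disables the single span sampling / analytics events
//!   fallback for traces the samplers dropped).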
14
15use async_trait::async_trait;
16use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
17use saluki_common::collections::FastHashMap;
18use saluki_config::GenericConfiguration;
19use saluki_core::{
20    components::{transforms::*, ComponentContext},
21    data_model::event::{
22        trace::{Span, Trace, TraceSampling},
23        Event,
24    },
25    topology::EventsBuffer,
26};
27use saluki_error::GenericError;
28use stringtheory::MetaString;
29use tracing::debug;
30
31mod catalog;
32mod core_sampler;
33mod errors;
34mod priority_sampler;
35mod probabilistic;
36mod score_sampler;
37mod signature;
38
39use self::probabilistic::PROB_RATE_KEY;
40use crate::common::datadog::{
41    apm::ApmConfig, sample_by_rate, DECISION_MAKER_PROBABILISTIC, OTEL_TRACE_ID_META_KEY, SAMPLING_PRIORITY_METRIC_KEY,
42    TAG_DECISION_MAKER,
43};
44use crate::common::otlp::config::TracesConfig;
45
// Sampling priority constants (matching datadog-agent).

/// The samplers decided to drop the trace.
const PRIORITY_AUTO_DROP: i32 = 0;
/// The samplers decided to keep the trace.
const PRIORITY_AUTO_KEEP: i32 = 1;
/// Keep requested by the user; also applied to traces rescued by single span sampling or analytics events.
const PRIORITY_USER_KEEP: i32 = 2;

/// Extra sample rate passed to the errors, priority, and no-priority samplers (default extra sample rate, matches the
/// agent's `ExtraSampleRate`).
const ERROR_SAMPLE_RATE: f64 = 1.0;

// Single Span Sampling and Analytics Events keys.

/// Span metric whose presence marks a span as selected by Single Span Sampling.
const KEY_SPAN_SAMPLING_MECHANISM: &str = "_dd.span_sampling.mechanism";
/// Span metric whose presence marks a span as an App Analytics event.
const KEY_ANALYZED_SPANS: &str = "_dd.analyzed";

// Decision maker values for `_dd.p.dm` are not defined in this module; they are shared via `crate::common::datadog`
// (e.g. `DECISION_MAKER_PROBABILISTIC`).
58
/// Normalizes a sampling rate expressed as a fraction (not a percentage).
///
/// Only values strictly inside `(0, 1)` are kept as-is. Everything else — zero, negatives, values `>= 1`, and NaN — is
/// treated as "invalid or unset" and falls back to `1.0`, i.e. keep every trace.
fn normalize_sampling_rate(rate: f64) -> f64 {
    // Written as a positive range check so that NaN (for which every comparison is false) also takes the fallback.
    if rate > 0.0 && rate < 1.0 {
        rate
    } else {
        1.0
    }
}
66
67/// Configuration for the trace sampler transform.
68#[derive(Debug)]
69pub struct TraceSamplerConfiguration {
70    apm_config: ApmConfig,
71    otlp_sampling_rate: f64,
72}
73
74impl TraceSamplerConfiguration {
75    /// Creates a new `TraceSamplerConfiguration` from the given configuration.
76    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
77        let apm_config = ApmConfig::from_configuration(config)?;
78        let otlp_traces: TracesConfig = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
79        let otlp_sampling_rate = normalize_sampling_rate(otlp_traces.probabilistic_sampler.sampling_percentage / 100.0);
80        Ok(Self {
81            apm_config,
82            otlp_sampling_rate,
83        })
84    }
85}
86
87#[async_trait]
88impl SynchronousTransformBuilder for TraceSamplerConfiguration {
89    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
90        let sampler = TraceSampler {
91            sampling_rate: self.apm_config.probabilistic_sampler_sampling_percentage() / 100.0,
92            error_sampling_enabled: self.apm_config.error_sampling_enabled(),
93            error_tracking_standalone: self.apm_config.error_tracking_standalone_enabled(),
94            probabilistic_sampler_enabled: self.apm_config.probabilistic_sampler_enabled(),
95            otlp_sampling_rate: self.otlp_sampling_rate,
96            error_sampler: errors::ErrorsSampler::new(self.apm_config.errors_per_second(), ERROR_SAMPLE_RATE),
97            priority_sampler: priority_sampler::PrioritySampler::new(
98                self.apm_config.default_env().clone(),
99                ERROR_SAMPLE_RATE,
100                self.apm_config.target_traces_per_second(),
101            ),
102            no_priority_sampler: score_sampler::NoPrioritySampler::new(
103                self.apm_config.target_traces_per_second(),
104                ERROR_SAMPLE_RATE,
105            ),
106        };
107
108        Ok(Box::new(sampler))
109    }
110}
111
112impl MemoryBounds for TraceSamplerConfiguration {
113    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
114        builder.minimum().with_single_value::<TraceSampler>("component struct");
115    }
116}
117
118pub struct TraceSampler {
119    sampling_rate: f64,
120    error_tracking_standalone: bool,
121    error_sampling_enabled: bool,
122    probabilistic_sampler_enabled: bool,
123    otlp_sampling_rate: f64,
124    error_sampler: errors::ErrorsSampler,
125    priority_sampler: priority_sampler::PrioritySampler,
126    no_priority_sampler: score_sampler::NoPrioritySampler,
127}
128
129impl TraceSampler {
130    // TODO: merge this with the other duplicate "find root span of trace" functions
131    /// Find the root span index of a trace.
132    fn get_root_span_index(&self, trace: &Trace) -> Option<usize> {
133        // logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/traceutil/trace.go#L36
134        let spans = trace.spans();
135        if spans.is_empty() {
136            return None;
137        }
138        let length = spans.len();
139        // General case: go over all spans and check for one without a matching parent.
140        // This intentionally mirrors `datadog-agent/pkg/trace/traceutil/trace.go:GetRoot`:
141        // - Fast-path: return the last span with `parent_id == 0` (some clients report the root last)
142        // - Otherwise: build a map of `parent_id -> child_span_index`, delete entries whose parent
143        //   exists in the trace, and pick any remaining "orphan" child span.
144        let mut parent_id_to_child: FastHashMap<u64, usize> = FastHashMap::default();
145
146        for i in 0..length {
147            // Common case optimization: check for span with parent_id == 0, starting from the end,
148            // since some clients report the root last.
149            let j = length - 1 - i;
150            if spans[j].parent_id() == 0 {
151                return Some(j);
152            }
153            parent_id_to_child.insert(spans[j].parent_id(), j);
154        }
155
156        for span in spans.iter() {
157            parent_id_to_child.remove(&span.span_id());
158        }
159
160        // Here, if the trace is valid, we should have `len(parent_id_to_child) == 1`.
161        if parent_id_to_child.len() != 1 {
162            debug!(
163                "Didn't reliably find the root span for traceID:{}",
164                &spans[0].trace_id()
165            );
166        }
167
168        // Have a safe behavior if that's not the case.
169        // Pick a random span without its parent.
170        if let Some((_, child_idx)) = parent_id_to_child.iter().next() {
171            return Some(*child_idx);
172        }
173
174        // Gracefully fail with the last span of the trace.
175        Some(length - 1)
176    }
177
178    /// Check for user-set sampling priority in trace
179    fn get_user_priority(&self, trace: &Trace, root_span_idx: usize) -> Option<i32> {
180        // First check trace-level sampling priority (last-seen priority from OTLP ingest)
181        if let Some(sampling) = trace.sampling() {
182            if let Some(priority) = sampling.priority {
183                return Some(priority);
184            }
185        }
186
187        if trace.spans().is_empty() {
188            return None;
189        }
190
191        // Fall back to checking spans (for compatibility with non-OTLP traces)
192        // Prefer the root span (common case), but fall back to scanning all spans to be robust to ordering.
193        if let Some(root) = trace.spans().get(root_span_idx) {
194            if let Some(&p) = root.metrics().get(SAMPLING_PRIORITY_METRIC_KEY) {
195                return Some(p as i32);
196            }
197        }
198        let spans = trace.spans();
199        spans
200            .iter()
201            .find_map(|span| span.metrics().get(SAMPLING_PRIORITY_METRIC_KEY).map(|&p| p as i32))
202    }
203
204    /// Returns `true` if the given trace ID should be probabilistically sampled.
205    fn sample_probabilistic(&self, trace_id: u64) -> bool {
206        probabilistic::ProbabilisticSampler::sample(trace_id, self.sampling_rate)
207    }
208
209    fn is_otlp_trace(&self, trace: &Trace, root_span_idx: usize) -> bool {
210        trace
211            .spans()
212            .get(root_span_idx)
213            .map(|span| {
214                span.meta()
215                    .contains_key(&MetaString::from_static(OTEL_TRACE_ID_META_KEY))
216            })
217            .unwrap_or(false)
218    }
219
220    /// Returns `true` if the trace contains a span with an error.
221    fn trace_contains_error(&self, trace: &Trace, consider_exception_span_events: bool) -> bool {
222        trace.spans().iter().any(|span| {
223            span.error() != 0 || (consider_exception_span_events && self.span_contains_exception_span_event(span))
224        })
225    }
226
227    /// Returns `true` if the span has exception span events.
228    ///
229    /// This checks for the `_dd.span_events.has_exception` meta field set to `"true"`.
230    fn span_contains_exception_span_event(&self, span: &Span) -> bool {
231        if let Some(has_exception) = span.meta().get("_dd.span_events.has_exception") {
232            return has_exception == "true";
233        }
234        false
235    }
236
237    /// Apply analyzed span sampling to the trace.
238    ///
239    /// Returns `true` if the trace was modified.
240    fn analyzed_span_sampling(&self, trace: &mut Trace) -> bool {
241        let retained = trace.retain_spans(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS));
242        if retained > 0 {
243            // Mark trace as kept with high priority
244            let sampling = TraceSampling::new(
245                false,
246                Some(PRIORITY_USER_KEEP),
247                None,
248                Some(MetaString::from(format!("{:.2}", self.sampling_rate))),
249            );
250            trace.set_sampling(Some(sampling));
251            true
252        } else {
253            false
254        }
255    }
256
257    /// Returns `true` if the given trace has any analyzed spans.
258    fn has_analyzed_spans(&self, trace: &Trace) -> bool {
259        trace
260            .spans()
261            .iter()
262            .any(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
263    }
264
265    /// Apply Single Span Sampling to the trace
266    /// Returns true if the trace was modified
267    fn single_span_sampling(&self, trace: &mut Trace) -> bool {
268        let retained = trace.retain_spans(|span| span.metrics().contains_key(KEY_SPAN_SAMPLING_MECHANISM));
269        if retained > 0 {
270            // Set high priority and mark as kept
271            let sampling = TraceSampling::new(
272                false,
273                Some(PRIORITY_USER_KEEP),
274                None, // No decision maker for SSS
275                Some(MetaString::from(format!("{:.2}", self.sampling_rate))),
276            );
277            trace.set_sampling(Some(sampling));
278            true
279        } else {
280            false
281        }
282    }
283
284    /// Evaluates the given trace against all configured samplers.
285    ///
286    /// Return a tuple containing whether or not the trace should be kept, the decision maker tag (which sampler is responsible),
287    /// and the index of the root span used for evaluation.
288    fn run_samplers(&mut self, trace: &mut Trace) -> (bool, i32, &'static str, Option<usize>) {
289        // logic taken from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1066
290        let now = std::time::SystemTime::now();
291        // Empty trace check
292        if trace.spans().is_empty() {
293            return (false, PRIORITY_AUTO_DROP, "", None);
294        }
295        let contains_error = self.trace_contains_error(trace, false);
296        let Some(root_span_idx) = self.get_root_span_index(trace) else {
297            return (false, PRIORITY_AUTO_DROP, "", None);
298        };
299
300        // Modern path: ProbabilisticSamplerEnabled = true
301        if self.probabilistic_sampler_enabled {
302            let mut prob_keep = false;
303            let mut decision_maker = "";
304
305            // Run probabilistic sampler - use root span's trace ID
306            let root_trace_id = trace.spans()[root_span_idx].trace_id();
307            if self.sample_probabilistic(root_trace_id) {
308                decision_maker = DECISION_MAKER_PROBABILISTIC; // probabilistic sampling
309                prob_keep = true;
310
311                if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
312                    let metrics = root_span.metrics_mut();
313                    metrics.insert(MetaString::from(PROB_RATE_KEY), self.sampling_rate);
314                }
315            } else if self.error_sampling_enabled && contains_error {
316                prob_keep = self.error_sampler.sample_error(now, trace, root_span_idx);
317            }
318
319            let priority = if prob_keep {
320                PRIORITY_AUTO_KEEP
321            } else {
322                PRIORITY_AUTO_DROP
323            };
324
325            return (prob_keep, priority, decision_maker, Some(root_span_idx));
326        }
327
328        let user_priority = self.get_user_priority(trace, root_span_idx);
329        if let Some(priority) = user_priority {
330            if priority < PRIORITY_AUTO_DROP {
331                // Manual drop: short-circuit and skip other samplers.
332                return (false, priority, "", Some(root_span_idx));
333            }
334
335            if self.priority_sampler.sample(now, trace, root_span_idx, priority, 0.0) {
336                return (true, priority, "", Some(root_span_idx));
337            }
338        } else if self.is_otlp_trace(trace, root_span_idx) {
339            // some sampling happens upstream in the otlp receiver in the agent: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/otlp.go#L572
340            let root_trace_id = trace.spans()[root_span_idx].trace_id();
341            if sample_by_rate(root_trace_id, self.otlp_sampling_rate) {
342                if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
343                    root_span.metrics_mut().remove(PROB_RATE_KEY);
344                }
345                return (
346                    true,
347                    PRIORITY_AUTO_KEEP,
348                    DECISION_MAKER_PROBABILISTIC,
349                    Some(root_span_idx),
350                );
351            }
352        } else if self.no_priority_sampler.sample(now, trace, root_span_idx) {
353            return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
354        }
355
356        if self.error_sampling_enabled && contains_error {
357            let keep = self.error_sampler.sample_error(now, trace, root_span_idx);
358            if keep {
359                return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
360            }
361        }
362
363        // Default: drop the trace
364        (false, PRIORITY_AUTO_DROP, "", Some(root_span_idx))
365    }
366
367    /// Apply sampling metadata to the trace in-place.
368    ///
369    /// The `root_span_id` parameter identifies which span should receive the sampling metadata.
370    /// This avoids recalculating the root span since it was already found in `run_samplers`.
371    fn apply_sampling_metadata(
372        &self, trace: &mut Trace, keep: bool, priority: i32, decision_maker: &str, root_span_idx: usize,
373    ) {
374        let is_otlp = self.is_otlp_trace(trace, root_span_idx);
375        let root_span_value = match trace.spans_mut().get_mut(root_span_idx) {
376            Some(span) => span,
377            None => return,
378        };
379
380        // Add tag for the decision maker
381        let existing_decision_maker = if decision_maker.is_empty() {
382            root_span_value.meta().get(TAG_DECISION_MAKER).cloned()
383        } else {
384            None
385        };
386        let decision_maker_meta = if decision_maker.is_empty() {
387            existing_decision_maker
388        } else {
389            Some(MetaString::from(decision_maker))
390        };
391
392        let meta = root_span_value.meta_mut();
393        if priority > 0 {
394            if let Some(dm) = decision_maker_meta.as_ref() {
395                meta.insert(MetaString::from(TAG_DECISION_MAKER), dm.clone());
396            }
397        }
398
399        // Now we can use trace again to set sampling metadata
400        let sampling_rate = if is_otlp {
401            self.otlp_sampling_rate
402        } else {
403            self.sampling_rate
404        };
405        let sampling = TraceSampling::new(
406            !keep,
407            Some(priority),
408            if priority > 0 { decision_maker_meta } else { None },
409            Some(MetaString::from(format!("{:.2}", sampling_rate))),
410        );
411        trace.set_sampling(Some(sampling));
412    }
413
414    fn process_trace(&mut self, trace: &mut Trace) -> bool {
415        // keep is a boolean that indicates if the trace should be kept or dropped
416        // priority is the sampling priority
417        // decision_maker is the tag that indicates the decision maker (probabilistic, error, etc.)
418        // root_span_idx is the index of the root span of the trace
419        let (keep, priority, decision_maker, root_span_idx) = self.run_samplers(trace);
420        if keep {
421            if let Some(root_idx) = root_span_idx {
422                self.apply_sampling_metadata(trace, keep, priority, decision_maker, root_idx);
423            }
424            return true;
425        }
426
427        if self.error_tracking_standalone {
428            return false;
429        }
430
431        // logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L980-L990
432        // try single span sampling (keeps spans marked for sampling when trace would be dropped)
433        let modified = self.single_span_sampling(trace);
434        if !modified {
435            // Fall back to analytics events if no SSS spans
436            if self.analyzed_span_sampling(trace) {
437                return true;
438            }
439        } else if self.has_analyzed_spans(trace) {
440            // Warn about both SSS and analytics events
441            debug!(
442                "Detected both analytics events AND single span sampling in the same trace. Single span sampling wins because App Analytics is deprecated."
443            );
444            return true;
445        }
446
447        // If we modified the trace with SSS, send it
448        if modified {
449            return true;
450        }
451
452        // Neither SSS nor analytics events found, drop the trace
453        debug!("Dropping trace with priority {}", priority);
454        false
455    }
456}
457
458impl SynchronousTransform for TraceSampler {
459    fn transform_buffer(&mut self, buffer: &mut EventsBuffer) {
460        buffer.remove_if(|event| match event {
461            Event::Trace(trace) => !self.process_trace(trace),
462            _ => false,
463        });
464    }
465}
466
467#[cfg(test)]
468mod tests {
469    use std::collections::HashMap;
470
471    use saluki_context::tags::TagSet;
472    use saluki_core::data_model::event::trace::{Span as DdSpan, Trace};
473    const PRIORITY_USER_DROP: i32 = -1;
474
475    use super::*;
476    fn create_test_sampler() -> TraceSampler {
477        TraceSampler {
478            sampling_rate: 1.0,
479            error_sampling_enabled: true,
480            error_tracking_standalone: false,
481            probabilistic_sampler_enabled: true,
482            otlp_sampling_rate: 1.0,
483            error_sampler: errors::ErrorsSampler::new(10.0, 1.0),
484            priority_sampler: priority_sampler::PrioritySampler::new(MetaString::from("agent-env"), 1.0, 10.0),
485            no_priority_sampler: score_sampler::NoPrioritySampler::new(10.0, 1.0),
486        }
487    }
488
489    fn create_test_span(trace_id: u64, span_id: u64, error: i32) -> DdSpan {
490        DdSpan::new(
491            MetaString::from("test-service"),
492            MetaString::from("test-operation"),
493            MetaString::from("test-resource"),
494            MetaString::from("test-type"),
495            trace_id,
496            span_id,
497            0,    // parent_id
498            0,    // start
499            1000, // duration
500            error,
501        )
502    }
503
504    fn create_test_span_with_metrics(trace_id: u64, span_id: u64, metrics: HashMap<String, f64>) -> DdSpan {
505        let mut metrics_map = saluki_common::collections::FastHashMap::default();
506        for (k, v) in metrics {
507            metrics_map.insert(MetaString::from(k), v);
508        }
509        create_test_span(trace_id, span_id, 0).with_metrics(metrics_map)
510    }
511
512    #[allow(dead_code)]
513    fn create_test_span_with_meta(trace_id: u64, span_id: u64, meta: HashMap<String, String>) -> DdSpan {
514        let mut meta_map = saluki_common::collections::FastHashMap::default();
515        for (k, v) in meta {
516            meta_map.insert(MetaString::from(k), MetaString::from(v));
517        }
518        create_test_span(trace_id, span_id, 0).with_meta(meta_map)
519    }
520
521    fn create_test_trace(spans: Vec<DdSpan>) -> Trace {
522        let tags = TagSet::default();
523        Trace::new(spans, tags)
524    }
525
526    #[test]
527    fn test_user_priority_detection() {
528        let sampler = create_test_sampler();
529
530        // Test trace with user-set priority = 2 (UserKeep)
531        let mut metrics = HashMap::new();
532        metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0);
533        let span = create_test_span_with_metrics(12345, 1, metrics);
534        let trace = create_test_trace(vec![span]);
535        let root_idx = sampler.get_root_span_index(&trace).unwrap();
536
537        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2));
538
539        // Test trace with user-set priority = -1 (UserDrop)
540        let mut metrics = HashMap::new();
541        metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), -1.0);
542        let span = create_test_span_with_metrics(12345, 1, metrics);
543        let trace = create_test_trace(vec![span]);
544        let root_idx = sampler.get_root_span_index(&trace).unwrap();
545
546        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(-1));
547
548        // Test trace without user priority
549        let span = create_test_span(12345, 1, 0);
550        let trace = create_test_trace(vec![span]);
551        let root_idx = sampler.get_root_span_index(&trace).unwrap();
552
553        assert_eq!(sampler.get_user_priority(&trace, root_idx), None);
554    }
555
556    #[test]
557    fn test_trace_level_priority_takes_precedence() {
558        let sampler = create_test_sampler();
559
560        // Test trace-level priority overrides span priorities (last-seen priority)
561        // Create spans with different priorities - root has 0, later span has 2
562        let mut metrics_root = HashMap::new();
563        metrics_root.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 0.0);
564        let root_span = create_test_span_with_metrics(12345, 1, metrics_root);
565
566        let mut metrics_later = HashMap::new();
567        metrics_later.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 1.0);
568        let later_span = create_test_span_with_metrics(12345, 2, metrics_later).with_parent_id(1);
569
570        let mut trace = create_test_trace(vec![root_span, later_span]);
571        let root_idx = sampler.get_root_span_index(&trace).unwrap();
572
573        // Without trace-level priority, should get priority from root (0)
574        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(0));
575
576        // Now set trace-level priority to 2 (simulating last-seen priority from OTLP translator)
577        trace.set_sampling(Some(TraceSampling::new(false, Some(2), None, None)));
578
579        // Trace-level priority should take precedence
580        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2));
581
582        // Test that trace-level priority is used even when no span has priority
583        let span_no_priority = create_test_span(12345, 3, 0);
584        let mut trace_only_trace_level = create_test_trace(vec![span_no_priority]);
585        trace_only_trace_level.set_sampling(Some(TraceSampling::new(false, Some(1), None, None)));
586        let root_idx = sampler.get_root_span_index(&trace_only_trace_level).unwrap();
587
588        assert_eq!(sampler.get_user_priority(&trace_only_trace_level, root_idx), Some(1));
589    }
590
591    #[test]
592    fn test_manual_keep_with_trace_level_priority() {
593        let mut sampler = create_test_sampler();
594        sampler.probabilistic_sampler_enabled = false; // Use legacy path that checks user priority
595
596        // Test that manual keep (priority = 2) works via trace-level priority
597        let span = create_test_span(12345, 1, 0);
598        let mut trace = create_test_trace(vec![span]);
599        trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_USER_KEEP), None, None)));
600
601        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
602        assert!(keep);
603        assert_eq!(priority, PRIORITY_USER_KEEP);
604        assert_eq!(decision_maker, "");
605
606        // Test manual drop (priority = -1) via trace-level priority
607        let span = create_test_span(12345, 1, 0);
608        let mut trace = create_test_trace(vec![span]);
609        trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_USER_DROP), None, None)));
610
611        let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
612        assert!(!keep); // Should not keep when user drops
613        assert_eq!(priority, PRIORITY_USER_DROP);
614
615        // Test that priority = 1 (auto keep) via trace-level is also respected
616        let span = create_test_span(12345, 1, 0);
617        let mut trace = create_test_trace(vec![span]);
618        trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_AUTO_KEEP), None, None)));
619
620        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
621        assert!(keep);
622        assert_eq!(priority, PRIORITY_AUTO_KEEP);
623        assert_eq!(decision_maker, "");
624    }
625
626    #[test]
627    fn test_probabilistic_sampling_determinism() {
628        let sampler = create_test_sampler();
629
630        // Same trace ID should always produce same decision
631        let trace_id = 0x1234567890ABCDEF_u64;
632        let result1 = sampler.sample_probabilistic(trace_id);
633        let result2 = sampler.sample_probabilistic(trace_id);
634        assert_eq!(result1, result2);
635    }
636
637    #[test]
638    fn test_error_detection() {
639        let sampler = create_test_sampler();
640
641        // Test trace with error field set
642        let span_with_error = create_test_span(12345, 1, 1);
643        let trace = create_test_trace(vec![span_with_error]);
644        assert!(sampler.trace_contains_error(&trace, false));
645
646        // Test trace without error
647        let span_without_error = create_test_span(12345, 1, 0);
648        let trace = create_test_trace(vec![span_without_error]);
649        assert!(!sampler.trace_contains_error(&trace, false));
650    }
651
652    #[test]
653    fn test_sampling_priority_order() {
654        // Test modern path: error sampler overrides probabilistic drop
655        let mut sampler = create_test_sampler();
656        sampler.sampling_rate = 0.5; // 50% sampling rate
657        sampler.probabilistic_sampler_enabled = true;
658
659        // Create trace with error that would be dropped by probabilistic
660        // Using a trace ID that we know will be dropped at 50% rate
661        let span_with_error = create_test_span(u64::MAX - 1, 1, 1);
662        let mut trace = create_test_trace(vec![span_with_error]);
663
664        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
665        assert!(keep);
666        assert_eq!(priority, PRIORITY_AUTO_KEEP);
667        assert_eq!(decision_maker, ""); // Error sampler doesn't set decision_maker
668
669        // Test legacy path: user priority is respected
670        let mut sampler = create_test_sampler();
671        sampler.probabilistic_sampler_enabled = false; // Use legacy path
672
673        let mut metrics = HashMap::new();
674        metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0);
675        let span = create_test_span_with_metrics(12345, 1, metrics);
676        let mut trace = create_test_trace(vec![span]);
677
678        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
679        assert!(keep);
680        assert_eq!(priority, 2); // UserKeep
681        assert_eq!(decision_maker, "");
682    }
683
684    #[test]
685    fn test_empty_trace_handling() {
686        let mut sampler = create_test_sampler();
687        let mut trace = create_test_trace(vec![]);
688
689        let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
690        assert!(!keep);
691        assert_eq!(priority, PRIORITY_AUTO_DROP);
692    }
693
694    #[test]
695    fn test_root_span_detection() {
696        let sampler = create_test_sampler();
697
698        // Test 1: Root span with parent_id = 0 (common case)
699        let root_span = DdSpan::new(
700            MetaString::from("service"),
701            MetaString::from("operation"),
702            MetaString::from("resource"),
703            MetaString::from("type"),
704            12345,
705            1,
706            0, // parent_id = 0 indicates root
707            0,
708            1000,
709            0,
710        );
711        let child_span = DdSpan::new(
712            MetaString::from("service"),
713            MetaString::from("child_op"),
714            MetaString::from("resource"),
715            MetaString::from("type"),
716            12345,
717            2,
718            1, // parent_id = 1 (points to root)
719            100,
720            500,
721            0,
722        );
723        // Put root span second to test that we find it even when not first
724        let trace = create_test_trace(vec![child_span.clone(), root_span.clone()]);
725        let root_idx = sampler.get_root_span_index(&trace).unwrap();
726        assert_eq!(trace.spans()[root_idx].span_id(), 1);
727
728        // Test 2: Orphaned span (parent not in trace)
729        let orphan_span = DdSpan::new(
730            MetaString::from("service"),
731            MetaString::from("orphan"),
732            MetaString::from("resource"),
733            MetaString::from("type"),
734            12345,
735            3,
736            999, // parent_id = 999 (doesn't exist in trace)
737            200,
738            300,
739            0,
740        );
741        let trace = create_test_trace(vec![orphan_span]);
742        let root_idx = sampler.get_root_span_index(&trace).unwrap();
743        assert_eq!(trace.spans()[root_idx].span_id(), 3);
744
745        // Test 3: Multiple root candidates: should return the last one found (index 1)
746        let span1 = create_test_span(12345, 1, 0);
747        let span2 = create_test_span(12345, 2, 0);
748        let trace = create_test_trace(vec![span1, span2]);
749        // Both have parent_id = 0, should return the last one found (span_id = 2)
750        let root_idx = sampler.get_root_span_index(&trace).unwrap();
751        assert_eq!(trace.spans()[root_idx].span_id(), 2);
752    }
753
754    #[test]
755    fn test_single_span_sampling() {
756        let mut sampler = create_test_sampler();
757
758        // Test 1: Trace with SSS tags should be kept even when probabilistic would drop it
759        sampler.sampling_rate = 0.0; // 0% sampling rate - should drop everything
760        sampler.probabilistic_sampler_enabled = true;
761
762        // Create span with SSS metric
763        let mut metrics_map = saluki_common::collections::FastHashMap::default();
764        metrics_map.insert(MetaString::from(KEY_SPAN_SAMPLING_MECHANISM), 8.0); // Any value
765        let sss_span = create_test_span(12345, 1, 0).with_metrics(metrics_map.clone());
766
767        // Create regular span without SSS
768        let regular_span = create_test_span(12345, 2, 0);
769
770        let mut trace = create_test_trace(vec![sss_span.clone(), regular_span]);
771
772        // Apply SSS
773        let modified = sampler.single_span_sampling(&mut trace);
774        assert!(modified);
775        assert_eq!(trace.spans().len(), 1); // Only SSS span kept
776        assert_eq!(trace.spans()[0].span_id(), 1); // It's the SSS span
777
778        // Check that trace has been marked as kept with high priority
779        assert!(trace.sampling().is_some());
780        assert_eq!(trace.sampling().as_ref().unwrap().priority, Some(PRIORITY_USER_KEEP));
781
782        // Test 2: Trace without SSS tags should not be modified
783        let trace_without_sss = create_test_trace(vec![create_test_span(12345, 3, 0)]);
784        let mut trace_copy = trace_without_sss.clone();
785        let modified = sampler.single_span_sampling(&mut trace_copy);
786        assert!(!modified);
787        assert_eq!(trace_copy.spans().len(), trace_without_sss.spans().len());
788    }
789
790    #[test]
791    fn test_analytics_events() {
792        let sampler = create_test_sampler();
793
794        // Test 1: Trace with analyzed spans
795        let mut metrics_map = saluki_common::collections::FastHashMap::default();
796        metrics_map.insert(MetaString::from(KEY_ANALYZED_SPANS), 1.0);
797        let analyzed_span = create_test_span(12345, 1, 0).with_metrics(metrics_map.clone());
798        let regular_span = create_test_span(12345, 2, 0);
799
800        let mut trace = create_test_trace(vec![analyzed_span.clone(), regular_span]);
801
802        let analyzed_span_ids: Vec<u64> = trace
803            .spans()
804            .iter()
805            .filter(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
806            .map(|span| span.span_id())
807            .collect();
808        assert_eq!(analyzed_span_ids, vec![1]);
809
810        assert!(sampler.has_analyzed_spans(&trace));
811        let modified = sampler.analyzed_span_sampling(&mut trace);
812        assert!(modified);
813        assert_eq!(trace.spans().len(), 1);
814        assert_eq!(trace.spans()[0].span_id(), 1);
815        assert!(trace.sampling().is_some());
816
817        // Test 2: Trace without analyzed spans
818        let trace_no_analytics = create_test_trace(vec![create_test_span(12345, 3, 0)]);
819        let mut trace_no_analytics_copy = trace_no_analytics.clone();
820        let analyzed_span_ids: Vec<u64> = trace_no_analytics
821            .spans()
822            .iter()
823            .filter(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
824            .map(|span| span.span_id())
825            .collect();
826        assert!(analyzed_span_ids.is_empty());
827        assert!(!sampler.has_analyzed_spans(&trace_no_analytics));
828        let modified = sampler.analyzed_span_sampling(&mut trace_no_analytics_copy);
829        assert!(!modified);
830        assert_eq!(trace_no_analytics_copy.spans().len(), trace_no_analytics.spans().len());
831    }
832
833    #[test]
834    fn test_probabilistic_sampling_with_prob_rate_key() {
835        let mut sampler = create_test_sampler();
836        sampler.sampling_rate = 0.75; // 75% sampling rate
837        sampler.probabilistic_sampler_enabled = true;
838
839        // Use a trace ID that we know will be sampled
840        let trace_id = 12345_u64;
841        let root_span = DdSpan::new(
842            MetaString::from("service"),
843            MetaString::from("operation"),
844            MetaString::from("resource"),
845            MetaString::from("type"),
846            trace_id,
847            1,
848            0, // parent_id = 0 indicates root
849            0,
850            1000,
851            0,
852        );
853        let mut trace = create_test_trace(vec![root_span]);
854
855        let (keep, priority, decision_maker, root_span_idx) = sampler.run_samplers(&mut trace);
856
857        if keep && decision_maker == DECISION_MAKER_PROBABILISTIC {
858            // If sampled probabilistically, check that probRateKey was already added
859            assert_eq!(priority, PRIORITY_AUTO_KEEP);
860            assert_eq!(decision_maker, DECISION_MAKER_PROBABILISTIC); // probabilistic sampling marker
861
862            // Check that the root span already has the probRateKey (it should have been added in run_samplers)
863            let root_idx = root_span_idx.unwrap_or(0);
864            let root_span = &trace.spans()[root_idx];
865            assert!(root_span.metrics().contains_key(PROB_RATE_KEY));
866            assert_eq!(*root_span.metrics().get(PROB_RATE_KEY).unwrap(), 0.75);
867
868            // Test that apply_sampling_metadata still works correctly for other metadata
869            let mut trace_with_metadata = trace.clone();
870            sampler.apply_sampling_metadata(&mut trace_with_metadata, keep, priority, decision_maker, root_idx);
871
872            // Check that decision maker tag was added
873            let modified_root = &trace_with_metadata.spans()[root_idx];
874            assert!(modified_root.meta().contains_key(TAG_DECISION_MAKER));
875            assert_eq!(
876                modified_root.meta().get(TAG_DECISION_MAKER).unwrap(),
877                &MetaString::from(DECISION_MAKER_PROBABILISTIC)
878            );
879        }
880    }
881}