Skip to main content

saluki_components/transforms/trace_sampler/
mod.rs

//! Trace sampling transform.
//!
//! This transform implements agent-side head sampling for traces, supporting:
//! - Probabilistic sampling based on trace ID
//! - User-set priority preservation
//! - Error-based sampling as a safety net
//! - OTLP trace ingestion with proper sampling decision handling
//!
//! TODO:
//!
//! - add trace metrics: datadog-agent/pkg/trace/sampler/metrics.go
//! - add missing samplers (priority, nopriority)
//! - add error tracking standalone mode
14
15use async_trait::async_trait;
16use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
17use saluki_common::collections::FastHashMap;
18use saluki_config::GenericConfiguration;
19use saluki_core::{
20    components::{transforms::*, ComponentContext},
21    data_model::event::{
22        trace::{Span, Trace, TraceSampling},
23        Event,
24    },
25    topology::EventsBuffer,
26};
27use saluki_error::GenericError;
28use stringtheory::MetaString;
29use tracing::debug;
30
31mod catalog;
32mod core_sampler;
33mod errors;
34mod priority_sampler;
35mod probabilistic;
36mod rare_sampler;
37mod score_sampler;
38mod signature;
39
40use self::probabilistic::PROB_RATE_KEY;
41use crate::common::datadog::{
42    apm::ApmConfig, sample_by_rate, DECISION_MAKER_MANUAL, DECISION_MAKER_PROBABILISTIC, OTEL_TRACE_ID_META_KEY,
43    SAMPLING_PRIORITY_METRIC_KEY, TAG_DECISION_MAKER,
44};
45use crate::common::otlp::config::TracesConfig;
46
// Sampling priority constants (matching datadog-agent).

/// Priority recorded when a sampler decided to drop the trace.
const PRIORITY_AUTO_DROP: i32 = 0;
/// Priority recorded when a sampler decided to keep the trace.
const PRIORITY_AUTO_KEEP: i32 = 1;
/// Priority used for user-requested keeps; also applied to chunks kept by single span sampling and
/// analytics events.
const PRIORITY_USER_KEEP: i32 = 2;

/// Default extra sample rate (matches agent's ExtraSampleRate).
const ERROR_SAMPLE_RATE: f64 = 1.0;

// Single Span Sampling and Analytics Events keys.

/// Span metric key whose presence marks a span as selected by single span sampling.
const KEY_SPAN_SAMPLING_MECHANISM: &str = "_dd.span_sampling.mechanism";
/// Span metric key whose presence marks a span as an analytics event.
const KEY_ANALYZED_SPANS: &str = "_dd.analyzed";
57
// Decision maker values for `_dd.p.dm` (matching datadog-agent) are imported from `crate::common::datadog` above.
59
/// Normalizes a sampling rate expressed as a fraction.
///
/// Rates strictly between 0 and 1 are returned unchanged. Every other input (zero, negative values,
/// values of 1 or more, infinities, and `NaN`) maps to `1.0`, i.e. "keep everything".
fn normalize_sampling_rate(rate: f64) -> f64 {
    // Written as a positive range test on purpose: `NaN` fails every comparison, so it lands in the
    // `else` branch instead of leaking through as a sampling rate.
    if rate > 0.0 && rate < 1.0 {
        rate
    } else {
        1.0
    }
}
67
68/// Configuration for the trace sampler transform.
69#[derive(Debug)]
70pub struct TraceSamplerConfiguration {
71    apm_config: ApmConfig,
72    otlp_sampling_rate: f64,
73}
74
75impl TraceSamplerConfiguration {
76    /// Creates a new `TraceSamplerConfiguration` from the given configuration.
77    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
78        let apm_config = ApmConfig::from_configuration(config)?;
79        let otlp_traces: TracesConfig = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
80        let otlp_sampling_rate = normalize_sampling_rate(otlp_traces.probabilistic_sampler.sampling_percentage / 100.0);
81        Ok(Self {
82            apm_config,
83            otlp_sampling_rate,
84        })
85    }
86}
87
88#[async_trait]
89impl SynchronousTransformBuilder for TraceSamplerConfiguration {
90    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
91        // TODO: Need to support remote configuration changing these at runtime
92        // See https://github.com/DataDog/saluki/issues/1326
93        let sampler = TraceSampler {
94            sampling_rate: self.apm_config.probabilistic_sampler_sampling_percentage() / 100.0,
95            error_sampling_enabled: self.apm_config.error_sampling_enabled(),
96            error_tracking_standalone: self.apm_config.error_tracking_standalone_enabled(),
97            probabilistic_sampler_enabled: self.apm_config.probabilistic_sampler_enabled(),
98            otlp_sampling_rate: self.otlp_sampling_rate,
99            error_sampler: errors::ErrorsSampler::new(self.apm_config.errors_per_second(), ERROR_SAMPLE_RATE),
100            priority_sampler: priority_sampler::PrioritySampler::new(
101                self.apm_config.default_env().clone(),
102                ERROR_SAMPLE_RATE,
103                self.apm_config.target_traces_per_second(),
104            ),
105            no_priority_sampler: score_sampler::NoPrioritySampler::new(
106                self.apm_config.target_traces_per_second(),
107                ERROR_SAMPLE_RATE,
108            ),
109            rare_sampler: rare_sampler::RareSampler::new(
110                self.apm_config.rare_sampler_enabled(),
111                self.apm_config.rare_sampler_tps(),
112                std::time::Duration::from_secs_f64(self.apm_config.rare_sampler_cooldown_period_secs()),
113                self.apm_config.rare_sampler_cardinality(),
114            ),
115        };
116
117        Ok(Box::new(sampler))
118    }
119}
120
121impl MemoryBounds for TraceSamplerConfiguration {
122    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
123        builder.minimum().with_single_value::<TraceSampler>("component struct");
124    }
125}
126
127pub struct TraceSampler {
128    sampling_rate: f64,
129    error_tracking_standalone: bool,
130    error_sampling_enabled: bool,
131    probabilistic_sampler_enabled: bool,
132    otlp_sampling_rate: f64,
133    error_sampler: errors::ErrorsSampler,
134    priority_sampler: priority_sampler::PrioritySampler,
135    no_priority_sampler: score_sampler::NoPrioritySampler,
136    rare_sampler: rare_sampler::RareSampler,
137}
138
139impl TraceSampler {
140    // TODO: merge this with the other duplicate "find root span of trace" functions
141    /// Find the root span index of a trace.
142    fn get_root_span_index(&self, trace: &Trace) -> Option<usize> {
143        // logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/traceutil/trace.go#L36
144        let spans = trace.spans();
145        if spans.is_empty() {
146            return None;
147        }
148        let length = spans.len();
149        // General case: go over all spans and check for one without a matching parent.
150        // This intentionally mirrors `datadog-agent/pkg/trace/traceutil/trace.go:GetRoot`:
151        // - Fast-path: return the last span with `parent_id == 0` (some clients report the root last)
152        // - Otherwise: build a map of `parent_id -> child_span_index`, delete entries whose parent
153        //   exists in the trace, and pick any remaining "orphan" child span.
154        let mut parent_id_to_child: FastHashMap<u64, usize> = FastHashMap::default();
155
156        for i in 0..length {
157            // Common case optimization: check for span with parent_id == 0, starting from the end,
158            // since some clients report the root last.
159            let j = length - 1 - i;
160            if spans[j].parent_id() == 0 {
161                return Some(j);
162            }
163            parent_id_to_child.insert(spans[j].parent_id(), j);
164        }
165
166        for span in spans.iter() {
167            parent_id_to_child.remove(&span.span_id());
168        }
169
170        // Here, if the trace is valid, we should have `len(parent_id_to_child) == 1`.
171        if parent_id_to_child.len() != 1 {
172            debug!(
173                "Didn't reliably find the root span for traceID:{}",
174                &spans[0].trace_id()
175            );
176        }
177
178        // Have a safe behavior if that's not the case.
179        // Pick a random span without its parent.
180        if let Some((_, child_idx)) = parent_id_to_child.iter().next() {
181            return Some(*child_idx);
182        }
183
184        // Gracefully fail with the last span of the trace.
185        Some(length - 1)
186    }
187
188    /// Check for user-set sampling priority in trace
189    fn get_user_priority(&self, trace: &Trace, root_span_idx: usize) -> Option<i32> {
190        // First check trace-level sampling priority (last-seen priority from OTLP ingest)
191        if let Some(sampling) = trace.sampling() {
192            if let Some(priority) = sampling.priority {
193                return Some(priority);
194            }
195        }
196
197        if trace.spans().is_empty() {
198            return None;
199        }
200
201        // Fall back to checking spans (for compatibility with non-OTLP traces)
202        // Prefer the root span (common case), but fall back to scanning all spans to be robust to ordering.
203        if let Some(root) = trace.spans().get(root_span_idx) {
204            if let Some(&p) = root.metrics().get(SAMPLING_PRIORITY_METRIC_KEY) {
205                return Some(p as i32);
206            }
207        }
208        let spans = trace.spans();
209        spans
210            .iter()
211            .find_map(|span| span.metrics().get(SAMPLING_PRIORITY_METRIC_KEY).map(|&p| p as i32))
212    }
213
214    /// Returns `true` if the given trace ID should be probabilistically sampled.
215    fn sample_probabilistic(&self, trace_id: u64) -> bool {
216        probabilistic::ProbabilisticSampler::sample(trace_id, self.sampling_rate)
217    }
218
219    fn is_otlp_trace(&self, trace: &Trace, root_span_idx: usize) -> bool {
220        trace
221            .spans()
222            .get(root_span_idx)
223            .map(|span| {
224                span.meta()
225                    .contains_key(&MetaString::from_static(OTEL_TRACE_ID_META_KEY))
226            })
227            .unwrap_or(false)
228    }
229
230    /// Returns `true` if the trace contains a span with an error.
231    fn trace_contains_error(&self, trace: &Trace, consider_exception_span_events: bool) -> bool {
232        trace.spans().iter().any(|span| {
233            span.error() != 0 || (consider_exception_span_events && self.span_contains_exception_span_event(span))
234        })
235    }
236
237    /// Returns `true` if the span has exception span events.
238    ///
239    /// This checks for the `_dd.span_events.has_exception` meta field set to `"true"`.
240    fn span_contains_exception_span_event(&self, span: &Span) -> bool {
241        if let Some(has_exception) = span.meta().get("_dd.span_events.has_exception") {
242            return has_exception == "true";
243        }
244        false
245    }
246
247    /// Computes the OTLP pre-sampling priority and decision maker for a trace, mirroring
248    /// `OTLPReceiver.createChunks` in DDA which runs before `runSamplersV1`.
249    ///
250    /// Returns `Some((priority, dm))` for OTLP traces when the probabilistic sampler is disabled,
251    /// or `None` if pre-sampling does not apply.
252    ///
253    /// See: https://github.com/DataDog/datadog-agent/blob/be33ac1490c4a34602cbc65a211406b73ad6d00b/pkg/trace/api/otlp.go#L561-L585
254    fn otlp_pre_sample(&mut self, trace: &mut Trace, root_span_idx: usize) -> Option<(i32, &'static str)> {
255        if self.probabilistic_sampler_enabled || !self.is_otlp_trace(trace, root_span_idx) {
256            return None;
257        }
258        let (priority, dm) = if let Some(user_priority) = self.get_user_priority(trace, root_span_idx) {
259            (user_priority, DECISION_MAKER_MANUAL)
260        } else {
261            let root_trace_id = trace.spans()[root_span_idx].trace_id();
262            if sample_by_rate(root_trace_id, self.otlp_sampling_rate) {
263                (PRIORITY_AUTO_KEEP, DECISION_MAKER_PROBABILISTIC)
264            } else {
265                (PRIORITY_AUTO_DROP, DECISION_MAKER_PROBABILISTIC)
266            }
267        };
268        if priority == PRIORITY_AUTO_KEEP {
269            if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
270                root_span.metrics_mut().remove(PROB_RATE_KEY);
271            }
272        }
273        Some((priority, dm))
274    }
275
276    /// Apply analyzed span sampling to the trace.
277    ///
278    /// Returns `true` if the trace was modified.
279    fn analyzed_span_sampling(&self, trace: &mut Trace) -> bool {
280        let retained = trace.retain_spans(|_, span| span.metrics().contains_key(KEY_ANALYZED_SPANS));
281        if retained > 0 {
282            // Mark trace as kept with high priority
283            let sampling = TraceSampling::new(false, Some(PRIORITY_USER_KEEP), None, Some(self.sampling_rate));
284            trace.set_sampling(Some(sampling));
285            true
286        } else {
287            false
288        }
289    }
290
291    /// Returns `true` if the given trace has any analyzed spans.
292    fn has_analyzed_spans(&self, trace: &Trace) -> bool {
293        trace
294            .spans()
295            .iter()
296            .any(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
297    }
298
299    /// Apply Single Span Sampling to the trace
300    /// Returns true if the trace was modified
301    fn single_span_sampling(&self, trace: &mut Trace) -> bool {
302        let retained = trace.retain_spans(|_, span| span.metrics().contains_key(KEY_SPAN_SAMPLING_MECHANISM));
303        if retained > 0 {
304            // Set high priority and mark as kept
305            let sampling = TraceSampling::new(
306                false,
307                Some(PRIORITY_USER_KEEP),
308                None, // No decision maker for SSS
309                Some(self.sampling_rate),
310            );
311            trace.set_sampling(Some(sampling));
312            true
313        } else {
314            false
315        }
316    }
317
318    /// Evaluates the given trace against all configured samplers.
319    ///
320    /// Return a tuple containing whether or not the trace should be kept, the decision maker tag (which sampler is responsible),
321    /// and the index of the root span used for evaluation.
322    fn run_samplers(&mut self, trace: &mut Trace) -> (bool, i32, &'static str, Option<usize>) {
323        // logic taken from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1066
324        // Empty trace check
325        if trace.spans().is_empty() {
326            return (false, PRIORITY_AUTO_DROP, "", None);
327        }
328
329        let now = std::time::SystemTime::now();
330        let Some(root_span_idx) = self.get_root_span_index(trace) else {
331            return (false, PRIORITY_AUTO_DROP, "", None);
332        };
333
334        // ETS: only sample traces containing errors (including exception span events); skip all other samplers.
335        // logic taken from: https://github.com/DataDog/datadog-agent/blob/be33ac1490c4a34602cbc65a211406b73ad6d00b/pkg/trace/agent/agent.go#L1068
336        if self.error_tracking_standalone {
337            let otlp_pre_sample = self.otlp_pre_sample(trace, root_span_idx);
338            if self.trace_contains_error(trace, true) {
339                let keep = self.error_sampler.sample_error(now, trace, root_span_idx);
340                let default_priority = if keep { PRIORITY_AUTO_KEEP } else { PRIORITY_AUTO_DROP };
341                let (priority, dm) = otlp_pre_sample.unwrap_or((default_priority, ""));
342                return (keep, priority, dm, Some(root_span_idx));
343            }
344            let (pre_priority, pre_dm) = otlp_pre_sample.unwrap_or((PRIORITY_AUTO_DROP, ""));
345            return (false, pre_priority, pre_dm, Some(root_span_idx));
346        }
347
348        let contains_error = self.trace_contains_error(trace, false);
349
350        // Run the rare sampler early, before all other samplers. This mirrors the Go agent behavior
351        // where the rare sampler runs first to catch traces that would otherwise be dropped entirely.
352        // logic taken from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1078
353        let rare = self.rare_sampler.sample(trace, root_span_idx);
354
355        // Modern path: ProbabilisticSamplerEnabled = true
356        if self.probabilistic_sampler_enabled {
357            let mut prob_keep = false;
358            let mut decision_maker = "";
359
360            if rare {
361                // Rare sampler wins over probabilistic sampling.
362                prob_keep = true;
363            } else {
364                // Run probabilistic sampler - use root span's trace ID
365                let root_trace_id = trace.spans()[root_span_idx].trace_id();
366                if self.sample_probabilistic(root_trace_id) {
367                    decision_maker = DECISION_MAKER_PROBABILISTIC;
368                    prob_keep = true;
369
370                    if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
371                        let metrics = root_span.metrics_mut();
372                        metrics.insert(MetaString::from(PROB_RATE_KEY), self.sampling_rate);
373                    }
374                } else if self.error_sampling_enabled && contains_error {
375                    prob_keep = self.error_sampler.sample_error(now, trace, root_span_idx);
376                }
377            }
378
379            let priority = if prob_keep {
380                PRIORITY_AUTO_KEEP
381            } else {
382                PRIORITY_AUTO_DROP
383            };
384
385            return (prob_keep, priority, decision_maker, Some(root_span_idx));
386        }
387
388        let user_priority = self.get_user_priority(trace, root_span_idx);
389        if let Some(priority) = user_priority {
390            if priority < PRIORITY_AUTO_DROP {
391                // Manual drop: short-circuit and skip other samplers.
392                return (false, priority, "", Some(root_span_idx));
393            }
394
395            if rare {
396                return (true, priority, "", Some(root_span_idx));
397            }
398
399            if self.priority_sampler.sample(now, trace, root_span_idx, priority, 0.0) {
400                return (true, priority, "", Some(root_span_idx));
401            }
402        } else if self.is_otlp_trace(trace, root_span_idx) {
403            // Rare check mirrors agent behavior: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1129-L1140
404            if rare {
405                return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
406            }
407
408            // some sampling happens upstream in the otlp receiver in the agent: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/otlp.go#L572
409            let root_trace_id = trace.spans()[root_span_idx].trace_id();
410            if sample_by_rate(root_trace_id, self.otlp_sampling_rate) {
411                if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
412                    root_span.metrics_mut().remove(PROB_RATE_KEY);
413                }
414                return (
415                    true,
416                    PRIORITY_AUTO_KEEP,
417                    DECISION_MAKER_PROBABILISTIC,
418                    Some(root_span_idx),
419                );
420            }
421        } else {
422            if rare {
423                return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
424            }
425            if self.no_priority_sampler.sample(now, trace, root_span_idx) {
426                return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
427            }
428        }
429
430        if self.error_sampling_enabled && contains_error {
431            let keep = self.error_sampler.sample_error(now, trace, root_span_idx);
432            if keep {
433                return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
434            }
435        }
436
437        // Default: drop the trace
438        (false, PRIORITY_AUTO_DROP, "", Some(root_span_idx))
439    }
440
441    /// Apply sampling metadata to the trace in-place.
442    ///
443    /// The `root_span_id` parameter identifies which span should receive the sampling metadata.
444    /// This avoids recalculating the root span since it was already found in `run_samplers`.
445    fn apply_sampling_metadata(
446        &self, trace: &mut Trace, keep: bool, priority: i32, decision_maker: &str, root_span_idx: usize,
447    ) {
448        let is_otlp = self.is_otlp_trace(trace, root_span_idx);
449        let root_span_value = match trace.spans_mut().get_mut(root_span_idx) {
450            Some(span) => span,
451            None => return,
452        };
453
454        // Add tag for the decision maker
455        let existing_decision_maker = if decision_maker.is_empty() {
456            root_span_value.meta().get(TAG_DECISION_MAKER).cloned()
457        } else {
458            None
459        };
460        let decision_maker_meta = if decision_maker.is_empty() {
461            existing_decision_maker
462        } else {
463            Some(MetaString::from(decision_maker))
464        };
465
466        let meta = root_span_value.meta_mut();
467        // When the APM-level probabilistic sampler is used with OTLP traces, the DD Agent writes
468        // _dd.p.dm to trace chunk tags only (not span meta). For the legacy OTLP sampling path,
469        // it is written to both. We match that behavior by skipping the span meta write only when
470        // both conditions hold; the DM value still flows through TraceSampling to the encoder.
471        if priority > 0 && !(is_otlp && self.probabilistic_sampler_enabled) {
472            if let Some(dm) = decision_maker_meta.as_ref() {
473                meta.insert(MetaString::from(TAG_DECISION_MAKER), dm.clone());
474            }
475        }
476
477        // Now we can use trace again to set sampling metadata.
478        let sampling = TraceSampling::new(
479            !keep,
480            Some(priority),
481            if priority > 0 { decision_maker_meta } else { None },
482            Some(if is_otlp {
483                self.otlp_sampling_rate
484            } else {
485                self.sampling_rate
486            }),
487        );
488        trace.set_sampling(Some(sampling));
489    }
490
491    fn process_trace(&mut self, trace: &mut Trace) -> bool {
492        // keep is a boolean that indicates if the trace should be kept or dropped
493        // priority is the sampling priority
494        // decision_maker is the tag that indicates the decision maker (probabilistic, error, etc.)
495        // root_span_idx is the index of the root span of the trace
496        let (keep, priority, decision_maker, root_span_idx) = self.run_samplers(trace);
497
498        // Apply sampling metadata and forward if kept, or if ETS (dropped non-error traces are
499        // forwarded with DroppedTrace=true, suppressing SSS/analytics).
500        if keep || self.error_tracking_standalone {
501            if let Some(root_idx) = root_span_idx {
502                self.apply_sampling_metadata(trace, keep, priority, decision_maker, root_idx);
503            }
504            return true;
505        }
506
507        // logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L980-L990
508        // try single span sampling (keeps spans marked for sampling when trace would be dropped)
509        let modified = self.single_span_sampling(trace);
510        if !modified {
511            // Fall back to analytics events if no SSS spans
512            if self.analyzed_span_sampling(trace) {
513                return true;
514            }
515        } else if self.has_analyzed_spans(trace) {
516            // Warn about both SSS and analytics events
517            debug!(
518                "Detected both analytics events AND single span sampling in the same trace. Single span sampling wins because App Analytics is deprecated."
519            );
520            return true;
521        }
522
523        // If we modified the trace with SSS, send it
524        if modified {
525            return true;
526        }
527
528        // Neither SSS nor analytics events found, drop the trace
529        debug!("Dropping trace with priority {}", priority);
530        false
531    }
532}
533
534impl SynchronousTransform for TraceSampler {
535    fn transform_buffer(&mut self, buffer: &mut EventsBuffer) {
536        buffer.remove_if(|event| match event {
537            Event::Trace(trace) => !self.process_trace(trace),
538            _ => false,
539        });
540    }
541}
542
543#[cfg(test)]
544mod tests {
545    use std::collections::HashMap;
546
547    use saluki_context::tags::TagSet;
548    use saluki_core::data_model::event::trace::{Span as DdSpan, Trace};
549    const PRIORITY_USER_DROP: i32 = -1;
550
551    use super::*;
552    fn create_test_sampler() -> TraceSampler {
553        TraceSampler {
554            sampling_rate: 1.0,
555            error_sampling_enabled: true,
556            error_tracking_standalone: false,
557            probabilistic_sampler_enabled: true,
558            otlp_sampling_rate: 1.0,
559            error_sampler: errors::ErrorsSampler::new(10.0, 1.0),
560            priority_sampler: priority_sampler::PrioritySampler::new(MetaString::from("agent-env"), 1.0, 10.0),
561            no_priority_sampler: score_sampler::NoPrioritySampler::new(10.0, 1.0),
562            rare_sampler: rare_sampler::RareSampler::new(false, 5.0, std::time::Duration::from_secs(300), 200),
563        }
564    }
565
566    fn create_test_span(trace_id: u64, span_id: u64, error: i32) -> DdSpan {
567        DdSpan::new(
568            MetaString::from("test-service"),
569            MetaString::from("test-operation"),
570            MetaString::from("test-resource"),
571            MetaString::from("test-type"),
572            trace_id,
573            span_id,
574            0,    // parent_id
575            0,    // start
576            1000, // duration
577            error,
578        )
579    }
580
581    fn create_test_span_with_metrics(trace_id: u64, span_id: u64, metrics: HashMap<String, f64>) -> DdSpan {
582        let mut metrics_map = saluki_common::collections::FastHashMap::default();
583        for (k, v) in metrics {
584            metrics_map.insert(MetaString::from(k), v);
585        }
586        create_test_span(trace_id, span_id, 0).with_metrics(metrics_map)
587    }
588
589    #[allow(dead_code)]
590    fn create_test_span_with_meta(trace_id: u64, span_id: u64, meta: HashMap<String, String>) -> DdSpan {
591        let mut meta_map = saluki_common::collections::FastHashMap::default();
592        for (k, v) in meta {
593            meta_map.insert(MetaString::from(k), MetaString::from(v));
594        }
595        create_test_span(trace_id, span_id, 0).with_meta(meta_map)
596    }
597
598    fn create_test_trace(spans: Vec<DdSpan>) -> Trace {
599        let tags = TagSet::default();
600        Trace::new(spans, tags)
601    }
602
603    #[test]
604    fn test_user_priority_detection() {
605        let sampler = create_test_sampler();
606
607        // Test trace with user-set priority = 2 (UserKeep)
608        let mut metrics = HashMap::new();
609        metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0);
610        let span = create_test_span_with_metrics(12345, 1, metrics);
611        let trace = create_test_trace(vec![span]);
612        let root_idx = sampler.get_root_span_index(&trace).unwrap();
613
614        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2));
615
616        // Test trace with user-set priority = -1 (UserDrop)
617        let mut metrics = HashMap::new();
618        metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), -1.0);
619        let span = create_test_span_with_metrics(12345, 1, metrics);
620        let trace = create_test_trace(vec![span]);
621        let root_idx = sampler.get_root_span_index(&trace).unwrap();
622
623        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(-1));
624
625        // Test trace without user priority
626        let span = create_test_span(12345, 1, 0);
627        let trace = create_test_trace(vec![span]);
628        let root_idx = sampler.get_root_span_index(&trace).unwrap();
629
630        assert_eq!(sampler.get_user_priority(&trace, root_idx), None);
631    }
632
633    #[test]
634    fn test_trace_level_priority_takes_precedence() {
635        let sampler = create_test_sampler();
636
637        // Test trace-level priority overrides span priorities (last-seen priority)
638        // Create spans with different priorities - root has 0, later span has 2
639        let mut metrics_root = HashMap::new();
640        metrics_root.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 0.0);
641        let root_span = create_test_span_with_metrics(12345, 1, metrics_root);
642
643        let mut metrics_later = HashMap::new();
644        metrics_later.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 1.0);
645        let later_span = create_test_span_with_metrics(12345, 2, metrics_later).with_parent_id(1);
646
647        let mut trace = create_test_trace(vec![root_span, later_span]);
648        let root_idx = sampler.get_root_span_index(&trace).unwrap();
649
650        // Without trace-level priority, should get priority from root (0)
651        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(0));
652
653        // Now set trace-level priority to 2 (simulating last-seen priority from OTLP translator)
654        trace.set_sampling(Some(TraceSampling::new(false, Some(2), None, None)));
655
656        // Trace-level priority should take precedence
657        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2));
658
659        // Test that trace-level priority is used even when no span has priority
660        let span_no_priority = create_test_span(12345, 3, 0);
661        let mut trace_only_trace_level = create_test_trace(vec![span_no_priority]);
662        trace_only_trace_level.set_sampling(Some(TraceSampling::new(false, Some(1), None, None)));
663        let root_idx = sampler.get_root_span_index(&trace_only_trace_level).unwrap();
664
665        assert_eq!(sampler.get_user_priority(&trace_only_trace_level, root_idx), Some(1));
666    }
667
668    #[test]
669    fn test_manual_keep_with_trace_level_priority() {
670        let mut sampler = create_test_sampler();
671        sampler.probabilistic_sampler_enabled = false; // Use legacy path that checks user priority
672
673        // Test that manual keep (priority = 2) works via trace-level priority
674        let span = create_test_span(12345, 1, 0);
675        let mut trace = create_test_trace(vec![span]);
676        trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_USER_KEEP), None, None)));
677
678        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
679        assert!(keep);
680        assert_eq!(priority, PRIORITY_USER_KEEP);
681        assert_eq!(decision_maker, "");
682
683        // Test manual drop (priority = -1) via trace-level priority
684        let span = create_test_span(12345, 1, 0);
685        let mut trace = create_test_trace(vec![span]);
686        trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_USER_DROP), None, None)));
687
688        let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
689        assert!(!keep); // Should not keep when user drops
690        assert_eq!(priority, PRIORITY_USER_DROP);
691
692        // Test that priority = 1 (auto keep) via trace-level is also respected
693        let span = create_test_span(12345, 1, 0);
694        let mut trace = create_test_trace(vec![span]);
695        trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_AUTO_KEEP), None, None)));
696
697        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
698        assert!(keep);
699        assert_eq!(priority, PRIORITY_AUTO_KEEP);
700        assert_eq!(decision_maker, "");
701    }
702
703    #[test]
704    fn test_probabilistic_sampling_determinism() {
705        let sampler = create_test_sampler();
706
707        // Same trace ID should always produce same decision
708        let trace_id = 0x1234567890ABCDEF_u64;
709        let result1 = sampler.sample_probabilistic(trace_id);
710        let result2 = sampler.sample_probabilistic(trace_id);
711        assert_eq!(result1, result2);
712    }
713
714    #[test]
715    fn test_error_detection() {
716        let sampler = create_test_sampler();
717
718        // Test trace with error field set
719        let span_with_error = create_test_span(12345, 1, 1);
720        let trace = create_test_trace(vec![span_with_error]);
721        assert!(sampler.trace_contains_error(&trace, false));
722
723        // Test trace without error
724        let span_without_error = create_test_span(12345, 1, 0);
725        let trace = create_test_trace(vec![span_without_error]);
726        assert!(!sampler.trace_contains_error(&trace, false));
727    }
728
729    #[test]
730    fn test_sampling_priority_order() {
731        // Test modern path: error sampler overrides probabilistic drop
732        let mut sampler = create_test_sampler();
733        sampler.sampling_rate = 0.5; // 50% sampling rate
734        sampler.probabilistic_sampler_enabled = true;
735
736        // Create trace with error that would be dropped by probabilistic
737        // Using a trace ID that we know will be dropped at 50% rate
738        let span_with_error = create_test_span(u64::MAX - 1, 1, 1);
739        let mut trace = create_test_trace(vec![span_with_error]);
740
741        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
742        assert!(keep);
743        assert_eq!(priority, PRIORITY_AUTO_KEEP);
744        assert_eq!(decision_maker, ""); // Error sampler doesn't set decision_maker
745
746        // Test legacy path: user priority is respected
747        let mut sampler = create_test_sampler();
748        sampler.probabilistic_sampler_enabled = false; // Use legacy path
749
750        let mut metrics = HashMap::new();
751        metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0);
752        let span = create_test_span_with_metrics(12345, 1, metrics);
753        let mut trace = create_test_trace(vec![span]);
754
755        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
756        assert!(keep);
757        assert_eq!(priority, 2); // UserKeep
758        assert_eq!(decision_maker, "");
759    }
760
761    #[test]
762    fn test_empty_trace_handling() {
763        let mut sampler = create_test_sampler();
764        let mut trace = create_test_trace(vec![]);
765
766        let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
767        assert!(!keep);
768        assert_eq!(priority, PRIORITY_AUTO_DROP);
769    }
770
771    #[test]
772    fn test_root_span_detection() {
773        let sampler = create_test_sampler();
774
775        // Test 1: Root span with parent_id = 0 (common case)
776        let root_span = DdSpan::new(
777            MetaString::from("service"),
778            MetaString::from("operation"),
779            MetaString::from("resource"),
780            MetaString::from("type"),
781            12345,
782            1,
783            0, // parent_id = 0 indicates root
784            0,
785            1000,
786            0,
787        );
788        let child_span = DdSpan::new(
789            MetaString::from("service"),
790            MetaString::from("child_op"),
791            MetaString::from("resource"),
792            MetaString::from("type"),
793            12345,
794            2,
795            1, // parent_id = 1 (points to root)
796            100,
797            500,
798            0,
799        );
800        // Put root span second to test that we find it even when not first
801        let trace = create_test_trace(vec![child_span.clone(), root_span.clone()]);
802        let root_idx = sampler.get_root_span_index(&trace).unwrap();
803        assert_eq!(trace.spans()[root_idx].span_id(), 1);
804
805        // Test 2: Orphaned span (parent not in trace)
806        let orphan_span = DdSpan::new(
807            MetaString::from("service"),
808            MetaString::from("orphan"),
809            MetaString::from("resource"),
810            MetaString::from("type"),
811            12345,
812            3,
813            999, // parent_id = 999 (doesn't exist in trace)
814            200,
815            300,
816            0,
817        );
818        let trace = create_test_trace(vec![orphan_span]);
819        let root_idx = sampler.get_root_span_index(&trace).unwrap();
820        assert_eq!(trace.spans()[root_idx].span_id(), 3);
821
822        // Test 3: Multiple root candidates: should return the last one found (index 1)
823        let span1 = create_test_span(12345, 1, 0);
824        let span2 = create_test_span(12345, 2, 0);
825        let trace = create_test_trace(vec![span1, span2]);
826        // Both have parent_id = 0, should return the last one found (span_id = 2)
827        let root_idx = sampler.get_root_span_index(&trace).unwrap();
828        assert_eq!(trace.spans()[root_idx].span_id(), 2);
829    }
830
831    #[test]
832    fn test_single_span_sampling() {
833        let mut sampler = create_test_sampler();
834
835        // Test 1: Trace with SSS tags should be kept even when probabilistic would drop it
836        sampler.sampling_rate = 0.0; // 0% sampling rate - should drop everything
837        sampler.probabilistic_sampler_enabled = true;
838
839        // Create span with SSS metric
840        let mut metrics_map = saluki_common::collections::FastHashMap::default();
841        metrics_map.insert(MetaString::from(KEY_SPAN_SAMPLING_MECHANISM), 8.0); // Any value
842        let sss_span = create_test_span(12345, 1, 0).with_metrics(metrics_map.clone());
843
844        // Create regular span without SSS
845        let regular_span = create_test_span(12345, 2, 0);
846
847        let mut trace = create_test_trace(vec![sss_span.clone(), regular_span]);
848
849        // Apply SSS
850        let modified = sampler.single_span_sampling(&mut trace);
851        assert!(modified);
852        assert_eq!(trace.spans().len(), 1); // Only SSS span kept
853        assert_eq!(trace.spans()[0].span_id(), 1); // It's the SSS span
854
855        // Check that trace has been marked as kept with high priority
856        assert!(trace.sampling().is_some());
857        assert_eq!(trace.sampling().as_ref().unwrap().priority, Some(PRIORITY_USER_KEEP));
858
859        // Test 2: Trace without SSS tags should not be modified
860        let trace_without_sss = create_test_trace(vec![create_test_span(12345, 3, 0)]);
861        let mut trace_copy = trace_without_sss.clone();
862        let modified = sampler.single_span_sampling(&mut trace_copy);
863        assert!(!modified);
864        assert_eq!(trace_copy.spans().len(), trace_without_sss.spans().len());
865    }
866
867    #[test]
868    fn test_analytics_events() {
869        let sampler = create_test_sampler();
870
871        // Test 1: Trace with analyzed spans
872        let mut metrics_map = saluki_common::collections::FastHashMap::default();
873        metrics_map.insert(MetaString::from(KEY_ANALYZED_SPANS), 1.0);
874        let analyzed_span = create_test_span(12345, 1, 0).with_metrics(metrics_map.clone());
875        let regular_span = create_test_span(12345, 2, 0);
876
877        let mut trace = create_test_trace(vec![analyzed_span.clone(), regular_span]);
878
879        let analyzed_span_ids: Vec<u64> = trace
880            .spans()
881            .iter()
882            .filter(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
883            .map(|span| span.span_id())
884            .collect();
885        assert_eq!(analyzed_span_ids, vec![1]);
886
887        assert!(sampler.has_analyzed_spans(&trace));
888        let modified = sampler.analyzed_span_sampling(&mut trace);
889        assert!(modified);
890        assert_eq!(trace.spans().len(), 1);
891        assert_eq!(trace.spans()[0].span_id(), 1);
892        assert!(trace.sampling().is_some());
893
894        // Test 2: Trace without analyzed spans
895        let trace_no_analytics = create_test_trace(vec![create_test_span(12345, 3, 0)]);
896        let mut trace_no_analytics_copy = trace_no_analytics.clone();
897        let analyzed_span_ids: Vec<u64> = trace_no_analytics
898            .spans()
899            .iter()
900            .filter(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS))
901            .map(|span| span.span_id())
902            .collect();
903        assert!(analyzed_span_ids.is_empty());
904        assert!(!sampler.has_analyzed_spans(&trace_no_analytics));
905        let modified = sampler.analyzed_span_sampling(&mut trace_no_analytics_copy);
906        assert!(!modified);
907        assert_eq!(trace_no_analytics_copy.spans().len(), trace_no_analytics.spans().len());
908    }
909
910    #[test]
911    fn test_probabilistic_sampling_with_prob_rate_key() {
912        let mut sampler = create_test_sampler();
913        sampler.sampling_rate = 0.75; // 75% sampling rate
914        sampler.probabilistic_sampler_enabled = true;
915
916        // Use a trace ID that we know will be sampled
917        let trace_id = 12345_u64;
918        let root_span = DdSpan::new(
919            MetaString::from("service"),
920            MetaString::from("operation"),
921            MetaString::from("resource"),
922            MetaString::from("type"),
923            trace_id,
924            1,
925            0, // parent_id = 0 indicates root
926            0,
927            1000,
928            0,
929        );
930        let mut trace = create_test_trace(vec![root_span]);
931
932        let (keep, priority, decision_maker, root_span_idx) = sampler.run_samplers(&mut trace);
933
934        if keep && decision_maker == DECISION_MAKER_PROBABILISTIC {
935            // If sampled probabilistically, check that probRateKey was already added
936            assert_eq!(priority, PRIORITY_AUTO_KEEP);
937            assert_eq!(decision_maker, DECISION_MAKER_PROBABILISTIC); // probabilistic sampling marker
938
939            // Check that the root span already has the probRateKey (it should have been added in run_samplers)
940            let root_idx = root_span_idx.unwrap_or(0);
941            let root_span = &trace.spans()[root_idx];
942            assert!(root_span.metrics().contains_key(PROB_RATE_KEY));
943            assert_eq!(*root_span.metrics().get(PROB_RATE_KEY).unwrap(), 0.75);
944
945            // Test that apply_sampling_metadata still works correctly for other metadata
946            let mut trace_with_metadata = trace.clone();
947            sampler.apply_sampling_metadata(&mut trace_with_metadata, keep, priority, decision_maker, root_idx);
948
949            // Check that decision maker tag was added
950            let modified_root = &trace_with_metadata.spans()[root_idx];
951            assert!(modified_root.meta().contains_key(TAG_DECISION_MAKER));
952            assert_eq!(
953                modified_root.meta().get(TAG_DECISION_MAKER).unwrap(),
954                &MetaString::from(DECISION_MAKER_PROBABILISTIC)
955            );
956        }
957    }
958
959    // ── Rare-sampler interaction tests ──────────────────────────────────────────
960    // Adapted from datadog-agent/pkg/trace/agent/agent_test.go TestSampling cases:
961    // "rare-sampler-catch-unsampled", "rare-sampler-catch-sampled",
962    // "rare-sampler-disabled", and related probabilistic path interactions.
963
964    /// Create a top-level span eligible for rare sampling.
965    ///
966    /// The rare sampler only considers spans that have `_top_level=1` or `_dd.measured=1`.
967    /// This helper sets `_top_level=1` so that the rare sampler can consider the span.
968    fn create_top_level_span(trace_id: u64, span_id: u64) -> DdSpan {
969        let mut metrics = saluki_common::collections::FastHashMap::default();
970        metrics.insert(MetaString::from("_top_level"), 1.0);
971        create_test_span(trace_id, span_id, 0).with_metrics(metrics)
972    }
973
974    /// Create a `TraceSampler` with the rare sampler enabled and a very high TPS limit so it
975    /// freely samples first occurrences, plus a long TTL so second occurrences stay within TTL.
976    fn create_sampler_with_rare_enabled() -> TraceSampler {
977        TraceSampler {
978            rare_sampler: rare_sampler::RareSampler::new(true, 1000.0, std::time::Duration::from_secs(300), 200),
979            ..create_test_sampler()
980        }
981    }
982
983    /// Adapted from Go "rare-sampler-catch-unsampled":
984    ///
985    /// Rare is enabled + probabilistic would drop → rare catches it (first occurrence).
986    #[test]
987    fn rare_sampler_catches_unsampled_trace() {
988        let mut sampler = create_sampler_with_rare_enabled();
989        sampler.sampling_rate = 0.0; // probabilistic drops everything
990        sampler.probabilistic_sampler_enabled = true;
991
992        let span = create_top_level_span(111, 1);
993        let mut trace = create_test_trace(vec![span]);
994
995        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
996        assert!(keep, "rare sampler should catch first occurrence");
997        assert_eq!(priority, PRIORITY_AUTO_KEEP);
998        assert_eq!(decision_maker, "", "rare sampler does not set _dd.p.dm");
999    }
1000
1001    /// Adapted from Go "rare-sampler-catch-sampled" (first trace):
1002    ///
1003    /// Rare is enabled, first occurrence — trace is kept and `_dd.rare` is set on the span.
1004    #[test]
1005    fn rare_sampler_sets_rare_metric_on_first_occurrence() {
1006        let mut sampler = create_sampler_with_rare_enabled();
1007        sampler.sampling_rate = 0.0;
1008        sampler.probabilistic_sampler_enabled = true;
1009
1010        let span = create_top_level_span(222, 1);
1011        let mut trace = create_test_trace(vec![span]);
1012
1013        let (keep, _, _, root_idx) = sampler.run_samplers(&mut trace);
1014        assert!(keep);
1015        let root = &trace.spans()[root_idx.unwrap()];
1016        assert_eq!(
1017            root.metrics().get(rare_sampler::RARE_KEY).copied(),
1018            Some(1.0),
1019            "_dd.rare should be 1 on first occurrence"
1020        );
1021    }
1022
1023    /// Adapted from Go "rare-sampler-catch-sampled" (second trace same signature):
1024    ///
1025    /// Within the TTL, the same signature is no longer "rare" and rare does not re-sample it.
1026    /// With probabilistic at 0%, the trace should be dropped.
1027    #[test]
1028    fn rare_sampler_does_not_resample_within_ttl() {
1029        let mut sampler = create_sampler_with_rare_enabled();
1030        sampler.sampling_rate = 0.0;
1031        sampler.probabilistic_sampler_enabled = true;
1032
1033        // First trace: rare catches it.
1034        let span1 = create_top_level_span(333, 1);
1035        let mut trace1 = create_test_trace(vec![span1]);
1036        let (keep1, _, _, _) = sampler.run_samplers(&mut trace1);
1037        assert!(keep1, "first occurrence should be kept by rare sampler");
1038
1039        // Second trace: same signature (same service/operation/resource on the top-level span),
1040        // still within TTL → rare won't catch it; probabilistic at 0% drops it.
1041        let span2 = create_top_level_span(333, 2);
1042        let mut trace2 = create_test_trace(vec![span2]);
1043        let (keep2, priority2, _, _) = sampler.run_samplers(&mut trace2);
1044        assert!(!keep2, "second occurrence within TTL should be dropped");
1045        assert_eq!(priority2, PRIORITY_AUTO_DROP);
1046    }
1047
1048    /// Adapted from Go "rare-sampler-disabled":
1049    ///
1050    /// Rare is disabled + probabilistic at 0% → trace is dropped.
1051    #[test]
1052    fn rare_sampler_disabled_does_not_catch_unsampled() {
1053        let mut sampler = create_test_sampler(); // rare disabled by default
1054        sampler.sampling_rate = 0.0;
1055        sampler.probabilistic_sampler_enabled = true;
1056
1057        let span = create_top_level_span(444, 1);
1058        let mut trace = create_test_trace(vec![span]);
1059
1060        let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
1061        assert!(!keep, "rare disabled should not catch the trace");
1062        assert_eq!(priority, PRIORITY_AUTO_DROP);
1063    }
1064
1065    /// Rare + non-probabilistic path (priority path): rare catches `PriorityAutoDrop` on first
1066    /// occurrence, preserving the tracer-set priority rather than upgrading to AutoKeep.
1067    #[test]
1068    fn rare_sampler_catches_priority_auto_drop_in_legacy_path() {
1069        let mut sampler = create_sampler_with_rare_enabled();
1070        sampler.probabilistic_sampler_enabled = false;
1071
1072        let mut metrics = saluki_common::collections::FastHashMap::default();
1073        metrics.insert(MetaString::from("_top_level"), 1.0);
1074        metrics.insert(
1075            MetaString::from(SAMPLING_PRIORITY_METRIC_KEY),
1076            PRIORITY_AUTO_DROP as f64,
1077        );
1078        let span = create_test_span(555, 1, 0).with_metrics(metrics);
1079        let mut trace = create_test_trace(vec![span]);
1080
1081        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
1082        assert!(keep, "rare sampler should catch PriorityAutoDrop on first occurrence");
1083        assert_eq!(priority, PRIORITY_AUTO_DROP, "tracer-set priority should be preserved");
1084        assert_eq!(decision_maker, "");
1085    }
1086
1087    /// Rare + non-probabilistic path (priority path): UserKeep priority is preserved, not
1088    /// downgraded to AutoKeep. Mirrors Go agent behavior at agent.go#L1129-1131.
1089    #[test]
1090    fn rare_sampler_preserves_user_keep_priority_in_legacy_path() {
1091        let mut sampler = create_sampler_with_rare_enabled();
1092        sampler.probabilistic_sampler_enabled = false;
1093
1094        let mut metrics = saluki_common::collections::FastHashMap::default();
1095        metrics.insert(MetaString::from("_top_level"), 1.0);
1096        metrics.insert(MetaString::from(SAMPLING_PRIORITY_METRIC_KEY), 2.0); // UserKeep
1097        let span = create_test_span(556, 1, 0).with_metrics(metrics);
1098        let mut trace = create_test_trace(vec![span]);
1099
1100        let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
1101        assert!(keep);
1102        assert_eq!(priority, 2, "UserKeep priority must not be downgraded to AutoKeep");
1103    }
1104
1105    /// Probabilistic path with 100% rate and rare disabled: keep with `_dd.p.dm = "-9"`.
1106    #[test]
1107    fn probabilistic_100_percent_keeps_trace_with_decision_maker() {
1108        let mut sampler = create_test_sampler(); // rare disabled
1109        sampler.sampling_rate = 1.0;
1110        sampler.probabilistic_sampler_enabled = true;
1111
1112        let span = create_top_level_span(666, 1);
1113        let mut trace = create_test_trace(vec![span]);
1114
1115        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
1116        assert!(keep);
1117        assert_eq!(priority, PRIORITY_AUTO_KEEP);
1118        assert_eq!(decision_maker, DECISION_MAKER_PROBABILISTIC);
1119    }
1120
1121    /// Probabilistic path with 0% rate and rare disabled: drop.
1122    #[test]
1123    fn probabilistic_0_percent_drops_trace() {
1124        let mut sampler = create_test_sampler(); // rare disabled
1125        sampler.sampling_rate = 0.0;
1126        sampler.probabilistic_sampler_enabled = true;
1127        sampler.error_sampling_enabled = false;
1128
1129        let span = create_top_level_span(777, 1);
1130        let mut trace = create_test_trace(vec![span]);
1131
1132        let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
1133        assert!(!keep);
1134        assert_eq!(priority, PRIORITY_AUTO_DROP);
1135    }
1136
1137    /// Rare sampler should catch OTLP traces without a sampling priority on their first occurrence,
1138    /// matching the Go agent behavior: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1129-L1140
1139    #[test]
1140    fn rare_sampler_catches_otlp_no_priority_trace() {
1141        let mut sampler = create_sampler_with_rare_enabled();
1142        sampler.probabilistic_sampler_enabled = false;
1143        sampler.error_sampling_enabled = false;
1144        sampler.otlp_sampling_rate = 0.0;
1145
1146        let mut meta = saluki_common::collections::FastHashMap::default();
1147        meta.insert(
1148            MetaString::from_static(OTEL_TRACE_ID_META_KEY),
1149            MetaString::from("00000000000000000000000000000001"),
1150        );
1151        let span = create_top_level_span(888, 1).with_meta(meta);
1152        let mut trace = create_test_trace(vec![span]);
1153
1154        let (keep, priority, decision_maker, root_idx) = sampler.run_samplers(&mut trace);
1155        assert!(
1156            keep,
1157            "rare sampler should keep OTLP trace with no priority on first occurrence"
1158        );
1159        assert_eq!(priority, PRIORITY_AUTO_KEEP);
1160        assert_eq!(decision_maker, "");
1161        assert_eq!(
1162            trace.spans()[root_idx.unwrap()]
1163                .metrics()
1164                .get(rare_sampler::RARE_KEY)
1165                .copied(),
1166            Some(1.0),
1167            "_dd.rare should be set to 1 on first occurrence"
1168        );
1169    }
1170
1171    /// Adapted from Go "probabilistic-rare-100":
1172    ///
1173    /// Rare fires before probabilistic is consulted, so even at 100% sampling rate the decision
1174    /// maker tag is not set — the trace is attributed to rare, not probabilistic.
1175    #[test]
1176    fn rare_wins_over_probabilistic_no_decision_maker_tag() {
1177        let mut sampler = create_sampler_with_rare_enabled();
1178        sampler.sampling_rate = 1.0;
1179        sampler.probabilistic_sampler_enabled = true;
1180
1181        let span = create_top_level_span(901, 1);
1182        let mut trace = create_test_trace(vec![span]);
1183
1184        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
1185        assert!(keep);
1186        assert_eq!(priority, PRIORITY_AUTO_KEEP);
1187        assert_eq!(decision_maker, "", "rare takes precedence — _dd.p.dm must not be set");
1188    }
1189
1190    /// Adapted from Go "error-sampled-prio-unsampled":
1191    ///
1192    /// Rare fires before the error sampler is reached. A no-priority error trace on its first
1193    /// occurrence is kept by rare, not by the error sampler.
1194    #[test]
1195    fn rare_catches_error_trace_before_error_sampler() {
1196        let mut sampler = create_sampler_with_rare_enabled();
1197        sampler.probabilistic_sampler_enabled = false;
1198        sampler.error_sampling_enabled = true;
1199
1200        let span = create_top_level_span(902, 1);
1201        let error_span = create_test_span(902, 2, 1); // error=1
1202        let mut trace = create_test_trace(vec![span, error_span]);
1203
1204        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
1205        assert!(keep, "rare should catch the trace before the error sampler");
1206        assert_eq!(priority, PRIORITY_AUTO_KEEP);
1207        assert_eq!(decision_maker, "");
1208    }
1209
1210    /// Adapted from Go manual-drop short-circuit behavior:
1211    ///
1212    /// UserDrop (-1) priority is checked before rare runs in the priority path. A UserDrop trace
1213    /// must be dropped even when rare is enabled and would otherwise match.
1214    #[test]
1215    fn manual_drop_short_circuits_before_rare() {
1216        let mut sampler = create_sampler_with_rare_enabled();
1217        sampler.probabilistic_sampler_enabled = false;
1218
1219        let mut metrics = saluki_common::collections::FastHashMap::default();
1220        metrics.insert(MetaString::from("_top_level"), 1.0);
1221        metrics.insert(MetaString::from(SAMPLING_PRIORITY_METRIC_KEY), -1.0); // UserDrop
1222        let span = create_test_span(903, 1, 0).with_metrics(metrics);
1223        let mut trace = create_test_trace(vec![span]);
1224
1225        let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
1226        assert!(!keep, "UserDrop must be dropped even when rare would match");
1227        assert_eq!(priority, -1);
1228    }
1229
1230    // ── Error Tracking Standalone tests ─────────────────────────────────────────
1231    // Adapted from datadog-agent/pkg/trace/agent/agent.go runSamplers ETS block.
1232
1233    fn create_sampler_with_ets() -> TraceSampler {
1234        TraceSampler {
1235            error_tracking_standalone: true,
1236            ..create_test_sampler()
1237        }
1238    }
1239
1240    /// ETS enabled + trace with error → kept by error sampler.
1241    #[test]
1242    fn ets_keeps_trace_with_error() {
1243        let mut sampler = create_sampler_with_ets();
1244
1245        let span = create_test_span(100, 1, 1); // error=1
1246        let mut trace = create_test_trace(vec![span]);
1247
1248        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
1249        assert!(keep, "ETS should keep traces with errors");
1250        assert_eq!(priority, PRIORITY_AUTO_KEEP);
1251        assert_eq!(decision_maker, "", "ETS does not set a decision maker");
1252    }
1253
1254    /// ETS enabled + trace without error → dropped; rare/probabilistic/priority not consulted.
1255    #[test]
1256    fn ets_drops_trace_without_error() {
1257        let mut sampler = create_sampler_with_ets();
1258
1259        let span = create_test_span(101, 1, 0); // error=0
1260        let mut trace = create_test_trace(vec![span]);
1261
1262        let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
1263        assert!(!keep, "ETS should drop traces without errors");
1264        assert_eq!(priority, PRIORITY_AUTO_DROP);
1265    }
1266
1267    /// ETS enabled + non-error trace → forwarded with DroppedTrace=true; SSS/analytics suppressed.
1268    #[test]
1269    fn ets_forwards_dropped_trace_with_dropped_flag() {
1270        let mut sampler = create_sampler_with_ets();
1271
1272        // Span with SSS metric — would trigger single span sampling in non-ETS mode.
1273        let mut metrics = saluki_common::collections::FastHashMap::default();
1274        metrics.insert(MetaString::from(KEY_SPAN_SAMPLING_MECHANISM), 8.0);
1275        let span = create_test_span(102, 1, 0).with_metrics(metrics);
1276        let mut trace = create_test_trace(vec![span]);
1277
1278        let forwarded = sampler.process_trace(&mut trace);
1279        assert!(forwarded, "ETS should forward non-error traces to intake");
1280        assert!(
1281            trace.sampling().is_some_and(|s| s.dropped_trace),
1282            "non-error ETS trace should have DroppedTrace=true"
1283        );
1284    }
1285
1286    /// ETS enabled + trace with exception span event → kept (exception events count as errors in ETS).
1287    #[test]
1288    fn ets_keeps_trace_with_exception_span_event() {
1289        let mut sampler = create_sampler_with_ets();
1290
1291        // Span with error=0 but exception span event metadata.
1292        let mut meta = saluki_common::collections::FastHashMap::default();
1293        meta.insert(
1294            MetaString::from("_dd.span_events.has_exception"),
1295            MetaString::from("true"),
1296        );
1297        let span = create_test_span(104, 1, 0).with_meta(meta);
1298        let mut trace = create_test_trace(vec![span]);
1299
1300        let (keep, _, _, _) = sampler.run_samplers(&mut trace);
1301        assert!(keep, "ETS should treat exception span events as errors");
1302    }
1303
1304    /// ETS disabled → normal sampling path (probabilistic) is used.
1305    #[test]
1306    fn ets_disabled_uses_normal_sampling() {
1307        let mut sampler = create_test_sampler(); // ETS disabled
1308        sampler.sampling_rate = 1.0;
1309        sampler.probabilistic_sampler_enabled = true;
1310
1311        let span = create_test_span(105, 1, 0); // no error
1312        let mut trace = create_test_trace(vec![span]);
1313
1314        let (keep, _, decision_maker, _) = sampler.run_samplers(&mut trace);
1315        assert!(keep, "normal probabilistic sampling should keep the trace");
1316        assert_eq!(decision_maker, DECISION_MAKER_PROBABILISTIC);
1317    }
1318
1319    // ── ETS + OTLP pre-sampling tests ────────────────────────────────────────────
1320    // These mirror DDA's OTLPReceiver.createChunks behavior which pre-assigns
1321    // priority/dm before runSamplersV1, so ETS sees those values even when it
1322    // short-circuits. See: pkg/trace/api/otlp.go#L561-L585.
1323
1324    fn create_otlp_test_span(trace_id: u64, span_id: u64, error: i32) -> DdSpan {
1325        let mut meta = saluki_common::collections::FastHashMap::default();
1326        meta.insert(
1327            MetaString::from_static(OTEL_TRACE_ID_META_KEY),
1328            MetaString::from("0000000000000000deadbeefcafebabe"),
1329        );
1330        create_test_span(trace_id, span_id, error).with_meta(meta)
1331    }
1332
1333    fn create_sampler_with_ets_legacy() -> TraceSampler {
1334        TraceSampler {
1335            error_tracking_standalone: true,
1336            probabilistic_sampler_enabled: false,
1337            otlp_sampling_rate: 1.0,
1338            ..create_test_sampler()
1339        }
1340    }
1341
1342    /// ETS + OTLP non-error trace (legacy sampler path): pre-sampling sets priority=AutoKeep and dm="-9".
1343    /// Mirrors DDA OTLPReceiver assigning priority=1 + dm=-9 before ETS returns early.
1344    #[test]
1345    fn ets_otlp_non_error_gets_presample_priority_and_dm() {
1346        let mut sampler = create_sampler_with_ets_legacy();
1347
1348        let span = create_otlp_test_span(200, 1, 0); // no error
1349        let mut trace = create_test_trace(vec![span]);
1350
1351        let (keep, priority, dm, _) = sampler.run_samplers(&mut trace);
1352        assert!(!keep, "ETS should drop non-error OTLP traces");
1353        assert_eq!(
1354            priority, PRIORITY_AUTO_KEEP,
1355            "OTLP pre-sampling sets priority=AutoKeep even for ETS-dropped traces"
1356        );
1357        assert_eq!(dm, DECISION_MAKER_PROBABILISTIC, "OTLP pre-sampling sets dm=-9");
1358    }
1359
1360    /// ETS + OTLP error trace (legacy sampler path): pre-sampling sets priority=AutoKeep and dm="-9".
1361    #[test]
1362    fn ets_otlp_error_gets_presample_priority_and_dm() {
1363        let mut sampler = create_sampler_with_ets_legacy();
1364
1365        let span = create_otlp_test_span(201, 1, 1); // error=1
1366        let mut trace = create_test_trace(vec![span]);
1367
1368        let (keep, priority, dm, _) = sampler.run_samplers(&mut trace);
1369        assert!(keep, "ETS should keep error OTLP traces");
1370        assert_eq!(priority, PRIORITY_AUTO_KEEP, "OTLP pre-sampling sets priority=AutoKeep");
1371        assert_eq!(dm, DECISION_MAKER_PROBABILISTIC, "OTLP pre-sampling sets dm=-9");
1372    }
1373
1374    /// ETS + OTLP + probabilistic_sampler_enabled=true: OTLPReceiver defers, no pre-sampling.
1375    /// DDA's OTLPReceiver sets PriorityNone and skips when ProbabilisticSamplerEnabled.
1376    #[test]
1377    fn ets_otlp_probabilistic_path_skips_presample() {
1378        let mut sampler = create_sampler_with_ets_legacy();
1379        sampler.probabilistic_sampler_enabled = true; // override to prob path
1380
1381        let span = create_otlp_test_span(202, 1, 0); // no error
1382        let mut trace = create_test_trace(vec![span]);
1383
1384        let (keep, priority, dm, _) = sampler.run_samplers(&mut trace);
1385        assert!(!keep, "ETS should drop non-error traces");
1386        assert_eq!(
1387            priority, PRIORITY_AUTO_DROP,
1388            "no pre-sampling when probabilistic path active"
1389        );
1390        assert_eq!(dm, "", "no dm when probabilistic path active");
1391    }
1392
1393    /// ETS + non-OTLP trace (legacy sampler path): behavior unchanged — no pre-sampling.
1394    #[test]
1395    fn ets_non_otlp_unaffected_by_presample() {
1396        let mut sampler = create_sampler_with_ets_legacy();
1397
1398        let span = create_test_span(203, 1, 0); // no error, no OTLP meta
1399        let mut trace = create_test_trace(vec![span]);
1400
1401        let (keep, priority, dm, _) = sampler.run_samplers(&mut trace);
1402        assert!(!keep, "ETS should drop non-error non-OTLP traces");
1403        assert_eq!(priority, PRIORITY_AUTO_DROP, "non-OTLP traces use default ETS priority");
1404        assert_eq!(dm, "", "non-OTLP traces get no dm");
1405    }
1406
1407    /// ETS + OTLP trace with user-set priority: dm="-4" (manual sampling), matching DDA.
1408    #[test]
1409    fn ets_otlp_user_priority_gets_manual_dm() {
1410        let mut sampler = create_sampler_with_ets_legacy();
1411
1412        let mut metrics = saluki_common::collections::FastHashMap::default();
1413        metrics.insert(MetaString::from(SAMPLING_PRIORITY_METRIC_KEY), 2.0); // UserKeep
1414        let span = create_otlp_test_span(204, 1, 0).with_metrics(metrics); // no error
1415        let mut trace = create_test_trace(vec![span]);
1416
1417        let (keep, priority, dm, _) = sampler.run_samplers(&mut trace);
1418        assert!(!keep, "ETS drops non-error traces regardless of user priority");
1419        assert_eq!(priority, PRIORITY_USER_KEEP, "user priority is preserved");
1420        assert_eq!(dm, DECISION_MAKER_MANUAL, "user-set priority gets dm=-4");
1421    }
1422}