Skip to main content

saluki_components/transforms/trace_sampler/
mod.rs

1//! Trace sampling transform.
2//!
3//! This transform implements agent-side head sampling for traces, supporting:
4//! - Probabilistic sampling based on trace ID
5//! - User-set priority preservation
6//! - Error-based sampling as a safety net
7//! - OTLP trace ingestion with proper sampling decision handling
8//!
9//! TODO:
10//!
11//! - add trace metrics: datadog-agent/pkg/trace/sampler/metrics.go
12//! - adding missing samplers (priority, nopriority)
13//! - add error tracking standalone mode
14
15use async_trait::async_trait;
16use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
17use saluki_common::collections::FastHashMap;
18use saluki_config::GenericConfiguration;
19use saluki_core::{
20    components::{transforms::*, ComponentContext},
21    data_model::event::{
22        trace::{AttributeValue, Span, Trace},
23        Event,
24    },
25    topology::EventsBuffer,
26};
27use saluki_error::GenericError;
28use stringtheory::MetaString;
29use tracing::debug;
30
31mod catalog;
32mod core_sampler;
33mod errors;
34mod priority_sampler;
35mod probabilistic;
36mod rare_sampler;
37mod score_sampler;
38mod signature;
39
40use self::probabilistic::PROB_RATE_KEY;
41use crate::common::datadog::{
42    apm::ApmConfig, sample_by_rate, DECISION_MAKER_MANUAL, DECISION_MAKER_PROBABILISTIC, OTEL_TRACE_ID_META_KEY,
43    SAMPLING_PRIORITY_METRIC_KEY, TAG_DECISION_MAKER,
44};
45use crate::common::otlp::config::TracesConfig;
46
// Sampling priority constants (matching datadog-agent)
/// Priority reported when a sampler automatically decides to drop a trace.
const PRIORITY_AUTO_DROP: i32 = 0;
/// Priority reported when a sampler automatically decides to keep a trace.
const PRIORITY_AUTO_KEEP: i32 = 1;
/// Priority assigned in this module to traces kept via single span sampling or analyzed-span sampling.
const PRIORITY_USER_KEEP: i32 = 2;

/// Rate handed to the errors, priority and no-priority samplers when they are constructed.
const ERROR_SAMPLE_RATE: f64 = 1.0; // Default extra sample rate (matches agent's ExtraSampleRate)

// Single Span Sampling and Analytics Events keys
/// Span attribute whose presence marks a span for single span sampling.
const KEY_SPAN_SAMPLING_MECHANISM: &str = "_dd.span_sampling.mechanism";
/// Span attribute whose presence marks a span as an analyzed (analytics event) span.
const KEY_ANALYZED_SPANS: &str = "_dd.analyzed";
57
// Decision maker values for `_dd.p.dm` (`DECISION_MAKER_MANUAL`, `DECISION_MAKER_PROBABILISTIC`) are imported from
// `crate::common::datadog` above.
59
/// Normalizes a sampling rate so that it is always usable as a keep probability.
///
/// Rates strictly between `0.0` and `1.0` are returned unchanged. Every other input -- zero, negative values,
/// values of `1.0` or more, infinities and NaN -- is mapped to `1.0` (keep everything).
fn normalize_sampling_rate(rate: f64) -> f64 {
    // Expressed as a positive range check on purpose: every comparison against NaN is false, so a NaN rate falls
    // through to the `1.0` default instead of being handed back to the caller.
    if rate > 0.0 && rate < 1.0 {
        rate
    } else {
        1.0
    }
}
67
/// Configuration for the trace sampler transform.
#[derive(Debug)]
pub struct TraceSamplerConfiguration {
    /// APM settings from which the individual samplers are built.
    apm_config: ApmConfig,
    /// OTLP probabilistic sampling rate as a fraction (configured percentage divided by 100), after being passed
    /// through `normalize_sampling_rate`.
    otlp_sampling_rate: f64,
}
74
75impl TraceSamplerConfiguration {
76    /// Creates a new `TraceSamplerConfiguration` from the given configuration.
77    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
78        let apm_config = ApmConfig::from_configuration(config)?;
79        let otlp_traces: TracesConfig = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
80        let otlp_sampling_rate = normalize_sampling_rate(otlp_traces.probabilistic_sampler.sampling_percentage / 100.0);
81        Ok(Self {
82            apm_config,
83            otlp_sampling_rate,
84        })
85    }
86}
87
88#[async_trait]
89impl SynchronousTransformBuilder for TraceSamplerConfiguration {
90    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
91        // TODO: Need to support remote configuration changing these at runtime
92        // See https://github.com/DataDog/saluki/issues/1326
93        let sampler = TraceSampler {
94            sampling_rate: self.apm_config.probabilistic_sampler_sampling_percentage() / 100.0,
95            error_sampling_enabled: self.apm_config.error_sampling_enabled(),
96            error_tracking_standalone: self.apm_config.error_tracking_standalone_enabled(),
97            probabilistic_sampler_enabled: self.apm_config.probabilistic_sampler_enabled(),
98            otlp_sampling_rate: self.otlp_sampling_rate,
99            error_sampler: errors::ErrorsSampler::new(self.apm_config.errors_per_second(), ERROR_SAMPLE_RATE),
100            priority_sampler: priority_sampler::PrioritySampler::new(
101                self.apm_config.default_env().clone(),
102                ERROR_SAMPLE_RATE,
103                self.apm_config.target_traces_per_second(),
104            ),
105            no_priority_sampler: score_sampler::NoPrioritySampler::new(
106                self.apm_config.target_traces_per_second(),
107                ERROR_SAMPLE_RATE,
108            ),
109            rare_sampler: rare_sampler::RareSampler::new(
110                self.apm_config.rare_sampler_enabled(),
111                self.apm_config.rare_sampler_tps(),
112                std::time::Duration::from_secs_f64(self.apm_config.rare_sampler_cooldown_period_secs()),
113                self.apm_config.rare_sampler_cardinality(),
114            ),
115        };
116
117        Ok(Box::new(sampler))
118    }
119}
120
impl MemoryBounds for TraceSamplerConfiguration {
    /// Declares the minimum memory required by the component built from this configuration.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Only the `TraceSampler` struct itself is accounted for here.
        builder.minimum().with_single_value::<TraceSampler>("component struct");
    }
}
126
/// Synchronous transform that decides, for each trace event, whether the trace is kept or dropped.
pub struct TraceSampler {
    /// Rate given to the probabilistic sampler, as a fraction (configured percentage divided by 100).
    sampling_rate: f64,
    /// When `true`, only traces containing errors are offered to the error sampler and all other samplers are
    /// skipped.
    error_tracking_standalone: bool,
    /// Whether traces containing errors may still be kept by the error sampler after other samplers declined them.
    error_sampling_enabled: bool,
    /// Selects the probabilistic sampling path in `run_samplers` instead of the priority-based path.
    probabilistic_sampler_enabled: bool,
    /// Rate applied to OTLP traces that carry no user-set priority, as a fraction.
    otlp_sampling_rate: f64,
    /// Sampler consulted for traces that contain errors.
    error_sampler: errors::ErrorsSampler,
    /// Sampler consulted for traces that carry a sampling priority.
    priority_sampler: priority_sampler::PrioritySampler,
    /// Sampler consulted for non-OTLP traces that carry no sampling priority.
    no_priority_sampler: score_sampler::NoPrioritySampler,
    /// Sampler run ahead of the other samplers outside of error tracking standalone mode.
    // NOTE(review): presumably keeps rarely-seen traces; confirm against the `rare_sampler` module.
    rare_sampler: rare_sampler::RareSampler,
}
138
139impl TraceSampler {
140    // TODO: merge this with the other duplicate "find root span of trace" functions
141    /// Find the root span index of a trace.
142    fn get_root_span_index(&self, trace: &Trace) -> Option<usize> {
143        // logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/traceutil/trace.go#L36
144        let spans = trace.spans();
145        if spans.is_empty() {
146            return None;
147        }
148        let length = spans.len();
149        // General case: go over all spans and check for one without a matching parent.
150        // This intentionally mirrors `datadog-agent/pkg/trace/traceutil/trace.go:GetRoot`:
151        // - Fast-path: return the last span with `parent_id == 0` (some clients report the root last)
152        // - Otherwise: build a map of `parent_id -> child_span_index`, delete entries whose parent
153        //   exists in the trace, and pick any remaining "orphan" child span.
154        let mut parent_id_to_child: FastHashMap<u64, usize> = FastHashMap::default();
155
156        for i in 0..length {
157            // Common case optimization: check for span with parent_id == 0, starting from the end,
158            // since some clients report the root last.
159            let j = length - 1 - i;
160            if spans[j].parent_id() == 0 {
161                return Some(j);
162            }
163            parent_id_to_child.insert(spans[j].parent_id(), j);
164        }
165
166        for span in spans.iter() {
167            parent_id_to_child.remove(&span.span_id());
168        }
169
170        // Here, if the trace is valid, we should have `len(parent_id_to_child) == 1`.
171        if parent_id_to_child.len() != 1 {
172            debug!(
173                "Didn't reliably find the root span for traceID:{:016x}{:016x}",
174                trace.trace_id_high, trace.trace_id_low,
175            );
176        }
177
178        // Have a safe behavior if that's not the case.
179        // Pick a random span without its parent.
180        if let Some((_, child_idx)) = parent_id_to_child.iter().next() {
181            return Some(*child_idx);
182        }
183
184        // Gracefully fail with the last span of the trace.
185        Some(length - 1)
186    }
187
188    /// Check for user-set sampling priority in trace
189    fn get_user_priority(&self, trace: &Trace, root_span_idx: usize) -> Option<i32> {
190        // First check trace-level sampling priority (last-seen priority from OTLP ingest)
191        if let Some(priority) = trace.priority {
192            return Some(priority);
193        }
194
195        if trace.spans().is_empty() {
196            return None;
197        }
198
199        // Fall back to checking spans (for compatibility with non-OTLP traces)
200        // Prefer the root span (common case), but fall back to scanning all spans to be robust to ordering.
201        if let Some(root) = trace.spans().get(root_span_idx) {
202            if let Some(p) = root
203                .attributes
204                .get(SAMPLING_PRIORITY_METRIC_KEY)
205                .and_then(AttributeValue::as_num)
206            {
207                return Some(p as i32);
208            }
209        }
210        let spans = trace.spans();
211        spans.iter().find_map(|span| {
212            span.attributes
213                .get(SAMPLING_PRIORITY_METRIC_KEY)
214                .and_then(AttributeValue::as_num)
215                .map(|p| p as i32)
216        })
217    }
218
219    /// Returns `true` if the given trace ID should be probabilistically sampled.
220    fn sample_probabilistic(&self, trace_id: u64) -> bool {
221        probabilistic::ProbabilisticSampler::sample(trace_id, self.sampling_rate)
222    }
223
224    fn is_otlp_trace(&self, trace: &Trace, root_span_idx: usize) -> bool {
225        trace
226            .spans()
227            .get(root_span_idx)
228            .map(|span| {
229                span.attributes
230                    .contains_key(&MetaString::from_static(OTEL_TRACE_ID_META_KEY))
231            })
232            .unwrap_or(false)
233    }
234
235    /// Returns `true` if the trace contains a span with an error.
236    fn trace_contains_error(&self, trace: &Trace, consider_exception_span_events: bool) -> bool {
237        trace.spans().iter().any(|span| {
238            span.error() != 0 || (consider_exception_span_events && self.span_contains_exception_span_event(span))
239        })
240    }
241
242    /// Returns `true` if the span has exception span events.
243    ///
244    /// This checks for the `_dd.span_events.has_exception` meta field set to `"true"`.
245    fn span_contains_exception_span_event(&self, span: &Span) -> bool {
246        if let Some(has_exception) = span
247            .attributes
248            .get("_dd.span_events.has_exception")
249            .and_then(AttributeValue::as_string)
250        {
251            return has_exception == "true";
252        }
253        false
254    }
255
256    /// Computes the OTLP pre-sampling priority and decision maker for a trace, mirroring
257    /// `OTLPReceiver.createChunks` in DDA which runs before `runSamplersV1`.
258    ///
259    /// Returns `Some((priority, dm))` for OTLP traces when the probabilistic sampler is disabled,
260    /// or `None` if pre-sampling doesn't apply.
261    ///
262    /// See: https://github.com/DataDog/datadog-agent/blob/be33ac1490c4a34602cbc65a211406b73ad6d00b/pkg/trace/api/otlp.go#L561-L585
263    fn otlp_pre_sample(&mut self, trace: &mut Trace, root_span_idx: usize) -> Option<(i32, &'static str)> {
264        if self.probabilistic_sampler_enabled || !self.is_otlp_trace(trace, root_span_idx) {
265            return None;
266        }
267        let (priority, dm) = if let Some(user_priority) = self.get_user_priority(trace, root_span_idx) {
268            (user_priority, DECISION_MAKER_MANUAL)
269        } else {
270            let root_trace_id = trace.trace_id_low;
271            if sample_by_rate(root_trace_id, self.otlp_sampling_rate) {
272                (PRIORITY_AUTO_KEEP, DECISION_MAKER_PROBABILISTIC)
273            } else {
274                (PRIORITY_AUTO_DROP, DECISION_MAKER_PROBABILISTIC)
275            }
276        };
277        if priority == PRIORITY_AUTO_KEEP {
278            if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
279                root_span.attributes.remove(PROB_RATE_KEY);
280            }
281        }
282        Some((priority, dm))
283    }
284
285    /// Apply analyzed span sampling to the trace.
286    ///
287    /// Returns `true` if the trace was modified.
288    fn analyzed_span_sampling(&self, trace: &mut Trace) -> bool {
289        let retained = trace.retain_spans(|_, span| span.attributes.contains_key(KEY_ANALYZED_SPANS));
290        if retained > 0 {
291            trace.dropped_trace = false;
292            trace.priority = Some(PRIORITY_USER_KEEP);
293            trace.otlp_sampling_rate = Some(self.sampling_rate);
294            true
295        } else {
296            false
297        }
298    }
299
300    /// Returns `true` if the given trace has any analyzed spans.
301    fn has_analyzed_spans(&self, trace: &Trace) -> bool {
302        trace
303            .spans()
304            .iter()
305            .any(|span| span.attributes.contains_key(KEY_ANALYZED_SPANS))
306    }
307
308    /// Apply Single Span Sampling to the trace
309    /// Returns true if the trace was modified
310    fn single_span_sampling(&self, trace: &mut Trace) -> bool {
311        let retained = trace.retain_spans(|_, span| span.attributes.contains_key(KEY_SPAN_SAMPLING_MECHANISM));
312        if retained > 0 {
313            trace.dropped_trace = false;
314            trace.priority = Some(PRIORITY_USER_KEEP);
315            trace.otlp_sampling_rate = Some(self.sampling_rate);
316            true
317        } else {
318            false
319        }
320    }
321
    /// Evaluates the given trace against all configured samplers.
    ///
    /// Returns a tuple of `(keep, priority, decision_maker, root_span_idx)`:
    /// - `keep`: whether or not the trace should be kept,
    /// - `priority`: the sampling priority resulting from the decision,
    /// - `decision_maker`: the decision maker tag (which sampler is responsible), or `""` when none is set,
    /// - `root_span_idx`: the index of the root span used for evaluation, or `None` when the trace has no spans.
    ///
    /// Besides consulting the samplers, this may add or remove the `PROB_RATE_KEY` attribute on the root span.
    fn run_samplers(&mut self, trace: &mut Trace) -> (bool, i32, &'static str, Option<usize>) {
        // logic taken from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1066
        // Empty trace check
        if trace.spans().is_empty() {
            return (false, PRIORITY_AUTO_DROP, "", None);
        }

        // A single timestamp is shared by every sampler consulted for this trace.
        let now = std::time::SystemTime::now();
        let Some(root_span_idx) = self.get_root_span_index(trace) else {
            return (false, PRIORITY_AUTO_DROP, "", None);
        };

        // ETS: only sample traces containing errors (including exception span events); skip all other samplers.
        // logic taken from: https://github.com/DataDog/datadog-agent/blob/be33ac1490c4a34602cbc65a211406b73ad6d00b/pkg/trace/agent/agent.go#L1068
        if self.error_tracking_standalone {
            // The OTLP pre-sampling result, when present, supplies the reported priority and decision maker;
            // the keep decision itself comes only from the error sampler.
            let otlp_pre_sample = self.otlp_pre_sample(trace, root_span_idx);
            if self.trace_contains_error(trace, true) {
                let keep = self.error_sampler.sample_error(now, trace, root_span_idx);
                let default_priority = if keep { PRIORITY_AUTO_KEEP } else { PRIORITY_AUTO_DROP };
                let (priority, dm) = otlp_pre_sample.unwrap_or((default_priority, ""));
                return (keep, priority, dm, Some(root_span_idx));
            }
            let (pre_priority, pre_dm) = otlp_pre_sample.unwrap_or((PRIORITY_AUTO_DROP, ""));
            return (false, pre_priority, pre_dm, Some(root_span_idx));
        }

        // Outside of ETS, exception span events are not considered when looking for errors.
        let contains_error = self.trace_contains_error(trace, false);

        // Run the rare sampler early, before all other samplers. This mirrors the Go agent behavior
        // where the rare sampler runs first to catch traces that would otherwise be dropped entirely.
        // logic taken from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1078
        let rare = self.rare_sampler.sample(trace, root_span_idx);

        // Modern path: ProbabilisticSamplerEnabled = true
        if self.probabilistic_sampler_enabled {
            let mut prob_keep = false;
            let mut decision_maker = "";

            if rare {
                // Rare sampler wins over probabilistic sampling.
                prob_keep = true;
            } else {
                // Run probabilistic sampler - use trace ID
                let root_trace_id = trace.trace_id_low;
                if self.sample_probabilistic(root_trace_id) {
                    decision_maker = DECISION_MAKER_PROBABILISTIC;
                    prob_keep = true;

                    // Record the rate that produced the keep decision on the root span.
                    if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
                        root_span.attributes.insert(
                            MetaString::from(PROB_RATE_KEY),
                            AttributeValue::Float(self.sampling_rate),
                        );
                    }
                } else if self.error_sampling_enabled && contains_error {
                    // Safety net: traces with errors get a second chance through the error sampler.
                    prob_keep = self.error_sampler.sample_error(now, trace, root_span_idx);
                }
            }

            let priority = if prob_keep {
                PRIORITY_AUTO_KEEP
            } else {
                PRIORITY_AUTO_DROP
            };

            return (prob_keep, priority, decision_maker, Some(root_span_idx));
        }

        // Legacy path: the decision depends on whether the trace already carries a priority.
        let user_priority = self.get_user_priority(trace, root_span_idx);
        if let Some(priority) = user_priority {
            if priority < PRIORITY_AUTO_DROP {
                // Manual drop: short-circuit and skip other samplers.
                return (false, priority, "", Some(root_span_idx));
            }

            if rare {
                return (true, priority, "", Some(root_span_idx));
            }

            if self.priority_sampler.sample(now, trace, root_span_idx, priority, 0.0) {
                return (true, priority, "", Some(root_span_idx));
            }
        } else if self.is_otlp_trace(trace, root_span_idx) {
            // Rare check mirrors agent behavior: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1129-L1140
            if rare {
                return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
            }

            // some sampling happens upstream in the otlp receiver in the agent: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/otlp.go#L572
            let root_trace_id = trace.trace_id_low;
            if sample_by_rate(root_trace_id, self.otlp_sampling_rate) {
                if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) {
                    root_span.attributes.remove(PROB_RATE_KEY);
                }
                return (
                    true,
                    PRIORITY_AUTO_KEEP,
                    DECISION_MAKER_PROBABILISTIC,
                    Some(root_span_idx),
                );
            }
        } else {
            // Non-OTLP trace without a priority: rare sampler result first, then the no-priority sampler.
            if rare {
                return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
            }
            if self.no_priority_sampler.sample(now, trace, root_span_idx) {
                return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
            }
        }

        // Nothing above kept the trace: give traces with errors a last chance through the error sampler.
        if self.error_sampling_enabled && contains_error {
            let keep = self.error_sampler.sample_error(now, trace, root_span_idx);
            if keep {
                return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx));
            }
        }

        // Default: drop the trace
        (false, PRIORITY_AUTO_DROP, "", Some(root_span_idx))
    }
446
447    /// Apply sampling metadata to the trace in-place.
448    ///
449    /// The `root_span_id` parameter identifies which span should receive the sampling metadata.
450    /// This avoids recalculating the root span since it was already found in `run_samplers`.
451    fn apply_sampling_metadata(
452        &self, trace: &mut Trace, keep: bool, priority: i32, decision_maker: &str, root_span_idx: usize,
453    ) {
454        let is_otlp = self.is_otlp_trace(trace, root_span_idx);
455        let root_span_value = match trace.spans_mut().get_mut(root_span_idx) {
456            Some(span) => span,
457            None => return,
458        };
459
460        // Add tag for the decision maker
461        let existing_decision_maker = if decision_maker.is_empty() {
462            root_span_value
463                .attributes
464                .get(TAG_DECISION_MAKER)
465                .and_then(AttributeValue::as_string)
466                .cloned()
467        } else {
468            None
469        };
470        let decision_maker_meta = if decision_maker.is_empty() {
471            existing_decision_maker
472        } else {
473            Some(MetaString::from(decision_maker))
474        };
475
476        // When the APM-level probabilistic sampler is used with OTLP traces, the DD Agent writes
477        // _dd.p.dm to trace chunk tags only (not span meta). For the legacy OTLP sampling path,
478        // it is written to both. We match that behavior by skipping the span meta write only when
479        // both conditions hold; the DM value still flows through trace fields to the encoder.
480        if priority > 0 && !(is_otlp && self.probabilistic_sampler_enabled) {
481            if let Some(dm) = decision_maker_meta.as_ref() {
482                root_span_value
483                    .attributes
484                    .insert(MetaString::from(TAG_DECISION_MAKER), AttributeValue::String(dm.clone()));
485            }
486        }
487
488        // Now set sampling metadata directly on the trace.
489        trace.dropped_trace = !keep;
490        trace.priority = Some(priority);
491        trace.decision_maker = if priority > 0 { decision_maker_meta } else { None };
492        trace.otlp_sampling_rate = Some(if is_otlp {
493            self.otlp_sampling_rate
494        } else {
495            self.sampling_rate
496        });
497    }
498
499    fn process_trace(&mut self, trace: &mut Trace) -> bool {
500        // keep is a boolean that indicates if the trace should be kept or dropped
501        // priority is the sampling priority
502        // decision_maker is the tag that indicates the decision maker (probabilistic, error, etc.)
503        // root_span_idx is the index of the root span of the trace
504        let (keep, priority, decision_maker, root_span_idx) = self.run_samplers(trace);
505
506        // Apply sampling metadata and forward if kept, or if ETS (dropped non-error traces are
507        // forwarded with DroppedTrace=true, suppressing SSS/analytics).
508        if keep || self.error_tracking_standalone {
509            if let Some(root_idx) = root_span_idx {
510                self.apply_sampling_metadata(trace, keep, priority, decision_maker, root_idx);
511            }
512            return true;
513        }
514
515        // logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L980-L990
516        // try single span sampling (keeps spans marked for sampling when trace would be dropped)
517        let modified = self.single_span_sampling(trace);
518        if !modified {
519            // Fall back to analytics events if no SSS spans
520            if self.analyzed_span_sampling(trace) {
521                return true;
522            }
523        } else if self.has_analyzed_spans(trace) {
524            // Warn about both SSS and analytics events
525            debug!(
526                "Detected both analytics events AND single span sampling in the same trace. Single span sampling wins because App Analytics is deprecated."
527            );
528            return true;
529        }
530
531        // If we modified the trace with SSS, send it
532        if modified {
533            return true;
534        }
535
536        // Neither SSS nor analytics events found, drop the trace
537        debug!("Dropping trace with priority {}", priority);
538        false
539    }
540}
541
542impl SynchronousTransform for TraceSampler {
543    fn transform_buffer(&mut self, buffer: &mut EventsBuffer) {
544        buffer.remove_if(|event| match event {
545            Event::Trace(trace) => !self.process_trace(trace),
546            _ => false,
547        });
548    }
549}
550
551#[cfg(test)]
552mod tests {
553    use std::collections::HashMap;
554
555    use saluki_core::data_model::event::trace::{AttributeValue, Span as DdSpan, Trace};
556    const PRIORITY_USER_DROP: i32 = -1;
557
558    use super::*;
    /// Builds a sampler with both sampling rates at 1.0, error sampling enabled, error tracking standalone off,
    /// the probabilistic path enabled and the rare sampler disabled.
    fn create_test_sampler() -> TraceSampler {
        TraceSampler {
            sampling_rate: 1.0,
            error_sampling_enabled: true,
            error_tracking_standalone: false,
            probabilistic_sampler_enabled: true,
            otlp_sampling_rate: 1.0,
            error_sampler: errors::ErrorsSampler::new(10.0, 1.0),
            priority_sampler: priority_sampler::PrioritySampler::new(MetaString::from("agent-env"), 1.0, 10.0),
            no_priority_sampler: score_sampler::NoPrioritySampler::new(10.0, 1.0),
            rare_sampler: rare_sampler::RareSampler::new(false, 5.0, std::time::Duration::from_secs(300), 200),
        }
    }
572
    /// Builds a parentless test span with the given span ID and error flag; all other fields use fixed values.
    fn create_test_span(span_id: u64, error: i32) -> DdSpan {
        DdSpan::new(
            MetaString::from("test-service"),
            MetaString::from("test-operation"),
            MetaString::from("test-resource"),
            MetaString::from("test-type"),
            span_id,
            0,    // parent_id
            0,    // start
            1000, // duration
            error,
        )
    }
586
    /// Builds an error-free test span whose attributes are the given metrics, stored as float attribute values.
    fn create_test_span_with_metrics(span_id: u64, metrics: HashMap<String, f64>) -> DdSpan {
        let attrs: saluki_common::collections::FastHashMap<MetaString, AttributeValue> = metrics
            .into_iter()
            .map(|(k, v)| (MetaString::from(k), AttributeValue::Float(v)))
            .collect();
        create_test_span(span_id, 0).with_attributes(attrs)
    }
594
    /// Builds an error-free test span whose attributes are the given meta entries, stored as string attribute
    /// values.
    // NOTE(review): `dead_code` is allowed presumably because no test in view calls this helper -- confirm.
    #[allow(dead_code)]
    fn create_test_span_with_meta(span_id: u64, meta: HashMap<String, String>) -> DdSpan {
        let attrs: saluki_common::collections::FastHashMap<MetaString, AttributeValue> = meta
            .into_iter()
            .map(|(k, v)| (MetaString::from(k), AttributeValue::String(MetaString::from(v))))
            .collect();
        create_test_span(span_id, 0).with_attributes(attrs)
    }
603
    /// Wraps the given spans in a `Trace`.
    fn create_test_trace(spans: Vec<DdSpan>) -> Trace {
        Trace::new(spans)
    }
607
    /// Checks that `get_user_priority` reads the sampling priority attribute from the root span and returns
    /// `None` when no priority is present.
    #[test]
    fn test_user_priority_detection() {
        let sampler = create_test_sampler();

        // Test trace with user-set priority = 2 (UserKeep)
        let mut metrics = HashMap::new();
        metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0);
        let span = create_test_span_with_metrics(1, metrics);
        let trace = create_test_trace(vec![span]);
        let root_idx = sampler.get_root_span_index(&trace).unwrap();

        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2));

        // Test trace with user-set priority = -1 (UserDrop)
        let mut metrics = HashMap::new();
        metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), -1.0);
        let span = create_test_span_with_metrics(1, metrics);
        let trace = create_test_trace(vec![span]);
        let root_idx = sampler.get_root_span_index(&trace).unwrap();

        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(-1));

        // Test trace without user priority
        let span = create_test_span(1, 0);
        let trace = create_test_trace(vec![span]);
        let root_idx = sampler.get_root_span_index(&trace).unwrap();

        assert_eq!(sampler.get_user_priority(&trace, root_idx), None);
    }
637
    /// Checks that a trace-level priority wins over any priority found on the spans.
    #[test]
    fn test_trace_level_priority_takes_precedence() {
        let sampler = create_test_sampler();

        // Test trace-level priority overrides span priorities (last-seen priority)
        // Create spans with different priorities - root has 0, later span has 1
        let mut metrics_root = HashMap::new();
        metrics_root.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 0.0);
        let root_span = create_test_span_with_metrics(1, metrics_root);

        let mut metrics_later = HashMap::new();
        metrics_later.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 1.0);
        let later_span = create_test_span_with_metrics(2, metrics_later).with_parent_id(1);

        let mut trace = create_test_trace(vec![root_span, later_span]);
        let root_idx = sampler.get_root_span_index(&trace).unwrap();

        // Without trace-level priority, should get priority from root (0)
        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(0));

        // Now set trace-level priority to 2 (simulating last-seen priority from OTLP translator)
        trace.priority = Some(2);

        // Trace-level priority should take precedence
        assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2));

        // Test that trace-level priority is used even when no span has priority
        let span_no_priority = create_test_span(3, 0);
        let mut trace_only_trace_level = create_test_trace(vec![span_no_priority]);
        trace_only_trace_level.priority = Some(1);
        let root_idx = sampler.get_root_span_index(&trace_only_trace_level).unwrap();

        assert_eq!(sampler.get_user_priority(&trace_only_trace_level, root_idx), Some(1));
    }
672
    /// Checks how the legacy (non-probabilistic) path of `run_samplers` treats trace-level priorities:
    /// user keep and auto keep are kept, user drop is dropped.
    #[test]
    fn test_manual_keep_with_trace_level_priority() {
        let mut sampler = create_test_sampler();
        sampler.probabilistic_sampler_enabled = false; // Use legacy path that checks user priority

        // Test that manual keep (priority = 2) works via trace-level priority
        let span = create_test_span(1, 0);
        let mut trace = create_test_trace(vec![span]);
        trace.priority = Some(PRIORITY_USER_KEEP);

        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
        assert!(keep);
        assert_eq!(priority, PRIORITY_USER_KEEP);
        assert_eq!(decision_maker, "");

        // Test manual drop (priority = -1) via trace-level priority
        let span = create_test_span(1, 0);
        let mut trace = create_test_trace(vec![span]);
        trace.priority = Some(PRIORITY_USER_DROP);

        let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
        assert!(!keep); // Should not keep when user drops
        assert_eq!(priority, PRIORITY_USER_DROP);

        // Test that priority = 1 (auto keep) via trace-level is also respected
        let span = create_test_span(1, 0);
        let mut trace = create_test_trace(vec![span]);
        trace.priority = Some(PRIORITY_AUTO_KEEP);

        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
        assert!(keep);
        assert_eq!(priority, PRIORITY_AUTO_KEEP);
        assert_eq!(decision_maker, "");
    }
707
    /// Checks that `sample_probabilistic` returns the same decision when called twice with the same trace ID.
    #[test]
    fn test_probabilistic_sampling_determinism() {
        let sampler = create_test_sampler();

        // Same trace ID should always produce same decision
        let trace_id = 0x1234567890ABCDEF_u64;
        let result1 = sampler.sample_probabilistic(trace_id);
        let result2 = sampler.sample_probabilistic(trace_id);
        assert_eq!(result1, result2);
    }
718
    /// Checks that `trace_contains_error` follows the span error field when exception span events are not
    /// considered.
    #[test]
    fn test_error_detection() {
        let sampler = create_test_sampler();

        // Test trace with error field set
        let span_with_error = create_test_span(1, 1);
        let trace = create_test_trace(vec![span_with_error]);
        assert!(sampler.trace_contains_error(&trace, false));

        // Test trace without error
        let span_without_error = create_test_span(1, 0);
        let trace = create_test_trace(vec![span_without_error]);
        assert!(!sampler.trace_contains_error(&trace, false));
    }
733
/// Exercises `run_samplers` on both sampling paths.
///
/// - Probabilistic path (50% rate): a trace containing an error span is kept with
///   `PRIORITY_AUTO_KEEP` and an empty decision maker.
/// - Legacy path: a `UserKeep` sampling priority carried in the span metrics is kept and
///   reported back unchanged.
#[test]
fn test_sampling_priority_order() {
    // Modern path: the error sampler overrides a probabilistic drop.
    let mut sampler = create_test_sampler();
    sampler.sampling_rate = 0.5; // 50% sampling rate
    sampler.probabilistic_sampler_enabled = true;

    // Error trace whose ID is expected to be dropped by the probabilistic sampler at 50%.
    let span_with_error = create_test_span(1, 1);
    let mut trace = create_test_trace(vec![span_with_error]);
    trace.trace_id_low = u64::MAX - 1;

    let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
    assert!(keep);
    assert_eq!(priority, PRIORITY_AUTO_KEEP);
    assert_eq!(decision_maker, ""); // Error sampler doesn't set decision_maker

    // Legacy path: user priority is respected.
    let mut sampler = create_test_sampler();
    sampler.probabilistic_sampler_enabled = false;

    let mut metrics = HashMap::new();
    metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0); // UserKeep
    let span = create_test_span_with_metrics(1, metrics);
    let mut trace = create_test_trace(vec![span]);

    let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
    assert!(keep);
    // Compare against the named constant, as the other priority tests in this module do.
    assert_eq!(priority, PRIORITY_USER_KEEP);
    assert_eq!(decision_maker, "");
}
766
/// A trace without any spans is not kept and is reported with `PRIORITY_AUTO_DROP`.
#[test]
fn test_empty_trace_handling() {
    let mut sampler = create_test_sampler();
    let mut empty_trace = create_test_trace(Vec::new());

    let (kept, prio, _, _) = sampler.run_samplers(&mut empty_trace);

    assert!(!kept);
    assert_eq!(prio, PRIORITY_AUTO_DROP);
}
776
/// Covers `get_root_span_index` for three trace shapes:
///
/// 1. a root span (`parent_id = 0`) listed after its child,
/// 2. a lone span whose parent is not part of the trace,
/// 3. two spans that both have `parent_id = 0`, where the later one is expected.
#[test]
fn test_root_span_detection() {
    let sampler = create_test_sampler();

    // Test 1: Root span with parent_id = 0 (common case)
    let root_span = DdSpan::new(
        MetaString::from("service"),
        MetaString::from("operation"),
        MetaString::from("resource"),
        MetaString::from("type"),
        1,
        0, // parent_id = 0 indicates root
        0,
        1000,
        0,
    );
    let child_span = DdSpan::new(
        MetaString::from("service"),
        MetaString::from("child_op"),
        MetaString::from("resource"),
        MetaString::from("type"),
        2,
        1, // parent_id = 1 (points to root)
        100,
        500,
        0,
    );
    // Put root span second to test that we find it even when not first. Both spans are moved
    // into the trace: nothing reads them afterwards, so no clone is needed.
    let trace = create_test_trace(vec![child_span, root_span]);
    let root_idx = sampler.get_root_span_index(&trace).unwrap();
    assert_eq!(trace.spans()[root_idx].span_id(), 1);

    // Test 2: Orphaned span (parent not in trace)
    let orphan_span = DdSpan::new(
        MetaString::from("service"),
        MetaString::from("orphan"),
        MetaString::from("resource"),
        MetaString::from("type"),
        3,
        999, // parent_id = 999 (doesn't exist in trace)
        200,
        300,
        0,
    );
    let trace = create_test_trace(vec![orphan_span]);
    let root_idx = sampler.get_root_span_index(&trace).unwrap();
    assert_eq!(trace.spans()[root_idx].span_id(), 3);

    // Test 3: Multiple root candidates: should return the last one found (index 1)
    let span1 = create_test_span(1, 0);
    let span2 = create_test_span(2, 0);
    let trace = create_test_trace(vec![span1, span2]);
    // Both have parent_id = 0, should return the last one found (span_id = 2)
    let root_idx = sampler.get_root_span_index(&trace).unwrap();
    assert_eq!(trace.spans()[root_idx].span_id(), 2);
}
833
/// Covers `single_span_sampling` for a trace with and without a single-span-sampling tag.
///
/// - With `KEY_SPAN_SAMPLING_MECHANISM` on one span: the call reports a modification, only the
///   tagged span remains, and the trace priority becomes `PRIORITY_USER_KEEP`.
/// - Without the tag: the call reports no modification and the span count is unchanged.
#[test]
fn test_single_span_sampling() {
    let mut sampler = create_test_sampler();

    // Test 1: Trace with SSS tags should be kept even when probabilistic would drop it
    sampler.sampling_rate = 0.0; // 0% sampling rate - should drop everything
    sampler.probabilistic_sampler_enabled = true;

    // Create span with SSS metric. The map is moved into the span: it is not read again, so
    // cloning it would only add an allocation.
    let mut attrs_map = saluki_common::collections::FastHashMap::default();
    attrs_map.insert(
        MetaString::from(KEY_SPAN_SAMPLING_MECHANISM),
        AttributeValue::Float(8.0),
    );
    let sss_span = create_test_span(1, 0).with_attributes(attrs_map);

    // Create regular span without SSS
    let regular_span = create_test_span(2, 0);

    let mut trace = create_test_trace(vec![sss_span, regular_span]);

    // Apply SSS
    let modified = sampler.single_span_sampling(&mut trace);
    assert!(modified);
    assert_eq!(trace.spans().len(), 1); // Only SSS span kept
    assert_eq!(trace.spans()[0].span_id(), 1); // It's the SSS span

    // Check that trace has been marked as kept with high priority
    assert_eq!(trace.priority, Some(PRIORITY_USER_KEEP));

    // Test 2: Trace without SSS tags should not be modified. The untouched original is kept
    // around so the span count can be compared after the call.
    let trace_without_sss = create_test_trace(vec![create_test_span(3, 0)]);
    let mut trace_copy = trace_without_sss.clone();
    let modified = sampler.single_span_sampling(&mut trace_copy);
    assert!(!modified);
    assert_eq!(trace_copy.spans().len(), trace_without_sss.spans().len());
}
871
/// Covers `has_analyzed_spans` and `analyzed_span_sampling` for a trace with and without a span
/// carrying `KEY_ANALYZED_SPANS`.
///
/// - With an analyzed span: detection is positive, sampling reports a modification, only the
///   analyzed span remains, and the trace priority becomes `PRIORITY_USER_KEEP`.
/// - Without one: detection is negative, sampling reports no modification, and the span count
///   is unchanged.
#[test]
fn test_analytics_events() {
    let sampler = create_test_sampler();

    // Test 1: Trace with analyzed spans. The attribute map and the span are moved rather than
    // cloned, since neither is read again after being handed over.
    let mut attrs_map = saluki_common::collections::FastHashMap::default();
    attrs_map.insert(MetaString::from(KEY_ANALYZED_SPANS), AttributeValue::Float(1.0));
    let analyzed_span = create_test_span(1, 0).with_attributes(attrs_map);
    let regular_span = create_test_span(2, 0);

    let mut trace = create_test_trace(vec![analyzed_span, regular_span]);

    // Sanity-check the fixture itself: only span 1 carries the analyzed key.
    let analyzed_span_ids: Vec<u64> = trace
        .spans()
        .iter()
        .filter(|span| span.attributes.contains_key(KEY_ANALYZED_SPANS))
        .map(|span| span.span_id())
        .collect();
    assert_eq!(analyzed_span_ids, vec![1]);

    assert!(sampler.has_analyzed_spans(&trace));
    let modified = sampler.analyzed_span_sampling(&mut trace);
    assert!(modified);
    assert_eq!(trace.spans().len(), 1);
    assert_eq!(trace.spans()[0].span_id(), 1);
    assert_eq!(trace.priority, Some(PRIORITY_USER_KEEP));

    // Test 2: Trace without analyzed spans. The untouched original is kept so the span count
    // can be compared after the call.
    let trace_no_analytics = create_test_trace(vec![create_test_span(3, 0)]);
    let mut trace_no_analytics_copy = trace_no_analytics.clone();
    let analyzed_span_ids: Vec<u64> = trace_no_analytics
        .spans()
        .iter()
        .filter(|span| span.attributes.contains_key(KEY_ANALYZED_SPANS))
        .map(|span| span.span_id())
        .collect();
    assert!(analyzed_span_ids.is_empty());
    assert!(!sampler.has_analyzed_spans(&trace_no_analytics));
    let modified = sampler.analyzed_span_sampling(&mut trace_no_analytics_copy);
    assert!(!modified);
    assert_eq!(trace_no_analytics_copy.spans().len(), trace_no_analytics.spans().len());
}
914
/// At a 75% probabilistic rate, a trace kept by the probabilistic sampler carries the rate under
/// `PROB_RATE_KEY` on its root span, and `apply_sampling_metadata` tags that span with the
/// probabilistic decision maker.
///
/// NOTE(review): every assertion sits behind the "kept probabilistically" check, so this test
/// passes without asserting anything if the trace ID is not sampled. Confirm that ID 12345 is
/// kept at this rate.
#[test]
fn test_probabilistic_sampling_with_prob_rate_key() {
    let mut sampler = TraceSampler {
        sampling_rate: 0.75, // 75% sampling rate
        probabilistic_sampler_enabled: true,
        ..create_test_sampler()
    };

    // Single root span (parent_id = 0).
    let root_span = DdSpan::new(
        MetaString::from("service"),
        MetaString::from("operation"),
        MetaString::from("resource"),
        MetaString::from("type"),
        1,
        0,
        0,
        1000,
        0,
    );
    let mut trace = create_test_trace(vec![root_span]);
    // Use a trace ID that we know will be sampled
    trace.trace_id_low = 12345_u64;

    let (keep, priority, decision_maker, root_span_idx) = sampler.run_samplers(&mut trace);

    let kept_probabilistically = keep && decision_maker == DECISION_MAKER_PROBABILISTIC;
    if !kept_probabilistically {
        return;
    }

    assert_eq!(priority, PRIORITY_AUTO_KEEP);
    assert_eq!(decision_maker, DECISION_MAKER_PROBABILISTIC); // probabilistic sampling marker

    // The rate should already be on the root span when `run_samplers` returns.
    let root_idx = root_span_idx.unwrap_or(0);
    let sampled_root = &trace.spans()[root_idx];
    assert!(sampled_root.attributes.contains_key(PROB_RATE_KEY));
    let recorded_rate = sampled_root
        .attributes
        .get(PROB_RATE_KEY)
        .and_then(AttributeValue::as_float);
    assert_eq!(recorded_rate, Some(0.75));

    // `apply_sampling_metadata` is run on a copy and must add the decision maker tag.
    let mut trace_with_metadata = trace.clone();
    sampler.apply_sampling_metadata(&mut trace_with_metadata, keep, priority, decision_maker, root_idx);

    let tagged_root = &trace_with_metadata.spans()[root_idx];
    assert!(tagged_root.attributes.contains_key(TAG_DECISION_MAKER));
    let recorded_dm = tagged_root
        .attributes
        .get(TAG_DECISION_MAKER)
        .and_then(AttributeValue::as_string);
    assert_eq!(recorded_dm, Some(&MetaString::from(DECISION_MAKER_PROBABILISTIC)));
}
972
973    // ── Rare-sampler interaction tests ──────────────────────────────────────────
974    // Adapted from datadog-agent/pkg/trace/agent/agent_test.go TestSampling cases:
975    // "rare-sampler-catch-unsampled", "rare-sampler-catch-sampled",
976    // "rare-sampler-disabled", and related probabilistic path interactions.
977
978    /// Create a top-level span eligible for rare sampling.
979    ///
980    /// The rare sampler only considers spans that have `_top_level=1` or `_dd.measured=1`.
981    /// This helper sets `_top_level=1` so that the rare sampler can consider the span.
982    fn create_top_level_span(span_id: u64) -> DdSpan {
983        let mut attrs = saluki_common::collections::FastHashMap::default();
984        attrs.insert(MetaString::from("_top_level"), AttributeValue::Float(1.0));
985        create_test_span(span_id, 0).with_attributes(attrs)
986    }
987
988    /// Create a `TraceSampler` with the rare sampler enabled and a very high TPS limit so it
989    /// freely samples first occurrences, plus a long TTL so second occurrences stay within TTL.
990    fn create_sampler_with_rare_enabled() -> TraceSampler {
991        TraceSampler {
992            rare_sampler: rare_sampler::RareSampler::new(true, 1000.0, std::time::Duration::from_secs(300), 200),
993            ..create_test_sampler()
994        }
995    }
996
/// Adapted from Go "rare-sampler-catch-unsampled":
///
/// With the rare sampler enabled and the probabilistic rate at 0, the first occurrence of a
/// trace is still kept, with `PRIORITY_AUTO_KEEP` and an empty decision maker.
#[test]
fn rare_sampler_catches_unsampled_trace() {
    // A rate of 0.0 makes the probabilistic sampler drop everything.
    let mut sampler = TraceSampler {
        sampling_rate: 0.0,
        probabilistic_sampler_enabled: true,
        ..create_sampler_with_rare_enabled()
    };
    let mut trace = create_test_trace(vec![create_top_level_span(1)]);

    let (kept, prio, dm, _) = sampler.run_samplers(&mut trace);

    assert!(kept, "rare sampler should catch first occurrence");
    assert_eq!(prio, PRIORITY_AUTO_KEEP);
    assert_eq!(dm, "", "rare sampler does not set _dd.p.dm");
}
1014
/// Adapted from Go "rare-sampler-catch-sampled" (first trace):
///
/// On the first occurrence with the rare sampler enabled, the trace is kept and the root span
/// carries `_dd.rare = 1`.
#[test]
fn rare_sampler_sets_rare_metric_on_first_occurrence() {
    let mut sampler = TraceSampler {
        sampling_rate: 0.0,
        probabilistic_sampler_enabled: true,
        ..create_sampler_with_rare_enabled()
    };
    let mut trace = create_test_trace(vec![create_top_level_span(1)]);

    let (kept, _, _, root_idx) = sampler.run_samplers(&mut trace);
    assert!(kept);

    let rare_value = trace.spans()[root_idx.unwrap()]
        .attributes
        .get(rare_sampler::RARE_KEY)
        .and_then(AttributeValue::as_float);
    assert_eq!(rare_value, Some(1.0), "_dd.rare should be 1 on first occurrence");
}
1038
/// Adapted from Go "rare-sampler-catch-sampled" (second trace same signature):
///
/// Once a signature has been seen, a second trace with that signature inside the TTL is no
/// longer rare. With the probabilistic rate at 0% it ends up dropped.
#[test]
fn rare_sampler_does_not_resample_within_ttl() {
    let mut sampler = TraceSampler {
        sampling_rate: 0.0,
        probabilistic_sampler_enabled: true,
        ..create_sampler_with_rare_enabled()
    };

    // First occurrence: kept by the rare sampler.
    let mut first_trace = create_test_trace(vec![create_top_level_span(1)]);
    let (first_kept, _, _, _) = sampler.run_samplers(&mut first_trace);
    assert!(first_kept, "first occurrence should be kept by rare sampler");

    // Second occurrence: same service/operation/resource on the top-level span, so the same
    // signature, and still within the TTL. Rare skips it and probabilistic at 0% drops it.
    let mut second_trace = create_test_trace(vec![create_top_level_span(2)]);
    let (second_kept, second_prio, _, _) = sampler.run_samplers(&mut second_trace);
    assert!(!second_kept, "second occurrence within TTL should be dropped");
    assert_eq!(second_prio, PRIORITY_AUTO_DROP);
}
1063
/// Adapted from Go "rare-sampler-disabled":
///
/// With the rare sampler left disabled and the probabilistic rate at 0%, a top-level trace is
/// dropped with `PRIORITY_AUTO_DROP`.
#[test]
fn rare_sampler_disabled_does_not_catch_unsampled() {
    // `create_test_sampler` leaves the rare sampler disabled.
    let mut sampler = TraceSampler {
        sampling_rate: 0.0,
        probabilistic_sampler_enabled: true,
        ..create_test_sampler()
    };
    let mut trace = create_test_trace(vec![create_top_level_span(1)]);

    let (kept, prio, _, _) = sampler.run_samplers(&mut trace);

    assert!(!kept, "rare disabled should not catch the trace");
    assert_eq!(prio, PRIORITY_AUTO_DROP);
}
1080
/// Rare + non-probabilistic path (priority path): a first-occurrence trace whose tracer-set
/// priority is `PriorityAutoDrop` is kept by rare, and the reported priority stays
/// `PRIORITY_AUTO_DROP` instead of being upgraded to AutoKeep.
#[test]
fn rare_sampler_catches_priority_auto_drop_in_legacy_path() {
    let mut sampler = TraceSampler {
        probabilistic_sampler_enabled: false,
        ..create_sampler_with_rare_enabled()
    };

    // Top-level span that additionally carries the tracer-set AutoDrop priority.
    let mut span = create_top_level_span(1);
    span.attributes.insert(
        MetaString::from(SAMPLING_PRIORITY_METRIC_KEY),
        AttributeValue::Float(PRIORITY_AUTO_DROP as f64),
    );
    let mut trace = create_test_trace(vec![span]);

    let (kept, prio, dm, _) = sampler.run_samplers(&mut trace);

    assert!(kept, "rare sampler should catch PriorityAutoDrop on first occurrence");
    assert_eq!(prio, PRIORITY_AUTO_DROP, "tracer-set priority should be preserved");
    assert_eq!(dm, "");
}
1102
/// Rare + non-probabilistic path (priority path): UserKeep priority is preserved, not
/// downgraded to AutoKeep. Mirrors Go agent behavior at agent.go#L1129-1131.
///
/// The span is top-level (so rare can consider it) and carries a tracer-set
/// `PRIORITY_USER_KEEP`; the test expects the trace to be kept with that same priority.
#[test]
fn rare_sampler_preserves_user_keep_priority_in_legacy_path() {
    let mut sampler = create_sampler_with_rare_enabled();
    sampler.probabilistic_sampler_enabled = false;

    let mut attrs = saluki_common::collections::FastHashMap::default();
    attrs.insert(MetaString::from("_top_level"), AttributeValue::Float(1.0));
    // Use the named constant rather than a bare `2.0` so the fixture and the assertion below
    // cannot drift apart.
    attrs.insert(
        MetaString::from(SAMPLING_PRIORITY_METRIC_KEY),
        AttributeValue::Float(PRIORITY_USER_KEEP as f64),
    );
    let span = create_test_span(1, 0).with_attributes(attrs);
    let mut trace = create_test_trace(vec![span]);

    let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
    assert!(keep);
    assert_eq!(
        priority, PRIORITY_USER_KEEP,
        "UserKeep priority must not be downgraded to AutoKeep"
    );
}
1123
/// Probabilistic path at a 100% rate with rare disabled: the trace is kept with
/// `PRIORITY_AUTO_KEEP` and the probabilistic decision maker (`_dd.p.dm = "-9"`).
#[test]
fn probabilistic_100_percent_keeps_trace_with_decision_maker() {
    // `create_test_sampler` leaves the rare sampler disabled.
    let mut sampler = TraceSampler {
        sampling_rate: 1.0,
        probabilistic_sampler_enabled: true,
        ..create_test_sampler()
    };
    let mut trace = create_test_trace(vec![create_top_level_span(1)]);

    let (kept, prio, dm, _) = sampler.run_samplers(&mut trace);

    assert!(kept);
    assert_eq!(prio, PRIORITY_AUTO_KEEP);
    assert_eq!(dm, DECISION_MAKER_PROBABILISTIC);
}
1139
/// Probabilistic path at a 0% rate with rare and error sampling disabled: the trace is dropped
/// with `PRIORITY_AUTO_DROP`.
#[test]
fn probabilistic_0_percent_drops_trace() {
    // `create_test_sampler` leaves the rare sampler disabled.
    let mut sampler = TraceSampler {
        sampling_rate: 0.0,
        probabilistic_sampler_enabled: true,
        error_sampling_enabled: false,
        ..create_test_sampler()
    };
    let mut trace = create_test_trace(vec![create_top_level_span(1)]);

    let (kept, prio, _, _) = sampler.run_samplers(&mut trace);

    assert!(!kept);
    assert_eq!(prio, PRIORITY_AUTO_DROP);
}
1155
/// An OTLP trace that carries no sampling priority is kept by the rare sampler on its first
/// occurrence, matching the Go agent behavior:
/// https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1129-L1140
///
/// Probabilistic and error sampling are off and the OTLP sampling rate is 0. The test expects
/// `PRIORITY_AUTO_KEEP`, an empty decision maker, and `_dd.rare = 1` on the root span.
#[test]
fn rare_sampler_catches_otlp_no_priority_trace() {
    let mut sampler = TraceSampler {
        probabilistic_sampler_enabled: false,
        error_sampling_enabled: false,
        otlp_sampling_rate: 0.0,
        ..create_sampler_with_rare_enabled()
    };

    // The OTel trace ID attribute is what marks this top-level span as OTLP-originated here.
    let otel_trace_id = AttributeValue::String(MetaString::from("00000000000000000000000000000001"));
    let mut span = create_top_level_span(1);
    span.attributes
        .insert(MetaString::from_static(OTEL_TRACE_ID_META_KEY), otel_trace_id);
    let mut trace = create_test_trace(vec![span]);

    let (kept, prio, dm, root_idx) = sampler.run_samplers(&mut trace);
    assert!(
        kept,
        "rare sampler should keep OTLP trace with no priority on first occurrence"
    );
    assert_eq!(prio, PRIORITY_AUTO_KEEP);
    assert_eq!(dm, "");

    let rare_value = trace.spans()[root_idx.unwrap()]
        .attributes
        .get(rare_sampler::RARE_KEY)
        .and_then(AttributeValue::as_float);
    assert_eq!(rare_value, Some(1.0), "_dd.rare should be set to 1 on first occurrence");
}
1188
/// Adapted from Go "probabilistic-rare-100":
///
/// Rare runs before probabilistic is consulted. Even at a 100% probabilistic rate the kept
/// trace is attributed to rare, so the decision maker stays empty.
#[test]
fn rare_wins_over_probabilistic_no_decision_maker_tag() {
    let mut sampler = TraceSampler {
        sampling_rate: 1.0,
        probabilistic_sampler_enabled: true,
        ..create_sampler_with_rare_enabled()
    };
    let mut trace = create_test_trace(vec![create_top_level_span(1)]);

    let (kept, prio, dm, _) = sampler.run_samplers(&mut trace);

    assert!(kept);
    assert_eq!(prio, PRIORITY_AUTO_KEEP);
    assert_eq!(dm, "", "rare takes precedence—_dd.p.dm must not be set");
}
1207
/// Adapted from Go "error-sampled-prio-unsampled":
///
/// Rare runs before the error sampler is reached, so a no-priority trace that contains an error
/// span is kept by rare on its first occurrence rather than by the error sampler.
#[test]
fn rare_catches_error_trace_before_error_sampler() {
    let mut sampler = TraceSampler {
        probabilistic_sampler_enabled: false,
        error_sampling_enabled: true,
        ..create_sampler_with_rare_enabled()
    };

    let top_level_span = create_top_level_span(1);
    let failing_span = create_test_span(2, 1); // error=1
    let mut trace = create_test_trace(vec![top_level_span, failing_span]);

    let (kept, prio, dm, _) = sampler.run_samplers(&mut trace);

    assert!(kept, "rare should catch the trace before the error sampler");
    assert_eq!(prio, PRIORITY_AUTO_KEEP);
    assert_eq!(dm, "");
}
1227
/// Adapted from Go manual-drop short-circuit behavior:
///
/// UserDrop (-1) priority is checked before rare runs in the priority path. A UserDrop trace
/// must be dropped even when rare is enabled and would otherwise match.
///
/// The span is top-level (so rare could consider it) and carries a tracer-set
/// `PRIORITY_USER_DROP`; the test expects the trace to be dropped with that same priority.
#[test]
fn manual_drop_short_circuits_before_rare() {
    let mut sampler = create_sampler_with_rare_enabled();
    sampler.probabilistic_sampler_enabled = false;

    let mut attrs = saluki_common::collections::FastHashMap::default();
    attrs.insert(MetaString::from("_top_level"), AttributeValue::Float(1.0));
    // Use the named constant rather than a bare `-1.0` so the fixture and the assertion below
    // cannot drift apart.
    attrs.insert(
        MetaString::from(SAMPLING_PRIORITY_METRIC_KEY),
        AttributeValue::Float(PRIORITY_USER_DROP as f64),
    );
    let span = create_test_span(1, 0).with_attributes(attrs);
    let mut trace = create_test_trace(vec![span]);

    let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
    assert!(!keep, "UserDrop must be dropped even when rare would match");
    assert_eq!(priority, PRIORITY_USER_DROP);
}
1250
1251    // ── Error Tracking Standalone tests ─────────────────────────────────────────
1252    // Adapted from datadog-agent/pkg/trace/agent/agent.go runSamplers ETS block.
1253
1254    fn create_sampler_with_ets() -> TraceSampler {
1255        TraceSampler {
1256            error_tracking_standalone: true,
1257            ..create_test_sampler()
1258        }
1259    }
1260
/// ETS enabled + trace with an error span → kept with `PRIORITY_AUTO_KEEP` and no decision
/// maker.
#[test]
fn ets_keeps_trace_with_error() {
    let mut sampler = create_sampler_with_ets();
    let failing_span = create_test_span(1, 1); // error=1
    let mut trace = create_test_trace(vec![failing_span]);

    let (kept, prio, dm, _) = sampler.run_samplers(&mut trace);

    assert!(kept, "ETS should keep traces with errors");
    assert_eq!(prio, PRIORITY_AUTO_KEEP);
    assert_eq!(dm, "", "ETS does not set a decision maker");
}
1274
/// ETS enabled + trace without an error → dropped with `PRIORITY_AUTO_DROP`; the rare,
/// probabilistic and priority samplers are not consulted.
#[test]
fn ets_drops_trace_without_error() {
    let mut sampler = create_sampler_with_ets();
    let healthy_span = create_test_span(1, 0); // error=0
    let mut trace = create_test_trace(vec![healthy_span]);

    let (kept, prio, _, _) = sampler.run_samplers(&mut trace);

    assert!(!kept, "ETS should drop traces without errors");
    assert_eq!(prio, PRIORITY_AUTO_DROP);
}
1287
/// ETS enabled + non-error trace → `process_trace` still forwards it, with `dropped_trace` set;
/// SSS/analytics are suppressed.
#[test]
fn ets_forwards_dropped_trace_with_dropped_flag() {
    let mut sampler = create_sampler_with_ets();

    // The span carries the SSS metric, which would trigger single span sampling outside of
    // ETS mode.
    let sss_key = MetaString::from(KEY_SPAN_SAMPLING_MECHANISM);
    let mut sss_attrs = saluki_common::collections::FastHashMap::default();
    sss_attrs.insert(sss_key, AttributeValue::Float(8.0));
    let sss_span = create_test_span(1, 0).with_attributes(sss_attrs);
    let mut trace = create_test_trace(vec![sss_span]);

    let was_forwarded = sampler.process_trace(&mut trace);

    assert!(was_forwarded, "ETS should forward non-error traces to intake");
    assert!(trace.dropped_trace, "non-error ETS trace should have DroppedTrace=true");
}
1306
/// ETS enabled + span with `error = 0` but flagged as having an exception span event → kept,
/// because exception events count as errors in ETS.
#[test]
fn ets_keeps_trace_with_exception_span_event() {
    let mut sampler = create_sampler_with_ets();

    // No error field on the span, only the exception span event metadata.
    let exception_flag = AttributeValue::String(MetaString::from("true"));
    let mut exception_attrs = saluki_common::collections::FastHashMap::default();
    exception_attrs.insert(MetaString::from("_dd.span_events.has_exception"), exception_flag);
    let exception_span = create_test_span(1, 0).with_attributes(exception_attrs);
    let mut trace = create_test_trace(vec![exception_span]);

    let (kept, _, _, _) = sampler.run_samplers(&mut trace);

    assert!(kept, "ETS should treat exception span events as errors");
}
1324
/// ETS disabled → the normal sampling path applies: at a 100% probabilistic rate a non-error
/// trace is kept with the probabilistic decision maker.
#[test]
fn ets_disabled_uses_normal_sampling() {
    // `create_test_sampler` leaves ETS disabled.
    let mut sampler = TraceSampler {
        sampling_rate: 1.0,
        probabilistic_sampler_enabled: true,
        ..create_test_sampler()
    };
    let healthy_span = create_test_span(1, 0); // no error
    let mut trace = create_test_trace(vec![healthy_span]);

    let (kept, _, dm, _) = sampler.run_samplers(&mut trace);

    assert!(kept, "normal probabilistic sampling should keep the trace");
    assert_eq!(dm, DECISION_MAKER_PROBABILISTIC);
}
1339
1340    // ── ETS + OTLP pre-sampling tests ────────────────────────────────────────────
1341    // These mirror DDA's OTLPReceiver.createChunks behavior which pre-assigns
1342    // priority/dm before runSamplersV1, so ETS sees those values even when it
1343    // short-circuits. See: pkg/trace/api/otlp.go#L561-L585.
1344
1345    fn create_otlp_test_span(span_id: u64, error: i32) -> DdSpan {
1346        let mut attrs = saluki_common::collections::FastHashMap::default();
1347        attrs.insert(
1348            MetaString::from_static(OTEL_TRACE_ID_META_KEY),
1349            AttributeValue::String(MetaString::from("0000000000000000deadbeefcafebabe")),
1350        );
1351        create_test_span(span_id, error).with_attributes(attrs)
1352    }
1353
1354    fn create_sampler_with_ets_legacy() -> TraceSampler {
1355        TraceSampler {
1356            error_tracking_standalone: true,
1357            probabilistic_sampler_enabled: false,
1358            otlp_sampling_rate: 1.0,
1359            ..create_test_sampler()
1360        }
1361    }
1362
/// ETS + OTLP non-error trace (legacy sampler path): the trace is dropped, yet pre-sampling has
/// already set priority=AutoKeep and dm=-9.
/// Mirrors DDA `OTLPReceiver` assigning priority=1 + dm=-9 before ETS returns early.
#[test]
fn ets_otlp_non_error_gets_presample_priority_and_dm() {
    let mut sampler = create_sampler_with_ets_legacy();
    let healthy_otlp_span = create_otlp_test_span(1, 0); // no error
    let mut trace = create_test_trace(vec![healthy_otlp_span]);

    let (kept, prio, decision_maker, _) = sampler.run_samplers(&mut trace);

    assert!(!kept, "ETS should drop non-error OTLP traces");
    assert_eq!(
        prio, PRIORITY_AUTO_KEEP,
        "OTLP pre-sampling sets priority=AutoKeep even for ETS-dropped traces"
    );
    assert_eq!(
        decision_maker, DECISION_MAKER_PROBABILISTIC,
        "OTLP pre-sampling sets dm=-9"
    );
}
1380
/// ETS + OTLP error trace (legacy sampler path): the trace is kept, with priority=AutoKeep and
/// dm=-9 coming from pre-sampling.
#[test]
fn ets_otlp_error_gets_presample_priority_and_dm() {
    let mut sampler = create_sampler_with_ets_legacy();
    let failing_otlp_span = create_otlp_test_span(1, 1); // error=1
    let mut trace = create_test_trace(vec![failing_otlp_span]);

    let (kept, prio, decision_maker, _) = sampler.run_samplers(&mut trace);

    assert!(kept, "ETS should keep error OTLP traces");
    assert_eq!(prio, PRIORITY_AUTO_KEEP, "OTLP pre-sampling sets priority=AutoKeep");
    assert_eq!(
        decision_maker, DECISION_MAKER_PROBABILISTIC,
        "OTLP pre-sampling sets dm=-9"
    );
}
1394
/// ETS + OTLP + probabilistic_sampler_enabled=true: no pre-sampling takes place, so the dropped
/// trace reports `PRIORITY_AUTO_DROP` and an empty decision maker.
/// DDA's `OTLPReceiver` sets PriorityNone and skips when ProbabilisticSamplerEnabled.
#[test]
fn ets_otlp_probabilistic_path_skips_presample() {
    // Start from the ETS legacy configuration and flip it onto the probabilistic path.
    let mut sampler = TraceSampler {
        probabilistic_sampler_enabled: true,
        ..create_sampler_with_ets_legacy()
    };
    let healthy_otlp_span = create_otlp_test_span(1, 0); // no error
    let mut trace = create_test_trace(vec![healthy_otlp_span]);

    let (kept, prio, decision_maker, _) = sampler.run_samplers(&mut trace);

    assert!(!kept, "ETS should drop non-error traces");
    assert_eq!(
        prio, PRIORITY_AUTO_DROP,
        "no pre-sampling when probabilistic path active"
    );
    assert_eq!(decision_maker, "", "no dm when probabilistic path active");
}
1413
/// ETS + non-OTLP trace (legacy sampler path): no pre-sampling applies, so the dropped trace
/// reports `PRIORITY_AUTO_DROP` and an empty decision maker.
#[test]
fn ets_non_otlp_unaffected_by_presample() {
    let mut sampler = create_sampler_with_ets_legacy();
    let plain_span = create_test_span(1, 0); // no error, no OTLP meta
    let mut trace = create_test_trace(vec![plain_span]);

    let (kept, prio, decision_maker, _) = sampler.run_samplers(&mut trace);

    assert!(!kept, "ETS should drop non-error non-OTLP traces");
    assert_eq!(prio, PRIORITY_AUTO_DROP, "non-OTLP traces use default ETS priority");
    assert_eq!(decision_maker, "", "non-OTLP traces get no dm");
}
1427
/// ETS + OTLP trace with user-set priority: dm="-4" (manual sampling), matching DDA.
///
/// The non-error OTLP span carries a tracer-set `PRIORITY_USER_KEEP`. ETS still drops the
/// trace, while the reported priority stays `PRIORITY_USER_KEEP` and the decision maker is
/// `DECISION_MAKER_MANUAL`.
#[test]
fn ets_otlp_user_priority_gets_manual_dm() {
    let mut sampler = create_sampler_with_ets_legacy();

    let mut span = create_otlp_test_span(1, 0);
    // Use the named constant rather than a bare `2.0` so the fixture and the priority
    // assertion below cannot drift apart.
    span.attributes.insert(
        MetaString::from(SAMPLING_PRIORITY_METRIC_KEY),
        AttributeValue::Float(PRIORITY_USER_KEEP as f64),
    );
    let mut trace = create_test_trace(vec![span]);

    let (keep, priority, dm, _) = sampler.run_samplers(&mut trace);
    assert!(!keep, "ETS drops non-error traces regardless of user priority");
    assert_eq!(priority, PRIORITY_USER_KEEP, "user priority is preserved");
    assert_eq!(dm, DECISION_MAKER_MANUAL, "user-set priority gets dm=-4");
}
1445}