dd_sds/scanner/
mod.rs

1use crate::encoding::Encoding;
2use crate::event::Event;
3use std::future::Future;
4
5use crate::match_validation::{
6    config::InternalMatchValidationType, config::MatchValidationType, match_status::MatchStatus,
7    match_validator::MatchValidator,
8};
9
10use error::MatchValidatorCreationError;
11
12use self::metrics::ScannerMetrics;
13use crate::match_validation::match_validator::RAYON_THREAD_POOL;
14use crate::observability::labels::Labels;
15use crate::rule_match::{InternalRuleMatch, RuleMatch};
16use crate::scanner::config::RuleConfig;
17use crate::scanner::internal_rule_match_set::InternalRuleMatchSet;
18use crate::scanner::regex_rule::compiled::RegexCompiledRule;
19use crate::scanner::regex_rule::{RegexCaches, access_regex_caches};
20use crate::scanner::scope::Scope;
21pub use crate::scanner::shared_data::SharedData;
22use crate::scanner::suppression::{CompiledSuppressions, SuppressionValidationError, Suppressions};
23use crate::scoped_ruleset::{ContentVisitor, ExclusionCheck, ScopedRuleSet};
24pub use crate::secondary_validation::Validator;
25use crate::stats::GLOBAL_STATS;
26use crate::tokio::TOKIO_RUNTIME;
27use crate::{CreateScannerError, EncodeIndices, MatchAction, Path, ScannerError};
28use ahash::AHashMap;
29use futures::executor::block_on;
30use serde::{Deserialize, Serialize};
31use serde_with::serde_as;
32use std::ops::Deref;
33use std::pin::Pin;
34use std::sync::Arc;
35use std::time::{Duration, Instant};
36use tokio::task::JoinHandle;
37use tokio::time::timeout;
38
39pub mod config;
40pub mod debug_scan;
41pub mod error;
42pub mod metrics;
43pub mod regex_rule;
44pub mod scope;
45pub mod shared_data;
46pub mod shared_pool;
47pub mod suppression;
48
49mod internal_rule_match_set;
50#[cfg(test)]
51mod test;
52
/// A single match reported by a rule for one scanned string.
///
/// `start` and `end` delimit the matched span; they are presumably UTF-8 byte offsets
/// into the scanned content with an exclusive `end` (TODO confirm against rule
/// implementations).
#[derive(Clone, Debug)]
pub struct StringMatch {
    /// Start of the matched span.
    pub start: usize,
    /// End of the matched span.
    pub end: usize,
    /// The keyword that was used to match this rule. Optional, only some rules may set this value.
    pub keyword: Option<String>,
}
60
/// Sink that receives the matches found while scanning a string.
///
/// The type parameter `T` is the value returned from each `emit` call; it defaults to `()`.
pub trait MatchEmitter<T = ()> {
    /// Reports one `string_match` to the emitter.
    fn emit(&mut self, string_match: StringMatch) -> T;
}
64
65// This implements MatchEmitter for mutable closures (so you can use a closure instead of a custom
66// struct that implements MatchEmitter)
67impl<F, T> MatchEmitter<T> for F
68where
69    F: FnMut(StringMatch) -> T,
70{
71    fn emit(&mut self, string_match: StringMatch) -> T {
72        // This just calls the closure (itself)
73        (self)(string_match)
74    }
75}
76
/// The precedence of a rule. Catchall is the lowest precedence, Specific is the highest precedence.
/// The default precedence is Specific.
/// For rules that:
/// - Have the same mutation priority
/// - Match at the same index
/// - Match the same number of characters
///
/// Then the rule with the highest precedence will be used.
///
/// The ordering comes from the derived `PartialOrd`/`Ord`, which follow the declaration
/// order of the variants below, so reordering the variants changes which rule wins.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Default)]
pub enum Precedence {
    /// Lowest precedence.
    Catchall,
    /// Intermediate precedence, between `Catchall` and `Specific`.
    Generic,
    /// Highest precedence; also the default.
    #[default]
    Specific,
}
92
/// Settings common to every rule, wrapped around the rule-specific configuration `inner`.
///
/// When (de)serialized, the fields of `inner` are flattened into the same map as the
/// fields declared here.
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct RootRuleConfig<T> {
    /// Action associated with the matches of this rule.
    pub match_action: MatchAction,
    /// Scope of the rule. Falls back to `Scope::default()` when missing from the serialized form.
    #[serde(default)]
    pub scope: Scope,
    /// Legacy name of `third_party_active_checker`. It is only consulted when
    /// `third_party_active_checker` is `None`.
    #[deprecated(note = "Use `third_party_active_checker` instead")]
    match_validation_type: Option<MatchValidationType>,
    /// Match validation configuration for this rule; takes priority over the deprecated
    /// `match_validation_type` field.
    third_party_active_checker: Option<MatchValidationType>,
    /// Optional suppressions configured for this rule.
    suppressions: Option<Suppressions>,
    /// Precedence of the rule. Falls back to `Precedence::default()` when missing from the
    /// serialized form.
    #[serde(default)]
    precedence: Precedence,
    /// Whether this is a supporting rule. Defaults to `false` when missing from the
    /// serialized form.
    #[serde(default)]
    pub is_supporting_rule: bool,
    /// The rule-specific configuration.
    #[serde(flatten)]
    pub inner: T,
}
110
111impl<T> RootRuleConfig<T>
112where
113    T: RuleConfig + 'static,
114{
115    pub fn new_dyn(inner: T) -> RootRuleConfig<Arc<dyn RuleConfig>> {
116        RootRuleConfig::new(Arc::new(inner) as Arc<dyn RuleConfig>)
117    }
118
119    pub fn into_dyn(self) -> RootRuleConfig<Arc<dyn RuleConfig>> {
120        self.map_inner(|x| Arc::new(x) as Arc<dyn RuleConfig>)
121    }
122}
123
124impl<T> RootRuleConfig<T> {
125    pub fn new(inner: T) -> Self {
126        #[allow(deprecated)]
127        Self {
128            match_action: MatchAction::None,
129            scope: Scope::all(),
130            match_validation_type: None,
131            third_party_active_checker: None,
132            suppressions: None,
133            precedence: Precedence::default(),
134            is_supporting_rule: false,
135            inner,
136        }
137    }
138
139    pub fn map_inner<U>(self, func: impl FnOnce(T) -> U) -> RootRuleConfig<U> {
140        #[allow(deprecated)]
141        RootRuleConfig {
142            match_action: self.match_action,
143            scope: self.scope,
144            match_validation_type: self.match_validation_type,
145            third_party_active_checker: self.third_party_active_checker,
146            suppressions: self.suppressions,
147            precedence: self.precedence,
148            is_supporting_rule: self.is_supporting_rule,
149            inner: func(self.inner),
150        }
151    }
152
153    pub fn match_action(mut self, action: MatchAction) -> Self {
154        self.match_action = action;
155        self
156    }
157
158    pub fn precedence(mut self, precedence: Precedence) -> Self {
159        self.precedence = precedence;
160        self
161    }
162
163    pub fn scope(mut self, scope: Scope) -> Self {
164        self.scope = scope;
165        self
166    }
167
168    pub fn third_party_active_checker(
169        mut self,
170        match_validation_type: MatchValidationType,
171    ) -> Self {
172        self.third_party_active_checker = Some(match_validation_type);
173        self
174    }
175
176    pub fn suppressions(mut self, suppressions: Suppressions) -> Self {
177        self.suppressions = Some(suppressions);
178        self
179    }
180
181    pub fn is_supporting_rule(mut self, value: bool) -> Self {
182        self.is_supporting_rule = value;
183        self
184    }
185
186    fn get_third_party_active_checker(&self) -> Option<&MatchValidationType> {
187        #[allow(deprecated)]
188        self.third_party_active_checker
189            .as_ref()
190            .or(self.match_validation_type.as_ref())
191    }
192}
193
194impl<T> Deref for RootRuleConfig<T> {
195    type Target = T;
196
197    fn deref(&self) -> &Self::Target {
198        &self.inner
199    }
200}
/// A compiled rule (`inner`) together with its root-level settings.
///
/// Dereferences to the inner `dyn CompiledRule`.
pub struct RootCompiledRule {
    /// The rule-specific compiled implementation.
    pub inner: Box<dyn CompiledRule>,
    /// Scope of the rule.
    pub scope: Scope,
    /// Action associated with the matches of this rule.
    pub match_action: MatchAction,
    /// Match validation configuration, if the rule has one.
    pub match_validation_type: Option<MatchValidationType>,
    /// Compiled suppressions; matches they flag are dropped by `Scanner::suppress_matches`.
    pub suppressions: Option<CompiledSuppressions>,
    /// Precedence of the rule.
    pub precedence: Precedence,
    /// Whether this is a supporting rule; matches of supporting rules are removed from the
    /// final scan output.
    pub is_supporting_rule: bool,
}
210
211impl RootCompiledRule {
212    pub fn internal_match_validation_type(&self) -> Option<InternalMatchValidationType> {
213        self.match_validation_type
214            .as_ref()
215            .map(|x| x.get_internal_match_validation_type())
216    }
217}
218
219impl Deref for RootCompiledRule {
220    type Target = dyn CompiledRule;
221
222    fn deref(&self) -> &Self::Target {
223        self.inner.as_ref()
224    }
225}
226
/// Context handed to `CompiledRule::get_string_matches` for one rule applied to one string.
pub struct StringMatchesCtx<'a> {
    // Index of the rule being evaluated; attached to the results of async jobs.
    rule_index: usize,
    /// Regex caches available to the rule.
    pub regex_caches: &'a mut RegexCaches,
    /// Exclusion check for the string being scanned.
    pub exclusion_check: &'a ExclusionCheck<'a>,
    /// Excluded matches collected so far. NOTE(review): appears to map excluded match
    /// content to the path it was found at; confirm against the writers of this map.
    pub excluded_matches: &'a mut AHashMap<String, String>,
    /// Emitter that receives the matches found synchronously.
    pub match_emitter: &'a mut dyn MatchEmitter,
    /// Optional `(start, end)` index pairs; presumably the wildcarded indices configured
    /// for this path through `ScanOptions::wildcarded_indices` (TODO confirm).
    pub wildcard_indices: Option<&'a Vec<(usize, usize)>>,
    /// Whether debug observability is enabled.
    pub enable_debug_observability: bool,

    // Shared Data
    pub per_string_data: &'a mut SharedData,
    pub per_scanner_data: &'a SharedData,
    pub per_event_data: &'a mut SharedData,
    /// Identifier of the event being scanned, when it has one.
    pub event_id: Option<&'a str>,

    // Per-scan metadata supplied by the caller via `ScanOptions::scan_metadata`.
    pub scan_metadata: &'a AHashMap<String, String>,
}
245
246impl StringMatchesCtx<'_> {
247    /// If a `get_string_matches` implementation needs to do any async processing (e.g. I/O),
248    /// this function can be used to return an "async job" to find matches. The return value
249    /// of `process_async` should be returned from the `get_string_matches` function. The future
250    /// passed into this function will be spawned and executed immediately without blocking
251    /// other `get_string_matches` calls. This means all the async jobs will run concurrently.
252    ///
253    /// The `ctx` available to async jobs is more restrictive than the normal `ctx` available in
254    /// `get_string_matches`. The only thing you can do is return matches. If other data is needed,
255    /// it should be accessed before `process_async` is called.
256    pub fn process_async(
257        &self,
258        func: impl for<'a> FnOnce(
259            &'a mut AsyncStringMatchesCtx,
260        )
261            -> Pin<Box<dyn Future<Output = Result<(), ScannerError>> + Send + 'a>>
262        + Send
263        + 'static,
264    ) -> RuleResult {
265        let rule_index = self.rule_index;
266
267        // The future is spawned onto the tokio runtime immediately so it starts running
268        // in the background
269        let fut = TOKIO_RUNTIME.spawn(async move {
270            let start = Instant::now();
271            let mut ctx = AsyncStringMatchesCtx {
272                rule_matches: vec![],
273            };
274            (func)(&mut ctx).await?;
275            let io_duration = start.elapsed();
276
277            Ok(AsyncRuleInfo {
278                rule_index,
279                rule_matches: ctx.rule_matches,
280                io_duration,
281            })
282        });
283
284        Ok(RuleStatus::Pending(fut))
285    }
286}
287
/// Context handed to async rule jobs started through `StringMatchesCtx::process_async`.
/// It only collects the matches the job emits.
pub struct AsyncStringMatchesCtx {
    // Matches emitted so far, in emission order.
    rule_matches: Vec<StringMatch>,
}
291
impl AsyncStringMatchesCtx {
    /// Records `string_match` as a match found by the async job.
    pub fn emit_match(&mut self, string_match: StringMatch) {
        self.rule_matches.push(string_match);
    }
}
297
/// Completion state returned by a rule after it was asked for matches on a string.
#[must_use]
pub enum RuleStatus {
    /// The rule finished synchronously.
    Done,
    /// The rule started an async job; its matches are available once the handle resolves.
    Pending(PendingRuleResult),
}
303
/// Handle to an async rule job spawned on the tokio runtime by
/// `StringMatchesCtx::process_async`.
pub type PendingRuleResult = JoinHandle<Result<AsyncRuleInfo, ScannerError>>;
306
/// An async rule job that is still running, paired with the path of the string it scans.
pub struct PendingRuleJob {
    // Handle awaited by the scanner to obtain the job's result.
    fut: PendingRuleResult,
    // Path the resulting matches are recorded under.
    path: Path<'static>,
}
311
/// Result produced by a completed async rule job.
pub struct AsyncRuleInfo {
    // Index of the rule that started the job.
    rule_index: usize,
    // Matches emitted by the job.
    rule_matches: Vec<StringMatch>,
    // Wall-clock time the job took, measured from the start of the spawned task until the
    // job's future completed.
    io_duration: Duration,
}
317
/// The value returned by `CompiledRule::get_string_matches`: either the rule's completion
/// status (`RuleStatus::Done`, or `RuleStatus::Pending` when an async job was started) or the
/// error that made the rule fail.
pub type RuleResult = Result<RuleStatus, ScannerError>;
320
/// The public trait that defines the behavior of a compiled rule.
///
/// Only `get_string_matches` is required; every other method has a default implementation.
pub trait CompiledRule: Send + Sync {
    /// Hook to populate the scanner-wide shared data. Does nothing by default.
    fn init_per_scanner_data(&self, _per_scanner_data: &mut SharedData) {
        // by default, no per-scanner data is initialized
    }

    /// Hook to populate the per-string shared data. Does nothing by default.
    fn init_per_string_data(&self, _labels: &Labels, _per_string_data: &mut SharedData) {
        // by default, no per-string data is initialized
    }

    /// Hook to populate the per-event shared data. Does nothing by default.
    fn init_per_event_data(&self, _per_event_data: &mut SharedData) {
        // by default, no per-event data is initialized
    }

    /// Finds the matches of this rule in `content`, the string located at `path`.
    ///
    /// Returns `RuleStatus::Done` when finished synchronously, or the value produced by
    /// `StringMatchesCtx::process_async` when the work continues in an async job.
    fn get_string_matches(
        &self,
        content: &str,
        path: &Path,
        ctx: &mut StringMatchesCtx<'_>,
    ) -> RuleResult;

    // Whether a match from this rule should be excluded (marked as a false-positive)
    // if the content of this match was found in a match from an excluded scope
    fn should_exclude_multipass_v0(&self) -> bool {
        // default is to NOT use Multi-pass V0
        false
    }

    /// Called when a match of this rule at `_path` is dropped by Multi-pass V0 because its
    /// content equals an excluded match recorded for `_excluded_path`.
    fn on_excluded_match_multipass_v0(
        &self,
        _path: &Path,
        _excluded_path: &str,
        _enable_debug_observability: bool,
    ) {
        // default is to do nothing
    }

    /// Returns this rule as a `RegexCompiledRule` when it is one. `None` by default.
    fn as_regex_rule(&self) -> Option<&RegexCompiledRule> {
        None
    }

    /// Mutable counterpart of `as_regex_rule`. `None` by default.
    fn as_regex_rule_mut(&mut self) -> Option<&mut RegexCompiledRule> {
        None
    }

    /// NOTE(review): presumably controls whether the scanner may apply namespace exclusion
    /// to this rule; the consumer is not visible here. `true` by default.
    fn allow_scanner_to_exclude_namespace(&self) -> bool {
        true
    }
}
370
371impl<T> RuleConfig for Box<T>
372where
373    T: RuleConfig + ?Sized,
374{
375    fn convert_to_compiled_rule(
376        &self,
377        rule_index: usize,
378        labels: Labels,
379    ) -> Result<Box<dyn CompiledRule>, CreateScannerError> {
380        self.as_ref().convert_to_compiled_rule(rule_index, labels)
381    }
382}
383
/// Feature toggles of a scanner.
#[derive(Debug, PartialEq, Clone)]
struct ScannerFeatures {
    /// Off by default.
    pub add_implicit_index_wildcards: bool,
    /// Enables the Multi-pass V0 filtering of matches. On by default.
    pub multipass_v0_enabled: bool,
    /// Whether match content is collected for the returned matches. Off by default.
    pub return_matches: bool,
    /// Whether debug observability is enabled. Off by default.
    pub enable_debug_observability: bool,
}

impl Default for ScannerFeatures {
    /// Every feature is disabled by default except Multi-pass V0.
    fn default() -> Self {
        let disabled = false;
        Self {
            multipass_v0_enabled: true,
            add_implicit_index_wildcards: disabled,
            return_matches: disabled,
            enable_debug_observability: disabled,
        }
    }
}
402
403pub struct ScanOptions {
404    // The blocked_rules_idx parameter is a list of rule indices that should be skipped for this scan.
405    // this list shall be small (<10), so a linear search is acceptable otherwise performance will be impacted.
406    pub blocked_rules_idx: Vec<usize>,
407    // The wildcarded_indices parameter is a map containing a list of tuples of (start, end) indices that should be treated as wildcards (for the message key only) per path.
408    pub wildcarded_indices: AHashMap<Path<'static>, Vec<(usize, usize)>>,
409    // Whether to validate matches using third-party validators (e.g., checksum validation for credit cards).
410    // When enabled, the scanner automatically collects match content needed for validation.
411    pub validate_matches: bool,
412    // Arbitrary key-value metadata passed through to each rule's `get_string_matches` call via
413    // `StringMatchesCtx::scan_metadata`. Rules may use this to receive per-scan context (e.g.
414    // an org identifier) without requiring changes to the `CompiledRule` trait signature.
415    pub scan_metadata: AHashMap<String, String>,
416}
417
418impl Default for ScanOptions {
419    fn default() -> Self {
420        Self {
421            blocked_rules_idx: vec![],
422            wildcarded_indices: AHashMap::new(),
423            validate_matches: false,
424            scan_metadata: AHashMap::new(),
425        }
426    }
427}
428
/// Builder for `ScanOptions`; each field mirrors the `ScanOptions` field of the same name.
pub struct ScanOptionBuilder {
    // Indices of the rules to skip.
    blocked_rules_idx: Vec<usize>,
    // Per-path (start, end) index pairs to treat as wildcards.
    wildcarded_indices: AHashMap<Path<'static>, Vec<(usize, usize)>>,
    // Whether matches should be validated.
    validate_matches: bool,
    // Key-value metadata forwarded to the rules.
    scan_metadata: AHashMap<String, String>,
}
435
436impl ScanOptionBuilder {
437    pub fn new() -> Self {
438        Self {
439            blocked_rules_idx: vec![],
440            wildcarded_indices: AHashMap::new(),
441            validate_matches: false,
442            scan_metadata: AHashMap::new(),
443        }
444    }
445
446    pub fn with_blocked_rules_idx(mut self, blocked_rules_idx: Vec<usize>) -> Self {
447        self.blocked_rules_idx = blocked_rules_idx;
448        self
449    }
450
451    pub fn with_wildcarded_indices(
452        mut self,
453        wildcarded_indices: AHashMap<Path<'static>, Vec<(usize, usize)>>,
454    ) -> Self {
455        self.wildcarded_indices = wildcarded_indices;
456        self
457    }
458
459    pub fn with_validate_matching(mut self, validate_matches: bool) -> Self {
460        self.validate_matches = validate_matches;
461        self
462    }
463
464    pub fn with_scan_metadata(mut self, scan_metadata: AHashMap<String, String>) -> Self {
465        self.scan_metadata = scan_metadata;
466        self
467    }
468
469    pub fn build(self) -> ScanOptions {
470        ScanOptions {
471            blocked_rules_idx: self.blocked_rules_idx,
472            wildcarded_indices: self.wildcarded_indices,
473            validate_matches: self.validate_matches,
474            scan_metadata: self.scan_metadata,
475        }
476    }
477}
478
/// Scans events with a fixed set of compiled rules.
pub struct Scanner {
    // Compiled rules; matches refer to their rule through an index into this list.
    rules: Vec<RootCompiledRule>,
    // Drives the visit of every (string, rule) combination of an event.
    scoped_ruleset: ScopedRuleSet,
    // Feature toggles of this scanner.
    scanner_features: ScannerFeatures,
    // Counters updated while scanning (scanned events, matches, suppressed matches, CPU time).
    metrics: ScannerMetrics,
    // Observability labels of this scanner.
    labels: Labels,
    // Validator to use for each internal match validation type.
    match_validators_per_type: AHashMap<InternalMatchValidationType, Box<dyn MatchValidator>>,
    // Scanner-wide shared data; presumably exposed to rules through
    // `StringMatchesCtx::per_scanner_data` (TODO confirm).
    per_scanner_data: SharedData,
    // Maximum duration of an async scan before it fails with a timeout error.
    async_scan_timeout: Duration,
}
489
490impl Scanner {
    /// Returns a `ScannerBuilder` for the given rule configurations.
    pub fn builder(rules: &[RootRuleConfig<Arc<dyn RuleConfig>>]) -> ScannerBuilder<'_> {
        ScannerBuilder::new(rules)
    }
494
    /// Scans `event` with the rules configured in the scanner, using the default scan
    /// options (no validation, no blocked rules, no wildcarded indices).
    ///
    /// `event` is a mutable reference to the event to scan (a type implementing the `Event`
    /// trait). Returns the list of `RuleMatch` objects describing the matches that were
    /// found, or the error that interrupted the scan.
    pub fn scan<E: Event>(&self, event: &mut E) -> Result<Vec<RuleMatch>, ScannerError> {
        self.scan_with_options(event, ScanOptions::default())
    }
502
503    // This function scans the given event with the rules configured in the scanner.
504    // The event parameter is a mutable reference to the event that should be scanned (implemented the Event trait).
505    // The options parameter allows customizing the scan behavior (validation, blocked rules, etc.).
506    // The return value is a list of RuleMatch objects, which contain information about the matches that were found.
507    pub fn scan_with_options<E: Event>(
508        &self,
509        event: &mut E,
510        options: ScanOptions,
511    ) -> Result<Vec<RuleMatch>, ScannerError> {
512        let start = Instant::now();
513        let validate = options.validate_matches;
514        // Collect matches inside block_on, then run finalize_matches (which uses rayon) outside of
515        // it to avoid re-entrancy between the futures LocalPool executor and RAYON_THREAD_POOL.
516        let result = block_on(self.internal_scan_collect(event, options));
517        match result {
518            Ok((mut rule_matches, io_duration)) => {
519                self.finalize_matches(&mut rule_matches, validate);
520                self.record_metrics(&rule_matches, start, Some(io_duration));
521                Ok(rule_matches)
522            }
523            Err(e) => {
524                self.record_metrics(&[], start, None);
525                Err(e)
526            }
527        }
528    }
529
    /// Asynchronously scans `event` with the rules configured in the scanner, using the
    /// default scan options.
    ///
    /// `event` is a mutable reference to the event to scan (a type implementing the `Event`
    /// trait). Returns the list of `RuleMatch` objects describing the matches that were
    /// found, or the error that interrupted the scan.
    pub async fn scan_async<E: Event>(
        &self,
        event: &mut E,
    ) -> Result<Vec<RuleMatch>, ScannerError> {
        self.scan_async_with_options(event, ScanOptions::default())
            .await
    }
540
541    pub async fn scan_async_with_options<E: Event>(
542        &self,
543        event: &mut E,
544        options: ScanOptions,
545    ) -> Result<Vec<RuleMatch>, ScannerError> {
546        let start = Instant::now();
547        let validate = options.validate_matches;
548        let fut = self.internal_scan_collect(event, options);
549
550        // The sleep from the timeout requires being in a tokio context
551        // The guard needs to be dropped before await since the guard is !Send
552        let timeout_result = {
553            let _tokio_guard = TOKIO_RUNTIME.enter();
554            timeout(self.async_scan_timeout, fut)
555        };
556
557        let result = timeout_result.await.unwrap_or(Err(ScannerError::Transient(
558            "Async scan timeout".to_string(),
559        )));
560
561        match result {
562            Ok((mut rule_matches, io_duration)) => {
563                self.finalize_matches(&mut rule_matches, validate);
564                self.record_metrics(&rule_matches, start, Some(io_duration));
565                Ok(rule_matches)
566            }
567            Err(e) => {
568                self.record_metrics(&[], start, None);
569                Err(e)
570            }
571        }
572    }
573
574    fn record_metrics(
575        &self,
576        output_rule_matches: &[RuleMatch],
577        start: Instant,
578        io_duration: Option<Duration>,
579    ) {
580        // Add number of scanned events
581        self.metrics.num_scanned_events.increment(1);
582        // Add number of matches
583        self.metrics
584            .match_count
585            .increment(output_rule_matches.len() as u64);
586
587        if let Some(io_duration) = io_duration {
588            let total_duration = start.elapsed();
589            let cpu_duration = total_duration.saturating_sub(io_duration);
590            self.metrics
591                .cpu_duration
592                .increment(cpu_duration.as_nanos() as u64);
593        }
594    }
595
    /// Post-processes the matches collected for `event` and applies the match actions.
    ///
    /// For every path that has matches, the string at that path is visited mutably and its
    /// matches go through, in order: encoding index calculation, Multi-pass V0 exclusion
    /// (when enabled), suppression, overlap removal, and finally the match actions. The
    /// resulting matches are appended to `output_rule_matches`.
    ///
    /// `excluded_matches` is looked up by match content during the Multi-pass V0 step and
    /// `need_match_content` is forwarded to `apply_match_actions`. Returns immediately when
    /// `rule_matches` is empty.
    fn process_rule_matches<E: Event>(
        &self,
        event: &mut E,
        rule_matches: InternalRuleMatchSet<E::Encoding>,
        excluded_matches: AHashMap<String, String>,
        output_rule_matches: &mut Vec<RuleMatch>,
        need_match_content: bool,
    ) {
        if rule_matches.is_empty() {
            return;
        }
        access_regex_caches(|regex_caches| {
            for (path, mut rule_matches) in rule_matches.into_iter() {
                // All rule matches in each inner list are for a single path, so they can be processed independently.
                event.visit_string_mut(&path, |content| {
                    // calculate_indices requires that matches are sorted by start index
                    rule_matches.sort_unstable_by_key(|rule_match| rule_match.utf8_start);

                    // Fill in the encoding-specific (custom) indices from the UTF-8 ones.
                    <<E as Event>::Encoding>::calculate_indices(
                        content,
                        rule_matches.iter_mut().map(
                            |rule_match: &mut InternalRuleMatch<E::Encoding>| EncodeIndices {
                                utf8_start: rule_match.utf8_start,
                                utf8_end: rule_match.utf8_end,
                                custom_start: &mut rule_match.custom_start,
                                custom_end: &mut rule_match.custom_end,
                            },
                        ),
                    );

                    if self.scanner_features.multipass_v0_enabled {
                        // Now that the `excluded_matches` set is fully populated, filter out any matches
                        // that are the same as excluded matches (also known as "Multi-pass V0")
                        rule_matches.retain(|rule_match| {
                            if self.rules[rule_match.rule_index]
                                .inner
                                .should_exclude_multipass_v0()
                            {
                                let match_content =
                                    &content[rule_match.utf8_start..rule_match.utf8_end];
                                let excluded_path = excluded_matches.get(match_content);
                                if let Some(excluded_path) = excluded_path {
                                    // Let the rule know that one of its matches is being dropped.
                                    self.rules[rule_match.rule_index]
                                        .on_excluded_match_multipass_v0(
                                            &path,
                                            excluded_path,
                                            self.scanner_features.enable_debug_observability,
                                        );
                                }
                                // Keep the match only when its content is not an excluded match.
                                excluded_path.is_none()
                            } else {
                                // Rules that do not use Multi-pass V0 keep all their matches.
                                true
                            }
                        });
                    }

                    self.suppress_matches::<E::Encoding>(&mut rule_matches, content, regex_caches);

                    self.sort_and_remove_overlapping_rules::<E::Encoding>(&mut rule_matches);

                    // Computed before `rule_matches` is moved into `apply_match_actions`.
                    let will_mutate = rule_matches.iter().any(|rule_match| {
                        self.rules[rule_match.rule_index].match_action.is_mutating()
                    });

                    self.apply_match_actions(
                        content,
                        &path,
                        rule_matches,
                        output_rule_matches,
                        need_match_content,
                    );

                    // NOTE(review): presumably tells the event whether the visited string
                    // was mutated; confirm against `Event::visit_string_mut`.
                    will_mutate
                });
            }
        });
    }
673
674    async fn internal_scan_collect<E: Event>(
675        &self,
676        event: &mut E,
677        options: ScanOptions,
678    ) -> Result<(Vec<RuleMatch>, Duration), ScannerError> {
679        // If validation is requested, we need to collect match content even if the scanner
680        // wasn't originally configured to return matches
681        let need_match_content = self.scanner_features.return_matches || options.validate_matches;
682        // All matches, after some (but not all) false-positives have been removed.
683        let mut rule_matches = InternalRuleMatchSet::new();
684        let mut excluded_matches = AHashMap::new();
685        let mut async_jobs = vec![];
686
687        access_regex_caches(|regex_caches| {
688            self.scoped_ruleset.visit_string_rule_combinations(
689                event,
690                ScannerContentVisitor {
691                    scanner: self,
692                    regex_caches,
693                    rule_matches: &mut rule_matches,
694                    blocked_rules: &options.blocked_rules_idx,
695                    excluded_matches: &mut excluded_matches,
696                    per_event_data: SharedData::new(),
697                    wildcarded_indexes: &options.wildcarded_indices,
698                    async_jobs: &mut async_jobs,
699                    event_id: event.get_id().map(|s| s.to_string()),
700                    scan_metadata: &options.scan_metadata,
701                },
702            )
703        })?;
704
705        // The async jobs were already spawned on the tokio runtime, so the
706        // results just need to be collected
707        let mut total_io_duration = Duration::ZERO;
708        for job in async_jobs {
709            let rule_info = job.fut.await.unwrap()?;
710            total_io_duration += rule_info.io_duration;
711            rule_matches.push_async_matches(
712                &job.path,
713                rule_info
714                    .rule_matches
715                    .into_iter()
716                    .map(|x| InternalRuleMatch::new(rule_info.rule_index, x)),
717            );
718        }
719
720        let mut output_rule_matches = vec![];
721
722        self.process_rule_matches(
723            event,
724            rule_matches,
725            excluded_matches,
726            &mut output_rule_matches,
727            need_match_content,
728        );
729
730        Ok((output_rule_matches, total_io_duration))
731    }
732
733    pub fn suppress_matches<E: Encoding>(
734        &self,
735        rule_matches: &mut Vec<InternalRuleMatch<E>>,
736        content: &str,
737        regex_caches: &mut RegexCaches,
738    ) {
739        rule_matches.retain(|rule_match| {
740            if let Some(suppressions) = &self.rules[rule_match.rule_index].suppressions {
741                let match_should_be_suppressed = suppressions.should_match_be_suppressed(
742                    &content[rule_match.utf8_start..rule_match.utf8_end],
743                    regex_caches,
744                );
745
746                if match_should_be_suppressed {
747                    self.metrics.suppressed_match_count.increment(1);
748                }
749                !match_should_be_suppressed
750            } else {
751                true
752            }
753        });
754    }
755
756    pub fn validate_matches(&self, rule_matches: &mut Vec<RuleMatch>) {
757        // Create MatchValidatorRuleMatch per match_validator_type to pass it to each match_validator
758        let mut match_validator_rule_match_per_type = AHashMap::new();
759
760        let mut validated_rule_matches = vec![];
761
762        for mut rule_match in rule_matches.drain(..) {
763            let rule = &self.rules[rule_match.rule_index];
764            if let Some(match_validation_type) = rule.internal_match_validation_type() {
765                match_validator_rule_match_per_type
766                    .entry(match_validation_type)
767                    .or_insert_with(Vec::new)
768                    .push(rule_match)
769            } else {
770                // There is no match validator for this rule, so mark it as not available.
771                rule_match.match_status.merge(MatchStatus::NotAvailable);
772                validated_rule_matches.push(rule_match);
773            }
774        }
775
776        // Skip the pool hop when there's nothing to validate.
777        if !match_validator_rule_match_per_type.is_empty() {
778            let run_validation = || {
779                use rayon::prelude::*;
780
781                match_validator_rule_match_per_type.par_iter_mut().for_each(
782                    |(match_validation_type, matches_per_type)| {
783                        let match_validator =
784                            self.match_validators_per_type.get(match_validation_type);
785                        if let Some(match_validator) = match_validator {
786                            match_validator
787                                .as_ref()
788                                .validate(matches_per_type, &self.rules)
789                        }
790                    },
791                );
792            };
793
794            // TODO(SDSP-450): move validation onto the async TOKIO_RUNTIME. It is I/O-bound
795            // (blocking HTTP to third-party checkers), so a Rayon (CPU-bound) pool is a poor
796            // fit and forces the workaround below. Async validation would remove it entirely.
797            //
798            // If we're already on a Rayon worker (e.g. the caller scans from its own parallel
799            // iterator), calling `install` here blocks the worker, and Rayon keeps it busy by
800            // stealing more of the caller's jobs — which scan and validate and steal again,
801            // recursing until the stack overflows. Running `install` on a separate OS thread
802            // parks this worker instead, breaking the recursion; validation still runs in
803            // parallel on RAYON_THREAD_POOL.
804            if rayon::current_thread_index().is_some() {
805                std::thread::scope(|s| {
806                    s.spawn(|| RAYON_THREAD_POOL.install(run_validation));
807                });
808            } else {
809                RAYON_THREAD_POOL.install(run_validation);
810            }
811        }
812
813        // Refill the rule_matches with the validated matches
814        for (_, mut matches) in match_validator_rule_match_per_type {
815            validated_rule_matches.append(&mut matches);
816        }
817
818        // Sort rule_matches by start index
819        validated_rule_matches.sort_by_key(|rule_match| rule_match.start_index);
820        *rule_matches = validated_rule_matches;
821    }
822
823    // Runs optional validation and drops supporting-rule matches from the output.
824    // Must be called OUTSIDE of any futures executor (e.g. block_on) because validate_matches
825    // uses RAYON_THREAD_POOL internally; running rayon inside block_on causes an EnterError panic
826    // when the calling thread re-enters the LocalPool executor context.
827    fn finalize_matches(&self, rule_matches: &mut Vec<RuleMatch>, validate: bool) {
828        if validate {
829            self.validate_matches(rule_matches);
830        }
831        // Supporting rules exist only to provide template variables to CustomHttpV2 validators of
832        // other rules. Their matches must not appear in the final output. They are retained until
833        // after validate_matches so that match pairing can reference their match values.
834        rule_matches.retain(|rule_match| !self.rules[rule_match.rule_index].is_supporting_rule);
835    }
836
837    /// Apply mutations from actions, and shift indices to match the mutated values.
838    /// This assumes the matches are all from the content given, and are sorted by start index.
839    fn apply_match_actions<E: Encoding>(
840        &self,
841        content: &mut String,
842        path: &Path<'static>,
843        rule_matches: Vec<InternalRuleMatch<E>>,
844        output_rule_matches: &mut Vec<RuleMatch>,
845        need_match_content: bool,
846    ) {
847        let mut utf8_byte_delta: isize = 0;
848        let mut custom_index_delta: <E>::IndexShift = <E>::zero_shift();
849
850        for rule_match in rule_matches {
851            output_rule_matches.push(self.apply_match_actions_for_string::<E>(
852                content,
853                path.clone(),
854                rule_match,
855                &mut utf8_byte_delta,
856                &mut custom_index_delta,
857                need_match_content,
858            ));
859        }
860    }
861
862    /// This will be called once for each match of a single string. The rules must be passed in in order of the start index. Mutating rules must not overlap.
863    fn apply_match_actions_for_string<E: Encoding>(
864        &self,
865        content: &mut String,
866        path: Path<'static>,
867        rule_match: InternalRuleMatch<E>,
868        // The current difference in length between the original and mutated string
869        utf8_byte_delta: &mut isize,
870
871        // The difference between the custom index on the original string and the mutated string
872        custom_index_delta: &mut <E>::IndexShift,
873        need_match_content: bool,
874    ) -> RuleMatch {
875        let rule = &self.rules[rule_match.rule_index];
876
877        let custom_start =
878            (<E>::get_index(&rule_match.custom_start, rule_match.utf8_start) as isize
879                + <E>::get_shift(custom_index_delta, *utf8_byte_delta)) as usize;
880
881        let mut matched_content_copy = None;
882
883        if need_match_content {
884            // This copies part of the is_mutating block but is seperate since can't mix compilation condition and code condition
885            let mutated_utf8_match_start =
886                (rule_match.utf8_start as isize + *utf8_byte_delta) as usize;
887            let mutated_utf8_match_end = (rule_match.utf8_end as isize + *utf8_byte_delta) as usize;
888
889            // Matches for mutating rules must have valid indices
890            debug_assert!(content.is_char_boundary(mutated_utf8_match_start));
891            debug_assert!(content.is_char_boundary(mutated_utf8_match_end));
892
893            let matched_content = &content[mutated_utf8_match_start..mutated_utf8_match_end];
894            matched_content_copy = Some(matched_content.to_string());
895        }
896
897        if rule.match_action.is_mutating() {
898            let mutated_utf8_match_start =
899                (rule_match.utf8_start as isize + *utf8_byte_delta) as usize;
900            let mutated_utf8_match_end = (rule_match.utf8_end as isize + *utf8_byte_delta) as usize;
901
902            // Matches for mutating rules must have valid indices
903            debug_assert!(content.is_char_boundary(mutated_utf8_match_start));
904            debug_assert!(content.is_char_boundary(mutated_utf8_match_end));
905
906            let matched_content = &content[mutated_utf8_match_start..mutated_utf8_match_end];
907            if let Some(replacement) = rule.match_action.get_replacement(matched_content) {
908                let before_replacement = &matched_content[replacement.start..replacement.end];
909
910                // update indices to match the new mutated content
911                <E>::adjust_shift(
912                    custom_index_delta,
913                    before_replacement,
914                    &replacement.replacement,
915                );
916                *utf8_byte_delta +=
917                    replacement.replacement.len() as isize - before_replacement.len() as isize;
918
919                let replacement_start = mutated_utf8_match_start + replacement.start;
920                let replacement_end = mutated_utf8_match_start + replacement.end;
921                content.replace_range(replacement_start..replacement_end, &replacement.replacement);
922            }
923        }
924
925        let shift_offset = <E>::get_shift(custom_index_delta, *utf8_byte_delta);
926        let custom_end = (<E>::get_index(&rule_match.custom_end, rule_match.utf8_end) as isize
927            + shift_offset) as usize;
928
929        let rule = &self.rules[rule_match.rule_index];
930
931        let match_status: MatchStatus = if rule.match_validation_type.is_some() {
932            MatchStatus::NotChecked
933        } else {
934            MatchStatus::NotAvailable
935        };
936
937        RuleMatch {
938            rule_index: rule_match.rule_index,
939            path,
940            replacement_type: rule.match_action.replacement_type(),
941            start_index: custom_start,
942            end_index_exclusive: custom_end,
943            shift_offset,
944            match_value: matched_content_copy,
945            match_status,
946            keyword: rule_match.keyword,
947        }
948    }
949
    /// Resolves overlaps between the matches of a single string, in place.
    ///
    /// Matches are ranked by priority, then retained greedily: a match is dropped when it
    /// overlaps a match that was already retained. On return, `rule_matches` holds only the
    /// retained matches, sorted by `utf8_start`.
    fn sort_and_remove_overlapping_rules<E: Encoding>(
        &self,
        rule_matches: &mut Vec<InternalRuleMatch<E>>,
    ) {
        // Some of the scanner code relies on the behavior here, such as the sort order and removal of overlapping mutating rules.
        // Be very careful if this function is modified.

        // Priority order (highest first): mutating before non-mutating, earlier start, longer
        // match, higher precedence, lower rule index. The final comparison is reversed so the
        // highest-priority match ends up at the BACK of the vector.
        rule_matches.sort_unstable_by(|a, b| {
            // Mutating rules are a higher priority (earlier in the list)
            let ord = self.rules[a.rule_index]
                .match_action
                .is_mutating()
                .cmp(&self.rules[b.rule_index].match_action.is_mutating())
                .reverse();

            // Earlier start offset
            let ord = ord.then(a.utf8_start.cmp(&b.utf8_start));

            // Longer matches
            let ord = ord.then(a.len().cmp(&b.len()).reverse());

            // Matches with higher precedence come first
            let ord = ord.then(
                self.rules[a.rule_index]
                    .precedence
                    .cmp(&self.rules[b.rule_index].precedence)
                    .reverse(),
            );

            // Matches from earlier rules
            let ord = ord.then(a.rule_index.cmp(&b.rule_index));

            // swap the order of everything so matches can be efficiently popped off the back as they are processed
            ord.reverse()
        });

        let mut retained_rules: Vec<InternalRuleMatch<E>> = vec![];

        // Pop matches from highest to lowest priority, keeping those that don't conflict.
        'rule_matches: while let Some(rule_match) = rule_matches.pop() {
            if self.rules[rule_match.rule_index].match_action.is_mutating() {
                // Mutating rules are kept only if they don't overlap with a previous rule.
                // Only the last retained match is checked: mutating matches are popped first and
                // in start order, so everything retained so far is mutating and ordered by start.
                if let Some(last) = retained_rules.last()
                    && last.utf8_end > rule_match.utf8_start
                {
                    continue;
                }
            } else {
                // Only retain if it doesn't overlap with any other rule. Since mutating matches are sorted before non-mutated matches
                // this needs to check all retained matches (instead of just the last one)
                for retained_rule in &retained_rules {
                    // Half-open interval overlap test: [start, end) ranges intersect.
                    if retained_rule.utf8_start < rule_match.utf8_end
                        && retained_rule.utf8_end > rule_match.utf8_start
                    {
                        continue 'rule_matches;
                    }
                }
            };
            retained_rules.push(rule_match);
        }

        // ensure rules are sorted by start index (other parts of the library required this to function correctly)
        retained_rules.sort_unstable_by_key(|rule_match| rule_match.utf8_start);

        *rule_matches = retained_rules;
    }
1015}
1016
1017impl Drop for Scanner {
1018    fn drop(&mut self) {
1019        let stats = &*GLOBAL_STATS;
1020        stats.scanner_deletions.increment(1);
1021        stats.decrement_total_scanners();
1022    }
1023}
1024
/// Builder that gathers the rules and options needed to create a [`Scanner`].
///
/// NOTE(review): the derived `Default` does not go through `ScannerBuilder::new`, so it yields
/// `Duration::default()` (zero) for `async_scan_timeout` instead of the 5 minutes set by `new`
/// — confirm whether that is intended.
#[derive(Default)]
pub struct ScannerBuilder<'a> {
    // Rule configurations to compile; a rule's position in this slice becomes its rule index.
    rules: &'a [RootRuleConfig<Arc<dyn RuleConfig>>],
    // Labels handed to each compiled rule and to the scanner's metrics.
    labels: Labels,
    // Feature switches toggled by the `with_*` builder methods.
    scanner_features: ScannerFeatures,
    // Stored on the built scanner as `async_scan_timeout`.
    async_scan_timeout: Duration,
}
1032
1033impl ScannerBuilder<'_> {
1034    pub fn new(rules: &[RootRuleConfig<Arc<dyn RuleConfig>>]) -> ScannerBuilder<'_> {
1035        ScannerBuilder {
1036            rules,
1037            labels: Labels::empty(),
1038            scanner_features: ScannerFeatures::default(),
1039            async_scan_timeout: Duration::from_secs(60 * 5),
1040        }
1041    }
1042
1043    pub fn labels(mut self, labels: Labels) -> Self {
1044        self.labels = labels;
1045        self
1046    }
1047
1048    pub fn with_async_scan_timeout(mut self, duration: Duration) -> Self {
1049        self.async_scan_timeout = duration;
1050        self
1051    }
1052
1053    pub fn with_implicit_wildcard_indexes_for_scopes(mut self, value: bool) -> Self {
1054        self.scanner_features.add_implicit_index_wildcards = value;
1055        self
1056    }
1057
1058    pub fn with_return_matches(mut self, value: bool) -> Self {
1059        self.scanner_features.return_matches = value;
1060        self
1061    }
1062
1063    /// Enables/Disables the Multipass V0 feature. This defaults to TRUE.
1064    /// Multipass V0 saves matches from excluded scopes, and marks any identical
1065    /// matches in included scopes as a false positive.
1066    pub fn with_multipass_v0(mut self, value: bool) -> Self {
1067        self.scanner_features.multipass_v0_enabled = value;
1068        self
1069    }
1070
1071    /// Enables/Disables debug observability features. This defaults to FALSE.
1072    /// When enabled, metrics will include additional tags (such as `sds_namespace`)
1073    /// to help debug the source of matches.
1074    pub fn with_debug_observability(mut self, value: bool) -> Self {
1075        self.scanner_features.enable_debug_observability = value;
1076        self
1077    }
1078
1079    pub fn build(self) -> Result<Scanner, CreateScannerError> {
1080        let mut match_validators_per_type = AHashMap::new();
1081
1082        for rule in self.rules.iter() {
1083            if let Some(match_validation_type) = &rule.get_third_party_active_checker()
1084                && match_validation_type.can_create_match_validator()
1085            {
1086                let internal_type = match_validation_type.get_internal_match_validation_type();
1087                let match_validator = match_validation_type.into_match_validator();
1088                if let Ok(match_validator) = match_validator {
1089                    if !match_validators_per_type.contains_key(&internal_type) {
1090                        match_validators_per_type.insert(internal_type, match_validator);
1091                    }
1092                } else {
1093                    return Err(CreateScannerError::InvalidMatchValidator(
1094                        MatchValidatorCreationError::InternalError,
1095                    ));
1096                }
1097            }
1098        }
1099
1100        let compiled_rules = self
1101            .rules
1102            .iter()
1103            .enumerate()
1104            .map(|(rule_index, config)| {
1105                if config.is_supporting_rule && config.match_action != MatchAction::None {
1106                    return Err(CreateScannerError::SupportingRuleHasMatchAction);
1107                }
1108                let inner = config.convert_to_compiled_rule(rule_index, self.labels.clone())?;
1109                config.match_action.validate()?;
1110                let compiled_suppressions = match &config.suppressions {
1111                    Some(s) => s.compile()?,
1112                    None => None,
1113                };
1114                Ok(RootCompiledRule {
1115                    inner,
1116                    scope: config.scope.clone(),
1117                    match_action: config.match_action.clone(),
1118                    match_validation_type: config.get_third_party_active_checker().cloned(),
1119                    suppressions: compiled_suppressions,
1120                    precedence: config.precedence,
1121                    is_supporting_rule: config.is_supporting_rule,
1122                })
1123            })
1124            .collect::<Result<Vec<RootCompiledRule>, CreateScannerError>>()?;
1125
1126        let mut per_scanner_data = SharedData::new();
1127
1128        compiled_rules.iter().for_each(|rule| {
1129            rule.init_per_scanner_data(&mut per_scanner_data);
1130        });
1131
1132        let scoped_ruleset = ScopedRuleSet::new(
1133            &compiled_rules
1134                .iter()
1135                .map(|rule| rule.scope.clone())
1136                .collect::<Vec<_>>(),
1137        )
1138        .with_implicit_index_wildcards(self.scanner_features.add_implicit_index_wildcards);
1139
1140        {
1141            let stats = &*GLOBAL_STATS;
1142            stats.scanner_creations.increment(1);
1143            stats.increment_total_scanners();
1144        }
1145
1146        Ok(Scanner {
1147            rules: compiled_rules,
1148            scoped_ruleset,
1149            scanner_features: self.scanner_features,
1150            metrics: ScannerMetrics::new(&self.labels),
1151            match_validators_per_type,
1152            labels: self.labels,
1153            per_scanner_data,
1154            async_scan_timeout: self.async_scan_timeout,
1155        })
1156    }
1157}
1158
/// Per-scan state used while visiting the strings of an event; implements `ContentVisitor` to
/// run the scanner's rules against each visited string.
struct ScannerContentVisitor<'a, E: Encoding> {
    // Scanner whose rules, labels, features and per-scanner data are used for this scan
    scanner: &'a Scanner,
    // Regex caches handed to each rule through `StringMatchesCtx`
    regex_caches: &'a mut RegexCaches,
    // Destination for the synchronous matches found on each visited path
    rule_matches: &'a mut InternalRuleMatchSet<E>,
    // Rules that shall be skipped for this scan
    // This list shall be small (<10), so a linear search is acceptable
    blocked_rules: &'a Vec<usize>,
    // Handed to each rule through `StringMatchesCtx`
    // NOTE(review): presumably the matches saved from excluded scopes (Multipass V0) — confirm
    excluded_matches: &'a mut AHashMap<String, String>,
    // Data shared between rules for the duration of this visitor; initialized by each rule
    per_event_data: SharedData,
    // Looked up by path for each visited string and passed to rules as `wildcard_indices`
    wildcarded_indexes: &'a AHashMap<Path<'static>, Vec<(usize, usize)>>,
    // Receives a job for every rule that returns `RuleStatus::Pending`
    async_jobs: &'a mut Vec<PendingRuleJob>,
    // Optional identifier forwarded to rules through `StringMatchesCtx`
    event_id: Option<String>,
    // Metadata forwarded to rules through `StringMatchesCtx`
    scan_metadata: &'a AHashMap<String, String>,
}
1173
1174impl<'a, E: Encoding> ContentVisitor<'a> for ScannerContentVisitor<'a, E> {
1175    fn visit_content<'b>(
1176        &'b mut self,
1177        path: &Path<'a>,
1178        content: &str,
1179        mut rule_visitor: crate::scoped_ruleset::RuleIndexVisitor,
1180        exclusion_check: ExclusionCheck<'b>,
1181    ) -> Result<bool, ScannerError> {
1182        // matches for a single path
1183        let mut path_rules_matches = vec![];
1184
1185        // Create a map of per rule type data that can be shared between rules of the same type
1186        let mut per_string_data = SharedData::new();
1187        let wildcard_indices_per_path = self.wildcarded_indexes.get(path);
1188
1189        rule_visitor.visit_rule_indices(|rule_index| {
1190            if self.blocked_rules.contains(&rule_index) {
1191                return Ok(());
1192            }
1193            let rule = &self.scanner.rules[rule_index];
1194            {
1195                if rule.inner.allow_scanner_to_exclude_namespace() {
1196                    // check if the path is excluded
1197                    if exclusion_check.is_excluded(rule_index) {
1198                        return Ok(());
1199                    }
1200                }
1201                // creating the emitter is basically free, it will get mostly optimized away
1202                let mut emitter = |rule_match: StringMatch| {
1203                    // This should never happen, but to ensure no empty match is ever generated
1204                    // (which may cause an infinite loop), this will panic instead.
1205                    assert_ne!(
1206                        rule_match.start, rule_match.end,
1207                        "empty match detected on rule with index {rule_index}"
1208                    );
1209                    path_rules_matches.push(InternalRuleMatch::new(rule_index, rule_match));
1210                };
1211
1212                rule.init_per_string_data(&self.scanner.labels, &mut per_string_data);
1213
1214                // TODO: move this somewhere higher?
1215                rule.init_per_event_data(&mut self.per_event_data);
1216
1217                let mut ctx = StringMatchesCtx {
1218                    rule_index,
1219                    regex_caches: self.regex_caches,
1220                    exclusion_check: &exclusion_check,
1221                    excluded_matches: self.excluded_matches,
1222                    match_emitter: &mut emitter,
1223                    wildcard_indices: wildcard_indices_per_path,
1224                    enable_debug_observability: self
1225                        .scanner
1226                        .scanner_features
1227                        .enable_debug_observability,
1228                    per_string_data: &mut per_string_data,
1229                    per_scanner_data: &self.scanner.per_scanner_data,
1230                    per_event_data: &mut self.per_event_data,
1231                    event_id: self.event_id.as_deref(),
1232                    scan_metadata: self.scan_metadata,
1233                };
1234
1235                let async_status = rule.get_string_matches(content, path, &mut ctx)?;
1236
1237                match async_status {
1238                    RuleStatus::Done => {
1239                        // nothing to do
1240                    }
1241                    RuleStatus::Pending(fut) => {
1242                        self.async_jobs.push(PendingRuleJob {
1243                            fut,
1244                            path: path.into_static(),
1245                        });
1246                    }
1247                }
1248            }
1249            Ok(())
1250        })?;
1251
1252        // If there are any matches, the string will need to be accessed to check for false positives from
1253        // excluded matches, any to potentially mutate the string.
1254        // If there are any async jobs, this is also true since it's not known yet whether there
1255        // will be a match
1256        let needs_to_access_content = !path_rules_matches.is_empty() || !self.async_jobs.is_empty();
1257
1258        self.rule_matches
1259            .push_sync_matches(path, path_rules_matches);
1260
1261        Ok(needs_to_access_content)
1262    }
1263}
1264
// Calculates the position where the regex search should resume when the previous match turned
// out to be a false positive: the byte offset of the UTF-8 char that follows the first char of
// the match. Returns `None` when no char is left to scan after it.
fn get_next_regex_start(content: &str, regex_match: (usize, usize)) -> Option<usize> {
    let (match_start, _) = regex_match;
    content[match_start..]
        .char_indices()
        // index 0 is the char at the match start, index 1 is the one right after it
        .nth(1)
        .map(|(offset, _)| match_start + offset)
}
1275
1276fn is_false_positive_match(
1277    regex_match_range: (usize, usize),
1278    rule: &RegexCompiledRule,
1279    content: &str,
1280    check_excluded_keywords: bool,
1281) -> bool {
1282    if check_excluded_keywords
1283        && let Some(excluded_keywords) = &rule.excluded_keywords
1284        && excluded_keywords.is_false_positive_match(content, regex_match_range.0)
1285    {
1286        return true;
1287    }
1288
1289    if let Some(validator) = rule.validator.as_ref()
1290        && !validator.is_valid_match(&content[regex_match_range.0..regex_match_range.1])
1291    {
1292        return true;
1293    }
1294    false
1295}