Skip to main content

saluki_components/transforms/trace_obfuscation/
mod.rs

1//! Trace obfuscation transform.
2
3mod credit_cards;
4mod http;
5mod json;
6mod memcached;
7mod obfuscator;
8mod redis;
9mod sql;
10mod sql_filters;
11mod sql_tokenizer;
12
13use async_trait::async_trait;
14use facet::Facet;
15use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
16use saluki_config::GenericConfiguration;
17use saluki_core::{
18    components::{transforms::*, ComponentContext},
19    data_model::event::{trace::Span, Event},
20    topology::EventsBuffer,
21};
22use saluki_error::GenericError;
23use serde::Deserialize;
24use stringtheory::MetaString;
25
26pub use self::obfuscator::{tags, ObfuscationConfig, Obfuscator};
27use crate::common::datadog::apm::ApmConfig;
28
/// Placeholder text written in place of a SQL query that could not be obfuscated.
const TEXT_NON_PARSABLE_SQL: &str = "Non-parsable SQL query";
30
/// Trace obfuscation configuration.
///
/// Wraps the [`ObfuscationConfig`] that is handed to the [`Obfuscator`] when the transform is built.
#[derive(Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct TraceObfuscationConfiguration {
    /// Obfuscator configuration.
    ///
    /// Falls back to the type's `Default` value when the field is absent from the deserialized input.
    #[serde(default)]
    pub config: ObfuscationConfig,
}
39
40impl TraceObfuscationConfiguration {
41    /// Creates a new `TraceObfuscationConfiguration` from the given generic configuration.
42    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
43        Self::from_apm_configuration(config)
44    }
45
46    /// Creates a new `TraceObfuscationConfiguration` from the APM configuration section.
47    pub fn from_apm_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
48        let apm_config = ApmConfig::from_configuration(config)?;
49        Ok(Self {
50            config: apm_config.obfuscation().clone(),
51        })
52    }
53
54    /// Creates a new `TraceObfuscationConfiguration` with default settings.
55    pub fn new() -> Self {
56        Self {
57            config: ObfuscationConfig::default(),
58        }
59    }
60}
61
62impl Default for TraceObfuscationConfiguration {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68#[async_trait]
69impl SynchronousTransformBuilder for TraceObfuscationConfiguration {
70    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
71        Ok(Box::new(TraceObfuscation {
72            obfuscator: Obfuscator::new(self.config.clone()),
73        }))
74    }
75}
76
impl MemoryBounds for TraceObfuscationConfiguration {
    /// Declares the memory bounds of the transform built from this configuration.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // The only bound declared here is a minimum covering a single `TraceObfuscation` value.
        builder
            .minimum()
            .with_single_value::<TraceObfuscation>("component struct");
    }
}
84
/// The obfuscation transform that processes traces.
///
/// Every span of every trace event passing through the transform is obfuscated in place.
pub struct TraceObfuscation {
    /// Obfuscation engine, along with the configuration it was created from.
    obfuscator: Obfuscator,
}
89
90impl TraceObfuscation {
91    fn obfuscate_span(&mut self, span: &mut Span) {
92        if self.obfuscator.config.credit_cards.enabled {
93            self.obfuscate_credit_cards_in_span(span);
94        }
95
96        match span.span_type() {
97            "http" | "web" => self.obfuscate_http_span(span),
98            "sql" | "cassandra" => self.obfuscate_sql_span(span),
99            "redis" | "valkey" => self.obfuscate_redis_span(span),
100            "memcached" => self.obfuscate_memcached_span(span),
101            "mongodb" => self.obfuscate_mongodb_span(span),
102            "elasticsearch" | "opensearch" => self.obfuscate_elasticsearch_span(span),
103            _ => {}
104        }
105    }
106
107    fn obfuscate_credit_cards_in_span(&mut self, span: &mut Span) {
108        for (key, value) in span.meta_mut().iter_mut() {
109            if let Some(replacement) = self
110                .obfuscator
111                .obfuscate_credit_card_number(key.as_ref(), value.as_ref())
112            {
113                *value = replacement;
114            }
115        }
116    }
117
118    fn obfuscate_http_span(&mut self, span: &mut Span) {
119        let url_value = match span.meta().get(tags::HTTP_URL) {
120            Some(v) if !v.is_empty() => v.as_ref(),
121            _ => return,
122        };
123
124        if let Some(obfuscated) = self.obfuscator.obfuscate_url(url_value) {
125            span.meta_mut().insert(tags::HTTP_URL.into(), obfuscated);
126        }
127    }
128
129    fn obfuscate_sql_span(&mut self, span: &mut Span) {
130        let sql_query: &str = span
131            .meta()
132            .get(tags::DB_STATEMENT)
133            .map(|v| v.as_ref())
134            .filter(|s| !s.is_empty())
135            .unwrap_or_else(|| span.resource());
136
137        if sql_query.is_empty() {
138            return;
139        }
140
141        let dbms = span.meta().get(tags::DBMS);
142
143        let config = match dbms {
144            Some(d) if !d.is_empty() => self.obfuscator.config.sql.with_dbms(d.to_string()),
145            _ => self.obfuscator.config.sql.clone(),
146        };
147
148        match sql::obfuscate_sql_string(sql_query, &config) {
149            Ok(obfuscated) => {
150                let query: MetaString = obfuscated.query.into();
151
152                span.set_resource(query.clone());
153                span.meta_mut().insert(tags::SQL_QUERY.into(), query.clone());
154
155                if span.meta().contains_key(tags::DB_STATEMENT) {
156                    span.meta_mut().insert(tags::DB_STATEMENT.into(), query);
157                }
158
159                if !obfuscated.table_names.is_empty() {
160                    span.meta_mut()
161                        .insert("sql.tables".into(), obfuscated.table_names.into());
162                }
163            }
164            Err(_) => {
165                let non_parsable: MetaString = TEXT_NON_PARSABLE_SQL.into();
166                span.set_resource(non_parsable.clone());
167                span.meta_mut().insert(tags::SQL_QUERY.into(), non_parsable);
168            }
169        }
170    }
171
172    fn obfuscate_redis_span(&mut self, span: &mut Span) {
173        let resource = span.resource();
174        if resource.is_empty() {
175            return;
176        }
177
178        if let Some(quantized) = self.obfuscator.quantize_redis_string(resource) {
179            span.set_resource(quantized.to_string());
180        }
181
182        if span.span_type() == "redis" && self.obfuscator.config.redis.enabled {
183            if let Some(cmd_value) = span.meta().get(tags::REDIS_RAW_COMMAND) {
184                if let Some(obfuscated) = self.obfuscator.obfuscate_redis_string(cmd_value.as_ref()) {
185                    span.meta_mut().insert(tags::REDIS_RAW_COMMAND.into(), obfuscated);
186                }
187            }
188        }
189
190        if span.span_type() == "valkey" && self.obfuscator.config.valkey.enabled {
191            if let Some(cmd_value) = span.meta().get(tags::VALKEY_RAW_COMMAND) {
192                if let Some(obfuscated) = self.obfuscator.obfuscate_valkey_string(cmd_value.as_ref()) {
193                    span.meta_mut().insert(tags::VALKEY_RAW_COMMAND.into(), obfuscated);
194                }
195            }
196        }
197    }
198
199    fn obfuscate_memcached_span(&mut self, span: &mut Span) {
200        if !self.obfuscator.config.memcached.enabled {
201            return;
202        }
203
204        let cmd_value = match span.meta().get(tags::MEMCACHED_COMMAND) {
205            Some(v) if !v.is_empty() => v.as_ref(),
206            _ => return,
207        };
208
209        if let Some(obfuscated) = self.obfuscator.obfuscate_memcached_command(cmd_value) {
210            if obfuscated.is_empty() {
211                span.meta_mut().remove(tags::MEMCACHED_COMMAND);
212            } else {
213                span.meta_mut().insert(tags::MEMCACHED_COMMAND.into(), obfuscated);
214            }
215        }
216    }
217
218    fn obfuscate_mongodb_span(&mut self, span: &mut Span) {
219        let query_value = match span.meta().get(tags::MONGODB_QUERY) {
220            Some(v) => v.as_ref(),
221            None => return,
222        };
223
224        if let Some(obfuscated) = self.obfuscator.obfuscate_mongodb_string(query_value) {
225            span.meta_mut().insert(tags::MONGODB_QUERY.into(), obfuscated);
226        }
227    }
228
229    fn obfuscate_elasticsearch_span(&mut self, span: &mut Span) {
230        if let Some(body_value) = span.meta().get(tags::ELASTIC_BODY) {
231            if let Some(obfuscated) = self.obfuscator.obfuscate_elasticsearch_string(body_value.as_ref()) {
232                span.meta_mut().insert(tags::ELASTIC_BODY.into(), obfuscated);
233            }
234        }
235
236        if let Some(body_value) = span.meta().get(tags::OPENSEARCH_BODY) {
237            if let Some(obfuscated) = self.obfuscator.obfuscate_opensearch_string(body_value.as_ref()) {
238                span.meta_mut().insert(tags::OPENSEARCH_BODY.into(), obfuscated);
239            }
240        }
241    }
242}
243
244impl SynchronousTransform for TraceObfuscation {
245    fn transform_buffer(&mut self, buffer: &mut EventsBuffer) {
246        for event in buffer {
247            if let Event::Trace(ref mut trace) = event {
248                for span in trace.spans_mut() {
249                    self.obfuscate_span(span);
250                }
251            }
252        }
253    }
254}
255
// Smoke test: checks that the configuration can be built through `from_apm_configuration` under the shared
// config-registry test harness.
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::TraceObfuscationConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    #[tokio::test]
    async fn smoke_test() {
        // NOTE(review): the empty slice and empty JSON object presumably mean "no extra keys / no overrides";
        // confirm against `run_config_smoke_tests`.
        run_config_smoke_tests(structs::TRACE_OBFUSCATION_CONFIGURATION, &[], json!({}), |cfg| {
            // Building the configuration must succeed for each configuration the harness supplies.
            TraceObfuscationConfiguration::from_apm_configuration(&cfg)
                .expect("TraceObfuscationConfiguration should deserialize")
        })
        .await
    }
}