// saluki_components/transforms/trace_obfuscation/mod.rs

1//! Trace obfuscation transform.
2
3mod credit_cards;
4mod http;
5mod json;
6mod memcached;
7mod obfuscator;
8mod redis;
9mod sql;
10mod sql_filters;
11mod sql_tokenizer;
12
13use async_trait::async_trait;
14use facet::Facet;
15use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
16use saluki_config::GenericConfiguration;
17use saluki_core::{
18    components::{transforms::*, ComponentContext},
19    data_model::event::{
20        trace::{AttributeValue, Span},
21        Event,
22    },
23    topology::EventsBuffer,
24};
25use saluki_error::GenericError;
26use serde::Deserialize;
27use stringtheory::MetaString;
28
29pub use self::obfuscator::{tags, ObfuscationConfig, Obfuscator};
30use crate::common::datadog::apm::ApmConfig;
31
/// Placeholder written in place of a SQL query when the SQL obfuscator returns an error for it.
const TEXT_NON_PARSABLE_SQL: &str = "Non-parsable SQL query";
33
34/// Trace obfuscation configuration.
35#[derive(Deserialize, Facet)]
36#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
37pub struct TraceObfuscationConfiguration {
38    /// Obfuscator configuration.
39    #[serde(default)]
40    pub config: ObfuscationConfig,
41}
42
43impl TraceObfuscationConfiguration {
44    /// Creates a new `TraceObfuscationConfiguration` from the given generic configuration.
45    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
46        Self::from_apm_configuration(config)
47    }
48
49    /// Creates a new `TraceObfuscationConfiguration` from the APM configuration section.
50    pub fn from_apm_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
51        let apm_config = ApmConfig::from_configuration(config)?;
52        Ok(Self {
53            config: apm_config.obfuscation().clone(),
54        })
55    }
56
57    /// Creates a new `TraceObfuscationConfiguration` with default settings.
58    pub fn new() -> Self {
59        Self {
60            config: ObfuscationConfig::default(),
61        }
62    }
63}
64
65impl Default for TraceObfuscationConfiguration {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
71#[async_trait]
72impl SynchronousTransformBuilder for TraceObfuscationConfiguration {
73    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
74        Ok(Box::new(TraceObfuscation {
75            obfuscator: Obfuscator::new(self.config.clone()),
76        }))
77    }
78}
79
80impl MemoryBounds for TraceObfuscationConfiguration {
81    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
82        builder
83            .minimum()
84            .with_single_value::<TraceObfuscation>("component struct");
85    }
86}
87
88/// The obfuscation transform that processes traces.
89pub struct TraceObfuscation {
90    obfuscator: Obfuscator,
91}
92
93impl TraceObfuscation {
94    fn obfuscate_span(&mut self, span: &mut Span) {
95        if self.obfuscator.config.credit_cards.enabled {
96            self.obfuscate_credit_cards_in_span(span);
97        }
98
99        match span.span_type() {
100            "http" | "web" => self.obfuscate_http_span(span),
101            "sql" | "cassandra" => self.obfuscate_sql_span(span),
102            "redis" | "valkey" => self.obfuscate_redis_span(span),
103            "memcached" => self.obfuscate_memcached_span(span),
104            "mongodb" => self.obfuscate_mongodb_span(span),
105            "elasticsearch" | "opensearch" => self.obfuscate_elasticsearch_span(span),
106            _ => {}
107        }
108    }
109
110    fn obfuscate_credit_cards_in_span(&mut self, span: &mut Span) {
111        for (key, value) in span.attributes.iter_mut() {
112            if let AttributeValue::String(str_val) = value {
113                if let Some(replacement) = self
114                    .obfuscator
115                    .obfuscate_credit_card_number(key.as_ref(), str_val.as_ref())
116                {
117                    *str_val = replacement;
118                }
119            }
120        }
121    }
122
123    fn obfuscate_http_span(&mut self, span: &mut Span) {
124        let url_value = match span.attributes.get(tags::HTTP_URL).and_then(AttributeValue::as_string) {
125            Some(v) if !v.is_empty() => v.as_ref().to_owned(),
126            _ => return,
127        };
128
129        if let Some(obfuscated) = self.obfuscator.obfuscate_url(&url_value) {
130            span.attributes
131                .insert(tags::HTTP_URL.into(), AttributeValue::String(obfuscated));
132        }
133    }
134
135    fn obfuscate_sql_span(&mut self, span: &mut Span) {
136        let sql_query_owned: Option<String> = span
137            .attributes
138            .get(tags::DB_STATEMENT)
139            .and_then(AttributeValue::as_string)
140            .filter(|s| !s.is_empty())
141            .map(|s| s.as_ref().to_owned());
142        let sql_query: &str = match &sql_query_owned {
143            Some(s) => s.as_str(),
144            None => span.resource(),
145        };
146
147        if sql_query.is_empty() {
148            return;
149        }
150
151        let dbms_owned: Option<String> = span
152            .attributes
153            .get(tags::DBMS)
154            .and_then(AttributeValue::as_string)
155            .filter(|s| !s.is_empty())
156            .map(|s| s.as_ref().to_owned());
157
158        let config = match &dbms_owned {
159            Some(d) => self.obfuscator.config.sql.with_dbms(d.clone()),
160            None => self.obfuscator.config.sql.clone(),
161        };
162
163        match sql::obfuscate_sql_string(sql_query, &config) {
164            Ok(obfuscated) => {
165                let query: MetaString = obfuscated.query.into();
166
167                span.set_resource(query.clone());
168                span.attributes
169                    .insert(tags::SQL_QUERY.into(), AttributeValue::String(query.clone()));
170
171                if span.attributes.contains_key(tags::DB_STATEMENT) {
172                    span.attributes
173                        .insert(tags::DB_STATEMENT.into(), AttributeValue::String(query));
174                }
175
176                if !obfuscated.table_names.is_empty() {
177                    span.attributes.insert(
178                        "sql.tables".into(),
179                        AttributeValue::String(obfuscated.table_names.into()),
180                    );
181                }
182            }
183            Err(_) => {
184                let non_parsable: MetaString = TEXT_NON_PARSABLE_SQL.into();
185                span.set_resource(non_parsable.clone());
186                span.attributes
187                    .insert(tags::SQL_QUERY.into(), AttributeValue::String(non_parsable));
188            }
189        }
190    }
191
192    fn obfuscate_redis_span(&mut self, span: &mut Span) {
193        let resource = span.resource();
194        if resource.is_empty() {
195            return;
196        }
197
198        if let Some(quantized) = self.obfuscator.quantize_redis_string(resource) {
199            span.set_resource(quantized.to_string());
200        }
201
202        if span.span_type() == "redis" && self.obfuscator.config.redis.enabled {
203            if let Some(cmd_value) = span
204                .attributes
205                .get(tags::REDIS_RAW_COMMAND)
206                .and_then(AttributeValue::as_string)
207                .map(|s| s.as_ref().to_owned())
208            {
209                if let Some(obfuscated) = self.obfuscator.obfuscate_redis_string(&cmd_value) {
210                    span.attributes
211                        .insert(tags::REDIS_RAW_COMMAND.into(), AttributeValue::String(obfuscated));
212                }
213            }
214        }
215
216        if span.span_type() == "valkey" && self.obfuscator.config.valkey.enabled {
217            if let Some(cmd_value) = span
218                .attributes
219                .get(tags::VALKEY_RAW_COMMAND)
220                .and_then(AttributeValue::as_string)
221                .map(|s| s.as_ref().to_owned())
222            {
223                if let Some(obfuscated) = self.obfuscator.obfuscate_valkey_string(&cmd_value) {
224                    span.attributes
225                        .insert(tags::VALKEY_RAW_COMMAND.into(), AttributeValue::String(obfuscated));
226                }
227            }
228        }
229    }
230
231    fn obfuscate_memcached_span(&mut self, span: &mut Span) {
232        if !self.obfuscator.config.memcached.enabled {
233            return;
234        }
235
236        let cmd_value = match span
237            .attributes
238            .get(tags::MEMCACHED_COMMAND)
239            .and_then(AttributeValue::as_string)
240        {
241            Some(v) if !v.is_empty() => v.as_ref().to_owned(),
242            _ => return,
243        };
244
245        if let Some(obfuscated) = self.obfuscator.obfuscate_memcached_command(&cmd_value) {
246            if obfuscated.is_empty() {
247                span.attributes.remove(tags::MEMCACHED_COMMAND);
248            } else {
249                span.attributes
250                    .insert(tags::MEMCACHED_COMMAND.into(), AttributeValue::String(obfuscated));
251            }
252        }
253    }
254
255    fn obfuscate_mongodb_span(&mut self, span: &mut Span) {
256        let query_value = match span
257            .attributes
258            .get(tags::MONGODB_QUERY)
259            .and_then(AttributeValue::as_string)
260        {
261            Some(v) => v.as_ref().to_owned(),
262            None => return,
263        };
264
265        if let Some(obfuscated) = self.obfuscator.obfuscate_mongodb_string(&query_value) {
266            span.attributes
267                .insert(tags::MONGODB_QUERY.into(), AttributeValue::String(obfuscated));
268        }
269    }
270
271    fn obfuscate_elasticsearch_span(&mut self, span: &mut Span) {
272        if let Some(body_value) = span
273            .attributes
274            .get(tags::ELASTIC_BODY)
275            .and_then(AttributeValue::as_string)
276            .map(|s| s.as_ref().to_owned())
277        {
278            if let Some(obfuscated) = self.obfuscator.obfuscate_elasticsearch_string(&body_value) {
279                span.attributes
280                    .insert(tags::ELASTIC_BODY.into(), AttributeValue::String(obfuscated));
281            }
282        }
283
284        if let Some(body_value) = span
285            .attributes
286            .get(tags::OPENSEARCH_BODY)
287            .and_then(AttributeValue::as_string)
288            .map(|s| s.as_ref().to_owned())
289        {
290            if let Some(obfuscated) = self.obfuscator.obfuscate_opensearch_string(&body_value) {
291                span.attributes
292                    .insert(tags::OPENSEARCH_BODY.into(), AttributeValue::String(obfuscated));
293            }
294        }
295    }
296}
297
298impl SynchronousTransform for TraceObfuscation {
299    fn transform_buffer(&mut self, buffer: &mut EventsBuffer) {
300        for event in buffer {
301            if let Event::Trace(ref mut trace) = event {
302                for span in trace.spans_mut() {
303                    self.obfuscate_span(span);
304                }
305            }
306        }
307    }
308}
309
/// Smoke test that runs the shared configuration smoke-test helper against this transform's
/// configuration type.
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::TraceObfuscationConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Loads `TraceObfuscationConfiguration` through the APM configuration path and panics if loading
    /// fails.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::TRACE_OBFUSCATION_CONFIGURATION, &[], json!({}), |cfg| {
            TraceObfuscationConfiguration::from_apm_configuration(&cfg)
                .expect("TraceObfuscationConfiguration should deserialize")
        })
        .await
    }
}