saluki_components/transforms/trace_obfuscation/
mod.rs1mod credit_cards;
4mod http;
5mod json;
6mod memcached;
7mod obfuscator;
8mod redis;
9mod sql;
10mod sql_filters;
11mod sql_tokenizer;
12
13use async_trait::async_trait;
14use facet::Facet;
15use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
16use saluki_config::GenericConfiguration;
17use saluki_core::{
18 components::{transforms::*, ComponentContext},
19 data_model::event::{
20 trace::{AttributeValue, Span},
21 Event,
22 },
23 topology::EventsBuffer,
24};
25use saluki_error::GenericError;
26use serde::Deserialize;
27use stringtheory::MetaString;
28
29pub use self::obfuscator::{tags, ObfuscationConfig, Obfuscator};
30use crate::common::datadog::apm::ApmConfig;
31
/// Placeholder text written to the span resource and to the `tags::SQL_QUERY` attribute when SQL
/// obfuscation returns an error.
32const TEXT_NON_PARSABLE_SQL: &str = "Non-parsable SQL query";
33
/// Configuration for the trace obfuscation transform.
///
/// Wraps the [`ObfuscationConfig`] that is cloned into an [`Obfuscator`] when the transform is
/// built.
34#[derive(Deserialize, Facet)]
36#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
37pub struct TraceObfuscationConfiguration {
    /// Obfuscation settings. `#[serde(default)]` makes deserialization fall back to
    /// `ObfuscationConfig::default()` when the field is absent.
38 #[serde(default)]
40 pub config: ObfuscationConfig,
41}
42
43impl TraceObfuscationConfiguration {
44 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
46 Self::from_apm_configuration(config)
47 }
48
49 pub fn from_apm_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
51 let apm_config = ApmConfig::from_configuration(config)?;
52 Ok(Self {
53 config: apm_config.obfuscation().clone(),
54 })
55 }
56
57 pub fn new() -> Self {
59 Self {
60 config: ObfuscationConfig::default(),
61 }
62 }
63}
64
65impl Default for TraceObfuscationConfiguration {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71#[async_trait]
72impl SynchronousTransformBuilder for TraceObfuscationConfiguration {
73 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
74 Ok(Box::new(TraceObfuscation {
75 obfuscator: Obfuscator::new(self.config.clone()),
76 }))
77 }
78}
79
80impl MemoryBounds for TraceObfuscationConfiguration {
81 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
82 builder
83 .minimum()
84 .with_single_value::<TraceObfuscation>("component struct");
85 }
86}
87
/// Synchronous transform that obfuscates the spans of trace events through an [`Obfuscator`].
88pub struct TraceObfuscation {
    /// Obfuscator used by every per-span routine; created from the configured `ObfuscationConfig`
    /// when the transform is built.
90 obfuscator: Obfuscator,
91}
92
93impl TraceObfuscation {
94 fn obfuscate_span(&mut self, span: &mut Span) {
95 if self.obfuscator.config.credit_cards.enabled {
96 self.obfuscate_credit_cards_in_span(span);
97 }
98
99 match span.span_type() {
100 "http" | "web" => self.obfuscate_http_span(span),
101 "sql" | "cassandra" => self.obfuscate_sql_span(span),
102 "redis" | "valkey" => self.obfuscate_redis_span(span),
103 "memcached" => self.obfuscate_memcached_span(span),
104 "mongodb" => self.obfuscate_mongodb_span(span),
105 "elasticsearch" | "opensearch" => self.obfuscate_elasticsearch_span(span),
106 _ => {}
107 }
108 }
109
110 fn obfuscate_credit_cards_in_span(&mut self, span: &mut Span) {
111 for (key, value) in span.attributes.iter_mut() {
112 if let AttributeValue::String(str_val) = value {
113 if let Some(replacement) = self
114 .obfuscator
115 .obfuscate_credit_card_number(key.as_ref(), str_val.as_ref())
116 {
117 *str_val = replacement;
118 }
119 }
120 }
121 }
122
123 fn obfuscate_http_span(&mut self, span: &mut Span) {
124 let url_value = match span.attributes.get(tags::HTTP_URL).and_then(AttributeValue::as_string) {
125 Some(v) if !v.is_empty() => v.as_ref().to_owned(),
126 _ => return,
127 };
128
129 if let Some(obfuscated) = self.obfuscator.obfuscate_url(&url_value) {
130 span.attributes
131 .insert(tags::HTTP_URL.into(), AttributeValue::String(obfuscated));
132 }
133 }
134
135 fn obfuscate_sql_span(&mut self, span: &mut Span) {
136 let sql_query_owned: Option<String> = span
137 .attributes
138 .get(tags::DB_STATEMENT)
139 .and_then(AttributeValue::as_string)
140 .filter(|s| !s.is_empty())
141 .map(|s| s.as_ref().to_owned());
142 let sql_query: &str = match &sql_query_owned {
143 Some(s) => s.as_str(),
144 None => span.resource(),
145 };
146
147 if sql_query.is_empty() {
148 return;
149 }
150
151 let dbms_owned: Option<String> = span
152 .attributes
153 .get(tags::DBMS)
154 .and_then(AttributeValue::as_string)
155 .filter(|s| !s.is_empty())
156 .map(|s| s.as_ref().to_owned());
157
158 let config = match &dbms_owned {
159 Some(d) => self.obfuscator.config.sql.with_dbms(d.clone()),
160 None => self.obfuscator.config.sql.clone(),
161 };
162
163 match sql::obfuscate_sql_string(sql_query, &config) {
164 Ok(obfuscated) => {
165 let query: MetaString = obfuscated.query.into();
166
167 span.set_resource(query.clone());
168 span.attributes
169 .insert(tags::SQL_QUERY.into(), AttributeValue::String(query.clone()));
170
171 if span.attributes.contains_key(tags::DB_STATEMENT) {
172 span.attributes
173 .insert(tags::DB_STATEMENT.into(), AttributeValue::String(query));
174 }
175
176 if !obfuscated.table_names.is_empty() {
177 span.attributes.insert(
178 "sql.tables".into(),
179 AttributeValue::String(obfuscated.table_names.into()),
180 );
181 }
182 }
183 Err(_) => {
184 let non_parsable: MetaString = TEXT_NON_PARSABLE_SQL.into();
185 span.set_resource(non_parsable.clone());
186 span.attributes
187 .insert(tags::SQL_QUERY.into(), AttributeValue::String(non_parsable));
188 }
189 }
190 }
191
192 fn obfuscate_redis_span(&mut self, span: &mut Span) {
193 let resource = span.resource();
194 if resource.is_empty() {
195 return;
196 }
197
198 if let Some(quantized) = self.obfuscator.quantize_redis_string(resource) {
199 span.set_resource(quantized.to_string());
200 }
201
202 if span.span_type() == "redis" && self.obfuscator.config.redis.enabled {
203 if let Some(cmd_value) = span
204 .attributes
205 .get(tags::REDIS_RAW_COMMAND)
206 .and_then(AttributeValue::as_string)
207 .map(|s| s.as_ref().to_owned())
208 {
209 if let Some(obfuscated) = self.obfuscator.obfuscate_redis_string(&cmd_value) {
210 span.attributes
211 .insert(tags::REDIS_RAW_COMMAND.into(), AttributeValue::String(obfuscated));
212 }
213 }
214 }
215
216 if span.span_type() == "valkey" && self.obfuscator.config.valkey.enabled {
217 if let Some(cmd_value) = span
218 .attributes
219 .get(tags::VALKEY_RAW_COMMAND)
220 .and_then(AttributeValue::as_string)
221 .map(|s| s.as_ref().to_owned())
222 {
223 if let Some(obfuscated) = self.obfuscator.obfuscate_valkey_string(&cmd_value) {
224 span.attributes
225 .insert(tags::VALKEY_RAW_COMMAND.into(), AttributeValue::String(obfuscated));
226 }
227 }
228 }
229 }
230
231 fn obfuscate_memcached_span(&mut self, span: &mut Span) {
232 if !self.obfuscator.config.memcached.enabled {
233 return;
234 }
235
236 let cmd_value = match span
237 .attributes
238 .get(tags::MEMCACHED_COMMAND)
239 .and_then(AttributeValue::as_string)
240 {
241 Some(v) if !v.is_empty() => v.as_ref().to_owned(),
242 _ => return,
243 };
244
245 if let Some(obfuscated) = self.obfuscator.obfuscate_memcached_command(&cmd_value) {
246 if obfuscated.is_empty() {
247 span.attributes.remove(tags::MEMCACHED_COMMAND);
248 } else {
249 span.attributes
250 .insert(tags::MEMCACHED_COMMAND.into(), AttributeValue::String(obfuscated));
251 }
252 }
253 }
254
255 fn obfuscate_mongodb_span(&mut self, span: &mut Span) {
256 let query_value = match span
257 .attributes
258 .get(tags::MONGODB_QUERY)
259 .and_then(AttributeValue::as_string)
260 {
261 Some(v) => v.as_ref().to_owned(),
262 None => return,
263 };
264
265 if let Some(obfuscated) = self.obfuscator.obfuscate_mongodb_string(&query_value) {
266 span.attributes
267 .insert(tags::MONGODB_QUERY.into(), AttributeValue::String(obfuscated));
268 }
269 }
270
271 fn obfuscate_elasticsearch_span(&mut self, span: &mut Span) {
272 if let Some(body_value) = span
273 .attributes
274 .get(tags::ELASTIC_BODY)
275 .and_then(AttributeValue::as_string)
276 .map(|s| s.as_ref().to_owned())
277 {
278 if let Some(obfuscated) = self.obfuscator.obfuscate_elasticsearch_string(&body_value) {
279 span.attributes
280 .insert(tags::ELASTIC_BODY.into(), AttributeValue::String(obfuscated));
281 }
282 }
283
284 if let Some(body_value) = span
285 .attributes
286 .get(tags::OPENSEARCH_BODY)
287 .and_then(AttributeValue::as_string)
288 .map(|s| s.as_ref().to_owned())
289 {
290 if let Some(obfuscated) = self.obfuscator.obfuscate_opensearch_string(&body_value) {
291 span.attributes
292 .insert(tags::OPENSEARCH_BODY.into(), AttributeValue::String(obfuscated));
293 }
294 }
295 }
296}
297
298impl SynchronousTransform for TraceObfuscation {
299 fn transform_buffer(&mut self, buffer: &mut EventsBuffer) {
300 for event in buffer {
301 if let Event::Trace(ref mut trace) = event {
302 for span in trace.spans_mut() {
303 self.obfuscate_span(span);
304 }
305 }
306 }
307 }
308}
309
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::TraceObfuscationConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Runs the shared configuration smoke-test harness for
    /// `structs::TRACE_OBFUSCATION_CONFIGURATION`, loading the configuration through
    /// `TraceObfuscationConfiguration::from_apm_configuration`.
    #[tokio::test]
    async fn smoke_test() {
        let empty_object = json!({});

        run_config_smoke_tests(structs::TRACE_OBFUSCATION_CONFIGURATION, &[], empty_object, |cfg| {
            let loaded = TraceObfuscationConfiguration::from_apm_configuration(&cfg);
            loaded.expect("TraceObfuscationConfiguration should deserialize")
        })
        .await
    }
}