saluki_components/transforms/trace_obfuscation/
mod.rs1mod credit_cards;
4mod http;
5mod json;
6mod memcached;
7mod obfuscator;
8mod redis;
9mod sql;
10mod sql_filters;
11mod sql_tokenizer;
12
13use async_trait::async_trait;
14use facet::Facet;
15use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
16use saluki_config::GenericConfiguration;
17use saluki_core::{
18 components::{transforms::*, ComponentContext},
19 data_model::event::{trace::Span, Event},
20 topology::EventsBuffer,
21};
22use saluki_error::GenericError;
23use serde::Deserialize;
24use stringtheory::MetaString;
25
26pub use self::obfuscator::{tags, ObfuscationConfig, Obfuscator};
27use crate::common::datadog::apm::ApmConfig;
28
29const TEXT_NON_PARSABLE_SQL: &str = "Non-parsable SQL query";
30
31#[derive(Deserialize, Facet)]
33#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
34pub struct TraceObfuscationConfiguration {
35 #[serde(default)]
37 pub config: ObfuscationConfig,
38}
39
40impl TraceObfuscationConfiguration {
41 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
43 Self::from_apm_configuration(config)
44 }
45
46 pub fn from_apm_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
48 let apm_config = ApmConfig::from_configuration(config)?;
49 Ok(Self {
50 config: apm_config.obfuscation().clone(),
51 })
52 }
53
54 pub fn new() -> Self {
56 Self {
57 config: ObfuscationConfig::default(),
58 }
59 }
60}
61
62impl Default for TraceObfuscationConfiguration {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68#[async_trait]
69impl SynchronousTransformBuilder for TraceObfuscationConfiguration {
70 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
71 Ok(Box::new(TraceObfuscation {
72 obfuscator: Obfuscator::new(self.config.clone()),
73 }))
74 }
75}
76
77impl MemoryBounds for TraceObfuscationConfiguration {
78 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
79 builder
80 .minimum()
81 .with_single_value::<TraceObfuscation>("component struct");
82 }
83}
84
85pub struct TraceObfuscation {
87 obfuscator: Obfuscator,
88}
89
90impl TraceObfuscation {
91 fn obfuscate_span(&mut self, span: &mut Span) {
92 if self.obfuscator.config.credit_cards.enabled {
93 self.obfuscate_credit_cards_in_span(span);
94 }
95
96 match span.span_type() {
97 "http" | "web" => self.obfuscate_http_span(span),
98 "sql" | "cassandra" => self.obfuscate_sql_span(span),
99 "redis" | "valkey" => self.obfuscate_redis_span(span),
100 "memcached" => self.obfuscate_memcached_span(span),
101 "mongodb" => self.obfuscate_mongodb_span(span),
102 "elasticsearch" | "opensearch" => self.obfuscate_elasticsearch_span(span),
103 _ => {}
104 }
105 }
106
107 fn obfuscate_credit_cards_in_span(&mut self, span: &mut Span) {
108 for (key, value) in span.meta_mut().iter_mut() {
109 if let Some(replacement) = self
110 .obfuscator
111 .obfuscate_credit_card_number(key.as_ref(), value.as_ref())
112 {
113 *value = replacement;
114 }
115 }
116 }
117
118 fn obfuscate_http_span(&mut self, span: &mut Span) {
119 let url_value = match span.meta().get(tags::HTTP_URL) {
120 Some(v) if !v.is_empty() => v.as_ref(),
121 _ => return,
122 };
123
124 if let Some(obfuscated) = self.obfuscator.obfuscate_url(url_value) {
125 span.meta_mut().insert(tags::HTTP_URL.into(), obfuscated);
126 }
127 }
128
129 fn obfuscate_sql_span(&mut self, span: &mut Span) {
130 let sql_query: &str = span
131 .meta()
132 .get(tags::DB_STATEMENT)
133 .map(|v| v.as_ref())
134 .filter(|s| !s.is_empty())
135 .unwrap_or_else(|| span.resource());
136
137 if sql_query.is_empty() {
138 return;
139 }
140
141 let dbms = span.meta().get(tags::DBMS);
142
143 let config = match dbms {
144 Some(d) if !d.is_empty() => self.obfuscator.config.sql.with_dbms(d.to_string()),
145 _ => self.obfuscator.config.sql.clone(),
146 };
147
148 match sql::obfuscate_sql_string(sql_query, &config) {
149 Ok(obfuscated) => {
150 let query: MetaString = obfuscated.query.into();
151
152 span.set_resource(query.clone());
153 span.meta_mut().insert(tags::SQL_QUERY.into(), query.clone());
154
155 if span.meta().contains_key(tags::DB_STATEMENT) {
156 span.meta_mut().insert(tags::DB_STATEMENT.into(), query);
157 }
158
159 if !obfuscated.table_names.is_empty() {
160 span.meta_mut()
161 .insert("sql.tables".into(), obfuscated.table_names.into());
162 }
163 }
164 Err(_) => {
165 let non_parsable: MetaString = TEXT_NON_PARSABLE_SQL.into();
166 span.set_resource(non_parsable.clone());
167 span.meta_mut().insert(tags::SQL_QUERY.into(), non_parsable);
168 }
169 }
170 }
171
172 fn obfuscate_redis_span(&mut self, span: &mut Span) {
173 let resource = span.resource();
174 if resource.is_empty() {
175 return;
176 }
177
178 if let Some(quantized) = self.obfuscator.quantize_redis_string(resource) {
179 span.set_resource(quantized.to_string());
180 }
181
182 if span.span_type() == "redis" && self.obfuscator.config.redis.enabled {
183 if let Some(cmd_value) = span.meta().get(tags::REDIS_RAW_COMMAND) {
184 if let Some(obfuscated) = self.obfuscator.obfuscate_redis_string(cmd_value.as_ref()) {
185 span.meta_mut().insert(tags::REDIS_RAW_COMMAND.into(), obfuscated);
186 }
187 }
188 }
189
190 if span.span_type() == "valkey" && self.obfuscator.config.valkey.enabled {
191 if let Some(cmd_value) = span.meta().get(tags::VALKEY_RAW_COMMAND) {
192 if let Some(obfuscated) = self.obfuscator.obfuscate_valkey_string(cmd_value.as_ref()) {
193 span.meta_mut().insert(tags::VALKEY_RAW_COMMAND.into(), obfuscated);
194 }
195 }
196 }
197 }
198
199 fn obfuscate_memcached_span(&mut self, span: &mut Span) {
200 if !self.obfuscator.config.memcached.enabled {
201 return;
202 }
203
204 let cmd_value = match span.meta().get(tags::MEMCACHED_COMMAND) {
205 Some(v) if !v.is_empty() => v.as_ref(),
206 _ => return,
207 };
208
209 if let Some(obfuscated) = self.obfuscator.obfuscate_memcached_command(cmd_value) {
210 if obfuscated.is_empty() {
211 span.meta_mut().remove(tags::MEMCACHED_COMMAND);
212 } else {
213 span.meta_mut().insert(tags::MEMCACHED_COMMAND.into(), obfuscated);
214 }
215 }
216 }
217
218 fn obfuscate_mongodb_span(&mut self, span: &mut Span) {
219 let query_value = match span.meta().get(tags::MONGODB_QUERY) {
220 Some(v) => v.as_ref(),
221 None => return,
222 };
223
224 if let Some(obfuscated) = self.obfuscator.obfuscate_mongodb_string(query_value) {
225 span.meta_mut().insert(tags::MONGODB_QUERY.into(), obfuscated);
226 }
227 }
228
229 fn obfuscate_elasticsearch_span(&mut self, span: &mut Span) {
230 if let Some(body_value) = span.meta().get(tags::ELASTIC_BODY) {
231 if let Some(obfuscated) = self.obfuscator.obfuscate_elasticsearch_string(body_value.as_ref()) {
232 span.meta_mut().insert(tags::ELASTIC_BODY.into(), obfuscated);
233 }
234 }
235
236 if let Some(body_value) = span.meta().get(tags::OPENSEARCH_BODY) {
237 if let Some(obfuscated) = self.obfuscator.obfuscate_opensearch_string(body_value.as_ref()) {
238 span.meta_mut().insert(tags::OPENSEARCH_BODY.into(), obfuscated);
239 }
240 }
241 }
242}
243
244impl SynchronousTransform for TraceObfuscation {
245 fn transform_buffer(&mut self, buffer: &mut EventsBuffer) {
246 for event in buffer {
247 if let Event::Trace(ref mut trace) = event {
248 for span in trace.spans_mut() {
249 self.obfuscate_span(span);
250 }
251 }
252 }
253 }
254}
255
256#[cfg(test)]
257mod config_smoke {
258 use serde_json::json;
259
260 use super::TraceObfuscationConfiguration;
261 use crate::config_registry::structs;
262 use crate::config_registry::test_support::run_config_smoke_tests;
263
264 #[tokio::test]
265 async fn smoke_test() {
266 run_config_smoke_tests(structs::TRACE_OBFUSCATION_CONFIGURATION, &[], json!({}), |cfg| {
267 TraceObfuscationConfiguration::from_apm_configuration(&cfg)
268 .expect("TraceObfuscationConfiguration should deserialize")
269 })
270 .await
271 }
272}