saluki_components/transforms/trace_obfuscation/
mod.rs1mod credit_cards;
4mod http;
5mod json;
6mod memcached;
7mod obfuscator;
8mod redis;
9mod sql;
10mod sql_filters;
11mod sql_tokenizer;
12
13use async_trait::async_trait;
14use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
15use saluki_config::GenericConfiguration;
16use saluki_core::{
17 components::{transforms::*, ComponentContext},
18 data_model::event::{trace::Span, Event},
19 topology::EventsBuffer,
20};
21use saluki_error::GenericError;
22use stringtheory::MetaString;
23
24pub use self::obfuscator::{tags, ObfuscationConfig, Obfuscator};
25use crate::common::datadog::apm::ApmConfig;
26
/// Placeholder written to a SQL span's resource and SQL query tag when the query fails to obfuscate.
const TEXT_NON_PARSABLE_SQL: &str = "Non-parsable SQL query";
28
/// Configuration for the trace obfuscation transform.
///
/// Wraps the [`ObfuscationConfig`] that is handed to the [`Obfuscator`] when the transform is built.
#[derive(serde::Deserialize)]
pub struct TraceObfuscationConfiguration {
    /// Obfuscation settings. When the field is missing from the deserialized input, the type's
    /// `Default` value is used.
    #[serde(default)]
    pub config: ObfuscationConfig,
}
36
37impl TraceObfuscationConfiguration {
38 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
40 Ok(config.as_typed()?)
41 }
42
43 pub fn from_apm_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
45 let apm_config = ApmConfig::from_configuration(config)?;
46 Ok(Self {
47 config: apm_config.obfuscation().clone(),
48 })
49 }
50
51 pub fn new() -> Self {
53 Self {
54 config: ObfuscationConfig::default(),
55 }
56 }
57}
58
59impl Default for TraceObfuscationConfiguration {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
65#[async_trait]
66impl SynchronousTransformBuilder for TraceObfuscationConfiguration {
67 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
68 Ok(Box::new(TraceObfuscation {
69 obfuscator: Obfuscator::new(self.config.clone()),
70 }))
71 }
72}
73
impl MemoryBounds for TraceObfuscationConfiguration {
    /// Declares the memory bounds of the transform built from this configuration.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Only the `TraceObfuscation` struct itself is declared, as part of the minimum bound.
        builder
            .minimum()
            .with_single_value::<TraceObfuscation>("component struct");
    }
}
81
/// Synchronous transform that obfuscates the spans of trace events in place.
pub struct TraceObfuscation {
    /// Obfuscator that carries the configuration and performs the actual string obfuscation.
    obfuscator: Obfuscator,
}
86
87impl TraceObfuscation {
88 fn obfuscate_span(&mut self, span: &mut Span) {
89 if self.obfuscator.config.credit_cards().enabled() {
90 self.obfuscate_credit_cards_in_span(span);
91 }
92
93 match span.span_type() {
94 "http" | "web" => self.obfuscate_http_span(span),
95 "sql" | "cassandra" => self.obfuscate_sql_span(span),
96 "redis" | "valkey" => self.obfuscate_redis_span(span),
97 "memcached" => self.obfuscate_memcached_span(span),
98 "mongodb" => self.obfuscate_mongodb_span(span),
99 "elasticsearch" | "opensearch" => self.obfuscate_elasticsearch_span(span),
100 _ => {}
101 }
102 }
103
104 fn obfuscate_credit_cards_in_span(&mut self, span: &mut Span) {
105 for (key, value) in span.meta_mut().iter_mut() {
106 if let Some(replacement) = self
107 .obfuscator
108 .obfuscate_credit_card_number(key.as_ref(), value.as_ref())
109 {
110 *value = replacement;
111 }
112 }
113 }
114
115 fn obfuscate_http_span(&mut self, span: &mut Span) {
116 let url_value = match span.meta().get(tags::HTTP_URL) {
117 Some(v) if !v.is_empty() => v.as_ref(),
118 _ => return,
119 };
120
121 if let Some(obfuscated) = self.obfuscator.obfuscate_url(url_value) {
122 span.meta_mut().insert(tags::HTTP_URL.into(), obfuscated);
123 }
124 }
125
126 fn obfuscate_sql_span(&mut self, span: &mut Span) {
127 let sql_query: &str = span
128 .meta()
129 .get(tags::DB_STATEMENT)
130 .map(|v| v.as_ref())
131 .filter(|s| !s.is_empty())
132 .unwrap_or_else(|| span.resource());
133
134 if sql_query.is_empty() {
135 return;
136 }
137
138 let dbms = span.meta().get(tags::DBMS);
139
140 let config = match dbms {
141 Some(d) if !d.is_empty() => self.obfuscator.config.sql().with_dbms(d.to_string()),
142 _ => self.obfuscator.config.sql().clone(),
143 };
144
145 match sql::obfuscate_sql_string(sql_query, &config) {
146 Ok(obfuscated) => {
147 let query: MetaString = obfuscated.query.into();
148
149 span.set_resource(query.clone());
150 span.meta_mut().insert(tags::SQL_QUERY.into(), query.clone());
151
152 if span.meta().contains_key(tags::DB_STATEMENT) {
153 span.meta_mut().insert(tags::DB_STATEMENT.into(), query);
154 }
155
156 if !obfuscated.table_names.is_empty() {
157 span.meta_mut()
158 .insert("sql.tables".into(), obfuscated.table_names.into());
159 }
160 }
161 Err(_) => {
162 let non_parsable: MetaString = TEXT_NON_PARSABLE_SQL.into();
163 span.set_resource(non_parsable.clone());
164 span.meta_mut().insert(tags::SQL_QUERY.into(), non_parsable);
165 }
166 }
167 }
168
169 fn obfuscate_redis_span(&mut self, span: &mut Span) {
170 let resource = span.resource();
171 if resource.is_empty() {
172 return;
173 }
174
175 if let Some(quantized) = self.obfuscator.quantize_redis_string(resource) {
176 span.set_resource(quantized.to_string());
177 }
178
179 if span.span_type() == "redis" && self.obfuscator.config.redis().enabled() {
180 if let Some(cmd_value) = span.meta().get(tags::REDIS_RAW_COMMAND) {
181 if let Some(obfuscated) = self.obfuscator.obfuscate_redis_string(cmd_value.as_ref()) {
182 span.meta_mut().insert(tags::REDIS_RAW_COMMAND.into(), obfuscated);
183 }
184 }
185 }
186
187 if span.span_type() == "valkey" && self.obfuscator.config.valkey().enabled() {
188 if let Some(cmd_value) = span.meta().get(tags::VALKEY_RAW_COMMAND) {
189 if let Some(obfuscated) = self.obfuscator.obfuscate_valkey_string(cmd_value.as_ref()) {
190 span.meta_mut().insert(tags::VALKEY_RAW_COMMAND.into(), obfuscated);
191 }
192 }
193 }
194 }
195
196 fn obfuscate_memcached_span(&mut self, span: &mut Span) {
197 if !self.obfuscator.config.memcached().enabled() {
198 return;
199 }
200
201 let cmd_value = match span.meta().get(tags::MEMCACHED_COMMAND) {
202 Some(v) if !v.is_empty() => v.as_ref(),
203 _ => return,
204 };
205
206 if let Some(obfuscated) = self.obfuscator.obfuscate_memcached_command(cmd_value) {
207 if obfuscated.is_empty() {
208 span.meta_mut().remove(tags::MEMCACHED_COMMAND);
209 } else {
210 span.meta_mut().insert(tags::MEMCACHED_COMMAND.into(), obfuscated);
211 }
212 }
213 }
214
215 fn obfuscate_mongodb_span(&mut self, span: &mut Span) {
216 let query_value = match span.meta().get(tags::MONGODB_QUERY) {
217 Some(v) => v.as_ref(),
218 None => return,
219 };
220
221 if let Some(obfuscated) = self.obfuscator.obfuscate_mongodb_string(query_value) {
222 span.meta_mut().insert(tags::MONGODB_QUERY.into(), obfuscated);
223 }
224 }
225
226 fn obfuscate_elasticsearch_span(&mut self, span: &mut Span) {
227 if let Some(body_value) = span.meta().get(tags::ELASTIC_BODY) {
228 if let Some(obfuscated) = self.obfuscator.obfuscate_elasticsearch_string(body_value.as_ref()) {
229 span.meta_mut().insert(tags::ELASTIC_BODY.into(), obfuscated);
230 }
231 }
232
233 if let Some(body_value) = span.meta().get(tags::OPENSEARCH_BODY) {
234 if let Some(obfuscated) = self.obfuscator.obfuscate_opensearch_string(body_value.as_ref()) {
235 span.meta_mut().insert(tags::OPENSEARCH_BODY.into(), obfuscated);
236 }
237 }
238 }
239}
240
241impl SynchronousTransform for TraceObfuscation {
242 fn transform_buffer(&mut self, buffer: &mut EventsBuffer) {
243 for event in buffer {
244 if let Event::Trace(ref mut trace) = event {
245 for span in trace.spans_mut() {
246 self.obfuscate_span(span);
247 }
248 }
249 }
250 }
251}