Skip to main content

saluki_components/transforms/dogstatsd_mapper/
mod.rs

1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::str::FromStr;
4use std::sync::LazyLock;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use bytesize::ByteSize;
9use regex::Regex;
10use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
11use saluki_common::cache::{Cache, CacheBuilder};
12use saluki_config::GenericConfiguration;
13use saluki_context::tags::SharedTagSet;
14use saluki_context::tags::TagSet;
15use saluki_context::{Context, ContextResolver, ContextResolverBuilder};
16use saluki_core::{
17    components::{
18        transforms::{SynchronousTransform, SynchronousTransformBuilder},
19        ComponentContext,
20    },
21    topology::EventsBuffer,
22};
23use saluki_error::{generic_error, ErrorContext, GenericError};
24use serde::{Deserialize, Serialize};
25use serde_with::{serde_as, DisplayFromStr, PickFirst};
26use stringtheory::MetaString;
27
/// `match_type` value selecting wildcard matching. This is also the match type used when a mapping leaves
/// `match_type` empty.
const MATCH_TYPE_WILDCARD: &str = "wildcard";
/// `match_type` value selecting regular expression matching, where the configured pattern is used as-is.
const MATCH_TYPE_REGEX: &str = "regex";

/// Allow-list applied to wildcard match patterns before they are converted to a regular expression.
///
/// A wildcard pattern must be non-empty and consist solely of ASCII letters, digits, `-`, `_`, `*` and `.`.
static ALLOWED_WILDCARD_MATCH_PATTERN: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9\-_*.]+$").expect("Invalid regex in ALLOWED_WILDCARD_MATCH_PATTERN"));
33
/// Default size of the string interner used for mapped contexts: 64 KiB.
///
/// Used as the serde default for `dogstatsd_mapper_string_interner_size`.
const fn default_context_string_interner_size() -> ByteSize {
    ByteSize::kib(64)
}
37
/// Default number of mapping results kept in the per-name result cache.
///
/// Used as the serde default for `dogstatsd_mapper_cache_size`.
const fn default_dogstatsd_mapper_cache_size() -> usize {
    1_000
}
/// DogStatsD mapper transform.
///
/// Rewrites the context of incoming metrics -- name and tags -- according to the configured mapper profiles.
/// Metrics whose name does not match any configured mapping are left untouched.
#[serde_as]
#[derive(Deserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct DogStatsDMapperConfiguration {
    /// Total size of the string interner used for contexts, in bytes.
    ///
    /// This controls the amount of memory that will be pre-allocated for the purpose
    /// of interning mapped metric names and tags, which can help to avoid unnecessary
    /// allocations and allocator fragmentation.
    ///
    /// Defaults to 64 KiB.
    #[serde(
        rename = "dogstatsd_mapper_string_interner_size",
        default = "default_context_string_interner_size"
    )]
    context_string_interner_bytes: ByteSize,

    /// Maximum number of mapped results to cache.
    ///
    /// When enabled, mapped metrics will be cached by name to avoid repeat evaluation of the configured mapper rules.
    ///
    /// When set to `0`, the cache is disabled.
    ///
    /// Defaults to `1000`.
    #[serde(
        rename = "dogstatsd_mapper_cache_size",
        default = "default_dogstatsd_mapper_cache_size"
    )]
    cache_size: usize,

    /// Configuration related to metric mapping.
    ///
    /// Accepted either as a string containing a JSON array of profiles (parsed through the `FromStr`
    /// implementation of `MapperProfileConfigs`) or as a natively structured list of profiles. The string form is
    /// attempted first.
    ///
    /// Defaults to no profiles.
    #[serde_as(as = "PickFirst<(DisplayFromStr, _)>")]
    #[serde(default)]
    dogstatsd_mapper_profiles: MapperProfileConfigs,
}
75
/// User-facing configuration of a single mapping profile.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
struct MappingProfileConfig {
    // Name of the profile. Must not be empty; it is used to identify the profile in configuration errors.
    name: String,
    // Metric name prefix the profile applies to. Must not be empty. The special value `*` applies the profile to
    // every metric regardless of its name.
    prefix: String,
    // Mappings belonging to this profile, evaluated in order; the first mapping that matches is the one applied.
    mappings: Vec<MetricMappingConfig>,
}
/// Ordered list of mapping profile configurations.
///
/// Wrapped in a newtype so that it can additionally be parsed from a JSON string via `FromStr`.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
struct MapperProfileConfigs(pub Vec<MappingProfileConfig>);
84
85impl FromStr for MapperProfileConfigs {
86    type Err = serde_json::Error;
87
88    fn from_str(s: &str) -> Result<Self, Self::Err> {
89        let profiles: Vec<MappingProfileConfig> = serde_json::from_str(s)?;
90        Ok(MapperProfileConfigs(profiles))
91    }
92}
93
#[cfg(test)]
impl std::fmt::Display for MapperProfileConfigs {
    /// Renders the profiles as a JSON array, or as an empty string if serialization fails.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered = serde_json::to_string(&self.0).unwrap_or_default();
        f.write_str(&rendered)
    }
}
100
101impl MapperProfileConfigs {
102    fn build(
103        &self, context: ComponentContext, context_string_interner_bytes: ByteSize, cache_size: usize,
104    ) -> Result<MetricMapper, GenericError> {
105        let mut profiles = Vec::with_capacity(self.0.len());
106        for (i, config_profile) in self.0.iter().enumerate() {
107            if config_profile.name.is_empty() {
108                return Err(generic_error!("missing profile name"));
109            }
110            if config_profile.prefix.is_empty() {
111                return Err(generic_error!("missing prefix for profile: {}", config_profile.name));
112            }
113
114            let mut profile = MappingProfile {
115                prefix: config_profile.prefix.clone(),
116                mappings: Vec::with_capacity(config_profile.mappings.len()),
117            };
118
119            for mapping in &config_profile.mappings {
120                let match_type = match mapping.match_type.as_str() {
121                    // Default to wildcard when not set.
122                    "" => MATCH_TYPE_WILDCARD,
123                    MATCH_TYPE_WILDCARD => MATCH_TYPE_WILDCARD,
124                    MATCH_TYPE_REGEX => MATCH_TYPE_REGEX,
125                    unknown => {
126                        return Err(generic_error!(
127                            "profile: {}, mapping num {}: invalid match type `{}`, expected `wildcard` or `regex`",
128                            config_profile.name,
129                            i,
130                            unknown,
131                        ))
132                    }
133                };
134                if mapping.name.is_empty() {
135                    return Err(generic_error!(
136                        "profile: {}, mapping num {}: name is required",
137                        config_profile.name,
138                        i
139                    ));
140                }
141                if mapping.metric_match.is_empty() {
142                    return Err(generic_error!(
143                        "profile: {}, mapping num {}: match is required",
144                        config_profile.name,
145                        i
146                    ));
147                }
148                let regex = build_regex(&mapping.metric_match, match_type)?;
149                profile.mappings.push(MetricMapping {
150                    name: mapping.name.clone(),
151                    tags: mapping.tags.clone(),
152                    regex,
153                });
154            }
155            profiles.push(profile);
156        }
157
158        let context_string_interner_size = NonZeroUsize::new(context_string_interner_bytes.as_u64() as usize)
159            .ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0"))
160            .unwrap();
161
162        let context_resolver =
163            ContextResolverBuilder::from_name(format!("{}/dsd_mapper/primary", context.component_id()))
164                .expect("resolver name is not empty")
165                .with_interner_capacity_bytes(context_string_interner_size)
166                .with_idle_context_expiration(Duration::from_secs(30))
167                .build();
168
169        let cache = match NonZeroUsize::new(cache_size) {
170            Some(capacity) => Some(
171                CacheBuilder::from_identifier(format!("{}/dsd_mapper/result_cache", context.component_id()))?
172                    .with_capacity(capacity)
173                    .build(),
174            ),
175            None => None,
176        };
177
178        Ok(MetricMapper {
179            context_resolver,
180            profiles,
181            cache,
182        })
183    }
184}
185
186fn build_regex(match_re: &str, match_type: &str) -> Result<Regex, GenericError> {
187    let mut pattern = match_re.to_owned();
188    if match_type == MATCH_TYPE_WILDCARD {
189        // Check it against the allowed wildcard pattern
190        if !ALLOWED_WILDCARD_MATCH_PATTERN.is_match(&pattern) {
191            return Err(generic_error!(
192                "invalid wildcard match pattern `{}`, it does not match allowed match regex `{}`",
193                pattern,
194                ALLOWED_WILDCARD_MATCH_PATTERN.as_str()
195            ));
196        }
197        if pattern.contains("**") {
198            return Err(generic_error!(
199                "invalid wildcard match pattern `{}`, it should not contain consecutive `*`",
200                pattern
201            ));
202        }
203        pattern = pattern.replace(".", "\\.");
204        pattern = pattern.replace("*", "([^.]*)");
205    }
206
207    let final_pattern = format!("^{}$", pattern);
208
209    Regex::new(&final_pattern).with_error_context(|| {
210        format!(
211            "Failed to compile regular expression `{}` for `{}` match type",
212            final_pattern, match_type
213        )
214    })
215}
216
/// User-facing configuration of a single metric mapping within a profile.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
struct MetricMappingConfig {
    // The metric name to extract groups from with the Wildcard or Regex match logic.
    //
    // Deserialized from the `match` key. Must not be empty.
    #[serde(rename = "match")]
    metric_match: String,

    // The type of match to apply to the `metric_match`. Either wildcard or regex.
    //
    // When left empty (the default), wildcard matching is used.
    #[serde(default)]
    match_type: String,

    // The new metric name to send to Datadog with the tags defined in the same group.
    //
    // Must not be empty. May reference capture groups of the match (e.g. `$1`), which are expanded at mapping time.
    name: String,

    // Map with the tag key and tag values collected from the `match_type` to inline.
    //
    // Tag values may reference capture groups of the match (e.g. `$1` or `${1}`). Defaults to no tags.
    #[serde(default)]
    tags: HashMap<String, String>,
}
234
/// A validated mapping profile with all of its match expressions compiled.
struct MappingProfile {
    // Metric name prefix that the profile applies to, or `*` to apply to every metric.
    prefix: String,
    // Compiled mappings, in the order they were configured.
    mappings: Vec<MetricMapping>,
}
239
/// A single compiled mapping rule.
struct MetricMapping {
    // Template for the mapped metric name; capture group references are expanded against the match.
    name: String,
    // Tag key to tag value template; capture group references in the value are expanded against the match.
    tags: HashMap<String, String>,
    // Anchored regular expression that the incoming metric name must match for this rule to apply.
    regex: Regex,
}
245
/// Outcome of a successful mapping, stored in the result cache keyed by the original metric name.
#[derive(Clone)]
struct CachedMapResult {
    // Name of the mapped metric.
    name: MetaString,
    // Tags produced by the mapping rule, which get merged into the tags of each incoming metric on a cache hit.
    extra_tags: SharedTagSet,
}
251
/// Applies the configured mapping profiles to metric contexts.
struct MetricMapper {
    // Profiles to evaluate, in configuration order.
    profiles: Vec<MappingProfile>,
    // Resolver used to build the context of mapped metrics.
    context_resolver: ContextResolver,
    // Optional cache of mapping results keyed by the original metric name. A cached `None` records that the name
    // matched no mapping. The cache itself is `None` when caching is disabled.
    cache: Option<Cache<MetaString, Option<CachedMapResult>>>,
}
257
258impl MetricMapper {
259    fn try_map(&mut self, context: &Context) -> Option<Context> {
260        // TODO: We should really be able to immutably borrow both the incoming tag set and the cached extra tags and
261        // chain them together for our call into `resolve_with_origin_tags`, avoiding any allocations... but we need
262        // some supporting work on the `TagSet` side to make it possible.
263
264        let metric_name = context.name();
265        let tags = context.tags();
266        let origin_tags = context.origin_tags();
267
268        // See if we have a cached result for this metric name.
269        if let Some(cache) = &self.cache {
270            if let Some(cached) = cache.get(metric_name) {
271                return match cached {
272                    None => None,
273                    Some(result) => {
274                        let mut merged_tags = tags.clone();
275                        merged_tags.merge_shared(&result.extra_tags);
276
277                        self.context_resolver.resolve_with_origin_tags(
278                            result.name.clone(),
279                            merged_tags,
280                            origin_tags.clone(),
281                        )
282                    }
283                };
284            }
285        }
286
287        // Slow path: iterate profiles and run regexes.
288        let mut new_name = String::new();
289        let mut expanded_tag_value = String::new();
290
291        for profile in &self.profiles {
292            if !metric_name.starts_with(&profile.prefix) && profile.prefix != "*" {
293                continue;
294            }
295
296            for mapping in &profile.mappings {
297                if let Some(captures) = mapping.regex.captures(metric_name) {
298                    new_name.clear();
299                    captures.expand(&mapping.name, &mut new_name);
300
301                    let mut extra_tags = TagSet::with_capacity(mapping.tags.len());
302                    for (tag_key, tag_value_expr) in &mapping.tags {
303                        expanded_tag_value.clear();
304                        expanded_tag_value.push_str(tag_key);
305                        expanded_tag_value.push(':');
306                        captures.expand(tag_value_expr, &mut expanded_tag_value);
307
308                        extra_tags.insert_tag(expanded_tag_value.as_str());
309                    }
310
311                    // Freeze the tags here so they can be shared / cached.
312                    let extra_tags = extra_tags.into_shared();
313
314                    let mut merged_tags = tags.clone();
315                    merged_tags.merge_shared(&extra_tags);
316
317                    let resolved = self.context_resolver.resolve_with_origin_tags(
318                        new_name.as_str(),
319                        merged_tags,
320                        origin_tags.clone(),
321                    )?;
322
323                    if let Some(cache) = &self.cache {
324                        cache.insert(
325                            metric_name.clone(),
326                            Some(CachedMapResult {
327                                name: resolved.name().clone(),
328                                extra_tags,
329                            }),
330                        );
331                    }
332                    return Some(resolved);
333                }
334            }
335        }
336
337        // We also cache "negative" results -- no match for this metric in the configured profiles -- to save ourselves some work.
338        if let Some(cache) = &self.cache {
339            cache.insert(metric_name.clone(), None);
340        }
341        None
342    }
343
344    #[cfg(test)]
345    fn cache_len(&self) -> Option<usize> {
346        self.cache.as_ref().map(|c| c.len())
347    }
348}
349
350impl DogStatsDMapperConfiguration {
351    /// Creates a new `DogstatsDMapperConfiguration` from the given configuration.
352    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
353        Ok(config.as_typed()?)
354    }
355}
356
357#[async_trait]
358impl SynchronousTransformBuilder for DogStatsDMapperConfiguration {
359    async fn build(&self, context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
360        let metric_mapper =
361            self.dogstatsd_mapper_profiles
362                .build(context, self.context_string_interner_bytes, self.cache_size)?;
363        Ok(Box::new(DogStatsDMapper { metric_mapper }))
364    }
365}
366
367impl MemoryBounds for DogStatsDMapperConfiguration {
368    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
369        let mut min = builder.minimum();
370        min
371            // Capture the size of the heap allocation when the component is built.
372            .with_single_value::<DogStatsDMapper>("component struct")
373            // We also allocate the backing storage for the string interner up front, which is used by our context
374            // resolver.
375            .with_fixed_amount("string interner", self.context_string_interner_bytes.as_u64() as usize);
376
377        // Account for the per-name result cache when enabled.
378        if self.cache_size > 0 {
379            min.with_array::<(MetaString, Option<CachedMapResult>)>("mapper result cache", self.cache_size);
380        }
381    }
382}
383
/// Synchronous transform that replaces the context of each metric that matches a configured mapping.
pub struct DogStatsDMapper {
    // Mapper holding the compiled profiles, context resolver and optional result cache.
    metric_mapper: MetricMapper,
}
387
388impl SynchronousTransform for DogStatsDMapper {
389    fn transform_buffer(&mut self, event_buffer: &mut EventsBuffer) {
390        for event in event_buffer {
391            if let Some(metric) = event.try_as_metric_mut() {
392                if let Some(new_context) = self.metric_mapper.try_map(metric.context()) {
393                    *metric.context_mut() = new_context;
394                }
395            }
396        }
397    }
398}
399
400#[cfg(test)]
401mod tests {
402
403    use bytesize::ByteSize;
404    use saluki_context::Context;
405    use saluki_core::{components::ComponentContext, data_model::event::metric::Metric, topology::ComponentId};
406    use saluki_error::GenericError;
407    use serde_json::{json, Value};
408
409    use super::{MapperProfileConfigs, MetricMapper};
410
411    fn counter_metric(name: &'static str, tags: &[&'static str]) -> Metric {
412        let context = Context::from_static_parts(name, tags);
413        Metric::counter(context, 1.0)
414    }
415
416    fn mapper(json_data: Value) -> Result<MetricMapper, GenericError> {
417        mapper_with_cache(json_data, 1000)
418    }
419
420    fn mapper_with_cache(json_data: Value, cache_size: usize) -> Result<MetricMapper, GenericError> {
421        let context = ComponentContext::transform(ComponentId::try_from("test_mapper").unwrap());
422        let mpc: MapperProfileConfigs = serde_json::from_value(json_data)?;
423        let context_string_interner_bytes = ByteSize::kib(64);
424        mpc.build(context, context_string_interner_bytes, cache_size)
425    }
426
427    fn assert_tags(context: &Context, expected_tags: &[&str]) {
428        for tag in expected_tags {
429            assert!(context.tags().has_tag(tag), "missing tag: {}", tag);
430        }
431        assert_eq!(context.tags().len(), expected_tags.len(), "unexpected number of tags");
432    }
433
434    #[tokio::test]
435    async fn test_mapper_wildcard_simple() {
436        let json_data = json!([{
437          "name": "test",
438          "prefix": "test.",
439          "mappings": [
440            {
441              "match": "test.job.duration.*.*",
442              "name": "test.job.duration",
443              "tags": {
444                "job_type": "$1",
445                "job_name": "$2"
446              }
447            },
448            {
449              "match": "test.job.size.*.*",
450              "name": "test.job.size",
451              "tags": {
452                "foo": "$1",
453                "bar": "$2"
454              }
455            }
456          ]
457        }]);
458
459        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
460        let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
461        let context = mapper.try_map(metric.context()).expect("should have remapped");
462        assert_eq!(context.name(), "test.job.duration");
463        assert_tags(&context, &["job_type:my_job_type", "job_name:my_job_name"]);
464
465        let metric = counter_metric("test.job.size.my_job_type.my_job_name", &[]);
466        let context = mapper.try_map(metric.context()).expect("should have remapped");
467        assert_eq!(context.name(), "test.job.size");
468        assert_tags(&context, &["foo:my_job_type", "bar:my_job_name"]);
469
470        let metric = counter_metric("test.job.size.not_match", &[]);
471        assert!(mapper.try_map(metric.context()).is_none(), "should not have remapped");
472    }
473
474    #[tokio::test]
475    async fn test_partial_match() {
476        let json_data = json!([{
477          "name": "test",
478          "prefix": "test.",
479          "mappings": [
480            {
481              "match": "test.job.duration.*.*",
482              "name": "test.job.duration",
483              "tags": {
484                "job_type": "$1"
485              }
486            },
487            {
488              "match": "test.task.duration.*.*",
489              "name": "test.task.duration",
490            }
491          ]
492        }]);
493        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
494        let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
495        let context = mapper.try_map(metric.context()).expect("should have remapped");
496        assert_eq!(context.name(), "test.job.duration");
497        assert!(context.tags().has_tag("job_type:my_job_type"));
498
499        let metric = counter_metric("test.task.duration.my_job_type.my_job_name", &[]);
500        let context = mapper.try_map(metric.context()).expect("should have remapped");
501        assert_eq!(context.name(), "test.task.duration");
502    }
503
504    #[tokio::test]
505    async fn test_use_regex_expansion_alternative_syntax() {
506        let json_data = json!([{
507            "name": "test",
508            "prefix": "test.",
509            "mappings": [
510                {
511                    "match": "test.job.duration.*.*",
512                    "name": "test.job.duration",
513                    "tags": {
514                        "job_type": "${1}_x",
515                        "job_name": "${2}_y"
516                    }
517                }
518            ]
519        }]);
520
521        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
522
523        let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
524        let context = mapper.try_map(metric.context()).expect("should have remapped");
525        assert_eq!(context.name(), "test.job.duration");
526        assert_tags(&context, &["job_type:my_job_type_x", "job_name:my_job_name_y"]);
527    }
528
529    #[tokio::test]
530    async fn test_expand_name() {
531        let json_data = json!([{
532            "name": "test",
533            "prefix": "test.",
534            "mappings": [
535                {
536                    "match": "test.job.duration.*.*",
537                    "name": "test.hello.$2.$1",
538                    "tags": {
539                        "job_type": "$1",
540                        "job_name": "$2"
541                    }
542                }
543            ]
544        }]);
545
546        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
547
548        let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
549        let context = mapper.try_map(metric.context()).expect("should have remapped");
550        assert_eq!(context.name(), "test.hello.my_job_name.my_job_type");
551        assert_tags(&context, &["job_type:my_job_type", "job_name:my_job_name"]);
552    }
553
554    #[tokio::test]
555    async fn test_match_before_underscore() {
556        let json_data = json!([{
557            "name": "test",
558            "prefix": "test.",
559            "mappings": [
560                {
561                    "match": "test.*_start",
562                    "name": "test.start",
563                    "tags": {
564                        "job": "$1"
565                    }
566                }
567            ]
568        }]);
569
570        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
571
572        let metric = counter_metric("test.my_job_start", &[]);
573        let context = mapper.try_map(metric.context()).expect("should have remapped");
574        assert_eq!(context.name(), "test.start");
575        assert!(context.tags().has_tag("job:my_job"));
576    }
577
578    #[tokio::test]
579    async fn test_no_tags() {
580        let json_data = json!([{
581            "name": "test",
582            "prefix": "test.",
583            "mappings": [
584                {
585                    "match": "test.my-worker.start",
586                    "name": "test.worker.start"
587                },
588                {
589                    "match": "test.my-worker.stop.*",
590                    "name": "test.worker.stop"
591                }
592            ]
593        }]);
594
595        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
596
597        let metric = counter_metric("test.my-worker.start", &[]);
598        let context = mapper.try_map(metric.context()).expect("should have remapped");
599        assert_eq!(context.name(), "test.worker.start");
600        assert!(context.tags().is_empty(), "Expected no tags");
601
602        let metric = counter_metric("test.my-worker.stop.worker-name", &[]);
603        let context = mapper.try_map(metric.context()).expect("should have remapped");
604        assert_eq!(context.name(), "test.worker.stop");
605        assert!(context.tags().is_empty(), "Expected no tags");
606    }
607
608    #[tokio::test]
609    async fn test_all_allowed_characters() {
610        let json_data = json!([{
611            "name": "test",
612            "prefix": "test.",
613            "mappings": [
614                {
615                    "match": "test.abcdefghijklmnopqrstuvwxyz_ABCDEFGHIJKLMNOPQRSTUVWXYZ-01234567.*",
616                    "name": "test.alphabet"
617                }
618            ]
619        }]);
620
621        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
622
623        let metric = counter_metric(
624            "test.abcdefghijklmnopqrstuvwxyz_ABCDEFGHIJKLMNOPQRSTUVWXYZ-01234567.123",
625            &[],
626        );
627        let context = mapper.try_map(metric.context()).expect("should have remapped");
628        assert_eq!(context.name(), "test.alphabet");
629        assert!(context.tags().is_empty(), "Expected no tags");
630    }
631
632    #[tokio::test]
633    async fn test_regex_match_type() {
634        let json_data = json!([{
635            "name": "test",
636            "prefix": "test.",
637            "mappings": [
638                {
639                    "match": "test\\.job\\.duration\\.(.*)",
640                    "match_type": "regex",
641                    "name": "test.job.duration",
642                    "tags": {
643                        "job_name": "$1"
644                    }
645                },
646                {
647                    "match": "test\\.task\\.duration\\.(.*)",
648                    "match_type": "regex",
649                    "name": "test.task.duration",
650                    "tags": {
651                        "task_name": "$1"
652                    }
653                }
654            ]
655        }]);
656
657        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
658        let metric = counter_metric("test.job.duration.my.funky.job$name-abc/123", &[]);
659        let context = mapper.try_map(metric.context()).expect("should have remapped");
660        assert_eq!(context.name(), "test.job.duration");
661        assert!(context.tags().has_tag("job_name:my.funky.job$name-abc/123"));
662
663        let metric = counter_metric("test.task.duration.MY_task_name", &[]);
664        let context = mapper.try_map(metric.context()).expect("should have remapped");
665        assert_eq!(context.name(), "test.task.duration");
666        assert!(context.tags().has_tag("task_name:MY_task_name"));
667    }
668
669    #[tokio::test]
670    async fn test_complex_regex_match_type() {
671        let json_data = json!([{
672            "name": "test",
673            "prefix": "test.",
674            "mappings": [
675                {
676                    "match": "test\\.job\\.([a-z][0-9]-\\w+)\\.(.*)",
677                    "match_type": "regex",
678                    "name": "test.job",
679                    "tags": {
680                        "job_type": "$1",
681                        "job_name": "$2"
682                    }
683                }
684            ]
685        }]);
686
687        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
688
689        let metric = counter_metric("test.job.a5-foo.bar", &[]);
690        let context = mapper.try_map(metric.context()).expect("should have remapped");
691        assert_eq!(context.name(), "test.job");
692        assert_tags(&context, &["job_type:a5-foo", "job_name:bar"]);
693
694        let metric = counter_metric("test.job.foo.bar-not-match", &[]);
695        assert!(mapper.try_map(metric.context()).is_none(), "should not have remapped");
696    }
697
698    #[tokio::test]
699    async fn test_profile_and_prefix() {
700        let json_data = json!([{
701            "name": "test",
702            "prefix": "foo.",
703            "mappings": [
704                {
705                    "match": "foo.duration.*",
706                    "name": "foo.duration",
707                    "tags": {
708                        "name": "$1"
709                    }
710                }
711            ]
712        },
713        {
714            "name": "test",
715            "prefix": "bar.",
716            "mappings": [
717                {
718                    "match": "bar.count.*",
719                    "name": "bar.count",
720                    "tags": {
721                        "name": "$1"
722                    }
723                },
724                {
725                    "match": "foo.duration2.*",
726                    "name": "foo.duration2",
727                    "tags": {
728                        "name": "$1"
729                    }
730                }
731            ]
732        }]);
733
734        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
735
736        let metric = counter_metric("foo.duration.foo_name1", &[]);
737        let context = mapper.try_map(metric.context()).expect("should have remapped");
738        assert_eq!(context.name(), "foo.duration");
739        assert!(context.tags().has_tag("name:foo_name1"));
740
741        let metric = counter_metric("foo.duration2.foo_name1", &[]);
742        assert!(
743            mapper.try_map(metric.context()).is_none(),
744            "should not have remapped due to wrong group"
745        );
746
747        let metric = counter_metric("bar.count.bar_name1", &[]);
748        let context = mapper.try_map(metric.context()).expect("should have remapped");
749        assert_eq!(context.name(), "bar.count");
750        assert!(context.tags().has_tag("name:bar_name1"));
751
752        let metric = counter_metric("z.not.mapped", &[]);
753        assert!(mapper.try_map(metric.context()).is_none(), "should not have remapped");
754    }
755
756    #[tokio::test]
757    async fn test_wildcard_prefix() {
758        let json_data = json!([{
759            "name": "test",
760            "prefix": "*",
761            "mappings": [
762                {
763                    "match": "foo.duration.*",
764                    "name": "foo.duration",
765                    "tags": {
766                        "name": "$1"
767                    }
768                }
769            ]
770        }]);
771
772        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
773
774        let metric = counter_metric("foo.duration.foo_name1", &[]);
775        let context = mapper.try_map(metric.context()).expect("should have remapped");
776        assert_eq!(context.name(), "foo.duration");
777        assert!(context.tags().has_tag("name:foo_name1"));
778    }
779
780    #[tokio::test]
781    async fn test_wildcard_prefix_order() {
782        let json_data = json!([{
783            "name": "test",
784            "prefix": "*",
785            "mappings": [
786                {
787                    "match": "foo.duration.*",
788                    "name": "foo.duration",
789                    "tags": {
790                        "name1": "$1"
791                    }
792                }
793            ]
794        },
795        {
796            "name": "test",
797            "prefix": "*",
798            "mappings": [
799                {
800                    "match": "foo.duration.*",
801                    "name": "foo.duration",
802                    "tags": {
803                        "name2": "$1"
804                    }
805                }
806            ]
807        }]);
808
809        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
810        let metric = counter_metric("foo.duration.foo_name", &[]);
811        let context = mapper.try_map(metric.context()).expect("should have remapped");
812        assert_eq!(context.name(), "foo.duration");
813        assert!(context.tags().has_tag("name1:foo_name"));
814        assert!(
815            !context.tags().has_tag("name2:foo_name"),
816            "Only the first matching profile should apply"
817        );
818    }
819
820    #[tokio::test]
821    async fn test_multiple_profiles_order() {
822        let json_data = json!([{
823            "name": "test",
824            "prefix": "foo.",
825            "mappings": [
826                {
827                    "match": "foo.*.duration.*",
828                    "name": "foo.bar1.duration",
829                    "tags": {
830                        "bar": "$1",
831                        "foo": "$2"
832                    }
833                }
834            ]
835        },
836        {
837            "name": "test",
838            "prefix": "foo.bar.",
839            "mappings": [
840                {
841                    "match": "foo.bar.duration.*",
842                    "name": "foo.bar2.duration",
843                    "tags": {
844                        "foo_bar": "$1"
845                    }
846                }
847            ]
848        }]);
849
850        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
851
852        let metric = counter_metric("foo.bar.duration.foo_name", &[]);
853        let context = mapper.try_map(metric.context()).expect("should have remapped");
854        assert_eq!(context.name(), "foo.bar1.duration");
855        assert_tags(&context, &["bar:bar", "foo:foo_name"]);
856        assert!(
857            !context.tags().has_tag("foo_bar:foo_name"),
858            "Only the first matching profile should apply"
859        );
860    }
861
862    #[tokio::test]
863    async fn test_different_regex_expansion_syntax() {
864        let json_data = json!([{
865            "name": "test",
866            "prefix": "test.",
867            "mappings": [
868                {
869                    "match": "test.user.(\\w+).action.(\\w+)",
870                    "match_type": "regex",
871                    "name": "test.user.action",
872                    "tags": {
873                        "user": "$1",
874                        "action": "$2"
875                    }
876                }
877            ]
878        }]);
879
880        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
881        let metric = counter_metric("test.user.john_doe.action.login", &[]);
882        let context = mapper.try_map(metric.context()).expect("should have remapped");
883        assert_eq!(context.name(), "test.user.action");
884        assert_tags(&context, &["user:john_doe", "action:login"]);
885    }
886
887    #[tokio::test]
888    async fn test_retain_existing_tags() {
889        let json_data = json!([{
890          "name": "test",
891          "prefix": "test.",
892          "mappings": [
893            {
894              "match": "test.job.duration.*.*",
895              "name": "test.job.duration.$2",
896              "tags": {
897                "job_type": "$1",
898                "job_name": "$2"
899              }
900            },
901          ]
902        }]);
903        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
904        let metric = counter_metric("test.job.duration.abc.def", &["foo:bar", "baz"]);
905        let context = mapper.try_map(metric.context()).expect("should have remapped");
906        assert_eq!(context.name(), "test.job.duration.def");
907        assert!(context.tags().has_tag("foo:bar"));
908        assert!(context.tags().has_tag("baz"));
909    }
910
911    #[test]
912    fn test_empty_name() {
913        let json_data = json!([{
914            "name": "test",
915            "prefix": "test.",
916            "mappings": [
917                {
918                    "match": "test.job.duration.*.*",
919                    "name": "",
920                    "tags": {
921                        "job_type": "$1"
922                    }
923                }
924            ]
925        }]);
926        assert!(mapper(json_data).is_err())
927    }
928
929    #[test]
930    fn test_missing_name() {
931        let json_data = json!([{
932            "name": "test",
933            "prefix": "test.",
934            "mappings": [
935                {
936                    "match": "test.job.duration.*.*",
937                    "tags": {
938                        "job_type": "$1",
939                        "job_name": "$2"
940                    }
941                }
942            ]
943        }]);
944        assert!(mapper(json_data).is_err());
945    }
946
947    #[test]
948    fn test_invalid_match_regex_brackets() {
949        let json_data = json!([{
950            "name": "test",
951            "prefix": "test.",
952            "mappings": [
953                {
954                    "match": "test.[]duration.*.*", // Invalid regex
955                    "name": "test.job.duration"
956                }
957            ]
958        }]);
959        assert!(mapper(json_data).is_err());
960    }
961
962    #[test]
963    fn test_invalid_match_regex_caret() {
964        let json_data = json!([{
965            "name": "test",
966            "prefix": "test.",
967            "mappings": [
968                {
969                    "match": "^test.invalid.duration.*.*", // Invalid regex
970                    "name": "test.job.duration"
971                }
972            ]
973        }]);
974        assert!(mapper(json_data).is_err());
975    }
976
977    #[test]
978    fn test_consecutive_wildcards() {
979        let json_data = json!([{
980            "name": "test",
981            "prefix": "test.",
982            "mappings": [
983                {
984                    "match": "test.invalid.duration.**", // Consecutive *
985                    "name": "test.job.duration"
986                }
987            ]
988        }]);
989        assert!(mapper(json_data).is_err());
990    }
991
992    #[test]
993    fn test_invalid_match_type() {
994        let json_data = json!([{
995            "name": "test",
996            "prefix": "test.",
997            "mappings": [
998                {
999                    "match": "test.invalid.duration",
1000                    "match_type": "invalid", // Invalid match_type
1001                    "name": "test.job.duration"
1002                }
1003            ]
1004        }]);
1005        assert!(mapper(json_data).is_err());
1006    }
1007
1008    #[test]
1009    fn test_missing_profile_name() {
1010        let json_data = json!([{
1011            // "name" is missing here
1012            "prefix": "test.",
1013            "mappings": [
1014                {
1015                    "match": "test.invalid.duration",
1016                    "match_type": "invalid",
1017                    "name": "test.job.duration"
1018                }
1019            ]
1020        }]);
1021        assert!(mapper(json_data).is_err());
1022    }
1023
1024    #[test]
1025    fn test_missing_profile_prefix() {
1026        let json_data = json!([{
1027            "name": "test",
1028            // "prefix" is missing here
1029            "mappings": [
1030                {
1031                    "match": "test.invalid.duration",
1032                    "match_type": "invalid",
1033                    "name": "test.job.duration"
1034                }
1035            ]
1036        }]);
1037        assert!(mapper(json_data).is_err());
1038    }
1039
1040    fn simple_mapping_profile() -> Value {
1041        json!([{
1042            "name": "test",
1043            "prefix": "test.",
1044            "mappings": [
1045                {
1046                    "match": "test.job.duration.*.*",
1047                    "name": "test.job.duration",
1048                    "tags": {
1049                        "job_type": "$1",
1050                        "job_name": "$2"
1051                    }
1052                }
1053            ]
1054        }])
1055    }
1056
1057    #[tokio::test]
1058    async fn test_cache_hit_returns_same_result_as_miss() {
1059        let mut mapper = mapper_with_cache(simple_mapping_profile(), 1000).expect("should have parsed mapping config");
1060        assert_eq!(mapper.cache_len(), Some(0));
1061
1062        let metric = counter_metric("test.job.duration.my_type.my_name", &[]);
1063        let first = mapper.try_map(metric.context()).expect("should have remapped");
1064        assert_eq!(mapper.cache_len(), Some(1));
1065
1066        let metric = counter_metric("test.job.duration.my_type.my_name", &[]);
1067        let second = mapper.try_map(metric.context()).expect("should have remapped");
1068        assert_eq!(mapper.cache_len(), Some(1));
1069
1070        assert_eq!(first.name(), second.name());
1071        assert_eq!(first.name(), "test.job.duration");
1072        assert_tags(&first, &["job_type:my_type", "job_name:my_name"]);
1073        assert_tags(&second, &["job_type:my_type", "job_name:my_name"]);
1074    }
1075
1076    #[tokio::test]
1077    async fn test_negative_cache() {
1078        let mut mapper = mapper_with_cache(simple_mapping_profile(), 1000).expect("should have parsed mapping config");
1079
1080        let metric = counter_metric("unrelated.metric.name", &[]);
1081        assert!(mapper.try_map(metric.context()).is_none());
1082        assert_eq!(mapper.cache_len(), Some(1));
1083
1084        let metric = counter_metric("unrelated.metric.name", &[]);
1085        assert!(mapper.try_map(metric.context()).is_none());
1086        assert_eq!(mapper.cache_len(), Some(1));
1087    }
1088
1089    #[tokio::test]
1090    async fn test_cache_disabled_when_zero() {
1091        let mut mapper = mapper_with_cache(simple_mapping_profile(), 0).expect("should have parsed mapping config");
1092        assert_eq!(mapper.cache_len(), None);
1093
1094        let metric = counter_metric("test.job.duration.my_type.my_name", &[]);
1095        let context = mapper.try_map(metric.context()).expect("should have remapped");
1096        assert_eq!(context.name(), "test.job.duration");
1097        assert_tags(&context, &["job_type:my_type", "job_name:my_name"]);
1098
1099        assert!(mapper
1100            .try_map(counter_metric("unrelated.metric", &[]).context())
1101            .is_none());
1102        assert_eq!(mapper.cache_len(), None);
1103    }
1104
1105    #[tokio::test]
1106    async fn test_cache_eviction() {
1107        let mut mapper = mapper_with_cache(simple_mapping_profile(), 2).expect("should have parsed mapping config");
1108
1109        for suffix in ["a", "b", "c"] {
1110            let name = format!("test.job.duration.t.{}", suffix);
1111            let metric = counter_metric(Box::leak(name.into_boxed_str()), &[]);
1112            mapper.try_map(metric.context()).expect("should have remapped");
1113        }
1114
1115        assert!(
1116            mapper.cache_len().unwrap() <= 2,
1117            "cache should not exceed configured capacity (got {})",
1118            mapper.cache_len().unwrap()
1119        );
1120    }
1121
1122    #[tokio::test]
1123    async fn test_flood_does_not_grow_cache() {
1124        // Many profiles, only the last one matches the test metric. A flood of identical
1125        // names should be served from the cache after the first call.
1126        let mut profiles: Vec<Value> = (0..50)
1127            .map(|i| {
1128                json!({
1129                    "name": format!("noise-{}", i),
1130                    "prefix": format!("noise{}.", i),
1131                    "mappings": [{
1132                        "match": format!("noise{}.*", i),
1133                        "name": "noise.mapped"
1134                    }]
1135                })
1136            })
1137            .collect();
1138        profiles.push(json!({
1139            "name": "real",
1140            "prefix": "real.",
1141            "mappings": [{
1142                "match": "real.metric.*",
1143                "name": "real.mapped",
1144                "tags": { "x": "$1" }
1145            }]
1146        }));
1147        let json_data = Value::Array(profiles);
1148
1149        let mut mapper = mapper_with_cache(json_data, 16).expect("should have parsed mapping config");
1150
1151        for _ in 0..10_000 {
1152            let metric = counter_metric("real.metric.flood", &[]);
1153            let context = mapper.try_map(metric.context()).expect("should have remapped");
1154            assert_eq!(context.name(), "real.mapped");
1155        }
1156
1157        assert_eq!(
1158            mapper.cache_len(),
1159            Some(1),
1160            "flood of identical names should populate exactly one cache entry"
1161        );
1162    }
1163}
1164
/// Smoke test wiring `DogStatsDMapperConfiguration` into the shared config-registry test
/// harness.
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DogStatsDMapperConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Runs the shared smoke tests for the DogStatsD mapper registry entry, starting from an
    /// empty JSON object, and requires each configuration handed to the callback to deserialize
    /// into `DogStatsDMapperConfiguration`.
    #[tokio::test]
    async fn smoke_test() {
        // The closure stays inline so its parameter type is inferred from the harness signature.
        run_config_smoke_tests(
            structs::DOGSTATSD_MAPPER_CONFIGURATION,
            &[],
            json!({}),
            |cfg| {
                cfg.as_typed::<DogStatsDMapperConfiguration>()
                    .expect("DogStatsDMapperConfiguration should deserialize")
            },
        )
        .await
    }
}
1181}