Skip to main content

saluki_components/transforms/dogstatsd_mapper/
mod.rs

1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::str::FromStr;
4use std::sync::LazyLock;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use bytesize::ByteSize;
9use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
10use regex::Regex;
11use saluki_config::GenericConfiguration;
12use saluki_context::{Context, ContextResolver, ContextResolverBuilder};
13use saluki_core::{
14    components::{
15        transforms::{SynchronousTransform, SynchronousTransformBuilder},
16        ComponentContext,
17    },
18    topology::EventsBuffer,
19};
20use saluki_error::{generic_error, ErrorContext, GenericError};
21use serde::{Deserialize, Serialize};
22use serde_with::{serde_as, DisplayFromStr, PickFirst};
23
24const MATCH_TYPE_WILDCARD: &str = "wildcard";
25const MATCH_TYPE_REGEX: &str = "regex";
26
27static ALLOWED_WILDCARD_MATCH_PATTERN: LazyLock<Regex> =
28    LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9\-_*.]+$").expect("Invalid regex in ALLOWED_WILDCARD_MATCH_PATTERN"));
29
30const fn default_context_string_interner_size() -> ByteSize {
31    ByteSize::kib(64)
32}
33/// DogStatsD mapper transform.
34#[serde_as]
35#[derive(Deserialize)]
36#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
37pub struct DogStatsDMapperConfiguration {
38    /// Total size of the string interner used for contexts, in bytes.
39    ///
40    /// This controls the amount of memory that will be pre-allocated for the purpose
41    /// of interning mapped metric names and tags, which can help to avoid unnecessary
42    /// allocations and allocator fragmentation.
43    #[serde(
44        rename = "dogstatsd_mapper_string_interner_size",
45        default = "default_context_string_interner_size"
46    )]
47    context_string_interner_bytes: ByteSize,
48
49    /// Configuration related to metric mapping.
50    #[serde_as(as = "PickFirst<(DisplayFromStr, _)>")]
51    #[serde(default)]
52    dogstatsd_mapper_profiles: MapperProfileConfigs,
53}
54
55#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
56struct MappingProfileConfig {
57    name: String,
58    prefix: String,
59    mappings: Vec<MetricMappingConfig>,
60}
61#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
62struct MapperProfileConfigs(pub Vec<MappingProfileConfig>);
63
64impl FromStr for MapperProfileConfigs {
65    type Err = serde_json::Error;
66
67    fn from_str(s: &str) -> Result<Self, Self::Err> {
68        let profiles: Vec<MappingProfileConfig> = serde_json::from_str(s)?;
69        Ok(MapperProfileConfigs(profiles))
70    }
71}
72
#[cfg(test)]
impl std::fmt::Display for MapperProfileConfigs {
    /// Writes the profiles as a JSON array, or nothing at all if they cannot be serialized.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let encoded = serde_json::to_string(&self.0).unwrap_or_default();
        f.write_str(&encoded)
    }
}
79
80impl MapperProfileConfigs {
81    fn build(
82        &self, context: ComponentContext, context_string_interner_bytes: ByteSize,
83    ) -> Result<MetricMapper, GenericError> {
84        let mut profiles = Vec::with_capacity(self.0.len());
85        for (i, config_profile) in self.0.iter().enumerate() {
86            if config_profile.name.is_empty() {
87                return Err(generic_error!("missing profile name"));
88            }
89            if config_profile.prefix.is_empty() {
90                return Err(generic_error!("missing prefix for profile: {}", config_profile.name));
91            }
92
93            let mut profile = MappingProfile {
94                prefix: config_profile.prefix.clone(),
95                mappings: Vec::with_capacity(config_profile.mappings.len()),
96            };
97
98            for mapping in &config_profile.mappings {
99                let match_type = match mapping.match_type.as_str() {
100                    // Default to wildcard when not set.
101                    "" => MATCH_TYPE_WILDCARD,
102                    MATCH_TYPE_WILDCARD => MATCH_TYPE_WILDCARD,
103                    MATCH_TYPE_REGEX => MATCH_TYPE_REGEX,
104                    unknown => {
105                        return Err(generic_error!(
106                            "profile: {}, mapping num {}: invalid match type `{}`, expected `wildcard` or `regex`",
107                            config_profile.name,
108                            i,
109                            unknown,
110                        ))
111                    }
112                };
113                if mapping.name.is_empty() {
114                    return Err(generic_error!(
115                        "profile: {}, mapping num {}: name is required",
116                        config_profile.name,
117                        i
118                    ));
119                }
120                if mapping.metric_match.is_empty() {
121                    return Err(generic_error!(
122                        "profile: {}, mapping num {}: match is required",
123                        config_profile.name,
124                        i
125                    ));
126                }
127                let regex = build_regex(&mapping.metric_match, match_type)?;
128                profile.mappings.push(MetricMapping {
129                    name: mapping.name.clone(),
130                    tags: mapping.tags.clone(),
131                    regex,
132                });
133            }
134            profiles.push(profile);
135        }
136
137        let context_string_interner_size = NonZeroUsize::new(context_string_interner_bytes.as_u64() as usize)
138            .ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0"))
139            .unwrap();
140
141        let context_resolver =
142            ContextResolverBuilder::from_name(format!("{}/dsd_mapper/primary", context.component_id()))
143                .expect("resolver name is not empty")
144                .with_interner_capacity_bytes(context_string_interner_size)
145                .with_idle_context_expiration(Duration::from_secs(30))
146                .build();
147
148        Ok(MetricMapper {
149            context_resolver,
150            profiles,
151        })
152    }
153}
154
155fn build_regex(match_re: &str, match_type: &str) -> Result<Regex, GenericError> {
156    let mut pattern = match_re.to_owned();
157    if match_type == MATCH_TYPE_WILDCARD {
158        // Check it against the allowed wildcard pattern
159        if !ALLOWED_WILDCARD_MATCH_PATTERN.is_match(&pattern) {
160            return Err(generic_error!(
161                "invalid wildcard match pattern `{}`, it does not match allowed match regex `{}`",
162                pattern,
163                ALLOWED_WILDCARD_MATCH_PATTERN.as_str()
164            ));
165        }
166        if pattern.contains("**") {
167            return Err(generic_error!(
168                "invalid wildcard match pattern `{}`, it should not contain consecutive `*`",
169                pattern
170            ));
171        }
172        pattern = pattern.replace(".", "\\.");
173        pattern = pattern.replace("*", "([^.]*)");
174    }
175
176    let final_pattern = format!("^{}$", pattern);
177
178    Regex::new(&final_pattern).with_error_context(|| {
179        format!(
180            "Failed to compile regular expression `{}` for `{}` match type",
181            final_pattern, match_type
182        )
183    })
184}
185
186#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
187struct MetricMappingConfig {
188    // The metric name to extract groups from with the Wildcard or Regex match logic.
189    #[serde(rename = "match")]
190    metric_match: String,
191
192    // The type of match to apply to the `metric_match`. Either wildcard or regex.
193    #[serde(default)]
194    match_type: String,
195
196    // The new metric name to send to Datadog with the tags defined in the same group.
197    name: String,
198
199    // Map with the tag key and tag values collected from the `match_type` to inline.
200    #[serde(default)]
201    tags: HashMap<String, String>,
202}
203
204struct MappingProfile {
205    prefix: String,
206    mappings: Vec<MetricMapping>,
207}
208
209struct MetricMapping {
210    name: String,
211    tags: HashMap<String, String>,
212    regex: Regex,
213}
214
215struct MetricMapper {
216    profiles: Vec<MappingProfile>,
217    context_resolver: ContextResolver,
218}
219
220impl MetricMapper {
221    fn try_map(&mut self, context: &Context) -> Option<Context> {
222        let metric_name = context.name();
223        let tags = context.tags();
224        let origin_tags = context.origin_tags();
225
226        for profile in &self.profiles {
227            if !metric_name.starts_with(&profile.prefix) && profile.prefix != "*" {
228                continue;
229            }
230
231            for mapping in &profile.mappings {
232                if let Some(captures) = mapping.regex.captures(metric_name) {
233                    let mut name = String::new();
234                    captures.expand(&mapping.name, &mut name);
235                    let mut new_tags: Vec<String> = tags.into_iter().map(|tag| tag.as_str().to_owned()).collect();
236                    for (tag_key, tag_value_expr) in &mapping.tags {
237                        let mut expanded_value = String::new();
238                        captures.expand(tag_value_expr, &mut expanded_value);
239                        new_tags.push(format!("{}:{}", tag_key, expanded_value));
240                    }
241                    return self
242                        .context_resolver
243                        .resolve_with_origin_tags(&name, new_tags, origin_tags.clone());
244                }
245            }
246        }
247        None
248    }
249}
250
251impl DogStatsDMapperConfiguration {
252    /// Creates a new `DogstatsDMapperConfiguration` from the given configuration.
253    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
254        Ok(config.as_typed()?)
255    }
256}
257
258#[async_trait]
259impl SynchronousTransformBuilder for DogStatsDMapperConfiguration {
260    async fn build(&self, context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
261        let metric_mapper = self
262            .dogstatsd_mapper_profiles
263            .build(context, self.context_string_interner_bytes)?;
264        Ok(Box::new(DogStatsDMapper { metric_mapper }))
265    }
266}
267
268impl MemoryBounds for DogStatsDMapperConfiguration {
269    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
270        builder
271            .minimum()
272            // Capture the size of the heap allocation when the component is built.
273            .with_single_value::<DogStatsDMapper>("component struct")
274            // We also allocate the backing storage for the string interner up front, which is used by our context
275            // resolver.
276            .with_fixed_amount("string interner", self.context_string_interner_bytes.as_u64() as usize);
277    }
278}
279
280pub struct DogStatsDMapper {
281    metric_mapper: MetricMapper,
282}
283
284impl SynchronousTransform for DogStatsDMapper {
285    fn transform_buffer(&mut self, event_buffer: &mut EventsBuffer) {
286        for event in event_buffer {
287            if let Some(metric) = event.try_as_metric_mut() {
288                if let Some(new_context) = self.metric_mapper.try_map(metric.context()) {
289                    *metric.context_mut() = new_context;
290                }
291            }
292        }
293    }
294}
295
#[cfg(test)]
mod tests {
    // NOTE(review): tests that successfully build a mapper run under `#[tokio::test]`, presumably because
    // constructing the context resolver needs a Tokio runtime -- confirm before converting any of them. The
    // error-path tests fail during deserialization or validation, which happens before the resolver is
    // built, and so use plain `#[test]`.

    use bytesize::ByteSize;
    use saluki_context::Context;
    use saluki_core::{components::ComponentContext, data_model::event::metric::Metric, topology::ComponentId};
    use saluki_error::GenericError;
    use serde_json::{json, Value};

    use super::{MapperProfileConfigs, MetricMapper};

    /// Creates a counter metric with the given name and tags.
    fn counter_metric(name: &'static str, tags: &[&'static str]) -> Metric {
        let context = Context::from_static_parts(name, tags);
        Metric::counter(context, 1.0)
    }

    /// Deserializes `json_data` as a list of mapping profiles and builds a mapper from it.
    fn mapper(json_data: Value) -> Result<MetricMapper, GenericError> {
        let context = ComponentContext::transform(ComponentId::try_from("test_mapper").unwrap());
        let mpc: MapperProfileConfigs = serde_json::from_value(json_data)?;
        let context_string_interner_bytes = ByteSize::kib(64);
        mpc.build(context, context_string_interner_bytes)
    }

    /// Asserts that `context` has exactly the tags in `expected_tags`, in any order.
    fn assert_tags(context: &Context, expected_tags: &[&str]) {
        for tag in expected_tags {
            assert!(context.tags().has_tag(tag), "missing tag: {}", tag);
        }
        assert_eq!(context.tags().len(), expected_tags.len(), "unexpected number of tags");
    }

    #[tokio::test]
    async fn test_mapper_wildcard_simple() {
        let json_data = json!([{
          "name": "test",
          "prefix": "test.",
          "mappings": [
            {
              "match": "test.job.duration.*.*",
              "name": "test.job.duration",
              "tags": {
                "job_type": "$1",
                "job_name": "$2"
              }
            },
            {
              "match": "test.job.size.*.*",
              "name": "test.job.size",
              "tags": {
                "foo": "$1",
                "bar": "$2"
              }
            }
          ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
        let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job.duration");
        assert_tags(&context, &["job_type:my_job_type", "job_name:my_job_name"]);

        let metric = counter_metric("test.job.size.my_job_type.my_job_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job.size");
        assert_tags(&context, &["foo:my_job_type", "bar:my_job_name"]);

        let metric = counter_metric("test.job.size.not_match", &[]);
        assert!(mapper.try_map(metric.context()).is_none(), "should not have remapped");
    }

    #[tokio::test]
    async fn test_partial_match() {
        let json_data = json!([{
          "name": "test",
          "prefix": "test.",
          "mappings": [
            {
              "match": "test.job.duration.*.*",
              "name": "test.job.duration",
              "tags": {
                "job_type": "$1"
              }
            },
            {
              "match": "test.task.duration.*.*",
              "name": "test.task.duration",
            }
          ]
        }]);
        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
        let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job.duration");
        assert!(context.tags().has_tag("job_type:my_job_type"));

        let metric = counter_metric("test.task.duration.my_job_type.my_job_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.task.duration");
    }

    #[tokio::test]
    async fn test_use_regex_expansion_alternative_syntax() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.job.duration.*.*",
                    "name": "test.job.duration",
                    "tags": {
                        "job_type": "${1}_x",
                        "job_name": "${2}_y"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job.duration");
        assert_tags(&context, &["job_type:my_job_type_x", "job_name:my_job_name_y"]);
    }

    #[tokio::test]
    async fn test_expand_name() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.job.duration.*.*",
                    "name": "test.hello.$2.$1",
                    "tags": {
                        "job_type": "$1",
                        "job_name": "$2"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.hello.my_job_name.my_job_type");
        assert_tags(&context, &["job_type:my_job_type", "job_name:my_job_name"]);
    }

    #[tokio::test]
    async fn test_match_before_underscore() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.*_start",
                    "name": "test.start",
                    "tags": {
                        "job": "$1"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("test.my_job_start", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.start");
        assert!(context.tags().has_tag("job:my_job"));
    }

    #[tokio::test]
    async fn test_no_tags() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.my-worker.start",
                    "name": "test.worker.start"
                },
                {
                    "match": "test.my-worker.stop.*",
                    "name": "test.worker.stop"
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("test.my-worker.start", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.worker.start");
        assert!(context.tags().is_empty(), "Expected no tags");

        let metric = counter_metric("test.my-worker.stop.worker-name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.worker.stop");
        assert!(context.tags().is_empty(), "Expected no tags");
    }

    #[tokio::test]
    async fn test_all_allowed_characters() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.abcdefghijklmnopqrstuvwxyz_ABCDEFGHIJKLMNOPQRSTUVWXYZ-01234567.*",
                    "name": "test.alphabet"
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric(
            "test.abcdefghijklmnopqrstuvwxyz_ABCDEFGHIJKLMNOPQRSTUVWXYZ-01234567.123",
            &[],
        );
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.alphabet");
        assert!(context.tags().is_empty(), "Expected no tags");
    }

    #[tokio::test]
    async fn test_regex_match_type() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test\\.job\\.duration\\.(.*)",
                    "match_type": "regex",
                    "name": "test.job.duration",
                    "tags": {
                        "job_name": "$1"
                    }
                },
                {
                    "match": "test\\.task\\.duration\\.(.*)",
                    "match_type": "regex",
                    "name": "test.task.duration",
                    "tags": {
                        "task_name": "$1"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
        let metric = counter_metric("test.job.duration.my.funky.job$name-abc/123", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job.duration");
        assert!(context.tags().has_tag("job_name:my.funky.job$name-abc/123"));

        let metric = counter_metric("test.task.duration.MY_task_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.task.duration");
        assert!(context.tags().has_tag("task_name:MY_task_name"));
    }

    #[tokio::test]
    async fn test_complex_regex_match_type() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test\\.job\\.([a-z][0-9]-\\w+)\\.(.*)",
                    "match_type": "regex",
                    "name": "test.job",
                    "tags": {
                        "job_type": "$1",
                        "job_name": "$2"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("test.job.a5-foo.bar", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job");
        assert_tags(&context, &["job_type:a5-foo", "job_name:bar"]);

        let metric = counter_metric("test.job.foo.bar-not-match", &[]);
        assert!(mapper.try_map(metric.context()).is_none(), "should not have remapped");
    }

    #[tokio::test]
    async fn test_profile_and_prefix() {
        let json_data = json!([{
            "name": "test",
            "prefix": "foo.",
            "mappings": [
                {
                    "match": "foo.duration.*",
                    "name": "foo.duration",
                    "tags": {
                        "name": "$1"
                    }
                }
            ]
        },
        {
            "name": "test",
            "prefix": "bar.",
            "mappings": [
                {
                    "match": "bar.count.*",
                    "name": "bar.count",
                    "tags": {
                        "name": "$1"
                    }
                },
                {
                    "match": "foo.duration2.*",
                    "name": "foo.duration2",
                    "tags": {
                        "name": "$1"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("foo.duration.foo_name1", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "foo.duration");
        assert!(context.tags().has_tag("name:foo_name1"));

        let metric = counter_metric("foo.duration2.foo_name1", &[]);
        assert!(
            mapper.try_map(metric.context()).is_none(),
            "should not have remapped due to wrong group"
        );

        let metric = counter_metric("bar.count.bar_name1", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "bar.count");
        assert!(context.tags().has_tag("name:bar_name1"));

        let metric = counter_metric("z.not.mapped", &[]);
        assert!(mapper.try_map(metric.context()).is_none(), "should not have remapped");
    }

    #[tokio::test]
    async fn test_wildcard_prefix() {
        let json_data = json!([{
            "name": "test",
            "prefix": "*",
            "mappings": [
                {
                    "match": "foo.duration.*",
                    "name": "foo.duration",
                    "tags": {
                        "name": "$1"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("foo.duration.foo_name1", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "foo.duration");
        assert!(context.tags().has_tag("name:foo_name1"));
    }

    #[tokio::test]
    async fn test_wildcard_prefix_order() {
        let json_data = json!([{
            "name": "test",
            "prefix": "*",
            "mappings": [
                {
                    "match": "foo.duration.*",
                    "name": "foo.duration",
                    "tags": {
                        "name1": "$1"
                    }
                }
            ]
        },
        {
            "name": "test",
            "prefix": "*",
            "mappings": [
                {
                    "match": "foo.duration.*",
                    "name": "foo.duration",
                    "tags": {
                        "name2": "$1"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
        let metric = counter_metric("foo.duration.foo_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "foo.duration");
        assert!(context.tags().has_tag("name1:foo_name"));
        assert!(
            !context.tags().has_tag("name2:foo_name"),
            "Only the first matching profile should apply"
        );
    }

    #[tokio::test]
    async fn test_multiple_profiles_order() {
        let json_data = json!([{
            "name": "test",
            "prefix": "foo.",
            "mappings": [
                {
                    "match": "foo.*.duration.*",
                    "name": "foo.bar1.duration",
                    "tags": {
                        "bar": "$1",
                        "foo": "$2"
                    }
                }
            ]
        },
        {
            "name": "test",
            "prefix": "foo.bar.",
            "mappings": [
                {
                    "match": "foo.bar.duration.*",
                    "name": "foo.bar2.duration",
                    "tags": {
                        "foo_bar": "$1"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("foo.bar.duration.foo_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "foo.bar1.duration");
        assert_tags(&context, &["bar:bar", "foo:foo_name"]);
        assert!(
            !context.tags().has_tag("foo_bar:foo_name"),
            "Only the first matching profile should apply"
        );
    }

    #[tokio::test]
    async fn test_different_regex_expansion_syntax() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.user.(\\w+).action.(\\w+)",
                    "match_type": "regex",
                    "name": "test.user.action",
                    "tags": {
                        "user": "$1",
                        "action": "$2"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
        let metric = counter_metric("test.user.john_doe.action.login", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.user.action");
        assert_tags(&context, &["user:john_doe", "action:login"]);
    }

    #[tokio::test]
    async fn test_retain_existing_tags() {
        let json_data = json!([{
          "name": "test",
          "prefix": "test.",
          "mappings": [
            {
              "match": "test.job.duration.*.*",
              "name": "test.job.duration.$2",
              "tags": {
                "job_type": "$1",
                "job_name": "$2"
              }
            },
          ]
        }]);
        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
        let metric = counter_metric("test.job.duration.abc.def", &["foo:bar", "baz"]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job.duration.def");
        assert!(context.tags().has_tag("foo:bar"));
        assert!(context.tags().has_tag("baz"));
    }

    #[test]
    fn test_empty_name() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.job.duration.*.*",
                    "name": "",
                    "tags": {
                        "job_type": "$1"
                    }
                }
            ]
        }]);
        assert!(mapper(json_data).is_err())
    }

    #[test]
    fn test_missing_name() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.job.duration.*.*",
                    "tags": {
                        "job_type": "$1",
                        "job_name": "$2"
                    }
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    #[test]
    fn test_empty_match() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "", // Empty match
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    #[test]
    fn test_invalid_match_regex_brackets() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.[]duration.*.*", // Invalid regex
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    #[test]
    fn test_invalid_match_regex_caret() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "^test.invalid.duration.*.*", // Invalid regex
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    #[test]
    fn test_consecutive_wildcards() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.invalid.duration.**", // Consecutive *
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    #[test]
    fn test_invalid_match_type() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.invalid.duration",
                    "match_type": "invalid", // Invalid match_type
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    // The mappings in the four profile name/prefix tests below are deliberately valid, so that the only
    // possible cause of failure is the profile field under test.

    #[test]
    fn test_missing_profile_name() {
        let json_data = json!([{
            // "name" is missing here
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.valid.duration",
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    #[test]
    fn test_empty_profile_name() {
        let json_data = json!([{
            "name": "", // Empty profile name
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.valid.duration",
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    #[test]
    fn test_missing_profile_prefix() {
        let json_data = json!([{
            "name": "test",
            // "prefix" is missing here
            "mappings": [
                {
                    "match": "test.valid.duration",
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    #[test]
    fn test_empty_profile_prefix() {
        let json_data = json!([{
            "name": "test",
            "prefix": "", // Empty prefix
            "mappings": [
                {
                    "match": "test.valid.duration",
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }
}
932
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DogStatsDMapperConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Runs the shared configuration smoke tests against `DogStatsDMapperConfiguration`, checking that it
    /// deserializes from the configuration handed to the callback.
    #[tokio::test]
    async fn smoke_test() {
        let empty_object = json!({});

        run_config_smoke_tests(structs::DOGSTATSD_MAPPER_CONFIGURATION, &[], empty_object, |cfg| {
            let typed = cfg.as_typed::<DogStatsDMapperConfiguration>();
            typed.expect("DogStatsDMapperConfiguration should deserialize")
        })
        .await
    }
}