1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::str::FromStr;
4use std::sync::LazyLock;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use bytesize::ByteSize;
9use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
10use regex::Regex;
11use saluki_config::GenericConfiguration;
12use saluki_context::{Context, ContextResolver, ContextResolverBuilder};
13use saluki_core::{
14 components::{
15 transforms::{SynchronousTransform, SynchronousTransformBuilder},
16 ComponentContext,
17 },
18 topology::EventsBuffer,
19};
20use saluki_error::{generic_error, ErrorContext, GenericError};
21use serde::{Deserialize, Serialize};
22use serde_with::{serde_as, DisplayFromStr, PickFirst};
23
/// Match type name for wildcard patterns; it is also used when a mapping leaves `match_type` empty.
const MATCH_TYPE_WILDCARD: &str = "wildcard";
/// Match type name for patterns that are already regular expressions.
const MATCH_TYPE_REGEX: &str = "regex";

/// Validates wildcard match patterns: only ASCII letters, digits, `-`, `_`, `*` and `.` are allowed, and the
/// pattern must not be empty.
static ALLOWED_WILDCARD_MATCH_PATTERN: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9\-_*.]+$").expect("Invalid regex in ALLOWED_WILDCARD_MATCH_PATTERN"));
29
30const fn default_context_string_interner_size() -> ByteSize {
31 ByteSize::kib(64)
32}
/// Configuration for the DogStatsD mapper transform.
///
/// Deserialized from the generic configuration; building it yields a [`DogStatsDMapper`].
#[serde_as]
#[derive(Deserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct DogStatsDMapperConfiguration {
    /// Capacity, in bytes, of the string interner backing the mapper's context resolver.
    ///
    /// Read from the `dogstatsd_mapper_string_interner_size` key; defaults to 64 KiB.
    #[serde(
        rename = "dogstatsd_mapper_string_interner_size",
        default = "default_context_string_interner_size"
    )]
    context_string_interner_bytes: ByteSize,

    /// Mapping profiles to apply.
    ///
    /// Deserialization first tries the value as a string (parsed as JSON through `FromStr`) and then falls back
    /// to the native structured form. Defaults to no profiles.
    #[serde_as(as = "PickFirst<(DisplayFromStr, _)>")]
    #[serde(default)]
    dogstatsd_mapper_profiles: MapperProfileConfigs,
}
54
/// User-facing configuration of a single mapping profile.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
struct MappingProfileConfig {
    /// Profile name; must not be empty. It is only used in validation error messages.
    name: String,
    /// Metric name prefix the profile applies to; must not be empty. The literal `*` applies the profile to
    /// every metric.
    prefix: String,
    /// Mappings of the profile, evaluated in order.
    mappings: Vec<MetricMappingConfig>,
}
/// Ordered list of mapping profile configurations.
///
/// Wrapped in a newtype so it can be parsed from a JSON string and compiled into a `MetricMapper`.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
struct MapperProfileConfigs(pub Vec<MappingProfileConfig>);
63
64impl FromStr for MapperProfileConfigs {
65 type Err = serde_json::Error;
66
67 fn from_str(s: &str) -> Result<Self, Self::Err> {
68 let profiles: Vec<MappingProfileConfig> = serde_json::from_str(s)?;
69 Ok(MapperProfileConfigs(profiles))
70 }
71}
72
#[cfg(test)]
impl std::fmt::Display for MapperProfileConfigs {
    /// Renders the profiles as a JSON array; a serialization failure yields an empty string.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered = serde_json::to_string(&self.0).unwrap_or_default();
        f.write_str(&rendered)
    }
}
79
80impl MapperProfileConfigs {
81 fn build(
82 &self, context: ComponentContext, context_string_interner_bytes: ByteSize,
83 ) -> Result<MetricMapper, GenericError> {
84 let mut profiles = Vec::with_capacity(self.0.len());
85 for (i, config_profile) in self.0.iter().enumerate() {
86 if config_profile.name.is_empty() {
87 return Err(generic_error!("missing profile name"));
88 }
89 if config_profile.prefix.is_empty() {
90 return Err(generic_error!("missing prefix for profile: {}", config_profile.name));
91 }
92
93 let mut profile = MappingProfile {
94 prefix: config_profile.prefix.clone(),
95 mappings: Vec::with_capacity(config_profile.mappings.len()),
96 };
97
98 for mapping in &config_profile.mappings {
99 let match_type = match mapping.match_type.as_str() {
100 "" => MATCH_TYPE_WILDCARD,
102 MATCH_TYPE_WILDCARD => MATCH_TYPE_WILDCARD,
103 MATCH_TYPE_REGEX => MATCH_TYPE_REGEX,
104 unknown => {
105 return Err(generic_error!(
106 "profile: {}, mapping num {}: invalid match type `{}`, expected `wildcard` or `regex`",
107 config_profile.name,
108 i,
109 unknown,
110 ))
111 }
112 };
113 if mapping.name.is_empty() {
114 return Err(generic_error!(
115 "profile: {}, mapping num {}: name is required",
116 config_profile.name,
117 i
118 ));
119 }
120 if mapping.metric_match.is_empty() {
121 return Err(generic_error!(
122 "profile: {}, mapping num {}: match is required",
123 config_profile.name,
124 i
125 ));
126 }
127 let regex = build_regex(&mapping.metric_match, match_type)?;
128 profile.mappings.push(MetricMapping {
129 name: mapping.name.clone(),
130 tags: mapping.tags.clone(),
131 regex,
132 });
133 }
134 profiles.push(profile);
135 }
136
137 let context_string_interner_size = NonZeroUsize::new(context_string_interner_bytes.as_u64() as usize)
138 .ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0"))
139 .unwrap();
140
141 let context_resolver =
142 ContextResolverBuilder::from_name(format!("{}/dsd_mapper/primary", context.component_id()))
143 .expect("resolver name is not empty")
144 .with_interner_capacity_bytes(context_string_interner_size)
145 .with_idle_context_expiration(Duration::from_secs(30))
146 .build();
147
148 Ok(MetricMapper {
149 context_resolver,
150 profiles,
151 })
152 }
153}
154
155fn build_regex(match_re: &str, match_type: &str) -> Result<Regex, GenericError> {
156 let mut pattern = match_re.to_owned();
157 if match_type == MATCH_TYPE_WILDCARD {
158 if !ALLOWED_WILDCARD_MATCH_PATTERN.is_match(&pattern) {
160 return Err(generic_error!(
161 "invalid wildcard match pattern `{}`, it does not match allowed match regex `{}`",
162 pattern,
163 ALLOWED_WILDCARD_MATCH_PATTERN.as_str()
164 ));
165 }
166 if pattern.contains("**") {
167 return Err(generic_error!(
168 "invalid wildcard match pattern `{}`, it should not contain consecutive `*`",
169 pattern
170 ));
171 }
172 pattern = pattern.replace(".", "\\.");
173 pattern = pattern.replace("*", "([^.]*)");
174 }
175
176 let final_pattern = format!("^{}$", pattern);
177
178 Regex::new(&final_pattern).with_error_context(|| {
179 format!(
180 "Failed to compile regular expression `{}` for `{}` match type",
181 final_pattern, match_type
182 )
183 })
184}
185
/// User-facing configuration of a single metric mapping.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
struct MetricMappingConfig {
    /// Pattern the metric name must match; configured under the `match` key. Must not be empty.
    #[serde(rename = "match")]
    metric_match: String,

    /// How `match` is interpreted: `wildcard` or `regex`. Empty (the default) means `wildcard`.
    #[serde(default)]
    match_type: String,

    /// Name of the mapped metric; must not be empty. May reference capture groups (e.g. `$1`, `${1}`).
    name: String,

    /// Tags to add to the mapped metric, as tag key to value expression. Value expressions may reference capture
    /// groups. Defaults to no tags.
    #[serde(default)]
    tags: HashMap<String, String>,
}
203
/// A validated mapping profile, ready to be matched against metric names.
struct MappingProfile {
    /// Prefix a metric name must start with for the profile to apply, or `*` to apply to every metric.
    prefix: String,
    /// Compiled mappings, tried in order.
    mappings: Vec<MetricMapping>,
}
208
/// A compiled metric mapping.
struct MetricMapping {
    /// Template for the mapped metric name; capture group references are expanded on match.
    name: String,
    /// Tag key to value template; each value is expanded with the match's capture groups.
    tags: HashMap<String, String>,
    /// Anchored regular expression the metric name is matched against.
    regex: Regex,
}
214
/// Remaps metric contexts according to a list of mapping profiles.
struct MetricMapper {
    /// Profiles, consulted in order.
    profiles: Vec<MappingProfile>,
    /// Resolver used to create the context of a remapped metric.
    context_resolver: ContextResolver,
}
219
220impl MetricMapper {
221 fn try_map(&mut self, context: &Context) -> Option<Context> {
222 let metric_name = context.name();
223 let tags = context.tags();
224 let origin_tags = context.origin_tags();
225
226 for profile in &self.profiles {
227 if !metric_name.starts_with(&profile.prefix) && profile.prefix != "*" {
228 continue;
229 }
230
231 for mapping in &profile.mappings {
232 if let Some(captures) = mapping.regex.captures(metric_name) {
233 let mut name = String::new();
234 captures.expand(&mapping.name, &mut name);
235 let mut new_tags: Vec<String> = tags.into_iter().map(|tag| tag.as_str().to_owned()).collect();
236 for (tag_key, tag_value_expr) in &mapping.tags {
237 let mut expanded_value = String::new();
238 captures.expand(tag_value_expr, &mut expanded_value);
239 new_tags.push(format!("{}:{}", tag_key, expanded_value));
240 }
241 return self
242 .context_resolver
243 .resolve_with_origin_tags(&name, new_tags, origin_tags.clone());
244 }
245 }
246 }
247 None
248 }
249}
250
251impl DogStatsDMapperConfiguration {
252 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
254 Ok(config.as_typed()?)
255 }
256}
257
258#[async_trait]
259impl SynchronousTransformBuilder for DogStatsDMapperConfiguration {
260 async fn build(&self, context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
261 let metric_mapper = self
262 .dogstatsd_mapper_profiles
263 .build(context, self.context_string_interner_bytes)?;
264 Ok(Box::new(DogStatsDMapper { metric_mapper }))
265 }
266}
267
268impl MemoryBounds for DogStatsDMapperConfiguration {
269 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
270 builder
271 .minimum()
272 .with_single_value::<DogStatsDMapper>("component struct")
274 .with_fixed_amount("string interner", self.context_string_interner_bytes.as_u64() as usize);
277 }
278}
279
/// Synchronous transform that remaps the context of metrics matching the configured mapping profiles.
pub struct DogStatsDMapper {
    /// Compiled profiles plus the resolver used to build remapped contexts.
    metric_mapper: MetricMapper,
}
283
284impl SynchronousTransform for DogStatsDMapper {
285 fn transform_buffer(&mut self, event_buffer: &mut EventsBuffer) {
286 for event in event_buffer {
287 if let Some(metric) = event.try_as_metric_mut() {
288 if let Some(new_context) = self.metric_mapper.try_map(metric.context()) {
289 *metric.context_mut() = new_context;
290 }
291 }
292 }
293 }
294}
295
#[cfg(test)]
mod tests {

    use bytesize::ByteSize;
    use saluki_context::Context;
    use saluki_core::{components::ComponentContext, data_model::event::metric::Metric, topology::ComponentId};
    use saluki_error::GenericError;
    use serde_json::{json, Value};

    use super::{MapperProfileConfigs, MetricMapper};

    /// Builds a counter metric with the given name and tags.
    fn counter_metric(name: &'static str, tags: &[&'static str]) -> Metric {
        let context = Context::from_static_parts(name, tags);
        Metric::counter(context, 1.0)
    }

    /// Deserializes `json_data` as a list of mapping profiles and builds a mapper with a 64 KiB interner.
    fn mapper(json_data: Value) -> Result<MetricMapper, GenericError> {
        let context = ComponentContext::transform(ComponentId::try_from("test_mapper").unwrap());
        let mpc: MapperProfileConfigs = serde_json::from_value(json_data)?;
        let context_string_interner_bytes = ByteSize::kib(64);
        mpc.build(context, context_string_interner_bytes)
    }

    /// Asserts that `context` carries exactly the expected tags (in any order).
    fn assert_tags(context: &Context, expected_tags: &[&str]) {
        for tag in expected_tags {
            assert!(context.tags().has_tag(tag), "missing tag: {}", tag);
        }
        assert_eq!(context.tags().len(), expected_tags.len(), "unexpected number of tags");
    }

    // Wildcard captures are exposed as `$1`, `$2`, ... in tag values.
    #[tokio::test]
    async fn test_mapper_wildcard_simple() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.job.duration.*.*",
                    "name": "test.job.duration",
                    "tags": {
                        "job_type": "$1",
                        "job_name": "$2"
                    }
                },
                {
                    "match": "test.job.size.*.*",
                    "name": "test.job.size",
                    "tags": {
                        "foo": "$1",
                        "bar": "$2"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
        let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job.duration");
        assert_tags(&context, &["job_type:my_job_type", "job_name:my_job_name"]);

        let metric = counter_metric("test.job.size.my_job_type.my_job_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job.size");
        assert_tags(&context, &["foo:my_job_type", "bar:my_job_name"]);

        let metric = counter_metric("test.job.size.not_match", &[]);
        assert!(mapper.try_map(metric.context()).is_none(), "should not have remapped");
    }

    // A mapping does not have to reference every capture group, or define tags at all.
    #[tokio::test]
    async fn test_partial_match() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.job.duration.*.*",
                    "name": "test.job.duration",
                    "tags": {
                        "job_type": "$1"
                    }
                },
                {
                    "match": "test.task.duration.*.*",
                    "name": "test.task.duration"
                }
            ]
        }]);
        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
        let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job.duration");
        assert!(context.tags().has_tag("job_type:my_job_type"));

        let metric = counter_metric("test.task.duration.my_job_type.my_job_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.task.duration");
    }

    // The `${N}` form allows a capture reference to be directly followed by literal text.
    #[tokio::test]
    async fn test_use_regex_expansion_alternative_syntax() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.job.duration.*.*",
                    "name": "test.job.duration",
                    "tags": {
                        "job_type": "${1}_x",
                        "job_name": "${2}_y"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job.duration");
        assert_tags(&context, &["job_type:my_job_type_x", "job_name:my_job_name_y"]);
    }

    // Capture groups can also be referenced from the mapped metric name.
    #[tokio::test]
    async fn test_expand_name() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.job.duration.*.*",
                    "name": "test.hello.$2.$1",
                    "tags": {
                        "job_type": "$1",
                        "job_name": "$2"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.hello.my_job_name.my_job_type");
        assert_tags(&context, &["job_type:my_job_type", "job_name:my_job_name"]);
    }

    // A wildcard may be followed by literal text within the same dot-separated segment.
    #[tokio::test]
    async fn test_match_before_underscore() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.*_start",
                    "name": "test.start",
                    "tags": {
                        "job": "$1"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("test.my_job_start", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.start");
        assert!(context.tags().has_tag("job:my_job"));
    }

    // Mappings without a `tags` section only rename the metric.
    #[tokio::test]
    async fn test_no_tags() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.my-worker.start",
                    "name": "test.worker.start"
                },
                {
                    "match": "test.my-worker.stop.*",
                    "name": "test.worker.stop"
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("test.my-worker.start", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.worker.start");
        assert!(context.tags().is_empty(), "Expected no tags");

        let metric = counter_metric("test.my-worker.stop.worker-name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.worker.stop");
        assert!(context.tags().is_empty(), "Expected no tags");
    }

    // Letters, digits, `_` and `-` are all accepted in a wildcard pattern.
    #[tokio::test]
    async fn test_all_allowed_characters() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.abcdefghijklmnopqrstuvwxyz_ABCDEFGHIJKLMNOPQRSTUVWXYZ-01234567.*",
                    "name": "test.alphabet"
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric(
            "test.abcdefghijklmnopqrstuvwxyz_ABCDEFGHIJKLMNOPQRSTUVWXYZ-01234567.123",
            &[],
        );
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.alphabet");
        assert!(context.tags().is_empty(), "Expected no tags");
    }

    // With `match_type: regex`, the pattern is used as a regular expression and may capture across dots.
    #[tokio::test]
    async fn test_regex_match_type() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test\\.job\\.duration\\.(.*)",
                    "match_type": "regex",
                    "name": "test.job.duration",
                    "tags": {
                        "job_name": "$1"
                    }
                },
                {
                    "match": "test\\.task\\.duration\\.(.*)",
                    "match_type": "regex",
                    "name": "test.task.duration",
                    "tags": {
                        "task_name": "$1"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
        let metric = counter_metric("test.job.duration.my.funky.job$name-abc/123", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job.duration");
        assert!(context.tags().has_tag("job_name:my.funky.job$name-abc/123"));

        let metric = counter_metric("test.task.duration.MY_task_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.task.duration");
        assert!(context.tags().has_tag("task_name:MY_task_name"));
    }

    // Regex patterns can use character classes; names that do not satisfy them are not remapped.
    #[tokio::test]
    async fn test_complex_regex_match_type() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test\\.job\\.([a-z][0-9]-\\w+)\\.(.*)",
                    "match_type": "regex",
                    "name": "test.job",
                    "tags": {
                        "job_type": "$1",
                        "job_name": "$2"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("test.job.a5-foo.bar", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job");
        assert_tags(&context, &["job_type:a5-foo", "job_name:bar"]);

        let metric = counter_metric("test.job.foo.bar-not-match", &[]);
        assert!(mapper.try_map(metric.context()).is_none(), "should not have remapped");
    }

    // A mapping is only considered when the metric name starts with its profile's prefix.
    #[tokio::test]
    async fn test_profile_and_prefix() {
        let json_data = json!([{
            "name": "test",
            "prefix": "foo.",
            "mappings": [
                {
                    "match": "foo.duration.*",
                    "name": "foo.duration",
                    "tags": {
                        "name": "$1"
                    }
                }
            ]
        },
        {
            "name": "test",
            "prefix": "bar.",
            "mappings": [
                {
                    "match": "bar.count.*",
                    "name": "bar.count",
                    "tags": {
                        "name": "$1"
                    }
                },
                {
                    "match": "foo.duration2.*",
                    "name": "foo.duration2",
                    "tags": {
                        "name": "$1"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("foo.duration.foo_name1", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "foo.duration");
        assert!(context.tags().has_tag("name:foo_name1"));

        // The `foo.duration2.*` mapping lives in the `bar.` profile, so it is never tried for a `foo.` metric.
        let metric = counter_metric("foo.duration2.foo_name1", &[]);
        assert!(
            mapper.try_map(metric.context()).is_none(),
            "should not have remapped due to wrong group"
        );

        let metric = counter_metric("bar.count.bar_name1", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "bar.count");
        assert!(context.tags().has_tag("name:bar_name1"));

        let metric = counter_metric("z.not.mapped", &[]);
        assert!(mapper.try_map(metric.context()).is_none(), "should not have remapped");
    }

    // A `*` prefix applies the profile to every metric.
    #[tokio::test]
    async fn test_wildcard_prefix() {
        let json_data = json!([{
            "name": "test",
            "prefix": "*",
            "mappings": [
                {
                    "match": "foo.duration.*",
                    "name": "foo.duration",
                    "tags": {
                        "name": "$1"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("foo.duration.foo_name1", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "foo.duration");
        assert!(context.tags().has_tag("name:foo_name1"));
    }

    // When several `*` profiles match, only the first one is applied.
    #[tokio::test]
    async fn test_wildcard_prefix_order() {
        let json_data = json!([{
            "name": "test",
            "prefix": "*",
            "mappings": [
                {
                    "match": "foo.duration.*",
                    "name": "foo.duration",
                    "tags": {
                        "name1": "$1"
                    }
                }
            ]
        },
        {
            "name": "test",
            "prefix": "*",
            "mappings": [
                {
                    "match": "foo.duration.*",
                    "name": "foo.duration",
                    "tags": {
                        "name2": "$1"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
        let metric = counter_metric("foo.duration.foo_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "foo.duration");
        assert!(context.tags().has_tag("name1:foo_name"));
        assert!(
            !context.tags().has_tag("name2:foo_name"),
            "Only the first matching profile should apply"
        );
    }

    // Profile order wins over prefix specificity: the first matching profile is applied.
    #[tokio::test]
    async fn test_multiple_profiles_order() {
        let json_data = json!([{
            "name": "test",
            "prefix": "foo.",
            "mappings": [
                {
                    "match": "foo.*.duration.*",
                    "name": "foo.bar1.duration",
                    "tags": {
                        "bar": "$1",
                        "foo": "$2"
                    }
                }
            ]
        },
        {
            "name": "test",
            "prefix": "foo.bar.",
            "mappings": [
                {
                    "match": "foo.bar.duration.*",
                    "name": "foo.bar2.duration",
                    "tags": {
                        "foo_bar": "$1"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");

        let metric = counter_metric("foo.bar.duration.foo_name", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "foo.bar1.duration");
        assert_tags(&context, &["bar:bar", "foo:foo_name"]);
        assert!(
            !context.tags().has_tag("foo_bar:foo_name"),
            "Only the first matching profile should apply"
        );
    }

    // Explicit regex capture groups are referenced the same way as wildcard captures.
    #[tokio::test]
    async fn test_different_regex_expansion_syntax() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.user.(\\w+).action.(\\w+)",
                    "match_type": "regex",
                    "name": "test.user.action",
                    "tags": {
                        "user": "$1",
                        "action": "$2"
                    }
                }
            ]
        }]);

        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
        let metric = counter_metric("test.user.john_doe.action.login", &[]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.user.action");
        assert_tags(&context, &["user:john_doe", "action:login"]);
    }

    // Tags already present on the metric are kept on the remapped context.
    #[tokio::test]
    async fn test_retain_existing_tags() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.job.duration.*.*",
                    "name": "test.job.duration.$2",
                    "tags": {
                        "job_type": "$1",
                        "job_name": "$2"
                    }
                }
            ]
        }]);
        let mut mapper = mapper(json_data).expect("should have parsed mapping config");
        let metric = counter_metric("test.job.duration.abc.def", &["foo:bar", "baz"]);
        let context = mapper.try_map(metric.context()).expect("should have remapped");
        assert_eq!(context.name(), "test.job.duration.def");
        assert!(context.tags().has_tag("foo:bar"));
        assert!(context.tags().has_tag("baz"));
    }

    // A mapping with an empty `name` is rejected.
    #[test]
    fn test_empty_name() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.job.duration.*.*",
                    "name": "",
                    "tags": {
                        "job_type": "$1"
                    }
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    // A mapping without a `name` key is rejected.
    #[test]
    fn test_missing_name() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.job.duration.*.*",
                    "tags": {
                        "job_type": "$1",
                        "job_name": "$2"
                    }
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    // Brackets are not part of the allowed wildcard character set.
    #[test]
    fn test_invalid_match_regex_brackets() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.[]duration.*.*",
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    // A caret is not part of the allowed wildcard character set.
    #[test]
    fn test_invalid_match_regex_caret() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "^test.invalid.duration.*.*",
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    // Consecutive `*` are rejected in wildcard patterns.
    #[test]
    fn test_consecutive_wildcards() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.invalid.duration.**",
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    // Only `wildcard` and `regex` are valid match types.
    #[test]
    fn test_invalid_match_type() {
        let json_data = json!([{
            "name": "test",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.invalid.duration",
                    "match_type": "invalid",
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    // The mapping is valid on its own, so the profile name is the only thing wrong with the configuration.
    #[test]
    fn test_missing_profile_name() {
        // Profile without a `name` key.
        let json_data = json!([{
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.invalid.duration",
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());

        // Profile with an empty `name`.
        let json_data = json!([{
            "name": "",
            "prefix": "test.",
            "mappings": [
                {
                    "match": "test.invalid.duration",
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }

    // The mapping is valid on its own, so the profile prefix is the only thing wrong with the configuration.
    #[test]
    fn test_missing_profile_prefix() {
        // Profile without a `prefix` key.
        let json_data = json!([{
            "name": "test",
            "mappings": [
                {
                    "match": "test.invalid.duration",
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());

        // Profile with an empty `prefix`.
        let json_data = json!([{
            "name": "test",
            "prefix": "",
            "mappings": [
                {
                    "match": "test.invalid.duration",
                    "name": "test.job.duration"
                }
            ]
        }]);
        assert!(mapper(json_data).is_err());
    }
}
932
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DogStatsDMapperConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Runs the shared configuration smoke tests for the mapper configuration, deserializing it as a
    /// `DogStatsDMapperConfiguration` each time the harness hands over a configuration.
    #[tokio::test]
    async fn smoke_test() {
        let registry_entry = structs::DOGSTATSD_MAPPER_CONFIGURATION;

        run_config_smoke_tests(registry_entry, &[], json!({}), |cfg| {
            cfg.as_typed::<DogStatsDMapperConfiguration>()
                .expect("DogStatsDMapperConfiguration should deserialize")
        })
        .await
    }
}