1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::str::FromStr;
4use std::sync::LazyLock;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use bytesize::ByteSize;
9use regex::Regex;
10use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
11use saluki_common::cache::{Cache, CacheBuilder};
12use saluki_config::GenericConfiguration;
13use saluki_context::tags::SharedTagSet;
14use saluki_context::tags::TagSet;
15use saluki_context::{Context, ContextResolver, ContextResolverBuilder};
16use saluki_core::{
17 components::{
18 transforms::{SynchronousTransform, SynchronousTransformBuilder},
19 ComponentContext,
20 },
21 topology::EventsBuffer,
22};
23use saluki_error::{generic_error, ErrorContext, GenericError};
24use serde::{Deserialize, Serialize};
25use serde_with::{serde_as, DisplayFromStr, PickFirst};
26use stringtheory::MetaString;
27
/// `match_type` value selecting full regular-expression matching.
const MATCH_TYPE_REGEX: &str = "regex";
/// `match_type` value selecting the restricted wildcard syntax (also used when no type is given).
const MATCH_TYPE_WILDCARD: &str = "wildcard";
30
31static ALLOWED_WILDCARD_MATCH_PATTERN: LazyLock<Regex> =
32 LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9\-_*.]+$").expect("Invalid regex in ALLOWED_WILDCARD_MATCH_PATTERN"));
33
34const fn default_context_string_interner_size() -> ByteSize {
35 ByteSize::kib(64)
36}
37
/// Serde default for the number of entries in the mapper's result cache.
const fn default_dogstatsd_mapper_cache_size() -> usize {
    const DEFAULT_CACHE_ENTRIES: usize = 1_000;
    DEFAULT_CACHE_ENTRIES
}
41#[serde_as]
43#[derive(Deserialize)]
44#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
45pub struct DogStatsDMapperConfiguration {
46 #[serde(
52 rename = "dogstatsd_mapper_string_interner_size",
53 default = "default_context_string_interner_size"
54 )]
55 context_string_interner_bytes: ByteSize,
56
57 #[serde(
65 rename = "dogstatsd_mapper_cache_size",
66 default = "default_dogstatsd_mapper_cache_size"
67 )]
68 cache_size: usize,
69
70 #[serde_as(as = "PickFirst<(DisplayFromStr, _)>")]
72 #[serde(default)]
73 dogstatsd_mapper_profiles: MapperProfileConfigs,
74}
75
76#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
77struct MappingProfileConfig {
78 name: String,
79 prefix: String,
80 mappings: Vec<MetricMappingConfig>,
81}
82#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
83struct MapperProfileConfigs(pub Vec<MappingProfileConfig>);
84
85impl FromStr for MapperProfileConfigs {
86 type Err = serde_json::Error;
87
88 fn from_str(s: &str) -> Result<Self, Self::Err> {
89 let profiles: Vec<MappingProfileConfig> = serde_json::from_str(s)?;
90 Ok(MapperProfileConfigs(profiles))
91 }
92}
93
#[cfg(test)]
impl std::fmt::Display for MapperProfileConfigs {
    /// Renders the profiles as JSON; a serialization failure yields an empty string.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered = serde_json::to_string(&self.0).unwrap_or_default();
        f.write_str(&rendered)
    }
}
100
101impl MapperProfileConfigs {
102 fn build(
103 &self, context: ComponentContext, context_string_interner_bytes: ByteSize, cache_size: usize,
104 ) -> Result<MetricMapper, GenericError> {
105 let mut profiles = Vec::with_capacity(self.0.len());
106 for (i, config_profile) in self.0.iter().enumerate() {
107 if config_profile.name.is_empty() {
108 return Err(generic_error!("missing profile name"));
109 }
110 if config_profile.prefix.is_empty() {
111 return Err(generic_error!("missing prefix for profile: {}", config_profile.name));
112 }
113
114 let mut profile = MappingProfile {
115 prefix: config_profile.prefix.clone(),
116 mappings: Vec::with_capacity(config_profile.mappings.len()),
117 };
118
119 for mapping in &config_profile.mappings {
120 let match_type = match mapping.match_type.as_str() {
121 "" => MATCH_TYPE_WILDCARD,
123 MATCH_TYPE_WILDCARD => MATCH_TYPE_WILDCARD,
124 MATCH_TYPE_REGEX => MATCH_TYPE_REGEX,
125 unknown => {
126 return Err(generic_error!(
127 "profile: {}, mapping num {}: invalid match type `{}`, expected `wildcard` or `regex`",
128 config_profile.name,
129 i,
130 unknown,
131 ))
132 }
133 };
134 if mapping.name.is_empty() {
135 return Err(generic_error!(
136 "profile: {}, mapping num {}: name is required",
137 config_profile.name,
138 i
139 ));
140 }
141 if mapping.metric_match.is_empty() {
142 return Err(generic_error!(
143 "profile: {}, mapping num {}: match is required",
144 config_profile.name,
145 i
146 ));
147 }
148 let regex = build_regex(&mapping.metric_match, match_type)?;
149 profile.mappings.push(MetricMapping {
150 name: mapping.name.clone(),
151 tags: mapping.tags.clone(),
152 regex,
153 });
154 }
155 profiles.push(profile);
156 }
157
158 let context_string_interner_size = NonZeroUsize::new(context_string_interner_bytes.as_u64() as usize)
159 .ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0"))
160 .unwrap();
161
162 let context_resolver =
163 ContextResolverBuilder::from_name(format!("{}/dsd_mapper/primary", context.component_id()))
164 .expect("resolver name is not empty")
165 .with_interner_capacity_bytes(context_string_interner_size)
166 .with_idle_context_expiration(Duration::from_secs(30))
167 .build();
168
169 let cache = match NonZeroUsize::new(cache_size) {
170 Some(capacity) => Some(
171 CacheBuilder::from_identifier(format!("{}/dsd_mapper/result_cache", context.component_id()))?
172 .with_capacity(capacity)
173 .build(),
174 ),
175 None => None,
176 };
177
178 Ok(MetricMapper {
179 context_resolver,
180 profiles,
181 cache,
182 })
183 }
184}
185
186fn build_regex(match_re: &str, match_type: &str) -> Result<Regex, GenericError> {
187 let mut pattern = match_re.to_owned();
188 if match_type == MATCH_TYPE_WILDCARD {
189 if !ALLOWED_WILDCARD_MATCH_PATTERN.is_match(&pattern) {
191 return Err(generic_error!(
192 "invalid wildcard match pattern `{}`, it does not match allowed match regex `{}`",
193 pattern,
194 ALLOWED_WILDCARD_MATCH_PATTERN.as_str()
195 ));
196 }
197 if pattern.contains("**") {
198 return Err(generic_error!(
199 "invalid wildcard match pattern `{}`, it should not contain consecutive `*`",
200 pattern
201 ));
202 }
203 pattern = pattern.replace(".", "\\.");
204 pattern = pattern.replace("*", "([^.]*)");
205 }
206
207 let final_pattern = format!("^{}$", pattern);
208
209 Regex::new(&final_pattern).with_error_context(|| {
210 format!(
211 "Failed to compile regular expression `{}` for `{}` match type",
212 final_pattern, match_type
213 )
214 })
215}
216
217#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
218struct MetricMappingConfig {
219 #[serde(rename = "match")]
221 metric_match: String,
222
223 #[serde(default)]
225 match_type: String,
226
227 name: String,
229
230 #[serde(default)]
232 tags: HashMap<String, String>,
233}
234
235struct MappingProfile {
236 prefix: String,
237 mappings: Vec<MetricMapping>,
238}
239
240struct MetricMapping {
241 name: String,
242 tags: HashMap<String, String>,
243 regex: Regex,
244}
245
246#[derive(Clone)]
247struct CachedMapResult {
248 name: MetaString,
249 extra_tags: SharedTagSet,
250}
251
252struct MetricMapper {
253 profiles: Vec<MappingProfile>,
254 context_resolver: ContextResolver,
255 cache: Option<Cache<MetaString, Option<CachedMapResult>>>,
256}
257
258impl MetricMapper {
259 fn try_map(&mut self, context: &Context) -> Option<Context> {
260 let metric_name = context.name();
265 let tags = context.tags();
266 let origin_tags = context.origin_tags();
267
268 if let Some(cache) = &self.cache {
270 if let Some(cached) = cache.get(metric_name) {
271 return match cached {
272 None => None,
273 Some(result) => {
274 let mut merged_tags = tags.clone();
275 merged_tags.merge_shared(&result.extra_tags);
276
277 self.context_resolver.resolve_with_origin_tags(
278 result.name.clone(),
279 merged_tags,
280 origin_tags.clone(),
281 )
282 }
283 };
284 }
285 }
286
287 let mut new_name = String::new();
289 let mut expanded_tag_value = String::new();
290
291 for profile in &self.profiles {
292 if !metric_name.starts_with(&profile.prefix) && profile.prefix != "*" {
293 continue;
294 }
295
296 for mapping in &profile.mappings {
297 if let Some(captures) = mapping.regex.captures(metric_name) {
298 new_name.clear();
299 captures.expand(&mapping.name, &mut new_name);
300
301 let mut extra_tags = TagSet::with_capacity(mapping.tags.len());
302 for (tag_key, tag_value_expr) in &mapping.tags {
303 expanded_tag_value.clear();
304 expanded_tag_value.push_str(tag_key);
305 expanded_tag_value.push(':');
306 captures.expand(tag_value_expr, &mut expanded_tag_value);
307
308 extra_tags.insert_tag(expanded_tag_value.as_str());
309 }
310
311 let extra_tags = extra_tags.into_shared();
313
314 let mut merged_tags = tags.clone();
315 merged_tags.merge_shared(&extra_tags);
316
317 let resolved = self.context_resolver.resolve_with_origin_tags(
318 new_name.as_str(),
319 merged_tags,
320 origin_tags.clone(),
321 )?;
322
323 if let Some(cache) = &self.cache {
324 cache.insert(
325 metric_name.clone(),
326 Some(CachedMapResult {
327 name: resolved.name().clone(),
328 extra_tags,
329 }),
330 );
331 }
332 return Some(resolved);
333 }
334 }
335 }
336
337 if let Some(cache) = &self.cache {
339 cache.insert(metric_name.clone(), None);
340 }
341 None
342 }
343
344 #[cfg(test)]
345 fn cache_len(&self) -> Option<usize> {
346 self.cache.as_ref().map(|c| c.len())
347 }
348}
349
350impl DogStatsDMapperConfiguration {
351 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
353 Ok(config.as_typed()?)
354 }
355}
356
357#[async_trait]
358impl SynchronousTransformBuilder for DogStatsDMapperConfiguration {
359 async fn build(&self, context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
360 let metric_mapper =
361 self.dogstatsd_mapper_profiles
362 .build(context, self.context_string_interner_bytes, self.cache_size)?;
363 Ok(Box::new(DogStatsDMapper { metric_mapper }))
364 }
365}
366
367impl MemoryBounds for DogStatsDMapperConfiguration {
368 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
369 let mut min = builder.minimum();
370 min
371 .with_single_value::<DogStatsDMapper>("component struct")
373 .with_fixed_amount("string interner", self.context_string_interner_bytes.as_u64() as usize);
376
377 if self.cache_size > 0 {
379 min.with_array::<(MetaString, Option<CachedMapResult>)>("mapper result cache", self.cache_size);
380 }
381 }
382}
383
384pub struct DogStatsDMapper {
385 metric_mapper: MetricMapper,
386}
387
388impl SynchronousTransform for DogStatsDMapper {
389 fn transform_buffer(&mut self, event_buffer: &mut EventsBuffer) {
390 for event in event_buffer {
391 if let Some(metric) = event.try_as_metric_mut() {
392 if let Some(new_context) = self.metric_mapper.try_map(metric.context()) {
393 *metric.context_mut() = new_context;
394 }
395 }
396 }
397 }
398}
399
400#[cfg(test)]
401mod tests {
402
403 use bytesize::ByteSize;
404 use saluki_context::Context;
405 use saluki_core::{components::ComponentContext, data_model::event::metric::Metric, topology::ComponentId};
406 use saluki_error::GenericError;
407 use serde_json::{json, Value};
408
409 use super::{MapperProfileConfigs, MetricMapper};
410
411 fn counter_metric(name: &'static str, tags: &[&'static str]) -> Metric {
412 let context = Context::from_static_parts(name, tags);
413 Metric::counter(context, 1.0)
414 }
415
416 fn mapper(json_data: Value) -> Result<MetricMapper, GenericError> {
417 mapper_with_cache(json_data, 1000)
418 }
419
420 fn mapper_with_cache(json_data: Value, cache_size: usize) -> Result<MetricMapper, GenericError> {
421 let context = ComponentContext::transform(ComponentId::try_from("test_mapper").unwrap());
422 let mpc: MapperProfileConfigs = serde_json::from_value(json_data)?;
423 let context_string_interner_bytes = ByteSize::kib(64);
424 mpc.build(context, context_string_interner_bytes, cache_size)
425 }
426
427 fn assert_tags(context: &Context, expected_tags: &[&str]) {
428 for tag in expected_tags {
429 assert!(context.tags().has_tag(tag), "missing tag: {}", tag);
430 }
431 assert_eq!(context.tags().len(), expected_tags.len(), "unexpected number of tags");
432 }
433
434 #[tokio::test]
435 async fn test_mapper_wildcard_simple() {
436 let json_data = json!([{
437 "name": "test",
438 "prefix": "test.",
439 "mappings": [
440 {
441 "match": "test.job.duration.*.*",
442 "name": "test.job.duration",
443 "tags": {
444 "job_type": "$1",
445 "job_name": "$2"
446 }
447 },
448 {
449 "match": "test.job.size.*.*",
450 "name": "test.job.size",
451 "tags": {
452 "foo": "$1",
453 "bar": "$2"
454 }
455 }
456 ]
457 }]);
458
459 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
460 let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
461 let context = mapper.try_map(metric.context()).expect("should have remapped");
462 assert_eq!(context.name(), "test.job.duration");
463 assert_tags(&context, &["job_type:my_job_type", "job_name:my_job_name"]);
464
465 let metric = counter_metric("test.job.size.my_job_type.my_job_name", &[]);
466 let context = mapper.try_map(metric.context()).expect("should have remapped");
467 assert_eq!(context.name(), "test.job.size");
468 assert_tags(&context, &["foo:my_job_type", "bar:my_job_name"]);
469
470 let metric = counter_metric("test.job.size.not_match", &[]);
471 assert!(mapper.try_map(metric.context()).is_none(), "should not have remapped");
472 }
473
474 #[tokio::test]
475 async fn test_partial_match() {
476 let json_data = json!([{
477 "name": "test",
478 "prefix": "test.",
479 "mappings": [
480 {
481 "match": "test.job.duration.*.*",
482 "name": "test.job.duration",
483 "tags": {
484 "job_type": "$1"
485 }
486 },
487 {
488 "match": "test.task.duration.*.*",
489 "name": "test.task.duration",
490 }
491 ]
492 }]);
493 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
494 let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
495 let context = mapper.try_map(metric.context()).expect("should have remapped");
496 assert_eq!(context.name(), "test.job.duration");
497 assert!(context.tags().has_tag("job_type:my_job_type"));
498
499 let metric = counter_metric("test.task.duration.my_job_type.my_job_name", &[]);
500 let context = mapper.try_map(metric.context()).expect("should have remapped");
501 assert_eq!(context.name(), "test.task.duration");
502 }
503
504 #[tokio::test]
505 async fn test_use_regex_expansion_alternative_syntax() {
506 let json_data = json!([{
507 "name": "test",
508 "prefix": "test.",
509 "mappings": [
510 {
511 "match": "test.job.duration.*.*",
512 "name": "test.job.duration",
513 "tags": {
514 "job_type": "${1}_x",
515 "job_name": "${2}_y"
516 }
517 }
518 ]
519 }]);
520
521 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
522
523 let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
524 let context = mapper.try_map(metric.context()).expect("should have remapped");
525 assert_eq!(context.name(), "test.job.duration");
526 assert_tags(&context, &["job_type:my_job_type_x", "job_name:my_job_name_y"]);
527 }
528
529 #[tokio::test]
530 async fn test_expand_name() {
531 let json_data = json!([{
532 "name": "test",
533 "prefix": "test.",
534 "mappings": [
535 {
536 "match": "test.job.duration.*.*",
537 "name": "test.hello.$2.$1",
538 "tags": {
539 "job_type": "$1",
540 "job_name": "$2"
541 }
542 }
543 ]
544 }]);
545
546 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
547
548 let metric = counter_metric("test.job.duration.my_job_type.my_job_name", &[]);
549 let context = mapper.try_map(metric.context()).expect("should have remapped");
550 assert_eq!(context.name(), "test.hello.my_job_name.my_job_type");
551 assert_tags(&context, &["job_type:my_job_type", "job_name:my_job_name"]);
552 }
553
554 #[tokio::test]
555 async fn test_match_before_underscore() {
556 let json_data = json!([{
557 "name": "test",
558 "prefix": "test.",
559 "mappings": [
560 {
561 "match": "test.*_start",
562 "name": "test.start",
563 "tags": {
564 "job": "$1"
565 }
566 }
567 ]
568 }]);
569
570 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
571
572 let metric = counter_metric("test.my_job_start", &[]);
573 let context = mapper.try_map(metric.context()).expect("should have remapped");
574 assert_eq!(context.name(), "test.start");
575 assert!(context.tags().has_tag("job:my_job"));
576 }
577
578 #[tokio::test]
579 async fn test_no_tags() {
580 let json_data = json!([{
581 "name": "test",
582 "prefix": "test.",
583 "mappings": [
584 {
585 "match": "test.my-worker.start",
586 "name": "test.worker.start"
587 },
588 {
589 "match": "test.my-worker.stop.*",
590 "name": "test.worker.stop"
591 }
592 ]
593 }]);
594
595 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
596
597 let metric = counter_metric("test.my-worker.start", &[]);
598 let context = mapper.try_map(metric.context()).expect("should have remapped");
599 assert_eq!(context.name(), "test.worker.start");
600 assert!(context.tags().is_empty(), "Expected no tags");
601
602 let metric = counter_metric("test.my-worker.stop.worker-name", &[]);
603 let context = mapper.try_map(metric.context()).expect("should have remapped");
604 assert_eq!(context.name(), "test.worker.stop");
605 assert!(context.tags().is_empty(), "Expected no tags");
606 }
607
608 #[tokio::test]
609 async fn test_all_allowed_characters() {
610 let json_data = json!([{
611 "name": "test",
612 "prefix": "test.",
613 "mappings": [
614 {
615 "match": "test.abcdefghijklmnopqrstuvwxyz_ABCDEFGHIJKLMNOPQRSTUVWXYZ-01234567.*",
616 "name": "test.alphabet"
617 }
618 ]
619 }]);
620
621 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
622
623 let metric = counter_metric(
624 "test.abcdefghijklmnopqrstuvwxyz_ABCDEFGHIJKLMNOPQRSTUVWXYZ-01234567.123",
625 &[],
626 );
627 let context = mapper.try_map(metric.context()).expect("should have remapped");
628 assert_eq!(context.name(), "test.alphabet");
629 assert!(context.tags().is_empty(), "Expected no tags");
630 }
631
632 #[tokio::test]
633 async fn test_regex_match_type() {
634 let json_data = json!([{
635 "name": "test",
636 "prefix": "test.",
637 "mappings": [
638 {
639 "match": "test\\.job\\.duration\\.(.*)",
640 "match_type": "regex",
641 "name": "test.job.duration",
642 "tags": {
643 "job_name": "$1"
644 }
645 },
646 {
647 "match": "test\\.task\\.duration\\.(.*)",
648 "match_type": "regex",
649 "name": "test.task.duration",
650 "tags": {
651 "task_name": "$1"
652 }
653 }
654 ]
655 }]);
656
657 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
658 let metric = counter_metric("test.job.duration.my.funky.job$name-abc/123", &[]);
659 let context = mapper.try_map(metric.context()).expect("should have remapped");
660 assert_eq!(context.name(), "test.job.duration");
661 assert!(context.tags().has_tag("job_name:my.funky.job$name-abc/123"));
662
663 let metric = counter_metric("test.task.duration.MY_task_name", &[]);
664 let context = mapper.try_map(metric.context()).expect("should have remapped");
665 assert_eq!(context.name(), "test.task.duration");
666 assert!(context.tags().has_tag("task_name:MY_task_name"));
667 }
668
669 #[tokio::test]
670 async fn test_complex_regex_match_type() {
671 let json_data = json!([{
672 "name": "test",
673 "prefix": "test.",
674 "mappings": [
675 {
676 "match": "test\\.job\\.([a-z][0-9]-\\w+)\\.(.*)",
677 "match_type": "regex",
678 "name": "test.job",
679 "tags": {
680 "job_type": "$1",
681 "job_name": "$2"
682 }
683 }
684 ]
685 }]);
686
687 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
688
689 let metric = counter_metric("test.job.a5-foo.bar", &[]);
690 let context = mapper.try_map(metric.context()).expect("should have remapped");
691 assert_eq!(context.name(), "test.job");
692 assert_tags(&context, &["job_type:a5-foo", "job_name:bar"]);
693
694 let metric = counter_metric("test.job.foo.bar-not-match", &[]);
695 assert!(mapper.try_map(metric.context()).is_none(), "should not have remapped");
696 }
697
698 #[tokio::test]
699 async fn test_profile_and_prefix() {
700 let json_data = json!([{
701 "name": "test",
702 "prefix": "foo.",
703 "mappings": [
704 {
705 "match": "foo.duration.*",
706 "name": "foo.duration",
707 "tags": {
708 "name": "$1"
709 }
710 }
711 ]
712 },
713 {
714 "name": "test",
715 "prefix": "bar.",
716 "mappings": [
717 {
718 "match": "bar.count.*",
719 "name": "bar.count",
720 "tags": {
721 "name": "$1"
722 }
723 },
724 {
725 "match": "foo.duration2.*",
726 "name": "foo.duration2",
727 "tags": {
728 "name": "$1"
729 }
730 }
731 ]
732 }]);
733
734 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
735
736 let metric = counter_metric("foo.duration.foo_name1", &[]);
737 let context = mapper.try_map(metric.context()).expect("should have remapped");
738 assert_eq!(context.name(), "foo.duration");
739 assert!(context.tags().has_tag("name:foo_name1"));
740
741 let metric = counter_metric("foo.duration2.foo_name1", &[]);
742 assert!(
743 mapper.try_map(metric.context()).is_none(),
744 "should not have remapped due to wrong group"
745 );
746
747 let metric = counter_metric("bar.count.bar_name1", &[]);
748 let context = mapper.try_map(metric.context()).expect("should have remapped");
749 assert_eq!(context.name(), "bar.count");
750 assert!(context.tags().has_tag("name:bar_name1"));
751
752 let metric = counter_metric("z.not.mapped", &[]);
753 assert!(mapper.try_map(metric.context()).is_none(), "should not have remapped");
754 }
755
756 #[tokio::test]
757 async fn test_wildcard_prefix() {
758 let json_data = json!([{
759 "name": "test",
760 "prefix": "*",
761 "mappings": [
762 {
763 "match": "foo.duration.*",
764 "name": "foo.duration",
765 "tags": {
766 "name": "$1"
767 }
768 }
769 ]
770 }]);
771
772 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
773
774 let metric = counter_metric("foo.duration.foo_name1", &[]);
775 let context = mapper.try_map(metric.context()).expect("should have remapped");
776 assert_eq!(context.name(), "foo.duration");
777 assert!(context.tags().has_tag("name:foo_name1"));
778 }
779
780 #[tokio::test]
781 async fn test_wildcard_prefix_order() {
782 let json_data = json!([{
783 "name": "test",
784 "prefix": "*",
785 "mappings": [
786 {
787 "match": "foo.duration.*",
788 "name": "foo.duration",
789 "tags": {
790 "name1": "$1"
791 }
792 }
793 ]
794 },
795 {
796 "name": "test",
797 "prefix": "*",
798 "mappings": [
799 {
800 "match": "foo.duration.*",
801 "name": "foo.duration",
802 "tags": {
803 "name2": "$1"
804 }
805 }
806 ]
807 }]);
808
809 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
810 let metric = counter_metric("foo.duration.foo_name", &[]);
811 let context = mapper.try_map(metric.context()).expect("should have remapped");
812 assert_eq!(context.name(), "foo.duration");
813 assert!(context.tags().has_tag("name1:foo_name"));
814 assert!(
815 !context.tags().has_tag("name2:foo_name"),
816 "Only the first matching profile should apply"
817 );
818 }
819
820 #[tokio::test]
821 async fn test_multiple_profiles_order() {
822 let json_data = json!([{
823 "name": "test",
824 "prefix": "foo.",
825 "mappings": [
826 {
827 "match": "foo.*.duration.*",
828 "name": "foo.bar1.duration",
829 "tags": {
830 "bar": "$1",
831 "foo": "$2"
832 }
833 }
834 ]
835 },
836 {
837 "name": "test",
838 "prefix": "foo.bar.",
839 "mappings": [
840 {
841 "match": "foo.bar.duration.*",
842 "name": "foo.bar2.duration",
843 "tags": {
844 "foo_bar": "$1"
845 }
846 }
847 ]
848 }]);
849
850 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
851
852 let metric = counter_metric("foo.bar.duration.foo_name", &[]);
853 let context = mapper.try_map(metric.context()).expect("should have remapped");
854 assert_eq!(context.name(), "foo.bar1.duration");
855 assert_tags(&context, &["bar:bar", "foo:foo_name"]);
856 assert!(
857 !context.tags().has_tag("foo_bar:foo_name"),
858 "Only the first matching profile should apply"
859 );
860 }
861
862 #[tokio::test]
863 async fn test_different_regex_expansion_syntax() {
864 let json_data = json!([{
865 "name": "test",
866 "prefix": "test.",
867 "mappings": [
868 {
869 "match": "test.user.(\\w+).action.(\\w+)",
870 "match_type": "regex",
871 "name": "test.user.action",
872 "tags": {
873 "user": "$1",
874 "action": "$2"
875 }
876 }
877 ]
878 }]);
879
880 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
881 let metric = counter_metric("test.user.john_doe.action.login", &[]);
882 let context = mapper.try_map(metric.context()).expect("should have remapped");
883 assert_eq!(context.name(), "test.user.action");
884 assert_tags(&context, &["user:john_doe", "action:login"]);
885 }
886
887 #[tokio::test]
888 async fn test_retain_existing_tags() {
889 let json_data = json!([{
890 "name": "test",
891 "prefix": "test.",
892 "mappings": [
893 {
894 "match": "test.job.duration.*.*",
895 "name": "test.job.duration.$2",
896 "tags": {
897 "job_type": "$1",
898 "job_name": "$2"
899 }
900 },
901 ]
902 }]);
903 let mut mapper = mapper(json_data).expect("should have parsed mapping config");
904 let metric = counter_metric("test.job.duration.abc.def", &["foo:bar", "baz"]);
905 let context = mapper.try_map(metric.context()).expect("should have remapped");
906 assert_eq!(context.name(), "test.job.duration.def");
907 assert!(context.tags().has_tag("foo:bar"));
908 assert!(context.tags().has_tag("baz"));
909 }
910
#[test]
fn test_empty_name() {
    // A mapping whose `name` is present but empty must be rejected.
    let profiles = json!([{
        "name": "test",
        "prefix": "test.",
        "mappings": [
            {
                "match": "test.job.duration.*.*",
                "name": "",
                "tags": {
                    "job_type": "$1"
                }
            }
        ]
    }]);
    let outcome = mapper(profiles);
    assert!(outcome.is_err());
}
928
#[test]
fn test_missing_name() {
    // A mapping with no `name` key at all must be rejected.
    let profiles = json!([{
        "name": "test",
        "prefix": "test.",
        "mappings": [
            {
                "match": "test.job.duration.*.*",
                "tags": {
                    "job_type": "$1",
                    "job_name": "$2"
                }
            }
        ]
    }]);
    let outcome = mapper(profiles);
    assert!(outcome.is_err());
}
946
#[test]
fn test_invalid_match_regex_brackets() {
    // `[` and `]` are outside the wildcard character allow-list.
    let profiles = json!([{
        "name": "test",
        "prefix": "test.",
        "mappings": [
            {
                "match": "test.[]duration.*.*",
                "name": "test.job.duration"
            }
        ]
    }]);
    let outcome = mapper(profiles);
    assert!(outcome.is_err());
}
961
#[test]
fn test_invalid_match_regex_caret() {
    // `^` is outside the wildcard character allow-list.
    let profiles = json!([{
        "name": "test",
        "prefix": "test.",
        "mappings": [
            {
                "match": "^test.invalid.duration.*.*",
                "name": "test.job.duration"
            }
        ]
    }]);
    let outcome = mapper(profiles);
    assert!(outcome.is_err());
}
976
#[test]
fn test_consecutive_wildcards() {
    // `**` is explicitly rejected in wildcard patterns.
    let profiles = json!([{
        "name": "test",
        "prefix": "test.",
        "mappings": [
            {
                "match": "test.invalid.duration.**",
                "name": "test.job.duration"
            }
        ]
    }]);
    let outcome = mapper(profiles);
    assert!(outcome.is_err());
}
991
#[test]
fn test_invalid_match_type() {
    // Only `wildcard` and `regex` (or an omitted value) are valid match types.
    let profiles = json!([{
        "name": "test",
        "prefix": "test.",
        "mappings": [
            {
                "match": "test.invalid.duration",
                "match_type": "invalid",
                "name": "test.job.duration"
            }
        ]
    }]);
    let outcome = mapper(profiles);
    assert!(outcome.is_err());
}
1007
#[test]
fn test_missing_profile_name() {
    // The mapping itself is valid, so the missing profile `name` is the only possible cause of failure.
    let profiles = json!([{
        "prefix": "test.",
        "mappings": [
            {
                "match": "test.invalid.duration",
                "name": "test.job.duration"
            }
        ]
    }]);
    assert!(
        mapper(profiles).is_err(),
        "a profile without a name should be rejected"
    );
}
1023
#[test]
fn test_missing_profile_prefix() {
    // The mapping itself is valid, so the missing profile `prefix` is the only possible cause of failure.
    let profiles = json!([{
        "name": "test",
        "mappings": [
            {
                "match": "test.invalid.duration",
                "name": "test.job.duration"
            }
        ]
    }]);
    assert!(
        mapper(profiles).is_err(),
        "a profile without a prefix should be rejected"
    );
}
1039
1040 fn simple_mapping_profile() -> Value {
1041 json!([{
1042 "name": "test",
1043 "prefix": "test.",
1044 "mappings": [
1045 {
1046 "match": "test.job.duration.*.*",
1047 "name": "test.job.duration",
1048 "tags": {
1049 "job_type": "$1",
1050 "job_name": "$2"
1051 }
1052 }
1053 ]
1054 }])
1055 }
1056
1057 #[tokio::test]
1058 async fn test_cache_hit_returns_same_result_as_miss() {
1059 let mut mapper = mapper_with_cache(simple_mapping_profile(), 1000).expect("should have parsed mapping config");
1060 assert_eq!(mapper.cache_len(), Some(0));
1061
1062 let metric = counter_metric("test.job.duration.my_type.my_name", &[]);
1063 let first = mapper.try_map(metric.context()).expect("should have remapped");
1064 assert_eq!(mapper.cache_len(), Some(1));
1065
1066 let metric = counter_metric("test.job.duration.my_type.my_name", &[]);
1067 let second = mapper.try_map(metric.context()).expect("should have remapped");
1068 assert_eq!(mapper.cache_len(), Some(1));
1069
1070 assert_eq!(first.name(), second.name());
1071 assert_eq!(first.name(), "test.job.duration");
1072 assert_tags(&first, &["job_type:my_type", "job_name:my_name"]);
1073 assert_tags(&second, &["job_type:my_type", "job_name:my_name"]);
1074 }
1075
1076 #[tokio::test]
1077 async fn test_negative_cache() {
1078 let mut mapper = mapper_with_cache(simple_mapping_profile(), 1000).expect("should have parsed mapping config");
1079
1080 let metric = counter_metric("unrelated.metric.name", &[]);
1081 assert!(mapper.try_map(metric.context()).is_none());
1082 assert_eq!(mapper.cache_len(), Some(1));
1083
1084 let metric = counter_metric("unrelated.metric.name", &[]);
1085 assert!(mapper.try_map(metric.context()).is_none());
1086 assert_eq!(mapper.cache_len(), Some(1));
1087 }
1088
1089 #[tokio::test]
1090 async fn test_cache_disabled_when_zero() {
1091 let mut mapper = mapper_with_cache(simple_mapping_profile(), 0).expect("should have parsed mapping config");
1092 assert_eq!(mapper.cache_len(), None);
1093
1094 let metric = counter_metric("test.job.duration.my_type.my_name", &[]);
1095 let context = mapper.try_map(metric.context()).expect("should have remapped");
1096 assert_eq!(context.name(), "test.job.duration");
1097 assert_tags(&context, &["job_type:my_type", "job_name:my_name"]);
1098
1099 assert!(mapper
1100 .try_map(counter_metric("unrelated.metric", &[]).context())
1101 .is_none());
1102 assert_eq!(mapper.cache_len(), None);
1103 }
1104
1105 #[tokio::test]
1106 async fn test_cache_eviction() {
1107 let mut mapper = mapper_with_cache(simple_mapping_profile(), 2).expect("should have parsed mapping config");
1108
1109 for suffix in ["a", "b", "c"] {
1110 let name = format!("test.job.duration.t.{}", suffix);
1111 let metric = counter_metric(Box::leak(name.into_boxed_str()), &[]);
1112 mapper.try_map(metric.context()).expect("should have remapped");
1113 }
1114
1115 assert!(
1116 mapper.cache_len().unwrap() <= 2,
1117 "cache should not exceed configured capacity (got {})",
1118 mapper.cache_len().unwrap()
1119 );
1120 }
1121
1122 #[tokio::test]
1123 async fn test_flood_does_not_grow_cache() {
1124 let mut profiles: Vec<Value> = (0..50)
1127 .map(|i| {
1128 json!({
1129 "name": format!("noise-{}", i),
1130 "prefix": format!("noise{}.", i),
1131 "mappings": [{
1132 "match": format!("noise{}.*", i),
1133 "name": "noise.mapped"
1134 }]
1135 })
1136 })
1137 .collect();
1138 profiles.push(json!({
1139 "name": "real",
1140 "prefix": "real.",
1141 "mappings": [{
1142 "match": "real.metric.*",
1143 "name": "real.mapped",
1144 "tags": { "x": "$1" }
1145 }]
1146 }));
1147 let json_data = Value::Array(profiles);
1148
1149 let mut mapper = mapper_with_cache(json_data, 16).expect("should have parsed mapping config");
1150
1151 for _ in 0..10_000 {
1152 let metric = counter_metric("real.metric.flood", &[]);
1153 let context = mapper.try_map(metric.context()).expect("should have remapped");
1154 assert_eq!(context.name(), "real.mapped");
1155 }
1156
1157 assert_eq!(
1158 mapper.cache_len(),
1159 Some(1),
1160 "flood of identical names should populate exactly one cache entry"
1161 );
1162 }
1163}
1164
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DogStatsDMapperConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Runs the shared configuration smoke tests against the mapper configuration, starting from an empty
    /// JSON object and requiring that the typed configuration deserializes.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DOGSTATSD_MAPPER_CONFIGURATION, &[], json!({}), |cfg| {
            let typed = cfg.as_typed::<DogStatsDMapperConfiguration>();
            typed.expect("DogStatsDMapperConfiguration should deserialize")
        })
        .await
    }
}