// saluki_context/resolver.rs

use std::{num::NonZeroUsize, sync::Arc, time::Duration};

use saluki_common::{
    cache::{weight::ItemCountWeighter, Cache, CacheBuilder},
    collections::PrehashedHashSet,
    hash::NoopU64BuildHasher,
};
use saluki_error::{generic_error, GenericError};
use saluki_metrics::static_metrics;
use stringtheory::{
    interning::{GenericMapInterner, Interner as _},
    CheapMetaString, MetaString,
};
use tokio::time::sleep;
use tracing::debug;

use crate::{
    context::{Context, ContextInner},
    hash::{hash_context_with_seen, ContextKey, TagSetKey},
    origin::{OriginTagsResolver, RawOrigin},
    tags::{SharedTagSet, TagSet},
};

// SAFETY: We know, unquestionably, that this value is not zero.
const DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT: NonZeroUsize = NonZeroUsize::new(500_000).unwrap();

// SAFETY: We know, unquestionably, that this value is not zero.
const DEFAULT_CONTEXT_RESOLVER_INTERNER_CAPACITY_BYTES: NonZeroUsize = NonZeroUsize::new(2 * 1024 * 1024).unwrap();

const SEEN_HASHSET_INITIAL_CAPACITY: usize = 128;

type ContextCache = Cache<ContextKey, Context, ItemCountWeighter, NoopU64BuildHasher>;
type TagSetCache = Cache<TagSetKey, SharedTagSet, ItemCountWeighter, NoopU64BuildHasher>;

static_metrics! {
    name => Telemetry,
    prefix => context_resolver,
    labels => [resolver_id: String],
    metrics => [
        gauge(interner_capacity_bytes),
        gauge(interner_len_bytes),
        gauge(interner_entries),
        counter(intern_fallback_total),

        counter(resolved_existing_context_total),
        counter(resolved_new_context_total),
        gauge(active_contexts),

        counter(resolved_existing_tagset_total),
        counter(resolved_new_tagset_total),
    ],
}

/// Builder for creating a [`ContextResolver`].
///
/// # Missing
///
/// - Support for configuring the size limit of cached contexts.
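///
/// # Example
///
/// A minimal configuration sketch (assuming these types are in scope; telemetry is disabled so that no async
/// runtime is required):
///
/// ```ignore
/// use std::num::NonZeroUsize;
///
/// // Build a resolver with a 1MB interner and a lower cached-context limit.
/// let resolver = ContextResolverBuilder::from_name("example_resolver")
///     .expect("resolver name not empty")
///     .with_interner_capacity_bytes(NonZeroUsize::new(1024 * 1024).expect("not zero"))
///     .with_cached_contexts_limit(100_000)
///     .without_telemetry()
///     .build();
/// ```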
pub struct ContextResolverBuilder {
    name: String,
    caching_enabled: bool,
    cached_contexts_limit: Option<NonZeroUsize>,
    idle_context_expiration: Option<Duration>,
    interner_capacity_bytes: Option<NonZeroUsize>,
    allow_heap_allocations: Option<bool>,
    tags_resolver: Option<TagsResolver>,
    interner: Option<GenericMapInterner>,
    origin_tags_resolver: Option<Arc<dyn OriginTagsResolver>>,
    telemetry_enabled: bool,
}

impl ContextResolverBuilder {
    /// Creates a new `ContextResolverBuilder` with the given resolver name.
    ///
    /// The resolver name _should_ be unique, but it is not required to be. Metrics for the resolver will be
    /// emitted using the given name, so in cases where the name is not unique, those metrics will be aggregated
    /// together and it will not be possible to distinguish between the different resolvers.
    ///
    /// # Errors
    ///
    /// If the given resolver name is empty, an error is returned.
    pub fn from_name<S: Into<String>>(name: S) -> Result<Self, GenericError> {
        let name = name.into();
        if name.is_empty() {
            return Err(generic_error!("resolver name must not be empty"));
        }

        Ok(Self {
            name,
            caching_enabled: true,
            cached_contexts_limit: None,
            idle_context_expiration: None,
            interner_capacity_bytes: None,
            allow_heap_allocations: None,
            tags_resolver: None,
            interner: None,
            origin_tags_resolver: None,
            telemetry_enabled: true,
        })
    }

    /// Sets whether or not to enable caching of resolved contexts.
    ///
    /// [`ContextResolver`] provides two main benefits: consistent behavior for resolving contexts (interning, origin
    /// tags, etc), and the caching of those resolved contexts to speed up future resolutions. However, caching contexts
    /// means that we pay a memory cost for the cache itself, even if the contexts are not ever reused or are seen
    /// infrequently. While expiration can help free up cache capacity, it cannot help recover the memory used by the
    /// underlying cache data structure once it has expanded to hold the contexts.
    ///
    /// Disabling caching allows normal resolving to take place without the overhead of caching the contexts. This can
    /// lead to lower average memory usage, as contexts will only live as long as they are needed, but it will reduce
    /// memory determinism as memory will be allocated for every resolved context (minus interned strings), which means
    /// that resolving the same context ten times in a row will result in ten separate allocations, and so on.
    ///
    /// Defaults to caching enabled.
    pub fn without_caching(mut self) -> Self {
        self.caching_enabled = false;
        self.idle_context_expiration = None;
        self
    }

    /// Sets the limit on the number of cached contexts.
    ///
    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Caching contexts is beneficial when the same context is resolved frequently, and it is generally worth
    /// allowing for higher limits on cached contexts when heap allocations are allowed, as this can better amortize the
    /// cost of those heap allocations.
    ///
    /// If the value is zero, caching will be disabled, and no contexts will be cached. This is equivalent to calling
    /// `without_caching`.
    ///
    /// Defaults to 500,000.
    pub fn with_cached_contexts_limit(mut self, limit: usize) -> Self {
        match NonZeroUsize::new(limit) {
            Some(limit) => {
                self.cached_contexts_limit = Some(limit);
                self
            }
            None => self.without_caching(),
        }
    }

    /// Sets the time before contexts are considered "idle" and eligible for expiration.
    ///
    /// This controls how long a context will be kept in the cache after its last access or creation time. This value is
    /// a lower bound, as contexts eligible for expiration may not be expired immediately. Contexts may still be removed
    /// prior to their natural expiration time if the cache is full and evictions are required to make room for a new
    /// context.
    ///
    /// Defaults to no expiration.
    pub fn with_idle_context_expiration(mut self, time_to_idle: Duration) -> Self {
        self.idle_context_expiration = Some(time_to_idle);
        self
    }

    /// Sets the capacity of the string interner, in bytes.
    ///
    /// This is the maximum number of bytes that the interner will use for interning strings that are present in
    /// contexts being resolved. This capacity may or may not be allocated entirely when the resolver is built, but the
    /// interner will not exceed the configured capacity when allocating any backing storage.
    ///
    /// This value directly impacts the number of contexts that can be resolved when heap allocations are disabled, as
    /// all resolved contexts must either have values (name or tags) that can be inlined or interned. Once the interner
    /// is full, contexts may fail to be resolved if heap allocations are disabled.
    ///
    /// The optimal value will almost always be workload-dependent, but a good starting point can be to estimate around
    /// 150-200 bytes per context based on empirical measurements around common metric name and tag lengths. This
    /// translates to around 5,000 unique contexts per 1MB of interner size.
    ///
    /// Defaults to 2MB.
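    ///
    /// # Example
    ///
    /// A rough sizing sketch based on the estimate above (the per-context figure and the expected context count
    /// are assumptions for illustration, not guarantees):
    ///
    /// ```ignore
    /// use std::num::NonZeroUsize;
    ///
    /// // Plan for roughly 50,000 unique contexts at ~200 bytes each (~10MB of interner capacity).
    /// let expected_contexts: usize = 50_000;
    /// let capacity = NonZeroUsize::new(expected_contexts * 200).expect("not zero");
    ///
    /// let builder = ContextResolverBuilder::from_name("sized_resolver")
    ///     .expect("resolver name not empty")
    ///     .with_interner_capacity_bytes(capacity);
    /// ```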
    pub fn with_interner_capacity_bytes(mut self, capacity: NonZeroUsize) -> Self {
        self.interner_capacity_bytes = Some(capacity);
        self
    }

    /// Sets whether or not to allow heap allocations when interning strings.
    ///
    /// In cases where the interner is full, this setting determines whether we refuse to resolve a context or
    /// allow it to be resolved by allocating strings on the heap. When heap allocations are enabled, the amount of
    /// memory that can be used by the interner is effectively unlimited, as contexts that cannot be interned will
    /// simply spill to the heap instead of being limited in any way.
    ///
    /// Defaults to `true`.
    pub fn with_heap_allocations(mut self, allow: bool) -> Self {
        self.allow_heap_allocations = Some(allow);
        self
    }

    /// Sets the tags resolver.
    ///
    /// Defaults to unset.
    pub fn with_tags_resolver(mut self, resolver: Option<TagsResolver>) -> Self {
        self.tags_resolver = resolver;
        self
    }

    /// Sets whether or not to enable telemetry for this resolver.
    ///
    /// Reporting the telemetry of the resolver requires running an asynchronous task to avoid adding additional
    /// overhead in the hot path of resolving contexts. In some cases, it may be cumbersome to always create the
    /// resolver in an asynchronous context so that the telemetry task can be spawned. This method allows disabling
    /// telemetry reporting in those cases.
    ///
    /// Defaults to telemetry enabled.
    pub fn without_telemetry(mut self) -> Self {
        self.telemetry_enabled = false;
        self
    }

    /// Sets the interner to use for this resolver.
    ///
    /// If an interner is not provided, one will be created in [`ContextResolverBuilder::build`].
    pub fn with_interner(mut self, interner: GenericMapInterner) -> Self {
        self.interner = Some(interner);
        self
    }

    /// Configures a [`ContextResolverBuilder`] that is suitable for tests.
    ///
    /// This configures the builder with the following defaults:
    ///
    /// - resolver name of "noop"
    /// - unlimited cache capacity
    /// - no-op interner (all strings are heap-allocated)
    /// - heap allocations allowed
    /// - telemetry disabled
    ///
    /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate
    /// testing scenarios.
    pub fn for_tests() -> Self {
        ContextResolverBuilder::from_name("noop")
            .expect("resolver name not empty")
            .with_cached_contexts_limit(usize::MAX)
            .with_interner_capacity_bytes(NonZeroUsize::new(1).expect("not zero"))
            .with_heap_allocations(true)
            .with_tags_resolver(Some(TagsResolverBuilder::for_tests().build()))
            .without_telemetry()
    }

    /// Builds a [`ContextResolver`] from the current configuration.
    pub fn build(self) -> ContextResolver {
        let interner_capacity_bytes = self
            .interner_capacity_bytes
            .unwrap_or(DEFAULT_CONTEXT_RESOLVER_INTERNER_CAPACITY_BYTES);

        let interner = match self.interner {
            Some(interner) => interner,
            None => GenericMapInterner::new(interner_capacity_bytes),
        };

        let cached_context_limit = self
            .cached_contexts_limit
            .unwrap_or(DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT);

        let allow_heap_allocations = self.allow_heap_allocations.unwrap_or(true);

        let telemetry = Telemetry::new(self.name.clone());
        telemetry
            .interner_capacity_bytes()
            .set(interner.capacity_bytes() as f64);

        // NOTE: We should switch to using a size-based weighter so that we can do more firm bounding of what we cache.
        let context_cache = CacheBuilder::from_identifier(format!("{}/contexts", self.name))
            .expect("cache identifier cannot possibly be empty")
            .with_capacity(cached_context_limit)
            .with_time_to_idle(self.idle_context_expiration)
            .with_hasher::<NoopU64BuildHasher>()
            .with_telemetry(self.telemetry_enabled)
            .build();

        // If no tags resolver is provided, we need to create one using the same interner used for the context resolver.
        let tags_resolver = match self.tags_resolver {
            Some(tags_resolver) => tags_resolver,
            None => TagsResolverBuilder::new(format!("{}/tags", self.name), interner.clone())
                .expect("tags resolver name not empty")
                .with_cached_tagsets_limit(cached_context_limit.get())
                .with_idle_tagsets_expiration(self.idle_context_expiration.unwrap_or_default())
                .with_heap_allocations(allow_heap_allocations)
                .with_origin_tags_resolver(self.origin_tags_resolver.clone())
                .build(),
        };

        if self.telemetry_enabled {
            tokio::spawn(drive_telemetry(interner.clone(), telemetry.clone()));
        }

        ContextResolver {
            telemetry,
            interner,
            caching_enabled: self.caching_enabled,
            context_cache,
            hash_seen_buffer: PrehashedHashSet::with_capacity_and_hasher(
                SEEN_HASHSET_INITIAL_CAPACITY,
                NoopU64BuildHasher,
            ),
            allow_heap_allocations,
            tags_resolver,
        }
    }
}

/// A centralized store for resolved contexts.
///
/// Contexts are the combination of a name and a set of tags. They are used to identify a specific metric series. As contexts
/// are constructed entirely of strings, they are expensive to construct in a way that allows sending between tasks, as
/// this usually requires allocations. Even further, the same context may be "hot", used frequently by the
/// applications/services sending us metrics.
///
/// In order to optimize this, the context resolver is responsible for both interning the strings involved where
/// possible, as well as keeping a map of contexts that can be referred to with a cheap handle. We can cheaply search
/// for an existing context without needing to allocate an entirely new one, and get a clone of the handle to use going
/// forward.
///
/// # Design
///
/// `ContextResolver` specifically manages interning and mapping of contexts. It can be cheaply cloned itself.
///
/// In order to resolve a context, `resolve` must be called which requires taking a lock to check for an existing
/// context. A read/write lock is used in order to prioritize lookups over inserts, as lookups are expected to be more
/// common given how often a given context is used and resolved.
///
/// Once a context is resolved, a cheap handle -- `Context` -- is returned. This handle, like `ContextResolver`, can be
/// cheaply cloned. It points directly to the underlying context data (name and tags) and provides access to these
/// components.
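///
/// # Example
///
/// A minimal sketch of sharing a resolver across tasks (telemetry is disabled here so the sketch does not depend
/// on an async runtime; names are illustrative):
///
/// ```ignore
/// let resolver = ContextResolverBuilder::from_name("shared_resolver")
///     .expect("resolver name not empty")
///     .without_telemetry()
///     .build();
///
/// // Each worker clones the resolver and resolves contexts independently; the resolved `Context`
/// // handles are themselves cheap to clone and can be sent elsewhere.
/// let mut worker_resolver = resolver.clone();
/// let context = worker_resolver
///     .resolve("http.requests", &["service:web"][..], None)
///     .expect("should resolve");
/// let context_for_elsewhere = context.clone();
/// ```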
pub struct ContextResolver {
    telemetry: Telemetry,
    interner: GenericMapInterner,
    caching_enabled: bool,
    context_cache: ContextCache,
    hash_seen_buffer: PrehashedHashSet<u64>,
    allow_heap_allocations: bool,
    tags_resolver: TagsResolver,
}

impl ContextResolver {
    fn intern<S>(&self, s: S) -> Option<MetaString>
    where
        S: AsRef<str> + CheapMetaString,
    {
        // Try to cheaply clone the string, and if that fails, try to intern it. If that fails, then we fall back to
        // allocating it on the heap if we allow it.
        s.try_cheap_clone()
            .or_else(|| self.interner.try_intern(s.as_ref()).map(MetaString::from))
            .or_else(|| {
                self.allow_heap_allocations.then(|| {
                    self.telemetry.intern_fallback_total().increment(1);
                    MetaString::from(s.as_ref())
                })
            })
    }

    fn create_context_key<N, I, I2, T, T2>(&mut self, name: N, tags: I, origin_tags: I2) -> (ContextKey, TagSetKey)
    where
        N: AsRef<str>,
        I: IntoIterator<Item = T>,
        T: AsRef<str>,
        I2: IntoIterator<Item = T2>,
        T2: AsRef<str>,
    {
        hash_context_with_seen(name.as_ref(), tags, origin_tags, &mut self.hash_seen_buffer)
    }

    fn create_context<N>(
        &self, key: ContextKey, name: N, context_tags: SharedTagSet, origin_tags: SharedTagSet,
    ) -> Option<Context>
    where
        N: AsRef<str> + CheapMetaString,
    {
        // Intern the name of the context; the tags have already been resolved into shared tag sets.
        let context_name = self.intern(name)?;

        self.telemetry.resolved_new_context_total().increment(1);
        self.telemetry.active_contexts().increment(1);

        Some(Context::from_inner(ContextInner::from_parts(
            key,
            context_name,
            context_tags,
            origin_tags,
            self.telemetry.active_contexts().clone(),
        )))
    }

    /// Resolves the given context.
    ///
    /// If the context has not yet been resolved, the name and tags are interned and a new context is created and
    /// stored. Otherwise, the existing context is returned. If an origin tags resolver is configured, and origin info
    /// is available, any enriched tags will be added to the context.
    ///
    /// `None` may be returned if the interner is full and heap allocations are disallowed. See
    /// `allow_heap_allocations` for more information.
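    ///
    /// # Example
    ///
    /// A minimal sketch of repeated resolution hitting the cache (the metric name and tags are illustrative; the
    /// resolver is built with telemetry disabled so no async runtime is needed):
    ///
    /// ```ignore
    /// let mut resolver = ContextResolverBuilder::from_name("example")
    ///     .expect("resolver name not empty")
    ///     .without_telemetry()
    ///     .build();
    ///
    /// let tags = ["service:web", "env:prod"];
    ///
    /// // Both calls resolve the same name/tags, so the second returns the cached context.
    /// let first = resolver.resolve("http.requests", &tags[..], None).expect("should resolve");
    /// let second = resolver.resolve("http.requests", &tags[..], None).expect("should resolve");
    /// assert!(first.ptr_eq(&second));
    /// ```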
    pub fn resolve<N, I, T>(&mut self, name: N, tags: I, maybe_origin: Option<RawOrigin<'_>>) -> Option<Context>
    where
        N: AsRef<str> + CheapMetaString,
        I: IntoIterator<Item = T> + Clone,
        T: AsRef<str> + CheapMetaString,
    {
        // Try and resolve our origin tags from the provided origin information, if any.
        let origin_tags = self.tags_resolver.resolve_origin_tags(maybe_origin);

        self.resolve_inner(name, tags, origin_tags)
    }

    /// Resolves the given context using the provided origin tags.
    ///
    /// If the context has not yet been resolved, the name and tags are interned and a new context is created and
    /// stored. Otherwise, the existing context is returned. The provided origin tags are used to enrich the context.
    ///
    /// `None` may be returned if the interner is full and heap allocations are disallowed. See
    /// `allow_heap_allocations` for more information.
    ///
    /// ## Origin tags resolver mismatch
    ///
    /// When passing in origin tags, they will be inherently tied to a specific `OriginTagsResolver`, which may
    /// differ from the configured origin tags resolver in this context resolver. This means that the context that is
    /// generated and cached may not be reused in the future if an attempt is made to resolve it using the raw origin
    /// information instead.
    ///
    /// This method is intended primarily to allow for resolving contexts in a consistent way while _reusing_ the origin
    /// tags from another context, such as when remapping the name and/or instrumented tags of a given metric, while
    /// maintaining its origin association.
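    ///
    /// # Example
    ///
    /// A minimal sketch of carrying a pre-built set of origin tags into resolution (assumes `resolver` is a
    /// `ContextResolver` built as shown elsewhere; the tag value is purely illustrative):
    ///
    /// ```ignore
    /// // Build (or reuse) a shared tag set representing the origin enrichment.
    /// let mut origin_tags = TagSet::default();
    /// origin_tags.insert_tag(String::from("origin_key:1234"));
    /// let origin_tags = origin_tags.into_shared();
    ///
    /// // Resolve a remapped metric name while keeping the same origin association.
    /// let remapped = resolver
    ///     .resolve_with_origin_tags("remapped.metric.name", &["tag1"][..], origin_tags)
    ///     .expect("should resolve");
    /// ```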
    pub fn resolve_with_origin_tags<N, I, T>(&mut self, name: N, tags: I, origin_tags: SharedTagSet) -> Option<Context>
    where
        N: AsRef<str> + CheapMetaString,
        I: IntoIterator<Item = T> + Clone,
        T: AsRef<str> + CheapMetaString,
    {
        self.resolve_inner(name, tags, origin_tags)
    }

    fn resolve_inner<N, I, T>(&mut self, name: N, tags: I, origin_tags: SharedTagSet) -> Option<Context>
    where
        N: AsRef<str> + CheapMetaString,
        I: IntoIterator<Item = T> + Clone,
        T: AsRef<str> + CheapMetaString,
    {
        let (context_key, tagset_key) = self.create_context_key(&name, tags.clone(), &origin_tags);

        // Fast path to avoid looking up the context in the cache if caching is disabled.
        if !self.caching_enabled {
            let tag_set = self.tags_resolver.create_tag_set(tags).unwrap_or_default();

            let context = self.create_context(context_key, name, tag_set, origin_tags)?;

            debug!(?context_key, ?context, "Resolved new non-cached context.");
            return Some(context);
        }

        match self.context_cache.get(&context_key) {
            Some(context) => {
                self.telemetry.resolved_existing_context_total().increment(1);
                Some(context)
            }
            None => {
                // Try seeing if we have the tagset cached already, and create it if not.
                let tag_set = match self.tags_resolver.get_tag_set(tagset_key) {
                    Some(tag_set) => {
                        self.telemetry.resolved_existing_tagset_total().increment(1);
                        tag_set
                    }
                    None => {
                        // If the tagset is not cached, we need to create it.
                        let tag_set = self.tags_resolver.create_tag_set(tags.clone()).unwrap_or_default();

                        self.tags_resolver.insert_tag_set(tagset_key, tag_set.clone());

                        tag_set
                    }
                };

                let context = self.create_context(context_key, name, tag_set, origin_tags)?;
                self.context_cache.insert(context_key, context.clone());

                debug!(?context_key, ?context, "Resolved new context.");
                Some(context)
            }
        }
    }
}

impl Clone for ContextResolver {
    fn clone(&self) -> Self {
        Self {
            telemetry: self.telemetry.clone(),
            interner: self.interner.clone(),
            caching_enabled: self.caching_enabled,
            context_cache: self.context_cache.clone(),
            hash_seen_buffer: PrehashedHashSet::with_capacity_and_hasher(
                SEEN_HASHSET_INITIAL_CAPACITY,
                NoopU64BuildHasher,
            ),
            allow_heap_allocations: self.allow_heap_allocations,
            tags_resolver: self.tags_resolver.clone(),
        }
    }
}

async fn drive_telemetry(interner: GenericMapInterner, telemetry: Telemetry) {
    loop {
        sleep(Duration::from_secs(1)).await;

        telemetry.interner_entries().set(interner.len() as f64);
        telemetry
            .interner_capacity_bytes()
            .set(interner.capacity_bytes() as f64);
        telemetry.interner_len_bytes().set(interner.len_bytes() as f64);
    }
}

/// A builder for a tags resolver.
pub struct TagsResolverBuilder {
    name: String,
    caching_enabled: bool,
    cached_tagset_limit: Option<NonZeroUsize>,
    idle_tagset_expiration: Option<Duration>,
    allow_heap_allocations: Option<bool>,
    origin_tags_resolver: Option<Arc<dyn OriginTagsResolver>>,
    telemetry_enabled: bool,
    interner: GenericMapInterner,
}

impl TagsResolverBuilder {
    /// Creates a new [`TagsResolverBuilder`] with the given name and interner.
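    ///
    /// # Example
    ///
    /// A minimal sketch of building a tags resolver that shares an interner with a context resolver (names and
    /// sizes are illustrative; telemetry is disabled to keep the sketch free of an async runtime):
    ///
    /// ```ignore
    /// use std::num::NonZeroUsize;
    ///
    /// let interner = GenericMapInterner::new(NonZeroUsize::new(1024 * 1024).expect("not zero"));
    ///
    /// let tags_resolver = TagsResolverBuilder::new("my_resolver/tags", interner.clone())
    ///     .expect("resolver name not empty")
    ///     .without_telemetry()
    ///     .build();
    ///
    /// let resolver = ContextResolverBuilder::from_name("my_resolver")
    ///     .expect("resolver name not empty")
    ///     .with_interner(interner)
    ///     .with_tags_resolver(Some(tags_resolver))
    ///     .without_telemetry()
    ///     .build();
    /// ```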
    pub fn new<S: Into<String>>(name: S, interner: GenericMapInterner) -> Result<Self, GenericError> {
        let name = name.into();
        if name.is_empty() {
            return Err(generic_error!("resolver name must not be empty"));
        }

        Ok(Self {
            name,
            caching_enabled: true,
            cached_tagset_limit: None,
            idle_tagset_expiration: None,
            allow_heap_allocations: None,
            origin_tags_resolver: None,
            telemetry_enabled: true,
            interner,
        })
    }

    /// Sets the interner to use for this resolver.
    ///
    /// This is used when we want to use a separate interner for tagsets, different from the one used for contexts.
    ///
    /// Defaults to using the interner passed to the builder.
    pub fn with_interner(mut self, interner: GenericMapInterner) -> Self {
        self.interner = interner;
        self
    }

    /// Sets whether or not to enable caching of resolved tag sets.
    ///
    /// [`TagsResolver`] provides two main benefits: consistent behavior for resolving tag sets (interning, origin
    /// tags, etc), and the caching of those resolved tag sets to speed up future resolutions. However, caching tag
    /// sets means that we pay a memory cost for the cache itself, even if the tag sets are not ever reused or are seen
    /// infrequently. While expiration can help free up cache capacity, it cannot help recover the memory used by the
    /// underlying cache data structure once it has expanded to hold the tag sets.
    ///
    /// Disabling caching allows normal resolving to take place without the overhead of caching the tag sets. This can
    /// lead to lower average memory usage, as tag sets will only live as long as they are needed, but it will reduce
    /// memory determinism as memory will be allocated for every resolved tag set (minus interned strings), which means
    /// that resolving the same tag set ten times in a row will result in ten separate allocations, and so on.
    ///
    /// Defaults to caching enabled.
    pub fn without_caching(mut self) -> Self {
        self.caching_enabled = false;
        self.idle_tagset_expiration = None;
        self
    }

    /// Sets the limit on the number of cached tagsets.
    ///
    /// This is the maximum number of resolved tag sets that can be cached at any given time. This limit does not affect
    /// the total number of tag sets that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Caching tag sets is beneficial when the same tag set is resolved frequently, and it is generally worth
    /// allowing for higher limits on cached tag sets when heap allocations are allowed, as this can better amortize the
    /// cost of those heap allocations.
    ///
    /// If the value is zero, caching will be disabled, and no tag sets will be cached. This is equivalent to calling
    /// `without_caching`.
    ///
    /// Defaults to 500,000.
    pub fn with_cached_tagsets_limit(mut self, limit: usize) -> Self {
        match NonZeroUsize::new(limit) {
            Some(limit) => {
                self.cached_tagset_limit = Some(limit);
                self
            }
            None => self.without_caching(),
        }
    }

    /// Sets the time before tag sets are considered "idle" and eligible for expiration.
    ///
    /// This controls how long a tag set will be kept in the cache after its last access or creation time. This value is
    /// a lower bound, as tag sets eligible for expiration may not be expired immediately. Tag sets may still be removed
    /// prior to their natural expiration time if the cache is full and evictions are required to make room for a new
    /// tag set.
    ///
    /// Defaults to no expiration.
    pub fn with_idle_tagsets_expiration(mut self, time_to_idle: Duration) -> Self {
        self.idle_tagset_expiration = Some(time_to_idle);
        self
    }

    /// Sets whether or not to allow heap allocations when interning strings.
    ///
    /// In cases where the interner is full, this setting determines whether we refuse to resolve a tag set or
    /// allow it to be resolved by allocating strings on the heap. When heap allocations are enabled, the amount of
    /// memory that can be used by the interner is effectively unlimited, as tag sets that cannot be interned will
    /// simply spill to the heap instead of being limited in any way.
    ///
    /// Defaults to `true`.
    pub fn with_heap_allocations(mut self, allow: bool) -> Self {
        self.allow_heap_allocations = Some(allow);
        self
    }

    /// Sets the origin tags resolver to use when building a context.
    ///
    /// In some cases, metrics, events, and service checks may have enriched tags based on their origin -- the
    /// application/host/container/etc that emitted the metric -- which has to be considered when building the context
    /// itself. As this can be expensive, it is useful to split the logic of actually grabbing the enriched tags based
    /// on the available origin info into a separate phase, and implementation, that can run separately from the
    /// initial hash-based approach of checking if a context has already been resolved.
    ///
    /// When set, any origin information provided will be considered during hashing when looking up a context, and any
    /// enriched tags attached to the detected origin will be accessible from the context.
    ///
    /// Defaults to unset.
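    ///
    /// # Example
    ///
    /// A minimal sketch of a custom resolver, modeled on the test helper in this module; it ignores the origin
    /// details and attaches a fixed tag, purely for illustration:
    ///
    /// ```ignore
    /// struct FixedOriginTagsResolver;
    ///
    /// impl OriginTagsResolver for FixedOriginTagsResolver {
    ///     fn resolve_origin_tags(&self, _origin: RawOrigin<'_>) -> SharedTagSet {
    ///         let mut tags = TagSet::default();
    ///         tags.insert_tag(String::from("origin:fixed"));
    ///         tags.into_shared()
    ///     }
    /// }
    ///
    /// let builder = TagsResolverBuilder::for_tests()
    ///     .with_origin_tags_resolver(Some(Arc::new(FixedOriginTagsResolver)));
    /// ```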
    pub fn with_origin_tags_resolver(mut self, resolver: Option<Arc<dyn OriginTagsResolver>>) -> Self {
        self.origin_tags_resolver = resolver;
        self
    }

    /// Sets whether or not to enable telemetry for this resolver.
    ///
    /// Reporting the telemetry of the resolver requires running an asynchronous task to avoid adding additional
    /// overhead in the hot path of resolving contexts. In some cases, it may be cumbersome to always create the
    /// resolver in an asynchronous context so that the telemetry task can be spawned. This method allows disabling
    /// telemetry reporting in those cases.
    ///
    /// Defaults to telemetry enabled.
    pub fn without_telemetry(mut self) -> Self {
        self.telemetry_enabled = false;
        self
    }

    /// Builds a [`TagsResolver`] from the current configuration.
    pub fn build(self) -> TagsResolver {
        let cached_tagsets_limit = self
            .cached_tagset_limit
            .unwrap_or(DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT);

        let allow_heap_allocations = self.allow_heap_allocations.unwrap_or(true);

        let telemetry = Telemetry::new(self.name.clone());
        telemetry
            .interner_capacity_bytes()
            .set(self.interner.capacity_bytes() as f64);

        let tagset_cache = CacheBuilder::from_identifier(format!("{}/tagsets", self.name))
            .expect("cache identifier cannot possibly be empty")
            .with_capacity(cached_tagsets_limit)
            .with_time_to_idle(self.idle_tagset_expiration)
            .with_hasher::<NoopU64BuildHasher>()
            .with_telemetry(self.telemetry_enabled)
            .build();

        TagsResolver {
            telemetry,
            interner: self.interner,
            caching_enabled: self.caching_enabled,
            tagset_cache,
            origin_tags_resolver: self.origin_tags_resolver,
            allow_heap_allocations,
        }
    }

    /// Configures a [`TagsResolverBuilder`] that is suitable for tests.
    ///
    /// This configures the builder with the following defaults:
    ///
    /// - resolver name of "noop"
    /// - unlimited cache capacity
    /// - no-op interner (all strings are heap-allocated)
    /// - heap allocations allowed
    /// - telemetry disabled
    ///
    /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate
    /// testing scenarios.
    pub fn for_tests() -> Self {
        TagsResolverBuilder::new("noop", GenericMapInterner::new(NonZeroUsize::new(1).expect("not zero")))
            .expect("resolver name not empty")
            .with_cached_tagsets_limit(usize::MAX)
            .with_heap_allocations(true)
            .without_telemetry()
    }
}

/// A resolver for tags.
pub struct TagsResolver {
    telemetry: Telemetry,
    interner: GenericMapInterner,
    caching_enabled: bool,
    tagset_cache: TagSetCache,
    origin_tags_resolver: Option<Arc<dyn OriginTagsResolver>>,
    allow_heap_allocations: bool,
}

impl TagsResolver {
    fn intern<S>(&self, s: S) -> Option<MetaString>
    where
        S: AsRef<str> + CheapMetaString,
    {
        // Try to cheaply clone the string, and if that fails, try to intern it. If that fails, then we fall back to
        // allocating it on the heap if we allow it.
        s.try_cheap_clone()
            .or_else(|| self.interner.try_intern(s.as_ref()).map(MetaString::from))
            .or_else(|| {
                self.allow_heap_allocations.then(|| {
                    self.telemetry.intern_fallback_total().increment(1);
                    MetaString::from(s.as_ref())
                })
            })
    }

    /// Creates a new tag set from the given tags.
    ///
    /// This will intern the tags and then return a shared tag set. If the interner is full, and heap allocations are
    /// not allowed, then this will return `None`.
    ///
    /// If heap allocations are allowed, a shared tag set will always be returned.
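    ///
    /// # Example
    ///
    /// A minimal sketch using the test-oriented builder (so no interner sizing is required; tag values are
    /// illustrative):
    ///
    /// ```ignore
    /// let mut tags_resolver = TagsResolverBuilder::for_tests().build();
    ///
    /// let tags = ["env:prod", "service:web"];
    /// let tag_set = tags_resolver
    ///     .create_tag_set(&tags[..])
    ///     .expect("heap allocations are allowed, so this should not fail");
    /// assert_eq!(tag_set.len(), 2);
    /// ```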
    pub fn create_tag_set<I, T>(&mut self, tags: I) -> Option<SharedTagSet>
    where
        I: IntoIterator<Item = T>,
        T: AsRef<str> + CheapMetaString,
    {
        let mut tag_set = TagSet::default();
        for tag in tags {
            let tag = self.intern(tag)?;
            tag_set.insert_tag(tag);
        }

        self.telemetry.resolved_new_tagset_total().increment(1);

        Some(tag_set.into_shared())
    }

    /// Resolves the origin tags for the given origin.
    ///
    /// This will return the origin tags for the given origin, or an empty tag set if no origin tags resolver is set.
    pub fn resolve_origin_tags(&self, maybe_origin: Option<RawOrigin<'_>>) -> SharedTagSet {
        self.origin_tags_resolver
            .as_ref()
            .and_then(|resolver| maybe_origin.map(|origin| resolver.resolve_origin_tags(origin)))
            .unwrap_or_default()
    }

    fn get_tag_set(&self, key: TagSetKey) -> Option<SharedTagSet> {
        self.tagset_cache.get(&key)
    }

    fn insert_tag_set(&self, key: TagSetKey, tag_set: SharedTagSet) {
        self.tagset_cache.insert(key, tag_set);
    }
}

impl Clone for TagsResolver {
    fn clone(&self) -> Self {
        Self {
            telemetry: self.telemetry.clone(),
            interner: self.interner.clone(),
            caching_enabled: self.caching_enabled,
            tagset_cache: self.tagset_cache.clone(),
            origin_tags_resolver: self.origin_tags_resolver.clone(),
            allow_heap_allocations: self.allow_heap_allocations,
        }
    }
}

#[cfg(test)]
mod tests {
    use metrics::{SharedString, Unit};
    use metrics_util::{
        debugging::{DebugValue, DebuggingRecorder},
        CompositeKey,
    };
    use saluki_common::hash::hash_single_fast;

    use super::*;

    fn get_gauge_value(metrics: &[(CompositeKey, Option<Unit>, Option<SharedString>, DebugValue)], key: &str) -> f64 {
        metrics
            .iter()
            .find(|(k, _, _, _)| k.key().name() == key)
            .map(|(_, _, _, value)| match value {
                DebugValue::Gauge(value) => value.into_inner(),
                other => panic!("expected a gauge, got: {:?}", other),
            })
            .unwrap_or_else(|| panic!("no metric found with key: {}", key))
    }

    struct DummyOriginTagsResolver;

    impl OriginTagsResolver for DummyOriginTagsResolver {
        fn resolve_origin_tags(&self, origin: RawOrigin<'_>) -> SharedTagSet {
            let origin_key = hash_single_fast(origin);

            let mut tags = TagSet::default();
            tags.insert_tag(format!("origin_key:{}", origin_key));
            tags.into_shared()
        }
    }

    #[test]
    fn basic() {
        let mut resolver = ContextResolverBuilder::for_tests().build();

        // Create two distinct contexts with the same name but different tags:
        let name = "metric_name";
        let tags1: [&str; 0] = [];
        let tags2 = ["tag1"];

        assert_ne!(&tags1[..], &tags2[..]);

        let context1 = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");

        // The contexts should not be equal to each other, and should have distinct underlying pointers to the shared
        // context state:
        assert_ne!(context1, context2);
        assert!(!context1.ptr_eq(&context2));

        // If we create the context references again, we _should_ get back the same contexts as before:
        let context1_redo = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context2_redo = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");

        assert_ne!(context1_redo, context2_redo);
        assert_eq!(context1, context1_redo);
        assert_eq!(context2, context2_redo);
        assert!(context1.ptr_eq(&context1_redo));
        assert!(context2.ptr_eq(&context2_redo));
    }

    #[test]
    fn tag_order() {
        let mut resolver = ContextResolverBuilder::for_tests().build();

        // Create two distinct contexts with the same name and tags, but with the tags in a different order:
        let name = "metric_name";
        let tags1 = ["tag1", "tag2"];
        let tags2 = ["tag2", "tag1"];

        assert_ne!(&tags1[..], &tags2[..]);

        let context1 = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");

        // The contexts should be equal to each other, and should have the same underlying pointer to the shared context
        // state:
        assert_eq!(context1, context2);
        assert!(context1.ptr_eq(&context2));
    }

    #[test]
    fn active_contexts() {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        // Create our resolver and then create a context, which will have its metrics attached to our local recorder:
        let context = metrics::with_local_recorder(&recorder, || {
            let mut resolver = ContextResolverBuilder::for_tests().build();
            resolver
                .resolve("name", &["tag"][..], None)
                .expect("should not fail to resolve")
        });

        // We should be able to see that the active context count is one, representing the context we created:
        let metrics_before = snapshotter.snapshot().into_vec();
        let active_contexts = get_gauge_value(&metrics_before, Telemetry::active_contexts_name());
        assert_eq!(active_contexts, 1.0);

        // Now drop the context, and observe the active context count is negative one, representing the context we dropped:
        drop(context);
        let metrics_after = snapshotter.snapshot().into_vec();
        let active_contexts = get_gauge_value(&metrics_after, Telemetry::active_contexts_name());
        assert_eq!(active_contexts, -1.0);
    }

    #[test]
    fn duplicate_tags() {
        let mut resolver = ContextResolverBuilder::for_tests().build();

        // Two contexts with the same name, but each with a different set of duplicate tags:
        let name = "metric_name";
        let tags1 = ["tag1"];
        let tags1_duplicated = ["tag1", "tag1"];
        let tags2 = ["tag2"];
        let tags2_duplicated = ["tag2", "tag2"];

        let context1 = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context1_duplicated = resolver
            .resolve(name, &tags1_duplicated[..], None)
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");
        let context2_duplicated = resolver
            .resolve(name, &tags2_duplicated[..], None)
            .expect("should not fail to resolve");

        // Each non-duplicated/duplicated context pair should be equal to one another:
        assert_eq!(context1, context1_duplicated);
        assert_eq!(context2, context2_duplicated);

        // Each pair should not be equal to the other pair, however.
        //
        // What we're asserting here is that, if we didn't handle duplicate tags correctly, the XOR hashing of [tag1,
        // tag1] and [tag2, tag2] would result in the same hash value, since the second duplicate hash of tag1/tag2
        // would cancel out the first... and thus all that would be left is the hash of the name itself, which is the
        // same in this test. This would lead to the contexts being equal, which is obviously wrong.
        //
        // If we're handling duplicates properly, then the resulting context hashes _shouldn't_ be equal.
        assert_ne!(context1, context2);
        assert_ne!(context1_duplicated, context2_duplicated);
        assert_ne!(context1, context2_duplicated);
        assert_ne!(context2, context1_duplicated);
    }

    #[test]
    fn differing_origins_with_without_resolver() {
        // Create a regular context resolver, without any origin tags resolver, which should result in contexts being
        // the same so long as the name and tags are the same, disregarding any difference in origin information:
        let mut resolver = ContextResolverBuilder::for_tests().build();

        let name = "metric_name";
        let tags = ["tag1"];
        let mut origin1 = RawOrigin::default();
        origin1.set_container_id("container1");
        let mut origin2 = RawOrigin::default();
        origin2.set_container_id("container2");

        let context1 = resolver
            .resolve(name, &tags[..], Some(origin1.clone()))
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags[..], Some(origin2.clone()))
            .expect("should not fail to resolve");

        assert_eq!(context1, context2);

        let tags_resolver = TagsResolverBuilder::for_tests()
            .with_origin_tags_resolver(Some(Arc::new(DummyOriginTagsResolver)))
            .build();
        // Now build a context resolver with an origin tags resolver that trivially returns the hash of the origin info
        // as a tag, which should result in differing sets of origin tags between the two origins, thus no longer
        // comparing as equal:
        let mut resolver = ContextResolverBuilder::for_tests()
            .with_tags_resolver(Some(tags_resolver))
            .build();

        let context1 = resolver
            .resolve(name, &tags[..], Some(origin1))
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags[..], Some(origin2))
            .expect("should not fail to resolve");

        assert_ne!(context1, context2);
    }

    #[test]
    fn caching_disabled() {
        let tags_resolver = TagsResolverBuilder::for_tests()
            .with_origin_tags_resolver(Some(Arc::new(DummyOriginTagsResolver)))
            .build();
        let mut resolver = ContextResolverBuilder::for_tests()
            .without_caching()
            .with_tags_resolver(Some(tags_resolver))
            .build();

        let name = "metric_name";
        let tags = ["tag1"];
        let mut origin1 = RawOrigin::default();
        origin1.set_container_id("container1");

        // Create a context with caching disabled, and verify that the context is not cached:
        let context1 = resolver
            .resolve(name, &tags[..], Some(origin1.clone()))
            .expect("should not fail to resolve");
        assert_eq!(resolver.context_cache.len(), 0);

        // Create a second context with the same name and tags, and verify that it is not cached:
        let context2 = resolver
            .resolve(name, &tags[..], Some(origin1))
            .expect("should not fail to resolve");
        assert_eq!(resolver.context_cache.len(), 0);

        // The contexts should be equal to each other, but the underlying `Arc` pointers should be different since
        // they're two distinct contexts in terms of not being cached:
        assert_eq!(context1, context2);
        assert!(!context1.ptr_eq(&context2));
    }

    #[test]
    fn cheaply_cloneable_name_and_tags() {
        const BIG_TAG_ONE: &str = "long-tag-that-cannot-be-inlined-just-to-be-doubly-sure-on-top-of-being-static";
        const BIG_TAG_TWO: &str = "another-long-boye-that-we-are-also-sure-wont-be-inlined-and-we-stand-on-that";

        // Create a context resolver with a proper string interner configured:
        let mut resolver = ContextResolverBuilder::for_tests()
            .with_interner_capacity_bytes(NonZeroUsize::new(1024).expect("not zero"))
            .build();

        // Create our context with cheaply cloneable tags, aka static strings:
        let name = MetaString::from_static("long-metric-name-that-shouldnt-be-inlined-and-should-end-up-interned");
        let tags = [
            MetaString::from_static(BIG_TAG_ONE),
            MetaString::from_static(BIG_TAG_TWO),
        ];
        assert!(tags[0].is_cheaply_cloneable());
        assert!(tags[1].is_cheaply_cloneable());

        // Make sure the interner is empty before we resolve the context, and that it's empty afterwards, since we
        // should be able to cheaply clone both the metric name and both tags:
        assert_eq!(resolver.interner.len(), 0);
        assert_eq!(resolver.interner.len_bytes(), 0);

        let context = resolver
            .resolve(&name, &tags[..], None)
            .expect("should not fail to resolve");
        assert_eq!(resolver.interner.len(), 0);
        assert_eq!(resolver.interner.len_bytes(), 0);

        // And just a sanity check that we have the expected name and tags in the context:
        assert_eq!(context.name(), &name);

        let context_tags = context.tags();
        assert_eq!(context_tags.len(), 2);
        assert!(context_tags.has_tag(&tags[0]));
        assert!(context_tags.has_tag(&tags[1]));
    }
1064}