// saluki_context/resolver.rs
1use std::{num::NonZeroUsize, sync::Arc, time::Duration};
2
3use saluki_common::{
4    cache::{weight::ItemCountWeighter, Cache, CacheBuilder},
5    collections::PrehashedHashSet,
6    hash::NoopU64BuildHasher,
7};
8use saluki_error::{generic_error, GenericError};
9use saluki_metrics::static_metrics;
10use stringtheory::{
11    interning::{GenericMapInterner, Interner as _},
12    CheapMetaString, MetaString,
13};
14use tokio::time::sleep;
15use tracing::debug;
16
17use crate::{
18    context::{Context, ContextInner},
19    hash::{hash_context_with_seen, ContextKey, TagSetKey},
20    origin::{OriginTagsResolver, RawOrigin},
21    tags::{SharedTagSet, TagSet},
22};
23
// Default maximum number of resolved contexts kept in the context cache.
// SAFETY: We know, unquestionably, that this value is not zero.
const DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT: NonZeroUsize = NonZeroUsize::new(500_000).unwrap();

// Default capacity, in bytes, of the string interner backing a resolver.
// SAFETY: We know, unquestionably, that this value is not zero.
const DEFAULT_CONTEXT_RESOLVER_INTERNER_CAPACITY_BYTES: NonZeroUsize = NonZeroUsize::new(2 * 1024 * 1024).unwrap();

// Initial capacity of the per-resolver "seen hashes" scratch buffer used during context hashing.
const SEEN_HASHSET_INITIAL_CAPACITY: usize = 128;

// Caches are keyed by precomputed 64-bit hashes (`ContextKey`/`TagSetKey`), so a no-op hasher is
// used to avoid hashing the already-hashed keys a second time.
type ContextCache = Cache<ContextKey, Context, ItemCountWeighter, NoopU64BuildHasher>;
type TagSetCache = Cache<TagSetKey, SharedTagSet, ItemCountWeighter, NoopU64BuildHasher>;
34
// Telemetry emitted by both context and tags resolvers, labeled by resolver name. Metrics are
// prefixed with `context_resolver`.
static_metrics! {
    name => Telemetry,
    prefix => context_resolver,
    labels => [resolver_id: String],
    metrics => [
        // Interner state: configured capacity, bytes currently used, and number of interned entries.
        gauge(interner_capacity_bytes),
        gauge(interner_len_bytes),
        gauge(interner_entries),
        // Number of strings that could not be interned and fell back to a heap allocation.
        counter(intern_fallback_total),

        // Context resolution: cache hits, cache misses (new contexts), and currently-live contexts.
        counter(resolved_existing_context_total),
        counter(resolved_new_context_total),
        gauge(active_contexts),

        // Tagset resolution: cache hits and cache misses (new tagsets).
        counter(resolved_existing_tagset_total),
        counter(resolved_new_tagset_total),
    ],
}
53
/// Builder for creating a [`ContextResolver`].
///
/// # Missing
///
/// - Support for configuring the size limit of cached contexts.
pub struct ContextResolverBuilder {
    // Resolver name; used as the telemetry label and to derive cache identifiers.
    name: String,
    // Whether resolved contexts are cached at all.
    caching_enabled: bool,
    // Maximum number of cached contexts; defaults to `DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT`.
    cached_contexts_limit: Option<NonZeroUsize>,
    // Time-to-idle for cached contexts; `None` means no expiration.
    idle_context_expiration: Option<Duration>,
    // Interner capacity; defaults to `DEFAULT_CONTEXT_RESOLVER_INTERNER_CAPACITY_BYTES`.
    interner_capacity_bytes: Option<NonZeroUsize>,
    // Whether strings may spill to the heap when the interner is full; defaults to `true`.
    allow_heap_allocations: Option<bool>,
    // Explicit tags resolver; when unset, one is created in `build` sharing this resolver's interner.
    tags_resolver: Option<TagsResolver>,
    // Explicit interner; when unset, one is created in `build` from `interner_capacity_bytes`.
    interner: Option<GenericMapInterner>,
    // Origin tags resolver forwarded to the default tags resolver built in `build`.
    origin_tags_resolver: Option<Arc<dyn OriginTagsResolver>>,
    // Whether the background telemetry task is spawned and cache telemetry is enabled.
    telemetry_enabled: bool,
}
71
72impl ContextResolverBuilder {
73    /// Creates a new `ContextResolverBuilder` with the given resolver name.
74    ///
75    /// The resolver name _should_ be unique, but it is not required to be. Metrics for the resolver will be
76    /// emitted using the given name, so in cases where the name is not unique, those metrics will be aggregated
77    /// together and it will not be possible to distinguish between the different resolvers.
78    ///
79    /// # Errors
80    ///
81    /// If the given resolver name is empty, an error is returned.
82    pub fn from_name<S: Into<String>>(name: S) -> Result<Self, GenericError> {
83        let name = name.into();
84        if name.is_empty() {
85            return Err(generic_error!("resolver name must not be empty"));
86        }
87
88        Ok(Self {
89            name,
90            caching_enabled: true,
91            cached_contexts_limit: None,
92            idle_context_expiration: None,
93            interner_capacity_bytes: None,
94            allow_heap_allocations: None,
95            tags_resolver: None,
96            interner: None,
97            origin_tags_resolver: None,
98            telemetry_enabled: true,
99        })
100    }
101
102    /// Sets whether or not to enable caching of resolved contexts.
103    ///
104    /// [`ContextResolver`] provides two main benefits: consistent behavior for resolving contexts (interning, origin
105    /// tags, etc), and the caching of those resolved contexts to speed up future resolutions. However, caching contexts
106    /// means that we pay a memory cost for the cache itself, even if the contexts are not ever reused or are seen
107    /// infrequently. While expiration can help free up cache capacity, it cannot help recover the memory used by the
108    /// underlying cache data structure once they have expanded to hold the contexts.
109    ///
110    /// Disabling caching allows normal resolving to take place without the overhead of caching the contexts. This can
111    /// lead to lower average memory usage, as contexts will only live as long as they are needed, but it will reduce
112    /// memory determinism as memory will be allocated for every resolved context (minus interned strings), which means
113    /// that resolving the same context ten times in a row will result in ten separate allocations, and so on.
114    ///
115    /// Defaults to caching enabled.
116    pub fn without_caching(mut self) -> Self {
117        self.caching_enabled = false;
118        self.idle_context_expiration = None;
119        self
120    }
121
122    /// Sets the limit on the number of cached contexts.
123    ///
124    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
125    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
126    /// and whether or not heap allocations are allowed.
127    ///
128    /// Caching contexts is beneficial when the same context is resolved frequently, and it is generally worth
129    /// allowing for higher limits on cached contexts when heap allocations are allowed, as this can better amortize the
130    /// cost of those heap allocations.
131    ///
132    /// If value is zero, caching will be disabled, and no contexts will be cached. This is equivalent to calling
133    /// `without_caching`.
134    ///
135    /// Defaults to 500,000.
136    pub fn with_cached_contexts_limit(mut self, limit: usize) -> Self {
137        match NonZeroUsize::new(limit) {
138            Some(limit) => {
139                self.cached_contexts_limit = Some(limit);
140                self
141            }
142            None => self.without_caching(),
143        }
144    }
145
146    /// Sets the time before contexts are considered "idle" and eligible for expiration.
147    ///
148    /// This controls how long a context will be kept in the cache after its last access or creation time. This value is
149    /// a lower bound, as contexts eligible for expiration may not be expired immediately. Contexts may still be removed
150    /// prior to their natural expiration time if the cache is full and evictions are required to make room for a new
151    /// context.
152    ///
153    /// Defaults to no expiration.
154    pub fn with_idle_context_expiration(mut self, time_to_idle: Duration) -> Self {
155        self.idle_context_expiration = Some(time_to_idle);
156        self
157    }
158
159    /// Sets the capacity of the string interner, in bytes.
160    ///
161    /// This is the maximum number of bytes that the interner will use for interning strings that are present in
162    /// contexts being resolved. This capacity may or may not be allocated entirely when the resolver is built, but the
163    /// interner will not exceed the configured capacity when allocating any backing storage.
164    ///
165    /// This value directly impacts the number of contexts that can be resolved when heap allocations are disabled, as
166    /// all resolved contexts must either have values (name or tags) that can be inlined or interned. Once the interner
167    /// is full, contexts may fail to be resolved if heap allocations are disabled.
168    ///
169    /// The optimal value will almost always be workload-dependent, but a good starting point can be to estimate around
170    /// 150 - 200 bytes per context based on empirical measurements around common metric name and tag lengths. This
171    /// translate to around 5000 unique contexts per 1MB of interner size.
172    ///
173    /// Defaults to 2MB.
174    pub fn with_interner_capacity_bytes(mut self, capacity: NonZeroUsize) -> Self {
175        self.interner_capacity_bytes = Some(capacity);
176        self
177    }
178
179    /// Sets whether or not to allow heap allocations when interning strings.
180    ///
181    /// In cases where the interner is full, this setting determines whether or not we refuse to resolve a context, or
182    /// if we allow it be resolved by allocating strings on the heap. When heap allocations are enabled, the amount of
183    /// memory that can be used by the interner is effectively unlimited, as contexts that cannot be interned will be
184    /// simply spill to the heap instead of being limited in any way.
185    ///
186    /// Defaults to `true`.
187    pub fn with_heap_allocations(mut self, allow: bool) -> Self {
188        self.allow_heap_allocations = Some(allow);
189        self
190    }
191
192    /// Sets the tags resolver.
193    ///
194    /// Defaults to unset.
195    pub fn with_tags_resolver(mut self, resolver: Option<TagsResolver>) -> Self {
196        self.tags_resolver = resolver;
197        self
198    }
199
200    /// Sets whether or not to enable telemetry for this resolver.
201    ///
202    /// Reporting the telemetry of the resolver requires running an asynchronous task to override adding additional
203    /// overhead in the hot path of resolving contexts. In some cases, it may be cumbersome to always create the
204    /// resolver in an asynchronous context so that the telemetry task can be spawned. This method allows disabling
205    /// telemetry reporting in those cases.
206    ///
207    /// Defaults to telemetry enabled.
208    pub fn without_telemetry(mut self) -> Self {
209        self.telemetry_enabled = false;
210        self
211    }
212
213    /// Sets the interner to use for this resolver.
214    ///
215    /// If an interner is not provided, an interner will be created in [`ContextResolverBuilder::build`]
216    pub fn with_interner(mut self, interner: GenericMapInterner) -> Self {
217        self.interner = Some(interner);
218        self
219    }
220
221    /// Configures a [`ContextResolverBuilder`] that is suitable for tests.
222    ///
223    /// This configures the builder with the following defaults:
224    ///
225    /// - resolver name of "noop"
226    /// - unlimited cache capacity
227    /// - no-op interner (all strings are heap-allocated)
228    /// - heap allocations allowed
229    /// - telemetry disabled
230    ///
231    /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate
232    /// testing scenarios.
233    pub fn for_tests() -> Self {
234        ContextResolverBuilder::from_name("noop")
235            .expect("resolver name not empty")
236            .with_cached_contexts_limit(usize::MAX)
237            .with_interner_capacity_bytes(NonZeroUsize::new(1).expect("not zero"))
238            .with_heap_allocations(true)
239            .with_tags_resolver(Some(TagsResolverBuilder::for_tests().build()))
240            .without_telemetry()
241    }
242
243    /// Builds a [`ContextResolver`] from the current configuration.
244    pub fn build(self) -> ContextResolver {
245        let interner_capacity_bytes = self
246            .interner_capacity_bytes
247            .unwrap_or(DEFAULT_CONTEXT_RESOLVER_INTERNER_CAPACITY_BYTES);
248
249        let interner = match self.interner {
250            Some(interner) => interner,
251            None => GenericMapInterner::new(interner_capacity_bytes),
252        };
253
254        let cached_context_limit = self
255            .cached_contexts_limit
256            .unwrap_or(DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT);
257
258        let allow_heap_allocations = self.allow_heap_allocations.unwrap_or(true);
259
260        let telemetry = Telemetry::new(self.name.clone());
261        telemetry
262            .interner_capacity_bytes()
263            .set(interner.capacity_bytes() as f64);
264
265        // NOTE: We should switch to using a size-based weighter so that we can do more firm bounding of what we cache.
266        let context_cache = CacheBuilder::from_identifier(format!("{}/contexts", self.name))
267            .expect("cache identifier cannot possibly be empty")
268            .with_capacity(cached_context_limit)
269            .with_time_to_idle(self.idle_context_expiration)
270            .with_hasher::<NoopU64BuildHasher>()
271            .with_telemetry(self.telemetry_enabled)
272            .build();
273
274        // If no tags resolver is provided, we need to create one using the same interner used for the context resolver.
275        let tags_resolver = match self.tags_resolver {
276            Some(tags_resolver) => tags_resolver,
277            None => TagsResolverBuilder::new(format!("{}/tags", self.name), interner.clone())
278                .expect("tags resolver name not empty")
279                .with_cached_tagsets_limit(cached_context_limit.get())
280                .with_idle_tagsets_expiration(self.idle_context_expiration.unwrap_or_default())
281                .with_heap_allocations(allow_heap_allocations)
282                .with_origin_tags_resolver(self.origin_tags_resolver.clone())
283                .build(),
284        };
285
286        if self.telemetry_enabled {
287            tokio::spawn(drive_telemetry(interner.clone(), telemetry.clone()));
288        }
289
290        ContextResolver {
291            telemetry,
292            interner,
293            caching_enabled: self.caching_enabled,
294            context_cache,
295            hash_seen_buffer: PrehashedHashSet::with_capacity_and_hasher(
296                SEEN_HASHSET_INITIAL_CAPACITY,
297                NoopU64BuildHasher,
298            ),
299            allow_heap_allocations,
300            tags_resolver,
301        }
302    }
303}
304
/// A centralized store for resolved contexts.
///
/// Contexts are the combination of a name and a set of tags. They are used to identify a specific metric series. As contexts
/// are constructed entirely of strings, they are expensive to construct in a way that allows sending between tasks, as
/// this usually requires allocations. Even further, the same context may be "hot", used frequently by the
/// applications/services sending us metrics.
///
/// In order to optimize this, the context resolver is responsible for both interning the strings involved where
/// possible, as well as keeping a map of contexts that can be referred to with a cheap handle. We can cheaply search
/// for an existing context without needing to allocate an entirely new one, and get a clone of the handle to use going
/// forward.
///
/// # Design
///
/// `ContextResolver` specifically manages interning and mapping of contexts. It can be cheaply cloned itself.
///
/// In order to resolve a context, `resolve` must be called which requires taking a lock to check for an existing
/// context. A read/write lock is used in order to prioritize lookups over inserts, as lookups are expected to be more
/// common given how often a given context is used and resolved.
///
/// Once a context is resolved, a cheap handle -- `Context` -- is returned. This handle, like `ContextResolver`, can be
/// cheaply cloned. It points directly to the underlying context data (name and tags) and provides access to these
/// components.
pub struct ContextResolver {
    // Metrics for this resolver, labeled with the resolver name given at build time.
    telemetry: Telemetry,
    // String interner shared with (or distinct from) the tags resolver's interner.
    interner: GenericMapInterner,
    // When `false`, `resolve` skips the cache entirely and builds a fresh context each time.
    caching_enabled: bool,
    // Cache of fully-resolved contexts, keyed by precomputed `ContextKey` hash.
    context_cache: ContextCache,
    // Per-instance scratch buffer used during hashing to deduplicate tags; not shared between clones.
    hash_seen_buffer: PrehashedHashSet<u64>,
    // Whether strings may fall back to heap allocation when the interner is full.
    allow_heap_allocations: bool,
    // Resolver for tag sets (instrumented and origin tags).
    tags_resolver: TagsResolver,
}
337
338impl ContextResolver {
339    fn intern<S>(&self, s: S) -> Option<MetaString>
340    where
341        S: AsRef<str> + CheapMetaString,
342    {
343        // Try to cheaply clone the string, and if that fails, try to intern it. If that fails, then we fall back to
344        // allocating it on the heap if we allow it.
345        s.try_cheap_clone()
346            .or_else(|| self.interner.try_intern(s.as_ref()).map(MetaString::from))
347            .or_else(|| {
348                self.allow_heap_allocations.then(|| {
349                    self.telemetry.intern_fallback_total().increment(1);
350                    MetaString::from(s.as_ref())
351                })
352            })
353    }
354
355    fn create_context_key<N, I, I2, T, T2>(&mut self, name: N, tags: I, origin_tags: I2) -> (ContextKey, TagSetKey)
356    where
357        N: AsRef<str>,
358        I: IntoIterator<Item = T>,
359        T: AsRef<str>,
360        I2: IntoIterator<Item = T2>,
361        T2: AsRef<str>,
362    {
363        hash_context_with_seen(name.as_ref(), tags, origin_tags, &mut self.hash_seen_buffer)
364    }
365
366    fn create_context<N>(
367        &self, key: ContextKey, name: N, context_tags: SharedTagSet, origin_tags: SharedTagSet,
368    ) -> Option<Context>
369    where
370        N: AsRef<str> + CheapMetaString,
371    {
372        // Intern the name and tags of the context.
373        let context_name = self.intern(name)?;
374
375        self.telemetry.resolved_new_context_total().increment(1);
376        self.telemetry.active_contexts().increment(1);
377
378        Some(Context::from_inner(ContextInner::from_parts(
379            key,
380            context_name,
381            context_tags.into(),
382            origin_tags.into(),
383            self.telemetry.active_contexts().clone(),
384        )))
385    }
386
387    /// Resolves the given context.
388    ///
389    /// If the context has not yet been resolved, the name and tags are interned and a new context is created and
390    /// stored. Otherwise, the existing context is returned. If an origin tags resolver is configured, and origin info
391    /// is available, any enriched tags will be added to the context.
392    ///
393    /// `None` may be returned if the interner is full and outside allocations are disallowed. See
394    /// `allow_heap_allocations` for more information.
395    pub fn resolve<N, I, T>(&mut self, name: N, tags: I, maybe_origin: Option<RawOrigin<'_>>) -> Option<Context>
396    where
397        N: AsRef<str> + CheapMetaString,
398        I: IntoIterator<Item = T> + Clone,
399        T: AsRef<str> + CheapMetaString,
400    {
401        // Try and resolve our origin tags from the provided origin information, if any.
402        let origin_tags = self.tags_resolver.resolve_origin_tags(maybe_origin);
403
404        self.resolve_inner(name, tags, origin_tags)
405    }
406
407    /// Resolves the given context using the provided origin tags.
408    ///
409    /// If the context has not yet been resolved, the name and tags are interned and a new context is created and
410    /// stored. Otherwise, the existing context is returned. The provided origin tags are used to enrich the context.
411    ///
412    /// `None` may be returned if the interner is full and outside allocations are disallowed. See
413    /// `allow_heap_allocations` for more information.
414    ///
415    /// ## Origin tags resolver mismatch
416    ///
417    /// When passing in origin tags, they will be inherently tied to a specific `OriginTagsResolver`, which may
418    /// differ from the configured origin tags resolver in this context resolver. This means that the context that is
419    /// generated and cached may not be reused in the future if an attempt is made to resolve it using the raw origin
420    /// information instead.
421    ///
422    /// This method is intended primarily to allow for resolving contexts in a consistent way while _reusing_ the origin
423    /// tags from another context, such as when remapping the name and/or instrumented tags of a given metric, while
424    /// maintaining its origin association.
425    pub fn resolve_with_origin_tags<N, I, T>(
426        &mut self, name: N, tags: I, origin_tags: impl Into<SharedTagSet>,
427    ) -> Option<Context>
428    where
429        N: AsRef<str> + CheapMetaString,
430        I: IntoIterator<Item = T> + Clone,
431        T: AsRef<str> + CheapMetaString,
432    {
433        self.resolve_inner(name, tags, origin_tags.into())
434    }
435
436    fn resolve_inner<N, I, T>(&mut self, name: N, tags: I, origin_tags: SharedTagSet) -> Option<Context>
437    where
438        N: AsRef<str> + CheapMetaString,
439        I: IntoIterator<Item = T> + Clone,
440        T: AsRef<str> + CheapMetaString,
441    {
442        let (context_key, tagset_key) = self.create_context_key(&name, tags.clone(), &origin_tags);
443
444        // Fast path to avoid looking up the context in the cache if caching is disabled.
445        if !self.caching_enabled {
446            let tag_set = self.tags_resolver.create_tag_set(tags).unwrap_or_default();
447
448            let context = self.create_context(context_key, name, tag_set, origin_tags)?;
449
450            debug!(?context_key, ?context, "Resolved new non-cached context.");
451            return Some(context);
452        }
453
454        match self.context_cache.get(&context_key) {
455            Some(context) => {
456                self.telemetry.resolved_existing_context_total().increment(1);
457                Some(context)
458            }
459            None => {
460                // Try seeing if we have the tagset cached already, and create it if not.
461                let tag_set = match self.tags_resolver.get_tag_set(tagset_key) {
462                    Some(tag_set) => {
463                        self.telemetry.resolved_existing_tagset_total().increment(1);
464                        tag_set
465                    }
466                    None => {
467                        // If the tagset is not cached, we need to create it.
468                        let tag_set = self.tags_resolver.create_tag_set(tags.clone()).unwrap_or_default();
469
470                        self.tags_resolver.insert_tag_set(tagset_key, tag_set.clone());
471
472                        tag_set
473                    }
474                };
475
476                let context = self.create_context(context_key, name, tag_set, origin_tags)?;
477                self.context_cache.insert(context_key, context.clone());
478
479                debug!(?context_key, ?context, "Resolved new context.");
480                Some(context)
481            }
482        }
483    }
484}
485
// Manual `Clone` impl (rather than `#[derive(Clone)]`): `hash_seen_buffer` is per-instance scratch
// space used during hashing, so each clone gets a fresh, empty buffer instead of copying the
// original's contents. All other fields are cheap, shared handles.
impl Clone for ContextResolver {
    fn clone(&self) -> Self {
        Self {
            telemetry: self.telemetry.clone(),
            interner: self.interner.clone(),
            caching_enabled: self.caching_enabled,
            context_cache: self.context_cache.clone(),
            // Fresh scratch buffer; contents are transient and never shared between clones.
            hash_seen_buffer: PrehashedHashSet::with_capacity_and_hasher(
                SEEN_HASHSET_INITIAL_CAPACITY,
                NoopU64BuildHasher,
            ),
            allow_heap_allocations: self.allow_heap_allocations,
            tags_resolver: self.tags_resolver.clone(),
        }
    }
}
502
503async fn drive_telemetry(interner: GenericMapInterner, telemetry: Telemetry) {
504    loop {
505        sleep(Duration::from_secs(1)).await;
506
507        telemetry.interner_entries().set(interner.len() as f64);
508        telemetry
509            .interner_capacity_bytes()
510            .set(interner.capacity_bytes() as f64);
511        telemetry.interner_len_bytes().set(interner.len_bytes() as f64);
512    }
513}
514
/// A builder for a tag resolver.
pub struct TagsResolverBuilder {
    // Resolver name; used as the telemetry label and to derive the tagset cache identifier.
    name: String,
    // Whether resolved tag sets are cached.
    caching_enabled: bool,
    // Maximum number of cached tag sets; defaults to `DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT`.
    cached_tagset_limit: Option<NonZeroUsize>,
    // Time-to-idle for cached tag sets; `None` means no expiration.
    idle_tagset_expiration: Option<Duration>,
    // Whether strings may spill to the heap when the interner is full; defaults to `true`.
    allow_heap_allocations: Option<bool>,
    // Optional resolver used to enrich contexts with origin-derived tags.
    origin_tags_resolver: Option<Arc<dyn OriginTagsResolver>>,
    // Whether cache telemetry is enabled.
    telemetry_enabled: bool,
    // String interner (required at construction; may be replaced via `with_interner`).
    interner: GenericMapInterner,
}
526
527impl TagsResolverBuilder {
528    /// Creates a new [`TagsResolverBuilder`] with the given name and interner.
529    pub fn new<S: Into<String>>(name: S, interner: GenericMapInterner) -> Result<Self, GenericError> {
530        let name = name.into();
531        if name.is_empty() {
532            return Err(generic_error!("resolver name must not be empty"));
533        }
534
535        Ok(Self {
536            name,
537            caching_enabled: true,
538            cached_tagset_limit: None,
539            idle_tagset_expiration: None,
540            allow_heap_allocations: None,
541            origin_tags_resolver: None,
542            telemetry_enabled: true,
543            interner,
544        })
545    }
546
547    /// Sets the interner to use for this resolver.
548    ///
549    /// This is used when we want to use a separate internet for tagsets, different from the one used for contexts.
550    ///
551    /// Defaults to using the interner passed to the builder.
552    pub fn with_interner(mut self, interner: GenericMapInterner) -> Self {
553        self.interner = interner;
554        self
555    }
556
557    /// Sets whether or not to enable caching of resolved tag sets.
558    ///
559    /// [`TagsResolver`] provides two main benefits: consistent behavior for resolving tag sets (interning, origin
560    /// tags, etc), and the caching of those resolved tag sets to speed up future resolutions. However, caching tag
561    /// sets means that we pay a memory cost for the cache itself, even if the tag sets are not ever reused or are seen
562    /// infrequently. While expiration can help free up cache capacity, it cannot help recover the memory used by the
563    /// underlying cache data structure once they have expanded to hold the tag sets.
564    ///
565    /// Disabling caching allows normal resolving to take place without the overhead of caching the tag sets. This can
566    /// lead to lower average memory usage, as tag sets will only live as long as they are needed, but it will reduce
567    /// memory determinism as memory will be allocated for every resolved tag set (minus interned strings), which means
568    /// that resolving the same tag set ten times in a row will result in ten separate allocations, and so on.
569    ///
570    /// Defaults to caching enabled.
571    pub fn without_caching(mut self) -> Self {
572        self.caching_enabled = false;
573        self.idle_tagset_expiration = None;
574        self
575    }
576
577    /// Sets the limit on the number of cached tagsets.
578    ///
579    /// This is the maximum number of resolved tag sets that can be cached at any given time. This limit does not affect
580    /// the total number of tag sets that can be _alive_ at any given time, which is dependent on the interner capacity
581    /// and whether or not heap allocations are allowed.
582    ///
583    /// Caching tag sets is beneficial when the same tag set is resolved frequently, and it is generally worth
584    /// allowing for higher limits on cached tag sets when heap allocations are allowed, as this can better amortize the
585    /// cost of those heap allocations.
586    ///
587    /// If value is zero, caching will be disabled, and no tag sets will be cached. This is equivalent to calling
588    /// `without_caching`.
589    ///
590    /// Defaults to 500,000.
591    pub fn with_cached_tagsets_limit(mut self, limit: usize) -> Self {
592        match NonZeroUsize::new(limit) {
593            Some(limit) => {
594                self.cached_tagset_limit = Some(limit);
595                self
596            }
597            None => self.without_caching(),
598        }
599    }
600
601    /// Sets the time before tag sets are considered "idle" and eligible for expiration.
602    ///
603    /// This controls how long a tag set will be kept in the cache after its last access or creation time. This value is
604    /// a lower bound, as tag sets eligible for expiration may not be expired immediately. Tag sets may still be removed
605    /// prior to their natural expiration time if the cache is full and evictions are required to make room for a new
606    /// context.
607    ///
608    /// Defaults to no expiration.
609    pub fn with_idle_tagsets_expiration(mut self, time_to_idle: Duration) -> Self {
610        self.idle_tagset_expiration = Some(time_to_idle);
611        self
612    }
613
614    /// Sets whether or not to allow heap allocations when interning strings.
615    ///
616    /// In cases where the interner is full, this setting determines whether or not we refuse to resolve a context, or
617    /// if we allow it be resolved by allocating strings on the heap. When heap allocations are enabled, the amount of
618    /// memory that can be used by the interner is effectively unlimited, as contexts that cannot be interned will be
619    /// simply spill to the heap instead of being limited in any way.
620    ///
621    /// Defaults to `true`.
622    pub fn with_heap_allocations(mut self, allow: bool) -> Self {
623        self.allow_heap_allocations = Some(allow);
624        self
625    }
626
627    /// Sets the origin tags resolver to use when building a context.
628    ///
629    /// In some cases, metrics, events, and service checks may have enriched tags based on their origin -- the
630    /// application/host/container/etc that emitted the metric -- which has to be considered when building the context
631    /// itself. As this can be expensive, it is useful to split the logic of actually grabbing the enriched tags based
632    /// on the available origin info into a separate phase, and implementation, that can run separately from the
633    /// initial hash-based approach of checking if a context has already been resolved.
634    ///
635    /// When set, any origin information provided will be considered during hashing when looking up a context, and any
636    /// enriched tags attached to the detected origin will be accessible from the context.
637    ///
638    /// Defaults to unset.
639    pub fn with_origin_tags_resolver(mut self, resolver: Option<Arc<dyn OriginTagsResolver>>) -> Self {
640        self.origin_tags_resolver = resolver;
641        self
642    }
643
644    /// Sets whether or not to enable telemetry for this resolver.
645    ///
646    /// Reporting the telemetry of the resolver requires running an asynchronous task to override adding additional
647    /// overhead in the hot path of resolving contexts. In some cases, it may be cumbersome to always create the
648    /// resolver in an asynchronous context so that the telemetry task can be spawned. This method allows disabling
649    /// telemetry reporting in those cases.
650    ///
651    /// Defaults to telemetry enabled.
652    pub fn without_telemetry(mut self) -> Self {
653        self.telemetry_enabled = false;
654        self
655    }
656
657    /// Builds a [`TagsResolver`] from the current configuration.
658    pub fn build(self) -> TagsResolver {
659        let cached_tagsets_limit = self
660            .cached_tagset_limit
661            .unwrap_or(DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT);
662
663        let allow_heap_allocations = self.allow_heap_allocations.unwrap_or(true);
664
665        let telemetry = Telemetry::new(self.name.clone());
666        telemetry
667            .interner_capacity_bytes()
668            .set(self.interner.capacity_bytes() as f64);
669
670        let tagset_cache = CacheBuilder::from_identifier(format!("{}/tagsets", self.name))
671            .expect("cache identifier cannot possibly be empty")
672            .with_capacity(cached_tagsets_limit)
673            .with_time_to_idle(self.idle_tagset_expiration)
674            .with_hasher::<NoopU64BuildHasher>()
675            .with_telemetry(self.telemetry_enabled)
676            .build();
677
678        TagsResolver {
679            telemetry,
680            interner: self.interner,
681            caching_enabled: self.caching_enabled,
682            tagset_cache,
683            origin_tags_resolver: self.origin_tags_resolver,
684            allow_heap_allocations,
685        }
686    }
687
688    /// Configures a [`TagsResolverBuilder`] that is suitable for tests.
689    ///
690    /// This configures the builder with the following defaults:
691    ///
692    /// - resolver name of "noop"
693    /// - unlimited cache capacity
694    /// - no-op interner (all strings are heap-allocated)
695    /// - heap allocations allowed
696    /// - telemetry disabled
697    ///
698    /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate
699    /// testing scenarios.
700    pub fn for_tests() -> Self {
701        TagsResolverBuilder::new("noop", GenericMapInterner::new(NonZeroUsize::new(1).expect("not zero")))
702            .expect("resolver name not empty")
703            .with_cached_tagsets_limit(usize::MAX)
704            .with_heap_allocations(true)
705            .without_telemetry()
706    }
707}
708
/// A resolver for tags.
pub struct TagsResolver {
    // Emits the `context_resolver`-prefixed metrics for this resolver (interner usage, tag set resolution counts).
    telemetry: Telemetry,
    // Interner used to deduplicate tag strings without heap-allocating each one.
    interner: GenericMapInterner,
    // Whether resolved tag sets should be cached. NOTE(review): not read by any method in this impl;
    // presumably consulted by callers (e.g. the context resolver) before using `get_tag_set`/`insert_tag_set` — confirm.
    caching_enabled: bool,
    // Cache of resolved tag sets, keyed by their precomputed `TagSetKey` hash.
    tagset_cache: TagSetCache,
    // Optional hook for mapping raw origin information to enriched origin tags.
    origin_tags_resolver: Option<Arc<dyn OriginTagsResolver>>,
    // When `true`, strings that cannot be cheaply cloned or interned fall back to heap allocation
    // instead of failing resolution.
    allow_heap_allocations: bool,
}
718
719impl TagsResolver {
720    fn intern<S>(&self, s: S) -> Option<MetaString>
721    where
722        S: AsRef<str> + CheapMetaString,
723    {
724        // Try to cheaply clone the string, and if that fails, try to intern it. If that fails, then we fall back to
725        // allocating it on the heap if we allow it.
726        s.try_cheap_clone()
727            .or_else(|| self.interner.try_intern(s.as_ref()).map(MetaString::from))
728            .or_else(|| {
729                self.allow_heap_allocations.then(|| {
730                    self.telemetry.intern_fallback_total().increment(1);
731                    MetaString::from(s.as_ref())
732                })
733            })
734    }
735
736    /// Creates a new tag set from the given tags.
737    ///
738    /// This will intern the tags, and then return a shared tag set. If the interner is full, and heap allocations are
739    /// not allowed, then this will return `None`.
740    ///
741    /// If heap allocations are allowed, then this will return a shared tag set, and the tag set will be cached.
742    pub fn create_tag_set<I, T>(&mut self, tags: I) -> Option<SharedTagSet>
743    where
744        I: IntoIterator<Item = T>,
745        T: AsRef<str> + CheapMetaString,
746    {
747        let mut tag_set = TagSet::default();
748        for tag in tags {
749            let tag = self.intern(tag)?;
750            tag_set.insert_tag(tag);
751        }
752
753        self.telemetry.resolved_new_tagset_total().increment(1);
754
755        Some(tag_set.into_shared())
756    }
757
758    /// Resolves the origin tags for the given origin.
759    ///
760    /// This will return the origin tags for the given origin, or an empty tag set if no origin tags resolver is set.
761    pub fn resolve_origin_tags(&self, maybe_origin: Option<RawOrigin<'_>>) -> SharedTagSet {
762        self.origin_tags_resolver
763            .as_ref()
764            .and_then(|resolver| maybe_origin.map(|origin| resolver.resolve_origin_tags(origin)))
765            .unwrap_or_default()
766    }
767
768    fn get_tag_set(&self, key: TagSetKey) -> Option<SharedTagSet> {
769        self.tagset_cache.get(&key)
770    }
771
772    fn insert_tag_set(&self, key: TagSetKey, tag_set: SharedTagSet) {
773        self.tagset_cache.insert(key, tag_set);
774    }
775}
776
777impl Clone for TagsResolver {
778    fn clone(&self) -> Self {
779        Self {
780            telemetry: self.telemetry.clone(),
781            interner: self.interner.clone(),
782            caching_enabled: self.caching_enabled,
783            tagset_cache: self.tagset_cache.clone(),
784            origin_tags_resolver: self.origin_tags_resolver.clone(),
785            allow_heap_allocations: self.allow_heap_allocations,
786        }
787    }
788}
789
#[cfg(test)]
mod tests {
    // Unit tests for context/tag resolution. They exercise the public builder/`resolve` APIs and
    // inspect resolver internals (interner, caches) plus emitted telemetry via a debugging recorder.
    use metrics::{SharedString, Unit};
    use metrics_util::{
        debugging::{DebugValue, DebuggingRecorder},
        CompositeKey,
    };
    use saluki_common::hash::hash_single_fast;

    use super::*;

    /// Finds the gauge named `key` in a debugging-recorder snapshot and returns its value.
    ///
    /// Panics if no metric with that name exists or if the matching metric is not a gauge.
    fn get_gauge_value(metrics: &[(CompositeKey, Option<Unit>, Option<SharedString>, DebugValue)], key: &str) -> f64 {
        metrics
            .iter()
            .find(|(k, _, _, _)| k.key().name() == key)
            .map(|(_, _, _, value)| match value {
                DebugValue::Gauge(value) => value.into_inner(),
                other => panic!("expected a gauge, got: {:?}", other),
            })
            .unwrap_or_else(|| panic!("no metric found with key: {}", key))
    }

    /// Test origin tags resolver that deterministically maps an origin to a single
    /// `origin_key:<hash>` tag, so distinct origins yield distinct, reproducible tag sets.
    struct DummyOriginTagsResolver;

    impl OriginTagsResolver for DummyOriginTagsResolver {
        fn resolve_origin_tags(&self, origin: RawOrigin<'_>) -> SharedTagSet {
            let origin_key = hash_single_fast(origin);

            let mut tags = TagSet::default();
            tags.insert_tag(format!("origin_key:{}", origin_key));
            tags.into_shared()
        }
    }

    #[test]
    fn basic() {
        let mut resolver = ContextResolverBuilder::for_tests().build();

        // Create two distinct contexts with the same name but different tags:
        let name = "metric_name";
        let tags1: [&str; 0] = [];
        let tags2 = ["tag1"];

        assert_ne!(&tags1[..], &tags2[..]);

        let context1 = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");

        // The contexts should not be equal to each other, and should have distinct underlying pointers to the shared
        // context state:
        assert_ne!(context1, context2);
        assert!(!context1.ptr_eq(&context2));

        // If we create the context references again, we _should_ get back the same contexts as before:
        let context1_redo = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context2_redo = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");

        assert_ne!(context1_redo, context2_redo);
        assert_eq!(context1, context1_redo);
        assert_eq!(context2, context2_redo);
        assert!(context1.ptr_eq(&context1_redo));
        assert!(context2.ptr_eq(&context2_redo));
    }

    #[test]
    fn tag_order() {
        let mut resolver = ContextResolverBuilder::for_tests().build();

        // Create two distinct contexts with the same name and tags, but with the tags in a different order:
        let name = "metric_name";
        let tags1 = ["tag1", "tag2"];
        let tags2 = ["tag2", "tag1"];

        assert_ne!(&tags1[..], &tags2[..]);

        let context1 = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");

        // The contexts should be equal to each other, and should have the same underlying pointer to the shared context
        // state:
        assert_eq!(context1, context2);
        assert!(context1.ptr_eq(&context2));
    }

    #[test]
    fn active_contexts() {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        // Create our resolver and then create a context, which will have its metrics attached to our local recorder:
        let context = metrics::with_local_recorder(&recorder, || {
            let mut resolver = ContextResolverBuilder::for_tests().build();
            resolver
                .resolve("name", &["tag"][..], None)
                .expect("should not fail to resolve")
        });

        // We should be able to see that the active context count is one, representing the context we created:
        let metrics_before = snapshotter.snapshot().into_vec();
        let active_contexts = get_gauge_value(&metrics_before, Telemetry::active_contexts_name());
        assert_eq!(active_contexts, 1.0);

        // Now drop the context, and observe the active context count is negative one, representing the context we dropped:
        drop(context);
        let metrics_after = snapshotter.snapshot().into_vec();
        let active_contexts = get_gauge_value(&metrics_after, Telemetry::active_contexts_name());
        assert_eq!(active_contexts, -1.0);
    }

    #[test]
    fn duplicate_tags() {
        let mut resolver = ContextResolverBuilder::for_tests().build();

        // Two contexts with the same name, but each with a different set of duplicate tags:
        let name = "metric_name";
        let tags1 = ["tag1"];
        let tags1_duplicated = ["tag1", "tag1"];
        let tags2 = ["tag2"];
        let tags2_duplicated = ["tag2", "tag2"];

        let context1 = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context1_duplicated = resolver
            .resolve(name, &tags1_duplicated[..], None)
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");
        let context2_duplicated = resolver
            .resolve(name, &tags2_duplicated[..], None)
            .expect("should not fail to resolve");

        // Each non-duplicated/duplicated context pair should be equal to one another:
        assert_eq!(context1, context1_duplicated);
        assert_eq!(context2, context2_duplicated);

        // Each pair should not be equal to the other pair, however.
        //
        // What we're asserting here is that, if we didn't handle duplicate tags correctly, the XOR hashing of [tag1,
        // tag1] and [tag2, tag2] would result in the same hash value, since the second duplicate hash of tag1/tag2
        // would cancel out the first... and thus all that would be left is the hash of the name itself, which is the
        // same in this test. This would lead to the contexts being equal, which is obviously wrong.
        //
        // If we're handling duplicates properly, then the resulting context hashes _shouldn't_ be equal.
        assert_ne!(context1, context2);
        assert_ne!(context1_duplicated, context2_duplicated);
        assert_ne!(context1, context2_duplicated);
        assert_ne!(context2, context1_duplicated);
    }

    #[test]
    fn differing_origins_with_without_resolver() {
        // Create a regular context resolver, without any origin tags resolver, which should result in contexts being
        // the same so long as the name and tags are the same, disregarding any difference in origin information:
        let mut resolver = ContextResolverBuilder::for_tests().build();

        let name = "metric_name";
        let tags = ["tag1"];
        let mut origin1 = RawOrigin::default();
        origin1.set_local_data("container1");
        let mut origin2 = RawOrigin::default();
        origin2.set_local_data("container2");

        let context1 = resolver
            .resolve(name, &tags[..], Some(origin1.clone()))
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags[..], Some(origin2.clone()))
            .expect("should not fail to resolve");

        assert_eq!(context1, context2);

        let tags_resolver = TagsResolverBuilder::for_tests()
            .with_origin_tags_resolver(Some(Arc::new(DummyOriginTagsResolver)))
            .build();
        // Now build a context resolver with an origin tags resolver that trivially returns the hash of the origin info
        // as a tag, which should result in differing sets of origin tags between the two origins, thus no longer
        // comparing as equal:
        let mut resolver = ContextResolverBuilder::for_tests()
            .with_tags_resolver(Some(tags_resolver))
            .build();

        let context1 = resolver
            .resolve(name, &tags[..], Some(origin1))
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags[..], Some(origin2))
            .expect("should not fail to resolve");

        assert_ne!(context1, context2);
    }

    #[test]
    fn caching_disabled() {
        let tags_resolver = TagsResolverBuilder::for_tests()
            .with_origin_tags_resolver(Some(Arc::new(DummyOriginTagsResolver)))
            .build();
        let mut resolver = ContextResolverBuilder::for_tests()
            .without_caching()
            .with_tags_resolver(Some(tags_resolver))
            .build();

        let name = "metric_name";
        let tags = ["tag1"];
        let mut origin1 = RawOrigin::default();
        origin1.set_local_data("container1");

        // Create a context with caching disabled, and verify that the context is not cached:
        let context1 = resolver
            .resolve(name, &tags[..], Some(origin1.clone()))
            .expect("should not fail to resolve");
        assert_eq!(resolver.context_cache.len(), 0);

        // Create a second context with the same name and tags, and verify that it is not cached:
        let context2 = resolver
            .resolve(name, &tags[..], Some(origin1))
            .expect("should not fail to resolve");
        assert_eq!(resolver.context_cache.len(), 0);

        // The contexts should be equal to each other, but the underlying `Arc` pointers should be different since
        // they're two distinct contexts in terms of not being cached:
        assert_eq!(context1, context2);
        assert!(!context1.ptr_eq(&context2));
    }

    #[test]
    fn cheaply_cloneable_name_and_tags() {
        const BIG_TAG_ONE: &str = "long-tag-that-cannot-be-inlined-just-to-be-doubly-sure-on-top-of-being-static";
        const BIG_TAG_TWO: &str = "another-long-boye-that-we-are-also-sure-wont-be-inlined-and-we-stand-on-that";

        // Create a context resolver with a proper string interner configured:
        let mut resolver = ContextResolverBuilder::for_tests()
            .with_interner_capacity_bytes(NonZeroUsize::new(1024).expect("not zero"))
            .build();

        // Create our context with cheaply cloneable tags, aka static strings:
        let name = MetaString::from_static("long-metric-name-that-shouldnt-be-inlined-and-should-end-up-interned");
        let tags = [
            MetaString::from_static(BIG_TAG_ONE),
            MetaString::from_static(BIG_TAG_TWO),
        ];
        assert!(tags[0].is_cheaply_cloneable());
        assert!(tags[1].is_cheaply_cloneable());

        // Make sure the interner is empty before we resolve the context, and that it's empty afterwards, since we
        // should be able to cheaply clone both the metric name and both tags:
        assert_eq!(resolver.interner.len(), 0);
        assert_eq!(resolver.interner.len_bytes(), 0);

        let context = resolver
            .resolve(&name, &tags[..], None)
            .expect("should not fail to resolve");
        assert_eq!(resolver.interner.len(), 0);
        assert_eq!(resolver.interner.len_bytes(), 0);

        // And just a sanity check that we have the expected name and tags in the context:
        assert_eq!(context.name(), &name);

        let context_tags = context.tags();
        assert_eq!(context_tags.len(), 2);
        assert!(context_tags.has_tag(&tags[0]));
        assert!(context_tags.has_tag(&tags[1]));
    }
}