//! saluki_context/resolver.rs: context and tag set resolution with string interning and caching.
1use std::{num::NonZeroUsize, sync::Arc, time::Duration};
2
3use saluki_common::{
4 cache::{weight::ItemCountWeighter, Cache, CacheBuilder},
5 collections::PrehashedHashSet,
6 hash::NoopU64BuildHasher,
7};
8use saluki_error::{generic_error, GenericError};
9use saluki_metrics::static_metrics;
10use stringtheory::{
11 interning::{GenericMapInterner, Interner as _},
12 CheapMetaString, MetaString,
13};
14use tokio::time::sleep;
15use tracing::debug;
16
17use crate::{
18 context::{Context, ContextInner},
19 hash::{hash_context_with_seen, ContextKey, TagSetKey},
20 origin::{OriginTagsResolver, RawOrigin},
21 tags::{SharedTagSet, TagSet},
22};
23
// Default maximum number of resolved contexts held in the cache (see `with_cached_contexts_limit`).
// SAFETY: We know, unquestionably, that this value is not zero.
const DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT: NonZeroUsize = NonZeroUsize::new(500_000).unwrap();

// Default capacity of the string interner, in bytes (2MB; see `with_interner_capacity_bytes`).
// SAFETY: We know, unquestionably, that this value is not zero.
const DEFAULT_CONTEXT_RESOLVER_INTERNER_CAPACITY_BYTES: NonZeroUsize = NonZeroUsize::new(2 * 1024 * 1024).unwrap();

// Initial capacity of the scratch hash set reused while hashing contexts (see `hash_context_with_seen`).
const SEEN_HASHSET_INITIAL_CAPACITY: usize = 128;

// Cache aliases. Both use `NoopU64BuildHasher`, which suggests the keys are already precomputed
// hashes — TODO confirm against `ContextKey`/`TagSetKey` definitions.
type ContextCache = Cache<ContextKey, Context, ItemCountWeighter, NoopU64BuildHasher>;
type TagSetCache = Cache<TagSetKey, SharedTagSet, ItemCountWeighter, NoopU64BuildHasher>;
34
// Static telemetry for the resolvers. Every metric carries a `resolver_id` label (the resolver's
// name), so resolvers with unique names can be distinguished in the emitted metrics.
static_metrics! {
    name => Telemetry,
    prefix => context_resolver,
    labels => [resolver_id: String],
    metrics => [
        // Interner statistics, reported periodically by `drive_telemetry`.
        gauge(interner_capacity_bytes),
        gauge(interner_len_bytes),
        gauge(interner_entries),
        counter(intern_fallback_total),

        // Context resolution statistics.
        counter(resolved_existing_context_total),
        counter(resolved_new_context_total),
        gauge(active_contexts),

        // Tag set resolution statistics.
        counter(resolved_existing_tagset_total),
        counter(resolved_new_tagset_total),
    ],
}
53
/// Builder for creating a [`ContextResolver`].
pub struct ContextResolverBuilder {
    // Resolver name, used as the `resolver_id` telemetry label and in cache identifiers.
    name: String,
    // Whether resolved contexts are cached for reuse. Defaults to `true`.
    caching_enabled: bool,
    // Maximum number of cached contexts. Defaults to 500,000 when unset.
    cached_contexts_limit: Option<NonZeroUsize>,
    // Time-to-idle before a cached context becomes eligible for expiration. Defaults to no expiration.
    idle_context_expiration: Option<Duration>,
    // Capacity of the string interner, in bytes. Defaults to 2MB when unset.
    interner_capacity_bytes: Option<NonZeroUsize>,
    // Whether strings may spill to the heap when the interner is full. Defaults to `true` when unset.
    allow_heap_allocations: Option<bool>,
    // Optional pre-built tags resolver; when unset, one is built in `build` sharing this resolver's interner.
    tags_resolver: Option<TagsResolver>,
    // Optional pre-built interner; when unset, one is created in `build`.
    interner: Option<GenericMapInterner>,
    // Optional origin tags resolver, forwarded to the tags resolver constructed in `build`.
    origin_tags_resolver: Option<Arc<dyn OriginTagsResolver>>,
    // Whether to spawn the background telemetry reporting task. Defaults to `true`.
    telemetry_enabled: bool,
}
71
72impl ContextResolverBuilder {
73 /// Creates a new `ContextResolverBuilder` with the given resolver name.
74 ///
75 /// The resolver name _should_ be unique, but it is not required to be. Metrics for the resolver will be
76 /// emitted using the given name, so in cases where the name is not unique, those metrics will be aggregated
77 /// together and it will not be possible to distinguish between the different resolvers.
78 ///
79 /// # Errors
80 ///
81 /// If the given resolver name is empty, an error is returned.
82 pub fn from_name<S: Into<String>>(name: S) -> Result<Self, GenericError> {
83 let name = name.into();
84 if name.is_empty() {
85 return Err(generic_error!("resolver name must not be empty"));
86 }
87
88 Ok(Self {
89 name,
90 caching_enabled: true,
91 cached_contexts_limit: None,
92 idle_context_expiration: None,
93 interner_capacity_bytes: None,
94 allow_heap_allocations: None,
95 tags_resolver: None,
96 interner: None,
97 origin_tags_resolver: None,
98 telemetry_enabled: true,
99 })
100 }
101
102 /// Sets whether or not to enable caching of resolved contexts.
103 ///
104 /// [`ContextResolver`] provides two main benefits: consistent behavior for resolving contexts (interning, origin
105 /// tags, etc), and the caching of those resolved contexts to speed up future resolutions. However, caching contexts
106 /// means that we pay a memory cost for the cache itself, even if the contexts are not ever reused or are seen
107 /// infrequently. While expiration can help free up cache capacity, it cannot help recover the memory used by the
108 /// underlying cache data structure once they have expanded to hold the contexts.
109 ///
110 /// Disabling caching allows normal resolving to take place without the overhead of caching the contexts. This can
111 /// lead to lower average memory usage, as contexts will only live as long as they are needed, but it will reduce
112 /// memory determinism as memory will be allocated for every resolved context (minus interned strings), which means
113 /// that resolving the same context ten times in a row will result in ten separate allocations, and so on.
114 ///
115 /// Defaults to caching enabled.
116 pub fn without_caching(mut self) -> Self {
117 self.caching_enabled = false;
118 self.idle_context_expiration = None;
119 self
120 }
121
122 /// Sets the limit on the number of cached contexts.
123 ///
124 /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
125 /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
126 /// and whether or not heap allocations are allowed.
127 ///
128 /// Caching contexts is beneficial when the same context is resolved frequently, and it is generally worth
129 /// allowing for higher limits on cached contexts when heap allocations are allowed, as this can better amortize the
130 /// cost of those heap allocations.
131 ///
132 /// If value is zero, caching will be disabled, and no contexts will be cached. This is equivalent to calling
133 /// `without_caching`.
134 ///
135 /// Defaults to 500,000.
136 pub fn with_cached_contexts_limit(mut self, limit: usize) -> Self {
137 match NonZeroUsize::new(limit) {
138 Some(limit) => {
139 self.cached_contexts_limit = Some(limit);
140 self
141 }
142 None => self.without_caching(),
143 }
144 }
145
146 /// Sets the time before contexts are considered "idle" and eligible for expiration.
147 ///
148 /// This controls how long a context will be kept in the cache after its last access or creation time. This value is
149 /// a lower bound, as contexts eligible for expiration may not be expired immediately. Contexts may still be removed
150 /// prior to their natural expiration time if the cache is full and evictions are required to make room for a new
151 /// context.
152 ///
153 /// Defaults to no expiration.
154 pub fn with_idle_context_expiration(mut self, time_to_idle: Duration) -> Self {
155 self.idle_context_expiration = Some(time_to_idle);
156 self
157 }
158
159 /// Sets the capacity of the string interner, in bytes.
160 ///
161 /// This is the maximum number of bytes that the interner will use for interning strings that are present in
162 /// contexts being resolved. This capacity may or may not be allocated entirely when the resolver is built, but the
163 /// interner will not exceed the configured capacity when allocating any backing storage.
164 ///
165 /// This value directly impacts the number of contexts that can be resolved when heap allocations are disabled, as
166 /// all resolved contexts must either have values (name or tags) that can be inlined or interned. Once the interner
167 /// is full, contexts may fail to be resolved if heap allocations are disabled.
168 ///
169 /// The optimal value will almost always be workload-dependent, but a good starting point can be to estimate around
170 /// 150 - 200 bytes per context based on empirical measurements around common metric name and tag lengths. This
171 /// translate to around 5000 unique contexts per 1MB of interner size.
172 ///
173 /// Defaults to 2MB.
174 pub fn with_interner_capacity_bytes(mut self, capacity: NonZeroUsize) -> Self {
175 self.interner_capacity_bytes = Some(capacity);
176 self
177 }
178
179 /// Sets whether or not to allow heap allocations when interning strings.
180 ///
181 /// In cases where the interner is full, this setting determines whether or not we refuse to resolve a context, or
182 /// if we allow it be resolved by allocating strings on the heap. When heap allocations are enabled, the amount of
183 /// memory that can be used by the interner is effectively unlimited, as contexts that cannot be interned will be
184 /// simply spill to the heap instead of being limited in any way.
185 ///
186 /// Defaults to `true`.
187 pub fn with_heap_allocations(mut self, allow: bool) -> Self {
188 self.allow_heap_allocations = Some(allow);
189 self
190 }
191
192 /// Sets the tags resolver.
193 ///
194 /// Defaults to unset.
195 pub fn with_tags_resolver(mut self, resolver: Option<TagsResolver>) -> Self {
196 self.tags_resolver = resolver;
197 self
198 }
199
200 /// Sets whether or not to enable telemetry for this resolver.
201 ///
202 /// Reporting the telemetry of the resolver requires running an asynchronous task to override adding additional
203 /// overhead in the hot path of resolving contexts. In some cases, it may be cumbersome to always create the
204 /// resolver in an asynchronous context so that the telemetry task can be spawned. This method allows disabling
205 /// telemetry reporting in those cases.
206 ///
207 /// Defaults to telemetry enabled.
208 pub fn without_telemetry(mut self) -> Self {
209 self.telemetry_enabled = false;
210 self
211 }
212
213 /// Sets the interner to use for this resolver.
214 ///
215 /// If an interner is not provided, an interner will be created in [`ContextResolverBuilder::build`]
216 pub fn with_interner(mut self, interner: GenericMapInterner) -> Self {
217 self.interner = Some(interner);
218 self
219 }
220
221 /// Configures a [`ContextResolverBuilder`] that is suitable for tests.
222 ///
223 /// This configures the builder with the following defaults:
224 ///
225 /// - resolver name of "noop"
226 /// - unlimited cache capacity
227 /// - no-op interner (all strings are heap-allocated)
228 /// - heap allocations allowed
229 /// - telemetry disabled
230 ///
231 /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate
232 /// testing scenarios.
233 pub fn for_tests() -> Self {
234 ContextResolverBuilder::from_name("noop")
235 .expect("resolver name not empty")
236 .with_cached_contexts_limit(usize::MAX)
237 .with_interner_capacity_bytes(NonZeroUsize::new(1).expect("not zero"))
238 .with_heap_allocations(true)
239 .with_tags_resolver(Some(TagsResolverBuilder::for_tests().build()))
240 .without_telemetry()
241 }
242
243 /// Builds a [`ContextResolver`] from the current configuration.
244 pub fn build(self) -> ContextResolver {
245 let interner_capacity_bytes = self
246 .interner_capacity_bytes
247 .unwrap_or(DEFAULT_CONTEXT_RESOLVER_INTERNER_CAPACITY_BYTES);
248
249 let interner = match self.interner {
250 Some(interner) => interner,
251 None => GenericMapInterner::new(interner_capacity_bytes),
252 };
253
254 let cached_context_limit = self
255 .cached_contexts_limit
256 .unwrap_or(DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT);
257
258 let allow_heap_allocations = self.allow_heap_allocations.unwrap_or(true);
259
260 let telemetry = Telemetry::new(self.name.clone());
261 telemetry
262 .interner_capacity_bytes()
263 .set(interner.capacity_bytes() as f64);
264
265 // NOTE: We should switch to using a size-based weighter so that we can do more firm bounding of what we cache.
266 let context_cache = CacheBuilder::from_identifier(format!("{}/contexts", self.name))
267 .expect("cache identifier cannot possibly be empty")
268 .with_capacity(cached_context_limit)
269 .with_time_to_idle(self.idle_context_expiration)
270 .with_hasher::<NoopU64BuildHasher>()
271 .with_telemetry(self.telemetry_enabled)
272 .build();
273
274 // If no tags resolver is provided, we need to create one using the same interner used for the context resolver.
275 let tags_resolver = match self.tags_resolver {
276 Some(tags_resolver) => tags_resolver,
277 None => TagsResolverBuilder::new(format!("{}/tags", self.name), interner.clone())
278 .expect("tags resolver name not empty")
279 .with_cached_tagsets_limit(cached_context_limit.get())
280 .with_idle_tagsets_expiration(self.idle_context_expiration.unwrap_or_default())
281 .with_heap_allocations(allow_heap_allocations)
282 .with_origin_tags_resolver(self.origin_tags_resolver.clone())
283 .build(),
284 };
285
286 if self.telemetry_enabled {
287 tokio::spawn(drive_telemetry(interner.clone(), telemetry.clone()));
288 }
289
290 ContextResolver {
291 telemetry,
292 interner,
293 caching_enabled: self.caching_enabled,
294 context_cache,
295 hash_seen_buffer: PrehashedHashSet::with_capacity_and_hasher(
296 SEEN_HASHSET_INITIAL_CAPACITY,
297 NoopU64BuildHasher,
298 ),
299 allow_heap_allocations,
300 tags_resolver,
301 }
302 }
303}
304
/// A centralized store for resolved contexts.
///
/// Contexts are the combination of a name and a set of tags. They are used to identify a specific metric series. As contexts
/// are constructed entirely of strings, they are expensive to construct in a way that allows sending between tasks, as
/// this usually requires allocations. Even further, the same context may be "hot", used frequently by the
/// applications/services sending us metrics.
///
/// In order to optimize this, the context resolver is responsible for both interning the strings involved where
/// possible, as well as keeping a map of contexts that can be referred to with a cheap handle. We can cheaply search
/// for an existing context without needing to allocate an entirely new one, and get a clone of the handle to use going
/// forward.
///
/// # Design
///
/// `ContextResolver` specifically manages interning and mapping of contexts. It can be cheaply cloned itself.
///
/// In order to resolve a context, `resolve` must be called, which first consults the underlying context cache for an
/// existing entry before building a new one. (NOTE(review): concurrency is delegated to the `Cache` type; an earlier
/// version of this doc described a read/write lock — confirm the current `Cache` semantics before relying on that.)
///
/// Once a context is resolved, a cheap handle -- `Context` -- is returned. This handle, like `ContextResolver`, can be
/// cheaply cloned. It points directly to the underlying context data (name and tags) and provides access to these
/// components.
pub struct ContextResolver {
    // Static telemetry handles for this resolver.
    telemetry: Telemetry,
    // Shared string interner used for context names.
    interner: GenericMapInterner,
    // Whether resolved contexts are cached for reuse.
    caching_enabled: bool,
    // Cache of resolved contexts, keyed by their precomputed `ContextKey`.
    context_cache: ContextCache,
    // Scratch set reused across hashing calls (see `hash_context_with_seen`); not shared between clones.
    hash_seen_buffer: PrehashedHashSet<u64>,
    // Whether strings may spill to the heap when the interner is full.
    allow_heap_allocations: bool,
    // Resolver for tag sets (including origin-enriched tags) attached to resolved contexts.
    tags_resolver: TagsResolver,
}
337
impl ContextResolver {
    /// Attempts to acquire a `MetaString` for the given string.
    ///
    /// Tries, in order: a cheap (zero-cost) clone of the input, interning into the shared interner, and finally a
    /// plain heap allocation if `allow_heap_allocations` is enabled. Returns `None` when the interner is full and
    /// heap allocations are disallowed.
    fn intern<S>(&self, s: S) -> Option<MetaString>
    where
        S: AsRef<str> + CheapMetaString,
    {
        // Try to cheaply clone the string, and if that fails, try to intern it. If that fails, then we fall back to
        // allocating it on the heap if we allow it.
        s.try_cheap_clone()
            .or_else(|| self.interner.try_intern(s.as_ref()).map(MetaString::from))
            .or_else(|| {
                self.allow_heap_allocations.then(|| {
                    // Track heap fallbacks so an undersized interner is visible in telemetry.
                    self.telemetry.intern_fallback_total().increment(1);
                    MetaString::from(s.as_ref())
                })
            })
    }

    /// Computes the context key and tag set key for the given name, tags, and origin tags.
    ///
    /// Reuses `hash_seen_buffer` as scratch state between calls (see `hash_context_with_seen`), which is why this
    /// takes `&mut self`.
    fn create_context_key<N, I, I2, T, T2>(&mut self, name: N, tags: I, origin_tags: I2) -> (ContextKey, TagSetKey)
    where
        N: AsRef<str>,
        I: IntoIterator<Item = T>,
        T: AsRef<str>,
        I2: IntoIterator<Item = T2>,
        T2: AsRef<str>,
    {
        hash_context_with_seen(name.as_ref(), tags, origin_tags, &mut self.hash_seen_buffer)
    }

    /// Builds a new `Context` from pre-resolved parts.
    ///
    /// Returns `None` if the context name cannot be interned (interner full and heap allocations disallowed).
    fn create_context<N>(
        &self, key: ContextKey, name: N, context_tags: SharedTagSet, origin_tags: SharedTagSet,
    ) -> Option<Context>
    where
        N: AsRef<str> + CheapMetaString,
    {
        // Intern the name of the context; the tags have already been resolved into shared tag sets.
        let context_name = self.intern(name)?;

        self.telemetry.resolved_new_context_total().increment(1);
        self.telemetry.active_contexts().increment(1);

        Some(Context::from_inner(ContextInner::from_parts(
            key,
            context_name,
            context_tags.into(),
            origin_tags.into(),
            // The context carries the gauge handle, presumably so `active_contexts` can be decremented when the
            // context is dropped — TODO confirm against `ContextInner`'s drop behavior.
            self.telemetry.active_contexts().clone(),
        )))
    }

    /// Resolves the given context.
    ///
    /// If the context has not yet been resolved, the name and tags are interned and a new context is created and
    /// stored. Otherwise, the existing context is returned. If an origin tags resolver is configured, and origin info
    /// is available, any enriched tags will be added to the context.
    ///
    /// `None` may be returned if the interner is full and outside allocations are disallowed. See
    /// `allow_heap_allocations` for more information.
    pub fn resolve<N, I, T>(&mut self, name: N, tags: I, maybe_origin: Option<RawOrigin<'_>>) -> Option<Context>
    where
        N: AsRef<str> + CheapMetaString,
        I: IntoIterator<Item = T> + Clone,
        T: AsRef<str> + CheapMetaString,
    {
        // Try and resolve our origin tags from the provided origin information, if any.
        let origin_tags = self.tags_resolver.resolve_origin_tags(maybe_origin);

        self.resolve_inner(name, tags, origin_tags)
    }

    /// Resolves the given context using the provided origin tags.
    ///
    /// If the context has not yet been resolved, the name and tags are interned and a new context is created and
    /// stored. Otherwise, the existing context is returned. The provided origin tags are used to enrich the context.
    ///
    /// `None` may be returned if the interner is full and outside allocations are disallowed. See
    /// `allow_heap_allocations` for more information.
    ///
    /// ## Origin tags resolver mismatch
    ///
    /// When passing in origin tags, they will be inherently tied to a specific `OriginTagsResolver`, which may
    /// differ from the configured origin tags resolver in this context resolver. This means that the context that is
    /// generated and cached may not be reused in the future if an attempt is made to resolve it using the raw origin
    /// information instead.
    ///
    /// This method is intended primarily to allow for resolving contexts in a consistent way while _reusing_ the origin
    /// tags from another context, such as when remapping the name and/or instrumented tags of a given metric, while
    /// maintaining its origin association.
    pub fn resolve_with_origin_tags<N, I, T>(
        &mut self, name: N, tags: I, origin_tags: impl Into<SharedTagSet>,
    ) -> Option<Context>
    where
        N: AsRef<str> + CheapMetaString,
        I: IntoIterator<Item = T> + Clone,
        T: AsRef<str> + CheapMetaString,
    {
        self.resolve_inner(name, tags, origin_tags.into())
    }

    /// Shared resolution path for `resolve` and `resolve_with_origin_tags`.
    fn resolve_inner<N, I, T>(&mut self, name: N, tags: I, origin_tags: SharedTagSet) -> Option<Context>
    where
        N: AsRef<str> + CheapMetaString,
        I: IntoIterator<Item = T> + Clone,
        T: AsRef<str> + CheapMetaString,
    {
        let (context_key, tagset_key) = self.create_context_key(&name, tags.clone(), &origin_tags);

        // Fast path to avoid looking up the context in the cache if caching is disabled.
        if !self.caching_enabled {
            // NOTE(review): a failed tag set creation (interner full, heap disallowed) silently yields an empty
            // tag set here rather than failing resolution — confirm this is intended.
            let tag_set = self.tags_resolver.create_tag_set(tags).unwrap_or_default();

            let context = self.create_context(context_key, name, tag_set, origin_tags)?;

            debug!(?context_key, ?context, "Resolved new non-cached context.");
            return Some(context);
        }

        match self.context_cache.get(&context_key) {
            Some(context) => {
                self.telemetry.resolved_existing_context_total().increment(1);
                Some(context)
            }
            None => {
                // Try seeing if we have the tagset cached already, and create it if not.
                let tag_set = match self.tags_resolver.get_tag_set(tagset_key) {
                    Some(tag_set) => {
                        self.telemetry.resolved_existing_tagset_total().increment(1);
                        tag_set
                    }
                    None => {
                        // If the tagset is not cached, we need to create it.
                        let tag_set = self.tags_resolver.create_tag_set(tags.clone()).unwrap_or_default();

                        self.tags_resolver.insert_tag_set(tagset_key, tag_set.clone());

                        tag_set
                    }
                };

                let context = self.create_context(context_key, name, tag_set, origin_tags)?;
                self.context_cache.insert(context_key, context.clone());

                debug!(?context_key, ?context, "Resolved new context.");
                Some(context)
            }
        }
    }
}
485
486impl Clone for ContextResolver {
487 fn clone(&self) -> Self {
488 Self {
489 telemetry: self.telemetry.clone(),
490 interner: self.interner.clone(),
491 caching_enabled: self.caching_enabled,
492 context_cache: self.context_cache.clone(),
493 hash_seen_buffer: PrehashedHashSet::with_capacity_and_hasher(
494 SEEN_HASHSET_INITIAL_CAPACITY,
495 NoopU64BuildHasher,
496 ),
497 allow_heap_allocations: self.allow_heap_allocations,
498 tags_resolver: self.tags_resolver.clone(),
499 }
500 }
501}
502
503async fn drive_telemetry(interner: GenericMapInterner, telemetry: Telemetry) {
504 loop {
505 sleep(Duration::from_secs(1)).await;
506
507 telemetry.interner_entries().set(interner.len() as f64);
508 telemetry
509 .interner_capacity_bytes()
510 .set(interner.capacity_bytes() as f64);
511 telemetry.interner_len_bytes().set(interner.len_bytes() as f64);
512 }
513}
514
/// A builder for a tag resolver.
pub struct TagsResolverBuilder {
    // Resolver name, used as the `resolver_id` telemetry label and in cache identifiers.
    name: String,
    // Whether resolved tag sets are cached for reuse. Defaults to `true`.
    caching_enabled: bool,
    // Maximum number of cached tag sets. Defaults to 500,000 when unset.
    cached_tagset_limit: Option<NonZeroUsize>,
    // Time-to-idle before a cached tag set becomes eligible for expiration. Defaults to no expiration.
    idle_tagset_expiration: Option<Duration>,
    // Whether strings may spill to the heap when the interner is full. Defaults to `true` when unset.
    allow_heap_allocations: Option<bool>,
    // Optional resolver for origin-based enriched tags.
    origin_tags_resolver: Option<Arc<dyn OriginTagsResolver>>,
    // Whether cache telemetry is enabled. Defaults to `true`.
    telemetry_enabled: bool,
    // String interner used for tag values; required at construction.
    interner: GenericMapInterner,
}
526
impl TagsResolverBuilder {
    /// Creates a new [`TagsResolverBuilder`] with the given name and interner.
    ///
    /// # Errors
    ///
    /// If the given resolver name is empty, an error is returned.
    pub fn new<S: Into<String>>(name: S, interner: GenericMapInterner) -> Result<Self, GenericError> {
        let name = name.into();
        if name.is_empty() {
            return Err(generic_error!("resolver name must not be empty"));
        }

        Ok(Self {
            name,
            caching_enabled: true,
            cached_tagset_limit: None,
            idle_tagset_expiration: None,
            allow_heap_allocations: None,
            origin_tags_resolver: None,
            telemetry_enabled: true,
            interner,
        })
    }

    /// Sets the interner to use for this resolver.
    ///
    /// This is used when we want to use a separate interner for tagsets, different from the one used for contexts.
    ///
    /// Defaults to using the interner passed to the builder.
    pub fn with_interner(mut self, interner: GenericMapInterner) -> Self {
        self.interner = interner;
        self
    }

    /// Sets whether or not to enable caching of resolved tag sets.
    ///
    /// [`TagsResolver`] provides two main benefits: consistent behavior for resolving tag sets (interning, origin
    /// tags, etc), and the caching of those resolved tag sets to speed up future resolutions. However, caching tag
    /// sets means that we pay a memory cost for the cache itself, even if the tag sets are not ever reused or are seen
    /// infrequently. While expiration can help free up cache capacity, it cannot help recover the memory used by the
    /// underlying cache data structure once they have expanded to hold the tag sets.
    ///
    /// Disabling caching allows normal resolving to take place without the overhead of caching the tag sets. This can
    /// lead to lower average memory usage, as tag sets will only live as long as they are needed, but it will reduce
    /// memory determinism as memory will be allocated for every resolved tag set (minus interned strings), which means
    /// that resolving the same tag set ten times in a row will result in ten separate allocations, and so on.
    ///
    /// Defaults to caching enabled.
    pub fn without_caching(mut self) -> Self {
        self.caching_enabled = false;
        // Expiration is meaningless without a cache, so clear it as well.
        self.idle_tagset_expiration = None;
        self
    }

    /// Sets the limit on the number of cached tagsets.
    ///
    /// This is the maximum number of resolved tag sets that can be cached at any given time. This limit does not affect
    /// the total number of tag sets that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Caching tag sets is beneficial when the same tag set is resolved frequently, and it is generally worth
    /// allowing for higher limits on cached tag sets when heap allocations are allowed, as this can better amortize the
    /// cost of those heap allocations.
    ///
    /// If value is zero, caching will be disabled, and no tag sets will be cached. This is equivalent to calling
    /// `without_caching`.
    ///
    /// Defaults to 500,000.
    pub fn with_cached_tagsets_limit(mut self, limit: usize) -> Self {
        match NonZeroUsize::new(limit) {
            Some(limit) => {
                self.cached_tagset_limit = Some(limit);
                self
            }
            // A zero limit is equivalent to disabling caching entirely.
            None => self.without_caching(),
        }
    }

    /// Sets the time before tag sets are considered "idle" and eligible for expiration.
    ///
    /// This controls how long a tag set will be kept in the cache after its last access or creation time. This value is
    /// a lower bound, as tag sets eligible for expiration may not be expired immediately. Tag sets may still be removed
    /// prior to their natural expiration time if the cache is full and evictions are required to make room for a new
    /// tag set.
    ///
    /// Defaults to no expiration.
    pub fn with_idle_tagsets_expiration(mut self, time_to_idle: Duration) -> Self {
        self.idle_tagset_expiration = Some(time_to_idle);
        self
    }

    /// Sets whether or not to allow heap allocations when interning strings.
    ///
    /// In cases where the interner is full, this setting determines whether or not we refuse to resolve a context, or
    /// if we allow it be resolved by allocating strings on the heap. When heap allocations are enabled, the amount of
    /// memory that can be used by the interner is effectively unlimited, as contexts that cannot be interned will
    /// simply spill to the heap instead of being limited in any way.
    ///
    /// Defaults to `true`.
    pub fn with_heap_allocations(mut self, allow: bool) -> Self {
        self.allow_heap_allocations = Some(allow);
        self
    }

    /// Sets the origin tags resolver to use when building a context.
    ///
    /// In some cases, metrics, events, and service checks may have enriched tags based on their origin -- the
    /// application/host/container/etc that emitted the metric -- which has to be considered when building the context
    /// itself. As this can be expensive, it is useful to split the logic of actually grabbing the enriched tags based
    /// on the available origin info into a separate phase, and implementation, that can run separately from the
    /// initial hash-based approach of checking if a context has already been resolved.
    ///
    /// When set, any origin information provided will be considered during hashing when looking up a context, and any
    /// enriched tags attached to the detected origin will be accessible from the context.
    ///
    /// Defaults to unset.
    pub fn with_origin_tags_resolver(mut self, resolver: Option<Arc<dyn OriginTagsResolver>>) -> Self {
        self.origin_tags_resolver = resolver;
        self
    }

    /// Sets whether or not to enable telemetry for this resolver.
    ///
    /// Reporting the telemetry of the resolver requires running an asynchronous task to avoid adding additional
    /// overhead in the hot path of resolving contexts. In some cases, it may be cumbersome to always create the
    /// resolver in an asynchronous context so that the telemetry task can be spawned. This method allows disabling
    /// telemetry reporting in those cases.
    ///
    /// Defaults to telemetry enabled.
    pub fn without_telemetry(mut self) -> Self {
        self.telemetry_enabled = false;
        self
    }

    /// Builds a [`TagsResolver`] from the current configuration.
    pub fn build(self) -> TagsResolver {
        let cached_tagsets_limit = self
            .cached_tagset_limit
            .unwrap_or(DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT);

        let allow_heap_allocations = self.allow_heap_allocations.unwrap_or(true);

        let telemetry = Telemetry::new(self.name.clone());
        telemetry
            .interner_capacity_bytes()
            .set(self.interner.capacity_bytes() as f64);

        // NOTE(review): the cache is constructed even when `caching_enabled` is false; the flag is
        // carried on the resolver itself. Also, unlike `ContextResolverBuilder::build`, no periodic
        // telemetry task is spawned for this interner — confirm whether that is intentional.
        let tagset_cache = CacheBuilder::from_identifier(format!("{}/tagsets", self.name))
            .expect("cache identifier cannot possibly be empty")
            .with_capacity(cached_tagsets_limit)
            .with_time_to_idle(self.idle_tagset_expiration)
            .with_hasher::<NoopU64BuildHasher>()
            .with_telemetry(self.telemetry_enabled)
            .build();

        TagsResolver {
            telemetry,
            interner: self.interner,
            caching_enabled: self.caching_enabled,
            tagset_cache,
            origin_tags_resolver: self.origin_tags_resolver,
            allow_heap_allocations,
        }
    }

    /// Configures a [`TagsResolverBuilder`] that is suitable for tests.
    ///
    /// This configures the builder with the following defaults:
    ///
    /// - resolver name of "noop"
    /// - unlimited cache capacity
    /// - no-op interner (all strings are heap-allocated)
    /// - heap allocations allowed
    /// - telemetry disabled
    ///
    /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate
    /// testing scenarios.
    pub fn for_tests() -> Self {
        TagsResolverBuilder::new("noop", GenericMapInterner::new(NonZeroUsize::new(1).expect("not zero")))
            .expect("resolver name not empty")
            .with_cached_tagsets_limit(usize::MAX)
            .with_heap_allocations(true)
            .without_telemetry()
    }
}
708
/// A resolver for tags.
pub struct TagsResolver {
    // Static telemetry handles for this resolver.
    telemetry: Telemetry,
    // Shared string interner used for tag values.
    interner: GenericMapInterner,
    // Whether resolved tag sets are cached for reuse.
    caching_enabled: bool,
    // Cache of resolved tag sets, keyed by their precomputed `TagSetKey`.
    tagset_cache: TagSetCache,
    // Optional resolver for origin-based enriched tags.
    origin_tags_resolver: Option<Arc<dyn OriginTagsResolver>>,
    // Whether strings may spill to the heap when the interner is full.
    allow_heap_allocations: bool,
}
718
719impl TagsResolver {
720 fn intern<S>(&self, s: S) -> Option<MetaString>
721 where
722 S: AsRef<str> + CheapMetaString,
723 {
724 // Try to cheaply clone the string, and if that fails, try to intern it. If that fails, then we fall back to
725 // allocating it on the heap if we allow it.
726 s.try_cheap_clone()
727 .or_else(|| self.interner.try_intern(s.as_ref()).map(MetaString::from))
728 .or_else(|| {
729 self.allow_heap_allocations.then(|| {
730 self.telemetry.intern_fallback_total().increment(1);
731 MetaString::from(s.as_ref())
732 })
733 })
734 }
735
736 /// Creates a new tag set from the given tags.
737 ///
738 /// This will intern the tags, and then return a shared tag set. If the interner is full, and heap allocations are
739 /// not allowed, then this will return `None`.
740 ///
741 /// If heap allocations are allowed, then this will return a shared tag set, and the tag set will be cached.
742 pub fn create_tag_set<I, T>(&mut self, tags: I) -> Option<SharedTagSet>
743 where
744 I: IntoIterator<Item = T>,
745 T: AsRef<str> + CheapMetaString,
746 {
747 let mut tag_set = TagSet::default();
748 for tag in tags {
749 let tag = self.intern(tag)?;
750 tag_set.insert_tag(tag);
751 }
752
753 self.telemetry.resolved_new_tagset_total().increment(1);
754
755 Some(tag_set.into_shared())
756 }
757
758 /// Resolves the origin tags for the given origin.
759 ///
760 /// This will return the origin tags for the given origin, or an empty tag set if no origin tags resolver is set.
761 pub fn resolve_origin_tags(&self, maybe_origin: Option<RawOrigin<'_>>) -> SharedTagSet {
762 self.origin_tags_resolver
763 .as_ref()
764 .and_then(|resolver| maybe_origin.map(|origin| resolver.resolve_origin_tags(origin)))
765 .unwrap_or_default()
766 }
767
768 fn get_tag_set(&self, key: TagSetKey) -> Option<SharedTagSet> {
769 self.tagset_cache.get(&key)
770 }
771
772 fn insert_tag_set(&self, key: TagSetKey, tag_set: SharedTagSet) {
773 self.tagset_cache.insert(key, tag_set);
774 }
775}
776
777impl Clone for TagsResolver {
778 fn clone(&self) -> Self {
779 Self {
780 telemetry: self.telemetry.clone(),
781 interner: self.interner.clone(),
782 caching_enabled: self.caching_enabled,
783 tagset_cache: self.tagset_cache.clone(),
784 origin_tags_resolver: self.origin_tags_resolver.clone(),
785 allow_heap_allocations: self.allow_heap_allocations,
786 }
787 }
788}
789
#[cfg(test)]
mod tests {
    use metrics::{SharedString, Unit};
    use metrics_util::{
        debugging::{DebugValue, DebuggingRecorder},
        CompositeKey,
    };
    use saluki_common::hash::hash_single_fast;

    use super::*;

    /// Finds the metric named `key` in a debugging-recorder snapshot and returns its gauge value.
    ///
    /// Panics if no metric with that name exists, or if it exists but is not a gauge.
    fn get_gauge_value(metrics: &[(CompositeKey, Option<Unit>, Option<SharedString>, DebugValue)], key: &str) -> f64 {
        metrics
            .iter()
            .find(|(k, _, _, _)| k.key().name() == key)
            .map(|(_, _, _, value)| match value {
                DebugValue::Gauge(value) => value.into_inner(),
                other => panic!("expected a gauge, got: {:?}", other),
            })
            .unwrap_or_else(|| panic!("no metric found with key: {}", key))
    }

    /// Origin tags resolver that deterministically maps an origin to a single synthetic tag derived from the hash of
    /// the origin itself, letting tests observe whether origin information influenced context resolution.
    struct DummyOriginTagsResolver;

    impl OriginTagsResolver for DummyOriginTagsResolver {
        fn resolve_origin_tags(&self, origin: RawOrigin<'_>) -> SharedTagSet {
            let origin_key = hash_single_fast(origin);

            let mut tags = TagSet::default();
            tags.insert_tag(format!("origin_key:{}", origin_key));
            tags.into_shared()
        }
    }

    /// Distinct name/tags inputs resolve to distinct contexts, while re-resolving the same inputs returns the
    /// cached context (same underlying pointer).
    #[test]
    fn basic() {
        let mut resolver = ContextResolverBuilder::for_tests().build();

        // Create two distinct contexts with the same name but different tags:
        let name = "metric_name";
        let tags1: [&str; 0] = [];
        let tags2 = ["tag1"];

        assert_ne!(&tags1[..], &tags2[..]);

        let context1 = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");

        // The contexts should not be equal to each other, and should have distinct underlying pointers to the shared
        // context state:
        assert_ne!(context1, context2);
        assert!(!context1.ptr_eq(&context2));

        // If we create the context references again, we _should_ get back the same contexts as before:
        let context1_redo = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context2_redo = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");

        assert_ne!(context1_redo, context2_redo);
        assert_eq!(context1, context1_redo);
        assert_eq!(context2, context2_redo);
        assert!(context1.ptr_eq(&context1_redo));
        assert!(context2.ptr_eq(&context2_redo));
    }

    /// Tag order must not matter: the same tags in a different order resolve to the very same cached context.
    #[test]
    fn tag_order() {
        let mut resolver = ContextResolverBuilder::for_tests().build();

        // Create two distinct contexts with the same name and tags, but with the tags in a different order:
        let name = "metric_name";
        let tags1 = ["tag1", "tag2"];
        let tags2 = ["tag2", "tag1"];

        assert_ne!(&tags1[..], &tags2[..]);

        let context1 = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");

        // The contexts should be equal to each other, and should have the same underlying pointer to the shared context
        // state:
        assert_eq!(context1, context2);
        assert!(context1.ptr_eq(&context2));
    }

    /// Verifies the `active_contexts` gauge is incremented when a context is created and decremented when it is
    /// dropped, using a local debugging recorder to capture the resolver's telemetry.
    #[test]
    fn active_contexts() {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        // Create our resolver and then create a context, which will have its metrics attached to our local recorder:
        let context = metrics::with_local_recorder(&recorder, || {
            let mut resolver = ContextResolverBuilder::for_tests().build();
            resolver
                .resolve("name", &["tag"][..], None)
                .expect("should not fail to resolve")
        });

        // We should be able to see that the active context count is one, representing the context we created:
        let metrics_before = snapshotter.snapshot().into_vec();
        let active_contexts = get_gauge_value(&metrics_before, Telemetry::active_contexts_name());
        assert_eq!(active_contexts, 1.0);

        // Now drop the context, and observe the active context count is negative one, representing the context we dropped:
        //
        // NOTE(review): the post-drop snapshot reads -1.0 rather than 0.0 — presumably the drop-side decrement is
        // recorded against a gauge handle independent of the one observed in the first snapshot; confirm against the
        // context drop telemetry in `ContextInner`.
        drop(context);
        let metrics_after = snapshotter.snapshot().into_vec();
        let active_contexts = get_gauge_value(&metrics_after, Telemetry::active_contexts_name());
        assert_eq!(active_contexts, -1.0);
    }

    /// Guards against XOR-style tag hashing where duplicate tags would cancel each other out and collapse distinct
    /// contexts into one.
    #[test]
    fn duplicate_tags() {
        let mut resolver = ContextResolverBuilder::for_tests().build();

        // Two contexts with the same name, but each with a different set of duplicate tags:
        let name = "metric_name";
        let tags1 = ["tag1"];
        let tags1_duplicated = ["tag1", "tag1"];
        let tags2 = ["tag2"];
        let tags2_duplicated = ["tag2", "tag2"];

        let context1 = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context1_duplicated = resolver
            .resolve(name, &tags1_duplicated[..], None)
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");
        let context2_duplicated = resolver
            .resolve(name, &tags2_duplicated[..], None)
            .expect("should not fail to resolve");

        // Each non-duplicated/duplicated context pair should be equal to one another:
        assert_eq!(context1, context1_duplicated);
        assert_eq!(context2, context2_duplicated);

        // Each pair should not be equal to the other pair, however.
        //
        // What we're asserting here is that, if we didn't handle duplicate tags correctly, the XOR hashing of [tag1,
        // tag1] and [tag2, tag2] would result in the same hash value, since the second duplicate hash of tag1/tag2
        // would cancel out the first... and thus all that would be left is the hash of the name itself, which is the
        // same in this test. This would lead to the contexts being equal, which is obviously wrong.
        //
        // If we're handling duplicates properly, then the resulting context hashes _shouldn't_ be equal.
        assert_ne!(context1, context2);
        assert_ne!(context1_duplicated, context2_duplicated);
        assert_ne!(context1, context2_duplicated);
        assert_ne!(context2, context1_duplicated);
    }

    /// Origin information only differentiates contexts when an origin tags resolver is configured.
    #[test]
    fn differing_origins_with_without_resolver() {
        // Create a regular context resolver, without any origin tags resolver, which should result in contexts being
        // the same so long as the name and tags are the same, disregarding any difference in origin information:
        let mut resolver = ContextResolverBuilder::for_tests().build();

        let name = "metric_name";
        let tags = ["tag1"];
        let mut origin1 = RawOrigin::default();
        origin1.set_local_data("container1");
        let mut origin2 = RawOrigin::default();
        origin2.set_local_data("container2");

        let context1 = resolver
            .resolve(name, &tags[..], Some(origin1.clone()))
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags[..], Some(origin2.clone()))
            .expect("should not fail to resolve");

        assert_eq!(context1, context2);

        let tags_resolver = TagsResolverBuilder::for_tests()
            .with_origin_tags_resolver(Some(Arc::new(DummyOriginTagsResolver)))
            .build();
        // Now build a context resolver with an origin tags resolver that trivially returns the hash of the origin info
        // as a tag, which should result in differeing sets of origin tags between the two origins, thus no longer
        // comparing as equal:
        let mut resolver = ContextResolverBuilder::for_tests()
            .with_tags_resolver(Some(tags_resolver))
            .build();

        let context1 = resolver
            .resolve(name, &tags[..], Some(origin1))
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags[..], Some(origin2))
            .expect("should not fail to resolve");

        assert_ne!(context1, context2);
    }

    /// With caching disabled, resolving never populates the context cache and identical inputs produce equal but
    /// pointer-distinct contexts.
    #[test]
    fn caching_disabled() {
        let tags_resolver = TagsResolverBuilder::for_tests()
            .with_origin_tags_resolver(Some(Arc::new(DummyOriginTagsResolver)))
            .build();
        let mut resolver = ContextResolverBuilder::for_tests()
            .without_caching()
            .with_tags_resolver(Some(tags_resolver))
            .build();

        let name = "metric_name";
        let tags = ["tag1"];
        let mut origin1 = RawOrigin::default();
        origin1.set_local_data("container1");

        // Create a context with caching disabled, and verify that the context is not cached:
        let context1 = resolver
            .resolve(name, &tags[..], Some(origin1.clone()))
            .expect("should not fail to resolve");
        assert_eq!(resolver.context_cache.len(), 0);

        // Create a second context with the same name and tags, and verify that it is not cached:
        let context2 = resolver
            .resolve(name, &tags[..], Some(origin1))
            .expect("should not fail to resolve");
        assert_eq!(resolver.context_cache.len(), 0);

        // The contexts should be equal to each other, but the underlying `Arc` pointers should be different since
        // they're two distinct contexts in terms of not being cached:
        assert_eq!(context1, context2);
        assert!(!context1.ptr_eq(&context2));
    }

    /// Cheaply cloneable (static) strings must be cloned rather than interned: the interner should stay empty
    /// before and after resolving a context built entirely from static strings.
    #[test]
    fn cheaply_cloneable_name_and_tags() {
        const BIG_TAG_ONE: &str = "long-tag-that-cannot-be-inlined-just-to-be-doubly-sure-on-top-of-being-static";
        const BIG_TAG_TWO: &str = "another-long-boye-that-we-are-also-sure-wont-be-inlined-and-we-stand-on-that";

        // Create a context resolver with a proper string interner configured:
        let mut resolver = ContextResolverBuilder::for_tests()
            .with_interner_capacity_bytes(NonZeroUsize::new(1024).expect("not zero"))
            .build();

        // Create our context with cheaply cloneable tags, aka static strings:
        let name = MetaString::from_static("long-metric-name-that-shouldnt-be-inlined-and-should-end-up-interned");
        let tags = [
            MetaString::from_static(BIG_TAG_ONE),
            MetaString::from_static(BIG_TAG_TWO),
        ];
        assert!(tags[0].is_cheaply_cloneable());
        assert!(tags[1].is_cheaply_cloneable());

        // Make sure the interner is empty before we resolve the context, and that it's empty afterwards, since we
        // should be able to cheaply clone both the metric name and both tags:
        assert_eq!(resolver.interner.len(), 0);
        assert_eq!(resolver.interner.len_bytes(), 0);

        let context = resolver
            .resolve(&name, &tags[..], None)
            .expect("should not fail to resolve");
        assert_eq!(resolver.interner.len(), 0);
        assert_eq!(resolver.interner.len_bytes(), 0);

        // And just a sanity check that we have the expected name and tags in the context:
        assert_eq!(context.name(), &name);

        let context_tags = context.tags();
        assert_eq!(context_tags.len(), 2);
        assert!(context_tags.has_tag(&tags[0]));
        assert!(context_tags.has_tag(&tags[1]));
    }
}