//! Context resolution: interning, caching, and origin-tag enrichment for metric contexts.
1use std::{num::NonZeroUsize, sync::Arc, time::Duration};
2
3use saluki_common::{
4 cache::{weight::ItemCountWeighter, Cache, CacheBuilder},
5 collections::PrehashedHashSet,
6 hash::NoopU64BuildHasher,
7};
8use saluki_error::{generic_error, GenericError};
9use saluki_metrics::static_metrics;
10use stringtheory::{
11 interning::{GenericMapInterner, Interner as _},
12 CheapMetaString, MetaString,
13};
14use tokio::time::sleep;
15use tracing::debug;
16
17use crate::{
18 context::{Context, ContextInner},
19 hash::{hash_context_with_seen, ContextKey, TagSetKey},
20 origin::{OriginTagsResolver, RawOrigin},
21 tags::{SharedTagSet, TagSet},
22};
23
// Default maximum number of contexts kept in the resolver cache.
//
// SAFETY: We know, unquestionably, that this value is not zero.
const DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT: NonZeroUsize = NonZeroUsize::new(500_000).unwrap();

// Default capacity of the string interner, in bytes (2MB).
//
// SAFETY: We know, unquestionably, that this value is not zero.
const DEFAULT_CONTEXT_RESOLVER_INTERNER_CAPACITY_BYTES: NonZeroUsize = NonZeroUsize::new(2 * 1024 * 1024).unwrap();

// Initial capacity of the per-resolver scratch hash set used while hashing a context's tags.
const SEEN_HASHSET_INITIAL_CAPACITY: usize = 128;

// Both caches are keyed by precomputed hash keys (see `hash_context_with_seen`), so a no-op
// u64 hasher is used to avoid hashing the keys a second time.
type ContextCache = Cache<ContextKey, Context, ItemCountWeighter, NoopU64BuildHasher>;
type TagSetCache = Cache<TagSetKey, SharedTagSet, ItemCountWeighter, NoopU64BuildHasher>;
34
// Generates the `Telemetry` struct used throughout this module: per-resolver gauges and counters
// covering interner usage, context resolution activity, and tagset resolution activity, all
// labeled by resolver ID.
static_metrics! {
    name => Telemetry,
    prefix => context_resolver,
    labels => [resolver_id: String],
    metrics => [
        gauge(interner_capacity_bytes),
        gauge(interner_len_bytes),
        gauge(interner_entries),
        counter(intern_fallback_total),

        counter(resolved_existing_context_total),
        counter(resolved_new_context_total),
        gauge(active_contexts),

        counter(resolved_existing_tagset_total),
        counter(resolved_new_tagset_total),
    ],
}
53
/// Builder for creating a [`ContextResolver`].
///
/// # Missing
///
/// - Support for configuring the size limit of cached contexts.
pub struct ContextResolverBuilder {
    // Resolver name, used as the telemetry label and cache identifier.
    name: String,
    // Whether resolved contexts are cached for reuse.
    caching_enabled: bool,
    // Maximum number of cached contexts; `None` means the default (500,000).
    cached_contexts_limit: Option<NonZeroUsize>,
    // Time-to-idle for cached contexts; `None` means no expiration.
    idle_context_expiration: Option<Duration>,
    // Interner capacity in bytes; `None` means the default (2MB).
    interner_capacity_bytes: Option<NonZeroUsize>,
    // Whether strings may spill to the heap when the interner is full; `None` means the default (`true`).
    allow_heap_allocations: Option<bool>,
    // Optional pre-built tags resolver; if unset, one is built alongside the context resolver.
    tags_resolver: Option<TagsResolver>,
    // Optional pre-built interner; if unset, one is created in `build`.
    interner: Option<GenericMapInterner>,
    // Optional origin tags resolver, forwarded to the tags resolver built in `build`.
    origin_tags_resolver: Option<Arc<dyn OriginTagsResolver>>,
    // Whether telemetry (including the background reporting task) is enabled.
    telemetry_enabled: bool,
}
71
impl ContextResolverBuilder {
    /// Creates a new `ContextResolverBuilder` with the given resolver name.
    ///
    /// The resolver name _should_ be unique, but it is not required to be. Metrics for the resolver will be
    /// emitted using the given name, so in cases where the name is not unique, those metrics will be aggregated
    /// together and it will not be possible to distinguish between the different resolvers.
    ///
    /// # Errors
    ///
    /// If the given resolver name is empty, an error is returned.
    pub fn from_name<S: Into<String>>(name: S) -> Result<Self, GenericError> {
        let name = name.into();
        if name.is_empty() {
            return Err(generic_error!("resolver name must not be empty"));
        }

        Ok(Self {
            name,
            caching_enabled: true,
            cached_contexts_limit: None,
            idle_context_expiration: None,
            interner_capacity_bytes: None,
            allow_heap_allocations: None,
            tags_resolver: None,
            interner: None,
            origin_tags_resolver: None,
            telemetry_enabled: true,
        })
    }

    /// Sets whether or not to enable caching of resolved contexts.
    ///
    /// [`ContextResolver`] provides two main benefits: consistent behavior for resolving contexts (interning, origin
    /// tags, etc), and the caching of those resolved contexts to speed up future resolutions. However, caching contexts
    /// means that we pay a memory cost for the cache itself, even if the contexts are not ever reused or are seen
    /// infrequently. While expiration can help free up cache capacity, it cannot help recover the memory used by the
    /// underlying cache data structure once they have expanded to hold the contexts.
    ///
    /// Disabling caching allows normal resolving to take place without the overhead of caching the contexts. This can
    /// lead to lower average memory usage, as contexts will only live as long as they are needed, but it will reduce
    /// memory determinism as memory will be allocated for every resolved context (minus interned strings), which means
    /// that resolving the same context ten times in a row will result in ten separate allocations, and so on.
    ///
    /// Defaults to caching enabled.
    pub fn without_caching(mut self) -> Self {
        self.caching_enabled = false;
        // Expiration is meaningless without a cache, so clear it as well.
        self.idle_context_expiration = None;
        self
    }

    /// Sets the limit on the number of cached contexts.
    ///
    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Caching contexts is beneficial when the same context is resolved frequently, and it is generally worth
    /// allowing for higher limits on cached contexts when heap allocations are allowed, as this can better amortize the
    /// cost of those heap allocations.
    ///
    /// If the value is zero, caching will be disabled, and no contexts will be cached. This is equivalent to calling
    /// `without_caching`.
    ///
    /// Defaults to 500,000.
    pub fn with_cached_contexts_limit(mut self, limit: usize) -> Self {
        match NonZeroUsize::new(limit) {
            Some(limit) => {
                self.cached_contexts_limit = Some(limit);
                self
            }
            // A zero limit is treated as "disable caching entirely".
            None => self.without_caching(),
        }
    }

    /// Sets the time before contexts are considered "idle" and eligible for expiration.
    ///
    /// This controls how long a context will be kept in the cache after its last access or creation time. This value is
    /// a lower bound, as contexts eligible for expiration may not be expired immediately. Contexts may still be removed
    /// prior to their natural expiration time if the cache is full and evictions are required to make room for a new
    /// context.
    ///
    /// Defaults to no expiration.
    pub fn with_idle_context_expiration(mut self, time_to_idle: Duration) -> Self {
        self.idle_context_expiration = Some(time_to_idle);
        self
    }

    /// Sets the capacity of the string interner, in bytes.
    ///
    /// This is the maximum number of bytes that the interner will use for interning strings that are present in
    /// contexts being resolved. This capacity may or may not be allocated entirely when the resolver is built, but the
    /// interner will not exceed the configured capacity when allocating any backing storage.
    ///
    /// This value directly impacts the number of contexts that can be resolved when heap allocations are disabled, as
    /// all resolved contexts must either have values (name or tags) that can be inlined or interned. Once the interner
    /// is full, contexts may fail to be resolved if heap allocations are disabled.
    ///
    /// The optimal value will almost always be workload-dependent, but a good starting point can be to estimate around
    /// 150 - 200 bytes per context based on empirical measurements around common metric name and tag lengths. This
    /// translates to around 5000 unique contexts per 1MB of interner size.
    ///
    /// Defaults to 2MB.
    pub fn with_interner_capacity_bytes(mut self, capacity: NonZeroUsize) -> Self {
        self.interner_capacity_bytes = Some(capacity);
        self
    }

    /// Sets whether or not to allow heap allocations when interning strings.
    ///
    /// In cases where the interner is full, this setting determines whether or not we refuse to resolve a context, or
    /// if we allow it to be resolved by allocating strings on the heap. When heap allocations are enabled, the amount
    /// of memory that can be used by the interner is effectively unlimited, as contexts that cannot be interned will
    /// simply spill to the heap instead of being limited in any way.
    ///
    /// Defaults to `true`.
    pub fn with_heap_allocations(mut self, allow: bool) -> Self {
        self.allow_heap_allocations = Some(allow);
        self
    }

    /// Sets the tags resolver.
    ///
    /// Defaults to unset.
    pub fn with_tags_resolver(mut self, resolver: Option<TagsResolver>) -> Self {
        self.tags_resolver = resolver;
        self
    }

    /// Sets whether or not to enable telemetry for this resolver.
    ///
    /// Reporting the telemetry of the resolver requires running an asynchronous task to avoid adding additional
    /// overhead in the hot path of resolving contexts. In some cases, it may be cumbersome to always create the
    /// resolver in an asynchronous context so that the telemetry task can be spawned. This method allows disabling
    /// telemetry reporting in those cases.
    ///
    /// Defaults to telemetry enabled.
    pub fn without_telemetry(mut self) -> Self {
        self.telemetry_enabled = false;
        self
    }

    /// Sets the interner to use for this resolver.
    ///
    /// If an interner is not provided, an interner will be created in [`ContextResolverBuilder::build`]
    pub fn with_interner(mut self, interner: GenericMapInterner) -> Self {
        self.interner = Some(interner);
        self
    }

    /// Configures a [`ContextResolverBuilder`] that is suitable for tests.
    ///
    /// This configures the builder with the following defaults:
    ///
    /// - resolver name of "noop"
    /// - unlimited cache capacity
    /// - no-op interner (all strings are heap-allocated)
    /// - heap allocations allowed
    /// - telemetry disabled
    ///
    /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate
    /// testing scenarios.
    pub fn for_tests() -> Self {
        ContextResolverBuilder::from_name("noop")
            .expect("resolver name not empty")
            .with_cached_contexts_limit(usize::MAX)
            // A 1-byte interner can never intern anything, forcing the heap-allocation fallback.
            .with_interner_capacity_bytes(NonZeroUsize::new(1).expect("not zero"))
            .with_heap_allocations(true)
            .with_tags_resolver(Some(TagsResolverBuilder::for_tests().build()))
            .without_telemetry()
    }

    /// Builds a [`ContextResolver`] from the current configuration.
    pub fn build(self) -> ContextResolver {
        // Fall back to defaults for any settings that were not explicitly configured.
        let interner_capacity_bytes = self
            .interner_capacity_bytes
            .unwrap_or(DEFAULT_CONTEXT_RESOLVER_INTERNER_CAPACITY_BYTES);

        // Use the caller-provided interner if present, otherwise create one with the configured capacity.
        let interner = match self.interner {
            Some(interner) => interner,
            None => GenericMapInterner::new(interner_capacity_bytes),
        };

        let cached_context_limit = self
            .cached_contexts_limit
            .unwrap_or(DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT);

        let allow_heap_allocations = self.allow_heap_allocations.unwrap_or(true);

        let telemetry = Telemetry::new(self.name.clone());
        telemetry
            .interner_capacity_bytes()
            .set(interner.capacity_bytes() as f64);

        // NOTE: We should switch to using a size-based weighter so that we can do more firm bounding of what we cache.
        let context_cache = CacheBuilder::from_identifier(format!("{}/contexts", self.name))
            .expect("cache identifier cannot possibly be empty")
            .with_capacity(cached_context_limit)
            .with_time_to_idle(self.idle_context_expiration)
            .with_hasher::<NoopU64BuildHasher>()
            .with_telemetry(self.telemetry_enabled)
            .build();

        // If no tags resolver is provided, we need to create one using the same interner used for the context resolver.
        //
        // NOTE(review): `origin_tags_resolver` has no setter in this impl block, so it appears to always be `None`
        // here -- confirm a setter exists elsewhere, or consider adding a `with_origin_tags_resolver` method.
        let tags_resolver = match self.tags_resolver {
            Some(tags_resolver) => tags_resolver,
            None => TagsResolverBuilder::new(format!("{}/tags", self.name), interner.clone())
                .expect("tags resolver name not empty")
                .with_cached_tagsets_limit(cached_context_limit.get())
                .with_idle_tagsets_expiration(self.idle_context_expiration.unwrap_or_default())
                .with_heap_allocations(allow_heap_allocations)
                .with_origin_tags_resolver(self.origin_tags_resolver.clone())
                .build(),
        };

        // Spawning the reporting task requires a Tokio runtime; see `without_telemetry` for why this is optional.
        if self.telemetry_enabled {
            tokio::spawn(drive_telemetry(interner.clone(), telemetry.clone()));
        }

        ContextResolver {
            telemetry,
            interner,
            caching_enabled: self.caching_enabled,
            context_cache,
            hash_seen_buffer: PrehashedHashSet::with_capacity_and_hasher(
                SEEN_HASHSET_INITIAL_CAPACITY,
                NoopU64BuildHasher,
            ),
            allow_heap_allocations,
            tags_resolver,
        }
    }
}
304
/// A centralized store for resolved contexts.
///
/// Contexts are the combination of a name and a set of tags. They are used to identify a specific metric series. As contexts
/// are constructed entirely of strings, they are expensive to construct in a way that allows sending between tasks, as
/// this usually requires allocations. Even further, the same context may be "hot", used frequently by the
/// applications/services sending us metrics.
///
/// In order to optimize this, the context resolver is responsible for both interning the strings involved where
/// possible, as well as keeping a map of contexts that can be referred to with a cheap handle. We can cheaply search
/// for an existing context without needing to allocate an entirely new one, and get a clone of the handle to use going
/// forward.
///
/// # Design
///
/// `ContextResolver` specifically manages interning and mapping of contexts. It can be cheaply cloned itself.
///
/// In order to resolve a context, `resolve` must be called which requires taking a lock to check for an existing
/// context. A read/write lock is used in order to prioritize lookups over inserts, as lookups are expected to be more
/// common given how often a given context is used and resolved.
///
/// (NOTE(review): no lock is visible in this type itself -- the locking described above presumably lives inside
/// `Cache`; confirm this description still matches the implementation.)
///
/// Once a context is resolved, a cheap handle -- `Context` -- is returned. This handle, like `ContextResolver`, can be
/// cheaply cloned. It points directly to the underlying context data (name and tags) and provides access to these
/// components.
pub struct ContextResolver {
    // Per-resolver metrics (interner usage, resolution counters, active contexts).
    telemetry: Telemetry,
    // String interner used for context names; shared with the tags resolver when one is built by default.
    interner: GenericMapInterner,
    // Whether resolved contexts are cached in `context_cache`.
    caching_enabled: bool,
    // Cache of fully-resolved contexts, keyed by precomputed `ContextKey`.
    context_cache: ContextCache,
    // Scratch buffer reused across `resolve` calls when hashing a context's tags.
    hash_seen_buffer: PrehashedHashSet<u64>,
    // Whether strings may spill to the heap when the interner is full.
    allow_heap_allocations: bool,
    // Resolver for tag sets (instrumented and origin tags), with its own cache.
    tags_resolver: TagsResolver,
}
337
impl ContextResolver {
    /// Attempts to acquire a `MetaString` for the given string.
    ///
    /// Tries, in order: a cheap (zero-cost) clone of the input, interning the string, and finally
    /// allocating it on the heap if heap allocations are allowed. Returns `None` when the interner
    /// is full and heap allocations are disallowed.
    fn intern<S>(&self, s: S) -> Option<MetaString>
    where
        S: AsRef<str> + CheapMetaString,
    {
        // Try to cheaply clone the string, and if that fails, try to intern it. If that fails, then we fall back to
        // allocating it on the heap if we allow it.
        s.try_cheap_clone()
            .or_else(|| self.interner.try_intern(s.as_ref()).map(MetaString::from))
            .or_else(|| {
                self.allow_heap_allocations.then(|| {
                    self.telemetry.intern_fallback_total().increment(1);
                    MetaString::from(s.as_ref())
                })
            })
    }

    /// Computes the cache keys -- one for the full context, one for just the tag set -- for the
    /// given name, tags, and origin tags.
    ///
    /// Reuses the resolver's internal "seen" buffer as scratch space during hashing, which is why
    /// this takes `&mut self`.
    fn create_context_key<N, I, I2, T, T2>(&mut self, name: N, tags: I, origin_tags: I2) -> (ContextKey, TagSetKey)
    where
        N: AsRef<str>,
        I: IntoIterator<Item = T>,
        T: AsRef<str>,
        I2: IntoIterator<Item = T2>,
        T2: AsRef<str>,
    {
        hash_context_with_seen(name.as_ref(), tags, origin_tags, &mut self.hash_seen_buffer)
    }

    /// Creates a new `Context` from already-resolved parts.
    ///
    /// Returns `None` if the context name can neither be interned nor (when disallowed)
    /// heap-allocated. Increments the new-context and active-context telemetry on success.
    fn create_context<N>(
        &self, key: ContextKey, name: N, context_tags: SharedTagSet, origin_tags: SharedTagSet,
    ) -> Option<Context>
    where
        N: AsRef<str> + CheapMetaString,
    {
        // Intern the name of the context; the tags have already been resolved into shared tag sets.
        let context_name = self.intern(name)?;

        self.telemetry.resolved_new_context_total().increment(1);
        self.telemetry.active_contexts().increment(1);

        Some(Context::from_inner(ContextInner::from_parts(
            key,
            context_name,
            context_tags,
            origin_tags,
            // Handed to the context so the gauge is decremented when the context is dropped.
            self.telemetry.active_contexts().clone(),
        )))
    }

    /// Resolves the given context.
    ///
    /// If the context has not yet been resolved, the name and tags are interned and a new context is created and
    /// stored. Otherwise, the existing context is returned. If an origin tags resolver is configured, and origin info
    /// is available, any enriched tags will be added to the context.
    ///
    /// `None` may be returned if the interner is full and outside allocations are disallowed. See
    /// `allow_heap_allocations` for more information.
    pub fn resolve<N, I, T>(&mut self, name: N, tags: I, maybe_origin: Option<RawOrigin<'_>>) -> Option<Context>
    where
        N: AsRef<str> + CheapMetaString,
        I: IntoIterator<Item = T> + Clone,
        T: AsRef<str> + CheapMetaString,
    {
        // Try and resolve our origin tags from the provided origin information, if any.
        let origin_tags = self.tags_resolver.resolve_origin_tags(maybe_origin);

        self.resolve_inner(name, tags, origin_tags)
    }

    /// Resolves the given context using the provided origin tags.
    ///
    /// If the context has not yet been resolved, the name and tags are interned and a new context is created and
    /// stored. Otherwise, the existing context is returned. The provided origin tags are used to enrich the context.
    ///
    /// `None` may be returned if the interner is full and outside allocations are disallowed. See
    /// `allow_heap_allocations` for more information.
    ///
    /// ## Origin tags resolver mismatch
    ///
    /// When passing in origin tags, they will be inherently tied to a specific `OriginTagsResolver`, which may
    /// differ from the configured origin tags resolver in this context resolver. This means that the context that is
    /// generated and cached may not be reused in the future if an attempt is made to resolve it using the raw origin
    /// information instead.
    ///
    /// This method is intended primarily to allow for resolving contexts in a consistent way while _reusing_ the origin
    /// tags from another context, such as when remapping the name and/or instrumented tags of a given metric, while
    /// maintaining its origin association.
    pub fn resolve_with_origin_tags<N, I, T>(&mut self, name: N, tags: I, origin_tags: SharedTagSet) -> Option<Context>
    where
        N: AsRef<str> + CheapMetaString,
        I: IntoIterator<Item = T> + Clone,
        T: AsRef<str> + CheapMetaString,
    {
        self.resolve_inner(name, tags, origin_tags)
    }

    /// Shared resolution path for `resolve` and `resolve_with_origin_tags`.
    ///
    /// Hashes the inputs into cache keys, then either builds a fresh (non-cached) context, returns
    /// a cached one, or builds and caches a new one -- reusing a cached tag set where possible.
    fn resolve_inner<N, I, T>(&mut self, name: N, tags: I, origin_tags: SharedTagSet) -> Option<Context>
    where
        N: AsRef<str> + CheapMetaString,
        I: IntoIterator<Item = T> + Clone,
        T: AsRef<str> + CheapMetaString,
    {
        let (context_key, tagset_key) = self.create_context_key(&name, tags.clone(), &origin_tags);

        // Fast path to avoid looking up the context in the cache if caching is disabled.
        if !self.caching_enabled {
            let tag_set = self.tags_resolver.create_tag_set(tags).unwrap_or_default();

            let context = self.create_context(context_key, name, tag_set, origin_tags)?;

            debug!(?context_key, ?context, "Resolved new non-cached context.");
            return Some(context);
        }

        match self.context_cache.get(&context_key) {
            Some(context) => {
                self.telemetry.resolved_existing_context_total().increment(1);
                Some(context)
            }
            None => {
                // Try seeing if we have the tagset cached already, and create it if not.
                let tag_set = match self.tags_resolver.get_tag_set(tagset_key) {
                    Some(tag_set) => {
                        self.telemetry.resolved_existing_tagset_total().increment(1);
                        tag_set
                    }
                    None => {
                        // If the tagset is not cached, we need to create it.
                        let tag_set = self.tags_resolver.create_tag_set(tags.clone()).unwrap_or_default();

                        self.tags_resolver.insert_tag_set(tagset_key, tag_set.clone());

                        tag_set
                    }
                };

                let context = self.create_context(context_key, name, tag_set, origin_tags)?;
                self.context_cache.insert(context_key, context.clone());

                debug!(?context_key, ?context, "Resolved new context.");
                Some(context)
            }
        }
    }
}
483
484impl Clone for ContextResolver {
485 fn clone(&self) -> Self {
486 Self {
487 telemetry: self.telemetry.clone(),
488 interner: self.interner.clone(),
489 caching_enabled: self.caching_enabled,
490 context_cache: self.context_cache.clone(),
491 hash_seen_buffer: PrehashedHashSet::with_capacity_and_hasher(
492 SEEN_HASHSET_INITIAL_CAPACITY,
493 NoopU64BuildHasher,
494 ),
495 allow_heap_allocations: self.allow_heap_allocations,
496 tags_resolver: self.tags_resolver.clone(),
497 }
498 }
499}
500
501async fn drive_telemetry(interner: GenericMapInterner, telemetry: Telemetry) {
502 loop {
503 sleep(Duration::from_secs(1)).await;
504
505 telemetry.interner_entries().set(interner.len() as f64);
506 telemetry
507 .interner_capacity_bytes()
508 .set(interner.capacity_bytes() as f64);
509 telemetry.interner_len_bytes().set(interner.len_bytes() as f64);
510 }
511}
512
/// A builder for a tag resolver.
pub struct TagsResolverBuilder {
    // Resolver name, used as the telemetry label and cache identifier.
    name: String,
    // Whether resolved tag sets are cached for reuse.
    caching_enabled: bool,
    // Maximum number of cached tag sets; `None` means the default (500,000).
    cached_tagset_limit: Option<NonZeroUsize>,
    // Time-to-idle for cached tag sets; `None` means no expiration.
    idle_tagset_expiration: Option<Duration>,
    // Whether strings may spill to the heap when the interner is full; `None` means the default (`true`).
    allow_heap_allocations: Option<bool>,
    // Optional resolver used to derive enriched tags from origin information.
    origin_tags_resolver: Option<Arc<dyn OriginTagsResolver>>,
    // Whether cache telemetry is enabled.
    telemetry_enabled: bool,
    // String interner used for tag strings; required, supplied at construction time.
    interner: GenericMapInterner,
}
524
impl TagsResolverBuilder {
    /// Creates a new [`TagsResolverBuilder`] with the given name and interner.
    ///
    /// # Errors
    ///
    /// If the given resolver name is empty, an error is returned.
    pub fn new<S: Into<String>>(name: S, interner: GenericMapInterner) -> Result<Self, GenericError> {
        let name = name.into();
        if name.is_empty() {
            return Err(generic_error!("resolver name must not be empty"));
        }

        Ok(Self {
            name,
            caching_enabled: true,
            cached_tagset_limit: None,
            idle_tagset_expiration: None,
            allow_heap_allocations: None,
            origin_tags_resolver: None,
            telemetry_enabled: true,
            interner,
        })
    }

    /// Sets the interner to use for this resolver.
    ///
    /// This is used when we want to use a separate interner for tagsets, different from the one used for contexts.
    ///
    /// Defaults to using the interner passed to the builder.
    pub fn with_interner(mut self, interner: GenericMapInterner) -> Self {
        self.interner = interner;
        self
    }

    /// Sets whether or not to enable caching of resolved tag sets.
    ///
    /// [`TagsResolver`] provides two main benefits: consistent behavior for resolving tag sets (interning, origin
    /// tags, etc), and the caching of those resolved tag sets to speed up future resolutions. However, caching tag
    /// sets means that we pay a memory cost for the cache itself, even if the tag sets are not ever reused or are seen
    /// infrequently. While expiration can help free up cache capacity, it cannot help recover the memory used by the
    /// underlying cache data structure once they have expanded to hold the tag sets.
    ///
    /// Disabling caching allows normal resolving to take place without the overhead of caching the tag sets. This can
    /// lead to lower average memory usage, as tag sets will only live as long as they are needed, but it will reduce
    /// memory determinism as memory will be allocated for every resolved tag set (minus interned strings), which means
    /// that resolving the same tag set ten times in a row will result in ten separate allocations, and so on.
    ///
    /// Defaults to caching enabled.
    pub fn without_caching(mut self) -> Self {
        self.caching_enabled = false;
        // Expiration is meaningless without a cache, so clear it as well.
        self.idle_tagset_expiration = None;
        self
    }

    /// Sets the limit on the number of cached tagsets.
    ///
    /// This is the maximum number of resolved tag sets that can be cached at any given time. This limit does not affect
    /// the total number of tag sets that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Caching tag sets is beneficial when the same tag set is resolved frequently, and it is generally worth
    /// allowing for higher limits on cached tag sets when heap allocations are allowed, as this can better amortize the
    /// cost of those heap allocations.
    ///
    /// If the value is zero, caching will be disabled, and no tag sets will be cached. This is equivalent to calling
    /// `without_caching`.
    ///
    /// Defaults to 500,000.
    pub fn with_cached_tagsets_limit(mut self, limit: usize) -> Self {
        match NonZeroUsize::new(limit) {
            Some(limit) => {
                self.cached_tagset_limit = Some(limit);
                self
            }
            // A zero limit is treated as "disable caching entirely".
            None => self.without_caching(),
        }
    }

    /// Sets the time before tag sets are considered "idle" and eligible for expiration.
    ///
    /// This controls how long a tag set will be kept in the cache after its last access or creation time. This value is
    /// a lower bound, as tag sets eligible for expiration may not be expired immediately. Tag sets may still be removed
    /// prior to their natural expiration time if the cache is full and evictions are required to make room for a new
    /// tag set.
    ///
    /// Defaults to no expiration.
    pub fn with_idle_tagsets_expiration(mut self, time_to_idle: Duration) -> Self {
        self.idle_tagset_expiration = Some(time_to_idle);
        self
    }

    /// Sets whether or not to allow heap allocations when interning strings.
    ///
    /// In cases where the interner is full, this setting determines whether or not we refuse to resolve a context, or
    /// if we allow it to be resolved by allocating strings on the heap. When heap allocations are enabled, the amount
    /// of memory that can be used by the interner is effectively unlimited, as contexts that cannot be interned will
    /// simply spill to the heap instead of being limited in any way.
    ///
    /// Defaults to `true`.
    pub fn with_heap_allocations(mut self, allow: bool) -> Self {
        self.allow_heap_allocations = Some(allow);
        self
    }

    /// Sets the origin tags resolver to use when building a context.
    ///
    /// In some cases, metrics, events, and service checks may have enriched tags based on their origin -- the
    /// application/host/container/etc that emitted the metric -- which has to be considered when building the context
    /// itself. As this can be expensive, it is useful to split the logic of actually grabbing the enriched tags based
    /// on the available origin info into a separate phase, and implementation, that can run separately from the
    /// initial hash-based approach of checking if a context has already been resolved.
    ///
    /// When set, any origin information provided will be considered during hashing when looking up a context, and any
    /// enriched tags attached to the detected origin will be accessible from the context.
    ///
    /// Defaults to unset.
    pub fn with_origin_tags_resolver(mut self, resolver: Option<Arc<dyn OriginTagsResolver>>) -> Self {
        self.origin_tags_resolver = resolver;
        self
    }

    /// Sets whether or not to enable telemetry for this resolver.
    ///
    /// Reporting the telemetry of the resolver requires running an asynchronous task to avoid adding additional
    /// overhead in the hot path of resolving contexts. In some cases, it may be cumbersome to always create the
    /// resolver in an asynchronous context so that the telemetry task can be spawned. This method allows disabling
    /// telemetry reporting in those cases.
    ///
    /// Defaults to telemetry enabled.
    pub fn without_telemetry(mut self) -> Self {
        self.telemetry_enabled = false;
        self
    }

    /// Builds a [`TagsResolver`] from the current configuration.
    pub fn build(self) -> TagsResolver {
        // Fall back to defaults for any settings that were not explicitly configured.
        let cached_tagsets_limit = self
            .cached_tagset_limit
            .unwrap_or(DEFAULT_CONTEXT_RESOLVER_CACHED_CONTEXTS_LIMIT);

        let allow_heap_allocations = self.allow_heap_allocations.unwrap_or(true);

        let telemetry = Telemetry::new(self.name.clone());
        telemetry
            .interner_capacity_bytes()
            .set(self.interner.capacity_bytes() as f64);

        // Unlike `ContextResolverBuilder::build`, no background reporting task is spawned here:
        // `telemetry_enabled` only controls the cache's own telemetry.
        let tagset_cache = CacheBuilder::from_identifier(format!("{}/tagsets", self.name))
            .expect("cache identifier cannot possibly be empty")
            .with_capacity(cached_tagsets_limit)
            .with_time_to_idle(self.idle_tagset_expiration)
            .with_hasher::<NoopU64BuildHasher>()
            .with_telemetry(self.telemetry_enabled)
            .build();

        TagsResolver {
            telemetry,
            interner: self.interner,
            caching_enabled: self.caching_enabled,
            tagset_cache,
            origin_tags_resolver: self.origin_tags_resolver,
            allow_heap_allocations,
        }
    }

    /// Configures a [`TagsResolverBuilder`] that is suitable for tests.
    ///
    /// This configures the builder with the following defaults:
    ///
    /// - resolver name of "noop"
    /// - unlimited cache capacity
    /// - no-op interner (all strings are heap-allocated)
    /// - heap allocations allowed
    /// - telemetry disabled
    ///
    /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate
    /// testing scenarios.
    pub fn for_tests() -> Self {
        TagsResolverBuilder::new("noop", GenericMapInterner::new(NonZeroUsize::new(1).expect("not zero")))
            .expect("resolver name not empty")
            .with_cached_tagsets_limit(usize::MAX)
            .with_heap_allocations(true)
            .without_telemetry()
    }
}
706
/// A resolver for tags.
pub struct TagsResolver {
    // Per-resolver metrics (interner usage, tagset resolution counters).
    telemetry: Telemetry,
    // String interner used for tag strings.
    interner: GenericMapInterner,
    // Whether resolved tag sets are cached in `tagset_cache`.
    caching_enabled: bool,
    // Cache of resolved tag sets, keyed by precomputed `TagSetKey`.
    tagset_cache: TagSetCache,
    // Optional resolver used to derive enriched tags from origin information.
    origin_tags_resolver: Option<Arc<dyn OriginTagsResolver>>,
    // Whether strings may spill to the heap when the interner is full.
    allow_heap_allocations: bool,
}
716
717impl TagsResolver {
718 fn intern<S>(&self, s: S) -> Option<MetaString>
719 where
720 S: AsRef<str> + CheapMetaString,
721 {
722 // Try to cheaply clone the string, and if that fails, try to intern it. If that fails, then we fall back to
723 // allocating it on the heap if we allow it.
724 s.try_cheap_clone()
725 .or_else(|| self.interner.try_intern(s.as_ref()).map(MetaString::from))
726 .or_else(|| {
727 self.allow_heap_allocations.then(|| {
728 self.telemetry.intern_fallback_total().increment(1);
729 MetaString::from(s.as_ref())
730 })
731 })
732 }
733
734 /// Creates a new tag set from the given tags.
735 ///
736 /// This will intern the tags, and then return a shared tag set. If the interner is full, and heap allocations are
737 /// not allowed, then this will return `None`.
738 ///
739 /// If heap allocations are allowed, then this will return a shared tag set, and the tag set will be cached.
740 pub fn create_tag_set<I, T>(&mut self, tags: I) -> Option<SharedTagSet>
741 where
742 I: IntoIterator<Item = T>,
743 T: AsRef<str> + CheapMetaString,
744 {
745 let mut tag_set = TagSet::default();
746 for tag in tags {
747 let tag = self.intern(tag)?;
748 tag_set.insert_tag(tag);
749 }
750
751 self.telemetry.resolved_new_tagset_total().increment(1);
752
753 Some(tag_set.into_shared())
754 }
755
756 /// Resolves the origin tags for the given origin.
757 ///
758 /// This will return the origin tags for the given origin, or an empty tag set if no origin tags resolver is set.
759 pub fn resolve_origin_tags(&self, maybe_origin: Option<RawOrigin<'_>>) -> SharedTagSet {
760 self.origin_tags_resolver
761 .as_ref()
762 .and_then(|resolver| maybe_origin.map(|origin| resolver.resolve_origin_tags(origin)))
763 .unwrap_or_default()
764 }
765
766 fn get_tag_set(&self, key: TagSetKey) -> Option<SharedTagSet> {
767 self.tagset_cache.get(&key)
768 }
769
770 fn insert_tag_set(&self, key: TagSetKey, tag_set: SharedTagSet) {
771 self.tagset_cache.insert(key, tag_set);
772 }
773}
774
775impl Clone for TagsResolver {
776 fn clone(&self) -> Self {
777 Self {
778 telemetry: self.telemetry.clone(),
779 interner: self.interner.clone(),
780 caching_enabled: self.caching_enabled,
781 tagset_cache: self.tagset_cache.clone(),
782 origin_tags_resolver: self.origin_tags_resolver.clone(),
783 allow_heap_allocations: self.allow_heap_allocations,
784 }
785 }
786}
787
788#[cfg(test)]
789mod tests {
790 use metrics::{SharedString, Unit};
791 use metrics_util::{
792 debugging::{DebugValue, DebuggingRecorder},
793 CompositeKey,
794 };
795 use saluki_common::hash::hash_single_fast;
796
797 use super::*;
798
799 fn get_gauge_value(metrics: &[(CompositeKey, Option<Unit>, Option<SharedString>, DebugValue)], key: &str) -> f64 {
800 metrics
801 .iter()
802 .find(|(k, _, _, _)| k.key().name() == key)
803 .map(|(_, _, _, value)| match value {
804 DebugValue::Gauge(value) => value.into_inner(),
805 other => panic!("expected a gauge, got: {:?}", other),
806 })
807 .unwrap_or_else(|| panic!("no metric found with key: {}", key))
808 }
809
810 struct DummyOriginTagsResolver;
811
812 impl OriginTagsResolver for DummyOriginTagsResolver {
813 fn resolve_origin_tags(&self, origin: RawOrigin<'_>) -> SharedTagSet {
814 let origin_key = hash_single_fast(origin);
815
816 let mut tags = TagSet::default();
817 tags.insert_tag(format!("origin_key:{}", origin_key));
818 tags.into_shared()
819 }
820 }
821
    #[test]
    fn basic() {
        let mut resolver = ContextResolverBuilder::for_tests().build();

        // Create two distinct contexts with the same name but different tags:
        let name = "metric_name";
        let tags1: [&str; 0] = [];
        let tags2 = ["tag1"];

        assert_ne!(&tags1[..], &tags2[..]);

        let context1 = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");

        // The contexts should not be equal to each other, and should have distinct underlying pointers to the shared
        // context state:
        assert_ne!(context1, context2);
        assert!(!context1.ptr_eq(&context2));

        // If we create the context references again, we _should_ get back the same contexts as before: the pointer
        // equality asserted below shows the resolver handed back the same shared state rather than building new
        // contexts (contrast with the `caching_disabled` test):
        let context1_redo = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context2_redo = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");

        assert_ne!(context1_redo, context2_redo);
        assert_eq!(context1, context1_redo);
        assert_eq!(context2, context2_redo);
        assert!(context1.ptr_eq(&context1_redo));
        assert!(context2.ptr_eq(&context2_redo));
    }
859
860 #[test]
861 fn tag_order() {
862 let mut resolver = ContextResolverBuilder::for_tests().build();
863
864 // Create two distinct contexts with the same name and tags, but with the tags in a different order:
865 let name = "metric_name";
866 let tags1 = ["tag1", "tag2"];
867 let tags2 = ["tag2", "tag1"];
868
869 assert_ne!(&tags1[..], &tags2[..]);
870
871 let context1 = resolver
872 .resolve(name, &tags1[..], None)
873 .expect("should not fail to resolve");
874 let context2 = resolver
875 .resolve(name, &tags2[..], None)
876 .expect("should not fail to resolve");
877
878 // The contexts should be equal to each other, and should have the same underlying pointer to the shared context
879 // state:
880 assert_eq!(context1, context2);
881 assert!(context1.ptr_eq(&context2));
882 }
883
    #[test]
    fn active_contexts() {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();

        // Create our resolver and then create a context, which will have its metrics attached to our local recorder:
        let context = metrics::with_local_recorder(&recorder, || {
            let mut resolver = ContextResolverBuilder::for_tests().build();
            resolver
                .resolve("name", &["tag"][..], None)
                .expect("should not fail to resolve")
        });

        // We should be able to see that the active context count is one, representing the context we created:
        let metrics_before = snapshotter.snapshot().into_vec();
        let active_contexts = get_gauge_value(&metrics_before, Telemetry::active_contexts_name());
        assert_eq!(active_contexts, 1.0);

        // Now drop the context, and observe the active context count is negative one, representing the context we dropped:
        //
        // NOTE(review): the post-drop value asserted here is -1 rather than 0, which presumably reflects how the
        // gauge handles interact with the local recorder across the resolver being dropped at the end of the closure
        // above — confirm against the gauge registration logic in the resolver if this ever changes.
        drop(context);
        let metrics_after = snapshotter.snapshot().into_vec();
        let active_contexts = get_gauge_value(&metrics_after, Telemetry::active_contexts_name());
        assert_eq!(active_contexts, -1.0);
    }
908
    #[test]
    fn duplicate_tags() {
        let mut resolver = ContextResolverBuilder::for_tests().build();

        // Two contexts with the same name, but each with a different set of duplicate tags. Duplicate tags must be
        // deduplicated (not naively folded into the hash) for the assertions below to hold:
        let name = "metric_name";
        let tags1 = ["tag1"];
        let tags1_duplicated = ["tag1", "tag1"];
        let tags2 = ["tag2"];
        let tags2_duplicated = ["tag2", "tag2"];

        let context1 = resolver
            .resolve(name, &tags1[..], None)
            .expect("should not fail to resolve");
        let context1_duplicated = resolver
            .resolve(name, &tags1_duplicated[..], None)
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags2[..], None)
            .expect("should not fail to resolve");
        let context2_duplicated = resolver
            .resolve(name, &tags2_duplicated[..], None)
            .expect("should not fail to resolve");

        // Each non-duplicated/duplicated context pair should be equal to one another:
        assert_eq!(context1, context1_duplicated);
        assert_eq!(context2, context2_duplicated);

        // Each pair should not be equal to the other pair, however.
        //
        // What we're asserting here is that, if we didn't handle duplicate tags correctly, the XOR hashing of [tag1,
        // tag1] and [tag2, tag2] would result in the same hash value, since the second duplicate hash of tag1/tag2
        // would cancel out the first... and thus all that would be left is the hash of the name itself, which is the
        // same in this test. This would lead to the contexts being equal, which is obviously wrong.
        //
        // If we're handling duplicates properly, then the resulting context hashes _shouldn't_ be equal.
        assert_ne!(context1, context2);
        assert_ne!(context1_duplicated, context2_duplicated);
        assert_ne!(context1, context2_duplicated);
        assert_ne!(context2, context1_duplicated);
    }
950
    #[test]
    fn differing_origins_with_without_resolver() {
        // Create a regular context resolver, without any origin tags resolver, which should result in contexts being
        // the same so long as the name and tags are the same, disregarding any difference in origin information:
        let mut resolver = ContextResolverBuilder::for_tests().build();

        let name = "metric_name";
        let tags = ["tag1"];
        let mut origin1 = RawOrigin::default();
        origin1.set_container_id("container1");
        let mut origin2 = RawOrigin::default();
        origin2.set_container_id("container2");

        let context1 = resolver
            .resolve(name, &tags[..], Some(origin1.clone()))
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags[..], Some(origin2.clone()))
            .expect("should not fail to resolve");

        assert_eq!(context1, context2);

        // Now build a context resolver with an origin tags resolver that trivially returns the hash of the origin info
        // as a tag, which should result in differing sets of origin tags between the two origins, thus no longer
        // comparing as equal:
        let tags_resolver = TagsResolverBuilder::for_tests()
            .with_origin_tags_resolver(Some(Arc::new(DummyOriginTagsResolver)))
            .build();
        let mut resolver = ContextResolverBuilder::for_tests()
            .with_tags_resolver(Some(tags_resolver))
            .build();

        let context1 = resolver
            .resolve(name, &tags[..], Some(origin1))
            .expect("should not fail to resolve");
        let context2 = resolver
            .resolve(name, &tags[..], Some(origin2))
            .expect("should not fail to resolve");

        assert_ne!(context1, context2);
    }
992
    #[test]
    fn caching_disabled() {
        // With caching disabled, every resolve call should produce a fresh context, even for identical inputs.
        let tags_resolver = TagsResolverBuilder::for_tests()
            .with_origin_tags_resolver(Some(Arc::new(DummyOriginTagsResolver)))
            .build();
        let mut resolver = ContextResolverBuilder::for_tests()
            .without_caching()
            .with_tags_resolver(Some(tags_resolver))
            .build();

        let name = "metric_name";
        let tags = ["tag1"];
        let mut origin1 = RawOrigin::default();
        origin1.set_container_id("container1");

        // Create a context with caching disabled, and verify that the context is not cached:
        let context1 = resolver
            .resolve(name, &tags[..], Some(origin1.clone()))
            .expect("should not fail to resolve");
        assert_eq!(resolver.context_cache.len(), 0);

        // Create a second context with the same name and tags, and verify that it is not cached:
        let context2 = resolver
            .resolve(name, &tags[..], Some(origin1))
            .expect("should not fail to resolve");
        assert_eq!(resolver.context_cache.len(), 0);

        // The contexts should be equal to each other, but the underlying `Arc` pointers should be different since
        // they're two distinct contexts in terms of not being cached:
        assert_eq!(context1, context2);
        assert!(!context1.ptr_eq(&context2));
    }
1025
    #[test]
    fn cheaply_cloneable_name_and_tags() {
        // Strings long enough to rule out inline storage, and static so they are cheaply cloneable:
        const BIG_TAG_ONE: &str = "long-tag-that-cannot-be-inlined-just-to-be-doubly-sure-on-top-of-being-static";
        const BIG_TAG_TWO: &str = "another-long-boye-that-we-are-also-sure-wont-be-inlined-and-we-stand-on-that";

        // Create a context resolver with a proper string interner configured:
        let mut resolver = ContextResolverBuilder::for_tests()
            .with_interner_capacity_bytes(NonZeroUsize::new(1024).expect("not zero"))
            .build();

        // Create our context with cheaply cloneable tags, aka static strings:
        let name = MetaString::from_static("long-metric-name-that-shouldnt-be-inlined-and-should-end-up-interned");
        let tags = [
            MetaString::from_static(BIG_TAG_ONE),
            MetaString::from_static(BIG_TAG_TWO),
        ];
        assert!(tags[0].is_cheaply_cloneable());
        assert!(tags[1].is_cheaply_cloneable());

        // Make sure the interner is empty before we resolve the context, and that it's empty afterwards, since we
        // should be able to cheaply clone both the metric name and both tags:
        assert_eq!(resolver.interner.len(), 0);
        assert_eq!(resolver.interner.len_bytes(), 0);

        let context = resolver
            .resolve(&name, &tags[..], None)
            .expect("should not fail to resolve");
        assert_eq!(resolver.interner.len(), 0);
        assert_eq!(resolver.interner.len_bytes(), 0);

        // And just a sanity check that we have the expected name and tags in the context:
        assert_eq!(context.name(), &name);

        let context_tags = context.tags();
        assert_eq!(context_tags.len(), 2);
        assert!(context_tags.has_tag(&tags[0]));
        assert!(context_tags.has_tag(&tags[1]));
    }
1064}