Skip to main content

saluki_core/observability/metrics/
mod.rs

1//! Internal metrics support.
2//!
3//! Includes the [`MetricsStream`] broadcast of internally emitted metrics events and the
4//! [`Reflector`]-based [`AggregatedMetricsState`] view that downstream callers query for
5//! Prometheus exposition.
6
7use std::{
8    num::NonZeroUsize,
9    pin::Pin,
10    sync::{atomic::Ordering, Arc, LazyLock, Mutex, OnceLock},
11    task::{self, ready, Poll},
12    time::Duration,
13};
14
15use async_trait::async_trait;
16use futures::Stream;
17use metrics::{
18    Counter, Gauge, Histogram, Key, KeyName, Level, Metadata, Recorder, SetRecorderError, SharedString, Unit,
19};
20use metrics_util::registry::{AtomicStorage, Registry};
21use saluki_common::collections::{FastConcurrentHashMap, FastHashMap};
22use saluki_context::{
23    origin::RawOrigin,
24    tags::{Tag, TagSet},
25    Context, ContextResolver, ContextResolverBuilder,
26};
27use saluki_error::GenericError;
28use tokio::{
29    select,
30    sync::broadcast::{self, error::RecvError, Receiver},
31};
32use tokio_util::sync::ReusableBoxFuture;
33use tracing::debug;
34
35use crate::{
36    data_model::event::{metric::*, Event},
37    runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture},
38};
39
40mod aggregated;
41pub use self::aggregated::{
42    get_shared_metrics_state, AggregatedMetricValue, AggregatedMetricsProcessor, AggregatedMetricsState,
43};
44
45mod histogram;
46pub use self::histogram::AggregatedHistogram;
47
48mod processor;
49pub use self::processor::TelemetryProcessor;
50
51mod reflector;
52pub use self::reflector::{Processor, Reflector};
53
54mod remapper;
55pub use self::remapper::{RemappedMetric, RemapperRule};
56
/// Tick interval handed to the flush loop by [`MetricsFlusherWorker`].
const FLUSH_INTERVAL: Duration = Duration::from_millis(1_000);
/// Interner capacity, in bytes, given to the context resolver used for internal metrics.
const INTERNAL_METRICS_INTERNER_SIZE: NonZeroUsize = NonZeroUsize::new(16 * 1024).unwrap();
59
/// Process-wide handle to the recorder state.
///
/// Populated once by `MetricsRecorder::install` after the global recorder has been set, and read by
/// `MetricsStream::register` and the flush loop, both of which panic if it is still unset.
static RECEIVER_STATE: OnceLock<Arc<State>> = OnceLock::new();
61
/// A collection of events that can be cheaply cloned and shared between multiple consumers.
///
/// This is the item type sent over the internal broadcast channel and yielded by [`MetricsStream`]; cloning only
/// clones the [`Arc`], not the underlying events.
pub type SharedEvents = Arc<Vec<Event>>;
64
65/// Handle to the metrics filter.
66///
67/// Allows for overriding the current metrics filter level, which influences which metrics are emitted to downstream
68/// receivers.
69pub struct FilterHandle {
70    state: Arc<State>,
71}
72
73impl FilterHandle {
74    /// Overrides the current metrics filter level.
75    pub fn override_filter(&self, level: Level) {
76        *self.state.current_level.lock().unwrap() = level;
77    }
78
79    /// Resets the metrics filter level to the default that was configured when the metrics subsystem was initialized.
80    pub fn reset_filter(&self) {
81        *self.state.current_level.lock().unwrap() = self.state.default_level;
82    }
83}
84
85struct State {
86    registry: Registry<Key, AtomicStorage>,
87    level_map: FastConcurrentHashMap<Key, Level>,
88    flush_tx: broadcast::Sender<SharedEvents>,
89    metrics_prefix: String,
90    default_level: Level,
91    current_level: Mutex<Level>,
92}
93
94impl State {
95    fn is_metric_filtered(&self, key: &Key, filter_level: &Level) -> bool {
96        match self.level_map.pin().get(key) {
97            // We have to know about a metric to be sure it's allowed.
98            None => true,
99
100            // Higher verbosity levels have lower values, so if the metric's level is lower than our filter level, that
101            // means we're actively filtering it out.
102            Some(level) => level < filter_level,
103        }
104    }
105}
106
107struct MetricsRecorder {
108    state: Arc<State>,
109}
110
111impl MetricsRecorder {
112    fn new(metrics_prefix: String, default_level: Level) -> Self {
113        let (flush_tx, _) = broadcast::channel(2);
114        Self {
115            state: Arc::new(State {
116                registry: Registry::new(AtomicStorage),
117                level_map: FastConcurrentHashMap::default(),
118                flush_tx,
119                metrics_prefix,
120                default_level,
121                current_level: Mutex::new(default_level),
122            }),
123        }
124    }
125
126    fn filter_handle(&self) -> FilterHandle {
127        FilterHandle {
128            state: Arc::clone(&self.state),
129        }
130    }
131
132    fn install(self) -> Result<(), SetRecorderError<Self>> {
133        let state = Arc::clone(&self.state);
134        metrics::set_global_recorder(self)?;
135
136        if RECEIVER_STATE.set(state).is_err() {
137            panic!("metrics receiver should never be set prior to global recorder being installed");
138        }
139
140        Ok(())
141    }
142
143    fn prefix_key(&self, key: &Key) -> Key {
144        Key::from_parts(format!("{}.{}", self.state.metrics_prefix, key.name()), key.labels())
145    }
146}
147
148impl Recorder for MetricsRecorder {
149    fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
150    fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
151    fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
152
153    fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter {
154        let prefixed_key = self.prefix_key(key);
155        let handle = self
156            .state
157            .registry
158            .get_or_create_counter(&prefixed_key, |c| c.clone().into());
159        self.state.level_map.pin().insert(prefixed_key, *metadata.level());
160
161        handle
162    }
163
164    fn register_gauge(&self, key: &Key, metadata: &Metadata<'_>) -> Gauge {
165        let prefixed_key = self.prefix_key(key);
166        let handle = self
167            .state
168            .registry
169            .get_or_create_gauge(&prefixed_key, |g| g.clone().into());
170        self.state.level_map.pin().insert(prefixed_key, *metadata.level());
171
172        handle
173    }
174
175    fn register_histogram(&self, key: &Key, metadata: &Metadata<'_>) -> Histogram {
176        let prefixed_key = self.prefix_key(key);
177        let handle = self
178            .state
179            .registry
180            .get_or_create_histogram(&prefixed_key, |h| h.clone().into());
181        self.state.level_map.pin().insert(prefixed_key, *metadata.level());
182
183        handle
184    }
185}
186
187/// Internal metrics stream
188///
189/// Used to receive periodic snapshots of the internal metrics registry, which contains all metrics that are currently
190/// active within the process.
191pub struct MetricsStream {
192    inner: ReusableBoxFuture<'static, (Result<SharedEvents, RecvError>, Receiver<SharedEvents>)>,
193}
194
195impl MetricsStream {
196    /// Creates a new `MetricsStream` that receives updates from the internal metrics registry.
197    pub fn register() -> Self {
198        let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
199        Self {
200            inner: ReusableBoxFuture::new(make_rx_future(state.flush_tx.subscribe())),
201        }
202    }
203}
204
205impl Stream for MetricsStream {
206    type Item = SharedEvents;
207
208    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
209        loop {
210            // Poll the receiver, and rearm the future once we actually resolve it.
211            let (result, rx) = ready!(self.inner.poll(cx));
212            self.inner.set(make_rx_future(rx));
213
214            match result {
215                Ok(item) => return Poll::Ready(Some(item)),
216                Err(RecvError::Closed) => return Poll::Ready(None),
217                Err(RecvError::Lagged(n)) => {
218                    debug!(
219                        missed_payloads = n,
220                        "Stream lagging behind internal metrics producer. Internal metrics may have been lost."
221                    );
222                    continue;
223                }
224            }
225        }
226    }
227}
228
229async fn make_rx_future(mut rx: Receiver<SharedEvents>) -> (Result<SharedEvents, RecvError>, Receiver<SharedEvents>) {
230    let result = rx.recv().await;
231    (result, rx)
232}
233
/// Periodically drains the internal metrics registry and broadcasts the result to subscribers.
///
/// On every tick of `flush_interval`, counters are swapped to zero, gauges are loaded, and histograms are drained.
/// Metrics that pass the current filter level are converted to [`Event::Metric`] and sent as a single
/// [`SharedEvents`] batch over the broadcast channel held in the shared recorder state.
///
/// The loop has no exit condition of its own; it runs until the future is dropped.
///
/// # Panics
///
/// Panics if the shared recorder state has not been set, or if the mutex guarding the current filter level is
/// poisoned.
async fn flush_metrics(flush_interval: Duration) {
    let mut context_resolver = MetricsContextResolver::new(INTERNAL_METRICS_INTERNER_SIZE);

    // The first tick of a tokio interval completes immediately, so consume it here; the ticks awaited in the loop
    // below are then spaced by the flush interval.
    let mut flush_interval = tokio::time::interval(flush_interval);
    flush_interval.tick().await;

    let state = RECEIVER_STATE.get().expect("metrics receiver should be set");

    // Scratch buffer reused across iterations to hold the samples drained from each histogram.
    let mut histogram_samples = Vec::<f64>::new();

    loop {
        flush_interval.tick().await;

        // If we have no downstream listeners, clear our counters and histograms to keep them in a quiesced state.
        if state.flush_tx.receiver_count() == 0 {
            let counters = state.registry.get_counter_handles();
            for (_, counter) in counters {
                counter.swap(0, Ordering::Relaxed);
            }

            let histograms = state.registry.get_histogram_handles();
            for (_, histogram) in histograms {
                histogram.clear();
            }

            continue;
        }

        let mut metrics = Vec::new();
        // Copy the level out so the lock is not held while the registry is walked.
        let current_level = *state.current_level.lock().unwrap();

        let counters = state.registry.get_counter_handles();
        let gauges = state.registry.get_gauge_handles();
        let histograms = state.registry.get_histogram_handles();

        // For all metric types, we take their value (in a consuming fashion) _before_ checking if they're filtered.
        //
        // This lets us avoid emitting large, accumulated values for counters the first time they pass the filter check,
        // and likewise, it avoids endless accumulation of histogram values, which translates to actual allocations
        // behind the scenes.

        for (key, counter) in counters {
            // Reset the counter to zero and report what had accumulated since the previous flush.
            let value = counter.swap(0, Ordering::Relaxed) as f64;

            if state.is_metric_filtered(&key, &current_level) {
                continue;
            }

            let context = context_resolver.resolve_from_key(key);
            let metric = Metric::counter(context, value);
            metrics.push(Event::Metric(metric));
        }

        for (key, gauge) in gauges {
            // Gauges are only read, never reset; the stored integer is reinterpreted as the bits of an `f64`.
            let value = f64::from_bits(gauge.load(Ordering::Relaxed));

            if state.is_metric_filtered(&key, &current_level) {
                continue;
            }

            let context = context_resolver.resolve_from_key(key);
            let metric = Metric::gauge(context, value);
            metrics.push(Event::Metric(metric));
        }

        for (key, histogram) in histograms {
            // Drain this histogram's samples into the (emptied) scratch buffer.
            histogram_samples.clear();
            histogram.clear_with(|samples| histogram_samples.extend(samples));

            if state.is_metric_filtered(&key, &current_level) {
                continue;
            }

            let context = context_resolver.resolve_from_key(key);
            let metric = Metric::histogram(context, &histogram_samples[..]);
            metrics.push(Event::Metric(metric));
        }

        // The send result is deliberately ignored.
        // NOTE(review): presumably the only failure mode is every receiver having been dropped since the
        // `receiver_count` check above -- confirm against the broadcast channel documentation.
        let shared = Arc::new(metrics);
        let _ = state.flush_tx.send(shared);
    }
}
316
317struct MetricsContextResolver {
318    context_resolver: ContextResolver,
319    key_context_cache: FastHashMap<Key, Context>,
320}
321
322impl MetricsContextResolver {
323    fn new(resolver_interner_size_bytes: NonZeroUsize) -> Self {
324        Self {
325            // Set up our context resolver without caching, since we will be caching the contexts ourselves.
326            context_resolver: ContextResolverBuilder::from_name("core/internal_metrics")
327                .expect("resolver name is not empty")
328                .with_interner_capacity_bytes(resolver_interner_size_bytes)
329                .without_caching()
330                .build(),
331            key_context_cache: FastHashMap::default(),
332        }
333    }
334
335    fn resolve_from_key(&mut self, key: Key) -> Context {
336        static SELF_ORIGIN_INFO: LazyLock<RawOrigin<'static>> = LazyLock::new(|| {
337            let mut origin_info = RawOrigin::default();
338            origin_info.set_process_id(std::process::id());
339            origin_info
340        });
341
342        // Check the cache first.
343        if let Some(context) = self.key_context_cache.get(&key) {
344            return context.clone();
345        }
346
347        // We don't have the context cached, so we need to resolve it.
348        let tags = key
349            .labels()
350            .map(|l| Tag::from(format!("{}:{}", l.key(), l.value())))
351            .collect::<TagSet>();
352
353        let context = self
354            .context_resolver
355            .resolve(key.name(), &tags, Some(SELF_ORIGIN_INFO.clone()))
356            .expect("resolver should always allow falling back");
357
358        self.key_context_cache.insert(key, context.clone());
359        context
360    }
361}
362
363/// Initializes the metrics subsystem with the given metrics prefix and default filter level.
364///
365/// `default_level` sets the initial filter level for emitted metrics, and is also what the filter is restored to when
366/// [`FilterHandle::reset_filter`] is invoked. Metrics whose level is more verbose than this default are filtered out
367/// until a runtime override is applied via [`FilterHandle::override_filter`].
368///
369/// Returns a [`FilterHandle`] for adjusting the runtime metrics filter, plus a [`MetricsFlusherWorker`]
370/// that must be added to a [`Supervisor`][crate::runtime::Supervisor] in order to drive the periodic
371/// flush loop. Internal metrics aren't propagated to subscribers until the worker is running.
372///
373/// # Errors
374///
375/// If a global recorder was already installed, an error will be returned.
376pub async fn initialize_metrics(
377    metrics_prefix: String, default_level: Level,
378) -> Result<(FilterHandle, MetricsFlusherWorker), GenericError> {
379    let recorder = MetricsRecorder::new(metrics_prefix, default_level);
380    let filter_handle = recorder.filter_handle();
381    recorder.install()?;
382
383    Ok((filter_handle, MetricsFlusherWorker))
384}
385
386/// A worker that periodically flushes the internal metrics registry to broadcast subscribers.
387///
388/// Wraps the internal flush loop and runs it under a [`Supervisor`][crate::runtime::Supervisor]. Must
389/// only be added to a supervisor after [`initialize_metrics`] has been called -- the flush loop
390/// reads from the global recorder state set by that call.
391pub struct MetricsFlusherWorker;
392
393#[async_trait]
394impl Supervisable for MetricsFlusherWorker {
395    fn name(&self) -> &str {
396        "internal-telemetry-metrics-flusher"
397    }
398
399    async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
400        Ok(Box::pin(async move {
401            select! {
402                _ = flush_metrics(FLUSH_INTERVAL) => {},
403                _ = process_shutdown.wait_for_shutdown() => {},
404            }
405            Ok(())
406        }))
407    }
408}