Skip to main content

saluki_core/observability/metrics/mod.rs

//! Internal metrics support.
//!
//! Includes the [`MetricsStream`] broadcast of internally emitted metrics events and the
//! [`Reflector`]-based [`AggregatedMetricsState`] view that downstream callers query for
//! Prometheus exposition.
6
7use std::{
8    num::NonZeroUsize,
9    pin::Pin,
10    sync::{atomic::Ordering, Arc, LazyLock, Mutex, OnceLock},
11    task::{self, ready, Poll},
12    time::Duration,
13};
14
15use async_trait::async_trait;
16use futures::Stream;
17use metrics::{
18    Counter, Gauge, Histogram, Key, KeyName, Level, Metadata, Recorder, SetRecorderError, SharedString, Unit,
19};
20use metrics_util::registry::{AtomicStorage, Registry};
21use saluki_common::{
22    collections::{FastConcurrentHashMap, FastHashMap},
23    sync::shutdown::ShutdownHandle,
24};
25use saluki_context::{
26    origin::RawOrigin,
27    tags::{Tag, TagSet},
28    Context, ContextResolver, ContextResolverBuilder,
29};
30use saluki_error::GenericError;
31use tokio::{
32    select,
33    sync::broadcast::{self, error::RecvError, Receiver},
34};
35use tokio_util::sync::ReusableBoxFuture;
36use tracing::debug;
37
38use crate::{
39    data_model::event::{metric::*, Event},
40    runtime::{InitializationError, Supervisable, SupervisorFuture},
41};
42
43mod aggregated;
44pub use self::aggregated::{
45    get_shared_metrics_state, AggregatedMetricValue, AggregatedMetricsProcessor, AggregatedMetricsState,
46};
47
48mod histogram;
49pub use self::histogram::AggregatedHistogram;
50
51mod processor;
52pub use self::processor::TelemetryProcessor;
53
54mod reflector;
55pub use self::reflector::{Processor, Reflector};
56
57mod remapper;
58pub use self::remapper::{RemappedMetric, RemapperRule};
59
/// Period between successive snapshots of the internal metrics registry.
const FLUSH_INTERVAL: Duration = Duration::from_millis(1_000);
/// Capacity, in bytes, of the string interner used by the internal metrics context resolver.
const INTERNAL_METRICS_INTERNER_SIZE: NonZeroUsize = NonZeroUsize::new(16 * 1024).unwrap();
62
63static RECEIVER_STATE: OnceLock<Arc<State>> = OnceLock::new();
64
65/// A collection of events that can be cheaply cloned and shared between multiple consumers.
66pub type SharedEvents = Arc<Vec<Event>>;
67
68/// Handle to the metrics filter.
69///
70/// Allows for overriding the current metrics filter level, which influences which metrics are emitted to downstream
71/// receivers.
72pub struct FilterHandle {
73    state: Arc<State>,
74}
75
76impl FilterHandle {
77    /// Overrides the current metrics filter level.
78    pub fn override_filter(&self, level: Level) {
79        *self.state.current_level.lock().unwrap() = level;
80    }
81
82    /// Resets the metrics filter level to the default that was configured when the metrics subsystem was initialized.
83    pub fn reset_filter(&self) {
84        *self.state.current_level.lock().unwrap() = self.state.default_level;
85    }
86}
87
88struct State {
89    registry: Registry<Key, AtomicStorage>,
90    level_map: FastConcurrentHashMap<Key, Level>,
91    flush_tx: broadcast::Sender<SharedEvents>,
92    metrics_prefix: String,
93    default_level: Level,
94    current_level: Mutex<Level>,
95}
96
97impl State {
98    fn is_metric_filtered(&self, key: &Key, filter_level: &Level) -> bool {
99        match self.level_map.pin().get(key) {
100            // We have to know about a metric to be sure it's allowed.
101            None => true,
102
103            // Higher verbosity levels have lower values, so if the metric's level is lower than our filter level, that
104            // means we're actively filtering it out.
105            Some(level) => level < filter_level,
106        }
107    }
108}
109
110struct MetricsRecorder {
111    state: Arc<State>,
112}
113
114impl MetricsRecorder {
115    fn new(metrics_prefix: String, default_level: Level) -> Self {
116        let (flush_tx, _) = broadcast::channel(2);
117        Self {
118            state: Arc::new(State {
119                registry: Registry::new(AtomicStorage),
120                level_map: FastConcurrentHashMap::default(),
121                flush_tx,
122                metrics_prefix,
123                default_level,
124                current_level: Mutex::new(default_level),
125            }),
126        }
127    }
128
129    fn filter_handle(&self) -> FilterHandle {
130        FilterHandle {
131            state: Arc::clone(&self.state),
132        }
133    }
134
135    fn install(self) -> Result<(), SetRecorderError<Self>> {
136        let state = Arc::clone(&self.state);
137        metrics::set_global_recorder(self)?;
138
139        if RECEIVER_STATE.set(state).is_err() {
140            panic!("metrics receiver should never be set prior to global recorder being installed");
141        }
142
143        Ok(())
144    }
145
146    fn prefix_key(&self, key: &Key) -> Key {
147        Key::from_parts(format!("{}.{}", self.state.metrics_prefix, key.name()), key.labels())
148    }
149}
150
151impl Recorder for MetricsRecorder {
152    fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
153    fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
154    fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
155
156    fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter {
157        let prefixed_key = self.prefix_key(key);
158        let handle = self
159            .state
160            .registry
161            .get_or_create_counter(&prefixed_key, |c| c.clone().into());
162        self.state.level_map.pin().insert(prefixed_key, *metadata.level());
163
164        handle
165    }
166
167    fn register_gauge(&self, key: &Key, metadata: &Metadata<'_>) -> Gauge {
168        let prefixed_key = self.prefix_key(key);
169        let handle = self
170            .state
171            .registry
172            .get_or_create_gauge(&prefixed_key, |g| g.clone().into());
173        self.state.level_map.pin().insert(prefixed_key, *metadata.level());
174
175        handle
176    }
177
178    fn register_histogram(&self, key: &Key, metadata: &Metadata<'_>) -> Histogram {
179        let prefixed_key = self.prefix_key(key);
180        let handle = self
181            .state
182            .registry
183            .get_or_create_histogram(&prefixed_key, |h| h.clone().into());
184        self.state.level_map.pin().insert(prefixed_key, *metadata.level());
185
186        handle
187    }
188}
189
190/// Internal metrics stream
191///
192/// Used to receive periodic snapshots of the internal metrics registry, which contains all metrics that are currently
193/// active within the process.
194pub struct MetricsStream {
195    inner: ReusableBoxFuture<'static, (Result<SharedEvents, RecvError>, Receiver<SharedEvents>)>,
196}
197
198impl MetricsStream {
199    /// Creates a new `MetricsStream` that receives updates from the internal metrics registry.
200    pub fn register() -> Self {
201        let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
202        Self {
203            inner: ReusableBoxFuture::new(make_rx_future(state.flush_tx.subscribe())),
204        }
205    }
206}
207
208impl Stream for MetricsStream {
209    type Item = SharedEvents;
210
211    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
212        loop {
213            // Poll the receiver, and rearm the future once we actually resolve it.
214            let (result, rx) = ready!(self.inner.poll(cx));
215            self.inner.set(make_rx_future(rx));
216
217            match result {
218                Ok(item) => return Poll::Ready(Some(item)),
219                Err(RecvError::Closed) => return Poll::Ready(None),
220                Err(RecvError::Lagged(n)) => {
221                    debug!(
222                        missed_payloads = n,
223                        "Stream lagging behind internal metrics producer. Internal metrics may have been lost."
224                    );
225                    continue;
226                }
227            }
228        }
229    }
230}
231
232async fn make_rx_future(mut rx: Receiver<SharedEvents>) -> (Result<SharedEvents, RecvError>, Receiver<SharedEvents>) {
233    let result = rx.recv().await;
234    (result, rx)
235}
236
237async fn flush_metrics(flush_interval: Duration) {
238    let mut context_resolver = MetricsContextResolver::new(INTERNAL_METRICS_INTERNER_SIZE);
239
240    let mut flush_interval = tokio::time::interval(flush_interval);
241    flush_interval.tick().await;
242
243    let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
244
245    let mut histogram_samples = Vec::<f64>::new();
246
247    loop {
248        flush_interval.tick().await;
249
250        // If we have no downstream listeners, clear our counters and histograms to keep them in a quieseced state.
251        if state.flush_tx.receiver_count() == 0 {
252            let counters = state.registry.get_counter_handles();
253            for (_, counter) in counters {
254                counter.swap(0, Ordering::Relaxed);
255            }
256
257            let histograms = state.registry.get_histogram_handles();
258            for (_, histogram) in histograms {
259                histogram.clear();
260            }
261
262            continue;
263        }
264
265        let mut metrics = Vec::new();
266        let current_level = *state.current_level.lock().unwrap();
267
268        let counters = state.registry.get_counter_handles();
269        let gauges = state.registry.get_gauge_handles();
270        let histograms = state.registry.get_histogram_handles();
271
272        // For all metric types, we take their value (in a consuming fashion) _before_ checking if they're filtered.
273        //
274        // This lets us avoid emitting large, accumulated values for counters the first time they pass the filter check,
275        // and likewise, it avoids endless accumulation of histogram values, which translates to actual allocations
276        // behind the scenes.
277
278        for (key, counter) in counters {
279            let value = counter.swap(0, Ordering::Relaxed) as f64;
280
281            if state.is_metric_filtered(&key, &current_level) {
282                continue;
283            }
284
285            let context = context_resolver.resolve_from_key(key);
286            let metric = Metric::counter(context, value);
287            metrics.push(Event::Metric(metric));
288        }
289
290        for (key, gauge) in gauges {
291            let value = f64::from_bits(gauge.load(Ordering::Relaxed));
292
293            if state.is_metric_filtered(&key, &current_level) {
294                continue;
295            }
296
297            let context = context_resolver.resolve_from_key(key);
298            let metric = Metric::gauge(context, value);
299            metrics.push(Event::Metric(metric));
300        }
301
302        for (key, histogram) in histograms {
303            histogram_samples.clear();
304            histogram.clear_with(|samples| histogram_samples.extend(samples));
305
306            if state.is_metric_filtered(&key, &current_level) {
307                continue;
308            }
309
310            let context = context_resolver.resolve_from_key(key);
311            let metric = Metric::histogram(context, &histogram_samples[..]);
312            metrics.push(Event::Metric(metric));
313        }
314
315        let shared = Arc::new(metrics);
316        let _ = state.flush_tx.send(shared);
317    }
318}
319
320struct MetricsContextResolver {
321    context_resolver: ContextResolver,
322    key_context_cache: FastHashMap<Key, Context>,
323}
324
325impl MetricsContextResolver {
326    fn new(resolver_interner_size_bytes: NonZeroUsize) -> Self {
327        Self {
328            // Set up our context resolver without caching, since we will be caching the contexts ourselves.
329            context_resolver: ContextResolverBuilder::from_name("core/internal_metrics")
330                .expect("resolver name is not empty")
331                .with_interner_capacity_bytes(resolver_interner_size_bytes)
332                .without_caching()
333                .build(),
334            key_context_cache: FastHashMap::default(),
335        }
336    }
337
338    fn resolve_from_key(&mut self, key: Key) -> Context {
339        static SELF_ORIGIN_INFO: LazyLock<RawOrigin<'static>> = LazyLock::new(|| {
340            let mut origin_info = RawOrigin::default();
341            origin_info.set_process_id(std::process::id());
342            origin_info
343        });
344
345        // Check the cache first.
346        if let Some(context) = self.key_context_cache.get(&key) {
347            return context.clone();
348        }
349
350        // We don't have the context cached, so we need to resolve it.
351        let tags = key
352            .labels()
353            .map(|l| Tag::from(format!("{}:{}", l.key(), l.value())))
354            .collect::<TagSet>();
355
356        let context = self
357            .context_resolver
358            .resolve(key.name(), &tags, Some(SELF_ORIGIN_INFO.clone()))
359            .expect("resolver should always allow falling back");
360
361        self.key_context_cache.insert(key, context.clone());
362        context
363    }
364}
365
366/// Initializes the metrics subsystem with the given metrics prefix and default filter level.
367///
368/// `default_level` sets the initial filter level for emitted metrics, and is also what the filter is restored to when
369/// [`FilterHandle::reset_filter`] is invoked. Metrics whose level is more verbose than this default are filtered out
370/// until a runtime override is applied via [`FilterHandle::override_filter`].
371///
372/// Returns a [`FilterHandle`] for adjusting the runtime metrics filter, plus a [`MetricsFlusherWorker`]
373/// that must be added to a [`Supervisor`][crate::runtime::Supervisor] in order to drive the periodic
374/// flush loop. Internal metrics aren't propagated to subscribers until the worker is running.
375///
376/// # Errors
377///
378/// If a global recorder was already installed, an error will be returned.
379pub async fn initialize_metrics(
380    metrics_prefix: String, default_level: Level,
381) -> Result<(FilterHandle, MetricsFlusherWorker), GenericError> {
382    let recorder = MetricsRecorder::new(metrics_prefix, default_level);
383    let filter_handle = recorder.filter_handle();
384    recorder.install()?;
385
386    Ok((filter_handle, MetricsFlusherWorker))
387}
388
389/// A worker that periodically flushes the internal metrics registry to broadcast subscribers.
390///
391/// Wraps the internal flush loop and runs it under a [`Supervisor`][crate::runtime::Supervisor]. Must
392/// only be added to a supervisor after [`initialize_metrics`] has been called -- the flush loop
393/// reads from the global recorder state set by that call.
394pub struct MetricsFlusherWorker;
395
396#[async_trait]
397impl Supervisable for MetricsFlusherWorker {
398    fn name(&self) -> &str {
399        "internal-telemetry-metrics-flusher"
400    }
401
402    async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
403        Ok(Box::pin(async move {
404            select! {
405                _ = process_shutdown => {},
406                _ = flush_metrics(FLUSH_INTERVAL) => {},
407            }
408
409            Ok(())
410        }))
411    }
412}