// saluki_core/observability/metrics.rs

1//! Internal metrics support.
2use std::{
3    num::NonZeroUsize,
4    pin::Pin,
5    sync::{atomic::Ordering, Arc, LazyLock, Mutex, OnceLock},
6    task::{self, ready, Poll},
7    time::Duration,
8};
9
10use async_trait::async_trait;
11use futures::Stream;
12use metrics::{
13    Counter, Gauge, Histogram, Key, KeyName, Level, Metadata, Recorder, SetRecorderError, SharedString, Unit,
14};
15use metrics_util::registry::{AtomicStorage, Registry};
16use saluki_common::collections::{FastConcurrentHashMap, FastHashMap};
17use saluki_context::{
18    origin::RawOrigin,
19    tags::{Tag, TagSet},
20    Context, ContextResolver, ContextResolverBuilder,
21};
22use saluki_error::GenericError;
23use tokio::{
24    select,
25    sync::broadcast::{self, error::RecvError, Receiver},
26};
27use tokio_util::sync::ReusableBoxFuture;
28use tracing::debug;
29
30use crate::{
31    data_model::event::{metric::*, Event},
32    runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture},
33};
34
/// How often the internal metrics registry is flushed to broadcast subscribers.
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

/// Capacity, in bytes, of the string interner used when resolving internal metric contexts.
const INTERNAL_METRICS_INTERNER_SIZE: NonZeroUsize = NonZeroUsize::new(16_384).unwrap();

/// Shared recorder state, published once when the global recorder is installed.
///
/// Read by `MetricsStream::register` and the flush loop.
static RECEIVER_STATE: OnceLock<Arc<State>> = OnceLock::new();

/// A collection of events that can be cheaply cloned and shared between multiple consumers.
pub type SharedEvents = Arc<Vec<Event>>;
42
/// Handle to the metrics filter.
///
/// Allows for overriding the current metrics filter level, which influences which metrics are emitted to downstream
/// receivers.
pub struct FilterHandle {
    // Shared recorder state; the active filter override lives in `state.current_level`.
    state: Arc<State>,
}
50
51impl FilterHandle {
52    /// Overrides the current metrics filter level.
53    pub fn override_filter(&self, level: Level) {
54        *self.state.current_level.lock().unwrap() = Some(level);
55    }
56
57    /// Resets the metrics filter level to the default (INFO).
58    pub fn reset_filter(&self) {
59        *self.state.current_level.lock().unwrap() = None;
60    }
61}
62
// Shared state backing the installed metrics recorder.
struct State {
    // All registered counters, gauges, and histograms.
    registry: Registry<Key, AtomicStorage>,
    // Verbosity level of each registered metric, keyed by its prefixed key.
    level_map: FastConcurrentHashMap<Key, Level>,
    // Broadcast channel used to fan out flushed metric snapshots to subscribers.
    flush_tx: broadcast::Sender<SharedEvents>,
    // Prefix prepended to the name of every registered metric.
    metrics_prefix: String,
    // Optional filter level override; `None` means use the default.
    current_level: Mutex<Option<Level>>,
}
70
71impl State {
72    fn is_metric_filtered(&self, key: &Key, filter_level: &Level) -> bool {
73        match self.level_map.pin().get(key) {
74            // We have to know about a metric to be sure it's allowed.
75            None => true,
76
77            // Higher verbosity levels have lower values, so if the metric's level is lower than our filter level, that
78            // means we're actively filtering it out.
79            Some(level) => level < filter_level,
80        }
81    }
82}
83
// `metrics::Recorder` implementation backing the internal metrics subsystem.
struct MetricsRecorder {
    // State shared with `FilterHandle`s and, via `RECEIVER_STATE`, the flush loop.
    state: Arc<State>,
}
87
88impl MetricsRecorder {
89    fn new(metrics_prefix: String) -> Self {
90        let (flush_tx, _) = broadcast::channel(2);
91        Self {
92            state: Arc::new(State {
93                registry: Registry::new(AtomicStorage),
94                level_map: FastConcurrentHashMap::default(),
95                flush_tx,
96                metrics_prefix,
97                current_level: Mutex::new(None),
98            }),
99        }
100    }
101
102    fn filter_handle(&self) -> FilterHandle {
103        FilterHandle {
104            state: Arc::clone(&self.state),
105        }
106    }
107
108    fn install(self) -> Result<(), SetRecorderError<Self>> {
109        let state = Arc::clone(&self.state);
110        metrics::set_global_recorder(self)?;
111
112        if RECEIVER_STATE.set(state).is_err() {
113            panic!("metrics receiver should never be set prior to global recorder being installed");
114        }
115
116        Ok(())
117    }
118
119    fn prefix_key(&self, key: &Key) -> Key {
120        Key::from_parts(format!("{}.{}", self.state.metrics_prefix, key.name()), key.labels())
121    }
122}
123
124impl Recorder for MetricsRecorder {
125    fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
126    fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
127    fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
128
129    fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter {
130        let prefixed_key = self.prefix_key(key);
131        let handle = self
132            .state
133            .registry
134            .get_or_create_counter(&prefixed_key, |c| c.clone().into());
135        self.state.level_map.pin().insert(prefixed_key, *metadata.level());
136
137        handle
138    }
139
140    fn register_gauge(&self, key: &Key, metadata: &Metadata<'_>) -> Gauge {
141        let prefixed_key = self.prefix_key(key);
142        let handle = self
143            .state
144            .registry
145            .get_or_create_gauge(&prefixed_key, |g| g.clone().into());
146        self.state.level_map.pin().insert(prefixed_key, *metadata.level());
147
148        handle
149    }
150
151    fn register_histogram(&self, key: &Key, metadata: &Metadata<'_>) -> Histogram {
152        let prefixed_key = self.prefix_key(key);
153        let handle = self
154            .state
155            .registry
156            .get_or_create_histogram(&prefixed_key, |h| h.clone().into());
157        self.state.level_map.pin().insert(prefixed_key, *metadata.level());
158
159        handle
160    }
161}
162
/// Internal metrics stream
///
/// Used to receive periodic snapshots of the internal metrics registry, which contains all metrics that are currently
/// active within the process.
pub struct MetricsStream {
    // Reusable future that resolves to the next broadcast payload plus the receiver itself,
    // so the receiver can be threaded back in without reallocating the future.
    inner: ReusableBoxFuture<'static, (Result<SharedEvents, RecvError>, Receiver<SharedEvents>)>,
}
170
171impl MetricsStream {
172    /// Creates a new `MetricsStream` that receives updates from the internal metrics registry.
173    pub fn register() -> Self {
174        let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
175        Self {
176            inner: ReusableBoxFuture::new(make_rx_future(state.flush_tx.subscribe())),
177        }
178    }
179}
180
181impl Stream for MetricsStream {
182    type Item = SharedEvents;
183
184    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
185        loop {
186            // Poll the receiver, and rearm the future once we actually resolve it.
187            let (result, rx) = ready!(self.inner.poll(cx));
188            self.inner.set(make_rx_future(rx));
189
190            match result {
191                Ok(item) => return Poll::Ready(Some(item)),
192                Err(RecvError::Closed) => return Poll::Ready(None),
193                Err(RecvError::Lagged(n)) => {
194                    debug!(
195                        missed_payloads = n,
196                        "Stream lagging behind internal metrics producer. Internal metrics may have been lost."
197                    );
198                    continue;
199                }
200            }
201        }
202    }
203}
204
205async fn make_rx_future(mut rx: Receiver<SharedEvents>) -> (Result<SharedEvents, RecvError>, Receiver<SharedEvents>) {
206    let result = rx.recv().await;
207    (result, rx)
208}
209
210async fn flush_metrics(flush_interval: Duration) {
211    let mut context_resolver = MetricsContextResolver::new(INTERNAL_METRICS_INTERNER_SIZE);
212
213    let mut flush_interval = tokio::time::interval(flush_interval);
214    flush_interval.tick().await;
215
216    let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
217
218    let mut histogram_samples = Vec::<f64>::new();
219
220    loop {
221        flush_interval.tick().await;
222
223        // If we have no downstream listeners, just clear our histograms so they don't accumulate memory forever.
224        if state.flush_tx.receiver_count() == 0 {
225            let histograms = state.registry.get_histogram_handles();
226            for (_, histogram) in histograms {
227                histogram.clear();
228            }
229            continue;
230        }
231
232        let mut metrics = Vec::new();
233        let current_level = {
234            let current_level = state.current_level.lock().unwrap();
235            current_level.as_ref().copied().unwrap_or(Level::TRACE)
236        };
237
238        let counters = state.registry.get_counter_handles();
239        let gauges = state.registry.get_gauge_handles();
240        let histograms = state.registry.get_histogram_handles();
241
242        for (key, counter) in counters {
243            if state.is_metric_filtered(&key, &current_level) {
244                continue;
245            }
246
247            let context = context_resolver.resolve_from_key(key);
248            let value = counter.swap(0, Ordering::Relaxed) as f64;
249
250            let metric = Metric::counter(context, value);
251            metrics.push(Event::Metric(metric));
252        }
253
254        for (key, gauge) in gauges {
255            if state.is_metric_filtered(&key, &current_level) {
256                continue;
257            }
258
259            let context = context_resolver.resolve_from_key(key);
260            let value = f64::from_bits(gauge.load(Ordering::Relaxed));
261
262            let metric = Metric::gauge(context, value);
263            metrics.push(Event::Metric(metric));
264        }
265
266        for (key, histogram) in histograms {
267            if state.is_metric_filtered(&key, &current_level) {
268                continue;
269            }
270
271            let context = context_resolver.resolve_from_key(key);
272
273            // Collect all of the samples from the histogram.
274            //
275            // If the histogram was empty, skip emitting a metric for this histogram entirely. Empty sketches don't make
276            // sense to send.
277            histogram_samples.clear();
278            histogram.clear_with(|samples| histogram_samples.extend(samples));
279
280            if histogram_samples.is_empty() {
281                continue;
282            }
283
284            let metric = Metric::histogram(context, &histogram_samples[..]);
285            metrics.push(Event::Metric(metric));
286        }
287
288        let shared = Arc::new(metrics);
289        let _ = state.flush_tx.send(shared);
290    }
291}
292
// Resolves metric keys into `Context`s, caching the result per key.
struct MetricsContextResolver {
    // Underlying resolver; its built-in caching is disabled because we cache below.
    context_resolver: ContextResolver,
    // Cache of previously-resolved contexts, keyed by the original metric key.
    key_context_cache: FastHashMap<Key, Context>,
}
297
298impl MetricsContextResolver {
299    fn new(resolver_interner_size_bytes: NonZeroUsize) -> Self {
300        Self {
301            // Set up our context resolver without caching, since we will be caching the contexts ourselves.
302            context_resolver: ContextResolverBuilder::from_name("core/internal_metrics")
303                .expect("resolver name is not empty")
304                .with_interner_capacity_bytes(resolver_interner_size_bytes)
305                .without_caching()
306                .build(),
307            key_context_cache: FastHashMap::default(),
308        }
309    }
310
311    fn resolve_from_key(&mut self, key: Key) -> Context {
312        static SELF_ORIGIN_INFO: LazyLock<RawOrigin<'static>> = LazyLock::new(|| {
313            let mut origin_info = RawOrigin::default();
314            origin_info.set_process_id(std::process::id());
315            origin_info
316        });
317
318        // Check the cache first.
319        if let Some(context) = self.key_context_cache.get(&key) {
320            return context.clone();
321        }
322
323        // We don't have the context cached, so we need to resolve it.
324        let tags = key
325            .labels()
326            .map(|l| Tag::from(format!("{}:{}", l.key(), l.value())))
327            .collect::<TagSet>();
328
329        let context = self
330            .context_resolver
331            .resolve(key.name(), &tags, Some(SELF_ORIGIN_INFO.clone()))
332            .expect("resolver should always allow falling back");
333
334        self.key_context_cache.insert(key, context.clone());
335        context
336    }
337}
338
339/// Initializes the metrics subsystem with the given metrics prefix.
340///
341/// Returns a [`FilterHandle`] for adjusting the runtime metrics filter, plus a [`MetricsFlusherWorker`]
342/// that must be added to a [`Supervisor`][crate::runtime::Supervisor] in order to drive the periodic
343/// flush loop. Internal metrics are not propagated to subscribers until the worker is running.
344///
345/// # Errors
346///
347/// If a global recorder was already installed, an error will be returned.
348pub async fn initialize_metrics(metrics_prefix: String) -> Result<(FilterHandle, MetricsFlusherWorker), GenericError> {
349    let recorder = MetricsRecorder::new(metrics_prefix);
350    let filter_handle = recorder.filter_handle();
351    recorder.install()?;
352
353    Ok((filter_handle, MetricsFlusherWorker))
354}
355
/// A worker that periodically flushes the internal metrics registry to broadcast subscribers.
///
/// Wraps the internal flush loop and runs it under a [`Supervisor`][crate::runtime::Supervisor]. Must
/// only be added to a supervisor after [`initialize_metrics`] has been called -- the flush loop
/// reads from the global recorder state set by that call.
// Unit struct: all state lives in the global `RECEIVER_STATE`.
pub struct MetricsFlusherWorker;
362
363#[async_trait]
364impl Supervisable for MetricsFlusherWorker {
365    fn name(&self) -> &str {
366        "internal-telemetry-metrics-flusher"
367    }
368
369    async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
370        Ok(Box::pin(async move {
371            select! {
372                _ = flush_metrics(FLUSH_INTERVAL) => {},
373                _ = process_shutdown.wait_for_shutdown() => {},
374            }
375            Ok(())
376        }))
377    }
378}