saluki_core/observability/
metrics.rs

1//! Internal metrics support.
2use std::{
3    num::NonZeroUsize,
4    pin::Pin,
5    sync::{atomic::Ordering, Arc, LazyLock, Mutex, OnceLock},
6    task::{self, ready, Poll},
7    time::Duration,
8};
9
10use futures::Stream;
11use metrics::{
12    Counter, Gauge, Histogram, Key, KeyName, Level, Metadata, Recorder, SetRecorderError, SharedString, Unit,
13};
14use metrics_util::registry::{AtomicStorage, Registry};
15use saluki_common::{
16    collections::{FastConcurrentHashMap, FastHashMap},
17    task::spawn_traced_named,
18};
19use saluki_context::{
20    origin::RawOrigin,
21    tags::{Tag, TagSet},
22    Context, ContextResolver, ContextResolverBuilder,
23};
24use saluki_error::GenericError;
25use tokio::sync::broadcast::{self, error::RecvError, Receiver};
26use tokio_util::sync::ReusableBoxFuture;
27use tracing::debug;
28
29use crate::data_model::event::{metric::*, Event};
30
// How often the background flusher snapshots the registry and broadcasts metric events.
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
// Capacity (in bytes) of the string interner backing the internal metrics context resolver.
const INTERNAL_METRICS_INTERNER_SIZE: NonZeroUsize = NonZeroUsize::new(16_384).unwrap();

// Global shared state, set exactly once when the recorder is installed; read by `MetricsStream::register`
// and the background flusher.
static RECEIVER_STATE: OnceLock<Arc<State>> = OnceLock::new();

/// A collection of events that can be cheaply cloned and shared between multiple consumers.
pub type SharedEvents = Arc<Vec<Event>>;
38
/// Handle to the metrics filter.
///
/// Allows for overriding the current metrics filter level, which influences which metrics are emitted to downstream
/// receivers.
pub struct FilterHandle {
    // Shared recorder state; the override level lives in `state.current_level`.
    state: Arc<State>,
}
46
impl FilterHandle {
    /// Overrides the current metrics filter level.
    ///
    /// Metrics registered at a more verbose level than `level` will be filtered out during flushes.
    pub fn override_filter(&self, level: Level) {
        *self.state.current_level.lock().unwrap() = Some(level);
    }

    /// Resets the metrics filter level to the default.
    ///
    /// With no override set, the flush path falls back to `Level::TRACE`, which allows all known metrics through.
    /// (NOTE(review): previous comment said the default was INFO, but the flusher uses TRACE when unset.)
    pub fn reset_filter(&self) {
        *self.state.current_level.lock().unwrap() = None;
    }
}
58
// Shared state between the installed recorder, filter handle, and background flusher.
struct State {
    // Storage for all registered counter/gauge/histogram handles.
    registry: Registry<Key, AtomicStorage>,
    // Verbosity level each metric was registered at, used for filtering during flushes.
    level_map: FastConcurrentHashMap<Key, Level>,
    // Broadcast channel used to fan out flushed metric snapshots to `MetricsStream` subscribers.
    flush_tx: broadcast::Sender<SharedEvents>,
    // Prefix prepended to every metric name at registration time.
    metrics_prefix: String,
    // Optional filter level override; `None` means no override (most verbose).
    current_level: Mutex<Option<Level>>,
}
66
67impl State {
68    fn is_metric_filtered(&self, key: &Key, filter_level: &Level) -> bool {
69        match self.level_map.pin().get(key) {
70            // We have to know about a metric to be sure it's allowed.
71            None => true,
72
73            // Higher verbosity levels have lower values, so if the metric's level is lower than our filter level, that
74            // means we're actively filtering it out.
75            Some(level) => level < filter_level,
76        }
77    }
78}
79
// `metrics::Recorder` implementation that prefixes keys and stores handles in the shared registry.
struct MetricsRecorder {
    // Shared with `FilterHandle` and (after install) the global `RECEIVER_STATE`.
    state: Arc<State>,
}
83
84impl MetricsRecorder {
85    fn new(metrics_prefix: String) -> Self {
86        let (flush_tx, _) = broadcast::channel(2);
87        Self {
88            state: Arc::new(State {
89                registry: Registry::new(AtomicStorage),
90                level_map: FastConcurrentHashMap::default(),
91                flush_tx,
92                metrics_prefix,
93                current_level: Mutex::new(None),
94            }),
95        }
96    }
97
98    fn filter_handle(&self) -> FilterHandle {
99        FilterHandle {
100            state: Arc::clone(&self.state),
101        }
102    }
103
104    fn install(self) -> Result<(), SetRecorderError<Self>> {
105        let state = Arc::clone(&self.state);
106        metrics::set_global_recorder(self)?;
107
108        if RECEIVER_STATE.set(state).is_err() {
109            panic!("metrics receiver should never be set prior to global recorder being installed");
110        }
111
112        Ok(())
113    }
114
115    fn prefix_key(&self, key: &Key) -> Key {
116        Key::from_parts(format!("{}.{}", self.state.metrics_prefix, key.name()), key.labels())
117    }
118}
119
120impl Recorder for MetricsRecorder {
121    fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
122    fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
123    fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
124
125    fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter {
126        let prefixed_key = self.prefix_key(key);
127        let handle = self
128            .state
129            .registry
130            .get_or_create_counter(&prefixed_key, |c| c.clone().into());
131        self.state.level_map.pin().insert(prefixed_key, *metadata.level());
132
133        handle
134    }
135
136    fn register_gauge(&self, key: &Key, metadata: &Metadata<'_>) -> Gauge {
137        let prefixed_key = self.prefix_key(key);
138        let handle = self
139            .state
140            .registry
141            .get_or_create_gauge(&prefixed_key, |g| g.clone().into());
142        self.state.level_map.pin().insert(prefixed_key, *metadata.level());
143
144        handle
145    }
146
147    fn register_histogram(&self, key: &Key, metadata: &Metadata<'_>) -> Histogram {
148        let prefixed_key = self.prefix_key(key);
149        let handle = self
150            .state
151            .registry
152            .get_or_create_histogram(&prefixed_key, |h| h.clone().into());
153        self.state.level_map.pin().insert(prefixed_key, *metadata.level());
154
155        handle
156    }
157}
158
/// Internal metrics stream
///
/// Used to receive periodic snapshots of the internal metrics registry, which contains all metrics that are currently
/// active within the process.
pub struct MetricsStream {
    // Reusable future wrapping the broadcast receiver, rearmed on every resolved poll.
    inner: ReusableBoxFuture<'static, (Result<SharedEvents, RecvError>, Receiver<SharedEvents>)>,
}
166
167impl MetricsStream {
168    /// Creates a new `MetricsStream` that receives updates from the internal metrics registry.
169    pub fn register() -> Self {
170        let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
171        Self {
172            inner: ReusableBoxFuture::new(make_rx_future(state.flush_tx.subscribe())),
173        }
174    }
175}
176
impl Stream for MetricsStream {
    type Item = SharedEvents;

    /// Yields the next batch of flushed metric events.
    ///
    /// Returns `Poll::Ready(None)` once the broadcast sender (the internal flusher) is gone. If this consumer lags
    /// behind and misses payloads, the miss is logged at debug level and polling continues with the next payload.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Poll the receiver, and rearm the future once we actually resolve it.
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_rx_future(rx));

            match result {
                Ok(item) => return Poll::Ready(Some(item)),
                // All senders dropped: the stream is permanently finished.
                Err(RecvError::Closed) => return Poll::Ready(None),
                Err(RecvError::Lagged(n)) => {
                    debug!(
                        missed_payloads = n,
                        "Stream lagging behind internal metrics producer. Internal metrics may have been lost."
                    );
                    continue;
                }
            }
        }
    }
}
200
201async fn make_rx_future(mut rx: Receiver<SharedEvents>) -> (Result<SharedEvents, RecvError>, Receiver<SharedEvents>) {
202    let result = rx.recv().await;
203    (result, rx)
204}
205
206async fn flush_metrics(flush_interval: Duration) {
207    let mut context_resolver = MetricsContextResolver::new(INTERNAL_METRICS_INTERNER_SIZE);
208
209    let mut flush_interval = tokio::time::interval(flush_interval);
210    flush_interval.tick().await;
211
212    let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
213
214    let mut histogram_samples = Vec::<f64>::new();
215
216    loop {
217        flush_interval.tick().await;
218
219        // If we have no downstream listeners, just clear our histograms so they don't accumulate memory forever.
220        if state.flush_tx.receiver_count() == 0 {
221            let histograms = state.registry.get_histogram_handles();
222            for (_, histogram) in histograms {
223                histogram.clear();
224            }
225            continue;
226        }
227
228        let mut metrics = Vec::new();
229        let current_level = {
230            let current_level = state.current_level.lock().unwrap();
231            current_level.as_ref().copied().unwrap_or(Level::TRACE)
232        };
233
234        let counters = state.registry.get_counter_handles();
235        let gauges = state.registry.get_gauge_handles();
236        let histograms = state.registry.get_histogram_handles();
237
238        for (key, counter) in counters {
239            if state.is_metric_filtered(&key, &current_level) {
240                continue;
241            }
242
243            let context = context_resolver.resolve_from_key(key);
244            let value = counter.swap(0, Ordering::Relaxed) as f64;
245
246            let metric = Metric::counter(context, value);
247            metrics.push(Event::Metric(metric));
248        }
249
250        for (key, gauge) in gauges {
251            if state.is_metric_filtered(&key, &current_level) {
252                continue;
253            }
254
255            let context = context_resolver.resolve_from_key(key);
256            let value = f64::from_bits(gauge.load(Ordering::Relaxed));
257
258            let metric = Metric::gauge(context, value);
259            metrics.push(Event::Metric(metric));
260        }
261
262        for (key, histogram) in histograms {
263            if state.is_metric_filtered(&key, &current_level) {
264                continue;
265            }
266
267            let context = context_resolver.resolve_from_key(key);
268
269            // Collect all of the samples from the histogram.
270            //
271            // If the histogram was empty, skip emitting a metric for this histogram entirely. Empty sketches don't make
272            // sense to send.
273            histogram_samples.clear();
274            histogram.clear_with(|samples| histogram_samples.extend(samples));
275
276            if histogram_samples.is_empty() {
277                continue;
278            }
279
280            let metric = Metric::histogram(context, &histogram_samples[..]);
281            metrics.push(Event::Metric(metric));
282        }
283
284        let shared = Arc::new(metrics);
285        let _ = state.flush_tx.send(shared);
286    }
287}
288
// Resolves metric `Key`s into `Context`s, memoizing the result per key.
struct MetricsContextResolver {
    // Underlying resolver, configured without its own caching (we cache below).
    context_resolver: ContextResolver,
    // Memoized key-to-context mapping, so repeated flushes of the same metric are cheap.
    key_context_cache: FastHashMap<Key, Context>,
}
293
294impl MetricsContextResolver {
295    fn new(resolver_interner_size_bytes: NonZeroUsize) -> Self {
296        Self {
297            // Set up our context resolver without caching, since we will be caching the contexts ourselves.
298            context_resolver: ContextResolverBuilder::from_name("core/internal_metrics")
299                .expect("resolver name is not empty")
300                .with_interner_capacity_bytes(resolver_interner_size_bytes)
301                .without_caching()
302                .build(),
303            key_context_cache: FastHashMap::default(),
304        }
305    }
306
307    fn resolve_from_key(&mut self, key: Key) -> Context {
308        static SELF_ORIGIN_INFO: LazyLock<RawOrigin<'static>> = LazyLock::new(|| {
309            let mut origin_info = RawOrigin::default();
310            origin_info.set_process_id(std::process::id());
311            origin_info
312        });
313
314        // Check the cache first.
315        if let Some(context) = self.key_context_cache.get(&key) {
316            return context.clone();
317        }
318
319        // We don't have the context cached, so we need to resolve it.
320        let tags = key
321            .labels()
322            .map(|l| Tag::from(format!("{}:{}", l.key(), l.value())))
323            .collect::<TagSet>();
324
325        let context = self
326            .context_resolver
327            .resolve(key.name(), &tags, Some(SELF_ORIGIN_INFO.clone()))
328            .expect("resolver should always allow falling back");
329
330        self.key_context_cache.insert(key, context.clone());
331        context
332    }
333}
334
/// Initializes the metrics subsystem with the given metrics prefix.
///
/// Installs the global metrics recorder, publishes the shared state used by [`MetricsStream`], and spawns a
/// background task that periodically flushes the internal metrics registry to subscribers. Returns a
/// [`FilterHandle`] for adjusting the metrics filter level at runtime.
///
/// ## Errors
///
/// If a global recorder was already installed, an error will be returned.
pub async fn initialize_metrics(metrics_prefix: String) -> Result<FilterHandle, GenericError> {
    let recorder = MetricsRecorder::new(metrics_prefix);
    let filter_handle = recorder.filter_handle();
    recorder.install()?;

    // Background flusher runs for the lifetime of the process.
    spawn_traced_named("internal-telemetry-metrics-flusher", flush_metrics(FLUSH_INTERVAL));

    Ok(filter_handle)
}