use std::{
    num::NonZeroUsize,
    pin::Pin,
    sync::{atomic::Ordering, Arc, LazyLock, Mutex, OnceLock},
    task::{self, ready, Poll},
    time::Duration,
};

use async_trait::async_trait;
use futures::Stream;
use metrics::{
    Counter, Gauge, Histogram, Key, KeyName, Level, Metadata, Recorder, SetRecorderError, SharedString, Unit,
};
use metrics_util::registry::{AtomicStorage, Registry};
use saluki_common::collections::{FastConcurrentHashMap, FastHashMap};
use saluki_context::{
    origin::RawOrigin,
    tags::{Tag, TagSet},
    Context, ContextResolver, ContextResolverBuilder,
};
use saluki_error::GenericError;
use tokio::{
    select,
    sync::broadcast::{self, error::RecvError, Receiver},
};
use tokio_util::sync::ReusableBoxFuture;
use tracing::debug;

use crate::{
    data_model::event::{metric::*, Event},
    runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture},
};
// How often the flush loop snapshots and broadcasts internal metrics.
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

// Interner capacity, in bytes, used when resolving internal metric contexts.
const INTERNAL_METRICS_INTERNER_SIZE: NonZeroUsize = NonZeroUsize::new(16_384).unwrap();

// Shared state published by `MetricsRecorder::install`; read by `MetricsStream`
// and the flush loop.
static RECEIVER_STATE: OnceLock<Arc<State>> = OnceLock::new();

/// A shared, immutable batch of flushed metric events.
pub type SharedEvents = Arc<Vec<Event>>;

43pub struct FilterHandle {
48 state: Arc<State>,
49}
50
51impl FilterHandle {
52 pub fn override_filter(&self, level: Level) {
54 *self.state.current_level.lock().unwrap() = Some(level);
55 }
56
57 pub fn reset_filter(&self) {
59 *self.state.current_level.lock().unwrap() = None;
60 }
61}
62
63struct State {
64 registry: Registry<Key, AtomicStorage>,
65 level_map: FastConcurrentHashMap<Key, Level>,
66 flush_tx: broadcast::Sender<SharedEvents>,
67 metrics_prefix: String,
68 current_level: Mutex<Option<Level>>,
69}
70
71impl State {
72 fn is_metric_filtered(&self, key: &Key, filter_level: &Level) -> bool {
73 match self.level_map.pin().get(key) {
74 None => true,
76
77 Some(level) => level < filter_level,
80 }
81 }
82}
83
84struct MetricsRecorder {
85 state: Arc<State>,
86}
87
88impl MetricsRecorder {
89 fn new(metrics_prefix: String) -> Self {
90 let (flush_tx, _) = broadcast::channel(2);
91 Self {
92 state: Arc::new(State {
93 registry: Registry::new(AtomicStorage),
94 level_map: FastConcurrentHashMap::default(),
95 flush_tx,
96 metrics_prefix,
97 current_level: Mutex::new(None),
98 }),
99 }
100 }
101
102 fn filter_handle(&self) -> FilterHandle {
103 FilterHandle {
104 state: Arc::clone(&self.state),
105 }
106 }
107
108 fn install(self) -> Result<(), SetRecorderError<Self>> {
109 let state = Arc::clone(&self.state);
110 metrics::set_global_recorder(self)?;
111
112 if RECEIVER_STATE.set(state).is_err() {
113 panic!("metrics receiver should never be set prior to global recorder being installed");
114 }
115
116 Ok(())
117 }
118
119 fn prefix_key(&self, key: &Key) -> Key {
120 Key::from_parts(format!("{}.{}", self.state.metrics_prefix, key.name()), key.labels())
121 }
122}
123
124impl Recorder for MetricsRecorder {
125 fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
126 fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
127 fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
128
129 fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter {
130 let prefixed_key = self.prefix_key(key);
131 let handle = self
132 .state
133 .registry
134 .get_or_create_counter(&prefixed_key, |c| c.clone().into());
135 self.state.level_map.pin().insert(prefixed_key, *metadata.level());
136
137 handle
138 }
139
140 fn register_gauge(&self, key: &Key, metadata: &Metadata<'_>) -> Gauge {
141 let prefixed_key = self.prefix_key(key);
142 let handle = self
143 .state
144 .registry
145 .get_or_create_gauge(&prefixed_key, |g| g.clone().into());
146 self.state.level_map.pin().insert(prefixed_key, *metadata.level());
147
148 handle
149 }
150
151 fn register_histogram(&self, key: &Key, metadata: &Metadata<'_>) -> Histogram {
152 let prefixed_key = self.prefix_key(key);
153 let handle = self
154 .state
155 .registry
156 .get_or_create_histogram(&prefixed_key, |h| h.clone().into());
157 self.state.level_map.pin().insert(prefixed_key, *metadata.level());
158
159 handle
160 }
161}
162
163pub struct MetricsStream {
168 inner: ReusableBoxFuture<'static, (Result<SharedEvents, RecvError>, Receiver<SharedEvents>)>,
169}
170
171impl MetricsStream {
172 pub fn register() -> Self {
174 let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
175 Self {
176 inner: ReusableBoxFuture::new(make_rx_future(state.flush_tx.subscribe())),
177 }
178 }
179}
180
181impl Stream for MetricsStream {
182 type Item = SharedEvents;
183
184 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
185 loop {
186 let (result, rx) = ready!(self.inner.poll(cx));
188 self.inner.set(make_rx_future(rx));
189
190 match result {
191 Ok(item) => return Poll::Ready(Some(item)),
192 Err(RecvError::Closed) => return Poll::Ready(None),
193 Err(RecvError::Lagged(n)) => {
194 debug!(
195 missed_payloads = n,
196 "Stream lagging behind internal metrics producer. Internal metrics may have been lost."
197 );
198 continue;
199 }
200 }
201 }
202 }
203}
204
205async fn make_rx_future(mut rx: Receiver<SharedEvents>) -> (Result<SharedEvents, RecvError>, Receiver<SharedEvents>) {
206 let result = rx.recv().await;
207 (result, rx)
208}
209
210async fn flush_metrics(flush_interval: Duration) {
211 let mut context_resolver = MetricsContextResolver::new(INTERNAL_METRICS_INTERNER_SIZE);
212
213 let mut flush_interval = tokio::time::interval(flush_interval);
214 flush_interval.tick().await;
215
216 let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
217
218 let mut histogram_samples = Vec::<f64>::new();
219
220 loop {
221 flush_interval.tick().await;
222
223 if state.flush_tx.receiver_count() == 0 {
225 let histograms = state.registry.get_histogram_handles();
226 for (_, histogram) in histograms {
227 histogram.clear();
228 }
229 continue;
230 }
231
232 let mut metrics = Vec::new();
233 let current_level = {
234 let current_level = state.current_level.lock().unwrap();
235 current_level.as_ref().copied().unwrap_or(Level::TRACE)
236 };
237
238 let counters = state.registry.get_counter_handles();
239 let gauges = state.registry.get_gauge_handles();
240 let histograms = state.registry.get_histogram_handles();
241
242 for (key, counter) in counters {
243 if state.is_metric_filtered(&key, ¤t_level) {
244 continue;
245 }
246
247 let context = context_resolver.resolve_from_key(key);
248 let value = counter.swap(0, Ordering::Relaxed) as f64;
249
250 let metric = Metric::counter(context, value);
251 metrics.push(Event::Metric(metric));
252 }
253
254 for (key, gauge) in gauges {
255 if state.is_metric_filtered(&key, ¤t_level) {
256 continue;
257 }
258
259 let context = context_resolver.resolve_from_key(key);
260 let value = f64::from_bits(gauge.load(Ordering::Relaxed));
261
262 let metric = Metric::gauge(context, value);
263 metrics.push(Event::Metric(metric));
264 }
265
266 for (key, histogram) in histograms {
267 if state.is_metric_filtered(&key, ¤t_level) {
268 continue;
269 }
270
271 let context = context_resolver.resolve_from_key(key);
272
273 histogram_samples.clear();
278 histogram.clear_with(|samples| histogram_samples.extend(samples));
279
280 if histogram_samples.is_empty() {
281 continue;
282 }
283
284 let metric = Metric::histogram(context, &histogram_samples[..]);
285 metrics.push(Event::Metric(metric));
286 }
287
288 let shared = Arc::new(metrics);
289 let _ = state.flush_tx.send(shared);
290 }
291}
292
293struct MetricsContextResolver {
294 context_resolver: ContextResolver,
295 key_context_cache: FastHashMap<Key, Context>,
296}
297
298impl MetricsContextResolver {
299 fn new(resolver_interner_size_bytes: NonZeroUsize) -> Self {
300 Self {
301 context_resolver: ContextResolverBuilder::from_name("core/internal_metrics")
303 .expect("resolver name is not empty")
304 .with_interner_capacity_bytes(resolver_interner_size_bytes)
305 .without_caching()
306 .build(),
307 key_context_cache: FastHashMap::default(),
308 }
309 }
310
311 fn resolve_from_key(&mut self, key: Key) -> Context {
312 static SELF_ORIGIN_INFO: LazyLock<RawOrigin<'static>> = LazyLock::new(|| {
313 let mut origin_info = RawOrigin::default();
314 origin_info.set_process_id(std::process::id());
315 origin_info
316 });
317
318 if let Some(context) = self.key_context_cache.get(&key) {
320 return context.clone();
321 }
322
323 let tags = key
325 .labels()
326 .map(|l| Tag::from(format!("{}:{}", l.key(), l.value())))
327 .collect::<TagSet>();
328
329 let context = self
330 .context_resolver
331 .resolve(key.name(), &tags, Some(SELF_ORIGIN_INFO.clone()))
332 .expect("resolver should always allow falling back");
333
334 self.key_context_cache.insert(key, context.clone());
335 context
336 }
337}
338
339pub async fn initialize_metrics(metrics_prefix: String) -> Result<(FilterHandle, MetricsFlusherWorker), GenericError> {
349 let recorder = MetricsRecorder::new(metrics_prefix);
350 let filter_handle = recorder.filter_handle();
351 recorder.install()?;
352
353 Ok((filter_handle, MetricsFlusherWorker))
354}
355
/// Supervised worker that drives the periodic internal metrics flush loop.
pub struct MetricsFlusherWorker;

363#[async_trait]
364impl Supervisable for MetricsFlusherWorker {
365 fn name(&self) -> &str {
366 "internal-telemetry-metrics-flusher"
367 }
368
369 async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
370 Ok(Box::pin(async move {
371 select! {
372 _ = flush_metrics(FLUSH_INTERVAL) => {},
373 _ = process_shutdown.wait_for_shutdown() => {},
374 }
375 Ok(())
376 }))
377 }
378}