saluki_core/observability/metrics/
mod.rs1use std::{
8 num::NonZeroUsize,
9 pin::Pin,
10 sync::{atomic::Ordering, Arc, LazyLock, Mutex, OnceLock},
11 task::{self, ready, Poll},
12 time::Duration,
13};
14
15use async_trait::async_trait;
16use futures::Stream;
17use metrics::{
18 Counter, Gauge, Histogram, Key, KeyName, Level, Metadata, Recorder, SetRecorderError, SharedString, Unit,
19};
20use metrics_util::registry::{AtomicStorage, Registry};
21use saluki_common::collections::{FastConcurrentHashMap, FastHashMap};
22use saluki_context::{
23 origin::RawOrigin,
24 tags::{Tag, TagSet},
25 Context, ContextResolver, ContextResolverBuilder,
26};
27use saluki_error::GenericError;
28use tokio::{
29 select,
30 sync::broadcast::{self, error::RecvError, Receiver},
31};
32use tokio_util::sync::ReusableBoxFuture;
33use tracing::debug;
34
35use crate::{
36 data_model::event::{metric::*, Event},
37 runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture},
38};
39
40mod aggregated;
41pub use self::aggregated::{
42 get_shared_metrics_state, AggregatedMetricValue, AggregatedMetricsProcessor, AggregatedMetricsState,
43};
44
45mod histogram;
46pub use self::histogram::AggregatedHistogram;
47
48mod processor;
49pub use self::processor::TelemetryProcessor;
50
51mod reflector;
52pub use self::reflector::{Processor, Reflector};
53
54mod remapper;
55pub use self::remapper::{RemappedMetric, RemapperRule};
56
/// Period between two snapshots of the internal metrics registry.
const FLUSH_INTERVAL: Duration = Duration::from_millis(1_000);

/// Interner capacity, in bytes, of the context resolver used for internal metrics.
const INTERNAL_METRICS_INTERNER_SIZE: NonZeroUsize = match NonZeroUsize::new(16 * 1024) {
    Some(size) => size,
    None => panic!("interner size must be non-zero"),
};

60static RECEIVER_STATE: OnceLock<Arc<State>> = OnceLock::new();
61
62pub type SharedEvents = Arc<Vec<Event>>;
64
65pub struct FilterHandle {
70 state: Arc<State>,
71}
72
73impl FilterHandle {
74 pub fn override_filter(&self, level: Level) {
76 *self.state.current_level.lock().unwrap() = level;
77 }
78
79 pub fn reset_filter(&self) {
81 *self.state.current_level.lock().unwrap() = self.state.default_level;
82 }
83}
84
85struct State {
86 registry: Registry<Key, AtomicStorage>,
87 level_map: FastConcurrentHashMap<Key, Level>,
88 flush_tx: broadcast::Sender<SharedEvents>,
89 metrics_prefix: String,
90 default_level: Level,
91 current_level: Mutex<Level>,
92}
93
94impl State {
95 fn is_metric_filtered(&self, key: &Key, filter_level: &Level) -> bool {
96 match self.level_map.pin().get(key) {
97 None => true,
99
100 Some(level) => level < filter_level,
103 }
104 }
105}
106
107struct MetricsRecorder {
108 state: Arc<State>,
109}
110
111impl MetricsRecorder {
112 fn new(metrics_prefix: String, default_level: Level) -> Self {
113 let (flush_tx, _) = broadcast::channel(2);
114 Self {
115 state: Arc::new(State {
116 registry: Registry::new(AtomicStorage),
117 level_map: FastConcurrentHashMap::default(),
118 flush_tx,
119 metrics_prefix,
120 default_level,
121 current_level: Mutex::new(default_level),
122 }),
123 }
124 }
125
126 fn filter_handle(&self) -> FilterHandle {
127 FilterHandle {
128 state: Arc::clone(&self.state),
129 }
130 }
131
132 fn install(self) -> Result<(), SetRecorderError<Self>> {
133 let state = Arc::clone(&self.state);
134 metrics::set_global_recorder(self)?;
135
136 if RECEIVER_STATE.set(state).is_err() {
137 panic!("metrics receiver should never be set prior to global recorder being installed");
138 }
139
140 Ok(())
141 }
142
143 fn prefix_key(&self, key: &Key) -> Key {
144 Key::from_parts(format!("{}.{}", self.state.metrics_prefix, key.name()), key.labels())
145 }
146}
147
148impl Recorder for MetricsRecorder {
149 fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
150 fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
151 fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
152
153 fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter {
154 let prefixed_key = self.prefix_key(key);
155 let handle = self
156 .state
157 .registry
158 .get_or_create_counter(&prefixed_key, |c| c.clone().into());
159 self.state.level_map.pin().insert(prefixed_key, *metadata.level());
160
161 handle
162 }
163
164 fn register_gauge(&self, key: &Key, metadata: &Metadata<'_>) -> Gauge {
165 let prefixed_key = self.prefix_key(key);
166 let handle = self
167 .state
168 .registry
169 .get_or_create_gauge(&prefixed_key, |g| g.clone().into());
170 self.state.level_map.pin().insert(prefixed_key, *metadata.level());
171
172 handle
173 }
174
175 fn register_histogram(&self, key: &Key, metadata: &Metadata<'_>) -> Histogram {
176 let prefixed_key = self.prefix_key(key);
177 let handle = self
178 .state
179 .registry
180 .get_or_create_histogram(&prefixed_key, |h| h.clone().into());
181 self.state.level_map.pin().insert(prefixed_key, *metadata.level());
182
183 handle
184 }
185}
186
187pub struct MetricsStream {
192 inner: ReusableBoxFuture<'static, (Result<SharedEvents, RecvError>, Receiver<SharedEvents>)>,
193}
194
195impl MetricsStream {
196 pub fn register() -> Self {
198 let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
199 Self {
200 inner: ReusableBoxFuture::new(make_rx_future(state.flush_tx.subscribe())),
201 }
202 }
203}
204
205impl Stream for MetricsStream {
206 type Item = SharedEvents;
207
208 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
209 loop {
210 let (result, rx) = ready!(self.inner.poll(cx));
212 self.inner.set(make_rx_future(rx));
213
214 match result {
215 Ok(item) => return Poll::Ready(Some(item)),
216 Err(RecvError::Closed) => return Poll::Ready(None),
217 Err(RecvError::Lagged(n)) => {
218 debug!(
219 missed_payloads = n,
220 "Stream lagging behind internal metrics producer. Internal metrics may have been lost."
221 );
222 continue;
223 }
224 }
225 }
226 }
227}
228
229async fn make_rx_future(mut rx: Receiver<SharedEvents>) -> (Result<SharedEvents, RecvError>, Receiver<SharedEvents>) {
230 let result = rx.recv().await;
231 (result, rx)
232}
233
234async fn flush_metrics(flush_interval: Duration) {
235 let mut context_resolver = MetricsContextResolver::new(INTERNAL_METRICS_INTERNER_SIZE);
236
237 let mut flush_interval = tokio::time::interval(flush_interval);
238 flush_interval.tick().await;
239
240 let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
241
242 let mut histogram_samples = Vec::<f64>::new();
243
244 loop {
245 flush_interval.tick().await;
246
247 if state.flush_tx.receiver_count() == 0 {
249 let counters = state.registry.get_counter_handles();
250 for (_, counter) in counters {
251 counter.swap(0, Ordering::Relaxed);
252 }
253
254 let histograms = state.registry.get_histogram_handles();
255 for (_, histogram) in histograms {
256 histogram.clear();
257 }
258
259 continue;
260 }
261
262 let mut metrics = Vec::new();
263 let current_level = *state.current_level.lock().unwrap();
264
265 let counters = state.registry.get_counter_handles();
266 let gauges = state.registry.get_gauge_handles();
267 let histograms = state.registry.get_histogram_handles();
268
269 for (key, counter) in counters {
276 let value = counter.swap(0, Ordering::Relaxed) as f64;
277
278 if state.is_metric_filtered(&key, ¤t_level) {
279 continue;
280 }
281
282 let context = context_resolver.resolve_from_key(key);
283 let metric = Metric::counter(context, value);
284 metrics.push(Event::Metric(metric));
285 }
286
287 for (key, gauge) in gauges {
288 let value = f64::from_bits(gauge.load(Ordering::Relaxed));
289
290 if state.is_metric_filtered(&key, ¤t_level) {
291 continue;
292 }
293
294 let context = context_resolver.resolve_from_key(key);
295 let metric = Metric::gauge(context, value);
296 metrics.push(Event::Metric(metric));
297 }
298
299 for (key, histogram) in histograms {
300 histogram_samples.clear();
301 histogram.clear_with(|samples| histogram_samples.extend(samples));
302
303 if state.is_metric_filtered(&key, ¤t_level) {
304 continue;
305 }
306
307 let context = context_resolver.resolve_from_key(key);
308 let metric = Metric::histogram(context, &histogram_samples[..]);
309 metrics.push(Event::Metric(metric));
310 }
311
312 let shared = Arc::new(metrics);
313 let _ = state.flush_tx.send(shared);
314 }
315}
316
317struct MetricsContextResolver {
318 context_resolver: ContextResolver,
319 key_context_cache: FastHashMap<Key, Context>,
320}
321
322impl MetricsContextResolver {
323 fn new(resolver_interner_size_bytes: NonZeroUsize) -> Self {
324 Self {
325 context_resolver: ContextResolverBuilder::from_name("core/internal_metrics")
327 .expect("resolver name is not empty")
328 .with_interner_capacity_bytes(resolver_interner_size_bytes)
329 .without_caching()
330 .build(),
331 key_context_cache: FastHashMap::default(),
332 }
333 }
334
335 fn resolve_from_key(&mut self, key: Key) -> Context {
336 static SELF_ORIGIN_INFO: LazyLock<RawOrigin<'static>> = LazyLock::new(|| {
337 let mut origin_info = RawOrigin::default();
338 origin_info.set_process_id(std::process::id());
339 origin_info
340 });
341
342 if let Some(context) = self.key_context_cache.get(&key) {
344 return context.clone();
345 }
346
347 let tags = key
349 .labels()
350 .map(|l| Tag::from(format!("{}:{}", l.key(), l.value())))
351 .collect::<TagSet>();
352
353 let context = self
354 .context_resolver
355 .resolve(key.name(), &tags, Some(SELF_ORIGIN_INFO.clone()))
356 .expect("resolver should always allow falling back");
357
358 self.key_context_cache.insert(key, context.clone());
359 context
360 }
361}
362
363pub async fn initialize_metrics(
377 metrics_prefix: String, default_level: Level,
378) -> Result<(FilterHandle, MetricsFlusherWorker), GenericError> {
379 let recorder = MetricsRecorder::new(metrics_prefix, default_level);
380 let filter_handle = recorder.filter_handle();
381 recorder.install()?;
382
383 Ok((filter_handle, MetricsFlusherWorker))
384}
385
386pub struct MetricsFlusherWorker;
392
393#[async_trait]
394impl Supervisable for MetricsFlusherWorker {
395 fn name(&self) -> &str {
396 "internal-telemetry-metrics-flusher"
397 }
398
399 async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
400 Ok(Box::pin(async move {
401 select! {
402 _ = flush_metrics(FLUSH_INTERVAL) => {},
403 _ = process_shutdown.wait_for_shutdown() => {},
404 }
405 Ok(())
406 }))
407 }
408}