saluki_core/observability/metrics/
mod.rs1use std::{
8 num::NonZeroUsize,
9 pin::Pin,
10 sync::{atomic::Ordering, Arc, LazyLock, Mutex, OnceLock},
11 task::{self, ready, Poll},
12 time::Duration,
13};
14
15use async_trait::async_trait;
16use futures::Stream;
17use metrics::{
18 Counter, Gauge, Histogram, Key, KeyName, Level, Metadata, Recorder, SetRecorderError, SharedString, Unit,
19};
20use metrics_util::registry::{AtomicStorage, Registry};
21use saluki_common::{
22 collections::{FastConcurrentHashMap, FastHashMap},
23 sync::shutdown::ShutdownHandle,
24};
25use saluki_context::{
26 origin::RawOrigin,
27 tags::{Tag, TagSet},
28 Context, ContextResolver, ContextResolverBuilder,
29};
30use saluki_error::GenericError;
31use tokio::{
32 select,
33 sync::broadcast::{self, error::RecvError, Receiver},
34};
35use tokio_util::sync::ReusableBoxFuture;
36use tracing::debug;
37
38use crate::{
39 data_model::event::{metric::*, Event},
40 runtime::{InitializationError, Supervisable, SupervisorFuture},
41};
42
43mod aggregated;
44pub use self::aggregated::{
45 get_shared_metrics_state, AggregatedMetricValue, AggregatedMetricsProcessor, AggregatedMetricsState,
46};
47
48mod histogram;
49pub use self::histogram::AggregatedHistogram;
50
51mod processor;
52pub use self::processor::TelemetryProcessor;
53
54mod reflector;
55pub use self::reflector::{Processor, Reflector};
56
57mod remapper;
58pub use self::remapper::{RemappedMetric, RemapperRule};
59
/// How often accumulated internal metrics are flushed to subscribers (one second).
const FLUSH_INTERVAL: Duration = Duration::from_millis(1_000);

/// Interner capacity, in bytes, of the context resolver used when flushing internal metrics (16 KiB).
const INTERNAL_METRICS_INTERNER_SIZE: NonZeroUsize = NonZeroUsize::new(16 * 1024).unwrap();

63static RECEIVER_STATE: OnceLock<Arc<State>> = OnceLock::new();
64
65pub type SharedEvents = Arc<Vec<Event>>;
67
68pub struct FilterHandle {
73 state: Arc<State>,
74}
75
76impl FilterHandle {
77 pub fn override_filter(&self, level: Level) {
79 *self.state.current_level.lock().unwrap() = level;
80 }
81
82 pub fn reset_filter(&self) {
84 *self.state.current_level.lock().unwrap() = self.state.default_level;
85 }
86}
87
88struct State {
89 registry: Registry<Key, AtomicStorage>,
90 level_map: FastConcurrentHashMap<Key, Level>,
91 flush_tx: broadcast::Sender<SharedEvents>,
92 metrics_prefix: String,
93 default_level: Level,
94 current_level: Mutex<Level>,
95}
96
97impl State {
98 fn is_metric_filtered(&self, key: &Key, filter_level: &Level) -> bool {
99 match self.level_map.pin().get(key) {
100 None => true,
102
103 Some(level) => level < filter_level,
106 }
107 }
108}
109
110struct MetricsRecorder {
111 state: Arc<State>,
112}
113
114impl MetricsRecorder {
115 fn new(metrics_prefix: String, default_level: Level) -> Self {
116 let (flush_tx, _) = broadcast::channel(2);
117 Self {
118 state: Arc::new(State {
119 registry: Registry::new(AtomicStorage),
120 level_map: FastConcurrentHashMap::default(),
121 flush_tx,
122 metrics_prefix,
123 default_level,
124 current_level: Mutex::new(default_level),
125 }),
126 }
127 }
128
129 fn filter_handle(&self) -> FilterHandle {
130 FilterHandle {
131 state: Arc::clone(&self.state),
132 }
133 }
134
135 fn install(self) -> Result<(), SetRecorderError<Self>> {
136 let state = Arc::clone(&self.state);
137 metrics::set_global_recorder(self)?;
138
139 if RECEIVER_STATE.set(state).is_err() {
140 panic!("metrics receiver should never be set prior to global recorder being installed");
141 }
142
143 Ok(())
144 }
145
146 fn prefix_key(&self, key: &Key) -> Key {
147 Key::from_parts(format!("{}.{}", self.state.metrics_prefix, key.name()), key.labels())
148 }
149}
150
151impl Recorder for MetricsRecorder {
152 fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
153 fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
154 fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
155
156 fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter {
157 let prefixed_key = self.prefix_key(key);
158 let handle = self
159 .state
160 .registry
161 .get_or_create_counter(&prefixed_key, |c| c.clone().into());
162 self.state.level_map.pin().insert(prefixed_key, *metadata.level());
163
164 handle
165 }
166
167 fn register_gauge(&self, key: &Key, metadata: &Metadata<'_>) -> Gauge {
168 let prefixed_key = self.prefix_key(key);
169 let handle = self
170 .state
171 .registry
172 .get_or_create_gauge(&prefixed_key, |g| g.clone().into());
173 self.state.level_map.pin().insert(prefixed_key, *metadata.level());
174
175 handle
176 }
177
178 fn register_histogram(&self, key: &Key, metadata: &Metadata<'_>) -> Histogram {
179 let prefixed_key = self.prefix_key(key);
180 let handle = self
181 .state
182 .registry
183 .get_or_create_histogram(&prefixed_key, |h| h.clone().into());
184 self.state.level_map.pin().insert(prefixed_key, *metadata.level());
185
186 handle
187 }
188}
189
190pub struct MetricsStream {
195 inner: ReusableBoxFuture<'static, (Result<SharedEvents, RecvError>, Receiver<SharedEvents>)>,
196}
197
198impl MetricsStream {
199 pub fn register() -> Self {
201 let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
202 Self {
203 inner: ReusableBoxFuture::new(make_rx_future(state.flush_tx.subscribe())),
204 }
205 }
206}
207
208impl Stream for MetricsStream {
209 type Item = SharedEvents;
210
211 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
212 loop {
213 let (result, rx) = ready!(self.inner.poll(cx));
215 self.inner.set(make_rx_future(rx));
216
217 match result {
218 Ok(item) => return Poll::Ready(Some(item)),
219 Err(RecvError::Closed) => return Poll::Ready(None),
220 Err(RecvError::Lagged(n)) => {
221 debug!(
222 missed_payloads = n,
223 "Stream lagging behind internal metrics producer. Internal metrics may have been lost."
224 );
225 continue;
226 }
227 }
228 }
229 }
230}
231
232async fn make_rx_future(mut rx: Receiver<SharedEvents>) -> (Result<SharedEvents, RecvError>, Receiver<SharedEvents>) {
233 let result = rx.recv().await;
234 (result, rx)
235}
236
237async fn flush_metrics(flush_interval: Duration) {
238 let mut context_resolver = MetricsContextResolver::new(INTERNAL_METRICS_INTERNER_SIZE);
239
240 let mut flush_interval = tokio::time::interval(flush_interval);
241 flush_interval.tick().await;
242
243 let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
244
245 let mut histogram_samples = Vec::<f64>::new();
246
247 loop {
248 flush_interval.tick().await;
249
250 if state.flush_tx.receiver_count() == 0 {
252 let counters = state.registry.get_counter_handles();
253 for (_, counter) in counters {
254 counter.swap(0, Ordering::Relaxed);
255 }
256
257 let histograms = state.registry.get_histogram_handles();
258 for (_, histogram) in histograms {
259 histogram.clear();
260 }
261
262 continue;
263 }
264
265 let mut metrics = Vec::new();
266 let current_level = *state.current_level.lock().unwrap();
267
268 let counters = state.registry.get_counter_handles();
269 let gauges = state.registry.get_gauge_handles();
270 let histograms = state.registry.get_histogram_handles();
271
272 for (key, counter) in counters {
279 let value = counter.swap(0, Ordering::Relaxed) as f64;
280
281 if state.is_metric_filtered(&key, ¤t_level) {
282 continue;
283 }
284
285 let context = context_resolver.resolve_from_key(key);
286 let metric = Metric::counter(context, value);
287 metrics.push(Event::Metric(metric));
288 }
289
290 for (key, gauge) in gauges {
291 let value = f64::from_bits(gauge.load(Ordering::Relaxed));
292
293 if state.is_metric_filtered(&key, ¤t_level) {
294 continue;
295 }
296
297 let context = context_resolver.resolve_from_key(key);
298 let metric = Metric::gauge(context, value);
299 metrics.push(Event::Metric(metric));
300 }
301
302 for (key, histogram) in histograms {
303 histogram_samples.clear();
304 histogram.clear_with(|samples| histogram_samples.extend(samples));
305
306 if state.is_metric_filtered(&key, ¤t_level) {
307 continue;
308 }
309
310 let context = context_resolver.resolve_from_key(key);
311 let metric = Metric::histogram(context, &histogram_samples[..]);
312 metrics.push(Event::Metric(metric));
313 }
314
315 let shared = Arc::new(metrics);
316 let _ = state.flush_tx.send(shared);
317 }
318}
319
320struct MetricsContextResolver {
321 context_resolver: ContextResolver,
322 key_context_cache: FastHashMap<Key, Context>,
323}
324
325impl MetricsContextResolver {
326 fn new(resolver_interner_size_bytes: NonZeroUsize) -> Self {
327 Self {
328 context_resolver: ContextResolverBuilder::from_name("core/internal_metrics")
330 .expect("resolver name is not empty")
331 .with_interner_capacity_bytes(resolver_interner_size_bytes)
332 .without_caching()
333 .build(),
334 key_context_cache: FastHashMap::default(),
335 }
336 }
337
338 fn resolve_from_key(&mut self, key: Key) -> Context {
339 static SELF_ORIGIN_INFO: LazyLock<RawOrigin<'static>> = LazyLock::new(|| {
340 let mut origin_info = RawOrigin::default();
341 origin_info.set_process_id(std::process::id());
342 origin_info
343 });
344
345 if let Some(context) = self.key_context_cache.get(&key) {
347 return context.clone();
348 }
349
350 let tags = key
352 .labels()
353 .map(|l| Tag::from(format!("{}:{}", l.key(), l.value())))
354 .collect::<TagSet>();
355
356 let context = self
357 .context_resolver
358 .resolve(key.name(), &tags, Some(SELF_ORIGIN_INFO.clone()))
359 .expect("resolver should always allow falling back");
360
361 self.key_context_cache.insert(key, context.clone());
362 context
363 }
364}
365
366pub async fn initialize_metrics(
380 metrics_prefix: String, default_level: Level,
381) -> Result<(FilterHandle, MetricsFlusherWorker), GenericError> {
382 let recorder = MetricsRecorder::new(metrics_prefix, default_level);
383 let filter_handle = recorder.filter_handle();
384 recorder.install()?;
385
386 Ok((filter_handle, MetricsFlusherWorker))
387}
388
389pub struct MetricsFlusherWorker;
395
396#[async_trait]
397impl Supervisable for MetricsFlusherWorker {
398 fn name(&self) -> &str {
399 "internal-telemetry-metrics-flusher"
400 }
401
402 async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
403 Ok(Box::pin(async move {
404 select! {
405 _ = process_shutdown => {},
406 _ = flush_metrics(FLUSH_INTERVAL) => {},
407 }
408
409 Ok(())
410 }))
411 }
412}