// saluki_core/observability/metrics.rs
use std::{
3 num::NonZeroUsize,
4 pin::Pin,
5 sync::{atomic::Ordering, Arc, LazyLock, Mutex, OnceLock},
6 task::{self, ready, Poll},
7 time::Duration,
8};
9
10use futures::Stream;
11use metrics::{
12 Counter, Gauge, Histogram, Key, KeyName, Level, Metadata, Recorder, SetRecorderError, SharedString, Unit,
13};
14use metrics_util::registry::{AtomicStorage, Registry};
15use saluki_common::{
16 collections::{FastConcurrentHashMap, FastHashMap},
17 task::spawn_traced_named,
18};
19use saluki_context::{
20 origin::RawOrigin,
21 tags::{Tag, TagSet},
22 Context, ContextResolver, ContextResolverBuilder,
23};
24use saluki_error::GenericError;
25use tokio::sync::broadcast::{self, error::RecvError, Receiver};
26use tokio_util::sync::ReusableBoxFuture;
27use tracing::debug;
28
29use crate::data_model::event::{metric::*, Event};
30
// How often aggregated internal metrics are flushed and broadcast to subscribers.
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
// Interner capacity, in bytes, for the dedicated internal-metrics context resolver.
const INTERNAL_METRICS_INTERNER_SIZE: NonZeroUsize = NonZeroUsize::new(16_384).unwrap();

// Shared state published by `MetricsRecorder::install` and read by the flush task and
// `MetricsStream::register`. Set exactly once, after the global recorder is installed.
static RECEIVER_STATE: OnceLock<Arc<State>> = OnceLock::new();
35
/// A shared, immutable batch of metric events produced by a single flush.
pub type SharedEvents = Arc<Vec<Event>>;
38
/// A handle for overriding the verbosity level used to filter internal metrics at flush time.
pub struct FilterHandle {
    state: Arc<State>,
}
47impl FilterHandle {
48 pub fn override_filter(&self, level: Level) {
50 *self.state.current_level.lock().unwrap() = Some(level);
51 }
52
53 pub fn reset_filter(&self) {
55 *self.state.current_level.lock().unwrap() = None;
56 }
57}
58
// Shared state between the installed recorder, filter handles, and the flush task.
struct State {
    // Storage for all registered counter/gauge/histogram handles.
    registry: Registry<Key, AtomicStorage>,
    // Verbosity level captured for each (prefixed) metric key at registration time.
    level_map: FastConcurrentHashMap<Key, Level>,
    // Broadcast channel used to fan out flushed metric batches to `MetricsStream` subscribers.
    flush_tx: broadcast::Sender<SharedEvents>,
    // Prefix prepended (dot-separated) to every registered metric name.
    metrics_prefix: String,
    // Optional flush-time filter level override; `None` means no override is active.
    current_level: Mutex<Option<Level>>,
}
66
67impl State {
68 fn is_metric_filtered(&self, key: &Key, filter_level: &Level) -> bool {
69 match self.level_map.pin().get(key) {
70 None => true,
72
73 Some(level) => level < filter_level,
76 }
77 }
78}
79
// A `metrics::Recorder` implementation backing the internal telemetry pipeline.
struct MetricsRecorder {
    state: Arc<State>,
}
83
84impl MetricsRecorder {
85 fn new(metrics_prefix: String) -> Self {
86 let (flush_tx, _) = broadcast::channel(2);
87 Self {
88 state: Arc::new(State {
89 registry: Registry::new(AtomicStorage),
90 level_map: FastConcurrentHashMap::default(),
91 flush_tx,
92 metrics_prefix,
93 current_level: Mutex::new(None),
94 }),
95 }
96 }
97
98 fn filter_handle(&self) -> FilterHandle {
99 FilterHandle {
100 state: Arc::clone(&self.state),
101 }
102 }
103
104 fn install(self) -> Result<(), SetRecorderError<Self>> {
105 let state = Arc::clone(&self.state);
106 metrics::set_global_recorder(self)?;
107
108 if RECEIVER_STATE.set(state).is_err() {
109 panic!("metrics receiver should never be set prior to global recorder being installed");
110 }
111
112 Ok(())
113 }
114
115 fn prefix_key(&self, key: &Key) -> Key {
116 Key::from_parts(format!("{}.{}", self.state.metrics_prefix, key.name()), key.labels())
117 }
118}
119
120impl Recorder for MetricsRecorder {
121 fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
122 fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
123 fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
124
125 fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter {
126 let prefixed_key = self.prefix_key(key);
127 let handle = self
128 .state
129 .registry
130 .get_or_create_counter(&prefixed_key, |c| c.clone().into());
131 self.state.level_map.pin().insert(prefixed_key, *metadata.level());
132
133 handle
134 }
135
136 fn register_gauge(&self, key: &Key, metadata: &Metadata<'_>) -> Gauge {
137 let prefixed_key = self.prefix_key(key);
138 let handle = self
139 .state
140 .registry
141 .get_or_create_gauge(&prefixed_key, |g| g.clone().into());
142 self.state.level_map.pin().insert(prefixed_key, *metadata.level());
143
144 handle
145 }
146
147 fn register_histogram(&self, key: &Key, metadata: &Metadata<'_>) -> Histogram {
148 let prefixed_key = self.prefix_key(key);
149 let handle = self
150 .state
151 .registry
152 .get_or_create_histogram(&prefixed_key, |h| h.clone().into());
153 self.state.level_map.pin().insert(prefixed_key, *metadata.level());
154
155 handle
156 }
157}
158
/// A stream of flushed internal metrics batches.
pub struct MetricsStream {
    inner: ReusableBoxFuture<'static, (Result<SharedEvents, RecvError>, Receiver<SharedEvents>)>,
}
166
167impl MetricsStream {
168 pub fn register() -> Self {
170 let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
171 Self {
172 inner: ReusableBoxFuture::new(make_rx_future(state.flush_tx.subscribe())),
173 }
174 }
175}
176
177impl Stream for MetricsStream {
178 type Item = SharedEvents;
179
180 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
181 loop {
182 let (result, rx) = ready!(self.inner.poll(cx));
184 self.inner.set(make_rx_future(rx));
185
186 match result {
187 Ok(item) => return Poll::Ready(Some(item)),
188 Err(RecvError::Closed) => return Poll::Ready(None),
189 Err(RecvError::Lagged(n)) => {
190 debug!(
191 missed_payloads = n,
192 "Stream lagging behind internal metrics producer. Internal metrics may have been lost."
193 );
194 continue;
195 }
196 }
197 }
198 }
199}
200
201async fn make_rx_future(mut rx: Receiver<SharedEvents>) -> (Result<SharedEvents, RecvError>, Receiver<SharedEvents>) {
202 let result = rx.recv().await;
203 (result, rx)
204}
205
206async fn flush_metrics(flush_interval: Duration) {
207 let mut context_resolver = MetricsContextResolver::new(INTERNAL_METRICS_INTERNER_SIZE);
208
209 let mut flush_interval = tokio::time::interval(flush_interval);
210 flush_interval.tick().await;
211
212 let state = RECEIVER_STATE.get().expect("metrics receiver should be set");
213
214 let mut histogram_samples = Vec::<f64>::new();
215
216 loop {
217 flush_interval.tick().await;
218
219 if state.flush_tx.receiver_count() == 0 {
221 let histograms = state.registry.get_histogram_handles();
222 for (_, histogram) in histograms {
223 histogram.clear();
224 }
225 continue;
226 }
227
228 let mut metrics = Vec::new();
229 let current_level = {
230 let current_level = state.current_level.lock().unwrap();
231 current_level.as_ref().copied().unwrap_or(Level::TRACE)
232 };
233
234 let counters = state.registry.get_counter_handles();
235 let gauges = state.registry.get_gauge_handles();
236 let histograms = state.registry.get_histogram_handles();
237
238 for (key, counter) in counters {
239 if state.is_metric_filtered(&key, ¤t_level) {
240 continue;
241 }
242
243 let context = context_resolver.resolve_from_key(key);
244 let value = counter.swap(0, Ordering::Relaxed) as f64;
245
246 let metric = Metric::counter(context, value);
247 metrics.push(Event::Metric(metric));
248 }
249
250 for (key, gauge) in gauges {
251 if state.is_metric_filtered(&key, ¤t_level) {
252 continue;
253 }
254
255 let context = context_resolver.resolve_from_key(key);
256 let value = f64::from_bits(gauge.load(Ordering::Relaxed));
257
258 let metric = Metric::gauge(context, value);
259 metrics.push(Event::Metric(metric));
260 }
261
262 for (key, histogram) in histograms {
263 if state.is_metric_filtered(&key, ¤t_level) {
264 continue;
265 }
266
267 let context = context_resolver.resolve_from_key(key);
268
269 histogram_samples.clear();
274 histogram.clear_with(|samples| histogram_samples.extend(samples));
275
276 if histogram_samples.is_empty() {
277 continue;
278 }
279
280 let metric = Metric::histogram(context, &histogram_samples[..]);
281 metrics.push(Event::Metric(metric));
282 }
283
284 let shared = Arc::new(metrics);
285 let _ = state.flush_tx.send(shared);
286 }
287}
288
// Resolves `metrics::Key`s to `Context`s, with a local cache so each unique key is only
// resolved once.
struct MetricsContextResolver {
    // Underlying resolver (interning, no internal caching -- caching is done here).
    context_resolver: ContextResolver,
    // Cache mapping already-seen keys to their resolved contexts.
    key_context_cache: FastHashMap<Key, Context>,
}
293
294impl MetricsContextResolver {
295 fn new(resolver_interner_size_bytes: NonZeroUsize) -> Self {
296 Self {
297 context_resolver: ContextResolverBuilder::from_name("core/internal_metrics")
299 .expect("resolver name is not empty")
300 .with_interner_capacity_bytes(resolver_interner_size_bytes)
301 .without_caching()
302 .build(),
303 key_context_cache: FastHashMap::default(),
304 }
305 }
306
307 fn resolve_from_key(&mut self, key: Key) -> Context {
308 static SELF_ORIGIN_INFO: LazyLock<RawOrigin<'static>> = LazyLock::new(|| {
309 let mut origin_info = RawOrigin::default();
310 origin_info.set_process_id(std::process::id());
311 origin_info
312 });
313
314 if let Some(context) = self.key_context_cache.get(&key) {
316 return context.clone();
317 }
318
319 let tags = key
321 .labels()
322 .map(|l| Tag::from(format!("{}:{}", l.key(), l.value())))
323 .collect::<TagSet>();
324
325 let context = self
326 .context_resolver
327 .resolve(key.name(), &tags, Some(SELF_ORIGIN_INFO.clone()))
328 .expect("resolver should always allow falling back");
329
330 self.key_context_cache.insert(key, context.clone());
331 context
332 }
333}
334
335pub async fn initialize_metrics(metrics_prefix: String) -> Result<FilterHandle, GenericError> {
341 let recorder = MetricsRecorder::new(metrics_prefix);
342 let filter_handle = recorder.filter_handle();
343 recorder.install()?;
344
345 spawn_traced_named("internal-telemetry-metrics-flusher", flush_metrics(FLUSH_INTERVAL));
346
347 Ok(filter_handle)
348}