1use std::{
2 convert::Infallible,
3 num::NonZeroUsize,
4 sync::{Arc, LazyLock},
5};
6
7use async_trait::async_trait;
8use ddsketch::DDSketch;
9use http::{Request, Response};
10use hyper::{body::Incoming, service::service_fn};
11use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
12use prometheus_exposition::{MetricType, PrometheusRenderer};
13use saluki_common::{collections::FastIndexMap, iter::ReusableDeduplicator};
14use saluki_context::{tags::Tag, Context};
15use saluki_core::components::{destinations::*, ComponentContext};
16use saluki_core::data_model::event::{
17 metric::{Histogram, Metric, MetricValues},
18 EventType,
19};
20use saluki_error::GenericError;
21use saluki_io::net::{
22 listener::ConnectionOrientedListener,
23 server::http::{ErrorHandle, HttpServer, ShutdownHandle},
24 ListenAddress,
25};
26use serde::Deserialize;
27use stringtheory::{
28 interning::{FixedSizeInterner, Interner as _},
29 MetaString,
30};
31use tokio::{select, sync::RwLock};
32use tracing::debug;
33
/// Maximum number of distinct metric contexts (series) the destination will track;
/// metrics for new contexts beyond this are dropped.
const CONTEXT_LIMIT: usize = 10_000;
/// Soft cap, in bytes, on the rendered scrape payload (1 MiB); rendering stops once exceeded.
const PAYLOAD_SIZE_LIMIT_BYTES: usize = 1024 * 1024;
/// Estimated byte budget for a single series' label set; series exceeding it are not rendered.
const TAGS_BUFFER_SIZE_LIMIT_BYTES: usize = 2048;

/// Number of buckets used for time-based (`*_seconds`) histograms.
const TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
/// Log-linear buckets for time-based histograms: starts at 128ns, boundary plus midpoint
/// per power of 4.
static TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<TIME_HISTOGRAM_BUCKET_COUNT>(0.000000128, 4.0));

/// Number of buckets used for all other histograms.
const NON_TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
/// Log-linear buckets for non-time histograms: starts at 1, boundary plus midpoint per
/// power of 2.
static NON_TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); NON_TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<NON_TIME_HISTOGRAM_BUCKET_COUNT>(1.0, 2.0));

/// Capacity of the fixed-size interner that holds normalized metric names.
const METRIC_NAME_STRING_INTERNER_BYTES: NonZeroUsize = NonZeroUsize::new(65536).unwrap();
49
/// Configuration for the Prometheus scrape destination.
#[derive(Deserialize)]
pub struct PrometheusConfiguration {
    /// Address the scrape HTTP server listens on.
    #[serde(rename = "prometheus_listen_addr")]
    listen_addr: ListenAddress,
}
72
impl PrometheusConfiguration {
    /// Creates a configuration from an already-resolved listen address, bypassing
    /// deserialization.
    pub fn from_listen_address(listen_addr: ListenAddress) -> Self {
        Self { listen_addr }
    }
}
79
#[async_trait]
impl DestinationBuilder for PrometheusConfiguration {
    /// This destination only consumes metric events.
    fn input_event_type(&self) -> EventType {
        EventType::Metric
    }

    /// Builds the `Prometheus` destination, binding a listener on the configured address.
    ///
    /// # Errors
    ///
    /// Returns an error if the listener cannot be created from the configured address.
    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
        Ok(Box::new(Prometheus {
            listener: ConnectionOrientedListener::from_listen_address(self.listen_addr.clone()).await?,
            metrics: FastIndexMap::default(),
            payload: Arc::new(RwLock::new(String::new())),
            renderer: PrometheusRenderer::new(),
            interner: FixedSizeInterner::new(METRIC_NAME_STRING_INTERNER_BYTES),
        }))
    }
}
96
impl MemoryBounds for PrometheusConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum bound: just the component struct itself.
        builder
            .minimum()
            .with_single_value::<Prometheus>("component struct");

        // Firm bound: the context-limited state map plus the fixed-size payload and
        // tags buffers. Matches CONTEXT_LIMIT / PAYLOAD_SIZE_LIMIT_BYTES /
        // TAGS_BUFFER_SIZE_LIMIT_BYTES enforced at runtime.
        builder
            .firm()
            .with_map::<Context, PrometheusValue>("state map", CONTEXT_LIMIT)
            .with_fixed_amount("payload size", PAYLOAD_SIZE_LIMIT_BYTES)
            .with_fixed_amount("tags buffer", TAGS_BUFFER_SIZE_LIMIT_BYTES);
    }
}
114
/// Prometheus scrape destination.
///
/// Aggregates incoming metrics into Prometheus-renderable values and serves the
/// rendered exposition payload over HTTP for scraping.
struct Prometheus {
    // Listener the scrape HTTP server is spawned from.
    listener: ConnectionOrientedListener,
    // Aggregated state: per (name, type) group, the per-context (tag set) values.
    metrics: FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>,
    // Rendered payload, shared with the HTTP scrape handler.
    payload: Arc<RwLock<String>>,
    renderer: PrometheusRenderer,
    // Interner holding normalized metric names.
    interner: FixedSizeInterner<1>,
}
122
#[async_trait]
impl Destination for Prometheus {
    /// Runs the destination: consumes metric events, folds them into per-context
    /// Prometheus values, and regenerates the shared scrape payload after each batch.
    /// Exits when the event stream ends or the HTTP server reports an error.
    async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
        let Self {
            listener,
            mut metrics,
            payload,
            mut renderer,
            interner,
        } = *self;

        let mut health = context.take_health_handle();

        // Spawn the HTTP server that serves the rendered payload to scrapers.
        let (http_shutdown, mut http_error) = spawn_prom_scrape_service(listener, Arc::clone(&payload));
        health.mark_ready();

        debug!("Prometheus destination started.");

        // Number of distinct contexts currently tracked, capped at CONTEXT_LIMIT.
        // NOTE(review): this only ever grows — contexts are never expired, so the state
        // map retains every series seen. Confirm bounded-but-unexpiring retention is intended.
        let mut contexts = 0;
        let mut tags_deduplicator = ReusableDeduplicator::new();

        loop {
            select! {
                _ = health.live() => continue,
                maybe_events = context.events().next() => match maybe_events {
                    Some(events) => {
                        for event in events {
                            if let Some(metric) = event.try_into_metric() {
                                // Determine the Prometheus name/type for this metric; skip it
                                // if the name cannot be interned or the type is unsupported.
                                let prom_context = match into_prometheus_metric(&metric, &mut renderer, &interner) {
                                    Some(prom_context) => prom_context,
                                    None => continue,
                                };

                                let (context, values, _) = metric.into_parts();

                                // Merge into the existing series, or create a new one if
                                // we are still under the context limit.
                                let existing_contexts = metrics.entry(prom_context.clone()).or_default();
                                match existing_contexts.get_mut(&context) {
                                    Some(existing_prom_value) => merge_metric_values_with_prom_value(values, existing_prom_value),
                                    None => {
                                        if contexts >= CONTEXT_LIMIT {
                                            debug!("Prometheus destination reached context limit. Skipping metric '{}'.", context.name());
                                            continue
                                        }

                                        let mut new_prom_value = get_prom_value_for_prom_context(&prom_context);
                                        merge_metric_values_with_prom_value(values, &mut new_prom_value);

                                        existing_contexts.insert(context, new_prom_value);
                                        contexts += 1;
                                    }
                                }
                            }
                        }

                        // Re-render the scrape payload after every batch of events.
                        regenerate_payload(&metrics, &payload, &mut renderer, &mut tags_deduplicator).await;
                    },
                    // Event stream exhausted: upstream shut down, so we stop too.
                    None => break,
                },
                error = &mut http_error => {
                    if let Some(error) = error {
                        debug!(%error, "HTTP server error.");
                    }
                    break;
                },
            }
        }

        http_shutdown.shutdown();

        debug!("Prometheus destination stopped.");

        Ok(())
    }
}
206
/// Spawns the HTTP server that answers Prometheus scrapes.
///
/// Every request — regardless of method or path — is answered with a snapshot of the
/// current payload. Returns handles for shutting the server down and for observing a
/// fatal server error.
fn spawn_prom_scrape_service(
    listener: ConnectionOrientedListener, payload: Arc<RwLock<String>>,
) -> (ShutdownHandle, ErrorHandle) {
    let service = service_fn(move |_: Request<Incoming>| {
        let payload = Arc::clone(&payload);
        async move {
            let payload = payload.read().await;
            // Copy the payload out so the read lock is released before the response
            // body is streamed to the client.
            Ok::<_, Infallible>(Response::new(axum::body::Body::from(payload.to_string())))
        }
    });

    let http_server = HttpServer::from_listener(listener, service);
    http_server.listen()
}
221
/// Renders all aggregated metrics into the exposition format and swaps the result into
/// the shared payload string served to scrapers.
///
/// Rendering stops early once the output exceeds `PAYLOAD_SIZE_LIMIT_BYTES`; remaining
/// metrics are simply absent from this scrape.
#[allow(clippy::mutable_key_type)]
async fn regenerate_payload(
    metrics: &FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>, payload: &Arc<RwLock<String>>,
    renderer: &mut FastIndexMapRendererPlaceholder__never, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) {
    renderer.clear();

    for (prom_context, contexts) in metrics {
        if !write_metrics(renderer, prom_context, contexts, tags_deduplicator) {
            debug!("Failed to write metric to payload. Continuing...");
            continue;
        }

        // The size check runs after each group, so the final payload can overshoot the
        // limit by at most one group.
        if renderer.output().len() > PAYLOAD_SIZE_LIMIT_BYTES {
            debug!(
                payload_len = renderer.output().len(),
                "Payload size limit exceeded. Skipping remaining metrics."
            );
            break;
        }
    }

    // Swap the freshly rendered output into the shared payload under the write lock.
    let mut payload = payload.write().await;
    payload.clear();
    payload.push_str(renderer.output());
}
248
249fn write_metrics(
250 renderer: &mut PrometheusRenderer, prom_context: &PrometheusContext,
251 contexts: &FastIndexMap<Context, PrometheusValue>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
252) -> bool {
253 if contexts.is_empty() {
254 debug!("No contexts for metric '{}'. Skipping.", prom_context.metric_name);
255 return true;
256 }
257
258 renderer.begin_group(&prom_context.metric_name, prom_context.metric_type, None);
259
260 for (context, values) in contexts {
261 let labels = match collect_tags(context, tags_deduplicator) {
262 Some(labels) => labels,
263 None => return false,
264 };
265
266 match values {
267 PrometheusValue::Counter(value) | PrometheusValue::Gauge(value) => {
268 renderer.write_gauge_or_counter_series(labels, *value);
269 }
270 PrometheusValue::Histogram(histogram) => {
271 renderer.write_histogram_series(labels, histogram.buckets(), histogram.sum, histogram.count);
272 }
273 PrometheusValue::Summary(sketch) => {
274 let quantiles = [0.1, 0.25, 0.5, 0.95, 0.99, 0.999]
275 .into_iter()
276 .map(|q| (q, sketch.quantile(q).unwrap_or_default()));
277
278 renderer.write_summary_series(labels, quantiles, sketch.sum().unwrap_or_default(), sketch.count());
279 }
280 }
281 }
282
283 renderer.finish_group();
284 true
285}
286
287fn collect_tags<'a>(
289 context: &'a Context, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
290) -> Option<Vec<(&'a str, &'a str)>> {
291 let mut labels = Vec::new();
292 let mut total_bytes = 0;
293
294 let chained_tags = context.tags().into_iter().chain(context.origin_tags());
295 let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);
296
297 for tag in deduplicated_tags {
298 let tag_name = tag.name();
299 let tag_value = match tag.value() {
300 Some(value) => value,
301 None => {
302 debug!("Skipping bare tag.");
303 continue;
304 }
305 };
306
307 total_bytes += tag_name.len() + tag_value.len() + 4;
310 if total_bytes > TAGS_BUFFER_SIZE_LIMIT_BYTES {
311 debug!("Tags buffer size limit exceeded. Tags may be missing from this metric.");
312 return None;
313 }
314
315 labels.push((tag_name, tag_value));
316 }
317
318 Some(labels)
319}
320
321#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd)]
322struct PrometheusContext {
323 metric_name: MetaString,
324 metric_type: MetricType,
325}
326
/// Aggregated value for a single series, matching the Prometheus exposition type it
/// will be rendered as.
enum PrometheusValue {
    Counter(f64),
    Gauge(f64),
    Histogram(PrometheusHistogram),
    /// Distributions are rendered as summaries, backed by a DDSketch for quantiles.
    Summary(DDSketch),
}
333
334fn into_prometheus_metric(
335 metric: &Metric, renderer: &mut PrometheusRenderer, interner: &FixedSizeInterner<1>,
336) -> Option<PrometheusContext> {
337 let normalized = renderer.normalize_metric_name(metric.context().name());
339 let metric_name = match interner.try_intern(normalized).map(MetaString::from) {
340 Some(name) => name,
341 None => {
342 debug!(
343 "Failed to intern normalized metric name. Skipping metric '{}'.",
344 metric.context().name()
345 );
346 return None;
347 }
348 };
349
350 let metric_type = match metric.values() {
351 MetricValues::Counter(_) => MetricType::Counter,
352 MetricValues::Gauge(_) | MetricValues::Set(_) => MetricType::Gauge,
353 MetricValues::Histogram(_) => MetricType::Histogram,
354 MetricValues::Distribution(_) => MetricType::Summary,
355 _ => return None,
356 };
357
358 Some(PrometheusContext {
359 metric_name,
360 metric_type,
361 })
362}
363
/// Creates an empty `PrometheusValue` of the kind matching the context's metric type,
/// ready to have metric values merged into it.
fn get_prom_value_for_prom_context(prom_context: &PrometheusContext) -> PrometheusValue {
    match prom_context.metric_type {
        MetricType::Counter => PrometheusValue::Counter(0.0),
        MetricType::Gauge => PrometheusValue::Gauge(0.0),
        // Histogram bucket boundaries depend on the metric name (time vs. non-time).
        MetricType::Histogram => PrometheusValue::Histogram(PrometheusHistogram::new(&prom_context.metric_name)),
        MetricType::Summary => PrometheusValue::Summary(DDSketch::default()),
    }
}
372
/// Merges a batch of metric values into an existing aggregated Prometheus value.
///
/// Counters accumulate; gauges and sets keep the most recently timestamped value;
/// histograms fold in every weighted sample; distributions merge into the sketch.
///
/// # Panics
///
/// Panics if the metric value kind does not match the aggregated value kind; callers
/// must pair values with a `PrometheusValue` created for the same metric type (see
/// `get_prom_value_for_prom_context`).
fn merge_metric_values_with_prom_value(values: MetricValues, prom_value: &mut PrometheusValue) {
    match (values, prom_value) {
        (MetricValues::Counter(counter_values), PrometheusValue::Counter(prom_counter)) => {
            for (_, value) in counter_values {
                *prom_counter += value;
            }
        }
        (MetricValues::Gauge(gauge_values), PrometheusValue::Gauge(prom_gauge)) => {
            // Newest-wins: entries without a timestamp sort as oldest (0).
            let latest_value = gauge_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Set(set_values), PrometheusValue::Gauge(prom_gauge)) => {
            // Sets are exposed as a gauge; same newest-wins selection as gauges.
            // Presumably set values are already resolved to numeric cardinalities here —
            // TODO confirm against `MetricValues::Set`'s item type.
            let latest_value = set_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Histogram(histogram_values), PrometheusValue::Histogram(prom_histogram)) => {
            for (_, value) in histogram_values {
                prom_histogram.merge_histogram(&value);
            }
        }
        (MetricValues::Distribution(distribution_values), PrometheusValue::Summary(prom_summary)) => {
            for (_, value) in distribution_values {
                prom_summary.merge(&value);
            }
        }
        _ => panic!("Mismatched metric types"),
    }
}
409
/// Fixed-bucket cumulative histogram in the Prometheus style.
#[derive(Clone)]
struct PrometheusHistogram {
    // Sum of all weighted sample values.
    sum: f64,
    // Total sample count (sum of weights).
    count: u64,
    // Cumulative buckets: (upper bound, pre-rendered upper-bound label, count).
    buckets: Vec<(f64, &'static str, u64)>,
}
416
417impl PrometheusHistogram {
418 fn new(metric_name: &str) -> Self {
419 let base_buckets = if metric_name.ends_with("_seconds") {
421 &TIME_HISTOGRAM_BUCKETS[..]
422 } else {
423 &NON_TIME_HISTOGRAM_BUCKETS[..]
424 };
425
426 let buckets = base_buckets
427 .iter()
428 .map(|(upper_bound, upper_bound_str)| (*upper_bound, *upper_bound_str, 0))
429 .collect();
430
431 Self {
432 sum: 0.0,
433 count: 0,
434 buckets,
435 }
436 }
437
438 fn merge_histogram(&mut self, histogram: &Histogram) {
439 for sample in histogram.samples() {
440 self.add_sample(sample.value.into_inner(), sample.weight);
441 }
442 }
443
444 fn add_sample(&mut self, value: f64, weight: u64) {
445 self.sum += value * weight as f64;
446 self.count += weight;
447
448 for (upper_bound, _, count) in &mut self.buckets {
450 if value <= *upper_bound {
451 *count += weight;
452 }
453 }
454 }
455
456 fn buckets(&self) -> impl Iterator<Item = (&'static str, u64)> + '_ {
457 self.buckets
458 .iter()
459 .map(|(_, upper_bound_str, count)| (*upper_bound_str, *count))
460 }
461}
462
/// Generates `N` log-linear histogram bucket upper bounds starting at `base` and
/// growing by `scale` per step, with the midpoint to the next power interleaved after
/// each boundary. Each bound is paired with its pre-rendered decimal label.
fn histogram_buckets<const N: usize>(base: f64, scale: f64) -> [(f64, &'static str); N] {
    let mut buckets = [(0.0, ""); N];

    // Endless sequence: for exponent i, emit base*scale^i followed by the midpoint
    // between base*scale^i and base*scale^(i+1).
    let boundaries = (0usize..).flat_map(|exponent| {
        let lower = base * scale.powf(exponent as f64);
        let upper = base * scale.powf((exponent + 1) as f64);
        [lower, (lower + upper) / 2.0]
    });

    for (slot, upper_bound) in buckets.iter_mut().zip(boundaries) {
        // Leaked intentionally: this is only ever called to build process-lifetime
        // statics, so the labels must be 'static.
        *slot = (upper_bound, upper_bound.to_string().leak());
    }

    buckets
}
494
#[cfg(test)]
mod tests {
    use super::*;

    /// Prints both bucket layouts for manual inspection; asserts nothing.
    #[test]
    fn bucket_print() {
        println!("time buckets: {:?}", *TIME_HISTOGRAM_BUCKETS);
        println!("non-time buckets: {:?}", *NON_TIME_HISTOGRAM_BUCKETS);
    }

    /// Verifies sum/count accumulation and cumulative bucket counting for weighted samples.
    #[test]
    fn prom_histogram_add_sample() {
        // (value, weight) pairs.
        let sample1 = (0.25, 1);
        let sample2 = (1.0, 2);
        let sample3 = (2.0, 3);

        // `_seconds` suffix selects the time-based bucket layout.
        let mut histogram = PrometheusHistogram::new("time_metric_seconds");
        histogram.add_sample(sample1.0, sample1.1);
        histogram.add_sample(sample2.0, sample2.1);
        histogram.add_sample(sample3.0, sample3.1);

        // Sum is weight-scaled; count is the sum of weights.
        let sample1_weighted_value = sample1.0 * sample1.1 as f64;
        let sample2_weighted_value = sample2.0 * sample2.1 as f64;
        let sample3_weighted_value = sample3.0 * sample3.1 as f64;
        let expected_sum = sample1_weighted_value + sample2_weighted_value + sample3_weighted_value;
        let expected_count = sample1.1 + sample2.1 + sample3.1;
        assert_eq!(histogram.sum, expected_sum);
        assert_eq!(histogram.count, expected_count);

        // Buckets are cumulative: every bucket whose bound admits a sample's value must
        // hold at least the weight of that sample plus all smaller samples seen so far.
        let mut expected_bucket_count = 0;
        for sample in [sample1, sample2, sample3] {
            for bucket in &histogram.buckets {
                if sample.0 <= bucket.0 {
                    assert!(bucket.2 >= expected_bucket_count + sample.1);
                }
            }

            expected_bucket_count += sample.1;
        }
    }
}