saluki_components/sources/internal_metrics/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use futures::StreamExt as _;
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_core::data_model::event::EventType;
7use saluki_core::{
8    components::{sources::*, ComponentContext},
9    observability::metrics::MetricsStream,
10    topology::OutputDefinition,
11};
12use saluki_error::GenericError;
13use tokio::select;
14use tracing::{debug, error};
15
16/// Internal metrics source.
17///
18/// Collects all metrics that are emitted internally (via the `metrics` crate) and forwards them as-is.
19pub struct InternalMetricsConfiguration;
20
21#[async_trait]
22impl SourceBuilder for InternalMetricsConfiguration {
23    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
24        Ok(Box::new(InternalMetrics))
25    }
26
27    fn outputs(&self) -> &[OutputDefinition<EventType>] {
28        static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::Metric)];
29        OUTPUTS
30    }
31}
32
33impl MemoryBounds for InternalMetricsConfiguration {
34    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
35        // Capture the size of the heap allocation when the component is built.
36        builder
37            .minimum()
38            .with_single_value::<InternalMetrics>("component struct");
39    }
40}
41
42pub struct InternalMetrics;
43
44#[async_trait]
45impl Source for InternalMetrics {
46    async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
47        let mut global_shutdown = context.take_shutdown_handle();
48        let mut health = context.take_health_handle();
49
50        let mut metrics_stream = MetricsStream::register();
51
52        health.mark_ready();
53        debug!("Internal Metrics source started.");
54
55        loop {
56            select! {
57                _ = &mut global_shutdown => {
58                    debug!("Received shutdown signal.");
59                    break;
60                },
61                _ = health.live() => continue,
62                maybe_metrics = metrics_stream.next() => match maybe_metrics {
63                    Some(metrics) => {
64                        debug!(metrics_len = metrics.len(), "Received internal metrics.");
65
66                        let events = Arc::unwrap_or_clone(metrics);
67                        if let Err(e) = context.dispatcher().buffered()?.send_all(events).await {
68                            error!(error = %e, "Failed to dispatch events.");
69                        }
70                    },
71                    None => {
72                        error!("Internal metrics stream ended unexpectedly.");
73                        break;
74                    },
75                },
76            }
77        }
78
79        debug!("Internal Metrics source stopped.");
80
81        Ok(())
82    }
83}