saluki_components/sources/internal_metrics/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use futures::StreamExt as _;
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_core::data_model::event::EventType;
7use saluki_core::{
8    components::{sources::*, ComponentContext},
9    observability::metrics::MetricsStream,
10    topology::OutputDefinition,
11};
12use saluki_error::GenericError;
13use tokio::select;
14use tracing::{debug, error};
15
/// Configuration for the internal metrics source.
///
/// The source built from this configuration gathers every metric emitted internally (through the `metrics` crate)
/// and forwards those metrics unchanged.
pub struct InternalMetricsConfiguration;
20
21#[async_trait]
22impl SourceBuilder for InternalMetricsConfiguration {
23    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
24        Ok(Box::new(InternalMetrics))
25    }
26
27    fn outputs(&self) -> &[OutputDefinition] {
28        static OUTPUTS: &[OutputDefinition] = &[OutputDefinition::default_output(EventType::Metric)];
29
30        OUTPUTS
31    }
32}
33
34impl MemoryBounds for InternalMetricsConfiguration {
35    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
36        // Capture the size of the heap allocation when the component is built.
37        builder
38            .minimum()
39            .with_single_value::<InternalMetrics>("component struct");
40    }
41}
42
43pub struct InternalMetrics;
44
45#[async_trait]
46impl Source for InternalMetrics {
47    async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
48        let mut global_shutdown = context.take_shutdown_handle();
49        let mut health = context.take_health_handle();
50
51        let mut metrics_stream = MetricsStream::register();
52
53        health.mark_ready();
54        debug!("Internal Metrics source started.");
55
56        loop {
57            select! {
58                _ = &mut global_shutdown => {
59                    debug!("Received shutdown signal.");
60                    break;
61                },
62                _ = health.live() => continue,
63                maybe_metrics = metrics_stream.next() => match maybe_metrics {
64                    Some(metrics) => {
65                        debug!(metrics_len = metrics.len(), "Received internal metrics.");
66
67                        let events = Arc::unwrap_or_clone(metrics);
68                        if let Err(e) = context.dispatcher().buffered()?.send_all(events).await {
69                            error!(error = %e, "Failed to dispatch events.");
70                        }
71                    },
72                    None => {
73                        error!("Internal metrics stream ended unexpectedly.");
74                        break;
75                    },
76                },
77            }
78        }
79
80        debug!("Internal Metrics source stopped.");
81
82        Ok(())
83    }
84}