saluki_components/sources/internal_metrics/
mod.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use futures::StreamExt as _;
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_core::data_model::event::EventType;
7use saluki_core::{
8 components::{sources::*, ComponentContext},
9 observability::metrics::MetricsStream,
10 topology::OutputDefinition,
11};
12use saluki_error::GenericError;
13use tokio::select;
14use tracing::{debug, error};
15
16pub struct InternalMetricsConfiguration;
20
21#[async_trait]
22impl SourceBuilder for InternalMetricsConfiguration {
23 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
24 Ok(Box::new(InternalMetrics))
25 }
26
27 fn outputs(&self) -> &[OutputDefinition] {
28 static OUTPUTS: &[OutputDefinition] = &[OutputDefinition::default_output(EventType::Metric)];
29
30 OUTPUTS
31 }
32}
33
34impl MemoryBounds for InternalMetricsConfiguration {
35 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
36 builder
38 .minimum()
39 .with_single_value::<InternalMetrics>("component struct");
40 }
41}
42
43pub struct InternalMetrics;
44
45#[async_trait]
46impl Source for InternalMetrics {
47 async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
48 let mut global_shutdown = context.take_shutdown_handle();
49 let mut health = context.take_health_handle();
50
51 let mut metrics_stream = MetricsStream::register();
52
53 health.mark_ready();
54 debug!("Internal Metrics source started.");
55
56 loop {
57 select! {
58 _ = &mut global_shutdown => {
59 debug!("Received shutdown signal.");
60 break;
61 },
62 _ = health.live() => continue,
63 maybe_metrics = metrics_stream.next() => match maybe_metrics {
64 Some(metrics) => {
65 debug!(metrics_len = metrics.len(), "Received internal metrics.");
66
67 let events = Arc::unwrap_or_clone(metrics);
68 if let Err(e) = context.dispatcher().buffered()?.send_all(events).await {
69 error!(error = %e, "Failed to dispatch events.");
70 }
71 },
72 None => {
73 error!("Internal metrics stream ended unexpectedly.");
74 break;
75 },
76 },
77 }
78 }
79
80 debug!("Internal Metrics source stopped.");
81
82 Ok(())
83 }
84}