saluki_components/sources/internal_metrics/
mod.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use futures::StreamExt as _;
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_core::data_model::event::EventType;
7use saluki_core::{
8 components::{sources::*, ComponentContext},
9 observability::metrics::MetricsStream,
10 topology::OutputDefinition,
11};
12use saluki_error::GenericError;
13use tokio::select;
14use tracing::{debug, error};
15
/// Configuration for the internal metrics source.
///
/// This type carries no settings of its own. Its `SourceBuilder` implementation builds an
/// `InternalMetrics` source and declares a single default output of `EventType::Metric`.
#[derive(Debug)]
pub struct InternalMetricsConfiguration;
20
21#[async_trait]
22impl SourceBuilder for InternalMetricsConfiguration {
23 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
24 Ok(Box::new(InternalMetrics))
25 }
26
27 fn outputs(&self) -> &[OutputDefinition<EventType>] {
28 static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::Metric)];
29 OUTPUTS
30 }
31}
32
33impl MemoryBounds for InternalMetricsConfiguration {
34 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
35 builder
37 .minimum()
38 .with_single_value::<InternalMetrics>("component struct");
39 }
40}
41
/// Source component that forwards internal metrics into the topology.
///
/// When run, it registers a `MetricsStream` and dispatches every batch the stream yields until
/// shutdown is signalled or the stream ends.
#[derive(Debug)]
pub struct InternalMetrics;
43
44#[async_trait]
45impl Source for InternalMetrics {
46 async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
47 let mut global_shutdown = context.take_shutdown_handle();
48 let mut health = context.take_health_handle();
49
50 let mut metrics_stream = MetricsStream::register();
51
52 health.mark_ready();
53 debug!("Internal Metrics source started.");
54
55 loop {
56 select! {
57 _ = &mut global_shutdown => {
58 debug!("Received shutdown signal.");
59 break;
60 },
61 _ = health.live() => continue,
62 maybe_metrics = metrics_stream.next() => match maybe_metrics {
63 Some(metrics) => {
64 debug!(metrics_len = metrics.len(), "Received internal metrics.");
65
66 let events = Arc::unwrap_or_clone(metrics);
67 if let Err(e) = context.dispatcher().buffered()?.send_all(events).await {
68 error!(error = %e, "Failed to dispatch events.");
69 }
70 },
71 None => {
72 error!("Internal metrics stream ended unexpectedly.");
73 break;
74 },
75 },
76 }
77 }
78
79 debug!("Internal Metrics source stopped.");
80
81 Ok(())
82 }
83}