saluki_components/sources/heartbeat/
mod.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_context::Context;
6use saluki_core::components::{sources::*, ComponentContext};
7use saluki_core::data_model::event::{metric::Metric, Event, EventType};
8use saluki_core::topology::OutputDefinition;
9use saluki_error::GenericError;
10use tokio::{select, time::interval};
11use tracing::{debug, error};
12
13/// Heartbeat source.
14///
15/// Emits a "heartbeat" metric on a configurable interval.
16#[derive(Clone, Debug)]
17pub struct HeartbeatConfiguration {
18    /// Interval for heartbeat metrics in seconds
19    pub heartbeat_interval_secs: u64,
20}
21
22impl Default for HeartbeatConfiguration {
23    fn default() -> Self {
24        Self {
25            heartbeat_interval_secs: 10,
26        }
27    }
28}
29
30struct Heartbeat {
31    heartbeat_interval_secs: u64,
32}
33
34#[async_trait]
35impl Source for Heartbeat {
36    async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
37        let mut global_shutdown = context.take_shutdown_handle();
38        let mut health = context.take_health_handle();
39        let mut tick_interval = interval(Duration::from_secs(self.heartbeat_interval_secs));
40
41        health.mark_ready();
42        debug!("Heartbeat source started.");
43
44        loop {
45            select! {
46                _ = &mut global_shutdown => {
47                    debug!("Received shutdown signal.");
48                    break;
49                },
50                _ = health.live() => continue,
51                _ = tick_interval.tick() => {
52                    // Create a simple heartbeat metric
53                    let metric_context = Context::from_static_name("heartbeat");
54                    let metric = Metric::gauge(metric_context, 1.0);
55                    let mut buffered_dispatcher = context.dispatcher().buffered().expect("default output must always exist");
56
57                    if let Err(e) = buffered_dispatcher.push(Event::Metric(metric)).await {
58                        error!(error = %e, "Failed to dispatch event.");
59                    } else if let Err(e) = buffered_dispatcher.flush().await {
60                        error!(error = %e, "Failed to dispatch events.");
61                    } else {
62                        debug!("Emitted heartbeat metric.");
63                    }
64                }
65            }
66        }
67
68        debug!("Heartbeat source stopped.");
69        Ok(())
70    }
71}
72
73#[async_trait]
74impl SourceBuilder for HeartbeatConfiguration {
75    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
76        Ok(Box::new(Heartbeat {
77            heartbeat_interval_secs: self.heartbeat_interval_secs,
78        }))
79    }
80
81    fn outputs(&self) -> &[OutputDefinition] {
82        static OUTPUTS: [OutputDefinition; 1] = [OutputDefinition::default_output(EventType::Metric)];
83
84        &OUTPUTS
85    }
86}
87
88impl MemoryBounds for HeartbeatConfiguration {
89    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
90        // Minimal memory footprint when emitting heartbeat metrics
91        builder.minimum().with_single_value::<Heartbeat>("component struct");
92    }
93}