Skip to main content

saluki_components/sources/heartbeat/
mod.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_context::Context;
6use saluki_core::components::{sources::*, ComponentContext};
7use saluki_core::data_model::event::{metric::Metric, Event, EventType};
8use saluki_core::topology::OutputDefinition;
9use saluki_error::GenericError;
10use tokio::pin;
11use tokio::{select, time::interval};
12use tracing::{debug, error};
13
/// Configuration for the heartbeat source.
///
/// The source built from this configuration periodically emits a gauge metric named `heartbeat`,
/// letting downstream components observe that the topology is still producing events.
#[derive(Debug, Clone)]
pub struct HeartbeatConfiguration {
    /// How often a heartbeat metric is emitted, in seconds.
    pub heartbeat_interval_secs: u64,
}
22
23impl Default for HeartbeatConfiguration {
24    fn default() -> Self {
25        Self {
26            heartbeat_interval_secs: 10,
27        }
28    }
29}
30
/// Runtime state of the heartbeat source, created from a `HeartbeatConfiguration`.
struct Heartbeat {
    /// Number of seconds between consecutive heartbeat metrics.
    heartbeat_interval_secs: u64,
}
34
35#[async_trait]
36impl Source for Heartbeat {
37    async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
38        let global_shutdown = context.take_shutdown_handle();
39        pin!(global_shutdown);
40
41        let mut health = context.take_health_handle();
42        let mut tick_interval = interval(Duration::from_secs(self.heartbeat_interval_secs));
43
44        health.mark_ready();
45        debug!("Heartbeat source started.");
46
47        loop {
48            select! {
49                _ = &mut global_shutdown => {
50                    debug!("Received shutdown signal.");
51                    break;
52                },
53                _ = health.live() => continue,
54                _ = tick_interval.tick() => {
55                    // Create a simple heartbeat metric
56                    let metric_context = Context::from_static_name("heartbeat");
57                    let metric = Metric::gauge(metric_context, 1.0);
58                    let mut buffered_dispatcher = context.dispatcher().buffered().expect("default output must always exist");
59
60                    if let Err(e) = buffered_dispatcher.push(Event::Metric(metric)).await {
61                        error!(error = %e, "Failed to dispatch event.");
62                    } else if let Err(e) = buffered_dispatcher.flush().await {
63                        error!(error = %e, "Failed to dispatch events.");
64                    } else {
65                        debug!("Emitted heartbeat metric.");
66                    }
67                }
68            }
69        }
70
71        debug!("Heartbeat source stopped.");
72        Ok(())
73    }
74}
75
76#[async_trait]
77impl SourceBuilder for HeartbeatConfiguration {
78    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
79        Ok(Box::new(Heartbeat {
80            heartbeat_interval_secs: self.heartbeat_interval_secs,
81        }))
82    }
83
84    fn outputs(&self) -> &[OutputDefinition<EventType>] {
85        static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::Metric)];
86        OUTPUTS
87    }
88}
89
90impl MemoryBounds for HeartbeatConfiguration {
91    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
92        // Minimal memory footprint when emitting heartbeat metrics
93        builder.minimum().with_single_value::<Heartbeat>("component struct");
94    }
95}