saluki_components/sources/heartbeat/
mod.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_context::Context;
6use saluki_core::components::{sources::*, ComponentContext};
7use saluki_core::data_model::event::{metric::Metric, Event, EventType};
8use saluki_core::topology::OutputDefinition;
9use saluki_error::GenericError;
10use tokio::pin;
11use tokio::{select, time::interval};
12use tracing::{debug, error};
13
/// Configuration for the heartbeat source.
///
/// The heartbeat source periodically emits a gauge metric named `heartbeat` on its default output.
#[derive(Debug, Clone)]
pub struct HeartbeatConfiguration {
    /// Time between two consecutive heartbeat metrics, in seconds.
    pub heartbeat_interval_secs: u64,
}
22
23impl Default for HeartbeatConfiguration {
24 fn default() -> Self {
25 Self {
26 heartbeat_interval_secs: 10,
27 }
28 }
29}
30
/// Runtime state of the heartbeat source.
struct Heartbeat {
    /// Time between two consecutive heartbeat metrics, in seconds.
    heartbeat_interval_secs: u64,
}
34
35#[async_trait]
36impl Source for Heartbeat {
37 async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
38 let global_shutdown = context.take_shutdown_handle();
39 pin!(global_shutdown);
40
41 let mut health = context.take_health_handle();
42 let mut tick_interval = interval(Duration::from_secs(self.heartbeat_interval_secs));
43
44 health.mark_ready();
45 debug!("Heartbeat source started.");
46
47 loop {
48 select! {
49 _ = &mut global_shutdown => {
50 debug!("Received shutdown signal.");
51 break;
52 },
53 _ = health.live() => continue,
54 _ = tick_interval.tick() => {
55 let metric_context = Context::from_static_name("heartbeat");
57 let metric = Metric::gauge(metric_context, 1.0);
58 let mut buffered_dispatcher = context.dispatcher().buffered().expect("default output must always exist");
59
60 if let Err(e) = buffered_dispatcher.push(Event::Metric(metric)).await {
61 error!(error = %e, "Failed to dispatch event.");
62 } else if let Err(e) = buffered_dispatcher.flush().await {
63 error!(error = %e, "Failed to dispatch events.");
64 } else {
65 debug!("Emitted heartbeat metric.");
66 }
67 }
68 }
69 }
70
71 debug!("Heartbeat source stopped.");
72 Ok(())
73 }
74}
75
76#[async_trait]
77impl SourceBuilder for HeartbeatConfiguration {
78 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
79 Ok(Box::new(Heartbeat {
80 heartbeat_interval_secs: self.heartbeat_interval_secs,
81 }))
82 }
83
84 fn outputs(&self) -> &[OutputDefinition<EventType>] {
85 static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::Metric)];
86 OUTPUTS
87 }
88}
89
90impl MemoryBounds for HeartbeatConfiguration {
91 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
92 builder.minimum().with_single_value::<Heartbeat>("component struct");
94 }
95}