saluki_components/sources/heartbeat/
mod.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_context::Context;
6use saluki_core::components::{sources::*, ComponentContext};
7use saluki_core::data_model::event::{metric::Metric, Event, EventType};
8use saluki_core::topology::OutputDefinition;
9use saluki_error::GenericError;
10use tokio::{select, time::interval};
11use tracing::{debug, error};
12
/// Configuration for the heartbeat source.
///
/// The source built from this configuration emits a `heartbeat` gauge metric on a fixed interval.
#[derive(Clone, Debug)]
pub struct HeartbeatConfiguration {
    /// Number of seconds between consecutive heartbeat metrics.
    pub heartbeat_interval_secs: u64,
}
21
22impl Default for HeartbeatConfiguration {
23 fn default() -> Self {
24 Self {
25 heartbeat_interval_secs: 10,
26 }
27 }
28}
29
/// Source component that periodically emits a `heartbeat` gauge metric.
struct Heartbeat {
    /// Number of seconds between consecutive heartbeat metrics.
    heartbeat_interval_secs: u64,
}
33
34#[async_trait]
35impl Source for Heartbeat {
36 async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
37 let mut global_shutdown = context.take_shutdown_handle();
38 let mut health = context.take_health_handle();
39 let mut tick_interval = interval(Duration::from_secs(self.heartbeat_interval_secs));
40
41 health.mark_ready();
42 debug!("Heartbeat source started.");
43
44 loop {
45 select! {
46 _ = &mut global_shutdown => {
47 debug!("Received shutdown signal.");
48 break;
49 },
50 _ = health.live() => continue,
51 _ = tick_interval.tick() => {
52 let metric_context = Context::from_static_name("heartbeat");
54 let metric = Metric::gauge(metric_context, 1.0);
55 let mut buffered_dispatcher = context.dispatcher().buffered().expect("default output must always exist");
56
57 if let Err(e) = buffered_dispatcher.push(Event::Metric(metric)).await {
58 error!(error = %e, "Failed to dispatch event.");
59 } else if let Err(e) = buffered_dispatcher.flush().await {
60 error!(error = %e, "Failed to dispatch events.");
61 } else {
62 debug!("Emitted heartbeat metric.");
63 }
64 }
65 }
66 }
67
68 debug!("Heartbeat source stopped.");
69 Ok(())
70 }
71}
72
73#[async_trait]
74impl SourceBuilder for HeartbeatConfiguration {
75 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
76 Ok(Box::new(Heartbeat {
77 heartbeat_interval_secs: self.heartbeat_interval_secs,
78 }))
79 }
80
81 fn outputs(&self) -> &[OutputDefinition] {
82 static OUTPUTS: [OutputDefinition; 1] = [OutputDefinition::default_output(EventType::Metric)];
83
84 &OUTPUTS
85 }
86}
87
88impl MemoryBounds for HeartbeatConfiguration {
89 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
90 builder.minimum().with_single_value::<Heartbeat>("component struct");
92 }
93}