saluki_components/sources/checks/
mod.rs

//! Checks source.
//!
//! Listens to Autodiscovery events, schedules checks, and emits their results.
4use std::collections::HashSet;
5use std::sync::{Arc, LazyLock};
6
7use async_trait::async_trait;
8use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
9use saluki_common::task::spawn_traced_named;
10use saluki_config::GenericConfiguration;
11use saluki_core::{
12    components::{sources::*, ComponentContext},
13    data_model::event::{Event, EventType},
14    topology::{
15        shutdown::{ComponentShutdownHandle, DynamicShutdownCoordinator, DynamicShutdownHandle},
16        OutputDefinition,
17    },
18};
19use saluki_env::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider};
20use saluki_env::{EnvironmentProvider, HostProvider};
21use saluki_error::{generic_error, GenericError};
22use serde::Deserialize;
23use tokio::select;
24use tokio::sync::broadcast::Receiver;
25use tokio::sync::mpsc;
26use tracing::{debug, error, info, trace, warn};
27
28mod scheduler;
29use self::scheduler::Scheduler;
30
31mod check_metric;
32
33mod check;
34use self::check::Check;
35
36mod builder;
37
38mod execution_context;
39#[cfg(feature = "python-checks")]
40use self::builder::python::builder::PythonCheckBuilder;
41use self::builder::CheckBuilder;
42use self::execution_context::ExecutionContext;
43
/// Number of check runners used when `check_runners` is absent from the configuration.
const fn default_check_runners() -> usize {
    const DEFAULT_CHECK_RUNNERS: usize = 4;
    DEFAULT_CHECK_RUNNERS
}
47
48/// Configuration for the checks source.
49#[derive(Deserialize)]
50pub struct ChecksConfiguration<E = ()> {
51    /// The number of check runners to use.
52    ///
53    /// Defaults to 4.
54    #[serde(default = "default_check_runners")]
55    check_runners: usize,
56
57    /// Additional checks.d folders.
58    #[serde(default)]
59    additional_checksd: String,
60
61    #[serde(skip)]
62    environment_provider: Option<E>,
63
64    #[serde(skip)]
65    full_configuration: Option<GenericConfiguration>,
66}
67
68impl ChecksConfiguration {
69    /// Creates a new `ChecksConfiguration` from the given configuration.
70    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
71        let mut checks_config = config.as_typed::<Self>()?;
72        checks_config.full_configuration = Some(config.clone());
73        Ok(checks_config)
74    }
75}
76
77impl<E> ChecksConfiguration<E> {
78    /// Sets the environment provider to use.
79    pub fn with_environment_provider<E2>(self, environment_provider: E2) -> ChecksConfiguration<E2> {
80        ChecksConfiguration {
81            check_runners: self.check_runners,
82            additional_checksd: self.additional_checksd,
83            environment_provider: Some(environment_provider),
84            full_configuration: self.full_configuration,
85        }
86    }
87}
88
89#[async_trait]
90impl<E> SourceBuilder for ChecksConfiguration<E>
91where
92    E: EnvironmentProvider + Send + Sync + 'static,
93    <E::Host as HostProvider>::Error: Into<GenericError> + std::fmt::Debug,
94{
95    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
96        let environment_provider = self
97            .environment_provider
98            .as_ref()
99            .ok_or_else(|| generic_error!("No environment provider configured."))?;
100
101        let receiver = environment_provider
102            .autodiscovery()
103            .subscribe()
104            .await
105            .ok_or_else(|| generic_error!("No autodiscovery stream configured."))?;
106
107        let configuration = self
108            .full_configuration
109            .as_ref()
110            .ok_or_else(|| generic_error!("No configuration configured."))?;
111
112        let execution_context =
113            ExecutionContext::from_environment_provider(configuration.clone(), environment_provider).await;
114
115        Ok(Box::new(ChecksSource {
116            autodiscovery_rx: receiver,
117            check_runners: self.check_runners,
118            custom_checks_dirs: if !self.additional_checksd.is_empty() {
119                Some(self.additional_checksd.split(",").map(|s| s.to_string()).collect())
120            } else {
121                None
122            },
123            execution_context,
124        }))
125    }
126
127    fn outputs(&self) -> &[OutputDefinition<EventType>] {
128        static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
129            vec![
130                OutputDefinition::named_output("metrics", EventType::Metric),
131                OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
132                OutputDefinition::named_output("events", EventType::EventD),
133            ]
134        });
135        &OUTPUTS
136    }
137}
138
139impl<E> MemoryBounds for ChecksConfiguration<E> {
140    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
141        builder.minimum().with_single_value::<ChecksSource>("component struct");
142    }
143}
144
145struct ChecksSource {
146    autodiscovery_rx: Receiver<AutodiscoveryEvent>,
147    check_runners: usize,
148    custom_checks_dirs: Option<Vec<String>>,
149    execution_context: ExecutionContext,
150}
151
152impl ChecksSource {
153    /// Builds the check builders for the source.
154    fn builders(
155        &self, check_events_tx: mpsc::Sender<Event>, execution_context: ExecutionContext,
156    ) -> Vec<Arc<dyn CheckBuilder + Send + Sync>> {
157        #[cfg(feature = "python-checks")]
158        {
159            vec![Arc::new(PythonCheckBuilder::new(
160                check_events_tx,
161                self.custom_checks_dirs.clone(),
162                execution_context,
163            ))]
164        }
165        #[cfg(not(feature = "python-checks"))]
166        {
167            let _ = check_events_tx; // Suppress unused variable warning
168            let _ = &self.custom_checks_dirs; // Suppress unused field warning
169            let _ = execution_context; // Suppress unused field warning
170            vec![]
171        }
172    }
173}
174
175#[async_trait]
176impl Source for ChecksSource {
177    async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
178        let mut global_shutdown: ComponentShutdownHandle = context.take_shutdown_handle();
179        let mut health = context.take_health_handle();
180        health.mark_ready();
181
182        info!("Checks source started.");
183
184        let (check_events_tx, check_event_rx) = mpsc::channel(128);
185
186        let mut check_builders: Vec<Arc<dyn CheckBuilder + Send + Sync>> =
187            self.builders(check_events_tx, self.execution_context.clone());
188
189        let mut check_ids = HashSet::new();
190        let scheduler = Scheduler::new(self.check_runners);
191
192        let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();
193
194        spawn_traced_named(
195            "checks-events-listener",
196            drain_and_dispatch_check_events(listener_shutdown_coordinator.register(), context, check_event_rx),
197        );
198
199        let mut autodiscovery_rx = self.autodiscovery_rx;
200
201        loop {
202            select! {
203                _ = &mut global_shutdown => {
204                    debug!("Checks source received shutdown signal.");
205                    scheduler.shutdown().await;
206                    break
207                },
208
209                _ = health.live() => continue,
210
211                event = autodiscovery_rx.recv() => match event {
212                        Ok(event) => {
213                            match event {
214                                AutodiscoveryEvent::CheckSchedule { config } => {
215                                    let mut runnable_checks: Vec<Arc<dyn Check + Send + Sync>> = vec![];
216                                    for instance in &config.instances {
217                                        let check_id = instance.id();
218                                        if check_ids.contains(check_id) {
219                                            continue;
220                                        }
221
222                                        for builder in check_builders.iter_mut() {
223                                            if let Some(check) = builder.build_check(&config.name, instance, &config.init_config, &config.source) {
224                                                debug!(
225                                                    check_id = check.id(),
226                                                    check_name = config.name.as_ref(),
227                                                    check_version = check.version(),
228                                                    check_source = check.source(),
229                                                    "Built new check instance."
230                                                );
231                                                runnable_checks.push(check);
232                                                check_ids.insert(check_id.clone());
233                                                break;
234                                            }
235                                        }
236                                    }
237
238                                    for check in runnable_checks {
239                                        scheduler.schedule(check);
240                                    }
241                                }
242                                AutodiscoveryEvent::CheckUnscheduled { config } => {
243                                    for instance in &config.instances {
244                                        let check_id = instance.id();
245                                        if !check_ids.contains(check_id) {
246                                            warn!("Unscheduling check {} not found, skipping.", check_id);
247                                            continue;
248                                        }
249
250                                        scheduler.unschedule(check_id);
251                                    }
252                                }
253                                // We only care about CheckSchedule and CheckUnscheduled events
254                                _ => {}
255                            }
256                        }
257                        Err(e) => {
258                            error!("Error receiving event: {:?}", e);
259                        }
260                }
261            }
262        }
263
264        listener_shutdown_coordinator.shutdown().await;
265
266        info!("Checks source stopped.");
267
268        Ok(())
269    }
270}
271
272async fn drain_and_dispatch_check_events(
273    shutdown_handle: DynamicShutdownHandle, context: SourceContext, mut check_event_rx: mpsc::Receiver<Event>,
274) {
275    tokio::pin!(shutdown_handle);
276
277    loop {
278        select! {
279            _ = &mut shutdown_handle => {
280                debug!("Checks events listeners received shutdown signal.");
281                break;
282            }
283            result = check_event_rx.recv() => match result {
284                None => break,
285                Some(check_event) => {
286                    trace!("Received check event: {:?}", check_event);
287
288                    match check_event.event_type() {
289                       EventType::Metric => {
290                            let mut buffered_dispatcher = context
291                                .dispatcher()
292                                .buffered_named("metrics")
293                                .expect("checks source metrics output should always exist");
294
295                                if let Err(e) = buffered_dispatcher.push(check_event).await {
296                                    error!(error = %e, "Failed to forward check metric.");
297                                }
298
299                                if let Err(e) = buffered_dispatcher.flush().await {
300                                    error!(error = %e, "Failed to flush check metric.");
301                                }
302                        },
303                        EventType::ServiceCheck => {
304                            let mut buffered_dispatcher = context
305                                .dispatcher()
306                                .buffered_named("service_checks")
307                                .expect("checks source service checks output should always exist");
308
309                                if let Err(e) = buffered_dispatcher.push(check_event).await {
310                                    error!(error = %e, "Failed to forward check service check.");
311                                }
312
313                                if let Err(e) = buffered_dispatcher.flush().await {
314                                    error!(error = %e, "Failed to flush check service check.");
315                                }
316                        },
317                        EventType::EventD => {
318                            let mut buffered_dispatcher = context
319                                .dispatcher()
320                                .buffered_named("events")
321                                .expect("checks source events output should always exist");
322
323                                if let Err(e) = buffered_dispatcher.push(check_event).await {
324                                    error!(error = %e, "Failed to forward check event");
325                                }
326
327                                if let Err(e) = buffered_dispatcher.flush().await {
328                                    error!(error = %e, "Failed to flush check event.");
329                                }
330                        },
331                        _ => {},
332                    }
333
334                }
335            }
336        }
337    }
338}