saluki_components/sources/checks/
mod.rs

//! Checks source.
//!
//! Listens to Autodiscovery events, schedules checks, and emits their results.
4use std::collections::HashSet;
5use std::sync::{Arc, LazyLock};
6
7use async_trait::async_trait;
8use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
9use saluki_common::task::spawn_traced_named;
10use saluki_config::GenericConfiguration;
11use saluki_core::{
12    components::{sources::*, ComponentContext},
13    data_model::event::{Event, EventType},
14    topology::{
15        shutdown::{ComponentShutdownHandle, DynamicShutdownCoordinator, DynamicShutdownHandle},
16        OutputDefinition,
17    },
18};
19use saluki_env::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider};
20use saluki_env::{EnvironmentProvider, HostProvider};
21use saluki_error::{generic_error, GenericError};
22use serde::Deserialize;
23use tokio::select;
24use tokio::sync::broadcast::Receiver;
25use tokio::sync::mpsc;
26use tracing::{debug, error, info, trace, warn};
27
28mod scheduler;
29use self::scheduler::Scheduler;
30
31mod check_metric;
32
33mod check;
34use self::check::Check;
35
36mod builder;
37
38mod execution_context;
39#[cfg(feature = "python-checks")]
40use self::builder::python::builder::PythonCheckBuilder;
41use self::builder::CheckBuilder;
42use self::execution_context::ExecutionContext;
43
/// Returns the number of check runners used when `check_runners` is absent from the configuration.
const fn default_check_runners() -> usize {
    const DEFAULT_CHECK_RUNNERS: usize = 4;

    DEFAULT_CHECK_RUNNERS
}
47
48/// Configuration for the checks source.
49#[derive(Deserialize)]
50pub struct ChecksConfiguration<E = ()> {
51    /// The number of check runners to use.
52    ///
53    /// Defaults to 4.
54    #[serde(default = "default_check_runners")]
55    check_runners: usize,
56
57    /// Additional checks.d folders.
58    #[serde(default)]
59    additional_checksd: String,
60
61    #[serde(skip)]
62    environment_provider: Option<E>,
63
64    #[serde(skip)]
65    full_configuration: Option<GenericConfiguration>,
66}
67
68impl ChecksConfiguration {
69    /// Creates a new `ChecksConfiguration` from the given configuration.
70    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
71        let mut checks_config = config.as_typed::<Self>()?;
72        checks_config.full_configuration = Some(config.clone());
73        Ok(checks_config)
74    }
75}
76
77impl<E> ChecksConfiguration<E> {
78    /// Sets the environment provider to use.
79    pub fn with_environment_provider<E2>(self, environment_provider: E2) -> ChecksConfiguration<E2> {
80        ChecksConfiguration {
81            check_runners: self.check_runners,
82            additional_checksd: self.additional_checksd,
83            environment_provider: Some(environment_provider),
84            full_configuration: self.full_configuration,
85        }
86    }
87}
88
89#[async_trait]
90impl<E> SourceBuilder for ChecksConfiguration<E>
91where
92    E: EnvironmentProvider + Send + Sync + 'static,
93    <E::Host as HostProvider>::Error: Into<GenericError> + std::fmt::Debug,
94{
95    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
96        let environment_provider = self
97            .environment_provider
98            .as_ref()
99            .ok_or_else(|| generic_error!("No environment provider configured."))?;
100
101        let receiver = environment_provider
102            .autodiscovery()
103            .subscribe()
104            .await
105            .ok_or_else(|| generic_error!("No autodiscovery stream configured."))?;
106
107        let configuration = self
108            .full_configuration
109            .as_ref()
110            .ok_or_else(|| generic_error!("No configuration configured."))?;
111
112        let execution_context =
113            ExecutionContext::from_environment_provider(configuration.clone(), environment_provider).await;
114
115        Ok(Box::new(ChecksSource {
116            autodiscovery_rx: receiver,
117            check_runners: self.check_runners,
118            custom_checks_dirs: if !self.additional_checksd.is_empty() {
119                Some(self.additional_checksd.split(",").map(|s| s.to_string()).collect())
120            } else {
121                None
122            },
123            execution_context,
124        }))
125    }
126
127    fn outputs(&self) -> &[OutputDefinition] {
128        static OUTPUTS: LazyLock<Vec<OutputDefinition>> = LazyLock::new(|| {
129            vec![
130                OutputDefinition::named_output("metrics", EventType::Metric),
131                OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
132                OutputDefinition::named_output("events", EventType::EventD),
133            ]
134        });
135
136        &OUTPUTS
137    }
138}
139
140impl<E> MemoryBounds for ChecksConfiguration<E> {
141    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
142        builder.minimum().with_single_value::<ChecksSource>("component struct");
143    }
144}
145
146struct ChecksSource {
147    autodiscovery_rx: Receiver<AutodiscoveryEvent>,
148    check_runners: usize,
149    custom_checks_dirs: Option<Vec<String>>,
150    execution_context: ExecutionContext,
151}
152
153impl ChecksSource {
154    /// Builds the check builders for the source.
155    fn builders(
156        &self, check_events_tx: mpsc::Sender<Event>, execution_context: ExecutionContext,
157    ) -> Vec<Arc<dyn CheckBuilder + Send + Sync>> {
158        #[cfg(feature = "python-checks")]
159        {
160            vec![Arc::new(PythonCheckBuilder::new(
161                check_events_tx,
162                self.custom_checks_dirs.clone(),
163                execution_context,
164            ))]
165        }
166        #[cfg(not(feature = "python-checks"))]
167        {
168            let _ = check_events_tx; // Suppress unused variable warning
169            let _ = &self.custom_checks_dirs; // Suppress unused field warning
170            let _ = execution_context; // Suppress unused field warning
171            vec![]
172        }
173    }
174}
175
176#[async_trait]
177impl Source for ChecksSource {
178    async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
179        let mut global_shutdown: ComponentShutdownHandle = context.take_shutdown_handle();
180        let mut health = context.take_health_handle();
181        health.mark_ready();
182
183        info!("Checks source started.");
184
185        let (check_events_tx, check_event_rx) = mpsc::channel(128);
186
187        let mut check_builders: Vec<Arc<dyn CheckBuilder + Send + Sync>> =
188            self.builders(check_events_tx, self.execution_context.clone());
189
190        let mut check_ids = HashSet::new();
191        let scheduler = Scheduler::new(self.check_runners);
192
193        let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();
194
195        spawn_traced_named(
196            "checks-events-listener",
197            drain_and_dispatch_check_events(listener_shutdown_coordinator.register(), context, check_event_rx),
198        );
199
200        let mut autodiscovery_rx = self.autodiscovery_rx;
201
202        loop {
203            select! {
204                _ = &mut global_shutdown => {
205                    debug!("Checks source received shutdown signal.");
206                    scheduler.shutdown().await;
207                    break
208                },
209
210                _ = health.live() => continue,
211
212                event = autodiscovery_rx.recv() => match event {
213                        Ok(event) => {
214                            match event {
215                                AutodiscoveryEvent::CheckSchedule { config } => {
216                                    let mut runnable_checks: Vec<Arc<dyn Check + Send + Sync>> = vec![];
217                                    for instance in &config.instances {
218                                        let check_id = instance.id();
219                                        if check_ids.contains(check_id) {
220                                            continue;
221                                        }
222
223                                        for builder in check_builders.iter_mut() {
224                                            if let Some(check) = builder.build_check(&config.name, instance, &config.init_config, &config.source) {
225                                                debug!(
226                                                    check_id = check.id(),
227                                                    check_name = config.name.as_ref(),
228                                                    check_version = check.version(),
229                                                    check_source = check.source(),
230                                                    "Built new check instance."
231                                                );
232                                                runnable_checks.push(check);
233                                                check_ids.insert(check_id.clone());
234                                                break;
235                                            }
236                                        }
237                                    }
238
239                                    for check in runnable_checks {
240                                        scheduler.schedule(check);
241                                    }
242                                }
243                                AutodiscoveryEvent::CheckUnscheduled { config } => {
244                                    for instance in &config.instances {
245                                        let check_id = instance.id();
246                                        if !check_ids.contains(check_id) {
247                                            warn!("Unscheduling check {} not found, skipping.", check_id);
248                                            continue;
249                                        }
250
251                                        scheduler.unschedule(check_id);
252                                    }
253                                }
254                                // We only care about CheckSchedule and CheckUnscheduled events
255                                _ => {}
256                            }
257                        }
258                        Err(e) => {
259                            error!("Error receiving event: {:?}", e);
260                        }
261                }
262            }
263        }
264
265        listener_shutdown_coordinator.shutdown().await;
266
267        info!("Checks source stopped.");
268
269        Ok(())
270    }
271}
272
273async fn drain_and_dispatch_check_events(
274    shutdown_handle: DynamicShutdownHandle, context: SourceContext, mut check_event_rx: mpsc::Receiver<Event>,
275) {
276    tokio::pin!(shutdown_handle);
277
278    loop {
279        select! {
280            _ = &mut shutdown_handle => {
281                debug!("Checks events listeners received shutdown signal.");
282                break;
283            }
284            result = check_event_rx.recv() => match result {
285                None => break,
286                Some(check_event) => {
287                    trace!("Received check event: {:?}", check_event);
288
289                    match check_event.event_type() {
290                       EventType::Metric => {
291                            let mut buffered_dispatcher = context
292                                .dispatcher()
293                                .buffered_named("metrics")
294                                .expect("checks source metrics output should always exist");
295
296                                if let Err(e) = buffered_dispatcher.push(check_event).await {
297                                    error!(error = %e, "Failed to forward check metric.");
298                                }
299
300                                if let Err(e) = buffered_dispatcher.flush().await {
301                                    error!(error = %e, "Failed to flush check metric.");
302                                }
303                        },
304                        EventType::ServiceCheck => {
305                            let mut buffered_dispatcher = context
306                                .dispatcher()
307                                .buffered_named("service_checks")
308                                .expect("checks source service checks output should always exist");
309
310                                if let Err(e) = buffered_dispatcher.push(check_event).await {
311                                    error!(error = %e, "Failed to forward check service check.");
312                                }
313
314                                if let Err(e) = buffered_dispatcher.flush().await {
315                                    error!(error = %e, "Failed to flush check service check.");
316                                }
317                        },
318                        EventType::EventD => {
319                            let mut buffered_dispatcher = context
320                                .dispatcher()
321                                .buffered_named("events")
322                                .expect("checks source events output should always exist");
323
324                                if let Err(e) = buffered_dispatcher.push(check_event).await {
325                                    error!(error = %e, "Failed to forward check event");
326                                }
327
328                                if let Err(e) = buffered_dispatcher.flush().await {
329                                    error!(error = %e, "Failed to flush check event.");
330                                }
331                        },
332                        _ => {},
333                    }
334
335                }
336            }
337        }
338    }
339}