saluki_components/sources/checks/
mod.rs1use std::collections::HashSet;
5use std::sync::{Arc, LazyLock};
6
7use async_trait::async_trait;
8use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
9use saluki_common::task::spawn_traced_named;
10use saluki_config::GenericConfiguration;
11use saluki_core::{
12 components::{sources::*, ComponentContext},
13 data_model::event::{Event, EventType},
14 topology::{
15 shutdown::{ComponentShutdownHandle, DynamicShutdownCoordinator, DynamicShutdownHandle},
16 OutputDefinition,
17 },
18};
19use saluki_env::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider};
20use saluki_env::{EnvironmentProvider, HostProvider};
21use saluki_error::{generic_error, GenericError};
22use serde::Deserialize;
23use tokio::select;
24use tokio::sync::broadcast::Receiver;
25use tokio::sync::mpsc;
26use tracing::{debug, error, info, trace, warn};
27
28mod scheduler;
29use self::scheduler::Scheduler;
30
31mod check_metric;
32
33mod check;
34use self::check::Check;
35
36mod builder;
37
38mod execution_context;
39#[cfg(feature = "python-checks")]
40use self::builder::python::builder::PythonCheckBuilder;
41use self::builder::CheckBuilder;
42use self::execution_context::ExecutionContext;
43
/// Returns the number of check runners used when the configuration does not specify one.
///
/// Referenced by name from the `#[serde(default = "...")]` attribute on
/// `ChecksConfiguration::check_runners`.
const fn default_check_runners() -> usize {
    const DEFAULT_CHECK_RUNNERS: usize = 4;
    DEFAULT_CHECK_RUNNERS
}
47
48#[derive(Deserialize)]
50pub struct ChecksConfiguration<E = ()> {
51 #[serde(default = "default_check_runners")]
55 check_runners: usize,
56
57 #[serde(default)]
59 additional_checksd: String,
60
61 #[serde(skip)]
62 environment_provider: Option<E>,
63
64 #[serde(skip)]
65 full_configuration: Option<GenericConfiguration>,
66}
67
68impl ChecksConfiguration {
69 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
71 let mut checks_config = config.as_typed::<Self>()?;
72 checks_config.full_configuration = Some(config.clone());
73 Ok(checks_config)
74 }
75}
76
77impl<E> ChecksConfiguration<E> {
78 pub fn with_environment_provider<E2>(self, environment_provider: E2) -> ChecksConfiguration<E2> {
80 ChecksConfiguration {
81 check_runners: self.check_runners,
82 additional_checksd: self.additional_checksd,
83 environment_provider: Some(environment_provider),
84 full_configuration: self.full_configuration,
85 }
86 }
87}
88
89#[async_trait]
90impl<E> SourceBuilder for ChecksConfiguration<E>
91where
92 E: EnvironmentProvider + Send + Sync + 'static,
93 <E::Host as HostProvider>::Error: Into<GenericError> + std::fmt::Debug,
94{
95 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
96 let environment_provider = self
97 .environment_provider
98 .as_ref()
99 .ok_or_else(|| generic_error!("No environment provider configured."))?;
100
101 let receiver = environment_provider
102 .autodiscovery()
103 .subscribe()
104 .await
105 .ok_or_else(|| generic_error!("No autodiscovery stream configured."))?;
106
107 let configuration = self
108 .full_configuration
109 .as_ref()
110 .ok_or_else(|| generic_error!("No configuration configured."))?;
111
112 let execution_context =
113 ExecutionContext::from_environment_provider(configuration.clone(), environment_provider).await;
114
115 Ok(Box::new(ChecksSource {
116 autodiscovery_rx: receiver,
117 check_runners: self.check_runners,
118 custom_checks_dirs: if !self.additional_checksd.is_empty() {
119 Some(self.additional_checksd.split(",").map(|s| s.to_string()).collect())
120 } else {
121 None
122 },
123 execution_context,
124 }))
125 }
126
127 fn outputs(&self) -> &[OutputDefinition<EventType>] {
128 static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
129 vec![
130 OutputDefinition::named_output("metrics", EventType::Metric),
131 OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
132 OutputDefinition::named_output("events", EventType::EventD),
133 ]
134 });
135 &OUTPUTS
136 }
137}
138
139impl<E> MemoryBounds for ChecksConfiguration<E> {
140 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
141 builder.minimum().with_single_value::<ChecksSource>("component struct");
142 }
143}
144
145struct ChecksSource {
146 autodiscovery_rx: Receiver<AutodiscoveryEvent>,
147 check_runners: usize,
148 custom_checks_dirs: Option<Vec<String>>,
149 execution_context: ExecutionContext,
150}
151
152impl ChecksSource {
153 fn builders(
155 &self, check_events_tx: mpsc::Sender<Event>, execution_context: ExecutionContext,
156 ) -> Vec<Arc<dyn CheckBuilder + Send + Sync>> {
157 #[cfg(feature = "python-checks")]
158 {
159 vec![Arc::new(PythonCheckBuilder::new(
160 check_events_tx,
161 self.custom_checks_dirs.clone(),
162 execution_context,
163 ))]
164 }
165 #[cfg(not(feature = "python-checks"))]
166 {
167 let _ = check_events_tx; let _ = &self.custom_checks_dirs; let _ = execution_context; vec![]
171 }
172 }
173}
174
175#[async_trait]
176impl Source for ChecksSource {
177 async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
178 let mut global_shutdown: ComponentShutdownHandle = context.take_shutdown_handle();
179 let mut health = context.take_health_handle();
180 health.mark_ready();
181
182 info!("Checks source started.");
183
184 let (check_events_tx, check_event_rx) = mpsc::channel(128);
185
186 let mut check_builders: Vec<Arc<dyn CheckBuilder + Send + Sync>> =
187 self.builders(check_events_tx, self.execution_context.clone());
188
189 let mut check_ids = HashSet::new();
190 let scheduler = Scheduler::new(self.check_runners);
191
192 let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();
193
194 spawn_traced_named(
195 "checks-events-listener",
196 drain_and_dispatch_check_events(listener_shutdown_coordinator.register(), context, check_event_rx),
197 );
198
199 let mut autodiscovery_rx = self.autodiscovery_rx;
200
201 loop {
202 select! {
203 _ = &mut global_shutdown => {
204 debug!("Checks source received shutdown signal.");
205 scheduler.shutdown().await;
206 break
207 },
208
209 _ = health.live() => continue,
210
211 event = autodiscovery_rx.recv() => match event {
212 Ok(event) => {
213 match event {
214 AutodiscoveryEvent::CheckSchedule { config } => {
215 let mut runnable_checks: Vec<Arc<dyn Check + Send + Sync>> = vec![];
216 for instance in &config.instances {
217 let check_id = instance.id();
218 if check_ids.contains(check_id) {
219 continue;
220 }
221
222 for builder in check_builders.iter_mut() {
223 if let Some(check) = builder.build_check(&config.name, instance, &config.init_config, &config.source) {
224 debug!(
225 check_id = check.id(),
226 check_name = config.name.as_ref(),
227 check_version = check.version(),
228 check_source = check.source(),
229 "Built new check instance."
230 );
231 runnable_checks.push(check);
232 check_ids.insert(check_id.clone());
233 break;
234 }
235 }
236 }
237
238 for check in runnable_checks {
239 scheduler.schedule(check);
240 }
241 }
242 AutodiscoveryEvent::CheckUnscheduled { config } => {
243 for instance in &config.instances {
244 let check_id = instance.id();
245 if !check_ids.contains(check_id) {
246 warn!("Unscheduling check {} not found, skipping.", check_id);
247 continue;
248 }
249
250 scheduler.unschedule(check_id);
251 }
252 }
253 _ => {}
255 }
256 }
257 Err(e) => {
258 error!("Error receiving event: {:?}", e);
259 }
260 }
261 }
262 }
263
264 listener_shutdown_coordinator.shutdown().await;
265
266 info!("Checks source stopped.");
267
268 Ok(())
269 }
270}
271
272async fn drain_and_dispatch_check_events(
273 shutdown_handle: DynamicShutdownHandle, context: SourceContext, mut check_event_rx: mpsc::Receiver<Event>,
274) {
275 tokio::pin!(shutdown_handle);
276
277 loop {
278 select! {
279 _ = &mut shutdown_handle => {
280 debug!("Checks events listeners received shutdown signal.");
281 break;
282 }
283 result = check_event_rx.recv() => match result {
284 None => break,
285 Some(check_event) => {
286 trace!("Received check event: {:?}", check_event);
287
288 match check_event.event_type() {
289 EventType::Metric => {
290 let mut buffered_dispatcher = context
291 .dispatcher()
292 .buffered_named("metrics")
293 .expect("checks source metrics output should always exist");
294
295 if let Err(e) = buffered_dispatcher.push(check_event).await {
296 error!(error = %e, "Failed to forward check metric.");
297 }
298
299 if let Err(e) = buffered_dispatcher.flush().await {
300 error!(error = %e, "Failed to flush check metric.");
301 }
302 },
303 EventType::ServiceCheck => {
304 let mut buffered_dispatcher = context
305 .dispatcher()
306 .buffered_named("service_checks")
307 .expect("checks source service checks output should always exist");
308
309 if let Err(e) = buffered_dispatcher.push(check_event).await {
310 error!(error = %e, "Failed to forward check service check.");
311 }
312
313 if let Err(e) = buffered_dispatcher.flush().await {
314 error!(error = %e, "Failed to flush check service check.");
315 }
316 },
317 EventType::EventD => {
318 let mut buffered_dispatcher = context
319 .dispatcher()
320 .buffered_named("events")
321 .expect("checks source events output should always exist");
322
323 if let Err(e) = buffered_dispatcher.push(check_event).await {
324 error!(error = %e, "Failed to forward check event");
325 }
326
327 if let Err(e) = buffered_dispatcher.flush().await {
328 error!(error = %e, "Failed to flush check event.");
329 }
330 },
331 _ => {},
332 }
333
334 }
335 }
336 }
337 }
338}