saluki_components/sources/checks/
mod.rs1use std::collections::HashSet;
5use std::sync::{Arc, LazyLock};
6
7use async_trait::async_trait;
8use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
9use saluki_common::task::spawn_traced_named;
10use saluki_config::GenericConfiguration;
11use saluki_core::{
12 components::{sources::*, ComponentContext},
13 data_model::event::{Event, EventType},
14 topology::{
15 shutdown::{ComponentShutdownHandle, DynamicShutdownCoordinator, DynamicShutdownHandle},
16 OutputDefinition,
17 },
18};
19use saluki_env::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider};
20use saluki_env::{EnvironmentProvider, HostProvider};
21use saluki_error::{generic_error, GenericError};
22use serde::Deserialize;
23use tokio::select;
24use tokio::sync::broadcast::Receiver;
25use tokio::sync::mpsc;
26use tracing::{debug, error, info, trace, warn};
27
28mod scheduler;
29use self::scheduler::Scheduler;
30
31mod check_metric;
32
33mod check;
34use self::check::Check;
35
36mod builder;
37
38mod execution_context;
39#[cfg(feature = "python-checks")]
40use self::builder::python::builder::PythonCheckBuilder;
41use self::builder::CheckBuilder;
42use self::execution_context::ExecutionContext;
43
/// Number of check runners used when `check_runners` is absent from the configuration.
const fn default_check_runners() -> usize {
    const DEFAULT_CHECK_RUNNERS: usize = 4;
    DEFAULT_CHECK_RUNNERS
}
47
/// Configuration for the checks source.
///
/// The serializable settings are read from the global configuration via `from_configuration`.
/// An environment provider must then be attached with `with_environment_provider` before the
/// source can be built.
#[derive(Deserialize)]
pub struct ChecksConfiguration<E = ()> {
    /// Number of runners handed to the check scheduler.
    ///
    /// Defaults to 4 (see `default_check_runners`).
    #[serde(default = "default_check_runners")]
    check_runners: usize,

    /// Comma-separated list of additional check directories, forwarded to the check builders.
    ///
    /// An empty string (the default) means no additional directories.
    #[serde(default)]
    additional_checksd: String,

    /// Environment provider used to subscribe to autodiscovery and to build the execution context.
    ///
    /// Never deserialized; attached via `with_environment_provider`. Building fails if it is unset.
    #[serde(skip)]
    environment_provider: Option<E>,

    /// The full configuration this struct was parsed from, later cloned into the execution context.
    ///
    /// Never deserialized; captured by `from_configuration`. Building fails if it is unset.
    #[serde(skip)]
    full_configuration: Option<GenericConfiguration>,
}
67
68impl ChecksConfiguration {
69 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
71 let mut checks_config = config.as_typed::<Self>()?;
72 checks_config.full_configuration = Some(config.clone());
73 Ok(checks_config)
74 }
75}
76
77impl<E> ChecksConfiguration<E> {
78 pub fn with_environment_provider<E2>(self, environment_provider: E2) -> ChecksConfiguration<E2> {
80 ChecksConfiguration {
81 check_runners: self.check_runners,
82 additional_checksd: self.additional_checksd,
83 environment_provider: Some(environment_provider),
84 full_configuration: self.full_configuration,
85 }
86 }
87}
88
89#[async_trait]
90impl<E> SourceBuilder for ChecksConfiguration<E>
91where
92 E: EnvironmentProvider + Send + Sync + 'static,
93 <E::Host as HostProvider>::Error: Into<GenericError> + std::fmt::Debug,
94{
95 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
96 let environment_provider = self
97 .environment_provider
98 .as_ref()
99 .ok_or_else(|| generic_error!("No environment provider configured."))?;
100
101 let receiver = environment_provider
102 .autodiscovery()
103 .subscribe()
104 .await
105 .ok_or_else(|| generic_error!("No autodiscovery stream configured."))?;
106
107 let configuration = self
108 .full_configuration
109 .as_ref()
110 .ok_or_else(|| generic_error!("No configuration configured."))?;
111
112 let execution_context =
113 ExecutionContext::from_environment_provider(configuration.clone(), environment_provider).await;
114
115 Ok(Box::new(ChecksSource {
116 autodiscovery_rx: receiver,
117 check_runners: self.check_runners,
118 custom_checks_dirs: if !self.additional_checksd.is_empty() {
119 Some(self.additional_checksd.split(",").map(|s| s.to_string()).collect())
120 } else {
121 None
122 },
123 execution_context,
124 }))
125 }
126
127 fn outputs(&self) -> &[OutputDefinition] {
128 static OUTPUTS: LazyLock<Vec<OutputDefinition>> = LazyLock::new(|| {
129 vec![
130 OutputDefinition::named_output("metrics", EventType::Metric),
131 OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
132 OutputDefinition::named_output("events", EventType::EventD),
133 ]
134 });
135
136 &OUTPUTS
137 }
138}
139
/// Memory accounting for the checks source.
impl<E> MemoryBounds for ChecksConfiguration<E> {
    /// Registers the size of the `ChecksSource` struct itself as the minimum bound.
    ///
    /// Nothing else is accounted here. For example, the event channel created in `run` and the
    /// state of individual checks are not included.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder.minimum().with_single_value::<ChecksSource>("component struct");
    }
}
145
/// Source component that builds, schedules and unschedules checks in response to autodiscovery events.
///
/// Constructed by the `SourceBuilder` implementation for `ChecksConfiguration`.
struct ChecksSource {
    /// Subscription to the environment provider's autodiscovery events, which drive scheduling.
    autodiscovery_rx: Receiver<AutodiscoveryEvent>,
    /// Number of runners passed to `Scheduler::new`.
    check_runners: usize,
    /// Extra check directories derived from `additional_checksd`, forwarded to the Python check
    /// builder when the `python-checks` feature is enabled.
    custom_checks_dirs: Option<Vec<String>>,
    /// Execution context cloned into the check builders.
    execution_context: ExecutionContext,
}
152
153impl ChecksSource {
154 fn builders(
156 &self, check_events_tx: mpsc::Sender<Event>, execution_context: ExecutionContext,
157 ) -> Vec<Arc<dyn CheckBuilder + Send + Sync>> {
158 #[cfg(feature = "python-checks")]
159 {
160 vec![Arc::new(PythonCheckBuilder::new(
161 check_events_tx,
162 self.custom_checks_dirs.clone(),
163 execution_context,
164 ))]
165 }
166 #[cfg(not(feature = "python-checks"))]
167 {
168 let _ = check_events_tx; let _ = &self.custom_checks_dirs; let _ = execution_context; vec![]
172 }
173 }
174}
175
176#[async_trait]
177impl Source for ChecksSource {
178 async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
179 let mut global_shutdown: ComponentShutdownHandle = context.take_shutdown_handle();
180 let mut health = context.take_health_handle();
181 health.mark_ready();
182
183 info!("Checks source started.");
184
185 let (check_events_tx, check_event_rx) = mpsc::channel(128);
186
187 let mut check_builders: Vec<Arc<dyn CheckBuilder + Send + Sync>> =
188 self.builders(check_events_tx, self.execution_context.clone());
189
190 let mut check_ids = HashSet::new();
191 let scheduler = Scheduler::new(self.check_runners);
192
193 let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();
194
195 spawn_traced_named(
196 "checks-events-listener",
197 drain_and_dispatch_check_events(listener_shutdown_coordinator.register(), context, check_event_rx),
198 );
199
200 let mut autodiscovery_rx = self.autodiscovery_rx;
201
202 loop {
203 select! {
204 _ = &mut global_shutdown => {
205 debug!("Checks source received shutdown signal.");
206 scheduler.shutdown().await;
207 break
208 },
209
210 _ = health.live() => continue,
211
212 event = autodiscovery_rx.recv() => match event {
213 Ok(event) => {
214 match event {
215 AutodiscoveryEvent::CheckSchedule { config } => {
216 let mut runnable_checks: Vec<Arc<dyn Check + Send + Sync>> = vec![];
217 for instance in &config.instances {
218 let check_id = instance.id();
219 if check_ids.contains(check_id) {
220 continue;
221 }
222
223 for builder in check_builders.iter_mut() {
224 if let Some(check) = builder.build_check(&config.name, instance, &config.init_config, &config.source) {
225 debug!(
226 check_id = check.id(),
227 check_name = config.name.as_ref(),
228 check_version = check.version(),
229 check_source = check.source(),
230 "Built new check instance."
231 );
232 runnable_checks.push(check);
233 check_ids.insert(check_id.clone());
234 break;
235 }
236 }
237 }
238
239 for check in runnable_checks {
240 scheduler.schedule(check);
241 }
242 }
243 AutodiscoveryEvent::CheckUnscheduled { config } => {
244 for instance in &config.instances {
245 let check_id = instance.id();
246 if !check_ids.contains(check_id) {
247 warn!("Unscheduling check {} not found, skipping.", check_id);
248 continue;
249 }
250
251 scheduler.unschedule(check_id);
252 }
253 }
254 _ => {}
256 }
257 }
258 Err(e) => {
259 error!("Error receiving event: {:?}", e);
260 }
261 }
262 }
263 }
264
265 listener_shutdown_coordinator.shutdown().await;
266
267 info!("Checks source stopped.");
268
269 Ok(())
270 }
271}
272
273async fn drain_and_dispatch_check_events(
274 shutdown_handle: DynamicShutdownHandle, context: SourceContext, mut check_event_rx: mpsc::Receiver<Event>,
275) {
276 tokio::pin!(shutdown_handle);
277
278 loop {
279 select! {
280 _ = &mut shutdown_handle => {
281 debug!("Checks events listeners received shutdown signal.");
282 break;
283 }
284 result = check_event_rx.recv() => match result {
285 None => break,
286 Some(check_event) => {
287 trace!("Received check event: {:?}", check_event);
288
289 match check_event.event_type() {
290 EventType::Metric => {
291 let mut buffered_dispatcher = context
292 .dispatcher()
293 .buffered_named("metrics")
294 .expect("checks source metrics output should always exist");
295
296 if let Err(e) = buffered_dispatcher.push(check_event).await {
297 error!(error = %e, "Failed to forward check metric.");
298 }
299
300 if let Err(e) = buffered_dispatcher.flush().await {
301 error!(error = %e, "Failed to flush check metric.");
302 }
303 },
304 EventType::ServiceCheck => {
305 let mut buffered_dispatcher = context
306 .dispatcher()
307 .buffered_named("service_checks")
308 .expect("checks source service checks output should always exist");
309
310 if let Err(e) = buffered_dispatcher.push(check_event).await {
311 error!(error = %e, "Failed to forward check service check.");
312 }
313
314 if let Err(e) = buffered_dispatcher.flush().await {
315 error!(error = %e, "Failed to flush check service check.");
316 }
317 },
318 EventType::EventD => {
319 let mut buffered_dispatcher = context
320 .dispatcher()
321 .buffered_named("events")
322 .expect("checks source events output should always exist");
323
324 if let Err(e) = buffered_dispatcher.push(check_event).await {
325 error!(error = %e, "Failed to forward check event");
326 }
327
328 if let Err(e) = buffered_dispatcher.flush().await {
329 error!(error = %e, "Failed to flush check event.");
330 }
331 },
332 _ => {},
333 }
334
335 }
336 }
337 }
338 }
339}