1use std::collections::BTreeMap;
2use std::collections::HashSet;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use saluki_error::GenericError;
8use serde::Deserialize;
9use stringtheory::MetaString;
10use tokio::fs;
11use tokio::sync::mpsc::{self, Receiver, Sender};
12use tokio::sync::{Mutex, OnceCell};
13use tokio::time::{interval, Duration};
14use tracing::{debug, info, warn};
15
16use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider, CheckConfig, Config, Data};
17
18const BG_MONITOR_INTERVAL: u64 = 30;
19
20type AutodiscoverySubscribers = Arc<Mutex<Vec<Sender<AutodiscoveryEvent>>>>;
21
22pub struct LocalAutodiscoveryProvider {
24 search_paths: Vec<PathBuf>,
25 subscribers: AutodiscoverySubscribers,
26 listener_init: OnceCell<()>,
27}
28
29impl LocalAutodiscoveryProvider {
30 pub fn new<P: AsRef<Path>>(paths: Vec<P>) -> Self {
32 let search_paths: Vec<PathBuf> = paths
33 .iter()
34 .filter_map(|p| {
35 if !p.as_ref().exists() {
36 warn!("Skipping path '{}' as it does not exist", p.as_ref().display());
37 return None;
38 }
39 if !p.as_ref().is_dir() {
40 warn!("Skipping path '{}', it is not a directory.", p.as_ref().display());
41 return None;
42 }
43 Some(p.as_ref().to_path_buf())
44 })
45 .collect();
46
47 Self {
48 search_paths,
49 subscribers: Arc::new(Mutex::new(Vec::new())),
50 listener_init: OnceCell::new(),
51 }
52 }
53
54 async fn start_background_monitor(&self, interval_sec: u64) {
56 let mut interval = interval(Duration::from_secs(interval_sec));
57 let subscribers = self.subscribers.clone();
58 let search_paths = self.search_paths.clone();
59
60 info!(
61 "Scanning for local autodiscovery events every {} seconds.",
62 interval_sec
63 );
64
65 tokio::spawn(async move {
66 let mut known_configs = HashSet::new();
67 let mut configs = BTreeMap::new();
68 loop {
69 interval.tick().await;
70
71 if let Err(e) =
73 scan_and_emit_events(&search_paths, &mut known_configs, &subscribers, &mut configs).await
74 {
75 warn!("Error scanning for configurations: {}", e);
76 }
77 }
78 });
79 }
80}
81
82#[derive(Debug, Deserialize)]
83struct LocalCheckConfig {
84 #[serde(default)]
85 init_config: BTreeMap<String, serde_yaml::Value>,
86 instances: Vec<BTreeMap<String, serde_yaml::Value>>,
87}
88
89async fn parse_config_file(path: &PathBuf, check_name: &str) -> Result<(String, CheckConfig), GenericError> {
91 let content = fs::read_to_string(path).await?;
92
93 let check_config: LocalCheckConfig = match serde_yaml::from_str(&content) {
94 Ok(read) => read,
95 Err(e) => {
96 return Err(GenericError::from(e).context("Failed to decode yaml as check configuration."));
97 }
98 };
99
100 let canonicalized_path = fs::canonicalize(&path).await?;
101
102 let config_id = canonicalized_path.to_string_lossy().replace(['/', '\\'], "_");
104
105 let instances: Vec<Data> = check_config
106 .instances
107 .into_iter()
108 .map(|instance| {
109 let mut result = BTreeMap::new();
110 for (key, value) in instance {
111 result.insert(key.into(), value);
112 }
113 Data { value: result }
114 })
115 .collect();
116
117 let init_config = {
118 let mut result = BTreeMap::new();
119 for (key, value) in check_config.init_config {
120 result.insert(key.into(), value);
121 }
122 Data { value: result }
123 };
124
125 let config = Config {
127 name: MetaString::from(check_name),
128 init_config,
129 instances,
130 metric_config: Data::default(),
131 logs_config: Data::default(),
132 ad_identifiers: Vec::new(),
133 provider: MetaString::empty(),
134 service_id: MetaString::empty(),
135 tagger_entity: MetaString::empty(),
136 cluster_check: false,
137 node_name: MetaString::empty(),
138 source: MetaString::from_static("local"),
139 ignore_autodiscovery_tags: false,
140 metrics_excluded: false,
141 logs_excluded: false,
142 advanced_ad_identifiers: Vec::new(),
143 };
144
145 let check_config = CheckConfig::from(config);
146
147 Ok((config_id, check_config))
148}
149
150async fn process_yaml_file(
153 path: PathBuf, check_name: &str, found_configs: &mut HashSet<String>, known_configs: &mut HashSet<String>,
154 subscribers: &AutodiscoverySubscribers, configs: &mut BTreeMap<String, CheckConfig>,
155) {
156 match parse_config_file(&path, check_name).await {
157 Ok((config_id, config)) => {
158 found_configs.insert(config_id.clone());
159
160 if !known_configs.contains(&config_id) {
161 debug!("New configuration found: {}", config_id);
162 let event = AutodiscoveryEvent::CheckSchedule { config: config.clone() };
163 send_to_subscribers(subscribers, event).await;
164 known_configs.insert(config_id.clone());
165 configs.insert(config_id, config);
166 } else {
167 let existing_config = configs.get(&config_id).unwrap();
169 if *existing_config != config {
170 configs.insert(config_id.clone(), config.clone());
171 debug!("Configuration updated: {}", config_id);
172 let event = AutodiscoveryEvent::CheckSchedule { config };
173 send_to_subscribers(subscribers, event).await;
174 }
175 }
176 }
177 Err(e) => {
178 warn!("Failed to parse config file {}: {}", path.display(), e);
179 }
180 }
181}
182
/// Returns `true` when `path` has a `yaml` or `yml` extension (case-sensitive).
///
/// Only the extension is inspected; the path need not exist or be a regular file.
fn is_yaml_file(path: &Path) -> bool {
    let Some(ext) = path.extension().and_then(|ext| ext.to_str()) else {
        return false;
    };
    ext == "yaml" || ext == "yml"
}
187
188async fn scan_and_emit_events(
195 paths: &[PathBuf], known_configs: &mut HashSet<String>, subscribers: &AutodiscoverySubscribers,
196 configs: &mut BTreeMap<String, CheckConfig>,
197) -> Result<(), GenericError> {
198 let mut found_configs = HashSet::new();
199
200 for path in paths {
201 let mut entries = fs::read_dir(path).await?;
202 while let Ok(Some(entry)) = entries.next_entry().await {
203 let path = entry.path();
204
205 if is_yaml_file(&path) {
206 let check_name = path.file_stem().unwrap().to_string_lossy().into_owned();
208 process_yaml_file(
209 path,
210 &check_name,
211 &mut found_configs,
212 known_configs,
213 subscribers,
214 configs,
215 )
216 .await;
217 } else if path.is_dir() {
218 let dir_name = path.file_name().unwrap_or_default().to_string_lossy();
220 if let Some(check_name) = dir_name.strip_suffix(".d") {
221 let Ok(mut sub_entries) = fs::read_dir(&path).await else {
222 warn!("Failed to read directory {}", path.display());
223 continue;
224 };
225 while let Ok(Some(sub_entry)) = sub_entries.next_entry().await {
226 let sub_path = sub_entry.path();
227 if is_yaml_file(&sub_path) {
228 process_yaml_file(
229 sub_path,
230 check_name,
231 &mut found_configs,
232 known_configs,
233 subscribers,
234 configs,
235 )
236 .await;
237 }
238 }
239 }
240 }
241 }
242 }
243
244 let to_remove: Vec<String> = known_configs
246 .iter()
247 .filter(|config_id| !found_configs.contains(*config_id))
248 .cloned()
249 .collect();
250
251 for config_id in to_remove {
252 debug!("Configuration removed: {}", config_id);
253 known_configs.remove(&config_id);
254
255 let config = configs.remove(&config_id).unwrap();
256
257 let event = AutodiscoveryEvent::CheckUnscheduled { config };
259 send_to_subscribers(subscribers, event).await;
260 }
261
262 Ok(())
263}
264
265async fn send_to_subscribers(subscribers: &AutodiscoverySubscribers, event: AutodiscoveryEvent) {
266 let mut subscribers = subscribers.lock().await;
267 let mut active_subscribers = Vec::with_capacity(subscribers.len());
268
269 for sender in subscribers.drain(..) {
270 if sender.send(event.clone()).await.is_ok() {
271 active_subscribers.push(sender);
272 }
273 }
274
275 *subscribers = active_subscribers;
276}
277
278#[async_trait]
279impl AutodiscoveryProvider for LocalAutodiscoveryProvider {
280 async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
281 self.listener_init
282 .get_or_init(|| async {
283 self.start_background_monitor(BG_MONITOR_INTERVAL).await;
284 })
285 .await;
286
287 let (sender, receiver) = mpsc::channel::<AutodiscoveryEvent>(16);
288 self.subscribers.lock().await.push(sender);
289 Some(receiver)
290 }
291}
292
#[cfg(test)]
mod tests {
    use std::path::Path;

    use tempfile::tempdir;
    use tokio::io::AsyncWriteExt;

    use super::*;

    /// Directory holding the YAML fixtures used by these tests.
    fn test_data_path() -> PathBuf {
        // Fall back to the current directory when not run under Cargo.
        let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_else(|_| ".".to_string());
        PathBuf::from(manifest_dir)
            .join("src")
            .join("autodiscovery")
            .join("providers")
            .join("test_data")
    }

    /// Copies the fixture directory `check_dir_name` (non-recursively) into `temp_dir`.
    async fn copy_test_check_dir(check_dir_name: &str, temp_dir: &Path) {
        let source_dir = test_data_path().join(check_dir_name);
        let target_dir = temp_dir.join(check_dir_name);
        fs::create_dir_all(&target_dir).await.unwrap();

        let mut entries = fs::read_dir(&source_dir).await.unwrap();
        while let Some(entry) = entries.next_entry().await.unwrap() {
            fs::copy(entry.path(), target_dir.join(entry.file_name()))
                .await
                .unwrap();
        }
    }

    /// Copies the fixture `source_name` into `temp_dir`, keeping its file name.
    async fn copy_test_file(source_name: &str, temp_dir: &Path) -> PathBuf {
        copy_test_file_as(source_name, source_name, temp_dir).await
    }

    /// Copies the fixture `source_name` into `temp_dir` as `target_name`.
    async fn copy_test_file_as(source_name: &str, target_name: &str, temp_dir: &Path) -> PathBuf {
        let source_path = test_data_path().join(source_name);
        let target_path = temp_dir.join(target_name);

        let content = fs::read_to_string(&source_path)
            .await
            .unwrap_or_else(|_| panic!("Failed to read test file: {:?}", source_path));

        let mut file = fs::File::create(&target_path).await.unwrap();
        file.write_all(content.as_bytes()).await.unwrap();

        target_path
    }

    #[tokio::test]
    async fn test_parse_config_file() {
        let test_file = test_data_path().join("test-config.yaml");

        let (id, config) = parse_config_file(&test_file, "test-config").await.unwrap();

        assert!(id.contains("saluki-env_src_autodiscovery_providers_test_data_test-config.yaml"));
        assert_eq!(config.name, "test-config");
        assert_eq!(
            config.init_config.value.get("service"),
            Some(&serde_yaml::Value::String("test-service".to_string()))
        );
        assert_eq!(config.source, "local");
    }

    #[tokio::test]
    async fn test_parse_minimal_config_file() {
        let test_file = test_data_path().join("test-minimal-config.yaml");

        let (_, config) = parse_config_file(&test_file, "test-minimal-config").await.unwrap();

        assert!(config.init_config.value.is_empty());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_new_config() {
        let dir = tempdir().unwrap();
        let _test_file = copy_test_file("config1.yaml", dir.path()).await;

        let mut known_configs = HashSet::new();
        let mut configs = BTreeMap::new();
        let (sender, mut receiver) = mpsc::channel::<AutodiscoveryEvent>(10);
        let subscribers = Arc::new(Mutex::new(vec![sender]));

        scan_and_emit_events(
            &[dir.path().to_path_buf()],
            &mut known_configs,
            &subscribers,
            &mut configs,
        )
        .await
        .unwrap();

        assert_eq!(known_configs.len(), 1);

        // Match on the variant actually emitted so the field assertions below really run.
        let config = match receiver.try_recv().unwrap() {
            AutodiscoveryEvent::CheckSchedule { config } => config,
            other => panic!("expected CheckSchedule event, got: {:?}", other),
        };

        // The check name is the file stem, not the full file name.
        assert_eq!(config.name, "config1");
        assert_eq!(config.instances.len(), 1);
        assert_eq!(
            config.instances[0].value.get("server"),
            Some(&serde_yaml::Value::String("localhost".to_string()))
        );
        assert_eq!(
            config.instances[0].value.get("port"),
            Some(&serde_yaml::Value::Number(8080.into()))
        );
        assert_eq!(
            config.instances[0].value.get("tags"),
            Some(&serde_yaml::Value::Sequence(vec![
                serde_yaml::Value::String("test:true".to_string()),
                serde_yaml::Value::String("env:test".to_string())
            ]))
        );

        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_preserves_bursts() {
        let dir = tempdir().unwrap();
        let _test_file1 = copy_test_file_as("config1.yaml", "config1.yaml", dir.path()).await;
        let _test_file2 = copy_test_file_as("config1.yaml", "config2.yaml", dir.path()).await;

        // Capacity 1 forces the scan to wait on the receiver between the two events.
        let (sender, mut receiver) = mpsc::channel::<AutodiscoveryEvent>(1);
        let subscribers = Arc::new(Mutex::new(vec![sender]));
        let search_path = dir.path().to_path_buf();

        let scan = tokio::spawn(async move {
            let mut known_configs = HashSet::new();
            let mut configs = BTreeMap::new();

            scan_and_emit_events(&[search_path], &mut known_configs, &subscribers, &mut configs).await
        });

        let event1 = tokio::time::timeout(Duration::from_secs(1), receiver.recv())
            .await
            .unwrap()
            .unwrap();
        let event2 = tokio::time::timeout(Duration::from_secs(1), receiver.recv())
            .await
            .unwrap()
            .unwrap();

        scan.await.unwrap().unwrap();

        assert!(matches!(event1, AutodiscoveryEvent::CheckSchedule { .. }));
        assert!(matches!(event2, AutodiscoveryEvent::CheckSchedule { .. }));
        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_send_to_subscribers_fans_out_and_prunes_closed_receivers() {
        let (sender1, mut receiver1) = mpsc::channel::<AutodiscoveryEvent>(10);
        let (sender2, mut receiver2) = mpsc::channel::<AutodiscoveryEvent>(10);
        let (closed_sender, closed_receiver) = mpsc::channel::<AutodiscoveryEvent>(10);
        drop(closed_receiver);

        let subscribers = Arc::new(Mutex::new(vec![sender1, sender2, closed_sender]));
        let event = AutodiscoveryEvent::CheckSchedule {
            config: CheckConfig {
                name: MetaString::from("test-check"),
                init_config: Data::default(),
                instances: Vec::new(),
                source: MetaString::from_static("local"),
            },
        };

        send_to_subscribers(&subscribers, event).await;

        assert_eq!(subscribers.lock().await.len(), 2);
        assert!(matches!(
            receiver1.try_recv().unwrap(),
            AutodiscoveryEvent::CheckSchedule { .. }
        ));
        assert!(matches!(
            receiver2.try_recv().unwrap(),
            AutodiscoveryEvent::CheckSchedule { .. }
        ));
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_removed_config() {
        let dir = tempdir().unwrap();

        let mut known_configs = HashSet::new();
        known_configs.insert("removed-config".to_string());
        let mut configs = BTreeMap::new();
        configs.insert(
            "removed-config".to_string(),
            CheckConfig {
                name: MetaString::from("removed-config"),
                init_config: Data::default(),
                instances: Vec::new(),
                source: MetaString::from_static("local"),
            },
        );

        let (sender, mut receiver) = mpsc::channel::<AutodiscoveryEvent>(10);
        let subscribers = Arc::new(Mutex::new(vec![sender]));

        scan_and_emit_events(
            &[dir.path().to_path_buf()],
            &mut known_configs,
            &subscribers,
            &mut configs,
        )
        .await
        .unwrap();

        assert_eq!(known_configs.len(), 0);

        let event = receiver.try_recv().unwrap();
        assert!(matches!(event, AutodiscoveryEvent::CheckUnscheduled { config } if config.name == "removed-config"));

        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_check_dir() {
        let dir = tempdir().unwrap();
        copy_test_check_dir("test-check.d", dir.path()).await;

        let mut known_configs = HashSet::new();
        let mut configs = BTreeMap::new();
        let (sender, mut receiver) = mpsc::channel::<AutodiscoveryEvent>(10);
        let subscribers = Arc::new(Mutex::new(vec![sender]));

        scan_and_emit_events(
            &[dir.path().to_path_buf()],
            &mut known_configs,
            &subscribers,
            &mut configs,
        )
        .await
        .unwrap();

        assert_eq!(known_configs.len(), 1);

        let event = receiver.try_recv().unwrap();
        assert!(
            matches!(&event, AutodiscoveryEvent::CheckSchedule { config } if config.name == "test-check"),
            "expected check name 'test-check', got: {:?}",
            event
        );

        assert!(receiver.try_recv().is_err());
    }
}