Skip to main content

saluki_env/autodiscovery/providers/
local.rs

1use std::collections::BTreeMap;
2use std::collections::HashSet;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use saluki_error::GenericError;
8use serde::Deserialize;
9use stringtheory::MetaString;
10use tokio::fs;
11use tokio::sync::mpsc::{self, Receiver, Sender};
12use tokio::sync::{Mutex, OnceCell};
13use tokio::time::{interval, Duration};
14use tracing::{debug, info, warn};
15
16use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider, CheckConfig, Config, Data};
17
18const BG_MONITOR_INTERVAL: u64 = 30;
19
20type AutodiscoverySubscribers = Arc<Mutex<Vec<Sender<AutodiscoveryEvent>>>>;
21
22/// A local auto-discovery provider that uses the file system.
23pub struct LocalAutodiscoveryProvider {
24    search_paths: Vec<PathBuf>,
25    subscribers: AutodiscoverySubscribers,
26    listener_init: OnceCell<()>,
27}
28
29impl LocalAutodiscoveryProvider {
30    /// Creates a new `LocalAutodiscoveryProvider` that will monitor the specified paths.
31    pub fn new<P: AsRef<Path>>(paths: Vec<P>) -> Self {
32        let search_paths: Vec<PathBuf> = paths
33            .iter()
34            .filter_map(|p| {
35                if !p.as_ref().exists() {
36                    warn!("Skipping path '{}' as it does not exist", p.as_ref().display());
37                    return None;
38                }
39                if !p.as_ref().is_dir() {
40                    warn!("Skipping path '{}', it is not a directory.", p.as_ref().display());
41                    return None;
42                }
43                Some(p.as_ref().to_path_buf())
44            })
45            .collect();
46
47        Self {
48            search_paths,
49            subscribers: Arc::new(Mutex::new(Vec::new())),
50            listener_init: OnceCell::new(),
51        }
52    }
53
54    /// Starts a background task that periodically scans for configuration changes
55    async fn start_background_monitor(&self, interval_sec: u64) {
56        let mut interval = interval(Duration::from_secs(interval_sec));
57        let subscribers = self.subscribers.clone();
58        let search_paths = self.search_paths.clone();
59
60        info!(
61            "Scanning for local autodiscovery events every {} seconds.",
62            interval_sec
63        );
64
65        tokio::spawn(async move {
66            let mut known_configs = HashSet::new();
67            let mut configs = BTreeMap::new();
68            loop {
69                interval.tick().await;
70
71                // Scan for configurations and emit events for changes
72                if let Err(e) =
73                    scan_and_emit_events(&search_paths, &mut known_configs, &subscribers, &mut configs).await
74                {
75                    warn!("Error scanning for configurations: {}", e);
76                }
77            }
78        });
79    }
80}
81
82#[derive(Debug, Deserialize)]
83struct LocalCheckConfig {
84    #[serde(default)]
85    init_config: BTreeMap<String, serde_yaml::Value>,
86    instances: Vec<BTreeMap<String, serde_yaml::Value>>,
87}
88
89/// Parse a YAML file into a Config object.
90async fn parse_config_file(path: &PathBuf, check_name: &str) -> Result<(String, CheckConfig), GenericError> {
91    let content = fs::read_to_string(path).await?;
92
93    let check_config: LocalCheckConfig = match serde_yaml::from_str(&content) {
94        Ok(read) => read,
95        Err(e) => {
96            return Err(GenericError::from(e).context("Failed to decode yaml as check configuration."));
97        }
98    };
99
100    let canonicalized_path = fs::canonicalize(&path).await?;
101
102    // Build config ID from the file path
103    let config_id = canonicalized_path.to_string_lossy().replace(['/', '\\'], "_");
104
105    let instances: Vec<Data> = check_config
106        .instances
107        .into_iter()
108        .map(|instance| {
109            let mut result = BTreeMap::new();
110            for (key, value) in instance {
111                result.insert(key.into(), value);
112            }
113            Data { value: result }
114        })
115        .collect();
116
117    let init_config = {
118        let mut result = BTreeMap::new();
119        for (key, value) in check_config.init_config {
120            result.insert(key.into(), value);
121        }
122        Data { value: result }
123    };
124
125    // Create a Config
126    let config = Config {
127        name: MetaString::from(check_name),
128        init_config,
129        instances,
130        metric_config: Data::default(),
131        logs_config: Data::default(),
132        ad_identifiers: Vec::new(),
133        provider: MetaString::empty(),
134        service_id: MetaString::empty(),
135        tagger_entity: MetaString::empty(),
136        cluster_check: false,
137        node_name: MetaString::empty(),
138        source: MetaString::from_static("local"),
139        ignore_autodiscovery_tags: false,
140        metrics_excluded: false,
141        logs_excluded: false,
142        advanced_ad_identifiers: Vec::new(),
143    };
144
145    let check_config = CheckConfig::from(config);
146
147    Ok((config_id, check_config))
148}
149
150/// Process a single YAML config file and record it in the tracking structures, emitting events as
151/// needed.
152async fn process_yaml_file(
153    path: PathBuf, check_name: &str, found_configs: &mut HashSet<String>, known_configs: &mut HashSet<String>,
154    subscribers: &AutodiscoverySubscribers, configs: &mut BTreeMap<String, CheckConfig>,
155) {
156    match parse_config_file(&path, check_name).await {
157        Ok((config_id, config)) => {
158            found_configs.insert(config_id.clone());
159
160            if !known_configs.contains(&config_id) {
161                debug!("New configuration found: {}", config_id);
162                let event = AutodiscoveryEvent::CheckSchedule { config: config.clone() };
163                send_to_subscribers(subscribers, event).await;
164                known_configs.insert(config_id.clone());
165                configs.insert(config_id, config);
166            } else {
167                // Config ID exists, but the content might have changed
168                let existing_config = configs.get(&config_id).unwrap();
169                if *existing_config != config {
170                    configs.insert(config_id.clone(), config.clone());
171                    debug!("Configuration updated: {}", config_id);
172                    let event = AutodiscoveryEvent::CheckSchedule { config };
173                    send_to_subscribers(subscribers, event).await;
174                }
175            }
176        }
177        Err(e) => {
178            warn!("Failed to parse config file {}: {}", path.display(), e);
179        }
180    }
181}
182
/// Reports whether `path` ends in a `.yaml` or `.yml` extension.
///
/// The comparison is case-sensitive, and a path without an extension is never a YAML file.
fn is_yaml_file(path: &Path) -> bool {
    path.extension().is_some_and(|ext| ext == "yaml" || ext == "yml")
}
187
188/// Scan and emit events based on configuration files in the directory.
189///
190/// Supports two layouts:
191/// - `<search-path>/<check-name>.yaml`—flat YAML files.
192/// - `<search-path>/<check-name>.d/*.yaml`—directory-based configs; the check name is derived
193///   from the directory stem (the `.d` suffix is stripped).
194async fn scan_and_emit_events(
195    paths: &[PathBuf], known_configs: &mut HashSet<String>, subscribers: &AutodiscoverySubscribers,
196    configs: &mut BTreeMap<String, CheckConfig>,
197) -> Result<(), GenericError> {
198    let mut found_configs = HashSet::new();
199
200    for path in paths {
201        let mut entries = fs::read_dir(path).await?;
202        while let Ok(Some(entry)) = entries.next_entry().await {
203            let path = entry.path();
204
205            if is_yaml_file(&path) {
206                // Flat YAML file: <check-name>.yaml
207                let check_name = path.file_stem().unwrap().to_string_lossy().into_owned();
208                process_yaml_file(
209                    path,
210                    &check_name,
211                    &mut found_configs,
212                    known_configs,
213                    subscribers,
214                    configs,
215                )
216                .await;
217            } else if path.is_dir() {
218                // Directory: only process if it matches the `<check-name>.d` convention
219                let dir_name = path.file_name().unwrap_or_default().to_string_lossy();
220                if let Some(check_name) = dir_name.strip_suffix(".d") {
221                    let Ok(mut sub_entries) = fs::read_dir(&path).await else {
222                        warn!("Failed to read directory {}", path.display());
223                        continue;
224                    };
225                    while let Ok(Some(sub_entry)) = sub_entries.next_entry().await {
226                        let sub_path = sub_entry.path();
227                        if is_yaml_file(&sub_path) {
228                            process_yaml_file(
229                                sub_path,
230                                check_name,
231                                &mut found_configs,
232                                known_configs,
233                                subscribers,
234                                configs,
235                            )
236                            .await;
237                        }
238                    }
239                }
240            }
241        }
242    }
243
244    // Clean up removed configurations
245    let to_remove: Vec<String> = known_configs
246        .iter()
247        .filter(|config_id| !found_configs.contains(*config_id))
248        .cloned()
249        .collect();
250
251    for config_id in to_remove {
252        debug!("Configuration removed: {}", config_id);
253        known_configs.remove(&config_id);
254
255        let config = configs.remove(&config_id).unwrap();
256
257        // Create an unschedule Config event
258        let event = AutodiscoveryEvent::CheckUnscheduled { config };
259        send_to_subscribers(subscribers, event).await;
260    }
261
262    Ok(())
263}
264
265async fn send_to_subscribers(subscribers: &AutodiscoverySubscribers, event: AutodiscoveryEvent) {
266    let mut subscribers = subscribers.lock().await;
267    let mut active_subscribers = Vec::with_capacity(subscribers.len());
268
269    for sender in subscribers.drain(..) {
270        if sender.send(event.clone()).await.is_ok() {
271            active_subscribers.push(sender);
272        }
273    }
274
275    *subscribers = active_subscribers;
276}
277
278#[async_trait]
279impl AutodiscoveryProvider for LocalAutodiscoveryProvider {
280    async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
281        self.listener_init
282            .get_or_init(|| async {
283                self.start_background_monitor(BG_MONITOR_INTERVAL).await;
284            })
285            .await;
286
287        let (sender, receiver) = mpsc::channel::<AutodiscoveryEvent>(16);
288        self.subscribers.lock().await.push(sender);
289        Some(receiver)
290    }
291}
292
#[cfg(test)]
mod tests {
    use std::path::Path;

    use tempfile::tempdir;

    use super::*;

    // Get the path to the test_data directory
    fn test_data_path() -> PathBuf {
        let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_else(|_| ".".to_string());
        PathBuf::from(manifest_dir)
            .join("src")
            .join("autodiscovery")
            .join("providers")
            .join("test_data")
    }

    // Copy a `<check>.d` directory from test_data into the temp directory, preserving the structure.
    async fn copy_test_check_dir(check_dir_name: &str, temp_dir: &Path) {
        let source_dir = test_data_path().join(check_dir_name);
        let target_dir = temp_dir.join(check_dir_name);
        fs::create_dir_all(&target_dir).await.unwrap();

        let mut entries = fs::read_dir(&source_dir).await.unwrap();
        // Fail loudly on a listing error instead of silently copying a partial directory.
        while let Some(entry) = entries.next_entry().await.unwrap() {
            fs::copy(entry.path(), target_dir.join(entry.file_name()))
                .await
                .unwrap();
        }
    }

    // Copy a file from test_data to the temp directory, keeping its file name.
    async fn copy_test_file(source_name: &str, temp_dir: &Path) -> PathBuf {
        copy_test_file_as(source_name, source_name, temp_dir).await
    }

    // Copy a file from test_data to the temp directory with a different target name.
    async fn copy_test_file_as(source_name: &str, target_name: &str, temp_dir: &Path) -> PathBuf {
        let source_path = test_data_path().join(source_name);
        let target_path = temp_dir.join(target_name);

        fs::copy(&source_path, &target_path)
            .await
            .unwrap_or_else(|e| panic!("Failed to copy test file {:?}: {}", source_path, e));

        target_path
    }

    #[tokio::test]
    async fn test_parse_config_file() {
        let test_file = test_data_path().join("test-config.yaml");

        let (id, config) = parse_config_file(&test_file, "test-config").await.unwrap();

        assert!(id.contains("saluki-env_src_autodiscovery_providers_test_data_test-config.yaml"));
        assert_eq!(config.name, "test-config");
        assert_eq!(
            config.init_config.value.get("service"),
            Some(&serde_yaml::Value::String("test-service".to_string()))
        );
        assert_eq!(config.source, "local");
    }

    #[tokio::test]
    async fn test_parse_minimal_config_file() {
        let test_file = test_data_path().join("test-minimal-config.yaml");

        let (_, config) = parse_config_file(&test_file, "test-minimal-config").await.unwrap();

        // Parsing a config without `init_config` yields an empty hash map for that field.
        assert!(config.init_config.value.is_empty());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_new_config() {
        let dir = tempdir().unwrap();
        let _test_file = copy_test_file("config1.yaml", dir.path()).await;

        let mut known_configs = HashSet::new();
        let mut configs = BTreeMap::new();
        let (sender, mut receiver) = mpsc::channel::<AutodiscoveryEvent>(10);
        let subscribers = Arc::new(Mutex::new(vec![sender]));

        scan_and_emit_events(
            &[dir.path().to_path_buf()],
            &mut known_configs,
            &subscribers,
            &mut configs,
        )
        .await
        .unwrap();

        assert_eq!(known_configs.len(), 1);

        // Destructure unconditionally so the assertions below can never be silently skipped.
        let config = match receiver.try_recv().unwrap() {
            AutodiscoveryEvent::CheckSchedule { config } => config,
            other => panic!("expected a CheckSchedule event, got: {:?}", other),
        };
        // The check name of a flat config file is its file stem, not the full file name.
        assert_eq!(config.name, "config1");
        assert_eq!(config.instances.len(), 1);
        assert_eq!(
            config.instances[0].value.get("server"),
            Some(&serde_yaml::Value::String("localhost".to_string()))
        );
        assert_eq!(
            config.instances[0].value.get("port"),
            Some(&serde_yaml::Value::Number(8080.into()))
        );
        assert_eq!(
            config.instances[0].value.get("tags"),
            Some(&serde_yaml::Value::Sequence(vec![
                serde_yaml::Value::String("test:true".to_string()),
                serde_yaml::Value::String("env:test".to_string())
            ]))
        );

        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_preserves_bursts() {
        let dir = tempdir().unwrap();
        let _test_file1 = copy_test_file_as("config1.yaml", "config1.yaml", dir.path()).await;
        let _test_file2 = copy_test_file_as("config1.yaml", "config2.yaml", dir.path()).await;

        let (sender, mut receiver) = mpsc::channel::<AutodiscoveryEvent>(1);
        let subscribers = Arc::new(Mutex::new(vec![sender]));
        let search_path = dir.path().to_path_buf();

        let scan = tokio::spawn(async move {
            let mut known_configs = HashSet::new();
            let mut configs = BTreeMap::new();

            scan_and_emit_events(&[search_path], &mut known_configs, &subscribers, &mut configs).await
        });

        let event1 = tokio::time::timeout(Duration::from_secs(1), receiver.recv())
            .await
            .unwrap()
            .unwrap();
        let event2 = tokio::time::timeout(Duration::from_secs(1), receiver.recv())
            .await
            .unwrap()
            .unwrap();

        scan.await.unwrap().unwrap();

        assert!(matches!(event1, AutodiscoveryEvent::CheckSchedule { .. }));
        assert!(matches!(event2, AutodiscoveryEvent::CheckSchedule { .. }));
        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_send_to_subscribers_fans_out_and_prunes_closed_receivers() {
        let (sender1, mut receiver1) = mpsc::channel::<AutodiscoveryEvent>(10);
        let (sender2, mut receiver2) = mpsc::channel::<AutodiscoveryEvent>(10);
        let (closed_sender, closed_receiver) = mpsc::channel::<AutodiscoveryEvent>(10);
        drop(closed_receiver);

        let subscribers = Arc::new(Mutex::new(vec![sender1, sender2, closed_sender]));
        let event = AutodiscoveryEvent::CheckSchedule {
            config: CheckConfig {
                name: MetaString::from("test-check"),
                init_config: Data::default(),
                instances: Vec::new(),
                source: MetaString::from_static("local"),
            },
        };

        send_to_subscribers(&subscribers, event).await;

        assert_eq!(subscribers.lock().await.len(), 2);
        assert!(matches!(
            receiver1.try_recv().unwrap(),
            AutodiscoveryEvent::CheckSchedule { .. }
        ));
        assert!(matches!(
            receiver2.try_recv().unwrap(),
            AutodiscoveryEvent::CheckSchedule { .. }
        ));
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_removed_config() {
        let dir = tempdir().unwrap();

        let mut known_configs = HashSet::new();
        known_configs.insert("removed-config".to_string());
        let mut configs = BTreeMap::new();
        configs.insert(
            "removed-config".to_string(),
            CheckConfig {
                name: MetaString::from("removed-config"),
                init_config: Data::default(),
                instances: Vec::new(),
                source: MetaString::from_static("local"),
            },
        );

        let (sender, mut receiver) = mpsc::channel::<AutodiscoveryEvent>(10);
        let subscribers = Arc::new(Mutex::new(vec![sender]));

        scan_and_emit_events(
            &[dir.path().to_path_buf()],
            &mut known_configs,
            &subscribers,
            &mut configs,
        )
        .await
        .unwrap();

        assert_eq!(known_configs.len(), 0);

        let event = receiver.try_recv().unwrap();
        assert!(matches!(event, AutodiscoveryEvent::CheckUnscheduled { config } if config.name == "removed-config"));

        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_check_dir() {
        let dir = tempdir().unwrap();
        copy_test_check_dir("test-check.d", dir.path()).await;

        let mut known_configs = HashSet::new();
        let mut configs = BTreeMap::new();
        let (sender, mut receiver) = mpsc::channel::<AutodiscoveryEvent>(10);
        let subscribers = Arc::new(Mutex::new(vec![sender]));

        scan_and_emit_events(
            &[dir.path().to_path_buf()],
            &mut known_configs,
            &subscribers,
            &mut configs,
        )
        .await
        .unwrap();

        // One config file inside test-check.d/
        assert_eq!(known_configs.len(), 1);

        let event = receiver.try_recv().unwrap();
        assert!(
            matches!(&event, AutodiscoveryEvent::CheckSchedule { config } if config.name == "test-check"),
            "expected check name 'test-check', got: {:?}",
            event
        );

        assert!(receiver.try_recv().is_err());
    }
}