Skip to main content

saluki_env/autodiscovery/providers/
local.rs

1use std::collections::BTreeMap;
2use std::collections::HashSet;
3use std::path::{Path, PathBuf};
4
5use async_trait::async_trait;
6use saluki_error::GenericError;
7use serde::Deserialize;
8use stringtheory::MetaString;
9use tokio::fs;
10use tokio::sync::broadcast::{self, Receiver, Sender};
11use tokio::sync::OnceCell;
12use tokio::time::{interval, Duration};
13use tracing::{debug, info, warn};
14
15use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider, CheckConfig, Config, Data};
16
/// Interval, in seconds, between two background scans of the search paths.
const BG_MONITOR_INTERVAL: u64 = 30;
18
/// A local auto-discovery provider that uses the file system.
///
/// Check configurations are discovered by a background task that periodically scans the search
/// paths; changes are published to subscribers as [`AutodiscoveryEvent`]s.
pub struct LocalAutodiscoveryProvider {
    /// Directories that are scanned for check configurations.
    search_paths: Vec<PathBuf>,
    /// Broadcast channel on which discovered changes are published.
    sender: Sender<AutodiscoveryEvent>,
    /// Guards the one-time start of the background monitor task.
    listener_init: OnceCell<()>,
}
25
26impl LocalAutodiscoveryProvider {
27    /// Creates a new `LocalAutodiscoveryProvider` that will monitor the specified paths.
28    pub fn new<P: AsRef<Path>>(paths: Vec<P>) -> Self {
29        let search_paths: Vec<PathBuf> = paths
30            .iter()
31            .filter_map(|p| {
32                if !p.as_ref().exists() {
33                    warn!("Skipping path '{}' as it does not exist", p.as_ref().display());
34                    return None;
35                }
36                if !p.as_ref().is_dir() {
37                    warn!("Skipping path '{}', it is not a directory.", p.as_ref().display());
38                    return None;
39                }
40                Some(p.as_ref().to_path_buf())
41            })
42            .collect();
43
44        let (sender, _) = broadcast::channel::<AutodiscoveryEvent>(super::AD_STREAM_CAPACITY);
45
46        Self {
47            search_paths,
48            sender,
49            listener_init: OnceCell::new(),
50        }
51    }
52
53    /// Starts a background task that periodically scans for configuration changes
54    async fn start_background_monitor(&self, interval_sec: u64) {
55        let mut interval = interval(Duration::from_secs(interval_sec));
56        let sender = self.sender.clone();
57        let search_paths = self.search_paths.clone();
58
59        info!(
60            "Scanning for local autodiscovery events every {} seconds.",
61            interval_sec
62        );
63
64        tokio::spawn(async move {
65            let mut known_configs = HashSet::new();
66            let mut configs = BTreeMap::new();
67            loop {
68                interval.tick().await;
69
70                // Scan for configurations and emit events for changes
71                if let Err(e) = scan_and_emit_events(&search_paths, &mut known_configs, &sender, &mut configs).await {
72                    warn!("Error scanning for configurations: {}", e);
73                }
74            }
75        });
76    }
77}
78
/// Shape of a check configuration file as it is deserialized from YAML.
#[derive(Debug, Deserialize)]
struct LocalCheckConfig {
    /// Check-wide settings; deserializes to an empty map when the key is absent.
    #[serde(default)]
    init_config: BTreeMap<String, serde_yaml::Value>,
    /// Per-instance settings; has no serde default, so the key must be present.
    instances: Vec<BTreeMap<String, serde_yaml::Value>>,
}
85
86/// Parse a YAML file into a Config object.
87async fn parse_config_file(path: &PathBuf, check_name: &str) -> Result<(String, CheckConfig), GenericError> {
88    let content = fs::read_to_string(path).await?;
89
90    let check_config: LocalCheckConfig = match serde_yaml::from_str(&content) {
91        Ok(read) => read,
92        Err(e) => {
93            return Err(GenericError::from(e).context("Failed to decode yaml as check configuration."));
94        }
95    };
96
97    let canonicalized_path = fs::canonicalize(&path).await?;
98
99    // Build config ID from the file path
100    let config_id = canonicalized_path.to_string_lossy().replace(['/', '\\'], "_");
101
102    let instances: Vec<Data> = check_config
103        .instances
104        .into_iter()
105        .map(|instance| {
106            let mut result = BTreeMap::new();
107            for (key, value) in instance {
108                result.insert(key.into(), value);
109            }
110            Data { value: result }
111        })
112        .collect();
113
114    let init_config = {
115        let mut result = BTreeMap::new();
116        for (key, value) in check_config.init_config {
117            result.insert(key.into(), value);
118        }
119        Data { value: result }
120    };
121
122    // Create a Config
123    let config = Config {
124        name: MetaString::from(check_name),
125        init_config,
126        instances,
127        metric_config: Data::default(),
128        logs_config: Data::default(),
129        ad_identifiers: Vec::new(),
130        provider: MetaString::empty(),
131        service_id: MetaString::empty(),
132        tagger_entity: MetaString::empty(),
133        cluster_check: false,
134        node_name: MetaString::empty(),
135        source: MetaString::from_static("local"),
136        ignore_autodiscovery_tags: false,
137        metrics_excluded: false,
138        logs_excluded: false,
139        advanced_ad_identifiers: Vec::new(),
140    };
141
142    let check_config = CheckConfig::from(config);
143
144    Ok((config_id, check_config))
145}
146
147/// Process a single YAML config file and record it in the tracking structures, emitting events as
148/// needed.
149async fn process_yaml_file(
150    path: PathBuf, check_name: &str, found_configs: &mut HashSet<String>, known_configs: &mut HashSet<String>,
151    sender: &Sender<AutodiscoveryEvent>, configs: &mut BTreeMap<String, CheckConfig>,
152) {
153    match parse_config_file(&path, check_name).await {
154        Ok((config_id, config)) => {
155            found_configs.insert(config_id.clone());
156
157            if !known_configs.contains(&config_id) {
158                debug!("New configuration found: {}", config_id);
159                let event = AutodiscoveryEvent::CheckSchedule { config: config.clone() };
160                let _ = sender.send(event);
161                known_configs.insert(config_id.clone());
162                configs.insert(config_id, config);
163            } else {
164                // Config ID exists, but the content might have changed
165                let existing_config = configs.get(&config_id).unwrap();
166                if *existing_config != config {
167                    configs.insert(config_id.clone(), config.clone());
168                    debug!("Configuration updated: {}", config_id);
169                    let event = AutodiscoveryEvent::CheckSchedule { config };
170                    let _ = sender.send(event);
171                }
172            }
173        }
174        Err(e) => {
175            warn!("Failed to parse config file {}: {}", path.display(), e);
176        }
177    }
178}
179
/// Reports whether `path` ends in a `yaml` or `yml` extension.
///
/// The comparison is case-sensitive, and paths without an extension (or with one that is not
/// valid UTF-8) are rejected.
fn is_yaml_file(path: &Path) -> bool {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some(ext) => ext == "yaml" || ext == "yml",
        None => false,
    }
}
184
185/// Scan and emit events based on configuration files in the directory.
186///
187/// Supports two layouts:
188/// - `<search-path>/<check-name>.yaml` — flat YAML files.
189/// - `<search-path>/<check-name>.d/*.yaml` — directory-based configs; the check name is derived
190///   from the directory stem (the `.d` suffix is stripped).
191async fn scan_and_emit_events(
192    paths: &[PathBuf], known_configs: &mut HashSet<String>, sender: &Sender<AutodiscoveryEvent>,
193    configs: &mut BTreeMap<String, CheckConfig>,
194) -> Result<(), GenericError> {
195    let mut found_configs = HashSet::new();
196
197    for path in paths {
198        let mut entries = fs::read_dir(path).await?;
199        while let Ok(Some(entry)) = entries.next_entry().await {
200            let path = entry.path();
201
202            if is_yaml_file(&path) {
203                // Flat YAML file: <check-name>.yaml
204                let check_name = path.file_stem().unwrap().to_string_lossy().into_owned();
205                process_yaml_file(path, &check_name, &mut found_configs, known_configs, sender, configs).await;
206            } else if path.is_dir() {
207                // Directory: only process if it matches the `<check-name>.d` convention
208                let dir_name = path.file_name().unwrap_or_default().to_string_lossy();
209                if let Some(check_name) = dir_name.strip_suffix(".d") {
210                    let Ok(mut sub_entries) = fs::read_dir(&path).await else {
211                        warn!("Failed to read directory {}", path.display());
212                        continue;
213                    };
214                    while let Ok(Some(sub_entry)) = sub_entries.next_entry().await {
215                        let sub_path = sub_entry.path();
216                        if is_yaml_file(&sub_path) {
217                            process_yaml_file(sub_path, check_name, &mut found_configs, known_configs, sender, configs)
218                                .await;
219                        }
220                    }
221                }
222            }
223        }
224    }
225
226    // Clean up removed configurations
227    let to_remove: Vec<String> = known_configs
228        .iter()
229        .filter(|config_id| !found_configs.contains(*config_id))
230        .cloned()
231        .collect();
232
233    for config_id in to_remove {
234        debug!("Configuration removed: {}", config_id);
235        known_configs.remove(&config_id);
236
237        let config = configs.remove(&config_id).unwrap();
238
239        // Create an unschedule Config event
240        let event = AutodiscoveryEvent::CheckUnscheduled { config };
241        let _ = sender.send(event);
242    }
243
244    Ok(())
245}
246
247#[async_trait]
248impl AutodiscoveryProvider for LocalAutodiscoveryProvider {
249    async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
250        self.listener_init
251            .get_or_init(|| async {
252                self.start_background_monitor(BG_MONITOR_INTERVAL).await;
253            })
254            .await;
255
256        Some(self.sender.subscribe())
257    }
258}
259
#[cfg(test)]
mod tests {
    use std::path::Path;

    use tempfile::tempdir;
    use tokio::io::AsyncWriteExt;

    use super::*;

    /// Returns the path to the `test_data` fixture directory of this module.
    ///
    /// Falls back to the current directory as the crate root when `CARGO_MANIFEST_DIR` is unset.
    fn test_data_path() -> PathBuf {
        let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_else(|_| ".".to_string());
        PathBuf::from(manifest_dir)
            .join("src")
            .join("autodiscovery")
            .join("providers")
            .join("test_data")
    }

    /// Copies a `<check>.d` directory from `test_data` into `temp_dir`, preserving the structure.
    async fn copy_test_check_dir(check_dir_name: &str, temp_dir: &Path) {
        let source_dir = test_data_path().join(check_dir_name);
        let target_dir = temp_dir.join(check_dir_name);
        fs::create_dir_all(&target_dir).await.unwrap();

        let mut entries = fs::read_dir(&source_dir).await.unwrap();
        while let Ok(Some(entry)) = entries.next_entry().await {
            fs::copy(entry.path(), target_dir.join(entry.file_name()))
                .await
                .unwrap();
        }
    }

    /// Copies a single file from `test_data` into `temp_dir` and returns the path of the copy.
    async fn copy_test_file(source_name: &str, temp_dir: &Path) -> PathBuf {
        let source_path = test_data_path().join(source_name);
        let target_path = temp_dir.join(source_name);

        let content = fs::read_to_string(&source_path)
            .await
            .unwrap_or_else(|_| panic!("Failed to read test file: {:?}", source_path));

        let mut file = fs::File::create(&target_path).await.unwrap();
        file.write_all(content.as_bytes()).await.unwrap();

        target_path
    }

    #[tokio::test]
    async fn test_parse_config_file() {
        let test_file = test_data_path().join("test-config.yaml");

        let (id, config) = parse_config_file(&test_file, "test-config").await.unwrap();

        assert!(id.contains("saluki-env_src_autodiscovery_providers_test_data_test-config.yaml"));
        assert_eq!(config.name, "test-config");
        assert_eq!(
            config.init_config.value.get("service"),
            Some(&serde_yaml::Value::String("test-service".to_string()))
        );
        assert_eq!(config.source, "local");
    }

    #[tokio::test]
    async fn test_parse_minimal_config_file() {
        let test_file = test_data_path().join("test-minimal-config.yaml");

        let (_, config) = parse_config_file(&test_file, "test-minimal-config").await.unwrap();

        // Parsing a config without `init_config` yields an empty map for that field.
        assert!(config.init_config.value.is_empty());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_new_config() {
        let dir = tempdir().unwrap();
        let _test_file = copy_test_file("config1.yaml", dir.path()).await;

        let mut known_configs = HashSet::new();
        let mut configs = BTreeMap::new();
        let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);

        scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();

        assert_eq!(known_configs.len(), 1);

        // Destructure the variant that is actually emitted so the assertions below always run.
        let config = match receiver.try_recv().unwrap() {
            AutodiscoveryEvent::CheckSchedule { config } => config,
            other => panic!("expected a CheckSchedule event, got: {:?}", other),
        };

        // The check name of a flat file is its stem, not the full file name.
        assert_eq!(config.name, "config1");

        // NOTE(review): these expectations were previously unreachable; confirm that they match
        // the `config1.yaml` fixture.
        assert_eq!(config.instances.len(), 1);
        assert_eq!(
            config.instances[0].value.get("server"),
            Some(&serde_yaml::Value::String("localhost".to_string()))
        );
        assert_eq!(
            config.instances[0].value.get("port"),
            Some(&serde_yaml::Value::Number(8080.into()))
        );
        assert_eq!(
            config.instances[0].value.get("tags"),
            Some(&serde_yaml::Value::Sequence(vec![
                serde_yaml::Value::String("test:true".to_string()),
                serde_yaml::Value::String("env:test".to_string())
            ]))
        );

        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_removed_config() {
        let dir = tempdir().unwrap();

        // Seed the tracking state with a config that has no backing file in the scanned directory.
        let mut known_configs = HashSet::new();
        known_configs.insert("removed-config".to_string());
        let mut configs = BTreeMap::new();
        configs.insert(
            "removed-config".to_string(),
            CheckConfig {
                name: MetaString::from("removed-config"),
                init_config: Data::default(),
                instances: Vec::new(),
                source: MetaString::from_static("local"),
            },
        );

        let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);

        scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();

        assert_eq!(known_configs.len(), 0);

        let event = receiver.try_recv().unwrap();
        assert!(matches!(event, AutodiscoveryEvent::CheckUnscheduled { config } if config.name == "removed-config"));

        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_check_dir() {
        let dir = tempdir().unwrap();
        copy_test_check_dir("test-check.d", dir.path()).await;

        let mut known_configs = HashSet::new();
        let mut configs = BTreeMap::new();
        let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);

        scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();

        // One config file inside test-check.d/
        assert_eq!(known_configs.len(), 1);

        let event = receiver.try_recv().unwrap();
        assert!(
            matches!(&event, AutodiscoveryEvent::CheckSchedule { config } if config.name == "test-check"),
            "expected check name 'test-check', got: {:?}",
            event
        );

        assert!(receiver.try_recv().is_err());
    }
}
429}