saluki_env/autodiscovery/providers/
local.rs

1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::path::{Path, PathBuf};
4
5use async_trait::async_trait;
6use saluki_error::GenericError;
7use serde::Deserialize;
8use stringtheory::MetaString;
9use tokio::fs;
10use tokio::sync::broadcast::{self, Receiver, Sender};
11use tokio::sync::OnceCell;
12use tokio::time::{interval, Duration};
13use tracing::{debug, info, warn};
14
15use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider, CheckConfig, Config, Data};
16
/// Period, in seconds, between two background scans of the search paths.
const BG_MONITOR_INTERVAL: u64 = 30;
18
/// A local auto-discovery provider that uses the file system.
///
/// Check configurations are read from YAML files found directly inside the search paths, and
/// changes are published to subscribers as [`AutodiscoveryEvent`]s.
pub struct LocalAutodiscoveryProvider {
    /// Directories that are scanned for `.yaml` / `.yml` configuration files.
    search_paths: Vec<PathBuf>,
    /// Sending half of the broadcast channel on which events are published.
    sender: Sender<AutodiscoveryEvent>,
    /// Guards the start of the background monitor so it happens at most once.
    listener_init: OnceCell<()>,
}
25
26impl LocalAutodiscoveryProvider {
27    /// Creates a new `LocalAutodiscoveryProvider` that will monitor the specified paths.
28    pub fn new<P: AsRef<Path>>(paths: Vec<P>) -> Self {
29        let search_paths: Vec<PathBuf> = paths
30            .iter()
31            .filter_map(|p| {
32                if !p.as_ref().exists() {
33                    warn!("Skipping path '{}' as it does not exist", p.as_ref().display());
34                    return None;
35                }
36                if !p.as_ref().is_dir() {
37                    warn!("Skipping path '{}', it is not a directory.", p.as_ref().display());
38                    return None;
39                }
40                Some(p.as_ref().to_path_buf())
41            })
42            .collect();
43
44        let (sender, _) = broadcast::channel::<AutodiscoveryEvent>(super::AD_STREAM_CAPACITY);
45
46        Self {
47            search_paths,
48            sender,
49            listener_init: OnceCell::new(),
50        }
51    }
52
53    /// Starts a background task that periodically scans for configuration changes
54    async fn start_background_monitor(&self, interval_sec: u64) {
55        let mut interval = interval(Duration::from_secs(interval_sec));
56        let sender = self.sender.clone();
57        let search_paths = self.search_paths.clone();
58
59        info!(
60            "Scanning for local autodiscovery events every {} seconds.",
61            interval_sec
62        );
63
64        tokio::spawn(async move {
65            let mut known_configs = HashSet::new();
66            let mut configs = HashMap::new();
67            loop {
68                interval.tick().await;
69
70                // Scan for configurations and emit events for changes
71                if let Err(e) = scan_and_emit_events(&search_paths, &mut known_configs, &sender, &mut configs).await {
72                    warn!("Error scanning for configurations: {}", e);
73                }
74            }
75        });
76    }
77}
78
/// On-disk layout of a check configuration file, as deserialized from YAML.
///
/// NOTE(review): neither field is optional or has a serde default, so a file that omits
/// `init_config` or `instances` fails to deserialize — confirm this is intended.
#[derive(Debug, Deserialize)]
struct LocalCheckConfig {
    /// Contents of the top-level `init_config` mapping.
    init_config: HashMap<String, serde_yaml::Value>,
    /// Contents of the top-level `instances` sequence, one mapping per instance.
    instances: Vec<HashMap<String, serde_yaml::Value>>,
}
84
85/// Parse a YAML file into a Config object
86async fn parse_config_file(path: &PathBuf) -> Result<(String, CheckConfig), GenericError> {
87    let content = fs::read_to_string(path).await?;
88
89    let check_config: LocalCheckConfig = match serde_yaml::from_str(&content) {
90        Ok(read) => read,
91        Err(e) => {
92            return Err(GenericError::from(e).context("Failed to decode yaml as check configuration."));
93        }
94    };
95
96    let canonicalized_path = fs::canonicalize(&path).await?;
97
98    // Build config ID from the file path
99    let config_id = canonicalized_path.to_string_lossy().replace(['/', '\\'], "_");
100
101    let instances: Vec<Data> = check_config
102        .instances
103        .into_iter()
104        .map(|instance| {
105            let mut result = HashMap::new();
106            for (key, value) in instance {
107                result.insert(key.into(), value);
108            }
109            Data { value: result }
110        })
111        .collect();
112
113    let init_config = {
114        let mut result = HashMap::new();
115        for (key, value) in check_config.init_config {
116            result.insert(key.into(), value);
117        }
118        Data { value: result }
119    };
120
121    // Create a Config
122    let config = Config {
123        name: MetaString::from(path.file_stem().unwrap().to_string_lossy().to_string()),
124        init_config,
125        instances,
126        metric_config: Data::default(),
127        logs_config: Data::default(),
128        ad_identifiers: Vec::new(),
129        provider: MetaString::empty(),
130        service_id: MetaString::empty(),
131        tagger_entity: MetaString::empty(),
132        cluster_check: false,
133        node_name: MetaString::empty(),
134        source: MetaString::from_static("local"),
135        ignore_autodiscovery_tags: false,
136        metrics_excluded: false,
137        logs_excluded: false,
138        advanced_ad_identifiers: Vec::new(),
139    };
140
141    let check_config = CheckConfig::from(config);
142
143    Ok((config_id, check_config))
144}
145
146/// Scan and emit events based on configuration files in the directory
147async fn scan_and_emit_events(
148    paths: &[PathBuf], known_configs: &mut HashSet<String>, sender: &Sender<AutodiscoveryEvent>,
149    configs: &mut HashMap<String, CheckConfig>,
150) -> Result<(), GenericError> {
151    let mut found_configs = HashSet::new();
152
153    for path in paths {
154        let mut entries = fs::read_dir(path).await?;
155        while let Ok(Some(entry)) = entries.next_entry().await {
156            let path = entry.path();
157
158            // Only process YAML files
159            if let Some(ext) = path.extension() {
160                if ext == "yaml" || ext == "yml" {
161                    // Process the file if it's a valid configuration
162                    match parse_config_file(&path).await {
163                        Ok((config_id, config)) => {
164                            found_configs.insert(config_id.clone());
165
166                            // Check if this is a new or updated configuration
167                            if !known_configs.contains(&config_id) {
168                                debug!("New configuration found: {}", config_id);
169
170                                let event = AutodiscoveryEvent::CheckSchedule { config: config.clone() };
171                                let _ = sender.send(event);
172                                known_configs.insert(config_id.clone());
173                                configs.insert(config_id.clone(), config);
174                            } else {
175                                // Config ID exists, but the content might have changed
176                                let existing_config = configs.get(&config_id).unwrap();
177                                if *existing_config != config {
178                                    configs.insert(config_id.clone(), config.clone());
179                                    debug!("Configuration updated: {}", config_id);
180                                    let event = AutodiscoveryEvent::CheckSchedule { config };
181                                    let _ = sender.send(event);
182                                }
183                            }
184                        }
185                        Err(e) => {
186                            warn!("Failed to parse config file {}: {}", path.display(), e);
187                        }
188                    }
189                }
190            }
191        }
192    }
193
194    // Clean up removed configurations
195    let to_remove: Vec<String> = known_configs
196        .iter()
197        .filter(|config_id| !found_configs.contains(*config_id))
198        .cloned()
199        .collect();
200
201    for config_id in to_remove {
202        debug!("Configuration removed: {}", config_id);
203        known_configs.remove(&config_id);
204
205        let config = configs.remove(&config_id).unwrap();
206
207        // Create an unschedule Config event
208        let event = AutodiscoveryEvent::CheckUnscheduled { config };
209        let _ = sender.send(event);
210    }
211
212    Ok(())
213}
214
215#[async_trait]
216impl AutodiscoveryProvider for LocalAutodiscoveryProvider {
217    async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
218        self.listener_init
219            .get_or_init(|| async {
220                self.start_background_monitor(BG_MONITOR_INTERVAL).await;
221            })
222            .await;
223
224        Some(self.sender.subscribe())
225    }
226}
227
#[cfg(test)]
mod tests {
    use std::path::Path;

    use tempfile::tempdir;

    use super::*;

    // Get the path to the test_data directory
    fn test_data_path() -> PathBuf {
        let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_else(|_| ".".to_string());
        PathBuf::from(manifest_dir)
            .join("src")
            .join("autodiscovery")
            .join("providers")
            .join("test_data")
    }

    // Copy a file from test_data to the temp directory.
    //
    // `fs::copy` only returns once the copy has finished. Writing through `tokio::fs::File` and
    // dropping it without a flush can leave the write in flight while the scan reads the file.
    async fn copy_test_file(source_name: &str, temp_dir: &Path) -> PathBuf {
        let source_path = test_data_path().join(source_name);
        let target_path = temp_dir.join(source_name);

        fs::copy(&source_path, &target_path)
            .await
            .unwrap_or_else(|e| panic!("Failed to copy test file {:?}: {}", source_path, e));

        target_path
    }

    #[tokio::test]
    async fn test_parse_config_file() {
        let test_file = test_data_path().join("test-config.yaml");

        let (id, config) = parse_config_file(&test_file).await.unwrap();

        // The ID is the canonical path with separators replaced by underscores.
        assert!(id.contains("saluki-env_src_autodiscovery_providers_test_data_test-config.yaml"));
        assert_eq!(config.name, "test-config");
        assert_eq!(
            config.init_config.value.get("service"),
            Some(&serde_yaml::Value::String("test-service".to_string()))
        );
        assert_eq!(config.source, "local");
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_new_config() {
        let dir = tempdir().unwrap();
        let _test_file = copy_test_file("config1.yaml", dir.path()).await;

        let mut known_configs = HashSet::new();
        let mut configs = HashMap::new();
        let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);

        scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();

        assert_eq!(known_configs.len(), 1);

        // The scan emits `CheckSchedule`; matching on any other variant would skip the
        // assertions below.
        let config = match receiver.try_recv().unwrap() {
            AutodiscoveryEvent::CheckSchedule { config } => config,
            _ => panic!("expected a CheckSchedule event"),
        };

        // The check name is the file stem, i.e. without the `.yaml` extension.
        assert_eq!(config.name, "config1");

        // NOTE(review): the expectations below assume the `config1.yaml` fixture declares a
        // single instance with these `server`, `port` and `tags` values — confirm against the
        // fixture.
        assert_eq!(config.instances.len(), 1);
        assert_eq!(
            config.instances[0].value.get("server"),
            Some(&serde_yaml::Value::String("localhost".to_string()))
        );
        assert_eq!(
            config.instances[0].value.get("port"),
            Some(&serde_yaml::Value::Number(8080.into()))
        );
        assert_eq!(
            config.instances[0].value.get("tags"),
            Some(&serde_yaml::Value::Sequence(vec![
                serde_yaml::Value::String("test:true".to_string()),
                serde_yaml::Value::String("env:test".to_string())
            ]))
        );

        // Exactly one event is expected for a single new file.
        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_removed_config() {
        // An empty directory: the pre-seeded config below has no backing file.
        let dir = tempdir().unwrap();

        let mut known_configs = HashSet::new();
        known_configs.insert("removed-config".to_string());
        let mut configs = HashMap::new();
        configs.insert(
            "removed-config".to_string(),
            CheckConfig {
                name: MetaString::from("removed-config"),
                init_config: Data::default(),
                instances: Vec::new(),
                source: MetaString::from_static("local"),
            },
        );

        let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);

        scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();

        assert_eq!(known_configs.len(), 0);

        let event = receiver.try_recv().unwrap();
        assert!(matches!(event, AutodiscoveryEvent::CheckUnscheduled { config } if config.name == "removed-config"));

        assert!(receiver.try_recv().is_err());
    }
}