Skip to main content

saluki_env/autodiscovery/providers/
local.rs

1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::path::{Path, PathBuf};
4
5use async_trait::async_trait;
6use saluki_error::GenericError;
7use serde::Deserialize;
8use stringtheory::MetaString;
9use tokio::fs;
10use tokio::sync::broadcast::{self, Receiver, Sender};
11use tokio::sync::OnceCell;
12use tokio::time::{interval, Duration};
13use tracing::{debug, info, warn};
14
15use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider, CheckConfig, Config, Data};
16
/// Period, in seconds, between background rescans of the search paths.
const BG_MONITOR_INTERVAL: u64 = 30;
18
19/// A local auto-discovery provider that uses the file system.
20pub struct LocalAutodiscoveryProvider {
21    search_paths: Vec<PathBuf>,
22    sender: Sender<AutodiscoveryEvent>,
23    listener_init: OnceCell<()>,
24}
25
26impl LocalAutodiscoveryProvider {
27    /// Creates a new `LocalAutodiscoveryProvider` that will monitor the specified paths.
28    pub fn new<P: AsRef<Path>>(paths: Vec<P>) -> Self {
29        let search_paths: Vec<PathBuf> = paths
30            .iter()
31            .filter_map(|p| {
32                if !p.as_ref().exists() {
33                    warn!("Skipping path '{}' as it does not exist", p.as_ref().display());
34                    return None;
35                }
36                if !p.as_ref().is_dir() {
37                    warn!("Skipping path '{}', it is not a directory.", p.as_ref().display());
38                    return None;
39                }
40                Some(p.as_ref().to_path_buf())
41            })
42            .collect();
43
44        let (sender, _) = broadcast::channel::<AutodiscoveryEvent>(super::AD_STREAM_CAPACITY);
45
46        Self {
47            search_paths,
48            sender,
49            listener_init: OnceCell::new(),
50        }
51    }
52
53    /// Starts a background task that periodically scans for configuration changes
54    async fn start_background_monitor(&self, interval_sec: u64) {
55        let mut interval = interval(Duration::from_secs(interval_sec));
56        let sender = self.sender.clone();
57        let search_paths = self.search_paths.clone();
58
59        info!(
60            "Scanning for local autodiscovery events every {} seconds.",
61            interval_sec
62        );
63
64        tokio::spawn(async move {
65            let mut known_configs = HashSet::new();
66            let mut configs = HashMap::new();
67            loop {
68                interval.tick().await;
69
70                // Scan for configurations and emit events for changes
71                if let Err(e) = scan_and_emit_events(&search_paths, &mut known_configs, &sender, &mut configs).await {
72                    warn!("Error scanning for configurations: {}", e);
73                }
74            }
75        });
76    }
77}
78
79#[derive(Debug, Deserialize)]
80struct LocalCheckConfig {
81    #[serde(default)]
82    init_config: HashMap<String, serde_yaml::Value>,
83    instances: Vec<HashMap<String, serde_yaml::Value>>,
84}
85
86/// Parse a YAML file into a Config object
87async fn parse_config_file(path: &PathBuf) -> Result<(String, CheckConfig), GenericError> {
88    let content = fs::read_to_string(path).await?;
89
90    let check_config: LocalCheckConfig = match serde_yaml::from_str(&content) {
91        Ok(read) => read,
92        Err(e) => {
93            return Err(GenericError::from(e).context("Failed to decode yaml as check configuration."));
94        }
95    };
96
97    let canonicalized_path = fs::canonicalize(&path).await?;
98
99    // Build config ID from the file path
100    let config_id = canonicalized_path.to_string_lossy().replace(['/', '\\'], "_");
101
102    let instances: Vec<Data> = check_config
103        .instances
104        .into_iter()
105        .map(|instance| {
106            let mut result = HashMap::new();
107            for (key, value) in instance {
108                result.insert(key.into(), value);
109            }
110            Data { value: result }
111        })
112        .collect();
113
114    let init_config = {
115        let mut result = HashMap::new();
116        for (key, value) in check_config.init_config {
117            result.insert(key.into(), value);
118        }
119        Data { value: result }
120    };
121
122    // Create a Config
123    let config = Config {
124        name: MetaString::from(path.file_stem().unwrap().to_string_lossy().to_string()),
125        init_config,
126        instances,
127        metric_config: Data::default(),
128        logs_config: Data::default(),
129        ad_identifiers: Vec::new(),
130        provider: MetaString::empty(),
131        service_id: MetaString::empty(),
132        tagger_entity: MetaString::empty(),
133        cluster_check: false,
134        node_name: MetaString::empty(),
135        source: MetaString::from_static("local"),
136        ignore_autodiscovery_tags: false,
137        metrics_excluded: false,
138        logs_excluded: false,
139        advanced_ad_identifiers: Vec::new(),
140    };
141
142    let check_config = CheckConfig::from(config);
143
144    Ok((config_id, check_config))
145}
146
147/// Scan and emit events based on configuration files in the directory
148async fn scan_and_emit_events(
149    paths: &[PathBuf], known_configs: &mut HashSet<String>, sender: &Sender<AutodiscoveryEvent>,
150    configs: &mut HashMap<String, CheckConfig>,
151) -> Result<(), GenericError> {
152    let mut found_configs = HashSet::new();
153
154    for path in paths {
155        let mut entries = fs::read_dir(path).await?;
156        while let Ok(Some(entry)) = entries.next_entry().await {
157            let path = entry.path();
158
159            // Only process YAML files
160            if let Some(ext) = path.extension() {
161                if ext == "yaml" || ext == "yml" {
162                    // Process the file if it's a valid configuration
163                    match parse_config_file(&path).await {
164                        Ok((config_id, config)) => {
165                            found_configs.insert(config_id.clone());
166
167                            // Check if this is a new or updated configuration
168                            if !known_configs.contains(&config_id) {
169                                debug!("New configuration found: {}", config_id);
170
171                                let event = AutodiscoveryEvent::CheckSchedule { config: config.clone() };
172                                let _ = sender.send(event);
173                                known_configs.insert(config_id.clone());
174                                configs.insert(config_id.clone(), config);
175                            } else {
176                                // Config ID exists, but the content might have changed
177                                let existing_config = configs.get(&config_id).unwrap();
178                                if *existing_config != config {
179                                    configs.insert(config_id.clone(), config.clone());
180                                    debug!("Configuration updated: {}", config_id);
181                                    let event = AutodiscoveryEvent::CheckSchedule { config };
182                                    let _ = sender.send(event);
183                                }
184                            }
185                        }
186                        Err(e) => {
187                            warn!("Failed to parse config file {}: {}", path.display(), e);
188                        }
189                    }
190                }
191            }
192        }
193    }
194
195    // Clean up removed configurations
196    let to_remove: Vec<String> = known_configs
197        .iter()
198        .filter(|config_id| !found_configs.contains(*config_id))
199        .cloned()
200        .collect();
201
202    for config_id in to_remove {
203        debug!("Configuration removed: {}", config_id);
204        known_configs.remove(&config_id);
205
206        let config = configs.remove(&config_id).unwrap();
207
208        // Create an unschedule Config event
209        let event = AutodiscoveryEvent::CheckUnscheduled { config };
210        let _ = sender.send(event);
211    }
212
213    Ok(())
214}
215
216#[async_trait]
217impl AutodiscoveryProvider for LocalAutodiscoveryProvider {
218    async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
219        self.listener_init
220            .get_or_init(|| async {
221                self.start_background_monitor(BG_MONITOR_INTERVAL).await;
222            })
223            .await;
224
225        Some(self.sender.subscribe())
226    }
227}
228
#[cfg(test)]
mod tests {
    use tempfile::tempdir;
    use tokio::io::AsyncWriteExt;

    use super::*;

    // Get the path to the test_data directory
    fn test_data_path() -> PathBuf {
        let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_else(|_| ".".to_string());
        PathBuf::from(manifest_dir)
            .join("src")
            .join("autodiscovery")
            .join("providers")
            .join("test_data")
    }

    // Copy a file from test_data to the temp directory
    async fn copy_test_file(source_name: &str, temp_dir: &Path) -> PathBuf {
        let source_path = test_data_path().join(source_name);
        let target_path = temp_dir.join(source_name);

        let content = fs::read_to_string(&source_path)
            .await
            .unwrap_or_else(|_| panic!("Failed to read test file: {:?}", source_path));

        let mut file = fs::File::create(&target_path).await.unwrap();
        file.write_all(content.as_bytes()).await.unwrap();

        target_path
    }

    #[tokio::test]
    async fn test_parse_config_file() {
        let test_file = test_data_path().join("test-config.yaml");

        let (id, config) = parse_config_file(&test_file).await.unwrap();

        assert!(id.contains("saluki-env_src_autodiscovery_providers_test_data_test-config.yaml"));
        assert_eq!(config.name, "test-config");
        assert_eq!(
            config.init_config.value.get("service"),
            Some(&serde_yaml::Value::String("test-service".to_string()))
        );
        assert_eq!(config.source, "local");
    }

    #[tokio::test]
    async fn test_parse_minimal_config_file() {
        let test_file = test_data_path().join("test-minimal-config.yaml");

        let (_, config) = parse_config_file(&test_file).await.unwrap();

        // Parsing a config without `init_config` yields an empty hash map for that field.
        assert!(config.init_config.value.is_empty());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_new_config() {
        let dir = tempdir().unwrap();
        let _test_file = copy_test_file("config1.yaml", dir.path()).await;

        let mut known_configs = HashSet::new();
        let mut configs = HashMap::new();
        let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);

        scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();

        assert_eq!(known_configs.len(), 1);

        // Match the variant that is actually emitted so these assertions really run.
        match receiver.try_recv().unwrap() {
            AutodiscoveryEvent::CheckSchedule { config } => {
                // The config name is the file stem, not the full file name.
                assert_eq!(config.name, "config1");
                assert_eq!(config.instances.len(), 1);
                assert_eq!(
                    config.instances[0].value.get("server"),
                    Some(&serde_yaml::Value::String("localhost".to_string()))
                );
                assert_eq!(
                    config.instances[0].value.get("port"),
                    Some(&serde_yaml::Value::Number(8080.into()))
                );
                assert_eq!(
                    config.instances[0].value.get("tags"),
                    Some(&serde_yaml::Value::Sequence(vec![
                        serde_yaml::Value::String("test:true".to_string()),
                        serde_yaml::Value::String("env:test".to_string())
                    ]))
                );
            }
            _ => panic!("expected a CheckSchedule event"),
        }
        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_removed_config() {
        let dir = tempdir().unwrap();

        let mut known_configs = HashSet::new();
        known_configs.insert("removed-config".to_string());
        let mut configs = HashMap::new();
        configs.insert(
            "removed-config".to_string(),
            CheckConfig {
                name: MetaString::from("removed-config"),
                init_config: Data::default(),
                instances: Vec::new(),
                source: MetaString::from_static("local"),
            },
        );

        let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);

        scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();

        assert_eq!(known_configs.len(), 0);

        let event = receiver.try_recv().unwrap();
        assert!(matches!(event, AutodiscoveryEvent::CheckUnscheduled { config } if config.name == "removed-config"));

        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_updated_config() {
        let dir = tempdir().unwrap();
        let config_path = dir.path().join("updated.yaml");
        fs::write(&config_path, "instances:\n  - port: 1\n").await.unwrap();

        let search_paths = [dir.path().to_path_buf()];
        let mut known_configs = HashSet::new();
        let mut configs = HashMap::new();
        let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);

        // First scan: the config is new and gets scheduled.
        scan_and_emit_events(&search_paths, &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();
        assert!(matches!(
            receiver.try_recv().unwrap(),
            AutodiscoveryEvent::CheckSchedule { .. }
        ));

        // Second scan with unchanged contents: nothing is emitted.
        scan_and_emit_events(&search_paths, &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();
        assert!(receiver.try_recv().is_err());

        // Third scan after editing the file: the new contents are scheduled again.
        fs::write(&config_path, "instances:\n  - port: 2\n").await.unwrap();
        scan_and_emit_events(&search_paths, &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();

        match receiver.try_recv().unwrap() {
            AutodiscoveryEvent::CheckSchedule { config } => {
                assert_eq!(config.name, "updated");
                assert_eq!(
                    config.instances[0].value.get("port"),
                    Some(&serde_yaml::Value::Number(2.into()))
                );
            }
            _ => panic!("expected a CheckSchedule event"),
        }

        assert_eq!(known_configs.len(), 1);
        assert!(receiver.try_recv().is_err());
    }
}