saluki_env/autodiscovery/providers/
local.rs1use std::collections::BTreeMap;
2use std::collections::HashSet;
3use std::path::{Path, PathBuf};
4
5use async_trait::async_trait;
6use saluki_error::GenericError;
7use serde::Deserialize;
8use stringtheory::MetaString;
9use tokio::fs;
10use tokio::sync::broadcast::{self, Receiver, Sender};
11use tokio::sync::OnceCell;
12use tokio::time::{interval, Duration};
13use tracing::{debug, info, warn};
14
15use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider, CheckConfig, Config, Data};
16
/// Number of seconds between two background scans of the search paths.
const BG_MONITOR_INTERVAL: u64 = 30;
18
19pub struct LocalAutodiscoveryProvider {
21 search_paths: Vec<PathBuf>,
22 sender: Sender<AutodiscoveryEvent>,
23 listener_init: OnceCell<()>,
24}
25
26impl LocalAutodiscoveryProvider {
27 pub fn new<P: AsRef<Path>>(paths: Vec<P>) -> Self {
29 let search_paths: Vec<PathBuf> = paths
30 .iter()
31 .filter_map(|p| {
32 if !p.as_ref().exists() {
33 warn!("Skipping path '{}' as it does not exist", p.as_ref().display());
34 return None;
35 }
36 if !p.as_ref().is_dir() {
37 warn!("Skipping path '{}', it is not a directory.", p.as_ref().display());
38 return None;
39 }
40 Some(p.as_ref().to_path_buf())
41 })
42 .collect();
43
44 let (sender, _) = broadcast::channel::<AutodiscoveryEvent>(super::AD_STREAM_CAPACITY);
45
46 Self {
47 search_paths,
48 sender,
49 listener_init: OnceCell::new(),
50 }
51 }
52
53 async fn start_background_monitor(&self, interval_sec: u64) {
55 let mut interval = interval(Duration::from_secs(interval_sec));
56 let sender = self.sender.clone();
57 let search_paths = self.search_paths.clone();
58
59 info!(
60 "Scanning for local autodiscovery events every {} seconds.",
61 interval_sec
62 );
63
64 tokio::spawn(async move {
65 let mut known_configs = HashSet::new();
66 let mut configs = BTreeMap::new();
67 loop {
68 interval.tick().await;
69
70 if let Err(e) = scan_and_emit_events(&search_paths, &mut known_configs, &sender, &mut configs).await {
72 warn!("Error scanning for configurations: {}", e);
73 }
74 }
75 });
76 }
77}
78
79#[derive(Debug, Deserialize)]
80struct LocalCheckConfig {
81 #[serde(default)]
82 init_config: BTreeMap<String, serde_yaml::Value>,
83 instances: Vec<BTreeMap<String, serde_yaml::Value>>,
84}
85
86async fn parse_config_file(path: &PathBuf, check_name: &str) -> Result<(String, CheckConfig), GenericError> {
88 let content = fs::read_to_string(path).await?;
89
90 let check_config: LocalCheckConfig = match serde_yaml::from_str(&content) {
91 Ok(read) => read,
92 Err(e) => {
93 return Err(GenericError::from(e).context("Failed to decode yaml as check configuration."));
94 }
95 };
96
97 let canonicalized_path = fs::canonicalize(&path).await?;
98
99 let config_id = canonicalized_path.to_string_lossy().replace(['/', '\\'], "_");
101
102 let instances: Vec<Data> = check_config
103 .instances
104 .into_iter()
105 .map(|instance| {
106 let mut result = BTreeMap::new();
107 for (key, value) in instance {
108 result.insert(key.into(), value);
109 }
110 Data { value: result }
111 })
112 .collect();
113
114 let init_config = {
115 let mut result = BTreeMap::new();
116 for (key, value) in check_config.init_config {
117 result.insert(key.into(), value);
118 }
119 Data { value: result }
120 };
121
122 let config = Config {
124 name: MetaString::from(check_name),
125 init_config,
126 instances,
127 metric_config: Data::default(),
128 logs_config: Data::default(),
129 ad_identifiers: Vec::new(),
130 provider: MetaString::empty(),
131 service_id: MetaString::empty(),
132 tagger_entity: MetaString::empty(),
133 cluster_check: false,
134 node_name: MetaString::empty(),
135 source: MetaString::from_static("local"),
136 ignore_autodiscovery_tags: false,
137 metrics_excluded: false,
138 logs_excluded: false,
139 advanced_ad_identifiers: Vec::new(),
140 };
141
142 let check_config = CheckConfig::from(config);
143
144 Ok((config_id, check_config))
145}
146
147async fn process_yaml_file(
150 path: PathBuf, check_name: &str, found_configs: &mut HashSet<String>, known_configs: &mut HashSet<String>,
151 sender: &Sender<AutodiscoveryEvent>, configs: &mut BTreeMap<String, CheckConfig>,
152) {
153 match parse_config_file(&path, check_name).await {
154 Ok((config_id, config)) => {
155 found_configs.insert(config_id.clone());
156
157 if !known_configs.contains(&config_id) {
158 debug!("New configuration found: {}", config_id);
159 let event = AutodiscoveryEvent::CheckSchedule { config: config.clone() };
160 let _ = sender.send(event);
161 known_configs.insert(config_id.clone());
162 configs.insert(config_id, config);
163 } else {
164 let existing_config = configs.get(&config_id).unwrap();
166 if *existing_config != config {
167 configs.insert(config_id.clone(), config.clone());
168 debug!("Configuration updated: {}", config_id);
169 let event = AutodiscoveryEvent::CheckSchedule { config };
170 let _ = sender.send(event);
171 }
172 }
173 }
174 Err(e) => {
175 warn!("Failed to parse config file {}: {}", path.display(), e);
176 }
177 }
178}
179
/// Returns `true` when `path` has the extension `yaml` or `yml` (exact, case-sensitive match).
fn is_yaml_file(path: &Path) -> bool {
    path.extension()
        .map_or(false, |ext| ext == "yaml" || ext == "yml")
}
184
185async fn scan_and_emit_events(
192 paths: &[PathBuf], known_configs: &mut HashSet<String>, sender: &Sender<AutodiscoveryEvent>,
193 configs: &mut BTreeMap<String, CheckConfig>,
194) -> Result<(), GenericError> {
195 let mut found_configs = HashSet::new();
196
197 for path in paths {
198 let mut entries = fs::read_dir(path).await?;
199 while let Ok(Some(entry)) = entries.next_entry().await {
200 let path = entry.path();
201
202 if is_yaml_file(&path) {
203 let check_name = path.file_stem().unwrap().to_string_lossy().into_owned();
205 process_yaml_file(path, &check_name, &mut found_configs, known_configs, sender, configs).await;
206 } else if path.is_dir() {
207 let dir_name = path.file_name().unwrap_or_default().to_string_lossy();
209 if let Some(check_name) = dir_name.strip_suffix(".d") {
210 let Ok(mut sub_entries) = fs::read_dir(&path).await else {
211 warn!("Failed to read directory {}", path.display());
212 continue;
213 };
214 while let Ok(Some(sub_entry)) = sub_entries.next_entry().await {
215 let sub_path = sub_entry.path();
216 if is_yaml_file(&sub_path) {
217 process_yaml_file(sub_path, check_name, &mut found_configs, known_configs, sender, configs)
218 .await;
219 }
220 }
221 }
222 }
223 }
224 }
225
226 let to_remove: Vec<String> = known_configs
228 .iter()
229 .filter(|config_id| !found_configs.contains(*config_id))
230 .cloned()
231 .collect();
232
233 for config_id in to_remove {
234 debug!("Configuration removed: {}", config_id);
235 known_configs.remove(&config_id);
236
237 let config = configs.remove(&config_id).unwrap();
238
239 let event = AutodiscoveryEvent::CheckUnscheduled { config };
241 let _ = sender.send(event);
242 }
243
244 Ok(())
245}
246
247#[async_trait]
248impl AutodiscoveryProvider for LocalAutodiscoveryProvider {
249 async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
250 self.listener_init
251 .get_or_init(|| async {
252 self.start_background_monitor(BG_MONITOR_INTERVAL).await;
253 })
254 .await;
255
256 Some(self.sender.subscribe())
257 }
258}
259
#[cfg(test)]
mod tests {
    use std::path::Path;

    use tempfile::tempdir;

    use super::*;

    /// Returns the directory holding the YAML fixtures used by these tests.
    ///
    /// Falls back to the current directory when `CARGO_MANIFEST_DIR` is not set.
    fn test_data_path() -> PathBuf {
        let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_else(|_| ".".to_string());
        PathBuf::from(manifest_dir)
            .join("src")
            .join("autodiscovery")
            .join("providers")
            .join("test_data")
    }

    /// Copies every entry of the fixture directory `check_dir_name` into a directory of the same
    /// name under `temp_dir`.
    async fn copy_test_check_dir(check_dir_name: &str, temp_dir: &Path) {
        let source_dir = test_data_path().join(check_dir_name);
        let target_dir = temp_dir.join(check_dir_name);
        fs::create_dir_all(&target_dir).await.unwrap();

        let mut entries = fs::read_dir(&source_dir).await.unwrap();
        while let Ok(Some(entry)) = entries.next_entry().await {
            fs::copy(entry.path(), target_dir.join(entry.file_name()))
                .await
                .unwrap();
        }
    }

    /// Copies the fixture file `source_name` into `temp_dir` and returns the path of the copy.
    async fn copy_test_file(source_name: &str, temp_dir: &Path) -> PathBuf {
        let source_path = test_data_path().join(source_name);
        let target_path = temp_dir.join(source_name);

        // `fs::copy` has finished writing by the time it returns, so the scan that follows sees
        // the complete file. Writing through `fs::File` would need an explicit flush for that.
        fs::copy(&source_path, &target_path)
            .await
            .unwrap_or_else(|e| panic!("Failed to copy test file {:?}: {}", source_path, e));

        target_path
    }

    #[tokio::test]
    async fn test_parse_config_file() {
        let test_file = test_data_path().join("test-config.yaml");

        let (id, config) = parse_config_file(&test_file, "test-config").await.unwrap();

        assert!(id.contains("saluki-env_src_autodiscovery_providers_test_data_test-config.yaml"));
        assert_eq!(config.name, "test-config");
        assert_eq!(
            config.init_config.value.get("service"),
            Some(&serde_yaml::Value::String("test-service".to_string()))
        );
        assert_eq!(config.source, "local");
    }

    #[tokio::test]
    async fn test_parse_minimal_config_file() {
        let test_file = test_data_path().join("test-minimal-config.yaml");

        let (_, config) = parse_config_file(&test_file, "test-minimal-config").await.unwrap();

        assert!(config.init_config.value.is_empty());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_new_config() {
        let dir = tempdir().unwrap();
        let _test_file = copy_test_file("config1.yaml", dir.path()).await;

        let mut known_configs = HashSet::new();
        let mut configs = BTreeMap::new();
        let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);

        scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();

        assert_eq!(known_configs.len(), 1);

        // Destructure the variant the scanner actually emits, so the assertions below always run.
        let config = match receiver.try_recv().unwrap() {
            AutodiscoveryEvent::CheckSchedule { config } => config,
            other => panic!("expected a CheckSchedule event, got: {:?}", other),
        };

        // The check name is the file stem, not the full file name.
        assert_eq!(config.name, "config1");
        assert_eq!(config.instances.len(), 1);
        assert_eq!(
            config.instances[0].value.get("server"),
            Some(&serde_yaml::Value::String("localhost".to_string()))
        );
        assert_eq!(
            config.instances[0].value.get("port"),
            Some(&serde_yaml::Value::Number(8080.into()))
        );
        assert_eq!(
            config.instances[0].value.get("tags"),
            Some(&serde_yaml::Value::Sequence(vec![
                serde_yaml::Value::String("test:true".to_string()),
                serde_yaml::Value::String("env:test".to_string())
            ]))
        );

        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_removed_config() {
        let dir = tempdir().unwrap();

        // Seed the scan state with a configuration that has no file in the (empty) directory.
        let mut known_configs = HashSet::new();
        known_configs.insert("removed-config".to_string());
        let mut configs = BTreeMap::new();
        configs.insert(
            "removed-config".to_string(),
            CheckConfig {
                name: MetaString::from("removed-config"),
                init_config: Data::default(),
                instances: Vec::new(),
                source: MetaString::from_static("local"),
            },
        );

        let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);

        scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();

        assert_eq!(known_configs.len(), 0);

        let event = receiver.try_recv().unwrap();
        assert!(matches!(event, AutodiscoveryEvent::CheckUnscheduled { config } if config.name == "removed-config"));

        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_check_dir() {
        let dir = tempdir().unwrap();
        copy_test_check_dir("test-check.d", dir.path()).await;

        let mut known_configs = HashSet::new();
        let mut configs = BTreeMap::new();
        let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);

        scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();

        assert_eq!(known_configs.len(), 1);

        // For a `<name>.d` directory the check name is the directory name without the suffix.
        let event = receiver.try_recv().unwrap();
        assert!(
            matches!(&event, AutodiscoveryEvent::CheckSchedule { config } if config.name == "test-check"),
            "expected check name 'test-check', got: {:?}",
            event
        );

        assert!(receiver.try_recv().is_err());
    }
}