saluki_env/autodiscovery/providers/
local.rs1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::path::{Path, PathBuf};
4
5use async_trait::async_trait;
6use saluki_error::GenericError;
7use serde::Deserialize;
8use stringtheory::MetaString;
9use tokio::fs;
10use tokio::sync::broadcast::{self, Receiver, Sender};
11use tokio::sync::OnceCell;
12use tokio::time::{interval, Duration};
13use tracing::{debug, info, warn};
14
15use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider, CheckConfig, Config, Data};
16
17const BG_MONITOR_INTERVAL: u64 = 30;
18
19pub struct LocalAutodiscoveryProvider {
21 search_paths: Vec<PathBuf>,
22 sender: Sender<AutodiscoveryEvent>,
23 listener_init: OnceCell<()>,
24}
25
26impl LocalAutodiscoveryProvider {
27 pub fn new<P: AsRef<Path>>(paths: Vec<P>) -> Self {
29 let search_paths: Vec<PathBuf> = paths
30 .iter()
31 .filter_map(|p| {
32 if !p.as_ref().exists() {
33 warn!("Skipping path '{}' as it does not exist", p.as_ref().display());
34 return None;
35 }
36 if !p.as_ref().is_dir() {
37 warn!("Skipping path '{}', it is not a directory.", p.as_ref().display());
38 return None;
39 }
40 Some(p.as_ref().to_path_buf())
41 })
42 .collect();
43
44 let (sender, _) = broadcast::channel::<AutodiscoveryEvent>(super::AD_STREAM_CAPACITY);
45
46 Self {
47 search_paths,
48 sender,
49 listener_init: OnceCell::new(),
50 }
51 }
52
53 async fn start_background_monitor(&self, interval_sec: u64) {
55 let mut interval = interval(Duration::from_secs(interval_sec));
56 let sender = self.sender.clone();
57 let search_paths = self.search_paths.clone();
58
59 info!(
60 "Scanning for local autodiscovery events every {} seconds.",
61 interval_sec
62 );
63
64 tokio::spawn(async move {
65 let mut known_configs = HashSet::new();
66 let mut configs = HashMap::new();
67 loop {
68 interval.tick().await;
69
70 if let Err(e) = scan_and_emit_events(&search_paths, &mut known_configs, &sender, &mut configs).await {
72 warn!("Error scanning for configurations: {}", e);
73 }
74 }
75 });
76 }
77}
78
79#[derive(Debug, Deserialize)]
80struct LocalCheckConfig {
81 #[serde(default)]
82 init_config: HashMap<String, serde_yaml::Value>,
83 instances: Vec<HashMap<String, serde_yaml::Value>>,
84}
85
86async fn parse_config_file(path: &PathBuf) -> Result<(String, CheckConfig), GenericError> {
88 let content = fs::read_to_string(path).await?;
89
90 let check_config: LocalCheckConfig = match serde_yaml::from_str(&content) {
91 Ok(read) => read,
92 Err(e) => {
93 return Err(GenericError::from(e).context("Failed to decode yaml as check configuration."));
94 }
95 };
96
97 let canonicalized_path = fs::canonicalize(&path).await?;
98
99 let config_id = canonicalized_path.to_string_lossy().replace(['/', '\\'], "_");
101
102 let instances: Vec<Data> = check_config
103 .instances
104 .into_iter()
105 .map(|instance| {
106 let mut result = HashMap::new();
107 for (key, value) in instance {
108 result.insert(key.into(), value);
109 }
110 Data { value: result }
111 })
112 .collect();
113
114 let init_config = {
115 let mut result = HashMap::new();
116 for (key, value) in check_config.init_config {
117 result.insert(key.into(), value);
118 }
119 Data { value: result }
120 };
121
122 let config = Config {
124 name: MetaString::from(path.file_stem().unwrap().to_string_lossy().to_string()),
125 init_config,
126 instances,
127 metric_config: Data::default(),
128 logs_config: Data::default(),
129 ad_identifiers: Vec::new(),
130 provider: MetaString::empty(),
131 service_id: MetaString::empty(),
132 tagger_entity: MetaString::empty(),
133 cluster_check: false,
134 node_name: MetaString::empty(),
135 source: MetaString::from_static("local"),
136 ignore_autodiscovery_tags: false,
137 metrics_excluded: false,
138 logs_excluded: false,
139 advanced_ad_identifiers: Vec::new(),
140 };
141
142 let check_config = CheckConfig::from(config);
143
144 Ok((config_id, check_config))
145}
146
147async fn scan_and_emit_events(
149 paths: &[PathBuf], known_configs: &mut HashSet<String>, sender: &Sender<AutodiscoveryEvent>,
150 configs: &mut HashMap<String, CheckConfig>,
151) -> Result<(), GenericError> {
152 let mut found_configs = HashSet::new();
153
154 for path in paths {
155 let mut entries = fs::read_dir(path).await?;
156 while let Ok(Some(entry)) = entries.next_entry().await {
157 let path = entry.path();
158
159 if let Some(ext) = path.extension() {
161 if ext == "yaml" || ext == "yml" {
162 match parse_config_file(&path).await {
164 Ok((config_id, config)) => {
165 found_configs.insert(config_id.clone());
166
167 if !known_configs.contains(&config_id) {
169 debug!("New configuration found: {}", config_id);
170
171 let event = AutodiscoveryEvent::CheckSchedule { config: config.clone() };
172 let _ = sender.send(event);
173 known_configs.insert(config_id.clone());
174 configs.insert(config_id.clone(), config);
175 } else {
176 let existing_config = configs.get(&config_id).unwrap();
178 if *existing_config != config {
179 configs.insert(config_id.clone(), config.clone());
180 debug!("Configuration updated: {}", config_id);
181 let event = AutodiscoveryEvent::CheckSchedule { config };
182 let _ = sender.send(event);
183 }
184 }
185 }
186 Err(e) => {
187 warn!("Failed to parse config file {}: {}", path.display(), e);
188 }
189 }
190 }
191 }
192 }
193 }
194
195 let to_remove: Vec<String> = known_configs
197 .iter()
198 .filter(|config_id| !found_configs.contains(*config_id))
199 .cloned()
200 .collect();
201
202 for config_id in to_remove {
203 debug!("Configuration removed: {}", config_id);
204 known_configs.remove(&config_id);
205
206 let config = configs.remove(&config_id).unwrap();
207
208 let event = AutodiscoveryEvent::CheckUnscheduled { config };
210 let _ = sender.send(event);
211 }
212
213 Ok(())
214}
215
216#[async_trait]
217impl AutodiscoveryProvider for LocalAutodiscoveryProvider {
218 async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
219 self.listener_init
220 .get_or_init(|| async {
221 self.start_background_monitor(BG_MONITOR_INTERVAL).await;
222 })
223 .await;
224
225 Some(self.sender.subscribe())
226 }
227}
228
#[cfg(test)]
mod tests {
    use std::path::Path;

    use tempfile::tempdir;
    use tokio::io::AsyncWriteExt;

    use super::*;

    /// Returns the directory that holds the YAML fixtures used by these tests.
    ///
    /// Falls back to the current directory when `CARGO_MANIFEST_DIR` is not set.
    fn test_data_path() -> PathBuf {
        let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_else(|_| ".".to_string());
        PathBuf::from(manifest_dir)
            .join("src")
            .join("autodiscovery")
            .join("providers")
            .join("test_data")
    }

    /// Copies the fixture `source_name` into `temp_dir` and returns the path of the copy.
    async fn copy_test_file(source_name: &str, temp_dir: &Path) -> PathBuf {
        let source_path = test_data_path().join(source_name);
        let target_path = temp_dir.join(source_name);

        let content = fs::read_to_string(&source_path)
            .await
            .unwrap_or_else(|_| panic!("Failed to read test file: {:?}", source_path));

        let mut file = fs::File::create(&target_path).await.unwrap();
        file.write_all(content.as_bytes()).await.unwrap();

        target_path
    }

    #[tokio::test]
    async fn test_parse_config_file() {
        let test_file = test_data_path().join("test-config.yaml");

        let (id, config) = parse_config_file(&test_file).await.unwrap();

        assert!(id.contains("saluki-env_src_autodiscovery_providers_test_data_test-config.yaml"));
        assert_eq!(config.name, "test-config");
        assert_eq!(
            config.init_config.value.get("service"),
            Some(&serde_yaml::Value::String("test-service".to_string()))
        );
        assert_eq!(config.source, "local");
    }

    #[tokio::test]
    async fn test_parse_minimal_config_file() {
        let test_file = test_data_path().join("test-minimal-config.yaml");

        let (_, config) = parse_config_file(&test_file).await.unwrap();

        assert!(config.init_config.value.is_empty());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_new_config() {
        let dir = tempdir().unwrap();
        let _test_file = copy_test_file("config1.yaml", dir.path()).await;

        let mut known_configs = HashSet::new();
        let mut configs = HashMap::new();
        let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);

        scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();

        assert_eq!(known_configs.len(), 1);

        // The scan publishes `CheckSchedule`; matching on that variant makes sure the content
        // assertions below actually run.
        match receiver.try_recv().unwrap() {
            AutodiscoveryEvent::CheckSchedule { config } => {
                // The check name is the file stem, without the extension.
                assert_eq!(config.name, "config1");
                // Expected values are those already asserted for the `config1.yaml` fixture.
                assert_eq!(config.instances.len(), 1);
                assert_eq!(
                    config.instances[0].value.get("server"),
                    Some(&serde_yaml::Value::String("localhost".to_string()))
                );
                assert_eq!(
                    config.instances[0].value.get("port"),
                    Some(&serde_yaml::Value::Number(8080.into()))
                );
                assert_eq!(
                    config.instances[0].value.get("tags"),
                    Some(&serde_yaml::Value::Sequence(vec![
                        serde_yaml::Value::String("test:true".to_string()),
                        serde_yaml::Value::String("env:test".to_string())
                    ]))
                );
            }
            _ => panic!("expected a CheckSchedule event"),
        }

        assert!(receiver.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_scan_and_emit_events_removed_config() {
        let dir = tempdir().unwrap();

        // Pretend a configuration was published by an earlier scan; the directory is empty, so
        // this scan must report it as removed.
        let mut known_configs = HashSet::new();
        known_configs.insert("removed-config".to_string());
        let mut configs = HashMap::new();
        configs.insert(
            "removed-config".to_string(),
            CheckConfig {
                name: MetaString::from("removed-config"),
                init_config: Data::default(),
                instances: Vec::new(),
                source: MetaString::from_static("local"),
            },
        );

        let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);

        scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
            .await
            .unwrap();

        assert_eq!(known_configs.len(), 0);

        let event = receiver.try_recv().unwrap();
        assert!(matches!(event, AutodiscoveryEvent::CheckUnscheduled { config } if config.name == "removed-config"));

        assert!(receiver.try_recv().is_err());
    }
}