saluki_env/autodiscovery/providers/
local.rs1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::path::{Path, PathBuf};
4
5use async_trait::async_trait;
6use saluki_error::GenericError;
7use serde::Deserialize;
8use stringtheory::MetaString;
9use tokio::fs;
10use tokio::sync::broadcast::{self, Receiver, Sender};
11use tokio::sync::OnceCell;
12use tokio::time::{interval, Duration};
13use tracing::{debug, info, warn};
14
15use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider, CheckConfig, Config, Data};
16
17const BG_MONITOR_INTERVAL: u64 = 30;
18
19pub struct LocalAutodiscoveryProvider {
21 search_paths: Vec<PathBuf>,
22 sender: Sender<AutodiscoveryEvent>,
23 listener_init: OnceCell<()>,
24}
25
26impl LocalAutodiscoveryProvider {
27 pub fn new<P: AsRef<Path>>(paths: Vec<P>) -> Self {
29 let search_paths: Vec<PathBuf> = paths
30 .iter()
31 .filter_map(|p| {
32 if !p.as_ref().exists() {
33 warn!("Skipping path '{}' as it does not exist", p.as_ref().display());
34 return None;
35 }
36 if !p.as_ref().is_dir() {
37 warn!("Skipping path '{}', it is not a directory.", p.as_ref().display());
38 return None;
39 }
40 Some(p.as_ref().to_path_buf())
41 })
42 .collect();
43
44 let (sender, _) = broadcast::channel::<AutodiscoveryEvent>(super::AD_STREAM_CAPACITY);
45
46 Self {
47 search_paths,
48 sender,
49 listener_init: OnceCell::new(),
50 }
51 }
52
53 async fn start_background_monitor(&self, interval_sec: u64) {
55 let mut interval = interval(Duration::from_secs(interval_sec));
56 let sender = self.sender.clone();
57 let search_paths = self.search_paths.clone();
58
59 info!(
60 "Scanning for local autodiscovery events every {} seconds.",
61 interval_sec
62 );
63
64 tokio::spawn(async move {
65 let mut known_configs = HashSet::new();
66 let mut configs = HashMap::new();
67 loop {
68 interval.tick().await;
69
70 if let Err(e) = scan_and_emit_events(&search_paths, &mut known_configs, &sender, &mut configs).await {
72 warn!("Error scanning for configurations: {}", e);
73 }
74 }
75 });
76 }
77}
78
79#[derive(Debug, Deserialize)]
80struct LocalCheckConfig {
81 init_config: HashMap<String, serde_yaml::Value>,
82 instances: Vec<HashMap<String, serde_yaml::Value>>,
83}
84
85async fn parse_config_file(path: &PathBuf) -> Result<(String, CheckConfig), GenericError> {
87 let content = fs::read_to_string(path).await?;
88
89 let check_config: LocalCheckConfig = match serde_yaml::from_str(&content) {
90 Ok(read) => read,
91 Err(e) => {
92 return Err(GenericError::from(e).context("Failed to decode yaml as check configuration."));
93 }
94 };
95
96 let canonicalized_path = fs::canonicalize(&path).await?;
97
98 let config_id = canonicalized_path.to_string_lossy().replace(['/', '\\'], "_");
100
101 let instances: Vec<Data> = check_config
102 .instances
103 .into_iter()
104 .map(|instance| {
105 let mut result = HashMap::new();
106 for (key, value) in instance {
107 result.insert(key.into(), value);
108 }
109 Data { value: result }
110 })
111 .collect();
112
113 let init_config = {
114 let mut result = HashMap::new();
115 for (key, value) in check_config.init_config {
116 result.insert(key.into(), value);
117 }
118 Data { value: result }
119 };
120
121 let config = Config {
123 name: MetaString::from(path.file_stem().unwrap().to_string_lossy().to_string()),
124 init_config,
125 instances,
126 metric_config: Data::default(),
127 logs_config: Data::default(),
128 ad_identifiers: Vec::new(),
129 provider: MetaString::empty(),
130 service_id: MetaString::empty(),
131 tagger_entity: MetaString::empty(),
132 cluster_check: false,
133 node_name: MetaString::empty(),
134 source: MetaString::from_static("local"),
135 ignore_autodiscovery_tags: false,
136 metrics_excluded: false,
137 logs_excluded: false,
138 advanced_ad_identifiers: Vec::new(),
139 };
140
141 let check_config = CheckConfig::from(config);
142
143 Ok((config_id, check_config))
144}
145
146async fn scan_and_emit_events(
148 paths: &[PathBuf], known_configs: &mut HashSet<String>, sender: &Sender<AutodiscoveryEvent>,
149 configs: &mut HashMap<String, CheckConfig>,
150) -> Result<(), GenericError> {
151 let mut found_configs = HashSet::new();
152
153 for path in paths {
154 let mut entries = fs::read_dir(path).await?;
155 while let Ok(Some(entry)) = entries.next_entry().await {
156 let path = entry.path();
157
158 if let Some(ext) = path.extension() {
160 if ext == "yaml" || ext == "yml" {
161 match parse_config_file(&path).await {
163 Ok((config_id, config)) => {
164 found_configs.insert(config_id.clone());
165
166 if !known_configs.contains(&config_id) {
168 debug!("New configuration found: {}", config_id);
169
170 let event = AutodiscoveryEvent::CheckSchedule { config: config.clone() };
171 let _ = sender.send(event);
172 known_configs.insert(config_id.clone());
173 configs.insert(config_id.clone(), config);
174 } else {
175 let existing_config = configs.get(&config_id).unwrap();
177 if *existing_config != config {
178 configs.insert(config_id.clone(), config.clone());
179 debug!("Configuration updated: {}", config_id);
180 let event = AutodiscoveryEvent::CheckSchedule { config };
181 let _ = sender.send(event);
182 }
183 }
184 }
185 Err(e) => {
186 warn!("Failed to parse config file {}: {}", path.display(), e);
187 }
188 }
189 }
190 }
191 }
192 }
193
194 let to_remove: Vec<String> = known_configs
196 .iter()
197 .filter(|config_id| !found_configs.contains(*config_id))
198 .cloned()
199 .collect();
200
201 for config_id in to_remove {
202 debug!("Configuration removed: {}", config_id);
203 known_configs.remove(&config_id);
204
205 let config = configs.remove(&config_id).unwrap();
206
207 let event = AutodiscoveryEvent::CheckUnscheduled { config };
209 let _ = sender.send(event);
210 }
211
212 Ok(())
213}
214
215#[async_trait]
216impl AutodiscoveryProvider for LocalAutodiscoveryProvider {
217 async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
218 self.listener_init
219 .get_or_init(|| async {
220 self.start_background_monitor(BG_MONITOR_INTERVAL).await;
221 })
222 .await;
223
224 Some(self.sender.subscribe())
225 }
226}
227
228#[cfg(test)]
229mod tests {
230 use std::path::Path;
231
232 use tempfile::tempdir;
233 use tokio::io::AsyncWriteExt;
234
235 use super::*;
236
237 fn test_data_path() -> PathBuf {
239 let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_else(|_| ".".to_string());
240 PathBuf::from(manifest_dir)
241 .join("src")
242 .join("autodiscovery")
243 .join("providers")
244 .join("test_data")
245 }
246
247 async fn copy_test_file(source_name: &str, temp_dir: &Path) -> PathBuf {
249 let source_path = test_data_path().join(source_name);
250 let target_path = temp_dir.join(source_name);
251
252 let content = fs::read_to_string(&source_path)
253 .await
254 .unwrap_or_else(|_| panic!("Failed to read test file: {:?}", source_path));
255
256 let mut file = fs::File::create(&target_path).await.unwrap();
257 file.write_all(content.as_bytes()).await.unwrap();
258
259 target_path
260 }
261
262 #[tokio::test]
263 async fn test_parse_config_file() {
264 let test_file = test_data_path().join("test-config.yaml");
265
266 let (id, config) = parse_config_file(&test_file).await.unwrap();
267
268 assert!(id.contains("saluki-env_src_autodiscovery_providers_test_data_test-config.yaml"));
269 assert_eq!(config.name, "test-config");
270 assert_eq!(
271 config.init_config.value.get("service"),
272 Some(&serde_yaml::Value::String("test-service".to_string()))
273 );
274 assert_eq!(config.source, "local");
275 }
276
277 #[tokio::test]
278 async fn test_scan_and_emit_events_new_config() {
279 let dir = tempdir().unwrap();
280 let _test_file = copy_test_file("config1.yaml", dir.path()).await;
281
282 let mut known_configs = HashSet::new();
283 let mut configs = HashMap::new();
284 let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);
285
286 scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
287 .await
288 .unwrap();
289
290 assert_eq!(known_configs.len(), 1);
291
292 let event = receiver.try_recv().unwrap();
293 assert!(matches!(event, AutodiscoveryEvent::CheckSchedule { .. }));
294
295 if let AutodiscoveryEvent::Schedule { config } = event {
296 assert_eq!(config.name, "config1.yaml");
297 assert_eq!(config.instances.len(), 1);
298 assert_eq!(
299 config.instances[0].value.get("server"),
300 Some(&serde_yaml::Value::String("localhost".to_string()))
301 );
302 assert_eq!(
303 config.instances[0].value.get("port"),
304 Some(&serde_yaml::Value::Number(8080.into()))
305 );
306 assert_eq!(
307 config.instances[0].value.get("tags"),
308 Some(&serde_yaml::Value::Sequence(vec![
309 serde_yaml::Value::String("test:true".to_string()),
310 serde_yaml::Value::String("env:test".to_string())
311 ]))
312 );
313 }
314 assert!(receiver.try_recv().is_err());
315 }
316
317 #[tokio::test]
318 async fn test_scan_and_emit_events_removed_config() {
319 let dir = tempdir().unwrap();
320
321 let mut known_configs = HashSet::new();
322 known_configs.insert("removed-config".to_string());
323 let mut configs = HashMap::new();
324 configs.insert(
325 "removed-config".to_string(),
326 CheckConfig {
327 name: MetaString::from("removed-config"),
328 init_config: Data::default(),
329 instances: Vec::new(),
330 source: MetaString::from_static("local"),
331 },
332 );
333
334 let (sender, mut receiver) = broadcast::channel::<AutodiscoveryEvent>(10);
335
336 scan_and_emit_events(&[dir.path().to_path_buf()], &mut known_configs, &sender, &mut configs)
337 .await
338 .unwrap();
339
340 assert_eq!(known_configs.len(), 0);
341
342 let event = receiver.try_recv().unwrap();
343 assert!(matches!(event, AutodiscoveryEvent::CheckUnscheduled { config } if config.name == "removed-config"));
344
345 assert!(receiver.try_recv().is_err());
346 }
347}