// saluki_env/autodiscovery/mod.rs

//! Autodiscovery provider.
//!
//! This module provides the `AutodiscoveryProvider` trait, which deals with providing information about autodiscovery.
4
5use std::collections::BTreeMap;
6use std::hash::Hasher;
7
8use async_trait::async_trait;
9use datadog_protos::agent::{
10    AdvancedAdIdentifier as ProtoAdvancedAdIdentifier, Config as ProtoConfig, ConfigEventType,
11    KubeNamespacedName as ProtoKubeNamespacedName,
12};
13use fnv::FnvHasher;
14use saluki_error::GenericError;
15use stringtheory::MetaString;
16use tokio::sync::mpsc::Receiver;
17use twox_hash::XxHash64;
18
19pub mod providers;
20
/// Kind of change carried by an autodiscovery configuration event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EventType {
    /// The configuration should be scheduled.
    Schedule,
    /// The configuration should be unscheduled.
    Unschedule,
}
29
30impl From<ConfigEventType> for EventType {
31    fn from(event_type: ConfigEventType) -> Self {
32        match event_type {
33            ConfigEventType::Schedule => EventType::Schedule,
34            ConfigEventType::Unschedule => EventType::Unschedule,
35        }
36    }
37}
38
39impl From<i32> for EventType {
40    fn from(value: i32) -> Self {
41        if value == ConfigEventType::Unschedule as i32 {
42            EventType::Unschedule
43        } else {
44            // Default to Schedule for unknown values
45            EventType::Schedule
46        }
47    }
48}
49
50/// Kubernetes namespaced name
51#[derive(Debug, Clone, PartialEq, Eq)]
52pub struct KubeNamespacedName {
53    /// Kubernetes resource name
54    pub name: MetaString,
55    /// Kubernetes namespace
56    pub namespace: MetaString,
57}
58
59impl From<ProtoKubeNamespacedName> for KubeNamespacedName {
60    fn from(value: ProtoKubeNamespacedName) -> Self {
61        Self {
62            name: value.name.into(),
63            namespace: value.namespace.into(),
64        }
65    }
66}
67
68/// Advanced autodiscovery identifier
69#[derive(Debug, Clone, PartialEq, Eq)]
70pub struct AdvancedADIdentifier {
71    /// Kubernetes service
72    pub kube_service: Option<KubeNamespacedName>,
73    /// Kubernetes endpoints
74    pub kube_endpoints: Option<KubeNamespacedName>,
75}
76
77impl From<ProtoAdvancedAdIdentifier> for AdvancedADIdentifier {
78    fn from(value: ProtoAdvancedAdIdentifier) -> Self {
79        Self {
80            kube_service: value.kube_service.map(Into::into),
81            kube_endpoints: value
82                .kube_endpoints
83                .and_then(|endpoint| endpoint.kube_namespaced_name)
84                .map(Into::into),
85        }
86    }
87}
88
89/// Raw data trait for configuration data
90pub trait RawData {
91    /// Get the value of the data
92    fn get_value(&self) -> &BTreeMap<MetaString, serde_yaml::Value>;
93
94    /// Get a value from the data
95    fn get(&self, key: &str) -> Option<&serde_yaml::Value> {
96        self.get_value().get(key)
97    }
98
99    /// Convert the data to bytes
100    fn to_bytes(&self) -> Result<Vec<u8>, GenericError> {
101        let mut buffer = Vec::new();
102        serde_yaml::to_writer(&mut buffer, &self.get_value())?;
103        Ok(buffer)
104    }
105}
106
107/// Generic map of key-value pairs
108#[derive(Debug, Default, Clone, PartialEq, Eq)]
109pub struct Data {
110    value: BTreeMap<MetaString, serde_yaml::Value>,
111}
112
113impl RawData for Data {
114    fn get_value(&self) -> &BTreeMap<MetaString, serde_yaml::Value> {
115        &self.value
116    }
117}
118
119/// Configuration for a check instance
120#[derive(Debug, Default, Clone, PartialEq, Eq)]
121pub struct Instance {
122    /// Instance ID
123    id: String,
124    /// Instance value
125    value: BTreeMap<MetaString, serde_yaml::Value>,
126}
127
128impl Instance {
129    /// Get the instance ID
130    pub fn id(&self) -> &String {
131        &self.id
132    }
133}
134
135impl RawData for Instance {
136    fn get_value(&self) -> &BTreeMap<MetaString, serde_yaml::Value> {
137        &self.value
138    }
139}
140
141/// Configuration for a check
142#[derive(Debug, Clone)]
143pub struct Config {
144    /// Configuration name/identifier
145    pub name: MetaString,
146    /// Raw configuration data
147    pub init_config: Data,
148    /// Instance configurations
149    pub instances: Vec<Data>,
150    /// Metric configuration
151    pub metric_config: Data,
152    /// Logs configuration
153    pub logs_config: Data,
154    /// Auto-discovery identifiers
155    pub ad_identifiers: Vec<MetaString>,
156    /// Advanced auto-discovery identifiers
157    pub advanced_ad_identifiers: Vec<AdvancedADIdentifier>,
158    /// Provider that discovered this config
159    pub provider: MetaString,
160    /// Service ID
161    pub service_id: MetaString,
162    /// Tagger entity
163    pub tagger_entity: MetaString,
164    /// Whether this is a cluster check
165    pub cluster_check: bool,
166    /// Node name
167    pub node_name: MetaString,
168    /// Source of the configuration
169    pub source: MetaString,
170    /// Whether to ignore autodiscovery tags
171    pub ignore_autodiscovery_tags: bool,
172    /// Whether metrics are excluded
173    pub metrics_excluded: bool,
174    /// Whether logs are excluded
175    pub logs_excluded: bool,
176}
177
178/// Check configuration
179#[derive(Debug, Clone, PartialEq, Eq)]
180pub struct CheckConfig {
181    /// Check name
182    pub name: MetaString,
183    /// Init configuration data
184    pub init_config: Data,
185    /// Instance configurations
186    pub instances: Vec<Instance>,
187    /// Source of the configuration
188    pub source: MetaString,
189}
190
191impl Config {
192    /// Get the digest for the config
193    pub fn digest(&self) -> u64 {
194        let mut h = XxHash64::with_seed(0);
195
196        h.write(self.name.as_bytes());
197        for i in &self.instances {
198            h.write(&i.to_bytes().unwrap_or_default());
199        }
200        h.write(&self.init_config.to_bytes().unwrap_or_default());
201        for i in &self.ad_identifiers {
202            h.write(i.as_bytes());
203        }
204        h.write(self.node_name.as_bytes());
205        h.write(&self.logs_config.to_bytes().unwrap_or_default());
206        h.write(self.service_id.as_bytes());
207        h.write(if self.ignore_autodiscovery_tags {
208            b"true"
209        } else {
210            b"false"
211        });
212
213        h.finish()
214    }
215}
216
217impl From<ProtoConfig> for Config {
218    fn from(proto: ProtoConfig) -> Self {
219        // Convert advanced AD identifiers from proto
220        let advanced_ad_identifiers = proto.advanced_ad_identifiers.into_iter().map(Into::into).collect();
221
222        let init_config = bytes_to_data(proto.init_config).unwrap_or_default();
223        let instances = proto
224            .instances
225            .into_iter()
226            .map(|instance| bytes_to_data(instance).unwrap_or_default())
227            .collect();
228
229        Self {
230            name: proto.name.into(),
231            init_config,
232            instances,
233            metric_config: bytes_to_data(proto.metric_config).unwrap_or_default(),
234            logs_config: bytes_to_data(proto.logs_config).unwrap_or_default(),
235            ad_identifiers: proto.ad_identifiers.into_iter().map(MetaString::from).collect(),
236            advanced_ad_identifiers,
237            provider: proto.provider.into(),
238            service_id: proto.service_id.into(),
239            tagger_entity: proto.tagger_entity.into(),
240            cluster_check: proto.cluster_check,
241            node_name: proto.node_name.into(),
242            source: proto.source.into(),
243            ignore_autodiscovery_tags: proto.ignore_autodiscovery_tags,
244            metrics_excluded: proto.metrics_excluded,
245            logs_excluded: proto.logs_excluded,
246        }
247    }
248}
249
250fn bytes_to_data(bytes: Vec<u8>) -> Result<Data, GenericError> {
251    let parse_bytes = String::from_utf8(bytes)?;
252
253    let map: BTreeMap<String, serde_yaml::Value> = serde_yaml::from_str(&parse_bytes)?;
254
255    let mut result = BTreeMap::<MetaString, serde_yaml::Value>::new();
256
257    for (key, value) in map {
258        result.insert(key.into(), value);
259    }
260
261    Ok(Data { value: result })
262}
263
264fn instance_id(name: &str, instance: &Data, digest: u64, init_config: &Data) -> String {
265    let mut h2 = FnvHasher::default();
266    h2.write_u64(digest);
267    h2.write(&instance.to_bytes().unwrap_or_default());
268    h2.write(&init_config.to_bytes().unwrap_or_default());
269
270    let instance_name = instance_name(instance);
271    let hash2 = h2.finish();
272
273    if !instance_name.is_empty() {
274        format!("{}:{}:{:X}", name, instance_name, hash2)
275    } else {
276        format!("{}:{:X}", name, hash2)
277    }
278}
279
280fn instance_name(instance: &Data) -> String {
281    if let Some(name) = instance.get("name") {
282        if let Some(value) = name.as_str() {
283            return value.to_string();
284        }
285    }
286    if let Some(namespace) = instance.get("namespace") {
287        if let Some(value) = namespace.as_str() {
288            return value.to_string();
289        }
290    }
291    "".to_string()
292}
293
294impl From<Config> for CheckConfig {
295    fn from(config: Config) -> Self {
296        let digest = config.digest();
297
298        CheckConfig {
299            name: config.name.clone(),
300            init_config: config.init_config.clone(),
301            instances: config
302                .instances
303                .into_iter()
304                .map(|instance_data| Instance {
305                    id: instance_id(&config.name, &instance_data, digest, &config.init_config),
306                    value: instance_data.value,
307                })
308                .collect(),
309            source: config.source,
310        }
311    }
312}
313
314impl From<ProtoConfig> for AutodiscoveryEvent {
315    fn from(proto: ProtoConfig) -> AutodiscoveryEvent {
316        let event_type = EventType::from(proto.event_type);
317
318        let config = Config::from(proto);
319
320        if !config.instances.is_empty() && !config.cluster_check {
321            let check_config = CheckConfig::from(config);
322
323            if event_type == EventType::Schedule {
324                return AutodiscoveryEvent::CheckSchedule { config: check_config };
325            } else {
326                return AutodiscoveryEvent::CheckUnscheduled { config: check_config };
327            }
328        }
329
330        if event_type == EventType::Schedule {
331            AutodiscoveryEvent::Schedule { config }
332        } else {
333            AutodiscoveryEvent::Unscheduled { config }
334        }
335    }
336}
337
338/// An autodiscovery event
339#[derive(Debug, Clone)]
340#[allow(clippy::large_enum_variant)]
341pub enum AutodiscoveryEvent {
342    /// Schedule a check configuration
343    CheckSchedule {
344        /// Configuration
345        config: CheckConfig,
346    },
347    /// Unschedule a check configuration
348    CheckUnscheduled {
349        /// Configuration
350        config: CheckConfig,
351    },
352    /// Schedule a generic configuration
353    Schedule {
354        /// Configuration
355        config: Config,
356    },
357    /// Unscheduled a generic configuration
358    Unscheduled {
359        /// Configuration
360        config: Config,
361    },
362}
363
364/// Provides autodiscovery functionality.
365///
366/// This trait is used to discover and monitor configuration files for checks.
367#[async_trait]
368pub trait AutodiscoveryProvider {
369    /// Subscribe to autodiscovery events.
370    ///
371    /// Returns `None` when no provider is configured.
372    async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>>;
373}
374
375#[async_trait]
376impl<T> AutodiscoveryProvider for Option<T>
377where
378    T: AutodiscoveryProvider + Sync,
379{
380    async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
381        match self.as_ref() {
382            Some(provider) => provider.subscribe().await,
383            None => None,
384        }
385    }
386}
387
#[cfg(test)]
mod tests {
    use datadog_protos::agent::{AdvancedAdIdentifier, KubeNamespacedName};

    use super::*;

    /// Builds a `Schedule` `ProtoConfig` populated with the field values shared by the tests in this module.
    fn proto_config(name: &str, instances: Vec<Vec<u8>>, cluster_check: bool) -> ProtoConfig {
        ProtoConfig {
            name: name.to_string(),
            event_type: ConfigEventType::Schedule as i32,
            init_config: b"key: value".to_vec(),
            instances,
            provider: "test-provider".to_string(),
            ad_identifiers: vec!["id1".to_string(), "id2".to_string()],
            cluster_check,
            metric_config: b"metric_key: metric_value".to_vec(),
            logs_config: b"log_key: log_value".to_vec(),
            advanced_ad_identifiers: vec![],
            service_id: "service-id".to_string(),
            tagger_entity: "tagger-entity".to_string(),
            node_name: "node-name".to_string(),
            source: "source".to_string(),
            ignore_autodiscovery_tags: false,
            metrics_excluded: false,
            logs_excluded: false,
        }
    }

    /// Builds an advanced AD identifier that references the `default/nginx` Kubernetes service only.
    fn nginx_service_identifier() -> AdvancedAdIdentifier {
        AdvancedAdIdentifier {
            kube_service: Some(KubeNamespacedName {
                name: "nginx".to_string(),
                namespace: "default".to_string(),
            }),
            kube_endpoints: None,
        }
    }

    #[test]
    fn test_event_type_from_config_event_type() {
        assert_eq!(EventType::from(ConfigEventType::Schedule), EventType::Schedule);
        assert_eq!(EventType::from(ConfigEventType::Unschedule), EventType::Unschedule);
    }

    #[test]
    fn test_event_type_from_i32() {
        // Known values
        assert_eq!(EventType::from(0), EventType::Schedule); // Schedule is 0
        assert_eq!(EventType::from(1), EventType::Unschedule); // Unschedule is 1

        // Unknown values should default to Schedule
        assert_eq!(EventType::from(2), EventType::Schedule);
        assert_eq!(EventType::from(-1), EventType::Schedule);
    }

    #[test]
    fn test_check_config_instance_id() {
        let proto_config = proto_config(
            "test-check",
            vec![b"name: test".to_vec(), b"another_key: another_value".to_vec()],
            false,
        );

        let config = Config::from(proto_config);

        let check_config = CheckConfig::from(config);

        let id1 = &check_config.instances[0].id;
        let id2 = &check_config.instances[1].id;

        assert_ne!(id1, id2);

        // The named instance embeds its name in the ID; the unnamed one only has the hash.
        assert_eq!(id1, "test-check:test:369F074E36651C8");
        assert_eq!(id2, "test-check:8C83712B7A572843");
    }

    #[test]
    fn test_config_from_proto_config() {
        let mut proto_config = proto_config(
            "test-config",
            vec![
                b"instance_key: instance_value".to_vec(),
                b"another_key: another_value".to_vec(),
            ],
            true,
        );
        proto_config.advanced_ad_identifiers = vec![nginx_service_identifier()];

        let config = Config::from(proto_config);

        assert_eq!(config.name, "test-config");
        assert_eq!(config.provider, "test-provider");
        assert_eq!(
            config.ad_identifiers,
            vec![MetaString::from_static("id1"), MetaString::from_static("id2")]
        );
        assert!(config.cluster_check);
        assert_eq!(config.service_id, "service-id");
        assert_eq!(config.tagger_entity, "tagger-entity");
        assert_eq!(config.node_name, "node-name");
        assert_eq!(config.source, "source");
        assert!(!config.ignore_autodiscovery_tags);
        assert!(!config.metrics_excluded);
        assert!(!config.logs_excluded);
        assert_eq!(
            config.init_config.get("key"),
            Some(&serde_yaml::Value::String("value".to_string()))
        );
        assert_eq!(config.instances.len(), 2);
        assert_eq!(
            config.instances[0].get("instance_key"),
            Some(&serde_yaml::Value::String("instance_value".to_string()))
        );
        assert_eq!(
            config.instances[1].get("another_key"),
            Some(&serde_yaml::Value::String("another_value".to_string()))
        );
        assert_eq!(
            config.metric_config.get("metric_key"),
            Some(&serde_yaml::Value::String("metric_value".to_string()))
        );
        assert_eq!(
            config.logs_config.get("log_key"),
            Some(&serde_yaml::Value::String("log_value".to_string()))
        );

        assert_eq!(config.advanced_ad_identifiers.len(), 1);
        let adv_id = &config.advanced_ad_identifiers[0];
        assert!(adv_id.kube_endpoints.is_none());
        assert!(adv_id.kube_service.is_some());
        let svc = adv_id.kube_service.as_ref().unwrap();
        assert_eq!(svc.name, "nginx");
        assert_eq!(svc.namespace, "default");
    }

    #[test]
    fn test_autodiscovery_event_from_proto_config() {
        // Start from a cluster check without instances: this must yield generic events.
        let mut proto_config = proto_config("test-config", vec![], true);
        proto_config.init_config = b"init-data".to_vec();
        proto_config.metric_config = vec![];
        proto_config.logs_config = vec![];
        proto_config.advanced_ad_identifiers = vec![nginx_service_identifier()];

        let event = AutodiscoveryEvent::from(proto_config.clone());

        assert!(
            matches!(event, AutodiscoveryEvent::Schedule { .. }),
            "Expected a Schedule event"
        );

        proto_config.event_type = ConfigEventType::Unschedule as i32;

        let event = AutodiscoveryEvent::from(proto_config.clone());

        match event {
            AutodiscoveryEvent::Unscheduled { config } => {
                assert_eq!(config.name, "test-config");
            }
            _ => panic!("Expected an Unscheduled event"),
        }

        // With instances and without the cluster check flag, check events are expected instead.
        proto_config.instances = vec![b"instance1".to_vec(), b"instance2".to_vec()];
        proto_config.cluster_check = false;
        proto_config.event_type = ConfigEventType::Schedule as i32;

        let event = AutodiscoveryEvent::from(proto_config.clone());

        assert!(
            matches!(event, AutodiscoveryEvent::CheckSchedule { .. }),
            "Expected a CheckSchedule event"
        );

        proto_config.event_type = ConfigEventType::Unschedule as i32;

        let event = AutodiscoveryEvent::from(proto_config);

        assert!(
            matches!(event, AutodiscoveryEvent::CheckUnscheduled { .. }),
            "Expected a CheckUnscheduled event"
        );
    }
}