saluki_env/autodiscovery/
mod.rs

//! Autodiscovery provider.
//!
//! This module provides the [`AutodiscoveryProvider`] trait, which lets consumers subscribe to autodiscovery events,
//! along with the configuration and event types those events carry.
4
5use std::collections::HashMap;
6use std::hash::Hasher;
7
8use async_trait::async_trait;
9use datadog_protos::agent::{
10    AdvancedAdIdentifier as ProtoAdvancedAdIdentifier, Config as ProtoConfig, ConfigEventType,
11    KubeNamespacedName as ProtoKubeNamespacedName,
12};
13use fnv::FnvHasher;
14use saluki_error::GenericError;
15use stringtheory::MetaString;
16use tokio::sync::broadcast::Receiver;
17use twox_hash::XxHash64;
18
19pub mod providers;
20
/// Kind of change carried by an autodiscovery configuration event.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EventType {
    /// The configuration is to be scheduled.
    Schedule,
    /// The configuration is to be unscheduled.
    Unschedule,
}
29
30impl From<ConfigEventType> for EventType {
31    fn from(event_type: ConfigEventType) -> Self {
32        match event_type {
33            ConfigEventType::Schedule => EventType::Schedule,
34            ConfigEventType::Unschedule => EventType::Unschedule,
35        }
36    }
37}
38
39impl From<i32> for EventType {
40    fn from(value: i32) -> Self {
41        if value == ConfigEventType::Unschedule as i32 {
42            EventType::Unschedule
43        } else {
44            // Default to Schedule for unknown values
45            EventType::Schedule
46        }
47    }
48}
49
50/// Kubernetes namespaced name
51#[derive(Debug, Clone, PartialEq, Eq)]
52pub struct KubeNamespacedName {
53    /// Kubernetes resource name
54    pub name: MetaString,
55    /// Kubernetes namespace
56    pub namespace: MetaString,
57}
58
59impl From<ProtoKubeNamespacedName> for KubeNamespacedName {
60    fn from(value: ProtoKubeNamespacedName) -> Self {
61        Self {
62            name: value.name.into(),
63            namespace: value.namespace.into(),
64        }
65    }
66}
67
68/// Advanced autodiscovery identifier
69#[derive(Debug, Clone, PartialEq, Eq)]
70pub struct AdvancedADIdentifier {
71    /// Kubernetes service
72    pub kube_service: Option<KubeNamespacedName>,
73    /// Kubernetes endpoints
74    pub kube_endpoints: Option<KubeNamespacedName>,
75}
76
77impl From<ProtoAdvancedAdIdentifier> for AdvancedADIdentifier {
78    fn from(value: ProtoAdvancedAdIdentifier) -> Self {
79        Self {
80            kube_service: value.kube_service.map(Into::into),
81            kube_endpoints: value
82                .kube_endpoints
83                .and_then(|endpoint| endpoint.kube_namespaced_name)
84                .map(Into::into),
85        }
86    }
87}
88
89/// Raw data trait for configuration data
90pub trait RawData {
91    /// Get the value of the data
92    fn get_value(&self) -> &HashMap<MetaString, serde_yaml::Value>;
93
94    /// Get a value from the data
95    fn get(&self, key: &str) -> Option<&serde_yaml::Value> {
96        self.get_value().get(key)
97    }
98
99    /// Convert the data to bytes
100    fn to_bytes(&self) -> Result<Vec<u8>, GenericError> {
101        let mut buffer = Vec::new();
102        serde_yaml::to_writer(&mut buffer, &self.get_value())?;
103        Ok(buffer)
104    }
105}
106
107/// Generic map of key-value pairs
108#[derive(Debug, Default, Clone, PartialEq, Eq)]
109pub struct Data {
110    value: HashMap<MetaString, serde_yaml::Value>,
111}
112
113impl RawData for Data {
114    fn get_value(&self) -> &HashMap<MetaString, serde_yaml::Value> {
115        &self.value
116    }
117}
118
119/// Configuration for a check instance
120#[derive(Debug, Default, Clone, PartialEq, Eq)]
121pub struct Instance {
122    /// Instance ID
123    id: String,
124    /// Instance value
125    value: HashMap<MetaString, serde_yaml::Value>,
126}
127
128impl Instance {
129    /// Get the instance ID
130    pub fn id(&self) -> &String {
131        &self.id
132    }
133}
134
135impl RawData for Instance {
136    fn get_value(&self) -> &HashMap<MetaString, serde_yaml::Value> {
137        &self.value
138    }
139}
140
141/// Configuration for a check
142#[derive(Debug, Clone)]
143pub struct Config {
144    /// Configuration name/identifier
145    pub name: MetaString,
146    /// Raw configuration data
147    pub init_config: Data,
148    /// Instance configurations
149    pub instances: Vec<Data>,
150    /// Metric configuration
151    pub metric_config: Data,
152    /// Logs configuration
153    pub logs_config: Data,
154    /// Auto-discovery identifiers
155    pub ad_identifiers: Vec<MetaString>,
156    /// Advanced auto-discovery identifiers
157    pub advanced_ad_identifiers: Vec<AdvancedADIdentifier>,
158    /// Provider that discovered this config
159    pub provider: MetaString,
160    /// Service ID
161    pub service_id: MetaString,
162    /// Tagger entity
163    pub tagger_entity: MetaString,
164    /// Whether this is a cluster check
165    pub cluster_check: bool,
166    /// Node name
167    pub node_name: MetaString,
168    /// Source of the configuration
169    pub source: MetaString,
170    /// Whether to ignore autodiscovery tags
171    pub ignore_autodiscovery_tags: bool,
172    /// Whether metrics are excluded
173    pub metrics_excluded: bool,
174    /// Whether logs are excluded
175    pub logs_excluded: bool,
176}
177
178/// Check configuration
179#[derive(Debug, Clone, PartialEq, Eq)]
180pub struct CheckConfig {
181    /// Check name
182    pub name: MetaString,
183    /// Init configuration data
184    pub init_config: Data,
185    /// Instance configurations
186    pub instances: Vec<Instance>,
187    /// Source of the configuration
188    pub source: MetaString,
189}
190
191impl Config {
192    /// Get the digest for the config
193    pub fn digest(&self) -> u64 {
194        let mut h = XxHash64::with_seed(0);
195
196        h.write(self.name.as_bytes());
197        for i in &self.instances {
198            h.write(&i.to_bytes().unwrap_or_default());
199        }
200        h.write(&self.init_config.to_bytes().unwrap_or_default());
201        for i in &self.ad_identifiers {
202            h.write(i.as_bytes());
203        }
204        h.write(self.node_name.as_bytes());
205        h.write(&self.logs_config.to_bytes().unwrap_or_default());
206        h.write(self.service_id.as_bytes());
207        h.write(if self.ignore_autodiscovery_tags {
208            b"true"
209        } else {
210            b"false"
211        });
212
213        h.finish()
214    }
215}
216
217impl From<ProtoConfig> for Config {
218    fn from(proto: ProtoConfig) -> Self {
219        // Convert advanced AD identifiers from proto
220        let advanced_ad_identifiers = proto.advanced_ad_identifiers.into_iter().map(Into::into).collect();
221
222        let init_config = bytes_to_data(proto.init_config).unwrap_or_default();
223        let instances = proto
224            .instances
225            .into_iter()
226            .map(|instance| bytes_to_data(instance).unwrap_or_default())
227            .collect();
228
229        Self {
230            name: proto.name.into(),
231            init_config,
232            instances,
233            metric_config: bytes_to_data(proto.metric_config).unwrap_or_default(),
234            logs_config: bytes_to_data(proto.logs_config).unwrap_or_default(),
235            ad_identifiers: proto.ad_identifiers.into_iter().map(MetaString::from).collect(),
236            advanced_ad_identifiers,
237            provider: proto.provider.into(),
238            service_id: proto.service_id.into(),
239            tagger_entity: proto.tagger_entity.into(),
240            cluster_check: proto.cluster_check,
241            node_name: proto.node_name.into(),
242            source: proto.source.into(),
243            ignore_autodiscovery_tags: proto.ignore_autodiscovery_tags,
244            metrics_excluded: proto.metrics_excluded,
245            logs_excluded: proto.logs_excluded,
246        }
247    }
248}
249
250fn bytes_to_data(bytes: Vec<u8>) -> Result<Data, GenericError> {
251    let parse_bytes = String::from_utf8(bytes)?;
252
253    let map: HashMap<String, serde_yaml::Value> = serde_yaml::from_str(&parse_bytes)?;
254
255    let mut result = HashMap::<MetaString, serde_yaml::Value>::new();
256
257    for (key, value) in map {
258        result.insert(key.into(), value);
259    }
260
261    Ok(Data { value: result })
262}
263
264fn instance_id(name: &str, instance: &Data, digest: u64, init_config: &Data) -> String {
265    let mut h2 = FnvHasher::default();
266    h2.write_u64(digest);
267    h2.write(&instance.to_bytes().unwrap_or_default());
268    h2.write(&init_config.to_bytes().unwrap_or_default());
269
270    let instance_name = instance_name(instance);
271    let hash2 = h2.finish();
272
273    if !instance_name.is_empty() {
274        format!("{}:{}:{:X}", name, instance_name, hash2)
275    } else {
276        format!("{}:{:X}", name, hash2)
277    }
278}
279
280fn instance_name(instance: &Data) -> String {
281    if let Some(name) = instance.get("name") {
282        if let Some(value) = name.as_str() {
283            return value.to_string();
284        }
285    }
286    if let Some(namespace) = instance.get("namespace") {
287        if let Some(value) = namespace.as_str() {
288            return value.to_string();
289        }
290    }
291    "".to_string()
292}
293
294impl From<Config> for CheckConfig {
295    fn from(config: Config) -> Self {
296        let digest = config.digest();
297
298        CheckConfig {
299            name: config.name.clone(),
300            init_config: config.init_config.clone(),
301            instances: config
302                .instances
303                .into_iter()
304                .map(|instance_data| Instance {
305                    id: instance_id(&config.name, &instance_data, digest, &config.init_config),
306                    value: instance_data.value,
307                })
308                .collect(),
309            source: config.source,
310        }
311    }
312}
313
314impl From<ProtoConfig> for AutodiscoveryEvent {
315    fn from(proto: ProtoConfig) -> AutodiscoveryEvent {
316        let event_type = EventType::from(proto.event_type);
317
318        let config = Config::from(proto);
319
320        if !config.instances.is_empty() && !config.cluster_check {
321            let check_config = CheckConfig::from(config);
322
323            if event_type == EventType::Schedule {
324                return AutodiscoveryEvent::CheckSchedule { config: check_config };
325            } else {
326                return AutodiscoveryEvent::CheckUnscheduled { config: check_config };
327            }
328        }
329
330        if event_type == EventType::Schedule {
331            AutodiscoveryEvent::Schedule { config }
332        } else {
333            AutodiscoveryEvent::Unscheduled { config }
334        }
335    }
336}
337
338/// An autodiscovery event
339#[derive(Debug, Clone)]
340#[allow(clippy::large_enum_variant)]
341pub enum AutodiscoveryEvent {
342    /// Schedule a check configuration
343    CheckSchedule {
344        /// Configuration
345        config: CheckConfig,
346    },
347    /// Unschedule a check configuration
348    CheckUnscheduled {
349        /// Configuration
350        config: CheckConfig,
351    },
352    /// Schedule a generic configuration
353    Schedule {
354        /// Configuration
355        config: Config,
356    },
357    /// Unscheduled a generic configuration
358    Unscheduled {
359        /// Configuration
360        config: Config,
361    },
362}
363
364/// Provides autodiscovery functionality.
365///
366/// This trait is used to discover and monitor configuration files for checks.
367#[async_trait]
368pub trait AutodiscoveryProvider {
369    /// Subscribe to autodiscovery events.
370    async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>>;
371}
372
373#[async_trait]
374impl<T> AutodiscoveryProvider for Option<T>
375where
376    T: AutodiscoveryProvider + Sync,
377{
378    async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
379        match self.as_ref() {
380            Some(provider) => provider.subscribe().await,
381            None => None,
382        }
383    }
384}
385
#[cfg(test)]
mod tests {
    use datadog_protos::agent::{AdvancedAdIdentifier, KubeNamespacedName};

    use super::*;

    /// Returns a `Schedule` proto config with no instances; the remaining fields hold the fixture values shared
    /// by the tests below, which override whatever they need.
    fn base_proto_config(name: &str) -> ProtoConfig {
        ProtoConfig {
            name: name.to_string(),
            event_type: ConfigEventType::Schedule as i32,
            init_config: b"key: value".to_vec(),
            instances: Vec::new(),
            provider: "test-provider".to_string(),
            ad_identifiers: vec!["id1".to_string(), "id2".to_string()],
            cluster_check: false,
            metric_config: b"metric_key: metric_value".to_vec(),
            logs_config: b"log_key: log_value".to_vec(),
            advanced_ad_identifiers: Vec::new(),
            service_id: "service-id".to_string(),
            tagger_entity: "tagger-entity".to_string(),
            node_name: "node-name".to_string(),
            source: "source".to_string(),
            ignore_autodiscovery_tags: false,
            metrics_excluded: false,
            logs_excluded: false,
        }
    }

    /// Advanced identifier pointing at the `nginx` service in the `default` namespace, without endpoints.
    fn nginx_service_identifier() -> AdvancedAdIdentifier {
        AdvancedAdIdentifier {
            kube_service: Some(KubeNamespacedName {
                name: "nginx".to_string(),
                namespace: "default".to_string(),
            }),
            kube_endpoints: None,
        }
    }

    /// Wraps a string slice in a YAML string value.
    fn yaml_string(text: &str) -> serde_yaml::Value {
        serde_yaml::Value::String(text.to_string())
    }

    #[test]
    fn test_event_type_from_config_event_type() {
        let schedule = EventType::from(ConfigEventType::Schedule);
        let unschedule = EventType::from(ConfigEventType::Unschedule);

        assert_eq!(schedule, EventType::Schedule);
        assert_eq!(unschedule, EventType::Unschedule);
    }

    #[test]
    fn test_event_type_from_i32() {
        // 0 and 1 are the known values (Schedule and Unschedule); 2 and -1 are unknown and must default to
        // Schedule.
        let cases: [(i32, EventType); 4] = [
            (0, EventType::Schedule),
            (1, EventType::Unschedule),
            (2, EventType::Schedule),
            (-1, EventType::Schedule),
        ];

        for (raw, expected) in cases {
            assert_eq!(EventType::from(raw), expected);
        }
    }

    #[test]
    fn test_check_config_instance_id() {
        let proto_config = ProtoConfig {
            instances: vec![b"name: test".to_vec(), b"another_key: another_value".to_vec()],
            ..base_proto_config("test-check")
        };

        let check_config = CheckConfig::from(Config::from(proto_config));

        let named_id = check_config.instances[0].id();
        let unnamed_id = check_config.instances[1].id();

        assert_ne!(named_id, unnamed_id);

        // The first instance has a `name` entry, so its ID carries the extra name segment.
        assert_eq!(named_id, "test-check:test:369F074E36651C8");
        assert_eq!(unnamed_id, "test-check:8C83712B7A572843");
    }

    #[test]
    fn test_config_from_proto_config() {
        let proto_config = ProtoConfig {
            instances: vec![
                b"instance_key: instance_value".to_vec(),
                b"another_key: another_value".to_vec(),
            ],
            cluster_check: true,
            advanced_ad_identifiers: vec![nginx_service_identifier()],
            ..base_proto_config("test-config")
        };

        let config = Config::from(proto_config);

        // Scalar fields are carried over unchanged.
        assert_eq!(config.name, "test-config");
        assert_eq!(config.provider, "test-provider");
        assert_eq!(
            config.ad_identifiers,
            vec![MetaString::from_static("id1"), MetaString::from_static("id2")]
        );
        assert!(config.cluster_check);
        assert_eq!(config.service_id, "service-id");
        assert_eq!(config.tagger_entity, "tagger-entity");
        assert_eq!(config.node_name, "node-name");
        assert_eq!(config.source, "source");
        assert!(!config.ignore_autodiscovery_tags);
        assert!(!config.metrics_excluded);
        assert!(!config.logs_excluded);

        // Byte sections are parsed into key/value data.
        assert_eq!(config.init_config.get("key"), Some(&yaml_string("value")));
        assert_eq!(config.instances.len(), 2);
        assert_eq!(
            config.instances[0].get("instance_key"),
            Some(&yaml_string("instance_value"))
        );
        assert_eq!(
            config.instances[1].get("another_key"),
            Some(&yaml_string("another_value"))
        );
        assert_eq!(
            config.metric_config.get("metric_key"),
            Some(&yaml_string("metric_value"))
        );
        assert_eq!(config.logs_config.get("log_key"), Some(&yaml_string("log_value")));

        // The advanced identifier keeps its service and has no endpoints.
        assert_eq!(config.advanced_ad_identifiers.len(), 1);
        let identifier = &config.advanced_ad_identifiers[0];
        assert!(identifier.kube_endpoints.is_none());
        assert!(identifier.kube_service.is_some());
        let service = identifier.kube_service.as_ref().unwrap();
        assert_eq!(service.name, "nginx");
        assert_eq!(service.namespace, "default");
    }

    #[test]
    fn test_autodiscovery_event_from_proto_config() {
        let mut proto_config = ProtoConfig {
            init_config: b"init-data".to_vec(),
            cluster_check: true,
            metric_config: Vec::new(),
            logs_config: Vec::new(),
            advanced_ad_identifiers: vec![nginx_service_identifier()],
            ..base_proto_config("test-config")
        };

        // No instances: generic schedule event.
        let event = AutodiscoveryEvent::from(proto_config.clone());
        assert!(
            matches!(event, AutodiscoveryEvent::Schedule { .. }),
            "Expected an Schedule event"
        );

        // No instances, unschedule: generic unscheduled event that keeps the config.
        proto_config.event_type = ConfigEventType::Unschedule as i32;
        let event = AutodiscoveryEvent::from(proto_config.clone());
        if let AutodiscoveryEvent::Unscheduled { config } = event {
            assert_eq!(config.name, "test-config");
        } else {
            panic!("Expected an Unscheduled event");
        }

        // With instances and no cluster check: check schedule event.
        proto_config.instances = vec![b"instance1".to_vec(), b"instance2".to_vec()];
        proto_config.cluster_check = false;
        proto_config.event_type = ConfigEventType::Schedule as i32;
        let event = AutodiscoveryEvent::from(proto_config.clone());
        assert!(
            matches!(event, AutodiscoveryEvent::CheckSchedule { .. }),
            "Expected an CheckSchedule event"
        );

        // Same config, unschedule: check unscheduled event.
        proto_config.event_type = ConfigEventType::Unschedule as i32;
        let event = AutodiscoveryEvent::from(proto_config);
        assert!(
            matches!(event, AutodiscoveryEvent::CheckUnscheduled { .. }),
            "Expected an CheckUnscheduled event"
        );
    }
}