1use std::collections::BTreeMap;
6use std::hash::Hasher;
7
8use async_trait::async_trait;
9use datadog_protos::agent::{
10 AdvancedAdIdentifier as ProtoAdvancedAdIdentifier, Config as ProtoConfig, ConfigEventType,
11 KubeNamespacedName as ProtoKubeNamespacedName,
12};
13use fnv::FnvHasher;
14use saluki_error::GenericError;
15use stringtheory::MetaString;
16use tokio::sync::mpsc::Receiver;
17use twox_hash::XxHash64;
18
19pub mod providers;
20
/// Kind of change an autodiscovery event announces for a configuration.
///
/// The enum is a plain, fieldless tag, so it is `Copy` and `Hash` in addition to being comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventType {
    /// The configuration is being scheduled.
    Schedule,
    /// The configuration is being unscheduled.
    Unschedule,
}
29
30impl From<ConfigEventType> for EventType {
31 fn from(event_type: ConfigEventType) -> Self {
32 match event_type {
33 ConfigEventType::Schedule => EventType::Schedule,
34 ConfigEventType::Unschedule => EventType::Unschedule,
35 }
36 }
37}
38
39impl From<i32> for EventType {
40 fn from(value: i32) -> Self {
41 if value == ConfigEventType::Unschedule as i32 {
42 EventType::Unschedule
43 } else {
44 EventType::Schedule
46 }
47 }
48}
49
50#[derive(Debug, Clone, PartialEq, Eq)]
52pub struct KubeNamespacedName {
53 pub name: MetaString,
55 pub namespace: MetaString,
57}
58
59impl From<ProtoKubeNamespacedName> for KubeNamespacedName {
60 fn from(value: ProtoKubeNamespacedName) -> Self {
61 Self {
62 name: value.name.into(),
63 namespace: value.namespace.into(),
64 }
65 }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
70pub struct AdvancedADIdentifier {
71 pub kube_service: Option<KubeNamespacedName>,
73 pub kube_endpoints: Option<KubeNamespacedName>,
75}
76
77impl From<ProtoAdvancedAdIdentifier> for AdvancedADIdentifier {
78 fn from(value: ProtoAdvancedAdIdentifier) -> Self {
79 Self {
80 kube_service: value.kube_service.map(Into::into),
81 kube_endpoints: value
82 .kube_endpoints
83 .and_then(|endpoint| endpoint.kube_namespaced_name)
84 .map(Into::into),
85 }
86 }
87}
88
89pub trait RawData {
91 fn get_value(&self) -> &BTreeMap<MetaString, serde_yaml::Value>;
93
94 fn get(&self, key: &str) -> Option<&serde_yaml::Value> {
96 self.get_value().get(key)
97 }
98
99 fn to_bytes(&self) -> Result<Vec<u8>, GenericError> {
101 let mut buffer = Vec::new();
102 serde_yaml::to_writer(&mut buffer, &self.get_value())?;
103 Ok(buffer)
104 }
105}
106
107#[derive(Debug, Default, Clone, PartialEq, Eq)]
109pub struct Data {
110 value: BTreeMap<MetaString, serde_yaml::Value>,
111}
112
113impl RawData for Data {
114 fn get_value(&self) -> &BTreeMap<MetaString, serde_yaml::Value> {
115 &self.value
116 }
117}
118
119#[derive(Debug, Default, Clone, PartialEq, Eq)]
121pub struct Instance {
122 id: String,
124 value: BTreeMap<MetaString, serde_yaml::Value>,
126}
127
128impl Instance {
129 pub fn id(&self) -> &String {
131 &self.id
132 }
133}
134
135impl RawData for Instance {
136 fn get_value(&self) -> &BTreeMap<MetaString, serde_yaml::Value> {
137 &self.value
138 }
139}
140
141#[derive(Debug, Clone)]
143pub struct Config {
144 pub name: MetaString,
146 pub init_config: Data,
148 pub instances: Vec<Data>,
150 pub metric_config: Data,
152 pub logs_config: Data,
154 pub ad_identifiers: Vec<MetaString>,
156 pub advanced_ad_identifiers: Vec<AdvancedADIdentifier>,
158 pub provider: MetaString,
160 pub service_id: MetaString,
162 pub tagger_entity: MetaString,
164 pub cluster_check: bool,
166 pub node_name: MetaString,
168 pub source: MetaString,
170 pub ignore_autodiscovery_tags: bool,
172 pub metrics_excluded: bool,
174 pub logs_excluded: bool,
176}
177
178#[derive(Debug, Clone, PartialEq, Eq)]
180pub struct CheckConfig {
181 pub name: MetaString,
183 pub init_config: Data,
185 pub instances: Vec<Instance>,
187 pub source: MetaString,
189}
190
191impl Config {
192 pub fn digest(&self) -> u64 {
194 let mut h = XxHash64::with_seed(0);
195
196 h.write(self.name.as_bytes());
197 for i in &self.instances {
198 h.write(&i.to_bytes().unwrap_or_default());
199 }
200 h.write(&self.init_config.to_bytes().unwrap_or_default());
201 for i in &self.ad_identifiers {
202 h.write(i.as_bytes());
203 }
204 h.write(self.node_name.as_bytes());
205 h.write(&self.logs_config.to_bytes().unwrap_or_default());
206 h.write(self.service_id.as_bytes());
207 h.write(if self.ignore_autodiscovery_tags {
208 b"true"
209 } else {
210 b"false"
211 });
212
213 h.finish()
214 }
215}
216
217impl From<ProtoConfig> for Config {
218 fn from(proto: ProtoConfig) -> Self {
219 let advanced_ad_identifiers = proto.advanced_ad_identifiers.into_iter().map(Into::into).collect();
221
222 let init_config = bytes_to_data(proto.init_config).unwrap_or_default();
223 let instances = proto
224 .instances
225 .into_iter()
226 .map(|instance| bytes_to_data(instance).unwrap_or_default())
227 .collect();
228
229 Self {
230 name: proto.name.into(),
231 init_config,
232 instances,
233 metric_config: bytes_to_data(proto.metric_config).unwrap_or_default(),
234 logs_config: bytes_to_data(proto.logs_config).unwrap_or_default(),
235 ad_identifiers: proto.ad_identifiers.into_iter().map(MetaString::from).collect(),
236 advanced_ad_identifiers,
237 provider: proto.provider.into(),
238 service_id: proto.service_id.into(),
239 tagger_entity: proto.tagger_entity.into(),
240 cluster_check: proto.cluster_check,
241 node_name: proto.node_name.into(),
242 source: proto.source.into(),
243 ignore_autodiscovery_tags: proto.ignore_autodiscovery_tags,
244 metrics_excluded: proto.metrics_excluded,
245 logs_excluded: proto.logs_excluded,
246 }
247 }
248}
249
250fn bytes_to_data(bytes: Vec<u8>) -> Result<Data, GenericError> {
251 let parse_bytes = String::from_utf8(bytes)?;
252
253 let map: BTreeMap<String, serde_yaml::Value> = serde_yaml::from_str(&parse_bytes)?;
254
255 let mut result = BTreeMap::<MetaString, serde_yaml::Value>::new();
256
257 for (key, value) in map {
258 result.insert(key.into(), value);
259 }
260
261 Ok(Data { value: result })
262}
263
264fn instance_id(name: &str, instance: &Data, digest: u64, init_config: &Data) -> String {
265 let mut h2 = FnvHasher::default();
266 h2.write_u64(digest);
267 h2.write(&instance.to_bytes().unwrap_or_default());
268 h2.write(&init_config.to_bytes().unwrap_or_default());
269
270 let instance_name = instance_name(instance);
271 let hash2 = h2.finish();
272
273 if !instance_name.is_empty() {
274 format!("{}:{}:{:X}", name, instance_name, hash2)
275 } else {
276 format!("{}:{:X}", name, hash2)
277 }
278}
279
280fn instance_name(instance: &Data) -> String {
281 if let Some(name) = instance.get("name") {
282 if let Some(value) = name.as_str() {
283 return value.to_string();
284 }
285 }
286 if let Some(namespace) = instance.get("namespace") {
287 if let Some(value) = namespace.as_str() {
288 return value.to_string();
289 }
290 }
291 "".to_string()
292}
293
294impl From<Config> for CheckConfig {
295 fn from(config: Config) -> Self {
296 let digest = config.digest();
297
298 CheckConfig {
299 name: config.name.clone(),
300 init_config: config.init_config.clone(),
301 instances: config
302 .instances
303 .into_iter()
304 .map(|instance_data| Instance {
305 id: instance_id(&config.name, &instance_data, digest, &config.init_config),
306 value: instance_data.value,
307 })
308 .collect(),
309 source: config.source,
310 }
311 }
312}
313
314impl From<ProtoConfig> for AutodiscoveryEvent {
315 fn from(proto: ProtoConfig) -> AutodiscoveryEvent {
316 let event_type = EventType::from(proto.event_type);
317
318 let config = Config::from(proto);
319
320 if !config.instances.is_empty() && !config.cluster_check {
321 let check_config = CheckConfig::from(config);
322
323 if event_type == EventType::Schedule {
324 return AutodiscoveryEvent::CheckSchedule { config: check_config };
325 } else {
326 return AutodiscoveryEvent::CheckUnscheduled { config: check_config };
327 }
328 }
329
330 if event_type == EventType::Schedule {
331 AutodiscoveryEvent::Schedule { config }
332 } else {
333 AutodiscoveryEvent::Unscheduled { config }
334 }
335 }
336}
337
338#[derive(Debug, Clone)]
340#[allow(clippy::large_enum_variant)]
341pub enum AutodiscoveryEvent {
342 CheckSchedule {
344 config: CheckConfig,
346 },
347 CheckUnscheduled {
349 config: CheckConfig,
351 },
352 Schedule {
354 config: Config,
356 },
357 Unscheduled {
359 config: Config,
361 },
362}
363
364#[async_trait]
368pub trait AutodiscoveryProvider {
369 async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>>;
373}
374
375#[async_trait]
376impl<T> AutodiscoveryProvider for Option<T>
377where
378 T: AutodiscoveryProvider + Sync,
379{
380 async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
381 match self.as_ref() {
382 Some(provider) => provider.subscribe().await,
383 None => None,
384 }
385 }
386}
387
#[cfg(test)]
mod tests {
    use datadog_protos::agent::{AdvancedAdIdentifier, KubeNamespacedName};

    use super::*;

    /// Builds a `Schedule` protobuf config carrying the fields shared by all tests.
    ///
    /// The YAML payloads and the instance list start out empty and `cluster_check` is `false`;
    /// each test fills in what it needs.
    fn base_proto_config(name: &str) -> ProtoConfig {
        ProtoConfig {
            name: name.to_string(),
            event_type: ConfigEventType::Schedule as i32,
            init_config: vec![],
            instances: vec![],
            provider: "test-provider".to_string(),
            ad_identifiers: vec!["id1".to_string(), "id2".to_string()],
            cluster_check: false,
            metric_config: vec![],
            logs_config: vec![],
            advanced_ad_identifiers: vec![],
            service_id: "service-id".to_string(),
            tagger_entity: "tagger-entity".to_string(),
            node_name: "node-name".to_string(),
            source: "source".to_string(),
            ignore_autodiscovery_tags: false,
            metrics_excluded: false,
            logs_excluded: false,
        }
    }

    /// Advanced identifier that targets the `nginx` service in the `default` namespace.
    fn nginx_service_identifier() -> AdvancedAdIdentifier {
        AdvancedAdIdentifier {
            kube_service: Some(KubeNamespacedName {
                name: "nginx".to_string(),
                namespace: "default".to_string(),
            }),
            kube_endpoints: None,
        }
    }

    /// Shorthand for a YAML string value.
    fn yaml_str(text: &str) -> serde_yaml::Value {
        serde_yaml::Value::String(text.to_string())
    }

    #[test]
    fn test_event_type_from_config_event_type() {
        assert_eq!(EventType::from(ConfigEventType::Schedule), EventType::Schedule);
        assert_eq!(EventType::from(ConfigEventType::Unschedule), EventType::Unschedule);
    }

    #[test]
    fn test_event_type_from_i32() {
        assert_eq!(EventType::from(0), EventType::Schedule);
        assert_eq!(EventType::from(1), EventType::Unschedule);
        // Values that match no protobuf variant fall back to `Schedule`.
        assert_eq!(EventType::from(2), EventType::Schedule);
        assert_eq!(EventType::from(-1), EventType::Schedule);
    }

    #[test]
    fn test_check_config_instance_id() {
        let mut proto_config = base_proto_config("test-check");
        proto_config.init_config = b"key: value".to_vec();
        proto_config.instances = vec![b"name: test".to_vec(), b"another_key: another_value".to_vec()];
        proto_config.metric_config = b"metric_key: metric_value".to_vec();
        proto_config.logs_config = b"log_key: log_value".to_vec();

        let check_config = CheckConfig::from(Config::from(proto_config));

        let first_id = &check_config.instances[0].id;
        let second_id = &check_config.instances[1].id;

        // Instances of the same check must not share an identifier.
        assert_ne!(first_id, second_id);

        // The first instance is named, so its name is part of the identifier.
        assert_eq!(first_id, "test-check:test:369F074E36651C8");
        assert_eq!(second_id, "test-check:8C83712B7A572843");
    }

    #[test]
    fn test_config_from_proto_config() {
        let mut proto_config = base_proto_config("test-config");
        proto_config.init_config = b"key: value".to_vec();
        proto_config.instances = vec![
            b"instance_key: instance_value".to_vec(),
            b"another_key: another_value".to_vec(),
        ];
        proto_config.cluster_check = true;
        proto_config.metric_config = b"metric_key: metric_value".to_vec();
        proto_config.logs_config = b"log_key: log_value".to_vec();
        proto_config.advanced_ad_identifiers = vec![nginx_service_identifier()];

        let config = Config::from(proto_config);

        // Scalar fields are carried over unchanged.
        assert_eq!(config.name, "test-config");
        assert_eq!(config.provider, "test-provider");
        assert_eq!(
            config.ad_identifiers,
            vec![MetaString::from_static("id1"), MetaString::from_static("id2")]
        );
        assert!(config.cluster_check);
        assert_eq!(config.service_id, "service-id");
        assert_eq!(config.tagger_entity, "tagger-entity");
        assert_eq!(config.node_name, "node-name");
        assert_eq!(config.source, "source");
        assert!(!config.ignore_autodiscovery_tags);
        assert!(!config.metrics_excluded);
        assert!(!config.logs_excluded);

        // YAML payloads are parsed into key/value data.
        assert_eq!(config.init_config.get("key"), Some(&yaml_str("value")));
        assert_eq!(config.instances.len(), 2);
        assert_eq!(
            config.instances[0].get("instance_key"),
            Some(&yaml_str("instance_value"))
        );
        assert_eq!(
            config.instances[1].get("another_key"),
            Some(&yaml_str("another_value"))
        );
        assert_eq!(config.metric_config.get("metric_key"), Some(&yaml_str("metric_value")));
        assert_eq!(config.logs_config.get("log_key"), Some(&yaml_str("log_value")));

        // Advanced identifiers are converted as well.
        assert_eq!(config.advanced_ad_identifiers.len(), 1);
        let converted = &config.advanced_ad_identifiers[0];
        assert!(converted.kube_endpoints.is_none());
        assert!(converted.kube_service.is_some());
        let service = converted.kube_service.as_ref().unwrap();
        assert_eq!(service.name, "nginx");
        assert_eq!(service.namespace, "default");
    }

    #[test]
    fn test_autodiscovery_event_from_proto_config() {
        let mut proto_config = base_proto_config("test-config");
        proto_config.init_config = b"init-data".to_vec();
        proto_config.cluster_check = true;
        proto_config.advanced_ad_identifiers = vec![nginx_service_identifier()];

        // No instances and flagged as a cluster check: plain configuration events.
        let event = AutodiscoveryEvent::from(proto_config.clone());
        assert!(
            matches!(event, AutodiscoveryEvent::Schedule { .. }),
            "Expected an Schedule event"
        );

        proto_config.event_type = ConfigEventType::Unschedule as i32;

        let event = AutodiscoveryEvent::from(proto_config.clone());
        match event {
            AutodiscoveryEvent::Unscheduled { config } => {
                assert_eq!(config.name, "test-config");
            }
            _ => panic!("Expected an Unscheduled event"),
        }

        // With instances and without the cluster-check flag: check events.
        proto_config.instances = vec![b"instance1".to_vec(), b"instance2".to_vec()];
        proto_config.cluster_check = false;
        proto_config.event_type = ConfigEventType::Schedule as i32;

        let event = AutodiscoveryEvent::from(proto_config.clone());
        assert!(
            matches!(event, AutodiscoveryEvent::CheckSchedule { .. }),
            "Expected an CheckSchedule event"
        );

        proto_config.event_type = ConfigEventType::Unschedule as i32;

        let event = AutodiscoveryEvent::from(proto_config);
        assert!(
            matches!(event, AutodiscoveryEvent::CheckUnscheduled { .. }),
            "Expected an CheckUnscheduled event"
        );
    }
}