1use std::collections::HashMap;
6use std::hash::Hasher;
7
8use async_trait::async_trait;
9use datadog_protos::agent::{
10 AdvancedAdIdentifier as ProtoAdvancedAdIdentifier, Config as ProtoConfig, ConfigEventType,
11 KubeNamespacedName as ProtoKubeNamespacedName,
12};
13use fnv::FnvHasher;
14use saluki_error::GenericError;
15use stringtheory::MetaString;
16use tokio::sync::broadcast::Receiver;
17use twox_hash::XxHash64;
18
19pub mod providers;
20
/// Kind of change carried by an autodiscovery configuration event.
///
/// Values are converted from the protobuf `ConfigEventType`, or from its raw `i32` form.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EventType {
    /// The configuration is being scheduled.
    Schedule,
    /// The configuration is being unscheduled.
    Unschedule,
}
29
30impl From<ConfigEventType> for EventType {
31 fn from(event_type: ConfigEventType) -> Self {
32 match event_type {
33 ConfigEventType::Schedule => EventType::Schedule,
34 ConfigEventType::Unschedule => EventType::Unschedule,
35 }
36 }
37}
38
39impl From<i32> for EventType {
40 fn from(value: i32) -> Self {
41 if value == ConfigEventType::Unschedule as i32 {
42 EventType::Unschedule
43 } else {
44 EventType::Schedule
46 }
47 }
48}
49
50#[derive(Debug, Clone, PartialEq, Eq)]
52pub struct KubeNamespacedName {
53 pub name: MetaString,
55 pub namespace: MetaString,
57}
58
59impl From<ProtoKubeNamespacedName> for KubeNamespacedName {
60 fn from(value: ProtoKubeNamespacedName) -> Self {
61 Self {
62 name: value.name.into(),
63 namespace: value.namespace.into(),
64 }
65 }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
70pub struct AdvancedADIdentifier {
71 pub kube_service: Option<KubeNamespacedName>,
73 pub kube_endpoints: Option<KubeNamespacedName>,
75}
76
77impl From<ProtoAdvancedAdIdentifier> for AdvancedADIdentifier {
78 fn from(value: ProtoAdvancedAdIdentifier) -> Self {
79 Self {
80 kube_service: value.kube_service.map(Into::into),
81 kube_endpoints: value
82 .kube_endpoints
83 .and_then(|endpoint| endpoint.kube_namespaced_name)
84 .map(Into::into),
85 }
86 }
87}
88
89pub trait RawData {
91 fn get_value(&self) -> &HashMap<MetaString, serde_yaml::Value>;
93
94 fn get(&self, key: &str) -> Option<&serde_yaml::Value> {
96 self.get_value().get(key)
97 }
98
99 fn to_bytes(&self) -> Result<Vec<u8>, GenericError> {
101 let mut buffer = Vec::new();
102 serde_yaml::to_writer(&mut buffer, &self.get_value())?;
103 Ok(buffer)
104 }
105}
106
107#[derive(Debug, Default, Clone, PartialEq, Eq)]
109pub struct Data {
110 value: HashMap<MetaString, serde_yaml::Value>,
111}
112
113impl RawData for Data {
114 fn get_value(&self) -> &HashMap<MetaString, serde_yaml::Value> {
115 &self.value
116 }
117}
118
119#[derive(Debug, Default, Clone, PartialEq, Eq)]
121pub struct Instance {
122 id: String,
124 value: HashMap<MetaString, serde_yaml::Value>,
126}
127
128impl Instance {
129 pub fn id(&self) -> &String {
131 &self.id
132 }
133}
134
135impl RawData for Instance {
136 fn get_value(&self) -> &HashMap<MetaString, serde_yaml::Value> {
137 &self.value
138 }
139}
140
141#[derive(Debug, Clone)]
143pub struct Config {
144 pub name: MetaString,
146 pub init_config: Data,
148 pub instances: Vec<Data>,
150 pub metric_config: Data,
152 pub logs_config: Data,
154 pub ad_identifiers: Vec<MetaString>,
156 pub advanced_ad_identifiers: Vec<AdvancedADIdentifier>,
158 pub provider: MetaString,
160 pub service_id: MetaString,
162 pub tagger_entity: MetaString,
164 pub cluster_check: bool,
166 pub node_name: MetaString,
168 pub source: MetaString,
170 pub ignore_autodiscovery_tags: bool,
172 pub metrics_excluded: bool,
174 pub logs_excluded: bool,
176}
177
178#[derive(Debug, Clone, PartialEq, Eq)]
180pub struct CheckConfig {
181 pub name: MetaString,
183 pub init_config: Data,
185 pub instances: Vec<Instance>,
187 pub source: MetaString,
189}
190
191impl Config {
192 pub fn digest(&self) -> u64 {
194 let mut h = XxHash64::with_seed(0);
195
196 h.write(self.name.as_bytes());
197 for i in &self.instances {
198 h.write(&i.to_bytes().unwrap_or_default());
199 }
200 h.write(&self.init_config.to_bytes().unwrap_or_default());
201 for i in &self.ad_identifiers {
202 h.write(i.as_bytes());
203 }
204 h.write(self.node_name.as_bytes());
205 h.write(&self.logs_config.to_bytes().unwrap_or_default());
206 h.write(self.service_id.as_bytes());
207 h.write(if self.ignore_autodiscovery_tags {
208 b"true"
209 } else {
210 b"false"
211 });
212
213 h.finish()
214 }
215}
216
217impl From<ProtoConfig> for Config {
218 fn from(proto: ProtoConfig) -> Self {
219 let advanced_ad_identifiers = proto.advanced_ad_identifiers.into_iter().map(Into::into).collect();
221
222 let init_config = bytes_to_data(proto.init_config).unwrap_or_default();
223 let instances = proto
224 .instances
225 .into_iter()
226 .map(|instance| bytes_to_data(instance).unwrap_or_default())
227 .collect();
228
229 Self {
230 name: proto.name.into(),
231 init_config,
232 instances,
233 metric_config: bytes_to_data(proto.metric_config).unwrap_or_default(),
234 logs_config: bytes_to_data(proto.logs_config).unwrap_or_default(),
235 ad_identifiers: proto.ad_identifiers.into_iter().map(MetaString::from).collect(),
236 advanced_ad_identifiers,
237 provider: proto.provider.into(),
238 service_id: proto.service_id.into(),
239 tagger_entity: proto.tagger_entity.into(),
240 cluster_check: proto.cluster_check,
241 node_name: proto.node_name.into(),
242 source: proto.source.into(),
243 ignore_autodiscovery_tags: proto.ignore_autodiscovery_tags,
244 metrics_excluded: proto.metrics_excluded,
245 logs_excluded: proto.logs_excluded,
246 }
247 }
248}
249
250fn bytes_to_data(bytes: Vec<u8>) -> Result<Data, GenericError> {
251 let parse_bytes = String::from_utf8(bytes)?;
252
253 let map: HashMap<String, serde_yaml::Value> = serde_yaml::from_str(&parse_bytes)?;
254
255 let mut result = HashMap::<MetaString, serde_yaml::Value>::new();
256
257 for (key, value) in map {
258 result.insert(key.into(), value);
259 }
260
261 Ok(Data { value: result })
262}
263
264fn instance_id(name: &str, instance: &Data, digest: u64, init_config: &Data) -> String {
265 let mut h2 = FnvHasher::default();
266 h2.write_u64(digest);
267 h2.write(&instance.to_bytes().unwrap_or_default());
268 h2.write(&init_config.to_bytes().unwrap_or_default());
269
270 let instance_name = instance_name(instance);
271 let hash2 = h2.finish();
272
273 if !instance_name.is_empty() {
274 format!("{}:{}:{:X}", name, instance_name, hash2)
275 } else {
276 format!("{}:{:X}", name, hash2)
277 }
278}
279
280fn instance_name(instance: &Data) -> String {
281 if let Some(name) = instance.get("name") {
282 if let Some(value) = name.as_str() {
283 return value.to_string();
284 }
285 }
286 if let Some(namespace) = instance.get("namespace") {
287 if let Some(value) = namespace.as_str() {
288 return value.to_string();
289 }
290 }
291 "".to_string()
292}
293
294impl From<Config> for CheckConfig {
295 fn from(config: Config) -> Self {
296 let digest = config.digest();
297
298 CheckConfig {
299 name: config.name.clone(),
300 init_config: config.init_config.clone(),
301 instances: config
302 .instances
303 .into_iter()
304 .map(|instance_data| Instance {
305 id: instance_id(&config.name, &instance_data, digest, &config.init_config),
306 value: instance_data.value,
307 })
308 .collect(),
309 source: config.source,
310 }
311 }
312}
313
314impl From<ProtoConfig> for AutodiscoveryEvent {
315 fn from(proto: ProtoConfig) -> AutodiscoveryEvent {
316 let event_type = EventType::from(proto.event_type);
317
318 let config = Config::from(proto);
319
320 if !config.instances.is_empty() && !config.cluster_check {
321 let check_config = CheckConfig::from(config);
322
323 if event_type == EventType::Schedule {
324 return AutodiscoveryEvent::CheckSchedule { config: check_config };
325 } else {
326 return AutodiscoveryEvent::CheckUnscheduled { config: check_config };
327 }
328 }
329
330 if event_type == EventType::Schedule {
331 AutodiscoveryEvent::Schedule { config }
332 } else {
333 AutodiscoveryEvent::Unscheduled { config }
334 }
335 }
336}
337
338#[derive(Debug, Clone)]
340#[allow(clippy::large_enum_variant)]
341pub enum AutodiscoveryEvent {
342 CheckSchedule {
344 config: CheckConfig,
346 },
347 CheckUnscheduled {
349 config: CheckConfig,
351 },
352 Schedule {
354 config: Config,
356 },
357 Unscheduled {
359 config: Config,
361 },
362}
363
364#[async_trait]
368pub trait AutodiscoveryProvider {
369 async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>>;
371}
372
373#[async_trait]
374impl<T> AutodiscoveryProvider for Option<T>
375where
376 T: AutodiscoveryProvider + Sync,
377{
378 async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
379 match self.as_ref() {
380 Some(provider) => provider.subscribe().await,
381 None => None,
382 }
383 }
384}
385
#[cfg(test)]
mod tests {
    use datadog_protos::agent::{AdvancedAdIdentifier, KubeNamespacedName};

    use super::*;

    /// Advanced AD identifiers shared by the conversion tests: a single entry with a
    /// `default/nginx` kube service and no kube endpoints.
    fn nginx_service_identifiers() -> Vec<AdvancedAdIdentifier> {
        vec![AdvancedAdIdentifier {
            kube_service: Some(KubeNamespacedName {
                name: "nginx".to_string(),
                namespace: "default".to_string(),
            }),
            kube_endpoints: None,
        }]
    }

    /// Wraps a string slice into the YAML string value the parsed mappings are expected to hold.
    fn yaml_string(text: &str) -> serde_yaml::Value {
        serde_yaml::Value::String(text.to_string())
    }

    #[test]
    fn test_event_type_from_config_event_type() {
        assert_eq!(EventType::from(ConfigEventType::Schedule), EventType::Schedule);
        assert_eq!(EventType::from(ConfigEventType::Unschedule), EventType::Unschedule);
    }

    #[test]
    fn test_event_type_from_i32() {
        // Only 1 maps to `Unschedule`; unknown and negative values fall back to `Schedule`.
        let cases: [(i32, EventType); 4] = [
            (0, EventType::Schedule),
            (1, EventType::Unschedule),
            (2, EventType::Schedule),
            (-1, EventType::Schedule),
        ];

        for (raw, expected) in cases {
            assert_eq!(EventType::from(raw), expected);
        }
    }

    #[test]
    fn test_check_config_instance_id() {
        let proto_config = ProtoConfig {
            name: "test-check".to_string(),
            event_type: ConfigEventType::Schedule as i32,
            init_config: b"key: value".to_vec(),
            instances: vec![b"name: test".to_vec(), b"another_key: another_value".to_vec()],
            provider: "test-provider".to_string(),
            ad_identifiers: vec!["id1".to_string(), "id2".to_string()],
            cluster_check: false,
            metric_config: b"metric_key: metric_value".to_vec(),
            logs_config: b"log_key: log_value".to_vec(),
            advanced_ad_identifiers: vec![],
            service_id: "service-id".to_string(),
            tagger_entity: "tagger-entity".to_string(),
            node_name: "node-name".to_string(),
            source: "source".to_string(),
            ignore_autodiscovery_tags: false,
            metrics_excluded: false,
            logs_excluded: false,
        };

        let check_config = CheckConfig::from(Config::from(proto_config));

        let first_id = check_config.instances[0].id();
        let second_id = check_config.instances[1].id();

        assert_ne!(first_id, second_id);

        // The first instance has a `name` key, so its ID carries the extra name segment.
        assert_eq!(first_id, "test-check:test:369F074E36651C8");
        assert_eq!(second_id, "test-check:8C83712B7A572843");
    }

    #[test]
    fn test_config_from_proto_config() {
        let proto_config = ProtoConfig {
            name: "test-config".to_string(),
            event_type: ConfigEventType::Schedule as i32,
            init_config: b"key: value".to_vec(),
            instances: vec![
                b"instance_key: instance_value".to_vec(),
                b"another_key: another_value".to_vec(),
            ],
            provider: "test-provider".to_string(),
            ad_identifiers: vec!["id1".to_string(), "id2".to_string()],
            cluster_check: true,
            metric_config: b"metric_key: metric_value".to_vec(),
            logs_config: b"log_key: log_value".to_vec(),
            advanced_ad_identifiers: nginx_service_identifiers(),
            service_id: "service-id".to_string(),
            tagger_entity: "tagger-entity".to_string(),
            node_name: "node-name".to_string(),
            source: "source".to_string(),
            ignore_autodiscovery_tags: false,
            metrics_excluded: false,
            logs_excluded: false,
        };

        let config = Config::from(proto_config);

        // Scalar fields are carried over unchanged.
        assert_eq!(config.name, "test-config");
        assert_eq!(config.provider, "test-provider");
        assert_eq!(
            config.ad_identifiers,
            vec![MetaString::from_static("id1"), MetaString::from_static("id2")]
        );
        assert!(config.cluster_check);
        assert_eq!(config.service_id, "service-id");
        assert_eq!(config.tagger_entity, "tagger-entity");
        assert_eq!(config.node_name, "node-name");
        assert_eq!(config.source, "source");
        assert!(!config.ignore_autodiscovery_tags);
        assert!(!config.metrics_excluded);
        assert!(!config.logs_excluded);

        // Byte payloads are parsed into mappings.
        assert_eq!(config.init_config.get("key"), Some(&yaml_string("value")));
        assert_eq!(config.instances.len(), 2);
        assert_eq!(
            config.instances[0].get("instance_key"),
            Some(&yaml_string("instance_value"))
        );
        assert_eq!(
            config.instances[1].get("another_key"),
            Some(&yaml_string("another_value"))
        );
        assert_eq!(
            config.metric_config.get("metric_key"),
            Some(&yaml_string("metric_value"))
        );
        assert_eq!(config.logs_config.get("log_key"), Some(&yaml_string("log_value")));

        // Advanced AD identifiers are converted element by element.
        assert_eq!(config.advanced_ad_identifiers.len(), 1);
        let converted = &config.advanced_ad_identifiers[0];
        assert!(converted.kube_endpoints.is_none());
        assert!(converted.kube_service.is_some());
        let service = converted.kube_service.as_ref().unwrap();
        assert_eq!(service.name, "nginx");
        assert_eq!(service.namespace, "default");
    }

    #[test]
    fn test_autodiscovery_event_from_proto_config() {
        let mut proto_config = ProtoConfig {
            name: "test-config".to_string(),
            event_type: ConfigEventType::Schedule as i32,
            init_config: b"init-data".to_vec(),
            instances: vec![],
            provider: "test-provider".to_string(),
            ad_identifiers: vec!["id1".to_string(), "id2".to_string()],
            cluster_check: true,
            metric_config: vec![],
            logs_config: vec![],
            advanced_ad_identifiers: nginx_service_identifiers(),
            service_id: "service-id".to_string(),
            tagger_entity: "tagger-entity".to_string(),
            node_name: "node-name".to_string(),
            source: "source".to_string(),
            ignore_autodiscovery_tags: false,
            metrics_excluded: false,
            logs_excluded: false,
        };

        // No instances and a cluster check: plain schedule event.
        assert!(
            matches!(
                AutodiscoveryEvent::from(proto_config.clone()),
                AutodiscoveryEvent::Schedule { .. }
            ),
            "Expected an Schedule event"
        );

        // Same configuration, unscheduled.
        proto_config.event_type = ConfigEventType::Unschedule as i32;
        match AutodiscoveryEvent::from(proto_config.clone()) {
            AutodiscoveryEvent::Unscheduled { config } => assert_eq!(config.name, "test-config"),
            _ => panic!("Expected an Unscheduled event"),
        }

        // With instances and no cluster check, the event becomes a check event.
        proto_config.instances = vec![b"instance1".to_vec(), b"instance2".to_vec()];
        proto_config.cluster_check = false;
        proto_config.event_type = ConfigEventType::Schedule as i32;
        assert!(
            matches!(
                AutodiscoveryEvent::from(proto_config.clone()),
                AutodiscoveryEvent::CheckSchedule { .. }
            ),
            "Expected an CheckSchedule event"
        );

        proto_config.event_type = ConfigEventType::Unschedule as i32;
        assert!(
            matches!(
                AutodiscoveryEvent::from(proto_config),
                AutodiscoveryEvent::CheckUnscheduled { .. }
            ),
            "Expected an CheckUnscheduled event"
        );
    }
}