1use std::sync::{Arc, LazyLock};
2use std::time::Duration;
3
4use async_trait::async_trait;
5use datadog_protos::checks::{
6 check_data::Data,
7 checks_server::{Checks, ChecksServer},
8 event::{AlertType as ProtoAlertType, Event as ProtoEvent, Priority as ProtoPriority},
9 log::{Log as ProtoLog, LogLevel},
10 metric::{Metric as ProtoMetric, MetricType},
11 service_check::{ServiceCheck as ProtoServiceCheck, Status as ServiceCheckStatus},
12 SendCheckPayloadRequest, SendCheckPayloadResponse,
13};
14use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
15use saluki_common::task::HandleExt as _;
16use saluki_config::GenericConfiguration;
17use saluki_context::tags::{Tag, TagSet};
18use saluki_context::Context;
19use saluki_core::data_model::event::eventd::{AlertType, EventD, Priority};
20use saluki_core::data_model::event::log::Log;
21use saluki_core::data_model::event::metric::Metric;
22use saluki_core::data_model::event::service_check::{CheckStatus, ServiceCheck};
23use saluki_core::data_model::event::{Event, EventType};
24use saluki_core::topology::OutputDefinition;
25use saluki_core::{
26 components::{sources::*, ComponentContext},
27 data_model::event::log::LogStatus,
28};
29use saluki_error::{generic_error, GenericError};
30use saluki_io::net::ListenAddress;
31use serde::Deserialize;
32use stringtheory::MetaString;
33use tokio::select;
34use tokio::sync::mpsc;
35use tonic::transport::Server;
36use tonic::{Response, Status};
37use tracing::{debug, trace, warn};
38
39const fn default_grpc_endpoint() -> ListenAddress {
40 ListenAddress::any_tcp(5105)
41}
42
43#[derive(Debug, Deserialize)]
45pub struct ChecksIPCConfiguration {
46 #[serde(rename = "checks_ipc_endpoint", default = "default_grpc_endpoint")]
47 grpc_endpoint: ListenAddress,
48}
49
50impl ChecksIPCConfiguration {
51 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
53 Ok(config.as_typed()?)
54 }
55}
56
57#[async_trait]
58impl SourceBuilder for ChecksIPCConfiguration {
59 fn outputs(&self) -> &[OutputDefinition<EventType>] {
60 static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
61 vec![
62 OutputDefinition::named_output("metrics", EventType::Metric),
63 OutputDefinition::named_output("logs", EventType::Log),
64 OutputDefinition::named_output("events", EventType::EventD),
65 OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
66 ]
67 });
68
69 &OUTPUTS
70 }
71
72 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
73 Ok(Box::new(ChecksIPC {
74 grpc_endpoint: self.grpc_endpoint.clone(),
75 }))
76 }
77}
78
impl MemoryBounds for ChecksIPCConfiguration {
    /// Declares the memory bounds of the Checks IPC source.
    ///
    /// Only the `ChecksIPC` source struct itself is registered, as a minimum bound.
    // NOTE(review): the internal event channel created in `ChecksIPC::run` and any gRPC request buffers are not
    // accounted for here; confirm whether they should be.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder.minimum().with_single_value::<ChecksIPC>("checks_ipc");
    }
}
85
/// Checks IPC source: receives check results over gRPC and forwards them downstream as events.
struct ChecksIPC {
    /// Address the gRPC server binds to; `run` rejects anything other than a TCP address.
    grpc_endpoint: ListenAddress,
}
89
90#[async_trait]
91impl Source for ChecksIPC {
92 async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
93 let mut global_shutdown = context.take_shutdown_handle();
94 let mut health = context.take_health_handle();
95
96 let (events_tx, mut events_rx) = mpsc::channel(16);
97
98 let grpc_server = Server::builder().add_service(ChecksServer::new(ChecksService { events_tx }));
99
100 let grpc_socket_addr = match self.grpc_endpoint {
101 ListenAddress::Tcp(addr) => addr,
102 _ => return Err(generic_error!("OTLP gRPC endpoint must be a TCP address.")),
103 };
104 context
105 .topology_context()
106 .global_thread_pool()
107 .spawn_traced_named("checks-ipc-grpc-server", grpc_server.serve(grpc_socket_addr));
108
109 health.mark_ready();
110 debug!("Checks IPC source started.");
111
112 loop {
113 select! {
114 _ = &mut global_shutdown => {
115 debug!("Received shutdown signal.");
116 break;
117 },
118 _ = health.live() => continue,
119 Some(event) = events_rx.recv() => {
120 let output_name = match &event {
121 Event::Metric(_) => "metrics",
122 Event::Log(_) => "logs",
123 Event::EventD(_) => "events",
124 Event::ServiceCheck(_) => "service_checks",
125 _ => continue,
126 };
127
128 if let Err(e) = context.dispatcher().dispatch_one_named(output_name, event).await {
129 warn!("Failed to dispatch {output_name} event: {:?}", e);
130 }
131 },
132 }
133 }
134
135 debug!("Checks IPC source stopped.");
136 Ok(())
137 }
138}
139
/// gRPC `Checks` service implementation that converts incoming check payloads into events.
struct ChecksService {
    /// Sending half of the channel whose receiver is drained by `ChecksIPC::run`, which dispatches each event.
    events_tx: mpsc::Sender<Event>,
}
143
144#[async_trait]
145impl Checks for ChecksService {
146 async fn send_check_payload(
147 &self, request: tonic::Request<SendCheckPayloadRequest>,
148 ) -> Result<Response<SendCheckPayloadResponse>, Status> {
149 trace!("Received check payload.");
150
151 let payload = request.into_inner();
152 for check_data in payload.data.into_iter().filter_map(|data| data.data) {
153 let Some(event) = check_data_to_event(check_data) else {
154 continue;
155 };
156
157 if let Err(e) = self.events_tx.send(event).await {
158 warn!("Failed to send check event: {:?}", e);
159 }
160 }
161
162 Ok(Response::new(SendCheckPayloadResponse {}))
163 }
164}
165
166fn check_data_to_event(check_data: Data) -> Option<Event> {
167 match check_data {
170 Data::Metric(metric) => {
171 let ProtoMetric {
172 r#type,
173 name,
174 value,
175 timestamp,
176 tags,
177 hostname,
178 interval_secs,
179 } = metric;
180
181 let metric_type = MetricType::try_from(r#type).ok()?;
182
183 let tags = tags.into_iter().map(Tag::from).collect::<TagSet>();
184 let context = Context::from_parts(name, tags.into_shared());
185 let mut metric = match metric_type {
186 MetricType::Counter => Metric::counter(context, (timestamp, value)),
187 MetricType::Gauge => Metric::gauge(context, (timestamp, value)),
188 MetricType::Rate => {
189 if interval_secs == 0 {
190 warn!("Received rate metric from check with interval of zero. Skipping.");
191 return None;
192 }
193 Metric::rate(context, (timestamp, value), Duration::from_secs(interval_secs))
194 }
195 MetricType::Histogram => Metric::histogram(context, (timestamp, value)),
196 MetricType::Unspecified => {
197 warn!("Received metric with unspecified type. Skipping.");
198 return None;
199 }
200 };
201 if !hostname.is_empty() {
202 metric.metadata_mut().set_hostname(Arc::from(hostname));
203 }
204 Some(Event::Metric(metric))
205 }
206 Data::Log(log) => {
207 let ProtoLog { message, level } = log;
208
209 let level = LogLevel::try_from(level).ok()?;
210 let status = log_level_to_log_status(level);
211
212 Some(Event::Log(Log::new(message).with_status(status)))
213 }
214 Data::Event(event) => {
215 let ProtoEvent {
216 title,
217 text,
218 priority,
219 hostname,
220 tags,
221 alert_type,
222 aggregation_key,
223 source_type_name,
224 timestamp,
225 } = event;
226
227 let tags = tags.into_iter().map(Tag::from).collect::<TagSet>();
228 let mut eventd = EventD::new(title, text)
229 .with_timestamp(timestamp)
230 .with_tags(tags.into_shared());
231
232 if !hostname.is_empty() {
233 eventd.set_hostname(MetaString::from(hostname));
234 }
235 if !aggregation_key.is_empty() {
236 eventd.set_aggregation_key(MetaString::from(aggregation_key));
237 }
238 if !source_type_name.is_empty() {
239 eventd.set_source_type_name(MetaString::from(source_type_name));
240 }
241 if let Some(p) = ProtoPriority::try_from(priority)
242 .ok()
243 .and_then(proto_priority_to_priority)
244 {
245 eventd.set_priority(p);
246 }
247 if let Some(a) = ProtoAlertType::try_from(alert_type)
248 .ok()
249 .and_then(proto_alert_type_to_alert_type)
250 {
251 eventd.set_alert_type(a);
252 }
253 Some(Event::EventD(eventd))
254 }
255 Data::ServiceCheck(sc) => {
256 let ProtoServiceCheck {
257 status,
258 name,
259 message,
260 tags,
261 hostname,
262 } = sc;
263
264 let Some(status) = ServiceCheckStatus::try_from(status)
265 .ok()
266 .and_then(service_check_status_to_check_status)
267 else {
268 warn!(
269 "Received service check with unspecified or invalid status: {}. Skipping.",
270 status
271 );
272 return None;
273 };
274 let tags = tags.into_iter().map(Tag::from).collect::<TagSet>();
275 let mut service_check = ServiceCheck::new(name, status)
276 .with_message(MetaString::from(message))
277 .with_tags(tags.into_shared());
278 if !hostname.is_empty() {
279 service_check.set_hostname(MetaString::from(hostname));
280 }
281 Some(Event::ServiceCheck(service_check))
282 }
283 }
284}
285
286fn log_level_to_log_status(log_level: LogLevel) -> LogStatus {
287 match log_level {
288 LogLevel::Trace => LogStatus::Trace,
289 LogLevel::Debug => LogStatus::Debug,
290 LogLevel::Info => LogStatus::Info,
291 LogLevel::Warning => LogStatus::Warning,
292 LogLevel::Error => LogStatus::Error,
293 LogLevel::Critical => LogStatus::Emergency,
294 _ => LogStatus::Info,
295 }
296}
297
298fn service_check_status_to_check_status(status: ServiceCheckStatus) -> Option<CheckStatus> {
299 match status {
300 ServiceCheckStatus::Ok => Some(CheckStatus::Ok),
301 ServiceCheckStatus::Warning => Some(CheckStatus::Warning),
302 ServiceCheckStatus::Critical => Some(CheckStatus::Critical),
303 ServiceCheckStatus::Unknown => Some(CheckStatus::Unknown),
304 ServiceCheckStatus::Unspecified => None,
305 }
306}
307
308fn proto_priority_to_priority(priority: ProtoPriority) -> Option<Priority> {
309 match priority {
310 ProtoPriority::Normal => Some(Priority::Normal),
311 ProtoPriority::Low => Some(Priority::Low),
312 ProtoPriority::Unspecified => None,
313 }
314}
315
316fn proto_alert_type_to_alert_type(alert_type: ProtoAlertType) -> Option<AlertType> {
317 match alert_type {
318 ProtoAlertType::Info => Some(AlertType::Info),
319 ProtoAlertType::Error => Some(AlertType::Error),
320 ProtoAlertType::Warning => Some(AlertType::Warning),
321 ProtoAlertType::Success => Some(AlertType::Success),
322 ProtoAlertType::Unspecified => None,
323 }
324}
325
#[cfg(test)]
mod tests {
    // Unit tests for `check_data_to_event` and the protobuf-to-Saluki enum mappings it relies on.

    use datadog_protos::checks::{
        check_data::Data,
        event::Event as ProtoEvent,
        log::Log as ProtoLog,
        metric::{Metric as ProtoMetric, MetricType as ProtoMetricType},
        service_check::{ServiceCheck as ProtoServiceCheck, Status as ProtoServiceCheckStatus},
    };
    use saluki_core::data_model::event::metric::MetricValues;

    use super::*;

    /// Builds a metric check entry from its individual fields; `r#type` is the raw protobuf enum value.
    fn metric_data(
        r#type: i32, name: &str, value: f64, timestamp: u64, interval_secs: u64, tags: &[&str], hostname: &str,
    ) -> Data {
        Data::Metric(ProtoMetric {
            r#type,
            name: name.to_string(),
            value,
            timestamp,
            tags: tags.iter().map(|t| (*t).to_string()).collect(),
            hostname: hostname.to_string(),
            interval_secs,
        })
    }

    /// Builds a log check entry; `level` is the raw protobuf enum value.
    fn log_data(level: i32, message: &str) -> Data {
        Data::Log(ProtoLog {
            message: message.to_string(),
            level,
        })
    }

    /// Builds an event check entry with priority/alert type set to 0 and empty aggregation key and source type name.
    fn event_data(title: &str, text: &str, timestamp: u64, tags: &[&str], hostname: &str) -> Data {
        Data::Event(ProtoEvent {
            title: title.to_string(),
            text: text.to_string(),
            priority: 0,
            hostname: hostname.to_string(),
            tags: tags.iter().map(|t| (*t).to_string()).collect(),
            alert_type: 0,
            aggregation_key: String::new(),
            source_type_name: String::new(),
            timestamp,
        })
    }

    /// Builds a service check entry; `status` is the raw protobuf enum value.
    fn service_check_data(status: i32, name: &str, message: &str, tags: &[&str], hostname: &str) -> Data {
        Data::ServiceCheck(ProtoServiceCheck {
            status,
            name: name.to_string(),
            message: message.to_string(),
            tags: tags.iter().map(|t| (*t).to_string()).collect(),
            hostname: hostname.to_string(),
        })
    }

    #[test]
    fn metric_counter_conversion() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Counter as i32,
            "my_counter",
            1.0,
            1234,
            0,
            &["tag1:value1", "tag2:value2"],
            "",
        ))
        .expect("counter should convert");

        let Event::Metric(metric) = event else {
            panic!("expected Metric event");
        };
        assert_eq!(metric.context().name().as_ref(), "my_counter");
        assert!(metric.context().tags().has_tag("tag1:value1"));
        assert!(metric.context().tags().has_tag("tag2:value2"));
        assert!(matches!(metric.values(), MetricValues::Counter(_)));
    }

    #[test]
    fn metric_gauge_conversion() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Gauge as i32,
            "my_gauge",
            42.0,
            1234,
            0,
            &[],
            "",
        ))
        .expect("gauge should convert");
        let Event::Metric(metric) = event else {
            panic!("expected Metric event");
        };
        assert!(matches!(metric.values(), MetricValues::Gauge(_)));
    }

    #[test]
    fn metric_histogram_conversion() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Histogram as i32,
            "my_hist",
            1.0,
            1234,
            0,
            &[],
            "",
        ))
        .expect("histogram should convert");
        let Event::Metric(metric) = event else {
            panic!("expected Metric event");
        };
        assert!(matches!(metric.values(), MetricValues::Histogram(_)));
    }

    #[test]
    fn metric_rate_conversion_uses_interval() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Rate as i32,
            "my_rate",
            10.0,
            1234,
            60,
            &[],
            "",
        ))
        .expect("rate should convert");
        let Event::Metric(metric) = event else {
            panic!("expected Metric event");
        };
        // The interval carried by the rate values must come from `interval_secs`.
        match metric.values() {
            MetricValues::Rate(_, interval) => assert_eq!(*interval, Duration::from_secs(60)),
            other => panic!("expected Rate values, got {other:?}"),
        }
    }

    #[test]
    fn metric_rate_with_zero_interval_is_skipped() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Rate as i32,
            "my_rate",
            10.0,
            1234,
            0,
            &[],
            "",
        ));
        assert!(event.is_none(), "rate with zero interval must be skipped");
    }

    #[test]
    fn metric_unspecified_type_is_skipped() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Unspecified as i32,
            "x",
            1.0,
            1234,
            0,
            &[],
            "",
        ));
        assert!(event.is_none(), "unspecified metric type must be skipped");
    }

    #[test]
    fn metric_unknown_type_is_skipped() {
        // 99 is not a valid metric type enum value.
        let event = check_data_to_event(metric_data(99, "x", 1.0, 1234, 0, &[], ""));
        assert!(event.is_none(), "unknown metric type must be skipped");
    }

    #[test]
    fn log_unknown_level_is_skipped() {
        // 99 is not a valid log level enum value.
        let event = check_data_to_event(log_data(99, "hello"));
        assert!(event.is_none(), "unknown log level must be skipped");
    }

    #[test]
    fn event_conversion_preserves_fields() {
        let event = check_data_to_event(event_data("title", "body", 1234, &["env:prod", "team:foo"], ""))
            .expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.title(), "title");
        assert_eq!(ev.text(), "body");
        assert_eq!(ev.timestamp(), Some(1234));
        assert!(ev.tags().has_tag("env:prod"));
        assert!(ev.tags().has_tag("team:foo"));
    }

    #[test]
    fn service_check_status_mapping() {
        // Every specified protobuf status must map to the Saluki status of the same name.
        let cases = [
            (ProtoServiceCheckStatus::Ok, CheckStatus::Ok),
            (ProtoServiceCheckStatus::Warning, CheckStatus::Warning),
            (ProtoServiceCheckStatus::Critical, CheckStatus::Critical),
            (ProtoServiceCheckStatus::Unknown, CheckStatus::Unknown),
        ];

        for (proto_status, expected) in cases {
            let event = check_data_to_event(service_check_data(proto_status as i32, "n", "m", &[], ""))
                .unwrap_or_else(|| panic!("status {proto_status:?} should convert"));
            let Event::ServiceCheck(sc) = event else {
                panic!("expected ServiceCheck event for {proto_status:?}");
            };
            assert_eq!(sc.status(), expected, "status {proto_status:?}");
        }
    }

    #[test]
    fn service_check_unspecified_status_is_skipped() {
        let event = check_data_to_event(service_check_data(
            ProtoServiceCheckStatus::Unspecified as i32,
            "n",
            "m",
            &[],
            "",
        ));
        assert!(event.is_none(), "service check with unspecified status must be skipped");
    }

    #[test]
    fn service_check_unknown_status_value_is_skipped() {
        // 99 is not a valid service check status enum value.
        let event = check_data_to_event(service_check_data(99, "n", "m", &[], ""));
        assert!(
            event.is_none(),
            "service check with out-of-range status must be skipped"
        );
    }

    #[test]
    fn service_check_preserves_name_message_and_tags() {
        let event = check_data_to_event(service_check_data(
            ProtoServiceCheckStatus::Ok as i32,
            "my.check",
            "all good",
            &["env:prod"],
            "",
        ))
        .expect("service check should convert");
        let Event::ServiceCheck(sc) = event else {
            panic!("expected ServiceCheck event");
        };
        assert_eq!(sc.name(), "my.check");
        assert_eq!(sc.status(), CheckStatus::Ok);
        assert_eq!(sc.message(), Some("all good"));
        assert!(sc.tags().has_tag("env:prod"));
    }

    #[test]
    fn metric_hostname_propagates() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Counter as i32,
            "n",
            1.0,
            0,
            0,
            &[],
            "host-a",
        ))
        .expect("metric should convert");
        let Event::Metric(m) = event else {
            panic!("expected Metric event");
        };
        assert_eq!(m.metadata().hostname(), Some("host-a"));
    }

    #[test]
    fn metric_empty_hostname_stays_unset() {
        let event = check_data_to_event(metric_data(ProtoMetricType::Counter as i32, "n", 1.0, 0, 0, &[], ""))
            .expect("metric should convert");
        let Event::Metric(m) = event else {
            panic!("expected Metric event");
        };
        assert_eq!(m.metadata().hostname(), None);
    }

    #[test]
    fn eventd_hostname_propagates() {
        let event = check_data_to_event(event_data("title", "body", 0, &[], "host-b")).expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.hostname(), Some("host-b"));
    }

    #[test]
    fn eventd_empty_hostname_stays_unset() {
        let event = check_data_to_event(event_data("title", "body", 0, &[], "")).expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.hostname(), None);
    }

    #[test]
    fn service_check_hostname_propagates() {
        let event = check_data_to_event(service_check_data(
            ProtoServiceCheckStatus::Ok as i32,
            "n",
            "m",
            &[],
            "host-c",
        ))
        .expect("service check should convert");
        let Event::ServiceCheck(sc) = event else {
            panic!("expected ServiceCheck event");
        };
        assert_eq!(sc.hostname(), Some("host-c"));
    }

    #[test]
    fn service_check_empty_hostname_stays_unset() {
        let event = check_data_to_event(service_check_data(
            ProtoServiceCheckStatus::Ok as i32,
            "n",
            "m",
            &[],
            "",
        ))
        .expect("service check should convert");
        let Event::ServiceCheck(sc) = event else {
            panic!("expected ServiceCheck event");
        };
        assert_eq!(sc.hostname(), None);
    }

    #[test]
    fn eventd_priority_propagates() {
        let event = check_data_to_event(Data::Event(ProtoEvent {
            priority: ProtoPriority::Low as i32,
            ..Default::default()
        }))
        .expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.priority(), Some(Priority::Low));
    }

    #[test]
    fn eventd_alert_type_propagates() {
        let event = check_data_to_event(Data::Event(ProtoEvent {
            alert_type: ProtoAlertType::Warning as i32,
            ..Default::default()
        }))
        .expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.alert_type(), Some(AlertType::Warning));
    }

    #[test]
    fn eventd_aggregation_key_propagates() {
        let event = check_data_to_event(Data::Event(ProtoEvent {
            aggregation_key: "agg-key-1".to_string(),
            ..Default::default()
        }))
        .expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.aggregation_key(), Some("agg-key-1"));
    }

    #[test]
    fn eventd_source_type_name_propagates() {
        let event = check_data_to_event(Data::Event(ProtoEvent {
            source_type_name: "my-source".to_string(),
            ..Default::default()
        }))
        .expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.source_type_name(), Some("my-source"));
    }

    #[test]
    fn eventd_unspecified_proto_keeps_saluki_defaults() {
        // A default proto has every enum at its zero value and every string empty; none of those should override
        // the defaults that `EventD::new` provides.
        let event = check_data_to_event(Data::Event(ProtoEvent::default())).expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.priority(), Some(Priority::Normal));
        assert_eq!(ev.alert_type(), Some(AlertType::Info));
        assert_eq!(ev.aggregation_key(), None);
        assert_eq!(ev.source_type_name(), None);
        assert_eq!(ev.hostname(), None);
    }
}
725}