1use std::sync::{Arc, LazyLock};
2use std::time::Duration;
3
4use async_trait::async_trait;
5use datadog_protos::checks::{
6 check_data::Data,
7 checks_server::{Checks, ChecksServer},
8 event::{AlertType as ProtoAlertType, Event as ProtoEvent, Priority as ProtoPriority},
9 log::{Log as ProtoLog, LogLevel},
10 metric::{Metric as ProtoMetric, MetricType},
11 service_check::{ServiceCheck as ProtoServiceCheck, Status as ServiceCheckStatus},
12 SendCheckPayloadRequest, SendCheckPayloadResponse,
13};
14use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
15use saluki_common::task::HandleExt as _;
16use saluki_config::GenericConfiguration;
17use saluki_context::tags::{Tag, TagSet};
18use saluki_context::Context;
19use saluki_core::data_model::event::eventd::{AlertType, EventD, Priority};
20use saluki_core::data_model::event::log::Log;
21use saluki_core::data_model::event::metric::Metric;
22use saluki_core::data_model::event::service_check::{CheckStatus, ServiceCheck};
23use saluki_core::data_model::event::{Event, EventType};
24use saluki_core::topology::OutputDefinition;
25use saluki_core::{
26 components::{sources::*, ComponentContext},
27 data_model::event::log::LogStatus,
28};
29use saluki_error::{generic_error, GenericError};
30use saluki_io::net::ListenAddress;
31use serde::Deserialize;
32use stringtheory::MetaString;
33use tokio::sync::mpsc;
34use tokio::{pin, select};
35use tonic::transport::Server;
36use tonic::{Response, Status};
37use tracing::{debug, trace, warn};
38
39const fn default_grpc_endpoint() -> ListenAddress {
40 ListenAddress::any_tcp(5105)
41}
42
43#[derive(Debug, Deserialize)]
45pub struct ChecksIPCConfiguration {
46 #[serde(rename = "checks_ipc_endpoint", default = "default_grpc_endpoint")]
47 grpc_endpoint: ListenAddress,
48}
49
50impl ChecksIPCConfiguration {
51 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
53 Ok(config.as_typed()?)
54 }
55}
56
57#[async_trait]
58impl SourceBuilder for ChecksIPCConfiguration {
59 fn outputs(&self) -> &[OutputDefinition<EventType>] {
60 static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
61 vec![
62 OutputDefinition::named_output("metrics", EventType::Metric),
63 OutputDefinition::named_output("logs", EventType::Log),
64 OutputDefinition::named_output("events", EventType::EventD),
65 OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
66 ]
67 });
68
69 &OUTPUTS
70 }
71
72 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
73 Ok(Box::new(ChecksIPC {
74 grpc_endpoint: self.grpc_endpoint.clone(),
75 }))
76 }
77}
78
impl MemoryBounds for ChecksIPCConfiguration {
    /// Declares the source's minimum memory footprint as a single `ChecksIPC` value.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // NOTE(review): only the `ChecksIPC` struct itself is accounted for here; the bounded event
        // channel created in `ChecksIPC::run` and the gRPC server's own allocations do not appear to
        // be bounded — confirm whether they should be.
        builder.minimum().with_single_value::<ChecksIPC>("checks_ipc");
    }
}
85
86struct ChecksIPC {
87 grpc_endpoint: ListenAddress,
88}
89
90#[async_trait]
91impl Source for ChecksIPC {
92 async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
93 let global_shutdown = context.take_shutdown_handle();
94 pin!(global_shutdown);
95
96 let mut health = context.take_health_handle();
97
98 let (events_tx, mut events_rx) = mpsc::channel(16);
99
100 let grpc_server = Server::builder().add_service(ChecksServer::new(ChecksService { events_tx }));
101
102 let grpc_socket_addr = match self.grpc_endpoint {
103 ListenAddress::Tcp(addr) => addr,
104 _ => return Err(generic_error!("OTLP gRPC endpoint must be a TCP address.")),
105 };
106 context
107 .topology_context()
108 .global_thread_pool()
109 .spawn_traced_named("checks-ipc-grpc-server", grpc_server.serve(grpc_socket_addr));
110
111 health.mark_ready();
112 debug!("Checks IPC source started.");
113
114 loop {
115 select! {
116 _ = &mut global_shutdown => {
117 debug!("Received shutdown signal.");
118 break;
119 },
120 _ = health.live() => continue,
121 Some(event) = events_rx.recv() => {
122 let output_name = match &event {
123 Event::Metric(_) => "metrics",
124 Event::Log(_) => "logs",
125 Event::EventD(_) => "events",
126 Event::ServiceCheck(_) => "service_checks",
127 _ => continue,
128 };
129
130 if let Err(e) = context.dispatcher().dispatch_one_named(output_name, event).await {
131 warn!("Failed to dispatch {output_name} event: {:?}", e);
132 }
133 },
134 }
135 }
136
137 debug!("Checks IPC source stopped.");
138 Ok(())
139 }
140}
141
142struct ChecksService {
143 events_tx: mpsc::Sender<Event>,
144}
145
146#[async_trait]
147impl Checks for ChecksService {
148 async fn send_check_payload(
149 &self, request: tonic::Request<SendCheckPayloadRequest>,
150 ) -> Result<Response<SendCheckPayloadResponse>, Status> {
151 trace!("Received check payload.");
152
153 let payload = request.into_inner();
154 for check_data in payload.data.into_iter().filter_map(|data| data.data) {
155 let Some(event) = check_data_to_event(check_data) else {
156 continue;
157 };
158
159 if let Err(e) = self.events_tx.send(event).await {
160 warn!("Failed to send check event: {:?}", e);
161 }
162 }
163
164 Ok(Response::new(SendCheckPayloadResponse {}))
165 }
166}
167
168fn check_data_to_event(check_data: Data) -> Option<Event> {
169 match check_data {
172 Data::Metric(metric) => {
173 let ProtoMetric {
174 r#type,
175 name,
176 value,
177 timestamp,
178 tags,
179 hostname,
180 interval_secs,
181 } = metric;
182
183 let metric_type = MetricType::try_from(r#type).ok()?;
184
185 let tags = tags.into_iter().map(Tag::from).collect::<TagSet>();
186 let context = Context::from_parts(name, tags.into_shared());
187 let mut metric = match metric_type {
188 MetricType::Counter => Metric::counter(context, (timestamp, value)),
189 MetricType::Gauge => Metric::gauge(context, (timestamp, value)),
190 MetricType::Rate => {
191 if interval_secs == 0 {
192 warn!("Received rate metric from check with interval of zero. Skipping.");
193 return None;
194 }
195 Metric::rate(context, (timestamp, value), Duration::from_secs(interval_secs))
196 }
197 MetricType::Histogram => Metric::histogram(context, (timestamp, value)),
198 MetricType::Unspecified => {
199 warn!("Received metric with unspecified type. Skipping.");
200 return None;
201 }
202 };
203 if !hostname.is_empty() {
204 metric.metadata_mut().set_hostname(Arc::from(hostname));
205 }
206 Some(Event::Metric(metric))
207 }
208 Data::Log(log) => {
209 let ProtoLog { message, level } = log;
210
211 let level = LogLevel::try_from(level).ok()?;
212 let status = log_level_to_log_status(level);
213
214 Some(Event::Log(Log::new(message).with_status(status)))
215 }
216 Data::Event(event) => {
217 let ProtoEvent {
218 title,
219 text,
220 priority,
221 hostname,
222 tags,
223 alert_type,
224 aggregation_key,
225 source_type_name,
226 timestamp,
227 } = event;
228
229 let tags = tags.into_iter().map(Tag::from).collect::<TagSet>();
230 let mut eventd = EventD::new(title, text)
231 .with_timestamp(timestamp)
232 .with_tags(tags.into_shared());
233
234 if !hostname.is_empty() {
235 eventd.set_hostname(MetaString::from(hostname));
236 }
237 if !aggregation_key.is_empty() {
238 eventd.set_aggregation_key(MetaString::from(aggregation_key));
239 }
240 if !source_type_name.is_empty() {
241 eventd.set_source_type_name(MetaString::from(source_type_name));
242 }
243 if let Some(p) = ProtoPriority::try_from(priority)
244 .ok()
245 .and_then(proto_priority_to_priority)
246 {
247 eventd.set_priority(p);
248 }
249 if let Some(a) = ProtoAlertType::try_from(alert_type)
250 .ok()
251 .and_then(proto_alert_type_to_alert_type)
252 {
253 eventd.set_alert_type(a);
254 }
255 Some(Event::EventD(eventd))
256 }
257 Data::ServiceCheck(sc) => {
258 let ProtoServiceCheck {
259 status,
260 name,
261 message,
262 tags,
263 hostname,
264 } = sc;
265
266 let Some(status) = ServiceCheckStatus::try_from(status)
267 .ok()
268 .and_then(service_check_status_to_check_status)
269 else {
270 warn!(
271 "Received service check with unspecified or invalid status: {}. Skipping.",
272 status
273 );
274 return None;
275 };
276 let tags = tags.into_iter().map(Tag::from).collect::<TagSet>();
277 let mut service_check = ServiceCheck::new(name, status)
278 .with_message(MetaString::from(message))
279 .with_tags(tags.into_shared());
280 if !hostname.is_empty() {
281 service_check.set_hostname(MetaString::from(hostname));
282 }
283 Some(Event::ServiceCheck(service_check))
284 }
285 }
286}
287
288fn log_level_to_log_status(log_level: LogLevel) -> LogStatus {
289 match log_level {
290 LogLevel::Trace => LogStatus::Trace,
291 LogLevel::Debug => LogStatus::Debug,
292 LogLevel::Info => LogStatus::Info,
293 LogLevel::Warning => LogStatus::Warning,
294 LogLevel::Error => LogStatus::Error,
295 LogLevel::Critical => LogStatus::Emergency,
296 _ => LogStatus::Info,
297 }
298}
299
300fn service_check_status_to_check_status(status: ServiceCheckStatus) -> Option<CheckStatus> {
301 match status {
302 ServiceCheckStatus::Ok => Some(CheckStatus::Ok),
303 ServiceCheckStatus::Warning => Some(CheckStatus::Warning),
304 ServiceCheckStatus::Critical => Some(CheckStatus::Critical),
305 ServiceCheckStatus::Unknown => Some(CheckStatus::Unknown),
306 ServiceCheckStatus::Unspecified => None,
307 }
308}
309
310fn proto_priority_to_priority(priority: ProtoPriority) -> Option<Priority> {
311 match priority {
312 ProtoPriority::Normal => Some(Priority::Normal),
313 ProtoPriority::Low => Some(Priority::Low),
314 ProtoPriority::Unspecified => None,
315 }
316}
317
318fn proto_alert_type_to_alert_type(alert_type: ProtoAlertType) -> Option<AlertType> {
319 match alert_type {
320 ProtoAlertType::Info => Some(AlertType::Info),
321 ProtoAlertType::Error => Some(AlertType::Error),
322 ProtoAlertType::Warning => Some(AlertType::Warning),
323 ProtoAlertType::Success => Some(AlertType::Success),
324 ProtoAlertType::Unspecified => None,
325 }
326}
327
#[cfg(test)]
mod tests {
    //! Unit tests for converting check payload items (`Data`) into Saluki events.

    use datadog_protos::checks::{
        check_data::Data,
        event::Event as ProtoEvent,
        log::Log as ProtoLog,
        metric::{Metric as ProtoMetric, MetricType as ProtoMetricType},
        service_check::{ServiceCheck as ProtoServiceCheck, Status as ProtoServiceCheckStatus},
    };
    use saluki_core::data_model::event::metric::MetricValues;

    use super::*;

    /// Builds a metric payload item; `r#type` is the raw protobuf enum value.
    fn metric_data(
        r#type: i32, name: &str, value: f64, timestamp: u64, interval_secs: u64, tags: &[&str], hostname: &str,
    ) -> Data {
        Data::Metric(ProtoMetric {
            r#type,
            name: name.to_string(),
            value,
            timestamp,
            tags: tags.iter().map(|t| (*t).to_string()).collect(),
            hostname: hostname.to_string(),
            interval_secs,
        })
    }

    /// Builds a log payload item; `level` is the raw protobuf enum value.
    fn log_data(level: i32, message: &str) -> Data {
        Data::Log(ProtoLog {
            message: message.to_string(),
            level,
        })
    }

    /// Builds an event payload item with unspecified priority and alert type.
    fn event_data(title: &str, text: &str, timestamp: u64, tags: &[&str], hostname: &str) -> Data {
        Data::Event(ProtoEvent {
            title: title.to_string(),
            text: text.to_string(),
            priority: 0,
            hostname: hostname.to_string(),
            tags: tags.iter().map(|t| (*t).to_string()).collect(),
            alert_type: 0,
            aggregation_key: String::new(),
            source_type_name: String::new(),
            timestamp,
        })
    }

    /// Builds a service check payload item; `status` is the raw protobuf enum value.
    fn service_check_data(status: i32, name: &str, message: &str, tags: &[&str], hostname: &str) -> Data {
        Data::ServiceCheck(ProtoServiceCheck {
            status,
            name: name.to_string(),
            message: message.to_string(),
            tags: tags.iter().map(|t| (*t).to_string()).collect(),
            hostname: hostname.to_string(),
        })
    }

    #[test]
    fn metric_counter_conversion() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Counter as i32,
            "my_counter",
            1.0,
            1234,
            0,
            &["tag1:value1", "tag2:value2"],
            "",
        ))
        .expect("counter should convert");

        let Event::Metric(metric) = event else {
            panic!("expected Metric event");
        };
        assert_eq!(metric.context().name().as_ref(), "my_counter");
        assert!(metric.context().tags().has_tag("tag1:value1"));
        assert!(metric.context().tags().has_tag("tag2:value2"));
        assert!(matches!(metric.values(), MetricValues::Counter(_)));
    }

    #[test]
    fn metric_gauge_conversion() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Gauge as i32,
            "my_gauge",
            42.0,
            1234,
            0,
            &[],
            "",
        ))
        .expect("gauge should convert");
        let Event::Metric(metric) = event else {
            panic!("expected Metric event");
        };
        assert!(matches!(metric.values(), MetricValues::Gauge(_)));
    }

    #[test]
    fn metric_histogram_conversion() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Histogram as i32,
            "my_hist",
            1.0,
            1234,
            0,
            &[],
            "",
        ))
        .expect("histogram should convert");
        let Event::Metric(metric) = event else {
            panic!("expected Metric event");
        };
        assert!(matches!(metric.values(), MetricValues::Histogram(_)));
    }

    #[test]
    fn metric_rate_conversion_uses_interval() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Rate as i32,
            "my_rate",
            10.0,
            1234,
            60,
            &[],
            "",
        ))
        .expect("rate should convert");
        let Event::Metric(metric) = event else {
            panic!("expected Metric event");
        };
        match metric.values() {
            MetricValues::Rate(_, interval) => assert_eq!(*interval, Duration::from_secs(60)),
            other => panic!("expected Rate values, got {other:?}"),
        }
    }

    #[test]
    fn metric_rate_with_zero_interval_is_skipped() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Rate as i32,
            "my_rate",
            10.0,
            1234,
            0,
            &[],
            "",
        ));
        assert!(event.is_none(), "rate with zero interval must be skipped");
    }

    #[test]
    fn metric_unspecified_type_is_skipped() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Unspecified as i32,
            "x",
            1.0,
            1234,
            0,
            &[],
            "",
        ));
        assert!(event.is_none(), "unspecified metric type must be skipped");
    }

    #[test]
    fn metric_unknown_type_is_skipped() {
        let event = check_data_to_event(metric_data(99, "x", 1.0, 1234, 0, &[], ""));
        assert!(event.is_none(), "unknown metric type must be skipped");
    }

    #[test]
    fn log_unknown_level_is_skipped() {
        let event = check_data_to_event(log_data(99, "hello"));
        assert!(event.is_none(), "unknown log level must be skipped");
    }

    #[test]
    fn log_known_level_converts() {
        let event = check_data_to_event(log_data(LogLevel::Warning as i32, "hello")).expect("log should convert");
        assert!(matches!(event, Event::Log(_)), "expected Log event");
    }

    #[test]
    fn log_level_mapping() {
        assert!(matches!(log_level_to_log_status(LogLevel::Trace), LogStatus::Trace));
        assert!(matches!(log_level_to_log_status(LogLevel::Debug), LogStatus::Debug));
        assert!(matches!(log_level_to_log_status(LogLevel::Info), LogStatus::Info));
        assert!(matches!(log_level_to_log_status(LogLevel::Warning), LogStatus::Warning));
        assert!(matches!(log_level_to_log_status(LogLevel::Error), LogStatus::Error));
        assert!(matches!(
            log_level_to_log_status(LogLevel::Critical),
            LogStatus::Emergency
        ));
    }

    #[test]
    fn event_conversion_preserves_fields() {
        let event = check_data_to_event(event_data("title", "body", 1234, &["env:prod", "team:foo"], ""))
            .expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.title(), "title");
        assert_eq!(ev.text(), "body");
        assert_eq!(ev.timestamp(), Some(1234));
        assert!(ev.tags().has_tag("env:prod"));
        assert!(ev.tags().has_tag("team:foo"));
    }

    #[test]
    fn service_check_status_mapping() {
        let cases = [
            (ProtoServiceCheckStatus::Ok, CheckStatus::Ok),
            (ProtoServiceCheckStatus::Warning, CheckStatus::Warning),
            (ProtoServiceCheckStatus::Critical, CheckStatus::Critical),
            (ProtoServiceCheckStatus::Unknown, CheckStatus::Unknown),
        ];

        for (proto_status, expected) in cases {
            let event = check_data_to_event(service_check_data(proto_status as i32, "n", "m", &[], ""))
                .unwrap_or_else(|| panic!("status {proto_status:?} should convert"));
            let Event::ServiceCheck(sc) = event else {
                panic!("expected ServiceCheck event for {proto_status:?}");
            };
            assert_eq!(sc.status(), expected, "status {proto_status:?}");
        }
    }

    #[test]
    fn service_check_unspecified_status_is_skipped() {
        let event = check_data_to_event(service_check_data(
            ProtoServiceCheckStatus::Unspecified as i32,
            "n",
            "m",
            &[],
            "",
        ));
        assert!(event.is_none(), "service check with unspecified status must be skipped");
    }

    #[test]
    fn service_check_unknown_status_value_is_skipped() {
        let event = check_data_to_event(service_check_data(99, "n", "m", &[], ""));
        assert!(
            event.is_none(),
            "service check with out-of-range status must be skipped"
        );
    }

    #[test]
    fn service_check_preserves_name_message_and_tags() {
        let event = check_data_to_event(service_check_data(
            ProtoServiceCheckStatus::Ok as i32,
            "my.check",
            "all good",
            &["env:prod"],
            "",
        ))
        .expect("service check should convert");
        let Event::ServiceCheck(sc) = event else {
            panic!("expected ServiceCheck event");
        };
        assert_eq!(sc.name(), "my.check");
        assert_eq!(sc.status(), CheckStatus::Ok);
        assert_eq!(sc.message(), Some("all good"));
        assert!(sc.tags().has_tag("env:prod"));
    }

    #[test]
    fn metric_hostname_propagates() {
        let event = check_data_to_event(metric_data(
            ProtoMetricType::Counter as i32,
            "n",
            1.0,
            0,
            0,
            &[],
            "host-a",
        ))
        .expect("metric should convert");
        let Event::Metric(m) = event else {
            panic!("expected Metric event");
        };
        assert_eq!(m.metadata().hostname(), Some("host-a"));
    }

    #[test]
    fn metric_empty_hostname_stays_unset() {
        let event = check_data_to_event(metric_data(ProtoMetricType::Counter as i32, "n", 1.0, 0, 0, &[], ""))
            .expect("metric should convert");
        let Event::Metric(m) = event else {
            panic!("expected Metric event");
        };
        assert_eq!(m.metadata().hostname(), None);
    }

    #[test]
    fn eventd_hostname_propagates() {
        let event = check_data_to_event(event_data("title", "body", 0, &[], "host-b")).expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.hostname(), Some("host-b"));
    }

    #[test]
    fn eventd_empty_hostname_stays_unset() {
        let event = check_data_to_event(event_data("title", "body", 0, &[], "")).expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.hostname(), None);
    }

    #[test]
    fn service_check_hostname_propagates() {
        let event = check_data_to_event(service_check_data(
            ProtoServiceCheckStatus::Ok as i32,
            "n",
            "m",
            &[],
            "host-c",
        ))
        .expect("service check should convert");
        let Event::ServiceCheck(sc) = event else {
            panic!("expected ServiceCheck event");
        };
        assert_eq!(sc.hostname(), Some("host-c"));
    }

    #[test]
    fn service_check_empty_hostname_stays_unset() {
        let event = check_data_to_event(service_check_data(
            ProtoServiceCheckStatus::Ok as i32,
            "n",
            "m",
            &[],
            "",
        ))
        .expect("service check should convert");
        let Event::ServiceCheck(sc) = event else {
            panic!("expected ServiceCheck event");
        };
        assert_eq!(sc.hostname(), None);
    }

    #[test]
    fn eventd_priority_propagates() {
        let event = check_data_to_event(Data::Event(ProtoEvent {
            priority: ProtoPriority::Low as i32,
            ..Default::default()
        }))
        .expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.priority(), Some(Priority::Low));
    }

    #[test]
    fn eventd_alert_type_propagates() {
        let event = check_data_to_event(Data::Event(ProtoEvent {
            alert_type: ProtoAlertType::Warning as i32,
            ..Default::default()
        }))
        .expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.alert_type(), Some(AlertType::Warning));
    }

    #[test]
    fn eventd_aggregation_key_propagates() {
        let event = check_data_to_event(Data::Event(ProtoEvent {
            aggregation_key: "agg-key-1".to_string(),
            ..Default::default()
        }))
        .expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.aggregation_key(), Some("agg-key-1"));
    }

    #[test]
    fn eventd_source_type_name_propagates() {
        let event = check_data_to_event(Data::Event(ProtoEvent {
            source_type_name: "my-source".to_string(),
            ..Default::default()
        }))
        .expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.source_type_name(), Some("my-source"));
    }

    #[test]
    fn eventd_unspecified_proto_keeps_saluki_defaults() {
        let event = check_data_to_event(Data::Event(ProtoEvent::default())).expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.priority(), Some(Priority::Normal));
        assert_eq!(ev.alert_type(), Some(AlertType::Info));
        assert_eq!(ev.aggregation_key(), None);
        assert_eq!(ev.source_type_name(), None);
        assert_eq!(ev.hostname(), None);
    }

    #[test]
    fn eventd_out_of_range_priority_and_alert_type_keep_saluki_defaults() {
        // Values outside the protobuf enums must not abort the conversion; they are simply ignored.
        let event = check_data_to_event(Data::Event(ProtoEvent {
            priority: 99,
            alert_type: 99,
            ..Default::default()
        }))
        .expect("event should convert");
        let Event::EventD(ev) = event else {
            panic!("expected EventD event");
        };
        assert_eq!(ev.priority(), Some(Priority::Normal));
        assert_eq!(ev.alert_type(), Some(AlertType::Info));
    }

    #[test]
    fn proto_priority_mapping() {
        assert_eq!(proto_priority_to_priority(ProtoPriority::Normal), Some(Priority::Normal));
        assert_eq!(proto_priority_to_priority(ProtoPriority::Low), Some(Priority::Low));
        assert_eq!(proto_priority_to_priority(ProtoPriority::Unspecified), None);
    }

    #[test]
    fn proto_alert_type_mapping() {
        assert_eq!(proto_alert_type_to_alert_type(ProtoAlertType::Info), Some(AlertType::Info));
        assert_eq!(proto_alert_type_to_alert_type(ProtoAlertType::Error), Some(AlertType::Error));
        assert_eq!(
            proto_alert_type_to_alert_type(ProtoAlertType::Warning),
            Some(AlertType::Warning)
        );
        assert_eq!(
            proto_alert_type_to_alert_type(ProtoAlertType::Success),
            Some(AlertType::Success)
        );
        assert_eq!(proto_alert_type_to_alert_type(ProtoAlertType::Unspecified), None);
    }
}