1use std::sync::{Arc, LazyLock};
2use std::time::Duration;
3
4use async_trait::async_trait;
5use bytes::{Buf, BufMut};
6use bytesize::ByteSize;
7use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
8use metrics::{Counter, Gauge, Histogram};
9use saluki_common::task::spawn_traced_named;
10use saluki_config::GenericConfiguration;
11use saluki_context::tags::{RawTags, RawTagsFilter};
12use saluki_context::TagsResolver;
13use saluki_core::data_model::event::eventd::EventD;
14use saluki_core::data_model::event::metric::{MetricMetadata, MetricOrigin};
15use saluki_core::data_model::event::service_check::ServiceCheck;
16use saluki_core::data_model::event::{metric::Metric, Event, EventType};
17use saluki_core::topology::EventsBuffer;
18use saluki_core::{
19 components::{sources::*, ComponentContext},
20 observability::ComponentMetricsExt as _,
21 pooling::FixedSizeObjectPool,
22 topology::{
23 interconnect::EventBufferManager,
24 shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle},
25 OutputDefinition,
26 },
27};
28use saluki_env::WorkloadProvider;
29use saluki_error::{generic_error, ErrorContext as _, GenericError};
30use saluki_io::deser::codec::dogstatsd::{EventPacket, ServiceCheckPacket};
31use saluki_io::{
32 buf::{BytesBuffer, FixedSizeVec},
33 deser::{
34 codec::{
35 dogstatsd::{parse_message_type, MessageType, MetricPacket, ParseError, ParsedPacket},
36 DogstatsdCodec, DogstatsdCodecConfiguration,
37 },
38 framing::FramerExt as _,
39 },
40 net::{
41 listener::{Listener, ListenerError},
42 ConnectionAddress, ListenAddress, Stream,
43 },
44};
45use saluki_metrics::MetricsBuilder;
46use serde::Deserialize;
47use snafu::{ResultExt as _, Snafu};
48use tokio::{
49 select,
50 time::{interval, MissedTickBehavior},
51};
52use tracing::{debug, error, info, trace, warn};
53
54mod framer;
55use self::framer::{get_framer, DsdFramer};
56use crate::sources::dogstatsd::tags::{WellKnownTags, WellKnownTagsFilterPredicate};
57
58mod filters;
59use self::filters::EnablePayloadsFilter;
60
61mod io_buffer;
62use self::io_buffer::IoBufferManager;
63
64mod origin;
65use self::origin::{
66 origin_from_event_packet, origin_from_metric_packet, origin_from_service_check_packet, DogStatsDOriginTagResolver,
67 OriginEnrichmentConfiguration,
68};
69
70mod resolver;
71use self::resolver::ContextResolvers;
72
73mod tags;
74
75#[derive(Debug, Snafu)]
76#[snafu(context(suffix(false)))]
77enum Error {
78 #[snafu(display("Failed to create {} listener: {}", listener_type, source))]
79 FailedToCreateListener {
80 listener_type: &'static str,
81 source: ListenerError,
82 },
83
84 #[snafu(display("No listeners configured. Please specify a port (`dogstatsd_port`) or a socket path (`dogstatsd_socket` or `dogstatsd_stream_socket`) to enable a listener."))]
85 NoListenersConfigured,
86}
87
/// Serde default for `dogstatsd_buffer_size`: 8 KiB per I/O buffer.
const fn default_buffer_size() -> usize {
    const DEFAULT_BUFFER_SIZE: usize = 8 * 1024;
    DEFAULT_BUFFER_SIZE
}
91
/// Serde default for `dogstatsd_buffer_count`: the number of I/O buffers in the pool.
const fn default_buffer_count() -> usize {
    const DEFAULT_BUFFER_COUNT: usize = 128;
    DEFAULT_BUFFER_COUNT
}
95
/// Serde default for `dogstatsd_port`: the UDP port the source listens on.
const fn default_port() -> u16 {
    const DEFAULT_UDP_PORT: u16 = 8125;
    DEFAULT_UDP_PORT
}
99
/// Serde default for `dogstatsd_tcp_port`.
///
/// Zero means no TCP listener is created (`build_listeners` skips a zero port).
const fn default_tcp_port() -> u16 {
    const TCP_LISTENER_DISABLED: u16 = 0;
    TCP_LISTENER_DISABLED
}
103
/// Serde default for `dogstatsd_allow_context_heap_allocs`: enabled.
const fn default_allow_context_heap_allocations() -> bool {
    true
}
107
/// Serde default for `dogstatsd_no_aggregation_pipeline`: enabled.
const fn default_no_aggregation_pipeline_support() -> bool {
    true
}
111
/// Serde default for `dogstatsd_string_interner_size`: 2 MiB.
const fn default_context_string_interner_size() -> ByteSize {
    ByteSize::mib(2)
}
115
/// Serde default for `dogstatsd_cached_contexts_limit`.
const fn default_cached_contexts_limit() -> usize {
    const DEFAULT_CACHED_CONTEXTS: usize = 500 * 1_000;
    DEFAULT_CACHED_CONTEXTS
}
119
/// Serde default for `dogstatsd_cached_tagsets_limit`.
const fn default_cached_tagsets_limit() -> usize {
    const DEFAULT_CACHED_TAGSETS: usize = 500 * 1_000;
    DEFAULT_CACHED_TAGSETS
}
123
/// Serde default for `dogstatsd_permissive_decoding`: enabled.
const fn default_dogstatsd_permissive_decoding() -> bool {
    true
}
127
/// Serde default for `enable_payloads_series`: enabled.
const fn default_enable_payloads_series() -> bool {
    true
}
131
/// Serde default for `enable_payloads_sketches`: enabled.
const fn default_enable_payloads_sketches() -> bool {
    true
}
135
/// Serde default for `enable_payloads_events`: enabled.
const fn default_enable_payloads_events() -> bool {
    true
}
139
/// Serde default for `enable_payloads_service_checks`: enabled.
const fn default_enable_payloads_service_checks() -> bool {
    true
}
143
/// Configuration for the DogStatsD source.
///
/// Deserialized from the generic configuration (see `from_configuration`) and then used as the
/// builder for the `DogStatsD` source.
#[derive(Deserialize)]
pub struct DogStatsDConfiguration {
    /// Configured size of each I/O buffer (`dogstatsd_buffer_size`, default 8192).
    ///
    /// The allocated capacity is this value passed through `get_adjusted_buffer_size`.
    #[serde(rename = "dogstatsd_buffer_size", default = "default_buffer_size")]
    buffer_size: usize,

    /// Number of I/O buffers in the shared pool (`dogstatsd_buffer_count`, default 128).
    ///
    /// Building the source fails if this is smaller than the number of configured listeners.
    #[serde(rename = "dogstatsd_buffer_count", default = "default_buffer_count")]
    buffer_count: usize,

    /// UDP port to listen on (`dogstatsd_port`, default 8125). A value of 0 disables the UDP listener.
    #[serde(rename = "dogstatsd_port", default = "default_port")]
    port: u16,

    /// TCP port to listen on (`dogstatsd_tcp_port`, default 0). A value of 0 disables the TCP listener.
    #[serde(rename = "dogstatsd_tcp_port", default = "default_tcp_port")]
    tcp_port: u16,

    /// Path of the Unix datagram socket to listen on (`dogstatsd_socket`). No listener is created when unset.
    #[serde(rename = "dogstatsd_socket")]
    socket_path: Option<String>,

    /// Path of the Unix stream socket to listen on (`dogstatsd_stream_socket`). No listener is created when unset.
    #[serde(rename = "dogstatsd_stream_socket")]
    socket_stream_path: Option<String>,

    /// Whether the UDP/TCP listeners bind to `0.0.0.0` instead of `127.0.0.1`
    /// (`dogstatsd_non_local_traffic`, default false).
    #[serde(rename = "dogstatsd_non_local_traffic", default)]
    non_local_traffic: bool,

    /// `dogstatsd_allow_context_heap_allocs`, default true.
    ///
    /// NOTE(review): not read in the visible part of this module; presumably consumed by
    /// `ContextResolvers::new`, which receives the whole configuration -- confirm.
    #[serde(
        rename = "dogstatsd_allow_context_heap_allocs",
        default = "default_allow_context_heap_allocations"
    )]
    allow_context_heap_allocations: bool,

    /// `dogstatsd_no_aggregation_pipeline`, default true.
    ///
    /// Forwarded to the codec configuration via `with_timestamps`.
    #[serde(
        rename = "dogstatsd_no_aggregation_pipeline",
        default = "default_no_aggregation_pipeline_support"
    )]
    no_aggregation_pipeline_support: bool,

    /// Size of the context string interner (`dogstatsd_string_interner_size`, default 2 MiB).
    ///
    /// Counted towards the minimum memory bound of the source.
    #[serde(
        rename = "dogstatsd_string_interner_size",
        default = "default_context_string_interner_size"
    )]
    context_string_interner_bytes: ByteSize,

    /// `dogstatsd_cached_contexts_limit`, default 500,000.
    ///
    /// NOTE(review): not read in the visible part of this module; presumably a cache bound used when
    /// building the context resolvers -- confirm.
    #[serde(
        rename = "dogstatsd_cached_contexts_limit",
        default = "default_cached_contexts_limit"
    )]
    cached_contexts_limit: usize,

    /// `dogstatsd_cached_tagsets_limit`, default 500,000.
    ///
    /// NOTE(review): not read in the visible part of this module; presumably a cache bound used when
    /// building the context resolvers -- confirm.
    #[serde(rename = "dogstatsd_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
    cached_tagsets_limit: usize,

    /// `dogstatsd_permissive_decoding`, default true.
    ///
    /// Forwarded to the codec configuration via `with_permissive_mode`.
    #[serde(
        rename = "dogstatsd_permissive_decoding",
        default = "default_dogstatsd_permissive_decoding"
    )]
    permissive_decoding: bool,

    /// Whether series payloads are allowed through the payload filter (default true).
    #[serde(default = "default_enable_payloads_series")]
    enable_payloads_series: bool,

    /// Whether sketch payloads are allowed through the payload filter (default true).
    #[serde(default = "default_enable_payloads_sketches")]
    enable_payloads_sketches: bool,

    /// Whether event payloads are allowed through the payload filter (default true).
    #[serde(default = "default_enable_payloads_events")]
    enable_payloads_events: bool,

    /// Whether service check payloads are allowed through the payload filter (default true).
    #[serde(default = "default_enable_payloads_service_checks")]
    enable_payloads_service_checks: bool,

    /// Origin enrichment settings, flattened into this configuration's key space.
    ///
    /// Handed to the origin tag resolver when a workload provider is set.
    #[serde(flatten, default)]
    origin_enrichment: OriginEnrichmentConfiguration,

    /// Workload provider used to build the origin tag resolver.
    ///
    /// Never deserialized; set through `with_workload_provider`.
    #[serde(skip)]
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,

    /// Extra tags (`dogstatsd_tags`, default empty) appended after the client-supplied tags of every
    /// metric, event and service check.
    #[serde(rename = "dogstatsd_tags", default)]
    additional_tags: Vec<String>,
}
318
319impl DogStatsDConfiguration {
320 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
322 Ok(config.as_typed()?)
323 }
324
325 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
331 where
332 W: WorkloadProvider + Send + Sync + 'static,
333 {
334 self.workload_provider = Some(Arc::new(workload_provider));
335 self
336 }
337
338 async fn build_listeners(&self) -> Result<Vec<Listener>, Error> {
339 let mut listeners = Vec::new();
340
341 if self.port != 0 {
342 let address = if self.non_local_traffic {
343 ListenAddress::Udp(([0, 0, 0, 0], self.port).into())
344 } else {
345 ListenAddress::Udp(([127, 0, 0, 1], self.port).into())
346 };
347
348 let listener = Listener::from_listen_address(address)
349 .await
350 .context(FailedToCreateListener { listener_type: "UDP" })?;
351 listeners.push(listener);
352 }
353
354 if self.tcp_port != 0 {
355 let address = if self.non_local_traffic {
356 ListenAddress::Tcp(([0, 0, 0, 0], self.tcp_port).into())
357 } else {
358 ListenAddress::Tcp(([127, 0, 0, 1], self.tcp_port).into())
359 };
360
361 let listener = Listener::from_listen_address(address)
362 .await
363 .context(FailedToCreateListener { listener_type: "TCP" })?;
364 listeners.push(listener);
365 }
366
367 if let Some(socket_path) = &self.socket_path {
368 let address = ListenAddress::Unixgram(socket_path.into());
369 let listener = Listener::from_listen_address(address)
370 .await
371 .context(FailedToCreateListener {
372 listener_type: "UDS (datagram)",
373 })?;
374 listeners.push(listener);
375 }
376
377 if let Some(socket_stream_path) = &self.socket_stream_path {
378 let address = ListenAddress::Unix(socket_stream_path.into());
379 let listener = Listener::from_listen_address(address)
380 .await
381 .context(FailedToCreateListener {
382 listener_type: "UDS (stream)",
383 })?;
384 listeners.push(listener);
385 }
386
387 Ok(listeners)
388 }
389}
390
391#[async_trait]
392impl SourceBuilder for DogStatsDConfiguration {
393 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
394 let listeners = self.build_listeners().await?;
395 if listeners.is_empty() {
396 return Err(Error::NoListenersConfigured.into());
397 }
398
399 if self.buffer_count < listeners.len() {
402 return Err(generic_error!(
403 "Must have a minimum of {} I/O buffers based on the number of listeners configured.",
404 listeners.len()
405 ));
406 }
407
408 let maybe_origin_tags_resolver = self
409 .workload_provider
410 .clone()
411 .map(|provider| DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider));
412 let context_resolvers = ContextResolvers::new(self, &context, maybe_origin_tags_resolver)
413 .error_context("Failed to create context resolvers.")?;
414
415 let codec_config = DogstatsdCodecConfiguration::default()
416 .with_timestamps(self.no_aggregation_pipeline_support)
417 .with_permissive_mode(self.permissive_decoding);
418
419 let codec = DogstatsdCodec::from_configuration(codec_config);
420
421 let enable_payloads_filter = EnablePayloadsFilter::default()
422 .with_allow_series(self.enable_payloads_series)
423 .with_allow_sketches(self.enable_payloads_sketches)
424 .with_allow_events(self.enable_payloads_events)
425 .with_allow_service_checks(self.enable_payloads_service_checks);
426
427 Ok(Box::new(DogStatsD {
428 listeners,
429 io_buffer_pool: FixedSizeObjectPool::with_builder("dsd_packet_bufs", self.buffer_count, || {
430 FixedSizeVec::with_capacity(get_adjusted_buffer_size(self.buffer_size))
431 }),
432 codec,
433 context_resolvers,
434 enabled_filter: enable_payloads_filter,
435 additional_tags: self.additional_tags.clone().into(),
436 }))
437 }
438
439 fn outputs(&self) -> &[OutputDefinition<EventType>] {
440 static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
441 vec![
442 OutputDefinition::named_output("metrics", EventType::Metric),
443 OutputDefinition::named_output("events", EventType::EventD),
444 OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
445 ]
446 });
447 &OUTPUTS
448 }
449}
450
451impl MemoryBounds for DogStatsDConfiguration {
452 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
453 builder
454 .minimum()
455 .with_single_value::<DogStatsD>("source struct")
457 .with_expr(UsageExpr::product(
459 "buffers",
460 UsageExpr::config("dogstatsd_buffer_count", self.buffer_count),
461 UsageExpr::config("dogstatsd_buffer_size", get_adjusted_buffer_size(self.buffer_size)),
462 ))
463 .with_expr(UsageExpr::config(
466 "dogstatsd_string_interner_size",
467 self.context_string_interner_bytes.as_u64() as usize,
468 ));
469 }
470}
471
/// DogStatsD source.
///
/// Built by `DogStatsDConfiguration`. When run, it spawns one task per listener and forwards decoded
/// metrics, events and service checks to the source's named outputs.
pub struct DogStatsD {
    /// Listeners to accept streams from; each one is driven by its own task.
    listeners: Vec<Listener>,
    /// Pool of I/O buffers, cloned into every listener context.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// Codec used to decode frames into packets.
    codec: DogstatsdCodec,
    /// Context and tag resolvers, cloned into every listener context.
    context_resolvers: ContextResolvers,
    /// Filter deciding which payload types are allowed through.
    enabled_filter: EnablePayloadsFilter,
    /// Extra tags appended to every metric, event and service check.
    additional_tags: Arc<[String]>,
}
481
/// Everything a listener task needs: the listener itself plus the state it clones into each stream
/// handler it spawns.
struct ListenerContext {
    /// Resolves when the listener should stop accepting new streams.
    shutdown_handle: DynamicShutdownHandle,
    /// Listener to accept streams from.
    listener: Listener,
    /// Pool of I/O buffers handed to stream handlers.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// Codec used to decode frames into packets.
    codec: DogstatsdCodec,
    /// Context and tag resolvers handed to stream handlers.
    context_resolvers: ContextResolvers,
    /// Extra tags appended to every metric, event and service check.
    additional_tags: Arc<[String]>,
}
490
/// Per-stream state for a stream handler task.
struct HandlerContext {
    /// Address of the listener the stream was accepted on; used in log fields.
    listen_addr: ListenAddress,
    /// Framer chosen for the listen address (see `get_framer`).
    framer: DsdFramer,
    /// Codec used to decode frames into packets.
    codec: DogstatsdCodec,
    /// Pool that I/O buffers for this stream are drawn from.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// Telemetry handles tagged with the listener type.
    metrics: Metrics,
    /// Context and tag resolvers used while handling packets.
    context_resolvers: ContextResolvers,
    /// Extra tags appended to every metric, event and service check.
    additional_tags: Arc<[String]>,
}
500
/// Telemetry handles for a stream handler, registered by `build_metrics`.
struct Metrics {
    /// `component_events_received_total` with `message_type=metrics`.
    metrics_received: Counter,
    /// `component_events_received_total` with `message_type=events`.
    events_received: Counter,
    /// `component_events_received_total` with `message_type=service_checks`.
    service_checks_received: Counter,
    /// `component_bytes_received_total`.
    bytes_received: Counter,
    /// `component_bytes_received_size`.
    bytes_received_size: Histogram,
    /// `component_errors_total` with `error_type=framing`.
    framing_errors: Counter,
    /// `component_errors_total` with `error_type=decode`, `message_type=metrics`.
    metric_decoder_errors: Counter,
    /// `component_errors_total` with `error_type=decode`, `message_type=events`.
    event_decoder_errors: Counter,
    /// `component_errors_total` with `error_type=decode`, `message_type=service_checks`.
    service_check_decoder_errors: Counter,
    /// `component_failed_context_resolve_total`.
    failed_context_resolve_total: Counter,
    /// `component_connections_active`.
    connections_active: Gauge,
    /// `component_packets_received_total` with `state=ok`.
    packet_receive_success: Counter,
    /// `component_packets_received_total` with `state=error`.
    packet_receive_failure: Counter,
}
516
// Read-only accessors for the individual telemetry handles.
impl Metrics {
    /// Counter of metric events received.
    fn metrics_received(&self) -> &Counter {
        &self.metrics_received
    }

    /// Counter of events received.
    fn events_received(&self) -> &Counter {
        &self.events_received
    }

    /// Counter of service checks received.
    fn service_checks_received(&self) -> &Counter {
        &self.service_checks_received
    }

    /// Counter of bytes received.
    fn bytes_received(&self) -> &Counter {
        &self.bytes_received
    }

    /// Histogram of the number of bytes returned by each receive call.
    fn bytes_received_size(&self) -> &Histogram {
        &self.bytes_received_size
    }

    /// Counter of framing errors.
    fn framing_errors(&self) -> &Counter {
        &self.framing_errors
    }

    /// Counter of metric packets that failed to decode (backed by `metric_decoder_errors`).
    fn metric_decode_failed(&self) -> &Counter {
        &self.metric_decoder_errors
    }

    /// Counter of event packets that failed to decode (backed by `event_decoder_errors`).
    fn event_decode_failed(&self) -> &Counter {
        &self.event_decoder_errors
    }

    /// Counter of service check packets that failed to decode (backed by `service_check_decoder_errors`).
    fn service_check_decode_failed(&self) -> &Counter {
        &self.service_check_decoder_errors
    }

    /// Counter of packets dropped because a context or tag set could not be resolved.
    fn failed_context_resolve_total(&self) -> &Counter {
        &self.failed_context_resolve_total
    }

    /// Gauge of currently active connections.
    fn connections_active(&self) -> &Gauge {
        &self.connections_active
    }

    /// Counter of successful receive calls.
    fn packet_receive_success(&self) -> &Counter {
        &self.packet_receive_success
    }

    /// Counter of failed receive calls.
    fn packet_receive_failure(&self) -> &Counter {
        &self.packet_receive_failure
    }
}
570
571fn build_metrics(listen_addr: &ListenAddress, component_context: &ComponentContext) -> Metrics {
572 let builder = MetricsBuilder::from_component_context(component_context);
573
574 let listener_type = match listen_addr {
575 ListenAddress::Tcp(_) => "tcp",
576 ListenAddress::Udp(_) => "udp",
577 ListenAddress::Unix(_) => "unix",
578 ListenAddress::Unixgram(_) => "unixgram",
579 };
580
581 Metrics {
582 metrics_received: builder.register_counter_with_tags(
583 "component_events_received_total",
584 [("message_type", "metrics"), ("listener_type", listener_type)],
585 ),
586 events_received: builder.register_counter_with_tags(
587 "component_events_received_total",
588 [("message_type", "events"), ("listener_type", listener_type)],
589 ),
590 service_checks_received: builder.register_counter_with_tags(
591 "component_events_received_total",
592 [("message_type", "service_checks"), ("listener_type", listener_type)],
593 ),
594 bytes_received: builder
595 .register_counter_with_tags("component_bytes_received_total", [("listener_type", listener_type)]),
596 bytes_received_size: builder
597 .register_trace_histogram_with_tags("component_bytes_received_size", [("listener_type", listener_type)]),
598 framing_errors: builder.register_counter_with_tags(
599 "component_errors_total",
600 [("listener_type", listener_type), ("error_type", "framing")],
601 ),
602 metric_decoder_errors: builder.register_counter_with_tags(
603 "component_errors_total",
604 [
605 ("listener_type", listener_type),
606 ("error_type", "decode"),
607 ("message_type", "metrics"),
608 ],
609 ),
610 event_decoder_errors: builder.register_counter_with_tags(
611 "component_errors_total",
612 [
613 ("listener_type", listener_type),
614 ("error_type", "decode"),
615 ("message_type", "events"),
616 ],
617 ),
618 service_check_decoder_errors: builder.register_counter_with_tags(
619 "component_errors_total",
620 [
621 ("listener_type", listener_type),
622 ("error_type", "decode"),
623 ("message_type", "service_checks"),
624 ],
625 ),
626 connections_active: builder
627 .register_gauge_with_tags("component_connections_active", [("listener_type", listener_type)]),
628 packet_receive_success: builder.register_debug_counter_with_tags(
629 "component_packets_received_total",
630 [("listener_type", listener_type), ("state", "ok")],
631 ),
632 packet_receive_failure: builder.register_debug_counter_with_tags(
633 "component_packets_received_total",
634 [("listener_type", listener_type), ("state", "error")],
635 ),
636 failed_context_resolve_total: builder.register_debug_counter("component_failed_context_resolve_total"),
637 }
638}
639
640#[async_trait]
641impl Source for DogStatsD {
642 async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
643 let mut global_shutdown = context.take_shutdown_handle();
644 let mut health = context.take_health_handle();
645
646 let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();
647
648 for listener in self.listeners {
650 let task_name = format!("dogstatsd-listener-{}", listener.listen_address().listener_type());
651
652 let listener_context = ListenerContext {
659 shutdown_handle: listener_shutdown_coordinator.register(),
660 listener,
661 io_buffer_pool: self.io_buffer_pool.clone(),
662 codec: self.codec.clone(),
663 context_resolvers: self.context_resolvers.clone(),
664 additional_tags: self.additional_tags.clone(),
665 };
666
667 spawn_traced_named(
668 task_name,
669 process_listener(context.clone(), listener_context, self.enabled_filter),
670 );
671 }
672
673 health.mark_ready();
674 debug!("DogStatsD source started.");
675
676 loop {
681 select! {
682 _ = &mut global_shutdown => {
683 debug!("Received shutdown signal.");
684 break
685 },
686 _ = health.live() => continue,
687 }
688 }
689
690 debug!("Stopping DogStatsD source...");
691
692 listener_shutdown_coordinator.shutdown().await;
693
694 debug!("DogStatsD source stopped.");
695
696 Ok(())
697 }
698}
699
700async fn process_listener(
701 source_context: SourceContext, listener_context: ListenerContext, enabled_filter: EnablePayloadsFilter,
702) {
703 let ListenerContext {
704 shutdown_handle,
705 mut listener,
706 io_buffer_pool,
707 codec,
708 context_resolvers,
709 additional_tags,
710 } = listener_context;
711 tokio::pin!(shutdown_handle);
712
713 let listen_addr = listener.listen_address().clone();
714 let mut stream_shutdown_coordinator = DynamicShutdownCoordinator::default();
715
716 info!(%listen_addr, "DogStatsD listener started.");
717
718 loop {
719 select! {
720 _ = &mut shutdown_handle => {
721 debug!(%listen_addr, "Received shutdown signal. Waiting for existing stream handlers to finish...");
722 break;
723 }
724 result = listener.accept() => match result {
725 Ok(stream) => {
726 debug!(%listen_addr, "Spawning new stream handler.");
727
728 let handler_context = HandlerContext {
729 listen_addr: listen_addr.clone(),
730 framer: get_framer(&listen_addr),
731 codec: codec.clone(),
732 io_buffer_pool: io_buffer_pool.clone(),
733 metrics: build_metrics(&listen_addr, source_context.component_context()),
734 context_resolvers: context_resolvers.clone(),
735 additional_tags: additional_tags.clone(),
736 };
737
738 let task_name = format!("dogstatsd-stream-handler-{}", listen_addr.listener_type());
739 spawn_traced_named(task_name, process_stream(stream, source_context.clone(), handler_context, stream_shutdown_coordinator.register(), enabled_filter));
740 }
741 Err(e) => {
742 error!(%listen_addr, error = %e, "Failed to accept connection. Stopping listener.");
743 break
744 }
745 }
746 }
747 }
748
749 stream_shutdown_coordinator.shutdown().await;
750
751 info!(%listen_addr, "DogStatsD listener stopped.");
752}
753
/// Drives a single stream until it finishes on its own or the shutdown handle fires, whichever
/// happens first.
async fn process_stream(
    stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
    shutdown_handle: DynamicShutdownHandle, enabled_filter: EnablePayloadsFilter,
) {
    tokio::pin!(shutdown_handle);

    // NOTE(review): when the shutdown branch wins, the `drive_stream` future is dropped where it is
    // currently suspended, so the final flush and gauge decrement at the end of `drive_stream` do not
    // run on that path -- confirm this is acceptable at shutdown.
    select! {
        _ = &mut shutdown_handle => {
            debug!("Stream handler received shutdown signal.");
        },
        _ = drive_stream(stream, source_context, handler_context, enabled_filter) => {},
    }
}
767
/// Reads from `stream`, splits the received bytes into frames, decodes each frame into an event and
/// forwards the events downstream.
///
/// Events are buffered and forwarded when the event buffer fills up, on a 100ms flush tick, and once
/// more after the read loop exits. Connection-oriented streams stop on EOF, on a framing error or on
/// an I/O error; connectionless streams keep reading after errors.
async fn drive_stream(
    mut stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
    enabled_filter: EnablePayloadsFilter,
) {
    let HandlerContext {
        listen_addr,
        mut framer,
        codec,
        io_buffer_pool,
        metrics,
        mut context_resolvers,
        additional_tags,
    } = handler_context;

    debug!(%listen_addr, "Stream handler started.");

    // Only connection-oriented streams count towards the active connections gauge.
    if !stream.is_connectionless() {
        metrics.connections_active().increment(1);
    }

    // Flush buffered events at least every 100ms, even when no data is being received.
    let mut buffer_flush = interval(Duration::from_millis(100));
    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let mut event_buffer_manager = EventBufferManager::default();
    let mut io_buffer_manager = IoBufferManager::new(&io_buffer_pool, &stream);
    let memory_limiter = source_context.topology_context().memory_limiter();

    'read: loop {
        let mut eof = false;

        let mut io_buffer = io_buffer_manager.get_buffer_mut().await;

        // Wait for the memory limiter to report capacity before reading more data.
        memory_limiter.wait_for_capacity().await;

        select! {
            // Data (or an error) arrived from the stream.
            read_result = stream.receive(&mut io_buffer) => match read_result {
                Ok((bytes_read, peer_addr)) => {
                    // A zero-byte read is treated as end-of-stream.
                    if bytes_read == 0 {
                        eof = true;
                    }

                    metrics.packet_receive_success().increment(1);
                    metrics.bytes_received().increment(bytes_read as u64);
                    metrics.bytes_received_size().record(bytes_read as f64);

                    // Frame in EOF mode at real EOF, and always for connectionless streams
                    // (presumably because each datagram is self-contained -- confirm against the framer).
                    let reached_eof = eof || stream.is_connectionless();

                    trace!(
                        buffer_len = io_buffer.remaining(),
                        buffer_cap = io_buffer.remaining_mut(),
                        eof = reached_eof,
                        %listen_addr,
                        %peer_addr,
                        "Received {} bytes from stream.",
                        bytes_read
                    );

                    let mut frames = io_buffer.framed(&mut framer, reached_eof);
                    'frame: loop {
                        match frames.next() {
                            Some(Ok(frame)) => {
                                trace!(%listen_addr, %peer_addr, ?frame, "Decoded frame.");
                                match handle_frame(&frame[..], &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) {
                                    Ok(Some(event)) => {
                                        // `try_push` hands back a buffer once it is full; forward it right away.
                                        if let Some(event_buffer) = event_buffer_manager.try_push(event) {
                                            debug!(%listen_addr, %peer_addr, "Event buffer is full. Forwarding events.");
                                            dispatch_events(event_buffer, &source_context, &listen_addr).await;
                                        }
                                    },
                                    Ok(None) => {
                                        // The frame decoded but produced no event (filtered out, or its
                                        // context/tags could not be resolved); move on to the next frame.
                                        continue
                                    },
                                    Err(e) => {
                                        // A frame that fails to parse is logged and skipped; the stream keeps going.
                                        let frame_lossy_str = String::from_utf8_lossy(&frame);
                                        warn!(%listen_addr, %peer_addr, frame = %frame_lossy_str, error = %e, "Failed to parse frame.");
                                    },
                                }
                            }
                            Some(Err(e)) => {
                                metrics.framing_errors().increment(1);

                                if stream.is_connectionless() {
                                    // Connectionless: abandon this buffer and go back to reading.
                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Continuing stream.");
                                    continue 'read;
                                } else {
                                    // Connection-oriented: a framing error ends the stream.
                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Stopping stream.");
                                    break 'read;
                                }
                            }
                            None => {
                                trace!(%listen_addr, %peer_addr, "Not enough data to decode another frame.");
                                if eof && !stream.is_connectionless() {
                                    debug!(%listen_addr, %peer_addr, "Stream received EOF. Shutting down handler.");
                                    break 'read;
                                } else {
                                    // Nothing more to decode from this buffer; go back to reading.
                                    break 'frame;
                                }
                            }
                        }
                    }
                },
                Err(e) => {
                    metrics.packet_receive_failure().increment(1);

                    if stream.is_connectionless() {
                        warn!(%listen_addr, error = %e, "I/O error while decoding. Continuing stream.");
                        continue 'read;
                    } else {
                        warn!(%listen_addr, error = %e, "I/O error while decoding. Stopping stream.");
                        break 'read;
                    }
                }
            },

            // Periodic flush of whatever has been buffered so far.
            _ = buffer_flush.tick() => {
                if let Some(event_buffer) = event_buffer_manager.consume() {
                    dispatch_events(event_buffer, &source_context, &listen_addr).await;
                }
            },
        }
    }

    // Forward anything still buffered before the handler exits.
    if let Some(event_buffer) = event_buffer_manager.consume() {
        dispatch_events(event_buffer, &source_context, &listen_addr).await;
    }

    // This is unconditional while the increment above is not, but every connectionless path in the
    // loop either continues 'read or breaks 'frame, so only connection-oriented streams get here.
    metrics.connections_active().decrement(1);

    debug!(%listen_addr, "Stream handler stopped.");
}
923
924fn handle_frame(
925 frame: &[u8], codec: &DogstatsdCodec, context_resolvers: &mut ContextResolvers, source_metrics: &Metrics,
926 peer_addr: &ConnectionAddress, enabled_filter: EnablePayloadsFilter, additional_tags: &[String],
927) -> Result<Option<Event>, ParseError> {
928 let parsed = match codec.decode_packet(frame) {
929 Ok(parsed) => parsed,
930 Err(e) => {
931 match parse_message_type(frame) {
933 MessageType::MetricSample => source_metrics.metric_decode_failed().increment(1),
934 MessageType::Event => source_metrics.event_decode_failed().increment(1),
935 MessageType::ServiceCheck => source_metrics.service_check_decode_failed().increment(1),
936 }
937
938 return Err(e);
939 }
940 };
941
942 let event = match parsed {
943 ParsedPacket::Metric(metric_packet) => {
944 let events_len = metric_packet.num_points;
945 if !enabled_filter.allow_metric(&metric_packet) {
946 trace!(
947 metric.name = metric_packet.metric_name,
948 "Skipping metric due to filter configuration."
949 );
950 return Ok(None);
951 }
952
953 match handle_metric_packet(metric_packet, context_resolvers, peer_addr, additional_tags) {
954 Some(metric) => {
955 source_metrics.metrics_received().increment(events_len);
956 Event::Metric(metric)
957 }
958 None => {
959 source_metrics.failed_context_resolve_total().increment(1);
961 return Ok(None);
962 }
963 }
964 }
965 ParsedPacket::Event(event) => {
966 if !enabled_filter.allow_event(&event) {
967 trace!("Skipping event {} due to filter configuration.", event.title);
968 return Ok(None);
969 }
970 let tags_resolver = context_resolvers.tags();
971 match handle_event_packet(event, tags_resolver, peer_addr, additional_tags) {
972 Some(event) => {
973 source_metrics.events_received().increment(1);
974 Event::EventD(event)
975 }
976 None => {
977 source_metrics.failed_context_resolve_total().increment(1);
978 return Ok(None);
979 }
980 }
981 }
982 ParsedPacket::ServiceCheck(service_check) => {
983 if !enabled_filter.allow_service_check(&service_check) {
984 trace!(
985 "Skipping service check {} due to filter configuration.",
986 service_check.name
987 );
988 return Ok(None);
989 }
990 let tags_resolver = context_resolvers.tags();
991 match handle_service_check_packet(service_check, tags_resolver, peer_addr, additional_tags) {
992 Some(service_check) => {
993 source_metrics.service_checks_received().increment(1);
994 Event::ServiceCheck(service_check)
995 }
996 None => {
997 source_metrics.failed_context_resolve_total().increment(1);
998 return Ok(None);
999 }
1000 }
1001 }
1002 };
1003
1004 Ok(Some(event))
1005}
1006
1007fn handle_metric_packet(
1008 packet: MetricPacket, context_resolvers: &mut ContextResolvers, peer_addr: &ConnectionAddress,
1009 additional_tags: &[String],
1010) -> Option<Metric> {
1011 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1012
1013 let mut origin = origin_from_metric_packet(&packet, &well_known_tags);
1014 if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1015 origin.set_process_id(creds.pid as u32);
1016 }
1017
1018 let context_resolver = if packet.timestamp.is_some() {
1020 context_resolvers.no_agg()
1021 } else {
1022 context_resolvers.primary()
1023 };
1024
1025 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1026
1027 match context_resolver.resolve(packet.metric_name, tags, Some(origin)) {
1029 Some(context) => {
1030 let metric_origin = well_known_tags
1031 .jmx_check_name
1032 .map(MetricOrigin::jmx_check)
1033 .unwrap_or_else(MetricOrigin::dogstatsd);
1034 let metadata = MetricMetadata::default()
1035 .with_origin(metric_origin)
1036 .with_hostname(well_known_tags.hostname.map(Arc::from));
1037
1038 Some(Metric::from_parts(context, packet.values, metadata))
1039 }
1040 None => None,
1042 }
1043}
1044
1045fn handle_event_packet(
1046 packet: EventPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress, additional_tags: &[String],
1047) -> Option<EventD> {
1048 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1049
1050 let mut origin = origin_from_event_packet(&packet, &well_known_tags);
1051 if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1052 origin.set_process_id(creds.pid as u32);
1053 }
1054 let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1055
1056 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1057 let tags = tags_resolver.create_tag_set(tags)?;
1058
1059 let eventd = EventD::new(packet.title, packet.text)
1060 .with_timestamp(packet.timestamp)
1061 .with_hostname(packet.hostname.map(|s| s.into()))
1062 .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
1063 .with_alert_type(packet.alert_type)
1064 .with_priority(packet.priority)
1065 .with_source_type_name(packet.source_type_name.map(|s| s.into()))
1066 .with_alert_type(packet.alert_type)
1067 .with_tags(tags)
1068 .with_origin_tags(origin_tags);
1069
1070 Some(eventd)
1071}
1072
1073fn handle_service_check_packet(
1074 packet: ServiceCheckPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress,
1075 additional_tags: &[String],
1076) -> Option<ServiceCheck> {
1077 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1078
1079 let mut origin = origin_from_service_check_packet(&packet, &well_known_tags);
1080 if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1081 origin.set_process_id(creds.pid as u32);
1082 }
1083 let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1084
1085 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1086 let tags = tags_resolver.create_tag_set(tags)?;
1087
1088 let service_check = ServiceCheck::new(packet.name, packet.status)
1089 .with_timestamp(packet.timestamp)
1090 .with_hostname(packet.hostname.map(|s| s.into()))
1091 .with_tags(tags)
1092 .with_origin_tags(origin_tags)
1093 .with_message(packet.message.map(|s| s.into()));
1094
1095 Some(service_check)
1096}
1097
1098fn get_filtered_tags_iterator<'a>(
1099 raw_tags: RawTags<'a>, additional_tags: &'a [String],
1100) -> impl Iterator<Item = &'a str> + Clone {
1101 RawTagsFilter::exclude(raw_tags, WellKnownTagsFilterPredicate).chain(additional_tags.iter().map(|s| s.as_str()))
1104}
1105
1106async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &SourceContext, listen_addr: &ListenAddress) {
1107 debug!(%listen_addr, events_len = event_buffer.len(), "Forwarding events.");
1108
1109 if event_buffer.has_event_type(EventType::EventD) {
1119 let eventd_events = event_buffer.extract(Event::is_eventd);
1120 if let Err(e) = source_context
1121 .dispatcher()
1122 .buffered_named("events")
1123 .expect("events output should always exist")
1124 .send_all(eventd_events)
1125 .await
1126 {
1127 error!(%listen_addr, error = %e, "Failed to dispatch eventd events.");
1128 }
1129 }
1130
1131 if event_buffer.has_event_type(EventType::ServiceCheck) {
1133 let service_check_events = event_buffer.extract(Event::is_service_check);
1134 if let Err(e) = source_context
1135 .dispatcher()
1136 .buffered_named("service_checks")
1137 .expect("service checks output should always exist")
1138 .send_all(service_check_events)
1139 .await
1140 {
1141 error!(%listen_addr, error = %e, "Failed to dispatch service check events.");
1142 }
1143 }
1144
1145 if !event_buffer.is_empty() {
1147 if let Err(e) = source_context
1148 .dispatcher()
1149 .dispatch_named("metrics", event_buffer)
1150 .await
1151 {
1152 error!(%listen_addr, error = %e, "Failed to dispatch metric events.");
1153 }
1154 }
1155}
1156
/// Returns the capacity to allocate for an I/O buffer whose configured size is `buffer_size`.
///
/// Four bytes are added on top of the configured size.
/// NOTE(review): presumably headroom for a 4-byte length prefix on stream-framed payloads -- confirm
/// against the framer.
///
/// The addition saturates at `usize::MAX`, so an extreme configured value cannot overflow and wrap
/// around to a tiny capacity.
const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
    const EXTRA_BYTES: usize = 4;
    buffer_size.saturating_add(EXTRA_BYTES)
}
1174
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use saluki_context::{ContextResolverBuilder, TagsResolverBuilder};
    use saluki_io::{
        deser::codec::{dogstatsd::ParsedPacket, DogstatsdCodec, DogstatsdCodecConfiguration},
        net::ConnectionAddress,
    };

    use super::{handle_metric_packet, ContextResolvers};

    /// Codec with the default configuration.
    fn default_codec() -> DogstatsdCodec {
        DogstatsdCodec::from_configuration(DogstatsdCodecConfiguration::default())
    }

    /// Context resolvers built from the test builders with heap allocations disabled; the same context
    /// resolver is used for both resolver slots.
    fn resolvers_without_heap_allocations() -> ContextResolvers {
        let tags_resolver = TagsResolverBuilder::for_tests().build();
        let context_resolver = ContextResolverBuilder::for_tests()
            .with_heap_allocations(false)
            .with_tags_resolver(Some(tags_resolver.clone()))
            .build();
        ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver)
    }

    /// Arbitrary socket peer address for the packets under test.
    fn socket_peer_addr() -> ConnectionAddress {
        ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap())
    }

    #[test]
    fn no_metrics_when_interner_full_allocations_disallowed() {
        // With heap allocations disabled, a metric whose name is too long to be inlined is expected to
        // fail context resolution, so no metric comes back.
        let codec = default_codec();
        let mut context_resolvers = resolvers_without_heap_allocations();
        let peer_addr = socket_peer_addr();

        let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3";

        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
            panic!("Failed to parse packet.");
        };

        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &[]);
        assert!(maybe_metric.is_none());
    }

    #[test]
    fn metric_with_additional_tags() {
        // The resolved context must carry both the client-supplied tags and the additional tags.
        let codec = default_codec();
        let mut context_resolvers = resolvers_without_heap_allocations();
        let peer_addr = socket_peer_addr();

        let existing_tags = ["tag1:value1", "tag2:value2", "tag3:value3"];
        let additional_tags = ["tag4:value4", "tag5:value5", "tag6:value6"].map(String::from);

        let existing_tags_str = existing_tags.join(",");
        let input = format!("test_metric_name:1|c|#{}", existing_tags_str);

        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
            panic!("Failed to parse packet.");
        };

        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &additional_tags);
        assert!(maybe_metric.is_some());

        let metric = maybe_metric.unwrap();
        let context = metric.context();

        for tag in existing_tags {
            assert!(context.tags().has_tag(tag));
        }

        for tag in additional_tags {
            assert!(context.tags().has_tag(tag));
        }
    }
}