1use std::{
10 collections::VecDeque,
11 num::NonZeroUsize,
12 path::PathBuf,
13 sync::{Arc, LazyLock},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15};
16
17use async_trait::async_trait;
18use bytes::{Buf, BufMut};
19use bytesize::ByteSize;
20use resource_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
21use saluki_common::{
22 sync::shutdown::{ShutdownCoordinator, ShutdownHandle},
23 task::spawn_traced_named,
24};
25use saluki_config::{deserialize_space_separated_or_seq, GenericConfiguration};
26use saluki_context::{
27 origin::RawOrigin,
28 tags::{RawTags, RawTagsFilter},
29 TagsResolver,
30};
31use saluki_core::data_model::event::{
32 eventd::EventD,
33 metric::{Metric, MetricMetadata, MetricOrigin},
34 service_check::ServiceCheck,
35 Event, EventType,
36};
37use saluki_core::{
38 components::{sources::*, ComponentContext},
39 pooling::FixedSizeObjectPool,
40 topology::{interconnect::EventBufferManager, EventsBuffer, OutputDefinition},
41};
42use saluki_env::{workload::CaptureEntityResolver, WorkloadProvider};
43use saluki_error::{generic_error, ErrorContext as _, GenericError};
44use saluki_io::{
45 buf::{BytesBuffer, ClearableIoBuffer as _, FixedSizeVec},
46 deser::{
47 codec::dogstatsd::*,
48 framing::{Framer as _, FramingError, LengthDelimitedFramer},
49 },
50 net::{
51 listener::{Listener, ListenerError},
52 ConnectionAddress, ListenAddress, ProcessCredentials, ProcessIdentity, Stream,
53 },
54};
55use serde::{Deserialize, Deserializer};
56use serde_with::{serde_as, NoneAsEmptyString};
57use snafu::{ResultExt as _, Snafu};
58use stringtheory::MetaString;
59use tokio::{
60 pin, select,
61 time::{interval, MissedTickBehavior},
62};
63use tracing::{debug, error, info, trace, warn};
64
65mod forwarder;
66use self::forwarder::{PacketForwarder, PacketForwarderTarget};
67
68mod framer;
69use self::framer::{get_framer, DsdFramer};
70use crate::sources::dogstatsd::tags::{WellKnownTags, WellKnownTagsFilterPredicate};
71
72mod filters;
73use self::filters::EnablePayloadsFilter;
74
75mod io_buffer;
76use self::io_buffer::IoBufferManager;
77
78mod metrics;
79use self::metrics::{build_metrics, Metrics};
80
81mod replay;
82use self::replay::{CaptureRecord, CapturedTaggerHandle, TrafficCapture};
83pub use self::replay::{
84 DogStatsDCaptureAPIHandler, DogStatsDCaptureControl, DogStatsDReplayAPIHandler, DogStatsDReplayControl,
85 ReplaySession, TimestampResolution, TrafficCaptureReader, DEFAULT_REPLAY_LOOPS, REPLAY_CREDENTIALS_GID,
86};
87
88mod origin;
89use self::origin::{
90 mark_replay_process_id, origin_from_event_packet, origin_from_metric_packet, origin_from_service_check_packet,
91 DogStatsDOriginTagResolver, OriginEnrichmentConfiguration,
92};
93
94mod resolver;
95use self::resolver::ContextResolvers;
96
97mod tags;
98
99#[derive(Debug, Snafu)]
100#[snafu(context(suffix(false)))]
101enum Error {
102 #[snafu(display("Failed to create {} listener: {}", listener_type, source))]
103 FailedToCreateListener {
104 listener_type: &'static str,
105 source: ListenerError,
106 },
107
108 #[snafu(display("No listeners configured. Please specify a port (`dogstatsd_port`) or a socket path (`dogstatsd_socket` or `dogstatsd_stream_socket`) to enable a listener."))]
109 NoListenersConfigured,
110
111 #[snafu(display("Could not resolve bind_host '{}': {}", host, source))]
112 UnresolvableBindHost { host: String, source: std::io::Error },
113
114 #[snafu(display("bind_host '{}' resolved to zero IP addresses.", host))]
115 BindHostHasNoAddresses { host: String },
116}
117
/// Bytes budgeted per string interner entry when deriving an interner size from an entry count.
const INTERNER_BASELINE_BYTES_PER_ENTRY: u64 = 512;

/// Default size, in bytes, of each I/O buffer (8 KiB).
const fn default_buffer_size() -> usize {
    8 * 1024
}

/// Default number of I/O buffers in the shared pool.
const fn default_buffer_count() -> usize {
    128
}

/// Default UDP port.
const fn default_port() -> u16 {
    8_125
}

/// Default TCP port; `0` means no TCP listener is created.
const fn default_tcp_port() -> u16 {
    0
}

/// Default statsd forwarding port; `0` means forwarding is disabled.
const fn default_statsd_forward_port() -> u16 {
    0
}

/// Default socket receive buffer size; `0` means the size is left unset.
const fn default_socket_receive_buffer_size() -> usize {
    0
}

/// Context heap allocations are allowed by default.
const fn default_allow_context_heap_allocations() -> bool {
    true
}

/// No-aggregation pipeline support is enabled by default.
const fn default_no_aggregation_pipeline_support() -> bool {
    true
}

/// Default string interner capacity, in entries.
const fn default_context_string_interner_entry_count() -> u64 {
    4_096
}

/// Default cached contexts limit.
const fn default_cached_contexts_limit() -> usize {
    500_000
}

/// Default cached tagsets limit.
const fn default_cached_tagsets_limit() -> usize {
    500_000
}

/// Default context expiry, in seconds.
const fn default_context_expiry_seconds() -> u64 {
    20
}

/// Permissive decoding is enabled by default.
const fn default_dogstatsd_permissive_decoding() -> bool {
    true
}

/// Default minimum sample rate (`0.000000003845`).
const fn default_dogstatsd_minimum_sample_rate() -> f64 {
    3.845e-9
}

/// Generic `true` default for boolean serde fields.
const fn default_true() -> bool {
    true
}
183
/// Toggles controlling which DogStatsD payload types the source lets through.
///
/// Every toggle defaults to `true` when absent during deserialization.
#[derive(Deserialize)]
#[cfg_attr(test, derive(PartialEq, serde::Serialize))]
pub struct EnablePayloadsConfiguration {
    /// Whether series payloads are enabled.
    #[serde(default = "default_true")]
    pub series: bool,

    /// Whether sketch payloads are enabled.
    #[serde(default = "default_true")]
    pub sketches: bool,

    /// Whether event payloads are enabled.
    #[serde(default = "default_true")]
    pub events: bool,

    /// Whether service check payloads are enabled.
    #[serde(default = "default_true")]
    pub service_checks: bool,
}
212
213impl Default for EnablePayloadsConfiguration {
214 fn default() -> Self {
215 Self {
216 series: true,
217 sketches: true,
218 events: true,
219 service_checks: true,
220 }
221 }
222}
223
/// Smallest capture depth accepted for traffic capture (1024).
const MIN_CAPTURE_DEPTH: usize = 1 << 10;

/// Default capture depth, which is simply the minimum allowed depth.
const fn default_capture_depth() -> usize {
    MIN_CAPTURE_DEPTH
}

/// Subdirectory of `run_path` used for captures when no capture path is configured.
const DOGSTATSD_CAPTURE_DIR: &str = "dsd_capture";
231
232fn deserialize_empty_metastring_as_none<'de, D>(deserializer: D) -> Result<Option<MetaString>, D::Error>
233where
234 D: Deserializer<'de>,
235{
236 let value = Option::<MetaString>::deserialize(deserializer)?;
237 Ok(value.filter(|host| !host.is_empty()))
238}
239
/// Configuration for the DogStatsD source.
///
/// Typically created with [`DogStatsDConfiguration::from_configuration`]. Each field's `rename` is the configuration
/// key it is read from. The `default = "..."` functions only apply when a key is absent during deserialization; the
/// derived `Default` impl uses each type's own default instead.
#[serde_as]
#[derive(Deserialize, Default)]
#[cfg_attr(test, derive(derive_where::DeriveWhere, serde::Serialize))]
#[cfg_attr(test, derive_where(PartialEq))]
pub struct DogStatsDConfiguration {
    /// Size, in bytes, of each I/O buffer. Defaults to 8192.
    ///
    /// The size actually allocated is this value passed through `get_adjusted_buffer_size`.
    #[serde(rename = "dogstatsd_buffer_size", default = "default_buffer_size")]
    buffer_size: usize,

    /// Number of I/O buffers in the shared pool. Defaults to 128.
    ///
    /// Building the source fails if this is below the sum of every listener's minimum buffer reservation.
    #[serde(rename = "dogstatsd_buffer_count", default = "default_buffer_count")]
    buffer_count: usize,

    /// UDP port to listen on. Defaults to 8125; `0` disables the UDP listener.
    #[serde(rename = "dogstatsd_port", default = "default_port")]
    port: u16,

    /// Socket receive buffer size, in bytes. Defaults to `0`, which leaves the size unset on the listeners.
    #[serde(rename = "dogstatsd_so_rcvbuf", default = "default_socket_receive_buffer_size")]
    socket_receive_buffer_size: usize,

    /// TCP port to listen on. Defaults to `0`, which disables the TCP listener.
    #[serde(rename = "dogstatsd_tcp_port", default = "default_tcp_port")]
    tcp_port: u16,

    /// Host to forward received frames to. An empty value is treated as unset.
    #[serde(
        rename = "statsd_forward_host",
        default,
        deserialize_with = "deserialize_empty_metastring_as_none"
    )]
    statsd_forward_host: Option<MetaString>,

    /// Port to forward received frames to. Defaults to `0`.
    ///
    /// Forwarding only happens when both this port is non-zero and `statsd_forward_host` is set.
    #[serde(rename = "statsd_forward_port", default = "default_statsd_forward_port")]
    statsd_forward_port: u16,

    /// Path of the Unix datagram socket to listen on. An empty value is treated as unset.
    #[serde(rename = "dogstatsd_socket", default)]
    #[serde_as(as = "NoneAsEmptyString")]
    socket_path: Option<String>,

    /// Path of the Unix stream socket to listen on. An empty value is treated as unset.
    #[serde(rename = "dogstatsd_stream_socket", default)]
    #[serde_as(as = "NoneAsEmptyString")]
    socket_stream_path: Option<String>,

    /// When set, `FramingError::InvalidFrame` errors on Unix stream listeners are logged at warn level.
    #[serde(rename = "dogstatsd_stream_log_too_big", default)]
    stream_log_too_big: bool,

    /// When set, frame parse failures are logged at debug level instead of warn level.
    #[serde(rename = "dogstatsd_disable_verbose_logs", default)]
    disable_verbose_logs: bool,

    /// Listener kinds (`udp`, `uds`, `named_pipe`) for which end-of-line termination is required.
    ///
    /// Accepts either a space-separated string or a sequence.
    #[serde(
        rename = "dogstatsd_eol_required",
        default,
        deserialize_with = "deserialize_space_separated_or_seq"
    )]
    eol_required: Vec<String>,

    /// Host (name or IP) to bind the UDP and TCP listeners to. An empty value is treated as unset.
    ///
    /// Ignored when `dogstatsd_non_local_traffic` is set. When unset, `127.0.0.1` is used.
    #[serde(rename = "bind_host", default)]
    #[serde_as(as = "NoneAsEmptyString")]
    bind_host: Option<String>,

    /// When set, the UDP and TCP listeners bind to `0.0.0.0`.
    #[serde(rename = "dogstatsd_non_local_traffic", default)]
    non_local_traffic: bool,

    /// When set, UDP listeners are created with `min(1 + vcpus / 8, 4)` streams.
    ///
    /// This only applies on Linux; other platforms log a warning and use the listener's default.
    #[serde(rename = "dogstatsd_autoscale_udp_listeners", default)]
    autoscale_udp_listeners: bool,

    /// Defaults to `true`.
    ///
    /// NOTE(review): consumed outside this struct (presumably by `ContextResolvers::new`); confirm semantics there.
    #[serde(
        rename = "dogstatsd_allow_context_heap_allocs",
        default = "default_allow_context_heap_allocations"
    )]
    allow_context_heap_allocations: bool,

    /// Passed to the codec configuration via `with_timestamps`. Defaults to `true`.
    #[serde(
        rename = "dogstatsd_no_aggregation_pipeline",
        default = "default_no_aggregation_pipeline_support"
    )]
    no_aggregation_pipeline_support: bool,

    /// String interner capacity in entries. Defaults to 4096.
    ///
    /// When `dogstatsd_string_interner_size_bytes` is unset, this is converted to bytes at
    /// `INTERNER_BASELINE_BYTES_PER_ENTRY` bytes per entry.
    #[serde(
        rename = "dogstatsd_string_interner_size",
        default = "default_context_string_interner_entry_count"
    )]
    context_string_interner_entry_count: u64,

    /// Explicit string interner size in bytes. When set, it takes precedence over `dogstatsd_string_interner_size`.
    #[serde(rename = "dogstatsd_string_interner_size_bytes", default)]
    context_string_interner_size_bytes: Option<ByteSize>,

    /// Defaults to 500,000.
    ///
    /// NOTE(review): consumed outside this struct (presumably by the context resolvers); confirm semantics there.
    #[serde(
        rename = "dogstatsd_cached_contexts_limit",
        default = "default_cached_contexts_limit"
    )]
    cached_contexts_limit: usize,

    /// Defaults to 500,000.
    ///
    /// NOTE(review): consumed outside this struct (presumably by the context resolvers); confirm semantics there.
    #[serde(rename = "dogstatsd_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
    cached_tagsets_limit: usize,

    /// Context expiry in seconds. Defaults to 20.
    ///
    /// NOTE(review): consumed outside this struct (presumably by the context resolvers); confirm semantics there.
    #[serde(
        rename = "dogstatsd_context_expiry_seconds",
        default = "default_context_expiry_seconds"
    )]
    context_expiry_seconds: u64,

    /// Passed to the codec configuration via `with_permissive_mode`. Defaults to `true`.
    #[serde(
        rename = "dogstatsd_permissive_decoding",
        default = "default_dogstatsd_permissive_decoding"
    )]
    permissive_decoding: bool,

    /// Passed to the codec configuration via `with_minimum_sample_rate`. Defaults to `0.000000003845`.
    #[serde(
        rename = "dogstatsd_minimum_sample_rate",
        default = "default_dogstatsd_minimum_sample_rate"
    )]
    minimum_sample_rate: f64,

    /// Which payload types are allowed through; used to build the `EnablePayloadsFilter`.
    #[serde(rename = "enable_payloads", default)]
    enable_payloads: EnablePayloadsConfiguration,

    /// Origin enrichment settings, read from the top level of the configuration (flattened).
    #[serde(flatten, default)]
    origin_enrichment: OriginEnrichmentConfiguration,

    /// Workload provider used for origin tag resolution and traffic capture.
    ///
    /// This field is not deserialized; set it with `with_workload_provider`.
    #[serde(skip)]
    #[cfg_attr(test, derive_where(skip))]
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,

    /// Entity resolver handed to each stream handler for traffic capture.
    ///
    /// This field is not deserialized; set it with `with_capture_entity_resolver`.
    #[serde(skip, default)]
    #[cfg_attr(test, derive_where(skip))]
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,

    /// Extra tags (`dogstatsd_tags`) handed to each stream handler.
    #[serde(rename = "dogstatsd_tags", default)]
    additional_tags: Vec<String>,

    /// Location for traffic captures.
    ///
    /// When empty, `from_configuration` substitutes `<run_path>/dsd_capture` if `run_path` is available.
    #[serde(rename = "dogstatsd_capture_path", default)]
    capture_path: PathBuf,

    /// Traffic capture depth. Defaults to, and is clamped to at least, `MIN_CAPTURE_DEPTH` (1024).
    #[serde(rename = "dogstatsd_capture_depth", default = "default_capture_depth")]
    capture_depth: usize,

    /// Control handle for traffic capture; bound to the source's `TrafficCapture` when the source is built.
    #[serde(skip, default)]
    #[cfg_attr(test, derive_where(skip))]
    capture_control: DogStatsDCaptureControl,

    /// Control handle for traffic replay; bound to the captured tagger when the source is built.
    #[serde(skip, default)]
    #[cfg_attr(test, derive_where(skip))]
    replay_control: DogStatsDReplayControl,

    /// When non-empty, a `provider_kind:<value>` tag is appended to the additional tags.
    #[serde(default)]
    provider_kind: String,
}
571
/// Per-transport end-of-line requirements parsed from `dogstatsd_eol_required`.
#[derive(Clone, Copy, Default)]
struct EolRequired {
    /// Set when `udp` is listed.
    udp: bool,
    /// Set when `uds` is listed; applies to both Unix datagram and Unix stream listeners.
    uds: bool,
}
577
578impl EolRequired {
579 fn from_config_values(values: &[String]) -> Self {
580 let mut eol_required = Self::default();
581
582 for value in values {
583 match value.as_str() {
584 "udp" => eol_required.udp = true,
585 "uds" => eol_required.uds = true,
586 "named_pipe" => {}
587 _ => warn!(
588 value,
589 "Invalid dogstatsd_eol_required value. Expected 'udp', 'uds', or 'named_pipe'."
590 ),
591 }
592 }
593
594 eol_required
595 }
596
597 fn for_listener(&self, listen_addr: &ListenAddress) -> bool {
598 match listen_addr {
599 ListenAddress::Udp(_) => self.udp,
600 ListenAddress::Tcp(_) => false,
601 ListenAddress::Unixgram(_) | ListenAddress::Unix(_) => self.uds,
602 }
603 }
604}
605
606async fn resolve_bind_host(host: &str) -> Result<std::net::IpAddr, Error> {
612 let mut addrs = tokio::net::lookup_host((host, 0u16))
613 .await
614 .context(UnresolvableBindHost { host: host.to_string() })?;
615 addrs
616 .next()
617 .map(|sa| sa.ip())
618 .ok_or_else(|| Error::BindHostHasNoAddresses { host: host.to_string() })
619}
620
621impl DogStatsDConfiguration {
622 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
624 let mut dogstatsd_config: Self = config.as_typed()?;
625 dogstatsd_config.fix_empty_capture_path(config);
626 dogstatsd_config.fix_capture_depth();
627 Ok(dogstatsd_config)
628 }
629
630 fn additional_tags(&self) -> Vec<String> {
632 if self.provider_kind.is_empty() {
633 return self.additional_tags.clone();
634 }
635
636 let mut tags = self.additional_tags.clone();
637 tags.push(format!("provider_kind:{}", self.provider_kind.clone()));
638 tags
639 }
640
641 fn fix_capture_depth(&mut self) {
642 self.capture_depth = self.capture_depth.max(MIN_CAPTURE_DEPTH);
643 }
644
645 fn effective_context_string_interner_bytes(&self) -> ByteSize {
651 match self.context_string_interner_size_bytes {
652 Some(explicit_bytes) => explicit_bytes,
653 None => ByteSize::b(self.context_string_interner_entry_count * INTERNER_BASELINE_BYTES_PER_ENTRY),
654 }
655 }
656
657 fn eol_required(&self) -> EolRequired {
658 EolRequired::from_config_values(&self.eol_required)
659 }
660
661 fn statsd_forward_target(&self) -> Option<(&MetaString, u16)> {
662 let host = self.statsd_forward_host.as_ref()?;
663 if self.statsd_forward_port == 0 {
664 return None;
665 }
666
667 Some((host, self.statsd_forward_port))
668 }
669
670 fn packet_forwarder_target(&self) -> Option<PacketForwarderTarget> {
671 let (host, port) = self.statsd_forward_target()?;
672 Some(PacketForwarderTarget::new(host.clone(), port))
673 }
674
675 fn udp_streams_to_yield(&self) -> Option<NonZeroUsize> {
681 if !self.autoscale_udp_listeners {
682 return None;
683 }
684
685 #[cfg(not(target_os = "linux"))]
686 if self.autoscale_udp_listeners {
687 warn!("UDP stream handler autoscaling not supported on non-Linux platforms. Default to single stream handler.");
688 return None;
689 }
690
691 let vcpus = std::thread::available_parallelism().map(NonZeroUsize::get).unwrap_or(1);
692 let streams = (1 + vcpus / 8).min(4);
693 NonZeroUsize::new(streams)
694 }
695
696 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
702 where
703 W: WorkloadProvider + Send + Sync + 'static,
704 {
705 self.workload_provider = Some(Arc::new(workload_provider));
706 self
707 }
708
709 pub fn with_capture_entity_resolver<R>(mut self, capture_entity_resolver: R) -> Self
716 where
717 R: CaptureEntityResolver + Send + Sync + 'static,
718 {
719 self.capture_entity_resolver = Some(Arc::new(capture_entity_resolver));
720 self
721 }
722
723 pub fn capture_control(&self) -> DogStatsDCaptureControl {
725 self.capture_control.clone()
726 }
727
728 pub fn capture_api_handler(&self) -> DogStatsDCaptureAPIHandler {
730 DogStatsDCaptureAPIHandler::new(self.capture_control.clone())
731 }
732
733 pub fn replay_control(&self) -> DogStatsDReplayControl {
735 self.replay_control.clone()
736 }
737
738 pub fn replay_api_handler(&self) -> DogStatsDReplayAPIHandler {
740 DogStatsDReplayAPIHandler::new(self.replay_control.clone())
741 }
742
743 fn fix_empty_capture_path(&mut self, config: &GenericConfiguration) {
744 if self.capture_path.parent().is_some() {
745 return;
746 }
747
748 let capture_path = match config.try_get_typed::<PathBuf>("run_path") {
749 Ok(Some(mut run_path)) => {
750 run_path.push(DOGSTATSD_CAPTURE_DIR);
751 run_path
752 }
753 Ok(None) => {
754 debug!(
755 "`dogstatsd_capture_path` and `run_path` were empty. Default DogStatsD capture path is unavailable."
756 );
757 return;
758 }
759 Err(e) => {
760 debug!(
761 error = %e,
762 "Failed to read `run_path` from configuration. Default DogStatsD capture path is unavailable."
763 );
764 return;
765 }
766 };
767
768 self.capture_path = capture_path;
769 }
770
771 fn build_addresses(&self, bind_host: Option<std::net::IpAddr>) -> Vec<ListenAddress> {
781 let bind_ip: std::net::IpAddr = if self.non_local_traffic {
782 [0, 0, 0, 0].into()
783 } else {
784 bind_host.unwrap_or_else(|| [127, 0, 0, 1].into())
785 };
786
787 let mut addresses: Vec<ListenAddress> = Vec::new();
788
789 if self.port != 0 {
790 addresses.push(ListenAddress::Udp(std::net::SocketAddr::new(bind_ip, self.port)));
791 }
792
793 if self.tcp_port != 0 {
794 addresses.push(ListenAddress::Tcp(std::net::SocketAddr::new(bind_ip, self.tcp_port)));
795 }
796
797 if let Some(socket_path) = &self.socket_path {
798 addresses.push(ListenAddress::Unixgram(socket_path.into()));
799 }
800
801 if let Some(socket_stream_path) = &self.socket_stream_path {
802 addresses.push(ListenAddress::Unix(socket_stream_path.into()));
803 }
804
805 addresses
806 }
807
808 async fn build_listeners(&self) -> Result<Vec<Listener>, Error> {
810 let bind_host: Option<std::net::IpAddr> = if self.non_local_traffic {
814 None
815 } else {
816 match &self.bind_host {
817 Some(host) => Some(resolve_bind_host(host).await?),
818 None => None,
819 }
820 };
821
822 let addresses = self.build_addresses(bind_host);
823 let mut listeners = Vec::new();
824 let socket_receive_buffer_size =
825 (self.socket_receive_buffer_size != 0).then_some(self.socket_receive_buffer_size);
826 let udp_streams_to_yield = self.udp_streams_to_yield();
827 for address in addresses {
828 let listener_type = address.listener_type();
829 let listener_streams = matches!(address, ListenAddress::Udp(_))
830 .then_some(udp_streams_to_yield)
831 .flatten();
832 let listener = Listener::from_listen_address(address, listener_streams)
833 .await
834 .context(FailedToCreateListener { listener_type })?
835 .with_receive_buffer_size(socket_receive_buffer_size);
836
837 listeners.push(listener);
838 }
839 Ok(listeners)
840 }
841}
842
843#[async_trait]
844impl SourceBuilder for DogStatsDConfiguration {
845 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
846 let listeners = self.build_listeners().await?;
847 if listeners.is_empty() {
848 return Err(Error::NoListenersConfigured.into());
849 }
850
851 let min_buffers: usize = listeners.iter().map(Listener::min_buffer_reservation).sum();
855 if self.buffer_count < min_buffers {
856 return Err(generic_error!(
857 "Must have a minimum of {} I/O buffers to service all configured listeners (have {}).",
858 min_buffers,
859 self.buffer_count,
860 ));
861 }
862
863 let origin_detection_enabled = self.origin_enrichment.enabled();
864 let captured_tagger = CapturedTaggerHandle::new();
867
868 let maybe_origin_tags_resolver = self.workload_provider.clone().map(|provider| {
869 DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider, captured_tagger.clone())
870 });
871 let context_resolvers = ContextResolvers::new(self, &context, maybe_origin_tags_resolver)
872 .error_context("Failed to create context resolvers.")?;
873
874 let codec_config = DogStatsDCodecConfiguration::default()
875 .with_timestamps(self.no_aggregation_pipeline_support)
876 .with_permissive_mode(self.permissive_decoding)
877 .with_minimum_sample_rate(self.minimum_sample_rate)
878 .with_client_origin_detection(self.origin_enrichment.origin_detection_client);
879
880 let codec = DogStatsDCodec::from_configuration(codec_config);
881 let eol_required = self.eol_required();
882
883 let enable_payloads_filter = EnablePayloadsFilter::default()
884 .with_allow_series(self.enable_payloads.series)
885 .with_allow_sketches(self.enable_payloads.sketches)
886 .with_allow_events(self.enable_payloads.events)
887 .with_allow_service_checks(self.enable_payloads.service_checks);
888 let traffic_capture = TrafficCapture::with_workload_provider(
889 self.capture_path.clone(),
890 self.capture_depth.max(MIN_CAPTURE_DEPTH),
891 self.workload_provider.clone(),
892 );
893 self.capture_control.bind(traffic_capture.clone());
894 let packet_forwarder_target = self.packet_forwarder_target();
895
896 self.replay_control.bind(captured_tagger);
897
898 Ok(Box::new(DogStatsD {
899 listeners,
900 io_buffer_pool: FixedSizeObjectPool::with_builder("dsd_packet_bufs", self.buffer_count, || {
901 FixedSizeVec::with_capacity(get_adjusted_buffer_size(self.buffer_size))
902 }),
903 codec,
904 context_resolvers,
905 enabled_filter: enable_payloads_filter,
906 origin_detection_enabled,
907 stream_log_too_big: self.stream_log_too_big,
908 disable_verbose_logs: self.disable_verbose_logs,
909 eol_required,
910 additional_tags: self.additional_tags().into(),
911 capture_entity_resolver: self.capture_entity_resolver.clone(),
912 traffic_capture,
913 packet_forwarder_target,
914 }))
915 }
916
917 fn outputs(&self) -> &[OutputDefinition<EventType>] {
918 static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
919 vec![
920 OutputDefinition::named_output("metrics", EventType::Metric),
921 OutputDefinition::named_output("events", EventType::EventD),
922 OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
923 ]
924 });
925 &OUTPUTS
926 }
927}
928
929impl MemoryBounds for DogStatsDConfiguration {
930 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
931 builder
932 .minimum()
933 .with_single_value::<DogStatsD>("source struct")
935 .with_expr(UsageExpr::product(
937 "buffers",
938 UsageExpr::config("dogstatsd_buffer_count", self.buffer_count),
939 UsageExpr::config("dogstatsd_buffer_size", get_adjusted_buffer_size(self.buffer_size)),
940 ))
941 .with_expr(UsageExpr::config(
944 "dogstatsd_string_interner_size_bytes",
945 self.effective_context_string_interner_bytes().as_u64() as usize,
946 ));
947 }
948}
949
/// The DogStatsD source.
///
/// Produced by `DogStatsDConfiguration`'s `SourceBuilder` implementation. When run, it spawns one task per listener.
pub struct DogStatsD {
    /// Bound listeners, one per configured address.
    listeners: Vec<Listener>,
    /// Pool of fixed-size I/O buffers, cloned into every listener and stream handler.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// DogStatsD payload codec, cloned into every listener and stream handler.
    codec: DogStatsDCodec,
    /// Context resolvers, cloned into every listener and stream handler.
    context_resolvers: ContextResolvers,
    /// Filter deciding which payload types are allowed through.
    enabled_filter: EnablePayloadsFilter,
    /// Whether origin enrichment is enabled.
    origin_detection_enabled: bool,
    /// Mirrors `dogstatsd_stream_log_too_big`.
    stream_log_too_big: bool,
    /// Mirrors `dogstatsd_disable_verbose_logs`.
    disable_verbose_logs: bool,
    /// End-of-line requirements per transport.
    eol_required: EolRequired,
    /// Extra tags handed to every stream handler.
    additional_tags: Arc<[String]>,
    /// Optional entity resolver used when capturing traffic.
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
    /// Traffic capture handle; the same capture is bound to the configuration's capture control.
    traffic_capture: TrafficCapture,
    /// Destination for forwarded frames, if statsd forwarding is configured.
    packet_forwarder_target: Option<PacketForwarderTarget>,
}
966
/// State moved into a single `process_listener` task.
struct ListenerContext {
    /// Handle signalled when the source asks this listener to stop.
    shutdown_handle: ShutdownHandle,
    /// The listener whose streams this task accepts.
    listener: Listener,
    /// Shared I/O buffer pool.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// DogStatsD payload codec.
    codec: DogStatsDCodec,
    /// Context resolvers.
    context_resolvers: ContextResolvers,
    /// Whether origin enrichment is enabled.
    origin_detection_enabled: bool,
    /// Mirrors `dogstatsd_stream_log_too_big`.
    stream_log_too_big: bool,
    /// Mirrors `dogstatsd_disable_verbose_logs`.
    disable_verbose_logs: bool,
    /// End-of-line requirements, used to pick the framer for each stream.
    eol_required: EolRequired,
    /// Extra tags handed to every stream handler.
    additional_tags: Arc<[String]>,
    /// Optional entity resolver used when capturing traffic.
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
    /// Traffic capture handle.
    traffic_capture: TrafficCapture,
    /// Destination for forwarded frames; turned into one forwarder per listener.
    packet_forwarder_target: Option<PacketForwarderTarget>,
}
982
/// State moved into a single stream handler task (`process_stream` / `drive_stream`).
struct HandlerContext {
    /// Address of the listener that accepted this stream.
    listen_addr: ListenAddress,
    /// Framer chosen for this listener and its end-of-line requirement.
    framer: DsdFramer,
    /// DogStatsD payload codec.
    codec: DogStatsDCodec,
    /// Shared I/O buffer pool.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// Metrics for the listener that accepted this stream.
    metrics: Metrics,
    /// Context resolvers.
    context_resolvers: ContextResolvers,
    /// Whether origin enrichment is enabled.
    origin_detection_enabled: bool,
    /// Mirrors `dogstatsd_stream_log_too_big`.
    stream_log_too_big: bool,
    /// Mirrors `dogstatsd_disable_verbose_logs`.
    disable_verbose_logs: bool,
    /// Extra tags passed to frame handling.
    additional_tags: Arc<[String]>,
    /// Optional entity resolver used when capturing traffic.
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
    /// Traffic capture handle.
    traffic_capture: TrafficCapture,
    /// Forwarder that receives a copy of every decoded frame, if forwarding is configured.
    packet_forwarder: Option<PacketForwarder>,
}
998
999#[async_trait]
1000impl Source for DogStatsD {
1001 async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
1002 let global_shutdown = context.take_shutdown_handle();
1003 pin!(global_shutdown);
1004
1005 let mut health = context.take_health_handle();
1006
1007 let mut listener_shutdown_coordinator = ShutdownCoordinator::default();
1008
1009 for listener in self.listeners {
1011 let task_name = format!("dogstatsd-listener-{}", listener.listen_address().listener_type());
1012
1013 let listener_context = ListenerContext {
1020 shutdown_handle: listener_shutdown_coordinator.register(),
1021 listener,
1022 io_buffer_pool: self.io_buffer_pool.clone(),
1023 codec: self.codec.clone(),
1024 context_resolvers: self.context_resolvers.clone(),
1025 origin_detection_enabled: self.origin_detection_enabled,
1026 stream_log_too_big: self.stream_log_too_big,
1027 disable_verbose_logs: self.disable_verbose_logs,
1028 eol_required: self.eol_required,
1029 additional_tags: self.additional_tags.clone(),
1030 capture_entity_resolver: self.capture_entity_resolver.clone(),
1031 traffic_capture: self.traffic_capture.clone(),
1032 packet_forwarder_target: self.packet_forwarder_target.clone(),
1033 };
1034
1035 spawn_traced_named(
1036 task_name,
1037 process_listener(context.clone(), listener_context, self.enabled_filter),
1038 );
1039 }
1040
1041 health.mark_ready();
1042 debug!("DogStatsD source started.");
1043
1044 loop {
1049 select! {
1050 _ = &mut global_shutdown => {
1051 debug!("Received shutdown signal.");
1052 break
1053 },
1054 _ = health.live() => continue,
1055 }
1056 }
1057
1058 debug!("Stopping DogStatsD source...");
1059
1060 listener_shutdown_coordinator.shutdown_and_wait().await;
1061
1062 debug!("DogStatsD source stopped.");
1063
1064 Ok(())
1065 }
1066}
1067
1068async fn process_listener(
1069 source_context: SourceContext, listener_context: ListenerContext, enabled_filter: EnablePayloadsFilter,
1070) {
1071 let ListenerContext {
1072 shutdown_handle,
1073 mut listener,
1074 io_buffer_pool,
1075 codec,
1076 context_resolvers,
1077 origin_detection_enabled,
1078 stream_log_too_big,
1079 disable_verbose_logs,
1080 eol_required,
1081 additional_tags,
1082 capture_entity_resolver,
1083 traffic_capture,
1084 packet_forwarder_target,
1085 } = listener_context;
1086
1087 pin!(shutdown_handle);
1088
1089 let listen_addr = listener.listen_address().clone();
1090 let metrics = build_metrics(&listen_addr, source_context.component_context());
1091 let packet_forwarder = packet_forwarder_target
1092 .as_ref()
1093 .map(|target| target.to_forwarder(metrics.clone()));
1094 if let Some(packet_forwarder) = &packet_forwarder {
1095 packet_forwarder.spawn_connect();
1096 }
1097
1098 let mut stream_shutdown_coordinator = ShutdownCoordinator::default();
1099 let mut stream_idx: u32 = 0;
1100
1101 info!(%listen_addr, "DogStatsD listener started.");
1102
1103 loop {
1104 select! {
1105 _ = &mut shutdown_handle => {
1106 debug!(%listen_addr, "Received shutdown signal. Waiting for existing stream handlers to finish...");
1107 break;
1108 }
1109 result = listener.accept() => match result {
1110 Ok(stream) => {
1111 debug!(%listen_addr, "Spawning new stream handler.");
1112
1113 let handler_context = HandlerContext {
1114 listen_addr: listen_addr.clone(),
1115 framer: get_framer(&listen_addr, eol_required.for_listener(&listen_addr)),
1116 codec: codec.clone(),
1117 io_buffer_pool: io_buffer_pool.clone(),
1118 metrics: metrics.clone(),
1119 context_resolvers: context_resolvers.clone(),
1120 origin_detection_enabled,
1121 stream_log_too_big,
1122 disable_verbose_logs,
1123 additional_tags: additional_tags.clone(),
1124 capture_entity_resolver: capture_entity_resolver.clone(),
1125 traffic_capture: traffic_capture.clone(),
1126 packet_forwarder: packet_forwarder.clone(),
1127 };
1128
1129 let task_name = format!(
1130 "dogstatsd-stream-handler-{}-{}",
1131 listen_addr.listener_type(),
1132 stream_idx,
1133 );
1134 stream_idx = stream_idx.wrapping_add(1);
1135 spawn_traced_named(task_name, process_stream(stream, source_context.clone(), handler_context, stream_shutdown_coordinator.register(), enabled_filter));
1136 }
1137 Err(e) => {
1138 error!(%listen_addr, error = %e, "Failed to accept connection. Stopping listener.");
1139 break
1140 }
1141 }
1142 }
1143 }
1144
1145 stream_shutdown_coordinator.shutdown_and_wait().await;
1146
1147 info!(%listen_addr, "DogStatsD listener stopped.");
1148}
1149
1150async fn process_stream(
1151 stream: Stream, source_context: SourceContext, handler_context: HandlerContext, shutdown_handle: ShutdownHandle,
1152 enabled_filter: EnablePayloadsFilter,
1153) {
1154 select! {
1155 _ = shutdown_handle => {
1156 debug!("Stream handler received shutdown signal.");
1157 },
1158 _ = drive_stream(stream, source_context, handler_context, enabled_filter) => {},
1159 }
1160}
1161
1162async fn drive_stream(
1163 mut stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
1164 enabled_filter: EnablePayloadsFilter,
1165) {
1166 let HandlerContext {
1167 listen_addr,
1168 mut framer,
1169 codec,
1170 io_buffer_pool,
1171 metrics,
1172 mut context_resolvers,
1173 origin_detection_enabled,
1174 stream_log_too_big,
1175 disable_verbose_logs,
1176 additional_tags,
1177 capture_entity_resolver,
1178 traffic_capture,
1179 packet_forwarder,
1180 } = handler_context;
1181
1182 debug!(%listen_addr, "Stream handler started.");
1183
1184 if !stream.is_connectionless() {
1185 metrics.connections_active().increment(1);
1186 }
1187
1188 let mut stream_capture = StreamCaptureState::new();
1189 let mut buffer_flush = interval(Duration::from_millis(100));
1192 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
1193
1194 let mut event_buffer_manager = EventBufferManager::default();
1195 let mut io_buffer_manager = IoBufferManager::new(&io_buffer_pool, &stream);
1196 let memory_limiter = source_context.topology_context().memory_limiter();
1197
1198 'read: loop {
1199 let mut eof = false;
1200
1201 let mut io_buffer = io_buffer_manager.get_buffer_mut().await;
1202
1203 memory_limiter.wait_for_capacity().await;
1204
1205 select! {
1206 read_result = stream.receive(&mut io_buffer) => match read_result {
1208 Ok((bytes_read, peer_addr)) => {
1209 if bytes_read == 0 {
1210 eof = true;
1211 }
1212
1213 let is_connectionless = stream.is_connectionless();
1214 let payload = received_payload(io_buffer, bytes_read);
1215
1216 capture_uds_traffic(
1217 &listen_addr,
1218 &traffic_capture,
1219 capture_entity_resolver.as_deref(),
1220 &peer_addr,
1221 payload,
1222 &mut stream_capture,
1223 );
1224
1225 if is_connectionless {
1226 metrics.packet_receive_success().increment(1);
1227 }
1228 metrics.bytes_received().increment(bytes_read as u64);
1229 metrics.bytes_received_size().record(bytes_read as f64);
1230 let origin_detection_failed =
1231 origin_detection_enabled && bytes_read > 0 && peer_addr.has_process_credential_error();
1232 if origin_detection_failed && is_connectionless {
1233 metrics.origin_detection_errors().increment(1);
1234 }
1235
1236 let reached_eof = eof || is_connectionless;
1242
1243 trace!(
1244 buffer_len = io_buffer.remaining(),
1245 buffer_cap = io_buffer.remaining_mut(),
1246 eof = reached_eof,
1247 %listen_addr,
1248 %peer_addr,
1249 "Received {} bytes from stream.",
1250 bytes_read
1251 );
1252
1253 'frame: loop {
1254 let frame_result = framer.next_frame(io_buffer, reached_eof);
1255 let completed_outer_frames = framer.take_completed_outer_frames();
1256 if !is_connectionless && completed_outer_frames > 0 {
1257 metrics.packet_receive_success().increment(completed_outer_frames as u64);
1258 }
1259 if origin_detection_failed && completed_outer_frames > 0 {
1260 metrics.origin_detection_errors().increment(completed_outer_frames as u64);
1261 }
1262
1263 match frame_result {
1264 Ok(Some(frame)) => {
1265 trace!(%listen_addr, %peer_addr, ?frame, "Decoded frame.");
1266 if let Some(forwarder) = &packet_forwarder {
1267 forwarder.forward(frame.clone()).await;
1268 }
1269 match handle_frame(&frame[..], &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) {
1270 Ok(Some(event)) => {
1271 if let Some(event_buffer) = event_buffer_manager.try_push(event) {
1272 debug!(%listen_addr, %peer_addr, "Event buffer is full. Forwarding events.");
1273 dispatch_events(event_buffer, &source_context, &listen_addr).await;
1274 }
1275 },
1276 Ok(None) => {
1277 continue
1282 },
1283 Err(e) => {
1284 log_parse_failure(disable_verbose_logs, &listen_addr, &peer_addr, &frame, &e);
1285 },
1286 }
1287 }
1288 Err(e) => {
1289 metrics.framing_errors().increment(1);
1290 if should_warn_stream_log_too_big(&listen_addr, &e, stream_log_too_big) {
1291 warn!(
1292 %listen_addr,
1293 %peer_addr,
1294 error = %e,
1295 "DogStatsD stream frame exceeded the configured buffer size."
1296 );
1297 }
1298
1299 if stream.is_connectionless() {
1300 io_buffer.clear();
1301 debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Continuing stream.");
1304 continue 'read;
1305 } else {
1306 debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Stopping stream.");
1307 break 'read;
1308 }
1309 }
1310 Ok(None) => {
1311 trace!(%listen_addr, %peer_addr, "Not enough data to decode another frame.");
1312 if eof && !stream.is_connectionless() {
1313 debug!(%listen_addr, %peer_addr, "Stream received EOF. Shutting down handler.");
1314 break 'read;
1315 } else {
1316 break 'frame;
1317 }
1318 }
1319 }
1320 }
1321 },
1322 Err(e) => {
1323 metrics.packet_receive_failure().increment(1);
1324
1325 if stream.is_connectionless() {
1326 warn!(%listen_addr, error = %e, "I/O error while decoding. Continuing stream.");
1329 continue 'read;
1330 } else {
1331 warn!(%listen_addr, error = %e, "I/O error while decoding. Stopping stream.");
1332 break 'read;
1333 }
1334 }
1335 },
1336
1337 _ = buffer_flush.tick() => {
1338 if let Some(event_buffer) = event_buffer_manager.consume() {
1339 dispatch_events(event_buffer, &source_context, &listen_addr).await;
1340 }
1341 },
1342
1343 }
1344 }
1345
1346 if let Some(event_buffer) = event_buffer_manager.consume() {
1347 dispatch_events(event_buffer, &source_context, &listen_addr).await;
1348 }
1349
1350 metrics.connections_active().decrement(1);
1351
1352 debug!(%listen_addr, "Stream handler stopped.");
1353}
1354
1355fn should_warn_stream_log_too_big(listen_addr: &ListenAddress, error: &FramingError, stream_log_too_big: bool) -> bool {
1356 stream_log_too_big
1357 && matches!(listen_addr, ListenAddress::Unix(_))
1358 && matches!(error, FramingError::InvalidFrame { .. })
1359}
1360
1361fn log_parse_failure(
1362 disable_verbose_logs: bool, listen_addr: &ListenAddress, peer_addr: &ConnectionAddress, frame: &[u8],
1363 error: &ParseError,
1364) {
1365 let frame = String::from_utf8_lossy(frame);
1366 if disable_verbose_logs {
1367 debug!(%listen_addr, %peer_addr, %frame, %error, "Failed to parse frame.");
1368 } else {
1369 warn!(%listen_addr, %peer_addr, %frame, %error, "Failed to parse frame.");
1370 }
1371}
1372
1373fn capture_uds_traffic(
1374 listen_addr: &ListenAddress, traffic_capture: &TrafficCapture,
1375 capture_entity_resolver: Option<&(dyn CaptureEntityResolver + Send + Sync)>, peer_addr: &ConnectionAddress,
1376 payload: &[u8], stream_capture: &mut StreamCaptureState,
1377) {
1378 if payload.is_empty() || !traffic_capture.is_ongoing() {
1379 return;
1380 }
1381
1382 match listen_addr {
1383 ListenAddress::Unixgram(_) => {
1384 let _ = traffic_capture.enqueue(build_capture_record(
1385 capture_entity_resolver,
1386 process_id_from_peer_addr(peer_addr),
1387 payload,
1388 ));
1389 }
1390 ListenAddress::Unix(_) => {
1391 stream_capture.update_peer_metadata(peer_addr);
1392 stream_capture.pending.extend(payload);
1393
1394 while let Ok(Some(outer_payload)) = stream_capture
1395 .outer_framer
1396 .next_frame(&mut stream_capture.pending, false)
1397 {
1398 let _ = traffic_capture.enqueue(build_capture_record(
1399 capture_entity_resolver,
1400 stream_capture.last_pid,
1401 &outer_payload,
1402 ));
1403 }
1404 }
1405 _ => {}
1406 }
1407}
1408
1409struct StreamCaptureState {
1410 outer_framer: LengthDelimitedFramer,
1411 pending: VecDeque<u8>,
1412 last_pid: Option<i32>,
1413}
1414
1415impl StreamCaptureState {
1416 fn new() -> Self {
1417 Self {
1418 outer_framer: LengthDelimitedFramer,
1419 pending: VecDeque::new(),
1420 last_pid: None,
1421 }
1422 }
1423
1424 fn update_peer_metadata(&mut self, peer_addr: &ConnectionAddress) {
1425 if let Some(process_id) = process_id_from_peer_addr(peer_addr) {
1426 self.last_pid = Some(process_id);
1427 }
1428 }
1429}
1430
1431fn build_capture_record(
1432 capture_entity_resolver: Option<&(dyn CaptureEntityResolver + Send + Sync)>, process_id: Option<i32>,
1433 payload: &[u8],
1434) -> CaptureRecord {
1435 CaptureRecord {
1436 timestamp_ns: capture_timestamp_ns(),
1437 payload: payload.to_vec(),
1438 pid: process_id,
1439 ancillary: Vec::new(),
1440 container_id: resolve_capture_container_id(capture_entity_resolver, process_id),
1441 }
1442}
1443
1444fn resolve_capture_container_id(
1445 capture_entity_resolver: Option<&(dyn CaptureEntityResolver + Send + Sync)>, process_id: Option<i32>,
1446) -> Option<String> {
1447 let process_id = u32::try_from(process_id?).ok()?;
1448 capture_entity_resolver
1449 .and_then(|resolver| resolver.resolve_container_entity_for_live_pid(process_id))
1450 .map(|entity_id| entity_id.to_string())
1451}
1452
1453fn process_id_from_peer_addr(peer_addr: &ConnectionAddress) -> Option<i32> {
1454 match peer_addr {
1455 ConnectionAddress::ProcessLike(ProcessIdentity::Credentials(creds)) => Some(creds.pid),
1456 _ => None,
1457 }
1458}
1459
1460fn apply_credentials_to_origin(origin: &mut RawOrigin<'_>, creds: &ProcessCredentials) {
1466 if creds.gid == REPLAY_CREDENTIALS_GID {
1467 origin.set_process_id(mark_replay_process_id(creds.uid));
1468 } else {
1469 origin.set_process_id(creds.pid as u32);
1470 }
1471}
1472
1473fn received_payload(buffer: &BytesBuffer, bytes_read: usize) -> &[u8] {
1474 let chunk = buffer.chunk();
1475 let start = chunk.len().saturating_sub(bytes_read);
1476 &chunk[start..]
1477}
1478
/// Returns the current wall-clock time in nanoseconds since the Unix epoch.
///
/// The value saturates at `i64::MAX`. It is 0 if the system clock reports a time before the epoch.
fn capture_timestamp_ns() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_nanos()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
1485
1486fn handle_frame(
1487 frame: &[u8], codec: &DogStatsDCodec, context_resolvers: &mut ContextResolvers, source_metrics: &Metrics,
1488 peer_addr: &ConnectionAddress, enabled_filter: EnablePayloadsFilter, additional_tags: &[String],
1489) -> Result<Option<Event>, ParseError> {
1490 let parsed = match codec.decode_packet(frame) {
1491 Ok(parsed) => parsed,
1492 Err(e) => {
1493 match parse_message_type(frame) {
1495 MessageType::MetricSample => source_metrics.metric_decode_failed().increment(1),
1496 MessageType::Event => source_metrics.event_decode_failed().increment(1),
1497 MessageType::ServiceCheck => source_metrics.service_check_decode_failed().increment(1),
1498 }
1499
1500 return Err(e);
1501 }
1502 };
1503
1504 let event = match parsed {
1505 ParsedPacket::Metric(metric_packet) => {
1506 if metric_packet.num_points == 0 {
1507 return Ok(None);
1508 }
1509 let events_len = metric_packet.num_points;
1510 if !enabled_filter.allow_metric(&metric_packet) {
1511 trace!(
1512 metric.name = metric_packet.metric_name,
1513 "Skipping metric due to filter configuration."
1514 );
1515 return Ok(None);
1516 }
1517
1518 match handle_metric_packet(metric_packet, context_resolvers, peer_addr, additional_tags) {
1519 Some(metric) => {
1520 source_metrics.metrics_received().increment(events_len);
1521 Event::Metric(metric)
1522 }
1523 None => {
1524 source_metrics.failed_context_resolve_total().increment(1);
1526 return Ok(None);
1527 }
1528 }
1529 }
1530 ParsedPacket::Event(event) => {
1531 if !enabled_filter.allow_event(&event) {
1532 trace!("Skipping event {} due to filter configuration.", event.title);
1533 return Ok(None);
1534 }
1535 let tags_resolver = context_resolvers.tags();
1536 match handle_event_packet(event, tags_resolver, peer_addr, additional_tags) {
1537 Some(event) => {
1538 source_metrics.events_received().increment(1);
1539 Event::EventD(event)
1540 }
1541 None => {
1542 source_metrics.failed_context_resolve_total().increment(1);
1543 return Ok(None);
1544 }
1545 }
1546 }
1547 ParsedPacket::ServiceCheck(service_check) => {
1548 if !enabled_filter.allow_service_check(&service_check) {
1549 trace!(
1550 "Skipping service check {} due to filter configuration.",
1551 service_check.name
1552 );
1553 return Ok(None);
1554 }
1555 let tags_resolver = context_resolvers.tags();
1556 match handle_service_check_packet(service_check, tags_resolver, peer_addr, additional_tags) {
1557 Some(service_check) => {
1558 source_metrics.service_checks_received().increment(1);
1559 Event::ServiceCheck(service_check)
1560 }
1561 None => {
1562 source_metrics.failed_context_resolve_total().increment(1);
1563 return Ok(None);
1564 }
1565 }
1566 }
1567 };
1568
1569 Ok(Some(event))
1570}
1571
1572fn handle_metric_packet(
1573 packet: MetricPacket, context_resolvers: &mut ContextResolvers, peer_addr: &ConnectionAddress,
1574 additional_tags: &[String],
1575) -> Option<Metric> {
1576 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1577
1578 let mut origin = origin_from_metric_packet(&packet, &well_known_tags);
1579 if let Some(creds) = peer_addr.process_credentials() {
1580 apply_credentials_to_origin(&mut origin, creds);
1581 }
1582
1583 let context_resolver = if packet.timestamp.is_some() {
1585 context_resolvers.no_agg()
1586 } else {
1587 context_resolvers.primary()
1588 };
1589
1590 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1591
1592 match context_resolver.resolve(packet.metric_name, tags, Some(origin)) {
1594 Some(context) => {
1595 let metric_origin = well_known_tags
1596 .jmx_check_name
1597 .map(MetricOrigin::jmx_check)
1598 .unwrap_or_else(MetricOrigin::dogstatsd);
1599 let metadata = MetricMetadata::default()
1600 .with_origin(metric_origin)
1601 .with_hostname(well_known_tags.hostname.map(Arc::from))
1602 .with_unit(packet.unit.map_or_else(MetaString::empty, MetaString::from_static));
1603
1604 Some(Metric::from_parts(context, packet.values, metadata))
1605 }
1606 None => None,
1608 }
1609}
1610
1611fn handle_event_packet(
1612 packet: EventPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress, additional_tags: &[String],
1613) -> Option<EventD> {
1614 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1615
1616 let mut origin = origin_from_event_packet(&packet, &well_known_tags);
1617 if let Some(creds) = peer_addr.process_credentials() {
1618 apply_credentials_to_origin(&mut origin, creds);
1619 }
1620 let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1621
1622 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1623 let tags = tags_resolver.create_tag_set(tags)?;
1624
1625 let timestamp = packet
1629 .timestamp
1630 .or_else(|| SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()));
1631
1632 let eventd = EventD::new(packet.title, packet.text)
1633 .with_timestamp(timestamp)
1634 .with_hostname(packet.hostname.map(|s| s.into()))
1635 .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
1636 .with_alert_type(packet.alert_type)
1637 .with_priority(packet.priority)
1638 .with_source_type_name(Some(
1643 packet
1644 .source_type_name
1645 .map(|s| s.into())
1646 .unwrap_or_else(|| "api".into()),
1647 ))
1648 .with_alert_type(packet.alert_type)
1649 .with_tags(tags)
1650 .with_origin_tags(origin_tags);
1651
1652 Some(eventd)
1653}
1654
1655fn handle_service_check_packet(
1656 packet: ServiceCheckPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress,
1657 additional_tags: &[String],
1658) -> Option<ServiceCheck> {
1659 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1660
1661 let mut origin = origin_from_service_check_packet(&packet, &well_known_tags);
1662 if let Some(creds) = peer_addr.process_credentials() {
1663 apply_credentials_to_origin(&mut origin, creds);
1664 }
1665 let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1666
1667 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1668 let tags = tags_resolver.create_tag_set(tags)?;
1669
1670 let timestamp = packet
1674 .timestamp
1675 .or_else(|| SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()));
1676
1677 let service_check = ServiceCheck::new(packet.name, packet.status)
1678 .with_timestamp(timestamp)
1679 .with_hostname(packet.hostname.map(|s| s.into()))
1680 .with_tags(tags)
1681 .with_origin_tags(origin_tags)
1682 .with_message(packet.message.map(|s| s.into()));
1683
1684 Some(service_check)
1685}
1686
1687fn get_filtered_tags_iterator<'a>(
1688 raw_tags: RawTags<'a>, additional_tags: &'a [String],
1689) -> impl Iterator<Item = &'a str> + Clone {
1690 RawTagsFilter::exclude(raw_tags, WellKnownTagsFilterPredicate).chain(additional_tags.iter().map(|s| s.as_str()))
1693}
1694
1695async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &SourceContext, listen_addr: &ListenAddress) {
1696 debug!(%listen_addr, events_len = event_buffer.len(), "Forwarding events.");
1697
1698 if event_buffer.has_event_type(EventType::EventD) {
1708 let eventd_events = event_buffer.extract(Event::is_eventd);
1709 if let Err(e) = source_context
1710 .dispatcher()
1711 .buffered_named("events")
1712 .expect("events output should always exist")
1713 .send_all(eventd_events)
1714 .await
1715 {
1716 error!(%listen_addr, error = %e, "Failed to dispatch eventd events.");
1717 }
1718 }
1719
1720 if event_buffer.has_event_type(EventType::ServiceCheck) {
1722 let service_check_events = event_buffer.extract(Event::is_service_check);
1723 if let Err(e) = source_context
1724 .dispatcher()
1725 .buffered_named("service_checks")
1726 .expect("service checks output should always exist")
1727 .send_all(service_check_events)
1728 .await
1729 {
1730 error!(%listen_addr, error = %e, "Failed to dispatch service check events.");
1731 }
1732 }
1733
1734 if !event_buffer.is_empty() {
1736 if let Err(e) = source_context
1737 .dispatcher()
1738 .dispatch_named("metrics", event_buffer)
1739 .await
1740 {
1741 error!(%listen_addr, error = %e, "Failed to dispatch metric events.");
1742 }
1743 }
1744}
1745
/// Returns `buffer_size` plus four extra bytes.
///
/// NOTE(review): the four bytes presumably leave room for the 4-byte length prefix used by
/// length-delimited stream framing — confirm against the callers.
const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
    const EXTRA_BYTES: usize = 4;
    EXTRA_BYTES + buffer_size
}
1763
1764#[cfg(test)]
1765mod tests {
1766 use std::{
1767 collections::HashMap,
1768 io::ErrorKind,
1769 net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
1770 path::PathBuf,
1771 sync::{Arc, OnceLock},
1772 time::Duration,
1773 };
1774
1775 use bytes::Bytes;
1776 use bytesize::ByteSize;
1777 use saluki_config::ConfigurationLoader;
1778 use saluki_context::{origin::RawOrigin, ContextResolverBuilder, TagsResolverBuilder};
1779 use saluki_core::{components::ComponentContext, topology::ComponentId};
1780 use saluki_env::workload::{CaptureEntityResolver, EntityId};
1781 use saluki_io::{
1782 deser::codec::dogstatsd::{DogStatsDCodec, DogStatsDCodecConfiguration, ParsedPacket},
1783 net::{ConnectionAddress, ListenAddress, ProcessCredentials, ProcessIdentity},
1784 };
1785 use saluki_metrics::test::TestRecorder;
1786 use serde_json::json;
1787 use stringtheory::MetaString;
1788 use tokio::{net::UdpSocket, sync::mpsc, time::timeout};
1789
1790 use super::{
1791 forwarder::{
1792 ConnectedPacketForwarder, ForwardPacket, PacketForwarder, PacketForwarderTarget, FORWARDER_QUEUE_CAPACITY,
1793 },
1794 handle_metric_packet,
1795 metrics::build_metrics,
1796 resolve_capture_container_id, ContextResolvers, DogStatsDConfiguration, DOGSTATSD_CAPTURE_DIR,
1797 MIN_CAPTURE_DEPTH,
1798 };
1799
1800 const LINUX_EAFNOSUPPORT: i32 = 97;
1801 const MACOS_EAFNOSUPPORT: i32 = 47;
1802
1803 fn is_ipv6_unavailable_error(error: &std::io::Error) -> bool {
1804 matches!(error.kind(), ErrorKind::AddrNotAvailable | ErrorKind::Unsupported)
1805 || matches!(error.raw_os_error(), Some(LINUX_EAFNOSUPPORT | MACOS_EAFNOSUPPORT))
1806 }
1807
1808 #[derive(Default)]
1809 struct CaptureTestEntityResolver {
1810 pid_map: HashMap<u32, EntityId>,
1811 }
1812
1813 impl CaptureTestEntityResolver {
1814 fn with_pid_mapping(process_id: u32, entity_id: EntityId) -> Self {
1815 let mut pid_map = HashMap::new();
1816 pid_map.insert(process_id, entity_id);
1817 Self { pid_map }
1818 }
1819 }
1820
1821 impl CaptureEntityResolver for CaptureTestEntityResolver {
1822 fn resolve_container_entity_for_live_pid(&self, process_id: u32) -> Option<EntityId> {
1823 self.pid_map.get(&process_id).cloned()
1824 }
1825 }
1826
1827 fn packet_forwarder_from_sender(
1828 target_port: u16, packets_tx: mpsc::Sender<ForwardPacket>, metrics: super::metrics::Metrics,
1829 ) -> PacketForwarder {
1830 let mut forwarder =
1831 PacketForwarderTarget::new(MetaString::from_static("127.0.0.1"), target_port).to_forwarder(metrics);
1832 forwarder.connected = Arc::new(OnceLock::from(packets_tx));
1833 forwarder
1834 }
1835
1836 #[test]
1837 fn no_metrics_when_interner_full_allocations_disallowed() {
1838 let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
1846 let tags_resolver = TagsResolverBuilder::for_tests().build();
1847 let context_resolver = ContextResolverBuilder::for_tests()
1848 .with_heap_allocations(false)
1849 .with_tags_resolver(Some(tags_resolver.clone()))
1850 .build();
1851 let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
1852 let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());
1853
1854 let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3";
1855
1856 let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
1857 panic!("Failed to parse packet.");
1858 };
1859
1860 let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &[]);
1861 assert!(maybe_metric.is_none());
1862 }
1863
1864 #[test]
1865 fn metric_with_additional_tags() {
1866 let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
1867 let tags_resolver = TagsResolverBuilder::for_tests().build();
1868 let context_resolver = ContextResolverBuilder::for_tests()
1869 .with_heap_allocations(false)
1870 .with_tags_resolver(Some(tags_resolver.clone()))
1871 .build();
1872 let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
1873 let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());
1874
1875 let existing_tags = ["tag1:value1", "tag2:value2", "tag3:value3"];
1876 let existing_tags_str = existing_tags.join(",");
1877
1878 let input = format!("test_metric_name:1|c|#{}", existing_tags_str);
1879 let additional_tags = [
1880 "tag4:value4".to_string(),
1881 "tag5:value5".to_string(),
1882 "tag6:value6".to_string(),
1883 ];
1884
1885 let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
1886 panic!("Failed to parse packet.");
1887 };
1888 let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &additional_tags);
1889 assert!(maybe_metric.is_some());
1890
1891 let metric = maybe_metric.unwrap();
1892 let context = metric.context();
1893
1894 for tag in existing_tags {
1895 assert!(context.tags().has_tag(tag));
1896 }
1897
1898 for tag in additional_tags {
1899 assert!(context.tags().has_tag(tag));
1900 }
1901 }
1902
1903 fn deser_config(json: &str) -> DogStatsDConfiguration {
1904 serde_json::from_str(json).expect("failed to deserialize config")
1905 }
1906
1907 fn udp_listen_address() -> ListenAddress {
1908 ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)))
1909 }
1910
1911 fn tcp_listen_address() -> ListenAddress {
1912 ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)))
1913 }
1914
1915 #[test]
1916 fn interner_size_defaults_to_2mib() {
1917 let config = deser_config("{}");
1918 assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(2));
1919 }
1920
1921 #[test]
1922 fn socket_receive_buffer_size_defaults_to_zero() {
1923 let config = deser_config("{}");
1924 assert_eq!(config.socket_receive_buffer_size, 0);
1925 }
1926
1927 #[test]
1928 fn socket_receive_buffer_size_from_config() {
1929 let config = deser_config(r#"{"dogstatsd_so_rcvbuf": 131072}"#);
1930 assert_eq!(config.socket_receive_buffer_size, 131_072);
1931 }
1932
1933 #[test]
1934 fn stream_log_too_big_defaults_to_false() {
1935 let config = deser_config("{}");
1936 assert!(!config.stream_log_too_big);
1937 }
1938
1939 #[test]
1940 fn stream_log_too_big_from_config() {
1941 let config = deser_config(r#"{"dogstatsd_stream_log_too_big": true}"#);
1942 assert!(config.stream_log_too_big);
1943 }
1944
1945 #[test]
1946 fn disable_verbose_logs_defaults_to_false() {
1947 let config = deser_config("{}");
1948 assert!(!config.disable_verbose_logs);
1949 }
1950
1951 #[test]
1952 fn disable_verbose_logs_from_config() {
1953 let config = deser_config(r#"{"dogstatsd_disable_verbose_logs": true}"#);
1954 assert!(config.disable_verbose_logs);
1955 }
1956
1957 #[test]
1958 fn statsd_forward_defaults_disabled() {
1959 let config = deser_config("{}");
1960 assert!(config.statsd_forward_host.is_none());
1961 assert_eq!(config.statsd_forward_port, 0);
1962 assert!(config.statsd_forward_target().is_none());
1963 }
1964
1965 #[test]
1966 fn statsd_forward_empty_host_disabled() {
1967 let config = deser_config(r#"{"statsd_forward_host": "", "statsd_forward_port": 9125}"#);
1968 assert!(config.statsd_forward_host.is_none());
1969 assert!(config.statsd_forward_target().is_none());
1970 }
1971
1972 #[test]
1973 fn statsd_forward_zero_port_disabled() {
1974 let config = deser_config(r#"{"statsd_forward_host": "127.0.0.1", "statsd_forward_port": 0}"#);
1975 assert_eq!(config.statsd_forward_host.as_deref(), Some("127.0.0.1"));
1976 assert!(config.statsd_forward_target().is_none());
1977 }
1978
1979 #[test]
1980 fn statsd_forward_host_and_port_enabled() {
1981 let config = deser_config(r#"{"statsd_forward_host": "127.0.0.1", "statsd_forward_port": 9125}"#);
1982 let (host, port) = config.statsd_forward_target().expect("forwarding should be enabled");
1983 assert_eq!(host.as_ref(), "127.0.0.1");
1984 assert_eq!(port, 9125);
1985 }
1986
1987 #[test]
1988 fn statsd_forward_invalid_target_still_builds_forwarder_handle() {
1989 let config = deser_config(r#"{"statsd_forward_host": "not a valid host", "statsd_forward_port": 9125}"#);
1990 assert!(config.packet_forwarder_target().is_some());
1991 }
1992
1993 #[tokio::test]
1994 async fn packet_forwarder_sends_payload_bytes() {
1995 let receiver = UdpSocket::bind("127.0.0.1:0").await.expect("receiver should bind");
1996 let receiver_addr = receiver.local_addr().expect("receiver should have an address");
1997 let forwarder = ConnectedPacketForwarder::connect("127.0.0.1", receiver_addr.port())
1998 .await
1999 .expect("forwarder should connect");
2000 let payload = b"daemon:666|g|#sometag1:somevalue1,sometag2:somevalue2";
2001
2002 let recorder = TestRecorder::default();
2003 let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2004 let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2005 let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2006 let metrics = build_metrics(&listen_addr, &context);
2007 let (packets_tx, packets_rx) = mpsc::channel(1);
2008 let worker = tokio::spawn(forwarder.run(packets_rx, metrics.clone()));
2009 let packet_forwarder = packet_forwarder_from_sender(receiver_addr.port(), packets_tx, metrics);
2010
2011 packet_forwarder.forward(Bytes::copy_from_slice(payload)).await;
2012
2013 let mut actual = [0u8; 128];
2014 let (received_len, _) = timeout(Duration::from_secs(1), receiver.recv_from(&mut actual))
2015 .await
2016 .expect("receive should not time out")
2017 .expect("receiver should receive payload");
2018
2019 assert_eq!(&actual[..received_len], payload);
2020 assert_eq!(
2021 recorder.counter((
2022 "component_packets_forwarded_total",
2023 &[
2024 ("component_id", "dogstatsd_test"),
2025 ("component_type", "source"),
2026 ("listener_type", "udp"),
2027 ("state", "ok"),
2028 ]
2029 )),
2030 Some(1)
2031 );
2032 assert_eq!(
2033 recorder.counter((
2034 "component_bytes_forwarded_total",
2035 &[
2036 ("component_id", "dogstatsd_test"),
2037 ("component_type", "source"),
2038 ("listener_type", "udp"),
2039 ]
2040 )),
2041 Some(payload.len() as u64)
2042 );
2043 worker.abort();
2044 }
2045
2046 #[tokio::test]
2047 async fn packet_forwarder_sends_payload_bytes_to_ipv6_target() {
2048 let receiver = match UdpSocket::bind("[::1]:0").await {
2049 Ok(receiver) => receiver,
2050 Err(e) if is_ipv6_unavailable_error(&e) => return,
2051 Err(e) => panic!("receiver should bind: {e}"),
2052 };
2053 let receiver_addr = receiver.local_addr().expect("receiver should have an address");
2054 let forwarder = ConnectedPacketForwarder::connect("::1", receiver_addr.port())
2055 .await
2056 .expect("forwarder should connect");
2057 let payload = b"daemon:666|g|#ip:6";
2058
2059 let recorder = TestRecorder::default();
2060 let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2061 let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2062 let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2063 let metrics = build_metrics(&listen_addr, &context);
2064 let (packets_tx, packets_rx) = mpsc::channel(1);
2065 let worker = tokio::spawn(forwarder.run(packets_rx, metrics.clone()));
2066 let packet_forwarder = packet_forwarder_from_sender(receiver_addr.port(), packets_tx, metrics);
2067
2068 packet_forwarder.forward(Bytes::copy_from_slice(payload)).await;
2069
2070 let mut actual = [0u8; 128];
2071 let (received_len, _) = timeout(Duration::from_secs(1), receiver.recv_from(&mut actual))
2072 .await
2073 .expect("receive should not time out")
2074 .expect("receiver should receive payload");
2075
2076 assert_eq!(&actual[..received_len], payload);
2077 worker.abort();
2078 }
2079
2080 #[tokio::test]
2081 async fn packet_forwarder_waits_when_queue_is_full() {
2082 let recorder = TestRecorder::default();
2083 let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2084 let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2085 let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2086 let metrics = build_metrics(&listen_addr, &context);
2087 let (packets_tx, _packets_rx) = mpsc::channel(FORWARDER_QUEUE_CAPACITY);
2088 let packet_forwarder = packet_forwarder_from_sender(9125, packets_tx, metrics);
2089
2090 for _ in 0..FORWARDER_QUEUE_CAPACITY {
2091 packet_forwarder.forward(Bytes::from_static(b"queued:1|c")).await;
2092 }
2093
2094 assert!(
2095 timeout(
2096 Duration::from_millis(100),
2097 packet_forwarder.forward(Bytes::from_static(b"blocked:1|c")),
2098 )
2099 .await
2100 .is_err(),
2101 "forwarding should wait for queue capacity instead of dropping"
2102 );
2103 }
2104
2105 #[tokio::test]
2106 async fn packet_forwarder_send_error_increments_error_telemetry() {
2107 let recorder = TestRecorder::default();
2108 let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2109 let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2110 let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2111 let metrics = build_metrics(&listen_addr, &context);
2112 let socket = UdpSocket::bind("127.0.0.1:0").await.expect("socket should bind");
2113 let forwarder = ConnectedPacketForwarder {
2114 socket,
2115 target: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9125)),
2116 };
2117 let (packets_tx, packets_rx) = mpsc::channel(1);
2118 let worker = tokio::spawn(forwarder.run(packets_rx, metrics.clone()));
2119 let packet_forwarder = packet_forwarder_from_sender(9125, packets_tx, metrics);
2120
2121 packet_forwarder.forward(Bytes::from_static(b"daemon:666|g")).await;
2122
2123 let deadline = tokio::time::Instant::now() + Duration::from_secs(1);
2124 loop {
2125 if recorder.counter((
2126 "component_packets_forwarded_total",
2127 &[
2128 ("component_id", "dogstatsd_test"),
2129 ("component_type", "source"),
2130 ("listener_type", "udp"),
2131 ("state", "error"),
2132 ],
2133 )) == Some(1)
2134 {
2135 break;
2136 }
2137
2138 assert!(
2139 tokio::time::Instant::now() < deadline,
2140 "forwarding error telemetry should be recorded"
2141 );
2142 tokio::time::sleep(Duration::from_millis(10)).await;
2143 }
2144 worker.abort();
2145 }
2146
2147 #[test]
2148 fn autoscale_udp_listeners_defaults_to_false() {
2149 let config = deser_config("{}");
2150 assert!(!config.autoscale_udp_listeners);
2151 assert!(config.udp_streams_to_yield().is_none());
2152 }
2153
2154 #[test]
2155 #[cfg(target_os = "linux")]
2156 fn autoscale_udp_listeners_from_config_linux() {
2157 let config = deser_config(r#"{"dogstatsd_autoscale_udp_listeners": true}"#);
2158 assert!(config.autoscale_udp_listeners);
2159
2160 let streams = config
2161 .udp_streams_to_yield()
2162 .expect("autoscale yields at least 1 stream");
2163 let n = streams.get();
2164 assert!(
2165 (1..=4).contains(&n),
2166 "expected 1..=4 streams from vCPU formula, got {n}"
2167 );
2168 }
2169
2170 #[test]
2171 #[cfg(not(target_os = "linux"))]
2172 fn autoscale_udp_listeners_from_config_non_linux() {
2173 let config = deser_config(r#"{"dogstatsd_autoscale_udp_listeners": true}"#);
2174 assert!(config.autoscale_udp_listeners);
2175
2176 assert_eq!(None, config.udp_streams_to_yield());
2177 }
2178
2179 #[test]
2180 fn eol_required_defaults_to_no_listeners() {
2181 let config = deser_config("{}");
2182 let eol_required = config.eol_required();
2183
2184 assert!(!eol_required.for_listener(&udp_listen_address()));
2185 assert!(!eol_required.for_listener(&tcp_listen_address()));
2186 }
2187
2188 #[test]
2189 fn eol_required_matches_configured_listener_types() {
2190 let config = deser_config(r#"{"dogstatsd_eol_required": ["udp", "uds"]}"#);
2191 let eol_required = config.eol_required();
2192
2193 assert!(eol_required.for_listener(&udp_listen_address()));
2194 assert!(!eol_required.for_listener(&tcp_listen_address()));
2195
2196 #[cfg(unix)]
2197 {
2198 assert!(eol_required.for_listener(&ListenAddress::Unixgram("/tmp/dsd.sock".into())));
2199 assert!(eol_required.for_listener(&ListenAddress::Unix("/tmp/dsd-stream.sock".into())));
2200 }
2201 }
2202
2203 #[test]
2204 fn eol_required_accepts_space_separated_string() {
2205 let config = deser_config(r#"{"dogstatsd_eol_required": "udp uds"}"#);
2206 let eol_required = config.eol_required();
2207
2208 assert!(eol_required.for_listener(&udp_listen_address()));
2209 }
2210
2211 #[test]
2212 fn stream_log_too_big_only_warns_for_enabled_unix_invalid_frames() {
2213 let uds_stream = ListenAddress::Unix("/tmp/dsd-stream.sock".into());
2214 let tcp_stream = ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2215 let error = saluki_io::deser::framing::FramingError::InvalidFrame {
2216 frame_len: 8193,
2217 reason: "frame length exceeds buffer capacity",
2218 };
2219
2220 assert!(super::should_warn_stream_log_too_big(&uds_stream, &error, true));
2221 assert!(!super::should_warn_stream_log_too_big(&uds_stream, &error, false));
2222 assert!(!super::should_warn_stream_log_too_big(&tcp_stream, &error, true));
2223 }
2224
2225 #[test]
2226 fn interner_size_from_entry_count() {
2227 let config = deser_config(r#"{"dogstatsd_string_interner_size": 4096}"#);
2229 assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(2));
2230 }
2231
2232 #[test]
2233 fn interner_size_from_explicit_bytes() {
2234 let config = deser_config(r#"{"dogstatsd_string_interner_size_bytes": 4194304}"#);
2235 assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::b(4194304));
2236 }
2237
2238 #[test]
2239 fn interner_size_explicit_bytes_takes_priority() {
2240 let config = deser_config(
2241 r#"{"dogstatsd_string_interner_size": 4096, "dogstatsd_string_interner_size_bytes": 8388608}"#,
2242 );
2243 assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::b(8388608));
2245 }
2246
2247 #[test]
2248 fn interner_size_custom_entry_count() {
2249 let config = deser_config(r#"{"dogstatsd_string_interner_size": 8192}"#);
2250 assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(4));
2252 }
2253
2254 fn address_list_eq(expected: &mut [ListenAddress], actual: &mut [ListenAddress]) -> Result<(), String> {
2256 if expected.len() != actual.len() {
2257 return Err(format!(
2258 "length mismatch: expected {} addresses, got {}",
2259 expected.len(),
2260 actual.len()
2261 ));
2262 }
2263
2264 expected.sort_by_key(|a| a.to_string());
2265 actual.sort_by_key(|a| a.to_string());
2266
2267 for (e, a) in expected.iter().zip(actual.iter()) {
2268 let (es, as_) = (e.to_string(), a.to_string());
2269 if es != as_ {
2270 return Err(format!("address mismatch: expected {}, got {}", es, as_));
2271 }
2272 }
2273
2274 Ok(())
2275 }
2276
/// Sanity-checks `address_list_eq` itself, so the `build_addresses_*` tests cannot pass vacuously.
///
/// The helper must:
/// - reject a same-length list containing a different address;
/// - reject a list of a different length;
/// - accept the same addresses given in a different order.
#[test]
fn build_addresses_assertion_function_works() {
    let config = DogStatsDConfiguration {
        port: 0,
        tcp_port: 123,
        socket_path: None,
        socket_stream_path: None,
        non_local_traffic: false,
        ..Default::default()
    };
    let mut actual = config.build_addresses(None);

    // Same length, but 127.0.0.2 instead of the loopback address the local-only TCP listener uses.
    let mut wrong_ip = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(
        Ipv4Addr::new(127, 0, 0, 2),
        123,
    )))];
    assert!(address_list_eq(&mut wrong_ip, &mut actual).is_err());

    // One more entry than `actual`: the length check must fire.
    let mut too_many = vec![
        ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 123))),
        ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8125))),
    ];
    assert!(address_list_eq(&mut too_many, &mut actual).is_err());

    // Identical contents in a different order must compare equal.
    let mut forward = vec![
        ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8125))),
        ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 123))),
    ];
    let mut reversed = vec![
        ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 123))),
        ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8125))),
    ];
    assert!(address_list_eq(&mut forward, &mut reversed).is_ok());
}
2297
/// With both ports at zero and no socket paths, no listen addresses are produced at all.
#[test]
fn build_addresses_no_listeners() {
    let config = DogStatsDConfiguration {
        port: 0,
        tcp_port: 0,
        socket_path: None,
        socket_stream_path: None,
        non_local_traffic: false,
        ..Default::default()
    };

    let mut actual = config.build_addresses(None);
    let mut expected: Vec<ListenAddress> = Vec::new();
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2313
/// UDP only, without non-local traffic: the single listener binds to the IPv4 loopback address.
#[test]
fn build_addresses_udp_local_only() {
    let config = DogStatsDConfiguration {
        port: 8125,
        tcp_port: 0,
        socket_path: None,
        socket_stream_path: None,
        non_local_traffic: false,
        ..Default::default()
    };

    let mut actual = config.build_addresses(None);
    let mut expected = vec![ListenAddress::Udp(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125).into())];
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2332
/// UDP only, with non-local traffic enabled: the single listener binds to the unspecified address (0.0.0.0).
#[test]
fn build_addresses_udp_non_local_only() {
    let config = DogStatsDConfiguration {
        port: 8125,
        tcp_port: 0,
        socket_path: None,
        socket_stream_path: None,
        non_local_traffic: true,
        ..Default::default()
    };

    let mut actual = config.build_addresses(None);
    let mut expected = vec![ListenAddress::Udp(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8125).into())];
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2351
/// TCP only, without non-local traffic: the single listener binds to the IPv4 loopback address.
#[test]
fn build_addresses_tcp_local_only() {
    let config = DogStatsDConfiguration {
        port: 0,
        tcp_port: 9000,
        socket_path: None,
        socket_stream_path: None,
        non_local_traffic: false,
        ..Default::default()
    };

    let mut actual = config.build_addresses(None);
    let mut expected = vec![ListenAddress::Tcp(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9000).into())];
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2370
/// TCP only, with non-local traffic enabled: the single listener binds to the unspecified address (0.0.0.0).
#[test]
fn build_addresses_tcp_non_local_only() {
    let config = DogStatsDConfiguration {
        port: 0,
        tcp_port: 9000,
        socket_path: None,
        socket_stream_path: None,
        non_local_traffic: true,
        ..Default::default()
    };

    let mut actual = config.build_addresses(None);
    let mut expected = vec![ListenAddress::Tcp(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 9000).into())];
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2389
/// Only a datagram socket path configured: a single Unixgram listener on that path.
#[test]
fn build_addresses_unixgram_only() {
    let socket = "/tmp/dsd.sock";
    let config = DogStatsDConfiguration {
        port: 0,
        tcp_port: 0,
        socket_path: Some(String::from(socket)),
        socket_stream_path: None,
        non_local_traffic: false,
        ..Default::default()
    };

    let mut actual = config.build_addresses(None);
    let mut expected = vec![ListenAddress::Unixgram(socket.into())];
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2405
/// Only a stream socket path configured: a single Unix stream listener on that path.
#[test]
fn build_addresses_unix_stream_only() {
    let socket = "/tmp/dsd-stream.sock";
    let config = DogStatsDConfiguration {
        port: 0,
        tcp_port: 0,
        socket_path: None,
        socket_stream_path: Some(String::from(socket)),
        non_local_traffic: false,
        ..Default::default()
    };

    let mut actual = config.build_addresses(None);
    let mut expected = vec![ListenAddress::Unix(socket.into())];
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2421
/// All four listener kinds enabled with non-local traffic.
///
/// UDP and TCP bind to 0.0.0.0, and both Unix sockets use their configured paths.
#[test]
fn build_addresses_all_four_non_local() {
    let config = DogStatsDConfiguration {
        port: 8125,
        tcp_port: 9000,
        socket_path: Some(String::from("/tmp/dsd.sock")),
        socket_stream_path: Some(String::from("/tmp/dsd-stream.sock")),
        non_local_traffic: true,
        ..Default::default()
    };

    let mut actual = config.build_addresses(None);
    let mut expected = vec![
        ListenAddress::Udp(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8125).into()),
        ListenAddress::Tcp(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 9000).into()),
        ListenAddress::Unixgram("/tmp/dsd.sock".into()),
        ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
    ];
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2442
/// All four listener kinds enabled without non-local traffic.
///
/// UDP and TCP bind to loopback, and both Unix sockets use their configured paths.
#[test]
fn build_addresses_all_four_local() {
    let config = DogStatsDConfiguration {
        port: 8125,
        tcp_port: 9000,
        socket_path: Some(String::from("/tmp/dsd.sock")),
        socket_stream_path: Some(String::from("/tmp/dsd-stream.sock")),
        non_local_traffic: false,
        ..Default::default()
    };

    let mut actual = config.build_addresses(None);
    let mut expected = vec![
        ListenAddress::Udp(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125).into()),
        ListenAddress::Tcp(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9000).into()),
        ListenAddress::Unixgram("/tmp/dsd.sock".into()),
        ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
    ];
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2463
/// Without non-local traffic, a supplied bind host replaces loopback for the UDP and TCP listeners.
///
/// The datagram socket path is unaffected.
#[test]
fn build_addresses_bind_host_applies_to_udp_and_tcp() {
    let config = DogStatsDConfiguration {
        port: 8125,
        tcp_port: 9000,
        socket_path: Some(String::from("/tmp/dsd.sock")),
        socket_stream_path: None,
        non_local_traffic: false,
        ..Default::default()
    };
    let bind_ip = Ipv4Addr::new(192, 168, 1, 50);

    let mut actual = config.build_addresses(Some(IpAddr::V4(bind_ip)));
    let mut expected = vec![
        ListenAddress::Udp(SocketAddrV4::new(bind_ip, 8125).into()),
        ListenAddress::Tcp(SocketAddrV4::new(bind_ip, 9000).into()),
        ListenAddress::Unixgram("/tmp/dsd.sock".into()),
    ];
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2485
/// With `non_local_traffic` enabled, UDP and TCP still bind to 0.0.0.0 even when a bind host is supplied.
///
/// The stream socket path is unaffected.
#[test]
fn build_addresses_non_local_clobbers_bind_host() {
    let config = DogStatsDConfiguration {
        port: 8125,
        tcp_port: 9000,
        socket_path: None,
        socket_stream_path: Some(String::from("/tmp/dsd-stream.sock")),
        non_local_traffic: true,
        ..Default::default()
    };
    let ignored_bind_host = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 50));

    let mut actual = config.build_addresses(Some(ignored_bind_host));
    let mut expected = vec![
        ListenAddress::Udp(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8125).into()),
        ListenAddress::Tcp(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 9000).into()),
        ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
    ];
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2508
/// Non-finite sample values (`NaN`, `inf`, `-inf`) must not cause a decode error.
///
/// The packet still parses as a metric, but the non-finite value is dropped, leaving zero valid points.
#[test]
fn non_finite_metric_values_are_silently_dropped() {
    let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
    for input in &[b"my.gauge:NaN|g" as &[u8], b"my.gauge:inf|g", b"my.gauge:-inf|g"] {
        // Every failure message names the offending payload, so a failure pinpoints which of the cases broke.
        let shown = String::from_utf8_lossy(input);
        let parsed = codec
            .decode_packet(input)
            .unwrap_or_else(|e| panic!("{:?} should decode without error: {:?}", shown, e));
        match parsed {
            ParsedPacket::Metric(packet) => assert_eq!(
                packet.num_points, 0,
                "non-finite value in {:?} should be dropped, leaving 0 valid points",
                shown
            ),
            _ => panic!("expected Metric packet for {:?}", shown),
        }
    }
}
2526
2527 #[tokio::test]
2528 async fn fix_empty_capture_path_sets_path_from_run_path() {
2529 const RUN_PATH: &str = "/my/little/run_path";
2530
2531 let base_config_values = json!({ "run_path": RUN_PATH });
2532 let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await;
2533
2534 let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize");
2535
2536 let expected = PathBuf::from(RUN_PATH).join(DOGSTATSD_CAPTURE_DIR);
2537 assert_eq!(expected, dogstatsd_config.capture_path);
2538 }
2539
2540 #[tokio::test]
2541 async fn fix_empty_capture_path_keeps_explicit_path() {
2542 const RUN_PATH: &str = "/my/little/run_path";
2543 const CAPTURE_PATH: &str = "/custom/path/to/capture";
2544
2545 let base_config_values = json!({ "run_path": RUN_PATH, "dogstatsd_capture_path": CAPTURE_PATH });
2546 let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await;
2547
2548 let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize");
2549
2550 assert_eq!(PathBuf::from(CAPTURE_PATH), dogstatsd_config.capture_path);
2551 }
2552
2553 #[tokio::test]
2554 async fn from_configuration_normalizes_capture_depth() {
2555 let cases = [
2556 (json!({}), MIN_CAPTURE_DEPTH),
2557 (json!({ "dogstatsd_capture_depth": 0 }), MIN_CAPTURE_DEPTH),
2558 (json!({ "dogstatsd_capture_depth": 2048 }), 2048),
2559 ];
2560
2561 for (base_config_values, expected_depth) in cases {
2562 let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await;
2563 let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize");
2564
2565 assert_eq!(expected_depth, dogstatsd_config.capture_depth);
2566 }
2567 }
2568
/// Attaching a capture entity resolver must not implicitly populate the workload provider.
#[test]
fn capture_entity_resolver_is_configured_separately_from_workload_provider() {
    let resolver = CaptureTestEntityResolver::default();
    let config = DogStatsDConfiguration::default().with_capture_entity_resolver(resolver);

    assert!(config.capture_entity_resolver.is_some());
    assert!(config.workload_provider.is_none());
}
2577
/// The capture container ID is looked up through the resolver's PID mapping.
///
/// The entity built from local data `ci-pid-container` comes back as `container_id://pid-container`.
#[test]
fn resolve_capture_container_id_uses_live_pid_mapping() {
    let container = EntityId::from_local_data("ci-pid-container").expect("container entity");
    let resolver = CaptureTestEntityResolver::with_pid_mapping(42, container);

    let resolved = resolve_capture_container_id(Some(&resolver), Some(42));
    assert_eq!(resolved, Some(String::from("container_id://pid-container")));
}
2590
/// A container hint embedded in the payload itself (`c:ci-...`) must not populate the record.
///
/// Without a PID or resolver, `container_id` stays `None` and `ancillary` stays empty.
#[test]
fn build_capture_record_ignores_payload_local_data() {
    let payload = b"test.metric:1|c|c:ci-local-container\n";
    let record = super::build_capture_record(None, None, payload);

    assert_eq!(record.container_id, None);
    assert!(record.ancillary.is_empty());
}
2598
/// A peer update that carries no credentials must not erase the PID remembered from an earlier update.
#[test]
fn stream_capture_state_preserves_last_pid_without_new_creds() {
    let mut stream_capture = super::StreamCaptureState::new();

    // Applied in order: first a peer with credentials (pid 42), then one whose identity is unavailable.
    let peers = [
        ConnectionAddress::ProcessLike(ProcessIdentity::Credentials(ProcessCredentials {
            pid: 42,
            uid: 0,
            gid: 0,
        })),
        ConnectionAddress::ProcessLike(ProcessIdentity::Unavailable),
    ];
    for peer in &peers {
        stream_capture.update_peer_metadata(peer);
    }

    assert_eq!(stream_capture.last_pid, Some(42));
}
2614
/// With ordinary credentials (the gid is not the replay sentinel), the peer's own PID becomes the origin's
/// process ID.
#[test]
fn apply_credentials_uses_live_pid_for_normal_packet() {
    let creds = ProcessCredentials {
        pid: 12345,
        uid: 1000,
        gid: 1000,
    };
    let mut origin = RawOrigin::default();

    super::apply_credentials_to_origin(&mut origin, &creds);
    assert_eq!(origin.process_id(), Some(12345));
}
2627
/// When the gid equals `REPLAY_CREDENTIALS_GID`, the captured PID carried in the `uid` field is used instead of
/// the live `pid`.
///
/// The captured PID is recorded in its `mark_replay_process_id` form.
#[test]
fn apply_credentials_unpacks_captured_pid_when_replay_gid_present() {
    let captured_pid: u32 = 99887766;
    let replay_creds = ProcessCredentials {
        pid: 12345,
        uid: captured_pid,
        gid: super::REPLAY_CREDENTIALS_GID,
    };
    let mut origin = RawOrigin::default();

    super::apply_credentials_to_origin(&mut origin, &replay_creds);

    let expected = Some(super::origin::mark_replay_process_id(captured_pid));
    assert_eq!(origin.process_id(), expected);
}
2644}
2645
#[cfg(test)]
mod config_smoke {
    use datadog_agent_config_testing::{config_registry::structs, run_config_smoke_tests};
    use serde_json::json;

    use super::DogStatsDConfiguration;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Drives the shared `run_config_smoke_tests` harness for the DogStatsD configuration registry entry.
    ///
    /// The closure requires every configuration it is handed to deserialize into `DogStatsDConfiguration`.
    ///
    /// NOTE(review): how configurations are generated, and how `KEY_ALIASES` / `DatadogRemapper` are applied,
    /// lives in `datadog_agent_config_testing` and is not visible here.
    #[tokio::test]
    async fn smoke_test() {
        let base_values = json!({});
        run_config_smoke_tests(
            structs::DOGSTATSD_CONFIGURATION,
            &[],
            base_values,
            |config| {
                config
                    .as_typed::<DogStatsDConfiguration>()
                    .expect("DogStatsDConfiguration should deserialize")
            },
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await;
    }
}