1use std::{
10 collections::VecDeque,
11 num::NonZeroUsize,
12 path::PathBuf,
13 sync::{Arc, LazyLock},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15};
16
17use async_trait::async_trait;
18use bytes::{Buf, BufMut};
19use bytesize::ByteSize;
20use resource_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
21use saluki_common::task::spawn_traced_named;
22use saluki_config::{deserialize_space_separated_or_seq, GenericConfiguration};
23use saluki_context::{
24 origin::RawOrigin,
25 tags::{RawTags, RawTagsFilter},
26 TagsResolver,
27};
28use saluki_core::data_model::event::{
29 eventd::EventD,
30 metric::{Metric, MetricMetadata, MetricOrigin},
31 service_check::ServiceCheck,
32 Event, EventType,
33};
34use saluki_core::{
35 components::{sources::*, ComponentContext},
36 pooling::FixedSizeObjectPool,
37 topology::{
38 interconnect::EventBufferManager,
39 shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle},
40 EventsBuffer, OutputDefinition,
41 },
42};
43use saluki_env::{workload::CaptureEntityResolver, WorkloadProvider};
44use saluki_error::{generic_error, ErrorContext as _, GenericError};
45use saluki_io::{
46 buf::{BytesBuffer, ClearableIoBuffer as _, FixedSizeVec},
47 deser::{
48 codec::dogstatsd::*,
49 framing::{Framer as _, FramingError, LengthDelimitedFramer},
50 },
51 net::{
52 listener::{Listener, ListenerError},
53 ConnectionAddress, ListenAddress, ProcessCredentials, ProcessIdentity, Stream,
54 },
55};
56use serde::{Deserialize, Deserializer};
57use serde_with::{serde_as, NoneAsEmptyString};
58use snafu::{ResultExt as _, Snafu};
59use stringtheory::MetaString;
60use tokio::{
61 select,
62 time::{interval, MissedTickBehavior},
63};
64use tracing::{debug, error, info, trace, warn};
65
66mod forwarder;
67use self::forwarder::{PacketForwarder, PacketForwarderTarget};
68
69mod framer;
70use self::framer::{get_framer, DsdFramer};
71use crate::sources::dogstatsd::tags::{WellKnownTags, WellKnownTagsFilterPredicate};
72
73mod filters;
74use self::filters::EnablePayloadsFilter;
75
76mod io_buffer;
77use self::io_buffer::IoBufferManager;
78
79mod metrics;
80use self::metrics::{build_metrics, Metrics};
81
82mod replay;
83use self::replay::{CaptureRecord, CapturedTaggerHandle, TrafficCapture};
84pub use self::replay::{
85 DogStatsDCaptureAPIHandler, DogStatsDCaptureControl, DogStatsDReplayAPIHandler, DogStatsDReplayControl,
86 ReplaySession, TimestampResolution, TrafficCaptureReader, DEFAULT_REPLAY_LOOPS, REPLAY_CREDENTIALS_GID,
87};
88
89mod origin;
90use self::origin::{
91 mark_replay_process_id, origin_from_event_packet, origin_from_metric_packet, origin_from_service_check_packet,
92 DogStatsDOriginTagResolver, OriginEnrichmentConfiguration,
93};
94
95mod resolver;
96use self::resolver::ContextResolvers;
97
98mod tags;
99
/// Errors that can occur while building the DogStatsD source's listeners.
#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
enum Error {
    /// Creating a listener for one of the configured addresses failed.
    #[snafu(display("Failed to create {} listener: {}", listener_type, source))]
    FailedToCreateListener {
        /// Listener type of the address that failed (from `ListenAddress::listener_type`).
        listener_type: &'static str,
        source: ListenerError,
    },

    /// No UDP/TCP port or Unix socket path was configured, so there is nothing to listen on.
    #[snafu(display("No listeners configured. Please specify a port (`dogstatsd_port`) or a socket path (`dogstatsd_socket` or `dogstatsd_stream_socket`) to enable a listener."))]
    NoListenersConfigured,

    /// DNS resolution of the configured `bind_host` failed.
    #[snafu(display("Could not resolve bind_host '{}': {}", host, source))]
    UnresolvableBindHost { host: String, source: std::io::Error },

    /// DNS resolution of `bind_host` succeeded but yielded no addresses.
    #[snafu(display("bind_host '{}' resolved to zero IP addresses.", host))]
    BindHostHasNoAddresses { host: String },
}
118
/// Bytes budgeted per interner entry when `dogstatsd_string_interner_size_bytes` is not set explicitly.
const INTERNER_BASELINE_BYTES_PER_ENTRY: u64 = 512;

/// Default size of each pooled I/O buffer, in bytes (8 KiB).
const fn default_buffer_size() -> usize {
    8 * 1024
}

/// Default number of pooled I/O buffers.
const fn default_buffer_count() -> usize {
    128
}

/// Default UDP port for DogStatsD.
const fn default_port() -> u16 {
    8125
}

/// Default TCP port; zero means the TCP listener is disabled.
const fn default_tcp_port() -> u16 {
    0
}

/// Default packet-forwarding port; zero means forwarding is disabled.
const fn default_statsd_forward_port() -> u16 {
    0
}

/// Default socket receive buffer size; zero means no explicit size is applied.
const fn default_socket_receive_buffer_size() -> usize {
    0
}

/// Context heap allocations are allowed by default.
const fn default_allow_context_heap_allocations() -> bool {
    true
}

/// No-aggregation pipeline support (timestamped metrics) is enabled by default.
const fn default_no_aggregation_pipeline_support() -> bool {
    true
}

/// Default number of interner entries used to derive the interner byte budget.
const fn default_context_string_interner_entry_count() -> u64 {
    4 * 1024
}

/// Default limit on cached contexts.
const fn default_cached_contexts_limit() -> usize {
    500_000
}

/// Default limit on cached tagsets.
const fn default_cached_tagsets_limit() -> usize {
    500_000
}

/// Default context expiry, in seconds.
const fn default_context_expiry_seconds() -> u64 {
    20
}

/// Permissive decoding is enabled by default.
const fn default_dogstatsd_permissive_decoding() -> bool {
    true
}

/// Default minimum sample rate accepted by the codec.
const fn default_dogstatsd_minimum_sample_rate() -> f64 {
    3.845e-9
}

/// Generic `true` default for serde `default = "..."` attributes.
const fn default_true() -> bool {
    true
}
184
/// Per-payload-type switches controlling which kinds of data the DogStatsD source lets through.
///
/// Each field defaults to `true` when it is absent from the configuration.
#[derive(Deserialize)]
#[cfg_attr(test, derive(PartialEq, serde::Serialize))]
pub struct EnablePayloadsConfiguration {
    /// Whether series metrics are allowed (fed to `EnablePayloadsFilter::with_allow_series`).
    #[serde(default = "default_true")]
    pub series: bool,

    /// Whether sketch metrics are allowed (fed to `EnablePayloadsFilter::with_allow_sketches`).
    #[serde(default = "default_true")]
    pub sketches: bool,

    /// Whether events are allowed (fed to `EnablePayloadsFilter::with_allow_events`).
    #[serde(default = "default_true")]
    pub events: bool,

    /// Whether service checks are allowed (fed to `EnablePayloadsFilter::with_allow_service_checks`).
    #[serde(default = "default_true")]
    pub service_checks: bool,
}
213
214impl Default for EnablePayloadsConfiguration {
215 fn default() -> Self {
216 Self {
217 series: true,
218 sketches: true,
219 events: true,
220 service_checks: true,
221 }
222 }
223}
224
/// Smallest traffic-capture depth the source will use; configured values below this are raised to it.
const MIN_CAPTURE_DEPTH: usize = 1 << 10;

/// Default `dogstatsd_capture_depth`, which is simply the minimum allowed depth.
const fn default_capture_depth() -> usize {
    MIN_CAPTURE_DEPTH
}

/// Directory name appended to `run_path` to form the default DogStatsD capture path.
const DOGSTATSD_CAPTURE_DIR: &str = "dsd_capture";
232
233fn deserialize_empty_metastring_as_none<'de, D>(deserializer: D) -> Result<Option<MetaString>, D::Error>
234where
235 D: Deserializer<'de>,
236{
237 let value = Option::<MetaString>::deserialize(deserializer)?;
238 Ok(value.filter(|host| !host.is_empty()))
239}
240
/// Configuration for the DogStatsD source.
///
/// Deserialized from the generic configuration (see `DogStatsDConfiguration::from_configuration`); the fields marked
/// `#[serde(skip)]` are populated programmatically (builder methods) or default-constructed.
#[serde_as]
#[derive(Deserialize, Default)]
#[cfg_attr(test, derive(derive_where::DeriveWhere, serde::Serialize))]
#[cfg_attr(test, derive_where(PartialEq))]
pub struct DogStatsDConfiguration {
    /// Size of each pooled I/O buffer, in bytes, before adjustment by `get_adjusted_buffer_size`.
    #[serde(rename = "dogstatsd_buffer_size", default = "default_buffer_size")]
    buffer_size: usize,

    /// Number of pooled I/O buffers. Must be at least the sum of every listener's minimum buffer reservation.
    #[serde(rename = "dogstatsd_buffer_count", default = "default_buffer_count")]
    buffer_count: usize,

    /// UDP port to listen on. Zero disables the UDP listener.
    #[serde(rename = "dogstatsd_port", default = "default_port")]
    port: u16,

    /// Socket receive buffer size applied to listeners. Zero means no explicit size is applied.
    #[serde(rename = "dogstatsd_so_rcvbuf", default = "default_socket_receive_buffer_size")]
    socket_receive_buffer_size: usize,

    /// TCP port to listen on. Zero disables the TCP listener.
    #[serde(rename = "dogstatsd_tcp_port", default = "default_tcp_port")]
    tcp_port: u16,

    /// Host to forward raw frames to. An empty string deserializes as `None`.
    ///
    /// Forwarding is only active when this is set and `statsd_forward_port` is non-zero.
    #[serde(
        rename = "statsd_forward_host",
        default,
        deserialize_with = "deserialize_empty_metastring_as_none"
    )]
    statsd_forward_host: Option<MetaString>,

    /// Port to forward raw frames to. Zero disables forwarding.
    #[serde(rename = "statsd_forward_port", default = "default_statsd_forward_port")]
    statsd_forward_port: u16,

    /// Path of the Unix datagram socket listener. An empty string deserializes as `None` (disabled).
    #[serde(rename = "dogstatsd_socket", default)]
    #[serde_as(as = "NoneAsEmptyString")]
    socket_path: Option<String>,

    /// Path of the Unix stream socket listener. An empty string deserializes as `None` (disabled).
    #[serde(rename = "dogstatsd_stream_socket", default)]
    #[serde_as(as = "NoneAsEmptyString")]
    socket_stream_path: Option<String>,

    /// When set, logs a warning whenever a Unix stream listener hits a `FramingError::InvalidFrame`.
    #[serde(rename = "dogstatsd_stream_log_too_big", default)]
    stream_log_too_big: bool,

    /// Transports requiring end-of-line handling: a space-separated string or a list of `udp`, `uds`, `named_pipe`.
    #[serde(
        rename = "dogstatsd_eol_required",
        default,
        deserialize_with = "deserialize_space_separated_or_seq"
    )]
    eol_required: Vec<String>,

    /// Host to bind UDP/TCP listeners to; resolved via DNS and the first address is used.
    ///
    /// Ignored when `non_local_traffic` is enabled. Defaults to 127.0.0.1 when unset.
    #[serde(rename = "bind_host", default)]
    #[serde_as(as = "NoneAsEmptyString")]
    bind_host: Option<String>,

    /// When set, UDP/TCP listeners bind to 0.0.0.0 instead of the bind host / loopback.
    #[serde(rename = "dogstatsd_non_local_traffic", default)]
    non_local_traffic: bool,

    /// When set (Linux only), the UDP listener yields `min(1 + vCPUs / 8, 4)` streams instead of one.
    #[serde(rename = "dogstatsd_autoscale_udp_listeners", default)]
    autoscale_udp_listeners: bool,

    /// Whether context resolution may heap-allocate. Presumably consumed by `ContextResolvers` (not visible here).
    #[serde(
        rename = "dogstatsd_allow_context_heap_allocs",
        default = "default_allow_context_heap_allocations"
    )]
    allow_context_heap_allocations: bool,

    /// Enables timestamp support in the codec (`DogStatsDCodecConfiguration::with_timestamps`).
    #[serde(
        rename = "dogstatsd_no_aggregation_pipeline",
        default = "default_no_aggregation_pipeline_support"
    )]
    no_aggregation_pipeline_support: bool,

    /// Interner entry count.
    ///
    /// When `context_string_interner_size_bytes` is unset, the interner byte budget is derived as this value times
    /// `INTERNER_BASELINE_BYTES_PER_ENTRY`.
    #[serde(
        rename = "dogstatsd_string_interner_size",
        default = "default_context_string_interner_entry_count"
    )]
    context_string_interner_entry_count: u64,

    /// Explicit interner byte budget; overrides the entry-count-derived size when set.
    #[serde(rename = "dogstatsd_string_interner_size_bytes", default)]
    context_string_interner_size_bytes: Option<ByteSize>,

    /// Cached context limit. Presumably consumed by `ContextResolvers` (not visible here).
    #[serde(
        rename = "dogstatsd_cached_contexts_limit",
        default = "default_cached_contexts_limit"
    )]
    cached_contexts_limit: usize,

    /// Cached tagset limit. Presumably consumed by `ContextResolvers` (not visible here).
    #[serde(rename = "dogstatsd_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
    cached_tagsets_limit: usize,

    /// Context expiry in seconds. Presumably consumed by `ContextResolvers` (not visible here).
    #[serde(
        rename = "dogstatsd_context_expiry_seconds",
        default = "default_context_expiry_seconds"
    )]
    context_expiry_seconds: u64,

    /// Enables the codec's permissive mode (`DogStatsDCodecConfiguration::with_permissive_mode`).
    #[serde(
        rename = "dogstatsd_permissive_decoding",
        default = "default_dogstatsd_permissive_decoding"
    )]
    permissive_decoding: bool,

    /// Minimum sample rate passed to the codec (`DogStatsDCodecConfiguration::with_minimum_sample_rate`).
    #[serde(
        rename = "dogstatsd_minimum_sample_rate",
        default = "default_dogstatsd_minimum_sample_rate"
    )]
    minimum_sample_rate: f64,

    /// Per-payload-type enable switches.
    #[serde(rename = "enable_payloads", default)]
    enable_payloads: EnablePayloadsConfiguration,

    /// Origin enrichment settings, flattened from the top-level configuration.
    #[serde(flatten, default)]
    origin_enrichment: OriginEnrichmentConfiguration,

    /// Workload provider for origin tag resolution and traffic capture; set via `with_workload_provider`.
    #[serde(skip)]
    #[cfg_attr(test, derive_where(skip))]
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,

    /// Resolver used when building capture records; set via `with_capture_entity_resolver`.
    #[serde(skip, default)]
    #[cfg_attr(test, derive_where(skip))]
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,

    /// Extra tags from `dogstatsd_tags` handed to the stream handlers.
    #[serde(rename = "dogstatsd_tags", default)]
    additional_tags: Vec<String>,

    /// Directory for traffic captures.
    ///
    /// If it has no parent component (e.g. empty), it is replaced by `<run_path>/dsd_capture` when `run_path` is set.
    #[serde(rename = "dogstatsd_capture_path", default)]
    capture_path: PathBuf,

    /// Traffic capture depth; raised to at least `MIN_CAPTURE_DEPTH`.
    #[serde(rename = "dogstatsd_capture_depth", default = "default_capture_depth")]
    capture_depth: usize,

    /// Shared handle bound to the traffic capture when the source is built.
    #[serde(skip, default)]
    #[cfg_attr(test, derive_where(skip))]
    capture_control: DogStatsDCaptureControl,

    /// Shared handle bound to the captured tagger when the source is built.
    #[serde(skip, default)]
    #[cfg_attr(test, derive_where(skip))]
    replay_control: DogStatsDReplayControl,

    /// When non-empty, a `provider_kind:<value>` tag is appended to the additional tags.
    #[serde(default)]
    provider_kind: String,
}
562
563#[derive(Clone, Copy, Default)]
564struct EolRequired {
565 udp: bool,
566 uds: bool,
567}
568
569impl EolRequired {
570 fn from_config_values(values: &[String]) -> Self {
571 let mut eol_required = Self::default();
572
573 for value in values {
574 match value.as_str() {
575 "udp" => eol_required.udp = true,
576 "uds" => eol_required.uds = true,
577 "named_pipe" => {}
578 _ => warn!(
579 value,
580 "Invalid dogstatsd_eol_required value. Expected 'udp', 'uds', or 'named_pipe'."
581 ),
582 }
583 }
584
585 eol_required
586 }
587
588 fn for_listener(&self, listen_addr: &ListenAddress) -> bool {
589 match listen_addr {
590 ListenAddress::Udp(_) => self.udp,
591 ListenAddress::Tcp(_) => false,
592 #[cfg(unix)]
593 ListenAddress::Unixgram(_) | ListenAddress::Unix(_) => self.uds,
594 }
595 }
596}
597
598async fn resolve_bind_host(host: &str) -> Result<std::net::IpAddr, Error> {
604 let mut addrs = tokio::net::lookup_host((host, 0u16))
605 .await
606 .context(UnresolvableBindHost { host: host.to_string() })?;
607 addrs
608 .next()
609 .map(|sa| sa.ip())
610 .ok_or_else(|| Error::BindHostHasNoAddresses { host: host.to_string() })
611}
612
613impl DogStatsDConfiguration {
614 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
616 let mut dogstatsd_config: Self = config.as_typed()?;
617 dogstatsd_config.fix_empty_capture_path(config);
618 dogstatsd_config.fix_capture_depth();
619 Ok(dogstatsd_config)
620 }
621
622 fn additional_tags(&self) -> Vec<String> {
624 if self.provider_kind.is_empty() {
625 return self.additional_tags.clone();
626 }
627
628 let mut tags = self.additional_tags.clone();
629 tags.push(format!("provider_kind:{}", self.provider_kind.clone()));
630 tags
631 }
632
633 fn fix_capture_depth(&mut self) {
634 self.capture_depth = self.capture_depth.max(MIN_CAPTURE_DEPTH);
635 }
636
637 fn effective_context_string_interner_bytes(&self) -> ByteSize {
643 match self.context_string_interner_size_bytes {
644 Some(explicit_bytes) => explicit_bytes,
645 None => ByteSize::b(self.context_string_interner_entry_count * INTERNER_BASELINE_BYTES_PER_ENTRY),
646 }
647 }
648
649 fn eol_required(&self) -> EolRequired {
650 EolRequired::from_config_values(&self.eol_required)
651 }
652
653 fn statsd_forward_target(&self) -> Option<(&MetaString, u16)> {
654 let host = self.statsd_forward_host.as_ref()?;
655 if self.statsd_forward_port == 0 {
656 return None;
657 }
658
659 Some((host, self.statsd_forward_port))
660 }
661
662 fn packet_forwarder_target(&self) -> Option<PacketForwarderTarget> {
663 let (host, port) = self.statsd_forward_target()?;
664 Some(PacketForwarderTarget::new(host.clone(), port))
665 }
666
667 fn udp_streams_to_yield(&self) -> Option<NonZeroUsize> {
673 if !self.autoscale_udp_listeners {
674 return None;
675 }
676
677 #[cfg(not(target_os = "linux"))]
678 if self.autoscale_udp_listeners {
679 warn!("UDP stream handler autoscaling not supported on non-Linux platforms. Default to single stream handler.");
680 return None;
681 }
682
683 let vcpus = std::thread::available_parallelism().map(NonZeroUsize::get).unwrap_or(1);
684 let streams = (1 + vcpus / 8).min(4);
685 NonZeroUsize::new(streams)
686 }
687
688 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
694 where
695 W: WorkloadProvider + Send + Sync + 'static,
696 {
697 self.workload_provider = Some(Arc::new(workload_provider));
698 self
699 }
700
701 pub fn with_capture_entity_resolver<R>(mut self, capture_entity_resolver: R) -> Self
708 where
709 R: CaptureEntityResolver + Send + Sync + 'static,
710 {
711 self.capture_entity_resolver = Some(Arc::new(capture_entity_resolver));
712 self
713 }
714
715 pub fn capture_control(&self) -> DogStatsDCaptureControl {
717 self.capture_control.clone()
718 }
719
720 pub fn capture_api_handler(&self) -> DogStatsDCaptureAPIHandler {
722 DogStatsDCaptureAPIHandler::new(self.capture_control.clone())
723 }
724
725 pub fn replay_control(&self) -> DogStatsDReplayControl {
727 self.replay_control.clone()
728 }
729
730 pub fn replay_api_handler(&self) -> DogStatsDReplayAPIHandler {
732 DogStatsDReplayAPIHandler::new(self.replay_control.clone())
733 }
734
735 fn fix_empty_capture_path(&mut self, config: &GenericConfiguration) {
736 if self.capture_path.parent().is_some() {
737 return;
738 }
739
740 let capture_path = match config.try_get_typed::<PathBuf>("run_path") {
741 Ok(Some(mut run_path)) => {
742 run_path.push(DOGSTATSD_CAPTURE_DIR);
743 run_path
744 }
745 Ok(None) => {
746 debug!(
747 "`dogstatsd_capture_path` and `run_path` were empty. Default DogStatsD capture path is unavailable."
748 );
749 return;
750 }
751 Err(e) => {
752 debug!(
753 error = %e,
754 "Failed to read `run_path` from configuration. Default DogStatsD capture path is unavailable."
755 );
756 return;
757 }
758 };
759
760 self.capture_path = capture_path;
761 }
762
763 fn build_addresses(&self, bind_host: Option<std::net::IpAddr>) -> Vec<ListenAddress> {
773 let bind_ip: std::net::IpAddr = if self.non_local_traffic {
774 [0, 0, 0, 0].into()
775 } else {
776 bind_host.unwrap_or_else(|| [127, 0, 0, 1].into())
777 };
778
779 let mut addresses: Vec<ListenAddress> = Vec::new();
780
781 if self.port != 0 {
782 addresses.push(ListenAddress::Udp(std::net::SocketAddr::new(bind_ip, self.port)));
783 }
784
785 if self.tcp_port != 0 {
786 addresses.push(ListenAddress::Tcp(std::net::SocketAddr::new(bind_ip, self.tcp_port)));
787 }
788
789 if let Some(socket_path) = &self.socket_path {
790 addresses.push(ListenAddress::Unixgram(socket_path.into()));
791 }
792
793 if let Some(socket_stream_path) = &self.socket_stream_path {
794 addresses.push(ListenAddress::Unix(socket_stream_path.into()));
795 }
796
797 addresses
798 }
799
800 async fn build_listeners(&self) -> Result<Vec<Listener>, Error> {
802 let bind_host: Option<std::net::IpAddr> = if self.non_local_traffic {
806 None
807 } else {
808 match &self.bind_host {
809 Some(host) => Some(resolve_bind_host(host).await?),
810 None => None,
811 }
812 };
813
814 let addresses = self.build_addresses(bind_host);
815 let mut listeners = Vec::new();
816 let socket_receive_buffer_size =
817 (self.socket_receive_buffer_size != 0).then_some(self.socket_receive_buffer_size);
818 let udp_streams_to_yield = self.udp_streams_to_yield();
819 for address in addresses {
820 let listener_type = address.listener_type();
821 let listener_streams = matches!(address, ListenAddress::Udp(_))
822 .then_some(udp_streams_to_yield)
823 .flatten();
824 let listener = Listener::from_listen_address(address, listener_streams)
825 .await
826 .context(FailedToCreateListener { listener_type })?
827 .with_receive_buffer_size(socket_receive_buffer_size);
828
829 listeners.push(listener);
830 }
831 Ok(listeners)
832 }
833}
834
#[async_trait]
impl SourceBuilder for DogStatsDConfiguration {
    /// Builds the DogStatsD source.
    ///
    /// Steps:
    /// 1. Creates all listeners.
    /// 2. Validates the I/O buffer count.
    /// 3. Wires up origin tag resolution, context resolvers, the codec, the payload filter, traffic capture/replay
    ///    handles and optional packet forwarding.
    ///
    /// # Errors
    ///
    /// Fails if:
    /// - listener creation or bind host resolution fails;
    /// - no listeners are configured;
    /// - `buffer_count` is below the sum of every listener's minimum buffer reservation;
    /// - the context resolvers cannot be created.
    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
        let listeners = self.build_listeners().await?;
        if listeners.is_empty() {
            return Err(Error::NoListenersConfigured.into());
        }

        // Every listener reserves a minimum number of pooled buffers; the pool must be able to cover all of them.
        let min_buffers: usize = listeners.iter().map(Listener::min_buffer_reservation).sum();
        if self.buffer_count < min_buffers {
            return Err(generic_error!(
                "Must have a minimum of {} I/O buffers to service all configured listeners (have {}).",
                min_buffers,
                self.buffer_count,
            ));
        }

        let origin_detection_enabled = self.origin_enrichment.enabled();
        // Shared between the origin tag resolver (if any) and the replay control bound below.
        let captured_tagger = CapturedTaggerHandle::new();

        // Origin tag resolution is only possible when a workload provider has been supplied.
        let maybe_origin_tags_resolver = self.workload_provider.clone().map(|provider| {
            DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider, captured_tagger.clone())
        });
        let context_resolvers = ContextResolvers::new(self, &context, maybe_origin_tags_resolver)
            .error_context("Failed to create context resolvers.")?;

        let codec_config = DogStatsDCodecConfiguration::default()
            .with_timestamps(self.no_aggregation_pipeline_support)
            .with_permissive_mode(self.permissive_decoding)
            .with_minimum_sample_rate(self.minimum_sample_rate)
            .with_client_origin_detection(self.origin_enrichment.origin_detection_client);

        let codec = DogStatsDCodec::from_configuration(codec_config);
        let eol_required = self.eol_required();

        let enable_payloads_filter = EnablePayloadsFilter::default()
            .with_allow_series(self.enable_payloads.series)
            .with_allow_sketches(self.enable_payloads.sketches)
            .with_allow_events(self.enable_payloads.events)
            .with_allow_service_checks(self.enable_payloads.service_checks);
        // Clamp again here: the configuration may not have gone through `from_configuration`
        // (e.g. a `Default`-constructed value).
        let traffic_capture = TrafficCapture::with_workload_provider(
            self.capture_path.clone(),
            self.capture_depth.max(MIN_CAPTURE_DEPTH),
            self.workload_provider.clone(),
        );
        self.capture_control.bind(traffic_capture.clone());
        let packet_forwarder_target = self.packet_forwarder_target();

        self.replay_control.bind(captured_tagger);

        Ok(Box::new(DogStatsD {
            listeners,
            io_buffer_pool: FixedSizeObjectPool::with_builder("dsd_packet_bufs", self.buffer_count, || {
                FixedSizeVec::with_capacity(get_adjusted_buffer_size(self.buffer_size))
            }),
            codec,
            context_resolvers,
            enabled_filter: enable_payloads_filter,
            origin_detection_enabled,
            stream_log_too_big: self.stream_log_too_big,
            eol_required,
            additional_tags: self.additional_tags().into(),
            capture_entity_resolver: self.capture_entity_resolver.clone(),
            traffic_capture,
            packet_forwarder_target,
        }))
    }

    /// Declares the three named outputs: `metrics`, `events` and `service_checks`.
    fn outputs(&self) -> &[OutputDefinition<EventType>] {
        static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
            vec![
                OutputDefinition::named_output("metrics", EventType::Metric),
                OutputDefinition::named_output("events", EventType::EventD),
                OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
            ]
        });
        &OUTPUTS
    }
}
919
920impl MemoryBounds for DogStatsDConfiguration {
921 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
922 builder
923 .minimum()
924 .with_single_value::<DogStatsD>("source struct")
926 .with_expr(UsageExpr::product(
928 "buffers",
929 UsageExpr::config("dogstatsd_buffer_count", self.buffer_count),
930 UsageExpr::config("dogstatsd_buffer_size", get_adjusted_buffer_size(self.buffer_size)),
931 ))
932 .with_expr(UsageExpr::config(
935 "dogstatsd_string_interner_size_bytes",
936 self.effective_context_string_interner_bytes().as_u64() as usize,
937 ));
938 }
939}
940
/// The DogStatsD source, built by `DogStatsDConfiguration::build`.
///
/// Holds the bound listeners plus the shared state that is cloned into every listener task.
pub struct DogStatsD {
    /// Listeners created at build time; each is moved into its own task when the source runs.
    listeners: Vec<Listener>,
    /// Pool of I/O buffers shared by all listeners and stream handlers.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// DogStatsD codec used to decode frames.
    codec: DogStatsDCodec,
    /// Context resolvers used while decoding.
    context_resolvers: ContextResolvers,
    /// Per-payload-type enable filter.
    enabled_filter: EnablePayloadsFilter,
    /// Whether origin detection is enabled (drives the origin detection error metrics).
    origin_detection_enabled: bool,
    /// Whether to warn about invalid frames on Unix stream listeners.
    stream_log_too_big: bool,
    /// Per-transport EOL requirements used when selecting a framer.
    eol_required: EolRequired,
    /// Extra tags handed to every frame handler.
    additional_tags: Arc<[String]>,
    /// Optional resolver used when building capture records.
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
    /// Traffic capture handle shared with the capture control.
    traffic_capture: TrafficCapture,
    /// Optional packet forwarding target; each listener builds its own forwarder from it.
    packet_forwarder_target: Option<PacketForwarderTarget>,
}
956
/// Everything a single listener task needs, moved into `process_listener`.
struct ListenerContext {
    /// Handle resolved when the source asks this listener to shut down.
    shutdown_handle: DynamicShutdownHandle,
    /// The listener this task accepts streams from.
    listener: Listener,
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    codec: DogStatsDCodec,
    context_resolvers: ContextResolvers,
    origin_detection_enabled: bool,
    stream_log_too_big: bool,
    eol_required: EolRequired,
    additional_tags: Arc<[String]>,
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
    traffic_capture: TrafficCapture,
    /// Target from which this listener builds its own packet forwarder, if forwarding is configured.
    packet_forwarder_target: Option<PacketForwarderTarget>,
}
971
/// Everything a single stream handler needs, moved into `drive_stream`.
struct HandlerContext {
    /// Address of the listener that accepted the stream (used in logs and to pick capture behavior).
    listen_addr: ListenAddress,
    /// Framer selected for this listener and its EOL requirement.
    framer: DsdFramer,
    codec: DogStatsDCodec,
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// Metrics shared by all stream handlers of the same listener.
    metrics: Metrics,
    context_resolvers: ContextResolvers,
    origin_detection_enabled: bool,
    stream_log_too_big: bool,
    additional_tags: Arc<[String]>,
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
    traffic_capture: TrafficCapture,
    /// Forwarder shared by all stream handlers of the same listener, if forwarding is configured.
    packet_forwarder: Option<PacketForwarder>,
}
986
#[async_trait]
impl Source for DogStatsD {
    /// Runs the source.
    ///
    /// 1. Spawns one task per listener and marks the component as ready.
    /// 2. Services `health.live()` until the global shutdown signal fires.
    /// 3. Shuts down all listener tasks through the shutdown coordinator.
    async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
        let mut global_shutdown = context.take_shutdown_handle();
        let mut health = context.take_health_handle();

        let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();

        for listener in self.listeners {
            let task_name = format!("dogstatsd-listener-{}", listener.listen_address().listener_type());

            // Each listener task gets its own shutdown handle plus clones of the shared state.
            let listener_context = ListenerContext {
                shutdown_handle: listener_shutdown_coordinator.register(),
                listener,
                io_buffer_pool: self.io_buffer_pool.clone(),
                codec: self.codec.clone(),
                context_resolvers: self.context_resolvers.clone(),
                origin_detection_enabled: self.origin_detection_enabled,
                stream_log_too_big: self.stream_log_too_big,
                eol_required: self.eol_required,
                additional_tags: self.additional_tags.clone(),
                capture_entity_resolver: self.capture_entity_resolver.clone(),
                traffic_capture: self.traffic_capture.clone(),
                packet_forwarder_target: self.packet_forwarder_target.clone(),
            };

            spawn_traced_named(
                task_name,
                process_listener(context.clone(), listener_context, self.enabled_filter),
            );
        }

        health.mark_ready();
        debug!("DogStatsD source started.");

        // Keep responding to liveness checks until asked to shut down.
        loop {
            select! {
                _ = &mut global_shutdown => {
                    debug!("Received shutdown signal.");
                    break
                },
                _ = health.live() => continue,
            }
        }

        debug!("Stopping DogStatsD source...");

        listener_shutdown_coordinator.shutdown().await;

        debug!("DogStatsD source stopped.");

        Ok(())
    }
}
1052
/// Runs a single listener.
///
/// Accepts streams and spawns a stream handler task for each one. This continues until the shutdown handle
/// resolves or `accept` fails, after which the stream handlers are shut down via their coordinator.
///
/// When a packet forwarder target is configured, one forwarder is built for this listener (using this listener's
/// metrics). Its connection is started via `spawn_connect`, and the forwarder is shared by every stream handler.
async fn process_listener(
    source_context: SourceContext, listener_context: ListenerContext, enabled_filter: EnablePayloadsFilter,
) {
    let ListenerContext {
        shutdown_handle,
        mut listener,
        io_buffer_pool,
        codec,
        context_resolvers,
        origin_detection_enabled,
        stream_log_too_big,
        eol_required,
        additional_tags,
        capture_entity_resolver,
        traffic_capture,
        packet_forwarder_target,
    } = listener_context;
    tokio::pin!(shutdown_handle);

    let listen_addr = listener.listen_address().clone();
    let metrics = build_metrics(&listen_addr, source_context.component_context());
    let packet_forwarder = packet_forwarder_target
        .as_ref()
        .map(|target| target.to_forwarder(metrics.clone()));
    if let Some(packet_forwarder) = &packet_forwarder {
        packet_forwarder.spawn_connect();
    }

    let mut stream_shutdown_coordinator = DynamicShutdownCoordinator::default();
    // Only used to give each spawned handler task a distinct name; wraps on overflow.
    let mut stream_idx: u32 = 0;

    info!(%listen_addr, "DogStatsD listener started.");

    loop {
        select! {
            _ = &mut shutdown_handle => {
                debug!(%listen_addr, "Received shutdown signal. Waiting for existing stream handlers to finish...");
                break;
            }
            result = listener.accept() => match result {
                Ok(stream) => {
                    debug!(%listen_addr, "Spawning new stream handler.");

                    let handler_context = HandlerContext {
                        listen_addr: listen_addr.clone(),
                        framer: get_framer(&listen_addr, eol_required.for_listener(&listen_addr)),
                        codec: codec.clone(),
                        io_buffer_pool: io_buffer_pool.clone(),
                        metrics: metrics.clone(),
                        context_resolvers: context_resolvers.clone(),
                        origin_detection_enabled,
                        stream_log_too_big,
                        additional_tags: additional_tags.clone(),
                        capture_entity_resolver: capture_entity_resolver.clone(),
                        traffic_capture: traffic_capture.clone(),
                        packet_forwarder: packet_forwarder.clone(),
                    };

                    let task_name = format!(
                        "dogstatsd-stream-handler-{}-{}",
                        listen_addr.listener_type(),
                        stream_idx,
                    );
                    stream_idx = stream_idx.wrapping_add(1);
                    spawn_traced_named(task_name, process_stream(stream, source_context.clone(), handler_context, stream_shutdown_coordinator.register(), enabled_filter));
                }
                Err(e) => {
                    // An accept failure is treated as fatal for this listener.
                    error!(%listen_addr, error = %e, "Failed to accept connection. Stopping listener.");
                    break
                }
            }
        }
    }

    stream_shutdown_coordinator.shutdown().await;

    info!(%listen_addr, "DogStatsD listener stopped.");
}
1131
1132async fn process_stream(
1133 stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
1134 shutdown_handle: DynamicShutdownHandle, enabled_filter: EnablePayloadsFilter,
1135) {
1136 tokio::pin!(shutdown_handle);
1137
1138 select! {
1139 _ = &mut shutdown_handle => {
1140 debug!("Stream handler received shutdown signal.");
1141 },
1142 _ = drive_stream(stream, source_context, handler_context, enabled_filter) => {},
1143 }
1144}
1145
1146async fn drive_stream(
1147 mut stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
1148 enabled_filter: EnablePayloadsFilter,
1149) {
1150 let HandlerContext {
1151 listen_addr,
1152 mut framer,
1153 codec,
1154 io_buffer_pool,
1155 metrics,
1156 mut context_resolvers,
1157 origin_detection_enabled,
1158 stream_log_too_big,
1159 additional_tags,
1160 capture_entity_resolver,
1161 traffic_capture,
1162 packet_forwarder,
1163 } = handler_context;
1164
1165 debug!(%listen_addr, "Stream handler started.");
1166
1167 if !stream.is_connectionless() {
1168 metrics.connections_active().increment(1);
1169 }
1170
1171 let mut stream_capture = StreamCaptureState::new();
1172 let mut buffer_flush = interval(Duration::from_millis(100));
1175 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
1176
1177 let mut event_buffer_manager = EventBufferManager::default();
1178 let mut io_buffer_manager = IoBufferManager::new(&io_buffer_pool, &stream);
1179 let memory_limiter = source_context.topology_context().memory_limiter();
1180
1181 'read: loop {
1182 let mut eof = false;
1183
1184 let mut io_buffer = io_buffer_manager.get_buffer_mut().await;
1185
1186 memory_limiter.wait_for_capacity().await;
1187
1188 select! {
1189 read_result = stream.receive(&mut io_buffer) => match read_result {
1191 Ok((bytes_read, peer_addr)) => {
1192 if bytes_read == 0 {
1193 eof = true;
1194 }
1195
1196 let is_connectionless = stream.is_connectionless();
1197 let payload = received_payload(io_buffer, bytes_read);
1198
1199 capture_uds_traffic(
1200 &listen_addr,
1201 &traffic_capture,
1202 capture_entity_resolver.as_deref(),
1203 &peer_addr,
1204 payload,
1205 &mut stream_capture,
1206 );
1207
1208 if is_connectionless {
1209 metrics.packet_receive_success().increment(1);
1210 }
1211 metrics.bytes_received().increment(bytes_read as u64);
1212 metrics.bytes_received_size().record(bytes_read as f64);
1213 let origin_detection_failed =
1214 origin_detection_enabled && bytes_read > 0 && peer_addr.has_process_credential_error();
1215 if origin_detection_failed && is_connectionless {
1216 metrics.origin_detection_errors().increment(1);
1217 }
1218
1219 let reached_eof = eof || is_connectionless;
1225
1226 trace!(
1227 buffer_len = io_buffer.remaining(),
1228 buffer_cap = io_buffer.remaining_mut(),
1229 eof = reached_eof,
1230 %listen_addr,
1231 %peer_addr,
1232 "Received {} bytes from stream.",
1233 bytes_read
1234 );
1235
1236 'frame: loop {
1237 let frame_result = framer.next_frame(io_buffer, reached_eof);
1238 let completed_outer_frames = framer.take_completed_outer_frames();
1239 if !is_connectionless && completed_outer_frames > 0 {
1240 metrics.packet_receive_success().increment(completed_outer_frames as u64);
1241 }
1242 if origin_detection_failed && completed_outer_frames > 0 {
1243 metrics.origin_detection_errors().increment(completed_outer_frames as u64);
1244 }
1245
1246 match frame_result {
1247 Ok(Some(frame)) => {
1248 trace!(%listen_addr, %peer_addr, ?frame, "Decoded frame.");
1249 if let Some(forwarder) = &packet_forwarder {
1250 forwarder.forward(frame.clone()).await;
1251 }
1252 match handle_frame(&frame[..], &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) {
1253 Ok(Some(event)) => {
1254 if let Some(event_buffer) = event_buffer_manager.try_push(event) {
1255 debug!(%listen_addr, %peer_addr, "Event buffer is full. Forwarding events.");
1256 dispatch_events(event_buffer, &source_context, &listen_addr).await;
1257 }
1258 },
1259 Ok(None) => {
1260 continue
1265 },
1266 Err(e) => {
1267 let frame_lossy_str = String::from_utf8_lossy(&frame);
1268 warn!(%listen_addr, %peer_addr, frame = %frame_lossy_str, error = %e, "Failed to parse frame.");
1269 },
1270 }
1271 }
1272 Err(e) => {
1273 metrics.framing_errors().increment(1);
1274 if should_warn_stream_log_too_big(&listen_addr, &e, stream_log_too_big) {
1275 warn!(
1276 %listen_addr,
1277 %peer_addr,
1278 error = %e,
1279 "DogStatsD stream frame exceeded the configured buffer size."
1280 );
1281 }
1282
1283 if stream.is_connectionless() {
1284 io_buffer.clear();
1285 debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Continuing stream.");
1288 continue 'read;
1289 } else {
1290 debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Stopping stream.");
1291 break 'read;
1292 }
1293 }
1294 Ok(None) => {
1295 trace!(%listen_addr, %peer_addr, "Not enough data to decode another frame.");
1296 if eof && !stream.is_connectionless() {
1297 debug!(%listen_addr, %peer_addr, "Stream received EOF. Shutting down handler.");
1298 break 'read;
1299 } else {
1300 break 'frame;
1301 }
1302 }
1303 }
1304 }
1305 },
1306 Err(e) => {
1307 metrics.packet_receive_failure().increment(1);
1308
1309 if stream.is_connectionless() {
1310 warn!(%listen_addr, error = %e, "I/O error while decoding. Continuing stream.");
1313 continue 'read;
1314 } else {
1315 warn!(%listen_addr, error = %e, "I/O error while decoding. Stopping stream.");
1316 break 'read;
1317 }
1318 }
1319 },
1320
1321 _ = buffer_flush.tick() => {
1322 if let Some(event_buffer) = event_buffer_manager.consume() {
1323 dispatch_events(event_buffer, &source_context, &listen_addr).await;
1324 }
1325 },
1326
1327 }
1328 }
1329
1330 if let Some(event_buffer) = event_buffer_manager.consume() {
1331 dispatch_events(event_buffer, &source_context, &listen_addr).await;
1332 }
1333
1334 metrics.connections_active().decrement(1);
1335
1336 debug!(%listen_addr, "Stream handler stopped.");
1337}
1338
1339fn should_warn_stream_log_too_big(listen_addr: &ListenAddress, error: &FramingError, stream_log_too_big: bool) -> bool {
1340 stream_log_too_big
1341 && matches!(listen_addr, ListenAddress::Unix(_))
1342 && matches!(error, FramingError::InvalidFrame { .. })
1343}
1344
1345fn capture_uds_traffic(
1346 listen_addr: &ListenAddress, traffic_capture: &TrafficCapture,
1347 capture_entity_resolver: Option<&(dyn CaptureEntityResolver + Send + Sync)>, peer_addr: &ConnectionAddress,
1348 payload: &[u8], stream_capture: &mut StreamCaptureState,
1349) {
1350 if payload.is_empty() || !traffic_capture.is_ongoing() {
1351 return;
1352 }
1353
1354 match listen_addr {
1355 ListenAddress::Unixgram(_) => {
1356 let _ = traffic_capture.enqueue(build_capture_record(
1357 capture_entity_resolver,
1358 process_id_from_peer_addr(peer_addr),
1359 payload,
1360 ));
1361 }
1362 ListenAddress::Unix(_) => {
1363 stream_capture.update_peer_metadata(peer_addr);
1364 stream_capture.pending.extend(payload);
1365
1366 while let Ok(Some(outer_payload)) = stream_capture
1367 .outer_framer
1368 .next_frame(&mut stream_capture.pending, false)
1369 {
1370 let _ = traffic_capture.enqueue(build_capture_record(
1371 capture_entity_resolver,
1372 stream_capture.last_pid,
1373 &outer_payload,
1374 ));
1375 }
1376 }
1377 _ => {}
1378 }
1379}
1380
1381struct StreamCaptureState {
1382 outer_framer: LengthDelimitedFramer,
1383 pending: VecDeque<u8>,
1384 last_pid: Option<i32>,
1385}
1386
1387impl StreamCaptureState {
1388 fn new() -> Self {
1389 Self {
1390 outer_framer: LengthDelimitedFramer,
1391 pending: VecDeque::new(),
1392 last_pid: None,
1393 }
1394 }
1395
1396 fn update_peer_metadata(&mut self, peer_addr: &ConnectionAddress) {
1397 if let Some(process_id) = process_id_from_peer_addr(peer_addr) {
1398 self.last_pid = Some(process_id);
1399 }
1400 }
1401}
1402
1403fn build_capture_record(
1404 capture_entity_resolver: Option<&(dyn CaptureEntityResolver + Send + Sync)>, process_id: Option<i32>,
1405 payload: &[u8],
1406) -> CaptureRecord {
1407 CaptureRecord {
1408 timestamp_ns: capture_timestamp_ns(),
1409 payload: payload.to_vec(),
1410 pid: process_id,
1411 ancillary: Vec::new(),
1412 container_id: resolve_capture_container_id(capture_entity_resolver, process_id),
1413 }
1414}
1415
1416fn resolve_capture_container_id(
1417 capture_entity_resolver: Option<&(dyn CaptureEntityResolver + Send + Sync)>, process_id: Option<i32>,
1418) -> Option<String> {
1419 let process_id = u32::try_from(process_id?).ok()?;
1420 capture_entity_resolver
1421 .and_then(|resolver| resolver.resolve_container_entity_for_live_pid(process_id))
1422 .map(|entity_id| entity_id.to_string())
1423}
1424
1425fn process_id_from_peer_addr(peer_addr: &ConnectionAddress) -> Option<i32> {
1426 match peer_addr {
1427 ConnectionAddress::ProcessLike(ProcessIdentity::Credentials(creds)) => Some(creds.pid),
1428 _ => None,
1429 }
1430}
1431
1432fn apply_credentials_to_origin(origin: &mut RawOrigin<'_>, creds: &ProcessCredentials) {
1438 if creds.gid == REPLAY_CREDENTIALS_GID {
1439 origin.set_process_id(mark_replay_process_id(creds.uid));
1440 } else {
1441 origin.set_process_id(creds.pid as u32);
1442 }
1443}
1444
1445fn received_payload(buffer: &BytesBuffer, bytes_read: usize) -> &[u8] {
1446 let chunk = buffer.chunk();
1447 let start = chunk.len().saturating_sub(bytes_read);
1448 &chunk[start..]
1449}
1450
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// A value too large for `i64` is clamped to `i64::MAX`. A system clock set before the epoch yields `0`.
fn capture_timestamp_ns() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_nanos()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
1457
1458fn handle_frame(
1459 frame: &[u8], codec: &DogStatsDCodec, context_resolvers: &mut ContextResolvers, source_metrics: &Metrics,
1460 peer_addr: &ConnectionAddress, enabled_filter: EnablePayloadsFilter, additional_tags: &[String],
1461) -> Result<Option<Event>, ParseError> {
1462 let parsed = match codec.decode_packet(frame) {
1463 Ok(parsed) => parsed,
1464 Err(e) => {
1465 match parse_message_type(frame) {
1467 MessageType::MetricSample => source_metrics.metric_decode_failed().increment(1),
1468 MessageType::Event => source_metrics.event_decode_failed().increment(1),
1469 MessageType::ServiceCheck => source_metrics.service_check_decode_failed().increment(1),
1470 }
1471
1472 return Err(e);
1473 }
1474 };
1475
1476 let event = match parsed {
1477 ParsedPacket::Metric(metric_packet) => {
1478 if metric_packet.num_points == 0 {
1479 return Ok(None);
1480 }
1481 let events_len = metric_packet.num_points;
1482 if !enabled_filter.allow_metric(&metric_packet) {
1483 trace!(
1484 metric.name = metric_packet.metric_name,
1485 "Skipping metric due to filter configuration."
1486 );
1487 return Ok(None);
1488 }
1489
1490 match handle_metric_packet(metric_packet, context_resolvers, peer_addr, additional_tags) {
1491 Some(metric) => {
1492 source_metrics.metrics_received().increment(events_len);
1493 Event::Metric(metric)
1494 }
1495 None => {
1496 source_metrics.failed_context_resolve_total().increment(1);
1498 return Ok(None);
1499 }
1500 }
1501 }
1502 ParsedPacket::Event(event) => {
1503 if !enabled_filter.allow_event(&event) {
1504 trace!("Skipping event {} due to filter configuration.", event.title);
1505 return Ok(None);
1506 }
1507 let tags_resolver = context_resolvers.tags();
1508 match handle_event_packet(event, tags_resolver, peer_addr, additional_tags) {
1509 Some(event) => {
1510 source_metrics.events_received().increment(1);
1511 Event::EventD(event)
1512 }
1513 None => {
1514 source_metrics.failed_context_resolve_total().increment(1);
1515 return Ok(None);
1516 }
1517 }
1518 }
1519 ParsedPacket::ServiceCheck(service_check) => {
1520 if !enabled_filter.allow_service_check(&service_check) {
1521 trace!(
1522 "Skipping service check {} due to filter configuration.",
1523 service_check.name
1524 );
1525 return Ok(None);
1526 }
1527 let tags_resolver = context_resolvers.tags();
1528 match handle_service_check_packet(service_check, tags_resolver, peer_addr, additional_tags) {
1529 Some(service_check) => {
1530 source_metrics.service_checks_received().increment(1);
1531 Event::ServiceCheck(service_check)
1532 }
1533 None => {
1534 source_metrics.failed_context_resolve_total().increment(1);
1535 return Ok(None);
1536 }
1537 }
1538 }
1539 };
1540
1541 Ok(Some(event))
1542}
1543
1544fn handle_metric_packet(
1545 packet: MetricPacket, context_resolvers: &mut ContextResolvers, peer_addr: &ConnectionAddress,
1546 additional_tags: &[String],
1547) -> Option<Metric> {
1548 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1549
1550 let mut origin = origin_from_metric_packet(&packet, &well_known_tags);
1551 if let Some(creds) = peer_addr.process_credentials() {
1552 apply_credentials_to_origin(&mut origin, creds);
1553 }
1554
1555 let context_resolver = if packet.timestamp.is_some() {
1557 context_resolvers.no_agg()
1558 } else {
1559 context_resolvers.primary()
1560 };
1561
1562 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1563
1564 match context_resolver.resolve(packet.metric_name, tags, Some(origin)) {
1566 Some(context) => {
1567 let metric_origin = well_known_tags
1568 .jmx_check_name
1569 .map(MetricOrigin::jmx_check)
1570 .unwrap_or_else(MetricOrigin::dogstatsd);
1571 let metadata = MetricMetadata::default()
1572 .with_origin(metric_origin)
1573 .with_hostname(well_known_tags.hostname.map(Arc::from))
1574 .with_unit(packet.unit.map_or_else(MetaString::empty, MetaString::from_static));
1575
1576 Some(Metric::from_parts(context, packet.values, metadata))
1577 }
1578 None => None,
1580 }
1581}
1582
1583fn handle_event_packet(
1584 packet: EventPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress, additional_tags: &[String],
1585) -> Option<EventD> {
1586 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1587
1588 let mut origin = origin_from_event_packet(&packet, &well_known_tags);
1589 if let Some(creds) = peer_addr.process_credentials() {
1590 apply_credentials_to_origin(&mut origin, creds);
1591 }
1592 let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1593
1594 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1595 let tags = tags_resolver.create_tag_set(tags)?;
1596
1597 let timestamp = packet
1601 .timestamp
1602 .or_else(|| SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()));
1603
1604 let eventd = EventD::new(packet.title, packet.text)
1605 .with_timestamp(timestamp)
1606 .with_hostname(packet.hostname.map(|s| s.into()))
1607 .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
1608 .with_alert_type(packet.alert_type)
1609 .with_priority(packet.priority)
1610 .with_source_type_name(Some(
1615 packet
1616 .source_type_name
1617 .map(|s| s.into())
1618 .unwrap_or_else(|| "api".into()),
1619 ))
1620 .with_alert_type(packet.alert_type)
1621 .with_tags(tags)
1622 .with_origin_tags(origin_tags);
1623
1624 Some(eventd)
1625}
1626
1627fn handle_service_check_packet(
1628 packet: ServiceCheckPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress,
1629 additional_tags: &[String],
1630) -> Option<ServiceCheck> {
1631 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1632
1633 let mut origin = origin_from_service_check_packet(&packet, &well_known_tags);
1634 if let Some(creds) = peer_addr.process_credentials() {
1635 apply_credentials_to_origin(&mut origin, creds);
1636 }
1637 let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1638
1639 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1640 let tags = tags_resolver.create_tag_set(tags)?;
1641
1642 let timestamp = packet
1646 .timestamp
1647 .or_else(|| SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()));
1648
1649 let service_check = ServiceCheck::new(packet.name, packet.status)
1650 .with_timestamp(timestamp)
1651 .with_hostname(packet.hostname.map(|s| s.into()))
1652 .with_tags(tags)
1653 .with_origin_tags(origin_tags)
1654 .with_message(packet.message.map(|s| s.into()));
1655
1656 Some(service_check)
1657}
1658
1659fn get_filtered_tags_iterator<'a>(
1660 raw_tags: RawTags<'a>, additional_tags: &'a [String],
1661) -> impl Iterator<Item = &'a str> + Clone {
1662 RawTagsFilter::exclude(raw_tags, WellKnownTagsFilterPredicate).chain(additional_tags.iter().map(|s| s.as_str()))
1665}
1666
1667async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &SourceContext, listen_addr: &ListenAddress) {
1668 debug!(%listen_addr, events_len = event_buffer.len(), "Forwarding events.");
1669
1670 if event_buffer.has_event_type(EventType::EventD) {
1680 let eventd_events = event_buffer.extract(Event::is_eventd);
1681 if let Err(e) = source_context
1682 .dispatcher()
1683 .buffered_named("events")
1684 .expect("events output should always exist")
1685 .send_all(eventd_events)
1686 .await
1687 {
1688 error!(%listen_addr, error = %e, "Failed to dispatch eventd events.");
1689 }
1690 }
1691
1692 if event_buffer.has_event_type(EventType::ServiceCheck) {
1694 let service_check_events = event_buffer.extract(Event::is_service_check);
1695 if let Err(e) = source_context
1696 .dispatcher()
1697 .buffered_named("service_checks")
1698 .expect("service checks output should always exist")
1699 .send_all(service_check_events)
1700 .await
1701 {
1702 error!(%listen_addr, error = %e, "Failed to dispatch service check events.");
1703 }
1704 }
1705
1706 if !event_buffer.is_empty() {
1708 if let Err(e) = source_context
1709 .dispatcher()
1710 .dispatch_named("metrics", event_buffer)
1711 .await
1712 {
1713 error!(%listen_addr, error = %e, "Failed to dispatch metric events.");
1714 }
1715 }
1716}
1717
/// Returns `buffer_size` plus 4 bytes of headroom.
///
/// NOTE(review): the extra 4 bytes presumably cover a 4-byte length prefix used by stream framing. Confirm
/// against the callers.
///
/// The addition saturates at `usize::MAX` instead of overflowing. An absurdly large configured buffer size
/// therefore cannot panic in debug builds or wrap around to a tiny buffer in release builds.
const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
    const HEADROOM_BYTES: usize = 4;
    buffer_size.saturating_add(HEADROOM_BYTES)
}
1735
1736#[cfg(test)]
1737mod tests {
1738 use std::{
1739 collections::HashMap,
1740 io::ErrorKind,
1741 net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
1742 path::PathBuf,
1743 sync::{Arc, OnceLock},
1744 time::Duration,
1745 };
1746
1747 use bytes::Bytes;
1748 use bytesize::ByteSize;
1749 use saluki_config::ConfigurationLoader;
1750 use saluki_context::{origin::RawOrigin, ContextResolverBuilder, TagsResolverBuilder};
1751 use saluki_core::{components::ComponentContext, topology::ComponentId};
1752 use saluki_env::workload::{CaptureEntityResolver, EntityId};
1753 use saluki_io::{
1754 deser::codec::dogstatsd::{DogStatsDCodec, DogStatsDCodecConfiguration, ParsedPacket},
1755 net::{ConnectionAddress, ListenAddress, ProcessCredentials, ProcessIdentity},
1756 };
1757 use saluki_metrics::test::TestRecorder;
1758 use serde_json::json;
1759 use stringtheory::MetaString;
1760 use tokio::{net::UdpSocket, sync::mpsc, time::timeout};
1761
1762 use super::{
1763 forwarder::{
1764 ConnectedPacketForwarder, ForwardPacket, PacketForwarder, PacketForwarderTarget, FORWARDER_QUEUE_CAPACITY,
1765 },
1766 handle_metric_packet,
1767 metrics::build_metrics,
1768 resolve_capture_container_id, ContextResolvers, DogStatsDConfiguration, DOGSTATSD_CAPTURE_DIR,
1769 MIN_CAPTURE_DEPTH,
1770 };
1771
1772 const LINUX_EAFNOSUPPORT: i32 = 97;
1773 const MACOS_EAFNOSUPPORT: i32 = 47;
1774
1775 fn is_ipv6_unavailable_error(error: &std::io::Error) -> bool {
1776 matches!(error.kind(), ErrorKind::AddrNotAvailable | ErrorKind::Unsupported)
1777 || matches!(error.raw_os_error(), Some(LINUX_EAFNOSUPPORT | MACOS_EAFNOSUPPORT))
1778 }
1779
1780 #[derive(Default)]
1781 struct CaptureTestEntityResolver {
1782 pid_map: HashMap<u32, EntityId>,
1783 }
1784
1785 impl CaptureTestEntityResolver {
1786 fn with_pid_mapping(process_id: u32, entity_id: EntityId) -> Self {
1787 let mut pid_map = HashMap::new();
1788 pid_map.insert(process_id, entity_id);
1789 Self { pid_map }
1790 }
1791 }
1792
1793 impl CaptureEntityResolver for CaptureTestEntityResolver {
1794 fn resolve_container_entity_for_live_pid(&self, process_id: u32) -> Option<EntityId> {
1795 self.pid_map.get(&process_id).cloned()
1796 }
1797 }
1798
1799 fn packet_forwarder_from_sender(
1800 target_port: u16, packets_tx: mpsc::Sender<ForwardPacket>, metrics: super::metrics::Metrics,
1801 ) -> PacketForwarder {
1802 let mut forwarder =
1803 PacketForwarderTarget::new(MetaString::from_static("127.0.0.1"), target_port).to_forwarder(metrics);
1804 forwarder.connected = Arc::new(OnceLock::from(packets_tx));
1805 forwarder
1806 }
1807
1808 #[test]
1809 fn no_metrics_when_interner_full_allocations_disallowed() {
1810 let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
1818 let tags_resolver = TagsResolverBuilder::for_tests().build();
1819 let context_resolver = ContextResolverBuilder::for_tests()
1820 .with_heap_allocations(false)
1821 .with_tags_resolver(Some(tags_resolver.clone()))
1822 .build();
1823 let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
1824 let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());
1825
1826 let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3";
1827
1828 let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
1829 panic!("Failed to parse packet.");
1830 };
1831
1832 let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &[]);
1833 assert!(maybe_metric.is_none());
1834 }
1835
1836 #[test]
1837 fn metric_with_additional_tags() {
1838 let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
1839 let tags_resolver = TagsResolverBuilder::for_tests().build();
1840 let context_resolver = ContextResolverBuilder::for_tests()
1841 .with_heap_allocations(false)
1842 .with_tags_resolver(Some(tags_resolver.clone()))
1843 .build();
1844 let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
1845 let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());
1846
1847 let existing_tags = ["tag1:value1", "tag2:value2", "tag3:value3"];
1848 let existing_tags_str = existing_tags.join(",");
1849
1850 let input = format!("test_metric_name:1|c|#{}", existing_tags_str);
1851 let additional_tags = [
1852 "tag4:value4".to_string(),
1853 "tag5:value5".to_string(),
1854 "tag6:value6".to_string(),
1855 ];
1856
1857 let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
1858 panic!("Failed to parse packet.");
1859 };
1860 let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &additional_tags);
1861 assert!(maybe_metric.is_some());
1862
1863 let metric = maybe_metric.unwrap();
1864 let context = metric.context();
1865
1866 for tag in existing_tags {
1867 assert!(context.tags().has_tag(tag));
1868 }
1869
1870 for tag in additional_tags {
1871 assert!(context.tags().has_tag(tag));
1872 }
1873 }
1874
1875 fn deser_config(json: &str) -> DogStatsDConfiguration {
1876 serde_json::from_str(json).expect("failed to deserialize config")
1877 }
1878
1879 fn udp_listen_address() -> ListenAddress {
1880 ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)))
1881 }
1882
1883 fn tcp_listen_address() -> ListenAddress {
1884 ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)))
1885 }
1886
1887 #[test]
1888 fn interner_size_defaults_to_2mib() {
1889 let config = deser_config("{}");
1890 assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(2));
1891 }
1892
1893 #[test]
1894 fn socket_receive_buffer_size_defaults_to_zero() {
1895 let config = deser_config("{}");
1896 assert_eq!(config.socket_receive_buffer_size, 0);
1897 }
1898
1899 #[test]
1900 fn socket_receive_buffer_size_from_config() {
1901 let config = deser_config(r#"{"dogstatsd_so_rcvbuf": 131072}"#);
1902 assert_eq!(config.socket_receive_buffer_size, 131_072);
1903 }
1904
1905 #[test]
1906 fn stream_log_too_big_defaults_to_false() {
1907 let config = deser_config("{}");
1908 assert!(!config.stream_log_too_big);
1909 }
1910
1911 #[test]
1912 fn stream_log_too_big_from_config() {
1913 let config = deser_config(r#"{"dogstatsd_stream_log_too_big": true}"#);
1914 assert!(config.stream_log_too_big);
1915 }
1916
1917 #[test]
1918 fn statsd_forward_defaults_disabled() {
1919 let config = deser_config("{}");
1920 assert!(config.statsd_forward_host.is_none());
1921 assert_eq!(config.statsd_forward_port, 0);
1922 assert!(config.statsd_forward_target().is_none());
1923 }
1924
1925 #[test]
1926 fn statsd_forward_empty_host_disabled() {
1927 let config = deser_config(r#"{"statsd_forward_host": "", "statsd_forward_port": 9125}"#);
1928 assert!(config.statsd_forward_host.is_none());
1929 assert!(config.statsd_forward_target().is_none());
1930 }
1931
1932 #[test]
1933 fn statsd_forward_zero_port_disabled() {
1934 let config = deser_config(r#"{"statsd_forward_host": "127.0.0.1", "statsd_forward_port": 0}"#);
1935 assert_eq!(config.statsd_forward_host.as_deref(), Some("127.0.0.1"));
1936 assert!(config.statsd_forward_target().is_none());
1937 }
1938
1939 #[test]
1940 fn statsd_forward_host_and_port_enabled() {
1941 let config = deser_config(r#"{"statsd_forward_host": "127.0.0.1", "statsd_forward_port": 9125}"#);
1942 let (host, port) = config.statsd_forward_target().expect("forwarding should be enabled");
1943 assert_eq!(host.as_ref(), "127.0.0.1");
1944 assert_eq!(port, 9125);
1945 }
1946
1947 #[test]
1948 fn statsd_forward_invalid_target_still_builds_forwarder_handle() {
1949 let config = deser_config(r#"{"statsd_forward_host": "not a valid host", "statsd_forward_port": 9125}"#);
1950 assert!(config.packet_forwarder_target().is_some());
1951 }
1952
1953 #[tokio::test]
1954 async fn packet_forwarder_sends_payload_bytes() {
1955 let receiver = UdpSocket::bind("127.0.0.1:0").await.expect("receiver should bind");
1956 let receiver_addr = receiver.local_addr().expect("receiver should have an address");
1957 let forwarder = ConnectedPacketForwarder::connect("127.0.0.1", receiver_addr.port())
1958 .await
1959 .expect("forwarder should connect");
1960 let payload = b"daemon:666|g|#sometag1:somevalue1,sometag2:somevalue2";
1961
1962 let recorder = TestRecorder::default();
1963 let _recorder_guard = metrics::set_default_local_recorder(&recorder);
1964 let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
1965 let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
1966 let metrics = build_metrics(&listen_addr, &context);
1967 let (packets_tx, packets_rx) = mpsc::channel(1);
1968 let worker = tokio::spawn(forwarder.run(packets_rx, metrics.clone()));
1969 let packet_forwarder = packet_forwarder_from_sender(receiver_addr.port(), packets_tx, metrics);
1970
1971 packet_forwarder.forward(Bytes::copy_from_slice(payload)).await;
1972
1973 let mut actual = [0u8; 128];
1974 let (received_len, _) = timeout(Duration::from_secs(1), receiver.recv_from(&mut actual))
1975 .await
1976 .expect("receive should not time out")
1977 .expect("receiver should receive payload");
1978
1979 assert_eq!(&actual[..received_len], payload);
1980 assert_eq!(
1981 recorder.counter((
1982 "component_packets_forwarded_total",
1983 &[
1984 ("component_id", "dogstatsd_test"),
1985 ("component_type", "source"),
1986 ("listener_type", "udp"),
1987 ("state", "ok"),
1988 ]
1989 )),
1990 Some(1)
1991 );
1992 assert_eq!(
1993 recorder.counter((
1994 "component_bytes_forwarded_total",
1995 &[
1996 ("component_id", "dogstatsd_test"),
1997 ("component_type", "source"),
1998 ("listener_type", "udp"),
1999 ]
2000 )),
2001 Some(payload.len() as u64)
2002 );
2003 worker.abort();
2004 }
2005
2006 #[tokio::test]
2007 async fn packet_forwarder_sends_payload_bytes_to_ipv6_target() {
2008 let receiver = match UdpSocket::bind("[::1]:0").await {
2009 Ok(receiver) => receiver,
2010 Err(e) if is_ipv6_unavailable_error(&e) => return,
2011 Err(e) => panic!("receiver should bind: {e}"),
2012 };
2013 let receiver_addr = receiver.local_addr().expect("receiver should have an address");
2014 let forwarder = ConnectedPacketForwarder::connect("::1", receiver_addr.port())
2015 .await
2016 .expect("forwarder should connect");
2017 let payload = b"daemon:666|g|#ip:6";
2018
2019 let recorder = TestRecorder::default();
2020 let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2021 let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2022 let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2023 let metrics = build_metrics(&listen_addr, &context);
2024 let (packets_tx, packets_rx) = mpsc::channel(1);
2025 let worker = tokio::spawn(forwarder.run(packets_rx, metrics.clone()));
2026 let packet_forwarder = packet_forwarder_from_sender(receiver_addr.port(), packets_tx, metrics);
2027
2028 packet_forwarder.forward(Bytes::copy_from_slice(payload)).await;
2029
2030 let mut actual = [0u8; 128];
2031 let (received_len, _) = timeout(Duration::from_secs(1), receiver.recv_from(&mut actual))
2032 .await
2033 .expect("receive should not time out")
2034 .expect("receiver should receive payload");
2035
2036 assert_eq!(&actual[..received_len], payload);
2037 worker.abort();
2038 }
2039
2040 #[tokio::test]
2041 async fn packet_forwarder_waits_when_queue_is_full() {
2042 let recorder = TestRecorder::default();
2043 let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2044 let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2045 let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2046 let metrics = build_metrics(&listen_addr, &context);
2047 let (packets_tx, _packets_rx) = mpsc::channel(FORWARDER_QUEUE_CAPACITY);
2048 let packet_forwarder = packet_forwarder_from_sender(9125, packets_tx, metrics);
2049
2050 for _ in 0..FORWARDER_QUEUE_CAPACITY {
2051 packet_forwarder.forward(Bytes::from_static(b"queued:1|c")).await;
2052 }
2053
2054 assert!(
2055 timeout(
2056 Duration::from_millis(100),
2057 packet_forwarder.forward(Bytes::from_static(b"blocked:1|c")),
2058 )
2059 .await
2060 .is_err(),
2061 "forwarding should wait for queue capacity instead of dropping"
2062 );
2063 }
2064
2065 #[tokio::test]
2066 async fn packet_forwarder_send_error_increments_error_telemetry() {
2067 let recorder = TestRecorder::default();
2068 let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2069 let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2070 let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2071 let metrics = build_metrics(&listen_addr, &context);
2072 let socket = UdpSocket::bind("127.0.0.1:0").await.expect("socket should bind");
2073 let forwarder = ConnectedPacketForwarder {
2074 socket,
2075 target: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9125)),
2076 };
2077 let (packets_tx, packets_rx) = mpsc::channel(1);
2078 let worker = tokio::spawn(forwarder.run(packets_rx, metrics.clone()));
2079 let packet_forwarder = packet_forwarder_from_sender(9125, packets_tx, metrics);
2080
2081 packet_forwarder.forward(Bytes::from_static(b"daemon:666|g")).await;
2082
2083 let deadline = tokio::time::Instant::now() + Duration::from_secs(1);
2084 loop {
2085 if recorder.counter((
2086 "component_packets_forwarded_total",
2087 &[
2088 ("component_id", "dogstatsd_test"),
2089 ("component_type", "source"),
2090 ("listener_type", "udp"),
2091 ("state", "error"),
2092 ],
2093 )) == Some(1)
2094 {
2095 break;
2096 }
2097
2098 assert!(
2099 tokio::time::Instant::now() < deadline,
2100 "forwarding error telemetry should be recorded"
2101 );
2102 tokio::time::sleep(Duration::from_millis(10)).await;
2103 }
2104 worker.abort();
2105 }
2106
2107 #[test]
2108 fn autoscale_udp_listeners_defaults_to_false() {
2109 let config = deser_config("{}");
2110 assert!(!config.autoscale_udp_listeners);
2111 assert!(config.udp_streams_to_yield().is_none());
2112 }
2113
2114 #[test]
2115 #[cfg(target_os = "linux")]
2116 fn autoscale_udp_listeners_from_config_linux() {
2117 let config = deser_config(r#"{"dogstatsd_autoscale_udp_listeners": true}"#);
2118 assert!(config.autoscale_udp_listeners);
2119
2120 let streams = config
2121 .udp_streams_to_yield()
2122 .expect("autoscale yields at least 1 stream");
2123 let n = streams.get();
2124 assert!(
2125 (1..=4).contains(&n),
2126 "expected 1..=4 streams from vCPU formula, got {n}"
2127 );
2128 }
2129
2130 #[test]
2131 #[cfg(not(target_os = "linux"))]
2132 fn autoscale_udp_listeners_from_config_non_linux() {
2133 let config = deser_config(r#"{"dogstatsd_autoscale_udp_listeners": true}"#);
2134 assert!(config.autoscale_udp_listeners);
2135
2136 assert_eq!(None, config.udp_streams_to_yield());
2137 }
2138
2139 #[test]
2140 fn eol_required_defaults_to_no_listeners() {
2141 let config = deser_config("{}");
2142 let eol_required = config.eol_required();
2143
2144 assert!(!eol_required.for_listener(&udp_listen_address()));
2145 assert!(!eol_required.for_listener(&tcp_listen_address()));
2146 }
2147
2148 #[test]
2149 fn eol_required_matches_configured_listener_types() {
2150 let config = deser_config(r#"{"dogstatsd_eol_required": ["udp", "uds"]}"#);
2151 let eol_required = config.eol_required();
2152
2153 assert!(eol_required.for_listener(&udp_listen_address()));
2154 assert!(!eol_required.for_listener(&tcp_listen_address()));
2155
2156 #[cfg(unix)]
2157 {
2158 assert!(eol_required.for_listener(&ListenAddress::Unixgram("/tmp/dsd.sock".into())));
2159 assert!(eol_required.for_listener(&ListenAddress::Unix("/tmp/dsd-stream.sock".into())));
2160 }
2161 }
2162
2163 #[test]
2164 fn eol_required_accepts_space_separated_string() {
2165 let config = deser_config(r#"{"dogstatsd_eol_required": "udp uds"}"#);
2166 let eol_required = config.eol_required();
2167
2168 assert!(eol_required.for_listener(&udp_listen_address()));
2169 }
2170
2171 #[test]
2172 fn stream_log_too_big_only_warns_for_enabled_unix_invalid_frames() {
2173 let uds_stream = ListenAddress::Unix("/tmp/dsd-stream.sock".into());
2174 let tcp_stream = ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2175 let error = saluki_io::deser::framing::FramingError::InvalidFrame {
2176 frame_len: 8193,
2177 reason: "frame length exceeds buffer capacity",
2178 };
2179
2180 assert!(super::should_warn_stream_log_too_big(&uds_stream, &error, true));
2181 assert!(!super::should_warn_stream_log_too_big(&uds_stream, &error, false));
2182 assert!(!super::should_warn_stream_log_too_big(&tcp_stream, &error, true));
2183 }
2184
2185 #[test]
2186 fn interner_size_from_entry_count() {
2187 let config = deser_config(r#"{"dogstatsd_string_interner_size": 4096}"#);
2189 assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(2));
2190 }
2191
2192 #[test]
2193 fn interner_size_from_explicit_bytes() {
2194 let config = deser_config(r#"{"dogstatsd_string_interner_size_bytes": 4194304}"#);
2195 assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::b(4194304));
2196 }
2197
2198 #[test]
2199 fn interner_size_explicit_bytes_takes_priority() {
2200 let config = deser_config(
2201 r#"{"dogstatsd_string_interner_size": 4096, "dogstatsd_string_interner_size_bytes": 8388608}"#,
2202 );
2203 assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::b(8388608));
2205 }
2206
2207 #[test]
2208 fn interner_size_custom_entry_count() {
2209 let config = deser_config(r#"{"dogstatsd_string_interner_size": 8192}"#);
2210 assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(4));
2212 }
2213
2214 fn address_list_eq(expected: &mut [ListenAddress], actual: &mut [ListenAddress]) -> Result<(), String> {
2216 if expected.len() != actual.len() {
2217 return Err(format!(
2218 "length mismatch: expected {} addresses, got {}",
2219 expected.len(),
2220 actual.len()
2221 ));
2222 }
2223
2224 expected.sort_by_key(|a| a.to_string());
2225 actual.sort_by_key(|a| a.to_string());
2226
2227 for (e, a) in expected.iter().zip(actual.iter()) {
2228 let (es, as_) = (e.to_string(), a.to_string());
2229 if es != as_ {
2230 return Err(format!("address mismatch: expected {}, got {}", es, as_));
2231 }
2232 }
2233
2234 Ok(())
2235 }
2236
2237 #[test]
2240 fn build_addresses_assertion_function_works() {
2241 let config = DogStatsDConfiguration {
2242 port: 0,
2243 tcp_port: 123,
2244 socket_path: None,
2245 socket_stream_path: None,
2246 non_local_traffic: false,
2247 ..Default::default()
2248 };
2249 let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(
2250 Ipv4Addr::new(127, 0, 0, 2),
2252 123,
2253 )))];
2254 let mut actual = config.build_addresses(None);
2255 assert!(address_list_eq(&mut expected, &mut actual).is_err())
2256 }
2257
2258 #[test]
2260 fn build_addresses_no_listeners() {
2261 let config = DogStatsDConfiguration {
2262 port: 0,
2263 tcp_port: 0,
2264 socket_path: None,
2265 socket_stream_path: None,
2266 non_local_traffic: false,
2267 ..Default::default()
2268 };
2269 let mut expected = vec![];
2270 let mut actual = config.build_addresses(None);
2271 address_list_eq(&mut expected, &mut actual).unwrap();
2272 }
2273
/// A UDP port alone, with non-local traffic disabled, should bind the IPv4 loopback address.
#[test]
fn build_addresses_udp_local_only() {
    let config = DogStatsDConfiguration {
        port: 8125,
        tcp_port: 0,
        socket_path: None,
        socket_stream_path: None,
        non_local_traffic: false,
        ..Default::default()
    };

    // `Ipv4Addr::LOCALHOST` is 127.0.0.1.
    let loopback = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125);
    let mut expected = vec![ListenAddress::Udp(SocketAddr::V4(loopback))];
    let mut actual = config.build_addresses(None);
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2292
/// A UDP port alone, with non-local traffic enabled, should bind the IPv4 wildcard address.
#[test]
fn build_addresses_udp_non_local_only() {
    let config = DogStatsDConfiguration {
        port: 8125,
        tcp_port: 0,
        socket_path: None,
        socket_stream_path: None,
        non_local_traffic: true,
        ..Default::default()
    };

    // `Ipv4Addr::UNSPECIFIED` is 0.0.0.0.
    let wildcard = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8125);
    let mut expected = vec![ListenAddress::Udp(SocketAddr::V4(wildcard))];
    let mut actual = config.build_addresses(None);
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2311
/// A TCP port alone, with non-local traffic disabled, should bind the IPv4 loopback address.
#[test]
fn build_addresses_tcp_local_only() {
    let config = DogStatsDConfiguration {
        port: 0,
        tcp_port: 9000,
        socket_path: None,
        socket_stream_path: None,
        non_local_traffic: false,
        ..Default::default()
    };

    // `Ipv4Addr::LOCALHOST` is 127.0.0.1.
    let loopback = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9000);
    let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(loopback))];
    let mut actual = config.build_addresses(None);
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2330
/// A TCP port alone, with non-local traffic enabled, should bind the IPv4 wildcard address.
#[test]
fn build_addresses_tcp_non_local_only() {
    let config = DogStatsDConfiguration {
        port: 0,
        tcp_port: 9000,
        socket_path: None,
        socket_stream_path: None,
        non_local_traffic: true,
        ..Default::default()
    };

    // `Ipv4Addr::UNSPECIFIED` is 0.0.0.0.
    let wildcard = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 9000);
    let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(wildcard))];
    let mut actual = config.build_addresses(None);
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2349
/// Setting only `socket_path` should produce a single Unix datagram listener on that path.
#[test]
fn build_addresses_unixgram_only() {
    let path = "/tmp/dsd.sock";
    let config = DogStatsDConfiguration {
        port: 0,
        tcp_port: 0,
        socket_path: Some(path.to_string()),
        socket_stream_path: None,
        non_local_traffic: false,
        ..Default::default()
    };
    let mut expected = vec![ListenAddress::Unixgram(path.into())];
    let mut actual = config.build_addresses(None);
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2365
/// Setting only `socket_stream_path` should produce a single Unix stream listener on that path.
#[test]
fn build_addresses_unix_stream_only() {
    let path = "/tmp/dsd-stream.sock";
    let config = DogStatsDConfiguration {
        port: 0,
        tcp_port: 0,
        socket_path: None,
        socket_stream_path: Some(path.to_string()),
        non_local_traffic: false,
        ..Default::default()
    };
    let mut expected = vec![ListenAddress::Unix(path.into())];
    let mut actual = config.build_addresses(None);
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2381
/// With all four listener kinds configured and non-local traffic enabled, UDP and TCP should
/// bind 0.0.0.0. Both Unix sockets should be produced unchanged.
#[test]
fn build_addresses_all_four_non_local() {
    let config = DogStatsDConfiguration {
        port: 8125,
        tcp_port: 9000,
        socket_path: Some("/tmp/dsd.sock".to_string()),
        socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
        non_local_traffic: true,
        ..Default::default()
    };

    let wildcard = Ipv4Addr::UNSPECIFIED;
    let mut expected = vec![
        ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(wildcard, 8125))),
        ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(wildcard, 9000))),
        ListenAddress::Unixgram("/tmp/dsd.sock".into()),
        ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
    ];
    let mut actual = config.build_addresses(None);
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2402
/// With all four listener kinds configured and non-local traffic disabled, UDP and TCP
/// should bind 127.0.0.1. Both Unix sockets should be produced unchanged.
#[test]
fn build_addresses_all_four_local() {
    let config = DogStatsDConfiguration {
        port: 8125,
        tcp_port: 9000,
        socket_path: Some("/tmp/dsd.sock".to_string()),
        socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
        non_local_traffic: false,
        ..Default::default()
    };

    let loopback = Ipv4Addr::LOCALHOST;
    let mut expected = vec![
        ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(loopback, 8125))),
        ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(loopback, 9000))),
        ListenAddress::Unixgram("/tmp/dsd.sock".into()),
        ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
    ];
    let mut actual = config.build_addresses(None);
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2423
/// An explicit bind host, with non-local traffic disabled, should be used for the UDP and
/// TCP listeners. The Unix datagram socket is produced as configured.
#[test]
fn build_addresses_bind_host_applies_to_udp_and_tcp() {
    let config = DogStatsDConfiguration {
        port: 8125,
        tcp_port: 9000,
        socket_path: Some("/tmp/dsd.sock".to_string()),
        socket_stream_path: None,
        non_local_traffic: false,
        ..Default::default()
    };

    let host = Ipv4Addr::new(192, 168, 1, 50);
    let bind_host = Some(IpAddr::V4(host));
    let mut expected = vec![
        ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(host, 8125))),
        ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(host, 9000))),
        ListenAddress::Unixgram("/tmp/dsd.sock".into()),
    ];
    let mut actual = config.build_addresses(bind_host);
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2445
/// When non-local traffic is enabled, UDP and TCP should bind 0.0.0.0 even if an explicit
/// bind host is supplied. The Unix stream socket is produced as configured.
#[test]
fn build_addresses_non_local_clobbers_bind_host() {
    let config = DogStatsDConfiguration {
        port: 8125,
        tcp_port: 9000,
        socket_path: None,
        socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
        non_local_traffic: true,
        ..Default::default()
    };

    // This host is expected to be ignored in favour of the wildcard address.
    let ignored_host = Ipv4Addr::new(192, 168, 1, 50);
    let bind_host = Some(IpAddr::V4(ignored_host));

    let wildcard = Ipv4Addr::UNSPECIFIED;
    let mut expected = vec![
        ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(wildcard, 8125))),
        ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(wildcard, 9000))),
        ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
    ];
    let mut actual = config.build_addresses(bind_host);
    address_list_eq(&mut expected, &mut actual).unwrap();
}
2468
/// Metric packets whose value is non-finite (`NaN`, `inf`, `-inf`) must still decode without
/// error, but must carry zero valid points.
#[test]
fn non_finite_metric_values_are_silently_dropped() {
    let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
    let inputs: [&[u8]; 3] = [b"my.gauge:NaN|g", b"my.gauge:inf|g", b"my.gauge:-inf|g"];

    for input in inputs {
        let parsed = codec.decode_packet(input).expect("should decode without error");
        let ParsedPacket::Metric(packet) = parsed else {
            panic!("expected Metric packet");
        };
        assert_eq!(
            packet.num_points, 0,
            "non-finite value should be dropped, leaving 0 valid points"
        );
    }
}
2486
2487 #[tokio::test]
2488 async fn fix_empty_capture_path_sets_path_from_run_path() {
2489 const RUN_PATH: &str = "/my/little/run_path";
2490
2491 let base_config_values = json!({ "run_path": RUN_PATH });
2492 let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await;
2493
2494 let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize");
2495
2496 let expected = PathBuf::from(RUN_PATH).join(DOGSTATSD_CAPTURE_DIR);
2497 assert_eq!(expected, dogstatsd_config.capture_path);
2498 }
2499
2500 #[tokio::test]
2501 async fn fix_empty_capture_path_keeps_explicit_path() {
2502 const RUN_PATH: &str = "/my/little/run_path";
2503 const CAPTURE_PATH: &str = "/custom/path/to/capture";
2504
2505 let base_config_values = json!({ "run_path": RUN_PATH, "dogstatsd_capture_path": CAPTURE_PATH });
2506 let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await;
2507
2508 let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize");
2509
2510 assert_eq!(PathBuf::from(CAPTURE_PATH), dogstatsd_config.capture_path);
2511 }
2512
2513 #[tokio::test]
2514 async fn from_configuration_normalizes_capture_depth() {
2515 let cases = [
2516 (json!({}), MIN_CAPTURE_DEPTH),
2517 (json!({ "dogstatsd_capture_depth": 0 }), MIN_CAPTURE_DEPTH),
2518 (json!({ "dogstatsd_capture_depth": 2048 }), 2048),
2519 ];
2520
2521 for (base_config_values, expected_depth) in cases {
2522 let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await;
2523 let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize");
2524
2525 assert_eq!(expected_depth, dogstatsd_config.capture_depth);
2526 }
2527 }
2528
/// Installing a capture entity resolver must not implicitly populate the workload provider.
#[test]
fn capture_entity_resolver_is_configured_separately_from_workload_provider() {
    let resolver = CaptureTestEntityResolver::default();
    let config = DogStatsDConfiguration::default().with_capture_entity_resolver(resolver);

    assert!(config.capture_entity_resolver.is_some());
    assert!(config.workload_provider.is_none());
}
2537
/// The capture container ID should come from the PID-to-entity mapping supplied by the
/// capture entity resolver. Here PID 42 maps to the `ci-pid-container` entity.
#[test]
fn resolve_capture_container_id_uses_live_pid_mapping() {
    let container_entity = EntityId::from_local_data("ci-pid-container").expect("container entity");
    let resolver = CaptureTestEntityResolver::with_pid_mapping(42, container_entity);

    let resolved = resolve_capture_container_id(Some(&resolver), Some(42));
    assert_eq!(resolved, Some("container_id://pid-container".to_string()));
}
2550
/// Local data embedded in the payload (`c:ci-local-container`) must not leak into the
/// capture record. With both optional inputs left as `None`, the record should carry no
/// container ID and no ancillary data.
#[test]
fn build_capture_record_ignores_payload_local_data() {
    let record = super::build_capture_record(None, None, b"test.metric:1|c|c:ci-local-container\n");

    assert!(record.container_id.is_none());
    assert!(record.ancillary.is_empty());
}
2558
/// A peer-metadata update without credentials must not erase the PID recorded by an earlier
/// update that did carry credentials.
#[test]
fn stream_capture_state_preserves_last_pid_without_new_creds() {
    let with_creds = ConnectionAddress::ProcessLike(ProcessIdentity::Credentials(ProcessCredentials {
        pid: 42,
        uid: 0,
        gid: 0,
    }));
    let without_creds = ConnectionAddress::ProcessLike(ProcessIdentity::Unavailable);

    let mut stream_capture = super::StreamCaptureState::new();
    stream_capture.update_peer_metadata(&with_creds);
    stream_capture.update_peer_metadata(&without_creds);

    assert_eq!(stream_capture.last_pid, Some(42));
}
2574
/// For credentials from a normal (non-replayed) packet, the credential `pid` should be
/// applied to the origin unchanged.
#[test]
fn apply_credentials_uses_live_pid_for_normal_packet() {
    let creds = ProcessCredentials {
        pid: 12345,
        uid: 1000,
        gid: 1000,
    };
    let mut origin = RawOrigin::default();

    super::apply_credentials_to_origin(&mut origin, &creds);

    assert_eq!(origin.process_id(), Some(12345));
}
2587
/// When the credentials carry `REPLAY_CREDENTIALS_GID`, the `uid` field holds the captured
/// PID. The origin should then receive that PID marked via `origin::mark_replay_process_id`,
/// rather than the credential's `pid` field.
#[test]
fn apply_credentials_unpacks_captured_pid_when_replay_gid_present() {
    let captured_pid: u32 = 99887766;
    let creds = ProcessCredentials {
        pid: 12345,
        uid: captured_pid,
        gid: super::REPLAY_CREDENTIALS_GID,
    };
    let mut origin = RawOrigin::default();

    super::apply_credentials_to_origin(&mut origin, &creds);

    let expected = super::origin::mark_replay_process_id(captured_pid);
    assert_eq!(origin.process_id(), Some(expected));
}
2604}
2605
#[cfg(test)]
mod config_smoke {
    //! Runs the shared config-registry smoke tests against `DogStatsDConfiguration`, checking
    //! that it can be deserialized from the registered `DOGSTATSD_CONFIGURATION` schema.

    use serde_json::json;

    use super::DogStatsDConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    #[tokio::test]
    async fn smoke_test() {
        // NOTE(review): the meaning of the empty slice and the empty JSON object is defined by
        // `run_config_smoke_tests`; presumably "no extra cases" and "no base overrides". Confirm there.
        run_config_smoke_tests(structs::DOGSTATSD_CONFIGURATION, &[], json!({}), |config| {
            config
                .as_typed::<DogStatsDConfiguration>()
                .expect("DogStatsDConfiguration should deserialize")
        })
        .await
    }
}