Skip to main content

saluki_components/sources/dogstatsd/
mod.rs

1//! DogStatsD source.
2//!
3//! # Missing
4//!
5//! - Create a health handle for each listener.
6//! - Handle UDS stream framing without treating EOF the same way as UDP and UDS datagram framing.
7//! - Track dispatch failures without depending on whether all events were already iterated.
8
9use std::{
10    collections::VecDeque,
11    num::NonZeroUsize,
12    path::PathBuf,
13    sync::{Arc, LazyLock},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15};
16
17use async_trait::async_trait;
18use bytes::{Buf, BufMut};
19use bytesize::ByteSize;
20use resource_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
21use saluki_common::task::spawn_traced_named;
22use saluki_config::{deserialize_space_separated_or_seq, GenericConfiguration};
23use saluki_context::{
24    origin::RawOrigin,
25    tags::{RawTags, RawTagsFilter},
26    TagsResolver,
27};
28use saluki_core::data_model::event::{
29    eventd::EventD,
30    metric::{Metric, MetricMetadata, MetricOrigin},
31    service_check::ServiceCheck,
32    Event, EventType,
33};
34use saluki_core::{
35    components::{sources::*, ComponentContext},
36    pooling::FixedSizeObjectPool,
37    topology::{
38        interconnect::EventBufferManager,
39        shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle},
40        EventsBuffer, OutputDefinition,
41    },
42};
43use saluki_env::{workload::CaptureEntityResolver, WorkloadProvider};
44use saluki_error::{generic_error, ErrorContext as _, GenericError};
45use saluki_io::{
46    buf::{BytesBuffer, ClearableIoBuffer as _, FixedSizeVec},
47    deser::{
48        codec::dogstatsd::*,
49        framing::{Framer as _, FramingError, LengthDelimitedFramer},
50    },
51    net::{
52        listener::{Listener, ListenerError},
53        ConnectionAddress, ListenAddress, ProcessCredentials, ProcessIdentity, Stream,
54    },
55};
56use serde::{Deserialize, Deserializer};
57use serde_with::{serde_as, NoneAsEmptyString};
58use snafu::{ResultExt as _, Snafu};
59use stringtheory::MetaString;
60use tokio::{
61    select,
62    time::{interval, MissedTickBehavior},
63};
64use tracing::{debug, error, info, trace, warn};
65
66mod forwarder;
67use self::forwarder::{PacketForwarder, PacketForwarderTarget};
68
69mod framer;
70use self::framer::{get_framer, DsdFramer};
71use crate::sources::dogstatsd::tags::{WellKnownTags, WellKnownTagsFilterPredicate};
72
73mod filters;
74use self::filters::EnablePayloadsFilter;
75
76mod io_buffer;
77use self::io_buffer::IoBufferManager;
78
79mod metrics;
80use self::metrics::{build_metrics, Metrics};
81
82mod replay;
83use self::replay::{CaptureRecord, CapturedTaggerHandle, TrafficCapture};
84pub use self::replay::{
85    DogStatsDCaptureAPIHandler, DogStatsDCaptureControl, DogStatsDReplayAPIHandler, DogStatsDReplayControl,
86    ReplaySession, TimestampResolution, TrafficCaptureReader, DEFAULT_REPLAY_LOOPS, REPLAY_CREDENTIALS_GID,
87};
88
89mod origin;
90use self::origin::{
91    mark_replay_process_id, origin_from_event_packet, origin_from_metric_packet, origin_from_service_check_packet,
92    DogStatsDOriginTagResolver, OriginEnrichmentConfiguration,
93};
94
95mod resolver;
96use self::resolver::ContextResolvers;
97
98mod tags;
99
100#[derive(Debug, Snafu)]
101#[snafu(context(suffix(false)))]
102enum Error {
103    #[snafu(display("Failed to create {} listener: {}", listener_type, source))]
104    FailedToCreateListener {
105        listener_type: &'static str,
106        source: ListenerError,
107    },
108
109    #[snafu(display("No listeners configured. Please specify a port (`dogstatsd_port`) or a socket path (`dogstatsd_socket` or `dogstatsd_stream_socket`) to enable a listener."))]
110    NoListenersConfigured,
111
112    #[snafu(display("Could not resolve bind_host '{}': {}", host, source))]
113    UnresolvableBindHost { host: String, source: std::io::Error },
114
115    #[snafu(display("bind_host '{}' resolved to zero IP addresses.", host))]
116    BindHostHasNoAddresses { host: String },
117}
118
/// Number of bytes budgeted per string interner entry.
///
/// The Core Agent expresses `dogstatsd_string_interner_size` as an entry count, whereas ADP sizes its interner in
/// bytes; multiplying by this constant bridges the two. The default of 4096 entries therefore becomes
/// 4096 × 512 bytes = 2 MiB, which matches ADP's previous default.
const INTERNER_BASELINE_BYTES_PER_ENTRY: u64 = 512;
124
// Serde default providers. Each is referenced by name from a `#[serde(default = "...")]` attribute.

/// Default for `dogstatsd_buffer_size`: 8 KiB receive buffers.
const fn default_buffer_size() -> usize {
    8_192
}

/// Default for `dogstatsd_buffer_count`.
const fn default_buffer_count() -> usize {
    128
}

/// Default for `dogstatsd_port` (the standard StatsD UDP port).
const fn default_port() -> u16 {
    8_125
}

/// Default for `dogstatsd_tcp_port`; `0` leaves the TCP listener disabled.
const fn default_tcp_port() -> u16 {
    0
}

/// Default for `statsd_forward_port`; `0` leaves forwarding disabled.
const fn default_statsd_forward_port() -> u16 {
    0
}

/// Default for `dogstatsd_so_rcvbuf`; `0` keeps the OS default receive buffer size.
const fn default_socket_receive_buffer_size() -> usize {
    0
}

/// Default for `dogstatsd_allow_context_heap_allocs`.
const fn default_allow_context_heap_allocations() -> bool {
    true
}

/// Default for `dogstatsd_no_aggregation_pipeline`.
const fn default_no_aggregation_pipeline_support() -> bool {
    true
}

/// Default for `dogstatsd_string_interner_size`, expressed as an entry count.
const fn default_context_string_interner_entry_count() -> u64 {
    4_096
}

/// Default for `dogstatsd_cached_contexts_limit`.
const fn default_cached_contexts_limit() -> usize {
    500_000
}

/// Default for `dogstatsd_cached_tagsets_limit`.
const fn default_cached_tagsets_limit() -> usize {
    500_000
}

/// Default for `dogstatsd_context_expiry_seconds`.
const fn default_context_expiry_seconds() -> u64 {
    20
}

/// Default for `dogstatsd_permissive_decoding`.
const fn default_dogstatsd_permissive_decoding() -> bool {
    true
}

/// Default for `dogstatsd_minimum_sample_rate` (roughly 260M equivalent samples per sampled value).
const fn default_dogstatsd_minimum_sample_rate() -> f64 {
    3.845e-9
}

/// Generic `true` default, used by boolean fields that are enabled unless configured otherwise.
const fn default_true() -> bool {
    true
}
184
185/// Controls which payload types are forwarded to the backend.
186#[derive(Deserialize)]
187#[cfg_attr(test, derive(PartialEq, serde::Serialize))]
188pub struct EnablePayloadsConfiguration {
189    /// Whether or not to enable sending series (counter/gauge/rate) payloads.
190    ///
191    /// Defaults to `true`.
192    #[serde(default = "default_true")]
193    pub series: bool,
194
195    /// Whether or not to enable sending sketch (distribution) payloads.
196    ///
197    /// Defaults to `true`.
198    #[serde(default = "default_true")]
199    pub sketches: bool,
200
201    /// Whether or not to enable sending event payloads.
202    ///
203    /// Defaults to `true`.
204    #[serde(default = "default_true")]
205    pub events: bool,
206
207    /// Whether or not to enable sending service check payloads.
208    ///
209    /// Defaults to `true`.
210    #[serde(default = "default_true")]
211    pub service_checks: bool,
212}
213
214impl Default for EnablePayloadsConfiguration {
215    fn default() -> Self {
216        Self {
217            series: true,
218            sketches: true,
219            events: true,
220            service_checks: true,
221        }
222    }
223}
224
/// Smallest capture queue depth the source will run with; smaller configured values are raised to this.
const MIN_CAPTURE_DEPTH: usize = 1_024;

/// Default for `dogstatsd_capture_depth`, which is simply the minimum allowed depth.
const fn default_capture_depth() -> usize {
    MIN_CAPTURE_DEPTH
}

/// Directory name appended to `run_path` when deriving a default capture location.
const DOGSTATSD_CAPTURE_DIR: &str = "dsd_capture";
232
233fn deserialize_empty_metastring_as_none<'de, D>(deserializer: D) -> Result<Option<MetaString>, D::Error>
234where
235    D: Deserializer<'de>,
236{
237    let value = Option::<MetaString>::deserialize(deserializer)?;
238    Ok(value.filter(|host| !host.is_empty()))
239}
240
241/// DogStatsD source.
242///
243/// Accepts metrics over TCP, UDP, or Unix Domain Sockets in the StatsD/DogStatsD format.
244#[serde_as]
245#[derive(Deserialize, Default)]
246#[cfg_attr(test, derive(derive_where::DeriveWhere, serde::Serialize))]
247#[cfg_attr(test, derive_where(PartialEq))]
248pub struct DogStatsDConfiguration {
249    /// The size of the buffer used to receive messages into, in bytes.
250    ///
251    /// Payloads can't exceed this size, or they will be truncated, leading to discarded messages.
252    ///
253    /// Defaults to 8192 bytes.
254    #[serde(rename = "dogstatsd_buffer_size", default = "default_buffer_size")]
255    buffer_size: usize,
256
257    /// The number of message buffers to allocate overall.
258    ///
259    /// This represents the maximum number of message buffers available for processing incoming metrics, which loosely
260    /// correlates with how many messages can be received per second. The default value should be suitable for the
261    /// majority of workloads, but high-throughput workloads may consider increasing this value.
262    ///
263    /// Defaults to 128.
264    #[serde(rename = "dogstatsd_buffer_count", default = "default_buffer_count")]
265    buffer_count: usize,
266
267    /// The port to listen on in UDP mode.
268    ///
269    /// If set to `0`, UDP isn't used.
270    ///
271    /// Defaults to 8125.
272    #[serde(rename = "dogstatsd_port", default = "default_port")]
273    port: u16,
274
275    /// The size of the DogStatsD UDP/UDS socket receive buffer, in bytes.
276    ///
277    /// If set to `0`, the OS default is used.
278    ///
279    /// Defaults to 0.
280    #[serde(rename = "dogstatsd_so_rcvbuf", default = "default_socket_receive_buffer_size")]
281    socket_receive_buffer_size: usize,
282
283    /// The port to listen on in TCP mode.
284    ///
285    /// If set to `0`, TCP isn't used.
286    ///
287    /// Defaults to 0.
288    #[serde(rename = "dogstatsd_tcp_port", default = "default_tcp_port")]
289    tcp_port: u16,
290
291    /// The host to forward framed DogStatsD messages to over UDP.
292    ///
293    /// Forwarding is enabled only when this value is non-empty and `statsd_forward_port` is non-zero. Setup failures
294    /// are logged, and send failures are tracked through telemetry.
295    ///
296    /// Defaults to unset.
297    #[serde(
298        rename = "statsd_forward_host",
299        default,
300        deserialize_with = "deserialize_empty_metastring_as_none"
301    )]
302    statsd_forward_host: Option<MetaString>,
303
304    /// The port to forward framed DogStatsD messages to over UDP.
305    ///
306    /// Forwarding is enabled only when this value is non-zero and `statsd_forward_host` is non-empty.
307    ///
308    /// Defaults to 0.
309    #[serde(rename = "statsd_forward_port", default = "default_statsd_forward_port")]
310    statsd_forward_port: u16,
311
312    /// The Unix domain socket path to listen on, in datagram mode.
313    ///
314    /// If not set, UDS (in datagram mode) isn't used.
315    ///
316    /// Defaults to unset.
317    #[serde(rename = "dogstatsd_socket", default)]
318    #[serde_as(as = "NoneAsEmptyString")]
319    socket_path: Option<String>,
320
321    /// The Unix domain socket path to listen on, in stream mode.
322    ///
323    /// If not set, UDS (in stream mode) isn't used.
324    ///
325    /// Defaults to unset.
326    #[serde(rename = "dogstatsd_stream_socket", default)]
327    #[serde_as(as = "NoneAsEmptyString")]
328    socket_stream_path: Option<String>,
329
330    /// Controls whether ADP logs oversized DogStatsD stream frames.
331    ///
332    /// When set to `true`, ADP emits a warning when a UDS stream frame exceeds the
333    /// configured DogStatsD buffer size. The frame is still rejected either way.
334    ///
335    /// Enable this when diagnosing clients that send oversized UDS stream frames.
336    ///
337    /// Defaults to `false`.
338    #[serde(rename = "dogstatsd_stream_log_too_big", default)]
339    stream_log_too_big: bool,
340
341    /// Listener types that require DogStatsD messages to be newline-terminated.
342    ///
343    /// Valid values are `udp`, `uds`, and `named_pipe`. ADP accepts `named_pipe` for compatibility, but it has no effect
344    /// until named pipe listeners are supported. Invalid values are ignored.
345    ///
346    /// Enable this when DogStatsD clients must reject packets or stream frames that don't end with a newline.
347    ///
348    /// Defaults to unset, which accepts the final message without a newline.
349    #[serde(
350        rename = "dogstatsd_eol_required",
351        default,
352        deserialize_with = "deserialize_space_separated_or_seq"
353    )]
354    eol_required: Vec<String>,
355
356    /// The host address to bind DogStatsD UDP and TCP listeners to.
357    ///
358    /// When set, UDP and TCP listeners bind to this address. Accepts either an IP literal (for example,
359    /// `192.168.1.50`, `::1`) or a hostname that resolves via DNS (for example, `agent.internal`).
360    /// Ignored when `dogstatsd_non_local_traffic` is `true`.
361    ///
362    /// Defaults to unset, which binds to `127.0.0.1`.
363    #[serde(rename = "bind_host", default)]
364    #[serde_as(as = "NoneAsEmptyString")]
365    bind_host: Option<String>,
366
367    /// Whether or not to listen for non-local traffic in UDP mode.
368    ///
369    /// If set to `true`, the listener will accept packets from any interface/address. Otherwise, the source will only
370    /// listen on the address specified by `bind_host`, or `127.0.0.1` if `bind_host` isn't set.
371    ///
372    /// Defaults to `false`.
373    #[serde(rename = "dogstatsd_non_local_traffic", default)]
374    non_local_traffic: bool,
375
376    /// Whether to autoscale UDP stream handlers using `SO_REUSEPORT`.
377    ///
378    /// When enabled on Linux, the DogStatsD source binds multiple UDP sockets to the configured port with
379    /// `SO_REUSEPORT`, allowing the kernel to load-balance incoming datagrams across independent stream handler
380    /// tasks. The number of sockets scales with available vCPUs: one stream handler base, plus one additional
381    /// per 8 vCPUs, capped at 4 total.
382    ///
383    /// Has no effect on non-Linux platforms because `SO_REUSEPORT` doesn't provide kernel-level load balancing
384    /// there; a warning is logged at startup if enabled outside of Linux.
385    ///
386    /// Enable this on multi-vCPU Linux deployments where UDP DogStatsD throughput is bottlenecked on a single
387    /// receive task.
388    ///
389    /// Defaults to `false`.
390    #[serde(rename = "dogstatsd_autoscale_udp_listeners", default)]
391    autoscale_udp_listeners: bool,
392
393    /// Whether or not to allow heap allocations when resolving contexts.
394    ///
395    /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The
396    /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full.
397    /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to
398    /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags can't be
399    /// interned, the metric is skipped.
400    ///
401    /// Defaults to `true`.
402    #[serde(
403        rename = "dogstatsd_allow_context_heap_allocs",
404        default = "default_allow_context_heap_allocations"
405    )]
406    allow_context_heap_allocations: bool,
407
408    /// Whether or not to enable support for no-aggregation pipelines.
409    ///
410    /// When enabled, this influences how metrics are parsed, specifically around user-provided metric timestamps. When
411    /// metric timestamps are present, it's used as a signal to any aggregation transforms that the metric shouldn't
412    /// be aggregated.
413    ///
414    /// Defaults to `true`.
415    #[serde(
416        rename = "dogstatsd_no_aggregation_pipeline",
417        default = "default_no_aggregation_pipeline_support"
418    )]
419    no_aggregation_pipeline_support: bool,
420
421    /// Number of entries for the string interner, as interpreted by the Core Datadog Agent.
422    ///
423    /// When `dogstatsd_string_interner_size_bytes` isn't set, this value is multiplied by 512 bytes per entry to
424    /// derive the interner byte size. This provides backwards compatibility for customers migrating configurations
425    /// from the Core Agent, where this setting represents an entry count rather than a byte size.
426    ///
427    /// Defaults to 4096 entries, which yields 2 MiB when converted.
428    #[serde(
429        rename = "dogstatsd_string_interner_size",
430        default = "default_context_string_interner_entry_count"
431    )]
432    context_string_interner_entry_count: u64,
433
434    /// Total size of the string interner used for contexts, in bytes.
435    ///
436    /// When set, this takes priority over `dogstatsd_string_interner_size`. This controls the amount of memory that
437    /// can be used to intern metric names and tags. If the interner is full, metrics with contexts that haven't
438    /// already been resolved may or may not be dropped, depending on the value of `allow_context_heap_allocations`.
439    #[serde(rename = "dogstatsd_string_interner_size_bytes", default)]
440    context_string_interner_size_bytes: Option<ByteSize>,
441
442    /// The maximum number of cached contexts to allow.
443    ///
444    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit doesn't affect
445    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
446    /// and whether or not heap allocations are allowed.
447    ///
448    /// Defaults to 500,000.
449    #[serde(
450        rename = "dogstatsd_cached_contexts_limit",
451        default = "default_cached_contexts_limit"
452    )]
453    cached_contexts_limit: usize,
454
455    /// The maximum number of cached tagsets to allow.
456    ///
457    /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit doesn't affect
458    /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity
459    /// and whether or not heap allocations are allowed.
460    ///
461    /// Defaults to 500,000.
462    #[serde(rename = "dogstatsd_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
463    cached_tagsets_limit: usize,
464
465    /// The number of seconds after which cached contexts will expire.
466    ///
467    /// Higher values allow for more effective caching for sparse metrics at the cost of increased memory usage.
468    ///
469    /// Defaults to 20 seconds.
470    #[serde(
471        rename = "dogstatsd_context_expiry_seconds",
472        default = "default_context_expiry_seconds"
473    )]
474    context_expiry_seconds: u64,
475
476    /// Whether or not to enable permissive mode in the decoder.
477    ///
478    /// Permissive mode allows the decoder to relax its strictness around the allowed payloads, which lets it match the
479    /// decoding behavior of the Datadog Agent.
480    ///
481    /// Defaults to `true`.
482    #[serde(
483        rename = "dogstatsd_permissive_decoding",
484        default = "default_dogstatsd_permissive_decoding"
485    )]
486    permissive_decoding: bool,
487
488    /// The minimum sample rate allowed for metrics.
489    ///
490    /// When metrics are sent with a sample rate _lower_ than this value then it will be clamped to this value. This is
491    /// done in order to ensure an upper bound on how many equivalent samples are tracked for the metric, as high sample
492    /// rates (very small numbers, such as `0.00000001`) can lead to large memory growth.
493    ///
494    /// A warning log will be emitted when clamping occurs, as this represents an effective loss of metric samples.
495    ///
496    /// Defaults to `0.000000003845`. (~260M samples)
497    #[serde(
498        rename = "dogstatsd_minimum_sample_rate",
499        default = "default_dogstatsd_minimum_sample_rate"
500    )]
501    minimum_sample_rate: f64,
502
503    /// Which payload types to forward to the backend.
504    #[serde(rename = "enable_payloads", default)]
505    enable_payloads: EnablePayloadsConfiguration,
506
507    /// Configuration related to origin detection and enrichment.
508    #[serde(flatten, default)]
509    origin_enrichment: OriginEnrichmentConfiguration,
510
511    /// Workload provider to utilize for origin detection/enrichment.
512    #[serde(skip)]
513    #[cfg_attr(test, derive_where(skip))]
514    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
515
516    /// Resolver to use for mapping live sender PIDs to container entities during traffic capture.
517    #[serde(skip, default)]
518    #[cfg_attr(test, derive_where(skip))]
519    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
520
521    /// Additional tags to add to all metrics.
522    #[serde(rename = "dogstatsd_tags", default)]
523    additional_tags: Vec<String>,
524
525    /// The directory where DogStatsD capture files are written by default.
526    ///
527    /// When set to an empty path, the source attempts to derive the directory from `run_path` by appending
528    /// `dsd_capture`. If neither value is available, callers must provide an explicit capture path when starting a
529    /// capture session.
530    ///
531    /// Defaults to empty.
532    #[serde(rename = "dogstatsd_capture_path", default)]
533    capture_path: PathBuf,
534
535    /// The maximum number of captured packets that can be queued for persistence.
536    ///
537    /// This controls the depth of the in-process capture queue. Values below `1024` are raised to `1024` before the
538    /// capture writer starts, preventing a zero-depth rendezvous channel from serializing DogStatsD stream handlers
539    /// behind capture persistence.
540    ///
541    /// Defaults to `1024`.
542    #[serde(rename = "dogstatsd_capture_depth", default = "default_capture_depth")]
543    capture_depth: usize,
544
545    #[serde(skip, default)]
546    #[cfg_attr(test, derive_where(skip))]
547    capture_control: DogStatsDCaptureControl,
548
549    #[serde(skip, default)]
550    #[cfg_attr(test, derive_where(skip))]
551    replay_control: DogStatsDReplayControl,
552
553    /// Provider kind tag appended to all metrics as `provider_kind:<value>`.
554    ///
555    /// Set via `DD_PROVIDER_KIND` by the Helm chart on GKE Autopilot (`gke-autopilot`) and GKE on
556    /// Google Distributed Cloud (`gke-gdc`). When empty or absent, no tag is added.
557    ///
558    /// Defaults to `""` (disabled).
559    #[serde(default)]
560    provider_kind: String,
561}
562
563#[derive(Clone, Copy, Default)]
564struct EolRequired {
565    udp: bool,
566    uds: bool,
567}
568
569impl EolRequired {
570    fn from_config_values(values: &[String]) -> Self {
571        let mut eol_required = Self::default();
572
573        for value in values {
574            match value.as_str() {
575                "udp" => eol_required.udp = true,
576                "uds" => eol_required.uds = true,
577                "named_pipe" => {}
578                _ => warn!(
579                    value,
580                    "Invalid dogstatsd_eol_required value. Expected 'udp', 'uds', or 'named_pipe'."
581                ),
582            }
583        }
584
585        eol_required
586    }
587
588    fn for_listener(&self, listen_addr: &ListenAddress) -> bool {
589        match listen_addr {
590            ListenAddress::Udp(_) => self.udp,
591            ListenAddress::Tcp(_) => false,
592            #[cfg(unix)]
593            ListenAddress::Unixgram(_) | ListenAddress::Unix(_) => self.uds,
594        }
595    }
596}
597
598/// Resolves a `bind_host` string to an `IpAddr`.
599///
600/// Accepts either an IP literal (no DNS required) or a hostname (resolved via async DNS). Returns
601/// `UnresolvableBindHost` if the lookup fails, or `BindHostHasNoAddresses` if it succeeds but
602/// returns no addresses.
603async fn resolve_bind_host(host: &str) -> Result<std::net::IpAddr, Error> {
604    let mut addrs = tokio::net::lookup_host((host, 0u16))
605        .await
606        .context(UnresolvableBindHost { host: host.to_string() })?;
607    addrs
608        .next()
609        .map(|sa| sa.ip())
610        .ok_or_else(|| Error::BindHostHasNoAddresses { host: host.to_string() })
611}
612
613impl DogStatsDConfiguration {
614    /// Creates a new `DogStatsDConfiguration` from the given configuration.
615    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
616        let mut dogstatsd_config: Self = config.as_typed()?;
617        dogstatsd_config.fix_empty_capture_path(config);
618        dogstatsd_config.fix_capture_depth();
619        Ok(dogstatsd_config)
620    }
621
622    /// Gets both the `additional_tags` and any others specified by other configuration fields, such as `provider_kind`.
623    fn additional_tags(&self) -> Vec<String> {
624        if self.provider_kind.is_empty() {
625            return self.additional_tags.clone();
626        }
627
628        let mut tags = self.additional_tags.clone();
629        tags.push(format!("provider_kind:{}", self.provider_kind.clone()));
630        tags
631    }
632
633    fn fix_capture_depth(&mut self) {
634        self.capture_depth = self.capture_depth.max(MIN_CAPTURE_DEPTH);
635    }
636
637    /// Returns the effective string interner size in bytes.
638    ///
639    /// If `dogstatsd_string_interner_size_bytes` is set, it's used directly. Otherwise,
640    /// `dogstatsd_string_interner_size` (an entry count) is multiplied by 512 bytes per entry to derive the byte
641    /// size.
642    fn effective_context_string_interner_bytes(&self) -> ByteSize {
643        match self.context_string_interner_size_bytes {
644            Some(explicit_bytes) => explicit_bytes,
645            None => ByteSize::b(self.context_string_interner_entry_count * INTERNER_BASELINE_BYTES_PER_ENTRY),
646        }
647    }
648
649    fn eol_required(&self) -> EolRequired {
650        EolRequired::from_config_values(&self.eol_required)
651    }
652
653    fn statsd_forward_target(&self) -> Option<(&MetaString, u16)> {
654        let host = self.statsd_forward_host.as_ref()?;
655        if self.statsd_forward_port == 0 {
656            return None;
657        }
658
659        Some((host, self.statsd_forward_port))
660    }
661
662    fn packet_forwarder_target(&self) -> Option<PacketForwarderTarget> {
663        let (host, port) = self.statsd_forward_target()?;
664        Some(PacketForwarderTarget::new(host.clone(), port))
665    }
666
667    /// Returns the number of UDP stream handlers to spawn, derived from `dogstatsd_autoscale_udp_listeners` and
668    /// the number of available vCPUs.
669    ///
670    /// Returns `None` when autoscaling is disabled, which keeps the legacy single-socket behavior. The platform
671    /// gate for `SO_REUSEPORT` lives inside the listener—this method intentionally stays platform-agnostic.
672    fn udp_streams_to_yield(&self) -> Option<NonZeroUsize> {
673        if !self.autoscale_udp_listeners {
674            return None;
675        }
676
677        #[cfg(not(target_os = "linux"))]
678        if self.autoscale_udp_listeners {
679            warn!("UDP stream handler autoscaling not supported on non-Linux platforms. Default to single stream handler.");
680            return None;
681        }
682
683        let vcpus = std::thread::available_parallelism().map(NonZeroUsize::get).unwrap_or(1);
684        let streams = (1 + vcpus / 8).min(4);
685        NonZeroUsize::new(streams)
686    }
687
688    /// Sets the workload provider to use for configuring origin detection/enrichment.
689    ///
690    /// A workload provider must be set otherwise origin detection/enrichment won't be enabled.
691    ///
692    /// Defaults to unset.
693    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
694    where
695        W: WorkloadProvider + Send + Sync + 'static,
696    {
697        self.workload_provider = Some(Arc::new(workload_provider));
698        self
699    }
700
701    /// Sets the resolver to use for mapping live sender PIDs while capturing DogStatsD traffic.
702    ///
703    /// This resolver is intentionally configured separately from the workload provider because capture only needs a
704    /// narrow live-PID lookup, while normal origin enrichment uses the broader workload provider contract.
705    ///
706    /// Defaults to unset.
707    pub fn with_capture_entity_resolver<R>(mut self, capture_entity_resolver: R) -> Self
708    where
709        R: CaptureEntityResolver + Send + Sync + 'static,
710    {
711        self.capture_entity_resolver = Some(Arc::new(capture_entity_resolver));
712        self
713    }
714
715    /// Returns the shared control handle for DogStatsD traffic capture.
716    pub fn capture_control(&self) -> DogStatsDCaptureControl {
717        self.capture_control.clone()
718    }
719
720    /// Returns an HTTP API handler exposing the DogStatsD capture control surface.
721    pub fn capture_api_handler(&self) -> DogStatsDCaptureAPIHandler {
722        DogStatsDCaptureAPIHandler::new(self.capture_control.clone())
723    }
724
725    /// Returns the shared control handle for DogStatsD traffic replay.
726    pub fn replay_control(&self) -> DogStatsDReplayControl {
727        self.replay_control.clone()
728    }
729
730    /// Returns an HTTP API handler exposing the DogStatsD replay control surface.
731    pub fn replay_api_handler(&self) -> DogStatsDReplayAPIHandler {
732        DogStatsDReplayAPIHandler::new(self.replay_control.clone())
733    }
734
735    fn fix_empty_capture_path(&mut self, config: &GenericConfiguration) {
736        if self.capture_path.parent().is_some() {
737            return;
738        }
739
740        let capture_path = match config.try_get_typed::<PathBuf>("run_path") {
741            Ok(Some(mut run_path)) => {
742                run_path.push(DOGSTATSD_CAPTURE_DIR);
743                run_path
744            }
745            Ok(None) => {
746                debug!(
747                    "`dogstatsd_capture_path` and `run_path` were empty. Default DogStatsD capture path is unavailable."
748                );
749                return;
750            }
751            Err(e) => {
752                debug!(
753                    error = %e,
754                    "Failed to read `run_path` from configuration. Default DogStatsD capture path is unavailable."
755                );
756                return;
757            }
758        };
759
760        self.capture_path = capture_path;
761    }
762
763    /// Using the current configuration, determines which listeners should be created and adds an address for each into
764    /// a `Vec<ListenAddress>`. This function has no side effects so that it can be unit tested whereas build_listeners`
765    /// actually binds the listeners on the system.
766    ///
767    /// `bind_host` is the pre-resolved IP that UDP and TCP listeners should bind to (provided by
768    /// `resolve_bind_host`). Precedence matches the Agent:
769    ///   - `non_local_traffic=true` → `0.0.0.0` (`bind_host` ignored)
770    ///   - `bind_host=Some(ip)`     → `ip`
771    ///   - `bind_host=None`         → `127.0.0.1`
772    fn build_addresses(&self, bind_host: Option<std::net::IpAddr>) -> Vec<ListenAddress> {
773        let bind_ip: std::net::IpAddr = if self.non_local_traffic {
774            [0, 0, 0, 0].into()
775        } else {
776            bind_host.unwrap_or_else(|| [127, 0, 0, 1].into())
777        };
778
779        let mut addresses: Vec<ListenAddress> = Vec::new();
780
781        if self.port != 0 {
782            addresses.push(ListenAddress::Udp(std::net::SocketAddr::new(bind_ip, self.port)));
783        }
784
785        if self.tcp_port != 0 {
786            addresses.push(ListenAddress::Tcp(std::net::SocketAddr::new(bind_ip, self.tcp_port)));
787        }
788
789        if let Some(socket_path) = &self.socket_path {
790            addresses.push(ListenAddress::Unixgram(socket_path.into()));
791        }
792
793        if let Some(socket_stream_path) = &self.socket_stream_path {
794            addresses.push(ListenAddress::Unix(socket_stream_path.into()));
795        }
796
797        addresses
798    }
799
800    /// Builds the appropriate `Listener` objects.
801    async fn build_listeners(&self) -> Result<Vec<Listener>, Error> {
802        // Resolve `bind_host` to an IP (via DNS if needed). Skip the lookup when
803        // `non_local_traffic=true` since `bind_host` is ignored in that branch—matches Go's
804        // laziness and avoids failing startup on an unresolvable hostname that wouldn't be used.
805        let bind_host: Option<std::net::IpAddr> = if self.non_local_traffic {
806            None
807        } else {
808            match &self.bind_host {
809                Some(host) => Some(resolve_bind_host(host).await?),
810                None => None,
811            }
812        };
813
814        let addresses = self.build_addresses(bind_host);
815        let mut listeners = Vec::new();
816        let socket_receive_buffer_size =
817            (self.socket_receive_buffer_size != 0).then_some(self.socket_receive_buffer_size);
818        let udp_streams_to_yield = self.udp_streams_to_yield();
819        for address in addresses {
820            let listener_type = address.listener_type();
821            let listener_streams = matches!(address, ListenAddress::Udp(_))
822                .then_some(udp_streams_to_yield)
823                .flatten();
824            let listener = Listener::from_listen_address(address, listener_streams)
825                .await
826                .context(FailedToCreateListener { listener_type })?
827                .with_receive_buffer_size(socket_receive_buffer_size);
828
829            listeners.push(listener);
830        }
831        Ok(listeners)
832    }
833}
834
#[async_trait]
impl SourceBuilder for DogStatsDConfiguration {
    /// Builds the DogStatsD source from this configuration.
    ///
    /// The steps are:
    /// - bind all configured listeners;
    /// - check that enough I/O buffers are configured to service them;
    /// - create the context resolvers, codec and payload filter;
    /// - bind the capture and replay control surfaces to the newly created traffic capture and captured tagger.
    ///
    /// # Errors
    ///
    /// Returns an error in any of these cases:
    /// - a listener fails to be created;
    /// - no listeners are configured;
    /// - `buffer_count` is below the sum of the listeners' minimum buffer reservations;
    /// - the context resolvers cannot be created.
    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
        let listeners = self.build_listeners().await?;
        if listeners.is_empty() {
            return Err(Error::NoListenersConfigured.into());
        }

        // Every listener requires at least one I/O buffer to ensure that all listeners can be serviced without
        // deadlocking any of the others. Connectionless listeners retain their buffer for the lifetime of the stream,
        // so multi-socket UDP listeners require one buffer per yielded socket.
        let min_buffers: usize = listeners.iter().map(Listener::min_buffer_reservation).sum();
        if self.buffer_count < min_buffers {
            return Err(generic_error!(
                "Must have a minimum of {} I/O buffers to service all configured listeners (have {}).",
                min_buffers,
                self.buffer_count,
            ));
        }

        let origin_detection_enabled = self.origin_enrichment.enabled();
        // Single CapturedTaggerHandle is cloned to both the resolver (reader of the captured store) and the replay
        // control surface (writer). Both sides reference the same atomic slot.
        let captured_tagger = CapturedTaggerHandle::new();

        // Origin tag resolution is only possible when a workload provider is configured.
        let maybe_origin_tags_resolver = self.workload_provider.clone().map(|provider| {
            DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider, captured_tagger.clone())
        });
        let context_resolvers = ContextResolvers::new(self, &context, maybe_origin_tags_resolver)
            .error_context("Failed to create context resolvers.")?;

        // Timestamps in payloads are only honored when `no_aggregation_pipeline_support` is enabled.
        let codec_config = DogStatsDCodecConfiguration::default()
            .with_timestamps(self.no_aggregation_pipeline_support)
            .with_permissive_mode(self.permissive_decoding)
            .with_minimum_sample_rate(self.minimum_sample_rate)
            .with_client_origin_detection(self.origin_enrichment.origin_detection_client);

        let codec = DogStatsDCodec::from_configuration(codec_config);
        let eol_required = self.eol_required();

        let enable_payloads_filter = EnablePayloadsFilter::default()
            .with_allow_series(self.enable_payloads.series)
            .with_allow_sketches(self.enable_payloads.sketches)
            .with_allow_events(self.enable_payloads.events)
            .with_allow_service_checks(self.enable_payloads.service_checks);
        // The capture depth is clamped so it never drops below `MIN_CAPTURE_DEPTH`.
        let traffic_capture = TrafficCapture::with_workload_provider(
            self.capture_path.clone(),
            self.capture_depth.max(MIN_CAPTURE_DEPTH),
            self.workload_provider.clone(),
        );
        // The capture control surface shares this capture handle with every stream handler.
        self.capture_control.bind(traffic_capture.clone());
        let packet_forwarder_target = self.packet_forwarder_target();

        self.replay_control.bind(captured_tagger);

        Ok(Box::new(DogStatsD {
            listeners,
            // Each pooled buffer uses the adjusted buffer size, matching the `buffers` term in `specify_bounds`.
            io_buffer_pool: FixedSizeObjectPool::with_builder("dsd_packet_bufs", self.buffer_count, || {
                FixedSizeVec::with_capacity(get_adjusted_buffer_size(self.buffer_size))
            }),
            codec,
            context_resolvers,
            enabled_filter: enable_payloads_filter,
            origin_detection_enabled,
            stream_log_too_big: self.stream_log_too_big,
            eol_required,
            additional_tags: self.additional_tags().into(),
            capture_entity_resolver: self.capture_entity_resolver.clone(),
            traffic_capture,
            packet_forwarder_target,
        }))
    }

    /// Declares the three named outputs of this source: `metrics`, `events`, and `service_checks`.
    fn outputs(&self) -> &[OutputDefinition<EventType>] {
        static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
            vec![
                OutputDefinition::named_output("metrics", EventType::Metric),
                OutputDefinition::named_output("events", EventType::EventD),
                OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
            ]
        });
        &OUTPUTS
    }
}
919
920impl MemoryBounds for DogStatsDConfiguration {
921    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
922        builder
923            .minimum()
924            // Capture the size of the heap allocation when the component is built.
925            .with_single_value::<DogStatsD>("source struct")
926            // We allocate our I/O buffers entirely up front.
927            .with_expr(UsageExpr::product(
928                "buffers",
929                UsageExpr::config("dogstatsd_buffer_count", self.buffer_count),
930                UsageExpr::config("dogstatsd_buffer_size", get_adjusted_buffer_size(self.buffer_size)),
931            ))
932            // We also allocate the backing storage for the string interner up front, which is used by our context
933            // resolver.
934            .with_expr(UsageExpr::config(
935                "dogstatsd_string_interner_size_bytes",
936                self.effective_context_string_interner_bytes().as_u64() as usize,
937            ));
938    }
939}
940
/// DogStatsD source.
///
/// Created by `DogStatsDConfiguration::build`. `run` spawns one task per listener and hands each task its own copy
/// of the shared state below.
pub struct DogStatsD {
    /// Listeners bound during `build`; each is moved into its own task by `run`.
    listeners: Vec<Listener>,
    /// Pool of I/O buffers shared by all listeners, sized by `buffer_count`.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// Codec used to decode frames into DogStatsD packets.
    codec: DogStatsDCodec,
    /// Resolvers used when turning decoded packets into events.
    context_resolvers: ContextResolvers,
    /// Decides which payload kinds (series, sketches, events, service checks) are emitted.
    enabled_filter: EnablePayloadsFilter,
    /// Whether origin enrichment is enabled; used when counting origin detection errors.
    origin_detection_enabled: bool,
    /// Whether oversized UDS stream frames are reported with a warning.
    stream_log_too_big: bool,
    /// End-of-line requirements, resolved per listener when choosing a framer.
    eol_required: EolRequired,
    /// Additional configured tags passed through to packet handling.
    additional_tags: Arc<[String]>,
    /// Optional resolver used to attach container IDs to captured traffic records.
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
    /// Traffic capture handle shared with every stream handler.
    traffic_capture: TrafficCapture,
    /// Optional target that decoded frames are mirrored to.
    packet_forwarder_target: Option<PacketForwarderTarget>,
}
956
/// Everything a single listener task (`process_listener`) needs, cloned from `DogStatsD` by `run`.
struct ListenerContext {
    /// Handle signaled when the source asks this listener to stop.
    shutdown_handle: DynamicShutdownHandle,
    /// The listener this task accepts streams from.
    listener: Listener,
    /// Shared I/O buffer pool.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// Codec cloned into every stream handler.
    codec: DogStatsDCodec,
    /// Context resolvers cloned into every stream handler.
    context_resolvers: ContextResolvers,
    /// Whether origin enrichment is enabled.
    origin_detection_enabled: bool,
    /// Whether oversized UDS stream frames are reported with a warning.
    stream_log_too_big: bool,
    /// End-of-line requirements used to pick each stream's framer.
    eol_required: EolRequired,
    /// Additional configured tags passed through to packet handling.
    additional_tags: Arc<[String]>,
    /// Optional resolver for container IDs in captured traffic.
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
    /// Shared traffic capture handle.
    traffic_capture: TrafficCapture,
    /// Optional packet forwarder target; one forwarder is created per listener from it.
    packet_forwarder_target: Option<PacketForwarderTarget>,
}
971
/// Everything a single stream handler (`process_stream` / `drive_stream`) needs, built by `process_listener`.
struct HandlerContext {
    /// Address of the listener that accepted the stream, used for logging and behavior selection.
    listen_addr: ListenAddress,
    /// Framer chosen for this listener via `get_framer`.
    framer: DsdFramer,
    /// Codec used to decode frames.
    codec: DogStatsDCodec,
    /// Shared I/O buffer pool.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// Per-listener metrics, shared by all of the listener's stream handlers.
    metrics: Metrics,
    /// Context resolvers used when building events.
    context_resolvers: ContextResolvers,
    /// Whether origin enrichment is enabled.
    origin_detection_enabled: bool,
    /// Whether oversized UDS stream frames are reported with a warning.
    stream_log_too_big: bool,
    /// Additional configured tags passed through to packet handling.
    additional_tags: Arc<[String]>,
    /// Optional resolver for container IDs in captured traffic.
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
    /// Shared traffic capture handle.
    traffic_capture: TrafficCapture,
    /// Optional forwarder that decoded frames are mirrored to.
    packet_forwarder: Option<PacketForwarder>,
}
986
#[async_trait]
impl Source for DogStatsD {
    /// Runs the DogStatsD source until the global shutdown signal is received.
    ///
    /// Spawns one task per listener (see `process_listener`), each registered with a local shutdown coordinator.
    /// It then marks the source ready and answers liveness checks until shutdown. On shutdown, it shuts down the
    /// listener coordinator and awaits it before returning.
    async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
        let mut global_shutdown = context.take_shutdown_handle();
        let mut health = context.take_health_handle();

        let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();

        // For each listener, spawn a dedicated task to run it.
        for listener in self.listeners {
            let task_name = format!("dogstatsd-listener-{}", listener.listen_address().listener_type());

            // TODO: Create a health handle for each listener.
            //
            // We need to rework `HealthRegistry` to look a little more like `ComponentRegistry` so that we can have it
            // already be scoped properly, otherwise all we can do here at present is either have a relative name, like
            // `uds-stream`, or try and hardcode the full component name, which we will inevitably forget to update if
            // we tweak the topology configuration, etc.
            let listener_context = ListenerContext {
                shutdown_handle: listener_shutdown_coordinator.register(),
                listener,
                io_buffer_pool: self.io_buffer_pool.clone(),
                codec: self.codec.clone(),
                context_resolvers: self.context_resolvers.clone(),
                origin_detection_enabled: self.origin_detection_enabled,
                stream_log_too_big: self.stream_log_too_big,
                eol_required: self.eol_required,
                additional_tags: self.additional_tags.clone(),
                capture_entity_resolver: self.capture_entity_resolver.clone(),
                traffic_capture: self.traffic_capture.clone(),
                packet_forwarder_target: self.packet_forwarder_target.clone(),
            };

            // The task is not joined directly; it is stopped through the shutdown handle registered above.
            spawn_traced_named(
                task_name,
                process_listener(context.clone(), listener_context, self.enabled_filter),
            );
        }

        health.mark_ready();
        debug!("DogStatsD source started.");

        // Wait for the global shutdown signal, then notify listeners to shutdown.
        //
        // We also handle liveness here, which doesn't really matter for _this_ task, since the real work is happening
        // in the listeners, but we need to satisfy the health checker.
        loop {
            select! {
                _ = &mut global_shutdown => {
                    debug!("Received shutdown signal.");
                    break
                },
                _ = health.live() => continue,
            }
        }

        debug!("Stopping DogStatsD source...");

        listener_shutdown_coordinator.shutdown().await;

        debug!("DogStatsD source stopped.");

        Ok(())
    }
}
1052
/// Accepts streams from a single listener and spawns a handler task for each one.
///
/// Per-listener metrics are built once here and cloned into every stream handler. If a packet forwarder target is
/// configured, a forwarder is created from it and `spawn_connect` is called before any streams are accepted.
///
/// Each accepted stream is handled by `process_stream` in its own task, registered with a stream-level shutdown
/// coordinator. The accept loop ends on the listener's shutdown signal or on the first accept error. After that, the
/// stream coordinator is shut down and awaited.
async fn process_listener(
    source_context: SourceContext, listener_context: ListenerContext, enabled_filter: EnablePayloadsFilter,
) {
    let ListenerContext {
        shutdown_handle,
        mut listener,
        io_buffer_pool,
        codec,
        context_resolvers,
        origin_detection_enabled,
        stream_log_too_big,
        eol_required,
        additional_tags,
        capture_entity_resolver,
        traffic_capture,
        packet_forwarder_target,
    } = listener_context;
    // Pinned so it can be polled by reference on every pass of the `select!` loop below.
    tokio::pin!(shutdown_handle);

    let listen_addr = listener.listen_address().clone();
    let metrics = build_metrics(&listen_addr, source_context.component_context());
    let packet_forwarder = packet_forwarder_target
        .as_ref()
        .map(|target| target.to_forwarder(metrics.clone()));
    if let Some(packet_forwarder) = &packet_forwarder {
        packet_forwarder.spawn_connect();
    }

    let mut stream_shutdown_coordinator = DynamicShutdownCoordinator::default();
    // Only used to give each spawned stream handler task a distinct name.
    let mut stream_idx: u32 = 0;

    info!(%listen_addr, "DogStatsD listener started.");

    loop {
        select! {
            _ = &mut shutdown_handle => {
                debug!(%listen_addr, "Received shutdown signal. Waiting for existing stream handlers to finish...");
                break;
            }
            result = listener.accept() => match result {
                Ok(stream) => {
                    debug!(%listen_addr, "Spawning new stream handler.");

                    let handler_context = HandlerContext {
                        listen_addr: listen_addr.clone(),
                        framer: get_framer(&listen_addr, eol_required.for_listener(&listen_addr)),
                        codec: codec.clone(),
                        io_buffer_pool: io_buffer_pool.clone(),
                        metrics: metrics.clone(),
                        context_resolvers: context_resolvers.clone(),
                        origin_detection_enabled,
                        stream_log_too_big,
                        additional_tags: additional_tags.clone(),
                        capture_entity_resolver: capture_entity_resolver.clone(),
                        traffic_capture: traffic_capture.clone(),
                        packet_forwarder: packet_forwarder.clone(),
                    };

                    let task_name = format!(
                        "dogstatsd-stream-handler-{}-{}",
                        listen_addr.listener_type(),
                        stream_idx,
                    );
                    stream_idx = stream_idx.wrapping_add(1);
                    spawn_traced_named(task_name, process_stream(stream, source_context.clone(), handler_context, stream_shutdown_coordinator.register(), enabled_filter));
                }
                Err(e) => {
                    // Accept failures are treated as fatal for this listener; it is not retried here.
                    error!(%listen_addr, error = %e, "Failed to accept connection. Stopping listener.");
                    break
                }
            }
        }
    }

    stream_shutdown_coordinator.shutdown().await;

    info!(%listen_addr, "DogStatsD listener stopped.");
}
1131
1132async fn process_stream(
1133    stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
1134    shutdown_handle: DynamicShutdownHandle, enabled_filter: EnablePayloadsFilter,
1135) {
1136    tokio::pin!(shutdown_handle);
1137
1138    select! {
1139        _ = &mut shutdown_handle => {
1140            debug!("Stream handler received shutdown signal.");
1141        },
1142        _ = drive_stream(stream, source_context, handler_context, enabled_filter) => {},
1143    }
1144}
1145
1146async fn drive_stream(
1147    mut stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
1148    enabled_filter: EnablePayloadsFilter,
1149) {
1150    let HandlerContext {
1151        listen_addr,
1152        mut framer,
1153        codec,
1154        io_buffer_pool,
1155        metrics,
1156        mut context_resolvers,
1157        origin_detection_enabled,
1158        stream_log_too_big,
1159        additional_tags,
1160        capture_entity_resolver,
1161        traffic_capture,
1162        packet_forwarder,
1163    } = handler_context;
1164
1165    debug!(%listen_addr, "Stream handler started.");
1166
1167    if !stream.is_connectionless() {
1168        metrics.connections_active().increment(1);
1169    }
1170
1171    let mut stream_capture = StreamCaptureState::new();
1172    // Set a buffer flush interval of 100ms, which will ensure we always flush buffered events at least every 100ms if
1173    // we're otherwise idle and not receiving packets from the client.
1174    let mut buffer_flush = interval(Duration::from_millis(100));
1175    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
1176
1177    let mut event_buffer_manager = EventBufferManager::default();
1178    let mut io_buffer_manager = IoBufferManager::new(&io_buffer_pool, &stream);
1179    let memory_limiter = source_context.topology_context().memory_limiter();
1180
1181    'read: loop {
1182        let mut eof = false;
1183
1184        let mut io_buffer = io_buffer_manager.get_buffer_mut().await;
1185
1186        memory_limiter.wait_for_capacity().await;
1187
1188        select! {
1189            // We read from the stream.
1190            read_result = stream.receive(&mut io_buffer) => match read_result {
1191                Ok((bytes_read, peer_addr)) => {
1192                    if bytes_read == 0 {
1193                        eof = true;
1194                    }
1195
1196                    let is_connectionless = stream.is_connectionless();
1197                    let payload = received_payload(io_buffer, bytes_read);
1198
1199                    capture_uds_traffic(
1200                        &listen_addr,
1201                        &traffic_capture,
1202                        capture_entity_resolver.as_deref(),
1203                        &peer_addr,
1204                        payload,
1205                        &mut stream_capture,
1206                    );
1207
1208                    if is_connectionless {
1209                        metrics.packet_receive_success().increment(1);
1210                    }
1211                    metrics.bytes_received().increment(bytes_read as u64);
1212                    metrics.bytes_received_size().record(bytes_read as f64);
1213                    let origin_detection_failed =
1214                        origin_detection_enabled && bytes_read > 0 && peer_addr.has_process_credential_error();
1215                    if origin_detection_failed && is_connectionless {
1216                        metrics.origin_detection_errors().increment(1);
1217                    }
1218
1219                    // When we're actually at EOF, or we're dealing with a connectionless stream, we try to decode in EOF mode.
1220                    //
1221                    // For connectionless streams, we always try to decode the buffer as if it's EOF, since it effectively _is_
1222                    // always the end of file after a receive. For connection-oriented streams, we only want to do this once we've
1223                    // actually hit true EOF.
1224                    let reached_eof = eof || is_connectionless;
1225
1226                    trace!(
1227                        buffer_len = io_buffer.remaining(),
1228                        buffer_cap = io_buffer.remaining_mut(),
1229                        eof = reached_eof,
1230                        %listen_addr,
1231                        %peer_addr,
1232                        "Received {} bytes from stream.",
1233                        bytes_read
1234                    );
1235
1236                    'frame: loop {
1237                        let frame_result = framer.next_frame(io_buffer, reached_eof);
1238                        let completed_outer_frames = framer.take_completed_outer_frames();
1239                        if !is_connectionless && completed_outer_frames > 0 {
1240                            metrics.packet_receive_success().increment(completed_outer_frames as u64);
1241                        }
1242                        if origin_detection_failed && completed_outer_frames > 0 {
1243                            metrics.origin_detection_errors().increment(completed_outer_frames as u64);
1244                        }
1245
1246                        match frame_result {
1247                            Ok(Some(frame)) => {
1248                                trace!(%listen_addr, %peer_addr, ?frame, "Decoded frame.");
1249                                if let Some(forwarder) = &packet_forwarder {
1250                                    forwarder.forward(frame.clone()).await;
1251                                }
1252                                match handle_frame(&frame[..], &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) {
1253                                    Ok(Some(event)) => {
1254                                        if let Some(event_buffer) = event_buffer_manager.try_push(event) {
1255                                            debug!(%listen_addr, %peer_addr, "Event buffer is full. Forwarding events.");
1256                                            dispatch_events(event_buffer, &source_context, &listen_addr).await;
1257                                        }
1258                                    },
1259                                    Ok(None) => {
1260                                        // We didn't decode an event, but there was no inherent error. This is likely
1261                                        // due to hitting resource limits, etc.
1262                                        //
1263                                        // Simply continue on.
1264                                        continue
1265                                    },
1266                                    Err(e) => {
1267                                        let frame_lossy_str = String::from_utf8_lossy(&frame);
1268                                        warn!(%listen_addr, %peer_addr, frame = %frame_lossy_str, error = %e, "Failed to parse frame.");
1269                                    },
1270                                }
1271                            }
1272                            Err(e) => {
1273                                metrics.framing_errors().increment(1);
1274                                if should_warn_stream_log_too_big(&listen_addr, &e, stream_log_too_big) {
1275                                    warn!(
1276                                        %listen_addr,
1277                                        %peer_addr,
1278                                        error = %e,
1279                                        "DogStatsD stream frame exceeded the configured buffer size."
1280                                    );
1281                                }
1282
1283                                if stream.is_connectionless() {
1284                                    io_buffer.clear();
1285                                    // For connectionless streams, we don't want to shutdown the stream since we can just keep
1286                                    // reading more packets.
1287                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Continuing stream.");
1288                                    continue 'read;
1289                                } else {
1290                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Stopping stream.");
1291                                    break 'read;
1292                                }
1293                            }
1294                            Ok(None) => {
1295                                trace!(%listen_addr, %peer_addr, "Not enough data to decode another frame.");
1296                                if eof && !stream.is_connectionless() {
1297                                    debug!(%listen_addr, %peer_addr, "Stream received EOF. Shutting down handler.");
1298                                    break 'read;
1299                                } else {
1300                                    break 'frame;
1301                                }
1302                            }
1303                        }
1304                    }
1305                },
1306                Err(e) => {
1307                    metrics.packet_receive_failure().increment(1);
1308
1309                    if stream.is_connectionless() {
1310                        // For connectionless streams, we don't want to shutdown the stream since we can just keep
1311                        // reading more packets.
1312                        warn!(%listen_addr, error = %e, "I/O error while decoding. Continuing stream.");
1313                        continue 'read;
1314                    } else {
1315                        warn!(%listen_addr, error = %e, "I/O error while decoding. Stopping stream.");
1316                        break 'read;
1317                    }
1318                }
1319            },
1320
1321            _ = buffer_flush.tick() => {
1322                if let Some(event_buffer) = event_buffer_manager.consume() {
1323                    dispatch_events(event_buffer, &source_context, &listen_addr).await;
1324                }
1325            },
1326
1327        }
1328    }
1329
1330    if let Some(event_buffer) = event_buffer_manager.consume() {
1331        dispatch_events(event_buffer, &source_context, &listen_addr).await;
1332    }
1333
1334    metrics.connections_active().decrement(1);
1335
1336    debug!(%listen_addr, "Stream handler stopped.");
1337}
1338
1339fn should_warn_stream_log_too_big(listen_addr: &ListenAddress, error: &FramingError, stream_log_too_big: bool) -> bool {
1340    stream_log_too_big
1341        && matches!(listen_addr, ListenAddress::Unix(_))
1342        && matches!(error, FramingError::InvalidFrame { .. })
1343}
1344
1345fn capture_uds_traffic(
1346    listen_addr: &ListenAddress, traffic_capture: &TrafficCapture,
1347    capture_entity_resolver: Option<&(dyn CaptureEntityResolver + Send + Sync)>, peer_addr: &ConnectionAddress,
1348    payload: &[u8], stream_capture: &mut StreamCaptureState,
1349) {
1350    if payload.is_empty() || !traffic_capture.is_ongoing() {
1351        return;
1352    }
1353
1354    match listen_addr {
1355        ListenAddress::Unixgram(_) => {
1356            let _ = traffic_capture.enqueue(build_capture_record(
1357                capture_entity_resolver,
1358                process_id_from_peer_addr(peer_addr),
1359                payload,
1360            ));
1361        }
1362        ListenAddress::Unix(_) => {
1363            stream_capture.update_peer_metadata(peer_addr);
1364            stream_capture.pending.extend(payload);
1365
1366            while let Ok(Some(outer_payload)) = stream_capture
1367                .outer_framer
1368                .next_frame(&mut stream_capture.pending, false)
1369            {
1370                let _ = traffic_capture.enqueue(build_capture_record(
1371                    capture_entity_resolver,
1372                    stream_capture.last_pid,
1373                    &outer_payload,
1374                ));
1375            }
1376        }
1377        _ => {}
1378    }
1379}
1380
/// Per-stream state used by `capture_uds_traffic` to record UDS stream traffic as whole outer frames.
struct StreamCaptureState {
    /// Extracts outer length-delimited frames from `pending`.
    outer_framer: LengthDelimitedFramer,
    /// Received bytes buffered until the outer framer can extract complete frames from them.
    pending: VecDeque<u8>,
    /// Most recently observed peer PID; kept when a later read carries no process credentials.
    last_pid: Option<i32>,
}
1386
1387impl StreamCaptureState {
1388    fn new() -> Self {
1389        Self {
1390            outer_framer: LengthDelimitedFramer,
1391            pending: VecDeque::new(),
1392            last_pid: None,
1393        }
1394    }
1395
1396    fn update_peer_metadata(&mut self, peer_addr: &ConnectionAddress) {
1397        if let Some(process_id) = process_id_from_peer_addr(peer_addr) {
1398            self.last_pid = Some(process_id);
1399        }
1400    }
1401}
1402
1403fn build_capture_record(
1404    capture_entity_resolver: Option<&(dyn CaptureEntityResolver + Send + Sync)>, process_id: Option<i32>,
1405    payload: &[u8],
1406) -> CaptureRecord {
1407    CaptureRecord {
1408        timestamp_ns: capture_timestamp_ns(),
1409        payload: payload.to_vec(),
1410        pid: process_id,
1411        ancillary: Vec::new(),
1412        container_id: resolve_capture_container_id(capture_entity_resolver, process_id),
1413    }
1414}
1415
1416fn resolve_capture_container_id(
1417    capture_entity_resolver: Option<&(dyn CaptureEntityResolver + Send + Sync)>, process_id: Option<i32>,
1418) -> Option<String> {
1419    let process_id = u32::try_from(process_id?).ok()?;
1420    capture_entity_resolver
1421        .and_then(|resolver| resolver.resolve_container_entity_for_live_pid(process_id))
1422        .map(|entity_id| entity_id.to_string())
1423}
1424
1425fn process_id_from_peer_addr(peer_addr: &ConnectionAddress) -> Option<i32> {
1426    match peer_addr {
1427        ConnectionAddress::ProcessLike(ProcessIdentity::Credentials(creds)) => Some(creds.pid),
1428        _ => None,
1429    }
1430}
1431
1432/// Applies SCM_CREDENTIALS to the origin, dispatching on the replay marker GID.
1433///
1434/// Live packets carry the sender process's real PID/UID/GID; we use `creds.pid` as the origin's process ID. Replay
1435/// packets carry `gid == REPLAY_CREDENTIALS_GID` and pack the captured (original) PID into `creds.uid`; we recover
1436/// that PID with an internal marker so downstream tag resolution consults the captured tagger store.
1437fn apply_credentials_to_origin(origin: &mut RawOrigin<'_>, creds: &ProcessCredentials) {
1438    if creds.gid == REPLAY_CREDENTIALS_GID {
1439        origin.set_process_id(mark_replay_process_id(creds.uid));
1440    } else {
1441        origin.set_process_id(creds.pid as u32);
1442    }
1443}
1444
1445fn received_payload(buffer: &BytesBuffer, bytes_read: usize) -> &[u8] {
1446    let chunk = buffer.chunk();
1447    let start = chunk.len().saturating_sub(bytes_read);
1448    &chunk[start..]
1449}
1450
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Saturates at `i64::MAX` if the value does not fit. Returns `0` if the system clock reports a time before the epoch.
fn capture_timestamp_ns() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_nanos()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
1457
1458fn handle_frame(
1459    frame: &[u8], codec: &DogStatsDCodec, context_resolvers: &mut ContextResolvers, source_metrics: &Metrics,
1460    peer_addr: &ConnectionAddress, enabled_filter: EnablePayloadsFilter, additional_tags: &[String],
1461) -> Result<Option<Event>, ParseError> {
1462    let parsed = match codec.decode_packet(frame) {
1463        Ok(parsed) => parsed,
1464        Err(e) => {
1465            // Try and determine what the message type was, if possible, to increment the correct error counter.
1466            match parse_message_type(frame) {
1467                MessageType::MetricSample => source_metrics.metric_decode_failed().increment(1),
1468                MessageType::Event => source_metrics.event_decode_failed().increment(1),
1469                MessageType::ServiceCheck => source_metrics.service_check_decode_failed().increment(1),
1470            }
1471
1472            return Err(e);
1473        }
1474    };
1475
1476    let event = match parsed {
1477        ParsedPacket::Metric(metric_packet) => {
1478            if metric_packet.num_points == 0 {
1479                return Ok(None);
1480            }
1481            let events_len = metric_packet.num_points;
1482            if !enabled_filter.allow_metric(&metric_packet) {
1483                trace!(
1484                    metric.name = metric_packet.metric_name,
1485                    "Skipping metric due to filter configuration."
1486                );
1487                return Ok(None);
1488            }
1489
1490            match handle_metric_packet(metric_packet, context_resolvers, peer_addr, additional_tags) {
1491                Some(metric) => {
1492                    source_metrics.metrics_received().increment(events_len);
1493                    Event::Metric(metric)
1494                }
1495                None => {
1496                    // We can only fail to get a metric back if we failed to resolve the context.
1497                    source_metrics.failed_context_resolve_total().increment(1);
1498                    return Ok(None);
1499                }
1500            }
1501        }
1502        ParsedPacket::Event(event) => {
1503            if !enabled_filter.allow_event(&event) {
1504                trace!("Skipping event {} due to filter configuration.", event.title);
1505                return Ok(None);
1506            }
1507            let tags_resolver = context_resolvers.tags();
1508            match handle_event_packet(event, tags_resolver, peer_addr, additional_tags) {
1509                Some(event) => {
1510                    source_metrics.events_received().increment(1);
1511                    Event::EventD(event)
1512                }
1513                None => {
1514                    source_metrics.failed_context_resolve_total().increment(1);
1515                    return Ok(None);
1516                }
1517            }
1518        }
1519        ParsedPacket::ServiceCheck(service_check) => {
1520            if !enabled_filter.allow_service_check(&service_check) {
1521                trace!(
1522                    "Skipping service check {} due to filter configuration.",
1523                    service_check.name
1524                );
1525                return Ok(None);
1526            }
1527            let tags_resolver = context_resolvers.tags();
1528            match handle_service_check_packet(service_check, tags_resolver, peer_addr, additional_tags) {
1529                Some(service_check) => {
1530                    source_metrics.service_checks_received().increment(1);
1531                    Event::ServiceCheck(service_check)
1532                }
1533                None => {
1534                    source_metrics.failed_context_resolve_total().increment(1);
1535                    return Ok(None);
1536                }
1537            }
1538        }
1539    };
1540
1541    Ok(Some(event))
1542}
1543
1544fn handle_metric_packet(
1545    packet: MetricPacket, context_resolvers: &mut ContextResolvers, peer_addr: &ConnectionAddress,
1546    additional_tags: &[String],
1547) -> Option<Metric> {
1548    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1549
1550    let mut origin = origin_from_metric_packet(&packet, &well_known_tags);
1551    if let Some(creds) = peer_addr.process_credentials() {
1552        apply_credentials_to_origin(&mut origin, creds);
1553    }
1554
1555    // Choose the right context resolver based on whether or not this metric is pre-aggregated.
1556    let context_resolver = if packet.timestamp.is_some() {
1557        context_resolvers.no_agg()
1558    } else {
1559        context_resolvers.primary()
1560    };
1561
1562    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1563
1564    // Try to resolve the context for this metric.
1565    match context_resolver.resolve(packet.metric_name, tags, Some(origin)) {
1566        Some(context) => {
1567            let metric_origin = well_known_tags
1568                .jmx_check_name
1569                .map(MetricOrigin::jmx_check)
1570                .unwrap_or_else(MetricOrigin::dogstatsd);
1571            let metadata = MetricMetadata::default()
1572                .with_origin(metric_origin)
1573                .with_hostname(well_known_tags.hostname.map(Arc::from))
1574                .with_unit(packet.unit.map_or_else(MetaString::empty, MetaString::from_static));
1575
1576            Some(Metric::from_parts(context, packet.values, metadata))
1577        }
1578        // We failed to resolve the context, likely due to not having enough interner capacity.
1579        None => None,
1580    }
1581}
1582
1583fn handle_event_packet(
1584    packet: EventPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress, additional_tags: &[String],
1585) -> Option<EventD> {
1586    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1587
1588    let mut origin = origin_from_event_packet(&packet, &well_known_tags);
1589    if let Some(creds) = peer_addr.process_credentials() {
1590        apply_credentials_to_origin(&mut origin, creds);
1591    }
1592    let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1593
1594    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1595    let tags = tags_resolver.create_tag_set(tags)?;
1596
1597    // When no d: field is present, backfill the current time—matching the stock Datadog Agent's
1598    // behavior in pkg/aggregator/aggregator.go (addEvent), which sets e.Ts = time.Now().Unix()
1599    // for any event with Ts == 0.
1600    let timestamp = packet
1601        .timestamp
1602        .or_else(|| SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()));
1603
1604    let eventd = EventD::new(packet.title, packet.text)
1605        .with_timestamp(timestamp)
1606        .with_hostname(packet.hostname.map(|s| s.into()))
1607        .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
1608        .with_alert_type(packet.alert_type)
1609        .with_priority(packet.priority)
1610        // When no source type is provided, default to "api"—the same default the stock Datadog
1611        // Agent applies when serializing DogStatsD events to the intake JSON format. The agent
1612        // groups events by source type name and uses "api" as the key for events without an
1613        // explicit `s:` field. See: pkg/serializer/internal/metrics/events.go (writeItem).
1614        .with_source_type_name(Some(
1615            packet
1616                .source_type_name
1617                .map(|s| s.into())
1618                .unwrap_or_else(|| "api".into()),
1619        ))
1620        .with_alert_type(packet.alert_type)
1621        .with_tags(tags)
1622        .with_origin_tags(origin_tags);
1623
1624    Some(eventd)
1625}
1626
1627fn handle_service_check_packet(
1628    packet: ServiceCheckPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress,
1629    additional_tags: &[String],
1630) -> Option<ServiceCheck> {
1631    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1632
1633    let mut origin = origin_from_service_check_packet(&packet, &well_known_tags);
1634    if let Some(creds) = peer_addr.process_credentials() {
1635        apply_credentials_to_origin(&mut origin, creds);
1636    }
1637    let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1638
1639    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1640    let tags = tags_resolver.create_tag_set(tags)?;
1641
1642    // When no d: field is present, backfill the current time—matching the stock Datadog Agent's
1643    // behavior, which sets the timestamp to time.Now().Unix() for any service check with a zero
1644    // timestamp.
1645    let timestamp = packet
1646        .timestamp
1647        .or_else(|| SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()));
1648
1649    let service_check = ServiceCheck::new(packet.name, packet.status)
1650        .with_timestamp(timestamp)
1651        .with_hostname(packet.hostname.map(|s| s.into()))
1652        .with_tags(tags)
1653        .with_origin_tags(origin_tags)
1654        .with_message(packet.message.map(|s| s.into()));
1655
1656    Some(service_check)
1657}
1658
1659fn get_filtered_tags_iterator<'a>(
1660    raw_tags: RawTags<'a>, additional_tags: &'a [String],
1661) -> impl Iterator<Item = &'a str> + Clone {
1662    // This filters out "well-known" tags from the raw tags in the DogStatsD packet, and then chains on any additional tags
1663    // that were configured on the source.
1664    RawTagsFilter::exclude(raw_tags, WellKnownTagsFilterPredicate).chain(additional_tags.iter().map(|s| s.as_str()))
1665}
1666
/// Forwards a buffer of decoded events to the source's named outputs.
///
/// Routing:
///
/// - EventD events are extracted and sent to the `events` output.
/// - Service checks are extracted and sent to the `service_checks` output.
/// - Whatever remains in `event_buffer` is then dispatched, as-is, to the `metrics` output.
///
/// Each extraction only happens when the buffer reports containing that event type. The remainder is expected to be
/// metrics, but see the TODO in the body for the caveat when a send fails.
///
/// Dispatch failures are logged at error level and are not propagated to the caller.
///
/// # Panics
///
/// Panics if the `events` or `service_checks` output cannot be obtained from the dispatcher.
async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &SourceContext, listen_addr: &ListenAddress) {
    debug!(%listen_addr, events_len = event_buffer.len(), "Forwarding events.");

    // TODO: This is maybe a little dicey, because if we fail to dispatch the events, we may not have iterated over all
    // of them. There might still be eventd events when we get to the service checks point, and eventd events and/or
    // service check events when we get to the metrics point, and so on.
    //
    // There's probably something to be said for erroring out fully if this happens, since we should only fail to
    // dispatch if the downstream component fails entirely. Unless we have a way to restart the component, we're going
    // to continue to fail to dispatch any more events until the process is restarted anyway.

    // Dispatch any eventd events, if present.
    if event_buffer.has_event_type(EventType::EventD) {
        // NOTE(review): assumes `extract` removes matching events from `event_buffer` as the returned iterator is
        // consumed (which is what the TODO above implies) — TODO confirm against `EventsBuffer::extract`.
        let eventd_events = event_buffer.extract(Event::is_eventd);
        if let Err(e) = source_context
            .dispatcher()
            .buffered_named("events")
            .expect("events output should always exist")
            .send_all(eventd_events)
            .await
        {
            error!(%listen_addr, error = %e, "Failed to dispatch eventd events.");
        }
    }

    // Dispatch any service check events, if present.
    if event_buffer.has_event_type(EventType::ServiceCheck) {
        let service_check_events = event_buffer.extract(Event::is_service_check);
        if let Err(e) = source_context
            .dispatcher()
            .buffered_named("service_checks")
            .expect("service checks output should always exist")
            .send_all(service_check_events)
            .await
        {
            error!(%listen_addr, error = %e, "Failed to dispatch service check events.");
        }
    }

    // Finally, if there are events left, they'll be metrics, so dispatch them. The whole buffer is handed over to the
    // `metrics` output in one call rather than being sent event-by-event.
    if !event_buffer.is_empty() {
        if let Err(e) = source_context
            .dispatcher()
            .dispatch_named("metrics", event_buffer)
            .await
        {
            error!(%listen_addr, error = %e, "Failed to dispatch metric events.");
        }
    }
}
1717
/// Returns the read buffer size to use for a configured `dogstatsd_buffer_size`.
///
/// The configured size is padded so that a full UDS stream frame, including its length delimiter, fits in a single
/// buffer. The addition saturates at `usize::MAX`, so an absurdly large configured value cannot wrap around to a tiny
/// buffer (release builds) or overflow-panic (debug builds).
const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
    // Size, in bytes, of the length delimiter that prefixes each frame in UDS stream mode.
    const LENGTH_DELIMITER_SIZE: usize = 4;

    // This is a little goofy, but hear me out:
    //
    // In the Datadog Agent, the UDS listener in stream mode does a standalone socket read to get _just_ the length
    // delimiter, which is 4 bytes. After that, it does a read to get the packet data itself, up to the limit of
    // `dogstatsd_buffer_size`. This means that a _full_ UDS stream packet can be up to `dogstatsd_buffer_size + 4`
    // bytes.
    //
    // This isn't a problem in the Agent due to how it does the reads. It is a problem for us, because we want to get
    // an entire frame in a single buffer for the purpose of decoding the frame. Rather than rewriting our read loop to
    // change the logic depending on UDP/UDS datagram vs UDS stream, we simply increase the buffer size by 4 bytes to
    // account for the length delimiter.
    //
    // We do it this way so that we don't have to change the buffer size in the configuration. If you just ported over
    // a Datadog Agent configuration, the value would otherwise be too small, and vice versa.
    buffer_size.saturating_add(LENGTH_DELIMITER_SIZE)
}
1735
1736#[cfg(test)]
1737mod tests {
1738    use std::{
1739        collections::HashMap,
1740        io::ErrorKind,
1741        net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
1742        path::PathBuf,
1743        sync::{Arc, OnceLock},
1744        time::Duration,
1745    };
1746
1747    use bytes::Bytes;
1748    use bytesize::ByteSize;
1749    use saluki_config::ConfigurationLoader;
1750    use saluki_context::{origin::RawOrigin, ContextResolverBuilder, TagsResolverBuilder};
1751    use saluki_core::{components::ComponentContext, topology::ComponentId};
1752    use saluki_env::workload::{CaptureEntityResolver, EntityId};
1753    use saluki_io::{
1754        deser::codec::dogstatsd::{DogStatsDCodec, DogStatsDCodecConfiguration, ParsedPacket},
1755        net::{ConnectionAddress, ListenAddress, ProcessCredentials, ProcessIdentity},
1756    };
1757    use saluki_metrics::test::TestRecorder;
1758    use serde_json::json;
1759    use stringtheory::MetaString;
1760    use tokio::{net::UdpSocket, sync::mpsc, time::timeout};
1761
1762    use super::{
1763        forwarder::{
1764            ConnectedPacketForwarder, ForwardPacket, PacketForwarder, PacketForwarderTarget, FORWARDER_QUEUE_CAPACITY,
1765        },
1766        handle_metric_packet,
1767        metrics::build_metrics,
1768        resolve_capture_container_id, ContextResolvers, DogStatsDConfiguration, DOGSTATSD_CAPTURE_DIR,
1769        MIN_CAPTURE_DEPTH,
1770    };
1771
1772    const LINUX_EAFNOSUPPORT: i32 = 97;
1773    const MACOS_EAFNOSUPPORT: i32 = 47;
1774
1775    fn is_ipv6_unavailable_error(error: &std::io::Error) -> bool {
1776        matches!(error.kind(), ErrorKind::AddrNotAvailable | ErrorKind::Unsupported)
1777            || matches!(error.raw_os_error(), Some(LINUX_EAFNOSUPPORT | MACOS_EAFNOSUPPORT))
1778    }
1779
1780    #[derive(Default)]
1781    struct CaptureTestEntityResolver {
1782        pid_map: HashMap<u32, EntityId>,
1783    }
1784
1785    impl CaptureTestEntityResolver {
1786        fn with_pid_mapping(process_id: u32, entity_id: EntityId) -> Self {
1787            let mut pid_map = HashMap::new();
1788            pid_map.insert(process_id, entity_id);
1789            Self { pid_map }
1790        }
1791    }
1792
1793    impl CaptureEntityResolver for CaptureTestEntityResolver {
1794        fn resolve_container_entity_for_live_pid(&self, process_id: u32) -> Option<EntityId> {
1795            self.pid_map.get(&process_id).cloned()
1796        }
1797    }
1798
1799    fn packet_forwarder_from_sender(
1800        target_port: u16, packets_tx: mpsc::Sender<ForwardPacket>, metrics: super::metrics::Metrics,
1801    ) -> PacketForwarder {
1802        let mut forwarder =
1803            PacketForwarderTarget::new(MetaString::from_static("127.0.0.1"), target_port).to_forwarder(metrics);
1804        forwarder.connected = Arc::new(OnceLock::from(packets_tx));
1805        forwarder
1806    }
1807
1808    #[test]
1809    fn no_metrics_when_interner_full_allocations_disallowed() {
1810        // We're specifically testing here that when we don't allow outside allocations, we should not be able to
1811        // resolve a context if the interner is full. A no-op interner has the smallest possible size, so that's going
1812        // to assure we can't intern anything... but we also need a string (name or one of the tags) that can't be
1813        // _inlined_ either, since that will get around the interner being full.
1814        //
1815        // We set our metric name to be longer than 31 bytes (the inlining limit) to ensure this.
1816
1817        let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
1818        let tags_resolver = TagsResolverBuilder::for_tests().build();
1819        let context_resolver = ContextResolverBuilder::for_tests()
1820            .with_heap_allocations(false)
1821            .with_tags_resolver(Some(tags_resolver.clone()))
1822            .build();
1823        let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
1824        let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());
1825
1826        let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3";
1827
1828        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
1829            panic!("Failed to parse packet.");
1830        };
1831
1832        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &[]);
1833        assert!(maybe_metric.is_none());
1834    }
1835
1836    #[test]
1837    fn metric_with_additional_tags() {
1838        let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
1839        let tags_resolver = TagsResolverBuilder::for_tests().build();
1840        let context_resolver = ContextResolverBuilder::for_tests()
1841            .with_heap_allocations(false)
1842            .with_tags_resolver(Some(tags_resolver.clone()))
1843            .build();
1844        let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
1845        let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());
1846
1847        let existing_tags = ["tag1:value1", "tag2:value2", "tag3:value3"];
1848        let existing_tags_str = existing_tags.join(",");
1849
1850        let input = format!("test_metric_name:1|c|#{}", existing_tags_str);
1851        let additional_tags = [
1852            "tag4:value4".to_string(),
1853            "tag5:value5".to_string(),
1854            "tag6:value6".to_string(),
1855        ];
1856
1857        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
1858            panic!("Failed to parse packet.");
1859        };
1860        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &additional_tags);
1861        assert!(maybe_metric.is_some());
1862
1863        let metric = maybe_metric.unwrap();
1864        let context = metric.context();
1865
1866        for tag in existing_tags {
1867            assert!(context.tags().has_tag(tag));
1868        }
1869
1870        for tag in additional_tags {
1871            assert!(context.tags().has_tag(tag));
1872        }
1873    }
1874
1875    fn deser_config(json: &str) -> DogStatsDConfiguration {
1876        serde_json::from_str(json).expect("failed to deserialize config")
1877    }
1878
1879    fn udp_listen_address() -> ListenAddress {
1880        ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)))
1881    }
1882
1883    fn tcp_listen_address() -> ListenAddress {
1884        ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)))
1885    }
1886
1887    #[test]
1888    fn interner_size_defaults_to_2mib() {
1889        let config = deser_config("{}");
1890        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(2));
1891    }
1892
1893    #[test]
1894    fn socket_receive_buffer_size_defaults_to_zero() {
1895        let config = deser_config("{}");
1896        assert_eq!(config.socket_receive_buffer_size, 0);
1897    }
1898
1899    #[test]
1900    fn socket_receive_buffer_size_from_config() {
1901        let config = deser_config(r#"{"dogstatsd_so_rcvbuf": 131072}"#);
1902        assert_eq!(config.socket_receive_buffer_size, 131_072);
1903    }
1904
1905    #[test]
1906    fn stream_log_too_big_defaults_to_false() {
1907        let config = deser_config("{}");
1908        assert!(!config.stream_log_too_big);
1909    }
1910
1911    #[test]
1912    fn stream_log_too_big_from_config() {
1913        let config = deser_config(r#"{"dogstatsd_stream_log_too_big": true}"#);
1914        assert!(config.stream_log_too_big);
1915    }
1916
1917    #[test]
1918    fn statsd_forward_defaults_disabled() {
1919        let config = deser_config("{}");
1920        assert!(config.statsd_forward_host.is_none());
1921        assert_eq!(config.statsd_forward_port, 0);
1922        assert!(config.statsd_forward_target().is_none());
1923    }
1924
1925    #[test]
1926    fn statsd_forward_empty_host_disabled() {
1927        let config = deser_config(r#"{"statsd_forward_host": "", "statsd_forward_port": 9125}"#);
1928        assert!(config.statsd_forward_host.is_none());
1929        assert!(config.statsd_forward_target().is_none());
1930    }
1931
1932    #[test]
1933    fn statsd_forward_zero_port_disabled() {
1934        let config = deser_config(r#"{"statsd_forward_host": "127.0.0.1", "statsd_forward_port": 0}"#);
1935        assert_eq!(config.statsd_forward_host.as_deref(), Some("127.0.0.1"));
1936        assert!(config.statsd_forward_target().is_none());
1937    }
1938
1939    #[test]
1940    fn statsd_forward_host_and_port_enabled() {
1941        let config = deser_config(r#"{"statsd_forward_host": "127.0.0.1", "statsd_forward_port": 9125}"#);
1942        let (host, port) = config.statsd_forward_target().expect("forwarding should be enabled");
1943        assert_eq!(host.as_ref(), "127.0.0.1");
1944        assert_eq!(port, 9125);
1945    }
1946
1947    #[test]
1948    fn statsd_forward_invalid_target_still_builds_forwarder_handle() {
1949        let config = deser_config(r#"{"statsd_forward_host": "not a valid host", "statsd_forward_port": 9125}"#);
1950        assert!(config.packet_forwarder_target().is_some());
1951    }
1952
1953    #[tokio::test]
1954    async fn packet_forwarder_sends_payload_bytes() {
1955        let receiver = UdpSocket::bind("127.0.0.1:0").await.expect("receiver should bind");
1956        let receiver_addr = receiver.local_addr().expect("receiver should have an address");
1957        let forwarder = ConnectedPacketForwarder::connect("127.0.0.1", receiver_addr.port())
1958            .await
1959            .expect("forwarder should connect");
1960        let payload = b"daemon:666|g|#sometag1:somevalue1,sometag2:somevalue2";
1961
1962        let recorder = TestRecorder::default();
1963        let _recorder_guard = metrics::set_default_local_recorder(&recorder);
1964        let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
1965        let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
1966        let metrics = build_metrics(&listen_addr, &context);
1967        let (packets_tx, packets_rx) = mpsc::channel(1);
1968        let worker = tokio::spawn(forwarder.run(packets_rx, metrics.clone()));
1969        let packet_forwarder = packet_forwarder_from_sender(receiver_addr.port(), packets_tx, metrics);
1970
1971        packet_forwarder.forward(Bytes::copy_from_slice(payload)).await;
1972
1973        let mut actual = [0u8; 128];
1974        let (received_len, _) = timeout(Duration::from_secs(1), receiver.recv_from(&mut actual))
1975            .await
1976            .expect("receive should not time out")
1977            .expect("receiver should receive payload");
1978
1979        assert_eq!(&actual[..received_len], payload);
1980        assert_eq!(
1981            recorder.counter((
1982                "component_packets_forwarded_total",
1983                &[
1984                    ("component_id", "dogstatsd_test"),
1985                    ("component_type", "source"),
1986                    ("listener_type", "udp"),
1987                    ("state", "ok"),
1988                ]
1989            )),
1990            Some(1)
1991        );
1992        assert_eq!(
1993            recorder.counter((
1994                "component_bytes_forwarded_total",
1995                &[
1996                    ("component_id", "dogstatsd_test"),
1997                    ("component_type", "source"),
1998                    ("listener_type", "udp"),
1999                ]
2000            )),
2001            Some(payload.len() as u64)
2002        );
2003        worker.abort();
2004    }
2005
2006    #[tokio::test]
2007    async fn packet_forwarder_sends_payload_bytes_to_ipv6_target() {
2008        let receiver = match UdpSocket::bind("[::1]:0").await {
2009            Ok(receiver) => receiver,
2010            Err(e) if is_ipv6_unavailable_error(&e) => return,
2011            Err(e) => panic!("receiver should bind: {e}"),
2012        };
2013        let receiver_addr = receiver.local_addr().expect("receiver should have an address");
2014        let forwarder = ConnectedPacketForwarder::connect("::1", receiver_addr.port())
2015            .await
2016            .expect("forwarder should connect");
2017        let payload = b"daemon:666|g|#ip:6";
2018
2019        let recorder = TestRecorder::default();
2020        let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2021        let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2022        let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2023        let metrics = build_metrics(&listen_addr, &context);
2024        let (packets_tx, packets_rx) = mpsc::channel(1);
2025        let worker = tokio::spawn(forwarder.run(packets_rx, metrics.clone()));
2026        let packet_forwarder = packet_forwarder_from_sender(receiver_addr.port(), packets_tx, metrics);
2027
2028        packet_forwarder.forward(Bytes::copy_from_slice(payload)).await;
2029
2030        let mut actual = [0u8; 128];
2031        let (received_len, _) = timeout(Duration::from_secs(1), receiver.recv_from(&mut actual))
2032            .await
2033            .expect("receive should not time out")
2034            .expect("receiver should receive payload");
2035
2036        assert_eq!(&actual[..received_len], payload);
2037        worker.abort();
2038    }
2039
2040    #[tokio::test]
2041    async fn packet_forwarder_waits_when_queue_is_full() {
2042        let recorder = TestRecorder::default();
2043        let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2044        let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2045        let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2046        let metrics = build_metrics(&listen_addr, &context);
2047        let (packets_tx, _packets_rx) = mpsc::channel(FORWARDER_QUEUE_CAPACITY);
2048        let packet_forwarder = packet_forwarder_from_sender(9125, packets_tx, metrics);
2049
2050        for _ in 0..FORWARDER_QUEUE_CAPACITY {
2051            packet_forwarder.forward(Bytes::from_static(b"queued:1|c")).await;
2052        }
2053
2054        assert!(
2055            timeout(
2056                Duration::from_millis(100),
2057                packet_forwarder.forward(Bytes::from_static(b"blocked:1|c")),
2058            )
2059            .await
2060            .is_err(),
2061            "forwarding should wait for queue capacity instead of dropping"
2062        );
2063    }
2064
2065    #[tokio::test]
2066    async fn packet_forwarder_send_error_increments_error_telemetry() {
2067        let recorder = TestRecorder::default();
2068        let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2069        let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2070        let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2071        let metrics = build_metrics(&listen_addr, &context);
2072        let socket = UdpSocket::bind("127.0.0.1:0").await.expect("socket should bind");
2073        let forwarder = ConnectedPacketForwarder {
2074            socket,
2075            target: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9125)),
2076        };
2077        let (packets_tx, packets_rx) = mpsc::channel(1);
2078        let worker = tokio::spawn(forwarder.run(packets_rx, metrics.clone()));
2079        let packet_forwarder = packet_forwarder_from_sender(9125, packets_tx, metrics);
2080
2081        packet_forwarder.forward(Bytes::from_static(b"daemon:666|g")).await;
2082
2083        let deadline = tokio::time::Instant::now() + Duration::from_secs(1);
2084        loop {
2085            if recorder.counter((
2086                "component_packets_forwarded_total",
2087                &[
2088                    ("component_id", "dogstatsd_test"),
2089                    ("component_type", "source"),
2090                    ("listener_type", "udp"),
2091                    ("state", "error"),
2092                ],
2093            )) == Some(1)
2094            {
2095                break;
2096            }
2097
2098            assert!(
2099                tokio::time::Instant::now() < deadline,
2100                "forwarding error telemetry should be recorded"
2101            );
2102            tokio::time::sleep(Duration::from_millis(10)).await;
2103        }
2104        worker.abort();
2105    }
2106
2107    #[test]
2108    fn autoscale_udp_listeners_defaults_to_false() {
2109        let config = deser_config("{}");
2110        assert!(!config.autoscale_udp_listeners);
2111        assert!(config.udp_streams_to_yield().is_none());
2112    }
2113
2114    #[test]
2115    #[cfg(target_os = "linux")]
2116    fn autoscale_udp_listeners_from_config_linux() {
2117        let config = deser_config(r#"{"dogstatsd_autoscale_udp_listeners": true}"#);
2118        assert!(config.autoscale_udp_listeners);
2119
2120        let streams = config
2121            .udp_streams_to_yield()
2122            .expect("autoscale yields at least 1 stream");
2123        let n = streams.get();
2124        assert!(
2125            (1..=4).contains(&n),
2126            "expected 1..=4 streams from vCPU formula, got {n}"
2127        );
2128    }
2129
2130    #[test]
2131    #[cfg(not(target_os = "linux"))]
2132    fn autoscale_udp_listeners_from_config_non_linux() {
2133        let config = deser_config(r#"{"dogstatsd_autoscale_udp_listeners": true}"#);
2134        assert!(config.autoscale_udp_listeners);
2135
2136        assert_eq!(None, config.udp_streams_to_yield());
2137    }
2138
2139    #[test]
2140    fn eol_required_defaults_to_no_listeners() {
2141        let config = deser_config("{}");
2142        let eol_required = config.eol_required();
2143
2144        assert!(!eol_required.for_listener(&udp_listen_address()));
2145        assert!(!eol_required.for_listener(&tcp_listen_address()));
2146    }
2147
2148    #[test]
2149    fn eol_required_matches_configured_listener_types() {
2150        let config = deser_config(r#"{"dogstatsd_eol_required": ["udp", "uds"]}"#);
2151        let eol_required = config.eol_required();
2152
2153        assert!(eol_required.for_listener(&udp_listen_address()));
2154        assert!(!eol_required.for_listener(&tcp_listen_address()));
2155
2156        #[cfg(unix)]
2157        {
2158            assert!(eol_required.for_listener(&ListenAddress::Unixgram("/tmp/dsd.sock".into())));
2159            assert!(eol_required.for_listener(&ListenAddress::Unix("/tmp/dsd-stream.sock".into())));
2160        }
2161    }
2162
2163    #[test]
2164    fn eol_required_accepts_space_separated_string() {
2165        let config = deser_config(r#"{"dogstatsd_eol_required": "udp uds"}"#);
2166        let eol_required = config.eol_required();
2167
2168        assert!(eol_required.for_listener(&udp_listen_address()));
2169    }
2170
2171    #[test]
2172    fn stream_log_too_big_only_warns_for_enabled_unix_invalid_frames() {
2173        let uds_stream = ListenAddress::Unix("/tmp/dsd-stream.sock".into());
2174        let tcp_stream = ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2175        let error = saluki_io::deser::framing::FramingError::InvalidFrame {
2176            frame_len: 8193,
2177            reason: "frame length exceeds buffer capacity",
2178        };
2179
2180        assert!(super::should_warn_stream_log_too_big(&uds_stream, &error, true));
2181        assert!(!super::should_warn_stream_log_too_big(&uds_stream, &error, false));
2182        assert!(!super::should_warn_stream_log_too_big(&tcp_stream, &error, true));
2183    }
2184
2185    #[test]
2186    fn interner_size_from_entry_count() {
2187        // A Core Agent migration config with entry count 4096 should yield 2 MiB, not 4096 bytes.
2188        let config = deser_config(r#"{"dogstatsd_string_interner_size": 4096}"#);
2189        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(2));
2190    }
2191
2192    #[test]
2193    fn interner_size_from_explicit_bytes() {
2194        let config = deser_config(r#"{"dogstatsd_string_interner_size_bytes": 4194304}"#);
2195        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::b(4194304));
2196    }
2197
2198    #[test]
2199    fn interner_size_explicit_bytes_takes_priority() {
2200        let config = deser_config(
2201            r#"{"dogstatsd_string_interner_size": 4096, "dogstatsd_string_interner_size_bytes": 8388608}"#,
2202        );
2203        // The _bytes key (8 MiB) takes priority over the entry count.
2204        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::b(8388608));
2205    }
2206
2207    #[test]
2208    fn interner_size_custom_entry_count() {
2209        let config = deser_config(r#"{"dogstatsd_string_interner_size": 8192}"#);
2210        // 8192 entries * 512 bytes = 4 MiB
2211        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(4));
2212    }
2213
2214    /// Asserts that two lists of ListenAddress are equivalent.
2215    fn address_list_eq(expected: &mut [ListenAddress], actual: &mut [ListenAddress]) -> Result<(), String> {
2216        if expected.len() != actual.len() {
2217            return Err(format!(
2218                "length mismatch: expected {} addresses, got {}",
2219                expected.len(),
2220                actual.len()
2221            ));
2222        }
2223
2224        expected.sort_by_key(|a| a.to_string());
2225        actual.sort_by_key(|a| a.to_string());
2226
2227        for (e, a) in expected.iter().zip(actual.iter()) {
2228            let (es, as_) = (e.to_string(), a.to_string());
2229            if es != as_ {
2230                return Err(format!("address mismatch: expected {}, got {}", es, as_));
2231            }
2232        }
2233
2234        Ok(())
2235    }
2236
2237    /// This test verifies that we didn't accidentally break the `build_addresses_no_listeners` helper function which
2238    /// would render all further tests useless.
2239    #[test]
2240    fn build_addresses_assertion_function_works() {
2241        let config = DogStatsDConfiguration {
2242            port: 0,
2243            tcp_port: 123,
2244            socket_path: None,
2245            socket_stream_path: None,
2246            non_local_traffic: false,
2247            ..Default::default()
2248        };
2249        let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(
2250            // Close, but not quite! This is intentionally *not* 127.0.0.1 to test that the assertion will fail
2251            Ipv4Addr::new(127, 0, 0, 2),
2252            123,
2253        )))];
2254        let mut actual = config.build_addresses(None);
2255        assert!(address_list_eq(&mut expected, &mut actual).is_err())
2256    }
2257
2258    /// With all four listener gates off, `build_addresses` returns an empty Vec.
2259    #[test]
2260    fn build_addresses_no_listeners() {
2261        let config = DogStatsDConfiguration {
2262            port: 0,
2263            tcp_port: 0,
2264            socket_path: None,
2265            socket_stream_path: None,
2266            non_local_traffic: false,
2267            ..Default::default()
2268        };
2269        let mut expected = vec![];
2270        let mut actual = config.build_addresses(None);
2271        address_list_eq(&mut expected, &mut actual).unwrap();
2272    }
2273
2274    /// UDP port set, `non_local_traffic=false` -> UDP listener bound to `127.0.0.1`.
2275    #[test]
2276    fn build_addresses_udp_local_only() {
2277        let config = DogStatsDConfiguration {
2278            port: 8125,
2279            tcp_port: 0,
2280            socket_path: None,
2281            socket_stream_path: None,
2282            non_local_traffic: false,
2283            ..Default::default()
2284        };
2285        let mut expected = vec![ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(
2286            Ipv4Addr::new(127, 0, 0, 1),
2287            8125,
2288        )))];
2289        let mut actual = config.build_addresses(None);
2290        address_list_eq(&mut expected, &mut actual).unwrap();
2291    }
2292
2293    /// UDP port set, `non_local_traffic=true` -> UDP listener bound to `0.0.0.0`.
2294    #[test]
2295    fn build_addresses_udp_non_local_only() {
2296        let config = DogStatsDConfiguration {
2297            port: 8125,
2298            tcp_port: 0,
2299            socket_path: None,
2300            socket_stream_path: None,
2301            non_local_traffic: true,
2302            ..Default::default()
2303        };
2304        let mut expected = vec![ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(
2305            Ipv4Addr::new(0, 0, 0, 0),
2306            8125,
2307        )))];
2308        let mut actual = config.build_addresses(None);
2309        address_list_eq(&mut expected, &mut actual).unwrap();
2310    }
2311
2312    /// TCP port set, `non_local_traffic=false` -> TCP listener bound to `127.0.0.1`.
2313    #[test]
2314    fn build_addresses_tcp_local_only() {
2315        let config = DogStatsDConfiguration {
2316            port: 0,
2317            tcp_port: 9000,
2318            socket_path: None,
2319            socket_stream_path: None,
2320            non_local_traffic: false,
2321            ..Default::default()
2322        };
2323        let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(
2324            Ipv4Addr::new(127, 0, 0, 1),
2325            9000,
2326        )))];
2327        let mut actual = config.build_addresses(None);
2328        address_list_eq(&mut expected, &mut actual).unwrap();
2329    }
2330
2331    /// TCP port set, `non_local_traffic=true` -> TCP listener bound to `0.0.0.0`.
2332    #[test]
2333    fn build_addresses_tcp_non_local_only() {
2334        let config = DogStatsDConfiguration {
2335            port: 0,
2336            tcp_port: 9000,
2337            socket_path: None,
2338            socket_stream_path: None,
2339            non_local_traffic: true,
2340            ..Default::default()
2341        };
2342        let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(
2343            Ipv4Addr::new(0, 0, 0, 0),
2344            9000,
2345        )))];
2346        let mut actual = config.build_addresses(None);
2347        address_list_eq(&mut expected, &mut actual).unwrap();
2348    }
2349
2350    /// `socket_path` set -> a `Unixgram` address is produced with that path.
2351    #[test]
2352    fn build_addresses_unixgram_only() {
2353        let config = DogStatsDConfiguration {
2354            port: 0,
2355            tcp_port: 0,
2356            socket_path: Some("/tmp/dsd.sock".to_string()),
2357            socket_stream_path: None,
2358            non_local_traffic: false,
2359            ..Default::default()
2360        };
2361        let mut expected = vec![ListenAddress::Unixgram("/tmp/dsd.sock".into())];
2362        let mut actual = config.build_addresses(None);
2363        address_list_eq(&mut expected, &mut actual).unwrap();
2364    }
2365
2366    /// `socket_stream_path` set -> a `Unix` (stream) address is produced with that path.
2367    #[test]
2368    fn build_addresses_unix_stream_only() {
2369        let config = DogStatsDConfiguration {
2370            port: 0,
2371            tcp_port: 0,
2372            socket_path: None,
2373            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
2374            non_local_traffic: false,
2375            ..Default::default()
2376        };
2377        let mut expected = vec![ListenAddress::Unix("/tmp/dsd-stream.sock".into())];
2378        let mut actual = config.build_addresses(None);
2379        address_list_eq(&mut expected, &mut actual).unwrap();
2380    }
2381
2382    /// All four listener types enabled at once, with `non_local_traffic=true`.
2383    #[test]
2384    fn build_addresses_all_four_non_local() {
2385        let config = DogStatsDConfiguration {
2386            port: 8125,
2387            tcp_port: 9000,
2388            socket_path: Some("/tmp/dsd.sock".to_string()),
2389            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
2390            non_local_traffic: true,
2391            ..Default::default()
2392        };
2393        let mut expected = vec![
2394            ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 8125))),
2395            ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 9000))),
2396            ListenAddress::Unixgram("/tmp/dsd.sock".into()),
2397            ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
2398        ];
2399        let mut actual = config.build_addresses(None);
2400        address_list_eq(&mut expected, &mut actual).unwrap();
2401    }
2402
2403    /// All four listener types enabled at once, with `non_local_traffic=false`.
2404    #[test]
2405    fn build_addresses_all_four_local() {
2406        let config = DogStatsDConfiguration {
2407            port: 8125,
2408            tcp_port: 9000,
2409            socket_path: Some("/tmp/dsd.sock".to_string()),
2410            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
2411            non_local_traffic: false,
2412            ..Default::default()
2413        };
2414        let mut expected = vec![
2415            ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8125))),
2416            ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 9000))),
2417            ListenAddress::Unixgram("/tmp/dsd.sock".into()),
2418            ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
2419        ];
2420        let mut actual = config.build_addresses(None);
2421        address_list_eq(&mut expected, &mut actual).unwrap();
2422    }
2423
2424    /// Passing `Some(ip)` to `build_addresses` with `non_local_traffic=false` -> both UDP and TCP
2425    /// bind to that IP. Includes a UDS datagram socket to confirm `bind_host` doesn't affect it.
2426    #[test]
2427    fn build_addresses_bind_host_applies_to_udp_and_tcp() {
2428        let config = DogStatsDConfiguration {
2429            port: 8125,
2430            tcp_port: 9000,
2431            socket_path: Some("/tmp/dsd.sock".to_string()),
2432            socket_stream_path: None,
2433            non_local_traffic: false,
2434            ..Default::default()
2435        };
2436        let bind_host = Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 50)));
2437        let mut expected = vec![
2438            ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 50), 8125))),
2439            ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 50), 9000))),
2440            ListenAddress::Unixgram("/tmp/dsd.sock".into()),
2441        ];
2442        let mut actual = config.build_addresses(bind_host);
2443        address_list_eq(&mut expected, &mut actual).unwrap();
2444    }
2445
2446    /// Passing `Some(ip)` to `build_addresses` with `non_local_traffic=true` -> both UDP and TCP
2447    /// bind to `0.0.0.0`; the `bind_host` parameter is ignored (precedence matches the Agent).
2448    /// Includes a UDS stream socket to confirm `bind_host` doesn't affect it.
2449    #[test]
2450    fn build_addresses_non_local_clobbers_bind_host() {
2451        let config = DogStatsDConfiguration {
2452            port: 8125,
2453            tcp_port: 9000,
2454            socket_path: None,
2455            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
2456            non_local_traffic: true,
2457            ..Default::default()
2458        };
2459        let bind_host = Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 50)));
2460        let mut expected = vec![
2461            ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 8125))),
2462            ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 9000))),
2463            ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
2464        ];
2465        let mut actual = config.build_addresses(bind_host);
2466        address_list_eq(&mut expected, &mut actual).unwrap();
2467    }
2468
2469    #[test]
2470    fn non_finite_metric_values_are_silently_dropped() {
2471        // The Datadog Agent sends NaN gauges (for example, encode_ms.avg computed as 0.0/0.0 in Go).
2472        // FloatIter skips non-finite values with a debug log, so decode_packet returns Ok with
2473        // num_points == 0. handle_frame then returns Ok(None) for zero-point packets, which is
2474        // the existing silent-drop path (no warning emitted).
2475        let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
2476        for input in &[b"my.gauge:NaN|g" as &[u8], b"my.gauge:inf|g", b"my.gauge:-inf|g"] {
2477            match codec.decode_packet(input).expect("should decode without error") {
2478                ParsedPacket::Metric(packet) => assert_eq!(
2479                    packet.num_points, 0,
2480                    "non-finite value should be dropped, leaving 0 valid points"
2481                ),
2482                _ => panic!("expected Metric packet"),
2483            }
2484        }
2485    }
2486
2487    #[tokio::test]
2488    async fn fix_empty_capture_path_sets_path_from_run_path() {
2489        const RUN_PATH: &str = "/my/little/run_path";
2490
2491        let base_config_values = json!({ "run_path": RUN_PATH });
2492        let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await;
2493
2494        let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize");
2495
2496        let expected = PathBuf::from(RUN_PATH).join(DOGSTATSD_CAPTURE_DIR);
2497        assert_eq!(expected, dogstatsd_config.capture_path);
2498    }
2499
2500    #[tokio::test]
2501    async fn fix_empty_capture_path_keeps_explicit_path() {
2502        const RUN_PATH: &str = "/my/little/run_path";
2503        const CAPTURE_PATH: &str = "/custom/path/to/capture";
2504
2505        let base_config_values = json!({ "run_path": RUN_PATH, "dogstatsd_capture_path": CAPTURE_PATH });
2506        let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await;
2507
2508        let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize");
2509
2510        assert_eq!(PathBuf::from(CAPTURE_PATH), dogstatsd_config.capture_path);
2511    }
2512
2513    #[tokio::test]
2514    async fn from_configuration_normalizes_capture_depth() {
2515        let cases = [
2516            (json!({}), MIN_CAPTURE_DEPTH),
2517            (json!({ "dogstatsd_capture_depth": 0 }), MIN_CAPTURE_DEPTH),
2518            (json!({ "dogstatsd_capture_depth": 2048 }), 2048),
2519        ];
2520
2521        for (base_config_values, expected_depth) in cases {
2522            let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await;
2523            let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize");
2524
2525            assert_eq!(expected_depth, dogstatsd_config.capture_depth);
2526        }
2527    }
2528
2529    #[test]
2530    fn capture_entity_resolver_is_configured_separately_from_workload_provider() {
2531        let config =
2532            DogStatsDConfiguration::default().with_capture_entity_resolver(CaptureTestEntityResolver::default());
2533
2534        assert!(config.capture_entity_resolver.is_some());
2535        assert!(config.workload_provider.is_none());
2536    }
2537
2538    #[test]
2539    fn resolve_capture_container_id_uses_live_pid_mapping() {
2540        let capture_entity_resolver = CaptureTestEntityResolver::with_pid_mapping(
2541            42,
2542            EntityId::from_local_data("ci-pid-container").expect("container entity"),
2543        );
2544
2545        assert_eq!(
2546            resolve_capture_container_id(Some(&capture_entity_resolver), Some(42)),
2547            Some("container_id://pid-container".to_string())
2548        );
2549    }
2550
2551    #[test]
2552    fn build_capture_record_ignores_payload_local_data() {
2553        let record = super::build_capture_record(None, None, b"test.metric:1|c|c:ci-local-container\n");
2554
2555        assert_eq!(record.container_id, None);
2556        assert!(record.ancillary.is_empty());
2557    }
2558
2559    #[test]
2560    fn stream_capture_state_preserves_last_pid_without_new_creds() {
2561        let mut stream_capture = super::StreamCaptureState::new();
2562
2563        stream_capture.update_peer_metadata(&ConnectionAddress::ProcessLike(ProcessIdentity::Credentials(
2564            ProcessCredentials {
2565                pid: 42,
2566                uid: 0,
2567                gid: 0,
2568            },
2569        )));
2570        stream_capture.update_peer_metadata(&ConnectionAddress::ProcessLike(ProcessIdentity::Unavailable));
2571
2572        assert_eq!(stream_capture.last_pid, Some(42));
2573    }
2574
2575    #[test]
2576    fn apply_credentials_uses_live_pid_for_normal_packet() {
2577        let mut origin = RawOrigin::default();
2578        let creds = ProcessCredentials {
2579            pid: 12345,
2580            uid: 1000,
2581            gid: 1000,
2582        };
2583        super::apply_credentials_to_origin(&mut origin, &creds);
2584
2585        assert_eq!(origin.process_id(), Some(12345));
2586    }
2587
2588    #[test]
2589    fn apply_credentials_unpacks_captured_pid_when_replay_gid_present() {
2590        let mut origin = RawOrigin::default();
2591        let captured_pid: u32 = 99887766;
2592        let creds = ProcessCredentials {
2593            pid: 12345,        // our PID (irrelevant for replay)
2594            uid: captured_pid, // captured PID packed by the sender
2595            gid: super::REPLAY_CREDENTIALS_GID,
2596        };
2597        super::apply_credentials_to_origin(&mut origin, &creds);
2598
2599        assert_eq!(
2600            origin.process_id(),
2601            Some(super::origin::mark_replay_process_id(captured_pid))
2602        );
2603    }
2604}
2605
#[cfg(test)]
mod config_smoke {
    //! Smoke tests checking that the registered DogStatsD configuration schema deserializes into
    //! `DogStatsDConfiguration`.

    use serde_json::json;

    use super::DogStatsDConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Runs the shared config smoke-test harness against the DogStatsD configuration struct.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DOGSTATSD_CONFIGURATION, &[], json!({}), |generic| {
            generic
                .as_typed::<DogStatsDConfiguration>()
                .expect("DogStatsDConfiguration should deserialize")
        })
        .await
    }
}