Skip to main content

saluki_components/sources/dogstatsd/
mod.rs

1//! DogStatsD source.
2//!
3//! # Missing
4//!
5//! - Create a health handle for each listener.
6//! - Handle UDS stream framing without treating EOF the same way as UDP and UDS datagram framing.
7//! - Track dispatch failures without depending on whether all events were already iterated.
8
9use std::{
10    collections::VecDeque,
11    num::NonZeroUsize,
12    path::PathBuf,
13    sync::{Arc, LazyLock},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15};
16
17use async_trait::async_trait;
18use bytes::{Buf, BufMut};
19use bytesize::ByteSize;
20use resource_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
21use saluki_common::{
22    sync::shutdown::{ShutdownCoordinator, ShutdownHandle},
23    task::spawn_traced_named,
24};
25use saluki_config::{deserialize_space_separated_or_seq, GenericConfiguration};
26use saluki_context::{
27    origin::RawOrigin,
28    tags::{RawTags, RawTagsFilter},
29    TagsResolver,
30};
31use saluki_core::data_model::event::{
32    eventd::EventD,
33    metric::{Metric, MetricMetadata, MetricOrigin},
34    service_check::ServiceCheck,
35    Event, EventType,
36};
37use saluki_core::{
38    components::{sources::*, ComponentContext},
39    pooling::FixedSizeObjectPool,
40    topology::{interconnect::EventBufferManager, EventsBuffer, OutputDefinition},
41};
42use saluki_env::{workload::CaptureEntityResolver, WorkloadProvider};
43use saluki_error::{generic_error, ErrorContext as _, GenericError};
44use saluki_io::{
45    buf::{BytesBuffer, ClearableIoBuffer as _, FixedSizeVec},
46    deser::{
47        codec::dogstatsd::*,
48        framing::{Framer as _, FramingError, LengthDelimitedFramer},
49    },
50    net::{
51        listener::{Listener, ListenerError},
52        ConnectionAddress, ListenAddress, ProcessCredentials, ProcessIdentity, Stream,
53    },
54};
55use serde::{Deserialize, Deserializer};
56use serde_with::{serde_as, NoneAsEmptyString};
57use snafu::{ResultExt as _, Snafu};
58use stringtheory::MetaString;
59use tokio::{
60    pin, select,
61    time::{interval, MissedTickBehavior},
62};
63use tracing::{debug, error, info, trace, warn};
64
65mod forwarder;
66use self::forwarder::{PacketForwarder, PacketForwarderTarget};
67
68mod framer;
69use self::framer::{get_framer, DsdFramer};
70use crate::sources::dogstatsd::tags::{WellKnownTags, WellKnownTagsFilterPredicate};
71
72mod filters;
73use self::filters::EnablePayloadsFilter;
74
75mod io_buffer;
76use self::io_buffer::IoBufferManager;
77
78mod metrics;
79use self::metrics::{build_metrics, Metrics};
80
81mod replay;
82use self::replay::{CaptureRecord, CapturedTaggerHandle, TrafficCapture};
83pub use self::replay::{
84    DogStatsDCaptureAPIHandler, DogStatsDCaptureControl, DogStatsDReplayAPIHandler, DogStatsDReplayControl,
85    ReplaySession, TimestampResolution, TrafficCaptureReader, DEFAULT_REPLAY_LOOPS, REPLAY_CREDENTIALS_GID,
86};
87
88mod origin;
89use self::origin::{
90    mark_replay_process_id, origin_from_event_packet, origin_from_metric_packet, origin_from_service_check_packet,
91    DogStatsDOriginTagResolver, OriginEnrichmentConfiguration,
92};
93
94mod resolver;
95use self::resolver::ContextResolvers;
96
97mod tags;
98
99#[derive(Debug, Snafu)]
100#[snafu(context(suffix(false)))]
101enum Error {
102    #[snafu(display("Failed to create {} listener: {}", listener_type, source))]
103    FailedToCreateListener {
104        listener_type: &'static str,
105        source: ListenerError,
106    },
107
108    #[snafu(display("No listeners configured. Please specify a port (`dogstatsd_port`) or a socket path (`dogstatsd_socket` or `dogstatsd_stream_socket`) to enable a listener."))]
109    NoListenersConfigured,
110
111    #[snafu(display("Could not resolve bind_host '{}': {}", host, source))]
112    UnresolvableBindHost { host: String, source: std::io::Error },
113
114    #[snafu(display("bind_host '{}' resolved to zero IP addresses.", host))]
115    BindHostHasNoAddresses { host: String },
116}
117
/// Bytes budgeted per interner entry when translating the Core Agent's entry-count based
/// `dogstatsd_string_interner_size` setting into a byte size.
///
/// With the default entry count this works out to 4096 × 512 bytes = 2 MiB, the size ADP used previously.
const INTERNER_BASELINE_BYTES_PER_ENTRY: u64 = 512;

/// Default size of each receive buffer: 8 KiB.
const fn default_buffer_size() -> usize {
    8 * 1024
}

/// Default number of receive buffers allocated overall.
const fn default_buffer_count() -> usize {
    128
}

/// Default UDP port.
const fn default_port() -> u16 {
    8125
}

/// Default TCP port; `0` leaves TCP disabled.
const fn default_tcp_port() -> u16 {
    0
}

/// Default forwarding port; `0` leaves packet forwarding disabled.
const fn default_statsd_forward_port() -> u16 {
    0
}

/// Default socket receive buffer size; `0` means "use the OS default".
const fn default_socket_receive_buffer_size() -> usize {
    0
}

/// Context heap allocations are allowed by default.
const fn default_allow_context_heap_allocations() -> bool {
    default_true()
}

/// No-aggregation pipeline support is enabled by default.
const fn default_no_aggregation_pipeline_support() -> bool {
    default_true()
}

/// Default interner entry count, expressed in the Core Agent's units.
const fn default_context_string_interner_entry_count() -> u64 {
    4096
}

/// Default upper bound on cached contexts.
const fn default_cached_contexts_limit() -> usize {
    500_000
}

/// Default upper bound on cached tagsets.
const fn default_cached_tagsets_limit() -> usize {
    500_000
}

/// Default context cache expiry, in seconds.
const fn default_context_expiry_seconds() -> u64 {
    20
}

/// Permissive decoding is enabled by default.
const fn default_dogstatsd_permissive_decoding() -> bool {
    default_true()
}

/// Default lower bound for metric sample rates (roughly 260M equivalent samples).
const fn default_dogstatsd_minimum_sample_rate() -> f64 {
    3.845e-9
}

/// Shared `true` default for boolean serde fields.
const fn default_true() -> bool {
    true
}
183
184/// Controls which payload types are forwarded to the backend.
185#[derive(Deserialize)]
186#[cfg_attr(test, derive(PartialEq, serde::Serialize))]
187pub struct EnablePayloadsConfiguration {
188    /// Whether or not to enable sending series (counter/gauge/rate) payloads.
189    ///
190    /// Defaults to `true`.
191    #[serde(default = "default_true")]
192    pub series: bool,
193
194    /// Whether or not to enable sending sketch (distribution) payloads.
195    ///
196    /// Defaults to `true`.
197    #[serde(default = "default_true")]
198    pub sketches: bool,
199
200    /// Whether or not to enable sending event payloads.
201    ///
202    /// Defaults to `true`.
203    #[serde(default = "default_true")]
204    pub events: bool,
205
206    /// Whether or not to enable sending service check payloads.
207    ///
208    /// Defaults to `true`.
209    #[serde(default = "default_true")]
210    pub service_checks: bool,
211}
212
213impl Default for EnablePayloadsConfiguration {
214    fn default() -> Self {
215        Self {
216            series: true,
217            sketches: true,
218            events: true,
219            service_checks: true,
220        }
221    }
222}
223
224const MIN_CAPTURE_DEPTH: usize = 1024;
225
226const fn default_capture_depth() -> usize {
227    MIN_CAPTURE_DEPTH
228}
229
230const DOGSTATSD_CAPTURE_DIR: &str = "dsd_capture";
231
232fn deserialize_empty_metastring_as_none<'de, D>(deserializer: D) -> Result<Option<MetaString>, D::Error>
233where
234    D: Deserializer<'de>,
235{
236    let value = Option::<MetaString>::deserialize(deserializer)?;
237    Ok(value.filter(|host| !host.is_empty()))
238}
239
240/// DogStatsD source.
241///
242/// Accepts metrics over TCP, UDP, or Unix Domain Sockets in the StatsD/DogStatsD format.
243#[serde_as]
244#[derive(Deserialize, Default)]
245#[cfg_attr(test, derive(derive_where::DeriveWhere, serde::Serialize))]
246#[cfg_attr(test, derive_where(PartialEq))]
247pub struct DogStatsDConfiguration {
248    /// The size of the buffer used to receive messages into, in bytes.
249    ///
250    /// Payloads can't exceed this size, or they will be truncated, leading to discarded messages.
251    ///
252    /// Defaults to 8192 bytes.
253    #[serde(rename = "dogstatsd_buffer_size", default = "default_buffer_size")]
254    buffer_size: usize,
255
256    /// The number of message buffers to allocate overall.
257    ///
258    /// This represents the maximum number of message buffers available for processing incoming metrics, which loosely
259    /// correlates with how many messages can be received per second. The default value should be suitable for the
260    /// majority of workloads, but high-throughput workloads may consider increasing this value.
261    ///
262    /// Defaults to 128.
263    #[serde(rename = "dogstatsd_buffer_count", default = "default_buffer_count")]
264    buffer_count: usize,
265
266    /// The port to listen on in UDP mode.
267    ///
268    /// If set to `0`, UDP isn't used.
269    ///
270    /// Defaults to 8125.
271    #[serde(rename = "dogstatsd_port", default = "default_port")]
272    port: u16,
273
274    /// The size of the DogStatsD UDP/UDS socket receive buffer, in bytes.
275    ///
276    /// If set to `0`, the OS default is used.
277    ///
278    /// Defaults to 0.
279    #[serde(rename = "dogstatsd_so_rcvbuf", default = "default_socket_receive_buffer_size")]
280    socket_receive_buffer_size: usize,
281
282    /// The port to listen on in TCP mode.
283    ///
284    /// If set to `0`, TCP isn't used.
285    ///
286    /// Defaults to 0.
287    #[serde(rename = "dogstatsd_tcp_port", default = "default_tcp_port")]
288    tcp_port: u16,
289
290    /// The host to forward framed DogStatsD messages to over UDP.
291    ///
292    /// Forwarding is enabled only when this value is non-empty and `statsd_forward_port` is non-zero. Setup failures
293    /// are logged, and send failures are tracked through telemetry.
294    ///
295    /// Defaults to unset.
296    #[serde(
297        rename = "statsd_forward_host",
298        default,
299        deserialize_with = "deserialize_empty_metastring_as_none"
300    )]
301    statsd_forward_host: Option<MetaString>,
302
303    /// The port to forward framed DogStatsD messages to over UDP.
304    ///
305    /// Forwarding is enabled only when this value is non-zero and `statsd_forward_host` is non-empty.
306    ///
307    /// Defaults to 0.
308    #[serde(rename = "statsd_forward_port", default = "default_statsd_forward_port")]
309    statsd_forward_port: u16,
310
311    /// The Unix domain socket path to listen on, in datagram mode.
312    ///
313    /// If not set, UDS (in datagram mode) isn't used.
314    ///
315    /// Defaults to unset.
316    #[serde(rename = "dogstatsd_socket", default)]
317    #[serde_as(as = "NoneAsEmptyString")]
318    socket_path: Option<String>,
319
320    /// The Unix domain socket path to listen on, in stream mode.
321    ///
322    /// If not set, UDS (in stream mode) isn't used.
323    ///
324    /// Defaults to unset.
325    #[serde(rename = "dogstatsd_stream_socket", default)]
326    #[serde_as(as = "NoneAsEmptyString")]
327    socket_stream_path: Option<String>,
328
329    /// Controls whether ADP logs oversized DogStatsD stream frames.
330    ///
331    /// When set to `true`, ADP emits a warning when a UDS stream frame exceeds the
332    /// configured DogStatsD buffer size. The frame is still rejected either way.
333    ///
334    /// Enable this when diagnosing clients that send oversized UDS stream frames.
335    ///
336    /// Defaults to `false`.
337    #[serde(rename = "dogstatsd_stream_log_too_big", default)]
338    stream_log_too_big: bool,
339
340    /// Whether ADP lowers DogStatsD parse-failure logs to debug level.
341    ///
342    /// When set to `true`, invalid metrics, events, and service checks still increment decode-failure telemetry, but
343    /// their parse-failure logs are emitted at debug level instead of warning level. Enable this to suppress noisy
344    /// parse-error logs from misbehaving clients.
345    ///
346    /// Defaults to `false`.
347    #[serde(rename = "dogstatsd_disable_verbose_logs", default)]
348    disable_verbose_logs: bool,
349
350    /// Listener types that require DogStatsD messages to be newline-terminated.
351    ///
352    /// Valid values are `udp`, `uds`, and `named_pipe`. ADP accepts `named_pipe` for compatibility, but it has no effect
353    /// until named pipe listeners are supported. Invalid values are ignored.
354    ///
355    /// Enable this when DogStatsD clients must reject packets or stream frames that don't end with a newline.
356    ///
357    /// Defaults to unset, which accepts the final message without a newline.
358    #[serde(
359        rename = "dogstatsd_eol_required",
360        default,
361        deserialize_with = "deserialize_space_separated_or_seq"
362    )]
363    eol_required: Vec<String>,
364
365    /// The host address to bind DogStatsD UDP and TCP listeners to.
366    ///
367    /// When set, UDP and TCP listeners bind to this address. Accepts either an IP literal (for example,
368    /// `192.168.1.50`, `::1`) or a hostname that resolves via DNS (for example, `agent.internal`).
369    /// Ignored when `dogstatsd_non_local_traffic` is `true`.
370    ///
371    /// Defaults to unset, which binds to `127.0.0.1`.
372    #[serde(rename = "bind_host", default)]
373    #[serde_as(as = "NoneAsEmptyString")]
374    bind_host: Option<String>,
375
376    /// Whether or not to listen for non-local traffic in UDP mode.
377    ///
378    /// If set to `true`, the listener will accept packets from any interface/address. Otherwise, the source will only
379    /// listen on the address specified by `bind_host`, or `127.0.0.1` if `bind_host` isn't set.
380    ///
381    /// Defaults to `false`.
382    #[serde(rename = "dogstatsd_non_local_traffic", default)]
383    non_local_traffic: bool,
384
385    /// Whether to autoscale UDP stream handlers using `SO_REUSEPORT`.
386    ///
387    /// When enabled on Linux, the DogStatsD source binds multiple UDP sockets to the configured port with
388    /// `SO_REUSEPORT`, allowing the kernel to load-balance incoming datagrams across independent stream handler
389    /// tasks. The number of sockets scales with available vCPUs: one stream handler base, plus one additional
390    /// per 8 vCPUs, capped at 4 total.
391    ///
392    /// Has no effect on non-Linux platforms because `SO_REUSEPORT` doesn't provide kernel-level load balancing
393    /// there; a warning is logged at startup if enabled outside of Linux.
394    ///
395    /// Enable this on multi-vCPU Linux deployments where UDP DogStatsD throughput is bottlenecked on a single
396    /// receive task.
397    ///
398    /// Defaults to `false`.
399    #[serde(rename = "dogstatsd_autoscale_udp_listeners", default)]
400    autoscale_udp_listeners: bool,
401
402    /// Whether or not to allow heap allocations when resolving contexts.
403    ///
404    /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The
405    /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full.
406    /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to
407    /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags can't be
408    /// interned, the metric is skipped.
409    ///
410    /// Defaults to `true`.
411    #[serde(
412        rename = "dogstatsd_allow_context_heap_allocs",
413        default = "default_allow_context_heap_allocations"
414    )]
415    allow_context_heap_allocations: bool,
416
417    /// Whether or not to enable support for no-aggregation pipelines.
418    ///
419    /// When enabled, this influences how metrics are parsed, specifically around user-provided metric timestamps. When
420    /// metric timestamps are present, it's used as a signal to any aggregation transforms that the metric shouldn't
421    /// be aggregated.
422    ///
423    /// Defaults to `true`.
424    #[serde(
425        rename = "dogstatsd_no_aggregation_pipeline",
426        default = "default_no_aggregation_pipeline_support"
427    )]
428    no_aggregation_pipeline_support: bool,
429
430    /// Number of entries for the string interner, as interpreted by the Core Datadog Agent.
431    ///
432    /// When `dogstatsd_string_interner_size_bytes` isn't set, this value is multiplied by 512 bytes per entry to
433    /// derive the interner byte size. This provides backwards compatibility for customers migrating configurations
434    /// from the Core Agent, where this setting represents an entry count rather than a byte size.
435    ///
436    /// Defaults to 4096 entries, which yields 2 MiB when converted.
437    #[serde(
438        rename = "dogstatsd_string_interner_size",
439        default = "default_context_string_interner_entry_count"
440    )]
441    context_string_interner_entry_count: u64,
442
443    /// Total size of the string interner used for contexts, in bytes.
444    ///
445    /// When set, this takes priority over `dogstatsd_string_interner_size`. This controls the amount of memory that
446    /// can be used to intern metric names and tags. If the interner is full, metrics with contexts that haven't
447    /// already been resolved may or may not be dropped, depending on the value of `allow_context_heap_allocations`.
448    #[serde(rename = "dogstatsd_string_interner_size_bytes", default)]
449    context_string_interner_size_bytes: Option<ByteSize>,
450
451    /// The maximum number of cached contexts to allow.
452    ///
453    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit doesn't affect
454    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
455    /// and whether or not heap allocations are allowed.
456    ///
457    /// Defaults to 500,000.
458    #[serde(
459        rename = "dogstatsd_cached_contexts_limit",
460        default = "default_cached_contexts_limit"
461    )]
462    cached_contexts_limit: usize,
463
464    /// The maximum number of cached tagsets to allow.
465    ///
466    /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit doesn't affect
467    /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity
468    /// and whether or not heap allocations are allowed.
469    ///
470    /// Defaults to 500,000.
471    #[serde(rename = "dogstatsd_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
472    cached_tagsets_limit: usize,
473
474    /// The number of seconds after which cached contexts will expire.
475    ///
476    /// Higher values allow for more effective caching for sparse metrics at the cost of increased memory usage.
477    ///
478    /// Defaults to 20 seconds.
479    #[serde(
480        rename = "dogstatsd_context_expiry_seconds",
481        default = "default_context_expiry_seconds"
482    )]
483    context_expiry_seconds: u64,
484
485    /// Whether or not to enable permissive mode in the decoder.
486    ///
487    /// Permissive mode allows the decoder to relax its strictness around the allowed payloads, which lets it match the
488    /// decoding behavior of the Datadog Agent.
489    ///
490    /// Defaults to `true`.
491    #[serde(
492        rename = "dogstatsd_permissive_decoding",
493        default = "default_dogstatsd_permissive_decoding"
494    )]
495    permissive_decoding: bool,
496
497    /// The minimum sample rate allowed for metrics.
498    ///
499    /// When metrics are sent with a sample rate _lower_ than this value then it will be clamped to this value. This is
500    /// done in order to ensure an upper bound on how many equivalent samples are tracked for the metric, as high sample
501    /// rates (very small numbers, such as `0.00000001`) can lead to large memory growth.
502    ///
503    /// A warning log will be emitted when clamping occurs, as this represents an effective loss of metric samples.
504    ///
505    /// Defaults to `0.000000003845`. (~260M samples)
506    #[serde(
507        rename = "dogstatsd_minimum_sample_rate",
508        default = "default_dogstatsd_minimum_sample_rate"
509    )]
510    minimum_sample_rate: f64,
511
512    /// Which payload types to forward to the backend.
513    #[serde(rename = "enable_payloads", default)]
514    enable_payloads: EnablePayloadsConfiguration,
515
516    /// Configuration related to origin detection and enrichment.
517    #[serde(flatten, default)]
518    origin_enrichment: OriginEnrichmentConfiguration,
519
520    /// Workload provider to utilize for origin detection/enrichment.
521    #[serde(skip)]
522    #[cfg_attr(test, derive_where(skip))]
523    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
524
525    /// Resolver to use for mapping live sender PIDs to container entities during traffic capture.
526    #[serde(skip, default)]
527    #[cfg_attr(test, derive_where(skip))]
528    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
529
530    /// Additional tags to add to all metrics.
531    #[serde(rename = "dogstatsd_tags", default)]
532    additional_tags: Vec<String>,
533
534    /// The directory where DogStatsD capture files are written by default.
535    ///
536    /// When set to an empty path, the source attempts to derive the directory from `run_path` by appending
537    /// `dsd_capture`. If neither value is available, callers must provide an explicit capture path when starting a
538    /// capture session.
539    ///
540    /// Defaults to empty.
541    #[serde(rename = "dogstatsd_capture_path", default)]
542    capture_path: PathBuf,
543
544    /// The maximum number of captured packets that can be queued for persistence.
545    ///
546    /// This controls the depth of the in-process capture queue. Values below `1024` are raised to `1024` before the
547    /// capture writer starts, preventing a zero-depth rendezvous channel from serializing DogStatsD stream handlers
548    /// behind capture persistence.
549    ///
550    /// Defaults to `1024`.
551    #[serde(rename = "dogstatsd_capture_depth", default = "default_capture_depth")]
552    capture_depth: usize,
553
554    #[serde(skip, default)]
555    #[cfg_attr(test, derive_where(skip))]
556    capture_control: DogStatsDCaptureControl,
557
558    #[serde(skip, default)]
559    #[cfg_attr(test, derive_where(skip))]
560    replay_control: DogStatsDReplayControl,
561
562    /// Provider kind tag appended to all metrics as `provider_kind:<value>`.
563    ///
564    /// Set via `DD_PROVIDER_KIND` by the Helm chart on GKE Autopilot (`gke-autopilot`) and GKE on
565    /// Google Distributed Cloud (`gke-gdc`). When empty or absent, no tag is added.
566    ///
567    /// Defaults to `""` (disabled).
568    #[serde(default)]
569    provider_kind: String,
570}
571
572#[derive(Clone, Copy, Default)]
573struct EolRequired {
574    udp: bool,
575    uds: bool,
576}
577
578impl EolRequired {
579    fn from_config_values(values: &[String]) -> Self {
580        let mut eol_required = Self::default();
581
582        for value in values {
583            match value.as_str() {
584                "udp" => eol_required.udp = true,
585                "uds" => eol_required.uds = true,
586                "named_pipe" => {}
587                _ => warn!(
588                    value,
589                    "Invalid dogstatsd_eol_required value. Expected 'udp', 'uds', or 'named_pipe'."
590                ),
591            }
592        }
593
594        eol_required
595    }
596
597    fn for_listener(&self, listen_addr: &ListenAddress) -> bool {
598        match listen_addr {
599            ListenAddress::Udp(_) => self.udp,
600            ListenAddress::Tcp(_) => false,
601            ListenAddress::Unixgram(_) | ListenAddress::Unix(_) => self.uds,
602        }
603    }
604}
605
606/// Resolves a `bind_host` string to an `IpAddr`.
607///
608/// Accepts either an IP literal (no DNS required) or a hostname (resolved via async DNS). Returns
609/// `UnresolvableBindHost` if the lookup fails, or `BindHostHasNoAddresses` if it succeeds but
610/// returns no addresses.
611async fn resolve_bind_host(host: &str) -> Result<std::net::IpAddr, Error> {
612    let mut addrs = tokio::net::lookup_host((host, 0u16))
613        .await
614        .context(UnresolvableBindHost { host: host.to_string() })?;
615    addrs
616        .next()
617        .map(|sa| sa.ip())
618        .ok_or_else(|| Error::BindHostHasNoAddresses { host: host.to_string() })
619}
620
621impl DogStatsDConfiguration {
622    /// Creates a new `DogStatsDConfiguration` from the given configuration.
623    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
624        let mut dogstatsd_config: Self = config.as_typed()?;
625        dogstatsd_config.fix_empty_capture_path(config);
626        dogstatsd_config.fix_capture_depth();
627        Ok(dogstatsd_config)
628    }
629
630    /// Gets both the `additional_tags` and any others specified by other configuration fields, such as `provider_kind`.
631    fn additional_tags(&self) -> Vec<String> {
632        if self.provider_kind.is_empty() {
633            return self.additional_tags.clone();
634        }
635
636        let mut tags = self.additional_tags.clone();
637        tags.push(format!("provider_kind:{}", self.provider_kind.clone()));
638        tags
639    }
640
641    fn fix_capture_depth(&mut self) {
642        self.capture_depth = self.capture_depth.max(MIN_CAPTURE_DEPTH);
643    }
644
645    /// Returns the effective string interner size in bytes.
646    ///
647    /// If `dogstatsd_string_interner_size_bytes` is set, it's used directly. Otherwise,
648    /// `dogstatsd_string_interner_size` (an entry count) is multiplied by 512 bytes per entry to derive the byte
649    /// size.
650    fn effective_context_string_interner_bytes(&self) -> ByteSize {
651        match self.context_string_interner_size_bytes {
652            Some(explicit_bytes) => explicit_bytes,
653            None => ByteSize::b(self.context_string_interner_entry_count * INTERNER_BASELINE_BYTES_PER_ENTRY),
654        }
655    }
656
657    fn eol_required(&self) -> EolRequired {
658        EolRequired::from_config_values(&self.eol_required)
659    }
660
661    fn statsd_forward_target(&self) -> Option<(&MetaString, u16)> {
662        let host = self.statsd_forward_host.as_ref()?;
663        if self.statsd_forward_port == 0 {
664            return None;
665        }
666
667        Some((host, self.statsd_forward_port))
668    }
669
670    fn packet_forwarder_target(&self) -> Option<PacketForwarderTarget> {
671        let (host, port) = self.statsd_forward_target()?;
672        Some(PacketForwarderTarget::new(host.clone(), port))
673    }
674
675    /// Returns the number of UDP stream handlers to spawn, derived from `dogstatsd_autoscale_udp_listeners` and
676    /// the number of available vCPUs.
677    ///
678    /// Returns `None` when autoscaling is disabled, which keeps the legacy single-socket behavior. The platform
679    /// gate for `SO_REUSEPORT` lives inside the listener—this method intentionally stays platform-agnostic.
680    fn udp_streams_to_yield(&self) -> Option<NonZeroUsize> {
681        if !self.autoscale_udp_listeners {
682            return None;
683        }
684
685        #[cfg(not(target_os = "linux"))]
686        if self.autoscale_udp_listeners {
687            warn!("UDP stream handler autoscaling not supported on non-Linux platforms. Default to single stream handler.");
688            return None;
689        }
690
691        let vcpus = std::thread::available_parallelism().map(NonZeroUsize::get).unwrap_or(1);
692        let streams = (1 + vcpus / 8).min(4);
693        NonZeroUsize::new(streams)
694    }
695
696    /// Sets the workload provider to use for configuring origin detection/enrichment.
697    ///
698    /// A workload provider must be set otherwise origin detection/enrichment won't be enabled.
699    ///
700    /// Defaults to unset.
701    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
702    where
703        W: WorkloadProvider + Send + Sync + 'static,
704    {
705        self.workload_provider = Some(Arc::new(workload_provider));
706        self
707    }
708
709    /// Sets the resolver to use for mapping live sender PIDs while capturing DogStatsD traffic.
710    ///
711    /// This resolver is intentionally configured separately from the workload provider because capture only needs a
712    /// narrow live-PID lookup, while normal origin enrichment uses the broader workload provider contract.
713    ///
714    /// Defaults to unset.
715    pub fn with_capture_entity_resolver<R>(mut self, capture_entity_resolver: R) -> Self
716    where
717        R: CaptureEntityResolver + Send + Sync + 'static,
718    {
719        self.capture_entity_resolver = Some(Arc::new(capture_entity_resolver));
720        self
721    }
722
723    /// Returns the shared control handle for DogStatsD traffic capture.
724    pub fn capture_control(&self) -> DogStatsDCaptureControl {
725        self.capture_control.clone()
726    }
727
728    /// Returns an HTTP API handler exposing the DogStatsD capture control surface.
729    pub fn capture_api_handler(&self) -> DogStatsDCaptureAPIHandler {
730        DogStatsDCaptureAPIHandler::new(self.capture_control.clone())
731    }
732
733    /// Returns the shared control handle for DogStatsD traffic replay.
734    pub fn replay_control(&self) -> DogStatsDReplayControl {
735        self.replay_control.clone()
736    }
737
738    /// Returns an HTTP API handler exposing the DogStatsD replay control surface.
739    pub fn replay_api_handler(&self) -> DogStatsDReplayAPIHandler {
740        DogStatsDReplayAPIHandler::new(self.replay_control.clone())
741    }
742
743    fn fix_empty_capture_path(&mut self, config: &GenericConfiguration) {
744        if self.capture_path.parent().is_some() {
745            return;
746        }
747
748        let capture_path = match config.try_get_typed::<PathBuf>("run_path") {
749            Ok(Some(mut run_path)) => {
750                run_path.push(DOGSTATSD_CAPTURE_DIR);
751                run_path
752            }
753            Ok(None) => {
754                debug!(
755                    "`dogstatsd_capture_path` and `run_path` were empty. Default DogStatsD capture path is unavailable."
756                );
757                return;
758            }
759            Err(e) => {
760                debug!(
761                    error = %e,
762                    "Failed to read `run_path` from configuration. Default DogStatsD capture path is unavailable."
763                );
764                return;
765            }
766        };
767
768        self.capture_path = capture_path;
769    }
770
771    /// Using the current configuration, determines which listeners should be created and adds an address for each into
772    /// a `Vec<ListenAddress>`. This function has no side effects so that it can be unit tested whereas build_listeners`
773    /// actually binds the listeners on the system.
774    ///
775    /// `bind_host` is the pre-resolved IP that UDP and TCP listeners should bind to (provided by
776    /// `resolve_bind_host`). Precedence matches the Agent:
777    ///   - `non_local_traffic=true` → `0.0.0.0` (`bind_host` ignored)
778    ///   - `bind_host=Some(ip)`     → `ip`
779    ///   - `bind_host=None`         → `127.0.0.1`
780    fn build_addresses(&self, bind_host: Option<std::net::IpAddr>) -> Vec<ListenAddress> {
781        let bind_ip: std::net::IpAddr = if self.non_local_traffic {
782            [0, 0, 0, 0].into()
783        } else {
784            bind_host.unwrap_or_else(|| [127, 0, 0, 1].into())
785        };
786
787        let mut addresses: Vec<ListenAddress> = Vec::new();
788
789        if self.port != 0 {
790            addresses.push(ListenAddress::Udp(std::net::SocketAddr::new(bind_ip, self.port)));
791        }
792
793        if self.tcp_port != 0 {
794            addresses.push(ListenAddress::Tcp(std::net::SocketAddr::new(bind_ip, self.tcp_port)));
795        }
796
797        if let Some(socket_path) = &self.socket_path {
798            addresses.push(ListenAddress::Unixgram(socket_path.into()));
799        }
800
801        if let Some(socket_stream_path) = &self.socket_stream_path {
802            addresses.push(ListenAddress::Unix(socket_stream_path.into()));
803        }
804
805        addresses
806    }
807
808    /// Builds the appropriate `Listener` objects.
809    async fn build_listeners(&self) -> Result<Vec<Listener>, Error> {
810        // Resolve `bind_host` to an IP (via DNS if needed). Skip the lookup when
811        // `non_local_traffic=true` since `bind_host` is ignored in that branch—matches Go's
812        // laziness and avoids failing startup on an unresolvable hostname that wouldn't be used.
813        let bind_host: Option<std::net::IpAddr> = if self.non_local_traffic {
814            None
815        } else {
816            match &self.bind_host {
817                Some(host) => Some(resolve_bind_host(host).await?),
818                None => None,
819            }
820        };
821
822        let addresses = self.build_addresses(bind_host);
823        let mut listeners = Vec::new();
824        let socket_receive_buffer_size =
825            (self.socket_receive_buffer_size != 0).then_some(self.socket_receive_buffer_size);
826        let udp_streams_to_yield = self.udp_streams_to_yield();
827        for address in addresses {
828            let listener_type = address.listener_type();
829            let listener_streams = matches!(address, ListenAddress::Udp(_))
830                .then_some(udp_streams_to_yield)
831                .flatten();
832            let listener = Listener::from_listen_address(address, listener_streams)
833                .await
834                .context(FailedToCreateListener { listener_type })?
835                .with_receive_buffer_size(socket_receive_buffer_size);
836
837            listeners.push(listener);
838        }
839        Ok(listeners)
840    }
841}
842
#[async_trait]
impl SourceBuilder for DogStatsDConfiguration {
    /// Builds the DogStatsD source from this configuration.
    ///
    /// Binds every configured listener and checks the I/O buffer budget against them. It then builds the context
    /// resolvers, codec, payload filter and traffic capture. Finally, it binds the traffic capture to
    /// `capture_control` and the captured-tagger handle to `replay_control`.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - a listener cannot be created;
    /// - no listeners are configured;
    /// - `buffer_count` is smaller than the combined minimum buffer reservation of all listeners;
    /// - the context resolvers cannot be created.
    ///
    /// All of these checks happen before the control surfaces are bound.
    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
        let listeners = self.build_listeners().await?;
        if listeners.is_empty() {
            return Err(Error::NoListenersConfigured.into());
        }

        // Every listener requires at least one I/O buffer to ensure that all listeners can be serviced without
        // deadlocking any of the others. Connectionless listeners retain their buffer for the lifetime of the stream,
        // so multi-socket UDP listeners require one buffer per yielded socket.
        let min_buffers: usize = listeners.iter().map(Listener::min_buffer_reservation).sum();
        if self.buffer_count < min_buffers {
            return Err(generic_error!(
                "Must have a minimum of {} I/O buffers to service all configured listeners (have {}).",
                min_buffers,
                self.buffer_count,
            ));
        }

        let origin_detection_enabled = self.origin_enrichment.enabled();
        // Single CapturedTaggerHandle is cloned to both the resolver (reader of the captured store) and the replay
        // control surface (writer). Both sides reference the same atomic slot.
        let captured_tagger = CapturedTaggerHandle::new();

        // An origin tag resolver is only created when a workload provider is configured.
        let maybe_origin_tags_resolver = self.workload_provider.clone().map(|provider| {
            DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider, captured_tagger.clone())
        });
        let context_resolvers = ContextResolvers::new(self, &context, maybe_origin_tags_resolver)
            .error_context("Failed to create context resolvers.")?;

        let codec_config = DogStatsDCodecConfiguration::default()
            .with_timestamps(self.no_aggregation_pipeline_support)
            .with_permissive_mode(self.permissive_decoding)
            .with_minimum_sample_rate(self.minimum_sample_rate)
            .with_client_origin_detection(self.origin_enrichment.origin_detection_client);

        let codec = DogStatsDCodec::from_configuration(codec_config);
        let eol_required = self.eol_required();

        let enable_payloads_filter = EnablePayloadsFilter::default()
            .with_allow_series(self.enable_payloads.series)
            .with_allow_sketches(self.enable_payloads.sketches)
            .with_allow_events(self.enable_payloads.events)
            .with_allow_service_checks(self.enable_payloads.service_checks);
        // The capture depth is clamped to at least `MIN_CAPTURE_DEPTH`.
        let traffic_capture = TrafficCapture::with_workload_provider(
            self.capture_path.clone(),
            self.capture_depth.max(MIN_CAPTURE_DEPTH),
            self.workload_provider.clone(),
        );
        // Bind the capture control surface to this source's capture. It is bound only after every fallible step above
        // has succeeded.
        self.capture_control.bind(traffic_capture.clone());
        let packet_forwarder_target = self.packet_forwarder_target();

        // Hand the writer side of the captured-tagger slot to the replay control surface.
        self.replay_control.bind(captured_tagger);

        Ok(Box::new(DogStatsD {
            listeners,
            io_buffer_pool: FixedSizeObjectPool::with_builder("dsd_packet_bufs", self.buffer_count, || {
                FixedSizeVec::with_capacity(get_adjusted_buffer_size(self.buffer_size))
            }),
            codec,
            context_resolvers,
            enabled_filter: enable_payloads_filter,
            origin_detection_enabled,
            stream_log_too_big: self.stream_log_too_big,
            disable_verbose_logs: self.disable_verbose_logs,
            eol_required,
            additional_tags: self.additional_tags().into(),
            capture_entity_resolver: self.capture_entity_resolver.clone(),
            traffic_capture,
            packet_forwarder_target,
        }))
    }

    /// Returns the source's three named outputs: `metrics`, `events`, and `service_checks`.
    fn outputs(&self) -> &[OutputDefinition<EventType>] {
        static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
            vec![
                OutputDefinition::named_output("metrics", EventType::Metric),
                OutputDefinition::named_output("events", EventType::EventD),
                OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
            ]
        });
        &OUTPUTS
    }
}
928
929impl MemoryBounds for DogStatsDConfiguration {
930    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
931        builder
932            .minimum()
933            // Capture the size of the heap allocation when the component is built.
934            .with_single_value::<DogStatsD>("source struct")
935            // We allocate our I/O buffers entirely up front.
936            .with_expr(UsageExpr::product(
937                "buffers",
938                UsageExpr::config("dogstatsd_buffer_count", self.buffer_count),
939                UsageExpr::config("dogstatsd_buffer_size", get_adjusted_buffer_size(self.buffer_size)),
940            ))
941            // We also allocate the backing storage for the string interner up front, which is used by our context
942            // resolver.
943            .with_expr(UsageExpr::config(
944                "dogstatsd_string_interner_size_bytes",
945                self.effective_context_string_interner_bytes().as_u64() as usize,
946            ));
947    }
948}
949
/// DogStatsD source.
///
/// Constructed by `DogStatsDConfiguration::build`. When run, it spawns one task per listener, and each task receives
/// clones of the shared state held here.
pub struct DogStatsD {
    // Bound listeners, one per address produced by `build_addresses`.
    listeners: Vec<Listener>,
    // Fixed-size I/O buffer pool, cloned into every listener and stream handler.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    // DogStatsD payload codec, cloned into every stream handler.
    codec: DogStatsDCodec,
    // Context/tag resolvers, cloned into every stream handler.
    context_resolvers: ContextResolvers,
    // Decides which payload kinds (series, sketches, events, service checks) are allowed through.
    enabled_filter: EnablePayloadsFilter,
    // Whether origin enrichment is enabled; gates the origin-detection error metrics.
    origin_detection_enabled: bool,
    // Whether oversized frames on Unix stream listeners are logged at warn level.
    stream_log_too_big: bool,
    // When set, frame parse failures are logged at debug rather than warn level.
    disable_verbose_logs: bool,
    // Newline-termination requirement, resolved per listener when building each framer.
    eol_required: EolRequired,
    // Extra tags handed to frame handling for every decoded payload.
    additional_tags: Arc<[String]>,
    // Optional resolver used to attach container IDs to captured traffic records.
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
    // Traffic capture sink; also bound to the configuration's capture control surface.
    traffic_capture: TrafficCapture,
    // Optional target to which raw frames are forwarded before decoding.
    packet_forwarder_target: Option<PacketForwarderTarget>,
}
966
/// State moved into a single listener task (`process_listener`).
///
/// Everything except `shutdown_handle` and `listener` is a clone of the corresponding `DogStatsD` field.
struct ListenerContext {
    // Signalled by the source when the listener should stop accepting streams.
    shutdown_handle: ShutdownHandle,
    // The listener this task drives.
    listener: Listener,
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    codec: DogStatsDCodec,
    context_resolvers: ContextResolvers,
    origin_detection_enabled: bool,
    stream_log_too_big: bool,
    disable_verbose_logs: bool,
    eol_required: EolRequired,
    additional_tags: Arc<[String]>,
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
    traffic_capture: TrafficCapture,
    // Turned into a live `PacketForwarder` once per listener.
    packet_forwarder_target: Option<PacketForwarderTarget>,
}
982
/// State moved into a single stream handler task (`process_stream` / `drive_stream`).
///
/// Built by `process_listener` each time its listener yields a new stream.
struct HandlerContext {
    // Address of the listener that produced the stream; used for logging and listener-specific behavior.
    listen_addr: ListenAddress,
    // Framer built for this stream from the listener address and its EOL requirement.
    framer: DsdFramer,
    codec: DogStatsDCodec,
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    // Per-listener metrics, shared by all streams from the same listener.
    metrics: Metrics,
    context_resolvers: ContextResolvers,
    origin_detection_enabled: bool,
    stream_log_too_big: bool,
    disable_verbose_logs: bool,
    additional_tags: Arc<[String]>,
    capture_entity_resolver: Option<Arc<dyn CaptureEntityResolver + Send + Sync>>,
    traffic_capture: TrafficCapture,
    // Forwarder shared by all streams from the same listener, if packet forwarding is configured.
    packet_forwarder: Option<PacketForwarder>,
}
998
#[async_trait]
impl Source for DogStatsD {
    /// Runs the source until the global shutdown signal fires.
    ///
    /// Spawns one named task per listener, each with its own shutdown handle, then marks the source ready. While
    /// waiting for shutdown it services liveness probes. On shutdown it signals every listener task and waits for
    /// them (and, transitively, their stream handlers) to finish.
    async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
        let global_shutdown = context.take_shutdown_handle();
        pin!(global_shutdown);

        let mut health = context.take_health_handle();

        // Coordinates shutdown of all listener tasks spawned below.
        let mut listener_shutdown_coordinator = ShutdownCoordinator::default();

        // For each listener, spawn a dedicated task to run it.
        for listener in self.listeners {
            let task_name = format!("dogstatsd-listener-{}", listener.listen_address().listener_type());

            // TODO: Create a health handle for each listener.
            //
            // We need to rework `HealthRegistry` to look a little more like `ComponentRegistry` so that we can have it
            // already be scoped properly, otherwise all we can do here at present is either have a relative name, like
            // `uds-stream`, or try and hardcode the full component name, which we will inevitably forget to update if
            // we tweak the topology configuration, etc.
            let listener_context = ListenerContext {
                shutdown_handle: listener_shutdown_coordinator.register(),
                listener,
                io_buffer_pool: self.io_buffer_pool.clone(),
                codec: self.codec.clone(),
                context_resolvers: self.context_resolvers.clone(),
                origin_detection_enabled: self.origin_detection_enabled,
                stream_log_too_big: self.stream_log_too_big,
                disable_verbose_logs: self.disable_verbose_logs,
                eol_required: self.eol_required,
                additional_tags: self.additional_tags.clone(),
                capture_entity_resolver: self.capture_entity_resolver.clone(),
                traffic_capture: self.traffic_capture.clone(),
                packet_forwarder_target: self.packet_forwarder_target.clone(),
            };

            spawn_traced_named(
                task_name,
                process_listener(context.clone(), listener_context, self.enabled_filter),
            );
        }

        health.mark_ready();
        debug!("DogStatsD source started.");

        // Wait for the global shutdown signal, then notify listeners to shutdown.
        //
        // We also handle liveness here, which doesn't really matter for _this_ task, since the real work is happening
        // in the listeners, but we need to satisfy the health checker.
        loop {
            select! {
                _ = &mut global_shutdown => {
                    debug!("Received shutdown signal.");
                    break
                },
                _ = health.live() => continue,
            }
        }

        debug!("Stopping DogStatsD source...");

        // Signal every listener task and wait for each to finish its own stream shutdown.
        listener_shutdown_coordinator.shutdown_and_wait().await;

        debug!("DogStatsD source stopped.");

        Ok(())
    }
}
1067
/// Accepts streams from a single listener and spawns a stream handler task for each one.
///
/// Builds the per-listener metrics and, if a packet forwarding target is configured, a packet forwarder whose
/// connection is started up front. The loop ends when the listener's shutdown handle fires or when `accept` returns
/// an error. In either case, all spawned stream handlers are then signalled and awaited before this task returns.
async fn process_listener(
    source_context: SourceContext, listener_context: ListenerContext, enabled_filter: EnablePayloadsFilter,
) {
    let ListenerContext {
        shutdown_handle,
        mut listener,
        io_buffer_pool,
        codec,
        context_resolvers,
        origin_detection_enabled,
        stream_log_too_big,
        disable_verbose_logs,
        eol_required,
        additional_tags,
        capture_entity_resolver,
        traffic_capture,
        packet_forwarder_target,
    } = listener_context;

    // Pinned so it can be polled by reference on every iteration of the `select!` loop below.
    pin!(shutdown_handle);

    let listen_addr = listener.listen_address().clone();
    let metrics = build_metrics(&listen_addr, source_context.component_context());
    // One forwarder per listener, shared (via clone) by every stream it accepts.
    let packet_forwarder = packet_forwarder_target
        .as_ref()
        .map(|target| target.to_forwarder(metrics.clone()));
    if let Some(packet_forwarder) = &packet_forwarder {
        packet_forwarder.spawn_connect();
    }

    let mut stream_shutdown_coordinator = ShutdownCoordinator::default();
    // Used only to give each stream handler task a distinct name; wraps on overflow.
    let mut stream_idx: u32 = 0;

    info!(%listen_addr, "DogStatsD listener started.");

    loop {
        select! {
            _ = &mut shutdown_handle => {
                debug!(%listen_addr, "Received shutdown signal. Waiting for existing stream handlers to finish...");
                break;
            }
            result = listener.accept() => match result {
                Ok(stream) => {
                    debug!(%listen_addr, "Spawning new stream handler.");

                    let handler_context = HandlerContext {
                        listen_addr: listen_addr.clone(),
                        framer: get_framer(&listen_addr, eol_required.for_listener(&listen_addr)),
                        codec: codec.clone(),
                        io_buffer_pool: io_buffer_pool.clone(),
                        metrics: metrics.clone(),
                        context_resolvers: context_resolvers.clone(),
                        origin_detection_enabled,
                        stream_log_too_big,
                        disable_verbose_logs,
                        additional_tags: additional_tags.clone(),
                        capture_entity_resolver: capture_entity_resolver.clone(),
                        traffic_capture: traffic_capture.clone(),
                        packet_forwarder: packet_forwarder.clone(),
                    };

                    let task_name = format!(
                        "dogstatsd-stream-handler-{}-{}",
                        listen_addr.listener_type(),
                        stream_idx,
                    );
                    stream_idx = stream_idx.wrapping_add(1);
                    spawn_traced_named(task_name, process_stream(stream, source_context.clone(), handler_context, stream_shutdown_coordinator.register(), enabled_filter));
                }
                Err(e) => {
                    error!(%listen_addr, error = %e, "Failed to accept connection. Stopping listener.");
                    break
                }
            }
        }
    }

    // Signal every stream handler spawned above and wait for them to finish.
    stream_shutdown_coordinator.shutdown_and_wait().await;

    info!(%listen_addr, "DogStatsD listener stopped.");
}
1149
1150async fn process_stream(
1151    stream: Stream, source_context: SourceContext, handler_context: HandlerContext, shutdown_handle: ShutdownHandle,
1152    enabled_filter: EnablePayloadsFilter,
1153) {
1154    select! {
1155        _ = shutdown_handle => {
1156            debug!("Stream handler received shutdown signal.");
1157        },
1158        _ = drive_stream(stream, source_context, handler_context, enabled_filter) => {},
1159    }
1160}
1161
1162async fn drive_stream(
1163    mut stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
1164    enabled_filter: EnablePayloadsFilter,
1165) {
1166    let HandlerContext {
1167        listen_addr,
1168        mut framer,
1169        codec,
1170        io_buffer_pool,
1171        metrics,
1172        mut context_resolvers,
1173        origin_detection_enabled,
1174        stream_log_too_big,
1175        disable_verbose_logs,
1176        additional_tags,
1177        capture_entity_resolver,
1178        traffic_capture,
1179        packet_forwarder,
1180    } = handler_context;
1181
1182    debug!(%listen_addr, "Stream handler started.");
1183
1184    if !stream.is_connectionless() {
1185        metrics.connections_active().increment(1);
1186    }
1187
1188    let mut stream_capture = StreamCaptureState::new();
1189    // Set a buffer flush interval of 100ms, which will ensure we always flush buffered events at least every 100ms if
1190    // we're otherwise idle and not receiving packets from the client.
1191    let mut buffer_flush = interval(Duration::from_millis(100));
1192    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
1193
1194    let mut event_buffer_manager = EventBufferManager::default();
1195    let mut io_buffer_manager = IoBufferManager::new(&io_buffer_pool, &stream);
1196    let memory_limiter = source_context.topology_context().memory_limiter();
1197
1198    'read: loop {
1199        let mut eof = false;
1200
1201        let mut io_buffer = io_buffer_manager.get_buffer_mut().await;
1202
1203        memory_limiter.wait_for_capacity().await;
1204
1205        select! {
1206            // We read from the stream.
1207            read_result = stream.receive(&mut io_buffer) => match read_result {
1208                Ok((bytes_read, peer_addr)) => {
1209                    if bytes_read == 0 {
1210                        eof = true;
1211                    }
1212
1213                    let is_connectionless = stream.is_connectionless();
1214                    let payload = received_payload(io_buffer, bytes_read);
1215
1216                    capture_uds_traffic(
1217                        &listen_addr,
1218                        &traffic_capture,
1219                        capture_entity_resolver.as_deref(),
1220                        &peer_addr,
1221                        payload,
1222                        &mut stream_capture,
1223                    );
1224
1225                    if is_connectionless {
1226                        metrics.packet_receive_success().increment(1);
1227                    }
1228                    metrics.bytes_received().increment(bytes_read as u64);
1229                    metrics.bytes_received_size().record(bytes_read as f64);
1230                    let origin_detection_failed =
1231                        origin_detection_enabled && bytes_read > 0 && peer_addr.has_process_credential_error();
1232                    if origin_detection_failed && is_connectionless {
1233                        metrics.origin_detection_errors().increment(1);
1234                    }
1235
1236                    // When we're actually at EOF, or we're dealing with a connectionless stream, we try to decode in EOF mode.
1237                    //
1238                    // For connectionless streams, we always try to decode the buffer as if it's EOF, since it effectively _is_
1239                    // always the end of file after a receive. For connection-oriented streams, we only want to do this once we've
1240                    // actually hit true EOF.
1241                    let reached_eof = eof || is_connectionless;
1242
1243                    trace!(
1244                        buffer_len = io_buffer.remaining(),
1245                        buffer_cap = io_buffer.remaining_mut(),
1246                        eof = reached_eof,
1247                        %listen_addr,
1248                        %peer_addr,
1249                        "Received {} bytes from stream.",
1250                        bytes_read
1251                    );
1252
1253                    'frame: loop {
1254                        let frame_result = framer.next_frame(io_buffer, reached_eof);
1255                        let completed_outer_frames = framer.take_completed_outer_frames();
1256                        if !is_connectionless && completed_outer_frames > 0 {
1257                            metrics.packet_receive_success().increment(completed_outer_frames as u64);
1258                        }
1259                        if origin_detection_failed && completed_outer_frames > 0 {
1260                            metrics.origin_detection_errors().increment(completed_outer_frames as u64);
1261                        }
1262
1263                        match frame_result {
1264                            Ok(Some(frame)) => {
1265                                trace!(%listen_addr, %peer_addr, ?frame, "Decoded frame.");
1266                                if let Some(forwarder) = &packet_forwarder {
1267                                    forwarder.forward(frame.clone()).await;
1268                                }
1269                                match handle_frame(&frame[..], &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) {
1270                                    Ok(Some(event)) => {
1271                                        if let Some(event_buffer) = event_buffer_manager.try_push(event) {
1272                                            debug!(%listen_addr, %peer_addr, "Event buffer is full. Forwarding events.");
1273                                            dispatch_events(event_buffer, &source_context, &listen_addr).await;
1274                                        }
1275                                    },
1276                                    Ok(None) => {
1277                                        // We didn't decode an event, but there was no inherent error. This is likely
1278                                        // due to hitting resource limits, etc.
1279                                        //
1280                                        // Simply continue on.
1281                                        continue
1282                                    },
1283                                    Err(e) => {
1284                                        log_parse_failure(disable_verbose_logs, &listen_addr, &peer_addr, &frame, &e);
1285                                    },
1286                                }
1287                            }
1288                            Err(e) => {
1289                                metrics.framing_errors().increment(1);
1290                                if should_warn_stream_log_too_big(&listen_addr, &e, stream_log_too_big) {
1291                                    warn!(
1292                                        %listen_addr,
1293                                        %peer_addr,
1294                                        error = %e,
1295                                        "DogStatsD stream frame exceeded the configured buffer size."
1296                                    );
1297                                }
1298
1299                                if stream.is_connectionless() {
1300                                    io_buffer.clear();
1301                                    // For connectionless streams, we don't want to shutdown the stream since we can just keep
1302                                    // reading more packets.
1303                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Continuing stream.");
1304                                    continue 'read;
1305                                } else {
1306                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Stopping stream.");
1307                                    break 'read;
1308                                }
1309                            }
1310                            Ok(None) => {
1311                                trace!(%listen_addr, %peer_addr, "Not enough data to decode another frame.");
1312                                if eof && !stream.is_connectionless() {
1313                                    debug!(%listen_addr, %peer_addr, "Stream received EOF. Shutting down handler.");
1314                                    break 'read;
1315                                } else {
1316                                    break 'frame;
1317                                }
1318                            }
1319                        }
1320                    }
1321                },
1322                Err(e) => {
1323                    metrics.packet_receive_failure().increment(1);
1324
1325                    if stream.is_connectionless() {
1326                        // For connectionless streams, we don't want to shutdown the stream since we can just keep
1327                        // reading more packets.
1328                        warn!(%listen_addr, error = %e, "I/O error while decoding. Continuing stream.");
1329                        continue 'read;
1330                    } else {
1331                        warn!(%listen_addr, error = %e, "I/O error while decoding. Stopping stream.");
1332                        break 'read;
1333                    }
1334                }
1335            },
1336
1337            _ = buffer_flush.tick() => {
1338                if let Some(event_buffer) = event_buffer_manager.consume() {
1339                    dispatch_events(event_buffer, &source_context, &listen_addr).await;
1340                }
1341            },
1342
1343        }
1344    }
1345
1346    if let Some(event_buffer) = event_buffer_manager.consume() {
1347        dispatch_events(event_buffer, &source_context, &listen_addr).await;
1348    }
1349
1350    metrics.connections_active().decrement(1);
1351
1352    debug!(%listen_addr, "Stream handler stopped.");
1353}
1354
1355fn should_warn_stream_log_too_big(listen_addr: &ListenAddress, error: &FramingError, stream_log_too_big: bool) -> bool {
1356    stream_log_too_big
1357        && matches!(listen_addr, ListenAddress::Unix(_))
1358        && matches!(error, FramingError::InvalidFrame { .. })
1359}
1360
1361fn log_parse_failure(
1362    disable_verbose_logs: bool, listen_addr: &ListenAddress, peer_addr: &ConnectionAddress, frame: &[u8],
1363    error: &ParseError,
1364) {
1365    let frame = String::from_utf8_lossy(frame);
1366    if disable_verbose_logs {
1367        debug!(%listen_addr, %peer_addr, %frame, %error, "Failed to parse frame.");
1368    } else {
1369        warn!(%listen_addr, %peer_addr, %frame, %error, "Failed to parse frame.");
1370    }
1371}
1372
1373fn capture_uds_traffic(
1374    listen_addr: &ListenAddress, traffic_capture: &TrafficCapture,
1375    capture_entity_resolver: Option<&(dyn CaptureEntityResolver + Send + Sync)>, peer_addr: &ConnectionAddress,
1376    payload: &[u8], stream_capture: &mut StreamCaptureState,
1377) {
1378    if payload.is_empty() || !traffic_capture.is_ongoing() {
1379        return;
1380    }
1381
1382    match listen_addr {
1383        ListenAddress::Unixgram(_) => {
1384            let _ = traffic_capture.enqueue(build_capture_record(
1385                capture_entity_resolver,
1386                process_id_from_peer_addr(peer_addr),
1387                payload,
1388            ));
1389        }
1390        ListenAddress::Unix(_) => {
1391            stream_capture.update_peer_metadata(peer_addr);
1392            stream_capture.pending.extend(payload);
1393
1394            while let Ok(Some(outer_payload)) = stream_capture
1395                .outer_framer
1396                .next_frame(&mut stream_capture.pending, false)
1397            {
1398                let _ = traffic_capture.enqueue(build_capture_record(
1399                    capture_entity_resolver,
1400                    stream_capture.last_pid,
1401                    &outer_payload,
1402                ));
1403            }
1404        }
1405        _ => {}
1406    }
1407}
1408
/// Per-stream state used to capture Unix stream traffic as whole outer frames.
struct StreamCaptureState {
    // Splits the buffered stream bytes into length-delimited outer frames.
    outer_framer: LengthDelimitedFramer,
    // Bytes received while capturing that have not yet formed a complete outer frame.
    pending: VecDeque<u8>,
    // Most recent peer PID seen on this stream, attached to each captured record.
    last_pid: Option<i32>,
}
1414
1415impl StreamCaptureState {
1416    fn new() -> Self {
1417        Self {
1418            outer_framer: LengthDelimitedFramer,
1419            pending: VecDeque::new(),
1420            last_pid: None,
1421        }
1422    }
1423
1424    fn update_peer_metadata(&mut self, peer_addr: &ConnectionAddress) {
1425        if let Some(process_id) = process_id_from_peer_addr(peer_addr) {
1426            self.last_pid = Some(process_id);
1427        }
1428    }
1429}
1430
1431fn build_capture_record(
1432    capture_entity_resolver: Option<&(dyn CaptureEntityResolver + Send + Sync)>, process_id: Option<i32>,
1433    payload: &[u8],
1434) -> CaptureRecord {
1435    CaptureRecord {
1436        timestamp_ns: capture_timestamp_ns(),
1437        payload: payload.to_vec(),
1438        pid: process_id,
1439        ancillary: Vec::new(),
1440        container_id: resolve_capture_container_id(capture_entity_resolver, process_id),
1441    }
1442}
1443
1444fn resolve_capture_container_id(
1445    capture_entity_resolver: Option<&(dyn CaptureEntityResolver + Send + Sync)>, process_id: Option<i32>,
1446) -> Option<String> {
1447    let process_id = u32::try_from(process_id?).ok()?;
1448    capture_entity_resolver
1449        .and_then(|resolver| resolver.resolve_container_entity_for_live_pid(process_id))
1450        .map(|entity_id| entity_id.to_string())
1451}
1452
1453fn process_id_from_peer_addr(peer_addr: &ConnectionAddress) -> Option<i32> {
1454    match peer_addr {
1455        ConnectionAddress::ProcessLike(ProcessIdentity::Credentials(creds)) => Some(creds.pid),
1456        _ => None,
1457    }
1458}
1459
1460/// Applies SCM_CREDENTIALS to the origin, dispatching on the replay marker GID.
1461///
1462/// Live packets carry the sender process's real PID/UID/GID; we use `creds.pid` as the origin's process ID. Replay
1463/// packets carry `gid == REPLAY_CREDENTIALS_GID` and pack the captured (original) PID into `creds.uid`; we recover
1464/// that PID with an internal marker so downstream tag resolution consults the captured tagger store.
1465fn apply_credentials_to_origin(origin: &mut RawOrigin<'_>, creds: &ProcessCredentials) {
1466    if creds.gid == REPLAY_CREDENTIALS_GID {
1467        origin.set_process_id(mark_replay_process_id(creds.uid));
1468    } else {
1469        origin.set_process_id(creds.pid as u32);
1470    }
1471}
1472
1473fn received_payload(buffer: &BytesBuffer, bytes_read: usize) -> &[u8] {
1474    let chunk = buffer.chunk();
1475    let start = chunk.len().saturating_sub(bytes_read);
1476    &chunk[start..]
1477}
1478
/// Returns the current wall-clock time in nanoseconds since the Unix epoch.
///
/// The value saturates at `i64::MAX`. It falls back to `0` if the system clock reads earlier than the epoch.
fn capture_timestamp_ns() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_nanos()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
1485
1486fn handle_frame(
1487    frame: &[u8], codec: &DogStatsDCodec, context_resolvers: &mut ContextResolvers, source_metrics: &Metrics,
1488    peer_addr: &ConnectionAddress, enabled_filter: EnablePayloadsFilter, additional_tags: &[String],
1489) -> Result<Option<Event>, ParseError> {
1490    let parsed = match codec.decode_packet(frame) {
1491        Ok(parsed) => parsed,
1492        Err(e) => {
1493            // Try and determine what the message type was, if possible, to increment the correct error counter.
1494            match parse_message_type(frame) {
1495                MessageType::MetricSample => source_metrics.metric_decode_failed().increment(1),
1496                MessageType::Event => source_metrics.event_decode_failed().increment(1),
1497                MessageType::ServiceCheck => source_metrics.service_check_decode_failed().increment(1),
1498            }
1499
1500            return Err(e);
1501        }
1502    };
1503
1504    let event = match parsed {
1505        ParsedPacket::Metric(metric_packet) => {
1506            if metric_packet.num_points == 0 {
1507                return Ok(None);
1508            }
1509            let events_len = metric_packet.num_points;
1510            if !enabled_filter.allow_metric(&metric_packet) {
1511                trace!(
1512                    metric.name = metric_packet.metric_name,
1513                    "Skipping metric due to filter configuration."
1514                );
1515                return Ok(None);
1516            }
1517
1518            match handle_metric_packet(metric_packet, context_resolvers, peer_addr, additional_tags) {
1519                Some(metric) => {
1520                    source_metrics.metrics_received().increment(events_len);
1521                    Event::Metric(metric)
1522                }
1523                None => {
1524                    // We can only fail to get a metric back if we failed to resolve the context.
1525                    source_metrics.failed_context_resolve_total().increment(1);
1526                    return Ok(None);
1527                }
1528            }
1529        }
1530        ParsedPacket::Event(event) => {
1531            if !enabled_filter.allow_event(&event) {
1532                trace!("Skipping event {} due to filter configuration.", event.title);
1533                return Ok(None);
1534            }
1535            let tags_resolver = context_resolvers.tags();
1536            match handle_event_packet(event, tags_resolver, peer_addr, additional_tags) {
1537                Some(event) => {
1538                    source_metrics.events_received().increment(1);
1539                    Event::EventD(event)
1540                }
1541                None => {
1542                    source_metrics.failed_context_resolve_total().increment(1);
1543                    return Ok(None);
1544                }
1545            }
1546        }
1547        ParsedPacket::ServiceCheck(service_check) => {
1548            if !enabled_filter.allow_service_check(&service_check) {
1549                trace!(
1550                    "Skipping service check {} due to filter configuration.",
1551                    service_check.name
1552                );
1553                return Ok(None);
1554            }
1555            let tags_resolver = context_resolvers.tags();
1556            match handle_service_check_packet(service_check, tags_resolver, peer_addr, additional_tags) {
1557                Some(service_check) => {
1558                    source_metrics.service_checks_received().increment(1);
1559                    Event::ServiceCheck(service_check)
1560                }
1561                None => {
1562                    source_metrics.failed_context_resolve_total().increment(1);
1563                    return Ok(None);
1564                }
1565            }
1566        }
1567    };
1568
1569    Ok(Some(event))
1570}
1571
1572fn handle_metric_packet(
1573    packet: MetricPacket, context_resolvers: &mut ContextResolvers, peer_addr: &ConnectionAddress,
1574    additional_tags: &[String],
1575) -> Option<Metric> {
1576    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1577
1578    let mut origin = origin_from_metric_packet(&packet, &well_known_tags);
1579    if let Some(creds) = peer_addr.process_credentials() {
1580        apply_credentials_to_origin(&mut origin, creds);
1581    }
1582
1583    // Choose the right context resolver based on whether or not this metric is pre-aggregated.
1584    let context_resolver = if packet.timestamp.is_some() {
1585        context_resolvers.no_agg()
1586    } else {
1587        context_resolvers.primary()
1588    };
1589
1590    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1591
1592    // Try to resolve the context for this metric.
1593    match context_resolver.resolve(packet.metric_name, tags, Some(origin)) {
1594        Some(context) => {
1595            let metric_origin = well_known_tags
1596                .jmx_check_name
1597                .map(MetricOrigin::jmx_check)
1598                .unwrap_or_else(MetricOrigin::dogstatsd);
1599            let metadata = MetricMetadata::default()
1600                .with_origin(metric_origin)
1601                .with_hostname(well_known_tags.hostname.map(Arc::from))
1602                .with_unit(packet.unit.map_or_else(MetaString::empty, MetaString::from_static));
1603
1604            Some(Metric::from_parts(context, packet.values, metadata))
1605        }
1606        // We failed to resolve the context, likely due to not having enough interner capacity.
1607        None => None,
1608    }
1609}
1610
1611fn handle_event_packet(
1612    packet: EventPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress, additional_tags: &[String],
1613) -> Option<EventD> {
1614    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1615
1616    let mut origin = origin_from_event_packet(&packet, &well_known_tags);
1617    if let Some(creds) = peer_addr.process_credentials() {
1618        apply_credentials_to_origin(&mut origin, creds);
1619    }
1620    let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1621
1622    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1623    let tags = tags_resolver.create_tag_set(tags)?;
1624
1625    // When no d: field is present, backfill the current time—matching the stock Datadog Agent's
1626    // behavior in pkg/aggregator/aggregator.go (addEvent), which sets e.Ts = time.Now().Unix()
1627    // for any event with Ts == 0.
1628    let timestamp = packet
1629        .timestamp
1630        .or_else(|| SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()));
1631
1632    let eventd = EventD::new(packet.title, packet.text)
1633        .with_timestamp(timestamp)
1634        .with_hostname(packet.hostname.map(|s| s.into()))
1635        .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
1636        .with_alert_type(packet.alert_type)
1637        .with_priority(packet.priority)
1638        // When no source type is provided, default to "api"—the same default the stock Datadog
1639        // Agent applies when serializing DogStatsD events to the intake JSON format. The agent
1640        // groups events by source type name and uses "api" as the key for events without an
1641        // explicit `s:` field. See: pkg/serializer/internal/metrics/events.go (writeItem).
1642        .with_source_type_name(Some(
1643            packet
1644                .source_type_name
1645                .map(|s| s.into())
1646                .unwrap_or_else(|| "api".into()),
1647        ))
1648        .with_alert_type(packet.alert_type)
1649        .with_tags(tags)
1650        .with_origin_tags(origin_tags);
1651
1652    Some(eventd)
1653}
1654
1655fn handle_service_check_packet(
1656    packet: ServiceCheckPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress,
1657    additional_tags: &[String],
1658) -> Option<ServiceCheck> {
1659    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1660
1661    let mut origin = origin_from_service_check_packet(&packet, &well_known_tags);
1662    if let Some(creds) = peer_addr.process_credentials() {
1663        apply_credentials_to_origin(&mut origin, creds);
1664    }
1665    let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1666
1667    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1668    let tags = tags_resolver.create_tag_set(tags)?;
1669
1670    // When no d: field is present, backfill the current time—matching the stock Datadog Agent's
1671    // behavior, which sets the timestamp to time.Now().Unix() for any service check with a zero
1672    // timestamp.
1673    let timestamp = packet
1674        .timestamp
1675        .or_else(|| SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()));
1676
1677    let service_check = ServiceCheck::new(packet.name, packet.status)
1678        .with_timestamp(timestamp)
1679        .with_hostname(packet.hostname.map(|s| s.into()))
1680        .with_tags(tags)
1681        .with_origin_tags(origin_tags)
1682        .with_message(packet.message.map(|s| s.into()));
1683
1684    Some(service_check)
1685}
1686
1687fn get_filtered_tags_iterator<'a>(
1688    raw_tags: RawTags<'a>, additional_tags: &'a [String],
1689) -> impl Iterator<Item = &'a str> + Clone {
1690    // This filters out "well-known" tags from the raw tags in the DogStatsD packet, and then chains on any additional tags
1691    // that were configured on the source.
1692    RawTagsFilter::exclude(raw_tags, WellKnownTagsFilterPredicate).chain(additional_tags.iter().map(|s| s.as_str()))
1693}
1694
/// Forwards a batch of decoded events to the source's named outputs.
///
/// EventD events are extracted and sent to the `events` output. Service checks are extracted and sent to the
/// `service_checks` output. Both go through the buffered dispatcher's `send_all`. Whatever is still left in
/// `event_buffer` afterwards is treated as metrics and dispatched to the `metrics` output.
///
/// Each dispatch error is logged with the listener address and otherwise ignored. A failure on one output therefore
/// does not prevent the remaining outputs from being attempted.
///
/// # Panics
///
/// Panics if the buffer contains eventd events but the `events` output does not exist, or contains service checks but
/// the `service_checks` output does not exist.
async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &SourceContext, listen_addr: &ListenAddress) {
    debug!(%listen_addr, events_len = event_buffer.len(), "Forwarding events.");

    // TODO: This is maybe a little dicey because if we fail to dispatch the events, we may not have iterated over all of
    // them. There might still be eventd events when we get to the service checks point, and eventd events and/or
    // service check events when we get to the metrics point, and so on.
    //
    // There's probably something to be said for erroring out fully if this happens, since we should only fail to
    // dispatch if the downstream component fails entirely... and unless we have a way to restart the component, then
    // we're going to continue to fail to dispatch any more events until the process is restarted anyways.

    // Dispatch any eventd events, if present.
    if event_buffer.has_event_type(EventType::EventD) {
        let eventd_events = event_buffer.extract(Event::is_eventd);
        if let Err(e) = source_context
            .dispatcher()
            .buffered_named("events")
            .expect("events output should always exist")
            .send_all(eventd_events)
            .await
        {
            error!(%listen_addr, error = %e, "Failed to dispatch eventd events.");
        }
    }

    // Dispatch any service check events, if present.
    if event_buffer.has_event_type(EventType::ServiceCheck) {
        let service_check_events = event_buffer.extract(Event::is_service_check);
        if let Err(e) = source_context
            .dispatcher()
            .buffered_named("service_checks")
            .expect("service checks output should always exist")
            .send_all(service_check_events)
            .await
        {
            error!(%listen_addr, error = %e, "Failed to dispatch service check events.");
        }
    }

    // Finally, if there are events left, they'll be metrics, so dispatch them. This relies on the two extractions
    // above having removed every non-metric event type this source produces.
    if !event_buffer.is_empty() {
        if let Err(e) = source_context
            .dispatcher()
            .dispatch_named("metrics", event_buffer)
            .await
        {
            error!(%listen_addr, error = %e, "Failed to dispatch metric events.");
        }
    }
}
1745
/// Returns the buffer size to allocate for a configured `dogstatsd_buffer_size`, padded by the 4-byte length
/// delimiter that can prefix a UDS stream frame.
///
/// The addition saturates at `usize::MAX` instead of overflowing. An absurdly large configured value therefore
/// neither panics (debug builds) nor silently wraps around to a tiny buffer (release builds).
const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
    // Size of the length delimiter that precedes each UDS stream frame.
    const LENGTH_DELIMITER_LEN: usize = 4;

    // This is a little goofy, but hear me out:
    //
    // In the Datadog Agent, the way the UDS listener works is that if it's in stream mode, it will do a standalone
    // socket read to get _just_ the length delimiter, which is 4 bytes. After that, it will do a read to get the packet
    // data itself, up to the limit of `dogstatsd_buffer_size`. This means that a _full_ UDS stream packet can be up to
    // `dogstatsd_buffer_size + 4` bytes.
    //
    // This isn't a problem in the Agent due to how it does the reads, but it's a problem for us because we want to be
    // able to get an entire frame in a single buffer for the purpose of decoding the frame. Rather than rewriting our
    // read loop such that we have to change the logic depending on UDP/UDS datagram vs UDS stream, we simply increase
    // the buffer size by 4 bytes to account for the length delimiter.
    //
    // We do it this way so that we don't have to change the buffer size in the configuration, since if you just ported
    // over a Datadog Agent configuration, the value would be too small, and vice versa.
    buffer_size.saturating_add(LENGTH_DELIMITER_LEN)
}
1763
1764#[cfg(test)]
1765mod tests {
1766    use std::{
1767        collections::HashMap,
1768        io::ErrorKind,
1769        net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
1770        path::PathBuf,
1771        sync::{Arc, OnceLock},
1772        time::Duration,
1773    };
1774
1775    use bytes::Bytes;
1776    use bytesize::ByteSize;
1777    use saluki_config::ConfigurationLoader;
1778    use saluki_context::{origin::RawOrigin, ContextResolverBuilder, TagsResolverBuilder};
1779    use saluki_core::{components::ComponentContext, topology::ComponentId};
1780    use saluki_env::workload::{CaptureEntityResolver, EntityId};
1781    use saluki_io::{
1782        deser::codec::dogstatsd::{DogStatsDCodec, DogStatsDCodecConfiguration, ParsedPacket},
1783        net::{ConnectionAddress, ListenAddress, ProcessCredentials, ProcessIdentity},
1784    };
1785    use saluki_metrics::test::TestRecorder;
1786    use serde_json::json;
1787    use stringtheory::MetaString;
1788    use tokio::{net::UdpSocket, sync::mpsc, time::timeout};
1789
1790    use super::{
1791        forwarder::{
1792            ConnectedPacketForwarder, ForwardPacket, PacketForwarder, PacketForwarderTarget, FORWARDER_QUEUE_CAPACITY,
1793        },
1794        handle_metric_packet,
1795        metrics::build_metrics,
1796        resolve_capture_container_id, ContextResolvers, DogStatsDConfiguration, DOGSTATSD_CAPTURE_DIR,
1797        MIN_CAPTURE_DEPTH,
1798    };
1799
1800    const LINUX_EAFNOSUPPORT: i32 = 97;
1801    const MACOS_EAFNOSUPPORT: i32 = 47;
1802
1803    fn is_ipv6_unavailable_error(error: &std::io::Error) -> bool {
1804        matches!(error.kind(), ErrorKind::AddrNotAvailable | ErrorKind::Unsupported)
1805            || matches!(error.raw_os_error(), Some(LINUX_EAFNOSUPPORT | MACOS_EAFNOSUPPORT))
1806    }
1807
1808    #[derive(Default)]
1809    struct CaptureTestEntityResolver {
1810        pid_map: HashMap<u32, EntityId>,
1811    }
1812
1813    impl CaptureTestEntityResolver {
1814        fn with_pid_mapping(process_id: u32, entity_id: EntityId) -> Self {
1815            let mut pid_map = HashMap::new();
1816            pid_map.insert(process_id, entity_id);
1817            Self { pid_map }
1818        }
1819    }
1820
1821    impl CaptureEntityResolver for CaptureTestEntityResolver {
1822        fn resolve_container_entity_for_live_pid(&self, process_id: u32) -> Option<EntityId> {
1823            self.pid_map.get(&process_id).cloned()
1824        }
1825    }
1826
1827    fn packet_forwarder_from_sender(
1828        target_port: u16, packets_tx: mpsc::Sender<ForwardPacket>, metrics: super::metrics::Metrics,
1829    ) -> PacketForwarder {
1830        let mut forwarder =
1831            PacketForwarderTarget::new(MetaString::from_static("127.0.0.1"), target_port).to_forwarder(metrics);
1832        forwarder.connected = Arc::new(OnceLock::from(packets_tx));
1833        forwarder
1834    }
1835
1836    #[test]
1837    fn no_metrics_when_interner_full_allocations_disallowed() {
1838        // We're specifically testing here that when we don't allow outside allocations, we should not be able to
1839        // resolve a context if the interner is full. A no-op interner has the smallest possible size, so that's going
1840        // to assure we can't intern anything... but we also need a string (name or one of the tags) that can't be
1841        // _inlined_ either, since that will get around the interner being full.
1842        //
1843        // We set our metric name to be longer than 31 bytes (the inlining limit) to ensure this.
1844
1845        let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
1846        let tags_resolver = TagsResolverBuilder::for_tests().build();
1847        let context_resolver = ContextResolverBuilder::for_tests()
1848            .with_heap_allocations(false)
1849            .with_tags_resolver(Some(tags_resolver.clone()))
1850            .build();
1851        let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
1852        let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());
1853
1854        let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3";
1855
1856        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
1857            panic!("Failed to parse packet.");
1858        };
1859
1860        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &[]);
1861        assert!(maybe_metric.is_none());
1862    }
1863
1864    #[test]
1865    fn metric_with_additional_tags() {
1866        let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
1867        let tags_resolver = TagsResolverBuilder::for_tests().build();
1868        let context_resolver = ContextResolverBuilder::for_tests()
1869            .with_heap_allocations(false)
1870            .with_tags_resolver(Some(tags_resolver.clone()))
1871            .build();
1872        let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
1873        let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());
1874
1875        let existing_tags = ["tag1:value1", "tag2:value2", "tag3:value3"];
1876        let existing_tags_str = existing_tags.join(",");
1877
1878        let input = format!("test_metric_name:1|c|#{}", existing_tags_str);
1879        let additional_tags = [
1880            "tag4:value4".to_string(),
1881            "tag5:value5".to_string(),
1882            "tag6:value6".to_string(),
1883        ];
1884
1885        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
1886            panic!("Failed to parse packet.");
1887        };
1888        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &additional_tags);
1889        assert!(maybe_metric.is_some());
1890
1891        let metric = maybe_metric.unwrap();
1892        let context = metric.context();
1893
1894        for tag in existing_tags {
1895            assert!(context.tags().has_tag(tag));
1896        }
1897
1898        for tag in additional_tags {
1899            assert!(context.tags().has_tag(tag));
1900        }
1901    }
1902
1903    fn deser_config(json: &str) -> DogStatsDConfiguration {
1904        serde_json::from_str(json).expect("failed to deserialize config")
1905    }
1906
1907    fn udp_listen_address() -> ListenAddress {
1908        ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)))
1909    }
1910
1911    fn tcp_listen_address() -> ListenAddress {
1912        ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)))
1913    }
1914
1915    #[test]
1916    fn interner_size_defaults_to_2mib() {
1917        let config = deser_config("{}");
1918        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(2));
1919    }
1920
1921    #[test]
1922    fn socket_receive_buffer_size_defaults_to_zero() {
1923        let config = deser_config("{}");
1924        assert_eq!(config.socket_receive_buffer_size, 0);
1925    }
1926
1927    #[test]
1928    fn socket_receive_buffer_size_from_config() {
1929        let config = deser_config(r#"{"dogstatsd_so_rcvbuf": 131072}"#);
1930        assert_eq!(config.socket_receive_buffer_size, 131_072);
1931    }
1932
1933    #[test]
1934    fn stream_log_too_big_defaults_to_false() {
1935        let config = deser_config("{}");
1936        assert!(!config.stream_log_too_big);
1937    }
1938
1939    #[test]
1940    fn stream_log_too_big_from_config() {
1941        let config = deser_config(r#"{"dogstatsd_stream_log_too_big": true}"#);
1942        assert!(config.stream_log_too_big);
1943    }
1944
1945    #[test]
1946    fn disable_verbose_logs_defaults_to_false() {
1947        let config = deser_config("{}");
1948        assert!(!config.disable_verbose_logs);
1949    }
1950
1951    #[test]
1952    fn disable_verbose_logs_from_config() {
1953        let config = deser_config(r#"{"dogstatsd_disable_verbose_logs": true}"#);
1954        assert!(config.disable_verbose_logs);
1955    }
1956
1957    #[test]
1958    fn statsd_forward_defaults_disabled() {
1959        let config = deser_config("{}");
1960        assert!(config.statsd_forward_host.is_none());
1961        assert_eq!(config.statsd_forward_port, 0);
1962        assert!(config.statsd_forward_target().is_none());
1963    }
1964
1965    #[test]
1966    fn statsd_forward_empty_host_disabled() {
1967        let config = deser_config(r#"{"statsd_forward_host": "", "statsd_forward_port": 9125}"#);
1968        assert!(config.statsd_forward_host.is_none());
1969        assert!(config.statsd_forward_target().is_none());
1970    }
1971
1972    #[test]
1973    fn statsd_forward_zero_port_disabled() {
1974        let config = deser_config(r#"{"statsd_forward_host": "127.0.0.1", "statsd_forward_port": 0}"#);
1975        assert_eq!(config.statsd_forward_host.as_deref(), Some("127.0.0.1"));
1976        assert!(config.statsd_forward_target().is_none());
1977    }
1978
1979    #[test]
1980    fn statsd_forward_host_and_port_enabled() {
1981        let config = deser_config(r#"{"statsd_forward_host": "127.0.0.1", "statsd_forward_port": 9125}"#);
1982        let (host, port) = config.statsd_forward_target().expect("forwarding should be enabled");
1983        assert_eq!(host.as_ref(), "127.0.0.1");
1984        assert_eq!(port, 9125);
1985    }
1986
1987    #[test]
1988    fn statsd_forward_invalid_target_still_builds_forwarder_handle() {
1989        let config = deser_config(r#"{"statsd_forward_host": "not a valid host", "statsd_forward_port": 9125}"#);
1990        assert!(config.packet_forwarder_target().is_some());
1991    }
1992
1993    #[tokio::test]
1994    async fn packet_forwarder_sends_payload_bytes() {
1995        let receiver = UdpSocket::bind("127.0.0.1:0").await.expect("receiver should bind");
1996        let receiver_addr = receiver.local_addr().expect("receiver should have an address");
1997        let forwarder = ConnectedPacketForwarder::connect("127.0.0.1", receiver_addr.port())
1998            .await
1999            .expect("forwarder should connect");
2000        let payload = b"daemon:666|g|#sometag1:somevalue1,sometag2:somevalue2";
2001
2002        let recorder = TestRecorder::default();
2003        let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2004        let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2005        let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2006        let metrics = build_metrics(&listen_addr, &context);
2007        let (packets_tx, packets_rx) = mpsc::channel(1);
2008        let worker = tokio::spawn(forwarder.run(packets_rx, metrics.clone()));
2009        let packet_forwarder = packet_forwarder_from_sender(receiver_addr.port(), packets_tx, metrics);
2010
2011        packet_forwarder.forward(Bytes::copy_from_slice(payload)).await;
2012
2013        let mut actual = [0u8; 128];
2014        let (received_len, _) = timeout(Duration::from_secs(1), receiver.recv_from(&mut actual))
2015            .await
2016            .expect("receive should not time out")
2017            .expect("receiver should receive payload");
2018
2019        assert_eq!(&actual[..received_len], payload);
2020        assert_eq!(
2021            recorder.counter((
2022                "component_packets_forwarded_total",
2023                &[
2024                    ("component_id", "dogstatsd_test"),
2025                    ("component_type", "source"),
2026                    ("listener_type", "udp"),
2027                    ("state", "ok"),
2028                ]
2029            )),
2030            Some(1)
2031        );
2032        assert_eq!(
2033            recorder.counter((
2034                "component_bytes_forwarded_total",
2035                &[
2036                    ("component_id", "dogstatsd_test"),
2037                    ("component_type", "source"),
2038                    ("listener_type", "udp"),
2039                ]
2040            )),
2041            Some(payload.len() as u64)
2042        );
2043        worker.abort();
2044    }
2045
2046    #[tokio::test]
2047    async fn packet_forwarder_sends_payload_bytes_to_ipv6_target() {
2048        let receiver = match UdpSocket::bind("[::1]:0").await {
2049            Ok(receiver) => receiver,
2050            Err(e) if is_ipv6_unavailable_error(&e) => return,
2051            Err(e) => panic!("receiver should bind: {e}"),
2052        };
2053        let receiver_addr = receiver.local_addr().expect("receiver should have an address");
2054        let forwarder = ConnectedPacketForwarder::connect("::1", receiver_addr.port())
2055            .await
2056            .expect("forwarder should connect");
2057        let payload = b"daemon:666|g|#ip:6";
2058
2059        let recorder = TestRecorder::default();
2060        let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2061        let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2062        let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2063        let metrics = build_metrics(&listen_addr, &context);
2064        let (packets_tx, packets_rx) = mpsc::channel(1);
2065        let worker = tokio::spawn(forwarder.run(packets_rx, metrics.clone()));
2066        let packet_forwarder = packet_forwarder_from_sender(receiver_addr.port(), packets_tx, metrics);
2067
2068        packet_forwarder.forward(Bytes::copy_from_slice(payload)).await;
2069
2070        let mut actual = [0u8; 128];
2071        let (received_len, _) = timeout(Duration::from_secs(1), receiver.recv_from(&mut actual))
2072            .await
2073            .expect("receive should not time out")
2074            .expect("receiver should receive payload");
2075
2076        assert_eq!(&actual[..received_len], payload);
2077        worker.abort();
2078    }
2079
2080    #[tokio::test]
2081    async fn packet_forwarder_waits_when_queue_is_full() {
2082        let recorder = TestRecorder::default();
2083        let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2084        let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2085        let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2086        let metrics = build_metrics(&listen_addr, &context);
2087        let (packets_tx, _packets_rx) = mpsc::channel(FORWARDER_QUEUE_CAPACITY);
2088        let packet_forwarder = packet_forwarder_from_sender(9125, packets_tx, metrics);
2089
2090        for _ in 0..FORWARDER_QUEUE_CAPACITY {
2091            packet_forwarder.forward(Bytes::from_static(b"queued:1|c")).await;
2092        }
2093
2094        assert!(
2095            timeout(
2096                Duration::from_millis(100),
2097                packet_forwarder.forward(Bytes::from_static(b"blocked:1|c")),
2098            )
2099            .await
2100            .is_err(),
2101            "forwarding should wait for queue capacity instead of dropping"
2102        );
2103    }
2104
2105    #[tokio::test]
2106    async fn packet_forwarder_send_error_increments_error_telemetry() {
2107        let recorder = TestRecorder::default();
2108        let _recorder_guard = metrics::set_default_local_recorder(&recorder);
2109        let listen_addr = ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2110        let context = ComponentContext::source(ComponentId::try_from("dogstatsd_test").expect("valid component ID"));
2111        let metrics = build_metrics(&listen_addr, &context);
2112        let socket = UdpSocket::bind("127.0.0.1:0").await.expect("socket should bind");
2113        let forwarder = ConnectedPacketForwarder {
2114            socket,
2115            target: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9125)),
2116        };
2117        let (packets_tx, packets_rx) = mpsc::channel(1);
2118        let worker = tokio::spawn(forwarder.run(packets_rx, metrics.clone()));
2119        let packet_forwarder = packet_forwarder_from_sender(9125, packets_tx, metrics);
2120
2121        packet_forwarder.forward(Bytes::from_static(b"daemon:666|g")).await;
2122
2123        let deadline = tokio::time::Instant::now() + Duration::from_secs(1);
2124        loop {
2125            if recorder.counter((
2126                "component_packets_forwarded_total",
2127                &[
2128                    ("component_id", "dogstatsd_test"),
2129                    ("component_type", "source"),
2130                    ("listener_type", "udp"),
2131                    ("state", "error"),
2132                ],
2133            )) == Some(1)
2134            {
2135                break;
2136            }
2137
2138            assert!(
2139                tokio::time::Instant::now() < deadline,
2140                "forwarding error telemetry should be recorded"
2141            );
2142            tokio::time::sleep(Duration::from_millis(10)).await;
2143        }
2144        worker.abort();
2145    }
2146
2147    #[test]
2148    fn autoscale_udp_listeners_defaults_to_false() {
2149        let config = deser_config("{}");
2150        assert!(!config.autoscale_udp_listeners);
2151        assert!(config.udp_streams_to_yield().is_none());
2152    }
2153
2154    #[test]
2155    #[cfg(target_os = "linux")]
2156    fn autoscale_udp_listeners_from_config_linux() {
2157        let config = deser_config(r#"{"dogstatsd_autoscale_udp_listeners": true}"#);
2158        assert!(config.autoscale_udp_listeners);
2159
2160        let streams = config
2161            .udp_streams_to_yield()
2162            .expect("autoscale yields at least 1 stream");
2163        let n = streams.get();
2164        assert!(
2165            (1..=4).contains(&n),
2166            "expected 1..=4 streams from vCPU formula, got {n}"
2167        );
2168    }
2169
2170    #[test]
2171    #[cfg(not(target_os = "linux"))]
2172    fn autoscale_udp_listeners_from_config_non_linux() {
2173        let config = deser_config(r#"{"dogstatsd_autoscale_udp_listeners": true}"#);
2174        assert!(config.autoscale_udp_listeners);
2175
2176        assert_eq!(None, config.udp_streams_to_yield());
2177    }
2178
2179    #[test]
2180    fn eol_required_defaults_to_no_listeners() {
2181        let config = deser_config("{}");
2182        let eol_required = config.eol_required();
2183
2184        assert!(!eol_required.for_listener(&udp_listen_address()));
2185        assert!(!eol_required.for_listener(&tcp_listen_address()));
2186    }
2187
2188    #[test]
2189    fn eol_required_matches_configured_listener_types() {
2190        let config = deser_config(r#"{"dogstatsd_eol_required": ["udp", "uds"]}"#);
2191        let eol_required = config.eol_required();
2192
2193        assert!(eol_required.for_listener(&udp_listen_address()));
2194        assert!(!eol_required.for_listener(&tcp_listen_address()));
2195
2196        #[cfg(unix)]
2197        {
2198            assert!(eol_required.for_listener(&ListenAddress::Unixgram("/tmp/dsd.sock".into())));
2199            assert!(eol_required.for_listener(&ListenAddress::Unix("/tmp/dsd-stream.sock".into())));
2200        }
2201    }
2202
2203    #[test]
2204    fn eol_required_accepts_space_separated_string() {
2205        let config = deser_config(r#"{"dogstatsd_eol_required": "udp uds"}"#);
2206        let eol_required = config.eol_required();
2207
2208        assert!(eol_required.for_listener(&udp_listen_address()));
2209    }
2210
2211    #[test]
2212    fn stream_log_too_big_only_warns_for_enabled_unix_invalid_frames() {
2213        let uds_stream = ListenAddress::Unix("/tmp/dsd-stream.sock".into());
2214        let tcp_stream = ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125)));
2215        let error = saluki_io::deser::framing::FramingError::InvalidFrame {
2216            frame_len: 8193,
2217            reason: "frame length exceeds buffer capacity",
2218        };
2219
2220        assert!(super::should_warn_stream_log_too_big(&uds_stream, &error, true));
2221        assert!(!super::should_warn_stream_log_too_big(&uds_stream, &error, false));
2222        assert!(!super::should_warn_stream_log_too_big(&tcp_stream, &error, true));
2223    }
2224
2225    #[test]
2226    fn interner_size_from_entry_count() {
2227        // A Core Agent migration config with entry count 4096 should yield 2 MiB, not 4096 bytes.
2228        let config = deser_config(r#"{"dogstatsd_string_interner_size": 4096}"#);
2229        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(2));
2230    }
2231
2232    #[test]
2233    fn interner_size_from_explicit_bytes() {
2234        let config = deser_config(r#"{"dogstatsd_string_interner_size_bytes": 4194304}"#);
2235        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::b(4194304));
2236    }
2237
2238    #[test]
2239    fn interner_size_explicit_bytes_takes_priority() {
2240        let config = deser_config(
2241            r#"{"dogstatsd_string_interner_size": 4096, "dogstatsd_string_interner_size_bytes": 8388608}"#,
2242        );
2243        // The _bytes key (8 MiB) takes priority over the entry count.
2244        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::b(8388608));
2245    }
2246
2247    #[test]
2248    fn interner_size_custom_entry_count() {
2249        let config = deser_config(r#"{"dogstatsd_string_interner_size": 8192}"#);
2250        // 8192 entries * 512 bytes = 4 MiB
2251        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(4));
2252    }
2253
2254    /// Asserts that two lists of ListenAddress are equivalent.
2255    fn address_list_eq(expected: &mut [ListenAddress], actual: &mut [ListenAddress]) -> Result<(), String> {
2256        if expected.len() != actual.len() {
2257            return Err(format!(
2258                "length mismatch: expected {} addresses, got {}",
2259                expected.len(),
2260                actual.len()
2261            ));
2262        }
2263
2264        expected.sort_by_key(|a| a.to_string());
2265        actual.sort_by_key(|a| a.to_string());
2266
2267        for (e, a) in expected.iter().zip(actual.iter()) {
2268            let (es, as_) = (e.to_string(), a.to_string());
2269            if es != as_ {
2270                return Err(format!("address mismatch: expected {}, got {}", es, as_));
2271            }
2272        }
2273
2274        Ok(())
2275    }
2276
2277    /// This test verifies that we didn't accidentally break the `build_addresses_no_listeners` helper function which
2278    /// would render all further tests useless.
2279    #[test]
2280    fn build_addresses_assertion_function_works() {
2281        let config = DogStatsDConfiguration {
2282            port: 0,
2283            tcp_port: 123,
2284            socket_path: None,
2285            socket_stream_path: None,
2286            non_local_traffic: false,
2287            ..Default::default()
2288        };
2289        let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(
2290            // Close, but not quite! This is intentionally *not* 127.0.0.1 to test that the assertion will fail
2291            Ipv4Addr::new(127, 0, 0, 2),
2292            123,
2293        )))];
2294        let mut actual = config.build_addresses(None);
2295        assert!(address_list_eq(&mut expected, &mut actual).is_err())
2296    }
2297
2298    /// With all four listener gates off, `build_addresses` returns an empty Vec.
2299    #[test]
2300    fn build_addresses_no_listeners() {
2301        let config = DogStatsDConfiguration {
2302            port: 0,
2303            tcp_port: 0,
2304            socket_path: None,
2305            socket_stream_path: None,
2306            non_local_traffic: false,
2307            ..Default::default()
2308        };
2309        let mut expected = vec![];
2310        let mut actual = config.build_addresses(None);
2311        address_list_eq(&mut expected, &mut actual).unwrap();
2312    }
2313
2314    /// UDP port set, `non_local_traffic=false` -> UDP listener bound to `127.0.0.1`.
2315    #[test]
2316    fn build_addresses_udp_local_only() {
2317        let config = DogStatsDConfiguration {
2318            port: 8125,
2319            tcp_port: 0,
2320            socket_path: None,
2321            socket_stream_path: None,
2322            non_local_traffic: false,
2323            ..Default::default()
2324        };
2325        let mut expected = vec![ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(
2326            Ipv4Addr::new(127, 0, 0, 1),
2327            8125,
2328        )))];
2329        let mut actual = config.build_addresses(None);
2330        address_list_eq(&mut expected, &mut actual).unwrap();
2331    }
2332
2333    /// UDP port set, `non_local_traffic=true` -> UDP listener bound to `0.0.0.0`.
2334    #[test]
2335    fn build_addresses_udp_non_local_only() {
2336        let config = DogStatsDConfiguration {
2337            port: 8125,
2338            tcp_port: 0,
2339            socket_path: None,
2340            socket_stream_path: None,
2341            non_local_traffic: true,
2342            ..Default::default()
2343        };
2344        let mut expected = vec![ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(
2345            Ipv4Addr::new(0, 0, 0, 0),
2346            8125,
2347        )))];
2348        let mut actual = config.build_addresses(None);
2349        address_list_eq(&mut expected, &mut actual).unwrap();
2350    }
2351
2352    /// TCP port set, `non_local_traffic=false` -> TCP listener bound to `127.0.0.1`.
2353    #[test]
2354    fn build_addresses_tcp_local_only() {
2355        let config = DogStatsDConfiguration {
2356            port: 0,
2357            tcp_port: 9000,
2358            socket_path: None,
2359            socket_stream_path: None,
2360            non_local_traffic: false,
2361            ..Default::default()
2362        };
2363        let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(
2364            Ipv4Addr::new(127, 0, 0, 1),
2365            9000,
2366        )))];
2367        let mut actual = config.build_addresses(None);
2368        address_list_eq(&mut expected, &mut actual).unwrap();
2369    }
2370
2371    /// TCP port set, `non_local_traffic=true` -> TCP listener bound to `0.0.0.0`.
2372    #[test]
2373    fn build_addresses_tcp_non_local_only() {
2374        let config = DogStatsDConfiguration {
2375            port: 0,
2376            tcp_port: 9000,
2377            socket_path: None,
2378            socket_stream_path: None,
2379            non_local_traffic: true,
2380            ..Default::default()
2381        };
2382        let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(
2383            Ipv4Addr::new(0, 0, 0, 0),
2384            9000,
2385        )))];
2386        let mut actual = config.build_addresses(None);
2387        address_list_eq(&mut expected, &mut actual).unwrap();
2388    }
2389
2390    /// `socket_path` set -> a `Unixgram` address is produced with that path.
2391    #[test]
2392    fn build_addresses_unixgram_only() {
2393        let config = DogStatsDConfiguration {
2394            port: 0,
2395            tcp_port: 0,
2396            socket_path: Some("/tmp/dsd.sock".to_string()),
2397            socket_stream_path: None,
2398            non_local_traffic: false,
2399            ..Default::default()
2400        };
2401        let mut expected = vec![ListenAddress::Unixgram("/tmp/dsd.sock".into())];
2402        let mut actual = config.build_addresses(None);
2403        address_list_eq(&mut expected, &mut actual).unwrap();
2404    }
2405
2406    /// `socket_stream_path` set -> a `Unix` (stream) address is produced with that path.
2407    #[test]
2408    fn build_addresses_unix_stream_only() {
2409        let config = DogStatsDConfiguration {
2410            port: 0,
2411            tcp_port: 0,
2412            socket_path: None,
2413            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
2414            non_local_traffic: false,
2415            ..Default::default()
2416        };
2417        let mut expected = vec![ListenAddress::Unix("/tmp/dsd-stream.sock".into())];
2418        let mut actual = config.build_addresses(None);
2419        address_list_eq(&mut expected, &mut actual).unwrap();
2420    }
2421
2422    /// All four listener types enabled at once, with `non_local_traffic=true`.
2423    #[test]
2424    fn build_addresses_all_four_non_local() {
2425        let config = DogStatsDConfiguration {
2426            port: 8125,
2427            tcp_port: 9000,
2428            socket_path: Some("/tmp/dsd.sock".to_string()),
2429            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
2430            non_local_traffic: true,
2431            ..Default::default()
2432        };
2433        let mut expected = vec![
2434            ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 8125))),
2435            ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 9000))),
2436            ListenAddress::Unixgram("/tmp/dsd.sock".into()),
2437            ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
2438        ];
2439        let mut actual = config.build_addresses(None);
2440        address_list_eq(&mut expected, &mut actual).unwrap();
2441    }
2442
2443    /// All four listener types enabled at once, with `non_local_traffic=false`.
2444    #[test]
2445    fn build_addresses_all_four_local() {
2446        let config = DogStatsDConfiguration {
2447            port: 8125,
2448            tcp_port: 9000,
2449            socket_path: Some("/tmp/dsd.sock".to_string()),
2450            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
2451            non_local_traffic: false,
2452            ..Default::default()
2453        };
2454        let mut expected = vec![
2455            ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8125))),
2456            ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 9000))),
2457            ListenAddress::Unixgram("/tmp/dsd.sock".into()),
2458            ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
2459        ];
2460        let mut actual = config.build_addresses(None);
2461        address_list_eq(&mut expected, &mut actual).unwrap();
2462    }
2463
2464    /// Passing `Some(ip)` to `build_addresses` with `non_local_traffic=false` -> both UDP and TCP
2465    /// bind to that IP. Includes a UDS datagram socket to confirm `bind_host` doesn't affect it.
2466    #[test]
2467    fn build_addresses_bind_host_applies_to_udp_and_tcp() {
2468        let config = DogStatsDConfiguration {
2469            port: 8125,
2470            tcp_port: 9000,
2471            socket_path: Some("/tmp/dsd.sock".to_string()),
2472            socket_stream_path: None,
2473            non_local_traffic: false,
2474            ..Default::default()
2475        };
2476        let bind_host = Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 50)));
2477        let mut expected = vec![
2478            ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 50), 8125))),
2479            ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 50), 9000))),
2480            ListenAddress::Unixgram("/tmp/dsd.sock".into()),
2481        ];
2482        let mut actual = config.build_addresses(bind_host);
2483        address_list_eq(&mut expected, &mut actual).unwrap();
2484    }
2485
2486    /// Passing `Some(ip)` to `build_addresses` with `non_local_traffic=true` -> both UDP and TCP
2487    /// bind to `0.0.0.0`; the `bind_host` parameter is ignored (precedence matches the Agent).
2488    /// Includes a UDS stream socket to confirm `bind_host` doesn't affect it.
2489    #[test]
2490    fn build_addresses_non_local_clobbers_bind_host() {
2491        let config = DogStatsDConfiguration {
2492            port: 8125,
2493            tcp_port: 9000,
2494            socket_path: None,
2495            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
2496            non_local_traffic: true,
2497            ..Default::default()
2498        };
2499        let bind_host = Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 50)));
2500        let mut expected = vec![
2501            ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 8125))),
2502            ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 9000))),
2503            ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
2504        ];
2505        let mut actual = config.build_addresses(bind_host);
2506        address_list_eq(&mut expected, &mut actual).unwrap();
2507    }
2508
2509    #[test]
2510    fn non_finite_metric_values_are_silently_dropped() {
2511        // The Datadog Agent sends NaN gauges (for example, encode_ms.avg computed as 0.0/0.0 in Go).
2512        // FloatIter skips non-finite values with a debug log, so decode_packet returns Ok with
2513        // num_points == 0. handle_frame then returns Ok(None) for zero-point packets, which is
2514        // the existing silent-drop path (no warning emitted).
2515        let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
2516        for input in &[b"my.gauge:NaN|g" as &[u8], b"my.gauge:inf|g", b"my.gauge:-inf|g"] {
2517            match codec.decode_packet(input).expect("should decode without error") {
2518                ParsedPacket::Metric(packet) => assert_eq!(
2519                    packet.num_points, 0,
2520                    "non-finite value should be dropped, leaving 0 valid points"
2521                ),
2522                _ => panic!("expected Metric packet"),
2523            }
2524        }
2525    }
2526
2527    #[tokio::test]
2528    async fn fix_empty_capture_path_sets_path_from_run_path() {
2529        const RUN_PATH: &str = "/my/little/run_path";
2530
2531        let base_config_values = json!({ "run_path": RUN_PATH });
2532        let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await;
2533
2534        let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize");
2535
2536        let expected = PathBuf::from(RUN_PATH).join(DOGSTATSD_CAPTURE_DIR);
2537        assert_eq!(expected, dogstatsd_config.capture_path);
2538    }
2539
2540    #[tokio::test]
2541    async fn fix_empty_capture_path_keeps_explicit_path() {
2542        const RUN_PATH: &str = "/my/little/run_path";
2543        const CAPTURE_PATH: &str = "/custom/path/to/capture";
2544
2545        let base_config_values = json!({ "run_path": RUN_PATH, "dogstatsd_capture_path": CAPTURE_PATH });
2546        let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await;
2547
2548        let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize");
2549
2550        assert_eq!(PathBuf::from(CAPTURE_PATH), dogstatsd_config.capture_path);
2551    }
2552
2553    #[tokio::test]
2554    async fn from_configuration_normalizes_capture_depth() {
2555        let cases = [
2556            (json!({}), MIN_CAPTURE_DEPTH),
2557            (json!({ "dogstatsd_capture_depth": 0 }), MIN_CAPTURE_DEPTH),
2558            (json!({ "dogstatsd_capture_depth": 2048 }), 2048),
2559        ];
2560
2561        for (base_config_values, expected_depth) in cases {
2562            let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await;
2563            let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize");
2564
2565            assert_eq!(expected_depth, dogstatsd_config.capture_depth);
2566        }
2567    }
2568
2569    #[test]
2570    fn capture_entity_resolver_is_configured_separately_from_workload_provider() {
2571        let config =
2572            DogStatsDConfiguration::default().with_capture_entity_resolver(CaptureTestEntityResolver::default());
2573
2574        assert!(config.capture_entity_resolver.is_some());
2575        assert!(config.workload_provider.is_none());
2576    }
2577
2578    #[test]
2579    fn resolve_capture_container_id_uses_live_pid_mapping() {
2580        let capture_entity_resolver = CaptureTestEntityResolver::with_pid_mapping(
2581            42,
2582            EntityId::from_local_data("ci-pid-container").expect("container entity"),
2583        );
2584
2585        assert_eq!(
2586            resolve_capture_container_id(Some(&capture_entity_resolver), Some(42)),
2587            Some("container_id://pid-container".to_string())
2588        );
2589    }
2590
2591    #[test]
2592    fn build_capture_record_ignores_payload_local_data() {
2593        let record = super::build_capture_record(None, None, b"test.metric:1|c|c:ci-local-container\n");
2594
2595        assert_eq!(record.container_id, None);
2596        assert!(record.ancillary.is_empty());
2597    }
2598
2599    #[test]
2600    fn stream_capture_state_preserves_last_pid_without_new_creds() {
2601        let mut stream_capture = super::StreamCaptureState::new();
2602
2603        stream_capture.update_peer_metadata(&ConnectionAddress::ProcessLike(ProcessIdentity::Credentials(
2604            ProcessCredentials {
2605                pid: 42,
2606                uid: 0,
2607                gid: 0,
2608            },
2609        )));
2610        stream_capture.update_peer_metadata(&ConnectionAddress::ProcessLike(ProcessIdentity::Unavailable));
2611
2612        assert_eq!(stream_capture.last_pid, Some(42));
2613    }
2614
2615    #[test]
2616    fn apply_credentials_uses_live_pid_for_normal_packet() {
2617        let mut origin = RawOrigin::default();
2618        let creds = ProcessCredentials {
2619            pid: 12345,
2620            uid: 1000,
2621            gid: 1000,
2622        };
2623        super::apply_credentials_to_origin(&mut origin, &creds);
2624
2625        assert_eq!(origin.process_id(), Some(12345));
2626    }
2627
2628    #[test]
2629    fn apply_credentials_unpacks_captured_pid_when_replay_gid_present() {
2630        let mut origin = RawOrigin::default();
2631        let captured_pid: u32 = 99887766;
2632        let creds = ProcessCredentials {
2633            pid: 12345,        // our PID (irrelevant for replay)
2634            uid: captured_pid, // captured PID packed by the sender
2635            gid: super::REPLAY_CREDENTIALS_GID,
2636        };
2637        super::apply_credentials_to_origin(&mut origin, &creds);
2638
2639        assert_eq!(
2640            origin.process_id(),
2641            Some(super::origin::mark_replay_process_id(captured_pid))
2642        );
2643    }
2644}
2645
#[cfg(test)]
mod config_smoke {
    use datadog_agent_config_testing::{config_registry::structs, run_config_smoke_tests};
    use serde_json::json;

    use super::DogStatsDConfiguration;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Runs the shared `run_config_smoke_tests` harness over the registry's `DOGSTATSD_CONFIGURATION` entries,
    /// starting from an empty base config. Every configuration it produces must deserialize as
    /// `DogStatsDConfiguration`.
    #[tokio::test]
    async fn smoke_test() {
        let base_values = json!({});

        run_config_smoke_tests(
            structs::DOGSTATSD_CONFIGURATION,
            &[],
            base_values,
            |generated| {
                generated
                    .as_typed::<DogStatsDConfiguration>()
                    .expect("DogStatsDConfiguration should deserialize")
            },
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await
    }
}
2666            DatadogRemapper::new,
2667        )
2668        .await
2669    }
2670}