Skip to main content

saluki_components/sources/dogstatsd/
mod.rs

1use std::sync::{Arc, LazyLock};
2use std::time::{Duration, SystemTime, UNIX_EPOCH};
3
4use async_trait::async_trait;
5use bytes::{Buf, BufMut};
6use bytesize::ByteSize;
7use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
8use metrics::{Counter, Gauge, Histogram};
9use saluki_common::task::spawn_traced_named;
10use saluki_config::GenericConfiguration;
11use saluki_context::{
12    tags::{RawTags, RawTagsFilter},
13    TagsResolver,
14};
15use saluki_core::data_model::event::metric::Metric;
16use saluki_core::data_model::event::{
17    eventd::EventD,
18    metric::{MetricMetadata, MetricOrigin},
19    service_check::ServiceCheck,
20    Event, EventType,
21};
22use saluki_core::{
23    components::{sources::*, ComponentContext},
24    observability::ComponentMetricsExt as _,
25    pooling::FixedSizeObjectPool,
26    topology::{
27        interconnect::EventBufferManager,
28        shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle},
29        EventsBuffer, OutputDefinition,
30    },
31};
32use saluki_env::WorkloadProvider;
33use saluki_error::{generic_error, ErrorContext as _, GenericError};
34use saluki_io::{
35    buf::{BytesBuffer, FixedSizeVec},
36    deser::{codec::dogstatsd::*, framing::FramerExt as _},
37    net::{
38        listener::{Listener, ListenerError},
39        ConnectionAddress, ListenAddress, Stream,
40    },
41};
42use saluki_metrics::MetricsBuilder;
43use serde::Deserialize;
44use serde_with::{serde_as, NoneAsEmptyString};
45use snafu::{ResultExt as _, Snafu};
46use stringtheory::MetaString;
47use tokio::{
48    select,
49    time::{interval, MissedTickBehavior},
50};
51use tracing::{debug, error, info, trace, warn};
52
53mod framer;
54use self::framer::{get_framer, DsdFramer};
55use crate::sources::dogstatsd::tags::{WellKnownTags, WellKnownTagsFilterPredicate};
56
57mod filters;
58use self::filters::EnablePayloadsFilter;
59
60mod io_buffer;
61use self::io_buffer::IoBufferManager;
62
63mod origin;
64use self::origin::{
65    origin_from_event_packet, origin_from_metric_packet, origin_from_service_check_packet, DogStatsDOriginTagResolver,
66    OriginEnrichmentConfiguration,
67};
68
69mod resolver;
70use self::resolver::ContextResolvers;
71
72mod tags;
73
74#[derive(Debug, Snafu)]
75#[snafu(context(suffix(false)))]
76enum Error {
77    #[snafu(display("Failed to create {} listener: {}", listener_type, source))]
78    FailedToCreateListener {
79        listener_type: &'static str,
80        source: ListenerError,
81    },
82
83    #[snafu(display("No listeners configured. Please specify a port (`dogstatsd_port`) or a socket path (`dogstatsd_socket` or `dogstatsd_stream_socket`) to enable a listener."))]
84    NoListenersConfigured,
85
86    #[snafu(display("Could not resolve bind_host '{}': {}", host, source))]
87    UnresolvableBindHost { host: String, source: std::io::Error },
88
89    #[snafu(display("bind_host '{}' resolved to zero IP addresses.", host))]
90    BindHostHasNoAddresses { host: String },
91}
92
/// Default for `dogstatsd_buffer_size`: the size of each receive buffer, in bytes.
const fn default_buffer_size() -> usize {
    8192
}
96
/// Default for `dogstatsd_buffer_count`: the number of receive buffers allocated overall.
const fn default_buffer_count() -> usize {
    128
}
100
/// Default for `dogstatsd_port`: the UDP port to listen on.
const fn default_port() -> u16 {
    8125
}
104
/// Default for `dogstatsd_tcp_port`.
///
/// A port of `0` means no TCP listen address is configured.
const fn default_tcp_port() -> u16 {
    0
}
108
/// Default for `dogstatsd_so_rcvbuf`.
///
/// A size of `0` means no explicit receive buffer size is requested from the listener.
const fn default_socket_receive_buffer_size() -> usize {
    0
}
112
/// Default for `dogstatsd_allow_context_heap_allocs`.
const fn default_allow_context_heap_allocations() -> bool {
    true
}
116
/// Default for `dogstatsd_no_aggregation_pipeline`.
const fn default_no_aggregation_pipeline_support() -> bool {
    true
}
120
/// Default for `dogstatsd_string_interner_size`, expressed as a number of interner entries.
const fn default_context_string_interner_entry_count() -> u64 {
    4096
}
124
/// Baseline byte cost per interner entry, used to convert the Core Agent's entry-count-based
/// `dogstatsd_string_interner_size` to a byte size.
///
/// 4096 entries × 512 bytes = 2 MiB, matching ADP's previous default.
const INTERNER_BASELINE_BYTES_PER_ENTRY: u64 = 512;
130
/// Default for `dogstatsd_cached_contexts_limit`.
const fn default_cached_contexts_limit() -> usize {
    500_000
}
134
/// Default for `dogstatsd_cached_tagsets_limit`.
const fn default_cached_tagsets_limit() -> usize {
    500_000
}
138
/// Default for `dogstatsd_permissive_decoding`.
const fn default_dogstatsd_permissive_decoding() -> bool {
    true
}
142
/// Default for `dogstatsd_minimum_sample_rate`.
const fn default_dogstatsd_minimum_sample_rate() -> f64 {
    0.000000003845
}
146
147/// Controls which payload types are forwarded to the backend.
148#[derive(Deserialize)]
149#[cfg_attr(test, derive(PartialEq, serde::Serialize))]
150pub struct EnablePayloadsConfiguration {
151    /// Whether or not to enable sending series (counter/gauge/rate) payloads.
152    ///
153    /// Defaults to `true`.
154    #[serde(default = "default_true")]
155    pub series: bool,
156
157    /// Whether or not to enable sending sketch (distribution) payloads.
158    ///
159    /// Defaults to `true`.
160    #[serde(default = "default_true")]
161    pub sketches: bool,
162
163    /// Whether or not to enable sending event payloads.
164    ///
165    /// Defaults to `true`.
166    #[serde(default = "default_true")]
167    pub events: bool,
168
169    /// Whether or not to enable sending service check payloads.
170    ///
171    /// Defaults to `true`.
172    #[serde(default = "default_true")]
173    pub service_checks: bool,
174}
175
176impl Default for EnablePayloadsConfiguration {
177    fn default() -> Self {
178        Self {
179            series: true,
180            sketches: true,
181            events: true,
182            service_checks: true,
183        }
184    }
185}
186
/// Serde default helper for boolean fields that should be enabled when omitted.
const fn default_true() -> bool {
    true
}
190
191/// DogStatsD source.
192///
193/// Accepts metrics over TCP, UDP, or Unix Domain Sockets in the StatsD/DogStatsD format.
194#[serde_as]
195#[derive(Deserialize, Default)]
196#[cfg_attr(test, derive(derive_where::DeriveWhere, serde::Serialize))]
197#[cfg_attr(test, derive_where(PartialEq))]
198pub struct DogStatsDConfiguration {
199    /// The size of the buffer used to receive messages into, in bytes.
200    ///
201    /// Payloads cannot exceed this size, or they will be truncated, leading to discarded messages.
202    ///
203    /// Defaults to 8192 bytes.
204    #[serde(rename = "dogstatsd_buffer_size", default = "default_buffer_size")]
205    buffer_size: usize,
206
207    /// The number of message buffers to allocate overall.
208    ///
209    /// This represents the maximum number of message buffers available for processing incoming metrics, which loosely
210    /// correlates with how many messages can be received per second. The default value should be suitable for the
211    /// majority of workloads, but high-throughput workloads may consider increasing this value.
212    ///
213    /// Defaults to 128.
214    #[serde(rename = "dogstatsd_buffer_count", default = "default_buffer_count")]
215    buffer_count: usize,
216
217    /// The port to listen on in UDP mode.
218    ///
219    /// If set to `0`, UDP is not used.
220    ///
221    /// Defaults to 8125.
222    #[serde(rename = "dogstatsd_port", default = "default_port")]
223    port: u16,
224
225    /// The size of the DogStatsD UDP/UDS socket receive buffer, in bytes.
226    ///
227    /// If set to `0`, the OS default is used.
228    ///
229    /// Defaults to 0.
230    #[serde(rename = "dogstatsd_so_rcvbuf", default = "default_socket_receive_buffer_size")]
231    socket_receive_buffer_size: usize,
232
233    /// The port to listen on in TCP mode.
234    ///
235    /// If set to `0`, TCP is not used.
236    ///
237    /// Defaults to 0.
238    #[serde(rename = "dogstatsd_tcp_port", default = "default_tcp_port")]
239    tcp_port: u16,
240
241    /// The Unix domain socket path to listen on, in datagram mode.
242    ///
243    /// If not set, UDS (in datagram mode) is not used.
244    ///
245    /// Defaults to unset.
246    #[serde(rename = "dogstatsd_socket", default)]
247    #[serde_as(as = "NoneAsEmptyString")]
248    socket_path: Option<String>,
249
250    /// The Unix domain socket path to listen on, in stream mode.
251    ///
252    /// If not set, UDS (in stream mode) is not used.
253    ///
254    /// Defaults to unset.
255    #[serde(rename = "dogstatsd_stream_socket", default)]
256    #[serde_as(as = "NoneAsEmptyString")]
257    socket_stream_path: Option<String>,
258
259    /// The host address to bind DogStatsD UDP and TCP listeners to.
260    ///
261    /// When set, UDP and TCP listeners bind to this address. Accepts either an IP literal (e.g.
262    /// `192.168.1.50`, `::1`) or a hostname that resolves via DNS (e.g. `agent.internal`).
263    /// Ignored when `dogstatsd_non_local_traffic` is `true`.
264    ///
265    /// Defaults to unset, which binds to `127.0.0.1`.
266    #[serde(rename = "bind_host", default)]
267    #[serde_as(as = "NoneAsEmptyString")]
268    bind_host: Option<String>,
269
270    /// Whether or not to listen for non-local traffic in UDP mode.
271    ///
272    /// If set to `true`, the listener will accept packets from any interface/address. Otherwise, the source will only
273    /// listen on the address specified by `bind_host`, or `127.0.0.1` if `bind_host` is not set.
274    ///
275    /// Defaults to `false`.
276    #[serde(rename = "dogstatsd_non_local_traffic", default)]
277    non_local_traffic: bool,
278
279    /// Whether or not to allow heap allocations when resolving contexts.
280    ///
281    /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The
282    /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full.
283    /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to
284    /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags cannot be
285    /// interned, the metric is skipped.
286    ///
287    /// Defaults to `true`.
288    #[serde(
289        rename = "dogstatsd_allow_context_heap_allocs",
290        default = "default_allow_context_heap_allocations"
291    )]
292    allow_context_heap_allocations: bool,
293
294    /// Whether or not to enable support for no-aggregation pipelines.
295    ///
296    /// When enabled, this influences how metrics are parsed, specifically around user-provided metric timestamps. When
297    /// metric timestamps are present, it is used as a signal to any aggregation transforms that the metric should not
298    /// be aggregated.
299    ///
300    /// Defaults to `true`.
301    #[serde(
302        rename = "dogstatsd_no_aggregation_pipeline",
303        default = "default_no_aggregation_pipeline_support"
304    )]
305    no_aggregation_pipeline_support: bool,
306
307    /// Number of entries for the string interner, as interpreted by the Core Datadog Agent.
308    ///
309    /// When `dogstatsd_string_interner_size_bytes` is not set, this value is multiplied by 512 bytes per entry to
310    /// derive the interner byte size. This provides backwards compatibility for customers migrating configurations
311    /// from the Core Agent, where this setting represents an entry count rather than a byte size.
312    ///
313    /// Defaults to 4096 entries, which yields 2 MiB when converted.
314    #[serde(
315        rename = "dogstatsd_string_interner_size",
316        default = "default_context_string_interner_entry_count"
317    )]
318    context_string_interner_entry_count: u64,
319
320    /// Total size of the string interner used for contexts, in bytes.
321    ///
322    /// When set, this takes priority over `dogstatsd_string_interner_size`. This controls the amount of memory that
323    /// can be used to intern metric names and tags. If the interner is full, metrics with contexts that have not
324    /// already been resolved may or may not be dropped, depending on the value of `allow_context_heap_allocations`.
325    #[serde(rename = "dogstatsd_string_interner_size_bytes", default)]
326    context_string_interner_size_bytes: Option<ByteSize>,
327
328    /// The maximum number of cached contexts to allow.
329    ///
330    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
331    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
332    /// and whether or not heap allocations are allowed.
333    ///
334    /// Defaults to 500,000.
335    #[serde(
336        rename = "dogstatsd_cached_contexts_limit",
337        default = "default_cached_contexts_limit"
338    )]
339    cached_contexts_limit: usize,
340
341    /// The maximum number of cached tagsets to allow.
342    ///
343    /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit does not affect
344    /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity
345    /// and whether or not heap allocations are allowed.
346    ///
347    /// Defaults to 500,000.
348    #[serde(rename = "dogstatsd_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
349    cached_tagsets_limit: usize,
350
351    /// Whether or not to enable permissive mode in the decoder.
352    ///
353    /// Permissive mode allows the decoder to relax its strictness around the allowed payloads, which lets it match the
354    /// decoding behavior of the Datadog Agent.
355    ///
356    /// Defaults to `true`.
357    #[serde(
358        rename = "dogstatsd_permissive_decoding",
359        default = "default_dogstatsd_permissive_decoding"
360    )]
361    permissive_decoding: bool,
362
363    /// The minimum sample rate allowed for metrics.
364    ///
365    /// When metrics are sent with a sample rate _lower_ than this value then it will be clamped to this value. This is
366    /// done in order to ensure an upper bound on how many equivalent samples are tracked for the metric, as high sample
367    /// rates (very small numbers, such as `0.00000001`) can lead to large memory growth.
368    ///
369    /// A warning log will be emitted when clamping occurs, as this represents an effective loss of metric samples.
370    ///
371    /// Defaults to `0.000000003845`. (~260M samples)
372    #[serde(
373        rename = "dogstatsd_minimum_sample_rate",
374        default = "default_dogstatsd_minimum_sample_rate"
375    )]
376    minimum_sample_rate: f64,
377
378    /// Which payload types to forward to the backend.
379    #[serde(rename = "enable_payloads", default)]
380    enable_payloads: EnablePayloadsConfiguration,
381
382    /// Configuration related to origin detection and enrichment.
383    #[serde(flatten, default)]
384    origin_enrichment: OriginEnrichmentConfiguration,
385
386    /// Workload provider to utilize for origin detection/enrichment.
387    #[serde(skip)]
388    #[cfg_attr(test, derive_where(skip))]
389    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
390
391    /// Additional tags to add to all metrics.
392    #[serde(rename = "dogstatsd_tags", default)]
393    additional_tags: Vec<String>,
394}
395
396/// Resolves a `bind_host` string to an `IpAddr`.
397///
398/// Accepts either an IP literal (no DNS required) or a hostname (resolved via async DNS). Returns
399/// `UnresolvableBindHost` if the lookup fails, or `BindHostHasNoAddresses` if it succeeds but
400/// returns no addresses.
401async fn resolve_bind_host(host: &str) -> Result<std::net::IpAddr, Error> {
402    let mut addrs = tokio::net::lookup_host((host, 0u16))
403        .await
404        .context(UnresolvableBindHost { host: host.to_string() })?;
405    addrs
406        .next()
407        .map(|sa| sa.ip())
408        .ok_or_else(|| Error::BindHostHasNoAddresses { host: host.to_string() })
409}
410
411impl DogStatsDConfiguration {
412    /// Creates a new `DogStatsDConfiguration` from the given configuration.
413    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
414        Ok(config.as_typed()?)
415    }
416
417    /// Returns the effective string interner size in bytes.
418    ///
419    /// If `dogstatsd_string_interner_size_bytes` is set, it is used directly. Otherwise,
420    /// `dogstatsd_string_interner_size` (an entry count) is multiplied by 512 bytes per entry to derive the byte
421    /// size.
422    fn effective_context_string_interner_bytes(&self) -> ByteSize {
423        match self.context_string_interner_size_bytes {
424            Some(explicit_bytes) => explicit_bytes,
425            None => ByteSize::b(self.context_string_interner_entry_count * INTERNER_BASELINE_BYTES_PER_ENTRY),
426        }
427    }
428
429    /// Sets the workload provider to use for configuring origin detection/enrichment.
430    ///
431    /// A workload provider must be set otherwise origin detection/enrichment will not be enabled.
432    ///
433    /// Defaults to unset.
434    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
435    where
436        W: WorkloadProvider + Send + Sync + 'static,
437    {
438        self.workload_provider = Some(Arc::new(workload_provider));
439        self
440    }
441
442    /// Using the current configuration, determines which listeners should be created and adds an address for each into
443    /// a `Vec<ListenAddress>`. This function has no side effects so that it can be unit tested whereas build_listeners`
444    /// actually binds the listeners on the system.
445    ///
446    /// `bind_host` is the pre-resolved IP that UDP and TCP listeners should bind to (provided by
447    /// `resolve_bind_host`). Precedence matches the Agent:
448    ///   - `non_local_traffic=true` → `0.0.0.0` (bind_host ignored)
449    ///   - `bind_host=Some(ip)`     → `ip`
450    ///   - `bind_host=None`         → `127.0.0.1`
451    fn build_addresses(&self, bind_host: Option<std::net::IpAddr>) -> Vec<ListenAddress> {
452        let bind_ip: std::net::IpAddr = if self.non_local_traffic {
453            [0, 0, 0, 0].into()
454        } else {
455            bind_host.unwrap_or_else(|| [127, 0, 0, 1].into())
456        };
457
458        let mut addresses: Vec<ListenAddress> = Vec::new();
459
460        if self.port != 0 {
461            addresses.push(ListenAddress::Udp(std::net::SocketAddr::new(bind_ip, self.port)));
462        }
463
464        if self.tcp_port != 0 {
465            addresses.push(ListenAddress::Tcp(std::net::SocketAddr::new(bind_ip, self.tcp_port)));
466        }
467
468        if let Some(socket_path) = &self.socket_path {
469            addresses.push(ListenAddress::Unixgram(socket_path.into()));
470        }
471
472        if let Some(socket_stream_path) = &self.socket_stream_path {
473            addresses.push(ListenAddress::Unix(socket_stream_path.into()));
474        }
475
476        addresses
477    }
478
479    /// Builds the appropriate `Listener` objects.
480    async fn build_listeners(&self) -> Result<Vec<Listener>, Error> {
481        // Resolve `bind_host` to an IP (via DNS if needed). Skip the lookup when
482        // `non_local_traffic=true` since `bind_host` is ignored in that branch — matches Go's
483        // laziness and avoids failing startup on an unresolvable hostname that wouldn't be used.
484        let bind_host: Option<std::net::IpAddr> = if self.non_local_traffic {
485            None
486        } else {
487            match &self.bind_host {
488                Some(host) => Some(resolve_bind_host(host).await?),
489                None => None,
490            }
491        };
492
493        let addresses = self.build_addresses(bind_host);
494        let mut listeners = Vec::new();
495        let socket_receive_buffer_size =
496            (self.socket_receive_buffer_size != 0).then_some(self.socket_receive_buffer_size);
497        for address in addresses {
498            let listener_type = address.listener_type();
499            let listener = Listener::from_listen_address(address)
500                .await
501                .context(FailedToCreateListener { listener_type })?
502                .with_receive_buffer_size(socket_receive_buffer_size);
503
504            listeners.push(listener);
505        }
506        Ok(listeners)
507    }
508}
509
510#[async_trait]
511impl SourceBuilder for DogStatsDConfiguration {
512    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
513        let listeners = self.build_listeners().await?;
514        if listeners.is_empty() {
515            return Err(Error::NoListenersConfigured.into());
516        }
517
518        // Every listener requires at least one I/O buffer to ensure that all listeners can be serviced without
519        // deadlocking any of the others.
520        if self.buffer_count < listeners.len() {
521            return Err(generic_error!(
522                "Must have a minimum of {} I/O buffers based on the number of listeners configured.",
523                listeners.len()
524            ));
525        }
526
527        let maybe_origin_tags_resolver = self
528            .workload_provider
529            .clone()
530            .map(|provider| DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider));
531        let context_resolvers = ContextResolvers::new(self, &context, maybe_origin_tags_resolver)
532            .error_context("Failed to create context resolvers.")?;
533
534        let codec_config = DogStatsDCodecConfiguration::default()
535            .with_timestamps(self.no_aggregation_pipeline_support)
536            .with_permissive_mode(self.permissive_decoding)
537            .with_minimum_sample_rate(self.minimum_sample_rate)
538            .with_client_origin_detection(self.origin_enrichment.origin_detection_client);
539
540        let codec = DogStatsDCodec::from_configuration(codec_config);
541
542        let enable_payloads_filter = EnablePayloadsFilter::default()
543            .with_allow_series(self.enable_payloads.series)
544            .with_allow_sketches(self.enable_payloads.sketches)
545            .with_allow_events(self.enable_payloads.events)
546            .with_allow_service_checks(self.enable_payloads.service_checks);
547
548        Ok(Box::new(DogStatsD {
549            listeners,
550            io_buffer_pool: FixedSizeObjectPool::with_builder("dsd_packet_bufs", self.buffer_count, || {
551                FixedSizeVec::with_capacity(get_adjusted_buffer_size(self.buffer_size))
552            }),
553            codec,
554            context_resolvers,
555            enabled_filter: enable_payloads_filter,
556            additional_tags: self.additional_tags.clone().into(),
557        }))
558    }
559
560    fn outputs(&self) -> &[OutputDefinition<EventType>] {
561        static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
562            vec![
563                OutputDefinition::named_output("metrics", EventType::Metric),
564                OutputDefinition::named_output("events", EventType::EventD),
565                OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
566            ]
567        });
568        &OUTPUTS
569    }
570}
571
572impl MemoryBounds for DogStatsDConfiguration {
573    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
574        builder
575            .minimum()
576            // Capture the size of the heap allocation when the component is built.
577            .with_single_value::<DogStatsD>("source struct")
578            // We allocate our I/O buffers entirely up front.
579            .with_expr(UsageExpr::product(
580                "buffers",
581                UsageExpr::config("dogstatsd_buffer_count", self.buffer_count),
582                UsageExpr::config("dogstatsd_buffer_size", get_adjusted_buffer_size(self.buffer_size)),
583            ))
584            // We also allocate the backing storage for the string interner up front, which is used by our context
585            // resolver.
586            .with_expr(UsageExpr::config(
587                "dogstatsd_string_interner_size_bytes",
588                self.effective_context_string_interner_bytes().as_u64() as usize,
589            ));
590    }
591}
592
593/// DogStatsD source.
594pub struct DogStatsD {
595    listeners: Vec<Listener>,
596    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
597    codec: DogStatsDCodec,
598    context_resolvers: ContextResolvers,
599    enabled_filter: EnablePayloadsFilter,
600    additional_tags: Arc<[String]>,
601}
602
603struct ListenerContext {
604    shutdown_handle: DynamicShutdownHandle,
605    listener: Listener,
606    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
607    codec: DogStatsDCodec,
608    context_resolvers: ContextResolvers,
609    additional_tags: Arc<[String]>,
610}
611
612struct HandlerContext {
613    listen_addr: ListenAddress,
614    framer: DsdFramer,
615    codec: DogStatsDCodec,
616    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
617    metrics: Metrics,
618    context_resolvers: ContextResolvers,
619    additional_tags: Arc<[String]>,
620}
621
622struct Metrics {
623    metrics_received: Counter,
624    events_received: Counter,
625    service_checks_received: Counter,
626    bytes_received: Counter,
627    bytes_received_size: Histogram,
628    framing_errors: Counter,
629    metric_decoder_errors: Counter,
630    event_decoder_errors: Counter,
631    service_check_decoder_errors: Counter,
632    failed_context_resolve_total: Counter,
633    connections_active: Gauge,
634    packet_receive_success: Counter,
635    packet_receive_failure: Counter,
636}
637
638impl Metrics {
639    fn metrics_received(&self) -> &Counter {
640        &self.metrics_received
641    }
642
643    fn events_received(&self) -> &Counter {
644        &self.events_received
645    }
646
647    fn service_checks_received(&self) -> &Counter {
648        &self.service_checks_received
649    }
650
651    fn bytes_received(&self) -> &Counter {
652        &self.bytes_received
653    }
654
655    fn bytes_received_size(&self) -> &Histogram {
656        &self.bytes_received_size
657    }
658
659    fn framing_errors(&self) -> &Counter {
660        &self.framing_errors
661    }
662
663    fn metric_decode_failed(&self) -> &Counter {
664        &self.metric_decoder_errors
665    }
666
667    fn event_decode_failed(&self) -> &Counter {
668        &self.event_decoder_errors
669    }
670
671    fn service_check_decode_failed(&self) -> &Counter {
672        &self.service_check_decoder_errors
673    }
674
675    fn failed_context_resolve_total(&self) -> &Counter {
676        &self.failed_context_resolve_total
677    }
678
679    fn connections_active(&self) -> &Gauge {
680        &self.connections_active
681    }
682
683    fn packet_receive_success(&self) -> &Counter {
684        &self.packet_receive_success
685    }
686
687    fn packet_receive_failure(&self) -> &Counter {
688        &self.packet_receive_failure
689    }
690}
691
692fn build_metrics(listen_addr: &ListenAddress, component_context: &ComponentContext) -> Metrics {
693    let builder = MetricsBuilder::from_component_context(component_context);
694
695    let listener_type = match listen_addr {
696        ListenAddress::Tcp(_) => "tcp",
697        ListenAddress::Udp(_) => "udp",
698        ListenAddress::Unix(_) => "unix",
699        ListenAddress::Unixgram(_) => "unixgram",
700    };
701
702    Metrics {
703        metrics_received: builder.register_counter_with_tags(
704            "component_events_received_total",
705            [("message_type", "metrics"), ("listener_type", listener_type)],
706        ),
707        events_received: builder.register_counter_with_tags(
708            "component_events_received_total",
709            [("message_type", "events"), ("listener_type", listener_type)],
710        ),
711        service_checks_received: builder.register_counter_with_tags(
712            "component_events_received_total",
713            [("message_type", "service_checks"), ("listener_type", listener_type)],
714        ),
715        bytes_received: builder
716            .register_counter_with_tags("component_bytes_received_total", [("listener_type", listener_type)]),
717        bytes_received_size: builder
718            .register_trace_histogram_with_tags("component_bytes_received_size", [("listener_type", listener_type)]),
719        framing_errors: builder.register_counter_with_tags(
720            "component_errors_total",
721            [("listener_type", listener_type), ("error_type", "framing")],
722        ),
723        metric_decoder_errors: builder.register_counter_with_tags(
724            "component_errors_total",
725            [
726                ("listener_type", listener_type),
727                ("error_type", "decode"),
728                ("message_type", "metrics"),
729            ],
730        ),
731        event_decoder_errors: builder.register_counter_with_tags(
732            "component_errors_total",
733            [
734                ("listener_type", listener_type),
735                ("error_type", "decode"),
736                ("message_type", "events"),
737            ],
738        ),
739        service_check_decoder_errors: builder.register_counter_with_tags(
740            "component_errors_total",
741            [
742                ("listener_type", listener_type),
743                ("error_type", "decode"),
744                ("message_type", "service_checks"),
745            ],
746        ),
747        connections_active: builder
748            .register_gauge_with_tags("component_connections_active", [("listener_type", listener_type)]),
749        packet_receive_success: builder.register_debug_counter_with_tags(
750            "component_packets_received_total",
751            [("listener_type", listener_type), ("state", "ok")],
752        ),
753        packet_receive_failure: builder.register_debug_counter_with_tags(
754            "component_packets_received_total",
755            [("listener_type", listener_type), ("state", "error")],
756        ),
757        failed_context_resolve_total: builder.register_debug_counter("component_failed_context_resolve_total"),
758    }
759}
760
761#[async_trait]
762impl Source for DogStatsD {
763    async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
764        let mut global_shutdown = context.take_shutdown_handle();
765        let mut health = context.take_health_handle();
766
767        let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();
768
769        // For each listener, spawn a dedicated task to run it.
770        for listener in self.listeners {
771            let task_name = format!("dogstatsd-listener-{}", listener.listen_address().listener_type());
772
773            // TODO: Create a health handle for each listener.
774            //
775            // We need to rework `HealthRegistry` to look a little more like `ComponentRegistry` so that we can have it
776            // already be scoped properly, otherwise all we can do here at present is either have a relative name, like
777            // `uds-stream`, or try and hardcode the full component name, which we will inevitably forget to update if
778            // we tweak the topology configuration, etc.
779            let listener_context = ListenerContext {
780                shutdown_handle: listener_shutdown_coordinator.register(),
781                listener,
782                io_buffer_pool: self.io_buffer_pool.clone(),
783                codec: self.codec.clone(),
784                context_resolvers: self.context_resolvers.clone(),
785                additional_tags: self.additional_tags.clone(),
786            };
787
788            spawn_traced_named(
789                task_name,
790                process_listener(context.clone(), listener_context, self.enabled_filter),
791            );
792        }
793
794        health.mark_ready();
795        debug!("DogStatsD source started.");
796
797        // Wait for the global shutdown signal, then notify listeners to shutdown.
798        //
799        // We also handle liveness here, which doesn't really matter for _this_ task, since the real work is happening
800        // in the listeners, but we need to satisfy the health checker.
801        loop {
802            select! {
803                _ = &mut global_shutdown => {
804                    debug!("Received shutdown signal.");
805                    break
806                },
807                _ = health.live() => continue,
808            }
809        }
810
811        debug!("Stopping DogStatsD source...");
812
813        listener_shutdown_coordinator.shutdown().await;
814
815        debug!("DogStatsD source stopped.");
816
817        Ok(())
818    }
819}
820
821async fn process_listener(
822    source_context: SourceContext, listener_context: ListenerContext, enabled_filter: EnablePayloadsFilter,
823) {
824    let ListenerContext {
825        shutdown_handle,
826        mut listener,
827        io_buffer_pool,
828        codec,
829        context_resolvers,
830        additional_tags,
831    } = listener_context;
832    tokio::pin!(shutdown_handle);
833
834    let listen_addr = listener.listen_address().clone();
835    let mut stream_shutdown_coordinator = DynamicShutdownCoordinator::default();
836
837    info!(%listen_addr, "DogStatsD listener started.");
838
839    loop {
840        select! {
841            _ = &mut shutdown_handle => {
842                debug!(%listen_addr, "Received shutdown signal. Waiting for existing stream handlers to finish...");
843                break;
844            }
845            result = listener.accept() => match result {
846                Ok(stream) => {
847                    debug!(%listen_addr, "Spawning new stream handler.");
848
849                    let handler_context = HandlerContext {
850                        listen_addr: listen_addr.clone(),
851                        framer: get_framer(&listen_addr),
852                        codec: codec.clone(),
853                        io_buffer_pool: io_buffer_pool.clone(),
854                        metrics: build_metrics(&listen_addr, source_context.component_context()),
855                        context_resolvers: context_resolvers.clone(),
856                        additional_tags: additional_tags.clone(),
857                    };
858
859                    let task_name = format!("dogstatsd-stream-handler-{}", listen_addr.listener_type());
860                    spawn_traced_named(task_name, process_stream(stream, source_context.clone(), handler_context, stream_shutdown_coordinator.register(), enabled_filter));
861                }
862                Err(e) => {
863                    error!(%listen_addr, error = %e, "Failed to accept connection. Stopping listener.");
864                    break
865                }
866            }
867        }
868    }
869
870    stream_shutdown_coordinator.shutdown().await;
871
872    info!(%listen_addr, "DogStatsD listener stopped.");
873}
874
875async fn process_stream(
876    stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
877    shutdown_handle: DynamicShutdownHandle, enabled_filter: EnablePayloadsFilter,
878) {
879    tokio::pin!(shutdown_handle);
880
881    select! {
882        _ = &mut shutdown_handle => {
883            debug!("Stream handler received shutdown signal.");
884        },
885        _ = drive_stream(stream, source_context, handler_context, enabled_filter) => {},
886    }
887}
888
889async fn drive_stream(
890    mut stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
891    enabled_filter: EnablePayloadsFilter,
892) {
893    let HandlerContext {
894        listen_addr,
895        mut framer,
896        codec,
897        io_buffer_pool,
898        metrics,
899        mut context_resolvers,
900        additional_tags,
901    } = handler_context;
902
903    debug!(%listen_addr, "Stream handler started.");
904
905    if !stream.is_connectionless() {
906        metrics.connections_active().increment(1);
907    }
908
909    // Set a buffer flush interval of 100ms, which will ensure we always flush buffered events at least every 100ms if
910    // we're otherwise idle and not receiving packets from the client.
911    let mut buffer_flush = interval(Duration::from_millis(100));
912    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
913
914    let mut event_buffer_manager = EventBufferManager::default();
915    let mut io_buffer_manager = IoBufferManager::new(&io_buffer_pool, &stream);
916    let memory_limiter = source_context.topology_context().memory_limiter();
917
918    'read: loop {
919        let mut eof = false;
920
921        let mut io_buffer = io_buffer_manager.get_buffer_mut().await;
922
923        memory_limiter.wait_for_capacity().await;
924
925        select! {
926            // We read from the stream.
927            read_result = stream.receive(&mut io_buffer) => match read_result {
928                Ok((bytes_read, peer_addr)) => {
929                    if bytes_read == 0 {
930                        eof = true;
931                    }
932
933                    // TODO: This is correct for UDP and UDS in SOCK_DGRAM mode, but not for UDS in SOCK_STREAM mode...
934                    // because to match the Datadog Agent, we would only want to increment the number of successful
935                    // packets for each length-delimited frame, but this is obviously being incremented before we do any
936                    // framing... and even further, with the nested framer, we don't have access to the signal that
937                    // we've gotten a full length-delimited outer frame, only each individual newline-delimited inner
938                    // frame.
939                    //
940                    // As such, we'll potentially be over-reporting this metric for UDS in SOCK_STREAM mode compared to
941                    // the Datadog Agent.
942                    metrics.packet_receive_success().increment(1);
943                    metrics.bytes_received().increment(bytes_read as u64);
944                    metrics.bytes_received_size().record(bytes_read as f64);
945
946                    // When we're actually at EOF, or we're dealing with a connectionless stream, we try to decode in EOF mode.
947                    //
948                    // For connectionless streams, we always try to decode the buffer as if it's EOF, since it effectively _is_
949                    // always the end of file after a receive. For connection-oriented streams, we only want to do this once we've
950                    // actually hit true EOF.
951                    let reached_eof = eof || stream.is_connectionless();
952
953                    trace!(
954                        buffer_len = io_buffer.remaining(),
955                        buffer_cap = io_buffer.remaining_mut(),
956                        eof = reached_eof,
957                        %listen_addr,
958                        %peer_addr,
959                        "Received {} bytes from stream.",
960                        bytes_read
961                    );
962
963                    let mut frames = io_buffer.framed(&mut framer, reached_eof);
964                    'frame: loop {
965                        match frames.next() {
966                            Some(Ok(frame)) => {
967                                trace!(%listen_addr, %peer_addr, ?frame, "Decoded frame.");
968                                match handle_frame(&frame[..], &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) {
969                                    Ok(Some(event)) => {
970                                        if let Some(event_buffer) = event_buffer_manager.try_push(event) {
971                                            debug!(%listen_addr, %peer_addr, "Event buffer is full. Forwarding events.");
972                                            dispatch_events(event_buffer, &source_context, &listen_addr).await;
973                                        }
974                                    },
975                                    Ok(None) => {
976                                        // We didn't decode an event, but there was no inherent error. This is likely
977                                        // due to hitting resource limits, etc.
978                                        //
979                                        // Simply continue on.
980                                        continue
981                                    },
982                                    Err(e) => {
983                                        let frame_lossy_str = String::from_utf8_lossy(&frame);
984                                        warn!(%listen_addr, %peer_addr, frame = %frame_lossy_str, error = %e, "Failed to parse frame.");
985                                    },
986                                }
987                            }
988                            Some(Err(e)) => {
989                                metrics.framing_errors().increment(1);
990
991                                if stream.is_connectionless() {
992                                    // For connectionless streams, we don't want to shutdown the stream since we can just keep
993                                    // reading more packets.
994                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Continuing stream.");
995                                    continue 'read;
996                                } else {
997                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Stopping stream.");
998                                    break 'read;
999                                }
1000                            }
1001                            None => {
1002                                trace!(%listen_addr, %peer_addr, "Not enough data to decode another frame.");
1003                                if eof && !stream.is_connectionless() {
1004                                    debug!(%listen_addr, %peer_addr, "Stream received EOF. Shutting down handler.");
1005                                    break 'read;
1006                                } else {
1007                                    break 'frame;
1008                                }
1009                            }
1010                        }
1011                    }
1012                },
1013                Err(e) => {
1014                    metrics.packet_receive_failure().increment(1);
1015
1016                    if stream.is_connectionless() {
1017                        // For connectionless streams, we don't want to shutdown the stream since we can just keep
1018                        // reading more packets.
1019                        warn!(%listen_addr, error = %e, "I/O error while decoding. Continuing stream.");
1020                        continue 'read;
1021                    } else {
1022                        warn!(%listen_addr, error = %e, "I/O error while decoding. Stopping stream.");
1023                        break 'read;
1024                    }
1025                }
1026            },
1027
1028            _ = buffer_flush.tick() => {
1029                if let Some(event_buffer) = event_buffer_manager.consume() {
1030                    dispatch_events(event_buffer, &source_context, &listen_addr).await;
1031                }
1032            },
1033        }
1034    }
1035
1036    if let Some(event_buffer) = event_buffer_manager.consume() {
1037        dispatch_events(event_buffer, &source_context, &listen_addr).await;
1038    }
1039
1040    metrics.connections_active().decrement(1);
1041
1042    debug!(%listen_addr, "Stream handler stopped.");
1043}
1044
1045fn handle_frame(
1046    frame: &[u8], codec: &DogStatsDCodec, context_resolvers: &mut ContextResolvers, source_metrics: &Metrics,
1047    peer_addr: &ConnectionAddress, enabled_filter: EnablePayloadsFilter, additional_tags: &[String],
1048) -> Result<Option<Event>, ParseError> {
1049    let parsed = match codec.decode_packet(frame) {
1050        Ok(parsed) => parsed,
1051        Err(e) => {
1052            // Try and determine what the message type was, if possible, to increment the correct error counter.
1053            match parse_message_type(frame) {
1054                MessageType::MetricSample => source_metrics.metric_decode_failed().increment(1),
1055                MessageType::Event => source_metrics.event_decode_failed().increment(1),
1056                MessageType::ServiceCheck => source_metrics.service_check_decode_failed().increment(1),
1057            }
1058
1059            return Err(e);
1060        }
1061    };
1062
1063    let event = match parsed {
1064        ParsedPacket::Metric(metric_packet) => {
1065            if metric_packet.num_points == 0 {
1066                return Ok(None);
1067            }
1068            let events_len = metric_packet.num_points;
1069            if !enabled_filter.allow_metric(&metric_packet) {
1070                trace!(
1071                    metric.name = metric_packet.metric_name,
1072                    "Skipping metric due to filter configuration."
1073                );
1074                return Ok(None);
1075            }
1076
1077            match handle_metric_packet(metric_packet, context_resolvers, peer_addr, additional_tags) {
1078                Some(metric) => {
1079                    source_metrics.metrics_received().increment(events_len);
1080                    Event::Metric(metric)
1081                }
1082                None => {
1083                    // We can only fail to get a metric back if we failed to resolve the context.
1084                    source_metrics.failed_context_resolve_total().increment(1);
1085                    return Ok(None);
1086                }
1087            }
1088        }
1089        ParsedPacket::Event(event) => {
1090            if !enabled_filter.allow_event(&event) {
1091                trace!("Skipping event {} due to filter configuration.", event.title);
1092                return Ok(None);
1093            }
1094            let tags_resolver = context_resolvers.tags();
1095            match handle_event_packet(event, tags_resolver, peer_addr, additional_tags) {
1096                Some(event) => {
1097                    source_metrics.events_received().increment(1);
1098                    Event::EventD(event)
1099                }
1100                None => {
1101                    source_metrics.failed_context_resolve_total().increment(1);
1102                    return Ok(None);
1103                }
1104            }
1105        }
1106        ParsedPacket::ServiceCheck(service_check) => {
1107            if !enabled_filter.allow_service_check(&service_check) {
1108                trace!(
1109                    "Skipping service check {} due to filter configuration.",
1110                    service_check.name
1111                );
1112                return Ok(None);
1113            }
1114            let tags_resolver = context_resolvers.tags();
1115            match handle_service_check_packet(service_check, tags_resolver, peer_addr, additional_tags) {
1116                Some(service_check) => {
1117                    source_metrics.service_checks_received().increment(1);
1118                    Event::ServiceCheck(service_check)
1119                }
1120                None => {
1121                    source_metrics.failed_context_resolve_total().increment(1);
1122                    return Ok(None);
1123                }
1124            }
1125        }
1126    };
1127
1128    Ok(Some(event))
1129}
1130
1131fn handle_metric_packet(
1132    packet: MetricPacket, context_resolvers: &mut ContextResolvers, peer_addr: &ConnectionAddress,
1133    additional_tags: &[String],
1134) -> Option<Metric> {
1135    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1136
1137    let mut origin = origin_from_metric_packet(&packet, &well_known_tags);
1138    if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1139        origin.set_process_id(creds.pid as u32);
1140    }
1141
1142    // Choose the right context resolver based on whether or not this metric is pre-aggregated.
1143    let context_resolver = if packet.timestamp.is_some() {
1144        context_resolvers.no_agg()
1145    } else {
1146        context_resolvers.primary()
1147    };
1148
1149    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1150
1151    // Try to resolve the context for this metric.
1152    match context_resolver.resolve(packet.metric_name, tags, Some(origin)) {
1153        Some(context) => {
1154            let metric_origin = well_known_tags
1155                .jmx_check_name
1156                .map(MetricOrigin::jmx_check)
1157                .unwrap_or_else(MetricOrigin::dogstatsd);
1158            let metadata = MetricMetadata::default()
1159                .with_origin(metric_origin)
1160                .with_hostname(well_known_tags.hostname.map(Arc::from))
1161                .with_unit(packet.unit.map_or_else(MetaString::empty, MetaString::from_static));
1162
1163            Some(Metric::from_parts(context, packet.values, metadata))
1164        }
1165        // We failed to resolve the context, likely due to not having enough interner capacity.
1166        None => None,
1167    }
1168}
1169
1170fn handle_event_packet(
1171    packet: EventPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress, additional_tags: &[String],
1172) -> Option<EventD> {
1173    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1174
1175    let mut origin = origin_from_event_packet(&packet, &well_known_tags);
1176    if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1177        origin.set_process_id(creds.pid as u32);
1178    }
1179    let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1180
1181    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1182    let tags = tags_resolver.create_tag_set(tags)?;
1183
1184    // When no d: field is present, backfill the current time — matching the stock Datadog Agent's
1185    // behavior in pkg/aggregator/aggregator.go (addEvent), which sets e.Ts = time.Now().Unix()
1186    // for any event with Ts == 0.
1187    let timestamp = packet
1188        .timestamp
1189        .or_else(|| SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()));
1190
1191    let eventd = EventD::new(packet.title, packet.text)
1192        .with_timestamp(timestamp)
1193        .with_hostname(packet.hostname.map(|s| s.into()))
1194        .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
1195        .with_alert_type(packet.alert_type)
1196        .with_priority(packet.priority)
1197        // When no source type is provided, default to "api" — the same default the stock Datadog
1198        // Agent applies when serializing DogStatsD events to the intake JSON format. The agent
1199        // groups events by source type name and uses "api" as the key for events without an
1200        // explicit `s:` field. See: pkg/serializer/internal/metrics/events.go (writeItem).
1201        .with_source_type_name(Some(
1202            packet
1203                .source_type_name
1204                .map(|s| s.into())
1205                .unwrap_or_else(|| "api".into()),
1206        ))
1207        .with_alert_type(packet.alert_type)
1208        .with_tags(tags)
1209        .with_origin_tags(origin_tags);
1210
1211    Some(eventd)
1212}
1213
1214fn handle_service_check_packet(
1215    packet: ServiceCheckPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress,
1216    additional_tags: &[String],
1217) -> Option<ServiceCheck> {
1218    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1219
1220    let mut origin = origin_from_service_check_packet(&packet, &well_known_tags);
1221    if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1222        origin.set_process_id(creds.pid as u32);
1223    }
1224    let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1225
1226    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1227    let tags = tags_resolver.create_tag_set(tags)?;
1228
1229    // When no d: field is present, backfill the current time — matching the stock Datadog Agent's
1230    // behavior, which sets the timestamp to time.Now().Unix() for any service check with a zero
1231    // timestamp.
1232    let timestamp = packet
1233        .timestamp
1234        .or_else(|| SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()));
1235
1236    let service_check = ServiceCheck::new(packet.name, packet.status)
1237        .with_timestamp(timestamp)
1238        .with_hostname(packet.hostname.map(|s| s.into()))
1239        .with_tags(tags)
1240        .with_origin_tags(origin_tags)
1241        .with_message(packet.message.map(|s| s.into()));
1242
1243    Some(service_check)
1244}
1245
1246fn get_filtered_tags_iterator<'a>(
1247    raw_tags: RawTags<'a>, additional_tags: &'a [String],
1248) -> impl Iterator<Item = &'a str> + Clone {
1249    // This filters out "well-known" tags from the raw tags in the DogStatsD packet, and then chains on any additional tags
1250    // that were configured on the source.
1251    RawTagsFilter::exclude(raw_tags, WellKnownTagsFilterPredicate).chain(additional_tags.iter().map(|s| s.as_str()))
1252}
1253
1254async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &SourceContext, listen_addr: &ListenAddress) {
1255    debug!(%listen_addr, events_len = event_buffer.len(), "Forwarding events.");
1256
1257    // TODO: This is maybe a little dicey because if we fail to dispatch the events, we may not have iterated over all of
1258    // them, so there might still be eventd events when get to the service checks point, and eventd events and/or service
1259    // check events when we get to the metrics point, and so on.
1260    //
1261    // There's probably something to be said for erroring out fully if this happens, since we should only fail to
1262    // dispatch if the downstream component fails entirely... and unless we have a way to restart the component, then
1263    // we're going to continue to fail to dispatch any more events until the process is restarted anyways.
1264
1265    // Dispatch any eventd events, if present.
1266    if event_buffer.has_event_type(EventType::EventD) {
1267        let eventd_events = event_buffer.extract(Event::is_eventd);
1268        if let Err(e) = source_context
1269            .dispatcher()
1270            .buffered_named("events")
1271            .expect("events output should always exist")
1272            .send_all(eventd_events)
1273            .await
1274        {
1275            error!(%listen_addr, error = %e, "Failed to dispatch eventd events.");
1276        }
1277    }
1278
1279    // Dispatch any service check events, if present.
1280    if event_buffer.has_event_type(EventType::ServiceCheck) {
1281        let service_check_events = event_buffer.extract(Event::is_service_check);
1282        if let Err(e) = source_context
1283            .dispatcher()
1284            .buffered_named("service_checks")
1285            .expect("service checks output should always exist")
1286            .send_all(service_check_events)
1287            .await
1288        {
1289            error!(%listen_addr, error = %e, "Failed to dispatch service check events.");
1290        }
1291    }
1292
1293    // Finally, if there are events left, they'll be metrics, so dispatch them.
1294    if !event_buffer.is_empty() {
1295        if let Err(e) = source_context
1296            .dispatcher()
1297            .dispatch_named("metrics", event_buffer)
1298            .await
1299        {
1300            error!(%listen_addr, error = %e, "Failed to dispatch metric events.");
1301        }
1302    }
1303}
1304
/// Returns the buffer size to actually allocate for a configured buffer size: the configured size plus 4 bytes.
///
/// The addition saturates at `usize::MAX`, so an absurdly large configured value can never wrap around to a tiny
/// buffer size.
const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
    // This is a little goofy, but hear me out:
    //
    // In the Datadog Agent, the way the UDS listener works is that if it's in stream mode, it will do a standalone
    // socket read to get _just_ the length delimiter, which is 4 bytes. After that, it will do a read to get the packet
    // data itself, up to the limit of `dogstatsd_buffer_size`. This means that a _full_ UDS stream packet can be up to
    // `dogstatsd_buffer_size + 4` bytes.
    //
    // This isn't a problem in the Agent due to how it does the reads, but it's a problem for us because we want to be
    // able to get an entire frame in a single buffer for the purpose of decoding the frame. Rather than rewriting our
    // read loop such that we have to change the logic depending on UDP/UDS datagram vs UDS stream, we simply increase
    // the buffer size by 4 bytes to account for the length delimiter.
    //
    // We do it this way so that we don't have to change the buffer size in the configuration, since if you just ported
    // over a Datadog Agent configuration, the value would be too small, and vice versa.
    buffer_size.saturating_add(4)
}
1322
1323#[cfg(test)]
1324mod tests {
1325    use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
1326
1327    use bytesize::ByteSize;
1328    use saluki_context::{ContextResolverBuilder, TagsResolverBuilder};
1329    use saluki_io::net::ListenAddress;
1330    use saluki_io::{
1331        deser::codec::dogstatsd::{DogStatsDCodec, DogStatsDCodecConfiguration, ParsedPacket},
1332        net::ConnectionAddress,
1333    };
1334
1335    use super::{handle_metric_packet, ContextResolvers, DogStatsDConfiguration};
1336
1337    #[test]
1338    fn no_metrics_when_interner_full_allocations_disallowed() {
1339        // We're specifically testing here that when we don't allow outside allocations, we should not be able to
1340        // resolve a context if the interner is full. A no-op interner has the smallest possible size, so that's going
1341        // to assure we can't intern anything... but we also need a string (name or one of the tags) that can't be
1342        // _inlined_ either, since that will get around the interner being full.
1343        //
1344        // We set our metric name to be longer than 31 bytes (the inlining limit) to ensure this.
1345
1346        let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
1347        let tags_resolver = TagsResolverBuilder::for_tests().build();
1348        let context_resolver = ContextResolverBuilder::for_tests()
1349            .with_heap_allocations(false)
1350            .with_tags_resolver(Some(tags_resolver.clone()))
1351            .build();
1352        let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
1353        let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());
1354
1355        let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3";
1356
1357        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
1358            panic!("Failed to parse packet.");
1359        };
1360
1361        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &[]);
1362        assert!(maybe_metric.is_none());
1363    }
1364
1365    #[test]
1366    fn metric_with_additional_tags() {
1367        let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
1368        let tags_resolver = TagsResolverBuilder::for_tests().build();
1369        let context_resolver = ContextResolverBuilder::for_tests()
1370            .with_heap_allocations(false)
1371            .with_tags_resolver(Some(tags_resolver.clone()))
1372            .build();
1373        let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
1374        let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());
1375
1376        let existing_tags = ["tag1:value1", "tag2:value2", "tag3:value3"];
1377        let existing_tags_str = existing_tags.join(",");
1378
1379        let input = format!("test_metric_name:1|c|#{}", existing_tags_str);
1380        let additional_tags = [
1381            "tag4:value4".to_string(),
1382            "tag5:value5".to_string(),
1383            "tag6:value6".to_string(),
1384        ];
1385
1386        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
1387            panic!("Failed to parse packet.");
1388        };
1389        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &additional_tags);
1390        assert!(maybe_metric.is_some());
1391
1392        let metric = maybe_metric.unwrap();
1393        let context = metric.context();
1394
1395        for tag in existing_tags {
1396            assert!(context.tags().has_tag(tag));
1397        }
1398
1399        for tag in additional_tags {
1400            assert!(context.tags().has_tag(tag));
1401        }
1402    }
1403
1404    fn deser_config(json: &str) -> DogStatsDConfiguration {
1405        serde_json::from_str(json).expect("failed to deserialize config")
1406    }
1407
1408    #[test]
1409    fn interner_size_defaults_to_2mib() {
1410        let config = deser_config("{}");
1411        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(2));
1412    }
1413
1414    #[test]
1415    fn socket_receive_buffer_size_defaults_to_zero() {
1416        let config = deser_config("{}");
1417        assert_eq!(config.socket_receive_buffer_size, 0);
1418    }
1419
1420    #[test]
1421    fn socket_receive_buffer_size_from_config() {
1422        let config = deser_config(r#"{"dogstatsd_so_rcvbuf": 131072}"#);
1423        assert_eq!(config.socket_receive_buffer_size, 131_072);
1424    }
1425
1426    #[test]
1427    fn interner_size_from_entry_count() {
1428        // A Core Agent migration config with entry count 4096 should yield 2 MiB, not 4096 bytes.
1429        let config = deser_config(r#"{"dogstatsd_string_interner_size": 4096}"#);
1430        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(2));
1431    }
1432
1433    #[test]
1434    fn interner_size_from_explicit_bytes() {
1435        let config = deser_config(r#"{"dogstatsd_string_interner_size_bytes": 4194304}"#);
1436        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::b(4194304));
1437    }
1438
1439    #[test]
1440    fn interner_size_explicit_bytes_takes_priority() {
1441        let config = deser_config(
1442            r#"{"dogstatsd_string_interner_size": 4096, "dogstatsd_string_interner_size_bytes": 8388608}"#,
1443        );
1444        // The _bytes key (8 MiB) takes priority over the entry count.
1445        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::b(8388608));
1446    }
1447
1448    #[test]
1449    fn interner_size_custom_entry_count() {
1450        let config = deser_config(r#"{"dogstatsd_string_interner_size": 8192}"#);
1451        // 8192 entries * 512 bytes = 4 MiB
1452        assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(4));
1453    }
1454
1455    /// Asserts that two lists of ListenAddress are equivalent.
1456    fn address_list_eq(expected: &mut [ListenAddress], actual: &mut [ListenAddress]) -> Result<(), String> {
1457        if expected.len() != actual.len() {
1458            return Err(format!(
1459                "length mismatch: expected {} addresses, got {}",
1460                expected.len(),
1461                actual.len()
1462            ));
1463        }
1464
1465        expected.sort_by_key(|a| a.to_string());
1466        actual.sort_by_key(|a| a.to_string());
1467
1468        for (e, a) in expected.iter().zip(actual.iter()) {
1469            let (es, as_) = (e.to_string(), a.to_string());
1470            if es != as_ {
1471                return Err(format!("address mismatch: expected {}, got {}", es, as_));
1472            }
1473        }
1474
1475        Ok(())
1476    }
1477
1478    /// This test verifies that we didn't accidentally break the `build_addresses_no_listeners` helper function which
1479    /// would render all further tests useless.
1480    #[test]
1481    fn build_addresses_assertion_function_works() {
1482        let config = DogStatsDConfiguration {
1483            port: 0,
1484            tcp_port: 123,
1485            socket_path: None,
1486            socket_stream_path: None,
1487            non_local_traffic: false,
1488            ..Default::default()
1489        };
1490        let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(
1491            // Close, but not quite! This is intentionally *not* 127.0.0.1 to test that the assertion will fail
1492            Ipv4Addr::new(127, 0, 0, 2),
1493            123,
1494        )))];
1495        let mut actual = config.build_addresses(None);
1496        assert!(address_list_eq(&mut expected, &mut actual).is_err())
1497    }
1498
1499    /// With all four listener gates off, `build_addresses` returns an empty Vec.
1500    #[test]
1501    fn build_addresses_no_listeners() {
1502        let config = DogStatsDConfiguration {
1503            port: 0,
1504            tcp_port: 0,
1505            socket_path: None,
1506            socket_stream_path: None,
1507            non_local_traffic: false,
1508            ..Default::default()
1509        };
1510        let mut expected = vec![];
1511        let mut actual = config.build_addresses(None);
1512        address_list_eq(&mut expected, &mut actual).unwrap();
1513    }
1514
1515    /// UDP port set, `non_local_traffic=false` -> UDP listener bound to `127.0.0.1`.
1516    #[test]
1517    fn build_addresses_udp_local_only() {
1518        let config = DogStatsDConfiguration {
1519            port: 8125,
1520            tcp_port: 0,
1521            socket_path: None,
1522            socket_stream_path: None,
1523            non_local_traffic: false,
1524            ..Default::default()
1525        };
1526        let mut expected = vec![ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(
1527            Ipv4Addr::new(127, 0, 0, 1),
1528            8125,
1529        )))];
1530        let mut actual = config.build_addresses(None);
1531        address_list_eq(&mut expected, &mut actual).unwrap();
1532    }
1533
1534    /// UDP port set, `non_local_traffic=true` -> UDP listener bound to `0.0.0.0`.
1535    #[test]
1536    fn build_addresses_udp_non_local_only() {
1537        let config = DogStatsDConfiguration {
1538            port: 8125,
1539            tcp_port: 0,
1540            socket_path: None,
1541            socket_stream_path: None,
1542            non_local_traffic: true,
1543            ..Default::default()
1544        };
1545        let mut expected = vec![ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(
1546            Ipv4Addr::new(0, 0, 0, 0),
1547            8125,
1548        )))];
1549        let mut actual = config.build_addresses(None);
1550        address_list_eq(&mut expected, &mut actual).unwrap();
1551    }
1552
1553    /// TCP port set, `non_local_traffic=false` -> TCP listener bound to `127.0.0.1`.
1554    #[test]
1555    fn build_addresses_tcp_local_only() {
1556        let config = DogStatsDConfiguration {
1557            port: 0,
1558            tcp_port: 9000,
1559            socket_path: None,
1560            socket_stream_path: None,
1561            non_local_traffic: false,
1562            ..Default::default()
1563        };
1564        let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(
1565            Ipv4Addr::new(127, 0, 0, 1),
1566            9000,
1567        )))];
1568        let mut actual = config.build_addresses(None);
1569        address_list_eq(&mut expected, &mut actual).unwrap();
1570    }
1571
1572    /// TCP port set, `non_local_traffic=true` -> TCP listener bound to `0.0.0.0`.
1573    #[test]
1574    fn build_addresses_tcp_non_local_only() {
1575        let config = DogStatsDConfiguration {
1576            port: 0,
1577            tcp_port: 9000,
1578            socket_path: None,
1579            socket_stream_path: None,
1580            non_local_traffic: true,
1581            ..Default::default()
1582        };
1583        let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(
1584            Ipv4Addr::new(0, 0, 0, 0),
1585            9000,
1586        )))];
1587        let mut actual = config.build_addresses(None);
1588        address_list_eq(&mut expected, &mut actual).unwrap();
1589    }
1590
1591    /// `socket_path` set -> a `Unixgram` address is produced with that path.
1592    #[test]
1593    fn build_addresses_unixgram_only() {
1594        let config = DogStatsDConfiguration {
1595            port: 0,
1596            tcp_port: 0,
1597            socket_path: Some("/tmp/dsd.sock".to_string()),
1598            socket_stream_path: None,
1599            non_local_traffic: false,
1600            ..Default::default()
1601        };
1602        let mut expected = vec![ListenAddress::Unixgram("/tmp/dsd.sock".into())];
1603        let mut actual = config.build_addresses(None);
1604        address_list_eq(&mut expected, &mut actual).unwrap();
1605    }
1606
1607    /// `socket_stream_path` set -> a `Unix` (stream) address is produced with that path.
1608    #[test]
1609    fn build_addresses_unix_stream_only() {
1610        let config = DogStatsDConfiguration {
1611            port: 0,
1612            tcp_port: 0,
1613            socket_path: None,
1614            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
1615            non_local_traffic: false,
1616            ..Default::default()
1617        };
1618        let mut expected = vec![ListenAddress::Unix("/tmp/dsd-stream.sock".into())];
1619        let mut actual = config.build_addresses(None);
1620        address_list_eq(&mut expected, &mut actual).unwrap();
1621    }
1622
1623    /// All four listener types enabled at once, with `non_local_traffic=true`.
1624    #[test]
1625    fn build_addresses_all_four_non_local() {
1626        let config = DogStatsDConfiguration {
1627            port: 8125,
1628            tcp_port: 9000,
1629            socket_path: Some("/tmp/dsd.sock".to_string()),
1630            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
1631            non_local_traffic: true,
1632            ..Default::default()
1633        };
1634        let mut expected = vec![
1635            ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 8125))),
1636            ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 9000))),
1637            ListenAddress::Unixgram("/tmp/dsd.sock".into()),
1638            ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
1639        ];
1640        let mut actual = config.build_addresses(None);
1641        address_list_eq(&mut expected, &mut actual).unwrap();
1642    }
1643
1644    /// All four listener types enabled at once, with `non_local_traffic=false`.
1645    #[test]
1646    fn build_addresses_all_four_local() {
1647        let config = DogStatsDConfiguration {
1648            port: 8125,
1649            tcp_port: 9000,
1650            socket_path: Some("/tmp/dsd.sock".to_string()),
1651            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
1652            non_local_traffic: false,
1653            ..Default::default()
1654        };
1655        let mut expected = vec![
1656            ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8125))),
1657            ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 9000))),
1658            ListenAddress::Unixgram("/tmp/dsd.sock".into()),
1659            ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
1660        ];
1661        let mut actual = config.build_addresses(None);
1662        address_list_eq(&mut expected, &mut actual).unwrap();
1663    }
1664
1665    /// Passing `Some(ip)` to `build_addresses` with `non_local_traffic=false` -> both UDP and TCP
1666    /// bind to that IP. Includes a UDS datagram socket to confirm `bind_host` doesn't affect it.
1667    #[test]
1668    fn build_addresses_bind_host_applies_to_udp_and_tcp() {
1669        let config = DogStatsDConfiguration {
1670            port: 8125,
1671            tcp_port: 9000,
1672            socket_path: Some("/tmp/dsd.sock".to_string()),
1673            socket_stream_path: None,
1674            non_local_traffic: false,
1675            ..Default::default()
1676        };
1677        let bind_host = Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 50)));
1678        let mut expected = vec![
1679            ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 50), 8125))),
1680            ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 50), 9000))),
1681            ListenAddress::Unixgram("/tmp/dsd.sock".into()),
1682        ];
1683        let mut actual = config.build_addresses(bind_host);
1684        address_list_eq(&mut expected, &mut actual).unwrap();
1685    }
1686
1687    /// Passing `Some(ip)` to `build_addresses` with `non_local_traffic=true` -> both UDP and TCP
1688    /// bind to `0.0.0.0`; the bind_host parameter is ignored (precedence matches the Agent).
1689    /// Includes a UDS stream socket to confirm `bind_host` doesn't affect it.
1690    #[test]
1691    fn build_addresses_non_local_clobbers_bind_host() {
1692        let config = DogStatsDConfiguration {
1693            port: 8125,
1694            tcp_port: 9000,
1695            socket_path: None,
1696            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
1697            non_local_traffic: true,
1698            ..Default::default()
1699        };
1700        let bind_host = Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 50)));
1701        let mut expected = vec![
1702            ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 8125))),
1703            ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 9000))),
1704            ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
1705        ];
1706        let mut actual = config.build_addresses(bind_host);
1707        address_list_eq(&mut expected, &mut actual).unwrap();
1708    }
1709
1710    #[test]
1711    fn non_finite_metric_values_are_silently_dropped() {
1712        // The Datadog Agent sends NaN gauges (e.g. encode_ms.avg computed as 0.0/0.0 in Go).
1713        // FloatIter skips non-finite values with a debug log, so decode_packet returns Ok with
1714        // num_points == 0. handle_frame then returns Ok(None) for zero-point packets, which is
1715        // the existing silent-drop path (no warning emitted).
1716        let codec = DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default());
1717        for input in &[b"my.gauge:NaN|g" as &[u8], b"my.gauge:inf|g", b"my.gauge:-inf|g"] {
1718            match codec.decode_packet(input).expect("should decode without error") {
1719                ParsedPacket::Metric(packet) => assert_eq!(
1720                    packet.num_points, 0,
1721                    "non-finite value should be dropped, leaving 0 valid points"
1722                ),
1723                _ => panic!("expected Metric packet"),
1724            }
1725        }
1726    }
1727}
1728
/// Smoke test that drives `DogStatsDConfiguration` deserialization through the shared config-registry harness.
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DogStatsDConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Runs `run_config_smoke_tests` for the `DOGSTATSD_CONFIGURATION` registry entry, passing an empty slice and
    /// an empty JSON object, and requiring each configuration handed to the callback to deserialize.
    #[tokio::test]
    async fn smoke_test() {
        let empty_object = json!({});

        run_config_smoke_tests(structs::DOGSTATSD_CONFIGURATION, &[], empty_object, |cfg| {
            // A failed typed conversion is a test failure, hence the panic on error.
            let typed = cfg.as_typed::<DogStatsDConfiguration>();
            typed.expect("DogStatsDConfiguration should deserialize")
        })
        .await
    }
}