saluki_components/sources/dogstatsd/
mod.rs

use std::sync::{Arc, LazyLock};
use std::time::Duration;

use async_trait::async_trait;
use bytes::{Buf, BufMut};
use bytesize::ByteSize;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
use metrics::{Counter, Gauge, Histogram};
use saluki_common::task::spawn_traced_named;
use saluki_config::GenericConfiguration;
use saluki_context::{
    tags::{RawTags, RawTagsFilter},
    TagsResolver,
};
use saluki_core::data_model::event::metric::Metric;
use saluki_core::data_model::event::{
    eventd::EventD,
    metric::{MetricMetadata, MetricOrigin},
    service_check::ServiceCheck,
    Event, EventType,
};
use saluki_core::{
    components::{sources::*, ComponentContext},
    observability::ComponentMetricsExt as _,
    pooling::FixedSizeObjectPool,
    topology::{
        interconnect::EventBufferManager,
        shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle},
        EventsBuffer, OutputDefinition,
    },
};
use saluki_env::WorkloadProvider;
use saluki_error::{generic_error, ErrorContext as _, GenericError};
use saluki_io::{
    buf::{BytesBuffer, FixedSizeVec},
    deser::{codec::dogstatsd::*, framing::FramerExt as _},
    net::{
        listener::{Listener, ListenerError},
        ConnectionAddress, ListenAddress, Stream,
    },
};
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use serde_with::{serde_as, NoneAsEmptyString};
use snafu::{ResultExt as _, Snafu};
use tokio::{
    select,
    time::{interval, MissedTickBehavior},
};
use tracing::{debug, error, info, trace, warn};

mod framer;
use self::framer::{get_framer, DsdFramer};
use crate::sources::dogstatsd::tags::{WellKnownTags, WellKnownTagsFilterPredicate};

mod filters;
use self::filters::EnablePayloadsFilter;

mod io_buffer;
use self::io_buffer::IoBufferManager;

mod origin;
use self::origin::{
    origin_from_event_packet, origin_from_metric_packet, origin_from_service_check_packet, DogStatsDOriginTagResolver,
    OriginEnrichmentConfiguration,
};

mod resolver;
use self::resolver::ContextResolvers;

mod tags;

#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
enum Error {
    #[snafu(display("Failed to create {} listener: {}", listener_type, source))]
    FailedToCreateListener {
        listener_type: &'static str,
        source: ListenerError,
    },

    #[snafu(display("No listeners configured. Please specify a port (`dogstatsd_port`) or a socket path (`dogstatsd_socket` or `dogstatsd_stream_socket`) to enable a listener."))]
    NoListenersConfigured,
}

const fn default_buffer_size() -> usize {
    8192
}

const fn default_buffer_count() -> usize {
    128
}

const fn default_port() -> u16 {
    8125
}

const fn default_tcp_port() -> u16 {
    0
}

const fn default_allow_context_heap_allocations() -> bool {
    true
}

const fn default_no_aggregation_pipeline_support() -> bool {
    true
}

const fn default_context_string_interner_size() -> ByteSize {
    ByteSize::mib(2)
}

const fn default_cached_contexts_limit() -> usize {
    500_000
}

const fn default_cached_tagsets_limit() -> usize {
    500_000
}

const fn default_dogstatsd_permissive_decoding() -> bool {
    true
}

const fn default_dogstatsd_minimum_sample_rate() -> f64 {
    0.000000003845
}

const fn default_enable_payloads_series() -> bool {
    true
}

const fn default_enable_payloads_sketches() -> bool {
    true
}

const fn default_enable_payloads_events() -> bool {
    true
}

const fn default_enable_payloads_service_checks() -> bool {
    true
}

/// DogStatsD source.
///
/// Accepts metrics over TCP, UDP, or Unix Domain Sockets in the StatsD/DogStatsD format.
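///
/// # Example
///
/// A minimal sketch of building this configuration and attaching a workload provider for origin
/// enrichment. `loaded_config` and `workload_provider` are assumed to be supplied by the
/// surrounding application:
///
/// ```ignore
/// let dsd_config = DogStatsDConfiguration::from_configuration(&loaded_config)?
///     .with_workload_provider(workload_provider);
/// ```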
#[serde_as]
#[derive(Deserialize)]
pub struct DogStatsDConfiguration {
    /// The size, in bytes, of the buffer used to receive messages.
    ///
    /// Payloads cannot exceed this size, or they will be truncated, leading to discarded messages.
    ///
    /// Defaults to 8192 bytes.
    #[serde(rename = "dogstatsd_buffer_size", default = "default_buffer_size")]
    buffer_size: usize,

    /// The number of message buffers to allocate overall.
    ///
    /// This represents the maximum number of message buffers available for processing incoming metrics, which loosely
    /// correlates with how many messages can be received per second. The default value should be suitable for the
    /// majority of workloads, but high-throughput workloads may need to increase it.
    ///
    /// Defaults to 128.
    #[serde(rename = "dogstatsd_buffer_count", default = "default_buffer_count")]
    buffer_count: usize,

    /// The port to listen on in UDP mode.
    ///
    /// If set to `0`, UDP is not used.
    ///
    /// Defaults to 8125.
    #[serde(rename = "dogstatsd_port", default = "default_port")]
    port: u16,

    /// The port to listen on in TCP mode.
    ///
    /// If set to `0`, TCP is not used.
    ///
    /// Defaults to 0.
    #[serde(rename = "dogstatsd_tcp_port", default = "default_tcp_port")]
    tcp_port: u16,

    /// The Unix domain socket path to listen on, in datagram mode.
    ///
    /// If not set, UDS (in datagram mode) is not used.
    ///
    /// Defaults to unset.
    #[serde(rename = "dogstatsd_socket", default)]
    #[serde_as(as = "NoneAsEmptyString")]
    socket_path: Option<String>,

    /// The Unix domain socket path to listen on, in stream mode.
    ///
    /// If not set, UDS (in stream mode) is not used.
    ///
    /// Defaults to unset.
    #[serde(rename = "dogstatsd_stream_socket", default)]
    #[serde_as(as = "NoneAsEmptyString")]
    socket_stream_path: Option<String>,

    /// Whether or not to listen for non-local traffic in UDP mode.
    ///
    /// If set to `true`, the listener will accept packets from any interface/address. Otherwise, the source will only
    /// listen on `localhost`.
    ///
    /// Defaults to `false`.
    #[serde(rename = "dogstatsd_non_local_traffic", default)]
    non_local_traffic: bool,

    /// Whether or not to allow heap allocations when resolving contexts.
    ///
    /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The
    /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full.
    /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to
    /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags cannot be
    /// interned, the metric is skipped.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "dogstatsd_allow_context_heap_allocs",
        default = "default_allow_context_heap_allocations"
    )]
    allow_context_heap_allocations: bool,

    /// Whether or not to enable support for no-aggregation pipelines.
    ///
    /// When enabled, this influences how metrics are parsed, specifically around user-provided metric timestamps. When
    /// a metric timestamp is present, it is used as a signal to any aggregation transforms that the metric should not
    /// be aggregated.
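    ///
    /// For example, a payload such as `page.views:15|c|T1656581400` carries a client-supplied
    /// timestamp via the DogStatsD `|T` suffix, and is treated here as a signal to skip aggregation.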
233    ///
234    /// Defaults to `true`.
235    #[serde(
236        rename = "dogstatsd_no_aggregation_pipeline",
237        default = "default_no_aggregation_pipeline_support"
238    )]
239    no_aggregation_pipeline_support: bool,
240
241    /// Total size of the string interner used for contexts.
242    ///
243    /// This controls the amount of memory that can be used to intern metric names and tags. If the interner is full,
244    /// metrics with contexts that have not already been resolved may or may not be dropped, depending on the value of
245    /// `allow_context_heap_allocations`.
246    #[serde(
247        rename = "dogstatsd_string_interner_size",
248        default = "default_context_string_interner_size"
249    )]
250    context_string_interner_bytes: ByteSize,
251
252    /// The maximum number of cached contexts to allow.
253    ///
254    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
255    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
256    /// and whether or not heap allocations are allowed.
257    ///
258    /// Defaults to 500,000.
259    #[serde(
260        rename = "dogstatsd_cached_contexts_limit",
261        default = "default_cached_contexts_limit"
262    )]
263    cached_contexts_limit: usize,
264
265    /// The maximum number of cached tagsets to allow.
266    ///
267    /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit does not affect
268    /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity
269    /// and whether or not heap allocations are allowed.
270    ///
271    /// Defaults to 500,000.
272    #[serde(rename = "dogstatsd_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
273    cached_tagsets_limit: usize,
274
275    /// Whether or not to enable permissive mode in the decoder.
276    ///
277    /// Permissive mode allows the decoder to relax its strictness around the allowed payloads, which lets it match the
278    /// decoding behavior of the Datadog Agent.
279    ///
280    /// Defaults to `true`.
281    #[serde(
282        rename = "dogstatsd_permissive_decoding",
283        default = "default_dogstatsd_permissive_decoding"
284    )]
285    permissive_decoding: bool,
286
287    /// The minimum sample rate allowed for metrics.
288    ///
289    /// When metrics are sent with a sample rate _lower_ than this value then it will be clamped to this value. This is
290    /// done in order to ensure an upper bound on how many equivalent samples are tracked for the metric, as high sample
291    /// rates (very small numbers, such as `0.00000001`) can lead to large memory growth.
292    ///
293    /// A warning log will be emitted when clamping occurs, as this represents an effective loss of metric samples.
294    ///
295    /// Defaults to `0.000000003845`. (~260M samples)
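    ///
    /// (That is, `1 / 0.000000003845 ≈ 260,000,000`, which is where the ~260M equivalent-samples figure comes from.)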
    #[serde(
        rename = "dogstatsd_minimum_sample_rate",
        default = "default_dogstatsd_minimum_sample_rate"
    )]
    minimum_sample_rate: f64,

    /// Whether or not to enable sending series payloads.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_enable_payloads_series")]
    enable_payloads_series: bool,

    /// Whether or not to enable sending sketch payloads.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_enable_payloads_sketches")]
    enable_payloads_sketches: bool,

    /// Whether or not to enable sending event payloads.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_enable_payloads_events")]
    enable_payloads_events: bool,

    /// Whether or not to enable sending service check payloads.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_enable_payloads_service_checks")]
    enable_payloads_service_checks: bool,

    /// Configuration related to origin detection and enrichment.
    #[serde(flatten, default)]
    origin_enrichment: OriginEnrichmentConfiguration,

    /// Workload provider to utilize for origin detection/enrichment.
    #[serde(skip)]
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,

    /// Additional tags to add to all metrics.
    #[serde(rename = "dogstatsd_tags", default)]
    additional_tags: Vec<String>,
}

impl DogStatsDConfiguration {
    /// Creates a new `DogStatsDConfiguration` from the given configuration.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }

    /// Sets the workload provider to use for configuring origin detection/enrichment.
    ///
    /// A workload provider must be set, otherwise origin detection/enrichment will not be enabled.
    ///
    /// Defaults to unset.
    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
    where
        W: WorkloadProvider + Send + Sync + 'static,
    {
        self.workload_provider = Some(Arc::new(workload_provider));
        self
    }

    async fn build_listeners(&self) -> Result<Vec<Listener>, Error> {
        let mut listeners = Vec::new();

        if self.port != 0 {
            let address = if self.non_local_traffic {
                ListenAddress::Udp(([0, 0, 0, 0], self.port).into())
            } else {
                ListenAddress::Udp(([127, 0, 0, 1], self.port).into())
            };

            let listener = Listener::from_listen_address(address)
                .await
                .context(FailedToCreateListener { listener_type: "UDP" })?;
            listeners.push(listener);
        }

        if self.tcp_port != 0 {
            let address = if self.non_local_traffic {
                ListenAddress::Tcp(([0, 0, 0, 0], self.tcp_port).into())
            } else {
                ListenAddress::Tcp(([127, 0, 0, 1], self.tcp_port).into())
            };

            let listener = Listener::from_listen_address(address)
                .await
                .context(FailedToCreateListener { listener_type: "TCP" })?;
            listeners.push(listener);
        }

        if let Some(socket_path) = &self.socket_path {
            let address = ListenAddress::Unixgram(socket_path.into());
            let listener = Listener::from_listen_address(address)
                .await
                .context(FailedToCreateListener {
                    listener_type: "UDS (datagram)",
                })?;
            listeners.push(listener);
        }

        if let Some(socket_stream_path) = &self.socket_stream_path {
            let address = ListenAddress::Unix(socket_stream_path.into());
            let listener = Listener::from_listen_address(address)
                .await
                .context(FailedToCreateListener {
                    listener_type: "UDS (stream)",
                })?;
            listeners.push(listener);
        }

        Ok(listeners)
    }
}

#[async_trait]
impl SourceBuilder for DogStatsDConfiguration {
    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
        let listeners = self.build_listeners().await?;
        if listeners.is_empty() {
            return Err(Error::NoListenersConfigured.into());
        }

        // Every listener requires at least one I/O buffer to ensure that all listeners can be serviced without
        // deadlocking any of the others.
        if self.buffer_count < listeners.len() {
            return Err(generic_error!(
                "Must have a minimum of {} I/O buffers based on the number of listeners configured.",
                listeners.len()
            ));
        }

        let maybe_origin_tags_resolver = self
            .workload_provider
            .clone()
            .map(|provider| DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider));
        let context_resolvers = ContextResolvers::new(self, &context, maybe_origin_tags_resolver)
            .error_context("Failed to create context resolvers.")?;

        let codec_config = DogstatsdCodecConfiguration::default()
            .with_timestamps(self.no_aggregation_pipeline_support)
            .with_permissive_mode(self.permissive_decoding)
            .with_minimum_sample_rate(self.minimum_sample_rate);

        let codec = DogstatsdCodec::from_configuration(codec_config);

        let enable_payloads_filter = EnablePayloadsFilter::default()
            .with_allow_series(self.enable_payloads_series)
            .with_allow_sketches(self.enable_payloads_sketches)
            .with_allow_events(self.enable_payloads_events)
            .with_allow_service_checks(self.enable_payloads_service_checks);

        Ok(Box::new(DogStatsD {
            listeners,
            io_buffer_pool: FixedSizeObjectPool::with_builder("dsd_packet_bufs", self.buffer_count, || {
                FixedSizeVec::with_capacity(get_adjusted_buffer_size(self.buffer_size))
            }),
            codec,
            context_resolvers,
            enabled_filter: enable_payloads_filter,
            additional_tags: self.additional_tags.clone().into(),
        }))
    }

    fn outputs(&self) -> &[OutputDefinition<EventType>] {
        static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
            vec![
                OutputDefinition::named_output("metrics", EventType::Metric),
                OutputDefinition::named_output("events", EventType::EventD),
                OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
            ]
        });
        &OUTPUTS
    }
}

impl MemoryBounds for DogStatsDConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            // Capture the size of the heap allocation when the component is built.
            .with_single_value::<DogStatsD>("source struct")
            // We allocate our I/O buffers entirely up front.
            .with_expr(UsageExpr::product(
                "buffers",
                UsageExpr::config("dogstatsd_buffer_count", self.buffer_count),
                UsageExpr::config("dogstatsd_buffer_size", get_adjusted_buffer_size(self.buffer_size)),
            ))
            // We also allocate the backing storage for the string interner up front, which is used by our context
            // resolver.
            .with_expr(UsageExpr::config(
                "dogstatsd_string_interner_size",
                self.context_string_interner_bytes.as_u64() as usize,
            ));
    }
}

/// DogStatsD source.
pub struct DogStatsD {
    listeners: Vec<Listener>,
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    codec: DogstatsdCodec,
    context_resolvers: ContextResolvers,
    enabled_filter: EnablePayloadsFilter,
    additional_tags: Arc<[String]>,
}

struct ListenerContext {
    shutdown_handle: DynamicShutdownHandle,
    listener: Listener,
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    codec: DogstatsdCodec,
    context_resolvers: ContextResolvers,
    additional_tags: Arc<[String]>,
}

struct HandlerContext {
    listen_addr: ListenAddress,
    framer: DsdFramer,
    codec: DogstatsdCodec,
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    metrics: Metrics,
    context_resolvers: ContextResolvers,
    additional_tags: Arc<[String]>,
}

struct Metrics {
    metrics_received: Counter,
    events_received: Counter,
    service_checks_received: Counter,
    bytes_received: Counter,
    bytes_received_size: Histogram,
    framing_errors: Counter,
    metric_decoder_errors: Counter,
    event_decoder_errors: Counter,
    service_check_decoder_errors: Counter,
    failed_context_resolve_total: Counter,
    connections_active: Gauge,
    packet_receive_success: Counter,
    packet_receive_failure: Counter,
}

impl Metrics {
    fn metrics_received(&self) -> &Counter {
        &self.metrics_received
    }

    fn events_received(&self) -> &Counter {
        &self.events_received
    }

    fn service_checks_received(&self) -> &Counter {
        &self.service_checks_received
    }

    fn bytes_received(&self) -> &Counter {
        &self.bytes_received
    }

    fn bytes_received_size(&self) -> &Histogram {
        &self.bytes_received_size
    }

    fn framing_errors(&self) -> &Counter {
        &self.framing_errors
    }

    fn metric_decode_failed(&self) -> &Counter {
        &self.metric_decoder_errors
    }

    fn event_decode_failed(&self) -> &Counter {
        &self.event_decoder_errors
    }

    fn service_check_decode_failed(&self) -> &Counter {
        &self.service_check_decoder_errors
    }

    fn failed_context_resolve_total(&self) -> &Counter {
        &self.failed_context_resolve_total
    }

    fn connections_active(&self) -> &Gauge {
        &self.connections_active
    }

    fn packet_receive_success(&self) -> &Counter {
        &self.packet_receive_success
    }

    fn packet_receive_failure(&self) -> &Counter {
        &self.packet_receive_failure
    }
}

fn build_metrics(listen_addr: &ListenAddress, component_context: &ComponentContext) -> Metrics {
    let builder = MetricsBuilder::from_component_context(component_context);

    let listener_type = match listen_addr {
        ListenAddress::Tcp(_) => "tcp",
        ListenAddress::Udp(_) => "udp",
        ListenAddress::Unix(_) => "unix",
        ListenAddress::Unixgram(_) => "unixgram",
    };

    Metrics {
        metrics_received: builder.register_counter_with_tags(
            "component_events_received_total",
            [("message_type", "metrics"), ("listener_type", listener_type)],
        ),
        events_received: builder.register_counter_with_tags(
            "component_events_received_total",
            [("message_type", "events"), ("listener_type", listener_type)],
        ),
        service_checks_received: builder.register_counter_with_tags(
            "component_events_received_total",
            [("message_type", "service_checks"), ("listener_type", listener_type)],
        ),
        bytes_received: builder
            .register_counter_with_tags("component_bytes_received_total", [("listener_type", listener_type)]),
        bytes_received_size: builder
            .register_trace_histogram_with_tags("component_bytes_received_size", [("listener_type", listener_type)]),
        framing_errors: builder.register_counter_with_tags(
            "component_errors_total",
            [("listener_type", listener_type), ("error_type", "framing")],
        ),
        metric_decoder_errors: builder.register_counter_with_tags(
            "component_errors_total",
            [
                ("listener_type", listener_type),
                ("error_type", "decode"),
                ("message_type", "metrics"),
            ],
        ),
        event_decoder_errors: builder.register_counter_with_tags(
            "component_errors_total",
            [
                ("listener_type", listener_type),
                ("error_type", "decode"),
                ("message_type", "events"),
            ],
        ),
        service_check_decoder_errors: builder.register_counter_with_tags(
            "component_errors_total",
            [
                ("listener_type", listener_type),
                ("error_type", "decode"),
                ("message_type", "service_checks"),
            ],
        ),
        connections_active: builder
            .register_gauge_with_tags("component_connections_active", [("listener_type", listener_type)]),
        packet_receive_success: builder.register_debug_counter_with_tags(
            "component_packets_received_total",
            [("listener_type", listener_type), ("state", "ok")],
        ),
        packet_receive_failure: builder.register_debug_counter_with_tags(
            "component_packets_received_total",
            [("listener_type", listener_type), ("state", "error")],
        ),
        failed_context_resolve_total: builder.register_debug_counter("component_failed_context_resolve_total"),
    }
}

#[async_trait]
impl Source for DogStatsD {
    async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
        let mut global_shutdown = context.take_shutdown_handle();
        let mut health = context.take_health_handle();

        let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();

        // For each listener, spawn a dedicated task to run it.
        for listener in self.listeners {
            let task_name = format!("dogstatsd-listener-{}", listener.listen_address().listener_type());

            // TODO: Create a health handle for each listener.
            //
            // We need to rework `HealthRegistry` to look a little more like `ComponentRegistry` so that we can have it
            // already be scoped properly, otherwise all we can do here at present is either have a relative name, like
            // `uds-stream`, or try and hardcode the full component name, which we will inevitably forget to update if
            // we tweak the topology configuration, etc.
            let listener_context = ListenerContext {
                shutdown_handle: listener_shutdown_coordinator.register(),
                listener,
                io_buffer_pool: self.io_buffer_pool.clone(),
                codec: self.codec.clone(),
                context_resolvers: self.context_resolvers.clone(),
                additional_tags: self.additional_tags.clone(),
            };

            spawn_traced_named(
                task_name,
                process_listener(context.clone(), listener_context, self.enabled_filter),
            );
        }

        health.mark_ready();
        debug!("DogStatsD source started.");

        // Wait for the global shutdown signal, then notify listeners to shut down.
        //
        // We also handle liveness here, which doesn't really matter for _this_ task, since the real work is happening
        // in the listeners, but we need to satisfy the health checker.
        loop {
            select! {
                _ = &mut global_shutdown => {
                    debug!("Received shutdown signal.");
                    break
                },
                _ = health.live() => continue,
            }
        }

        debug!("Stopping DogStatsD source...");

        listener_shutdown_coordinator.shutdown().await;

        debug!("DogStatsD source stopped.");

        Ok(())
    }
}

async fn process_listener(
    source_context: SourceContext, listener_context: ListenerContext, enabled_filter: EnablePayloadsFilter,
) {
    let ListenerContext {
        shutdown_handle,
        mut listener,
        io_buffer_pool,
        codec,
        context_resolvers,
        additional_tags,
    } = listener_context;
    tokio::pin!(shutdown_handle);

    let listen_addr = listener.listen_address().clone();
    let mut stream_shutdown_coordinator = DynamicShutdownCoordinator::default();

    info!(%listen_addr, "DogStatsD listener started.");

    loop {
        select! {
            _ = &mut shutdown_handle => {
                debug!(%listen_addr, "Received shutdown signal. Waiting for existing stream handlers to finish...");
                break;
            }
            result = listener.accept() => match result {
                Ok(stream) => {
                    debug!(%listen_addr, "Spawning new stream handler.");

                    let handler_context = HandlerContext {
                        listen_addr: listen_addr.clone(),
                        framer: get_framer(&listen_addr),
                        codec: codec.clone(),
                        io_buffer_pool: io_buffer_pool.clone(),
                        metrics: build_metrics(&listen_addr, source_context.component_context()),
                        context_resolvers: context_resolvers.clone(),
                        additional_tags: additional_tags.clone(),
                    };

                    let task_name = format!("dogstatsd-stream-handler-{}", listen_addr.listener_type());
                    spawn_traced_named(task_name, process_stream(stream, source_context.clone(), handler_context, stream_shutdown_coordinator.register(), enabled_filter));
                }
                Err(e) => {
                    error!(%listen_addr, error = %e, "Failed to accept connection. Stopping listener.");
                    break
                }
            }
        }
    }

    stream_shutdown_coordinator.shutdown().await;

    info!(%listen_addr, "DogStatsD listener stopped.");
}

async fn process_stream(
    stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
    shutdown_handle: DynamicShutdownHandle, enabled_filter: EnablePayloadsFilter,
) {
    tokio::pin!(shutdown_handle);

    select! {
        _ = &mut shutdown_handle => {
            debug!("Stream handler received shutdown signal.");
        },
        _ = drive_stream(stream, source_context, handler_context, enabled_filter) => {},
    }
}

async fn drive_stream(
    mut stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
    enabled_filter: EnablePayloadsFilter,
) {
    let HandlerContext {
        listen_addr,
        mut framer,
        codec,
        io_buffer_pool,
        metrics,
        mut context_resolvers,
        additional_tags,
    } = handler_context;

    debug!(%listen_addr, "Stream handler started.");

    if !stream.is_connectionless() {
        metrics.connections_active().increment(1);
    }

    // Set a buffer flush interval of 100ms, which will ensure we always flush buffered events at least every 100ms if
    // we're otherwise idle and not receiving packets from the client.
    let mut buffer_flush = interval(Duration::from_millis(100));
    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let mut event_buffer_manager = EventBufferManager::default();
    let mut io_buffer_manager = IoBufferManager::new(&io_buffer_pool, &stream);
    let memory_limiter = source_context.topology_context().memory_limiter();

    'read: loop {
        let mut eof = false;

        let mut io_buffer = io_buffer_manager.get_buffer_mut().await;

        memory_limiter.wait_for_capacity().await;

        select! {
            // We read from the stream.
            read_result = stream.receive(&mut io_buffer) => match read_result {
                Ok((bytes_read, peer_addr)) => {
                    if bytes_read == 0 {
                        eof = true;
                    }

                    // TODO: This is correct for UDP and UDS in SOCK_DGRAM mode, but not for UDS in SOCK_STREAM mode...
                    // because to match the Datadog Agent, we would only want to increment the number of successful
                    // packets for each length-delimited frame, but this is obviously being incremented before we do any
                    // framing... and even further, with the nested framer, we don't have access to the signal that
                    // we've gotten a full length-delimited outer frame, only each individual newline-delimited inner
                    // frame.
                    //
                    // As such, we'll potentially be over-reporting this metric for UDS in SOCK_STREAM mode compared to
                    // the Datadog Agent.
                    metrics.packet_receive_success().increment(1);
                    metrics.bytes_received().increment(bytes_read as u64);
                    metrics.bytes_received_size().record(bytes_read as f64);

                    // When we're actually at EOF, or we're dealing with a connectionless stream, we try to decode in EOF mode.
                    //
                    // For connectionless streams, we always try to decode the buffer as if it's EOF, since it effectively _is_
                    // always the end of file after a receive. For connection-oriented streams, we only want to do this once we've
                    // actually hit true EOF.
                    let reached_eof = eof || stream.is_connectionless();

                    trace!(
                        buffer_len = io_buffer.remaining(),
                        buffer_cap = io_buffer.remaining_mut(),
                        eof = reached_eof,
                        %listen_addr,
                        %peer_addr,
                        "Received {} bytes from stream.",
                        bytes_read
                    );

                    let mut frames = io_buffer.framed(&mut framer, reached_eof);
                    'frame: loop {
                        match frames.next() {
                            Some(Ok(frame)) => {
                                trace!(%listen_addr, %peer_addr, ?frame, "Decoded frame.");
                                match handle_frame(&frame[..], &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) {
                                    Ok(Some(event)) => {
                                        if let Some(event_buffer) = event_buffer_manager.try_push(event) {
                                            debug!(%listen_addr, %peer_addr, "Event buffer is full. Forwarding events.");
                                            dispatch_events(event_buffer, &source_context, &listen_addr).await;
                                        }
                                    },
                                    Ok(None) => {
                                        // We didn't decode an event, but there was no inherent error. This is likely
                                        // due to hitting resource limits, etc.
                                        //
                                        // Simply continue on.
                                        continue
                                    },
                                    Err(e) => {
                                        let frame_lossy_str = String::from_utf8_lossy(&frame);
                                        warn!(%listen_addr, %peer_addr, frame = %frame_lossy_str, error = %e, "Failed to parse frame.");
                                    },
                                }
                            }
                            Some(Err(e)) => {
                                metrics.framing_errors().increment(1);

                                if stream.is_connectionless() {
                                    // For connectionless streams, we don't want to shut down the stream since we can just keep
                                    // reading more packets.
                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Continuing stream.");
                                    continue 'read;
                                } else {
                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Stopping stream.");
                                    break 'read;
                                }
                            }
                            None => {
                                trace!(%listen_addr, %peer_addr, "Not enough data to decode another frame.");
                                if eof && !stream.is_connectionless() {
                                    debug!(%listen_addr, %peer_addr, "Stream received EOF. Shutting down handler.");
                                    break 'read;
                                } else {
                                    break 'frame;
                                }
                            }
                        }
                    }
                },
                Err(e) => {
                    metrics.packet_receive_failure().increment(1);

                    if stream.is_connectionless() {
                        // For connectionless streams, we don't want to shut down the stream since we can just keep
                        // reading more packets.
                        warn!(%listen_addr, error = %e, "I/O error while decoding. Continuing stream.");
                        continue 'read;
                    } else {
                        warn!(%listen_addr, error = %e, "I/O error while decoding. Stopping stream.");
                        break 'read;
                    }
                }
            },

            _ = buffer_flush.tick() => {
                if let Some(event_buffer) = event_buffer_manager.consume() {
                    dispatch_events(event_buffer, &source_context, &listen_addr).await;
                }
            },
        }
    }

    if let Some(event_buffer) = event_buffer_manager.consume() {
        dispatch_events(event_buffer, &source_context, &listen_addr).await;
    }

    metrics.connections_active().decrement(1);

    debug!(%listen_addr, "Stream handler stopped.");
}

fn handle_frame(
    frame: &[u8], codec: &DogstatsdCodec, context_resolvers: &mut ContextResolvers, source_metrics: &Metrics,
    peer_addr: &ConnectionAddress, enabled_filter: EnablePayloadsFilter, additional_tags: &[String],
) -> Result<Option<Event>, ParseError> {
    let parsed = match codec.decode_packet(frame) {
        Ok(parsed) => parsed,
        Err(e) => {
            // Try and determine what the message type was, if possible, to increment the correct error counter.
            match parse_message_type(frame) {
                MessageType::MetricSample => source_metrics.metric_decode_failed().increment(1),
                MessageType::Event => source_metrics.event_decode_failed().increment(1),
                MessageType::ServiceCheck => source_metrics.service_check_decode_failed().increment(1),
            }

            return Err(e);
        }
    };

    let event = match parsed {
        ParsedPacket::Metric(metric_packet) => {
            let events_len = metric_packet.num_points;
            if !enabled_filter.allow_metric(&metric_packet) {
                trace!(
                    metric.name = metric_packet.metric_name,
                    "Skipping metric due to filter configuration."
                );
                return Ok(None);
            }

            match handle_metric_packet(metric_packet, context_resolvers, peer_addr, additional_tags) {
                Some(metric) => {
                    source_metrics.metrics_received().increment(events_len);
                    Event::Metric(metric)
                }
                None => {
                    // We can only fail to get a metric back if we failed to resolve the context.
                    source_metrics.failed_context_resolve_total().increment(1);
                    return Ok(None);
                }
            }
        }
        ParsedPacket::Event(event) => {
            if !enabled_filter.allow_event(&event) {
                trace!("Skipping event {} due to filter configuration.", event.title);
                return Ok(None);
            }
            let tags_resolver = context_resolvers.tags();
            match handle_event_packet(event, tags_resolver, peer_addr, additional_tags) {
                Some(event) => {
                    source_metrics.events_received().increment(1);
                    Event::EventD(event)
                }
                None => {
                    source_metrics.failed_context_resolve_total().increment(1);
                    return Ok(None);
                }
            }
        }
        ParsedPacket::ServiceCheck(service_check) => {
            if !enabled_filter.allow_service_check(&service_check) {
                trace!(
                    "Skipping service check {} due to filter configuration.",
                    service_check.name
                );
                return Ok(None);
            }
            let tags_resolver = context_resolvers.tags();
            match handle_service_check_packet(service_check, tags_resolver, peer_addr, additional_tags) {
                Some(service_check) => {
                    source_metrics.service_checks_received().increment(1);
                    Event::ServiceCheck(service_check)
                }
                None => {
                    source_metrics.failed_context_resolve_total().increment(1);
                    return Ok(None);
                }
            }
        }
    };

    Ok(Some(event))
}

fn handle_metric_packet(
    packet: MetricPacket, context_resolvers: &mut ContextResolvers, peer_addr: &ConnectionAddress,
    additional_tags: &[String],
) -> Option<Metric> {
    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());

    let mut origin = origin_from_metric_packet(&packet, &well_known_tags);
    if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
        origin.set_process_id(creds.pid as u32);
    }

    // Choose the right context resolver based on whether or not this metric is pre-aggregated.
    let context_resolver = if packet.timestamp.is_some() {
        context_resolvers.no_agg()
    } else {
        context_resolvers.primary()
    };

    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);

    // Try to resolve the context for this metric.
    match context_resolver.resolve(packet.metric_name, tags, Some(origin)) {
        Some(context) => {
            let metric_origin = well_known_tags
                .jmx_check_name
                .map(MetricOrigin::jmx_check)
                .unwrap_or_else(MetricOrigin::dogstatsd);
            let metadata = MetricMetadata::default()
                .with_origin(metric_origin)
                .with_hostname(well_known_tags.hostname.map(Arc::from));

            Some(Metric::from_parts(context, packet.values, metadata))
        }
        // We failed to resolve the context, likely due to not having enough interner capacity.
        None => None,
    }
}

fn handle_event_packet(
    packet: EventPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress, additional_tags: &[String],
) -> Option<EventD> {
    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());

    let mut origin = origin_from_event_packet(&packet, &well_known_tags);
    if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
        origin.set_process_id(creds.pid as u32);
    }
    let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));

    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
    let tags = tags_resolver.create_tag_set(tags)?;

    let eventd = EventD::new(packet.title, packet.text)
        .with_timestamp(packet.timestamp)
        .with_hostname(packet.hostname.map(|s| s.into()))
        .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
        .with_alert_type(packet.alert_type)
        .with_priority(packet.priority)
        .with_source_type_name(packet.source_type_name.map(|s| s.into()))
        .with_tags(tags)
        .with_origin_tags(origin_tags);

    Some(eventd)
}

fn handle_service_check_packet(
    packet: ServiceCheckPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress,
    additional_tags: &[String],
) -> Option<ServiceCheck> {
    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());

    let mut origin = origin_from_service_check_packet(&packet, &well_known_tags);
    if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
        origin.set_process_id(creds.pid as u32);
    }
    let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));

    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
    let tags = tags_resolver.create_tag_set(tags)?;

    let service_check = ServiceCheck::new(packet.name, packet.status)
        .with_timestamp(packet.timestamp)
        .with_hostname(packet.hostname.map(|s| s.into()))
        .with_tags(tags)
        .with_origin_tags(origin_tags)
        .with_message(packet.message.map(|s| s.into()));

    Some(service_check)
}

fn get_filtered_tags_iterator<'a>(
    raw_tags: RawTags<'a>, additional_tags: &'a [String],
) -> impl Iterator<Item = &'a str> + Clone {
    // This filters out "well-known" tags from the raw tags in the DogStatsD packet, and then chains on any additional tags
    // that were configured on the source.
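    //
    // For example, with `dogstatsd_tags: ["env:prod"]`, a packet tagged `region:us,host:web-1` would yield
    // `region:us` plus `env:prod`, assuming the well-known tag predicate drops the `host` tag here.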
    RawTagsFilter::exclude(raw_tags, WellKnownTagsFilterPredicate).chain(additional_tags.iter().map(|s| s.as_str()))
}

async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &SourceContext, listen_addr: &ListenAddress) {
    debug!(%listen_addr, events_len = event_buffer.len(), "Forwarding events.");

    // TODO: This is maybe a little dicey because if we fail to dispatch the events, we may not have iterated over all of
    // them, so there might still be eventd events when we get to the service checks point, and eventd events and/or service
    // check events when we get to the metrics point, and so on.
    //
    // There's probably something to be said for erroring out fully if this happens, since we should only fail to
    // dispatch if the downstream component fails entirely... and unless we have a way to restart the component, then
    // we're going to continue to fail to dispatch any more events until the process is restarted anyways.

    // Dispatch any eventd events, if present.
    if event_buffer.has_event_type(EventType::EventD) {
        let eventd_events = event_buffer.extract(Event::is_eventd);
        if let Err(e) = source_context
            .dispatcher()
            .buffered_named("events")
            .expect("events output should always exist")
            .send_all(eventd_events)
            .await
        {
            error!(%listen_addr, error = %e, "Failed to dispatch eventd events.");
        }
    }

    // Dispatch any service check events, if present.
    if event_buffer.has_event_type(EventType::ServiceCheck) {
        let service_check_events = event_buffer.extract(Event::is_service_check);
        if let Err(e) = source_context
            .dispatcher()
            .buffered_named("service_checks")
            .expect("service checks output should always exist")
            .send_all(service_check_events)
            .await
        {
            error!(%listen_addr, error = %e, "Failed to dispatch service check events.");
        }
    }

    // Finally, if there are events left, they'll be metrics, so dispatch them.
    if !event_buffer.is_empty() {
        if let Err(e) = source_context
            .dispatcher()
            .dispatch_named("metrics", event_buffer)
            .await
        {
            error!(%listen_addr, error = %e, "Failed to dispatch metric events.");
        }
    }
}

const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
    // This is a little goofy, but hear me out:
    //
    // In the Datadog Agent, the way the UDS listener works is that if it's in stream mode, it will do a standalone
    // socket read to get _just_ the length delimiter, which is 4 bytes. After that, it will do a read to get the packet
    // data itself, up to the limit of `dogstatsd_buffer_size`. This means that a _full_ UDS stream packet can be up to
    // `dogstatsd_buffer_size + 4` bytes.
    //
    // This isn't a problem in the Agent due to how it does the reads, but it's a problem for us because we want to be
    // able to get an entire frame in a single buffer for the purpose of decoding the frame. Rather than rewriting our
    // read loop such that we have to change the logic depending on UDP/UDS datagram vs UDS stream, we simply increase
    // the buffer size by 4 bytes to account for the length delimiter.
    //
    // We do it this way so that we don't have to change the buffer size in the configuration, since if you just ported
    // over a Datadog Agent configuration, the value would be too small, and vice versa.
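    //
    // For example, with the default `dogstatsd_buffer_size` of 8192 bytes, the buffers allocated here end up
    // being 8196 bytes.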
    buffer_size + 4
}

#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use saluki_context::{ContextResolverBuilder, TagsResolverBuilder};
    use saluki_io::{
        deser::codec::dogstatsd::{DogstatsdCodec, DogstatsdCodecConfiguration, ParsedPacket},
        net::ConnectionAddress,
    };

    use super::{handle_metric_packet, ContextResolvers};

    #[test]
    fn no_metrics_when_interner_full_allocations_disallowed() {
        // We're specifically testing here that when we don't allow outside allocations, we should not be able to
        // resolve a context if the interner is full. A no-op interner has the smallest possible size, so that's going
        // to ensure we can't intern anything... but we also need a string (name or one of the tags) that can't be
        // _inlined_ either, since that will get around the interner being full.
        //
        // We set our metric name to be longer than 31 bytes (the inlining limit) to ensure this.

        let codec = DogstatsdCodec::from_configuration(DogstatsdCodecConfiguration::default());
        let tags_resolver = TagsResolverBuilder::for_tests().build();
        let context_resolver = ContextResolverBuilder::for_tests()
            .with_heap_allocations(false)
            .with_tags_resolver(Some(tags_resolver.clone()))
            .build();
        let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
        let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());

        let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3";

        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
            panic!("Failed to parse packet.");
        };

        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &[]);
        assert!(maybe_metric.is_none());
    }

    #[test]
    fn metric_with_additional_tags() {
        let codec = DogstatsdCodec::from_configuration(DogstatsdCodecConfiguration::default());
        let tags_resolver = TagsResolverBuilder::for_tests().build();
        let context_resolver = ContextResolverBuilder::for_tests()
            .with_heap_allocations(false)
            .with_tags_resolver(Some(tags_resolver.clone()))
            .build();
        let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
        let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());

        let existing_tags = ["tag1:value1", "tag2:value2", "tag3:value3"];
        let existing_tags_str = existing_tags.join(",");

        let input = format!("test_metric_name:1|c|#{}", existing_tags_str);
        let additional_tags = [
            "tag4:value4".to_string(),
            "tag5:value5".to_string(),
            "tag6:value6".to_string(),
        ];

        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
            panic!("Failed to parse packet.");
        };
        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &additional_tags);
        assert!(maybe_metric.is_some());

        let metric = maybe_metric.unwrap();
        let context = metric.context();

        for tag in existing_tags {
            assert!(context.tags().has_tag(tag));
        }

        for tag in additional_tags {
            assert!(context.tags().has_tag(tag));
        }
    }
1274}