saluki_components/sources/dogstatsd/
mod.rs

use std::sync::{Arc, LazyLock};
use std::time::Duration;

use async_trait::async_trait;
use bytes::{Buf, BufMut};
use bytesize::ByteSize;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
use metrics::{Counter, Gauge, Histogram};
use saluki_common::task::spawn_traced_named;
use saluki_config::GenericConfiguration;
use saluki_context::tags::{RawTags, RawTagsFilter};
use saluki_context::TagsResolver;
use saluki_core::data_model::event::eventd::EventD;
use saluki_core::data_model::event::metric::{MetricMetadata, MetricOrigin};
use saluki_core::data_model::event::service_check::ServiceCheck;
use saluki_core::data_model::event::{metric::Metric, Event, EventType};
use saluki_core::topology::EventsBuffer;
use saluki_core::{
    components::{sources::*, ComponentContext},
    observability::ComponentMetricsExt as _,
    pooling::FixedSizeObjectPool,
    topology::{
        interconnect::EventBufferManager,
        shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle},
        OutputDefinition,
    },
};
use saluki_env::WorkloadProvider;
use saluki_error::{generic_error, ErrorContext as _, GenericError};
use saluki_io::deser::codec::dogstatsd::{EventPacket, ServiceCheckPacket};
use saluki_io::{
    buf::{BytesBuffer, FixedSizeVec},
    deser::{
        codec::{
            dogstatsd::{parse_message_type, MessageType, MetricPacket, ParseError, ParsedPacket},
            DogstatsdCodec, DogstatsdCodecConfiguration,
        },
        framing::FramerExt as _,
    },
    net::{
        listener::{Listener, ListenerError},
        ConnectionAddress, ListenAddress, Stream,
    },
};
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use snafu::{ResultExt as _, Snafu};
use tokio::{
    select,
    time::{interval, MissedTickBehavior},
};
use tracing::{debug, error, info, trace, warn};

mod framer;
use self::framer::{get_framer, DsdFramer};
use crate::sources::dogstatsd::tags::{WellKnownTags, WellKnownTagsFilterPredicate};

mod filters;
use self::filters::EnablePayloadsFilter;

mod io_buffer;
use self::io_buffer::IoBufferManager;

mod origin;
use self::origin::{
    origin_from_event_packet, origin_from_metric_packet, origin_from_service_check_packet, DogStatsDOriginTagResolver,
    OriginEnrichmentConfiguration,
};

mod resolver;
use self::resolver::ContextResolvers;

mod tags;

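/// Errors produced while building the source's listeners.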
#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
enum Error {
    #[snafu(display("Failed to create {} listener: {}", listener_type, source))]
    FailedToCreateListener {
        listener_type: &'static str,
        source: ListenerError,
    },

    #[snafu(display("No listeners configured. Please specify a port (`dogstatsd_port`) or a socket path (`dogstatsd_socket` or `dogstatsd_stream_socket`) to enable a listener."))]
    NoListenersConfigured,
}

const fn default_buffer_size() -> usize {
    8192
}

const fn default_buffer_count() -> usize {
    128
}

const fn default_port() -> u16 {
    8125
}

const fn default_tcp_port() -> u16 {
    0
}

const fn default_allow_context_heap_allocations() -> bool {
    true
}

const fn default_no_aggregation_pipeline_support() -> bool {
    true
}

const fn default_context_string_interner_size() -> ByteSize {
    ByteSize::mib(2)
}

const fn default_cached_contexts_limit() -> usize {
    500_000
}

const fn default_cached_tagsets_limit() -> usize {
    500_000
}

const fn default_dogstatsd_permissive_decoding() -> bool {
    true
}

const fn default_enable_payloads_series() -> bool {
    true
}

const fn default_enable_payloads_sketches() -> bool {
    true
}

const fn default_enable_payloads_events() -> bool {
    true
}

const fn default_enable_payloads_service_checks() -> bool {
    true
}

/// DogStatsD source.
///
/// Accepts metrics, events, and service checks over TCP, UDP, or Unix Domain Sockets in the StatsD/DogStatsD format.
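///
/// ## Usage
///
/// A minimal sketch of building the source configuration; the `generic_config` and `workload_provider` bindings
/// here are assumed to be in scope, and the workload provider is optional (origin detection/enrichment is disabled
/// without it):
///
/// ```ignore
/// let config = DogStatsDConfiguration::from_configuration(&generic_config)?
///     .with_workload_provider(workload_provider);
/// ```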
#[derive(Deserialize)]
pub struct DogStatsDConfiguration {
    /// The size, in bytes, of the buffer used to receive messages.
    ///
    /// Payloads larger than this size will be truncated, leading to discarded messages.
    ///
    /// Defaults to 8192 bytes.
    #[serde(rename = "dogstatsd_buffer_size", default = "default_buffer_size")]
    buffer_size: usize,

    /// The number of message buffers to allocate overall.
    ///
    /// This represents the maximum number of message buffers available for processing incoming metrics, which loosely
    /// correlates with how many messages can be received per second. The default value should be suitable for the
    /// majority of workloads, but high-throughput workloads may consider increasing this value.
    ///
    /// Defaults to 128.
    #[serde(rename = "dogstatsd_buffer_count", default = "default_buffer_count")]
    buffer_count: usize,

    /// The port to listen on in UDP mode.
    ///
    /// If set to `0`, UDP is not used.
    ///
    /// Defaults to 8125.
    #[serde(rename = "dogstatsd_port", default = "default_port")]
    port: u16,

    /// The port to listen on in TCP mode.
    ///
    /// If set to `0`, TCP is not used.
    ///
    /// Defaults to 0.
    #[serde(rename = "dogstatsd_tcp_port", default = "default_tcp_port")]
    tcp_port: u16,

    /// The Unix domain socket path to listen on, in datagram mode.
    ///
    /// If not set, UDS (in datagram mode) is not used.
    ///
    /// Defaults to unset.
    #[serde(rename = "dogstatsd_socket")]
    socket_path: Option<String>,

    /// The Unix domain socket path to listen on, in stream mode.
    ///
    /// If not set, UDS (in stream mode) is not used.
    ///
    /// Defaults to unset.
    #[serde(rename = "dogstatsd_stream_socket")]
    socket_stream_path: Option<String>,

    /// Whether or not to listen for non-local traffic in UDP mode.
    ///
    /// If set to `true`, the listener will accept packets from any interface/address. Otherwise, the source will only
    /// listen on `localhost`.
    ///
    /// Defaults to `false`.
    #[serde(rename = "dogstatsd_non_local_traffic", default)]
    non_local_traffic: bool,

    /// Whether or not to allow heap allocations when resolving contexts.
    ///
    /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The
    /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full.
    /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to
    /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags cannot be
    /// interned, the metric is skipped.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "dogstatsd_allow_context_heap_allocs",
        default = "default_allow_context_heap_allocations"
    )]
    allow_context_heap_allocations: bool,

    /// Whether or not to enable support for no-aggregation pipelines.
    ///
    /// When enabled, this influences how metrics are parsed, specifically around user-provided metric timestamps. When
    /// a metric timestamp is present, it is used as a signal to any aggregation transforms that the metric should not
    /// be aggregated.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "dogstatsd_no_aggregation_pipeline",
        default = "default_no_aggregation_pipeline_support"
    )]
    no_aggregation_pipeline_support: bool,

    /// Total size of the string interner used for contexts.
    ///
    /// This controls the amount of memory that can be used to intern metric names and tags. If the interner is full,
    /// metrics with contexts that have not already been resolved may or may not be dropped, depending on the value of
    /// `allow_context_heap_allocations`.
    #[serde(
        rename = "dogstatsd_string_interner_size",
        default = "default_context_string_interner_size"
    )]
    context_string_interner_bytes: ByteSize,

    /// The maximum number of cached contexts to allow.
    ///
    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(
        rename = "dogstatsd_cached_contexts_limit",
        default = "default_cached_contexts_limit"
    )]
    cached_contexts_limit: usize,

    /// The maximum number of cached tagsets to allow.
    ///
    /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit does not affect
    /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(rename = "dogstatsd_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
    cached_tagsets_limit: usize,

    /// Whether or not to enable permissive mode in the decoder.
    ///
    /// Permissive mode allows the decoder to relax its strictness around the allowed payloads, which lets it match the
    /// decoding behavior of the Datadog Agent.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "dogstatsd_permissive_decoding",
        default = "default_dogstatsd_permissive_decoding"
    )]
    permissive_decoding: bool,

    /// Whether or not to enable sending series payloads.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_enable_payloads_series")]
    enable_payloads_series: bool,

    /// Whether or not to enable sending sketch payloads.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_enable_payloads_sketches")]
    enable_payloads_sketches: bool,

    /// Whether or not to enable sending event payloads.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_enable_payloads_events")]
    enable_payloads_events: bool,

    /// Whether or not to enable sending service check payloads.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_enable_payloads_service_checks")]
    enable_payloads_service_checks: bool,

    /// Configuration related to origin detection and enrichment.
    #[serde(flatten, default)]
    origin_enrichment: OriginEnrichmentConfiguration,

    /// Workload provider to utilize for origin detection/enrichment.
    #[serde(skip)]
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,

    /// Additional tags to add to all metrics.
    #[serde(rename = "dogstatsd_tags", default)]
    additional_tags: Vec<String>,
}

impl DogStatsDConfiguration {
    /// Creates a new `DogStatsDConfiguration` from the given configuration.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }

    /// Sets the workload provider to use for configuring origin detection/enrichment.
    ///
    /// A workload provider must be set, otherwise origin detection/enrichment will not be enabled.
    ///
    /// Defaults to unset.
    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
    where
        W: WorkloadProvider + Send + Sync + 'static,
    {
        self.workload_provider = Some(Arc::new(workload_provider));
        self
    }

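    /// Builds a listener for each configured address: UDP, TCP, UDS (datagram), and UDS (stream).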
    async fn build_listeners(&self) -> Result<Vec<Listener>, Error> {
        let mut listeners = Vec::new();

        if self.port != 0 {
            let address = if self.non_local_traffic {
                ListenAddress::Udp(([0, 0, 0, 0], self.port).into())
            } else {
                ListenAddress::Udp(([127, 0, 0, 1], self.port).into())
            };

            let listener = Listener::from_listen_address(address)
                .await
                .context(FailedToCreateListener { listener_type: "UDP" })?;
            listeners.push(listener);
        }

        if self.tcp_port != 0 {
            let address = if self.non_local_traffic {
                ListenAddress::Tcp(([0, 0, 0, 0], self.tcp_port).into())
            } else {
                ListenAddress::Tcp(([127, 0, 0, 1], self.tcp_port).into())
            };

            let listener = Listener::from_listen_address(address)
                .await
                .context(FailedToCreateListener { listener_type: "TCP" })?;
            listeners.push(listener);
        }

        if let Some(socket_path) = &self.socket_path {
            let address = ListenAddress::Unixgram(socket_path.into());
            let listener = Listener::from_listen_address(address)
                .await
                .context(FailedToCreateListener {
                    listener_type: "UDS (datagram)",
                })?;
            listeners.push(listener);
        }

        if let Some(socket_stream_path) = &self.socket_stream_path {
            let address = ListenAddress::Unix(socket_stream_path.into());
            let listener = Listener::from_listen_address(address)
                .await
                .context(FailedToCreateListener {
                    listener_type: "UDS (stream)",
                })?;
            listeners.push(listener);
        }

        Ok(listeners)
    }
}

#[async_trait]
impl SourceBuilder for DogStatsDConfiguration {
    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
        let listeners = self.build_listeners().await?;
        if listeners.is_empty() {
            return Err(Error::NoListenersConfigured.into());
        }

        // Every listener requires at least one I/O buffer to ensure that all listeners can be serviced without
        // deadlocking any of the others.
        if self.buffer_count < listeners.len() {
            return Err(generic_error!(
                "Must have a minimum of {} I/O buffers based on the number of listeners configured.",
                listeners.len()
            ));
        }

        let maybe_origin_tags_resolver = self
            .workload_provider
            .clone()
            .map(|provider| DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider));
        let context_resolvers = ContextResolvers::new(self, &context, maybe_origin_tags_resolver)
            .error_context("Failed to create context resolvers.")?;

        let codec_config = DogstatsdCodecConfiguration::default()
            .with_timestamps(self.no_aggregation_pipeline_support)
            .with_permissive_mode(self.permissive_decoding);

        let codec = DogstatsdCodec::from_configuration(codec_config);

        let enable_payloads_filter = EnablePayloadsFilter::default()
            .with_allow_series(self.enable_payloads_series)
            .with_allow_sketches(self.enable_payloads_sketches)
            .with_allow_events(self.enable_payloads_events)
            .with_allow_service_checks(self.enable_payloads_service_checks);

        Ok(Box::new(DogStatsD {
            listeners,
            io_buffer_pool: FixedSizeObjectPool::with_builder("dsd_packet_bufs", self.buffer_count, || {
                FixedSizeVec::with_capacity(get_adjusted_buffer_size(self.buffer_size))
            }),
            codec,
            context_resolvers,
            enabled_filter: enable_payloads_filter,
            additional_tags: self.additional_tags.clone().into(),
        }))
    }

    fn outputs(&self) -> &[OutputDefinition] {
        static OUTPUTS: LazyLock<Vec<OutputDefinition>> = LazyLock::new(|| {
            vec![
                OutputDefinition::named_output("metrics", EventType::Metric),
                OutputDefinition::named_output("events", EventType::EventD),
                OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
            ]
        });

        &OUTPUTS
    }
}

impl MemoryBounds for DogStatsDConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            // Capture the size of the heap allocation when the component is built.
            .with_single_value::<DogStatsD>("source struct")
            // We allocate our I/O buffers entirely up front.
            .with_expr(UsageExpr::product(
                "buffers",
                UsageExpr::config("dogstatsd_buffer_count", self.buffer_count),
                UsageExpr::config("dogstatsd_buffer_size", get_adjusted_buffer_size(self.buffer_size)),
            ))
            // We also allocate the backing storage for the string interner up front, which is used by our context
            // resolver.
            .with_expr(UsageExpr::config(
                "dogstatsd_string_interner_size",
                self.context_string_interner_bytes.as_u64() as usize,
            ));
    }
}

/// DogStatsD source.
pub struct DogStatsD {
    listeners: Vec<Listener>,
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    codec: DogstatsdCodec,
    context_resolvers: ContextResolvers,
    enabled_filter: EnablePayloadsFilter,
    additional_tags: Arc<[String]>,
}

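/// State for a single spawned listener task.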
struct ListenerContext {
    shutdown_handle: DynamicShutdownHandle,
    listener: Listener,
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    codec: DogstatsdCodec,
    context_resolvers: ContextResolvers,
    additional_tags: Arc<[String]>,
}

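/// Per-stream state used to frame, decode, and dispatch payloads from a single connection or socket.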
struct HandlerContext {
    listen_addr: ListenAddress,
    framer: DsdFramer,
    codec: DogstatsdCodec,
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    metrics: Metrics,
    context_resolvers: ContextResolvers,
    additional_tags: Arc<[String]>,
}

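/// Internal telemetry emitted by each stream handler.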
struct Metrics {
    metrics_received: Counter,
    events_received: Counter,
    service_checks_received: Counter,
    bytes_received: Counter,
    bytes_received_size: Histogram,
    framing_errors: Counter,
    metric_decoder_errors: Counter,
    event_decoder_errors: Counter,
    service_check_decoder_errors: Counter,
    failed_context_resolve_total: Counter,
    connections_active: Gauge,
    packet_receive_success: Counter,
    packet_receive_failure: Counter,
}

impl Metrics {
    fn metrics_received(&self) -> &Counter {
        &self.metrics_received
    }

    fn events_received(&self) -> &Counter {
        &self.events_received
    }

    fn service_checks_received(&self) -> &Counter {
        &self.service_checks_received
    }

    fn bytes_received(&self) -> &Counter {
        &self.bytes_received
    }

    fn bytes_received_size(&self) -> &Histogram {
        &self.bytes_received_size
    }

    fn framing_errors(&self) -> &Counter {
        &self.framing_errors
    }

    fn metric_decode_failed(&self) -> &Counter {
        &self.metric_decoder_errors
    }

    fn event_decode_failed(&self) -> &Counter {
        &self.event_decoder_errors
    }

    fn service_check_decode_failed(&self) -> &Counter {
        &self.service_check_decoder_errors
    }

    fn failed_context_resolve_total(&self) -> &Counter {
        &self.failed_context_resolve_total
    }

    fn connections_active(&self) -> &Gauge {
        &self.connections_active
    }

    fn packet_receive_success(&self) -> &Counter {
        &self.packet_receive_success
    }

    fn packet_receive_failure(&self) -> &Counter {
        &self.packet_receive_failure
    }
}

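/// Builds the telemetry for a handler, tagging each metric with the listener type derived from the listen address.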
fn build_metrics(listen_addr: &ListenAddress, component_context: &ComponentContext) -> Metrics {
    let builder = MetricsBuilder::from_component_context(component_context);

    let listener_type = match listen_addr {
        ListenAddress::Tcp(_) => "tcp",
        ListenAddress::Udp(_) => "udp",
        ListenAddress::Unix(_) => "unix",
        ListenAddress::Unixgram(_) => "unixgram",
    };

    Metrics {
        metrics_received: builder.register_counter_with_tags(
            "component_events_received_total",
            [("message_type", "metrics"), ("listener_type", listener_type)],
        ),
        events_received: builder.register_counter_with_tags(
            "component_events_received_total",
            [("message_type", "events"), ("listener_type", listener_type)],
        ),
        service_checks_received: builder.register_counter_with_tags(
            "component_events_received_total",
            [("message_type", "service_checks"), ("listener_type", listener_type)],
        ),
        bytes_received: builder
            .register_counter_with_tags("component_bytes_received_total", [("listener_type", listener_type)]),
        bytes_received_size: builder
            .register_trace_histogram_with_tags("component_bytes_received_size", [("listener_type", listener_type)]),
        framing_errors: builder.register_counter_with_tags(
            "component_errors_total",
            [("listener_type", listener_type), ("error_type", "framing")],
        ),
        metric_decoder_errors: builder.register_counter_with_tags(
            "component_errors_total",
            [
                ("listener_type", listener_type),
                ("error_type", "decode"),
                ("message_type", "metrics"),
            ],
        ),
        event_decoder_errors: builder.register_counter_with_tags(
            "component_errors_total",
            [
                ("listener_type", listener_type),
                ("error_type", "decode"),
                ("message_type", "events"),
            ],
        ),
        service_check_decoder_errors: builder.register_counter_with_tags(
            "component_errors_total",
            [
                ("listener_type", listener_type),
                ("error_type", "decode"),
                ("message_type", "service_checks"),
            ],
        ),
        connections_active: builder
            .register_gauge_with_tags("component_connections_active", [("listener_type", listener_type)]),
        packet_receive_success: builder.register_debug_counter_with_tags(
            "component_packets_received_total",
            [("listener_type", listener_type), ("state", "ok")],
        ),
        packet_receive_failure: builder.register_debug_counter_with_tags(
            "component_packets_received_total",
            [("listener_type", listener_type), ("state", "error")],
        ),
        failed_context_resolve_total: builder.register_debug_counter("component_failed_context_resolve_total"),
    }
}

#[async_trait]
impl Source for DogStatsD {
    async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
        let mut global_shutdown = context.take_shutdown_handle();
        let mut health = context.take_health_handle();

        let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();

        // For each listener, spawn a dedicated task to run it.
        for listener in self.listeners {
            let task_name = format!("dogstatsd-listener-{}", listener.listen_address().listener_type());

            // TODO: Create a health handle for each listener.
            //
            // We need to rework `HealthRegistry` to look a little more like `ComponentRegistry` so that we can have it
            // already be scoped properly, otherwise all we can do here at present is either have a relative name, like
            // `uds-stream`, or try and hardcode the full component name, which we will inevitably forget to update if
            // we tweak the topology configuration, etc.
            let listener_context = ListenerContext {
                shutdown_handle: listener_shutdown_coordinator.register(),
                listener,
                io_buffer_pool: self.io_buffer_pool.clone(),
                codec: self.codec.clone(),
                context_resolvers: self.context_resolvers.clone(),
                additional_tags: self.additional_tags.clone(),
            };

            spawn_traced_named(
                task_name,
                process_listener(context.clone(), listener_context, self.enabled_filter),
            );
        }

        health.mark_ready();
        debug!("DogStatsD source started.");

        // Wait for the global shutdown signal, then notify listeners to shut down.
        //
        // We also handle liveness here, which doesn't really matter for _this_ task, since the real work is happening
        // in the listeners, but we need to satisfy the health checker.
        loop {
            select! {
                _ = &mut global_shutdown => {
                    debug!("Received shutdown signal.");
                    break
                },
                _ = health.live() => continue,
            }
        }

        debug!("Stopping DogStatsD source...");

        listener_shutdown_coordinator.shutdown().await;

        debug!("DogStatsD source stopped.");

        Ok(())
    }
}

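/// Runs a single listener, accepting streams and spawning a dedicated handler task for each one until shutdown is
/// signaled.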
async fn process_listener(
    source_context: SourceContext, listener_context: ListenerContext, enabled_filter: EnablePayloadsFilter,
) {
    let ListenerContext {
        shutdown_handle,
        mut listener,
        io_buffer_pool,
        codec,
        context_resolvers,
        additional_tags,
    } = listener_context;
    tokio::pin!(shutdown_handle);

    let listen_addr = listener.listen_address().clone();
    let mut stream_shutdown_coordinator = DynamicShutdownCoordinator::default();

    info!(%listen_addr, "DogStatsD listener started.");

    loop {
        select! {
            _ = &mut shutdown_handle => {
                debug!(%listen_addr, "Received shutdown signal. Waiting for existing stream handlers to finish...");
                break;
            }
            result = listener.accept() => match result {
                Ok(stream) => {
                    debug!(%listen_addr, "Spawning new stream handler.");

                    let handler_context = HandlerContext {
                        listen_addr: listen_addr.clone(),
                        framer: get_framer(&listen_addr),
                        codec: codec.clone(),
                        io_buffer_pool: io_buffer_pool.clone(),
                        metrics: build_metrics(&listen_addr, source_context.component_context()),
                        context_resolvers: context_resolvers.clone(),
                        additional_tags: additional_tags.clone(),
                    };

                    let task_name = format!("dogstatsd-stream-handler-{}", listen_addr.listener_type());
                    spawn_traced_named(task_name, process_stream(stream, source_context.clone(), handler_context, stream_shutdown_coordinator.register(), enabled_filter));
                }
                Err(e) => {
                    error!(%listen_addr, error = %e, "Failed to accept connection. Stopping listener.");
                    break
                }
            }
        }
    }

    stream_shutdown_coordinator.shutdown().await;

    info!(%listen_addr, "DogStatsD listener stopped.");
}

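/// Drives a single stream until it completes or a shutdown signal is received.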
async fn process_stream(
    stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
    shutdown_handle: DynamicShutdownHandle, enabled_filter: EnablePayloadsFilter,
) {
    tokio::pin!(shutdown_handle);

    select! {
        _ = &mut shutdown_handle => {
            debug!("Stream handler received shutdown signal.");
        },
        _ = drive_stream(stream, source_context, handler_context, enabled_filter) => {},
    }
}

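/// Reads from the stream, frames and decodes payloads into events, and forwards them downstream, flushing any
/// buffered events at least every 100ms while idle.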
async fn drive_stream(
    mut stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
    enabled_filter: EnablePayloadsFilter,
) {
    let HandlerContext {
        listen_addr,
        mut framer,
        codec,
        io_buffer_pool,
        metrics,
        mut context_resolvers,
        additional_tags,
    } = handler_context;

    debug!(%listen_addr, "Stream handler started.");

    if !stream.is_connectionless() {
        metrics.connections_active().increment(1);
    }

    // Set a buffer flush interval of 100ms, which will ensure we always flush buffered events at least every 100ms if
    // we're otherwise idle and not receiving packets from the client.
    let mut buffer_flush = interval(Duration::from_millis(100));
    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let mut event_buffer_manager = EventBufferManager::default();
    let mut io_buffer_manager = IoBufferManager::new(&io_buffer_pool, &stream);
    let memory_limiter = source_context.topology_context().memory_limiter();

    'read: loop {
        let mut eof = false;

        let mut io_buffer = io_buffer_manager.get_buffer_mut().await;

        memory_limiter.wait_for_capacity().await;

        select! {
            // We read from the stream.
            read_result = stream.receive(&mut io_buffer) => match read_result {
                Ok((bytes_read, peer_addr)) => {
                    if bytes_read == 0 {
                        eof = true;
                    }

                    // TODO: This is correct for UDP and UDS in SOCK_DGRAM mode, but not for UDS in SOCK_STREAM mode...
                    // because to match the Datadog Agent, we would only want to increment the number of successful
                    // packets for each length-delimited frame, but this is obviously being incremented before we do any
                    // framing... and even further, with the nested framer, we don't have access to the signal that
                    // we've gotten a full length-delimited outer frame, only each individual newline-delimited inner
                    // frame.
                    //
                    // As such, we'll potentially be over-reporting this metric for UDS in SOCK_STREAM mode compared to
                    // the Datadog Agent.
                    metrics.packet_receive_success().increment(1);
                    metrics.bytes_received().increment(bytes_read as u64);
                    metrics.bytes_received_size().record(bytes_read as f64);

                    // When we're actually at EOF, or we're dealing with a connectionless stream, we try to decode in EOF mode.
                    //
                    // For connectionless streams, we always try to decode the buffer as if it's EOF, since it effectively _is_
                    // always the end of file after a receive. For connection-oriented streams, we only want to do this once we've
                    // actually hit true EOF.
                    let reached_eof = eof || stream.is_connectionless();

                    trace!(
                        buffer_len = io_buffer.remaining(),
                        buffer_cap = io_buffer.remaining_mut(),
                        eof = reached_eof,
                        %listen_addr,
                        %peer_addr,
                        "Received {} bytes from stream.",
                        bytes_read
                    );

                    let mut frames = io_buffer.framed(&mut framer, reached_eof);
                    'frame: loop {
                        match frames.next() {
                            Some(Ok(frame)) => {
                                trace!(%listen_addr, %peer_addr, ?frame, "Decoded frame.");
                                match handle_frame(&frame[..], &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) {
                                    Ok(Some(event)) => {
                                        if let Some(event_buffer) = event_buffer_manager.try_push(event) {
                                            debug!(%listen_addr, %peer_addr, "Event buffer is full. Forwarding events.");
                                            dispatch_events(event_buffer, &source_context, &listen_addr).await;
                                        }
                                    },
                                    Ok(None) => {
                                        // We didn't decode an event, but there was no inherent error. This is likely
                                        // due to hitting resource limits, etc.
                                        //
                                        // Simply continue on.
                                        continue
                                    },
                                    Err(e) => {
                                        let frame_lossy_str = String::from_utf8_lossy(&frame);
                                        warn!(%listen_addr, %peer_addr, frame = %frame_lossy_str, error = %e, "Failed to parse frame.");
                                    },
                                }
                            }
                            Some(Err(e)) => {
                                metrics.framing_errors().increment(1);

                                if stream.is_connectionless() {
                                    // For connectionless streams, we don't want to shut down the stream since we can just keep
                                    // reading more packets.
                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Continuing stream.");
                                    continue 'read;
                                } else {
                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Stopping stream.");
                                    break 'read;
                                }
                            }
                            None => {
                                trace!(%listen_addr, %peer_addr, "Not enough data to decode another frame.");
                                if eof && !stream.is_connectionless() {
                                    debug!(%listen_addr, %peer_addr, "Stream received EOF. Shutting down handler.");
                                    break 'read;
                                } else {
                                    break 'frame;
                                }
                            }
                        }
                    }
                },
                Err(e) => {
                    metrics.packet_receive_failure().increment(1);

                    if stream.is_connectionless() {
                        // For connectionless streams, we don't want to shut down the stream since we can just keep
                        // reading more packets.
                        warn!(%listen_addr, error = %e, "I/O error while decoding. Continuing stream.");
                        continue 'read;
                    } else {
                        warn!(%listen_addr, error = %e, "I/O error while decoding. Stopping stream.");
                        break 'read;
                    }
                }
            },

            _ = buffer_flush.tick() => {
                if let Some(event_buffer) = event_buffer_manager.consume() {
                    dispatch_events(event_buffer, &source_context, &listen_addr).await;
                }
            },
        }
    }

    if let Some(event_buffer) = event_buffer_manager.consume() {
        dispatch_events(event_buffer, &source_context, &listen_addr).await;
    }

    metrics.connections_active().decrement(1);

    debug!(%listen_addr, "Stream handler stopped.");
}

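/// Decodes a single frame into an event, applying payload filters and context resolution.
///
/// Returns `Ok(None)` when the payload was filtered out or its context/tags could not be resolved, and `Err` when
/// the frame could not be decoded at all. For reference, frames are DogStatsD-formatted payloads along the lines of
/// `page.views:1|c|#env:dev` (metric), `_e{5,4}:title|text` (event), or `_sc|my.check|0` (service check).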
fn handle_frame(
    frame: &[u8], codec: &DogstatsdCodec, context_resolvers: &mut ContextResolvers, source_metrics: &Metrics,
    peer_addr: &ConnectionAddress, enabled_filter: EnablePayloadsFilter, additional_tags: &[String],
) -> Result<Option<Event>, ParseError> {
    let parsed = match codec.decode_packet(frame) {
        Ok(parsed) => parsed,
        Err(e) => {
            // Try and determine what the message type was, if possible, to increment the correct error counter.
            match parse_message_type(frame) {
                MessageType::MetricSample => source_metrics.metric_decode_failed().increment(1),
                MessageType::Event => source_metrics.event_decode_failed().increment(1),
                MessageType::ServiceCheck => source_metrics.service_check_decode_failed().increment(1),
            }

            return Err(e);
        }
    };

    let event = match parsed {
        ParsedPacket::Metric(metric_packet) => {
            let events_len = metric_packet.num_points;
            if !enabled_filter.allow_metric(&metric_packet) {
                trace!(
                    metric.name = metric_packet.metric_name,
                    "Skipping metric due to filter configuration."
                );
                return Ok(None);
            }

            match handle_metric_packet(metric_packet, context_resolvers, peer_addr, additional_tags) {
                Some(metric) => {
                    source_metrics.metrics_received().increment(events_len);
                    Event::Metric(metric)
                }
                None => {
                    // We can only fail to get a metric back if we failed to resolve the context.
                    source_metrics.failed_context_resolve_total().increment(1);
                    return Ok(None);
                }
            }
        }
        ParsedPacket::Event(event) => {
            if !enabled_filter.allow_event(&event) {
                trace!("Skipping event {} due to filter configuration.", event.title);
                return Ok(None);
            }
            let tags_resolver = context_resolvers.tags();
            match handle_event_packet(event, tags_resolver, peer_addr, additional_tags) {
                Some(event) => {
                    source_metrics.events_received().increment(1);
                    Event::EventD(event)
                }
                None => {
                    source_metrics.failed_context_resolve_total().increment(1);
                    return Ok(None);
                }
            }
        }
        ParsedPacket::ServiceCheck(service_check) => {
            if !enabled_filter.allow_service_check(&service_check) {
                trace!(
                    "Skipping service check {} due to filter configuration.",
                    service_check.name
                );
                return Ok(None);
            }
            let tags_resolver = context_resolvers.tags();
            match handle_service_check_packet(service_check, tags_resolver, peer_addr, additional_tags) {
                Some(service_check) => {
                    source_metrics.service_checks_received().increment(1);
                    Event::ServiceCheck(service_check)
                }
                None => {
                    source_metrics.failed_context_resolve_total().increment(1);
                    return Ok(None);
                }
            }
        }
    };

    Ok(Some(event))
}

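/// Builds a `Metric` from a parsed packet, resolving its context against the no-aggregation resolver when a client
/// timestamp is present, or the primary resolver otherwise. Returns `None` if the context could not be resolved.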
fn handle_metric_packet(
    packet: MetricPacket, context_resolvers: &mut ContextResolvers, peer_addr: &ConnectionAddress,
    additional_tags: &[String],
) -> Option<Metric> {
    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());

    let mut origin = origin_from_metric_packet(&packet, &well_known_tags);
    if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
        origin.set_process_id(creds.pid as u32);
    }

    // Choose the right context resolver based on whether or not this metric is pre-aggregated.
    let context_resolver = if packet.timestamp.is_some() {
        context_resolvers.no_agg()
    } else {
        context_resolvers.primary()
    };

    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);

    // Try to resolve the context for this metric.
    match context_resolver.resolve(packet.metric_name, tags, Some(origin)) {
        Some(context) => {
            let metric_origin = well_known_tags
                .jmx_check_name
                .map(MetricOrigin::jmx_check)
                .unwrap_or_else(MetricOrigin::dogstatsd);
            let metadata = MetricMetadata::default()
                .with_origin(metric_origin)
                .with_hostname(well_known_tags.hostname.map(Arc::from));

            Some(Metric::from_parts(context, packet.values, metadata))
        }
        // We failed to resolve the context, likely due to not having enough interner capacity.
        None => None,
    }
}

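/// Builds an `EventD` from a parsed packet, resolving origin tags and the packet's tag set. Returns `None` if the
/// tag set could not be resolved.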
fn handle_event_packet(
    packet: EventPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress, additional_tags: &[String],
) -> Option<EventD> {
    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());

    let mut origin = origin_from_event_packet(&packet, &well_known_tags);
    if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
        origin.set_process_id(creds.pid as u32);
    }
    let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));

    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
    let tags = tags_resolver.create_tag_set(tags)?;

    let eventd = EventD::new(packet.title, packet.text)
        .with_timestamp(packet.timestamp)
        .with_hostname(packet.hostname.map(|s| s.into()))
        .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
        .with_alert_type(packet.alert_type)
        .with_priority(packet.priority)
        .with_source_type_name(packet.source_type_name.map(|s| s.into()))
        .with_tags(tags)
        .with_origin_tags(origin_tags);

    Some(eventd)
}

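/// Builds a `ServiceCheck` from a parsed packet, resolving origin tags and the packet's tag set. Returns `None` if
/// the tag set could not be resolved.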
fn handle_service_check_packet(
    packet: ServiceCheckPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress,
    additional_tags: &[String],
) -> Option<ServiceCheck> {
    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());

    let mut origin = origin_from_service_check_packet(&packet, &well_known_tags);
    if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
        origin.set_process_id(creds.pid as u32);
    }
    let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));

    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
    let tags = tags_resolver.create_tag_set(tags)?;

    let service_check = ServiceCheck::new(packet.name, packet.status)
        .with_timestamp(packet.timestamp)
        .with_hostname(packet.hostname.map(|s| s.into()))
        .with_tags(tags)
        .with_origin_tags(origin_tags)
        .with_message(packet.message.map(|s| s.into()));

    Some(service_check)
}

fn get_filtered_tags_iterator<'a>(
    raw_tags: RawTags<'a>, additional_tags: &'a [String],
) -> impl Iterator<Item = &'a str> + Clone {
    // This filters out "well-known" tags from the raw tags in the DogStatsD packet, and then chains on any additional tags
    // that were configured on the source.
    RawTagsFilter::exclude(raw_tags, WellKnownTagsFilterPredicate).chain(additional_tags.iter().map(|s| s.as_str()))
}

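/// Splits buffered events by type and dispatches each group to its named output: `events`, `service_checks`, and
/// `metrics`.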
async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &SourceContext, listen_addr: &ListenAddress) {
    debug!(%listen_addr, events_len = event_buffer.len(), "Forwarding events.");

    // TODO: This is maybe a little dicey because if we fail to dispatch the events, we may not have iterated over all of
    // them, so there might still be eventd events when we get to the service checks point, and eventd events and/or service
    // check events when we get to the metrics point, and so on.
    //
    // There's probably something to be said for erroring out fully if this happens, since we should only fail to
    // dispatch if the downstream component fails entirely... and unless we have a way to restart the component, then
    // we're going to continue to fail to dispatch any more events until the process is restarted anyways.

    // Dispatch any eventd events, if present.
    if event_buffer.has_event_type(EventType::EventD) {
        let eventd_events = event_buffer.extract(Event::is_eventd);
        if let Err(e) = source_context
            .dispatcher()
            .buffered_named("events")
            .expect("events output should always exist")
            .send_all(eventd_events)
            .await
        {
            error!(%listen_addr, error = %e, "Failed to dispatch eventd events.");
        }
    }

    // Dispatch any service check events, if present.
    if event_buffer.has_event_type(EventType::ServiceCheck) {
        let service_check_events = event_buffer.extract(Event::is_service_check);
        if let Err(e) = source_context
            .dispatcher()
            .buffered_named("service_checks")
            .expect("service checks output should always exist")
            .send_all(service_check_events)
            .await
        {
            error!(%listen_addr, error = %e, "Failed to dispatch service check events.");
        }
    }

    // Finally, if there are events left, they'll be metrics, so dispatch them.
    if !event_buffer.is_empty() {
        if let Err(e) = source_context
            .dispatcher()
            .dispatch_named("metrics", event_buffer)
            .await
        {
            error!(%listen_addr, error = %e, "Failed to dispatch metric events.");
        }
    }
}

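/// Returns the I/O buffer size to allocate for the configured `dogstatsd_buffer_size`, padded to account for the
/// 4-byte length delimiter used for UDS stream frames.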
const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
    // This is a little goofy, but hear me out:
    //
    // In the Datadog Agent, the way the UDS listener works is that if it's in stream mode, it will do a standalone
    // socket read to get _just_ the length delimiter, which is 4 bytes. After that, it will do a read to get the packet
    // data itself, up to the limit of `dogstatsd_buffer_size`. This means that a _full_ UDS stream packet can be up to
    // `dogstatsd_buffer_size + 4` bytes.
    //
    // This isn't a problem in the Agent due to how it does the reads, but it's a problem for us because we want to be
    // able to get an entire frame in a single buffer for the purpose of decoding the frame. Rather than rewriting our
    // read loop such that we have to change the logic depending on UDP/UDS datagram vs UDS stream, we simply increase
    // the buffer size by 4 bytes to account for the length delimiter.
    //
    // We do it this way so that we don't have to change the buffer size in the configuration, since if you just ported
    // over a Datadog Agent configuration, the value would be too small, and vice versa.
    buffer_size + 4
}

#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use saluki_context::{ContextResolverBuilder, TagsResolverBuilder};
    use saluki_io::{
        deser::codec::{dogstatsd::ParsedPacket, DogstatsdCodec, DogstatsdCodecConfiguration},
        net::ConnectionAddress,
    };

    use super::{handle_metric_packet, ContextResolvers};

    #[test]
    fn no_metrics_when_interner_full_allocations_disallowed() {
        // We're specifically testing here that when we don't allow outside allocations, we should not be able to
        // resolve a context if the interner is full. A no-op interner has the smallest possible size, so that's going
        // to ensure we can't intern anything... but we also need a string (name or one of the tags) that can't be
        // _inlined_ either, since that will get around the interner being full.
        //
        // We set our metric name to be longer than 31 bytes (the inlining limit) to ensure this.

        let codec = DogstatsdCodec::from_configuration(DogstatsdCodecConfiguration::default());
        let tags_resolver = TagsResolverBuilder::for_tests().build();
        let context_resolver = ContextResolverBuilder::for_tests()
            .with_heap_allocations(false)
            .with_tags_resolver(Some(tags_resolver.clone()))
            .build();
        let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
        let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());

        let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3";

        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
            panic!("Failed to parse packet.");
        };

        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &[]);
        assert!(maybe_metric.is_none());
    }

    #[test]
    fn metric_with_additional_tags() {
        let codec = DogstatsdCodec::from_configuration(DogstatsdCodecConfiguration::default());
        let tags_resolver = TagsResolverBuilder::for_tests().build();
        let context_resolver = ContextResolverBuilder::for_tests()
            .with_heap_allocations(false)
            .with_tags_resolver(Some(tags_resolver.clone()))
            .build();
        let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
        let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());

        let existing_tags = ["tag1:value1", "tag2:value2", "tag3:value3"];
        let existing_tags_str = existing_tags.join(",");

        let input = format!("test_metric_name:1|c|#{}", existing_tags_str);
        let additional_tags = [
            "tag4:value4".to_string(),
            "tag5:value5".to_string(),
            "tag6:value6".to_string(),
        ];

        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
            panic!("Failed to parse packet.");
        };
        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &additional_tags);
        assert!(maybe_metric.is_some());

        let metric = maybe_metric.unwrap();
        let context = metric.context();

        for tag in existing_tags {
            assert!(context.tags().has_tag(tag));
        }

        for tag in additional_tags {
            assert!(context.tags().has_tag(tag));
        }
    }
}