saluki_components/sources/dogstatsd/
mod.rs

1use std::sync::{Arc, LazyLock};
2use std::time::Duration;
3
4use async_trait::async_trait;
5use bytes::{Buf, BufMut};
6use bytesize::ByteSize;
7use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
8use metrics::{Counter, Gauge, Histogram};
9use saluki_common::task::spawn_traced_named;
10use saluki_config::GenericConfiguration;
11use saluki_context::tags::{RawTags, RawTagsFilter};
12use saluki_context::TagsResolver;
13use saluki_core::data_model::event::eventd::EventD;
14use saluki_core::data_model::event::metric::{MetricMetadata, MetricOrigin};
15use saluki_core::data_model::event::service_check::ServiceCheck;
16use saluki_core::data_model::event::{metric::Metric, Event, EventType};
17use saluki_core::topology::EventsBuffer;
18use saluki_core::{
19    components::{sources::*, ComponentContext},
20    observability::ComponentMetricsExt as _,
21    pooling::FixedSizeObjectPool,
22    topology::{
23        interconnect::EventBufferManager,
24        shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle},
25        OutputDefinition,
26    },
27};
28use saluki_env::WorkloadProvider;
29use saluki_error::{generic_error, ErrorContext as _, GenericError};
30use saluki_io::deser::codec::dogstatsd::{EventPacket, ServiceCheckPacket};
31use saluki_io::{
32    buf::{BytesBuffer, FixedSizeVec},
33    deser::{
34        codec::{
35            dogstatsd::{parse_message_type, MessageType, MetricPacket, ParseError, ParsedPacket},
36            DogstatsdCodec, DogstatsdCodecConfiguration,
37        },
38        framing::FramerExt as _,
39    },
40    net::{
41        listener::{Listener, ListenerError},
42        ConnectionAddress, ListenAddress, Stream,
43    },
44};
45use saluki_metrics::MetricsBuilder;
46use serde::Deserialize;
47use snafu::{ResultExt as _, Snafu};
48use tokio::{
49    select,
50    time::{interval, MissedTickBehavior},
51};
52use tracing::{debug, error, info, trace, warn};
53
54mod framer;
55use self::framer::{get_framer, DsdFramer};
56use crate::sources::dogstatsd::tags::{WellKnownTags, WellKnownTagsFilterPredicate};
57
58mod filters;
59use self::filters::EnablePayloadsFilter;
60
61mod io_buffer;
62use self::io_buffer::IoBufferManager;
63
64mod origin;
65use self::origin::{
66    origin_from_event_packet, origin_from_metric_packet, origin_from_service_check_packet, DogStatsDOriginTagResolver,
67    OriginEnrichmentConfiguration,
68};
69
70mod resolver;
71use self::resolver::ContextResolvers;
72
73mod tags;
74
/// Errors that can occur while building the DogStatsD source's listeners.
#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
enum Error {
    /// A listener for one of the configured transports could not be created.
    #[snafu(display("Failed to create {} listener: {}", listener_type, source))]
    FailedToCreateListener {
        // Human-readable transport name ("UDP", "TCP", "UDS (datagram)", "UDS (stream)").
        listener_type: &'static str,
        source: ListenerError,
    },

    /// No transport was enabled at all (no UDP/TCP port, no UDS path).
    #[snafu(display("No listeners configured. Please specify a port (`dogstatsd_port`) or a socket path (`dogstatsd_socket` or `dogstatsd_stream_socket`) to enable a listener."))]
    NoListenersConfigured,
}
87
/// Default size of each I/O buffer, in bytes (8 KiB).
const fn default_buffer_size() -> usize {
    8 * 1024
}
91
/// Default number of I/O buffers to allocate overall.
const fn default_buffer_count() -> usize {
    128
}
95
/// Default UDP listen port (8125, the conventional DogStatsD port).
const fn default_port() -> u16 {
    8125
}
99
/// Default TCP listen port; `0` disables the TCP listener.
const fn default_tcp_port() -> u16 {
    0
}
103
/// Heap allocations for contexts are allowed by default.
const fn default_allow_context_heap_allocations() -> bool {
    true
}
107
/// No-aggregation pipeline support is enabled by default.
const fn default_no_aggregation_pipeline_support() -> bool {
    true
}
111
/// Default size of the context string interner (2 MiB).
const fn default_context_string_interner_size() -> ByteSize {
    ByteSize::mib(2)
}
115
/// Default maximum number of cached contexts.
const fn default_cached_contexts_limit() -> usize {
    500_000
}
119
/// Default maximum number of cached tagsets.
const fn default_cached_tagsets_limit() -> usize {
    500_000
}
123
/// Permissive decoding is enabled by default (matches Datadog Agent behavior).
const fn default_dogstatsd_permissive_decoding() -> bool {
    true
}
127
/// Series payloads are enabled by default.
const fn default_enable_payloads_series() -> bool {
    true
}
131
/// Sketch payloads are enabled by default.
const fn default_enable_payloads_sketches() -> bool {
    true
}
135
/// Event payloads are enabled by default.
const fn default_enable_payloads_events() -> bool {
    true
}
139
/// Service check payloads are enabled by default.
const fn default_enable_payloads_service_checks() -> bool {
    true
}
143
/// DogStatsD source.
///
/// Accepts metrics over TCP, UDP, or Unix Domain Sockets in the StatsD/DogStatsD format.
#[derive(Deserialize)]
pub struct DogStatsDConfiguration {
    /// The size of the buffer used to receive messages into, in bytes.
    ///
    /// Payloads cannot exceed this size, or they will be truncated, leading to discarded messages.
    ///
    /// Defaults to 8192 bytes.
    #[serde(rename = "dogstatsd_buffer_size", default = "default_buffer_size")]
    buffer_size: usize,

    /// The number of message buffers to allocate overall.
    ///
    /// This represents the maximum number of message buffers available for processing incoming metrics, which loosely
    /// correlates with how many messages can be received per second. The default value should be suitable for the
    /// majority of workloads, but high-throughput workloads may consider increasing this value.
    ///
    /// Defaults to 128.
    #[serde(rename = "dogstatsd_buffer_count", default = "default_buffer_count")]
    buffer_count: usize,

    /// The port to listen on in UDP mode.
    ///
    /// If set to `0`, UDP is not used.
    ///
    /// Defaults to 8125.
    #[serde(rename = "dogstatsd_port", default = "default_port")]
    port: u16,

    /// The port to listen on in TCP mode.
    ///
    /// If set to `0`, TCP is not used.
    ///
    /// Defaults to 0.
    #[serde(rename = "dogstatsd_tcp_port", default = "default_tcp_port")]
    tcp_port: u16,

    /// The Unix domain socket path to listen on, in datagram mode.
    ///
    /// If not set, UDS (in datagram mode) is not used.
    ///
    /// Defaults to unset.
    #[serde(rename = "dogstatsd_socket")]
    socket_path: Option<String>,

    /// The Unix domain socket path to listen on, in stream mode.
    ///
    /// If not set, UDS (in stream mode) is not used.
    ///
    /// Defaults to unset.
    #[serde(rename = "dogstatsd_stream_socket")]
    socket_stream_path: Option<String>,

    /// Whether or not to listen for non-local traffic in UDP mode.
    ///
    /// If set to `true`, the listener will accept packets from any interface/address. Otherwise, the source will only
    /// listen on `localhost`.
    ///
    /// Defaults to `false`.
    #[serde(rename = "dogstatsd_non_local_traffic", default)]
    non_local_traffic: bool,

    /// Whether or not to allow heap allocations when resolving contexts.
    ///
    /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The
    /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full.
    /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to
    /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags cannot be
    /// interned, the metric is skipped.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "dogstatsd_allow_context_heap_allocs",
        default = "default_allow_context_heap_allocations"
    )]
    allow_context_heap_allocations: bool,

    /// Whether or not to enable support for no-aggregation pipelines.
    ///
    /// When enabled, this influences how metrics are parsed, specifically around user-provided metric timestamps. When
    /// metric timestamps are present, it is used as a signal to any aggregation transforms that the metric should not
    /// be aggregated.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "dogstatsd_no_aggregation_pipeline",
        default = "default_no_aggregation_pipeline_support"
    )]
    no_aggregation_pipeline_support: bool,

    /// Total size of the string interner used for contexts.
    ///
    /// This controls the amount of memory that can be used to intern metric names and tags. If the interner is full,
    /// metrics with contexts that have not already been resolved may or may not be dropped, depending on the value of
    /// `allow_context_heap_allocations`.
    #[serde(
        rename = "dogstatsd_string_interner_size",
        default = "default_context_string_interner_size"
    )]
    context_string_interner_bytes: ByteSize,

    /// The maximum number of cached contexts to allow.
    ///
    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(
        rename = "dogstatsd_cached_contexts_limit",
        default = "default_cached_contexts_limit"
    )]
    cached_contexts_limit: usize,

    /// The maximum number of cached tagsets to allow.
    ///
    /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit does not affect
    /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(rename = "dogstatsd_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
    cached_tagsets_limit: usize,

    /// Whether or not to enable permissive mode in the decoder.
    ///
    /// Permissive mode allows the decoder to relax its strictness around the allowed payloads, which lets it match the
    /// decoding behavior of the Datadog Agent.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "dogstatsd_permissive_decoding",
        default = "default_dogstatsd_permissive_decoding"
    )]
    permissive_decoding: bool,

    /// Whether or not to enable sending serie payloads.
    ///
    /// Defaults to `true`.
    // NOTE(review): unlike the settings above, this and the three `enable_payloads_*` fields
    // below have no `dogstatsd_`-prefixed serde rename, so their config keys are the bare field
    // names — confirm this is intentional.
    #[serde(default = "default_enable_payloads_series")]
    enable_payloads_series: bool,

    /// Whether or not to enable sending sketch payloads.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_enable_payloads_sketches")]
    enable_payloads_sketches: bool,

    /// Whether or not to enable sending event payloads.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_enable_payloads_events")]
    enable_payloads_events: bool,

    /// Whether or not to enable sending service check payloads.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_enable_payloads_service_checks")]
    enable_payloads_service_checks: bool,

    /// Configuration related to origin detection and enrichment.
    #[serde(flatten, default)]
    origin_enrichment: OriginEnrichmentConfiguration,

    /// Workload provider to utilize for origin detection/enrichment.
    // Not deserialized from configuration; set programmatically via `with_workload_provider`.
    #[serde(skip)]
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,

    /// Additional tags to add to all metrics.
    #[serde(rename = "dogstatsd_tags", default)]
    additional_tags: Vec<String>,
}
318
319impl DogStatsDConfiguration {
320    /// Creates a new `DogStatsDConfiguration` from the given configuration.
321    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
322        Ok(config.as_typed()?)
323    }
324
325    /// Sets the workload provider to use for configuring origin detection/enrichment.
326    ///
327    /// A workload provider must be set otherwise origin detection/enrichment will not be enabled.
328    ///
329    /// Defaults to unset.
330    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
331    where
332        W: WorkloadProvider + Send + Sync + 'static,
333    {
334        self.workload_provider = Some(Arc::new(workload_provider));
335        self
336    }
337
338    async fn build_listeners(&self) -> Result<Vec<Listener>, Error> {
339        let mut listeners = Vec::new();
340
341        if self.port != 0 {
342            let address = if self.non_local_traffic {
343                ListenAddress::Udp(([0, 0, 0, 0], self.port).into())
344            } else {
345                ListenAddress::Udp(([127, 0, 0, 1], self.port).into())
346            };
347
348            let listener = Listener::from_listen_address(address)
349                .await
350                .context(FailedToCreateListener { listener_type: "UDP" })?;
351            listeners.push(listener);
352        }
353
354        if self.tcp_port != 0 {
355            let address = if self.non_local_traffic {
356                ListenAddress::Tcp(([0, 0, 0, 0], self.tcp_port).into())
357            } else {
358                ListenAddress::Tcp(([127, 0, 0, 1], self.tcp_port).into())
359            };
360
361            let listener = Listener::from_listen_address(address)
362                .await
363                .context(FailedToCreateListener { listener_type: "TCP" })?;
364            listeners.push(listener);
365        }
366
367        if let Some(socket_path) = &self.socket_path {
368            let address = ListenAddress::Unixgram(socket_path.into());
369            let listener = Listener::from_listen_address(address)
370                .await
371                .context(FailedToCreateListener {
372                    listener_type: "UDS (datagram)",
373                })?;
374            listeners.push(listener);
375        }
376
377        if let Some(socket_stream_path) = &self.socket_stream_path {
378            let address = ListenAddress::Unix(socket_stream_path.into());
379            let listener = Listener::from_listen_address(address)
380                .await
381                .context(FailedToCreateListener {
382                    listener_type: "UDS (stream)",
383                })?;
384            listeners.push(listener);
385        }
386
387        Ok(listeners)
388    }
389}
390
391#[async_trait]
392impl SourceBuilder for DogStatsDConfiguration {
393    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
394        let listeners = self.build_listeners().await?;
395        if listeners.is_empty() {
396            return Err(Error::NoListenersConfigured.into());
397        }
398
399        // Every listener requires at least one I/O buffer to ensure that all listeners can be serviced without
400        // deadlocking any of the others.
401        if self.buffer_count < listeners.len() {
402            return Err(generic_error!(
403                "Must have a minimum of {} I/O buffers based on the number of listeners configured.",
404                listeners.len()
405            ));
406        }
407
408        let maybe_origin_tags_resolver = self
409            .workload_provider
410            .clone()
411            .map(|provider| DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider));
412        let context_resolvers = ContextResolvers::new(self, &context, maybe_origin_tags_resolver)
413            .error_context("Failed to create context resolvers.")?;
414
415        let codec_config = DogstatsdCodecConfiguration::default()
416            .with_timestamps(self.no_aggregation_pipeline_support)
417            .with_permissive_mode(self.permissive_decoding);
418
419        let codec = DogstatsdCodec::from_configuration(codec_config);
420
421        let enable_payloads_filter = EnablePayloadsFilter::default()
422            .with_allow_series(self.enable_payloads_series)
423            .with_allow_sketches(self.enable_payloads_sketches)
424            .with_allow_events(self.enable_payloads_events)
425            .with_allow_service_checks(self.enable_payloads_service_checks);
426
427        Ok(Box::new(DogStatsD {
428            listeners,
429            io_buffer_pool: FixedSizeObjectPool::with_builder("dsd_packet_bufs", self.buffer_count, || {
430                FixedSizeVec::with_capacity(get_adjusted_buffer_size(self.buffer_size))
431            }),
432            codec,
433            context_resolvers,
434            enabled_filter: enable_payloads_filter,
435            additional_tags: self.additional_tags.clone().into(),
436        }))
437    }
438
439    fn outputs(&self) -> &[OutputDefinition<EventType>] {
440        static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
441            vec![
442                OutputDefinition::named_output("metrics", EventType::Metric),
443                OutputDefinition::named_output("events", EventType::EventD),
444                OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
445            ]
446        });
447        &OUTPUTS
448    }
449}
450
impl MemoryBounds for DogStatsDConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Only up-front allocations are declared here; heap growth permitted by
        // `allow_context_heap_allocations` is not represented in these bounds.
        builder
            .minimum()
            // Capture the size of the heap allocation when the component is built.
            .with_single_value::<DogStatsD>("source struct")
            // We allocate our I/O buffers entirely up front.
            .with_expr(UsageExpr::product(
                "buffers",
                UsageExpr::config("dogstatsd_buffer_count", self.buffer_count),
                UsageExpr::config("dogstatsd_buffer_size", get_adjusted_buffer_size(self.buffer_size)),
            ))
            // We also allocate the backing storage for the string interner up front, which is used by our context
            // resolver.
            .with_expr(UsageExpr::config(
                "dogstatsd_string_interner_size",
                self.context_string_interner_bytes.as_u64() as usize,
            ));
    }
}
471
/// DogStatsD source.
pub struct DogStatsD {
    /// One listener per configured transport (UDP, TCP, UDS datagram/stream).
    listeners: Vec<Listener>,
    /// Pool of fixed-size I/O buffers shared by all listeners.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// Codec used to decode DogStatsD payloads.
    codec: DogstatsdCodec,
    /// Context resolvers used when handling decoded frames.
    context_resolvers: ContextResolvers,
    /// Filter controlling which payload types (series/sketches/events/service checks) are allowed.
    enabled_filter: EnablePayloadsFilter,
    /// Additional tags added to all metrics (from `dogstatsd_tags`).
    additional_tags: Arc<[String]>,
}
481
/// Per-listener state handed to each spawned listener task.
struct ListenerContext {
    /// Handle used to observe the coordinated shutdown signal for listeners.
    shutdown_handle: DynamicShutdownHandle,
    /// The listener to accept streams from.
    listener: Listener,
    // The fields below are clones of the corresponding `DogStatsD` source fields.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    codec: DogstatsdCodec,
    context_resolvers: ContextResolvers,
    additional_tags: Arc<[String]>,
}
490
/// Per-stream state handed to each spawned stream handler task.
struct HandlerContext {
    /// Address of the listener that accepted the stream (used for logging and metric tags).
    listen_addr: ListenAddress,
    /// Framer selected to match the transport of `listen_addr`.
    framer: DsdFramer,
    codec: DogstatsdCodec,
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// Telemetry handles, tagged with this listener's transport type.
    metrics: Metrics,
    context_resolvers: ContextResolvers,
    additional_tags: Arc<[String]>,
}
500
/// Telemetry handles for a single stream handler (see `build_metrics` for names/tags).
struct Metrics {
    // `component_events_received_total`, broken out by message type.
    metrics_received: Counter,
    events_received: Counter,
    service_checks_received: Counter,
    // `component_bytes_received_total` and `component_bytes_received_size`.
    bytes_received: Counter,
    bytes_received_size: Histogram,
    // `component_errors_total`, broken out by error type and message type.
    framing_errors: Counter,
    metric_decoder_errors: Counter,
    event_decoder_errors: Counter,
    service_check_decoder_errors: Counter,
    // `component_failed_context_resolve_total`.
    failed_context_resolve_total: Counter,
    // `component_connections_active`.
    connections_active: Gauge,
    // `component_packets_received_total`, broken out by ok/error state.
    packet_receive_success: Counter,
    packet_receive_failure: Counter,
}
516
// Accessors for the telemetry handles. Note that the decode-error accessors are named after the
// event they record (e.g. `metric_decode_failed`) rather than the backing field
// (`metric_decoder_errors`).
impl Metrics {
    fn metrics_received(&self) -> &Counter {
        &self.metrics_received
    }

    fn events_received(&self) -> &Counter {
        &self.events_received
    }

    fn service_checks_received(&self) -> &Counter {
        &self.service_checks_received
    }

    fn bytes_received(&self) -> &Counter {
        &self.bytes_received
    }

    fn bytes_received_size(&self) -> &Histogram {
        &self.bytes_received_size
    }

    fn framing_errors(&self) -> &Counter {
        &self.framing_errors
    }

    fn metric_decode_failed(&self) -> &Counter {
        &self.metric_decoder_errors
    }

    fn event_decode_failed(&self) -> &Counter {
        &self.event_decoder_errors
    }

    fn service_check_decode_failed(&self) -> &Counter {
        &self.service_check_decoder_errors
    }

    fn failed_context_resolve_total(&self) -> &Counter {
        &self.failed_context_resolve_total
    }

    fn connections_active(&self) -> &Gauge {
        &self.connections_active
    }

    fn packet_receive_success(&self) -> &Counter {
        &self.packet_receive_success
    }

    fn packet_receive_failure(&self) -> &Counter {
        &self.packet_receive_failure
    }
}
570
571fn build_metrics(listen_addr: &ListenAddress, component_context: &ComponentContext) -> Metrics {
572    let builder = MetricsBuilder::from_component_context(component_context);
573
574    let listener_type = match listen_addr {
575        ListenAddress::Tcp(_) => "tcp",
576        ListenAddress::Udp(_) => "udp",
577        ListenAddress::Unix(_) => "unix",
578        ListenAddress::Unixgram(_) => "unixgram",
579    };
580
581    Metrics {
582        metrics_received: builder.register_counter_with_tags(
583            "component_events_received_total",
584            [("message_type", "metrics"), ("listener_type", listener_type)],
585        ),
586        events_received: builder.register_counter_with_tags(
587            "component_events_received_total",
588            [("message_type", "events"), ("listener_type", listener_type)],
589        ),
590        service_checks_received: builder.register_counter_with_tags(
591            "component_events_received_total",
592            [("message_type", "service_checks"), ("listener_type", listener_type)],
593        ),
594        bytes_received: builder
595            .register_counter_with_tags("component_bytes_received_total", [("listener_type", listener_type)]),
596        bytes_received_size: builder
597            .register_trace_histogram_with_tags("component_bytes_received_size", [("listener_type", listener_type)]),
598        framing_errors: builder.register_counter_with_tags(
599            "component_errors_total",
600            [("listener_type", listener_type), ("error_type", "framing")],
601        ),
602        metric_decoder_errors: builder.register_counter_with_tags(
603            "component_errors_total",
604            [
605                ("listener_type", listener_type),
606                ("error_type", "decode"),
607                ("message_type", "metrics"),
608            ],
609        ),
610        event_decoder_errors: builder.register_counter_with_tags(
611            "component_errors_total",
612            [
613                ("listener_type", listener_type),
614                ("error_type", "decode"),
615                ("message_type", "events"),
616            ],
617        ),
618        service_check_decoder_errors: builder.register_counter_with_tags(
619            "component_errors_total",
620            [
621                ("listener_type", listener_type),
622                ("error_type", "decode"),
623                ("message_type", "service_checks"),
624            ],
625        ),
626        connections_active: builder
627            .register_gauge_with_tags("component_connections_active", [("listener_type", listener_type)]),
628        packet_receive_success: builder.register_debug_counter_with_tags(
629            "component_packets_received_total",
630            [("listener_type", listener_type), ("state", "ok")],
631        ),
632        packet_receive_failure: builder.register_debug_counter_with_tags(
633            "component_packets_received_total",
634            [("listener_type", listener_type), ("state", "error")],
635        ),
636        failed_context_resolve_total: builder.register_debug_counter("component_failed_context_resolve_total"),
637    }
638}
639
#[async_trait]
impl Source for DogStatsD {
    /// Runs the source: spawns one task per listener, then parks on the global shutdown signal,
    /// coordinating listener shutdown before returning.
    async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
        let mut global_shutdown = context.take_shutdown_handle();
        let mut health = context.take_health_handle();

        let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();

        // For each listener, spawn a dedicated task to run it.
        for listener in self.listeners {
            let task_name = format!("dogstatsd-listener-{}", listener.listen_address().listener_type());

            // TODO: Create a health handle for each listener.
            //
            // We need to rework `HealthRegistry` to look a little more like `ComponentRegistry` so that we can have it
            // already be scoped properly, otherwise all we can do here at present is either have a relative name, like
            // `uds-stream`, or try and hardcode the full component name, which we will inevitably forget to update if
            // we tweak the topology configuration, etc.
            let listener_context = ListenerContext {
                shutdown_handle: listener_shutdown_coordinator.register(),
                listener,
                io_buffer_pool: self.io_buffer_pool.clone(),
                codec: self.codec.clone(),
                context_resolvers: self.context_resolvers.clone(),
                additional_tags: self.additional_tags.clone(),
            };

            spawn_traced_named(
                task_name,
                process_listener(context.clone(), listener_context, self.enabled_filter),
            );
        }

        // Only mark ready once every listener task has been spawned.
        health.mark_ready();
        debug!("DogStatsD source started.");

        // Wait for the global shutdown signal, then notify listeners to shutdown.
        //
        // We also handle liveness here, which doesn't really matter for _this_ task, since the real work is happening
        // in the listeners, but we need to satisfy the health checker.
        loop {
            select! {
                _ = &mut global_shutdown => {
                    debug!("Received shutdown signal.");
                    break
                },
                _ = health.live() => continue,
            }
        }

        debug!("Stopping DogStatsD source...");

        // Signal all listener tasks to stop, and wait for them to finish before returning.
        listener_shutdown_coordinator.shutdown().await;

        debug!("DogStatsD source stopped.");

        Ok(())
    }
}
699
/// Accepts streams on the given listener, spawning a dedicated handler task per stream, until
/// shutdown is signaled or the listener fails to accept.
///
/// On shutdown (or accept failure), waits for all in-flight stream handlers to finish before
/// returning.
async fn process_listener(
    source_context: SourceContext, listener_context: ListenerContext, enabled_filter: EnablePayloadsFilter,
) {
    let ListenerContext {
        shutdown_handle,
        mut listener,
        io_buffer_pool,
        codec,
        context_resolvers,
        additional_tags,
    } = listener_context;
    tokio::pin!(shutdown_handle);

    let listen_addr = listener.listen_address().clone();
    let mut stream_shutdown_coordinator = DynamicShutdownCoordinator::default();

    info!(%listen_addr, "DogStatsD listener started.");

    loop {
        select! {
            _ = &mut shutdown_handle => {
                debug!(%listen_addr, "Received shutdown signal. Waiting for existing stream handlers to finish...");
                break;
            }
            result = listener.accept() => match result {
                Ok(stream) => {
                    debug!(%listen_addr, "Spawning new stream handler.");

                    // Each stream handler gets its own clones of the shared state, plus its own
                    // framer and freshly-registered (transport-tagged) metrics.
                    let handler_context = HandlerContext {
                        listen_addr: listen_addr.clone(),
                        framer: get_framer(&listen_addr),
                        codec: codec.clone(),
                        io_buffer_pool: io_buffer_pool.clone(),
                        metrics: build_metrics(&listen_addr, source_context.component_context()),
                        context_resolvers: context_resolvers.clone(),
                        additional_tags: additional_tags.clone(),
                    };

                    let task_name = format!("dogstatsd-stream-handler-{}", listen_addr.listener_type());
                    spawn_traced_named(task_name, process_stream(stream, source_context.clone(), handler_context, stream_shutdown_coordinator.register(), enabled_filter));
                }
                Err(e) => {
                    error!(%listen_addr, error = %e, "Failed to accept connection. Stopping listener.");
                    break
                }
            }
        }
    }

    // Wait for all spawned stream handlers to finish before declaring the listener stopped.
    stream_shutdown_coordinator.shutdown().await;

    info!(%listen_addr, "DogStatsD listener stopped.");
}
753
/// Drives a single stream to completion, aborting early if shutdown is signaled.
async fn process_stream(
    stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
    shutdown_handle: DynamicShutdownHandle, enabled_filter: EnablePayloadsFilter,
) {
    tokio::pin!(shutdown_handle);

    // Race the actual stream processing against the shutdown signal; whichever finishes first
    // ends the handler.
    select! {
        _ = &mut shutdown_handle => {
            debug!("Stream handler received shutdown signal.");
        },
        _ = drive_stream(stream, source_context, handler_context, enabled_filter) => {},
    }
}
767
768async fn drive_stream(
769    mut stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
770    enabled_filter: EnablePayloadsFilter,
771) {
772    let HandlerContext {
773        listen_addr,
774        mut framer,
775        codec,
776        io_buffer_pool,
777        metrics,
778        mut context_resolvers,
779        additional_tags,
780    } = handler_context;
781
782    debug!(%listen_addr, "Stream handler started.");
783
784    if !stream.is_connectionless() {
785        metrics.connections_active().increment(1);
786    }
787
788    // Set a buffer flush interval of 100ms, which will ensure we always flush buffered events at least every 100ms if
789    // we're otherwise idle and not receiving packets from the client.
790    let mut buffer_flush = interval(Duration::from_millis(100));
791    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
792
793    let mut event_buffer_manager = EventBufferManager::default();
794    let mut io_buffer_manager = IoBufferManager::new(&io_buffer_pool, &stream);
795    let memory_limiter = source_context.topology_context().memory_limiter();
796
797    'read: loop {
798        let mut eof = false;
799
800        let mut io_buffer = io_buffer_manager.get_buffer_mut().await;
801
802        memory_limiter.wait_for_capacity().await;
803
804        select! {
805            // We read from the stream.
806            read_result = stream.receive(&mut io_buffer) => match read_result {
807                Ok((bytes_read, peer_addr)) => {
808                    if bytes_read == 0 {
809                        eof = true;
810                    }
811
812                    // TODO: This is correct for UDP and UDS in SOCK_DGRAM mode, but not for UDS in SOCK_STREAM mode...
813                    // because to match the Datadog Agent, we would only want to increment the number of successful
814                    // packets for each length-delimited frame, but this is obviously being incremented before we do any
815                    // framing... and even further, with the nested framer, we don't have access to the signal that
816                    // we've gotten a full length-delimited outer frame, only each individual newline-delimited inner
817                    // frame.
818                    //
819                    // As such, we'll potentially be over-reporting this metric for UDS in SOCK_STREAM mode compared to
820                    // the Datadog Agent.
821                    metrics.packet_receive_success().increment(1);
822                    metrics.bytes_received().increment(bytes_read as u64);
823                    metrics.bytes_received_size().record(bytes_read as f64);
824
825                    // When we're actually at EOF, or we're dealing with a connectionless stream, we try to decode in EOF mode.
826                    //
827                    // For connectionless streams, we always try to decode the buffer as if it's EOF, since it effectively _is_
828                    // always the end of file after a receive. For connection-oriented streams, we only want to do this once we've
829                    // actually hit true EOF.
830                    let reached_eof = eof || stream.is_connectionless();
831
832                    trace!(
833                        buffer_len = io_buffer.remaining(),
834                        buffer_cap = io_buffer.remaining_mut(),
835                        eof = reached_eof,
836                        %listen_addr,
837                        %peer_addr,
838                        "Received {} bytes from stream.",
839                        bytes_read
840                    );
841
842                    let mut frames = io_buffer.framed(&mut framer, reached_eof);
843                    'frame: loop {
844                        match frames.next() {
845                            Some(Ok(frame)) => {
846                                trace!(%listen_addr, %peer_addr, ?frame, "Decoded frame.");
847                                match handle_frame(&frame[..], &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) {
848                                    Ok(Some(event)) => {
849                                        if let Some(event_buffer) = event_buffer_manager.try_push(event) {
850                                            debug!(%listen_addr, %peer_addr, "Event buffer is full. Forwarding events.");
851                                            dispatch_events(event_buffer, &source_context, &listen_addr).await;
852                                        }
853                                    },
854                                    Ok(None) => {
855                                        // We didn't decode an event, but there was no inherent error. This is likely
856                                        // due to hitting resource limits, etc.
857                                        //
858                                        // Simply continue on.
859                                        continue
860                                    },
861                                    Err(e) => {
862                                        let frame_lossy_str = String::from_utf8_lossy(&frame);
863                                        warn!(%listen_addr, %peer_addr, frame = %frame_lossy_str, error = %e, "Failed to parse frame.");
864                                    },
865                                }
866                            }
867                            Some(Err(e)) => {
868                                metrics.framing_errors().increment(1);
869
870                                if stream.is_connectionless() {
871                                    // For connectionless streams, we don't want to shutdown the stream since we can just keep
872                                    // reading more packets.
873                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Continuing stream.");
874                                    continue 'read;
875                                } else {
876                                    debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Stopping stream.");
877                                    break 'read;
878                                }
879                            }
880                            None => {
881                                trace!(%listen_addr, %peer_addr, "Not enough data to decode another frame.");
882                                if eof && !stream.is_connectionless() {
883                                    debug!(%listen_addr, %peer_addr, "Stream received EOF. Shutting down handler.");
884                                    break 'read;
885                                } else {
886                                    break 'frame;
887                                }
888                            }
889                        }
890                    }
891                },
892                Err(e) => {
893                    metrics.packet_receive_failure().increment(1);
894
895                    if stream.is_connectionless() {
896                        // For connectionless streams, we don't want to shutdown the stream since we can just keep
897                        // reading more packets.
898                        warn!(%listen_addr, error = %e, "I/O error while decoding. Continuing stream.");
899                        continue 'read;
900                    } else {
901                        warn!(%listen_addr, error = %e, "I/O error while decoding. Stopping stream.");
902                        break 'read;
903                    }
904                }
905            },
906
907            _ = buffer_flush.tick() => {
908                if let Some(event_buffer) = event_buffer_manager.consume() {
909                    dispatch_events(event_buffer, &source_context, &listen_addr).await;
910                }
911            },
912        }
913    }
914
915    if let Some(event_buffer) = event_buffer_manager.consume() {
916        dispatch_events(event_buffer, &source_context, &listen_addr).await;
917    }
918
919    metrics.connections_active().decrement(1);
920
921    debug!(%listen_addr, "Stream handler stopped.");
922}
923
924fn handle_frame(
925    frame: &[u8], codec: &DogstatsdCodec, context_resolvers: &mut ContextResolvers, source_metrics: &Metrics,
926    peer_addr: &ConnectionAddress, enabled_filter: EnablePayloadsFilter, additional_tags: &[String],
927) -> Result<Option<Event>, ParseError> {
928    let parsed = match codec.decode_packet(frame) {
929        Ok(parsed) => parsed,
930        Err(e) => {
931            // Try and determine what the message type was, if possible, to increment the correct error counter.
932            match parse_message_type(frame) {
933                MessageType::MetricSample => source_metrics.metric_decode_failed().increment(1),
934                MessageType::Event => source_metrics.event_decode_failed().increment(1),
935                MessageType::ServiceCheck => source_metrics.service_check_decode_failed().increment(1),
936            }
937
938            return Err(e);
939        }
940    };
941
942    let event = match parsed {
943        ParsedPacket::Metric(metric_packet) => {
944            let events_len = metric_packet.num_points;
945            if !enabled_filter.allow_metric(&metric_packet) {
946                trace!(
947                    metric.name = metric_packet.metric_name,
948                    "Skipping metric due to filter configuration."
949                );
950                return Ok(None);
951            }
952
953            match handle_metric_packet(metric_packet, context_resolvers, peer_addr, additional_tags) {
954                Some(metric) => {
955                    source_metrics.metrics_received().increment(events_len);
956                    Event::Metric(metric)
957                }
958                None => {
959                    // We can only fail to get a metric back if we failed to resolve the context.
960                    source_metrics.failed_context_resolve_total().increment(1);
961                    return Ok(None);
962                }
963            }
964        }
965        ParsedPacket::Event(event) => {
966            if !enabled_filter.allow_event(&event) {
967                trace!("Skipping event {} due to filter configuration.", event.title);
968                return Ok(None);
969            }
970            let tags_resolver = context_resolvers.tags();
971            match handle_event_packet(event, tags_resolver, peer_addr, additional_tags) {
972                Some(event) => {
973                    source_metrics.events_received().increment(1);
974                    Event::EventD(event)
975                }
976                None => {
977                    source_metrics.failed_context_resolve_total().increment(1);
978                    return Ok(None);
979                }
980            }
981        }
982        ParsedPacket::ServiceCheck(service_check) => {
983            if !enabled_filter.allow_service_check(&service_check) {
984                trace!(
985                    "Skipping service check {} due to filter configuration.",
986                    service_check.name
987                );
988                return Ok(None);
989            }
990            let tags_resolver = context_resolvers.tags();
991            match handle_service_check_packet(service_check, tags_resolver, peer_addr, additional_tags) {
992                Some(service_check) => {
993                    source_metrics.service_checks_received().increment(1);
994                    Event::ServiceCheck(service_check)
995                }
996                None => {
997                    source_metrics.failed_context_resolve_total().increment(1);
998                    return Ok(None);
999                }
1000            }
1001        }
1002    };
1003
1004    Ok(Some(event))
1005}
1006
1007fn handle_metric_packet(
1008    packet: MetricPacket, context_resolvers: &mut ContextResolvers, peer_addr: &ConnectionAddress,
1009    additional_tags: &[String],
1010) -> Option<Metric> {
1011    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1012
1013    let mut origin = origin_from_metric_packet(&packet, &well_known_tags);
1014    if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1015        origin.set_process_id(creds.pid as u32);
1016    }
1017
1018    // Choose the right context resolver based on whether or not this metric is pre-aggregated.
1019    let context_resolver = if packet.timestamp.is_some() {
1020        context_resolvers.no_agg()
1021    } else {
1022        context_resolvers.primary()
1023    };
1024
1025    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1026
1027    // Try to resolve the context for this metric.
1028    match context_resolver.resolve(packet.metric_name, tags, Some(origin)) {
1029        Some(context) => {
1030            let metric_origin = well_known_tags
1031                .jmx_check_name
1032                .map(MetricOrigin::jmx_check)
1033                .unwrap_or_else(MetricOrigin::dogstatsd);
1034            let metadata = MetricMetadata::default()
1035                .with_origin(metric_origin)
1036                .with_hostname(well_known_tags.hostname.map(Arc::from));
1037
1038            Some(Metric::from_parts(context, packet.values, metadata))
1039        }
1040        // We failed to resolve the context, likely due to not having enough interner capacity.
1041        None => None,
1042    }
1043}
1044
1045fn handle_event_packet(
1046    packet: EventPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress, additional_tags: &[String],
1047) -> Option<EventD> {
1048    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1049
1050    let mut origin = origin_from_event_packet(&packet, &well_known_tags);
1051    if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1052        origin.set_process_id(creds.pid as u32);
1053    }
1054    let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1055
1056    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1057    let tags = tags_resolver.create_tag_set(tags)?;
1058
1059    let eventd = EventD::new(packet.title, packet.text)
1060        .with_timestamp(packet.timestamp)
1061        .with_hostname(packet.hostname.map(|s| s.into()))
1062        .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
1063        .with_alert_type(packet.alert_type)
1064        .with_priority(packet.priority)
1065        .with_source_type_name(packet.source_type_name.map(|s| s.into()))
1066        .with_alert_type(packet.alert_type)
1067        .with_tags(tags)
1068        .with_origin_tags(origin_tags);
1069
1070    Some(eventd)
1071}
1072
1073fn handle_service_check_packet(
1074    packet: ServiceCheckPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress,
1075    additional_tags: &[String],
1076) -> Option<ServiceCheck> {
1077    let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1078
1079    let mut origin = origin_from_service_check_packet(&packet, &well_known_tags);
1080    if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1081        origin.set_process_id(creds.pid as u32);
1082    }
1083    let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1084
1085    let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1086    let tags = tags_resolver.create_tag_set(tags)?;
1087
1088    let service_check = ServiceCheck::new(packet.name, packet.status)
1089        .with_timestamp(packet.timestamp)
1090        .with_hostname(packet.hostname.map(|s| s.into()))
1091        .with_tags(tags)
1092        .with_origin_tags(origin_tags)
1093        .with_message(packet.message.map(|s| s.into()));
1094
1095    Some(service_check)
1096}
1097
1098fn get_filtered_tags_iterator<'a>(
1099    raw_tags: RawTags<'a>, additional_tags: &'a [String],
1100) -> impl Iterator<Item = &'a str> + Clone {
1101    // This filters out "well-known" tags from the raw tags in the DogStatsD packet, and then chains on any additional tags
1102    // that were configured on the source.
1103    RawTagsFilter::exclude(raw_tags, WellKnownTagsFilterPredicate).chain(additional_tags.iter().map(|s| s.as_str()))
1104}
1105
1106async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &SourceContext, listen_addr: &ListenAddress) {
1107    debug!(%listen_addr, events_len = event_buffer.len(), "Forwarding events.");
1108
1109    // TODO: This is maybe a little dicey because if we fail to dispatch the events, we may not have iterated over all of
1110    // them, so there might still be eventd events when get to the service checks point, and eventd events and/or service
1111    // check events when we get to the metrics point, and so on.
1112    //
1113    // There's probably something to be said for erroring out fully if this happens, since we should only fail to
1114    // dispatch if the downstream component fails entirely... and unless we have a way to restart the component, then
1115    // we're going to continue to fail to dispatch any more events until the process is restarted anyways.
1116
1117    // Dispatch any eventd events, if present.
1118    if event_buffer.has_event_type(EventType::EventD) {
1119        let eventd_events = event_buffer.extract(Event::is_eventd);
1120        if let Err(e) = source_context
1121            .dispatcher()
1122            .buffered_named("events")
1123            .expect("events output should always exist")
1124            .send_all(eventd_events)
1125            .await
1126        {
1127            error!(%listen_addr, error = %e, "Failed to dispatch eventd events.");
1128        }
1129    }
1130
1131    // Dispatch any service check events, if present.
1132    if event_buffer.has_event_type(EventType::ServiceCheck) {
1133        let service_check_events = event_buffer.extract(Event::is_service_check);
1134        if let Err(e) = source_context
1135            .dispatcher()
1136            .buffered_named("service_checks")
1137            .expect("service checks output should always exist")
1138            .send_all(service_check_events)
1139            .await
1140        {
1141            error!(%listen_addr, error = %e, "Failed to dispatch service check events.");
1142        }
1143    }
1144
1145    // Finally, if there are events left, they'll be metrics, so dispatch them.
1146    if !event_buffer.is_empty() {
1147        if let Err(e) = source_context
1148            .dispatcher()
1149            .dispatch_named("metrics", event_buffer)
1150            .await
1151        {
1152            error!(%listen_addr, error = %e, "Failed to dispatch metric events.");
1153        }
1154    }
1155}
1156
/// Adjusts the configured buffer size to account for the UDS stream length delimiter.
///
/// This is a little goofy, but hear me out: in the Datadog Agent, the UDS listener in stream mode
/// does a standalone socket read to get _just_ the 4-byte length delimiter, then a second read for
/// the packet data itself, up to the limit of `dogstatsd_buffer_size`. A _full_ UDS stream packet
/// can therefore be up to `dogstatsd_buffer_size + 4` bytes.
///
/// That's fine for the Agent due to how it does the reads, but we want an entire frame to fit in a
/// single buffer for decoding. Rather than rewriting our read loop with UDP/UDS-datagram vs
/// UDS-stream specific logic, we simply grow the buffer by the delimiter size. This way a ported
/// Datadog Agent configuration value works unchanged in either direction.
const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
    // The UDS stream length delimiter is a 4-byte prefix.
    const LENGTH_DELIMITER_LEN: usize = 4;

    buffer_size + LENGTH_DELIMITER_LEN
}
1174
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use saluki_context::{ContextResolverBuilder, TagsResolverBuilder};
    use saluki_io::{
        deser::codec::{dogstatsd::ParsedPacket, DogstatsdCodec, DogstatsdCodecConfiguration},
        net::ConnectionAddress,
    };

    use super::{handle_metric_packet, ContextResolvers};

    /// Builds the codec, context resolvers (with heap allocations disallowed), and a fake peer
    /// address shared by the tests below.
    fn build_test_fixtures() -> (DogstatsdCodec, ContextResolvers, ConnectionAddress) {
        let codec = DogstatsdCodec::from_configuration(DogstatsdCodecConfiguration::default());
        let tags_resolver = TagsResolverBuilder::for_tests().build();
        let context_resolver = ContextResolverBuilder::for_tests()
            .with_heap_allocations(false)
            .with_tags_resolver(Some(tags_resolver.clone()))
            .build();
        let context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
        let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());

        (codec, context_resolvers, peer_addr)
    }

    #[test]
    fn no_metrics_when_interner_full_allocations_disallowed() {
        // We're specifically testing here that when we don't allow outside allocations, we should not be able to
        // resolve a context if the interner is full. A no-op interner has the smallest possible size, so that's going
        // to assure we can't intern anything... but we also need a string (name or one of the tags) that can't be
        // _inlined_ either, since that will get around the interner being full.
        //
        // We set our metric name to be longer than 31 bytes (the inlining limit) to ensure this.
        let (codec, mut context_resolvers, peer_addr) = build_test_fixtures();

        let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3";
        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
            panic!("Failed to parse packet.");
        };

        assert!(handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &[]).is_none());
    }

    #[test]
    fn metric_with_additional_tags() {
        let (codec, mut context_resolvers, peer_addr) = build_test_fixtures();

        let existing_tags = ["tag1:value1", "tag2:value2", "tag3:value3"];
        let additional_tags = [
            "tag4:value4".to_string(),
            "tag5:value5".to_string(),
            "tag6:value6".to_string(),
        ];

        let input = format!("test_metric_name:1|c|#{}", existing_tags.join(","));
        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
            panic!("Failed to parse packet.");
        };

        let metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &additional_tags)
            .expect("should have gotten a metric back");
        let context = metric.context();

        // Both the packet's own tags and the source-configured additional tags must be present.
        for tag in existing_tags {
            assert!(context.tags().has_tag(tag));
        }

        for tag in additional_tags {
            assert!(context.tags().has_tag(tag));
        }
    }
}