1use std::sync::{Arc, LazyLock};
2use std::time::{Duration, SystemTime, UNIX_EPOCH};
3
4use async_trait::async_trait;
5use bytes::{Buf, BufMut};
6use bytesize::ByteSize;
7use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
8use metrics::{Counter, Gauge, Histogram};
9use saluki_common::task::spawn_traced_named;
10use saluki_config::GenericConfiguration;
11use saluki_context::{
12 tags::{RawTags, RawTagsFilter},
13 TagsResolver,
14};
15use saluki_core::data_model::event::metric::Metric;
16use saluki_core::data_model::event::{
17 eventd::EventD,
18 metric::{MetricMetadata, MetricOrigin},
19 service_check::ServiceCheck,
20 Event, EventType,
21};
22use saluki_core::{
23 components::{sources::*, ComponentContext},
24 observability::ComponentMetricsExt as _,
25 pooling::FixedSizeObjectPool,
26 topology::{
27 interconnect::EventBufferManager,
28 shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle},
29 EventsBuffer, OutputDefinition,
30 },
31};
32use saluki_env::WorkloadProvider;
33use saluki_error::{generic_error, ErrorContext as _, GenericError};
34use saluki_io::{
35 buf::{BytesBuffer, FixedSizeVec},
36 deser::{codec::dogstatsd::*, framing::FramerExt as _},
37 net::{
38 listener::{Listener, ListenerError},
39 ConnectionAddress, ListenAddress, Stream,
40 },
41};
42use saluki_metrics::MetricsBuilder;
43use serde::Deserialize;
44use serde_with::{serde_as, NoneAsEmptyString};
45use snafu::{ResultExt as _, Snafu};
46use stringtheory::MetaString;
47use tokio::{
48 select,
49 time::{interval, MissedTickBehavior},
50};
51use tracing::{debug, error, info, trace, warn};
52
53mod framer;
54use self::framer::{get_framer, DsdFramer};
55use crate::sources::dogstatsd::tags::{WellKnownTags, WellKnownTagsFilterPredicate};
56
57mod filters;
58use self::filters::EnablePayloadsFilter;
59
60mod io_buffer;
61use self::io_buffer::IoBufferManager;
62
63mod origin;
64use self::origin::{
65 origin_from_event_packet, origin_from_metric_packet, origin_from_service_check_packet, DogStatsDOriginTagResolver,
66 OriginEnrichmentConfiguration,
67};
68
69mod resolver;
70use self::resolver::ContextResolvers;
71
72mod tags;
73
/// Errors that can occur while setting up the listeners of the DogStatsD source.
#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
enum Error {
    /// A listener could not be created for one of the configured listen addresses.
    #[snafu(display("Failed to create {} listener: {}", listener_type, source))]
    FailedToCreateListener {
        /// Listener type, as reported by `ListenAddress::listener_type`.
        listener_type: &'static str,
        /// Underlying listener error.
        source: ListenerError,
    },

    /// The configuration produced no listen addresses at all.
    #[snafu(display("No listeners configured. Please specify a port (`dogstatsd_port`) or a socket path (`dogstatsd_socket` or `dogstatsd_stream_socket`) to enable a listener."))]
    NoListenersConfigured,

    /// The lookup of the configured `bind_host` failed.
    #[snafu(display("Could not resolve bind_host '{}': {}", host, source))]
    UnresolvableBindHost { host: String, source: std::io::Error },

    /// The lookup of the configured `bind_host` succeeded but yielded no addresses.
    #[snafu(display("bind_host '{}' resolved to zero IP addresses.", host))]
    BindHostHasNoAddresses { host: String },
}
92
/// Default for `dogstatsd_buffer_size`: 8192.
///
/// The configured value is passed through `get_adjusted_buffer_size` before I/O buffers are allocated.
const fn default_buffer_size() -> usize {
    8192
}
96
/// Default for `dogstatsd_buffer_count`: 128 I/O buffers in the shared pool.
const fn default_buffer_count() -> usize {
    128
}
100
/// Default for `dogstatsd_port`: 8125.
///
/// A non-zero value enables the UDP listener (see `DogStatsDConfiguration::build_addresses`).
const fn default_port() -> u16 {
    8125
}
104
/// Default for `dogstatsd_tcp_port`: 0.
///
/// `build_addresses` skips the TCP listener when the port is 0, so TCP is off by default.
const fn default_tcp_port() -> u16 {
    0
}
108
/// Default for `dogstatsd_so_rcvbuf`: 0.
///
/// `build_listeners` maps 0 to `None`, i.e. no explicit receive buffer size is requested.
const fn default_socket_receive_buffer_size() -> usize {
    0
}
112
/// Default for `dogstatsd_allow_context_heap_allocs`: `true`.
const fn default_allow_context_heap_allocations() -> bool {
    true
}
116
/// Default for `dogstatsd_no_aggregation_pipeline`: `true`.
const fn default_no_aggregation_pipeline_support() -> bool {
    true
}
120
/// Default for `dogstatsd_string_interner_size`: 4096 entries.
const fn default_context_string_interner_entry_count() -> u64 {
    4096
}
124
/// Bytes budgeted per interner entry when the interner size is derived from an entry count
/// (`dogstatsd_string_interner_size`) rather than given explicitly in bytes.
const INTERNER_BASELINE_BYTES_PER_ENTRY: u64 = 512;
130
/// Default for `dogstatsd_cached_contexts_limit`: 500,000.
const fn default_cached_contexts_limit() -> usize {
    500_000
}
134
/// Default for `dogstatsd_cached_tagsets_limit`: 500,000.
const fn default_cached_tagsets_limit() -> usize {
    500_000
}
138
/// Default for `dogstatsd_permissive_decoding`: `true`.
const fn default_dogstatsd_permissive_decoding() -> bool {
    true
}
142
/// Default for `dogstatsd_minimum_sample_rate`.
///
/// The value is handed to the codec via `with_minimum_sample_rate` when the source is built.
const fn default_dogstatsd_minimum_sample_rate() -> f64 {
    3.845e-9
}
146
/// Per-payload-type switches, read from the `enable_payloads` configuration key.
///
/// Every flag defaults to `true`. When the source is built, the flags are forwarded to the
/// `EnablePayloadsFilter` that decides which decoded packets are kept.
#[derive(Deserialize)]
#[cfg_attr(test, derive(PartialEq, serde::Serialize))]
pub struct EnablePayloadsConfiguration {
    /// Forwarded to `EnablePayloadsFilter::with_allow_series`.
    #[serde(default = "default_true")]
    pub series: bool,

    /// Forwarded to `EnablePayloadsFilter::with_allow_sketches`.
    #[serde(default = "default_true")]
    pub sketches: bool,

    /// Forwarded to `EnablePayloadsFilter::with_allow_events`.
    #[serde(default = "default_true")]
    pub events: bool,

    /// Forwarded to `EnablePayloadsFilter::with_allow_service_checks`.
    #[serde(default = "default_true")]
    pub service_checks: bool,
}
175
176impl Default for EnablePayloadsConfiguration {
177 fn default() -> Self {
178 Self {
179 series: true,
180 sketches: true,
181 events: true,
182 service_checks: true,
183 }
184 }
185}
186
/// Serde default helper that yields `true`; used by the `EnablePayloadsConfiguration` fields.
const fn default_true() -> bool {
    true
}
190
/// DogStatsD source configuration.
///
/// Deserialized from the generic configuration (see `from_configuration`) and used as the
/// `SourceBuilder` for the `DogStatsD` source.
#[serde_as]
#[derive(Deserialize, Default)]
#[cfg_attr(test, derive(derive_where::DeriveWhere, serde::Serialize))]
#[cfg_attr(test, derive_where(PartialEq))]
pub struct DogStatsDConfiguration {
    /// Base size of each I/O buffer.
    ///
    /// Passed through `get_adjusted_buffer_size` before the buffers are allocated.
    #[serde(rename = "dogstatsd_buffer_size", default = "default_buffer_size")]
    buffer_size: usize,

    /// Number of I/O buffers in the pool shared by all listeners.
    ///
    /// `build` rejects configurations with fewer buffers than listeners.
    #[serde(rename = "dogstatsd_buffer_count", default = "default_buffer_count")]
    buffer_count: usize,

    /// UDP port to listen on. A value of 0 disables the UDP listener.
    #[serde(rename = "dogstatsd_port", default = "default_port")]
    port: u16,

    /// Receive buffer size applied to every listener.
    ///
    /// A value of 0 means no explicit size is passed to the listener.
    #[serde(rename = "dogstatsd_so_rcvbuf", default = "default_socket_receive_buffer_size")]
    socket_receive_buffer_size: usize,

    /// TCP port to listen on. A value of 0 disables the TCP listener.
    #[serde(rename = "dogstatsd_tcp_port", default = "default_tcp_port")]
    tcp_port: u16,

    /// Path of the Unix datagram socket (`ListenAddress::Unixgram`) to listen on, if any.
    ///
    /// An empty string is deserialized as `None`.
    #[serde(rename = "dogstatsd_socket", default)]
    #[serde_as(as = "NoneAsEmptyString")]
    socket_path: Option<String>,

    /// Path of the Unix stream socket (`ListenAddress::Unix`) to listen on, if any.
    ///
    /// An empty string is deserialized as `None`.
    #[serde(rename = "dogstatsd_stream_socket", default)]
    #[serde_as(as = "NoneAsEmptyString")]
    socket_stream_path: Option<String>,

    /// Host whose resolved IP address the UDP/TCP listeners bind to.
    ///
    /// Ignored when `non_local_traffic` is set. When unset, listeners bind to `127.0.0.1`.
    #[serde(rename = "bind_host", default)]
    #[serde_as(as = "NoneAsEmptyString")]
    bind_host: Option<String>,

    /// When set, UDP/TCP listeners bind to `0.0.0.0` regardless of `bind_host`.
    #[serde(rename = "dogstatsd_non_local_traffic", default)]
    non_local_traffic: bool,

    /// Not read in the visible part of this file; presumably consumed by `ContextResolvers::new`
    /// — TODO confirm.
    #[serde(
        rename = "dogstatsd_allow_context_heap_allocs",
        default = "default_allow_context_heap_allocations"
    )]
    allow_context_heap_allocations: bool,

    /// Passed to the codec configuration via `with_timestamps`.
    #[serde(
        rename = "dogstatsd_no_aggregation_pipeline",
        default = "default_no_aggregation_pipeline_support"
    )]
    no_aggregation_pipeline_support: bool,

    /// Interner size expressed as an entry count.
    ///
    /// Only used when `context_string_interner_size_bytes` is unset, in which case it is
    /// multiplied by `INTERNER_BASELINE_BYTES_PER_ENTRY`.
    #[serde(
        rename = "dogstatsd_string_interner_size",
        default = "default_context_string_interner_entry_count"
    )]
    context_string_interner_entry_count: u64,

    /// Explicit interner size in bytes; takes precedence over the entry count when set.
    #[serde(rename = "dogstatsd_string_interner_size_bytes", default)]
    context_string_interner_size_bytes: Option<ByteSize>,

    /// Not read in the visible part of this file; presumably consumed by `ContextResolvers::new`
    /// — TODO confirm.
    #[serde(
        rename = "dogstatsd_cached_contexts_limit",
        default = "default_cached_contexts_limit"
    )]
    cached_contexts_limit: usize,

    /// Not read in the visible part of this file; presumably consumed by `ContextResolvers::new`
    /// — TODO confirm.
    #[serde(rename = "dogstatsd_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
    cached_tagsets_limit: usize,

    /// Passed to the codec configuration via `with_permissive_mode`.
    #[serde(
        rename = "dogstatsd_permissive_decoding",
        default = "default_dogstatsd_permissive_decoding"
    )]
    permissive_decoding: bool,

    /// Passed to the codec configuration via `with_minimum_sample_rate`.
    #[serde(
        rename = "dogstatsd_minimum_sample_rate",
        default = "default_dogstatsd_minimum_sample_rate"
    )]
    minimum_sample_rate: f64,

    /// Which payload types the source keeps after decoding.
    #[serde(rename = "enable_payloads", default)]
    enable_payloads: EnablePayloadsConfiguration,

    /// Origin enrichment settings, flattened into this configuration.
    ///
    /// Used to build the origin tag resolver and to enable client origin detection in the codec.
    #[serde(flatten, default)]
    origin_enrichment: OriginEnrichmentConfiguration,

    /// Workload provider used to build the origin tag resolver.
    ///
    /// Never deserialized; set through `with_workload_provider`. When `None`, no origin tag
    /// resolver is created.
    #[serde(skip)]
    #[cfg_attr(test, derive_where(skip))]
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,

    /// Extra tags chained after the packet's own tags for every metric, event and service check.
    #[serde(rename = "dogstatsd_tags", default)]
    additional_tags: Vec<String>,
}
395
396async fn resolve_bind_host(host: &str) -> Result<std::net::IpAddr, Error> {
402 let mut addrs = tokio::net::lookup_host((host, 0u16))
403 .await
404 .context(UnresolvableBindHost { host: host.to_string() })?;
405 addrs
406 .next()
407 .map(|sa| sa.ip())
408 .ok_or_else(|| Error::BindHostHasNoAddresses { host: host.to_string() })
409}
410
411impl DogStatsDConfiguration {
412 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
414 Ok(config.as_typed()?)
415 }
416
417 fn effective_context_string_interner_bytes(&self) -> ByteSize {
423 match self.context_string_interner_size_bytes {
424 Some(explicit_bytes) => explicit_bytes,
425 None => ByteSize::b(self.context_string_interner_entry_count * INTERNER_BASELINE_BYTES_PER_ENTRY),
426 }
427 }
428
429 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
435 where
436 W: WorkloadProvider + Send + Sync + 'static,
437 {
438 self.workload_provider = Some(Arc::new(workload_provider));
439 self
440 }
441
442 fn build_addresses(&self, bind_host: Option<std::net::IpAddr>) -> Vec<ListenAddress> {
452 let bind_ip: std::net::IpAddr = if self.non_local_traffic {
453 [0, 0, 0, 0].into()
454 } else {
455 bind_host.unwrap_or_else(|| [127, 0, 0, 1].into())
456 };
457
458 let mut addresses: Vec<ListenAddress> = Vec::new();
459
460 if self.port != 0 {
461 addresses.push(ListenAddress::Udp(std::net::SocketAddr::new(bind_ip, self.port)));
462 }
463
464 if self.tcp_port != 0 {
465 addresses.push(ListenAddress::Tcp(std::net::SocketAddr::new(bind_ip, self.tcp_port)));
466 }
467
468 if let Some(socket_path) = &self.socket_path {
469 addresses.push(ListenAddress::Unixgram(socket_path.into()));
470 }
471
472 if let Some(socket_stream_path) = &self.socket_stream_path {
473 addresses.push(ListenAddress::Unix(socket_stream_path.into()));
474 }
475
476 addresses
477 }
478
479 async fn build_listeners(&self) -> Result<Vec<Listener>, Error> {
481 let bind_host: Option<std::net::IpAddr> = if self.non_local_traffic {
485 None
486 } else {
487 match &self.bind_host {
488 Some(host) => Some(resolve_bind_host(host).await?),
489 None => None,
490 }
491 };
492
493 let addresses = self.build_addresses(bind_host);
494 let mut listeners = Vec::new();
495 let socket_receive_buffer_size =
496 (self.socket_receive_buffer_size != 0).then_some(self.socket_receive_buffer_size);
497 for address in addresses {
498 let listener_type = address.listener_type();
499 let listener = Listener::from_listen_address(address)
500 .await
501 .context(FailedToCreateListener { listener_type })?
502 .with_receive_buffer_size(socket_receive_buffer_size);
503
504 listeners.push(listener);
505 }
506 Ok(listeners)
507 }
508}
509
510#[async_trait]
511impl SourceBuilder for DogStatsDConfiguration {
512 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
513 let listeners = self.build_listeners().await?;
514 if listeners.is_empty() {
515 return Err(Error::NoListenersConfigured.into());
516 }
517
518 if self.buffer_count < listeners.len() {
521 return Err(generic_error!(
522 "Must have a minimum of {} I/O buffers based on the number of listeners configured.",
523 listeners.len()
524 ));
525 }
526
527 let maybe_origin_tags_resolver = self
528 .workload_provider
529 .clone()
530 .map(|provider| DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider));
531 let context_resolvers = ContextResolvers::new(self, &context, maybe_origin_tags_resolver)
532 .error_context("Failed to create context resolvers.")?;
533
534 let codec_config = DogStatsDCodecConfiguration::default()
535 .with_timestamps(self.no_aggregation_pipeline_support)
536 .with_permissive_mode(self.permissive_decoding)
537 .with_minimum_sample_rate(self.minimum_sample_rate)
538 .with_client_origin_detection(self.origin_enrichment.origin_detection_client);
539
540 let codec = DogStatsDCodec::from_configuration(codec_config);
541
542 let enable_payloads_filter = EnablePayloadsFilter::default()
543 .with_allow_series(self.enable_payloads.series)
544 .with_allow_sketches(self.enable_payloads.sketches)
545 .with_allow_events(self.enable_payloads.events)
546 .with_allow_service_checks(self.enable_payloads.service_checks);
547
548 Ok(Box::new(DogStatsD {
549 listeners,
550 io_buffer_pool: FixedSizeObjectPool::with_builder("dsd_packet_bufs", self.buffer_count, || {
551 FixedSizeVec::with_capacity(get_adjusted_buffer_size(self.buffer_size))
552 }),
553 codec,
554 context_resolvers,
555 enabled_filter: enable_payloads_filter,
556 additional_tags: self.additional_tags.clone().into(),
557 }))
558 }
559
560 fn outputs(&self) -> &[OutputDefinition<EventType>] {
561 static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
562 vec![
563 OutputDefinition::named_output("metrics", EventType::Metric),
564 OutputDefinition::named_output("events", EventType::EventD),
565 OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
566 ]
567 });
568 &OUTPUTS
569 }
570}
571
572impl MemoryBounds for DogStatsDConfiguration {
573 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
574 builder
575 .minimum()
576 .with_single_value::<DogStatsD>("source struct")
578 .with_expr(UsageExpr::product(
580 "buffers",
581 UsageExpr::config("dogstatsd_buffer_count", self.buffer_count),
582 UsageExpr::config("dogstatsd_buffer_size", get_adjusted_buffer_size(self.buffer_size)),
583 ))
584 .with_expr(UsageExpr::config(
587 "dogstatsd_string_interner_size_bytes",
588 self.effective_context_string_interner_bytes().as_u64() as usize,
589 ));
590 }
591}
592
/// DogStatsD source.
///
/// Built by `DogStatsDConfiguration::build`. When run, it spawns one task per listener and
/// hands each a clone of the shared state below.
pub struct DogStatsD {
    /// Listeners to accept streams from; consumed when the source runs.
    listeners: Vec<Listener>,
    /// Pool of I/O buffers shared by all listeners and stream handlers.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// Codec used to decode DogStatsD packets.
    codec: DogStatsDCodec,
    /// Resolvers used to turn decoded packets into contexts and tag sets.
    context_resolvers: ContextResolvers,
    /// Filter deciding which decoded payload types are kept.
    enabled_filter: EnablePayloadsFilter,
    /// Extra tags chained after each packet's own tags.
    additional_tags: Arc<[String]>,
}
602
/// State handed to a single listener task (`process_listener`).
struct ListenerContext {
    /// Handle signalled when the source asks its listeners to shut down.
    shutdown_handle: DynamicShutdownHandle,
    /// The listener this task accepts streams from.
    listener: Listener,
    /// Shared I/O buffer pool, cloned into each stream handler.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// Codec cloned into each stream handler.
    codec: DogStatsDCodec,
    /// Context resolvers cloned into each stream handler.
    context_resolvers: ContextResolvers,
    /// Extra tags cloned into each stream handler.
    additional_tags: Arc<[String]>,
}
611
/// State handed to a single stream handler task (`process_stream` / `drive_stream`).
struct HandlerContext {
    /// Address of the listener that accepted the stream; used for logging.
    listen_addr: ListenAddress,
    /// Framer chosen for the listen address via `get_framer`.
    framer: DsdFramer,
    /// Codec used to decode each frame.
    codec: DogStatsDCodec,
    /// Pool the handler takes its I/O buffers from.
    io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
    /// Telemetry handles, registered per stream by `build_metrics`.
    metrics: Metrics,
    /// Context resolvers used while converting packets into events.
    context_resolvers: ContextResolvers,
    /// Extra tags chained after each packet's own tags.
    additional_tags: Arc<[String]>,
}
621
/// Telemetry handles for a stream handler; see `build_metrics` for metric names and tags.
struct Metrics {
    /// `component_events_received_total` with `message_type=metrics`.
    metrics_received: Counter,
    /// `component_events_received_total` with `message_type=events`.
    events_received: Counter,
    /// `component_events_received_total` with `message_type=service_checks`.
    service_checks_received: Counter,
    /// `component_bytes_received_total`.
    bytes_received: Counter,
    /// `component_bytes_received_size`.
    bytes_received_size: Histogram,
    /// `component_errors_total` with `error_type=framing`.
    framing_errors: Counter,
    /// `component_errors_total` with `error_type=decode`, `message_type=metrics`.
    metric_decoder_errors: Counter,
    /// `component_errors_total` with `error_type=decode`, `message_type=events`.
    event_decoder_errors: Counter,
    /// `component_errors_total` with `error_type=decode`, `message_type=service_checks`.
    service_check_decoder_errors: Counter,
    /// `component_failed_context_resolve_total`.
    failed_context_resolve_total: Counter,
    /// `component_connections_active`.
    connections_active: Gauge,
    /// `component_packets_received_total` with `state=ok`.
    packet_receive_success: Counter,
    /// `component_packets_received_total` with `state=error`.
    packet_receive_failure: Counter,
}
637
/// Read-only accessors for the individual telemetry handles.
impl Metrics {
    /// Counter of metric points received.
    fn metrics_received(&self) -> &Counter {
        &self.metrics_received
    }

    /// Counter of events received.
    fn events_received(&self) -> &Counter {
        &self.events_received
    }

    /// Counter of service checks received.
    fn service_checks_received(&self) -> &Counter {
        &self.service_checks_received
    }

    /// Counter of bytes read from the stream.
    fn bytes_received(&self) -> &Counter {
        &self.bytes_received
    }

    /// Histogram of the number of bytes returned by each read.
    fn bytes_received_size(&self) -> &Histogram {
        &self.bytes_received_size
    }

    /// Counter of framing errors.
    fn framing_errors(&self) -> &Counter {
        &self.framing_errors
    }

    /// Counter of packets that failed to decode and were classified as metric samples.
    fn metric_decode_failed(&self) -> &Counter {
        &self.metric_decoder_errors
    }

    /// Counter of packets that failed to decode and were classified as events.
    fn event_decode_failed(&self) -> &Counter {
        &self.event_decoder_errors
    }

    /// Counter of packets that failed to decode and were classified as service checks.
    fn service_check_decode_failed(&self) -> &Counter {
        &self.service_check_decoder_errors
    }

    /// Counter of packets dropped because their context or tag set could not be resolved.
    fn failed_context_resolve_total(&self) -> &Counter {
        &self.failed_context_resolve_total
    }

    /// Gauge of active connections on connection-oriented streams.
    fn connections_active(&self) -> &Gauge {
        &self.connections_active
    }

    /// Counter of successful reads from the stream.
    fn packet_receive_success(&self) -> &Counter {
        &self.packet_receive_success
    }

    /// Counter of failed reads from the stream.
    fn packet_receive_failure(&self) -> &Counter {
        &self.packet_receive_failure
    }
}
691
692fn build_metrics(listen_addr: &ListenAddress, component_context: &ComponentContext) -> Metrics {
693 let builder = MetricsBuilder::from_component_context(component_context);
694
695 let listener_type = match listen_addr {
696 ListenAddress::Tcp(_) => "tcp",
697 ListenAddress::Udp(_) => "udp",
698 ListenAddress::Unix(_) => "unix",
699 ListenAddress::Unixgram(_) => "unixgram",
700 };
701
702 Metrics {
703 metrics_received: builder.register_counter_with_tags(
704 "component_events_received_total",
705 [("message_type", "metrics"), ("listener_type", listener_type)],
706 ),
707 events_received: builder.register_counter_with_tags(
708 "component_events_received_total",
709 [("message_type", "events"), ("listener_type", listener_type)],
710 ),
711 service_checks_received: builder.register_counter_with_tags(
712 "component_events_received_total",
713 [("message_type", "service_checks"), ("listener_type", listener_type)],
714 ),
715 bytes_received: builder
716 .register_counter_with_tags("component_bytes_received_total", [("listener_type", listener_type)]),
717 bytes_received_size: builder
718 .register_trace_histogram_with_tags("component_bytes_received_size", [("listener_type", listener_type)]),
719 framing_errors: builder.register_counter_with_tags(
720 "component_errors_total",
721 [("listener_type", listener_type), ("error_type", "framing")],
722 ),
723 metric_decoder_errors: builder.register_counter_with_tags(
724 "component_errors_total",
725 [
726 ("listener_type", listener_type),
727 ("error_type", "decode"),
728 ("message_type", "metrics"),
729 ],
730 ),
731 event_decoder_errors: builder.register_counter_with_tags(
732 "component_errors_total",
733 [
734 ("listener_type", listener_type),
735 ("error_type", "decode"),
736 ("message_type", "events"),
737 ],
738 ),
739 service_check_decoder_errors: builder.register_counter_with_tags(
740 "component_errors_total",
741 [
742 ("listener_type", listener_type),
743 ("error_type", "decode"),
744 ("message_type", "service_checks"),
745 ],
746 ),
747 connections_active: builder
748 .register_gauge_with_tags("component_connections_active", [("listener_type", listener_type)]),
749 packet_receive_success: builder.register_debug_counter_with_tags(
750 "component_packets_received_total",
751 [("listener_type", listener_type), ("state", "ok")],
752 ),
753 packet_receive_failure: builder.register_debug_counter_with_tags(
754 "component_packets_received_total",
755 [("listener_type", listener_type), ("state", "error")],
756 ),
757 failed_context_resolve_total: builder.register_debug_counter("component_failed_context_resolve_total"),
758 }
759}
760
#[async_trait]
impl Source for DogStatsD {
    /// Runs the source until the global shutdown signal fires.
    ///
    /// Spawns one `process_listener` task per listener, marks the component ready, then waits
    /// for shutdown and for all listener tasks to finish.
    async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
        let mut global_shutdown = context.take_shutdown_handle();
        let mut health = context.take_health_handle();

        // Separate coordinator so listener tasks can be told to stop, and awaited, on shutdown.
        let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();

        for listener in self.listeners {
            let task_name = format!("dogstatsd-listener-{}", listener.listen_address().listener_type());

            // Each listener task gets its own shutdown handle plus clones of the shared state.
            let listener_context = ListenerContext {
                shutdown_handle: listener_shutdown_coordinator.register(),
                listener,
                io_buffer_pool: self.io_buffer_pool.clone(),
                codec: self.codec.clone(),
                context_resolvers: self.context_resolvers.clone(),
                additional_tags: self.additional_tags.clone(),
            };

            spawn_traced_named(
                task_name,
                process_listener(context.clone(), listener_context, self.enabled_filter),
            );
        }

        health.mark_ready();
        debug!("DogStatsD source started.");

        // All real work happens in the spawned tasks; this loop only services liveness checks
        // until the shutdown signal arrives.
        loop {
            select! {
                _ = &mut global_shutdown => {
                    debug!("Received shutdown signal.");
                    break
                },
                _ = health.live() => continue,
            }
        }

        debug!("Stopping DogStatsD source...");

        // Signal the listener tasks and wait for them to finish.
        listener_shutdown_coordinator.shutdown().await;

        debug!("DogStatsD source stopped.");

        Ok(())
    }
}
820
821async fn process_listener(
822 source_context: SourceContext, listener_context: ListenerContext, enabled_filter: EnablePayloadsFilter,
823) {
824 let ListenerContext {
825 shutdown_handle,
826 mut listener,
827 io_buffer_pool,
828 codec,
829 context_resolvers,
830 additional_tags,
831 } = listener_context;
832 tokio::pin!(shutdown_handle);
833
834 let listen_addr = listener.listen_address().clone();
835 let mut stream_shutdown_coordinator = DynamicShutdownCoordinator::default();
836
837 info!(%listen_addr, "DogStatsD listener started.");
838
839 loop {
840 select! {
841 _ = &mut shutdown_handle => {
842 debug!(%listen_addr, "Received shutdown signal. Waiting for existing stream handlers to finish...");
843 break;
844 }
845 result = listener.accept() => match result {
846 Ok(stream) => {
847 debug!(%listen_addr, "Spawning new stream handler.");
848
849 let handler_context = HandlerContext {
850 listen_addr: listen_addr.clone(),
851 framer: get_framer(&listen_addr),
852 codec: codec.clone(),
853 io_buffer_pool: io_buffer_pool.clone(),
854 metrics: build_metrics(&listen_addr, source_context.component_context()),
855 context_resolvers: context_resolvers.clone(),
856 additional_tags: additional_tags.clone(),
857 };
858
859 let task_name = format!("dogstatsd-stream-handler-{}", listen_addr.listener_type());
860 spawn_traced_named(task_name, process_stream(stream, source_context.clone(), handler_context, stream_shutdown_coordinator.register(), enabled_filter));
861 }
862 Err(e) => {
863 error!(%listen_addr, error = %e, "Failed to accept connection. Stopping listener.");
864 break
865 }
866 }
867 }
868 }
869
870 stream_shutdown_coordinator.shutdown().await;
871
872 info!(%listen_addr, "DogStatsD listener stopped.");
873}
874
875async fn process_stream(
876 stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
877 shutdown_handle: DynamicShutdownHandle, enabled_filter: EnablePayloadsFilter,
878) {
879 tokio::pin!(shutdown_handle);
880
881 select! {
882 _ = &mut shutdown_handle => {
883 debug!("Stream handler received shutdown signal.");
884 },
885 _ = drive_stream(stream, source_context, handler_context, enabled_filter) => {},
886 }
887}
888
889async fn drive_stream(
890 mut stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
891 enabled_filter: EnablePayloadsFilter,
892) {
893 let HandlerContext {
894 listen_addr,
895 mut framer,
896 codec,
897 io_buffer_pool,
898 metrics,
899 mut context_resolvers,
900 additional_tags,
901 } = handler_context;
902
903 debug!(%listen_addr, "Stream handler started.");
904
905 if !stream.is_connectionless() {
906 metrics.connections_active().increment(1);
907 }
908
909 let mut buffer_flush = interval(Duration::from_millis(100));
912 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
913
914 let mut event_buffer_manager = EventBufferManager::default();
915 let mut io_buffer_manager = IoBufferManager::new(&io_buffer_pool, &stream);
916 let memory_limiter = source_context.topology_context().memory_limiter();
917
918 'read: loop {
919 let mut eof = false;
920
921 let mut io_buffer = io_buffer_manager.get_buffer_mut().await;
922
923 memory_limiter.wait_for_capacity().await;
924
925 select! {
926 read_result = stream.receive(&mut io_buffer) => match read_result {
928 Ok((bytes_read, peer_addr)) => {
929 if bytes_read == 0 {
930 eof = true;
931 }
932
933 metrics.packet_receive_success().increment(1);
943 metrics.bytes_received().increment(bytes_read as u64);
944 metrics.bytes_received_size().record(bytes_read as f64);
945
946 let reached_eof = eof || stream.is_connectionless();
952
953 trace!(
954 buffer_len = io_buffer.remaining(),
955 buffer_cap = io_buffer.remaining_mut(),
956 eof = reached_eof,
957 %listen_addr,
958 %peer_addr,
959 "Received {} bytes from stream.",
960 bytes_read
961 );
962
963 let mut frames = io_buffer.framed(&mut framer, reached_eof);
964 'frame: loop {
965 match frames.next() {
966 Some(Ok(frame)) => {
967 trace!(%listen_addr, %peer_addr, ?frame, "Decoded frame.");
968 match handle_frame(&frame[..], &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) {
969 Ok(Some(event)) => {
970 if let Some(event_buffer) = event_buffer_manager.try_push(event) {
971 debug!(%listen_addr, %peer_addr, "Event buffer is full. Forwarding events.");
972 dispatch_events(event_buffer, &source_context, &listen_addr).await;
973 }
974 },
975 Ok(None) => {
976 continue
981 },
982 Err(e) => {
983 let frame_lossy_str = String::from_utf8_lossy(&frame);
984 warn!(%listen_addr, %peer_addr, frame = %frame_lossy_str, error = %e, "Failed to parse frame.");
985 },
986 }
987 }
988 Some(Err(e)) => {
989 metrics.framing_errors().increment(1);
990
991 if stream.is_connectionless() {
992 debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Continuing stream.");
995 continue 'read;
996 } else {
997 debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Stopping stream.");
998 break 'read;
999 }
1000 }
1001 None => {
1002 trace!(%listen_addr, %peer_addr, "Not enough data to decode another frame.");
1003 if eof && !stream.is_connectionless() {
1004 debug!(%listen_addr, %peer_addr, "Stream received EOF. Shutting down handler.");
1005 break 'read;
1006 } else {
1007 break 'frame;
1008 }
1009 }
1010 }
1011 }
1012 },
1013 Err(e) => {
1014 metrics.packet_receive_failure().increment(1);
1015
1016 if stream.is_connectionless() {
1017 warn!(%listen_addr, error = %e, "I/O error while decoding. Continuing stream.");
1020 continue 'read;
1021 } else {
1022 warn!(%listen_addr, error = %e, "I/O error while decoding. Stopping stream.");
1023 break 'read;
1024 }
1025 }
1026 },
1027
1028 _ = buffer_flush.tick() => {
1029 if let Some(event_buffer) = event_buffer_manager.consume() {
1030 dispatch_events(event_buffer, &source_context, &listen_addr).await;
1031 }
1032 },
1033 }
1034 }
1035
1036 if let Some(event_buffer) = event_buffer_manager.consume() {
1037 dispatch_events(event_buffer, &source_context, &listen_addr).await;
1038 }
1039
1040 metrics.connections_active().decrement(1);
1041
1042 debug!(%listen_addr, "Stream handler stopped.");
1043}
1044
1045fn handle_frame(
1046 frame: &[u8], codec: &DogStatsDCodec, context_resolvers: &mut ContextResolvers, source_metrics: &Metrics,
1047 peer_addr: &ConnectionAddress, enabled_filter: EnablePayloadsFilter, additional_tags: &[String],
1048) -> Result<Option<Event>, ParseError> {
1049 let parsed = match codec.decode_packet(frame) {
1050 Ok(parsed) => parsed,
1051 Err(e) => {
1052 match parse_message_type(frame) {
1054 MessageType::MetricSample => source_metrics.metric_decode_failed().increment(1),
1055 MessageType::Event => source_metrics.event_decode_failed().increment(1),
1056 MessageType::ServiceCheck => source_metrics.service_check_decode_failed().increment(1),
1057 }
1058
1059 return Err(e);
1060 }
1061 };
1062
1063 let event = match parsed {
1064 ParsedPacket::Metric(metric_packet) => {
1065 if metric_packet.num_points == 0 {
1066 return Ok(None);
1067 }
1068 let events_len = metric_packet.num_points;
1069 if !enabled_filter.allow_metric(&metric_packet) {
1070 trace!(
1071 metric.name = metric_packet.metric_name,
1072 "Skipping metric due to filter configuration."
1073 );
1074 return Ok(None);
1075 }
1076
1077 match handle_metric_packet(metric_packet, context_resolvers, peer_addr, additional_tags) {
1078 Some(metric) => {
1079 source_metrics.metrics_received().increment(events_len);
1080 Event::Metric(metric)
1081 }
1082 None => {
1083 source_metrics.failed_context_resolve_total().increment(1);
1085 return Ok(None);
1086 }
1087 }
1088 }
1089 ParsedPacket::Event(event) => {
1090 if !enabled_filter.allow_event(&event) {
1091 trace!("Skipping event {} due to filter configuration.", event.title);
1092 return Ok(None);
1093 }
1094 let tags_resolver = context_resolvers.tags();
1095 match handle_event_packet(event, tags_resolver, peer_addr, additional_tags) {
1096 Some(event) => {
1097 source_metrics.events_received().increment(1);
1098 Event::EventD(event)
1099 }
1100 None => {
1101 source_metrics.failed_context_resolve_total().increment(1);
1102 return Ok(None);
1103 }
1104 }
1105 }
1106 ParsedPacket::ServiceCheck(service_check) => {
1107 if !enabled_filter.allow_service_check(&service_check) {
1108 trace!(
1109 "Skipping service check {} due to filter configuration.",
1110 service_check.name
1111 );
1112 return Ok(None);
1113 }
1114 let tags_resolver = context_resolvers.tags();
1115 match handle_service_check_packet(service_check, tags_resolver, peer_addr, additional_tags) {
1116 Some(service_check) => {
1117 source_metrics.service_checks_received().increment(1);
1118 Event::ServiceCheck(service_check)
1119 }
1120 None => {
1121 source_metrics.failed_context_resolve_total().increment(1);
1122 return Ok(None);
1123 }
1124 }
1125 }
1126 };
1127
1128 Ok(Some(event))
1129}
1130
1131fn handle_metric_packet(
1132 packet: MetricPacket, context_resolvers: &mut ContextResolvers, peer_addr: &ConnectionAddress,
1133 additional_tags: &[String],
1134) -> Option<Metric> {
1135 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1136
1137 let mut origin = origin_from_metric_packet(&packet, &well_known_tags);
1138 if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1139 origin.set_process_id(creds.pid as u32);
1140 }
1141
1142 let context_resolver = if packet.timestamp.is_some() {
1144 context_resolvers.no_agg()
1145 } else {
1146 context_resolvers.primary()
1147 };
1148
1149 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1150
1151 match context_resolver.resolve(packet.metric_name, tags, Some(origin)) {
1153 Some(context) => {
1154 let metric_origin = well_known_tags
1155 .jmx_check_name
1156 .map(MetricOrigin::jmx_check)
1157 .unwrap_or_else(MetricOrigin::dogstatsd);
1158 let metadata = MetricMetadata::default()
1159 .with_origin(metric_origin)
1160 .with_hostname(well_known_tags.hostname.map(Arc::from))
1161 .with_unit(packet.unit.map_or_else(MetaString::empty, MetaString::from_static));
1162
1163 Some(Metric::from_parts(context, packet.values, metadata))
1164 }
1165 None => None,
1167 }
1168}
1169
1170fn handle_event_packet(
1171 packet: EventPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress, additional_tags: &[String],
1172) -> Option<EventD> {
1173 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1174
1175 let mut origin = origin_from_event_packet(&packet, &well_known_tags);
1176 if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1177 origin.set_process_id(creds.pid as u32);
1178 }
1179 let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1180
1181 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1182 let tags = tags_resolver.create_tag_set(tags)?;
1183
1184 let timestamp = packet
1188 .timestamp
1189 .or_else(|| SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()));
1190
1191 let eventd = EventD::new(packet.title, packet.text)
1192 .with_timestamp(timestamp)
1193 .with_hostname(packet.hostname.map(|s| s.into()))
1194 .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
1195 .with_alert_type(packet.alert_type)
1196 .with_priority(packet.priority)
1197 .with_source_type_name(Some(
1202 packet
1203 .source_type_name
1204 .map(|s| s.into())
1205 .unwrap_or_else(|| "api".into()),
1206 ))
1207 .with_alert_type(packet.alert_type)
1208 .with_tags(tags)
1209 .with_origin_tags(origin_tags);
1210
1211 Some(eventd)
1212}
1213
1214fn handle_service_check_packet(
1215 packet: ServiceCheckPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress,
1216 additional_tags: &[String],
1217) -> Option<ServiceCheck> {
1218 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1219
1220 let mut origin = origin_from_service_check_packet(&packet, &well_known_tags);
1221 if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1222 origin.set_process_id(creds.pid as u32);
1223 }
1224 let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1225
1226 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1227 let tags = tags_resolver.create_tag_set(tags)?;
1228
1229 let timestamp = packet
1233 .timestamp
1234 .or_else(|| SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()));
1235
1236 let service_check = ServiceCheck::new(packet.name, packet.status)
1237 .with_timestamp(timestamp)
1238 .with_hostname(packet.hostname.map(|s| s.into()))
1239 .with_tags(tags)
1240 .with_origin_tags(origin_tags)
1241 .with_message(packet.message.map(|s| s.into()));
1242
1243 Some(service_check)
1244}
1245
1246fn get_filtered_tags_iterator<'a>(
1247 raw_tags: RawTags<'a>, additional_tags: &'a [String],
1248) -> impl Iterator<Item = &'a str> + Clone {
1249 RawTagsFilter::exclude(raw_tags, WellKnownTagsFilterPredicate).chain(additional_tags.iter().map(|s| s.as_str()))
1252}
1253
1254async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &SourceContext, listen_addr: &ListenAddress) {
1255 debug!(%listen_addr, events_len = event_buffer.len(), "Forwarding events.");
1256
1257 if event_buffer.has_event_type(EventType::EventD) {
1267 let eventd_events = event_buffer.extract(Event::is_eventd);
1268 if let Err(e) = source_context
1269 .dispatcher()
1270 .buffered_named("events")
1271 .expect("events output should always exist")
1272 .send_all(eventd_events)
1273 .await
1274 {
1275 error!(%listen_addr, error = %e, "Failed to dispatch eventd events.");
1276 }
1277 }
1278
1279 if event_buffer.has_event_type(EventType::ServiceCheck) {
1281 let service_check_events = event_buffer.extract(Event::is_service_check);
1282 if let Err(e) = source_context
1283 .dispatcher()
1284 .buffered_named("service_checks")
1285 .expect("service checks output should always exist")
1286 .send_all(service_check_events)
1287 .await
1288 {
1289 error!(%listen_addr, error = %e, "Failed to dispatch service check events.");
1290 }
1291 }
1292
1293 if !event_buffer.is_empty() {
1295 if let Err(e) = source_context
1296 .dispatcher()
1297 .dispatch_named("metrics", event_buffer)
1298 .await
1299 {
1300 error!(%listen_addr, error = %e, "Failed to dispatch metric events.");
1301 }
1302 }
1303}
1304
/// Returns `buffer_size` increased by four bytes.
///
/// NOTE(review): the four extra bytes presumably leave room for a length prefix read alongside the payload in
/// stream mode -- confirm against the framing code before relying on this description.
const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
    const EXTRA_BYTES: usize = 4;

    buffer_size + EXTRA_BYTES
}
1322
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};

    use bytesize::ByteSize;
    use saluki_context::{ContextResolverBuilder, TagsResolverBuilder};
    use saluki_io::net::ListenAddress;
    use saluki_io::{
        deser::codec::dogstatsd::{DogStatsDCodec, DogStatsDCodecConfiguration, ParsedPacket},
        net::ConnectionAddress,
    };

    use super::{handle_metric_packet, ContextResolvers, DogStatsDConfiguration};

    // ---------------------------------------------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------------------------------------------

    /// Codec built from the default codec configuration.
    fn default_codec() -> DogStatsDCodec {
        DogStatsDCodec::from_configuration(DogStatsDCodecConfiguration::default())
    }

    /// Context resolvers whose context resolver is built with heap allocations disabled.
    ///
    /// The same context resolver (cloned) is used for both context-resolver slots of `ContextResolvers::manual`.
    fn resolvers_without_heap_allocations() -> ContextResolvers {
        let tags_resolver = TagsResolverBuilder::for_tests().build();
        let context_resolver = ContextResolverBuilder::for_tests()
            .with_heap_allocations(false)
            .with_tags_resolver(Some(tags_resolver.clone()))
            .build();

        ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver)
    }

    /// Arbitrary socket-based peer address used by the metric packet tests.
    fn test_peer_addr() -> ConnectionAddress {
        let socket_addr: SocketAddr = "1.1.1.1:1234".parse().unwrap();
        ConnectionAddress::from(socket_addr)
    }

    /// Deserializes a `DogStatsDConfiguration` from a JSON document, panicking on failure.
    fn deser_config(json: &str) -> DogStatsDConfiguration {
        let parsed: Result<DogStatsDConfiguration, _> = serde_json::from_str(json);
        parsed.expect("failed to deserialize config")
    }

    /// Configuration with every listener setting explicitly disabled; tests override only what they exercise.
    fn no_listeners_config() -> DogStatsDConfiguration {
        DogStatsDConfiguration {
            port: 0,
            tcp_port: 0,
            socket_path: None,
            socket_stream_path: None,
            non_local_traffic: false,
            ..Default::default()
        }
    }

    /// UDP listen address on the given IPv4 address and port.
    fn udp_v4(ip: Ipv4Addr, port: u16) -> ListenAddress {
        ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(ip, port)))
    }

    /// TCP listen address on the given IPv4 address and port.
    fn tcp_v4(ip: Ipv4Addr, port: u16) -> ListenAddress {
        ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new(ip, port)))
    }

    /// Compares two address lists irrespective of order, using each address's string form.
    ///
    /// Both slices are sorted in place as a side effect. Returns a description of the first difference found.
    fn address_list_eq(expected: &mut [ListenAddress], actual: &mut [ListenAddress]) -> Result<(), String> {
        let (expected_len, actual_len) = (expected.len(), actual.len());
        if expected_len != actual_len {
            return Err(format!(
                "length mismatch: expected {} addresses, got {}",
                expected_len, actual_len
            ));
        }

        expected.sort_by_cached_key(ToString::to_string);
        actual.sort_by_cached_key(ToString::to_string);

        let first_mismatch = expected
            .iter()
            .map(ToString::to_string)
            .zip(actual.iter().map(ToString::to_string))
            .find(|(want, got)| want != got);

        match first_mismatch {
            Some((want, got)) => Err(format!("address mismatch: expected {}, got {}", want, got)),
            None => Ok(()),
        }
    }

    /// Builds the listen addresses for `config` and asserts that they match `expected`, ignoring order.
    #[track_caller]
    fn assert_built_addresses(
        config: DogStatsDConfiguration, bind_host: Option<IpAddr>, mut expected: Vec<ListenAddress>,
    ) {
        let mut actual = config.build_addresses(bind_host);
        address_list_eq(&mut expected, &mut actual).unwrap();
    }

    // ---------------------------------------------------------------------------------------------------------
    // Metric packet handling
    // ---------------------------------------------------------------------------------------------------------

    // With heap allocations disabled on the context resolver, handling this packet is expected to yield no metric.
    #[test]
    fn no_metrics_when_interner_full_allocations_disallowed() {
        let codec = default_codec();
        let mut context_resolvers = resolvers_without_heap_allocations();
        let peer_addr = test_peer_addr();

        let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3";

        let packet = match codec.decode_packet(input.as_bytes()) {
            Ok(ParsedPacket::Metric(packet)) => packet,
            _ => panic!("Failed to parse packet."),
        };

        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &[]);
        assert!(maybe_metric.is_none());
    }

    // Tags sent in the packet and tags supplied as additional tags must both end up on the metric's context.
    #[test]
    fn metric_with_additional_tags() {
        let codec = default_codec();
        let mut context_resolvers = resolvers_without_heap_allocations();
        let peer_addr = test_peer_addr();

        let existing_tags = ["tag1:value1", "tag2:value2", "tag3:value3"];
        let additional_tags = ["tag4:value4", "tag5:value5", "tag6:value6"].map(String::from);

        let input = format!("test_metric_name:1|c|#{}", existing_tags.join(","));

        let packet = match codec.decode_packet(input.as_bytes()) {
            Ok(ParsedPacket::Metric(packet)) => packet,
            _ => panic!("Failed to parse packet."),
        };
        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &additional_tags);
        assert!(maybe_metric.is_some());

        let metric = maybe_metric.unwrap();
        let context = metric.context();

        for tag in existing_tags {
            assert!(context.tags().has_tag(tag));
        }

        for tag in additional_tags {
            assert!(context.tags().has_tag(tag));
        }
    }

    // ---------------------------------------------------------------------------------------------------------
    // Configuration deserialization
    // ---------------------------------------------------------------------------------------------------------

    #[test]
    fn interner_size_defaults_to_2mib() {
        let interner_bytes = deser_config("{}").effective_context_string_interner_bytes();
        assert_eq!(interner_bytes, ByteSize::mib(2));
    }

    #[test]
    fn socket_receive_buffer_size_defaults_to_zero() {
        let config = deser_config("{}");
        assert_eq!(config.socket_receive_buffer_size, 0);
    }

    #[test]
    fn socket_receive_buffer_size_from_config() {
        let config = deser_config(r#"{"dogstatsd_so_rcvbuf": 131072}"#);
        assert_eq!(config.socket_receive_buffer_size, 131_072);
    }

    #[test]
    fn interner_size_from_entry_count() {
        let interner_bytes =
            deser_config(r#"{"dogstatsd_string_interner_size": 4096}"#).effective_context_string_interner_bytes();
        assert_eq!(interner_bytes, ByteSize::mib(2));
    }

    #[test]
    fn interner_size_from_explicit_bytes() {
        let interner_bytes = deser_config(r#"{"dogstatsd_string_interner_size_bytes": 4194304}"#)
            .effective_context_string_interner_bytes();
        assert_eq!(interner_bytes, ByteSize::b(4_194_304));
    }

    // When both the entry count and the explicit byte size are given, the explicit byte size is what is reported.
    #[test]
    fn interner_size_explicit_bytes_takes_priority() {
        let interner_bytes = deser_config(
            r#"{"dogstatsd_string_interner_size": 4096, "dogstatsd_string_interner_size_bytes": 8388608}"#,
        )
        .effective_context_string_interner_bytes();
        assert_eq!(interner_bytes, ByteSize::b(8_388_608));
    }

    #[test]
    fn interner_size_custom_entry_count() {
        let interner_bytes =
            deser_config(r#"{"dogstatsd_string_interner_size": 8192}"#).effective_context_string_interner_bytes();
        assert_eq!(interner_bytes, ByteSize::mib(4));
    }

    // ---------------------------------------------------------------------------------------------------------
    // Listen address construction
    // ---------------------------------------------------------------------------------------------------------

    // Sanity check for the comparison helper itself: a deliberately wrong expectation (127.0.0.2) must be rejected.
    #[test]
    fn build_addresses_assertion_function_works() {
        let config = DogStatsDConfiguration {
            tcp_port: 123,
            ..no_listeners_config()
        };
        let mut expected = vec![tcp_v4(Ipv4Addr::new(127, 0, 0, 2), 123)];
        let mut actual = config.build_addresses(None);
        assert!(address_list_eq(&mut expected, &mut actual).is_err())
    }

    #[test]
    fn build_addresses_no_listeners() {
        assert_built_addresses(no_listeners_config(), None, Vec::new());
    }

    #[test]
    fn build_addresses_udp_local_only() {
        let config = DogStatsDConfiguration {
            port: 8125,
            ..no_listeners_config()
        };
        assert_built_addresses(config, None, vec![udp_v4(Ipv4Addr::LOCALHOST, 8125)]);
    }

    #[test]
    fn build_addresses_udp_non_local_only() {
        let config = DogStatsDConfiguration {
            port: 8125,
            non_local_traffic: true,
            ..no_listeners_config()
        };
        assert_built_addresses(config, None, vec![udp_v4(Ipv4Addr::UNSPECIFIED, 8125)]);
    }

    #[test]
    fn build_addresses_tcp_local_only() {
        let config = DogStatsDConfiguration {
            tcp_port: 9000,
            ..no_listeners_config()
        };
        assert_built_addresses(config, None, vec![tcp_v4(Ipv4Addr::LOCALHOST, 9000)]);
    }

    #[test]
    fn build_addresses_tcp_non_local_only() {
        let config = DogStatsDConfiguration {
            tcp_port: 9000,
            non_local_traffic: true,
            ..no_listeners_config()
        };
        assert_built_addresses(config, None, vec![tcp_v4(Ipv4Addr::UNSPECIFIED, 9000)]);
    }

    #[test]
    fn build_addresses_unixgram_only() {
        let config = DogStatsDConfiguration {
            socket_path: Some("/tmp/dsd.sock".to_string()),
            ..no_listeners_config()
        };
        assert_built_addresses(config, None, vec![ListenAddress::Unixgram("/tmp/dsd.sock".into())]);
    }

    #[test]
    fn build_addresses_unix_stream_only() {
        let config = DogStatsDConfiguration {
            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
            ..no_listeners_config()
        };
        assert_built_addresses(config, None, vec![ListenAddress::Unix("/tmp/dsd-stream.sock".into())]);
    }

    #[test]
    fn build_addresses_all_four_non_local() {
        let config = DogStatsDConfiguration {
            port: 8125,
            tcp_port: 9000,
            socket_path: Some("/tmp/dsd.sock".to_string()),
            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
            non_local_traffic: true,
            ..no_listeners_config()
        };
        let expected = vec![
            udp_v4(Ipv4Addr::UNSPECIFIED, 8125),
            tcp_v4(Ipv4Addr::UNSPECIFIED, 9000),
            ListenAddress::Unixgram("/tmp/dsd.sock".into()),
            ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
        ];
        assert_built_addresses(config, None, expected);
    }

    #[test]
    fn build_addresses_all_four_local() {
        let config = DogStatsDConfiguration {
            port: 8125,
            tcp_port: 9000,
            socket_path: Some("/tmp/dsd.sock".to_string()),
            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
            ..no_listeners_config()
        };
        let expected = vec![
            udp_v4(Ipv4Addr::LOCALHOST, 8125),
            tcp_v4(Ipv4Addr::LOCALHOST, 9000),
            ListenAddress::Unixgram("/tmp/dsd.sock".into()),
            ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
        ];
        assert_built_addresses(config, None, expected);
    }

    // An explicit bind host is expected on the UDP and TCP addresses; the Unix datagram socket path is unaffected.
    #[test]
    fn build_addresses_bind_host_applies_to_udp_and_tcp() {
        let config = DogStatsDConfiguration {
            port: 8125,
            tcp_port: 9000,
            socket_path: Some("/tmp/dsd.sock".to_string()),
            ..no_listeners_config()
        };
        let host = Ipv4Addr::new(192, 168, 1, 50);
        let expected = vec![
            udp_v4(host, 8125),
            tcp_v4(host, 9000),
            ListenAddress::Unixgram("/tmp/dsd.sock".into()),
        ];
        assert_built_addresses(config, Some(IpAddr::V4(host)), expected);
    }

    // With non-local traffic enabled, the expected UDP/TCP addresses are 0.0.0.0 even though a bind host is given.
    #[test]
    fn build_addresses_non_local_clobbers_bind_host() {
        let config = DogStatsDConfiguration {
            port: 8125,
            tcp_port: 9000,
            socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()),
            non_local_traffic: true,
            ..no_listeners_config()
        };
        let bind_host = Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 50)));
        let expected = vec![
            udp_v4(Ipv4Addr::UNSPECIFIED, 8125),
            tcp_v4(Ipv4Addr::UNSPECIFIED, 9000),
            ListenAddress::Unix("/tmp/dsd-stream.sock".into()),
        ];
        assert_built_addresses(config, bind_host, expected);
    }

    // ---------------------------------------------------------------------------------------------------------
    // Codec behavior relied upon by the source
    // ---------------------------------------------------------------------------------------------------------

    // NaN and +/- infinity must decode successfully as metric packets that carry zero points.
    #[test]
    fn non_finite_metric_values_are_silently_dropped() {
        let codec = default_codec();
        let inputs: [&[u8]; 3] = [b"my.gauge:NaN|g", b"my.gauge:inf|g", b"my.gauge:-inf|g"];

        for input in inputs {
            let decoded = codec.decode_packet(input).expect("should decode without error");
            let ParsedPacket::Metric(packet) = decoded else {
                panic!("expected Metric packet");
            };

            assert_eq!(
                packet.num_points, 0,
                "non-finite value should be dropped, leaving 0 valid points"
            );
        }
    }
}
1728
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DogStatsDConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    // Runs the shared configuration smoke tests against the DogStatsD configuration registry entry, starting from
    // an empty JSON object and requiring that the result deserializes into `DogStatsDConfiguration`.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DOGSTATSD_CONFIGURATION, &[], json!({}), |cfg| {
            let typed = cfg.as_typed::<DogStatsDConfiguration>();
            typed.expect("DogStatsDConfiguration should deserialize")
        })
        .await
    }
}