1use std::sync::{Arc, LazyLock};
2use std::time::Duration;
3
4use async_trait::async_trait;
5use bytes::{Buf, BufMut};
6use bytesize::ByteSize;
7use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
8use metrics::{Counter, Gauge, Histogram};
9use saluki_common::task::spawn_traced_named;
10use saluki_config::GenericConfiguration;
11use saluki_context::{
12 tags::{RawTags, RawTagsFilter},
13 TagsResolver,
14};
15use saluki_core::data_model::event::metric::Metric;
16use saluki_core::data_model::event::{
17 eventd::EventD,
18 metric::{MetricMetadata, MetricOrigin},
19 service_check::ServiceCheck,
20 Event, EventType,
21};
22use saluki_core::{
23 components::{sources::*, ComponentContext},
24 observability::ComponentMetricsExt as _,
25 pooling::FixedSizeObjectPool,
26 topology::{
27 interconnect::EventBufferManager,
28 shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle},
29 EventsBuffer, OutputDefinition,
30 },
31};
32use saluki_env::WorkloadProvider;
33use saluki_error::{generic_error, ErrorContext as _, GenericError};
34use saluki_io::{
35 buf::{BytesBuffer, FixedSizeVec},
36 deser::{codec::dogstatsd::*, framing::FramerExt as _},
37 net::{
38 listener::{Listener, ListenerError},
39 ConnectionAddress, ListenAddress, Stream,
40 },
41};
42use saluki_metrics::MetricsBuilder;
43use serde::Deserialize;
44use serde_with::{serde_as, NoneAsEmptyString};
45use snafu::{ResultExt as _, Snafu};
46use tokio::{
47 select,
48 time::{interval, MissedTickBehavior},
49};
50use tracing::{debug, error, info, trace, warn};
51
52mod framer;
53use self::framer::{get_framer, DsdFramer};
54use crate::sources::dogstatsd::tags::{WellKnownTags, WellKnownTagsFilterPredicate};
55
56mod filters;
57use self::filters::EnablePayloadsFilter;
58
59mod io_buffer;
60use self::io_buffer::IoBufferManager;
61
62mod origin;
63use self::origin::{
64 origin_from_event_packet, origin_from_metric_packet, origin_from_service_check_packet, DogStatsDOriginTagResolver,
65 OriginEnrichmentConfiguration,
66};
67
68mod resolver;
69use self::resolver::ContextResolvers;
70
71mod tags;
72
73#[derive(Debug, Snafu)]
74#[snafu(context(suffix(false)))]
75enum Error {
76 #[snafu(display("Failed to create {} listener: {}", listener_type, source))]
77 FailedToCreateListener {
78 listener_type: &'static str,
79 source: ListenerError,
80 },
81
82 #[snafu(display("No listeners configured. Please specify a port (`dogstatsd_port`) or a socket path (`dogstatsd_socket` or `dogstatsd_stream_socket`) to enable a listener."))]
83 NoListenersConfigured,
84}
85
/// Default for `dogstatsd_buffer_size`: the configured capacity of each I/O buffer.
const fn default_buffer_size() -> usize {
    const DEFAULT_BUFFER_SIZE: usize = 8 * 1024;
    DEFAULT_BUFFER_SIZE
}
89
/// Default for `dogstatsd_buffer_count`: the number of I/O buffers held in the buffer pool.
const fn default_buffer_count() -> usize {
    const DEFAULT_BUFFER_COUNT: usize = 128;
    DEFAULT_BUFFER_COUNT
}
93
/// Default for `dogstatsd_port`: the port used by the UDP listener.
const fn default_port() -> u16 {
    const DEFAULT_UDP_PORT: u16 = 8125;
    DEFAULT_UDP_PORT
}
97
/// Default for `dogstatsd_tcp_port`.
///
/// A port of `0` means no TCP listener is created.
const fn default_tcp_port() -> u16 {
    const TCP_LISTENER_DISABLED: u16 = 0;
    TCP_LISTENER_DISABLED
}
101
/// Default for `dogstatsd_allow_context_heap_allocs`: enabled.
const fn default_allow_context_heap_allocations() -> bool {
    true
}
105
/// Default for `dogstatsd_no_aggregation_pipeline`: enabled.
const fn default_no_aggregation_pipeline_support() -> bool {
    true
}
109
110const fn default_context_string_interner_size() -> ByteSize {
111 ByteSize::mib(2)
112}
113
/// Default for `dogstatsd_cached_contexts_limit`.
const fn default_cached_contexts_limit() -> usize {
    const DEFAULT_CACHED_CONTEXTS: usize = 500_000;
    DEFAULT_CACHED_CONTEXTS
}
117
/// Default for `dogstatsd_cached_tagsets_limit`.
const fn default_cached_tagsets_limit() -> usize {
    const DEFAULT_CACHED_TAGSETS: usize = 500_000;
    DEFAULT_CACHED_TAGSETS
}
121
/// Default for `dogstatsd_permissive_decoding`: enabled.
const fn default_dogstatsd_permissive_decoding() -> bool {
    true
}
125
/// Default for `dogstatsd_minimum_sample_rate`, which is handed to the codec configuration.
const fn default_dogstatsd_minimum_sample_rate() -> f64 {
    // Same value as the decimal literal `0.000000003845`, written in scientific notation.
    3.845e-9
}
129
/// Default for `enable_payloads_series`: enabled.
const fn default_enable_payloads_series() -> bool {
    true
}
133
/// Default for `enable_payloads_sketches`: enabled.
const fn default_enable_payloads_sketches() -> bool {
    true
}
137
/// Default for `enable_payloads_events`: enabled.
const fn default_enable_payloads_events() -> bool {
    true
}
141
/// Default for `enable_payloads_service_checks`: enabled.
const fn default_enable_payloads_service_checks() -> bool {
    true
}
145
146#[serde_as]
150#[derive(Deserialize)]
151pub struct DogStatsDConfiguration {
152 #[serde(rename = "dogstatsd_buffer_size", default = "default_buffer_size")]
158 buffer_size: usize,
159
160 #[serde(rename = "dogstatsd_buffer_count", default = "default_buffer_count")]
168 buffer_count: usize,
169
170 #[serde(rename = "dogstatsd_port", default = "default_port")]
176 port: u16,
177
178 #[serde(rename = "dogstatsd_tcp_port", default = "default_tcp_port")]
184 tcp_port: u16,
185
186 #[serde(rename = "dogstatsd_socket", default)]
192 #[serde_as(as = "NoneAsEmptyString")]
193 socket_path: Option<String>,
194
195 #[serde(rename = "dogstatsd_stream_socket", default)]
201 #[serde_as(as = "NoneAsEmptyString")]
202 socket_stream_path: Option<String>,
203
204 #[serde(rename = "dogstatsd_non_local_traffic", default)]
211 non_local_traffic: bool,
212
213 #[serde(
223 rename = "dogstatsd_allow_context_heap_allocs",
224 default = "default_allow_context_heap_allocations"
225 )]
226 allow_context_heap_allocations: bool,
227
228 #[serde(
236 rename = "dogstatsd_no_aggregation_pipeline",
237 default = "default_no_aggregation_pipeline_support"
238 )]
239 no_aggregation_pipeline_support: bool,
240
241 #[serde(
247 rename = "dogstatsd_string_interner_size",
248 default = "default_context_string_interner_size"
249 )]
250 context_string_interner_bytes: ByteSize,
251
252 #[serde(
260 rename = "dogstatsd_cached_contexts_limit",
261 default = "default_cached_contexts_limit"
262 )]
263 cached_contexts_limit: usize,
264
265 #[serde(rename = "dogstatsd_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
273 cached_tagsets_limit: usize,
274
275 #[serde(
282 rename = "dogstatsd_permissive_decoding",
283 default = "default_dogstatsd_permissive_decoding"
284 )]
285 permissive_decoding: bool,
286
287 #[serde(
297 rename = "dogstatsd_minimum_sample_rate",
298 default = "default_dogstatsd_minimum_sample_rate"
299 )]
300 minimum_sample_rate: f64,
301
302 #[serde(default = "default_enable_payloads_series")]
306 enable_payloads_series: bool,
307
308 #[serde(default = "default_enable_payloads_sketches")]
312 enable_payloads_sketches: bool,
313
314 #[serde(default = "default_enable_payloads_events")]
318 enable_payloads_events: bool,
319
320 #[serde(default = "default_enable_payloads_service_checks")]
324 enable_payloads_service_checks: bool,
325
326 #[serde(flatten, default)]
328 origin_enrichment: OriginEnrichmentConfiguration,
329
330 #[serde(skip)]
332 workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
333
334 #[serde(rename = "dogstatsd_tags", default)]
336 additional_tags: Vec<String>,
337}
338
339impl DogStatsDConfiguration {
340 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
342 Ok(config.as_typed()?)
343 }
344
345 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
351 where
352 W: WorkloadProvider + Send + Sync + 'static,
353 {
354 self.workload_provider = Some(Arc::new(workload_provider));
355 self
356 }
357
358 async fn build_listeners(&self) -> Result<Vec<Listener>, Error> {
359 let mut listeners = Vec::new();
360
361 if self.port != 0 {
362 let address = if self.non_local_traffic {
363 ListenAddress::Udp(([0, 0, 0, 0], self.port).into())
364 } else {
365 ListenAddress::Udp(([127, 0, 0, 1], self.port).into())
366 };
367
368 let listener = Listener::from_listen_address(address)
369 .await
370 .context(FailedToCreateListener { listener_type: "UDP" })?;
371 listeners.push(listener);
372 }
373
374 if self.tcp_port != 0 {
375 let address = if self.non_local_traffic {
376 ListenAddress::Tcp(([0, 0, 0, 0], self.tcp_port).into())
377 } else {
378 ListenAddress::Tcp(([127, 0, 0, 1], self.tcp_port).into())
379 };
380
381 let listener = Listener::from_listen_address(address)
382 .await
383 .context(FailedToCreateListener { listener_type: "TCP" })?;
384 listeners.push(listener);
385 }
386
387 if let Some(socket_path) = &self.socket_path {
388 let address = ListenAddress::Unixgram(socket_path.into());
389 let listener = Listener::from_listen_address(address)
390 .await
391 .context(FailedToCreateListener {
392 listener_type: "UDS (datagram)",
393 })?;
394 listeners.push(listener);
395 }
396
397 if let Some(socket_stream_path) = &self.socket_stream_path {
398 let address = ListenAddress::Unix(socket_stream_path.into());
399 let listener = Listener::from_listen_address(address)
400 .await
401 .context(FailedToCreateListener {
402 listener_type: "UDS (stream)",
403 })?;
404 listeners.push(listener);
405 }
406
407 Ok(listeners)
408 }
409}
410
411#[async_trait]
412impl SourceBuilder for DogStatsDConfiguration {
413 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
414 let listeners = self.build_listeners().await?;
415 if listeners.is_empty() {
416 return Err(Error::NoListenersConfigured.into());
417 }
418
419 if self.buffer_count < listeners.len() {
422 return Err(generic_error!(
423 "Must have a minimum of {} I/O buffers based on the number of listeners configured.",
424 listeners.len()
425 ));
426 }
427
428 let maybe_origin_tags_resolver = self
429 .workload_provider
430 .clone()
431 .map(|provider| DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider));
432 let context_resolvers = ContextResolvers::new(self, &context, maybe_origin_tags_resolver)
433 .error_context("Failed to create context resolvers.")?;
434
435 let codec_config = DogstatsdCodecConfiguration::default()
436 .with_timestamps(self.no_aggregation_pipeline_support)
437 .with_permissive_mode(self.permissive_decoding)
438 .with_minimum_sample_rate(self.minimum_sample_rate);
439
440 let codec = DogstatsdCodec::from_configuration(codec_config);
441
442 let enable_payloads_filter = EnablePayloadsFilter::default()
443 .with_allow_series(self.enable_payloads_series)
444 .with_allow_sketches(self.enable_payloads_sketches)
445 .with_allow_events(self.enable_payloads_events)
446 .with_allow_service_checks(self.enable_payloads_service_checks);
447
448 Ok(Box::new(DogStatsD {
449 listeners,
450 io_buffer_pool: FixedSizeObjectPool::with_builder("dsd_packet_bufs", self.buffer_count, || {
451 FixedSizeVec::with_capacity(get_adjusted_buffer_size(self.buffer_size))
452 }),
453 codec,
454 context_resolvers,
455 enabled_filter: enable_payloads_filter,
456 additional_tags: self.additional_tags.clone().into(),
457 }))
458 }
459
460 fn outputs(&self) -> &[OutputDefinition<EventType>] {
461 static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
462 vec![
463 OutputDefinition::named_output("metrics", EventType::Metric),
464 OutputDefinition::named_output("events", EventType::EventD),
465 OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
466 ]
467 });
468 &OUTPUTS
469 }
470}
471
472impl MemoryBounds for DogStatsDConfiguration {
473 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
474 builder
475 .minimum()
476 .with_single_value::<DogStatsD>("source struct")
478 .with_expr(UsageExpr::product(
480 "buffers",
481 UsageExpr::config("dogstatsd_buffer_count", self.buffer_count),
482 UsageExpr::config("dogstatsd_buffer_size", get_adjusted_buffer_size(self.buffer_size)),
483 ))
484 .with_expr(UsageExpr::config(
487 "dogstatsd_string_interner_size",
488 self.context_string_interner_bytes.as_u64() as usize,
489 ));
490 }
491}
492
493pub struct DogStatsD {
495 listeners: Vec<Listener>,
496 io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
497 codec: DogstatsdCodec,
498 context_resolvers: ContextResolvers,
499 enabled_filter: EnablePayloadsFilter,
500 additional_tags: Arc<[String]>,
501}
502
503struct ListenerContext {
504 shutdown_handle: DynamicShutdownHandle,
505 listener: Listener,
506 io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
507 codec: DogstatsdCodec,
508 context_resolvers: ContextResolvers,
509 additional_tags: Arc<[String]>,
510}
511
512struct HandlerContext {
513 listen_addr: ListenAddress,
514 framer: DsdFramer,
515 codec: DogstatsdCodec,
516 io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
517 metrics: Metrics,
518 context_resolvers: ContextResolvers,
519 additional_tags: Arc<[String]>,
520}
521
522struct Metrics {
523 metrics_received: Counter,
524 events_received: Counter,
525 service_checks_received: Counter,
526 bytes_received: Counter,
527 bytes_received_size: Histogram,
528 framing_errors: Counter,
529 metric_decoder_errors: Counter,
530 event_decoder_errors: Counter,
531 service_check_decoder_errors: Counter,
532 failed_context_resolve_total: Counter,
533 connections_active: Gauge,
534 packet_receive_success: Counter,
535 packet_receive_failure: Counter,
536}
537
538impl Metrics {
539 fn metrics_received(&self) -> &Counter {
540 &self.metrics_received
541 }
542
543 fn events_received(&self) -> &Counter {
544 &self.events_received
545 }
546
547 fn service_checks_received(&self) -> &Counter {
548 &self.service_checks_received
549 }
550
551 fn bytes_received(&self) -> &Counter {
552 &self.bytes_received
553 }
554
555 fn bytes_received_size(&self) -> &Histogram {
556 &self.bytes_received_size
557 }
558
559 fn framing_errors(&self) -> &Counter {
560 &self.framing_errors
561 }
562
563 fn metric_decode_failed(&self) -> &Counter {
564 &self.metric_decoder_errors
565 }
566
567 fn event_decode_failed(&self) -> &Counter {
568 &self.event_decoder_errors
569 }
570
571 fn service_check_decode_failed(&self) -> &Counter {
572 &self.service_check_decoder_errors
573 }
574
575 fn failed_context_resolve_total(&self) -> &Counter {
576 &self.failed_context_resolve_total
577 }
578
579 fn connections_active(&self) -> &Gauge {
580 &self.connections_active
581 }
582
583 fn packet_receive_success(&self) -> &Counter {
584 &self.packet_receive_success
585 }
586
587 fn packet_receive_failure(&self) -> &Counter {
588 &self.packet_receive_failure
589 }
590}
591
592fn build_metrics(listen_addr: &ListenAddress, component_context: &ComponentContext) -> Metrics {
593 let builder = MetricsBuilder::from_component_context(component_context);
594
595 let listener_type = match listen_addr {
596 ListenAddress::Tcp(_) => "tcp",
597 ListenAddress::Udp(_) => "udp",
598 ListenAddress::Unix(_) => "unix",
599 ListenAddress::Unixgram(_) => "unixgram",
600 };
601
602 Metrics {
603 metrics_received: builder.register_counter_with_tags(
604 "component_events_received_total",
605 [("message_type", "metrics"), ("listener_type", listener_type)],
606 ),
607 events_received: builder.register_counter_with_tags(
608 "component_events_received_total",
609 [("message_type", "events"), ("listener_type", listener_type)],
610 ),
611 service_checks_received: builder.register_counter_with_tags(
612 "component_events_received_total",
613 [("message_type", "service_checks"), ("listener_type", listener_type)],
614 ),
615 bytes_received: builder
616 .register_counter_with_tags("component_bytes_received_total", [("listener_type", listener_type)]),
617 bytes_received_size: builder
618 .register_trace_histogram_with_tags("component_bytes_received_size", [("listener_type", listener_type)]),
619 framing_errors: builder.register_counter_with_tags(
620 "component_errors_total",
621 [("listener_type", listener_type), ("error_type", "framing")],
622 ),
623 metric_decoder_errors: builder.register_counter_with_tags(
624 "component_errors_total",
625 [
626 ("listener_type", listener_type),
627 ("error_type", "decode"),
628 ("message_type", "metrics"),
629 ],
630 ),
631 event_decoder_errors: builder.register_counter_with_tags(
632 "component_errors_total",
633 [
634 ("listener_type", listener_type),
635 ("error_type", "decode"),
636 ("message_type", "events"),
637 ],
638 ),
639 service_check_decoder_errors: builder.register_counter_with_tags(
640 "component_errors_total",
641 [
642 ("listener_type", listener_type),
643 ("error_type", "decode"),
644 ("message_type", "service_checks"),
645 ],
646 ),
647 connections_active: builder
648 .register_gauge_with_tags("component_connections_active", [("listener_type", listener_type)]),
649 packet_receive_success: builder.register_debug_counter_with_tags(
650 "component_packets_received_total",
651 [("listener_type", listener_type), ("state", "ok")],
652 ),
653 packet_receive_failure: builder.register_debug_counter_with_tags(
654 "component_packets_received_total",
655 [("listener_type", listener_type), ("state", "error")],
656 ),
657 failed_context_resolve_total: builder.register_debug_counter("component_failed_context_resolve_total"),
658 }
659}
660
661#[async_trait]
662impl Source for DogStatsD {
663 async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
664 let mut global_shutdown = context.take_shutdown_handle();
665 let mut health = context.take_health_handle();
666
667 let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();
668
669 for listener in self.listeners {
671 let task_name = format!("dogstatsd-listener-{}", listener.listen_address().listener_type());
672
673 let listener_context = ListenerContext {
680 shutdown_handle: listener_shutdown_coordinator.register(),
681 listener,
682 io_buffer_pool: self.io_buffer_pool.clone(),
683 codec: self.codec.clone(),
684 context_resolvers: self.context_resolvers.clone(),
685 additional_tags: self.additional_tags.clone(),
686 };
687
688 spawn_traced_named(
689 task_name,
690 process_listener(context.clone(), listener_context, self.enabled_filter),
691 );
692 }
693
694 health.mark_ready();
695 debug!("DogStatsD source started.");
696
697 loop {
702 select! {
703 _ = &mut global_shutdown => {
704 debug!("Received shutdown signal.");
705 break
706 },
707 _ = health.live() => continue,
708 }
709 }
710
711 debug!("Stopping DogStatsD source...");
712
713 listener_shutdown_coordinator.shutdown().await;
714
715 debug!("DogStatsD source stopped.");
716
717 Ok(())
718 }
719}
720
721async fn process_listener(
722 source_context: SourceContext, listener_context: ListenerContext, enabled_filter: EnablePayloadsFilter,
723) {
724 let ListenerContext {
725 shutdown_handle,
726 mut listener,
727 io_buffer_pool,
728 codec,
729 context_resolvers,
730 additional_tags,
731 } = listener_context;
732 tokio::pin!(shutdown_handle);
733
734 let listen_addr = listener.listen_address().clone();
735 let mut stream_shutdown_coordinator = DynamicShutdownCoordinator::default();
736
737 info!(%listen_addr, "DogStatsD listener started.");
738
739 loop {
740 select! {
741 _ = &mut shutdown_handle => {
742 debug!(%listen_addr, "Received shutdown signal. Waiting for existing stream handlers to finish...");
743 break;
744 }
745 result = listener.accept() => match result {
746 Ok(stream) => {
747 debug!(%listen_addr, "Spawning new stream handler.");
748
749 let handler_context = HandlerContext {
750 listen_addr: listen_addr.clone(),
751 framer: get_framer(&listen_addr),
752 codec: codec.clone(),
753 io_buffer_pool: io_buffer_pool.clone(),
754 metrics: build_metrics(&listen_addr, source_context.component_context()),
755 context_resolvers: context_resolvers.clone(),
756 additional_tags: additional_tags.clone(),
757 };
758
759 let task_name = format!("dogstatsd-stream-handler-{}", listen_addr.listener_type());
760 spawn_traced_named(task_name, process_stream(stream, source_context.clone(), handler_context, stream_shutdown_coordinator.register(), enabled_filter));
761 }
762 Err(e) => {
763 error!(%listen_addr, error = %e, "Failed to accept connection. Stopping listener.");
764 break
765 }
766 }
767 }
768 }
769
770 stream_shutdown_coordinator.shutdown().await;
771
772 info!(%listen_addr, "DogStatsD listener stopped.");
773}
774
775async fn process_stream(
776 stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
777 shutdown_handle: DynamicShutdownHandle, enabled_filter: EnablePayloadsFilter,
778) {
779 tokio::pin!(shutdown_handle);
780
781 select! {
782 _ = &mut shutdown_handle => {
783 debug!("Stream handler received shutdown signal.");
784 },
785 _ = drive_stream(stream, source_context, handler_context, enabled_filter) => {},
786 }
787}
788
789async fn drive_stream(
790 mut stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
791 enabled_filter: EnablePayloadsFilter,
792) {
793 let HandlerContext {
794 listen_addr,
795 mut framer,
796 codec,
797 io_buffer_pool,
798 metrics,
799 mut context_resolvers,
800 additional_tags,
801 } = handler_context;
802
803 debug!(%listen_addr, "Stream handler started.");
804
805 if !stream.is_connectionless() {
806 metrics.connections_active().increment(1);
807 }
808
809 let mut buffer_flush = interval(Duration::from_millis(100));
812 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
813
814 let mut event_buffer_manager = EventBufferManager::default();
815 let mut io_buffer_manager = IoBufferManager::new(&io_buffer_pool, &stream);
816 let memory_limiter = source_context.topology_context().memory_limiter();
817
818 'read: loop {
819 let mut eof = false;
820
821 let mut io_buffer = io_buffer_manager.get_buffer_mut().await;
822
823 memory_limiter.wait_for_capacity().await;
824
825 select! {
826 read_result = stream.receive(&mut io_buffer) => match read_result {
828 Ok((bytes_read, peer_addr)) => {
829 if bytes_read == 0 {
830 eof = true;
831 }
832
833 metrics.packet_receive_success().increment(1);
843 metrics.bytes_received().increment(bytes_read as u64);
844 metrics.bytes_received_size().record(bytes_read as f64);
845
846 let reached_eof = eof || stream.is_connectionless();
852
853 trace!(
854 buffer_len = io_buffer.remaining(),
855 buffer_cap = io_buffer.remaining_mut(),
856 eof = reached_eof,
857 %listen_addr,
858 %peer_addr,
859 "Received {} bytes from stream.",
860 bytes_read
861 );
862
863 let mut frames = io_buffer.framed(&mut framer, reached_eof);
864 'frame: loop {
865 match frames.next() {
866 Some(Ok(frame)) => {
867 trace!(%listen_addr, %peer_addr, ?frame, "Decoded frame.");
868 match handle_frame(&frame[..], &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) {
869 Ok(Some(event)) => {
870 if let Some(event_buffer) = event_buffer_manager.try_push(event) {
871 debug!(%listen_addr, %peer_addr, "Event buffer is full. Forwarding events.");
872 dispatch_events(event_buffer, &source_context, &listen_addr).await;
873 }
874 },
875 Ok(None) => {
876 continue
881 },
882 Err(e) => {
883 let frame_lossy_str = String::from_utf8_lossy(&frame);
884 warn!(%listen_addr, %peer_addr, frame = %frame_lossy_str, error = %e, "Failed to parse frame.");
885 },
886 }
887 }
888 Some(Err(e)) => {
889 metrics.framing_errors().increment(1);
890
891 if stream.is_connectionless() {
892 debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Continuing stream.");
895 continue 'read;
896 } else {
897 debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Stopping stream.");
898 break 'read;
899 }
900 }
901 None => {
902 trace!(%listen_addr, %peer_addr, "Not enough data to decode another frame.");
903 if eof && !stream.is_connectionless() {
904 debug!(%listen_addr, %peer_addr, "Stream received EOF. Shutting down handler.");
905 break 'read;
906 } else {
907 break 'frame;
908 }
909 }
910 }
911 }
912 },
913 Err(e) => {
914 metrics.packet_receive_failure().increment(1);
915
916 if stream.is_connectionless() {
917 warn!(%listen_addr, error = %e, "I/O error while decoding. Continuing stream.");
920 continue 'read;
921 } else {
922 warn!(%listen_addr, error = %e, "I/O error while decoding. Stopping stream.");
923 break 'read;
924 }
925 }
926 },
927
928 _ = buffer_flush.tick() => {
929 if let Some(event_buffer) = event_buffer_manager.consume() {
930 dispatch_events(event_buffer, &source_context, &listen_addr).await;
931 }
932 },
933 }
934 }
935
936 if let Some(event_buffer) = event_buffer_manager.consume() {
937 dispatch_events(event_buffer, &source_context, &listen_addr).await;
938 }
939
940 metrics.connections_active().decrement(1);
941
942 debug!(%listen_addr, "Stream handler stopped.");
943}
944
945fn handle_frame(
946 frame: &[u8], codec: &DogstatsdCodec, context_resolvers: &mut ContextResolvers, source_metrics: &Metrics,
947 peer_addr: &ConnectionAddress, enabled_filter: EnablePayloadsFilter, additional_tags: &[String],
948) -> Result<Option<Event>, ParseError> {
949 let parsed = match codec.decode_packet(frame) {
950 Ok(parsed) => parsed,
951 Err(e) => {
952 match parse_message_type(frame) {
954 MessageType::MetricSample => source_metrics.metric_decode_failed().increment(1),
955 MessageType::Event => source_metrics.event_decode_failed().increment(1),
956 MessageType::ServiceCheck => source_metrics.service_check_decode_failed().increment(1),
957 }
958
959 return Err(e);
960 }
961 };
962
963 let event = match parsed {
964 ParsedPacket::Metric(metric_packet) => {
965 let events_len = metric_packet.num_points;
966 if !enabled_filter.allow_metric(&metric_packet) {
967 trace!(
968 metric.name = metric_packet.metric_name,
969 "Skipping metric due to filter configuration."
970 );
971 return Ok(None);
972 }
973
974 match handle_metric_packet(metric_packet, context_resolvers, peer_addr, additional_tags) {
975 Some(metric) => {
976 source_metrics.metrics_received().increment(events_len);
977 Event::Metric(metric)
978 }
979 None => {
980 source_metrics.failed_context_resolve_total().increment(1);
982 return Ok(None);
983 }
984 }
985 }
986 ParsedPacket::Event(event) => {
987 if !enabled_filter.allow_event(&event) {
988 trace!("Skipping event {} due to filter configuration.", event.title);
989 return Ok(None);
990 }
991 let tags_resolver = context_resolvers.tags();
992 match handle_event_packet(event, tags_resolver, peer_addr, additional_tags) {
993 Some(event) => {
994 source_metrics.events_received().increment(1);
995 Event::EventD(event)
996 }
997 None => {
998 source_metrics.failed_context_resolve_total().increment(1);
999 return Ok(None);
1000 }
1001 }
1002 }
1003 ParsedPacket::ServiceCheck(service_check) => {
1004 if !enabled_filter.allow_service_check(&service_check) {
1005 trace!(
1006 "Skipping service check {} due to filter configuration.",
1007 service_check.name
1008 );
1009 return Ok(None);
1010 }
1011 let tags_resolver = context_resolvers.tags();
1012 match handle_service_check_packet(service_check, tags_resolver, peer_addr, additional_tags) {
1013 Some(service_check) => {
1014 source_metrics.service_checks_received().increment(1);
1015 Event::ServiceCheck(service_check)
1016 }
1017 None => {
1018 source_metrics.failed_context_resolve_total().increment(1);
1019 return Ok(None);
1020 }
1021 }
1022 }
1023 };
1024
1025 Ok(Some(event))
1026}
1027
1028fn handle_metric_packet(
1029 packet: MetricPacket, context_resolvers: &mut ContextResolvers, peer_addr: &ConnectionAddress,
1030 additional_tags: &[String],
1031) -> Option<Metric> {
1032 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1033
1034 let mut origin = origin_from_metric_packet(&packet, &well_known_tags);
1035 if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1036 origin.set_process_id(creds.pid as u32);
1037 }
1038
1039 let context_resolver = if packet.timestamp.is_some() {
1041 context_resolvers.no_agg()
1042 } else {
1043 context_resolvers.primary()
1044 };
1045
1046 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1047
1048 match context_resolver.resolve(packet.metric_name, tags, Some(origin)) {
1050 Some(context) => {
1051 let metric_origin = well_known_tags
1052 .jmx_check_name
1053 .map(MetricOrigin::jmx_check)
1054 .unwrap_or_else(MetricOrigin::dogstatsd);
1055 let metadata = MetricMetadata::default()
1056 .with_origin(metric_origin)
1057 .with_hostname(well_known_tags.hostname.map(Arc::from));
1058
1059 Some(Metric::from_parts(context, packet.values, metadata))
1060 }
1061 None => None,
1063 }
1064}
1065
1066fn handle_event_packet(
1067 packet: EventPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress, additional_tags: &[String],
1068) -> Option<EventD> {
1069 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1070
1071 let mut origin = origin_from_event_packet(&packet, &well_known_tags);
1072 if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1073 origin.set_process_id(creds.pid as u32);
1074 }
1075 let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1076
1077 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1078 let tags = tags_resolver.create_tag_set(tags)?;
1079
1080 let eventd = EventD::new(packet.title, packet.text)
1081 .with_timestamp(packet.timestamp)
1082 .with_hostname(packet.hostname.map(|s| s.into()))
1083 .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
1084 .with_alert_type(packet.alert_type)
1085 .with_priority(packet.priority)
1086 .with_source_type_name(packet.source_type_name.map(|s| s.into()))
1087 .with_alert_type(packet.alert_type)
1088 .with_tags(tags)
1089 .with_origin_tags(origin_tags);
1090
1091 Some(eventd)
1092}
1093
1094fn handle_service_check_packet(
1095 packet: ServiceCheckPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress,
1096 additional_tags: &[String],
1097) -> Option<ServiceCheck> {
1098 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1099
1100 let mut origin = origin_from_service_check_packet(&packet, &well_known_tags);
1101 if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1102 origin.set_process_id(creds.pid as u32);
1103 }
1104 let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1105
1106 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1107 let tags = tags_resolver.create_tag_set(tags)?;
1108
1109 let service_check = ServiceCheck::new(packet.name, packet.status)
1110 .with_timestamp(packet.timestamp)
1111 .with_hostname(packet.hostname.map(|s| s.into()))
1112 .with_tags(tags)
1113 .with_origin_tags(origin_tags)
1114 .with_message(packet.message.map(|s| s.into()));
1115
1116 Some(service_check)
1117}
1118
1119fn get_filtered_tags_iterator<'a>(
1120 raw_tags: RawTags<'a>, additional_tags: &'a [String],
1121) -> impl Iterator<Item = &'a str> + Clone {
1122 RawTagsFilter::exclude(raw_tags, WellKnownTagsFilterPredicate).chain(additional_tags.iter().map(|s| s.as_str()))
1125}
1126
1127async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &SourceContext, listen_addr: &ListenAddress) {
1128 debug!(%listen_addr, events_len = event_buffer.len(), "Forwarding events.");
1129
1130 if event_buffer.has_event_type(EventType::EventD) {
1140 let eventd_events = event_buffer.extract(Event::is_eventd);
1141 if let Err(e) = source_context
1142 .dispatcher()
1143 .buffered_named("events")
1144 .expect("events output should always exist")
1145 .send_all(eventd_events)
1146 .await
1147 {
1148 error!(%listen_addr, error = %e, "Failed to dispatch eventd events.");
1149 }
1150 }
1151
1152 if event_buffer.has_event_type(EventType::ServiceCheck) {
1154 let service_check_events = event_buffer.extract(Event::is_service_check);
1155 if let Err(e) = source_context
1156 .dispatcher()
1157 .buffered_named("service_checks")
1158 .expect("service checks output should always exist")
1159 .send_all(service_check_events)
1160 .await
1161 {
1162 error!(%listen_addr, error = %e, "Failed to dispatch service check events.");
1163 }
1164 }
1165
1166 if !event_buffer.is_empty() {
1168 if let Err(e) = source_context
1169 .dispatcher()
1170 .dispatch_named("metrics", event_buffer)
1171 .await
1172 {
1173 error!(%listen_addr, error = %e, "Failed to dispatch metric events.");
1174 }
1175 }
1176}
1177
/// Returns the capacity to allocate for each I/O buffer: the configured size plus 4 extra bytes.
///
/// The addition saturates at `usize::MAX`, so an absurdly large configured size cannot overflow
/// (which would panic in debug builds and wrap around to a tiny capacity in release builds).
// NOTE(review): the 4 extra bytes presumably leave room for a length prefix on framed
// streams -- confirm against the framer implementation.
const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
    const EXTRA_BYTES: usize = 4;
    buffer_size.saturating_add(EXTRA_BYTES)
}
1195
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use saluki_context::{ContextResolverBuilder, TagsResolverBuilder};
    use saluki_io::{
        deser::codec::dogstatsd::{DogstatsdCodec, DogstatsdCodecConfiguration, ParsedPacket},
        net::ConnectionAddress,
    };

    use super::{handle_metric_packet, ContextResolvers};

    /// Builds context resolvers whose context resolver has heap allocations disabled.
    ///
    /// The same context resolver is used for both slots of `ContextResolvers::manual`.
    fn resolvers_without_heap_allocations() -> ContextResolvers {
        let tags_resolver = TagsResolverBuilder::for_tests().build();
        let context_resolver = ContextResolverBuilder::for_tests()
            .with_heap_allocations(false)
            .with_tags_resolver(Some(tags_resolver.clone()))
            .build();

        ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver)
    }

    /// Fixed socket-based peer address shared by the tests.
    fn test_peer_addr() -> ConnectionAddress {
        ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap())
    }

    #[test]
    fn no_metrics_when_interner_full_allocations_disallowed() {
        // With heap allocations disabled, this metric is expected not to resolve to a context,
        // so no metric should come back.
        let codec = DogstatsdCodec::from_configuration(DogstatsdCodecConfiguration::default());
        let mut context_resolvers = resolvers_without_heap_allocations();
        let peer_addr = test_peer_addr();

        let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3";

        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
            panic!("Failed to parse packet.");
        };

        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &[]);
        assert!(maybe_metric.is_none());
    }

    #[test]
    fn metric_with_additional_tags() {
        // Both the tags sent in the packet and the additional tags must end up on the context.
        let codec = DogstatsdCodec::from_configuration(DogstatsdCodecConfiguration::default());
        let mut context_resolvers = resolvers_without_heap_allocations();
        let peer_addr = test_peer_addr();

        let existing_tags = ["tag1:value1", "tag2:value2", "tag3:value3"];
        let input = format!("test_metric_name:1|c|#{}", existing_tags.join(","));
        let additional_tags = [
            "tag4:value4".to_string(),
            "tag5:value5".to_string(),
            "tag6:value6".to_string(),
        ];

        let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
            panic!("Failed to parse packet.");
        };

        let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &additional_tags);
        assert!(maybe_metric.is_some());

        let metric = maybe_metric.unwrap();
        let context = metric.context();

        for tag in existing_tags {
            assert!(context.tags().has_tag(tag));
        }

        for tag in additional_tags {
            assert!(context.tags().has_tag(tag));
        }
    }
}