1use std::sync::{Arc, LazyLock};
2use std::time::Duration;
3
4use async_trait::async_trait;
5use bytes::{Buf, BufMut};
6use bytesize::ByteSize;
7use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
8use metrics::{Counter, Gauge, Histogram};
9use saluki_common::task::spawn_traced_named;
10use saluki_config::GenericConfiguration;
11use saluki_context::tags::{RawTags, RawTagsFilter};
12use saluki_context::TagsResolver;
13use saluki_core::data_model::event::eventd::EventD;
14use saluki_core::data_model::event::metric::{MetricMetadata, MetricOrigin};
15use saluki_core::data_model::event::service_check::ServiceCheck;
16use saluki_core::data_model::event::{metric::Metric, Event, EventType};
17use saluki_core::topology::EventsBuffer;
18use saluki_core::{
19 components::{sources::*, ComponentContext},
20 observability::ComponentMetricsExt as _,
21 pooling::FixedSizeObjectPool,
22 topology::{
23 interconnect::EventBufferManager,
24 shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle},
25 OutputDefinition,
26 },
27};
28use saluki_env::WorkloadProvider;
29use saluki_error::{generic_error, ErrorContext as _, GenericError};
30use saluki_io::deser::codec::dogstatsd::{EventPacket, ServiceCheckPacket};
31use saluki_io::{
32 buf::{BytesBuffer, FixedSizeVec},
33 deser::{
34 codec::{
35 dogstatsd::{parse_message_type, MessageType, MetricPacket, ParseError, ParsedPacket},
36 DogstatsdCodec, DogstatsdCodecConfiguration,
37 },
38 framing::FramerExt as _,
39 },
40 net::{
41 listener::{Listener, ListenerError},
42 ConnectionAddress, ListenAddress, Stream,
43 },
44};
45use saluki_metrics::MetricsBuilder;
46use serde::Deserialize;
47use snafu::{ResultExt as _, Snafu};
48use tokio::{
49 select,
50 time::{interval, MissedTickBehavior},
51};
52use tracing::{debug, error, info, trace, warn};
53
54mod framer;
55use self::framer::{get_framer, DsdFramer};
56use crate::sources::dogstatsd::tags::{WellKnownTags, WellKnownTagsFilterPredicate};
57
58mod filters;
59use self::filters::EnablePayloadsFilter;
60
61mod io_buffer;
62use self::io_buffer::IoBufferManager;
63
64mod origin;
65use self::origin::{
66 origin_from_event_packet, origin_from_metric_packet, origin_from_service_check_packet, DogStatsDOriginTagResolver,
67 OriginEnrichmentConfiguration,
68};
69
70mod resolver;
71use self::resolver::ContextResolvers;
72
73mod tags;
74
75#[derive(Debug, Snafu)]
76#[snafu(context(suffix(false)))]
77enum Error {
78 #[snafu(display("Failed to create {} listener: {}", listener_type, source))]
79 FailedToCreateListener {
80 listener_type: &'static str,
81 source: ListenerError,
82 },
83
84 #[snafu(display("No listeners configured. Please specify a port (`dogstatsd_port`) or a socket path (`dogstatsd_socket` or `dogstatsd_stream_socket`) to enable a listener."))]
85 NoListenersConfigured,
86}
87
88const fn default_buffer_size() -> usize {
89 8192
90}
91
92const fn default_buffer_count() -> usize {
93 128
94}
95
96const fn default_port() -> u16 {
97 8125
98}
99
100const fn default_tcp_port() -> u16 {
101 0
102}
103
104const fn default_allow_context_heap_allocations() -> bool {
105 true
106}
107
108const fn default_no_aggregation_pipeline_support() -> bool {
109 true
110}
111
112const fn default_context_string_interner_size() -> ByteSize {
113 ByteSize::mib(2)
114}
115
116const fn default_cached_contexts_limit() -> usize {
117 500_000
118}
119
120const fn default_cached_tagsets_limit() -> usize {
121 500_000
122}
123
124const fn default_dogstatsd_permissive_decoding() -> bool {
125 true
126}
127
128const fn default_enable_payloads_series() -> bool {
129 true
130}
131
132const fn default_enable_payloads_sketches() -> bool {
133 true
134}
135
136const fn default_enable_payloads_events() -> bool {
137 true
138}
139
140const fn default_enable_payloads_service_checks() -> bool {
141 true
142}
143
144#[derive(Deserialize)]
148pub struct DogStatsDConfiguration {
149 #[serde(rename = "dogstatsd_buffer_size", default = "default_buffer_size")]
155 buffer_size: usize,
156
157 #[serde(rename = "dogstatsd_buffer_count", default = "default_buffer_count")]
165 buffer_count: usize,
166
167 #[serde(rename = "dogstatsd_port", default = "default_port")]
173 port: u16,
174
175 #[serde(rename = "dogstatsd_tcp_port", default = "default_tcp_port")]
181 tcp_port: u16,
182
183 #[serde(rename = "dogstatsd_socket")]
189 socket_path: Option<String>,
190
191 #[serde(rename = "dogstatsd_stream_socket")]
197 socket_stream_path: Option<String>,
198
199 #[serde(rename = "dogstatsd_non_local_traffic", default)]
206 non_local_traffic: bool,
207
208 #[serde(
218 rename = "dogstatsd_allow_context_heap_allocs",
219 default = "default_allow_context_heap_allocations"
220 )]
221 allow_context_heap_allocations: bool,
222
223 #[serde(
231 rename = "dogstatsd_no_aggregation_pipeline",
232 default = "default_no_aggregation_pipeline_support"
233 )]
234 no_aggregation_pipeline_support: bool,
235
236 #[serde(
242 rename = "dogstatsd_string_interner_size",
243 default = "default_context_string_interner_size"
244 )]
245 context_string_interner_bytes: ByteSize,
246
247 #[serde(
255 rename = "dogstatsd_cached_contexts_limit",
256 default = "default_cached_contexts_limit"
257 )]
258 cached_contexts_limit: usize,
259
260 #[serde(rename = "dogstatsd_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
268 cached_tagsets_limit: usize,
269
270 #[serde(
277 rename = "dogstatsd_permissive_decoding",
278 default = "default_dogstatsd_permissive_decoding"
279 )]
280 permissive_decoding: bool,
281
282 #[serde(default = "default_enable_payloads_series")]
286 enable_payloads_series: bool,
287
288 #[serde(default = "default_enable_payloads_sketches")]
292 enable_payloads_sketches: bool,
293
294 #[serde(default = "default_enable_payloads_events")]
298 enable_payloads_events: bool,
299
300 #[serde(default = "default_enable_payloads_service_checks")]
304 enable_payloads_service_checks: bool,
305
306 #[serde(flatten, default)]
308 origin_enrichment: OriginEnrichmentConfiguration,
309
310 #[serde(skip)]
312 workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
313
314 #[serde(rename = "dogstatsd_tags", default)]
316 additional_tags: Vec<String>,
317}
318
319impl DogStatsDConfiguration {
320 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
322 Ok(config.as_typed()?)
323 }
324
325 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
331 where
332 W: WorkloadProvider + Send + Sync + 'static,
333 {
334 self.workload_provider = Some(Arc::new(workload_provider));
335 self
336 }
337
338 async fn build_listeners(&self) -> Result<Vec<Listener>, Error> {
339 let mut listeners = Vec::new();
340
341 if self.port != 0 {
342 let address = if self.non_local_traffic {
343 ListenAddress::Udp(([0, 0, 0, 0], self.port).into())
344 } else {
345 ListenAddress::Udp(([127, 0, 0, 1], self.port).into())
346 };
347
348 let listener = Listener::from_listen_address(address)
349 .await
350 .context(FailedToCreateListener { listener_type: "UDP" })?;
351 listeners.push(listener);
352 }
353
354 if self.tcp_port != 0 {
355 let address = if self.non_local_traffic {
356 ListenAddress::Tcp(([0, 0, 0, 0], self.tcp_port).into())
357 } else {
358 ListenAddress::Tcp(([127, 0, 0, 1], self.tcp_port).into())
359 };
360
361 let listener = Listener::from_listen_address(address)
362 .await
363 .context(FailedToCreateListener { listener_type: "TCP" })?;
364 listeners.push(listener);
365 }
366
367 if let Some(socket_path) = &self.socket_path {
368 let address = ListenAddress::Unixgram(socket_path.into());
369 let listener = Listener::from_listen_address(address)
370 .await
371 .context(FailedToCreateListener {
372 listener_type: "UDS (datagram)",
373 })?;
374 listeners.push(listener);
375 }
376
377 if let Some(socket_stream_path) = &self.socket_stream_path {
378 let address = ListenAddress::Unix(socket_stream_path.into());
379 let listener = Listener::from_listen_address(address)
380 .await
381 .context(FailedToCreateListener {
382 listener_type: "UDS (stream)",
383 })?;
384 listeners.push(listener);
385 }
386
387 Ok(listeners)
388 }
389}
390
391#[async_trait]
392impl SourceBuilder for DogStatsDConfiguration {
393 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
394 let listeners = self.build_listeners().await?;
395 if listeners.is_empty() {
396 return Err(Error::NoListenersConfigured.into());
397 }
398
399 if self.buffer_count < listeners.len() {
402 return Err(generic_error!(
403 "Must have a minimum of {} I/O buffers based on the number of listeners configured.",
404 listeners.len()
405 ));
406 }
407
408 let maybe_origin_tags_resolver = self
409 .workload_provider
410 .clone()
411 .map(|provider| DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider));
412 let context_resolvers = ContextResolvers::new(self, &context, maybe_origin_tags_resolver)
413 .error_context("Failed to create context resolvers.")?;
414
415 let codec_config = DogstatsdCodecConfiguration::default()
416 .with_timestamps(self.no_aggregation_pipeline_support)
417 .with_permissive_mode(self.permissive_decoding);
418
419 let codec = DogstatsdCodec::from_configuration(codec_config);
420
421 let enable_payloads_filter = EnablePayloadsFilter::default()
422 .with_allow_series(self.enable_payloads_series)
423 .with_allow_sketches(self.enable_payloads_sketches)
424 .with_allow_events(self.enable_payloads_events)
425 .with_allow_service_checks(self.enable_payloads_service_checks);
426
427 Ok(Box::new(DogStatsD {
428 listeners,
429 io_buffer_pool: FixedSizeObjectPool::with_builder("dsd_packet_bufs", self.buffer_count, || {
430 FixedSizeVec::with_capacity(get_adjusted_buffer_size(self.buffer_size))
431 }),
432 codec,
433 context_resolvers,
434 enabled_filter: enable_payloads_filter,
435 additional_tags: self.additional_tags.clone().into(),
436 }))
437 }
438
439 fn outputs(&self) -> &[OutputDefinition] {
440 static OUTPUTS: LazyLock<Vec<OutputDefinition>> = LazyLock::new(|| {
441 vec![
442 OutputDefinition::named_output("metrics", EventType::Metric),
443 OutputDefinition::named_output("events", EventType::EventD),
444 OutputDefinition::named_output("service_checks", EventType::ServiceCheck),
445 ]
446 });
447
448 &OUTPUTS
449 }
450}
451
452impl MemoryBounds for DogStatsDConfiguration {
453 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
454 builder
455 .minimum()
456 .with_single_value::<DogStatsD>("source struct")
458 .with_expr(UsageExpr::product(
460 "buffers",
461 UsageExpr::config("dogstatsd_buffer_count", self.buffer_count),
462 UsageExpr::config("dogstatsd_buffer_size", get_adjusted_buffer_size(self.buffer_size)),
463 ))
464 .with_expr(UsageExpr::config(
467 "dogstatsd_string_interner_size",
468 self.context_string_interner_bytes.as_u64() as usize,
469 ));
470 }
471}
472
473pub struct DogStatsD {
475 listeners: Vec<Listener>,
476 io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
477 codec: DogstatsdCodec,
478 context_resolvers: ContextResolvers,
479 enabled_filter: EnablePayloadsFilter,
480 additional_tags: Arc<[String]>,
481}
482
483struct ListenerContext {
484 shutdown_handle: DynamicShutdownHandle,
485 listener: Listener,
486 io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
487 codec: DogstatsdCodec,
488 context_resolvers: ContextResolvers,
489 additional_tags: Arc<[String]>,
490}
491
492struct HandlerContext {
493 listen_addr: ListenAddress,
494 framer: DsdFramer,
495 codec: DogstatsdCodec,
496 io_buffer_pool: FixedSizeObjectPool<BytesBuffer>,
497 metrics: Metrics,
498 context_resolvers: ContextResolvers,
499 additional_tags: Arc<[String]>,
500}
501
502struct Metrics {
503 metrics_received: Counter,
504 events_received: Counter,
505 service_checks_received: Counter,
506 bytes_received: Counter,
507 bytes_received_size: Histogram,
508 framing_errors: Counter,
509 metric_decoder_errors: Counter,
510 event_decoder_errors: Counter,
511 service_check_decoder_errors: Counter,
512 failed_context_resolve_total: Counter,
513 connections_active: Gauge,
514 packet_receive_success: Counter,
515 packet_receive_failure: Counter,
516}
517
518impl Metrics {
519 fn metrics_received(&self) -> &Counter {
520 &self.metrics_received
521 }
522
523 fn events_received(&self) -> &Counter {
524 &self.events_received
525 }
526
527 fn service_checks_received(&self) -> &Counter {
528 &self.service_checks_received
529 }
530
531 fn bytes_received(&self) -> &Counter {
532 &self.bytes_received
533 }
534
535 fn bytes_received_size(&self) -> &Histogram {
536 &self.bytes_received_size
537 }
538
539 fn framing_errors(&self) -> &Counter {
540 &self.framing_errors
541 }
542
543 fn metric_decode_failed(&self) -> &Counter {
544 &self.metric_decoder_errors
545 }
546
547 fn event_decode_failed(&self) -> &Counter {
548 &self.event_decoder_errors
549 }
550
551 fn service_check_decode_failed(&self) -> &Counter {
552 &self.service_check_decoder_errors
553 }
554
555 fn failed_context_resolve_total(&self) -> &Counter {
556 &self.failed_context_resolve_total
557 }
558
559 fn connections_active(&self) -> &Gauge {
560 &self.connections_active
561 }
562
563 fn packet_receive_success(&self) -> &Counter {
564 &self.packet_receive_success
565 }
566
567 fn packet_receive_failure(&self) -> &Counter {
568 &self.packet_receive_failure
569 }
570}
571
572fn build_metrics(listen_addr: &ListenAddress, component_context: &ComponentContext) -> Metrics {
573 let builder = MetricsBuilder::from_component_context(component_context);
574
575 let listener_type = match listen_addr {
576 ListenAddress::Tcp(_) => "tcp",
577 ListenAddress::Udp(_) => "udp",
578 ListenAddress::Unix(_) => "unix",
579 ListenAddress::Unixgram(_) => "unixgram",
580 };
581
582 Metrics {
583 metrics_received: builder.register_counter_with_tags(
584 "component_events_received_total",
585 [("message_type", "metrics"), ("listener_type", listener_type)],
586 ),
587 events_received: builder.register_counter_with_tags(
588 "component_events_received_total",
589 [("message_type", "events"), ("listener_type", listener_type)],
590 ),
591 service_checks_received: builder.register_counter_with_tags(
592 "component_events_received_total",
593 [("message_type", "service_checks"), ("listener_type", listener_type)],
594 ),
595 bytes_received: builder
596 .register_counter_with_tags("component_bytes_received_total", [("listener_type", listener_type)]),
597 bytes_received_size: builder
598 .register_trace_histogram_with_tags("component_bytes_received_size", [("listener_type", listener_type)]),
599 framing_errors: builder.register_counter_with_tags(
600 "component_errors_total",
601 [("listener_type", listener_type), ("error_type", "framing")],
602 ),
603 metric_decoder_errors: builder.register_counter_with_tags(
604 "component_errors_total",
605 [
606 ("listener_type", listener_type),
607 ("error_type", "decode"),
608 ("message_type", "metrics"),
609 ],
610 ),
611 event_decoder_errors: builder.register_counter_with_tags(
612 "component_errors_total",
613 [
614 ("listener_type", listener_type),
615 ("error_type", "decode"),
616 ("message_type", "events"),
617 ],
618 ),
619 service_check_decoder_errors: builder.register_counter_with_tags(
620 "component_errors_total",
621 [
622 ("listener_type", listener_type),
623 ("error_type", "decode"),
624 ("message_type", "service_checks"),
625 ],
626 ),
627 connections_active: builder
628 .register_gauge_with_tags("component_connections_active", [("listener_type", listener_type)]),
629 packet_receive_success: builder.register_debug_counter_with_tags(
630 "component_packets_received_total",
631 [("listener_type", listener_type), ("state", "ok")],
632 ),
633 packet_receive_failure: builder.register_debug_counter_with_tags(
634 "component_packets_received_total",
635 [("listener_type", listener_type), ("state", "error")],
636 ),
637 failed_context_resolve_total: builder.register_debug_counter("component_failed_context_resolve_total"),
638 }
639}
640
641#[async_trait]
642impl Source for DogStatsD {
643 async fn run(mut self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
644 let mut global_shutdown = context.take_shutdown_handle();
645 let mut health = context.take_health_handle();
646
647 let mut listener_shutdown_coordinator = DynamicShutdownCoordinator::default();
648
649 for listener in self.listeners {
651 let task_name = format!("dogstatsd-listener-{}", listener.listen_address().listener_type());
652
653 let listener_context = ListenerContext {
660 shutdown_handle: listener_shutdown_coordinator.register(),
661 listener,
662 io_buffer_pool: self.io_buffer_pool.clone(),
663 codec: self.codec.clone(),
664 context_resolvers: self.context_resolvers.clone(),
665 additional_tags: self.additional_tags.clone(),
666 };
667
668 spawn_traced_named(
669 task_name,
670 process_listener(context.clone(), listener_context, self.enabled_filter),
671 );
672 }
673
674 health.mark_ready();
675 debug!("DogStatsD source started.");
676
677 loop {
682 select! {
683 _ = &mut global_shutdown => {
684 debug!("Received shutdown signal.");
685 break
686 },
687 _ = health.live() => continue,
688 }
689 }
690
691 debug!("Stopping DogStatsD source...");
692
693 listener_shutdown_coordinator.shutdown().await;
694
695 debug!("DogStatsD source stopped.");
696
697 Ok(())
698 }
699}
700
701async fn process_listener(
702 source_context: SourceContext, listener_context: ListenerContext, enabled_filter: EnablePayloadsFilter,
703) {
704 let ListenerContext {
705 shutdown_handle,
706 mut listener,
707 io_buffer_pool,
708 codec,
709 context_resolvers,
710 additional_tags,
711 } = listener_context;
712 tokio::pin!(shutdown_handle);
713
714 let listen_addr = listener.listen_address().clone();
715 let mut stream_shutdown_coordinator = DynamicShutdownCoordinator::default();
716
717 info!(%listen_addr, "DogStatsD listener started.");
718
719 loop {
720 select! {
721 _ = &mut shutdown_handle => {
722 debug!(%listen_addr, "Received shutdown signal. Waiting for existing stream handlers to finish...");
723 break;
724 }
725 result = listener.accept() => match result {
726 Ok(stream) => {
727 debug!(%listen_addr, "Spawning new stream handler.");
728
729 let handler_context = HandlerContext {
730 listen_addr: listen_addr.clone(),
731 framer: get_framer(&listen_addr),
732 codec: codec.clone(),
733 io_buffer_pool: io_buffer_pool.clone(),
734 metrics: build_metrics(&listen_addr, source_context.component_context()),
735 context_resolvers: context_resolvers.clone(),
736 additional_tags: additional_tags.clone(),
737 };
738
739 let task_name = format!("dogstatsd-stream-handler-{}", listen_addr.listener_type());
740 spawn_traced_named(task_name, process_stream(stream, source_context.clone(), handler_context, stream_shutdown_coordinator.register(), enabled_filter));
741 }
742 Err(e) => {
743 error!(%listen_addr, error = %e, "Failed to accept connection. Stopping listener.");
744 break
745 }
746 }
747 }
748 }
749
750 stream_shutdown_coordinator.shutdown().await;
751
752 info!(%listen_addr, "DogStatsD listener stopped.");
753}
754
755async fn process_stream(
756 stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
757 shutdown_handle: DynamicShutdownHandle, enabled_filter: EnablePayloadsFilter,
758) {
759 tokio::pin!(shutdown_handle);
760
761 select! {
762 _ = &mut shutdown_handle => {
763 debug!("Stream handler received shutdown signal.");
764 },
765 _ = drive_stream(stream, source_context, handler_context, enabled_filter) => {},
766 }
767}
768
769async fn drive_stream(
770 mut stream: Stream, source_context: SourceContext, handler_context: HandlerContext,
771 enabled_filter: EnablePayloadsFilter,
772) {
773 let HandlerContext {
774 listen_addr,
775 mut framer,
776 codec,
777 io_buffer_pool,
778 metrics,
779 mut context_resolvers,
780 additional_tags,
781 } = handler_context;
782
783 debug!(%listen_addr, "Stream handler started.");
784
785 if !stream.is_connectionless() {
786 metrics.connections_active().increment(1);
787 }
788
789 let mut buffer_flush = interval(Duration::from_millis(100));
792 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
793
794 let mut event_buffer_manager = EventBufferManager::default();
795 let mut io_buffer_manager = IoBufferManager::new(&io_buffer_pool, &stream);
796 let memory_limiter = source_context.topology_context().memory_limiter();
797
798 'read: loop {
799 let mut eof = false;
800
801 let mut io_buffer = io_buffer_manager.get_buffer_mut().await;
802
803 memory_limiter.wait_for_capacity().await;
804
805 select! {
806 read_result = stream.receive(&mut io_buffer) => match read_result {
808 Ok((bytes_read, peer_addr)) => {
809 if bytes_read == 0 {
810 eof = true;
811 }
812
813 metrics.packet_receive_success().increment(1);
823 metrics.bytes_received().increment(bytes_read as u64);
824 metrics.bytes_received_size().record(bytes_read as f64);
825
826 let reached_eof = eof || stream.is_connectionless();
832
833 trace!(
834 buffer_len = io_buffer.remaining(),
835 buffer_cap = io_buffer.remaining_mut(),
836 eof = reached_eof,
837 %listen_addr,
838 %peer_addr,
839 "Received {} bytes from stream.",
840 bytes_read
841 );
842
843 let mut frames = io_buffer.framed(&mut framer, reached_eof);
844 'frame: loop {
845 match frames.next() {
846 Some(Ok(frame)) => {
847 trace!(%listen_addr, %peer_addr, ?frame, "Decoded frame.");
848 match handle_frame(&frame[..], &codec, &mut context_resolvers, &metrics, &peer_addr, enabled_filter, &additional_tags) {
849 Ok(Some(event)) => {
850 if let Some(event_buffer) = event_buffer_manager.try_push(event) {
851 debug!(%listen_addr, %peer_addr, "Event buffer is full. Forwarding events.");
852 dispatch_events(event_buffer, &source_context, &listen_addr).await;
853 }
854 },
855 Ok(None) => {
856 continue
861 },
862 Err(e) => {
863 let frame_lossy_str = String::from_utf8_lossy(&frame);
864 warn!(%listen_addr, %peer_addr, frame = %frame_lossy_str, error = %e, "Failed to parse frame.");
865 },
866 }
867 }
868 Some(Err(e)) => {
869 metrics.framing_errors().increment(1);
870
871 if stream.is_connectionless() {
872 debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Continuing stream.");
875 continue 'read;
876 } else {
877 debug!(%listen_addr, %peer_addr, error = %e, "Error decoding frame. Stopping stream.");
878 break 'read;
879 }
880 }
881 None => {
882 trace!(%listen_addr, %peer_addr, "Not enough data to decode another frame.");
883 if eof && !stream.is_connectionless() {
884 debug!(%listen_addr, %peer_addr, "Stream received EOF. Shutting down handler.");
885 break 'read;
886 } else {
887 break 'frame;
888 }
889 }
890 }
891 }
892 },
893 Err(e) => {
894 metrics.packet_receive_failure().increment(1);
895
896 if stream.is_connectionless() {
897 warn!(%listen_addr, error = %e, "I/O error while decoding. Continuing stream.");
900 continue 'read;
901 } else {
902 warn!(%listen_addr, error = %e, "I/O error while decoding. Stopping stream.");
903 break 'read;
904 }
905 }
906 },
907
908 _ = buffer_flush.tick() => {
909 if let Some(event_buffer) = event_buffer_manager.consume() {
910 dispatch_events(event_buffer, &source_context, &listen_addr).await;
911 }
912 },
913 }
914 }
915
916 if let Some(event_buffer) = event_buffer_manager.consume() {
917 dispatch_events(event_buffer, &source_context, &listen_addr).await;
918 }
919
920 metrics.connections_active().decrement(1);
921
922 debug!(%listen_addr, "Stream handler stopped.");
923}
924
925fn handle_frame(
926 frame: &[u8], codec: &DogstatsdCodec, context_resolvers: &mut ContextResolvers, source_metrics: &Metrics,
927 peer_addr: &ConnectionAddress, enabled_filter: EnablePayloadsFilter, additional_tags: &[String],
928) -> Result<Option<Event>, ParseError> {
929 let parsed = match codec.decode_packet(frame) {
930 Ok(parsed) => parsed,
931 Err(e) => {
932 match parse_message_type(frame) {
934 MessageType::MetricSample => source_metrics.metric_decode_failed().increment(1),
935 MessageType::Event => source_metrics.event_decode_failed().increment(1),
936 MessageType::ServiceCheck => source_metrics.service_check_decode_failed().increment(1),
937 }
938
939 return Err(e);
940 }
941 };
942
943 let event = match parsed {
944 ParsedPacket::Metric(metric_packet) => {
945 let events_len = metric_packet.num_points;
946 if !enabled_filter.allow_metric(&metric_packet) {
947 trace!(
948 metric.name = metric_packet.metric_name,
949 "Skipping metric due to filter configuration."
950 );
951 return Ok(None);
952 }
953
954 match handle_metric_packet(metric_packet, context_resolvers, peer_addr, additional_tags) {
955 Some(metric) => {
956 source_metrics.metrics_received().increment(events_len);
957 Event::Metric(metric)
958 }
959 None => {
960 source_metrics.failed_context_resolve_total().increment(1);
962 return Ok(None);
963 }
964 }
965 }
966 ParsedPacket::Event(event) => {
967 if !enabled_filter.allow_event(&event) {
968 trace!("Skipping event {} due to filter configuration.", event.title);
969 return Ok(None);
970 }
971 let tags_resolver = context_resolvers.tags();
972 match handle_event_packet(event, tags_resolver, peer_addr, additional_tags) {
973 Some(event) => {
974 source_metrics.events_received().increment(1);
975 Event::EventD(event)
976 }
977 None => {
978 source_metrics.failed_context_resolve_total().increment(1);
979 return Ok(None);
980 }
981 }
982 }
983 ParsedPacket::ServiceCheck(service_check) => {
984 if !enabled_filter.allow_service_check(&service_check) {
985 trace!(
986 "Skipping service check {} due to filter configuration.",
987 service_check.name
988 );
989 return Ok(None);
990 }
991 let tags_resolver = context_resolvers.tags();
992 match handle_service_check_packet(service_check, tags_resolver, peer_addr, additional_tags) {
993 Some(service_check) => {
994 source_metrics.service_checks_received().increment(1);
995 Event::ServiceCheck(service_check)
996 }
997 None => {
998 source_metrics.failed_context_resolve_total().increment(1);
999 return Ok(None);
1000 }
1001 }
1002 }
1003 };
1004
1005 Ok(Some(event))
1006}
1007
1008fn handle_metric_packet(
1009 packet: MetricPacket, context_resolvers: &mut ContextResolvers, peer_addr: &ConnectionAddress,
1010 additional_tags: &[String],
1011) -> Option<Metric> {
1012 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1013
1014 let mut origin = origin_from_metric_packet(&packet, &well_known_tags);
1015 if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1016 origin.set_process_id(creds.pid as u32);
1017 }
1018
1019 let context_resolver = if packet.timestamp.is_some() {
1021 context_resolvers.no_agg()
1022 } else {
1023 context_resolvers.primary()
1024 };
1025
1026 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1027
1028 match context_resolver.resolve(packet.metric_name, tags, Some(origin)) {
1030 Some(context) => {
1031 let metric_origin = well_known_tags
1032 .jmx_check_name
1033 .map(MetricOrigin::jmx_check)
1034 .unwrap_or_else(MetricOrigin::dogstatsd);
1035 let metadata = MetricMetadata::default()
1036 .with_origin(metric_origin)
1037 .with_hostname(well_known_tags.hostname.map(Arc::from));
1038
1039 Some(Metric::from_parts(context, packet.values, metadata))
1040 }
1041 None => None,
1043 }
1044}
1045
1046fn handle_event_packet(
1047 packet: EventPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress, additional_tags: &[String],
1048) -> Option<EventD> {
1049 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1050
1051 let mut origin = origin_from_event_packet(&packet, &well_known_tags);
1052 if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1053 origin.set_process_id(creds.pid as u32);
1054 }
1055 let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1056
1057 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1058 let tags = tags_resolver.create_tag_set(tags)?;
1059
1060 let eventd = EventD::new(packet.title, packet.text)
1061 .with_timestamp(packet.timestamp)
1062 .with_hostname(packet.hostname.map(|s| s.into()))
1063 .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
1064 .with_alert_type(packet.alert_type)
1065 .with_priority(packet.priority)
1066 .with_source_type_name(packet.source_type_name.map(|s| s.into()))
1067 .with_alert_type(packet.alert_type)
1068 .with_tags(tags)
1069 .with_origin_tags(origin_tags);
1070
1071 Some(eventd)
1072}
1073
1074fn handle_service_check_packet(
1075 packet: ServiceCheckPacket, tags_resolver: &mut TagsResolver, peer_addr: &ConnectionAddress,
1076 additional_tags: &[String],
1077) -> Option<ServiceCheck> {
1078 let well_known_tags = WellKnownTags::from_raw_tags(packet.tags.clone());
1079
1080 let mut origin = origin_from_service_check_packet(&packet, &well_known_tags);
1081 if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
1082 origin.set_process_id(creds.pid as u32);
1083 }
1084 let origin_tags = tags_resolver.resolve_origin_tags(Some(origin));
1085
1086 let tags = get_filtered_tags_iterator(packet.tags, additional_tags);
1087 let tags = tags_resolver.create_tag_set(tags)?;
1088
1089 let service_check = ServiceCheck::new(packet.name, packet.status)
1090 .with_timestamp(packet.timestamp)
1091 .with_hostname(packet.hostname.map(|s| s.into()))
1092 .with_tags(tags)
1093 .with_origin_tags(origin_tags)
1094 .with_message(packet.message.map(|s| s.into()));
1095
1096 Some(service_check)
1097}
1098
1099fn get_filtered_tags_iterator<'a>(
1100 raw_tags: RawTags<'a>, additional_tags: &'a [String],
1101) -> impl Iterator<Item = &'a str> + Clone {
1102 RawTagsFilter::exclude(raw_tags, WellKnownTagsFilterPredicate).chain(additional_tags.iter().map(|s| s.as_str()))
1105}
1106
1107async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &SourceContext, listen_addr: &ListenAddress) {
1108 debug!(%listen_addr, events_len = event_buffer.len(), "Forwarding events.");
1109
1110 if event_buffer.has_event_type(EventType::EventD) {
1120 let eventd_events = event_buffer.extract(Event::is_eventd);
1121 if let Err(e) = source_context
1122 .dispatcher()
1123 .buffered_named("events")
1124 .expect("events output should always exist")
1125 .send_all(eventd_events)
1126 .await
1127 {
1128 error!(%listen_addr, error = %e, "Failed to dispatch eventd events.");
1129 }
1130 }
1131
1132 if event_buffer.has_event_type(EventType::ServiceCheck) {
1134 let service_check_events = event_buffer.extract(Event::is_service_check);
1135 if let Err(e) = source_context
1136 .dispatcher()
1137 .buffered_named("service_checks")
1138 .expect("service checks output should always exist")
1139 .send_all(service_check_events)
1140 .await
1141 {
1142 error!(%listen_addr, error = %e, "Failed to dispatch service check events.");
1143 }
1144 }
1145
1146 if !event_buffer.is_empty() {
1148 if let Err(e) = source_context
1149 .dispatcher()
1150 .dispatch_named("metrics", event_buffer)
1151 .await
1152 {
1153 error!(%listen_addr, error = %e, "Failed to dispatch metric events.");
1154 }
1155 }
1156}
1157
1158const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
1159 buffer_size + 4
1174}
1175
1176#[cfg(test)]
1177mod tests {
1178 use std::net::SocketAddr;
1179
1180 use saluki_context::{ContextResolverBuilder, TagsResolverBuilder};
1181 use saluki_io::{
1182 deser::codec::{dogstatsd::ParsedPacket, DogstatsdCodec, DogstatsdCodecConfiguration},
1183 net::ConnectionAddress,
1184 };
1185
1186 use super::{handle_metric_packet, ContextResolvers};
1187
1188 #[test]
1189 fn no_metrics_when_interner_full_allocations_disallowed() {
1190 let codec = DogstatsdCodec::from_configuration(DogstatsdCodecConfiguration::default());
1198 let tags_resolver = TagsResolverBuilder::for_tests().build();
1199 let context_resolver = ContextResolverBuilder::for_tests()
1200 .with_heap_allocations(false)
1201 .with_tags_resolver(Some(tags_resolver.clone()))
1202 .build();
1203 let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
1204 let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());
1205
1206 let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3";
1207
1208 let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
1209 panic!("Failed to parse packet.");
1210 };
1211
1212 let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &[]);
1213 assert!(maybe_metric.is_none());
1214 }
1215
1216 #[test]
1217 fn metric_with_additional_tags() {
1218 let codec = DogstatsdCodec::from_configuration(DogstatsdCodecConfiguration::default());
1219 let tags_resolver = TagsResolverBuilder::for_tests().build();
1220 let context_resolver = ContextResolverBuilder::for_tests()
1221 .with_heap_allocations(false)
1222 .with_tags_resolver(Some(tags_resolver.clone()))
1223 .build();
1224 let mut context_resolvers = ContextResolvers::manual(context_resolver.clone(), context_resolver, tags_resolver);
1225 let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());
1226
1227 let existing_tags = ["tag1:value1", "tag2:value2", "tag3:value3"];
1228 let existing_tags_str = existing_tags.join(",");
1229
1230 let input = format!("test_metric_name:1|c|#{}", existing_tags_str);
1231 let additional_tags = [
1232 "tag4:value4".to_string(),
1233 "tag5:value5".to_string(),
1234 "tag6:value6".to_string(),
1235 ];
1236
1237 let Ok(ParsedPacket::Metric(packet)) = codec.decode_packet(input.as_bytes()) else {
1238 panic!("Failed to parse packet.");
1239 };
1240 let maybe_metric = handle_metric_packet(packet, &mut context_resolvers, &peer_addr, &additional_tags);
1241 assert!(maybe_metric.is_some());
1242
1243 let metric = maybe_metric.unwrap();
1244 let context = metric.context();
1245
1246 for tag in existing_tags {
1247 assert!(context.tags().has_tag(tag));
1248 }
1249
1250 for tag in additional_tags {
1251 assert!(context.tags().has_tag(tag));
1252 }
1253 }
1254}