Skip to main content

saluki_components/destinations/dsd_debug_log/
mod.rs

1use std::{
2    io::Write,
3    path::{Path, PathBuf},
4};
5
6use async_trait::async_trait;
7use bytesize::ByteSize;
8use chrono::{DateTime, Utc};
9use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
10use saluki_common::collections::FastHashMap;
11use saluki_config::GenericConfiguration;
12use saluki_context::tags::TagSet;
13use saluki_core::{
14    components::{
15        destinations::{Destination, DestinationBuilder, DestinationContext},
16        ComponentContext,
17    },
18    data_model::event::{metric::Metric, Event, EventType},
19};
20use saluki_error::{generic_error, GenericError};
21use serde::Deserialize;
22use stringtheory::MetaString;
23use tokio::select;
24use tracing::{debug, warn};
25use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
26use tracing_rolling_file::{RollingConditionBase, RollingFileAppenderBase};
27
28const DEFAULT_DOGSTATSD_LOG_FILE_MAX_SIZE: ByteSize = ByteSize::mb(10);
29const DEFAULT_DOGSTATSD_LOG_FILE_MAX_ROLLS: usize = 3;
30const DEBUG_LOG_WRITER_BUFFER_LINES: usize = 4096;
31const DOGSTATSD_METRICS_STATS_ENABLE_KEY: &str = "dogstatsd_metrics_stats_enable";
32
/// Serde default helper for boolean settings that are on unless explicitly disabled.
const fn default_true() -> bool {
    true
}
36
37const fn default_log_file_max_size() -> ByteSize {
38    DEFAULT_DOGSTATSD_LOG_FILE_MAX_SIZE
39}
40
41const fn default_log_file_max_rolls() -> usize {
42    DEFAULT_DOGSTATSD_LOG_FILE_MAX_ROLLS
43}
44
45/// Configuration for the DogStatsD debug log destination.
46#[derive(Clone, Debug, Deserialize)]
47#[cfg_attr(test, derive(serde::Serialize))]
48pub struct DogStatsDDebugLogConfiguration {
49    /// Whether DogStatsD metric-level statistics are enabled.
50    ///
51    /// The debug log destination is always present when `dogstatsd_logging_enabled` is `true`,
52    /// but it drops metrics until this runtime flag becomes `true`, and only during that period.
53    ///
54    /// Defaults to `false`.
55    #[serde(rename = "dogstatsd_metrics_stats_enable", default)]
56    metrics_stats_enabled: bool,
57
58    #[serde(skip)]
59    configuration: Option<GenericConfiguration>,
60
61    /// Whether DogStatsD metric-level statistics should also be written to a log file.
62    ///
63    /// This controls whether the destination is added to the topology.
64    /// Defaults to `true`.
65    #[serde(rename = "dogstatsd_logging_enabled", default = "default_true")]
66    logging_enabled: bool,
67
68    /// Path to the DogStatsD debug log file.
69    ///
70    /// This defaults to the platform-specific core Agent DogStatsD stats log path when the configured value is empty.
71    #[serde(rename = "dogstatsd_log_file", default)]
72    log_file: PathBuf,
73
74    /// Maximum size of the active debug log file before rotation.
75    ///
76    /// Defaults to `10Mb`.
77    #[serde(rename = "dogstatsd_log_file_max_size", default = "default_log_file_max_size")]
78    log_file_max_size: ByteSize,
79
80    /// Number of rotated debug log files to keep.
81    ///
82    /// Defaults to `3`.
83    #[serde(rename = "dogstatsd_log_file_max_rolls", default = "default_log_file_max_rolls")]
84    log_file_max_rolls: usize,
85}
86
#[cfg(test)]
impl PartialEq for DogStatsDDebugLogConfiguration {
    /// Compares the deserialized settings field by field.
    ///
    /// The `configuration` field takes no part in the comparison.
    fn eq(&self, other: &Self) -> bool {
        let lhs = (
            self.metrics_stats_enabled,
            self.logging_enabled,
            &self.log_file,
            self.log_file_max_size,
            self.log_file_max_rolls,
        );
        let rhs = (
            other.metrics_stats_enabled,
            other.logging_enabled,
            &other.log_file,
            other.log_file_max_size,
            other.log_file_max_rolls,
        );

        lhs == rhs
    }
}
97
98/// DogStatsD destination that writes metric debug lines to a rotating file.
99struct DogStatsDDebugLog {
100    log_file: PathBuf,
101    log_file_max_size: ByteSize,
102    log_file_max_rolls: usize,
103    writer: Option<DebugLogWriter>,
104    metrics_stats_enabled: bool,
105    configuration: GenericConfiguration,
106    stats: FastHashMap<ContextNoOrigin, MetricSample>,
107}
108
109struct DebugLogWriter {
110    writer: NonBlocking,
111    _guard: WorkerGuard,
112}
113
/// Running statistics tracked for a single metric name/tags combination.
#[derive(Debug, Default)]
struct MetricSample {
    /// Number of samples seen so far.
    count: u64,
    /// Unix timestamp, in seconds, of the most recent sample.
    last_seen: u64,
}
119
120#[derive(Eq, Hash, PartialEq)]
121struct ContextNoOrigin {
122    name: MetaString,
123    tags: TagSet,
124}
125
126impl DogStatsDDebugLogConfiguration {
127    /// Creates a new `DogStatsDDebugLogConfiguration` from the given configuration.
128    ///
129    /// If `dogstatsd_log_file` is empty, `default_log_file_path` is used.
130    pub fn from_configuration(
131        config: &GenericConfiguration, default_log_file_path: PathBuf,
132    ) -> Result<Self, GenericError> {
133        let mut cfg: Self = config.as_typed()?;
134
135        if cfg.log_file.as_os_str().is_empty() {
136            cfg.log_file = default_log_file_path;
137        }
138
139        if cfg.log_file.to_str().is_none() {
140            return Err(generic_error!(
141                "dogstatsd_log_file must be valid UTF-8, got '{}'",
142                cfg.log_file.display()
143            ));
144        }
145
146        cfg.configuration = Some(config.clone());
147
148        Ok(cfg)
149    }
150
151    /// Returns `true` if the debug log destination should be added to the topology.
152    pub const fn enabled(&self) -> bool {
153        self.logging_enabled
154    }
155
156    /// Returns the DogStatsD debug log file path.
157    pub fn log_file(&self) -> &Path {
158        &self.log_file
159    }
160
161    /// Returns the maximum size of the active debug log file before rotation.
162    pub const fn log_file_max_size(&self) -> ByteSize {
163        self.log_file_max_size
164    }
165
166    /// Returns the number of rotated debug log files to keep.
167    pub const fn log_file_max_rolls(&self) -> usize {
168        self.log_file_max_rolls
169    }
170}
171
172impl DogStatsDDebugLog {
173    fn from_configuration(config: &DogStatsDDebugLogConfiguration) -> Result<Self, GenericError> {
174        let mut destination = Self {
175            log_file: config.log_file.clone(),
176            log_file_max_size: config.log_file_max_size,
177            log_file_max_rolls: config.log_file_max_rolls,
178            writer: None,
179            metrics_stats_enabled: config.metrics_stats_enabled,
180            configuration: config
181                .configuration
182                .clone()
183                .expect("configuration must be set via from_configuration"),
184            stats: FastHashMap::default(),
185        };
186
187        if destination.metrics_stats_enabled {
188            destination.ensure_writer()?;
189        }
190
191        Ok(destination)
192    }
193
194    fn process_metric(&mut self, metric: &Metric) -> Result<(), GenericError> {
195        if !self.metrics_stats_enabled {
196            return Ok(());
197        }
198
199        self.write_metric(metric)
200    }
201
202    fn write_metric(&mut self, metric: &Metric) -> Result<(), GenericError> {
203        self.ensure_writer()?;
204
205        let context = metric.context();
206        let metric_context = ContextNoOrigin {
207            name: context.name().clone(),
208            tags: context.tags().clone(),
209        };
210
211        let timestamp = saluki_common::time::get_coarse_unix_timestamp();
212        let sample = self.stats.entry(metric_context).or_default();
213        sample.count += 1;
214        sample.last_seen = timestamp;
215
216        let writer = self.writer.as_mut().expect("writer should be initialized");
217        writeln!(
218            writer.writer,
219            "Metric Name: {} | Tags: {{{}}} | Count: {} | Last Seen: {}",
220            context.name(),
221            format_tags(context.tags()),
222            sample.count,
223            format_timestamp(sample.last_seen)
224        )
225        .map_err(|e| {
226            generic_error!(
227                "Failed to write to DogStatsD debug log file '{}': {}",
228                self.log_file.display(),
229                e
230            )
231        })
232    }
233
234    fn ensure_writer(&mut self) -> Result<(), GenericError> {
235        if self.writer.is_some() {
236            return Ok(());
237        }
238
239        let appender = RollingFileAppenderBase::new(
240            &self.log_file,
241            RollingConditionBase::new().max_size(self.log_file_max_size.as_u64()),
242            self.log_file_max_rolls,
243        )
244        .map_err(|e| generic_error!("Failed to open dogstatsd_log_file '{}': {}", self.log_file.display(), e))?;
245
246        let (writer, guard) = NonBlockingBuilder::default()
247            .thread_name("dsd-dbg-writer")
248            .buffered_lines_limit(DEBUG_LOG_WRITER_BUFFER_LINES)
249            // Drop debug log lines rather than slow DogStatsD metric ingestion.
250            .lossy(true)
251            .finish(appender);
252
253        self.writer = Some(DebugLogWriter { writer, _guard: guard });
254
255        Ok(())
256    }
257}
258
259#[async_trait]
260impl Destination for DogStatsDDebugLog {
261    async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
262        let mut health = context.take_health_handle();
263        health.mark_ready();
264
265        let mut metrics_stats_enabled_watcher =
266            self.configuration.watch_for_updates(DOGSTATSD_METRICS_STATS_ENABLE_KEY);
267
268        loop {
269            select! {
270                _ = health.live() => continue,
271                maybe_events = context.events().next() => match maybe_events {
272                    Some(events) => {
273                        for event in events {
274                            if let Event::Metric(metric) = event {
275                                if let Err(error) = self.process_metric(&metric) {
276                                    warn!(error = %error, "Failed to write DogStatsD debug log line; continuing.");
277                                }
278                            }
279                        }
280                    },
281                    None => break,
282                },
283                (_, maybe_metrics_stats_enabled) = metrics_stats_enabled_watcher.changed::<bool>() => {
284                    if let Some(metrics_stats_enabled) = maybe_metrics_stats_enabled {
285                        self.metrics_stats_enabled = metrics_stats_enabled;
286                        debug!(metrics_stats_enabled, "Updated DogStatsD metrics stats debug logging gate.");
287                    }
288                },
289            }
290        }
291
292        Ok(())
293    }
294}
295
296#[async_trait]
297impl DestinationBuilder for DogStatsDDebugLogConfiguration {
298    fn input_event_type(&self) -> EventType {
299        EventType::Metric
300    }
301
302    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
303        DogStatsDDebugLog::from_configuration(self)
304            .map(|destination| Box::new(destination) as Box<dyn Destination + Send>)
305    }
306}
307
308impl MemoryBounds for DogStatsDDebugLogConfiguration {
309    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
310        builder
311            .minimum()
312            .with_single_value::<DogStatsDDebugLog>("component struct");
313    }
314}
315
316fn format_tags(tags: &TagSet) -> String {
317    let mut formatted = String::new();
318
319    for tag in tags {
320        if !formatted.is_empty() {
321            formatted.push(' ');
322        }
323        formatted.push_str(tag.as_str());
324    }
325
326    formatted
327}
328
329fn format_timestamp(timestamp: u64) -> String {
330    i64::try_from(timestamp)
331        .ok()
332        .and_then(|ts| DateTime::<Utc>::from_timestamp(ts, 0))
333        .map(|dt| dt.format("%Y-%m-%d %H:%M:%S +0000 UTC").to_string())
334        .unwrap_or_else(|| timestamp.to_string())
335}
336
#[cfg(test)]
mod tests {
    use std::{
        fs,
        path::{Path, PathBuf},
    };

    use bytesize::ByteSize;
    use saluki_context::Context;
    use saluki_core::data_model::event::metric::Metric;
    use serde_json::json;
    use tempfile::tempdir;

    use super::{DogStatsDDebugLog, DogStatsDDebugLogConfiguration};
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    /// Fallback log path handed to `from_configuration` throughout these tests.
    fn test_default_log_file_path() -> PathBuf {
        PathBuf::from("/tmp/default-dogstatsd-stats.log")
    }

    /// Loads a `DogStatsDDebugLogConfiguration` from a raw JSON document.
    async fn deser_config(raw_json: &str) -> DogStatsDDebugLogConfiguration {
        let parsed = serde_json::from_str(raw_json).expect("test config should be valid JSON");
        let (generic, _) = saluki_config::ConfigurationLoader::for_tests(Some(parsed), None, false).await;

        DogStatsDDebugLogConfiguration::from_configuration(&generic, test_default_log_file_path())
            .expect("DogStatsDDebugLogConfiguration should deserialize")
    }

    #[tokio::test]
    async fn defaults_match_core_agent() {
        let defaults = deser_config("{}").await;

        assert!(defaults.enabled());
        assert!(!defaults.metrics_stats_enabled);
        assert!(defaults.logging_enabled);
        assert_eq!(defaults.log_file(), test_default_log_file_path());
        assert_eq!(defaults.log_file_max_size(), ByteSize::mb(10));
        assert_eq!(defaults.log_file_max_rolls(), 3);
    }

    #[tokio::test]
    async fn logging_enabled_controls_topology_wiring() {
        // (raw config, expected `enabled()`, expected `metrics_stats_enabled`)
        let cases = [
            (r#"{ "dogstatsd_metrics_stats_enable": true }"#, true, true),
            (
                r#"{
                    "dogstatsd_metrics_stats_enable": true,
                    "dogstatsd_logging_enabled": true
                }"#,
                true,
                true,
            ),
            (
                r#"{
                    "dogstatsd_metrics_stats_enable": true,
                    "dogstatsd_logging_enabled": false
                }"#,
                false,
                true,
            ),
            (
                r#"{
                    "dogstatsd_metrics_stats_enable": false,
                    "dogstatsd_logging_enabled": true
                }"#,
                true,
                false,
            ),
        ];

        for (raw, expect_enabled, expect_stats_enabled) in cases {
            let config = deser_config(raw).await;

            assert_eq!(config.enabled(), expect_enabled);
            assert_eq!(config.metrics_stats_enabled, expect_stats_enabled);
        }
    }

    #[tokio::test]
    async fn explicit_log_file_path_is_preserved() {
        let config = deser_config(r#"{ "dogstatsd_log_file": "/tmp/dsd-debug.log" }"#).await;

        assert_eq!(config.log_file(), Path::new("/tmp/dsd-debug.log"));
    }

    #[tokio::test]
    async fn negative_log_file_max_rolls_is_rejected() {
        let raw = json!({ "dogstatsd_log_file_max_rolls": -1 });
        let (generic, _) = saluki_config::ConfigurationLoader::for_tests(Some(raw), None, false).await;

        let outcome = DogStatsDDebugLogConfiguration::from_configuration(&generic, test_default_log_file_path());
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DOGSTATSD_DEBUG_LOG_CONFIGURATION, &[], json!({}), |cfg| {
            DogStatsDDebugLogConfiguration::from_configuration(&cfg, test_default_log_file_path())
                .expect("DogStatsDDebugLogConfiguration should deserialize")
        })
        .await
    }

    /// Builds a configuration with both gates enabled and the given file settings.
    async fn test_config(log_file: PathBuf, max_size: ByteSize, max_rolls: usize) -> DogStatsDDebugLogConfiguration {
        let raw = json!({
            "dogstatsd_metrics_stats_enable": true,
            "dogstatsd_logging_enabled": true,
            "dogstatsd_log_file": log_file.display().to_string(),
            "dogstatsd_log_file_max_size": max_size.as_u64(),
            "dogstatsd_log_file_max_rolls": max_rolls,
        });
        let (generic, _) = saluki_config::ConfigurationLoader::for_tests(Some(raw), None, false).await;

        DogStatsDDebugLogConfiguration::from_configuration(&generic, test_default_log_file_path())
            .expect("DogStatsDDebugLogConfiguration should deserialize")
    }

    /// Concatenates the contents of every existing log file, highest roll index first and the
    /// active file last.
    fn read_log_files(log_file: &Path, max_rolls: usize) -> String {
        (0..=max_rolls)
            .rev()
            .map(|roll| rolled_path(log_file, roll))
            .filter(|path| path.exists())
            .map(|path| fs::read_to_string(&path).expect("debug log file should be readable"))
            .collect()
    }

    /// Path of the `roll`-th rotated file; roll `0` is the active log file itself.
    fn rolled_path(log_file: &Path, roll: usize) -> PathBuf {
        match roll {
            0 => log_file.to_path_buf(),
            index => PathBuf::from(format!("{}.{}", log_file.display(), index)),
        }
    }

    /// Counter metric with a fixed name and two tags.
    fn tagged_metric() -> Metric {
        let context = Context::from_static_parts("custom.metric", &["env:prod", "service:web"]);
        Metric::counter(context, 1.0)
    }

    #[tokio::test]
    async fn writes_metric_debug_lines_and_updates_count() {
        let workdir = tempdir().expect("temporary directory should be created");
        let log_file = workdir.path().join("dogstatsd-stats.log");
        let config = test_config(log_file.clone(), ByteSize::kb(64), 3).await;
        let metric = tagged_metric();

        let mut destination =
            DogStatsDDebugLog::from_configuration(&config).expect("debug log destination should be built");
        destination
            .write_metric(&metric)
            .expect("first metric should be written");
        destination
            .write_metric(&metric)
            .expect("second metric should be written");
        // Dropping the destination lets the background writer flush before the files are read.
        drop(destination);

        let output = read_log_files(&log_file, config.log_file_max_rolls());
        let lines: Vec<&str> = output.lines().collect();

        assert_eq!(lines.len(), 2);

        let first = lines[0];
        assert!(first.contains("Metric Name: custom.metric"));
        assert!(first.contains("Tags: {env:prod service:web}"));
        assert!(first.contains("Count: 1"));
        assert!(first.contains("Last Seen: "));
        assert!(lines[1].contains("Count: 2"));
    }

    #[tokio::test]
    async fn dynamically_drops_until_metrics_stats_are_enabled() {
        use std::time::Duration;

        use saluki_config::dynamic::ConfigUpdate;

        let workdir = tempdir().expect("temporary directory should be created");
        let log_file = workdir.path().join("dogstatsd-stats.log");
        let (generic, sender) = saluki_config::ConfigurationLoader::for_tests(
            Some(json!({
                "dogstatsd_log_file": log_file.display().to_string(),
                "dogstatsd_log_file_max_size": "64kb",
                "dogstatsd_log_file_max_rolls": 3,
                "dogstatsd_logging_enabled": true
            })),
            None,
            true,
        )
        .await;
        let sender = sender.expect("dynamic sender should be present");
        sender
            .send(ConfigUpdate::Snapshot(json!({})))
            .await
            .expect("initial dynamic snapshot should be sent");
        generic.ready().await;

        let dsd_config = DogStatsDDebugLogConfiguration::from_configuration(&generic, test_default_log_file_path())
            .expect("DogStatsDDebugLogConfiguration should deserialize");
        assert!(dsd_config.enabled());
        assert!(!dsd_config.metrics_stats_enabled);

        let metric = tagged_metric();
        let mut destination =
            DogStatsDDebugLog::from_configuration(&dsd_config).expect("debug log destination should be built");
        let mut watcher = destination
            .configuration
            .watch_for_updates(super::DOGSTATSD_METRICS_STATS_ENABLE_KEY);

        // Gate closed: the metric is dropped and the file is never created.
        destination
            .process_metric(&metric)
            .expect("disabled metric should be dropped cleanly");
        assert!(!log_file.exists());

        // Open the gate at runtime.
        sender
            .send(ConfigUpdate::Partial {
                key: "dogstatsd_metrics_stats_enable".to_string(),
                value: json!(true),
            })
            .await
            .expect("dynamic update should be sent");

        let (_, update) = tokio::time::timeout(Duration::from_secs(2), watcher.changed::<bool>())
            .await
            .expect("metrics stats flag should receive enabled update");
        destination.metrics_stats_enabled = update.expect("metrics stats update should have a new value");

        destination
            .process_metric(&metric)
            .expect("enabled metric should be written");

        // Close the gate again.
        sender
            .send(ConfigUpdate::Partial {
                key: "dogstatsd_metrics_stats_enable".to_string(),
                value: json!(false),
            })
            .await
            .expect("dynamic update should be sent");

        let (_, update) = tokio::time::timeout(Duration::from_secs(2), watcher.changed::<bool>())
            .await
            .expect("metrics stats flag should receive disabled update");
        destination.metrics_stats_enabled = update.expect("metrics stats update should have a new value");

        destination
            .process_metric(&metric)
            .expect("disabled metric should be dropped cleanly");
        drop(destination);

        let output = read_log_files(&log_file, dsd_config.log_file_max_rolls());
        let lines: Vec<&str> = output.lines().collect();

        // Only the metric processed while the gate was open made it to disk.
        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("Metric Name: custom.metric"));
        assert!(lines[0].contains("Count: 1"));
    }

    #[tokio::test]
    async fn rotates_log_file_at_configured_size() {
        let workdir = tempdir().expect("temporary directory should be created");
        let log_file = workdir.path().join("dogstatsd-stats.log");
        // Cap the file below the length of one full debug line.
        let min_debug_line_len =
            "Metric Name: custom.metric | Tags: {env:prod service:web} | Count: 1 | Last Seen: ".len();
        let config = test_config(log_file.clone(), ByteSize::b(min_debug_line_len as u64), 2).await;
        let metric = tagged_metric();

        let mut destination =
            DogStatsDDebugLog::from_configuration(&config).expect("debug log destination should be built");
        for _ in 0..12 {
            destination.write_metric(&metric).expect("metric should be written");
        }
        drop(destination);

        assert!(log_file.exists());
        assert!(rolled_path(&log_file, 1).exists());
        assert!(rolled_path(&log_file, 2).exists());
        assert!(!rolled_path(&log_file, 3).exists());

        let output = read_log_files(&log_file, config.log_file_max_rolls());
        assert!(output.contains("Metric Name: custom.metric"));
    }

    #[tokio::test]
    async fn build_error_mentions_log_file_config_key_and_path() {
        let workdir = tempdir().expect("temporary directory should be created");
        // A regular file where a directory is required makes opening the log file fail.
        let blocked_parent = workdir.path().join("not-a-directory");
        fs::write(&blocked_parent, "not a directory").expect("blocking file should be written");
        let log_file = blocked_parent.join("dogstatsd-stats.log");
        let config = test_config(log_file.clone(), ByteSize::kb(64), 3).await;

        let Err(err) = DogStatsDDebugLog::from_configuration(&config) else {
            panic!("build should fail");
        };
        let message = err.to_string();

        assert!(message.contains("dogstatsd_log_file"));
        assert!(message.contains(&log_file.display().to_string()));
    }
}